Skip to content
Snippets Groups Projects
cdr.c 140 KiB
Newer Older
  • Learn to ignore specific revisions
  • 	cdr = cdr_object_get_by_name(channel_name);
    
    	if (!cdr) {
    		ast_cli(a->fd, "Unknown channel: %s\n", channel_name);
    		return;
    	}
    
    	ast_cli(a->fd, "\n");
    	ast_cli(a->fd, "Call Detail Record (CDR) Information for %s\n", channel_name);
    	ast_cli(a->fd, "--------------------------------------------------\n");
    	ast_cli(a->fd, TITLE_STRING, "AccountCode", "CallerID", "Dst. Channel", "LastApp", "Data", "Start", "Answer", "End", "Billsec", "Duration");
    
    	ao2_lock(cdr);
    	for (it_cdr = cdr; it_cdr; it_cdr = it_cdr->next) {
    		struct timeval end;
    
    		if (snapshot_is_dialed(it_cdr->party_a.snapshot)) {
    			continue;
    		}
    		ast_callerid_merge(clid, sizeof(clid), it_cdr->party_a.snapshot->caller_name, it_cdr->party_a.snapshot->caller_number, "");
    		if (ast_tvzero(it_cdr->end)) {
    			end = ast_tvnow();
    		} else {
    			end = it_cdr->end;
    		}
    		cdr_get_tv(it_cdr->start, "%T", start_time_buffer, sizeof(start_time_buffer));
    		cdr_get_tv(it_cdr->answer, "%T", answer_time_buffer, sizeof(answer_time_buffer));
    		cdr_get_tv(end, "%T", end_time_buffer, sizeof(end_time_buffer));
    		ast_cli(a->fd, FORMAT_STRING,
    				it_cdr->party_a.snapshot->accountcode,
    				clid,
    				it_cdr->party_b.snapshot ? it_cdr->party_b.snapshot->name : "<none>",
    				it_cdr->appl,
    				it_cdr->data,
    				start_time_buffer,
    				answer_time_buffer,
    				end_time_buffer,
    				(long)ast_tvdiff_ms(end, it_cdr->answer) / 1000,
    				(long)ast_tvdiff_ms(end, it_cdr->start) / 1000);
    	}
    	ao2_unlock(cdr);
    
    #undef FORMAT_STRING
    #undef TITLE_STRING
    }
    
    static char *handle_cli_show(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
    {
    	switch (cmd) {
    	case CLI_INIT:
    			e->command = "cdr show active";
    			e->usage =
    				"Usage: cdr show active [channel]\n"
    				"	Displays a summary of all Call Detail Records when [channel]\n"
    				"	is omitted; displays all of the Call Detail Records\n"
    				"	currently in flight for a given [channel] when [channel] is\n"
    				"	specified.\n\n"
    				"	Note that this will not display Call Detail Records that\n"
    				"	have already been dispatched to a backend storage, nor for\n"
    				"	channels that are no longer active.\n";
    			return NULL;
    	case CLI_GENERATE:
    		return cli_complete_show(a);
    	}
    
    	if (a->argc > 4) {
    		return CLI_SHOWUSAGE;
    	} else if (a->argc < 4) {
    		cli_show_channels(a);
    	} else {
    		cli_show_channel(a);
    	}
    
    	return CLI_SUCCESS;
    }
    
    
    static char *handle_cli_status(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
    
    	struct cdr_beitem *beitem = NULL;
    
    	struct module_config *mod_cfg;
    
    	int cnt = 0;
    	long nextbatchtime = 0;
    
    		e->command = "cdr show status";
    
    			"Usage: cdr show status\n"
    
    			"	Displays the Call Detail Record engine system status.\n";
    		return NULL;
    	case CLI_GENERATE:
    		return NULL;
    	}
    
    	mod_cfg = ao2_global_obj_ref(module_configs);
    	if (!mod_cfg) {
    		return CLI_FAILURE;
    	}
    
    
    	ast_cli(a->fd, "\n");
    	ast_cli(a->fd, "Call Detail Record (CDR) settings\n");
    	ast_cli(a->fd, "----------------------------------\n");
    
    	ast_cli(a->fd, "  Logging:                    %s\n", ast_test_flag(&mod_cfg->general->settings, CDR_ENABLED) ? "Enabled" : "Disabled");
    	ast_cli(a->fd, "  Mode:                       %s\n", ast_test_flag(&mod_cfg->general->settings, CDR_BATCHMODE) ? "Batch" : "Simple");
    	if (ast_test_flag(&mod_cfg->general->settings, CDR_ENABLED)) {
    		ast_cli(a->fd, "  Log unanswered calls:       %s\n", ast_test_flag(&mod_cfg->general->settings, CDR_UNANSWERED) ? "Yes" : "No");
    		ast_cli(a->fd, "  Log congestion:             %s\n\n", ast_test_flag(&mod_cfg->general->settings, CDR_CONGESTION) ? "Yes" : "No");
    		if (ast_test_flag(&mod_cfg->general->settings, CDR_BATCHMODE)) {
    
    			ast_cli(a->fd, "* Batch Mode Settings\n");
    			ast_cli(a->fd, "  -------------------\n");
    
    			if (batch)
    				cnt = batch->size;
    			if (cdr_sched > -1)
    				nextbatchtime = ast_sched_when(sched, cdr_sched);
    
    			ast_cli(a->fd, "  Safe shutdown:              %s\n", ast_test_flag(&mod_cfg->general->batch_settings.settings, BATCH_MODE_SAFE_SHUTDOWN) ? "Enabled" : "Disabled");
    			ast_cli(a->fd, "  Threading model:            %s\n", ast_test_flag(&mod_cfg->general->batch_settings.settings, BATCH_MODE_SCHEDULER_ONLY) ? "Scheduler only" : "Scheduler plus separate threads");
    
    			ast_cli(a->fd, "  Current batch size:         %d record%s\n", cnt, ESS(cnt));
    
    			ast_cli(a->fd, "  Maximum batch size:         %u record%s\n", mod_cfg->general->batch_settings.size, ESS(mod_cfg->general->batch_settings.size));
    			ast_cli(a->fd, "  Maximum batch time:         %u second%s\n", mod_cfg->general->batch_settings.time, ESS(mod_cfg->general->batch_settings.time));
    
    			ast_cli(a->fd, "  Next batch processing time: %ld second%s\n\n", nextbatchtime, ESS(nextbatchtime));
    
    		ast_cli(a->fd, "* Registered Backends\n");
    		ast_cli(a->fd, "  -------------------\n");
    
    		AST_RWLIST_RDLOCK(&be_list);
    
    		if (AST_RWLIST_EMPTY(&be_list)) {
    			ast_cli(a->fd, "    (none)\n");
    		} else {
    			AST_RWLIST_TRAVERSE(&be_list, beitem, list) {
    
    				ast_cli(a->fd, "    %s%s\n", beitem->name, beitem->suspended ? " (suspended) " : "");
    
    		AST_RWLIST_UNLOCK(&be_list);
    
    		ast_cli(a->fd, "\n");
    
    static char *handle_cli_submit(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
    
    	struct module_config *mod_cfg;
    
    	switch (cmd) {
    	case CLI_INIT:
    		e->command = "cdr submit";
    
    			"Posts all pending batched CDR data to the configured CDR\n"
    			"backend engine modules.\n";
    
    	mod_cfg = ao2_global_obj_ref(module_configs);
    	if (!mod_cfg) {
    		return CLI_FAILURE;
    
    	if (!ast_test_flag(&mod_cfg->general->settings, CDR_ENABLED)) {
    		ast_cli(a->fd, "Cannot submit CDR batch: CDR engine disabled.\n");
    	} else if (ast_test_flag(&mod_cfg->general->settings, CDR_BATCHMODE)) {
    
    		ast_cli(a->fd, "Cannot submit CDR batch: batch mode not enabled.\n");
    
    	} else {
    		submit_unscheduled_batch();
    		ast_cli(a->fd, "Submitted CDRs to backend engines for processing.  This may take a while.\n");
    
    static struct ast_cli_entry cli_commands[] = {
    	AST_CLI_DEFINE(handle_cli_submit, "Posts all pending batched CDR data"),
    	AST_CLI_DEFINE(handle_cli_status, "Display the CDR status"),
    	AST_CLI_DEFINE(handle_cli_show, "Display active CDRs for channels"),
    	AST_CLI_DEFINE(handle_cli_debug, "Enable debugging in the CDR engine"),
    };
    
    
    /*!
     * \brief This dispatches *all* \ref cdr_objects. It should only be used during
     * shutdown, so that we get billing records for everything that we can.
     */
    static int cdr_object_dispatch_all_cb(void *obj, void *arg, int flags)
    
    	struct cdr_object *cdr = obj;
    	struct cdr_object *it_cdr;
    
    Matthew Jordan's avatar
    Matthew Jordan committed
    	ao2_lock(cdr);
    
    	for (it_cdr = cdr; it_cdr; it_cdr = it_cdr->next) {
    		cdr_object_transition_state(it_cdr, &finalized_state_fn_table);
    
    	cdr_object_dispatch(cdr);
    
    Matthew Jordan's avatar
    Matthew Jordan committed
    	ao2_unlock(cdr);
    
    	cdr_all_unlink(cdr);
    
    	return CMP_MATCH;
    
    static void finalize_batch_mode(void)
    {
    	if (cdr_thread == AST_PTHREADT_NULL) {
    
    	/* wake up the thread so it will exit */
    	pthread_cancel(cdr_thread);
    	pthread_kill(cdr_thread, SIGURG);
    	pthread_join(cdr_thread, NULL);
    	cdr_thread = AST_PTHREADT_NULL;
    	ast_cond_destroy(&cdr_pending_cond);
    	ast_cdr_engine_term();
    }
    
    struct stasis_message_router *ast_cdr_message_router(void)
    {
    	if (!stasis_router) {
    		return NULL;
    	}
    
    	ao2_bump(stasis_router);
    	return stasis_router;
    }
    
    
     * \brief Destroy the active Stasis subscriptions
    
     */
    static void destroy_subscriptions(void)
    {
    	channel_subscription = stasis_forward_cancel(channel_subscription);
    	bridge_subscription = stasis_forward_cancel(bridge_subscription);
    	parking_subscription = stasis_forward_cancel(parking_subscription);
    }
    
    /*!
     * \brief Create the Stasis subcriptions for CDRs
     */
    static int create_subscriptions(void)
    {
    	if (!cdr_topic) {
    		return -1;
    	}
    
    
    	if (channel_subscription || bridge_subscription || parking_subscription) {
    		return 0;
    	}
    
    
    	channel_subscription = stasis_forward_all(ast_channel_topic_all_cached(), cdr_topic);
    	if (!channel_subscription) {
    		return -1;
    	}
    	bridge_subscription = stasis_forward_all(ast_bridge_topic_all_cached(), cdr_topic);
    	if (!bridge_subscription) {
    		return -1;
    	}
    	parking_subscription = stasis_forward_all(ast_parking_topic(), cdr_topic);
    	if (!parking_subscription) {
    		return -1;
    	}
    
    	return 0;
    }
    
    
    static int process_config(int reload)
    {
    	if (!reload) {
    		if (aco_info_init(&cfg_info)) {
    			return 1;
    
    		aco_option_register(&cfg_info, "enable", ACO_EXACT, general_options, DEFAULT_ENABLED, OPT_BOOLFLAG_T, 1, FLDSET(struct ast_cdr_config, settings), CDR_ENABLED);
    		aco_option_register(&cfg_info, "debug", ACO_EXACT, general_options, 0, OPT_BOOLFLAG_T, 1, FLDSET(struct ast_cdr_config, settings), CDR_DEBUG);
    		aco_option_register(&cfg_info, "unanswered", ACO_EXACT, general_options, DEFAULT_UNANSWERED, OPT_BOOLFLAG_T, 1, FLDSET(struct ast_cdr_config, settings), CDR_UNANSWERED);
    		aco_option_register(&cfg_info, "congestion", ACO_EXACT, general_options, 0, OPT_BOOLFLAG_T, 1, FLDSET(struct ast_cdr_config, settings), CDR_CONGESTION);
    		aco_option_register(&cfg_info, "batch", ACO_EXACT, general_options, DEFAULT_BATCHMODE, OPT_BOOLFLAG_T, 1, FLDSET(struct ast_cdr_config, settings), CDR_BATCHMODE);
    		aco_option_register(&cfg_info, "endbeforehexten", ACO_EXACT, general_options, DEFAULT_END_BEFORE_H_EXTEN, OPT_BOOLFLAG_T, 1, FLDSET(struct ast_cdr_config, settings), CDR_END_BEFORE_H_EXTEN);
    		aco_option_register(&cfg_info, "initiatedseconds", ACO_EXACT, general_options, DEFAULT_INITIATED_SECONDS, OPT_BOOLFLAG_T, 1, FLDSET(struct ast_cdr_config, settings), CDR_INITIATED_SECONDS);
    		aco_option_register(&cfg_info, "scheduleronly", ACO_EXACT, general_options, DEFAULT_BATCH_SCHEDULER_ONLY, OPT_BOOLFLAG_T, 1, FLDSET(struct ast_cdr_config, batch_settings.settings), BATCH_MODE_SCHEDULER_ONLY);
    		aco_option_register(&cfg_info, "safeshutdown", ACO_EXACT, general_options, DEFAULT_BATCH_SAFE_SHUTDOWN, OPT_BOOLFLAG_T, 1, FLDSET(struct ast_cdr_config, batch_settings.settings), BATCH_MODE_SAFE_SHUTDOWN);
    		aco_option_register(&cfg_info, "size", ACO_EXACT, general_options, DEFAULT_BATCH_SIZE, OPT_UINT_T, PARSE_IN_RANGE, FLDSET(struct ast_cdr_config, batch_settings.size), 0, MAX_BATCH_SIZE);
    		aco_option_register(&cfg_info, "time", ACO_EXACT, general_options, DEFAULT_BATCH_TIME, OPT_UINT_T, PARSE_IN_RANGE, FLDSET(struct ast_cdr_config, batch_settings.time), 0, MAX_BATCH_TIME);
    	}
    
    
    	if (aco_process_config(&cfg_info, reload) == ACO_PROCESS_ERROR) {
    		struct module_config *mod_cfg;
    
    		if (reload) {
    
    		/* If we couldn't process the configuration and this wasn't a reload,
    		 * create a default config
    		 */
    
    		mod_cfg = module_config_alloc();
    		if (!mod_cfg
    			|| aco_set_defaults(&general_option, "general", mod_cfg->general)) {
    			ao2_cleanup(mod_cfg);
    			return 1;
    
    		ast_log(LOG_NOTICE, "Failed to process CDR configuration; using defaults\n");
    		ao2_global_obj_replace_unref(module_configs, mod_cfg);
    		cdr_set_debug_mode(mod_cfg);
    		ao2_cleanup(mod_cfg);
    
    Matthew Jordan's avatar
    Matthew Jordan committed
    static void cdr_engine_cleanup(void)
    {
    
    	destroy_subscriptions();
    
    static void cdr_engine_shutdown(void)
    {
    
    	stasis_message_router_unsubscribe_and_join(stasis_router);
    	stasis_router = NULL;
    
    	ao2_cleanup(cdr_topic);
    	cdr_topic = NULL;
    
    
    	STASIS_MESSAGE_TYPE_CLEANUP(cdr_sync_message_type);
    
    
    	ao2_callback(active_cdrs_master, OBJ_NODATA | OBJ_MULTIPLE | OBJ_UNLINK,
    
    		cdr_object_dispatch_all_cb, NULL);
    
    	ast_cli_unregister_multiple(cli_commands, ARRAY_LEN(cli_commands));
    
    	ast_sched_context_destroy(sched);
    	sched = NULL;
    	ast_free(batch);
    	batch = NULL;
    
    	aco_info_destroy(&cfg_info);
    	ao2_global_obj_release(module_configs);
    
    
    	ao2_container_unregister("cdrs_master");
    	ao2_cleanup(active_cdrs_master);
    	active_cdrs_master = NULL;
    
    
    	ao2_container_unregister("cdrs_all");
    	ao2_cleanup(active_cdrs_all);
    	active_cdrs_all = NULL;
    
    }
    
    static void cdr_enable_batch_mode(struct ast_cdr_config *config)
    {
    	SCOPED_LOCK(batch, &cdr_batch_lock, ast_mutex_lock, ast_mutex_unlock);
    
    	/* Only create the thread level portions once */
    	if (cdr_thread == AST_PTHREADT_NULL) {
    		ast_cond_init(&cdr_pending_cond, NULL);
    		if (ast_pthread_create_background(&cdr_thread, NULL, do_cdr, NULL) < 0) {
    			ast_log(LOG_ERROR, "Unable to start CDR thread.\n");
    			return;
    		}
    	}
    
    	/* Kill the currently scheduled item */
    	AST_SCHED_DEL(sched, cdr_sched);
    	cdr_sched = ast_sched_add(sched, config->batch_settings.time * 1000, submit_scheduled_batch, NULL);
    
    	ast_log(LOG_NOTICE, "CDR batch mode logging enabled, first of either size %u or time %u seconds.\n",
    
    			config->batch_settings.size, config->batch_settings.time);
    
     * \brief Print master CDR container object.
    
     * \param v_obj A pointer to the object we want printed.
    
     * \param where User data needed by prnt to determine where to put output.
     * \param prnt Print output callback function to use.
     *
     * \return Nothing
     */
    
    static void cdr_master_print_fn(void *v_obj, void *where, ao2_prnt_fn *prnt)
    
    {
    	struct cdr_object *cdr = v_obj;
    	struct cdr_object *it_cdr;
    
    	if (!cdr) {
    		return;
    	}
    	for (it_cdr = cdr; it_cdr; it_cdr = it_cdr->next) {
    
    		prnt(where, "Party A: %s; Party B: %s; Bridge %s\n",
    			it_cdr->party_a.snapshot->name,
    			it_cdr->party_b.snapshot ? it_cdr->party_b.snapshot->name : "<unknown>",
    			it_cdr->bridge);
    
    /*!
     * \internal
     * \brief Print all CDR container object.
     * \since 13.19.0
     *
     * \param v_obj A pointer to the object we want printed.
     * \param where User data needed by prnt to determine where to put output.
     * \param prnt Print output callback function to use.
     *
     * \return Nothing
     */
    static void cdr_all_print_fn(void *v_obj, void *where, ao2_prnt_fn *prnt)
    {
    	struct cdr_object *cdr = v_obj;
    
    	if (!cdr) {
    		return;
    	}
    	prnt(where, "Party A: %s; Party B: %s; Bridge %s",
    		cdr->party_a.snapshot->name,
    		cdr->party_b.snapshot ? cdr->party_b.snapshot->name : "<unknown>",
    		cdr->bridge);
    }
    
    
    /*!
     * \brief Checks if CDRs are enabled and enables/disables the necessary options
     */
    static int cdr_toggle_runtime_options(void)
    
    	struct module_config *mod_cfg;
    
    	mod_cfg = ao2_global_obj_ref(module_configs);
    	if (mod_cfg
    		&& ast_test_flag(&mod_cfg->general->settings, CDR_ENABLED)) {
    
    		if (create_subscriptions()) {
    			destroy_subscriptions();
    			ast_log(AST_LOG_ERROR, "Failed to create Stasis subscriptions\n");
    
    			return -1;
    		}
    		if (ast_test_flag(&mod_cfg->general->settings, CDR_BATCHMODE)) {
    			cdr_enable_batch_mode(mod_cfg->general);
    		} else {
    			ast_log(LOG_NOTICE, "CDR simple logging enabled.\n");
    		}
    	} else {
    		destroy_subscriptions();
    		ast_log(LOG_NOTICE, "CDR logging disabled.\n");
    	}
    
    }
    
    int ast_cdr_engine_init(void)
    {
    
    	cdr_topic = stasis_topic_create("cdr_engine");
    	if (!cdr_topic) {
    		return -1;
    	}
    
    	stasis_router = stasis_message_router_create(cdr_topic);
    	if (!stasis_router) {
    		return -1;
    	}
    
    	stasis_message_router_set_congestion_limits(stasis_router, -1,
    		10 * AST_TASKPROCESSOR_HIGH_WATER_LEVEL);
    
    
    	if (STASIS_MESSAGE_TYPE_INIT(cdr_sync_message_type)) {
    		return -1;
    	}
    
    
    	stasis_message_router_add_cache_update(stasis_router, ast_channel_snapshot_type(), handle_channel_cache_message, NULL);
    	stasis_message_router_add(stasis_router, ast_channel_dial_type(), handle_dial_message, NULL);
    	stasis_message_router_add(stasis_router, ast_channel_entered_bridge_type(), handle_bridge_enter_message, NULL);
    	stasis_message_router_add(stasis_router, ast_channel_left_bridge_type(), handle_bridge_leave_message, NULL);
    	stasis_message_router_add(stasis_router, ast_parked_call_type(), handle_parked_call_message, NULL);
    
    	stasis_message_router_add(stasis_router, cdr_sync_message_type(), handle_cdr_sync_message, NULL);
    
    	active_cdrs_master = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0,
    		NUM_CDR_BUCKETS, cdr_master_hash_fn, NULL, cdr_master_cmp_fn);
    	if (!active_cdrs_master) {
    
    	ao2_container_register("cdrs_master", active_cdrs_master, cdr_master_print_fn);
    
    
    	active_cdrs_all = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0,
    		NUM_CDR_BUCKETS, cdr_all_hash_fn, NULL, cdr_all_cmp_fn);
    	if (!active_cdrs_all) {
    		return -1;
    	}
    	ao2_container_register("cdrs_all", active_cdrs_all, cdr_all_print_fn);
    
    	sched = ast_sched_context_create();
    
    	if (!sched) {
    		ast_log(LOG_ERROR, "Unable to create schedule context.\n");
    		return -1;
    	}
    
    
    	ast_cli_register_multiple(cli_commands, ARRAY_LEN(cli_commands));
    
    Matthew Jordan's avatar
    Matthew Jordan committed
    	ast_register_cleanup(cdr_engine_cleanup);
    
    	ast_register_atexit(cdr_engine_shutdown);
    
    	return cdr_toggle_runtime_options();
    
    	RAII_VAR(struct module_config *, mod_cfg, ao2_global_obj_ref(module_configs), ao2_cleanup);
    
    	/* Since this is called explicitly during process shutdown, we might not have ever
    	 * been initialized. If so, the config object will be NULL.
    	 */
    	if (!mod_cfg) {
    		return;
    	}
    
    	if (cdr_sync_message_type()) {
    
    		void *payload;
    		struct stasis_message *message;
    
    		if (!stasis_router) {
    			return;
    		}
    
    
    		/* Make sure we have the needed items */
    		payload = ao2_alloc(sizeof(*payload), NULL);
    
    		ast_debug(1, "CDR Engine termination request received; waiting on messages...\n");
    
    		message = stasis_message_create(cdr_sync_message_type(), payload);
    		if (message) {
    			stasis_message_router_publish_sync(stasis_router, message);
    		}
    
    		ao2_cleanup(message);
    		ao2_cleanup(payload);
    
    	}
    
    	if (ast_test_flag(&mod_cfg->general->settings, CDR_BATCHMODE)) {
    		cdr_submit_batch(ast_test_flag(&mod_cfg->general->batch_settings.settings, BATCH_MODE_SAFE_SHUTDOWN));
    	}
    
    int ast_cdr_engine_reload(void)
    
    	struct module_config *old_mod_cfg;
    	struct module_config *mod_cfg;
    
    	old_mod_cfg = ao2_global_obj_ref(module_configs);
    
    	if (!old_mod_cfg || process_config(1)) {
    
    		ao2_cleanup(old_mod_cfg);
    
    	mod_cfg = ao2_global_obj_ref(module_configs);
    
    	if (!mod_cfg
    		|| !ast_test_flag(&mod_cfg->general->settings, CDR_ENABLED)
    		|| !ast_test_flag(&mod_cfg->general->settings, CDR_BATCHMODE)) {
    
    		/* If batch mode used to be enabled, finalize the batch */
    		if (ast_test_flag(&old_mod_cfg->general->settings, CDR_BATCHMODE)) {
    			finalize_batch_mode();
    
    	ao2_cleanup(old_mod_cfg);
    
    	return cdr_toggle_runtime_options();