Newer
Older
/*
* Asterisk -- A telephony toolkit for Linux.
*
* Call Detail Record API
*
* Copyright (C) 1999 - 2005, Digium, Inc.
* Mark Spencer <markster@digium.com>
*
* This program is free software, distributed under the terms of
* the GNU General Public License.
*
* Includes code and algorithms from the Zapata library.
*
*/
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <signal.h>
Kevin P. Fleming
committed
#include "asterisk/lock.h"
#include "asterisk/channel.h"
#include "asterisk/cdr.h"
#include "asterisk/logger.h"
#include "asterisk/callerid.h"
#include "asterisk/causes.h"
#include "asterisk/options.h"
Kevin P. Fleming
committed
#include "asterisk/utils.h"
#include "asterisk/sched.h"
#include "asterisk/config.h"
#include "asterisk/cli.h"
#include "asterisk/module.h"
/* AMA flags given to a CDR when the channel it is initialised from has none of its own set. */
int ast_default_amaflags = AST_CDR_DOCUMENTATION;
/*
 * Default account code; starts out empty.
 * NOTE(review): not referenced in the visible part of this file -- presumably
 * consumed by channel setup code elsewhere; confirm.
 */
char ast_default_accountcode[AST_MAX_ACCOUNT_CODE] = "";
/*
 * One node of the singly linked list of CDRs queued for batch posting.
 * do_batch_backend_process() posts and frees the CDR, then frees the node itself.
 */
struct ast_cdr_batch_item {
/* the queued record */
struct ast_cdr *cdr;
/* following node, or NULL for the last one */
struct ast_cdr_batch_item *next;
};
/*
 * The queue of CDRs waiting to be posted in batch mode.
 * The object is allocated on demand by init_batch() and is modified while
 * holding cdr_batch_lock.
 */
static struct ast_cdr_batch {
/* number of items currently queued */
int size;
/* first queued item, NULL when the queue is empty */
struct ast_cdr_batch_item *head;
/* last queued item; new records are linked after it */
struct ast_cdr_batch_item *tail;
} *batch = NULL;
/* Scheduler context handed to ast_sched_add()/ast_sched_del() for batch submission. */
static struct sched_context *sched;
/* Id returned by ast_sched_add() for the pending batch submission; -1 until one has been scheduled. */
static int cdr_sched = -1;
/*
 * NOTE(review): presumably the thread that waits on cdr_pending_cond and runs
 * the scheduler -- its body is not in the visible part of the file; confirm.
 */
static pthread_t cdr_thread = AST_PTHREADT_NULL;
/*
 * NOTE(review): these look like the fallbacks for batchsize, batchtime,
 * batchscheduleronly and batchsafeshutdown below; the code that applies them
 * is not visible here.
 */
#define BATCH_SIZE_DEFAULT 100
#define BATCH_TIME_DEFAULT 300
#define BATCH_SCHEDULER_ONLY_DEFAULT 0
#define BATCH_SAFE_SHUTDOWN_DEFAULT 1
/* When zero, ast_cdr_detach() drops records instead of posting them. */
static int enabled;
/* When zero, ast_cdr_detach() posts each record immediately; otherwise records are queued in the batch. */
static int batchmode;
/* Queue length that ast_cdr_detach() compares the batch size against after queuing a record. */
static int batchsize;
/* Interval between scheduled batch submissions; multiplied by 1000 when passed to ast_sched_add(). */
static int batchtime;
/* When non-zero, batches are posted in the submitting thread rather than in a detached worker thread. */
static int batchscheduleronly;
/* NOTE(review): not used in the visible part of the file -- presumably controls flushing the batch at shutdown; confirm. */
static int batchsafeshutdown;
/* Guards modifications of the batch queue. */
AST_MUTEX_DEFINE_STATIC(cdr_batch_lock);
/* these are used to wake up the CDR thread when there's work to do */
AST_MUTEX_DEFINE_STATIC(cdr_pending_lock);
pthread_cond_t cdr_pending_cond;
/*
* We do a lot of checking here in the CDR code to try to be sure we don't ever let a CDR slip
* through our fingers somehow. If someone allocates a CDR, it must be completely handled normally
* or a WARNING shall be logged, so that we can best keep track of any escape condition where the CDR
* isn't properly generated and posted.
*/
int ast_cdr_register(char *name, char *desc, ast_cdrbe be)
{
struct ast_cdr_beitem *i;
if (!name)
return -1;
if (!be) {
ast_log(LOG_WARNING, "CDR engine '%s' lacks backend\n", name);
return -1;
}
AST_LIST_LOCK(&be_list);
AST_LIST_TRAVERSE(&be_list, i, list) {
if (i) {
ast_log(LOG_WARNING, "Already have a CDR backend called '%s'\n", name);
return -1;
}
ast_copy_string(i->name, name, sizeof(i->name));
ast_copy_string(i->desc, desc, sizeof(i->desc));
AST_LIST_LOCK(&be_list);
AST_LIST_INSERT_HEAD(&be_list, i, list);
AST_LIST_UNLOCK(&be_list);
return 0;
}
void ast_cdr_unregister(char *name)
{
struct ast_cdr_beitem *i = NULL;
AST_LIST_LOCK(&be_list);
AST_LIST_TRAVERSE_SAFE_BEGIN(&be_list, i, list) {
AST_LIST_REMOVE_CURRENT(&be_list, list);
if (option_verbose > 1)
ast_verbose(VERBOSE_PREFIX_2 "Unregistered '%s' CDR backend\n", name);
free(i);
AST_LIST_TRAVERSE_SAFE_END;
AST_LIST_UNLOCK(&be_list);
static const char *ast_cdr_getvar_internal(struct ast_cdr *cdr, const char *name, int recur)
{
struct ast_var_t *variables;
struct varshead *headp;
if (!name || ast_strlen_zero(name))
return NULL;
while (cdr) {
AST_LIST_TRAVERSE(headp, variables, entries) {
if (!strcasecmp(name, ast_var_name(variables)))
return ast_var_value(variables);
return NULL;
}
void ast_cdr_getvar(struct ast_cdr *cdr, const char *name, char **ret, char *workspace, int workspacelen, int recur)
{
struct tm tm;
time_t t;
const char *fmt = "%Y-%m-%d %T";
*ret = NULL;
/* special vars (the ones from the struct ast_cdr when requested by name)
I'd almost say we should convert all the stringed vals to vars */
if (!strcasecmp(name, "clid"))
ast_copy_string(workspace, cdr->clid, workspacelen);
else if (!strcasecmp(name, "src"))
ast_copy_string(workspace, cdr->src, workspacelen);
else if (!strcasecmp(name, "dst"))
ast_copy_string(workspace, cdr->dst, workspacelen);
else if (!strcasecmp(name, "dcontext"))
ast_copy_string(workspace, cdr->dcontext, workspacelen);
else if (!strcasecmp(name, "channel"))
ast_copy_string(workspace, cdr->channel, workspacelen);
else if (!strcasecmp(name, "dstchannel"))
ast_copy_string(workspace, cdr->dstchannel, workspacelen);
else if (!strcasecmp(name, "lastapp"))
ast_copy_string(workspace, cdr->lastapp, workspacelen);
else if (!strcasecmp(name, "lastdata"))
ast_copy_string(workspace, cdr->lastdata, workspacelen);
else if (!strcasecmp(name, "start")) {
t = cdr->start.tv_sec;
if (t) {
strftime(workspace, workspacelen, fmt, &tm);
}
} else if (!strcasecmp(name, "answer")) {
t = cdr->start.tv_sec;
if (t) {
strftime(workspace, workspacelen, fmt, &tm);
}
} else if (!strcasecmp(name, "end")) {
t = cdr->start.tv_sec;
if (t) {
strftime(workspace, workspacelen, fmt, &tm);
}
snprintf(workspace, workspacelen, "%d", cdr->duration);
snprintf(workspace, workspacelen, "%d", cdr->billsec);
else if (!strcasecmp(name, "disposition"))
ast_copy_string(workspace, ast_cdr_disp2str(cdr->disposition), workspacelen);
else if (!strcasecmp(name, "amaflags"))
ast_copy_string(workspace, ast_cdr_flags2str(cdr->amaflags), workspacelen);
else if (!strcasecmp(name, "accountcode"))
ast_copy_string(workspace, cdr->accountcode, workspacelen);
else if (!strcasecmp(name, "uniqueid"))
ast_copy_string(workspace, cdr->uniqueid, workspacelen);
else if (!strcasecmp(name, "userfield"))
ast_copy_string(workspace, cdr->userfield, workspacelen);
else if ((varbuf = ast_cdr_getvar_internal(cdr, name, recur)))
ast_copy_string(workspace, varbuf, workspacelen);
if (!ast_strlen_zero(workspace))
int ast_cdr_setvar(struct ast_cdr *cdr, const char *name, const char *value, int recur)
{
struct ast_var_t *newvariable;
Anthony Minessale II
committed
const char *read_only[] = { "clid", "src", "dst", "dcontext", "channel", "dstchannel",
"lastapp", "lastdata", "start", "answer", "end", "duration",
"billsec", "disposition", "amaflags", "accountcode", "uniqueid",
"userfield", NULL };
Anthony Minessale II
committed
int x;
Anthony Minessale II
committed
for(x = 0; read_only[x]; x++) {
if (!strcasecmp(name, read_only[x])) {
ast_log(LOG_ERROR, "Attempt to set a read-only variable!.\n");
return -1;
}
}
ast_log(LOG_ERROR, "Attempt to set a variable on a nonexistent CDR record.\n");
while (cdr) {
headp = &cdr->varshead;
AST_LIST_TRAVERSE_SAFE_BEGIN(headp, newvariable, entries) {
if (!strcasecmp(ast_var_name(newvariable), name)) {
/* there is already such a variable, delete it */
ast_var_delete(newvariable);
break;
}
}
if (value) {
newvariable = ast_var_assign(name, value);
AST_LIST_INSERT_HEAD(headp, newvariable, entries);
}
if (!recur) {
break;
}
return 0;
}
int ast_cdr_copy_vars(struct ast_cdr *to_cdr, struct ast_cdr *from_cdr)
{
struct ast_var_t *variables, *newvariable = NULL;
char *var, *val;
int x = 0;
headpa = &from_cdr->varshead;
headpb = &to_cdr->varshead;
AST_LIST_TRAVERSE(headpa,variables,entries) {
if (variables &&
(var = ast_var_name(variables)) && (val = ast_var_value(variables)) &&
!ast_strlen_zero(var) && !ast_strlen_zero(val)) {
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
newvariable = ast_var_assign(var, val);
AST_LIST_INSERT_HEAD(headpb, newvariable, entries);
x++;
}
}
return x;
}
#define CDR_CLEN 18
int ast_cdr_serialize_variables(struct ast_cdr *cdr, char *buf, size_t size, char delim, char sep, int recur)
{
struct ast_var_t *variables;
struct varshead *headp;
char *var=NULL ,*val=NULL;
char *tmp = NULL;
char workspace[256];
int workspacelen;
int total = 0, x = 0, i = 0;
const char *cdrcols[CDR_CLEN] = {
"clid",
"src",
"dst",
"dcontext",
"channel",
"dstchannel",
"lastapp",
"lastdata",
"start",
"answer",
"end",
"duration",
"billsec",
"disposition",
"amaflags",
"accountcode",
"uniqueid",
"userfield"
};
memset(buf,0,size);
while (cdr) {
x++;
if (x > 1) {
strncat(buf, "\n", size);
}
AST_LIST_TRAVERSE(headp,variables,entries) {
if (cdr && variables &&
(var = ast_var_name(variables)) && (val = ast_var_value(variables)) &&
!ast_strlen_zero(var) && !ast_strlen_zero(val)) {
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
snprintf(buf + strlen(buf), size - strlen(buf), "level %d: %s%c%s%c", x, var, delim, val, sep);
if (strlen(buf) >= size) {
ast_log(LOG_ERROR,"Data Buffer Size Exceeded!\n");
break;
}
total++;
} else
break;
}
for (i = 0 ; i < CDR_CLEN; i++) {
workspacelen = sizeof(workspace);
ast_cdr_getvar(cdr, cdrcols[i], &tmp, workspace, workspacelen, 0);
if (!tmp)
continue;
snprintf(buf + strlen(buf), size - strlen(buf), "level %d: %s%c%s%c", x, cdrcols[i], delim, tmp, sep);
if (strlen(buf) >= size) {
ast_log(LOG_ERROR,"Data Buffer Size Exceeded!\n");
break;
}
total++;
}
if (!recur) {
break;
}
return total;
}
void ast_cdr_free_vars(struct ast_cdr *cdr, int recur)
{
struct varshead *headp;
struct ast_var_t *vardata;
/* clear variables */
headp = &cdr->varshead;
while (!AST_LIST_EMPTY(headp)) {
vardata = AST_LIST_REMOVE_HEAD(headp, entries);
ast_var_delete(vardata);
}
char *chan;
struct ast_cdr *next;
while (cdr) {
next = cdr->next;
chan = !ast_strlen_zero(cdr->channel) ? cdr->channel : "<unknown>";
if (!ast_test_flag(cdr, AST_CDR_FLAG_POSTED) && !ast_test_flag(cdr, AST_CDR_FLAG_POST_DISABLED))
ast_log(LOG_WARNING, "CDR on channel '%s' not posted\n", chan);
if (!cdr->end.tv_sec && !cdr->end.tv_usec)
ast_log(LOG_WARNING, "CDR on channel '%s' lacks end\n", chan);
if (!cdr->start.tv_sec && !cdr->start.tv_usec)
ast_log(LOG_WARNING, "CDR on channel '%s' lacks start\n", chan);
}
}
/*
 * Allocate a new CDR with every byte cleared.
 *
 * Returns the new record, or NULL when the allocation fails.
 */
struct ast_cdr *ast_cdr_alloc(void)
{
	struct ast_cdr *fresh = malloc(sizeof(*fresh));

	if (!fresh)
		return NULL;

	memset(fresh, 0, sizeof(*fresh));
	return fresh;
}
void ast_cdr_start(struct ast_cdr *cdr)
{
char *chan;
if (!ast_test_flag(cdr, AST_CDR_FLAG_LOCKED)) {
chan = !ast_strlen_zero(cdr->channel) ? cdr->channel : "<unknown>";
if (ast_test_flag(cdr, AST_CDR_FLAG_POSTED))
ast_log(LOG_WARNING, "CDR on channel '%s' already posted\n", chan);
if (cdr->start.tv_sec || cdr->start.tv_usec)
ast_log(LOG_WARNING, "CDR on channel '%s' already started\n", chan);
gettimeofday(&cdr->start, NULL);
}
}
}
void ast_cdr_answer(struct ast_cdr *cdr)
{
char *chan;
chan = !ast_strlen_zero(cdr->channel) ? cdr->channel : "<unknown>";
if (ast_test_flag(cdr, AST_CDR_FLAG_POSTED))
ast_log(LOG_WARNING, "CDR on channel '%s' already posted\n", chan);
if (cdr->disposition < AST_CDR_ANSWERED)
cdr->disposition = AST_CDR_ANSWERED;
if (!cdr->answer.tv_sec && !cdr->answer.tv_usec) {
gettimeofday(&cdr->answer, NULL);
}
}
}
void ast_cdr_busy(struct ast_cdr *cdr)
{
char *chan;
if (!ast_test_flag(cdr, AST_CDR_FLAG_LOCKED)) {
chan = !ast_strlen_zero(cdr->channel) ? cdr->channel : "<unknown>";
if (ast_test_flag(cdr, AST_CDR_FLAG_POSTED))
ast_log(LOG_WARNING, "CDR on channel '%s' already posted\n", chan);
if (cdr->disposition < AST_CDR_BUSY)
cdr->disposition = AST_CDR_BUSY;
}
chan = !ast_strlen_zero(cdr->channel) ? cdr->channel : "<unknown>";
if (ast_test_flag(cdr, AST_CDR_FLAG_POSTED))
if (!ast_test_flag(cdr, AST_CDR_FLAG_LOCKED))
cdr->disposition = AST_CDR_FAILED;
}
}
int ast_cdr_disposition(struct ast_cdr *cdr, int cause)
{
int res = 0;
case AST_CAUSE_BUSY:
ast_cdr_busy(cdr);
break;
case AST_CAUSE_FAILURE:
ast_cdr_failed(cdr);
break;
case AST_CAUSE_NORMAL:
break;
case AST_CAUSE_NOTDEFINED:
res = -1;
break;
default:
res = -1;
ast_log(LOG_WARNING, "Cause not handled\n");
void ast_cdr_setdestchan(struct ast_cdr *cdr, char *chann)
{
char *chan;
chan = !ast_strlen_zero(cdr->channel) ? cdr->channel : "<unknown>";
if (ast_test_flag(cdr, AST_CDR_FLAG_POSTED))
ast_log(LOG_WARNING, "CDR on channel '%s' already posted\n", chan);
if (!ast_test_flag(cdr, AST_CDR_FLAG_LOCKED))
ast_copy_string(cdr->dstchannel, chann, sizeof(cdr->dstchannel));
}
}
void ast_cdr_setapp(struct ast_cdr *cdr, char *app, char *data)
{
char *chan;
if (!ast_test_flag(cdr, AST_CDR_FLAG_LOCKED)) {
chan = !ast_strlen_zero(cdr->channel) ? cdr->channel : "<unknown>";
if (ast_test_flag(cdr, AST_CDR_FLAG_POSTED))
ast_log(LOG_WARNING, "CDR on channel '%s' already posted\n", chan);
if (!app)
app = "";
ast_copy_string(cdr->lastapp, app, sizeof(cdr->lastapp));
ast_copy_string(cdr->lastdata, data, sizeof(cdr->lastdata));
int ast_cdr_setcid(struct ast_cdr *cdr, struct ast_channel *c)
{
char tmp[AST_MAX_EXTENSION] = "";
if (!ast_test_flag(cdr, AST_CDR_FLAG_LOCKED)) {
/* Grab source from ANI or normal Caller*ID */
num = c->cid.cid_ani ? c->cid.cid_ani : c->cid.cid_num;
if (c->cid.cid_name && num)
snprintf(tmp, sizeof(tmp), "\"%s\" <%s>", c->cid.cid_name, num);
else if (c->cid.cid_name)
strncpy(tmp, c->cid.cid_name, sizeof(tmp) - 1);
else if (num)
strncpy(tmp, num, sizeof(tmp) - 1);
ast_copy_string(cdr->clid, tmp, sizeof(cdr->clid));
ast_copy_string(cdr->src, num ? num : "", sizeof(cdr->src));
int ast_cdr_init(struct ast_cdr *cdr, struct ast_channel *c)
{
char *chan;
if (!ast_test_flag(cdr, AST_CDR_FLAG_LOCKED)) {
chan = !ast_strlen_zero(cdr->channel) ? cdr->channel : "<unknown>";
if (!ast_strlen_zero(cdr->channel))
ast_log(LOG_WARNING, "CDR already initialized on '%s'\n", chan);
ast_copy_string(cdr->channel, c->name, sizeof(cdr->channel));
/* Grab source from ANI or normal Caller*ID */
num = c->cid.cid_ani ? c->cid.cid_ani : c->cid.cid_num;
if (c->cid.cid_name && num)
snprintf(tmp, sizeof(tmp), "\"%s\" <%s>", c->cid.cid_name, num);
else if (c->cid.cid_name)
strncpy(tmp, c->cid.cid_name, sizeof(tmp) - 1);
else if (num)
strncpy(tmp, num, sizeof(tmp) - 1);
ast_copy_string(cdr->clid, tmp, sizeof(cdr->clid));
ast_copy_string(cdr->src, num ? num : "", sizeof(cdr->src));
cdr->disposition = (c->_state == AST_STATE_UP) ? AST_CDR_ANSWERED : AST_CDR_NOANSWER;
cdr->amaflags = c->amaflags ? c->amaflags : ast_default_amaflags;
ast_copy_string(cdr->accountcode, c->accountcode, sizeof(cdr->accountcode));
ast_copy_string(cdr->dst, c->exten, sizeof(cdr->dst));
ast_copy_string(cdr->dcontext, c->context, sizeof(cdr->dcontext));
ast_copy_string(cdr->uniqueid, c->uniqueid, sizeof(cdr->uniqueid));
}
return 0;
}
void ast_cdr_end(struct ast_cdr *cdr)
{
char *chan;
chan = !ast_strlen_zero(cdr->channel) ? cdr->channel : "<unknown>";
if (ast_test_flag(cdr, AST_CDR_FLAG_POSTED))
ast_log(LOG_WARNING, "CDR on channel '%s' already posted\n", chan);
if (!cdr->start.tv_sec && !cdr->start.tv_usec)
ast_log(LOG_WARNING, "CDR on channel '%s' has not started\n", chan);
if (!cdr->end.tv_sec && !cdr->end.tv_usec)
gettimeofday(&cdr->end, NULL);
}
}
/*
 * Translate a CDR disposition code into its display string.
 *
 * Returns a pointer to a string literal; callers must neither modify nor
 * free it.  A code without a label yields "UNKNOWN" rather than letting
 * control run off the end of a non-void function, which would hand the
 * caller an indeterminate pointer.
 */
char *ast_cdr_disp2str(int disposition)
{
	switch (disposition) {
	case AST_CDR_NOANSWER:
		return "NO ANSWER";
	case AST_CDR_BUSY:
		return "BUSY";
	case AST_CDR_ANSWERED:
		return "ANSWERED";
	}
	/*
	 * NOTE(review): AST_CDR_FAILED is assigned to cdr->disposition elsewhere
	 * in this file but has no label here, so it is reported through this
	 * fallback -- confirm whether it should get a dedicated string.
	 */
	return "UNKNOWN";
}
/*
 * Translate an AMA flag value into its display name.
 *
 * Returns a pointer to a string literal ("Unknown" for any value that is
 * not one of the three recognised flags); callers must not modify or free it.
 */
char *ast_cdr_flags2str(int flag)
{
	char *label = "Unknown";

	if (flag == AST_CDR_OMIT)
		label = "OMIT";
	else if (flag == AST_CDR_BILLING)
		label = "BILLING";
	else if (flag == AST_CDR_DOCUMENTATION)
		label = "DOCUMENTATION";

	return label;
}
int ast_cdr_setaccount(struct ast_channel *chan, const char *account)
ast_copy_string(chan->accountcode, account, sizeof(chan->accountcode));
if (!ast_test_flag(cdr, AST_CDR_FLAG_LOCKED))
ast_copy_string(cdr->accountcode, chan->accountcode, sizeof(cdr->accountcode));
cdr = cdr->next;
}
int ast_cdr_setamaflags(struct ast_channel *chan, const char *flag)
{
struct ast_cdr *cdr = chan->cdr;
int newflag;
newflag = ast_cdr_amaflags2int(flag);
int ast_cdr_setuserfield(struct ast_channel *chan, const char *userfield)
{
struct ast_cdr *cdr = chan->cdr;
if (!ast_test_flag(cdr, AST_CDR_FLAG_LOCKED))
ast_copy_string(cdr->userfield, userfield, sizeof(cdr->userfield));
cdr = cdr->next;
}
int ast_cdr_appenduserfield(struct ast_channel *chan, const char *userfield)
{
struct ast_cdr *cdr = chan->cdr;
int len = strlen(cdr->userfield);
if (!ast_test_flag(cdr, AST_CDR_FLAG_LOCKED))
strncpy(cdr->userfield+len, userfield, sizeof(cdr->userfield) - len - 1);
int ast_cdr_update(struct ast_channel *c)
{
struct ast_cdr *cdr = c->cdr;
if (!ast_test_flag(cdr, AST_CDR_FLAG_LOCKED)) {
num = c->cid.cid_ani ? c->cid.cid_ani : c->cid.cid_num;
if (c->cid.cid_name && num)
snprintf(tmp, sizeof(tmp), "\"%s\" <%s>", c->cid.cid_name, num);
else if (c->cid.cid_name)
strncpy(tmp, c->cid.cid_name, sizeof(tmp) - 1);
else if (num)
strncpy(tmp, num, sizeof(tmp) - 1);
ast_copy_string(cdr->clid, tmp, sizeof(cdr->clid));
ast_copy_string(cdr->src, num ? num : "", sizeof(cdr->src));
ast_copy_string(cdr->accountcode, c->accountcode, sizeof(cdr->accountcode));
ast_copy_string(cdr->dst, (ast_strlen_zero(c->macroexten)) ? c->exten : c->macroexten, sizeof(cdr->dst));
ast_copy_string(cdr->dcontext, (ast_strlen_zero(c->macrocontext)) ? c->context : c->macrocontext, sizeof(cdr->dcontext));
Martin Pycko
committed
}
/*
 * Parse the textual name of an AMA flag (compared case-insensitively).
 *
 * Returns 0 for "default", the matching AST_CDR_* value for "omit",
 * "billing" and "documentation", and -1 for anything else.
 */
int ast_cdr_amaflags2int(const char *flag)
{
	const struct {
		const char *label;
		int value;
	} known[] = {
		{ "default", 0 },
		{ "omit", AST_CDR_OMIT },
		{ "billing", AST_CDR_BILLING },
		{ "documentation", AST_CDR_DOCUMENTATION },
	};
	size_t idx;

	/* entries are tried in the same order as the names were originally tested */
	for (idx = 0; idx < sizeof(known) / sizeof(known[0]); idx++) {
		if (strcasecmp(flag, known[idx].label) == 0)
			return known[idx].value;
	}
	return -1;
}
static void post_cdr(struct ast_cdr *cdr)
chan = !ast_strlen_zero(cdr->channel) ? cdr->channel : "<unknown>";
if (ast_test_flag(cdr, AST_CDR_FLAG_POSTED))
ast_log(LOG_WARNING, "CDR on channel '%s' already posted\n", chan);
if (!cdr->end.tv_sec && !cdr->end.tv_usec)
ast_log(LOG_WARNING, "CDR on channel '%s' lacks end\n", chan);
if (!cdr->start.tv_sec && !cdr->start.tv_usec)
ast_log(LOG_WARNING, "CDR on channel '%s' lacks start\n", chan);
cdr->duration = cdr->end.tv_sec - cdr->start.tv_sec + (cdr->end.tv_usec - cdr->start.tv_usec) / 1000000;
cdr->billsec = cdr->end.tv_sec - cdr->answer.tv_sec + (cdr->end.tv_usec - cdr->answer.tv_usec) / 1000000;
ast_set_flag(cdr, AST_CDR_FLAG_POSTED);
AST_LIST_LOCK(&be_list);
AST_LIST_TRAVERSE(&be_list, i, list) {
void ast_cdr_reset(struct ast_cdr *cdr, int flags)
struct ast_flags tmp = {flags};
struct ast_cdr *dup;
/* Detach if post is requested */
if (ast_test_flag(&tmp, AST_CDR_FLAG_LOCKED) || !ast_test_flag(cdr, AST_CDR_FLAG_LOCKED)) {
if (ast_test_flag(&tmp, AST_CDR_FLAG_POSTED)) {
dup = ast_cdr_alloc();
memcpy(dup, cdr, sizeof(*dup));
ast_cdr_detach(dup);
ast_set_flag(cdr, AST_CDR_FLAG_POSTED);
ast_cdr_free_vars(cdr, 0);
}
ast_clear_flag(cdr, AST_FLAGS_ALL);
memset(&cdr->start, 0, sizeof(cdr->start));
memset(&cdr->end, 0, sizeof(cdr->end));
memset(&cdr->answer, 0, sizeof(cdr->answer));
cdr->billsec = 0;
cdr->duration = 0;
ast_cdr_start(cdr);
cdr->disposition = AST_CDR_NOANSWER;
Martin Pycko
committed
}
struct ast_cdr *ast_cdr_append(struct ast_cdr *cdr, struct ast_cdr *newcdr)
{
struct ast_cdr *ret;
cdr = cdr->next;
cdr->next = newcdr;
} else {
ret = newcdr;
}
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*
 * Mark the global batch as empty.  The queued items themselves are not
 * freed here.  Caller must hold cdr_batch_lock.
 */
static void reset_batch(void)
{
	batch->head = batch->tail = NULL;
	batch->size = 0;
}
/*
 * Allocate the single meta-batch that tracks queued CDRs for the life of
 * the program and leave it empty.  Caller must hold cdr_batch_lock.
 *
 * Returns 0 on success, -1 (after logging a warning) when out of memory.
 */
static int init_batch(void)
{
	batch = malloc(sizeof(*batch));
	if (batch) {
		reset_batch();
		return 0;
	}

	ast_log(LOG_WARNING, "CDR: out of memory while trying to handle batched records, data will most likely be lost\n");
	return -1;
}
/*
 * Post every CDR in a detached list of batch items, releasing each CDR and
 * its list node along the way.
 *
 * data is the first struct ast_cdr_batch_item of the list (may be NULL).
 * The void pointer in/out signature lets this double as a thread entry
 * point; the return value is always NULL.
 */
static void *do_batch_backend_process(void *data)
{
	struct ast_cdr_batch_item *item;
	struct ast_cdr_batch_item *following;

	for (item = data; item; item = following) {
		post_cdr(item->cdr);
		ast_cdr_free(item->cdr);
		/* remember the successor before the node is released */
		following = item->next;
		free(item);
	}

	return NULL;
}
/*
 * Hand all currently queued CDRs to the backends.
 *
 * The queued items are detached from the global batch under cdr_batch_lock,
 * leaving an empty batch behind, and are then posted either in the calling
 * thread or in a freshly created detached thread.
 *
 * shutdown: when non-zero the records are always posted in the calling
 *           thread, as they are when batchscheduleronly is set.
 */
void ast_cdr_submit_batch(int shutdown)
{
	struct ast_cdr_batch_item *oldbatchitems = NULL;
	pthread_attr_t attr;
	pthread_t batch_post_thread = AST_PTHREADT_NULL;
	int spawn_failed;

	/* if there's no batch, or no CDRs in the batch, then there's nothing to do */
	if (!batch || !batch->head)
		return;

	/* move the old CDRs aside, and prepare a new CDR batch */
	ast_mutex_lock(&cdr_batch_lock);
	oldbatchitems = batch->head;
	reset_batch();
	ast_mutex_unlock(&cdr_batch_lock);

	/* if configured, spawn a new thread to post these CDRs,
	   also try to save as much as possible if we are shutting down safely */
	if (batchscheduleronly || shutdown) {
		if (option_debug)
			ast_log(LOG_DEBUG, "CDR single-threaded batch processing begins now\n");
		do_batch_backend_process(oldbatchitems);
		return;
	}

	pthread_attr_init(&attr);
	pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
	spawn_failed = ast_pthread_create(&batch_post_thread, &attr, do_batch_backend_process, oldbatchitems);
	/* the attribute object is only needed for the create call; release it on both outcomes */
	pthread_attr_destroy(&attr);

	if (spawn_failed) {
		ast_log(LOG_WARNING, "CDR processing thread could not detach, now trying in this thread\n");
		do_batch_backend_process(oldbatchitems);
	} else if (option_debug) {
		ast_log(LOG_DEBUG, "CDR multi-threaded batch processing begins now\n");
	}
}
/*
 * Scheduler callback: submit the current batch, then queue the next run.
 *
 * data is unused.  The follow-up run is registered by hand, batchtime * 1000
 * scheduler units from now, and its id is stored in cdr_sched.  The return
 * value is always 0 so that the scheduler does not reschedule the callback
 * on its own.
 */
static int submit_scheduled_batch(void *data)
{
	const int no_auto_reschedule = 0;

	ast_cdr_submit_batch(0);

	/* re-arm relative to this point in time */
	cdr_sched = ast_sched_add(sched, batchtime * 1000, submit_scheduled_batch, NULL);

	return no_auto_reschedule;
}
/*
 * Request an out-of-schedule batch submission.
 *
 * Any pending scheduled submission is replaced by one due in 1 ms, and
 * cdr_pending_cond is signalled so that the thread waiting on it wakes up
 * early.  Must not be called from within a scheduler callback, because it
 * deletes the current scheduler entry.
 */
static void submit_unscheduled_batch(void)
{
	/* this is okay since we are not being called from within the scheduler */
	if (cdr_sched > -1)
		ast_sched_del(sched, cdr_sched);

	/* schedule the submission to occur ASAP (1 ms) */
	cdr_sched = ast_sched_add(sched, 1, submit_scheduled_batch, NULL);

	/*
	 * Signal the waiting CDR thread to wake up early.  cdr_pending_lock is
	 * declared with AST_MUTEX_DEFINE_STATIC, so it is taken through the
	 * ast_mutex_* wrappers like cdr_batch_lock rather than through the raw
	 * pthread_mutex_* calls.
	 */
	ast_mutex_lock(&cdr_pending_lock);
	pthread_cond_signal(&cdr_pending_cond);
	ast_mutex_unlock(&cdr_pending_lock);
}
void ast_cdr_detach(struct ast_cdr *cdr)
{
struct ast_cdr_batch_item *newtail;
int curr;
/* maybe they disabled CDR stuff completely, so just drop it */
if (!enabled) {
if (option_debug)
ast_log(LOG_DEBUG, "Dropping CDR !\n");
ast_set_flag(cdr, AST_CDR_FLAG_POST_DISABLED);
ast_cdr_free(cdr);
return;
}
/* post stuff immediately if we are not in batch mode, this is legacy behaviour */
if (!batchmode) {
post_cdr(cdr);
ast_cdr_free(cdr);
return;
}
/* otherwise, each CDR gets put into a batch list (at the end) */
if (option_debug)
ast_log(LOG_DEBUG, "CDR detaching from this thread\n");
/* we'll need a new tail for every CDR */
newtail = malloc(sizeof(*newtail));
if (!newtail) {
ast_log(LOG_WARNING, "CDR: out of memory while trying to detach, will try in this thread instead\n");
post_cdr(cdr);
ast_cdr_free(cdr);
return;
}
memset(newtail, 0, sizeof(*newtail));
/* don't traverse a whole list (just keep track of the tail) */
ast_mutex_lock(&cdr_batch_lock);
if (!batch)
init_batch();
if (!batch->head) {
/* new batch is empty, so point the head at the new tail */
batch->head = newtail;
} else {
/* already got a batch with something in it, so just append a new tail */
batch->tail->next = newtail;
}
newtail->cdr = cdr;
batch->tail = newtail;
curr = batch->size++;
ast_mutex_unlock(&cdr_batch_lock);
/* if we have enough stuff to post, then do it */
if (curr >= (batchsize - 1))