diff --git a/pbx/pbx_spool.c b/pbx/pbx_spool.c index 00d604137d7aa25bbf64cd9462df5175b4156e49..0665b477cff294f4ea954511ba8bdbc356cf3890 100644 --- a/pbx/pbx_spool.c +++ b/pbx/pbx_spool.c @@ -26,11 +26,6 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") -/* Handling of call files using inotify is not functioning correctly currently: - * Issue 18089 - https://issues.asterisk.org/view.php?id=18089 - */ -#undef HAVE_INOTIFY - #include <sys/stat.h> #include <time.h> #include <utime.h> @@ -67,7 +62,7 @@ enum { */ SPOOL_FLAG_ALWAYS_DELETE = (1 << 0), /* Don't unlink the call file after processing, move in qdonedir */ - SPOOL_FLAG_ARCHIVE = (1 << 1) + SPOOL_FLAG_ARCHIVE = (1 << 1), }; static char qdir[255]; @@ -98,6 +93,10 @@ struct outgoing { struct ast_flags options; /*!< options */ }; +#if defined(HAVE_INOTIFY) || defined(HAVE_KQUEUE) +static void queue_file(const char *filename, time_t when); +#endif + static int init_outgoing(struct outgoing *o) { o->priority = 1; @@ -260,24 +259,20 @@ static int apply_outgoing(struct outgoing *o, const char *fn, FILE *f) static void safe_append(struct outgoing *o, time_t now, char *s) { - int fd; FILE *f; - struct utimbuf tbuf; + struct utimbuf tbuf = { .actime = now, .modtime = now + o->retrytime }; - if ((fd = open(o->fn, O_WRONLY | O_APPEND)) < 0) - return; + ast_debug(1, "Outgoing %s/%s: %s\n", o->tech, o->dest, s); - if ((f = fdopen(fd, "a"))) { + if ((f = fopen(o->fn, "a"))) { fprintf(f, "\n%s: %ld %d (%ld)\n", s, (long)ast_mainpid, o->retries, (long) now); fclose(f); - } else - close(fd); + } /* Update the file time */ - tbuf.actime = now; - tbuf.modtime = now + o->retrytime; - if (utime(o->fn, &tbuf)) + if (utime(o->fn, &tbuf)) { ast_log(LOG_WARNING, "Unable to set utime on %s: %s\n", o->fn, strerror(errno)); + } } /*! @@ -288,7 +283,6 @@ static void safe_append(struct outgoing *o, time_t now, char *s) */ static int remove_from_queue(struct outgoing *o, const char *status) { - int fd; FILE *f; char newfn[256]; const char *bname; @@ -297,8 +291,9 @@ static int remove_from_queue(struct outgoing *o, const char *status) struct stat current_file_status; if (!stat(o->fn, ¤t_file_status)) { - if (time(NULL) < current_file_status.st_mtime) + if (time(NULL) < current_file_status.st_mtime) { return 0; + } } } @@ -313,26 +308,28 @@ static int remove_from_queue(struct outgoing *o, const char *status) return -1; } - if ((fd = open(o->fn, O_WRONLY | O_APPEND))) { - if ((f = fdopen(fd, "a"))) { - fprintf(f, "Status: %s\n", status); - fclose(f); - } else - close(fd); + if (!(bname = strrchr(o->fn, '/'))) { + bname = o->fn; + } else { + bname++; } - if (!(bname = strrchr(o->fn, '/'))) - bname = o->fn; - else - bname++; snprintf(newfn, sizeof(newfn), "%s/%s", qdonedir, bname); /* a existing call file the archive dir is overwritten */ unlink(newfn); if (rename(o->fn, newfn) != 0) { unlink(o->fn); return -1; - } else - return 0; + } + + /* Only append to the file AFTER we move it out of the watched directory, + * otherwise the fclose() causes another event for inotify(7) */ + if ((f = fopen(newfn, "a"))) { + fprintf(f, "Status: %s\n", status); + fclose(f); + } + + return 0; } static void *attempt_thread(void *data) @@ -357,6 +354,9 @@ static void *attempt_thread(void *data) } else { /* Notate that the call is still active */ safe_append(o, time(NULL), "EndRetry"); +#if defined(HAVE_INOTIFY) || defined(HAVE_KQUEUE) + queue_file(o->fn, time(NULL) + o->retrytime); +#endif } } else { ast_log(LOG_NOTICE, "Call completed to %s/%s\n", o->tech, o->dest); @@ -388,7 +388,7 @@ static int scan_service(const char *fn, time_t now) ast_log(LOG_WARNING, "Out of memory ;(\n"); return -1; } - + if (init_outgoing(o)) { /* No need to call free_outgoing here since we know the failure * was to allocate string fields and no variables have been allocated @@ -399,10 +399,12 @@ static int scan_service(const char *fn, time_t now) } /* Attempt to open the file */ - if (!(f = fopen(fn, "r+"))) { + if (!(f = fopen(fn, "r"))) { remove_from_queue(o, "Failed"); free_outgoing(o); +#if !defined(HAVE_INOTIFY) && !defined(HAVE_KQUEUE) ast_log(LOG_WARNING, "Unable to open %s: %s, deleting\n", fn, strerror(errno)); +#endif return -1; } @@ -414,7 +416,7 @@ static int scan_service(const char *fn, time_t now) fclose(f); return -1; } - + #if 0 printf("Filename: %s, Retries: %d, max: %d\n", fn, o->retries, o->maxretries); #endif @@ -432,7 +434,7 @@ static int scan_service(const char *fn, time_t now) so abort their retry and continue as we were... */ if (o->callingpid) safe_append(o, time(NULL), "AbortRetry"); - + safe_append(o, now, "StartRetry"); launch_service(o); } @@ -453,8 +455,9 @@ struct direntry { char name[0]; }; +static AST_LIST_HEAD_STATIC(dirlist, direntry); /* Only one thread is accessing this list, so no lock is necessary */ -static AST_LIST_HEAD_NOLOCK_STATIC(dirlist, direntry); +static AST_LIST_HEAD_NOLOCK_STATIC(createlist, direntry); static void queue_file(const char *filename, time_t when) { @@ -482,17 +485,18 @@ static void queue_file(const char *filename, time_t when) when = st.st_mtime; } -#ifndef HAVE_INOTIFY - /* Need to check the existing list for kqueue(2), in order to avoid duplicates. */ + /* Need to check the existing list in order to avoid duplicates. */ + AST_LIST_LOCK(&dirlist); AST_LIST_TRAVERSE(&dirlist, cur, list) { if (cur->mtime == when && !strcmp(filename, cur->name)) { + AST_LIST_UNLOCK(&dirlist); return; } } -#endif if ((res = when) > now || (res = scan_service(filename, now)) > 0) { if (!(new = ast_calloc(1, sizeof(*new) + strlen(filename) + 1))) { + AST_LIST_UNLOCK(&dirlist); return; } new->mtime = res; @@ -515,8 +519,43 @@ static void queue_file(const char *filename, time_t when) } } } + AST_LIST_UNLOCK(&dirlist); } +#ifdef HAVE_INOTIFY +static void queue_file_create(const char *filename) +{ + struct direntry *cur; + + AST_LIST_TRAVERSE(&createlist, cur, list) { + if (!strcmp(cur->name, filename)) { + return; + } + } + + if (!(cur = ast_calloc(1, sizeof(*cur) + strlen(filename) + 1))) { + return; + } + strcpy(cur->name, filename); + AST_LIST_INSERT_TAIL(&createlist, cur, list); +} + +static void queue_file_write(const char *filename) +{ + struct direntry *cur; + /* Only queue entries where an IN_CREATE preceded the IN_CLOSE_WRITE */ + AST_LIST_TRAVERSE_SAFE_BEGIN(&createlist, cur, list) { + if (!strcmp(cur->name, filename)) { + AST_LIST_REMOVE_CURRENT(list); + ast_free(cur); + queue_file(filename, 0); + break; + } + } + AST_LIST_TRAVERSE_SAFE_END +} +#endif + static void *scan_thread(void *unused) { DIR *dir; @@ -524,15 +563,10 @@ static void *scan_thread(void *unused) time_t now; struct timespec ts = { .tv_sec = 1 }; #ifdef HAVE_INOTIFY - int res; + ssize_t res; int inotify_fd = inotify_init(); - struct { - struct inotify_event iev; - /* It may not look like we're using this element, but when we read - * from inotify_fd, the event is typically larger than the first - * struct, and overflows into this second one. */ - char name[FILENAME_MAX + 1]; - } buf; + struct inotify_event *iev; + char buf[8192] __attribute__((aligned (sizeof(int)))); struct pollfd pfd = { .fd = inotify_fd, .events = POLLIN }; #else struct timespec nowait = { 0, 1 }; @@ -557,7 +591,7 @@ static void *scan_thread(void *unused) } #ifdef HAVE_INOTIFY - inotify_add_watch(inotify_fd, qdir, IN_CLOSE_WRITE | IN_MOVED_TO); + inotify_add_watch(inotify_fd, qdir, IN_CREATE | IN_CLOSE_WRITE | IN_MOVED_TO); #endif /* First, run through the directory and clear existing entries */ @@ -567,7 +601,7 @@ static void *scan_thread(void *unused) } #ifndef HAVE_INOTIFY - EV_SET(&kev, dirfd(dir), EVFILT_VNODE, EV_ADD | EV_ENABLE, NOTE_WRITE | NOTE_EXTEND | NOTE_DELETE | NOTE_REVOKE | NOTE_ATTRIB, 0, NULL); + EV_SET(&kev, dirfd(dir), EVFILT_VNODE, EV_ADD | EV_ENABLE | EV_CLEAR, NOTE_WRITE, 0, NULL); if (kevent(inotify_fd, &kev, 1, NULL, 0, &nowait) < 0 && errno != 0) { ast_log(LOG_ERROR, "Unable to watch directory %s: %s\n", qdir, strerror(errno)); } @@ -595,17 +629,23 @@ static void *scan_thread(void *unused) int waittime = next == INT_MAX ? -1 : (next - now) * 1000; /* When a file arrives, add it to the queue, in mtime order. */ if ((res = poll(&pfd, 1, waittime)) > 0 && (stage = 1) && - (res = read(inotify_fd, &buf, sizeof(buf))) >= sizeof(buf.iev)) { + (res = read(inotify_fd, &buf, sizeof(buf))) >= sizeof(*iev)) { + ssize_t len = 0; /* File(s) added to directory, add them to my list */ - do { - queue_file(buf.iev.name, 0); - res -= sizeof(buf.iev) + buf.iev.len; - if (res >= sizeof(buf.iev)) { - memmove(&buf.iev, &buf.iev.name[buf.iev.len], res); - continue; + for (iev = (void *) buf; res >= sizeof(*iev); iev = (struct inotify_event *) (((char *) iev) + len)) { + if (iev->mask & IN_CREATE) { + queue_file_create(iev->name); + } else if (iev->mask & IN_CLOSE_WRITE) { + queue_file_write(iev->name); + } else if (iev->mask & IN_MOVED_TO) { + queue_file(iev->name, 0); + } else { + ast_log(LOG_ERROR, "Unexpected event %d for file '%s'\n", (int) iev->mask, iev->name); } - break; - } while (1); + + len = sizeof(*iev) + iev->len; + res -= len; + } } else if (res < 0 && errno != EINTR && errno != EAGAIN) { ast_debug(1, "Got an error back from %s(2): %s\n", stage ? "read" : "poll", strerror(errno)); } @@ -626,11 +666,13 @@ static void *scan_thread(void *unused) } /* Empty the list of all entries ready to be processed */ + AST_LIST_LOCK(&dirlist); while (!AST_LIST_EMPTY(&dirlist) && AST_LIST_FIRST(&dirlist)->mtime <= now) { cur = AST_LIST_REMOVE_HEAD(&dirlist, list); queue_file(cur->name, cur->mtime); ast_free(cur); } + AST_LIST_UNLOCK(&dirlist); } return NULL; } @@ -645,7 +687,7 @@ static void *scan_thread(void *unused) int res; time_t last = 0, next = 0, now; struct timespec ts = { .tv_sec = 1 }; - + while (!ast_fully_booted) { nanosleep(&ts, NULL); } @@ -663,7 +705,7 @@ static void *scan_thread(void *unused) /* Make sure it is time for us to execute our check */ if ((st.st_mtime == last) && (next && (next > now))) continue; - + #if 0 printf("atime: %ld, mtime: %ld, ctime: %ld\n", st.st_atime, st.st_mtime, st.st_ctime); printf("Ooh, something changed / timeout\n");