From 5e320f279899dd09fcba44eddee8ecc728b53413 Mon Sep 17 00:00:00 2001 From: Gleb Smirnoff Date: Fri, 29 Dec 2017 02:05:46 -0800 Subject: [PATCH] Add monitoring support via kqueue(2). Based on patch from FreeBSD ports, authored by wg@FreeBSD.org and se@FreeBSD.org. However, this version doesn't create any thread, it uses main event dispatcher. Some effort was made to unify monitoring via kqueue and via inotify APIs. Now both provide their implementation of add_watch() function. I guess there are some logical bugs in vnode_process(). With this commit it would be better provide code as is, and resolve bugs separately. --- Makefile.am | 2 +- event.h | 11 ++- kqueue.c | 30 +++--- minidlna.c | 26 ++++-- monitor.c | 27 ++---- monitor.h | 10 ++ monitor_kqueue.c | 236 +++++++++++++++++++++++++++++++++++++++++++++++ 7 files changed, 300 insertions(+), 42 deletions(-) create mode 100644 monitor_kqueue.c diff --git a/Makefile.am b/Makefile.am index eb78e11..1d7d73f 100644 --- a/Makefile.am +++ b/Makefile.am @@ -31,7 +31,7 @@ minidlnad_SOURCES = minidlna.c upnphttp.c upnpdescgen.c upnpsoap.c \ containers.c avahi.c tagutils/tagutils.c if HAVE_KQUEUE -minidlnad_SOURCES += kqueue.c +minidlnad_SOURCES += kqueue.c monitor_kqueue.c else minidlnad_SOURCES += select.c endif diff --git a/event.h b/event.h index 7fba74b..2e677f3 100644 --- a/event.h +++ b/event.h @@ -11,6 +11,7 @@ typedef enum { #ifdef HAVE_KQUEUE EVENT_READ = EVFILT_READ, EVENT_WRITE = EVFILT_WRITE, + EVENT_VNODE = EVFILT_VNODE, #else EVENT_READ, EVENT_WRITE, @@ -20,12 +21,20 @@ typedef enum { #define EV_FLAG_CLOSING 0x00000001 typedef void event_process_t(struct event *); +#ifdef HAVE_KQUEUE +typedef void event_vnode_process_t(struct event *, u_int); +#endif struct event { int fd; int index; event_t rdwr; - event_process_t *process; + union { + event_process_t *process; +#ifdef HAVE_KQUEUE + event_vnode_process_t *process_vnode; +#endif + }; void *data; }; diff --git a/kqueue.c b/kqueue.c index 649ed88..d92a340 100644 --- a/kqueue.c +++ b/kqueue.c @@ -16,7 +16,7 @@ #include "event.h" #include "log.h" -static int kqueue_set(struct event *ev, short filter, u_short flags); +static int kqueue_set(struct event *, short, u_short, u_int); static event_module_init_t kqueue_init; static event_module_fini_t kqueue_fini; @@ -75,9 +75,19 @@ kqueue_fini() static int kqueue_add(struct event *ev) { + u_int fflags; + u_short flags; + + if (ev->rdwr == EVFILT_VNODE) { + flags = EV_ADD | EV_ENABLE | EV_CLEAR; + fflags = NOTE_DELETE | NOTE_WRITE | NOTE_EXTEND; + } else { + flags = EV_ADD | EV_ENABLE; + fflags = 0; + } DPRINTF(E_DEBUG, L_GENERAL, "kqueue_add %d\n", ev->fd); - return (kqueue_set(ev, ev->rdwr, EV_ADD | EV_ENABLE)); + return (kqueue_set(ev, ev->rdwr, flags, fflags)); } static int @@ -110,11 +120,11 @@ kqueue_del(struct event *ev, int flags) return (0); DPRINTF(E_DEBUG, L_GENERAL, "kqueue_del %d\n", ev->fd); - return (kqueue_set(ev, ev->rdwr, EV_DELETE)); + return (kqueue_set(ev, ev->rdwr, EV_DELETE, 0)); } static int -kqueue_set(struct event *ev, short filter, u_short flags) +kqueue_set(struct event *ev, short filter, u_short flags, u_int fflags) { struct kevent *kev; struct timespec ts; @@ -137,7 +147,7 @@ kqueue_set(struct event *ev, short filter, u_short flags) kev->filter = filter; kev->flags = flags; kev->udata = ev; - kev->fflags = 0; + kev->fflags = fflags; kev->data = 0; ev->index = nchanges++; @@ -174,13 +184,6 @@ kqueue_process(u_long timer) DPRINTF(E_FATAL, L_GENERAL, "kevent(): %s. EXITING\n", strerror(errno)); } - /* XXXGL */ - for (int i = 0; i < n; i++) { - struct event *ev; - - ev = (struct event *)change_list[i].udata; - } - DPRINTF(E_DEBUG, L_GENERAL, "kevent events: %d\n", events); if (events == 0) { @@ -205,6 +208,9 @@ kqueue_process(u_long timer) case EVFILT_WRITE: ev->process(ev); break; + case EVFILT_VNODE: + ev->process_vnode(ev, event_list[i].fflags); + break; default: DPRINTF(E_ERROR, L_GENERAL, "unexpected kevent() filter %d", diff --git a/minidlna.c b/minidlna.c index 3534b10..3da4f15 100644 --- a/minidlna.c +++ b/minidlna.c @@ -408,7 +408,6 @@ rescan: if (ret || GETFLAG(RESCAN_MASK)) { #if USE_FORK - SETFLAG(SCANNING_MASK); sqlite3_close(db); *scanner_pid = fork(); open_db(&db); @@ -425,6 +424,8 @@ rescan: { start_scanner(); } + else + SETFLAG(SCANNING_MASK); #else start_scanner(); #endif @@ -1153,7 +1154,13 @@ main(int argc, char **argv) else if (pthread_create(&inotify_thread, NULL, start_inotify, NULL) != 0) DPRINTF(E_FATAL, L_GENERAL, "ERROR: pthread_create() failed for start_inotify. EXITING\n"); } -#endif +#endif /* HAVE_INOTIFY */ + +#ifdef HAVE_KQUEUE + if (!GETFLAG(SCANNING_MASK)) + kqueue_monitor_start(); +#endif /* HAVE_KQUEUE */ + smonitor = OpenAndConfMonitorSocket(); if (smonitor > 0) { @@ -1270,14 +1277,13 @@ main(int argc, char **argv) } #endif - if (GETFLAG(SCANNING_MASK)) - { - if (!scanner_pid || kill(scanner_pid, 0) != 0) - { - CLEARFLAG(SCANNING_MASK); - if (_get_dbtime() != lastdbtime) - updateID++; - } + if (GETFLAG(SCANNING_MASK) && kill(scanner_pid, 0) != 0) { + CLEARFLAG(SCANNING_MASK); + if (_get_dbtime() != lastdbtime) + updateID++; +#ifdef HAVE_KQUEUE + kqueue_monitor_start(); +#endif /* HAVE_KQUEUE */ } event_module.process(timeout); diff --git a/monitor.c b/monitor.c index a4f7359..45f6830 100644 --- a/monitor.c +++ b/monitor.c @@ -108,7 +108,7 @@ raise_watch_limit(unsigned int limit) fclose(max_watches); } -static int +int add_watch(int fd, const char * path) { struct watch *nw; @@ -123,14 +123,14 @@ add_watch(int fd, const char * path) if( wd < 0 ) { DPRINTF(E_ERROR, L_INOTIFY, "inotify_add_watch(%s) [%s]\n", path, strerror(errno)); - return -1; + return (errno); } nw = malloc(sizeof(struct watch)); if( nw == NULL ) { DPRINTF(E_ERROR, L_INOTIFY, "malloc() error\n"); - return -1; + return (ENOMEM); } nw->wd = wd; nw->next = NULL; @@ -147,7 +147,8 @@ add_watch(int fd, const char * path) } lastwatch = nw; - return wd; + DPRINTF(E_INFO, L_INOTIFY, "Added watch to %s [%d]\n", path, wd); + return (0); } static int @@ -339,7 +340,7 @@ check_nfo(const char *path) " and MIME glob 'video/*' limit 1", file, file); } -static int +int monitor_insert_file(const char *name, const char *path) { int len; @@ -504,20 +505,10 @@ monitor_insert_directory(int fd, char *name, const char * path) free(parent_buf); } +#ifdef HAVE_WATCH if( fd > 0 ) - { - #ifdef HAVE_INOTIFY - int wd = add_watch(fd, path); - if( wd == -1 ) - { - DPRINTF(E_ERROR, L_INOTIFY, "add_watch() failed\n"); - } - else - { - DPRINTF(E_INFO, L_INOTIFY, "Added watch to %s [%d]\n", path, wd); - } - #endif - } + add_watch(fd, path); +#endif dir_types = valid_media_types(path); diff --git a/monitor.h b/monitor.h index 35ab12e..00731ba 100644 --- a/monitor.h +++ b/monitor.h @@ -1,8 +1,18 @@ +int monitor_insert_file(const char *name, const char *path); int monitor_insert_directory(int fd, char *name, const char * path); int monitor_remove_file(const char * path); int monitor_remove_directory(int fd, const char * path); +#if defined(HAVE_INOTIFY) && defined(HAVE_KQUEUE) +#define HAVE_WATCH 1 +int add_watch(int, const char *); +#endif + #ifdef HAVE_INOTIFY void * start_inotify(); #endif + +#ifdef HAVE_KQUEUE +void kqueue_monitor_start(); +#endif diff --git a/monitor_kqueue.c b/monitor_kqueue.c new file mode 100644 index 0000000..a0fc5e7 --- /dev/null +++ b/monitor_kqueue.c @@ -0,0 +1,236 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "event.h" +#include "log.h" +#include "monitor.h" +#include "minidlnatypes.h" +#include "upnpglobalvars.h" +#include "sql.h" +#include "utils.h" + +static void +vnode_process(struct event *ev, u_int fflags) +{ + const char *path; + char *sql, **result, tmp_path[PATH_MAX], *esc_name; + int rows, result_path_len; + DIR* d; + struct dirent *entry; + bool found_flag; + + path = (const char *)ev->data; + + if (fflags & NOTE_DELETE) { + DPRINTF(E_DEBUG, L_INOTIFY, "Path [%s] deleted.\n", path); + close(ev->fd); + free(ev); + monitor_remove_directory(0, path); + return; + } else if ((fflags & (NOTE_WRITE | NOTE_LINK)) == + (NOTE_WRITE | NOTE_LINK)) { + + DPRINTF(E_DEBUG, L_INOTIFY, "Directory [%s] content updated\n", + path); + sql = sqlite3_mprintf("SELECT PATH from DETAILS where " + "(PATH > '%q/' and PATH <= '%q/%c') and SIZE = ''", + path, path, 0xFF); + DPRINTF(E_DEBUG, L_INOTIFY, "SQL: %s\n", sql); + if ((sql_get_table(db, sql, &result, &rows, NULL) != + SQLITE_OK)) { + DPRINTF(E_WARN, L_INOTIFY, + "Read state [%s]: Query failed\n", path); + goto err1; + } + + for (int i = 1; i <= rows; i++) { + DPRINTF(E_DEBUG, L_INOTIFY, + "Indexed content: %s\n", result[i]); + if (access(result[i], R_OK) == -1) + monitor_remove_directory(0, result[i]); + } + + if ((d = opendir(path)) == NULL) { + DPRINTF(E_ERROR, L_INOTIFY, "Can't list [%s] (%s)\n", + path, strerror(errno)); + goto err2; + } + + for (entry = readdir(d); entry != NULL; entry = readdir(d)) { + if ((entry->d_type != DT_DIR) || + (strcmp(entry->d_name, "..") == 0) || + (strcmp(entry->d_name, ".") == 0)) + continue; + + result_path_len = snprintf(tmp_path, PATH_MAX, + "%s/%s", path, entry->d_name); + if (result_path_len >= PATH_MAX) { + DPRINTF(E_ERROR, L_INOTIFY, + "File path too long for %s!", + entry->d_name); + continue; + } + + DPRINTF(E_DEBUG, L_INOTIFY, "Walking %s\n", tmp_path); + found_flag = false; + for (int i = 1; i <= rows; i++) { + if (strcmp(result[i], tmp_path) == 0) { + found_flag = true; + break; + } + } + if (!found_flag) { + esc_name = strdup(entry->d_name); + if (esc_name == NULL) { + DPRINTF(E_ERROR, L_INOTIFY, + "strdup error"); + continue; + } + esc_name = modifyString(esc_name, "&", "&amp;", 0); + monitor_insert_directory(1, esc_name, tmp_path); + free(esc_name); + } + } + } else if (fflags & NOTE_WRITE) { + + DPRINTF(E_DEBUG, L_INOTIFY, "File [%s] content updated\n", + path); + sql = sqlite3_mprintf("SELECT PATH from DETAILS where " + "(PATH > '%q/' and PATH <= '%q/%c') and SIZE <> ''", + path, path, 0xFF); + if (sql_get_table(db, sql, &result, &rows, NULL) != SQLITE_OK) { + DPRINTF(E_WARN, L_INOTIFY, + "Read state [%s]: Query failed\n", path); + goto err1; + } + + for (int i = 1; i <= rows; i++) { + DPRINTF(E_DEBUG, L_INOTIFY, + "Indexed content: %s\n", result[i]); + if (access(result[i], R_OK) == -1) + monitor_remove_file(result[i]); + } + + if ((d = opendir(path)) == NULL) { + DPRINTF(E_ERROR, L_INOTIFY, + "Can't list [%s] (%s)\n", path, strerror(errno)); + goto err2; + } + + for (entry = readdir(d); entry != NULL; entry = readdir(d)) { + if ((entry->d_type != DT_REG) && + (entry->d_type != DT_LNK)) + continue; + + result_path_len = snprintf(tmp_path, PATH_MAX, "%s/%s", + path, entry->d_name); + if (result_path_len >= PATH_MAX) { + DPRINTF(E_ERROR, L_INOTIFY, + "File path too long for %s!", + entry->d_name); + continue; + } + DPRINTF(E_DEBUG, L_INOTIFY, "Walking %s\n", tmp_path); + found_flag = false; + for (int i = 1; i <= rows; i++) + if (strcmp(result[i], tmp_path) == 0) { + found_flag = true; + break; + } + if (!found_flag ) { + struct stat st; + + if (stat(tmp_path, &st) != 0) { + DPRINTF(E_ERROR, L_INOTIFY, + "stat(%s): %s\n", tmp_path, + strerror(errno)); + continue; + } + esc_name = strdup(entry->d_name); + if (esc_name == NULL) { + DPRINTF(E_ERROR, L_INOTIFY, + "strdup error"); + continue; + } + esc_name = modifyString(esc_name, "&", "&amp;", 0); + if (S_ISDIR(st.st_mode)) + monitor_insert_directory(1, esc_name, tmp_path); + else + monitor_insert_file(esc_name, tmp_path); + free(esc_name); + } + } + } else + return; + + closedir(d); +err2: + sqlite3_free_table(result); +err1: + sqlite3_free(sql); +} + +int +add_watch(int fd __unused, const char *path) +{ + struct event *ev; + int wd; + + wd = open(path, O_RDONLY); + if (wd < 0) { + DPRINTF(E_ERROR, L_INOTIFY, "open(%s) [%s]\n", + path, strerror(errno)); + return (errno); + } + + if ((ev = malloc(sizeof(struct event))) == NULL) { + DPRINTF(E_ERROR, L_INOTIFY, "malloc() error\n"); + close(wd); + return (ENOMEM); + } + if ((ev->data = strdup(path)) == NULL) { + DPRINTF(E_ERROR, L_INOTIFY, "strdup() error\n"); + close(wd); + free(ev); + return (ENOMEM); + } + ev->fd = wd; + ev->rdwr = EVENT_VNODE; + ev->process_vnode = vnode_process; + + DPRINTF(E_DEBUG, L_INOTIFY, "kqueue add_watch [%s]\n", path); + event_module.add(ev); + + return (0); +} + +/* + * XXXGL: this function has too much copypaste of inotify_create_watches(). + * We need to split out inotify stuff from monitor.c into monitor_inotify.c, + * compile the latter on Linux and this file on FreeBSD, and keep monitor.c + * itself platform independent. + */ +void +kqueue_monitor_start() +{ + struct media_dir_s *media_path; + char **result; + int rows; + + DPRINTF(E_DEBUG, L_INOTIFY, "kqueue monitoring starting\n"); + for (media_path = media_dirs; media_path != NULL; + media_path = media_path->next) + add_watch(0, media_path->path); + sql_get_table(db, "SELECT PATH from DETAILS where MIME is NULL and PATH is not NULL", &result, &rows, NULL); + for (int i = 1; i <= rows; i++ ) + add_watch(0, result[i]); + sqlite3_free_table(result); +}