Add monitoring support via kqueue(2). Based on patch from FreeBSD ports, authored by wg@FreeBSD.org and se@FreeBSD.org. However, this version doesn't create any thread, it uses main event dispatcher.

Some effort was made to unify monitoring via kqueue and via inotify
APIs. Now both provide their implementation of add_watch() function.

I guess there are some logical bugs in vnode_process(). With this commit
it would be better provide code as is, and resolve bugs separately.
This commit is contained in:
Gleb Smirnoff 2017-12-29 02:05:46 -08:00 committed by Justin Maggard
parent 184607cb56
commit 5e320f2798
7 changed files with 300 additions and 42 deletions

View File

@ -31,7 +31,7 @@ minidlnad_SOURCES = minidlna.c upnphttp.c upnpdescgen.c upnpsoap.c \
containers.c avahi.c tagutils/tagutils.c
if HAVE_KQUEUE
minidlnad_SOURCES += kqueue.c
minidlnad_SOURCES += kqueue.c monitor_kqueue.c
else
minidlnad_SOURCES += select.c
endif

11
event.h
View File

@ -11,6 +11,7 @@ typedef enum {
#ifdef HAVE_KQUEUE
EVENT_READ = EVFILT_READ,
EVENT_WRITE = EVFILT_WRITE,
EVENT_VNODE = EVFILT_VNODE,
#else
EVENT_READ,
EVENT_WRITE,
@ -20,12 +21,20 @@ typedef enum {
#define EV_FLAG_CLOSING 0x00000001
typedef void event_process_t(struct event *);
#ifdef HAVE_KQUEUE
typedef void event_vnode_process_t(struct event *, u_int);
#endif
struct event {
int fd;
int index;
event_t rdwr;
event_process_t *process;
union {
event_process_t *process;
#ifdef HAVE_KQUEUE
event_vnode_process_t *process_vnode;
#endif
};
void *data;
};

View File

@ -16,7 +16,7 @@
#include "event.h"
#include "log.h"
static int kqueue_set(struct event *ev, short filter, u_short flags);
static int kqueue_set(struct event *, short, u_short, u_int);
static event_module_init_t kqueue_init;
static event_module_fini_t kqueue_fini;
@ -75,9 +75,19 @@ kqueue_fini()
static int
kqueue_add(struct event *ev)
{
u_int fflags;
u_short flags;
if (ev->rdwr == EVFILT_VNODE) {
flags = EV_ADD | EV_ENABLE | EV_CLEAR;
fflags = NOTE_DELETE | NOTE_WRITE | NOTE_EXTEND;
} else {
flags = EV_ADD | EV_ENABLE;
fflags = 0;
}
DPRINTF(E_DEBUG, L_GENERAL, "kqueue_add %d\n", ev->fd);
return (kqueue_set(ev, ev->rdwr, EV_ADD | EV_ENABLE));
return (kqueue_set(ev, ev->rdwr, flags, fflags));
}
static int
@ -110,11 +120,11 @@ kqueue_del(struct event *ev, int flags)
return (0);
DPRINTF(E_DEBUG, L_GENERAL, "kqueue_del %d\n", ev->fd);
return (kqueue_set(ev, ev->rdwr, EV_DELETE));
return (kqueue_set(ev, ev->rdwr, EV_DELETE, 0));
}
static int
kqueue_set(struct event *ev, short filter, u_short flags)
kqueue_set(struct event *ev, short filter, u_short flags, u_int fflags)
{
struct kevent *kev;
struct timespec ts;
@ -137,7 +147,7 @@ kqueue_set(struct event *ev, short filter, u_short flags)
kev->filter = filter;
kev->flags = flags;
kev->udata = ev;
kev->fflags = 0;
kev->fflags = fflags;
kev->data = 0;
ev->index = nchanges++;
@ -174,13 +184,6 @@ kqueue_process(u_long timer)
DPRINTF(E_FATAL, L_GENERAL, "kevent(): %s. EXITING\n", strerror(errno));
}
/* XXXGL */
for (int i = 0; i < n; i++) {
struct event *ev;
ev = (struct event *)change_list[i].udata;
}
DPRINTF(E_DEBUG, L_GENERAL, "kevent events: %d\n", events);
if (events == 0) {
@ -205,6 +208,9 @@ kqueue_process(u_long timer)
case EVFILT_WRITE:
ev->process(ev);
break;
case EVFILT_VNODE:
ev->process_vnode(ev, event_list[i].fflags);
break;
default:
DPRINTF(E_ERROR, L_GENERAL,
"unexpected kevent() filter %d",

View File

@ -408,7 +408,6 @@ rescan:
if (ret || GETFLAG(RESCAN_MASK))
{
#if USE_FORK
SETFLAG(SCANNING_MASK);
sqlite3_close(db);
*scanner_pid = fork();
open_db(&db);
@ -425,6 +424,8 @@ rescan:
{
start_scanner();
}
else
SETFLAG(SCANNING_MASK);
#else
start_scanner();
#endif
@ -1153,7 +1154,13 @@ main(int argc, char **argv)
else if (pthread_create(&inotify_thread, NULL, start_inotify, NULL) != 0)
DPRINTF(E_FATAL, L_GENERAL, "ERROR: pthread_create() failed for start_inotify. EXITING\n");
}
#endif
#endif /* HAVE_INOTIFY */
#ifdef HAVE_KQUEUE
if (!GETFLAG(SCANNING_MASK))
kqueue_monitor_start();
#endif /* HAVE_KQUEUE */
smonitor = OpenAndConfMonitorSocket();
if (smonitor > 0)
{
@ -1270,14 +1277,13 @@ main(int argc, char **argv)
}
#endif
if (GETFLAG(SCANNING_MASK))
{
if (!scanner_pid || kill(scanner_pid, 0) != 0)
{
CLEARFLAG(SCANNING_MASK);
if (_get_dbtime() != lastdbtime)
updateID++;
}
if (GETFLAG(SCANNING_MASK) && kill(scanner_pid, 0) != 0) {
CLEARFLAG(SCANNING_MASK);
if (_get_dbtime() != lastdbtime)
updateID++;
#ifdef HAVE_KQUEUE
kqueue_monitor_start();
#endif /* HAVE_KQUEUE */
}
event_module.process(timeout);

View File

@ -108,7 +108,7 @@ raise_watch_limit(unsigned int limit)
fclose(max_watches);
}
static int
int
add_watch(int fd, const char * path)
{
struct watch *nw;
@ -123,14 +123,14 @@ add_watch(int fd, const char * path)
if( wd < 0 )
{
DPRINTF(E_ERROR, L_INOTIFY, "inotify_add_watch(%s) [%s]\n", path, strerror(errno));
return -1;
return (errno);
}
nw = malloc(sizeof(struct watch));
if( nw == NULL )
{
DPRINTF(E_ERROR, L_INOTIFY, "malloc() error\n");
return -1;
return (ENOMEM);
}
nw->wd = wd;
nw->next = NULL;
@ -147,7 +147,8 @@ add_watch(int fd, const char * path)
}
lastwatch = nw;
return wd;
DPRINTF(E_INFO, L_INOTIFY, "Added watch to %s [%d]\n", path, wd);
return (0);
}
static int
@ -339,7 +340,7 @@ check_nfo(const char *path)
" and MIME glob 'video/*' limit 1", file, file);
}
static int
int
monitor_insert_file(const char *name, const char *path)
{
int len;
@ -504,20 +505,10 @@ monitor_insert_directory(int fd, char *name, const char * path)
free(parent_buf);
}
#ifdef HAVE_WATCH
if( fd > 0 )
{
#ifdef HAVE_INOTIFY
int wd = add_watch(fd, path);
if( wd == -1 )
{
DPRINTF(E_ERROR, L_INOTIFY, "add_watch() failed\n");
}
else
{
DPRINTF(E_INFO, L_INOTIFY, "Added watch to %s [%d]\n", path, wd);
}
#endif
}
add_watch(fd, path);
#endif
dir_types = valid_media_types(path);

View File

@ -1,8 +1,18 @@
int monitor_insert_file(const char *name, const char *path);
int monitor_insert_directory(int fd, char *name, const char * path);
int monitor_remove_file(const char * path);
int monitor_remove_directory(int fd, const char * path);
#if defined(HAVE_INOTIFY) && defined(HAVE_KQUEUE)
#define HAVE_WATCH 1
int add_watch(int, const char *);
#endif
#ifdef HAVE_INOTIFY
void *
start_inotify();
#endif
#ifdef HAVE_KQUEUE
void kqueue_monitor_start();
#endif

236
monitor_kqueue.c Normal file
View File

@ -0,0 +1,236 @@
#include <sys/stat.h>
#include <sys/event.h>
#include <limits.h>
#include <dirent.h>
#include <errno.h>
#include <stdlib.h>
#include <stdio.h>
#include <stdbool.h>
#include <string.h>
#include <unistd.h>
#include "event.h"
#include "log.h"
#include "monitor.h"
#include "minidlnatypes.h"
#include "upnpglobalvars.h"
#include "sql.h"
#include "utils.h"
static void
vnode_process(struct event *ev, u_int fflags)
{
const char *path;
char *sql, **result, tmp_path[PATH_MAX], *esc_name;
int rows, result_path_len;
DIR* d;
struct dirent *entry;
bool found_flag;
path = (const char *)ev->data;
if (fflags & NOTE_DELETE) {
DPRINTF(E_DEBUG, L_INOTIFY, "Path [%s] deleted.\n", path);
close(ev->fd);
free(ev);
monitor_remove_directory(0, path);
return;
} else if ((fflags & (NOTE_WRITE | NOTE_LINK)) ==
(NOTE_WRITE | NOTE_LINK)) {
DPRINTF(E_DEBUG, L_INOTIFY, "Directory [%s] content updated\n",
path);
sql = sqlite3_mprintf("SELECT PATH from DETAILS where "
"(PATH > '%q/' and PATH <= '%q/%c') and SIZE = ''",
path, path, 0xFF);
DPRINTF(E_DEBUG, L_INOTIFY, "SQL: %s\n", sql);
if ((sql_get_table(db, sql, &result, &rows, NULL) !=
SQLITE_OK)) {
DPRINTF(E_WARN, L_INOTIFY,
"Read state [%s]: Query failed\n", path);
goto err1;
}
for (int i = 1; i <= rows; i++) {
DPRINTF(E_DEBUG, L_INOTIFY,
"Indexed content: %s\n", result[i]);
if (access(result[i], R_OK) == -1)
monitor_remove_directory(0, result[i]);
}
if ((d = opendir(path)) == NULL) {
DPRINTF(E_ERROR, L_INOTIFY, "Can't list [%s] (%s)\n",
path, strerror(errno));
goto err2;
}
for (entry = readdir(d); entry != NULL; entry = readdir(d)) {
if ((entry->d_type != DT_DIR) ||
(strcmp(entry->d_name, "..") == 0) ||
(strcmp(entry->d_name, ".") == 0))
continue;
result_path_len = snprintf(tmp_path, PATH_MAX,
"%s/%s", path, entry->d_name);
if (result_path_len >= PATH_MAX) {
DPRINTF(E_ERROR, L_INOTIFY,
"File path too long for %s!",
entry->d_name);
continue;
}
DPRINTF(E_DEBUG, L_INOTIFY, "Walking %s\n", tmp_path);
found_flag = false;
for (int i = 1; i <= rows; i++) {
if (strcmp(result[i], tmp_path) == 0) {
found_flag = true;
break;
}
}
if (!found_flag) {
esc_name = strdup(entry->d_name);
if (esc_name == NULL) {
DPRINTF(E_ERROR, L_INOTIFY,
"strdup error");
continue;
}
esc_name = modifyString(esc_name, "&", "&amp;amp;", 0);
monitor_insert_directory(1, esc_name, tmp_path);
free(esc_name);
}
}
} else if (fflags & NOTE_WRITE) {
DPRINTF(E_DEBUG, L_INOTIFY, "File [%s] content updated\n",
path);
sql = sqlite3_mprintf("SELECT PATH from DETAILS where "
"(PATH > '%q/' and PATH <= '%q/%c') and SIZE <> ''",
path, path, 0xFF);
if (sql_get_table(db, sql, &result, &rows, NULL) != SQLITE_OK) {
DPRINTF(E_WARN, L_INOTIFY,
"Read state [%s]: Query failed\n", path);
goto err1;
}
for (int i = 1; i <= rows; i++) {
DPRINTF(E_DEBUG, L_INOTIFY,
"Indexed content: %s\n", result[i]);
if (access(result[i], R_OK) == -1)
monitor_remove_file(result[i]);
}
if ((d = opendir(path)) == NULL) {
DPRINTF(E_ERROR, L_INOTIFY,
"Can't list [%s] (%s)\n", path, strerror(errno));
goto err2;
}
for (entry = readdir(d); entry != NULL; entry = readdir(d)) {
if ((entry->d_type != DT_REG) &&
(entry->d_type != DT_LNK))
continue;
result_path_len = snprintf(tmp_path, PATH_MAX, "%s/%s",
path, entry->d_name);
if (result_path_len >= PATH_MAX) {
DPRINTF(E_ERROR, L_INOTIFY,
"File path too long for %s!",
entry->d_name);
continue;
}
DPRINTF(E_DEBUG, L_INOTIFY, "Walking %s\n", tmp_path);
found_flag = false;
for (int i = 1; i <= rows; i++)
if (strcmp(result[i], tmp_path) == 0) {
found_flag = true;
break;
}
if (!found_flag ) {
struct stat st;
if (stat(tmp_path, &st) != 0) {
DPRINTF(E_ERROR, L_INOTIFY,
"stat(%s): %s\n", tmp_path,
strerror(errno));
continue;
}
esc_name = strdup(entry->d_name);
if (esc_name == NULL) {
DPRINTF(E_ERROR, L_INOTIFY,
"strdup error");
continue;
}
esc_name = modifyString(esc_name, "&", "&amp;amp;", 0);
if (S_ISDIR(st.st_mode))
monitor_insert_directory(1, esc_name, tmp_path);
else
monitor_insert_file(esc_name, tmp_path);
free(esc_name);
}
}
} else
return;
closedir(d);
err2:
sqlite3_free_table(result);
err1:
sqlite3_free(sql);
}
int
add_watch(int fd __unused, const char *path)
{
struct event *ev;
int wd;
wd = open(path, O_RDONLY);
if (wd < 0) {
DPRINTF(E_ERROR, L_INOTIFY, "open(%s) [%s]\n",
path, strerror(errno));
return (errno);
}
if ((ev = malloc(sizeof(struct event))) == NULL) {
DPRINTF(E_ERROR, L_INOTIFY, "malloc() error\n");
close(wd);
return (ENOMEM);
}
if ((ev->data = strdup(path)) == NULL) {
DPRINTF(E_ERROR, L_INOTIFY, "strdup() error\n");
close(wd);
free(ev);
return (ENOMEM);
}
ev->fd = wd;
ev->rdwr = EVENT_VNODE;
ev->process_vnode = vnode_process;
DPRINTF(E_DEBUG, L_INOTIFY, "kqueue add_watch [%s]\n", path);
event_module.add(ev);
return (0);
}
/*
* XXXGL: this function has too much copypaste of inotify_create_watches().
* We need to split out inotify stuff from monitor.c into monitor_inotify.c,
* compile the latter on Linux and this file on FreeBSD, and keep monitor.c
* itself platform independent.
*/
void
kqueue_monitor_start()
{
struct media_dir_s *media_path;
char **result;
int rows;
DPRINTF(E_DEBUG, L_INOTIFY, "kqueue monitoring starting\n");
for (media_path = media_dirs; media_path != NULL;
media_path = media_path->next)
add_watch(0, media_path->path);
sql_get_table(db, "SELECT PATH from DETAILS where MIME is NULL and PATH is not NULL", &result, &rows, NULL);
for (int i = 1; i <= rows; i++ )
add_watch(0, result[i]);
sqlite3_free_table(result);
}