diff --git a/Makefile.am b/Makefile.am index c267996..eb78e11 100644 --- a/Makefile.am +++ b/Makefile.am @@ -28,8 +28,13 @@ minidlnad_SOURCES = minidlna.c upnphttp.c upnpdescgen.c upnpsoap.c \ sql.c utils.c metadata.c scanner.c monitor.c \ tivo_utils.c tivo_beacon.c tivo_commands.c \ playlist.c image_utils.c albumart.c log.c \ - containers.c avahi.c tagutils/tagutils.c \ - select.c + containers.c avahi.c tagutils/tagutils.c + +if HAVE_KQUEUE +minidlnad_SOURCES += kqueue.c +else +minidlnad_SOURCES += select.c +endif if HAVE_VORBISFILE vorbislibs = -lvorbis -logg diff --git a/configure.ac b/configure.ac index f343d21..8eb55c9 100644 --- a/configure.ac +++ b/configure.ac @@ -512,6 +512,8 @@ AC_CHECK_FUNCS(inotify_init, AC_DEFINE(HAVE_INOTIFY,1,[Whether kernel has inotif ]) ]) +AC_CHECK_FUNCS(kqueue, AM_CONDITIONAL(HAVE_KQUEUE, true), AM_CONDITIONAL(HAVE_KQUEUE, false)) + ################################################################################################################ ### Build Options diff --git a/event.h b/event.h index 77ae7da..7fba74b 100644 --- a/event.h +++ b/event.h @@ -1,11 +1,24 @@ +#include "config.h" + +#ifdef HAVE_KQUEUE +#include +#include +#endif + struct event; typedef enum { +#ifdef HAVE_KQUEUE + EVENT_READ = EVFILT_READ, + EVENT_WRITE = EVFILT_WRITE, +#else EVENT_READ, EVENT_WRITE, - EVENT_RDWR, +#endif } event_t; +#define EV_FLAG_CLOSING 0x00000001 + typedef void event_process_t(struct event *); struct event { @@ -17,14 +30,13 @@ struct event { }; typedef int event_module_add_t(struct event *); +typedef int event_module_del_t(struct event *, int flags); typedef int event_module_init_t(void); typedef void event_module_fini_t(void); typedef int event_module_process_t(u_long); struct event_module { event_module_add_t *add; - event_module_add_t *del; - event_module_add_t *enable; - event_module_add_t *disable; + event_module_del_t *del; event_module_process_t *process; event_module_init_t *init; event_module_fini_t *fini; diff --git a/kqueue.c b/kqueue.c new file mode 100644 index 0000000..649ed88 --- /dev/null +++ b/kqueue.c @@ -0,0 +1,217 @@ + +/* + * Copyright (C) Igor Sysoev + * Copyright (C) Nginx, Inc. + */ + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "event.h" +#include "log.h" + +static int kqueue_set(struct event *ev, short filter, u_short flags); + +static event_module_init_t kqueue_init; +static event_module_fini_t kqueue_fini; +static event_module_add_t kqueue_add; +static event_module_del_t kqueue_del; +static event_module_process_t kqueue_process; + +static int kq; +static struct kevent *change_list; +static struct kevent *event_list; +static u_int nchanges; + +#define MAXCHANGES 128 +#define MAXEVENTS 128 + +struct event_module event_module = { + .add = kqueue_add, + .del = kqueue_del, + .process = kqueue_process, + .init = kqueue_init, + .fini = kqueue_fini, +}; + +static int +kqueue_init(void) +{ + + kq = kqueue(); + if (kq == -1) + return (errno); + + change_list = calloc(MAXCHANGES, sizeof(struct kevent)); + event_list = calloc(MAXEVENTS, sizeof(struct kevent)); + if (change_list == NULL || event_list == NULL) + return (ENOMEM); + + nchanges = 0; + + return (0); +} + +static void +kqueue_fini() +{ + + (void )close(kq); + kq = -1; + + free(change_list); + free(event_list); + change_list = NULL; + event_list = NULL; + nchanges = 0; +} + +static int +kqueue_add(struct event *ev) +{ + + DPRINTF(E_DEBUG, L_GENERAL, "kqueue_add %d\n", ev->fd); + return (kqueue_set(ev, ev->rdwr, EV_ADD | EV_ENABLE)); +} + +static int +kqueue_del(struct event *ev, int flags) +{ + + /* + * If the event is still not passed to a kernel, + * we will not pass it. + */ + assert(ev->fd >= 0); + if (ev->index < nchanges && + change_list[ev->index].udata == ev) { + if (ev->index < --nchanges) { + struct event *ev0; + + ev0 = (struct event *)change_list[nchanges].udata; + change_list[ev->index] = change_list[nchanges]; + ev0->index = ev->index; + } + return (0); + } + + /* + * when the file descriptor is closed the kqueue automatically deletes + * its filters so we do not need to delete explicitly the event + * before the closing the file descriptor. + */ + if (flags & EV_FLAG_CLOSING) + return (0); + + DPRINTF(E_DEBUG, L_GENERAL, "kqueue_del %d\n", ev->fd); + return (kqueue_set(ev, ev->rdwr, EV_DELETE)); +} + +static int +kqueue_set(struct event *ev, short filter, u_short flags) +{ + struct kevent *kev; + struct timespec ts; + + if (nchanges >= MAXCHANGES) { + DPRINTF(E_WARN, L_GENERAL, "kqueue change list is filled up\n"); + + ts.tv_sec = 0; + ts.tv_nsec = 0; + + if (kevent(kq, change_list, (int) nchanges, NULL, 0, &ts) == -1) { + DPRINTF(E_ERROR, L_GENERAL,"kevent() failed: %s\n", strerror(errno)); + return (errno); + } + nchanges = 0; + } + + kev = &change_list[nchanges]; + kev->ident = ev->fd; + kev->filter = filter; + kev->flags = flags; + kev->udata = ev; + kev->fflags = 0; + kev->data = 0; + + ev->index = nchanges++; + + return (0); +} + +static int +kqueue_process(u_long timer) +{ + struct event *ev; + int events, n; + struct timespec ts, *tp; + + n = (int) nchanges; + nchanges = 0; + + if (timer == 0) { + tp = NULL; + } else { + ts.tv_sec = timer / 1000; + ts.tv_nsec = (timer % 1000) * 1000000; + tp = &ts; + } + + DPRINTF(E_DEBUG, L_GENERAL, "kevent timer: %lu, changes: %d\n", + timer, n); + + events = kevent(kq, change_list, n, event_list, MAXEVENTS, tp); + + if (events == -1) { + if (errno == EINTR) + return (errno); + DPRINTF(E_FATAL, L_GENERAL, "kevent(): %s. EXITING\n", strerror(errno)); + } + + /* XXXGL */ + for (int i = 0; i < n; i++) { + struct event *ev; + + ev = (struct event *)change_list[i].udata; + } + + DPRINTF(E_DEBUG, L_GENERAL, "kevent events: %d\n", events); + + if (events == 0) { + if (timer != 0) + return (0); + DPRINTF(E_FATAL, L_GENERAL, "kevent() returned no events. EXITING\n"); + } + + for (int i = 0; i < events; i++) { + if (event_list[i].flags & EV_ERROR) { + DPRINTF(E_ERROR, L_GENERAL, + "kevent() error %d on %d filter:%d flags:0x%x\n", + (int)event_list[i].data, (int)event_list[i].ident, + event_list[i].filter, event_list[i].flags); + continue; + } + + ev = (struct event *)event_list[i].udata; + + switch (event_list[i].filter) { + case EVFILT_READ: + case EVFILT_WRITE: + ev->process(ev); + break; + default: + DPRINTF(E_ERROR, L_GENERAL, + "unexpected kevent() filter %d", + event_list[i].filter); + continue; + } + } + + return (0); +} diff --git a/select.c b/select.c index ebec886..a79a4a0 100644 --- a/select.c +++ b/select.c @@ -17,7 +17,7 @@ static event_module_init_t select_init; static event_module_fini_t select_fini; static event_module_add_t select_add; -static event_module_add_t select_del; +static event_module_del_t select_del; static event_module_process_t select_process; static fd_set master_read_fd_set; @@ -32,8 +32,6 @@ static int max_fd; struct event_module event_module = { .add = select_add, .del = select_del, - .enable = select_add, - .disable = select_del, .process = select_process, .init = select_init, .fini = select_fini, @@ -68,7 +66,7 @@ static int select_add(struct event *ev) { - assert(ev->fd <= FD_SETSIZE); + assert(ev->fd < FD_SETSIZE); switch (ev->rdwr) { case EVENT_READ: @@ -77,10 +75,6 @@ select_add(struct event *ev) case EVENT_WRITE: FD_SET(ev->fd, &master_write_fd_set); break; - case EVENT_RDWR: - FD_SET(ev->fd, &master_read_fd_set); - FD_SET(ev->fd, &master_write_fd_set); - break; } if (max_fd != -1 && max_fd < ev->fd) @@ -89,14 +83,16 @@ select_add(struct event *ev) events[nevents] = ev; ev->index = nevents++; + assert(nevents < FD_SETSIZE); + return (0); } static int -select_del(struct event *ev) +select_del(struct event *ev, int flags) { - assert(ev->fd <= FD_SETSIZE); + assert(ev->fd < FD_SETSIZE); switch (ev->rdwr) { case EVENT_READ: @@ -105,10 +101,6 @@ select_del(struct event *ev) case EVENT_WRITE: FD_CLR(ev->fd, &master_write_fd_set); break; - case EVENT_RDWR: - FD_CLR(ev->fd, &master_read_fd_set); - FD_CLR(ev->fd, &master_write_fd_set); - break; } if (max_fd == ev->fd) @@ -152,8 +144,7 @@ select_process(u_long msec) if (ready == -1) { if (errno == EINTR) return (errno); - DPRINTF(E_ERROR, L_GENERAL, "select(all): %s\n", strerror(errno)); - DPRINTF(E_FATAL, L_GENERAL, "Failed to select open sockets. EXITING\n"); + DPRINTF(E_FATAL, L_GENERAL, "select(): %s. EXITING\n", strerror(errno)); } if (ready == 0) @@ -171,11 +162,6 @@ select_process(u_long msec) if (FD_ISSET(ev->fd, &work_write_fd_set)) ev->process(ev); break; - case EVENT_RDWR: - if (FD_ISSET(ev->fd, &work_read_fd_set) || - FD_ISSET(ev->fd, &work_write_fd_set)) - ev->process(ev); - break; } } diff --git a/upnpevents.c b/upnpevents.c index e673a34..4de6ce8 100644 --- a/upnpevents.c +++ b/upnpevents.c @@ -365,14 +365,14 @@ static void upnp_event_send(struct upnp_event_notify * obj) if(i<0) { DPRINTF(E_WARN, L_HTTP, "%s: send(): %s\n", "upnp_event_send", strerror(errno)); obj->state = EError; - event_module.del(&obj->ev); + event_module.del(&obj->ev, 0); return; } obj->sent += i; } if(obj->sent == obj->tosend) { obj->state = EWaitingForResponse; - event_module.del(&obj->ev); + event_module.del(&obj->ev, 0); obj->ev.rdwr = EVENT_READ; event_module.add(&obj->ev); } @@ -385,13 +385,13 @@ static void upnp_event_recv(struct upnp_event_notify * obj) if(n<0) { DPRINTF(E_ERROR, L_HTTP, "%s: recv(): %s\n", "upnp_event_recv", strerror(errno)); obj->state = EError; - event_module.del(&obj->ev); + event_module.del(&obj->ev, 0); return; } DPRINTF(E_DEBUG, L_HTTP, "%s: (%dbytes) %.*s\n", "upnp_event_recv", n, n, obj->buffer); obj->state = EFinished; - event_module.del(&obj->ev); + event_module.del(&obj->ev, 0); if(obj->sub) { obj->sub->seq++; diff --git a/upnphttp.c b/upnphttp.c index 8b66ab9..974434e 100644 --- a/upnphttp.c +++ b/upnphttp.c @@ -123,7 +123,7 @@ void CloseSocket_upnphttp(struct upnphttp * h) { - event_module.del(&h->ev); + event_module.del(&h->ev, EV_FLAG_CLOSING); if(close(h->ev.fd) < 0) { DPRINTF(E_ERROR, L_HTTP, "CloseSocket_upnphttp: close(%d): %s\n", h->ev.fd, strerror(errno));