Add kqueue event module. Code inspired by nginx. There are a lot of room for optimization here, this is just first working version.

This commit is contained in:
Gleb Smirnoff 2017-12-28 10:39:17 -08:00 committed by Justin Maggard
parent aefe4dd0bf
commit 7ba9e52fc8
7 changed files with 254 additions and 32 deletions

View File

@ -28,8 +28,13 @@ minidlnad_SOURCES = minidlna.c upnphttp.c upnpdescgen.c upnpsoap.c \
sql.c utils.c metadata.c scanner.c monitor.c \
tivo_utils.c tivo_beacon.c tivo_commands.c \
playlist.c image_utils.c albumart.c log.c \
containers.c avahi.c tagutils/tagutils.c \
select.c
containers.c avahi.c tagutils/tagutils.c
if HAVE_KQUEUE
minidlnad_SOURCES += kqueue.c
else
minidlnad_SOURCES += select.c
endif
if HAVE_VORBISFILE
vorbislibs = -lvorbis -logg

View File

@ -512,6 +512,8 @@ AC_CHECK_FUNCS(inotify_init, AC_DEFINE(HAVE_INOTIFY,1,[Whether kernel has inotif
])
])
AC_CHECK_FUNCS(kqueue, AM_CONDITIONAL(HAVE_KQUEUE, true), AM_CONDITIONAL(HAVE_KQUEUE, false))
################################################################################################################
### Build Options

20
event.h
View File

@ -1,11 +1,24 @@
#include "config.h"
#ifdef HAVE_KQUEUE
#include <sys/types.h>
#include <sys/event.h>
#endif
struct event;
typedef enum {
#ifdef HAVE_KQUEUE
EVENT_READ = EVFILT_READ,
EVENT_WRITE = EVFILT_WRITE,
#else
EVENT_READ,
EVENT_WRITE,
EVENT_RDWR,
#endif
} event_t;
#define EV_FLAG_CLOSING 0x00000001
typedef void event_process_t(struct event *);
struct event {
@ -17,14 +30,13 @@ struct event {
};
typedef int event_module_add_t(struct event *);
typedef int event_module_del_t(struct event *, int flags);
typedef int event_module_init_t(void);
typedef void event_module_fini_t(void);
typedef int event_module_process_t(u_long);
struct event_module {
event_module_add_t *add;
event_module_add_t *del;
event_module_add_t *enable;
event_module_add_t *disable;
event_module_del_t *del;
event_module_process_t *process;
event_module_init_t *init;
event_module_fini_t *fini;

217
kqueue.c Normal file
View File

@ -0,0 +1,217 @@
/*
* Copyright (C) Igor Sysoev
* Copyright (C) Nginx, Inc.
*/
#include <sys/types.h>
#include <sys/event.h>
#include <assert.h>
#include <errno.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include "event.h"
#include "log.h"
static int kqueue_set(struct event *ev, short filter, u_short flags);
static event_module_init_t kqueue_init;
static event_module_fini_t kqueue_fini;
static event_module_add_t kqueue_add;
static event_module_del_t kqueue_del;
static event_module_process_t kqueue_process;
static int kq;
static struct kevent *change_list;
static struct kevent *event_list;
static u_int nchanges;
#define MAXCHANGES 128
#define MAXEVENTS 128
struct event_module event_module = {
.add = kqueue_add,
.del = kqueue_del,
.process = kqueue_process,
.init = kqueue_init,
.fini = kqueue_fini,
};
static int
kqueue_init(void)
{
kq = kqueue();
if (kq == -1)
return (errno);
change_list = calloc(MAXCHANGES, sizeof(struct kevent));
event_list = calloc(MAXEVENTS, sizeof(struct kevent));
if (change_list == NULL || event_list == NULL)
return (ENOMEM);
nchanges = 0;
return (0);
}
static void
kqueue_fini()
{
(void )close(kq);
kq = -1;
free(change_list);
free(event_list);
change_list = NULL;
event_list = NULL;
nchanges = 0;
}
static int
kqueue_add(struct event *ev)
{
DPRINTF(E_DEBUG, L_GENERAL, "kqueue_add %d\n", ev->fd);
return (kqueue_set(ev, ev->rdwr, EV_ADD | EV_ENABLE));
}
static int
kqueue_del(struct event *ev, int flags)
{
/*
* If the event is still not passed to a kernel,
* we will not pass it.
*/
assert(ev->fd >= 0);
if (ev->index < nchanges &&
change_list[ev->index].udata == ev) {
if (ev->index < --nchanges) {
struct event *ev0;
ev0 = (struct event *)change_list[nchanges].udata;
change_list[ev->index] = change_list[nchanges];
ev0->index = ev->index;
}
return (0);
}
/*
* when the file descriptor is closed the kqueue automatically deletes
* its filters so we do not need to delete explicitly the event
* before the closing the file descriptor.
*/
if (flags & EV_FLAG_CLOSING)
return (0);
DPRINTF(E_DEBUG, L_GENERAL, "kqueue_del %d\n", ev->fd);
return (kqueue_set(ev, ev->rdwr, EV_DELETE));
}
static int
kqueue_set(struct event *ev, short filter, u_short flags)
{
struct kevent *kev;
struct timespec ts;
if (nchanges >= MAXCHANGES) {
DPRINTF(E_WARN, L_GENERAL, "kqueue change list is filled up\n");
ts.tv_sec = 0;
ts.tv_nsec = 0;
if (kevent(kq, change_list, (int) nchanges, NULL, 0, &ts) == -1) {
DPRINTF(E_ERROR, L_GENERAL,"kevent() failed: %s\n", strerror(errno));
return (errno);
}
nchanges = 0;
}
kev = &change_list[nchanges];
kev->ident = ev->fd;
kev->filter = filter;
kev->flags = flags;
kev->udata = ev;
kev->fflags = 0;
kev->data = 0;
ev->index = nchanges++;
return (0);
}
static int
kqueue_process(u_long timer)
{
struct event *ev;
int events, n;
struct timespec ts, *tp;
n = (int) nchanges;
nchanges = 0;
if (timer == 0) {
tp = NULL;
} else {
ts.tv_sec = timer / 1000;
ts.tv_nsec = (timer % 1000) * 1000000;
tp = &ts;
}
DPRINTF(E_DEBUG, L_GENERAL, "kevent timer: %lu, changes: %d\n",
timer, n);
events = kevent(kq, change_list, n, event_list, MAXEVENTS, tp);
if (events == -1) {
if (errno == EINTR)
return (errno);
DPRINTF(E_FATAL, L_GENERAL, "kevent(): %s. EXITING\n", strerror(errno));
}
/* XXXGL */
for (int i = 0; i < n; i++) {
struct event *ev;
ev = (struct event *)change_list[i].udata;
}
DPRINTF(E_DEBUG, L_GENERAL, "kevent events: %d\n", events);
if (events == 0) {
if (timer != 0)
return (0);
DPRINTF(E_FATAL, L_GENERAL, "kevent() returned no events. EXITING\n");
}
for (int i = 0; i < events; i++) {
if (event_list[i].flags & EV_ERROR) {
DPRINTF(E_ERROR, L_GENERAL,
"kevent() error %d on %d filter:%d flags:0x%x\n",
(int)event_list[i].data, (int)event_list[i].ident,
event_list[i].filter, event_list[i].flags);
continue;
}
ev = (struct event *)event_list[i].udata;
switch (event_list[i].filter) {
case EVFILT_READ:
case EVFILT_WRITE:
ev->process(ev);
break;
default:
DPRINTF(E_ERROR, L_GENERAL,
"unexpected kevent() filter %d",
event_list[i].filter);
continue;
}
}
return (0);
}

View File

@ -17,7 +17,7 @@
static event_module_init_t select_init;
static event_module_fini_t select_fini;
static event_module_add_t select_add;
static event_module_add_t select_del;
static event_module_del_t select_del;
static event_module_process_t select_process;
static fd_set master_read_fd_set;
@ -32,8 +32,6 @@ static int max_fd;
struct event_module event_module = {
.add = select_add,
.del = select_del,
.enable = select_add,
.disable = select_del,
.process = select_process,
.init = select_init,
.fini = select_fini,
@ -68,7 +66,7 @@ static int
select_add(struct event *ev)
{
assert(ev->fd <= FD_SETSIZE);
assert(ev->fd < FD_SETSIZE);
switch (ev->rdwr) {
case EVENT_READ:
@ -77,10 +75,6 @@ select_add(struct event *ev)
case EVENT_WRITE:
FD_SET(ev->fd, &master_write_fd_set);
break;
case EVENT_RDWR:
FD_SET(ev->fd, &master_read_fd_set);
FD_SET(ev->fd, &master_write_fd_set);
break;
}
if (max_fd != -1 && max_fd < ev->fd)
@ -89,14 +83,16 @@ select_add(struct event *ev)
events[nevents] = ev;
ev->index = nevents++;
assert(nevents < FD_SETSIZE);
return (0);
}
static int
select_del(struct event *ev)
select_del(struct event *ev, int flags)
{
assert(ev->fd <= FD_SETSIZE);
assert(ev->fd < FD_SETSIZE);
switch (ev->rdwr) {
case EVENT_READ:
@ -105,10 +101,6 @@ select_del(struct event *ev)
case EVENT_WRITE:
FD_CLR(ev->fd, &master_write_fd_set);
break;
case EVENT_RDWR:
FD_CLR(ev->fd, &master_read_fd_set);
FD_CLR(ev->fd, &master_write_fd_set);
break;
}
if (max_fd == ev->fd)
@ -152,8 +144,7 @@ select_process(u_long msec)
if (ready == -1) {
if (errno == EINTR)
return (errno);
DPRINTF(E_ERROR, L_GENERAL, "select(all): %s\n", strerror(errno));
DPRINTF(E_FATAL, L_GENERAL, "Failed to select open sockets. EXITING\n");
DPRINTF(E_FATAL, L_GENERAL, "select(): %s. EXITING\n", strerror(errno));
}
if (ready == 0)
@ -171,11 +162,6 @@ select_process(u_long msec)
if (FD_ISSET(ev->fd, &work_write_fd_set))
ev->process(ev);
break;
case EVENT_RDWR:
if (FD_ISSET(ev->fd, &work_read_fd_set) ||
FD_ISSET(ev->fd, &work_write_fd_set))
ev->process(ev);
break;
}
}

View File

@ -365,14 +365,14 @@ static void upnp_event_send(struct upnp_event_notify * obj)
if(i<0) {
DPRINTF(E_WARN, L_HTTP, "%s: send(): %s\n", "upnp_event_send", strerror(errno));
obj->state = EError;
event_module.del(&obj->ev);
event_module.del(&obj->ev, 0);
return;
}
obj->sent += i;
}
if(obj->sent == obj->tosend) {
obj->state = EWaitingForResponse;
event_module.del(&obj->ev);
event_module.del(&obj->ev, 0);
obj->ev.rdwr = EVENT_READ;
event_module.add(&obj->ev);
}
@ -385,13 +385,13 @@ static void upnp_event_recv(struct upnp_event_notify * obj)
if(n<0) {
DPRINTF(E_ERROR, L_HTTP, "%s: recv(): %s\n", "upnp_event_recv", strerror(errno));
obj->state = EError;
event_module.del(&obj->ev);
event_module.del(&obj->ev, 0);
return;
}
DPRINTF(E_DEBUG, L_HTTP, "%s: (%dbytes) %.*s\n", "upnp_event_recv",
n, n, obj->buffer);
obj->state = EFinished;
event_module.del(&obj->ev);
event_module.del(&obj->ev, 0);
if(obj->sub)
{
obj->sub->seq++;

View File

@ -123,7 +123,7 @@ void
CloseSocket_upnphttp(struct upnphttp * h)
{
event_module.del(&h->ev);
event_module.del(&h->ev, EV_FLAG_CLOSING);
if(close(h->ev.fd) < 0)
{
DPRINTF(E_ERROR, L_HTTP, "CloseSocket_upnphttp: close(%d): %s\n", h->ev.fd, strerror(errno));