Support for pluggable event modules, stage 1. Take out select() loop and associated code into a separate module select.c. Detailed list of changes down below.

The idea is taken from the nginx web server, but much simplified and
almost no copypaste left. This will allow minidlna to use different
event dispatcher APIs, which would be defined at compile time.

My personal goal is to convert minidlna to kqueue(2) on FreeBSD. This
would later allow for kqueue based directory change notification, which
won't conflict with select(2) like the current patch does.
Other platforms will also benefit from the pluggability of the event
system, Linux can switch to epoll(2) or at least to poll(2).

Detailed list of changes:

* event.h [New]
  Our internal API to unify different event dispatch systems.
* select.c [New]
  Much simplified version of nginx's ngx_select_module.c.
* minidlna.c
  - Split out listen socket event processing into separate function
    ProcessListen(), which matches event_process_t type.
  - Create and initialize struct event for the monitor socket, SSDP
    socket, HTTP socket and beacon socket.
  - Simplify and make more precise timeout calculation using
    helper timeval functions from utils.c. Treat gettimeofday() error
    as a fatal event.
  - Rip out all stuff related to select(2). Just call event_module.process().
* upnpevents.c
  - Embed struct event into upnp_event_notify.
  - Merge upnp_event_create_notify() with upnp_event_notify_connect().
    Start connecting immediately after socket creation. Garbage collect
    now useless ECreated state.
  - Make upnp_event_process_notify() of event_process_t type, and use it
    as process callback for upnp_event_notify event.
  - Looks like we always create upnp_event_notify with existing subscriber,
    and never clear it later. Remove checks for obj->sub and assert that it
    is never NULL. Simplifies things.
  - When switching obj state, add/del it to event dispatcher accrodingly.
  - Garbage collect upnpevents_selectfds().
  - Garbage collect select(2) related stuff from upnpevents_processfds().
    Rename function to upnpevents_gc(), since the remaining functionality
    is garbage collecting, not file descriptor processing.
    Actually, this can be simplified even more. We can safely close sockets
    and free objects immediately, eliminating need for upnpevents_gc(). But
    this change would be beyond scope of this commit.
* upnphttp.c, upnphttp.h
  Embed struct event into struct upnphttp. Adjust Process_upnphttp() to match
  event_process_t type. Add/del to event dispatcher once creating/closing a
  socket.
* minissdp.c, minissdp.h
  Make ProcessSSDPRequest() of event_process_t type.
* getifaddr.c, getifaddr.h
  Make ProcessMonitorEvent() of event_process_t type.
This commit is contained in:
Gleb Smirnoff
2017-12-27 15:08:40 -08:00
committed by Justin Maggard
parent 04e243c85c
commit f9a78d598e
14 changed files with 419 additions and 277 deletions

View File

@ -77,6 +77,7 @@
#include <libintl.h>
#endif
#include "event.h"
#include "upnpglobalvars.h"
#include "sql.h"
#include "upnphttp.h"
@ -101,7 +102,9 @@
# warning "Your SQLite3 library appears to be too old! Please use 3.5.1 or newer."
# define sqlite3_threadsafe() 0
#endif
static LIST_HEAD(httplisthead, upnphttp) upnphttphead;
/* OpenAndConfHTTPSocket() :
* setup the socket used to handle incoming HTTP connections. */
static int
@ -146,6 +149,46 @@ OpenAndConfHTTPSocket(unsigned short port)
return s;
}
/* ProcessListen() :
* accept incoming HTTP connection. */
static void
ProcessListen(struct event *ev)
{
int shttp;
socklen_t clientnamelen;
struct sockaddr_in clientname;
clientnamelen = sizeof(struct sockaddr_in);
shttp = accept(ev->fd, (struct sockaddr *)&clientname, &clientnamelen);
if (shttp<0)
{
DPRINTF(E_ERROR, L_GENERAL, "accept(http): %s\n", strerror(errno));
}
else
{
struct upnphttp * tmp = 0;
DPRINTF(E_DEBUG, L_GENERAL, "HTTP connection from %s:%d\n",
inet_ntoa(clientname.sin_addr),
ntohs(clientname.sin_port) );
/*if (fcntl(shttp, F_SETFL, O_NONBLOCK) < 0) {
DPRINTF(E_ERROR, L_GENERAL, "fcntl F_SETFL, O_NONBLOCK\n");
}*/
/* Create a new upnphttp object and add it to
* the active upnphttp object list */
tmp = New_upnphttp(shttp);
if (tmp)
{
tmp->clientaddr = clientname.sin_addr;
LIST_INSERT_HEAD(&upnphttphead, tmp, entries);
}
else
{
DPRINTF(E_ERROR, L_GENERAL, "New_upnphttp() failed\n");
close(shttp);
}
}
}
/* Handler for the SIGTERM signal (kill)
* SIGINT is also handled */
static void
@ -1043,6 +1086,8 @@ init(int argc, char **argv)
return 1;
}
event_module.init();
return 0;
}
@ -1054,22 +1099,21 @@ main(int argc, char **argv)
int ret, i;
int shttpl = -1;
int smonitor = -1;
LIST_HEAD(httplisthead, upnphttp) upnphttphead;
struct upnphttp * e = 0;
struct upnphttp * next;
fd_set readset; /* for select() */
fd_set writeset;
struct timeval timeout, timeofday, lastnotifytime = {0, 0};
struct timeval tv, timeofday, lastnotifytime = {0, 0};
time_t lastupdatetime = 0, lastdbtime = 0;
int max_fd = -1;
u_long timeout; /* in milliseconds */
int last_changecnt = 0;
pid_t scanner_pid = 0;
pthread_t inotify_thread = 0;
struct event ssdpev, httpev, monev;
#ifdef TIVO_SUPPORT
uint8_t beacon_interval = 5;
int sbeacon = -1;
struct sockaddr_in tivo_bcast;
struct timeval lastbeacontime = {0, 0};
struct event beaconev;
#endif
for (i = 0; i < L_MAX; i++)
@ -1108,6 +1152,11 @@ main(int argc, char **argv)
}
#endif
smonitor = OpenAndConfMonitorSocket();
if (smonitor > 0)
{
monev = (struct event ){ .fd = smonitor, .rdwr = EVENT_READ, .process = ProcessMonitorEvent };
event_module.add(&monev);
}
sssdp = OpenAndConfSSDPReceiveSocket();
if (sssdp < 0)
@ -1117,11 +1166,19 @@ main(int argc, char **argv)
if (SubmitServicesToMiniSSDPD(lan_addr[0].str, runtime_vars.port) < 0)
DPRINTF(E_FATAL, L_GENERAL, "Failed to connect to MiniSSDPd. EXITING");
}
else
{
ssdpev = (struct event ){ .fd = sssdp, .rdwr = EVENT_READ, .process = ProcessSSDPRequest };
event_module.add(&ssdpev);
}
/* open socket for HTTP connections. */
shttpl = OpenAndConfHTTPSocket(runtime_vars.port);
if (shttpl < 0)
DPRINTF(E_FATAL, L_GENERAL, "Failed to open socket for HTTP. EXITING\n");
DPRINTF(E_WARN, L_GENERAL, "HTTP listening on port %d\n", runtime_vars.port);
httpev = (struct event ){ .fd = shttpl, .rdwr = EVENT_READ, .process = ProcessListen };
event_module.add(&httpev);
#ifdef TIVO_SUPPORT
if (GETFLAG(TIVO_MASK))
@ -1142,6 +1199,8 @@ main(int argc, char **argv)
if(sbeacon < 0)
DPRINTF(E_FATAL, L_GENERAL, "Failed to open sockets for sending Tivo beacon notify "
"messages. EXITING\n");
beaconev = { .fd = sbeacon, .rdwr = EVENT_READ, .process = ProcessTiVoBeacon };
event_module.add(&beaconev);
tivo_bcast.sin_family = AF_INET;
tivo_bcast.sin_addr.s_addr = htonl(getBcastAddress());
tivo_bcast.sin_port = htons(2190);
@ -1155,64 +1214,58 @@ main(int argc, char **argv)
/* main loop */
while (!quitting)
{
if (gettimeofday(&timeofday, 0) < 0)
DPRINTF(E_FATAL, L_GENERAL, "gettimeofday(): %s\n", strerror(errno));
/* Check if we need to send SSDP NOTIFY messages and do it if
* needed */
if (gettimeofday(&timeofday, 0) < 0)
tv = lastnotifytime;
tv.tv_sec += runtime_vars.notify_interval;
if (timevalcmp(&timeofday, &tv, >=))
{
DPRINTF(E_ERROR, L_GENERAL, "gettimeofday(): %s\n", strerror(errno));
timeout.tv_sec = runtime_vars.notify_interval;
timeout.tv_usec = 0;
DPRINTF(E_DEBUG, L_SSDP, "Sending SSDP notifies\n");
for (i = 0; i < n_lan_addr; i++)
{
SendSSDPNotifies(lan_addr[i].snotify, lan_addr[i].str,
runtime_vars.port, runtime_vars.notify_interval);
}
lastnotifytime = timeofday;
timeout = runtime_vars.notify_interval * 1000;
}
else
{
/* the comparison is not very precise but who cares ? */
if (timeofday.tv_sec >= (lastnotifytime.tv_sec + runtime_vars.notify_interval))
timevalsub(&tv, &timeofday);
timeout = tv.tv_sec * 1000 + tv.tv_usec / 1000;
}
#ifdef TIVO_SUPPORT
if (sbeacon >= 0)
{
u_long beacontimeout;
tv = lastbeacontime;
tv.tv_sec += beacon_interval;
if (timevalcmp(&timeofday, &tv, >=))
{
DPRINTF(E_DEBUG, L_SSDP, "Sending SSDP notifies\n");
for (i = 0; i < n_lan_addr; i++)
{
SendSSDPNotifies(lan_addr[i].snotify, lan_addr[i].str,
runtime_vars.port, runtime_vars.notify_interval);
}
memcpy(&lastnotifytime, &timeofday, sizeof(struct timeval));
timeout.tv_sec = runtime_vars.notify_interval;
timeout.tv_usec = 0;
sendBeaconMessage(sbeacon, &tivo_bcast, sizeof(struct sockaddr_in), 1);
lastbeacontime = timeofday;
beacontimeout = beacon_interval * 1000;
if (timeout > beacon_interval * 1000)
timeout = beacon_interval * 1000;
/* Beacons should be sent every 5 seconds or
* so for the first minute, then every minute
* or so thereafter. */
if (beacon_interval == 5 && (timeofday.tv_sec - startup_time) > 60)
beacon_interval = 60;
}
else
{
timeout.tv_sec = lastnotifytime.tv_sec + runtime_vars.notify_interval
- timeofday.tv_sec;
if (timeofday.tv_usec > lastnotifytime.tv_usec)
{
timeout.tv_usec = 1000000 + lastnotifytime.tv_usec
- timeofday.tv_usec;
timeout.tv_sec--;
}
else
timeout.tv_usec = lastnotifytime.tv_usec - timeofday.tv_usec;
timevalsub(&tv, &timeofday);
beacontimeout = tv.tv_sec * 1000 +
tv.tv_usec / 1000;
}
#ifdef TIVO_SUPPORT
if (sbeacon >= 0)
{
if (timeofday.tv_sec >= (lastbeacontime.tv_sec + beacon_interval))
{
sendBeaconMessage(sbeacon, &tivo_bcast, sizeof(struct sockaddr_in), 1);
memcpy(&lastbeacontime, &timeofday, sizeof(struct timeval));
if (timeout.tv_sec > beacon_interval)
{
timeout.tv_sec = beacon_interval;
timeout.tv_usec = 0;
}
/* Beacons should be sent every 5 seconds or so for the first minute,
* then every minute or so thereafter. */
if (beacon_interval == 5 && (timeofday.tv_sec - startup_time) > 60)
beacon_interval = 60;
}
else if (timeout.tv_sec > (lastbeacontime.tv_sec + beacon_interval + 1 - timeofday.tv_sec))
timeout.tv_sec = lastbeacontime.tv_sec + beacon_interval - timeofday.tv_sec;
}
#endif
if (timeout > beacontimeout)
timeout = beacontimeout;
}
#endif
if (GETFLAG(SCANNING_MASK))
{
@ -1224,75 +1277,16 @@ main(int argc, char **argv)
}
}
/* select open sockets (SSDP, HTTP listen, and all HTTP soap sockets) */
FD_ZERO(&readset);
event_module.process(timeout);
if (quitting)
goto shutdown;
if (sssdp >= 0)
{
FD_SET(sssdp, &readset);
max_fd = MAX(max_fd, sssdp);
}
if (shttpl >= 0)
{
FD_SET(shttpl, &readset);
max_fd = MAX(max_fd, shttpl);
}
#ifdef TIVO_SUPPORT
if (sbeacon >= 0)
{
FD_SET(sbeacon, &readset);
max_fd = MAX(max_fd, sbeacon);
}
#endif
if (smonitor >= 0)
{
FD_SET(smonitor, &readset);
max_fd = MAX(max_fd, smonitor);
}
upnpevents_gc();
i = 0; /* active HTTP connections count */
for (e = upnphttphead.lh_first; e != NULL; e = e->entries.le_next)
{
if ((e->socket >= 0) && (e->state <= 2))
{
FD_SET(e->socket, &readset);
max_fd = MAX(max_fd, e->socket);
i++;
}
}
FD_ZERO(&writeset);
upnpevents_selectfds(&readset, &writeset, &max_fd);
ret = select(max_fd+1, &readset, &writeset, 0, &timeout);
if (ret < 0)
{
if(quitting) goto shutdown;
if(errno == EINTR) continue;
DPRINTF(E_ERROR, L_GENERAL, "select(all): %s\n", strerror(errno));
DPRINTF(E_FATAL, L_GENERAL, "Failed to select open sockets. EXITING\n");
}
upnpevents_processfds(&readset, &writeset);
/* process SSDP packets */
if (sssdp >= 0 && FD_ISSET(sssdp, &readset))
{
/*DPRINTF(E_DEBUG, L_GENERAL, "Received SSDP Packet\n");*/
ProcessSSDPRequest(sssdp, (unsigned short)runtime_vars.port);
}
#ifdef TIVO_SUPPORT
if (sbeacon >= 0 && FD_ISSET(sbeacon, &readset))
{
/*DPRINTF(E_DEBUG, L_GENERAL, "Received UDP Packet\n");*/
ProcessTiVoBeacon(sbeacon);
}
#endif
if (smonitor >= 0 && FD_ISSET(smonitor, &readset))
{
ProcessMonitorEvent(smonitor);
}
/* increment SystemUpdateID if the content database has changed,
* and if there is an active HTTP connection, at most once every 2 seconds */
if (i && (timeofday.tv_sec >= (lastupdatetime + 2)))
if (!LIST_EMPTY(&upnphttphead) &&
(timeofday.tv_sec >= (lastupdatetime + 2)))
{
if (GETFLAG(SCANNING_MASK))
{
@ -1311,48 +1305,6 @@ main(int argc, char **argv)
lastupdatetime = timeofday.tv_sec;
}
}
/* process active HTTP connections */
for (e = upnphttphead.lh_first; e != NULL; e = e->entries.le_next)
{
if ((e->socket >= 0) && (e->state <= 2) && (FD_ISSET(e->socket, &readset)))
Process_upnphttp(e);
}
/* process incoming HTTP connections */
if (shttpl >= 0 && FD_ISSET(shttpl, &readset))
{
int shttp;
socklen_t clientnamelen;
struct sockaddr_in clientname;
clientnamelen = sizeof(struct sockaddr_in);
shttp = accept(shttpl, (struct sockaddr *)&clientname, &clientnamelen);
if (shttp<0)
{
DPRINTF(E_ERROR, L_GENERAL, "accept(http): %s\n", strerror(errno));
}
else
{
struct upnphttp * tmp = 0;
DPRINTF(E_DEBUG, L_GENERAL, "HTTP connection from %s:%d\n",
inet_ntoa(clientname.sin_addr),
ntohs(clientname.sin_port) );
/*if (fcntl(shttp, F_SETFL, O_NONBLOCK) < 0) {
DPRINTF(E_ERROR, L_GENERAL, "fcntl F_SETFL, O_NONBLOCK\n");
}*/
/* Create a new upnphttp object and add it to
* the active upnphttp object list */
tmp = New_upnphttp(shttp);
if (tmp)
{
tmp->clientaddr = clientname.sin_addr;
LIST_INSERT_HEAD(&upnphttphead, tmp, entries);
}
else
{
DPRINTF(E_ERROR, L_GENERAL, "New_upnphttp() failed\n");
close(shttp);
}
}
}
/* delete finished HTTP connections */
for (e = upnphttphead.lh_first; e != NULL; e = next)
{