diff --git a/Makefile.am b/Makefile.am index 0b68e47..c267996 100644 --- a/Makefile.am +++ b/Makefile.am @@ -28,7 +28,8 @@ minidlnad_SOURCES = minidlna.c upnphttp.c upnpdescgen.c upnpsoap.c \ sql.c utils.c metadata.c scanner.c monitor.c \ tivo_utils.c tivo_beacon.c tivo_commands.c \ playlist.c image_utils.c albumart.c log.c \ - containers.c avahi.c tagutils/tagutils.c + containers.c avahi.c tagutils/tagutils.c \ + select.c if HAVE_VORBISFILE vorbislibs = -lvorbis -logg diff --git a/event.h b/event.h new file mode 100644 index 0000000..77ae7da --- /dev/null +++ b/event.h @@ -0,0 +1,33 @@ +struct event; + +typedef enum { + EVENT_READ, + EVENT_WRITE, + EVENT_RDWR, +} event_t; + +typedef void event_process_t(struct event *); + +struct event { + int fd; + int index; + event_t rdwr; + event_process_t *process; + void *data; +}; + +typedef int event_module_add_t(struct event *); +typedef int event_module_init_t(void); +typedef void event_module_fini_t(void); +typedef int event_module_process_t(u_long); +struct event_module { + event_module_add_t *add; + event_module_add_t *del; + event_module_add_t *enable; + event_module_add_t *disable; + event_module_process_t *process; + event_module_init_t *init; + event_module_fini_t *fini; +}; + +extern struct event_module event_module; diff --git a/getifaddr.c b/getifaddr.c index f0d3af3..5f95031 100644 --- a/getifaddr.c +++ b/getifaddr.c @@ -43,6 +43,7 @@ #endif #include "config.h" +#include "event.h" #if HAVE_GETIFADDRS # include # ifdef __linux__ @@ -377,9 +378,10 @@ OpenAndConfMonitorSocket(void) } void -ProcessMonitorEvent(int s) +ProcessMonitorEvent(struct event *ev) { #ifdef HAVE_NETLINK + int s = ev->fd; int len; char buf[4096]; struct nlmsghdr *nlh; diff --git a/getifaddr.h b/getifaddr.h index a2447f7..5631360 100644 --- a/getifaddr.h +++ b/getifaddr.h @@ -43,7 +43,7 @@ int get_remote_mac(struct in_addr ip_addr, unsigned char *mac); void reload_ifaces(int notify); int OpenAndConfMonitorSocket(); -void ProcessMonitorEvent(int s); +void ProcessMonitorEvent(struct event *); #endif diff --git a/minidlna.c b/minidlna.c index ef45294..d233975 100644 --- a/minidlna.c +++ b/minidlna.c @@ -77,6 +77,7 @@ #include #endif +#include "event.h" #include "upnpglobalvars.h" #include "sql.h" #include "upnphttp.h" @@ -101,7 +102,9 @@ # warning "Your SQLite3 library appears to be too old! Please use 3.5.1 or newer." # define sqlite3_threadsafe() 0 #endif - + +static LIST_HEAD(httplisthead, upnphttp) upnphttphead; + /* OpenAndConfHTTPSocket() : * setup the socket used to handle incoming HTTP connections. */ static int @@ -146,6 +149,46 @@ OpenAndConfHTTPSocket(unsigned short port) return s; } +/* ProcessListen() : + * accept incoming HTTP connection. */ +static void +ProcessListen(struct event *ev) +{ + int shttp; + socklen_t clientnamelen; + struct sockaddr_in clientname; + clientnamelen = sizeof(struct sockaddr_in); + + shttp = accept(ev->fd, (struct sockaddr *)&clientname, &clientnamelen); + if (shttp<0) + { + DPRINTF(E_ERROR, L_GENERAL, "accept(http): %s\n", strerror(errno)); + } + else + { + struct upnphttp * tmp = 0; + DPRINTF(E_DEBUG, L_GENERAL, "HTTP connection from %s:%d\n", + inet_ntoa(clientname.sin_addr), + ntohs(clientname.sin_port) ); + /*if (fcntl(shttp, F_SETFL, O_NONBLOCK) < 0) { + DPRINTF(E_ERROR, L_GENERAL, "fcntl F_SETFL, O_NONBLOCK\n"); + }*/ + /* Create a new upnphttp object and add it to + * the active upnphttp object list */ + tmp = New_upnphttp(shttp); + if (tmp) + { + tmp->clientaddr = clientname.sin_addr; + LIST_INSERT_HEAD(&upnphttphead, tmp, entries); + } + else + { + DPRINTF(E_ERROR, L_GENERAL, "New_upnphttp() failed\n"); + close(shttp); + } + } +} + /* Handler for the SIGTERM signal (kill) * SIGINT is also handled */ static void @@ -1043,6 +1086,8 @@ init(int argc, char **argv) return 1; } + event_module.init(); + return 0; } @@ -1054,22 +1099,21 @@ main(int argc, char **argv) int ret, i; int shttpl = -1; int smonitor = -1; - LIST_HEAD(httplisthead, upnphttp) upnphttphead; struct upnphttp * e = 0; struct upnphttp * next; - fd_set readset; /* for select() */ - fd_set writeset; - struct timeval timeout, timeofday, lastnotifytime = {0, 0}; + struct timeval tv, timeofday, lastnotifytime = {0, 0}; time_t lastupdatetime = 0, lastdbtime = 0; - int max_fd = -1; + u_long timeout; /* in milliseconds */ int last_changecnt = 0; pid_t scanner_pid = 0; pthread_t inotify_thread = 0; + struct event ssdpev, httpev, monev; #ifdef TIVO_SUPPORT uint8_t beacon_interval = 5; int sbeacon = -1; struct sockaddr_in tivo_bcast; struct timeval lastbeacontime = {0, 0}; + struct event beaconev; #endif for (i = 0; i < L_MAX; i++) @@ -1108,6 +1152,11 @@ main(int argc, char **argv) } #endif smonitor = OpenAndConfMonitorSocket(); + if (smonitor > 0) + { + monev = (struct event ){ .fd = smonitor, .rdwr = EVENT_READ, .process = ProcessMonitorEvent }; + event_module.add(&monev); + } sssdp = OpenAndConfSSDPReceiveSocket(); if (sssdp < 0) @@ -1117,11 +1166,19 @@ main(int argc, char **argv) if (SubmitServicesToMiniSSDPD(lan_addr[0].str, runtime_vars.port) < 0) DPRINTF(E_FATAL, L_GENERAL, "Failed to connect to MiniSSDPd. EXITING"); } + else + { + ssdpev = (struct event ){ .fd = sssdp, .rdwr = EVENT_READ, .process = ProcessSSDPRequest }; + event_module.add(&ssdpev); + } + /* open socket for HTTP connections. */ shttpl = OpenAndConfHTTPSocket(runtime_vars.port); if (shttpl < 0) DPRINTF(E_FATAL, L_GENERAL, "Failed to open socket for HTTP. EXITING\n"); DPRINTF(E_WARN, L_GENERAL, "HTTP listening on port %d\n", runtime_vars.port); + httpev = (struct event ){ .fd = shttpl, .rdwr = EVENT_READ, .process = ProcessListen }; + event_module.add(&httpev); #ifdef TIVO_SUPPORT if (GETFLAG(TIVO_MASK)) @@ -1142,6 +1199,8 @@ main(int argc, char **argv) if(sbeacon < 0) DPRINTF(E_FATAL, L_GENERAL, "Failed to open sockets for sending Tivo beacon notify " "messages. EXITING\n"); + beaconev = { .fd = sbeacon, .rdwr = EVENT_READ, .process = ProcessTiVoBeacon }; + event_module.add(&beaconev); tivo_bcast.sin_family = AF_INET; tivo_bcast.sin_addr.s_addr = htonl(getBcastAddress()); tivo_bcast.sin_port = htons(2190); @@ -1155,64 +1214,58 @@ main(int argc, char **argv) /* main loop */ while (!quitting) { + if (gettimeofday(&timeofday, 0) < 0) + DPRINTF(E_FATAL, L_GENERAL, "gettimeofday(): %s\n", strerror(errno)); /* Check if we need to send SSDP NOTIFY messages and do it if * needed */ - if (gettimeofday(&timeofday, 0) < 0) + tv = lastnotifytime; + tv.tv_sec += runtime_vars.notify_interval; + if (timevalcmp(&timeofday, &tv, >=)) { - DPRINTF(E_ERROR, L_GENERAL, "gettimeofday(): %s\n", strerror(errno)); - timeout.tv_sec = runtime_vars.notify_interval; - timeout.tv_usec = 0; + DPRINTF(E_DEBUG, L_SSDP, "Sending SSDP notifies\n"); + for (i = 0; i < n_lan_addr; i++) + { + SendSSDPNotifies(lan_addr[i].snotify, lan_addr[i].str, + runtime_vars.port, runtime_vars.notify_interval); + } + lastnotifytime = timeofday; + timeout = runtime_vars.notify_interval * 1000; } else { - /* the comparison is not very precise but who cares ? */ - if (timeofday.tv_sec >= (lastnotifytime.tv_sec + runtime_vars.notify_interval)) + timevalsub(&tv, &timeofday); + timeout = tv.tv_sec * 1000 + tv.tv_usec / 1000; + } +#ifdef TIVO_SUPPORT + if (sbeacon >= 0) + { + u_long beacontimeout; + + tv = lastbeacontime; + tv.tv_sec += beacon_interval; + if (timevalcmp(&timeofday, &tv, >=)) { - DPRINTF(E_DEBUG, L_SSDP, "Sending SSDP notifies\n"); - for (i = 0; i < n_lan_addr; i++) - { - SendSSDPNotifies(lan_addr[i].snotify, lan_addr[i].str, - runtime_vars.port, runtime_vars.notify_interval); - } - memcpy(&lastnotifytime, &timeofday, sizeof(struct timeval)); - timeout.tv_sec = runtime_vars.notify_interval; - timeout.tv_usec = 0; + sendBeaconMessage(sbeacon, &tivo_bcast, sizeof(struct sockaddr_in), 1); + lastbeacontime = timeofday; + beacontimeout = beacon_interval * 1000; + if (timeout > beacon_interval * 1000) + timeout = beacon_interval * 1000; + /* Beacons should be sent every 5 seconds or + * so for the first minute, then every minute + * or so thereafter. */ + if (beacon_interval == 5 && (timeofday.tv_sec - startup_time) > 60) + beacon_interval = 60; } else { - timeout.tv_sec = lastnotifytime.tv_sec + runtime_vars.notify_interval - - timeofday.tv_sec; - if (timeofday.tv_usec > lastnotifytime.tv_usec) - { - timeout.tv_usec = 1000000 + lastnotifytime.tv_usec - - timeofday.tv_usec; - timeout.tv_sec--; - } - else - timeout.tv_usec = lastnotifytime.tv_usec - timeofday.tv_usec; + timevalsub(&tv, &timeofday); + beacontimeout = tv.tv_sec * 1000 + + tv.tv_usec / 1000; } -#ifdef TIVO_SUPPORT - if (sbeacon >= 0) - { - if (timeofday.tv_sec >= (lastbeacontime.tv_sec + beacon_interval)) - { - sendBeaconMessage(sbeacon, &tivo_bcast, sizeof(struct sockaddr_in), 1); - memcpy(&lastbeacontime, &timeofday, sizeof(struct timeval)); - if (timeout.tv_sec > beacon_interval) - { - timeout.tv_sec = beacon_interval; - timeout.tv_usec = 0; - } - /* Beacons should be sent every 5 seconds or so for the first minute, - * then every minute or so thereafter. */ - if (beacon_interval == 5 && (timeofday.tv_sec - startup_time) > 60) - beacon_interval = 60; - } - else if (timeout.tv_sec > (lastbeacontime.tv_sec + beacon_interval + 1 - timeofday.tv_sec)) - timeout.tv_sec = lastbeacontime.tv_sec + beacon_interval - timeofday.tv_sec; - } -#endif + if (timeout > beacontimeout) + timeout = beacontimeout; } +#endif if (GETFLAG(SCANNING_MASK)) { @@ -1224,75 +1277,16 @@ main(int argc, char **argv) } } - /* select open sockets (SSDP, HTTP listen, and all HTTP soap sockets) */ - FD_ZERO(&readset); + event_module.process(timeout); + if (quitting) + goto shutdown; - if (sssdp >= 0) - { - FD_SET(sssdp, &readset); - max_fd = MAX(max_fd, sssdp); - } - - if (shttpl >= 0) - { - FD_SET(shttpl, &readset); - max_fd = MAX(max_fd, shttpl); - } -#ifdef TIVO_SUPPORT - if (sbeacon >= 0) - { - FD_SET(sbeacon, &readset); - max_fd = MAX(max_fd, sbeacon); - } -#endif - if (smonitor >= 0) - { - FD_SET(smonitor, &readset); - max_fd = MAX(max_fd, smonitor); - } + upnpevents_gc(); - i = 0; /* active HTTP connections count */ - for (e = upnphttphead.lh_first; e != NULL; e = e->entries.le_next) - { - if ((e->socket >= 0) && (e->state <= 2)) - { - FD_SET(e->socket, &readset); - max_fd = MAX(max_fd, e->socket); - i++; - } - } - FD_ZERO(&writeset); - upnpevents_selectfds(&readset, &writeset, &max_fd); - - ret = select(max_fd+1, &readset, &writeset, 0, &timeout); - if (ret < 0) - { - if(quitting) goto shutdown; - if(errno == EINTR) continue; - DPRINTF(E_ERROR, L_GENERAL, "select(all): %s\n", strerror(errno)); - DPRINTF(E_FATAL, L_GENERAL, "Failed to select open sockets. EXITING\n"); - } - upnpevents_processfds(&readset, &writeset); - /* process SSDP packets */ - if (sssdp >= 0 && FD_ISSET(sssdp, &readset)) - { - /*DPRINTF(E_DEBUG, L_GENERAL, "Received SSDP Packet\n");*/ - ProcessSSDPRequest(sssdp, (unsigned short)runtime_vars.port); - } -#ifdef TIVO_SUPPORT - if (sbeacon >= 0 && FD_ISSET(sbeacon, &readset)) - { - /*DPRINTF(E_DEBUG, L_GENERAL, "Received UDP Packet\n");*/ - ProcessTiVoBeacon(sbeacon); - } -#endif - if (smonitor >= 0 && FD_ISSET(smonitor, &readset)) - { - ProcessMonitorEvent(smonitor); - } /* increment SystemUpdateID if the content database has changed, * and if there is an active HTTP connection, at most once every 2 seconds */ - if (i && (timeofday.tv_sec >= (lastupdatetime + 2))) + if (!LIST_EMPTY(&upnphttphead) && + (timeofday.tv_sec >= (lastupdatetime + 2))) { if (GETFLAG(SCANNING_MASK)) { @@ -1311,48 +1305,6 @@ main(int argc, char **argv) lastupdatetime = timeofday.tv_sec; } } - /* process active HTTP connections */ - for (e = upnphttphead.lh_first; e != NULL; e = e->entries.le_next) - { - if ((e->socket >= 0) && (e->state <= 2) && (FD_ISSET(e->socket, &readset))) - Process_upnphttp(e); - } - /* process incoming HTTP connections */ - if (shttpl >= 0 && FD_ISSET(shttpl, &readset)) - { - int shttp; - socklen_t clientnamelen; - struct sockaddr_in clientname; - clientnamelen = sizeof(struct sockaddr_in); - shttp = accept(shttpl, (struct sockaddr *)&clientname, &clientnamelen); - if (shttp<0) - { - DPRINTF(E_ERROR, L_GENERAL, "accept(http): %s\n", strerror(errno)); - } - else - { - struct upnphttp * tmp = 0; - DPRINTF(E_DEBUG, L_GENERAL, "HTTP connection from %s:%d\n", - inet_ntoa(clientname.sin_addr), - ntohs(clientname.sin_port) ); - /*if (fcntl(shttp, F_SETFL, O_NONBLOCK) < 0) { - DPRINTF(E_ERROR, L_GENERAL, "fcntl F_SETFL, O_NONBLOCK\n"); - }*/ - /* Create a new upnphttp object and add it to - * the active upnphttp object list */ - tmp = New_upnphttp(shttp); - if (tmp) - { - tmp->clientaddr = clientname.sin_addr; - LIST_INSERT_HEAD(&upnphttphead, tmp, entries); - } - else - { - DPRINTF(E_ERROR, L_GENERAL, "New_upnphttp() failed\n"); - close(shttp); - } - } - } /* delete finished HTTP connections */ for (e = upnphttphead.lh_first; e != NULL; e = next) { diff --git a/minissdp.c b/minissdp.c index 74eef3c..2c7d037 100644 --- a/minissdp.c +++ b/minissdp.c @@ -42,6 +42,7 @@ #include #include +#include "event.h" #include "minidlnapath.h" #include "upnphttp.h" #include "upnpglobalvars.h" @@ -482,8 +483,9 @@ close: /* ProcessSSDPRequest() * process SSDP M-SEARCH requests and responds to them */ void -ProcessSSDPRequest(int s, unsigned short port) +ProcessSSDPRequest(struct event *ev) { + int s = ev->fd; int n; char bufr[1500]; struct sockaddr_in sendername; @@ -724,8 +726,8 @@ ProcessSSDPRequest(int s, unsigned short port) break; } _usleep(13000, 20000); - SendSSDPResponse(s, sendername, i, - host, port, len_r); + SendSSDPResponse(s, sendername, i, host, + (unsigned short)runtime_vars.port, len_r); return; } /* Responds to request with ST: ssdp:all */ @@ -736,8 +738,9 @@ ProcessSSDPRequest(int s, unsigned short port) for (i=0; known_service_types[i]; i++) { l = strlen(known_service_types[i]); - SendSSDPResponse(s, sendername, i, - host, port, len_r); + SendSSDPResponse(s, sendername, i, host, + (unsigned short)runtime_vars.port, + len_r); } } } diff --git a/minissdp.h b/minissdp.h index f6a5c16..8ac01fb 100644 --- a/minissdp.h +++ b/minissdp.h @@ -35,7 +35,7 @@ int OpenAndConfSSDPNotifySocket(struct lan_addr_s *iface); void SendSSDPNotifies(int s, const char *host, unsigned short port, unsigned int lifetime); -void ProcessSSDPRequest(int s, unsigned short port); +void ProcessSSDPRequest(struct event *ev); int SendSSDPGoodbyes(int s); diff --git a/select.c b/select.c new file mode 100644 index 0000000..ebec886 --- /dev/null +++ b/select.c @@ -0,0 +1,183 @@ +/* + * Copyright (C) Igor Sysoev + * Copyright (C) Nginx, Inc. + */ + +#include +#include +#include +#include +#include +#include +#include + +#include "event.h" +#include "log.h" + +static event_module_init_t select_init; +static event_module_fini_t select_fini; +static event_module_add_t select_add; +static event_module_add_t select_del; +static event_module_process_t select_process; + +static fd_set master_read_fd_set; +static fd_set master_write_fd_set; +static fd_set work_read_fd_set; +static fd_set work_write_fd_set; + +static struct event **events; +static int nevents; +static int max_fd; + +struct event_module event_module = { + .add = select_add, + .del = select_del, + .enable = select_add, + .disable = select_del, + .process = select_process, + .init = select_init, + .fini = select_fini, +}; + +static int +select_init(void) +{ + + events = calloc(FD_SETSIZE, sizeof(struct event *)); + if (events == NULL) + return (ENOMEM); + + FD_ZERO(&master_read_fd_set); + FD_ZERO(&master_write_fd_set); + max_fd = 0; + nevents = 0; + + return (0); +} + + +static void +select_fini(void) +{ + + free(events); + events = NULL; +} + +static int +select_add(struct event *ev) +{ + + assert(ev->fd <= FD_SETSIZE); + + switch (ev->rdwr) { + case EVENT_READ: + FD_SET(ev->fd, &master_read_fd_set); + break; + case EVENT_WRITE: + FD_SET(ev->fd, &master_write_fd_set); + break; + case EVENT_RDWR: + FD_SET(ev->fd, &master_read_fd_set); + FD_SET(ev->fd, &master_write_fd_set); + break; + } + + if (max_fd != -1 && max_fd < ev->fd) + max_fd = ev->fd; + + events[nevents] = ev; + ev->index = nevents++; + + return (0); +} + +static int +select_del(struct event *ev) +{ + + assert(ev->fd <= FD_SETSIZE); + + switch (ev->rdwr) { + case EVENT_READ: + FD_CLR(ev->fd, &master_read_fd_set); + break; + case EVENT_WRITE: + FD_CLR(ev->fd, &master_write_fd_set); + break; + case EVENT_RDWR: + FD_CLR(ev->fd, &master_read_fd_set); + FD_CLR(ev->fd, &master_write_fd_set); + break; + } + + if (max_fd == ev->fd) + max_fd = -1; + + if (ev->index < --nevents) { + struct event *ev0; + + ev0 = events[nevents]; + events[ev->index] = ev0; + ev0->index = ev->index; + } + ev->index = -1; + + return (0); +} + +static int +select_process(u_long msec) +{ + struct timeval tv, *tp; + struct event *ev; + int ready; + + /* Need to rescan for max_fd. */ + if (max_fd == -1) + for (int i = 0; i < nevents; i++) { + if (max_fd < events[i]->fd) + max_fd = events[i]->fd; + } + + tv.tv_sec = (long) (msec / 1000); + tv.tv_usec = (long) ((msec % 1000) * 1000); + tp = &tv; + + work_read_fd_set = master_read_fd_set; + work_write_fd_set = master_write_fd_set; + + ready = select(max_fd + 1, &work_read_fd_set, &work_write_fd_set, NULL, tp); + + if (ready == -1) { + if (errno == EINTR) + return (errno); + DPRINTF(E_ERROR, L_GENERAL, "select(all): %s\n", strerror(errno)); + DPRINTF(E_FATAL, L_GENERAL, "Failed to select open sockets. EXITING\n"); + } + + if (ready == 0) + return (0); + + for (int i = 0; i < nevents; i++) { + ev = events[i]; + + switch (ev->rdwr) { + case EVENT_READ: + if (FD_ISSET(ev->fd, &work_read_fd_set)) + ev->process(ev); + break; + case EVENT_WRITE: + if (FD_ISSET(ev->fd, &work_write_fd_set)) + ev->process(ev); + break; + case EVENT_RDWR: + if (FD_ISSET(ev->fd, &work_read_fd_set) || + FD_ISSET(ev->fd, &work_write_fd_set)) + ev->process(ev); + break; + } + } + + return (0); +} diff --git a/upnpevents.c b/upnpevents.c index 8d19fe1..e673a34 100644 --- a/upnpevents.c +++ b/upnpevents.c @@ -59,9 +59,11 @@ #include #include #include +#include #include #include +#include "event.h" #include "upnpevents.h" #include "minidlnapath.h" #include "upnpglobalvars.h" @@ -82,10 +84,9 @@ struct subscriber { }; struct upnp_event_notify { + struct event ev; LIST_ENTRY(upnp_event_notify) entries; - int s; /* socket */ - enum { ECreated=1, - EConnecting, + enum { EConnecting, ESending, EWaitingForResponse, EFinished, @@ -101,8 +102,8 @@ struct upnp_event_notify { }; /* prototypes */ -static void -upnp_event_create_notify(struct subscriber * sub); +static void upnp_event_create_notify(struct subscriber * sub); +static void upnp_event_process_notify(struct event *ev); /* Subscriber list */ LIST_HEAD(listhead, subscriber) subscriberlist = { NULL }; @@ -224,30 +225,35 @@ upnp_event_var_change_notify(enum subscriber_service_enum service) } } -/* create and add the notify object to the list */ +/* create and add the notify object to the list, start connecting */ static void -upnp_event_create_notify(struct subscriber * sub) +upnp_event_create_notify(struct subscriber *sub) { struct upnp_event_notify * obj; - int flags; + int flags, s, i; + const char *p; + unsigned short port; + struct sockaddr_in addr; + + assert(sub); + obj = calloc(1, sizeof(struct upnp_event_notify)); if(!obj) { DPRINTF(E_ERROR, L_HTTP, "%s: calloc(): %s\n", "upnp_event_create_notify", strerror(errno)); return; } obj->sub = sub; - obj->state = ECreated; - obj->s = socket(PF_INET, SOCK_STREAM, 0); - if(obj->s<0) { + s = socket(PF_INET, SOCK_STREAM, 0); + if(s < 0) { DPRINTF(E_ERROR, L_HTTP, "%s: socket(): %s\n", "upnp_event_create_notify", strerror(errno)); goto error; } - if((flags = fcntl(obj->s, F_GETFL, 0)) < 0) { + if((flags = fcntl(s, F_GETFL, 0)) < 0) { DPRINTF(E_ERROR, L_HTTP, "%s: fcntl(..F_GETFL..): %s\n", "upnp_event_create_notify", strerror(errno)); goto error; } - if(fcntl(obj->s, F_SETFL, flags | O_NONBLOCK) < 0) { + if(fcntl(s, F_SETFL, flags | O_NONBLOCK) < 0) { DPRINTF(E_ERROR, L_HTTP, "%s: fcntl(..F_SETFL..): %s\n", "upnp_event_create_notify", strerror(errno)); goto error; @@ -255,28 +261,9 @@ upnp_event_create_notify(struct subscriber * sub) if(sub) sub->notify = obj; LIST_INSERT_HEAD(¬ifylist, obj, entries); - return; -error: - if(obj->s >= 0) - close(obj->s); - free(obj); -} -static void -upnp_event_notify_connect(struct upnp_event_notify * obj) -{ - int i; - const char * p; - unsigned short port; - struct sockaddr_in addr; - if(!obj) - return; memset(&addr, 0, sizeof(addr)); i = 0; - if(obj->sub == NULL) { - obj->state = EError; - return; - } p = obj->sub->callback; p += 7; /* http:// */ while(*p != '/' && *p != ':' && i < (sizeof(obj->addrstr)-1)) @@ -306,12 +293,23 @@ upnp_event_notify_connect(struct upnp_event_notify * obj) DPRINTF(E_DEBUG, L_HTTP, "%s: '%s' %hu '%s'\n", "upnp_event_notify_connect", obj->addrstr, port, obj->path); obj->state = EConnecting; - if(connect(obj->s, (struct sockaddr *)&addr, sizeof(addr)) < 0) { + if(connect(s, (struct sockaddr *)&addr, sizeof(addr)) < 0) { if(errno != EINPROGRESS && errno != EWOULDBLOCK) { DPRINTF(E_ERROR, L_HTTP, "%s: connect(): %s\n", "upnp_event_notify_connect", strerror(errno)); obj->state = EError; } + } else { + obj->ev = (struct event ){ .fd = s, .rdwr = EVENT_WRITE, + .process = upnp_event_process_notify, .data = obj }; + event_module.add(&obj->ev); } + + return; + +error: + if(s >= 0) + close(s); + free(obj); } static void upnp_event_prepare(struct upnp_event_notify * obj) @@ -331,10 +329,9 @@ static void upnp_event_prepare(struct upnp_event_notify * obj) "%.*s\r\n"; char * xml; int l; - if(obj->sub == NULL) { - obj->state = EError; - return; - } + + assert(obj->sub); + switch(obj->sub->service) { case EContentDirectory: xml = getVarsContentDirectory(&l); @@ -364,30 +361,37 @@ static void upnp_event_send(struct upnp_event_notify * obj) int i; //DEBUG DPRINTF(E_DEBUG, L_HTTP, "Sending UPnP Event:\n%s", obj->buffer+obj->sent); while( obj->sent < obj->tosend ) { - i = send(obj->s, obj->buffer + obj->sent, obj->tosend - obj->sent, 0); + i = send(obj->ev.fd, obj->buffer + obj->sent, obj->tosend - obj->sent, 0); if(i<0) { DPRINTF(E_WARN, L_HTTP, "%s: send(): %s\n", "upnp_event_send", strerror(errno)); obj->state = EError; + event_module.del(&obj->ev); return; } obj->sent += i; } - if(obj->sent == obj->tosend) + if(obj->sent == obj->tosend) { obj->state = EWaitingForResponse; + event_module.del(&obj->ev); + obj->ev.rdwr = EVENT_READ; + event_module.add(&obj->ev); + } } static void upnp_event_recv(struct upnp_event_notify * obj) { int n; - n = recv(obj->s, obj->buffer, obj->buffersize, 0); + n = recv(obj->ev.fd, obj->buffer, obj->buffersize, 0); if(n<0) { DPRINTF(E_ERROR, L_HTTP, "%s: recv(): %s\n", "upnp_event_recv", strerror(errno)); obj->state = EError; + event_module.del(&obj->ev); return; } DPRINTF(E_DEBUG, L_HTTP, "%s: (%dbytes) %.*s\n", "upnp_event_recv", n, n, obj->buffer); obj->state = EFinished; + event_module.del(&obj->ev); if(obj->sub) { obj->sub->seq++; @@ -397,8 +401,10 @@ static void upnp_event_recv(struct upnp_event_notify * obj) } static void -upnp_event_process_notify(struct upnp_event_notify * obj) +upnp_event_process_notify(struct event *ev) { + struct upnp_event_notify *obj = ev->data; + switch(obj->state) { case EConnecting: /* now connected or failed to connect */ @@ -412,66 +418,28 @@ upnp_event_process_notify(struct upnp_event_notify * obj) upnp_event_recv(obj); break; case EFinished: - close(obj->s); - obj->s = -1; + close(obj->ev.fd); + obj->ev.fd = -1; break; default: DPRINTF(E_ERROR, L_HTTP, "upnp_event_process_notify: unknown state\n"); } } -void upnpevents_selectfds(fd_set *readset, fd_set *writeset, int * max_fd) -{ - struct upnp_event_notify * obj; - for(obj = notifylist.lh_first; obj != NULL; obj = obj->entries.le_next) { - DPRINTF(E_DEBUG, L_HTTP, "upnpevents_selectfds: %p %d %d\n", - obj, obj->state, obj->s); - if(obj->s >= 0) { - switch(obj->state) { - case ECreated: - upnp_event_notify_connect(obj); - if(obj->state != EConnecting) - break; - case EConnecting: - case ESending: - FD_SET(obj->s, writeset); - if(obj->s > *max_fd) - *max_fd = obj->s; - break; - case EWaitingForResponse: - FD_SET(obj->s, readset); - if(obj->s > *max_fd) - *max_fd = obj->s; - break; - default: - break; - } - } - } -} - -void upnpevents_processfds(fd_set *readset, fd_set *writeset) +void upnpevents_gc(void) { struct upnp_event_notify * obj; struct upnp_event_notify * next; struct subscriber * sub; struct subscriber * subnext; time_t curtime; - for(obj = notifylist.lh_first; obj != NULL; obj = obj->entries.le_next) { - DPRINTF(E_DEBUG, L_HTTP, "%s: %p %d %d %d %d\n", - "upnpevents_processfds", obj, obj->state, obj->s, - FD_ISSET(obj->s, readset), FD_ISSET(obj->s, writeset)); - if(obj->s >= 0) { - if(FD_ISSET(obj->s, readset) || FD_ISSET(obj->s, writeset)) - upnp_event_process_notify(obj); - } - } + obj = notifylist.lh_first; while(obj != NULL) { next = obj->entries.le_next; if(obj->state == EError || obj->state == EFinished) { - if(obj->s >= 0) { - close(obj->s); + if(obj->ev.fd >= 0) { + close(obj->ev.fd); } if(obj->sub) obj->sub->notify = NULL; diff --git a/upnpevents.h b/upnpevents.h index 5dcd0d9..fa6c104 100644 --- a/upnpevents.h +++ b/upnpevents.h @@ -63,12 +63,10 @@ upnpevents_addSubscriber(const char * eventurl, int upnpevents_removeSubscriber(const char * sid, int sidlen); void upnpevents_removeSubscribers(void); +void upnpevents_gc(void); int renewSubscription(const char * sid, int sidlen, int timeout); -void upnpevents_selectfds(fd_set *readset, fd_set *writeset, int * max_fd); -void upnpevents_processfds(fd_set *readset, fd_set *writeset); - #ifdef USE_MINIUPNPDCTL void write_events_details(int s); #endif diff --git a/upnphttp.c b/upnphttp.c index 16631e2..8b66ab9 100644 --- a/upnphttp.c +++ b/upnphttp.c @@ -64,6 +64,7 @@ #include #include "config.h" +#include "event.h" #include "upnpglobalvars.h" #include "upnphttp.h" #include "upnpdescgen.h" @@ -101,6 +102,7 @@ static void SendResp_caption(struct upnphttp *, char * url); static void SendResp_resizedimg(struct upnphttp *, char * url); static void SendResp_thumbnail(struct upnphttp *, char * url); static void SendResp_dlnafile(struct upnphttp *, char * url); +static void Process_upnphttp(struct event *ev); struct upnphttp * New_upnphttp(int s) @@ -112,18 +114,21 @@ New_upnphttp(int s) if(ret == NULL) return NULL; memset(ret, 0, sizeof(struct upnphttp)); - ret->socket = s; + ret->ev = (struct event ){ .fd = s, .rdwr = EVENT_READ, .process = Process_upnphttp, .data = ret }; + event_module.add(&ret->ev); return ret; } void CloseSocket_upnphttp(struct upnphttp * h) { - if(close(h->socket) < 0) + + event_module.del(&h->ev); + if(close(h->ev.fd) < 0) { - DPRINTF(E_ERROR, L_HTTP, "CloseSocket_upnphttp: close(%d): %s\n", h->socket, strerror(errno)); + DPRINTF(E_ERROR, L_HTTP, "CloseSocket_upnphttp: close(%d): %s\n", h->ev.fd, strerror(errno)); } - h->socket = -1; + h->ev.fd = -1; h->state = 100; } @@ -132,7 +137,7 @@ Delete_upnphttp(struct upnphttp * h) { if(h) { - if(h->socket >= 0) + if(h->ev.fd >= 0) CloseSocket_upnphttp(h); free(h->req_buf); free(h->res_buf); @@ -1039,18 +1044,17 @@ ProcessHttpQuery_upnphttp(struct upnphttp * h) } } - -void -Process_upnphttp(struct upnphttp * h) +static void +Process_upnphttp(struct event *ev) { char buf[2048]; + struct upnphttp *h = ev->data; int n; - if(!h) - return; + switch(h->state) { case 0: - n = recv(h->socket, buf, 2048, 0); + n = recv(h->ev.fd, buf, 2048, 0); if(n<0) { DPRINTF(E_ERROR, L_HTTP, "recv (state0): %s\n", strerror(errno)); @@ -1096,7 +1100,7 @@ Process_upnphttp(struct upnphttp * h) break; case 1: case 2: - n = recv(h->socket, buf, sizeof(buf), 0); + n = recv(h->ev.fd, buf, sizeof(buf), 0); if(n < 0) { DPRINTF(E_ERROR, L_HTTP, "recv (state%d): %s\n", h->state, strerror(errno)); @@ -1223,7 +1227,7 @@ SendResp_upnphttp(struct upnphttp * h) { int n; DPRINTF(E_DEBUG, L_HTTP, "HTTP RESPONSE: %.*s\n", h->res_buflen, h->res_buf); - n = send(h->socket, h->res_buf, h->res_buflen, 0); + n = send(h->ev.fd, h->res_buf, h->res_buflen, 0); if(n<0) { DPRINTF(E_ERROR, L_HTTP, "send(res_buf): %s\n", strerror(errno)); @@ -1241,7 +1245,7 @@ send_data(struct upnphttp * h, char * header, size_t size, int flags) { int n; - n = send(h->socket, header, size, flags); + n = send(h->ev.fd, header, size, flags); if(n<0) { DPRINTF(E_ERROR, L_HTTP, "send(res_buf): %s\n", strerror(errno)); @@ -1275,7 +1279,7 @@ send_file(struct upnphttp * h, int sendfd, off_t offset, off_t end_offset) if( try_sendfile ) { send_size = ( ((end_offset - offset) < MAX_BUFFER_SIZE) ? (end_offset - offset + 1) : MAX_BUFFER_SIZE); - ret = sys_sendfile(h->socket, sendfd, &offset, send_size); + ret = sys_sendfile(h->ev.fd, sendfd, &offset, send_size); if( ret == -1 ) { DPRINTF(E_DEBUG, L_HTTP, "sendfile error :: error no. %d [%s]\n", errno, strerror(errno)); @@ -1305,7 +1309,7 @@ send_file(struct upnphttp * h, int sendfd, off_t offset, off_t end_offset) else break; } - ret = write(h->socket, buf, ret); + ret = write(h->ev.fd, buf, ret); if( ret == -1 ) { DPRINTF(E_DEBUG, L_HTTP, "write error :: error no. %d [%s]\n", errno, strerror(errno)); if( errno == EAGAIN ) diff --git a/upnphttp.h b/upnphttp.h index d708946..e28a943 100644 --- a/upnphttp.h +++ b/upnphttp.h @@ -75,7 +75,7 @@ enum httpCommands { }; struct upnphttp { - int socket; + struct event ev; struct in_addr clientaddr; /* client address */ int iface; int state; @@ -144,10 +144,6 @@ CloseSocket_upnphttp(struct upnphttp *); void Delete_upnphttp(struct upnphttp *); -/* Process_upnphttp() */ -void -Process_upnphttp(struct upnphttp *); - /* BuildHeader_upnphttp() * build the header for the HTTP Response * also allocate the buffer for body data */ diff --git a/upnpsoap.c b/upnpsoap.c index 8219937..a41e6a6 100644 --- a/upnpsoap.c +++ b/upnpsoap.c @@ -61,6 +61,7 @@ #include #include +#include "event.h" #include "upnpglobalvars.h" #include "utils.h" #include "upnphttp.h" diff --git a/uuid.c b/uuid.c index 3cd6f5a..6e6a9ec 100644 --- a/uuid.c +++ b/uuid.c @@ -39,6 +39,7 @@ #include #endif +#include "event.h" #include "uuid.h" #include "getifaddr.h" #include "log.h"