From 3c1af2feed12d474e651f4157d89a684dfc20512 Mon Sep 17 00:00:00 2001 From: Gregory Nutt Date: Tue, 12 May 2015 07:41:12 -0600 Subject: [PATCH] Local sockets: Add poll support for Unix stream sockets. From Jussi Kivilinna. --- drivers/pipes/pipe_common.c | 67 +++++++- drivers/pipes/pipe_common.h | 1 + fs/vfs/fs_poll.c | 79 ++++++--- include/nuttx/fs/fs.h | 23 +++ net/local/local.h | 66 +++++++- net/local/local_accept.c | 34 +++- net/local/local_conn.c | 5 + net/local/local_connect.c | 2 + net/local/local_listen.c | 18 -- net/local/local_netpoll.c | 321 +++++++++++++++++++++++++++++++++++- 10 files changed, 561 insertions(+), 55 deletions(-) diff --git a/drivers/pipes/pipe_common.c b/drivers/pipes/pipe_common.c index dbd3b7568b..e2f33aceec 100644 --- a/drivers/pipes/pipe_common.c +++ b/drivers/pipes/pipe_common.c @@ -120,12 +120,25 @@ static void pipecommon_pollnotify(FAR struct pipe_dev_s *dev, pollevent_t events { int i; + if (eventset & POLLERR) + { + eventset &= ~(POLLOUT | POLLIN); + } + for (i = 0; i < CONFIG_DEV_PIPE_NPOLLWAITERS; i++) { struct pollfd *fds = dev->d_fds[i]; if (fds) { - fds->revents |= (fds->events & eventset); + fds->revents |= eventset & (fds->events | POLLERR | POLLHUP); + + if ((fds->revents & (POLLOUT | POLLHUP)) == (POLLOUT | POLLHUP)) + { + /* POLLOUT and POLLHUP are mutually exclusive. */ + + fds->revents &= ~POLLOUT; + } + if (fds->revents != 0) { fvdbg("Report events: %02x\n", fds->revents); @@ -222,7 +235,7 @@ int pipecommon_open(FAR struct file *filep) dev->d_refs++; - /* If opened for writing, increment the count of writers on on the pipe instance */ + /* If opened for writing, increment the count of writers on the pipe instance */ if ((filep->f_oflags & O_WROK) != 0) { @@ -241,6 +254,13 @@ int pipecommon_open(FAR struct file *filep) } } + /* If opened for reading, increment the count of reader on on the pipe instance */ + + if ((filep->f_oflags & O_RDOK) != 0) + { + dev->d_nreaders++; + } + /* If opened for read-only, then wait for either (1) at least one writer * on the pipe (policy == 0), or (2) until there is buffered data to be * read (policy == 1). @@ -326,6 +346,27 @@ int pipecommon_close(FAR struct file *filep) { sem_post(&dev->d_rdsem); } + + /* Inform poll readers that other end closed. */ + + pipecommon_pollnotify(dev, POLLHUP); + } + } + + /* If opened for reading, decrement the count of readers on the pipe + * instance. + */ + + if ((filep->f_oflags & O_RDOK) != 0) + { + if (--dev->d_nreaders <= 0) + { + if (PIPE_IS_POLICY_0(dev->d_flags)) + { + /* Inform poll writers that other end closed. */ + + pipecommon_pollnotify(dev, POLLERR); + } } } } @@ -349,6 +390,7 @@ int pipecommon_close(FAR struct file *filep) dev->d_rdndx = 0; dev->d_refs = 0; dev->d_nwriters = 0; + dev->d_nreaders = 0; #ifndef CONFIG_DISABLE_PSEUDOFS_OPERATIONS /* If, in addition, we have been unlinked, then also need to free the @@ -423,7 +465,7 @@ ssize_t pipecommon_read(FAR struct file *filep, FAR char *buffer, size_t len) ret = sem_wait(&dev->d_rdsem); sched_unlock(); - if (ret < 0 || sem_wait(&dev->d_bfsem) < 0) + if (ret < 0 || sem_wait(&dev->d_bfsem) < 0) { return ERROR; } @@ -641,7 +683,8 @@ int pipecommon_poll(FAR struct file *filep, FAR struct pollfd *fds, nbytes = (CONFIG_DEV_PIPE_SIZE-1) + dev->d_wrndx - dev->d_rdndx; } - /* Notify the POLLOUT event if the pipe is not full */ + /* Notify the POLLOUT event if the pipe is not full, but only if + * there is readers. */ eventset = 0; if (nbytes < (CONFIG_DEV_PIPE_SIZE-1)) @@ -656,6 +699,22 @@ int pipecommon_poll(FAR struct file *filep, FAR struct pollfd *fds, eventset |= POLLIN; } + /* Notify the POLLHUP event if the pipe is empty and no writers */ + + if (nbytes == 0 && dev->d_nwriters <= 0) + { + eventset |= POLLHUP; + } + + /* Change POLLOUT to POLLERR, if no readers and policy 0. */ + + if ((eventset | POLLOUT) && + PIPE_IS_POLICY_0(dev->d_flags) && + dev->d_nreaders <= 0) + { + eventset |= POLLERR; + } + if (eventset) { pipecommon_pollnotify(dev, eventset); diff --git a/drivers/pipes/pipe_common.h b/drivers/pipes/pipe_common.h index b9ef6e9932..5bbe850bb3 100644 --- a/drivers/pipes/pipe_common.h +++ b/drivers/pipes/pipe_common.h @@ -109,6 +109,7 @@ struct pipe_dev_s pipe_ndx_t d_rdndx; /* Index in d_buffer to return the next byte read */ uint8_t d_refs; /* References counts on pipe (limited to 255) */ uint8_t d_nwriters; /* Number of reference counts for write access */ + uint8_t d_nreaders; /* Number of reference counts for read access */ uint8_t d_pipeno; /* Pipe minor number */ uint8_t d_flags; /* See PIPE_FLAG_* definitions */ uint8_t *d_buffer; /* Buffer allocated when device opened */ diff --git a/fs/vfs/fs_poll.c b/fs/vfs/fs_poll.c index b3adb0a314..f5873f3a78 100644 --- a/fs/vfs/fs_poll.c +++ b/fs/vfs/fs_poll.c @@ -102,10 +102,6 @@ static int poll_semtake(FAR sem_t *sem) #if CONFIG_NFILE_DESCRIPTORS > 0 static int poll_fdsetup(int fd, FAR struct pollfd *fds, bool setup) { - FAR struct file *filep; - FAR struct inode *inode; - int ret = -ENOSYS; - /* Check for a valid file descriptor */ if ((unsigned int)fd >= CONFIG_NFILE_DESCRIPTORS) @@ -124,29 +120,7 @@ static int poll_fdsetup(int fd, FAR struct pollfd *fds, bool setup) } } - /* Get the file pointer corresponding to this file descriptor */ - - filep = fs_getfilep(fd); - if (!filep) - { - /* The errno value has already been set */ - - return ERROR; - } - - /* Is a driver registered? Does it support the poll method? - * If not, return -ENOSYS - */ - - inode = filep->f_inode; - if (inode && inode->u.i_ops && inode->u.i_ops->poll) - { - /* Yes, then setup the poll */ - - ret = (int)inode->u.i_ops->poll(filep, fds, setup); - } - - return ret; + return file_poll(fd, fds, setup); } #endif @@ -265,6 +239,57 @@ static inline int poll_teardown(FAR struct pollfd *fds, nfds_t nfds, int *count, * Public Functions ****************************************************************************/ +/**************************************************************************** + * Function: file_poll + * + * Description: + * The standard poll() operation redirects operations on file descriptors + * to this function. + * + * Input Parameters: + * fd - The file descriptor of interest + * fds - The structure describing the events to be monitored, OR NULL if + * this is a request to stop monitoring events. + * setup - true: Setup up the poll; false: Teardown the poll + * + * Returned Value: + * 0: Success; Negated errno on failure + * + ****************************************************************************/ + +#if CONFIG_NFILE_DESCRIPTORS > 0 +int file_poll(int fd, FAR struct pollfd *fds, bool setup) +{ + FAR struct file *filep; + FAR struct inode *inode; + int ret = -ENOSYS; + + /* Get the file pointer corresponding to this file descriptor */ + + filep = fs_getfilep(fd); + if (!filep) + { + /* The errno value has already been set */ + + return -get_errno(); + } + + /* Is a driver registered? Does it support the poll method? + * If not, return -ENOSYS + */ + + inode = filep->f_inode; + if (inode && inode->u.i_ops && inode->u.i_ops->poll) + { + /* Yes, then setup the poll */ + + ret = (int)inode->u.i_ops->poll(filep, fds, setup); + } + + return ret; +} +#endif + /**************************************************************************** * Name: poll * diff --git a/include/nuttx/fs/fs.h b/include/nuttx/fs/fs.h index dd71e6222e..3d2d6c3c73 100644 --- a/include/nuttx/fs/fs.h +++ b/include/nuttx/fs/fs.h @@ -841,6 +841,29 @@ int file_fsync(FAR struct file *filep); int file_vfcntl(FAR struct file *filep, int cmd, va_list ap); #endif +/* fs/fs_poll.c *************************************************************/ +/**************************************************************************** + * Function: file_poll + * + * Description: + * The standard poll() operation redirects operations on file descriptors + * to this function. + * + * Input Parameters: + * fd - The file descriptor of interest + * fds - The structure describing the events to be monitored, OR NULL if + * this is a request to stop monitoring events. + * setup - true: Setup up the poll; false: Teardown the poll + * + * Returned Value: + * 0: Success; Negated errno on failure + * + ****************************************************************************/ + +#if CONFIG_NFILE_DESCRIPTORS > 0 +int file_poll(int fd, FAR struct pollfd *fds, bool setup); +#endif + /* drivers/dev_null.c *******************************************************/ /**************************************************************************** * Name: devnull_register diff --git a/net/local/local.h b/net/local/local.h index 3c4bb525c9..5378d19b84 100644 --- a/net/local/local.h +++ b/net/local/local.h @@ -49,15 +49,19 @@ #include #include #include +#include #ifdef CONFIG_NET_LOCAL /**************************************************************************** * Pre-processor Definitions ****************************************************************************/ -/* Not yet any support for poll/select operations on Unix domain sockets */ #undef HAVE_LOCAL_POLL +#ifndef CONFIG_DISABLE_POLL +# define HAVE_LOCAL_POLL 1 +# define LOCAL_ACCEPT_NPOLLWAITERS 2 +#endif /* Packet format in FIFO: * @@ -145,6 +149,14 @@ struct local_conn_s sem_t lc_waitsem; /* Use to wait for a connection to be accepted */ +#ifdef HAVE_LOCAL_POLL + /* The following is a list if poll structures of threads waiting for + * socket accept events. + */ + + struct pollfd *lc_accept_fds[LOCAL_ACCEPT_NPOLLWAITERS]; +#endif + /* Union of fields unique to SOCK_STREAM client, server, and connected * peers. */ @@ -619,6 +631,58 @@ int local_open_sender(FAR struct local_conn_s *conn, FAR const char *path, bool nonblock); #endif + +/**************************************************************************** + * Name: local_accept_pollnotify + ****************************************************************************/ + +#ifdef HAVE_LOCAL_POLL +void local_accept_pollnotify(FAR struct local_conn_s *conn, + pollevent_t eventset); +#else +#define local_accept_pollnotify(conn, eventset) ((void)(conn)) +#endif + +/**************************************************************************** + * Function: local_pollsetup + * + * Description: + * Setup to monitor events on one Unix domain socket + * + * Input Parameters: + * psock - The Unix domain socket of interest + * fds - The structure describing the events to be monitored, OR NULL if + * this is a request to stop monitoring events. + * + * Returned Value: + * 0: Success; Negated errno on failure + * + ****************************************************************************/ + +#ifdef HAVE_LOCAL_POLL +int local_pollsetup(FAR struct socket *psock, FAR struct pollfd *fds); +#endif + +/**************************************************************************** + * Function: local_pollteardown + * + * Description: + * Teardown monitoring of events on a Unix domain socket + * + * Input Parameters: + * psock - The Unix domain socket of interest + * fds - The structure describing the events to be monitored, OR NULL if + * this is a request to stop monitoring events. + * + * Returned Value: + * 0: Success; Negated errno on failure + * + ****************************************************************************/ + +#ifdef HAVE_LOCAL_POLL +int local_pollteardown(FAR struct socket *psock, FAR struct pollfd *fds); +#endif + #undef EXTERN #ifdef __cplusplus } diff --git a/net/local/local_accept.c b/net/local/local_accept.c index 60816640b7..59ab57145f 100644 --- a/net/local/local_accept.c +++ b/net/local/local_accept.c @@ -51,6 +51,38 @@ #include "socket/socket.h" #include "local/local.h" +/**************************************************************************** + * Private Functions + ****************************************************************************/ + +/**************************************************************************** + * Function: local_waitlisten + ****************************************************************************/ + +static int local_waitlisten(FAR struct local_conn_s *server) +{ + int ret; + + /* Loop until a connection is requested or we receive a signal */ + + while (dq_empty(&server->u.server.lc_waiters)) + { + /* No.. wait for a connection or a signal */ + + ret = sem_wait(&server->lc_waitsem); + if (ret < 0) + { + int errval = errno; + DEBUGASSERT(errval == EINTR); + return -errval; + } + } + + /* There is a client waiting for the connection */ + + return OK; +} + /**************************************************************************** * Public Functions ****************************************************************************/ @@ -215,7 +247,7 @@ int psock_local_accept(FAR struct socket *psock, FAR struct sockaddr *addr, /* Otherwise, listen for a connection and try again. */ - ret = local_listen(server, server->u.server.lc_backlog); + ret = local_waitlisten(server); if (ret < 0) { return ret; diff --git a/net/local/local_conn.c b/net/local/local_conn.c index 68006f2d76..c45b49832f 100644 --- a/net/local/local_conn.c +++ b/net/local/local_conn.c @@ -94,6 +94,9 @@ FAR struct local_conn_s *local_alloc(void) conn->lc_outfd = -1; #ifdef CONFIG_NET_LOCAL_STREAM sem_init(&conn->lc_waitsem, 0, 0); +#ifdef HAVE_LOCAL_POLL + memset(conn->lc_accept_fds, 0, sizeof(conn->lc_accept_fds)); +#endif #endif } @@ -118,6 +121,7 @@ void local_free(FAR struct local_conn_s *conn) if (conn->lc_infd >= 0) { close(conn->lc_infd); + conn->lc_infd = -1; } /* Make sure that the write-only FIFO is closed */ @@ -125,6 +129,7 @@ void local_free(FAR struct local_conn_s *conn) if (conn->lc_outfd >= 0) { close(conn->lc_outfd); + conn->lc_outfd = -1; } #ifdef CONFIG_NET_LOCAL_STREAM diff --git a/net/local/local_connect.c b/net/local/local_connect.c index b4bdc59eb6..714afe1838 100644 --- a/net/local/local_connect.c +++ b/net/local/local_connect.c @@ -160,6 +160,7 @@ int inline local_stream_connect(FAR struct local_conn_s *client, dq_addlast(&client->lc_node, &server->u.server.lc_waiters); client->lc_state = LOCAL_STATE_ACCEPT; + local_accept_pollnotify(server, POLLIN); _local_semgive(&server->lc_waitsem); net_unlock(state); @@ -197,6 +198,7 @@ int inline local_stream_connect(FAR struct local_conn_s *client, errout_with_outfd: (void)close(client->lc_outfd); + client->lc_outfd = -1; errout_with_fifos: (void)local_release_fifos(client); diff --git a/net/local/local_listen.c b/net/local/local_listen.c index 69d0b457c4..712055c9ce 100644 --- a/net/local/local_listen.c +++ b/net/local/local_listen.c @@ -85,7 +85,6 @@ dq_queue_t g_local_listeners; int local_listen(FAR struct local_conn_s *server, int backlog) { net_lock_t state; - int ret; /* Some sanity checks */ @@ -128,23 +127,6 @@ int local_listen(FAR struct local_conn_s *server, int backlog) server->lc_state = LOCAL_STATE_LISTENING; } - /* Loop until a connection is requested or we receive a signal */ - - while (dq_empty(&server->u.server.lc_waiters)) - { - /* No.. wait for a connection or a signal */ - - ret = sem_wait(&server->lc_waitsem); - if (ret < 0) - { - int errval = errno; - DEBUGASSERT(errval == EINTR); - return -errval; - } - } - - /* There is a client waiting for the connection */ - return OK; } diff --git a/net/local/local_netpoll.c b/net/local/local_netpoll.c index b25b7bc87f..01ebe0884a 100644 --- a/net/local/local_netpoll.c +++ b/net/local/local_netpoll.c @@ -39,14 +39,127 @@ #include +#include +#include +#include +#include + +#include +#include +#include + +#include "socket/socket.h" #include "local/local.h" #ifdef HAVE_LOCAL_POLL +/**************************************************************************** + * Function: local_accept_pollsetup + ****************************************************************************/ + +#ifdef CONFIG_NET_LOCAL_STREAM +static int local_accept_pollsetup(FAR struct local_conn_s *conn, + FAR struct pollfd *fds, + bool setup) +{ + net_lock_t state; + pollevent_t eventset; + int ret = OK; + int i; + + state = net_lock(); + if (setup) + { + /* This is a request to set up the poll. Find an available + * slot for the poll structure reference + */ + + for (i = 0; i < LOCAL_ACCEPT_NPOLLWAITERS; i++) + { + /* Find an available slot */ + + if (!conn->lc_accept_fds[i]) + { + /* Bind the poll structure and this slot */ + + conn->lc_accept_fds[i] = fds; + fds->priv = &conn->lc_accept_fds[i]; + break; + } + } + + if (i >= LOCAL_ACCEPT_NPOLLWAITERS) + { + fds->priv = NULL; + ret = -EBUSY; + goto errout; + } + + eventset = 0; + if (dq_peek(&conn->u.server.lc_waiters) != NULL) + { + eventset |= POLLIN; + } + + if (eventset) + { + local_accept_pollnotify(conn, eventset); + } + } + else + { + /* This is a request to tear down the poll. */ + + struct pollfd **slot = (struct pollfd **)fds->priv; + + if (!slot) + { + ret = -EIO; + goto errout; + } + + /* Remove all memory of the poll setup */ + + *slot = NULL; + fds->priv = NULL; + } + +errout: + net_unlock(state); + return ret; +} +#endif + /**************************************************************************** * Public Functions ****************************************************************************/ +/**************************************************************************** + * Name: local_accept_pollnotify + ****************************************************************************/ + +void local_accept_pollnotify(FAR struct local_conn_s *conn, + pollevent_t eventset) +{ +#ifdef CONFIG_NET_LOCAL_STREAM + int i; + + for (i = 0; i < LOCAL_ACCEPT_NPOLLWAITERS; i++) + { + struct pollfd *fds = conn->lc_accept_fds[i]; + if (fds) + { + fds->revents |= (fds->events & eventset); + if (fds->revents != 0) + { + ndbg("Report events: %02x\n", fds->revents); + sem_post(fds->sem); + } + } + } +#endif +} + /**************************************************************************** * Function: local_pollsetup * @@ -65,8 +178,125 @@ int local_pollsetup(FAR struct socket *psock, FAR struct pollfd *fds) { -#warning Missing logic - return -ENOSYS; + FAR struct local_conn_s *conn; + int ret = -ENOSYS; + + conn = (FAR struct local_conn_s *)psock->s_conn; + + if (conn->lc_proto == SOCK_DGRAM) + { + return ret; + } + +#ifdef CONFIG_NET_LOCAL_STREAM + if (conn->lc_state == LOCAL_STATE_LISTENING && + conn->lc_type == LOCAL_TYPE_PATHNAME) + { + return local_accept_pollsetup(conn, fds, true); + } + + if (conn->lc_state == LOCAL_STATE_DISCONNECTED) + { + fds->priv = NULL; + goto pollerr; + } + + switch (fds->events & (POLLIN | POLLOUT)) + { + case (POLLIN | POLLOUT): + { + FAR struct pollfd *shadowfds; + + /* Poll wants to check state for both input and output. */ + + if (conn->lc_infd < 0 || conn->lc_outfd < 0) + { + fds->priv = NULL; + goto pollerr; + } + + /* Allocate shadow pollfds. */ + + shadowfds = kmm_zalloc(2 * sizeof(struct pollfd)); + if (!shadowfds) + { + return -ENOMEM; + } + + shadowfds[0].fd = conn->lc_infd; + shadowfds[0].sem = fds->sem; + shadowfds[0].events = fds->events & ~POLLOUT; + + shadowfds[1].fd = conn->lc_outfd; + shadowfds[1].sem = fds->sem; + shadowfds[1].events = fds->events & ~POLLIN; + + /* Setup poll for both shadow pollfds. */ + + ret = file_poll(conn->lc_infd, &shadowfds[0], true); + if (ret >= 0) + { + ret = file_poll(conn->lc_outfd, &shadowfds[1], true); + if (ret < 0) + { + (void)file_poll(conn->lc_infd, &shadowfds[0], false); + } + } + + if (ret < 0) + { + kmm_free(shadowfds); + fds->priv = NULL; + goto pollerr; + } + else + { + fds->priv = shadowfds; + ret = OK; + } + } + break; + + case POLLIN: + { + /* Poll wants to check state for input only. */ + + if (conn->lc_infd < 0) + { + fds->priv = NULL; + goto pollerr; + } + + ret = file_poll(conn->lc_infd, fds, true); + } + break; + + case POLLOUT: + { + /* Poll wants to check state for output only. */ + + if (conn->lc_outfd < 0) + { + fds->priv = NULL; + goto pollerr; + } + + ret = file_poll(conn->lc_outfd, fds, true); + } + break; + + default: + ret = OK; + break; + } +#endif + + return ret; + +pollerr: + fds->revents |= POLLERR; + sem_post(fds->sem); + return OK; } /**************************************************************************** @@ -87,8 +317,91 @@ int local_pollsetup(FAR struct socket *psock, FAR struct pollfd *fds) int local_pollteardown(FAR struct socket *psock, FAR struct pollfd *fds) { -#warning Missing logic - return -ENOSYS; + FAR struct local_conn_s *conn; + int status = OK; + int ret = -ENOSYS; + + conn = (FAR struct local_conn_s *)psock->s_conn; + + if (conn->lc_proto == SOCK_DGRAM) + { + return ret; + } + +#ifdef CONFIG_NET_LOCAL_STREAM + if (conn->lc_state == LOCAL_STATE_LISTENING && + conn->lc_type == LOCAL_TYPE_PATHNAME) + { + return local_accept_pollsetup(conn, fds, false); + } + + if (conn->lc_state == LOCAL_STATE_DISCONNECTED) + { + return OK; + } + + switch (fds->events & (POLLIN | POLLOUT)) + { + case (POLLIN | POLLOUT): + { + FAR struct pollfd *shadowfds = fds->priv; + + if (shadowfds == NULL) + { + return OK; + } + + DEBUGASSERT(shadowfds[0].fd == conn->lc_infd); + DEBUGASSERT(shadowfds[1].fd == conn->lc_outfd); + + /* Teardown for both shadow pollfds. */ + + ret = file_poll(conn->lc_infd, &shadowfds[0], false); + if (ret < 0) + { + status = ret; + } + + ret = file_poll(conn->lc_outfd, &shadowfds[1], false); + if (ret < 0) + { + status = ret; + } + + fds->revents |= shadowfds[0].revents | shadowfds[1].revents; + fds->priv = NULL; + kmm_free(shadowfds); + } + break; + + case POLLIN: + { + if (fds->priv == NULL) + { + return OK; + } + + status = file_poll(conn->lc_infd, fds, false); + } + break; + + case POLLOUT: + { + if (fds->priv == NULL) + { + return OK; + } + + status = file_poll(conn->lc_outfd, fds, false); + } + break; + + default: + break; + } +#endif + + return status; } #endif /* HAVE_LOCAL_POLL */