usrsock/server: replace the wakeup source to eventfd

Signed-off-by: chao.an <anchao@xiaomi.com>
This commit is contained in:
chao.an 2021-04-19 11:16:49 +08:00 committed by Xiang Xiao
parent 2214e51457
commit 084cc55064

View File

@ -28,9 +28,10 @@
#include <fcntl.h>
#include <poll.h>
#include <pthread.h>
#include <signal.h>
#include <string.h>
#include <sys/eventfd.h>
#include <nuttx/net/dns.h>
#include <nuttx/net/net.h>
#include <nuttx/rptun/openamp.h>
@ -39,13 +40,13 @@
struct usrsock_rpmsg_s
{
pid_t pid;
struct file *eventfp;
pthread_mutex_t mutex;
pthread_cond_t cond;
struct iovec iov[CONFIG_NETUTILS_USRSOCK_NIOVEC];
struct socket socks[CONFIG_NETUTILS_USRSOCK_NSOCK_DESCRIPTORS];
struct rpmsg_endpoint *epts[CONFIG_NETUTILS_USRSOCK_NSOCK_DESCRIPTORS];
struct pollfd pfds[CONFIG_NETUTILS_USRSOCK_NSOCK_DESCRIPTORS];
struct pollfd pfds[CONFIG_NETUTILS_USRSOCK_NSOCK_DESCRIPTORS + 1];
};
/****************************************************************************
@ -108,9 +109,10 @@ static void usrsock_rpmsg_ns_unbind(struct rpmsg_endpoint *ept);
static int usrsock_rpmsg_ept_cb(struct rpmsg_endpoint *ept, void *data,
size_t len, uint32_t src, void *priv);
static int usrsock_rpmsg_notify_poll(struct usrsock_rpmsg_s *priv);
static int usrsock_rpmsg_prepare_poll(struct usrsock_rpmsg_s *priv,
struct pollfd *pfds);
static void usrsock_rpmsg_process_poll(struct usrsock_rpmsg_s *priv,
static bool usrsock_rpmsg_process_poll(struct usrsock_rpmsg_s *priv,
struct pollfd *pfds, int count);
/****************************************************************************
@ -235,7 +237,7 @@ static int usrsock_rpmsg_socket_handler(struct rpmsg_endpoint *ept,
pthread_mutex_lock(&priv->mutex);
priv->pfds[ret].ptr = &priv->socks[ret];
priv->pfds[ret].events = POLLIN;
kill(priv->pid, SIGUSR1); /* Wakeup the poll thread */
usrsock_rpmsg_notify_poll(priv);
pthread_mutex_unlock(&priv->mutex);
retr = usrsock_rpmsg_send_event(ept, ret, USRSOCK_EVENT_SENDTO_READY);
}
@ -260,7 +262,7 @@ static int usrsock_rpmsg_close_handler(struct rpmsg_endpoint *ept,
/* Signal and wait the poll thread to wakeup */
pthread_mutex_lock(&priv->mutex);
kill(priv->pid, SIGUSR1);
usrsock_rpmsg_notify_poll(priv);
pthread_cond_wait(&priv->cond, &priv->mutex);
pthread_mutex_unlock(&priv->mutex);
@ -305,7 +307,7 @@ static int usrsock_rpmsg_connect_handler(struct rpmsg_endpoint *ept,
priv->pfds[req->usockid].events |= POLLOUT;
}
kill(priv->pid, SIGUSR1); /* Wakeup the poll thread */
usrsock_rpmsg_notify_poll(priv);
pthread_mutex_unlock(&priv->mutex);
if (!inprogress)
{
@ -428,7 +430,7 @@ out:
{
pthread_mutex_lock(&priv->mutex);
priv->pfds[req->usockid].events |= POLLOUT;
kill(priv->pid, SIGUSR1); /* Wakeup the poll thread */
usrsock_rpmsg_notify_poll(priv);
pthread_mutex_unlock(&priv->mutex);
}
@ -490,7 +492,7 @@ static int usrsock_rpmsg_recvfrom_handler(struct rpmsg_endpoint *ept,
{
pthread_mutex_lock(&priv->mutex);
priv->pfds[req->usockid].events |= POLLIN;
kill(priv->pid, SIGUSR1); /* Wakeup the poll thread */
usrsock_rpmsg_notify_poll(priv);
pthread_mutex_unlock(&priv->mutex);
}
@ -625,7 +627,7 @@ static int usrsock_rpmsg_listen_handler(struct rpmsg_endpoint *ept,
pthread_mutex_lock(&priv->mutex);
priv->pfds[req->usockid].ptr = &priv->socks[req->usockid];
priv->pfds[req->usockid].events = POLLIN;
kill(priv->pid, SIGUSR1); /* Wakeup the poll thread */
usrsock_rpmsg_notify_poll(priv);
pthread_mutex_unlock(&priv->mutex);
}
@ -695,7 +697,7 @@ static int usrsock_rpmsg_accept_handler(struct rpmsg_endpoint *ept,
pthread_mutex_lock(&priv->mutex);
priv->pfds[i].ptr = &priv->socks[i];
priv->pfds[i].events = POLLIN;
kill(priv->pid, SIGUSR1); /* Wakeup the poll thread */
usrsock_rpmsg_notify_poll(priv);
pthread_mutex_unlock(&priv->mutex);
usrsock_rpmsg_send_event(ept, i, USRSOCK_EVENT_SENDTO_READY);
}
@ -807,7 +809,7 @@ static void usrsock_rpmsg_ns_unbind(struct rpmsg_endpoint *ept)
/* Signal and wait the poll thread to wakeup */
pthread_mutex_lock(&priv->mutex);
kill(priv->pid, SIGUSR1);
usrsock_rpmsg_notify_poll(priv);
pthread_cond_wait(&priv->cond, &priv->mutex);
pthread_mutex_unlock(&priv->mutex);
@ -840,6 +842,13 @@ static int usrsock_rpmsg_ept_cb(struct rpmsg_endpoint *ept, void *data,
return -EINVAL;
}
static int usrsock_rpmsg_notify_poll(struct usrsock_rpmsg_s *priv)
{
eventfd_t value = 1ULL;
return file_write(priv->eventfp, &value, sizeof(value));
}
static int usrsock_rpmsg_prepare_poll(struct usrsock_rpmsg_s *priv,
struct pollfd *pfds)
{
@ -861,64 +870,86 @@ static int usrsock_rpmsg_prepare_poll(struct usrsock_rpmsg_s *priv,
}
}
pfds[count].ptr = priv->eventfp;
pfds[count++].events = POLLIN | POLLFILE;
pthread_mutex_unlock(&priv->mutex);
return count;
}
static void usrsock_rpmsg_process_poll(struct usrsock_rpmsg_s *priv,
static bool usrsock_rpmsg_process_poll(struct usrsock_rpmsg_s *priv,
struct pollfd *pfds, int count)
{
bool prepare = false;
int i;
int j;
for (i = 0; i < count; i++)
{
j = (struct socket *)pfds[i].ptr - priv->socks;
pthread_mutex_lock(&priv->mutex);
if (priv->epts[j] != NULL)
{
int events = 0;
if (pfds[i].ptr == priv->eventfp)
{
if (pfds[i].revents & POLLIN)
{
events |= USRSOCK_EVENT_RECVFROM_AVAIL;
eventfd_t value;
/* Stop poll in until recv get called */
pfds[i].events &= ~POLLIN;
priv->pfds[j].events &= ~POLLIN;
file_read(priv->eventfp, &value, sizeof(value));
}
if (pfds[i].revents & POLLOUT)
prepare = true;
}
else
{
int j;
j = (struct socket *)pfds[i].ptr - priv->socks;
if (priv->epts[j] != NULL)
{
events |= USRSOCK_EVENT_SENDTO_READY;
int events = 0;
/* Stop poll out until send get called */
if (pfds[i].revents & POLLIN)
{
events |= USRSOCK_EVENT_RECVFROM_AVAIL;
pfds[i].events &= ~POLLOUT;
priv->pfds[j].events &= ~POLLOUT;
}
/* Stop poll in until recv get called */
if (pfds[i].revents & (POLLHUP | POLLERR))
{
events |= USRSOCK_EVENT_REMOTE_CLOSED;
pfds[i].events &= ~POLLIN;
priv->pfds[j].events &= ~POLLIN;
}
/* Stop poll at all */
if (pfds[i].revents & POLLOUT)
{
events |= USRSOCK_EVENT_SENDTO_READY;
pfds[i].ptr = NULL;
priv->pfds[j].ptr = NULL;
}
/* Stop poll out until send get called */
if (events != 0)
{
usrsock_rpmsg_send_event(priv->epts[j], j, events);
pfds[i].events &= ~POLLOUT;
priv->pfds[j].events &= ~POLLOUT;
}
if (pfds[i].revents & (POLLHUP | POLLERR))
{
events |= USRSOCK_EVENT_REMOTE_CLOSED;
/* Stop poll at all */
pfds[i].ptr = NULL;
priv->pfds[j].ptr = NULL;
}
if (events != 0)
{
usrsock_rpmsg_send_event(priv->epts[j], j, events);
}
}
}
pthread_mutex_unlock(&priv->mutex);
}
return prepare;
}
/****************************************************************************
@ -927,10 +958,11 @@ static void usrsock_rpmsg_process_poll(struct usrsock_rpmsg_s *priv,
int main(int argc, char *argv[])
{
struct pollfd pfds[CONFIG_NETUTILS_USRSOCK_NSOCK_DESCRIPTORS];
struct pollfd pfds[CONFIG_NETUTILS_USRSOCK_NSOCK_DESCRIPTORS + 1];
struct usrsock_rpmsg_s *priv;
sigset_t sigmask;
bool prepare = true;
int ret;
int fd;
priv = calloc(1, sizeof(*priv));
if (priv == NULL)
@ -938,16 +970,21 @@ int main(int argc, char *argv[])
return -ENOMEM;
}
priv->pid = getpid();
pthread_mutex_init(&priv->mutex, NULL);
pthread_cond_init(&priv->cond, NULL);
signal(SIGUSR1, SIG_IGN);
sigprocmask(SIG_SETMASK, NULL, &sigmask);
sigaddset(&sigmask, SIGUSR1);
sigprocmask(SIG_SETMASK, &sigmask, NULL);
sigdelset(&sigmask, SIGUSR1);
fd = eventfd(0, 0);
if (fd < 0)
{
ret = -errno;
goto free_priv;
}
ret = fs_getfilep(fd, &priv->eventfp);
if (ret < 0)
{
goto free_fd;
}
ret = rpmsg_register_callback(priv,
NULL,
@ -955,24 +992,25 @@ int main(int argc, char *argv[])
usrsock_rpmsg_ns_bind);
if (ret < 0)
{
goto free_priv;
goto free_fd;
}
while (1)
{
/* Collect all socks which need monitor */
if (prepare)
{
ret = usrsock_rpmsg_prepare_poll(priv, pfds);
}
/* Monitor the state change from them */
if (ppoll(pfds, ret, NULL, &sigmask) >= 0)
if (poll(pfds, ret, -1) > 0)
{
/* Process all changed socks */
usrsock_rpmsg_process_poll(priv, pfds, ret);
}
else if (errno == EINTR)
{
/* Collect all socks which need monitor */
ret = usrsock_rpmsg_prepare_poll(priv, pfds);
prepare = usrsock_rpmsg_process_poll(priv, pfds, ret);
}
}
@ -980,6 +1018,8 @@ int main(int argc, char *argv[])
NULL,
NULL,
usrsock_rpmsg_ns_bind);
free_fd:
close(fd);
free_priv:
pthread_cond_destroy(&priv->cond);
pthread_mutex_destroy(&priv->mutex);