From 48d49e5a7c28c8b05d8633240197f20b01fa678b Mon Sep 17 00:00:00 2001 From: ligd Date: Fri, 8 Jan 2021 13:43:33 +0800 Subject: [PATCH] mqueue: add poll support Change-Id: I7e908f6a6c00158c0946587dd79ae3dc5d279d37 Signed-off-by: ligd --- fs/mqueue/Kconfig | 6 ++ fs/mqueue/mq_open.c | 100 +++++++++++++++++++++++++++++++++- fs/vfs/fs_poll.c | 3 +- include/nuttx/mqueue.h | 20 +++++++ sched/mqueue/mq_rcvinternal.c | 5 +- sched/mqueue/mq_sndinternal.c | 6 +- 6 files changed, 136 insertions(+), 4 deletions(-) diff --git a/fs/mqueue/Kconfig b/fs/mqueue/Kconfig index 3e816d07d0..284ddb1515 100644 --- a/fs/mqueue/Kconfig +++ b/fs/mqueue/Kconfig @@ -11,4 +11,10 @@ config FS_MQUEUE_MPATH ---help--- The path to where POSIX message queues will exist in the VFS namespace. +config FS_MQUEUE_NPOLLWAITERS + int "Maximum number of poll waiters" + default 4 + ---help--- + The maximum number of waiters for the poll operation. + endif # !DISABLE_MQUEUE diff --git a/fs/mqueue/mq_open.c b/fs/mqueue/mq_open.c index 86901e83ac..80c5aab5e3 100644 --- a/fs/mqueue/mq_open.c +++ b/fs/mqueue/mq_open.c @@ -43,6 +43,8 @@ ****************************************************************************/ static int nxmq_file_close(FAR struct file *filep); +static int nxmq_file_poll(FAR struct file *filep, + struct pollfd *fds, bool setup); /**************************************************************************** * Private Data @@ -56,7 +58,7 @@ static const struct file_operations g_nxmq_fileops = NULL, /* write */ NULL, /* seek */ NULL, /* ioctl */ - NULL, /* poll */ + nxmq_file_poll, /* poll */ #ifndef CONFIG_DISABLE_PSEUDOFS_OPERATIONS NULL, /* unlink */ #endif @@ -84,6 +86,76 @@ static int nxmq_file_close(FAR struct file *filep) return 0; } +static int nxmq_file_poll(FAR struct file *filep, + struct pollfd *fds, bool setup) +{ + FAR struct inode *inode = filep->f_inode; + FAR struct mqueue_inode_s *msgq = inode->i_private; + pollevent_t eventset = 0; + irqstate_t flags; + int ret = 0; + int i; + + flags = enter_critical_section(); + + if (setup) + { + for (i = 0; i < CONFIG_FS_MQUEUE_NPOLLWAITERS; i++) + { + /* Find an available slot */ + + if (!msgq->fds[i]) + { + /* Bind the poll structure and this slot */ + + msgq->fds[i] = fds; + fds->priv = &msgq->fds[i]; + break; + } + } + + if (i >= CONFIG_FS_MQUEUE_NPOLLWAITERS) + { + fds->priv = NULL; + ret = -EBUSY; + goto errout; + } + + /* Immediately notify on any of the requested events */ + + if (msgq->nmsgs < msgq->maxmsgs) + { + eventset |= (fds->events & POLLOUT); + } + + if (msgq->nmsgs) + { + eventset |= (fds->events & POLLIN); + } + + if (eventset) + { + nxmq_pollnotify(msgq, eventset); + } + } + else if (fds->priv != NULL) + { + for (i = 0; i < CONFIG_FS_MQUEUE_NPOLLWAITERS; i++) + { + if (fds == msgq->fds[i]) + { + msgq->fds[i] = NULL; + fds->priv = NULL; + break; + } + } + } + +errout: + leave_critical_section(flags); + return ret; +} + static int file_mq_vopen(FAR struct file *mq, FAR const char *mq_name, int oflags, va_list ap, int *created) { @@ -287,6 +359,32 @@ static mqd_t nxmq_vopen(FAR const char *mq_name, int oflags, va_list ap) * Public Functions ****************************************************************************/ +void nxmq_pollnotify(FAR struct mqueue_inode_s *msgq, pollevent_t eventset) +{ + int i; + + for (i = 0; i < CONFIG_FS_MQUEUE_NPOLLWAITERS; i++) + { + FAR struct pollfd *fds = msgq->fds[i]; + + if (fds) + { + fds->revents |= (fds->events & eventset); + + if (fds->revents != 0) + { + int semcount; + + nxsem_get_value(fds->sem, &semcount); + if (semcount < 1) + { + nxsem_post(fds->sem); + } + } + } + } +} + /**************************************************************************** * Name: file_mq_open * diff --git a/fs/vfs/fs_poll.c b/fs/vfs/fs_poll.c index 5db0d6d554..31967b90d0 100644 --- a/fs/vfs/fs_poll.c +++ b/fs/vfs/fs_poll.c @@ -308,7 +308,8 @@ int file_poll(FAR struct file *filep, FAR struct pollfd *fds, bool setup) * If not, return -ENOSYS */ - if ((INODE_IS_DRIVER(inode) || INODE_IS_SOCKET(inode)) && + if ((INODE_IS_DRIVER(inode) || INODE_IS_MQUEUE(inode) || + INODE_IS_SOCKET(inode)) && inode->u.i_ops != NULL && inode->u.i_ops->poll != NULL) { /* Yes, it does... Setup the poll */ diff --git a/include/nuttx/mqueue.h b/include/nuttx/mqueue.h index 6a880b2bf4..9752d7d1ed 100644 --- a/include/nuttx/mqueue.h +++ b/include/nuttx/mqueue.h @@ -35,6 +35,7 @@ #include #include #include +#include #if CONFIG_MQ_MAXMSGSIZE > 0 @@ -99,6 +100,7 @@ struct mqueue_inode_s pid_t ntpid; /* Notification: Receiving Task's PID */ struct sigevent ntevent; /* Notification description */ struct sigwork_s ntwork; /* Notification work */ + FAR struct pollfd *fds[CONFIG_FS_MQUEUE_NPOLLWAITERS]; }; /**************************************************************************** @@ -394,6 +396,24 @@ void nxmq_free_msgq(FAR struct mqueue_inode_s *msgq); FAR struct mqueue_inode_s *nxmq_alloc_msgq(mode_t mode, FAR struct mq_attr *attr); +/**************************************************************************** + * Name: nxmq_pollnotify + * + * Description: + * pollnotify, used for notify the poll + * + * Input Parameters: + * msgq - Named essage queue + * eventset - evnet + * + * Returned Value: + * The allocated and initialized message queue structure or NULL in the + * event of a failure. + * + ****************************************************************************/ + +void nxmq_pollnotify(FAR struct mqueue_inode_s *msgq, pollevent_t eventset); + /**************************************************************************** * Name: file_mq_open * diff --git a/sched/mqueue/mq_rcvinternal.c b/sched/mqueue/mq_rcvinternal.c index 1a22bae684..def10acd67 100644 --- a/sched/mqueue/mq_rcvinternal.c +++ b/sched/mqueue/mq_rcvinternal.c @@ -203,7 +203,10 @@ int nxmq_wait_receive(FAR struct mqueue_inode_s *msgq, if (newmsg) { - msgq->nmsgs--; + if (msgq->nmsgs-- == msgq->maxmsgs) + { + nxmq_pollnotify(msgq, POLLOUT); + } } *rcvmsg = newmsg; diff --git a/sched/mqueue/mq_sndinternal.c b/sched/mqueue/mq_sndinternal.c index be2a3b3d05..b9025f912e 100644 --- a/sched/mqueue/mq_sndinternal.c +++ b/sched/mqueue/mq_sndinternal.c @@ -365,7 +365,11 @@ int nxmq_do_send(FAR struct mqueue_inode_s *msgq, /* Increment the count of messages in the queue */ - msgq->nmsgs++; + if (msgq->nmsgs++ == 0) + { + nxmq_pollnotify(msgq, POLLIN); + } + leave_critical_section(flags); /* Check if we need to notify any tasks that are attached to the