net/netlink/netlink_conn.c: Add support for asynchronous Netlink responses.

This commit is contained in:
Gregory Nutt 2019-11-18 09:48:14 -06:00
parent ab78f0ca0a
commit 6479775721
4 changed files with 266 additions and 33 deletions

View File

@ -26,6 +26,23 @@ config NETLINK_CONNS
---help---
Maximum number of Netlink connections (all tasks).
config NETLINK_MAXPENDING
int "Max pending responses"
default 1
---help---
This defines the maximum number of threads that can be waiting for
a NetLink response. If there is never more than one recv() or
recvfrom() per socket, then there need be only 1. This only
accounts for a perverse case where more than one thread is waiting
on recv() or recvfrom().
config NETLINK_SIGNAL
int "Response notification signal"
default 15
---help---
This is the signal number that is used to wake up threads waiting
for a response to be received.
menu "Netlink Protocols"
config NETLINK_ROUTE

View File

@ -57,6 +57,8 @@
* Pre-processor Definitions
****************************************************************************/
#define NETLINK_NO_WAITER ((pid_t)-1)
/****************************************************************************
* Public Type Definitions
****************************************************************************/
@ -102,7 +104,11 @@ struct netlink_conn_s
uint8_t crefs; /* Reference counts on this instance */
uint8_t protocol; /* See NETLINK_* definitions */
/* Buffered response data */
/* Threads waiting for a response */
pid_t waiter[CONFIG_NETLINK_MAXPENDING];
/* Queued response data */
sq_queue_t resplist; /* Singly linked list of responses*/
};
@ -190,15 +196,34 @@ FAR struct netlink_conn_s *netlink_active(FAR struct sockaddr_nl *addr);
* Description:
* Add response data at the tail of the pending response list.
*
* Assumptions:
* The caller has the network locked to prevent concurrent access to the
* socket.
* Note: The network will be momentarily locked to support exclusive
* access to the pending response list.
*
****************************************************************************/
void netlink_add_response(FAR struct socket *psock,
FAR struct netlink_response_s *resp);
/****************************************************************************
* Name: netlink_tryget_response
*
* Description:
* Return the next response from the head of the pending response list.
* Responses are returned one-at-a-time in FIFO order.
*
* Note: The network will be momentarily locked to support exclusive
* access to the pending response list.
*
* Returned Value:
* The next response from the head of the pending response list is
* returned. NULL will be returned if the pending response list is
* empty
*
****************************************************************************/
FAR struct netlink_response_s *
netlink_tryget_response(FAR struct socket *psock);
/****************************************************************************
* Name: netlink_get_response
*
@ -206,13 +231,18 @@ void netlink_add_response(FAR struct socket *psock,
* Return the next response from the head of the pending response list.
* Responses are returned one-at-a-time in FIFO order.
*
* Assumptions:
* The caller has the network locked to prevent concurrent access to the
* socket.
* Note: The network will be momentarily locked to support exclusive
* access to the pending response list.
*
* Returned Value:
* The next response from the head of the pending response list is
* always returned. This function will block until a response is
* received if the pending response list is empty.
*
****************************************************************************/
FAR struct netlink_response_s *netlink_get_response(FAR struct socket *psock);
FAR struct netlink_response_s *
netlink_get_response(FAR struct socket *psock);
/****************************************************************************
* Name: netlink_route_sendto()

View File

@ -50,9 +50,11 @@
#include <nuttx/kmalloc.h>
#include <nuttx/semaphore.h>
#include <nuttx/signal.h>
#include <nuttx/net/netconfig.h>
#include <nuttx/net/net.h>
#include "utils/utils.h"
#include "netlink/netlink.h"
#ifdef CONFIG_NET_NETLINK
@ -107,6 +109,74 @@ static void _netlink_semgive(FAR sem_t *sem)
(void)nxsem_post(sem);
}
/****************************************************************************
* Name: netlink_notify_waiters
*
* Description:
* Notify all threads waiting for a response.
*
* Assumptions:
* The network is locked.
*
****************************************************************************/
static void netlink_notify_waiters(FAR struct netlink_conn_s *conn)
{
int ret;
int i;
/* Notify every pending thread. Lock the scheduler while we do this so
* there there is no thrashing: All waiters will be restarted, but only
* the highest priority waiter will get to run and will receive the
* response.
*/
sched_lock();
for (i = 0; i < CONFIG_NETLINK_MAXPENDING; i++)
{
if (conn->waiter[i] > 0)
{
ret = nxsig_kill(conn->waiter[i], CONFIG_NETLINK_SIGNAL);
if (ret < 0)
{
nerr("ERROR: nxsig_kill() failed: %d\n", ret);
UNUSED(ret);
}
}
}
sched_unlock();
}
/****************************************************************************
* Name: netlink_add_waiter
*
* Description:
* Add one more waiter to the list of waiters.
*
* Assumptions:
* The network is locked.
*
****************************************************************************/
static int netlink_add_waiter(FAR struct netlink_conn_s *conn)
{
int i;
for (i = 0; i < CONFIG_NETLINK_MAXPENDING; i++)
{
if (conn->waiter[i] <= 0)
{
conn->waiter[i] = getpid();
return OK;
}
}
nerr("ERROR: Too many waiters\n");
DEBUGPANIC();
return -ENOSPC;
}
/****************************************************************************
* Public Functions
****************************************************************************/
@ -153,17 +223,25 @@ void netlink_initialize(void)
FAR struct netlink_conn_s *netlink_alloc(void)
{
FAR struct netlink_conn_s *conn;
int i;
/* The free list is protected by a semaphore (that behaves like a mutex). */
_netlink_semtake(&g_free_sem);
conn = (FAR struct netlink_conn_s *)dq_remfirst(&g_free_netlink_connections);
if (conn)
if (conn != NULL)
{
/* Make sure that the connection is marked as uninitialized */
memset(conn, 0, sizeof(*conn));
/* With no waiters */
for (i = 0; i < CONFIG_NETLINK_MAXPENDING; i++)
{
conn->waiter[i] = NETLINK_NO_WAITER;
}
/* Enqueue the connection into the active list */
dq_addlast(&conn->node, &g_active_netlink_connections);
@ -249,8 +327,11 @@ FAR struct netlink_conn_s *netlink_nextconn(FAR struct netlink_conn_s *conn)
FAR struct netlink_conn_s *netlink_active(FAR struct sockaddr_nl *addr)
{
FAR struct netlink_conn_s *conn = NULL;
#warning "Missing logic for NETLINK active"
/* This function is used to handle routing of incoming messages to sockets
* connected to the address. There is no such use case for NetLink
* sockets.
*/
return NULL;
}
@ -260,9 +341,8 @@ FAR struct netlink_conn_s *netlink_active(FAR struct sockaddr_nl *addr)
* Description:
* Add response data at the tail of the pending response list.
*
* Assumptions:
* The caller has the network locked to prevent concurrent access to the
* socket.
* Note: The network will be momentarily locked to support exclusive
* access to the pending response list.
*
****************************************************************************/
@ -274,24 +354,39 @@ void netlink_add_response(FAR struct socket *psock,
DEBUGASSERT(psock != NULL && psock->s_conn != NULL && resp != NULL);
conn = (FAR struct netlink_conn_s *)psock->s_conn;
/* Add the response to the end of the FIFO list */
net_lock();
sq_addlast(&resp->flink, &conn->resplist);
/* Notify any waiters that a response is available */
netlink_notify_waiters(conn);
net_unlock();
}
/****************************************************************************
* Name: netlink_get_response
* Name: netlink_tryget_response
*
* Description:
* Return the next response from the head of the pending response list.
* Responses are returned one-at-a-time in FIFO order.
*
* Assumptions:
* The caller has the network locked to prevent concurrent access to the
* socket.
* Note: The network will be momentarily locked to support exclusive
* access to the pending response list.
*
* Returned Value:
* The next response from the head of the pending response list is
* returned. NULL will be returned if the pending response list is
* empty
*
****************************************************************************/
FAR struct netlink_response_s *netlink_get_response(FAR struct socket *psock)
FAR struct netlink_response_s *
netlink_tryget_response(FAR struct socket *psock)
{
FAR struct netlink_response_s *resp;
FAR struct netlink_conn_s *conn;
DEBUGASSERT(psock != NULL && psock->s_conn != NULL);
@ -302,7 +397,92 @@ FAR struct netlink_response_s *netlink_get_response(FAR struct socket *psock)
* NULL).
*/
return (FAR struct netlink_response_s *)sq_remfirst(&conn->resplist);
net_lock();
resp = (FAR struct netlink_response_s *)sq_remfirst(&conn->resplist);
net_unlock();
return resp;
}
/****************************************************************************
* Name: netlink_get_response
*
* Description:
* Return the next response from the head of the pending response list.
* Responses are returned one-at-a-time in FIFO order.
*
* Note: The network will be momentarily locked to support exclusive
* access to the pending response list.
*
* Returned Value:
* The next response from the head of the pending response list is
* always returned. This function will block until a response is
* received if the pending response list is empty.
*
****************************************************************************/
FAR struct netlink_response_s *
netlink_get_response(FAR struct socket *psock)
{
FAR struct netlink_response_s *resp;
FAR struct netlink_conn_s *conn;
FAR struct siginfo info;
unsigned int count;
sigset_t set;
irqstate_t flags;
int ret;
DEBUGASSERT(psock != NULL && psock->s_conn != NULL);
conn = (FAR struct netlink_conn_s *)psock->s_conn;
/* Loop, until a response is received. A loop is used because in the case
* of multiple waiters, all waiters will be awakened, but only the highest
* priority waiter will get the response.
*/
net_lock();
while ((resp = netlink_tryget_response(psock)) == NULL)
{
/* Add this task as a waiter */
ret = netlink_add_waiter(conn);
if (ret < 0)
{
nerr("ERROR: netlink_add_waiter failed: %d\n", ret);
}
/* Break any network lock while we wait */
flags = enter_critical_section();
ret = net_breaklock(&count);
if (ret < 0)
{
/* net_breaklock() would only fail if we were not the holder of
* lock. But we do hold the lock?
*/
nerr("ERROR: net_breaklock failed: %d\n", ret);
DEBUGPANIC();
}
/* Wait for a response */
sigemptyset(&set);
sigaddset(&set, CONFIG_NETLINK_SIGNAL);
ret = sigwaitinfo(&set, &info);
if (ret < 0)
{
nerr("ERROR: sigwaitinfo failed: %d\n", ret);
}
/* Restore the network lock */
net_restorelock(count);
leave_critical_section(flags);
}
return resp;
}
#endif /* CONFIG_NET_NETLINK */

View File

@ -706,10 +706,7 @@ static int netlink_route_terminator(FAR struct socket *psock,
/* Finally, add the response to the list of pending responses */
net_lock();
netlink_add_response(psock, (FAR struct netlink_response_s *)alloc);
net_unlock();
return OK;
}
#endif
@ -1049,21 +1046,30 @@ ssize_t netlink_route_recvfrom(FAR struct socket *psock,
DEBUGASSERT(psock != NULL && nlmsg != NULL &&
len >= sizeof(struct nlmsghdr));
/* Find the response to this message */
net_lock();
entry = (FAR struct netlink_response_s *)netlink_get_response(psock);
net_unlock();
/* Find the response to this message. The return value */
entry = (FAR struct netlink_response_s *)netlink_tryget_response(psock);
if (entry == NULL)
{
/* REVISIT: The correct behavior here for a blocking socket would be
* to wait until the data becomes available. This is not an issue for
* the currently supported operations since they are fully synchronous
* but will become an issue in the future.
/* No response is variable, but presumably, one is expected. Check
* if the socket has been configured for non-blocking operation.
* REVISIT: I think there needs to be some higher level logic to
* select Netlink non-blocking sockets.
*/
return -ENOENT;
if (_SS_ISNONBLOCK(psock->s_flags))
{
return -EAGAIN;
}
/* Wait for the response. This should always succeed. */
entry = (FAR struct netlink_response_s *)netlink_get_response(psock);
DEBUGASSERT(entry != NULL);
if (entry == NULL)
{
return -EPIPE;
}
}
if (len < entry->msg.nlmsg_len)