nuttx-apps/examples/usrsocktest/usrsocktest_daemon.c
Xiang Xiao 857158451b Unify the void cast usage
1.Remove void cast for function because many places ignore the returned value without cast
2.Replace void cast for variable with UNUSED macro

Change-Id: Ie644129a563244a6397036789c4c3ea83c4e9b09
Signed-off-by: Xiang Xiao <xiaoxiang@xiaomi.com>
2020-01-02 23:21:01 +08:00

1949 lines
44 KiB
C

/****************************************************************************
* examples/usrsocktest/usrsocktest_daemon.c
*
* Copyright (C) 2015, 2017 Haltian Ltd. All rights reserved.
* Author: Jussi Kivilinna <jussi.kivilinna@haltian.com>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in
* the documentation and/or other materials provided with the
* distribution.
* 3. Neither the name NuttX nor the names of its contributors may be
* used to endorse or promote products derived from this software
* without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
* FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
* COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
* BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS
* OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
* AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*
****************************************************************************/
/****************************************************************************
* Included Files
****************************************************************************/
#include <nuttx/config.h>
#include <stdint.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <debug.h>
#include <fcntl.h>
#include <errno.h>
#include <poll.h>
#include <pthread.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <nuttx/net/usrsock.h>
#include "defines.h"
/****************************************************************************
* Definitions
****************************************************************************/
/* Map dbg onto _warn when no dbg macro has been provided already. */
#ifndef dbg
#define dbg _warn
#endif
/* Daemon trace output.  Expands to a no-op expression, so the arguments
 * are discarded by the preprocessor and never evaluated.
 */
#define usrsocktest_dbg(...) ((void)0)
/* Socket id handed out for slot 0 of the socket table; slot i is reported
 * to the kernel side as TEST_SOCKET_SOCKID_BASE + i.
 */
#define TEST_SOCKET_SOCKID_BASE 10000U
/* Number of slots in the daemon's test socket table. */
#define TEST_SOCKET_COUNT 8
/* Element count of a true array (not valid for pointers). */
#ifndef ARRAY_SIZE
# define ARRAY_SIZE(x) (sizeof(x) / sizeof((x)[0]))
#endif
/* noinline is defined empty here, i.e. it expands to nothing. */
#define noinline
/****************************************************************************
* Private Types
****************************************************************************/
/* State of one emulated socket slot in the daemon's socket table. */
struct test_socket_s
{
bool opened:1; /* Slot is currently allocated to a socket */
bool connected:1; /* Connection to the endpoint is established */
bool blocked_connect:1; /* Connect completion is being held back */
bool block_send:1; /* sendto requests are answered with -EAGAIN */
bool connect_refused:1; /* A held-back connect was failed as refused */
bool disconnected:1; /* Connection was closed from the remote side */
int recv_avail_bytes; /* Bytes recvfrom may still hand out */
FAR void *endp; /* Heap-allocated endpoint address, or NULL */
struct usrsock_message_req_ack_s pending_resp; /* Saved response for a
* held-back connect */
};
/* Bookkeeping for one helper thread that writes a single command byte to
 * the daemon command pipe, optionally after a delay.
 */
struct delayed_cmd_s
{
sq_entry_t node; /* Linkage in the daemon's delayed_cmd_threads queue */
pthread_t tid; /* Helper thread id; joined when the daemon is stopped */
sem_t startsem; /* Posted by the helper thread (before the delay when
* delay_msec is non-zero, after the write otherwise) */
int pipefd; /* Descriptor the command byte is written to */
uint16_t delay_msec; /* Delay before the write, in milliseconds */
char cmd; /* Command byte to send */
};
/****************************************************************************
* Private Data
****************************************************************************/
/* Private state of the single test daemon instance. */
static struct daemon_priv_s
{
/* Active configuration; NULL is treated as "daemon not running". */
FAR const struct usrsocktest_daemon_conf_s *conf;
pthread_t tid; /* Daemon thread id */
bool joined; /* Start is refused while this is false */
int pipefd[2]; /* Command pipe: daemon polls [0], commands go to [1] */
sem_t wakewaitsem; /* Posted by the daemon on the '*' command */
/* Counters kept in step with the per-socket flags. */
unsigned int sockets_active;
unsigned int sockets_connected;
unsigned int sockets_waiting_connect;
unsigned int sockets_recv_empty;
unsigned int sockets_not_connected_refused;
unsigned int sockets_remote_disconnected;
size_t total_send_bytes; /* Sum of positive sendto results */
size_t total_recv_bytes; /* Sum of non-negative recvfrom results */
bool do_not_poll_usrsock; /* When set, the usrsock fd is left out of poll */
struct test_socket_s test_sockets[TEST_SOCKET_COUNT];
sq_queue_t delayed_cmd_threads; /* Queue of struct delayed_cmd_s entries */
} g_ub_daemon =
{
/* Initial state: not running and nothing to join. */
.joined = true,
.conf = NULL,
};
/* Lock taken around accesses to g_ub_daemon by the daemon thread and by the
 * start/stop/value-query functions in this file.
 */
static pthread_mutex_t daemon_mutex = PTHREAD_MUTEX_INITIALIZER;
/****************************************************************************
* Public Data
****************************************************************************/
/* Read-only configuration initialized from USRSOCKTEST_DAEMON_CONF_DEFAULTS. */
const struct usrsocktest_daemon_conf_s usrsocktest_daemon_defconf =
USRSOCKTEST_DAEMON_CONF_DEFAULTS;
/* Writable configuration instance.  NOTE(review): presumably filled in by
 * test cases before being passed to usrsocktest_daemon_start() - confirm
 * against the callers.
 */
struct usrsocktest_daemon_conf_s usrsocktest_daemon_config;
/****************************************************************************
* Private Functions
****************************************************************************/
/* Claim the first unused slot of the socket table.
 *
 * The slot is zeroed, marked opened and seeded from the configuration
 * (send blocking and initially available receive bytes); the active and
 * recv-empty counters are updated to match.
 *
 * Returns the socket id of the slot (index + TEST_SOCKET_SOCKID_BASE), or
 * -1 when every slot is already in use.
 */

static int test_socket_alloc(FAR struct daemon_priv_s *priv)
{
  FAR struct test_socket_s *slot = NULL;
  int idx;

  /* Locate the first slot that is not in use. */

  for (idx = 0; idx < ARRAY_SIZE(priv->test_sockets); idx++)
    {
      if (!priv->test_sockets[idx].opened)
        {
          slot = &priv->test_sockets[idx];
          break;
        }
    }

  if (slot == NULL)
    {
      return -1;
    }

  /* Reset the slot and apply the configured initial state. */

  memset(slot, 0, sizeof(*slot));
  slot->opened = true;
  slot->block_send = priv->conf->endpoint_block_send;

  if (priv->conf->endpoint_recv_avail_from_start)
    {
      slot->recv_avail_bytes = priv->conf->endpoint_recv_avail;
    }
  else
    {
      slot->recv_avail_bytes = 0;
    }

  priv->sockets_active++;
  if (slot->recv_avail_bytes == 0)
    {
      priv->sockets_recv_empty++;
    }

  return idx + TEST_SOCKET_SOCKID_BASE;
}
/* Translate a socket id into its slot in the socket table.
 *
 * Returns a pointer to the slot, or NULL when sockid lies outside
 * [TEST_SOCKET_SOCKID_BASE, TEST_SOCKET_SOCKID_BASE + TEST_SOCKET_COUNT).
 * The slot is returned whether or not it is currently opened.
 */

static FAR struct test_socket_s *test_socket_get(FAR struct daemon_priv_s *priv,
                                                 int sockid)
{
  unsigned int id;

  /* Negative ids can never name a slot. */

  if (sockid < 0)
    {
      return NULL;
    }

  id = (unsigned int)sockid;
  if (id < TEST_SOCKET_SOCKID_BASE ||
      id - TEST_SOCKET_SOCKID_BASE >= ARRAY_SIZE(priv->test_sockets))
    {
      return NULL;
    }

  return &priv->test_sockets[id - TEST_SOCKET_SOCKID_BASE];
}
/* Release the socket slot named by sockid.
 *
 * Frees the endpoint address attached to the slot, clears the slot flags
 * that have counters and brings every daemon counter back in line.
 *
 * Returns 0 on success, -EBADFD when sockid does not name a slot and
 * -EFAULT when the slot is not opened.
 */

static int test_socket_free(FAR struct daemon_priv_s *priv, int sockid)
{
  FAR struct test_socket_s *sock = test_socket_get(priv, sockid);

  if (sock == NULL)
    {
      return -EBADFD;
    }

  if (!sock->opened)
    {
      return -EFAULT;
    }

  /* Drop the endpoint allocated at connect time, if any. */

  if (sock->endp != NULL)
    {
      free(sock->endp);
      usrsocktest_endp_malloc_cnt--;
      sock->endp = NULL;
    }

  /* Undo this socket's contribution to the daemon counters. */

  if (sock->connected)
    {
      sock->connected = false;
      priv->sockets_connected--;
    }

  if (sock->blocked_connect)
    {
      sock->blocked_connect = false;
      priv->sockets_waiting_connect--;
    }

  if (sock->recv_avail_bytes == 0)
    {
      priv->sockets_recv_empty--;
    }

  if (sock->connect_refused)
    {
      priv->sockets_not_connected_refused--;
    }

  if (sock->disconnected)
    {
      priv->sockets_remote_disconnected--;
    }

  priv->sockets_active--;
  sock->opened = false;
  return 0;
}
/* Write a socket event message for tsock to the usrsock descriptor.
 *
 * fd     - descriptor the event message is written to
 * priv   - daemon state owning the socket table
 * tsock  - socket the event refers to; must be an entry of the table
 * events - event bits placed in the message
 *
 * Returns OK on success, -EINVAL when tsock is not part of the table,
 * -errno when write() fails and -ENOSPC on a short write.
 */

static int tsock_send_event(int fd, FAR struct daemon_priv_s *priv,
                            FAR struct test_socket_s *tsock, int events)
{
  FAR struct usrsock_message_socket_event_s event = {};
  ssize_t nwritten;
  int slot = 0;

  /* Translate the socket pointer back into its table index. */

  while (slot < ARRAY_SIZE(priv->test_sockets) &&
         tsock != &priv->test_sockets[slot])
    {
      slot++;
    }

  if (slot == ARRAY_SIZE(priv->test_sockets))
    {
      return -EINVAL;
    }

  event.head.flags = USRSOCK_MESSAGE_FLAG_EVENT;
  event.head.msgid = USRSOCK_MESSAGE_SOCKET_EVENT;
  event.usockid = slot + TEST_SOCKET_SOCKID_BASE;
  event.events = events;

  nwritten = write(fd, &event, sizeof(event));
  if (nwritten < 0)
    {
      return -errno;
    }

  return nwritten == sizeof(event) ? OK : -ENOSPC;
}
/* Look up the emulated endpoint for an IPv4 address.
 *
 * A sockaddr_in describing the configured endpoint (endpoint_addr and
 * endpoint_port from the daemon configuration) is allocated and returned
 * when the configured address equals ipaddr.  The caller owns the returned
 * memory and must free() it and decrement usrsocktest_endp_malloc_cnt.
 *
 * Returns NULL when the address does not match or when the allocation
 * fails; in both cases usrsocktest_endp_malloc_cnt is left unchanged.
 */

static FAR void *find_endpoint(FAR struct daemon_priv_s *priv,
                               in_addr_t ipaddr)
{
  FAR struct sockaddr_in *endpaddr;
  int ok;

  endpaddr = malloc(sizeof(*endpaddr));
  if (endpaddr == NULL)
    {
      /* Do not rely on assert() alone: it may be compiled out, and the
       * allocation counter must only track successful allocations.
       */

      return NULL;
    }

  usrsocktest_endp_malloc_cnt++;

  /* Zero the whole structure so that no uninitialized bytes are later
   * copied out as part of the endpoint address.
   */

  memset(endpaddr, 0, sizeof(*endpaddr));

  ok = inet_pton(AF_INET, priv->conf->endpoint_addr,
                 &endpaddr->sin_addr.s_addr);
  assert(ok);

  endpaddr->sin_family = AF_INET;
  endpaddr->sin_port = htons(priv->conf->endpoint_port);

  if (endpaddr->sin_addr.s_addr == ipaddr)
    {
      return endpaddr;
    }

  /* No endpoint at this address. */

  free(endpaddr);
  usrsocktest_endp_malloc_cnt--;
  return NULL;
}
/* Report whether the endpoint accepts connections on the given port.
 *
 * endp - endpoint address as returned by find_endpoint()
 * port - requested port, compared as-is against the endpoint's sin_port
 *
 * Returns true when the ports are equal.  priv is not used.
 */

static bool endpoint_connect(FAR struct daemon_priv_s *priv, FAR void *endp,
                             uint16_t port)
{
  FAR struct sockaddr_in *endpoint = endp;

  return endpoint->sin_port == port;
}
/* Copy the socket address stored behind the opaque endpoint handle endp
 * into *endpaddr.
 */

static void get_endpoint_sockaddr(FAR void *endp,
                                  FAR struct sockaddr_in *endpaddr)
{
  FAR struct sockaddr_in *stored = endp;

  *endpaddr = *stored;
}
/* Read the remainder of a request whose common header is already in place.
 *
 * fd         - descriptor to read from
 * common_hdr - common header already read; only its size is used here
 * req        - buffer for the full request; the data read is stored right
 *              after the common header
 * reqsize    - total size of the request, common header included
 *
 * Returns the number of bytes read, -errno when read() fails, or
 * -EMSGSIZE when fewer or more bytes than expected were obtained.
 */

static ssize_t
read_req(int fd, FAR const struct usrsock_request_common_s *common_hdr,
         FAR void *req, size_t reqsize)
{
  const size_t hdrlen = sizeof(*common_hdr);
  FAR uint8_t *rest = (uint8_t *)req + hdrlen;
  ssize_t nread;

  nread = read(fd, rest, reqsize - hdrlen);
  if (nread < 0)
    {
      int err = errno;

      usrsocktest_dbg("Error reading %d bytes of request: ret=%d, errno=%d\n",
                      reqsize - sizeof(*common_hdr), (int)nread, errno);
      return -err;
    }

  if ((size_t)nread + hdrlen != reqsize)
    {
      return -EMSGSIZE;
    }

  return nread;
}
/* Handle a socket-creation request.
 *
 * The requested domain, type and protocol are checked against the daemon
 * configuration and the configured socket limit is enforced before a slot
 * is allocated.  The ack written to fd carries the new socket id, or
 * -EAFNOSUPPORT, -EPROTONOSUPPORT, -EMFILE or -ENFILE.
 *
 * Returns OK when the ack was written, -errno when write() fails and
 * -ENOSPC on a short write.
 */

static int socket_request(int fd, FAR struct daemon_priv_s *priv,
                          FAR void *hdrbuf)
{
  FAR struct usrsock_request_socket_s *req = hdrbuf;
  FAR const struct usrsocktest_daemon_conf_s *conf = priv->conf;
  struct usrsock_message_req_ack_s ack = {};
  ssize_t nwritten;
  int result;

  /* Validate the request, then try to allocate a socket. */

  if (req->domain != conf->supported_domain)
    {
      result = -EAFNOSUPPORT;
    }
  else if (req->type != conf->supported_type ||
           req->protocol != conf->supported_protocol)
    {
      result = -EPROTONOSUPPORT;
    }
  else if (priv->sockets_active >= conf->max_sockets)
    {
      result = -EMFILE;
    }
  else
    {
      result = test_socket_alloc(priv);
      if (result < 0)
        {
          result = -ENFILE;
        }
    }

  /* Build and send the ack. */

  ack.head.msgid = USRSOCK_MESSAGE_RESPONSE_ACK;
  ack.head.flags = 0;
  ack.xid = req->head.xid;
  ack.result = result;

  nwritten = write(fd, &ack, sizeof(ack));
  if (nwritten < 0)
    {
      return -errno;
    }

  return nwritten == sizeof(ack) ? OK : -ENOSPC;
}
/* Handle a close request.
 *
 * The socket slot is released first.  Without delay_all_responses a single
 * ack carrying the release result is written.  With delay_all_responses an
 * "in progress" ack (result -EAGAIN) is written, the daemon mutex is
 * dropped for 50 ms, and then the completion ack with the real result
 * follows.
 *
 * Returns OK when all writes succeeded, -errno when write() fails and
 * -ENOSPC on a short write.
 */

static int close_request(int fd, FAR struct daemon_priv_s *priv,
                         FAR void *hdrbuf)
{
  FAR struct usrsock_request_close_s *req = hdrbuf;
  struct usrsock_message_req_ack_s ack = {};
  ssize_t nwritten;
  int status;

  /* Release the socket; fails when it does not exist. */

  status = test_socket_free(priv, req->usockid);

  ack.head.msgid = USRSOCK_MESSAGE_RESPONSE_ACK;
  ack.xid = req->head.xid;

  if (!priv->conf->delay_all_responses)
    {
      /* Immediate completion. */

      ack.head.flags = 0;
      ack.result = status;

      nwritten = write(fd, &ack, sizeof(ack));
      if (nwritten < 0)
        {
          return -errno;
        }

      return nwritten == sizeof(ack) ? OK : -ENOSPC;
    }

  /* Delayed completion: first acknowledge that the request is in
   * progress.
   */

  ack.head.flags = USRSOCK_MESSAGE_FLAG_REQ_IN_PROGRESS;
  ack.result = -EAGAIN;

  nwritten = write(fd, &ack, sizeof(ack));
  if (nwritten < 0)
    {
      return -errno;
    }

  if (nwritten != sizeof(ack))
    {
      return -ENOSPC;
    }

  pthread_mutex_unlock(&daemon_mutex);
  usleep(50 * 1000);
  pthread_mutex_lock(&daemon_mutex);

  /* Now send the actual completion response. */

  ack.result = status;
  ack.head.flags &= ~USRSOCK_MESSAGE_FLAG_REQ_IN_PROGRESS;

  nwritten = write(fd, &ack, sizeof(ack));
  if (nwritten < 0)
    {
      return -errno;
    }

  return nwritten == sizeof(ack) ? OK : -ENOSPC;
}
/* Handle a connect request.
 *
 * The request is validated (socket exists, not yet connected, address size
 * and family acceptable, endpoint reachable and listening) and answered in
 * one of three ways, depending on the daemon configuration:
 *
 *  - endpoint_block_connect: an "in progress" ack is written and the final
 *    response is stored in the socket's pending_resp for later completion.
 *  - delay_all_responses: an "in progress" ack is written, the daemon mutex
 *    is dropped for 50 ms, then the completion ack follows.
 *  - otherwise: a single completion ack is written.
 *
 * In the last two modes a SENDTO_READY / RECVFROM_AVAIL event is sent
 * afterwards when the socket state calls for it.
 *
 * A request naming an unknown socket id is answered right away with
 * -EBADFD, because there is no socket state to update in that case.
 *
 * Returns OK when all writes succeeded, -errno when write() fails,
 * -ENOSPC on a short write, or the error from tsock_send_event().
 */

static int connect_request(int fd, FAR struct daemon_priv_s *priv,
                           FAR void *hdrbuf)
{
  FAR struct usrsock_request_connect_s *req = hdrbuf;
  struct sockaddr_in addr;
  FAR struct test_socket_s *tsock;
  struct usrsock_message_req_ack_s resp = {};
  ssize_t wlen, rlen;
  int ret = 0;

  DEBUGASSERT(priv);
  DEBUGASSERT(req);

  /* Check if this socket exists. */

  tsock = test_socket_get(priv, req->usockid);
  if (!tsock)
    {
      /* Everything after the 'prepare' label updates or inspects the socket
       * state, so an unknown socket id must not take that path.  Reply with
       * the error directly instead.
       */

      resp.xid = req->head.xid;
      resp.head.msgid = USRSOCK_MESSAGE_RESPONSE_ACK;
      resp.head.flags = 0;
      resp.result = -EBADFD;

      wlen = write(fd, &resp, sizeof(resp));
      if (wlen < 0)
        return -errno;
      if (wlen != sizeof(resp))
        return -ENOSPC;

      return OK;
    }

  /* Check if this socket is already connected. */

  if (tsock->connected)
    {
      ret = -EISCONN;
      goto prepare;
    }

  /* Check if address size ok. */

  if (req->addrlen > sizeof(addr))
    {
      ret = -EFAULT;
      goto prepare;
    }

  /* Read address. */

  rlen = read(fd, &addr, sizeof(addr));
  if (rlen < 0 || rlen < req->addrlen)
    {
      ret = -EFAULT;
      goto prepare;
    }

  /* Check address family. */

  if (addr.sin_family != priv->conf->supported_domain)
    {
      ret = -EAFNOSUPPORT;
      goto prepare;
    }

  /* Check if there is endpoint with target address */

  tsock->endp = find_endpoint(priv, addr.sin_addr.s_addr);
  if (!tsock->endp)
    {
      ret = -ENETUNREACH;
      goto prepare;
    }

  /* Check if there is port open at endpoint */

  if (!endpoint_connect(priv, tsock->endp, addr.sin_port))
    {
      free(tsock->endp);
      usrsocktest_endp_malloc_cnt--;
      tsock->endp = NULL;
      ret = -ECONNREFUSED;
      goto prepare;
    }

  ret = OK;

prepare:

  /* Prepare response. */

  resp.xid = req->head.xid;
  resp.head.msgid = USRSOCK_MESSAGE_RESPONSE_ACK;
  resp.head.flags = 0;

  if (priv->conf->endpoint_block_connect)
    {
      resp.head.flags = USRSOCK_MESSAGE_FLAG_REQ_IN_PROGRESS;
      resp.result = ret;

      /* Mark connection as blocked */

      priv->sockets_waiting_connect++;
      tsock->blocked_connect = true;
      tsock->pending_resp = resp;
    }
  else if (priv->conf->delay_all_responses)
    {
      resp.head.flags = USRSOCK_MESSAGE_FLAG_REQ_IN_PROGRESS;
      resp.result = -EINPROGRESS;
      tsock->blocked_connect = false;
    }
  else
    {
      if (ret == OK)
        {
          priv->sockets_connected++;
          tsock->connected = true;
        }

      resp.head.flags = 0;
      resp.result = ret;
      tsock->blocked_connect = false;
    }

  /* Send response. */

  wlen = write(fd, &resp, sizeof(resp));
  if (wlen < 0)
    return -errno;
  if (wlen != sizeof(resp))
    return -ENOSPC;

  if (priv->conf->endpoint_block_connect)
    {
      /* The stored response is the completion, so it must not carry the
       * "in progress" flag when it is eventually written.
       */

      tsock->pending_resp.head.flags &= ~USRSOCK_MESSAGE_FLAG_REQ_IN_PROGRESS;
    }
  else
    {
      int events;

      if (priv->conf->delay_all_responses)
        {
          pthread_mutex_unlock(&daemon_mutex);
          usleep(50 * 1000);
          pthread_mutex_lock(&daemon_mutex);

          /* Previous write was acknowledgment to request, informing that
           * request is still in progress. Now write actual completion
           * response.
           */

          resp.result = ret;
          if (ret == OK)
            {
              priv->sockets_connected++;
              tsock->connected = true;
            }

          resp.head.flags &= ~USRSOCK_MESSAGE_FLAG_REQ_IN_PROGRESS;

          wlen = write(fd, &resp, sizeof(resp));
          if (wlen < 0)
            return -errno;
          if (wlen != sizeof(resp))
            return -ENOSPC;
        }

      /* Tell the kernel side what the socket is ready for. */

      events = 0;
      if (!tsock->block_send)
        events |= USRSOCK_EVENT_SENDTO_READY;
      if (tsock->recv_avail_bytes > 0)
        events |= USRSOCK_EVENT_RECVFROM_AVAIL;

      if (events)
        {
          wlen = tsock_send_event(fd, priv, tsock, events);
          if (wlen < 0)
            return wlen;
        }
    }

  return OK;
}
/* Handle a sendto request.
 *
 * The socket must exist and be connected, and the request must not carry
 * a destination address.  When sending is not blocked, up to 16 bytes of
 * the payload are read from fd and that count is reported as the number of
 * bytes sent; a blocked socket answers -EAGAIN.  Positive results are added
 * to total_send_bytes.
 *
 * With delay_all_responses an "in progress" ack is written first, the
 * daemon mutex is dropped for 50 ms, and the completion ack follows.
 * Finally a SENDTO_READY event is sent when the socket is known and not
 * send-blocked.
 *
 * Returns OK when all writes succeeded, -errno when write() fails,
 * -ENOSPC on a short write, or the error from tsock_send_event().
 */

static int sendto_request(int fd, FAR struct daemon_priv_s *priv,
                          FAR void *hdrbuf)
{
  FAR struct usrsock_request_sendto_s *req = hdrbuf;
  FAR struct test_socket_s *tsock;
  struct usrsock_message_req_ack_s resp = {};
  ssize_t wlen, rlen;
  int ret = 0;
  uint8_t sendbuf[16];
  int sendbuflen = 0;

  DEBUGASSERT(priv);
  DEBUGASSERT(req);

  /* Check if this socket exists. */

  tsock = test_socket_get(priv, req->usockid);
  if (!tsock)
    {
      ret = -EBADFD;
      goto prepare;
    }

  /* Check if this socket is connected. */

  if (!tsock->connected)
    {
      ret = -ENOTCONN;
      goto prepare;
    }

  /* Check if address size non-zero. */

  if (req->addrlen > 0)
    {
      ret = -EISCONN; /* connection-mode socket do not accept address */
      goto prepare;
    }

  /* Can send? */

  if (!tsock->block_send)
    {
      /* Check if request has data. */

      if (req->buflen > 0)
        {
          sendbuflen = req->buflen;
          if (sendbuflen > sizeof(sendbuf))
            sendbuflen = sizeof(sendbuf);

          /* Read data. */

          rlen = read(fd, sendbuf, sendbuflen);
          if (rlen < 0 || rlen < sendbuflen)
            {
              ret = -EFAULT;
              goto prepare;
            }

          /* Debug print */

          usrsocktest_dbg("got %d bytes of data: '%.*s'\n",
                          sendbuflen, sendbuflen, sendbuf);
        }
    }
  else
    {
      ret = -EAGAIN; /* blocked. */
      goto prepare;
    }

  ret = sendbuflen;

prepare:

  /* Prepare response. */

  resp.xid = req->head.xid;
  resp.head.msgid = USRSOCK_MESSAGE_RESPONSE_ACK;
  resp.head.flags = 0;

  if (priv->conf->delay_all_responses)
    {
      resp.head.flags = USRSOCK_MESSAGE_FLAG_REQ_IN_PROGRESS;
      resp.result = -EINPROGRESS;
    }
  else
    {
      if (ret > 0)
        {
          priv->total_send_bytes += ret;
        }

      resp.head.flags = 0;
      resp.result = ret;
    }

  /* Send response. */

  wlen = write(fd, &resp, sizeof(resp));
  if (wlen < 0)
    return -errno;
  if (wlen != sizeof(resp))
    return -ENOSPC;

  if (priv->conf->delay_all_responses)
    {
      pthread_mutex_unlock(&daemon_mutex);
      usleep(50 * 1000);
      pthread_mutex_lock(&daemon_mutex);

      /* Previous write was acknowledgment to request, informing that request
       * is still in progress. Now write actual completion response.
       */

      resp.result = ret;
      if (ret > 0)
        {
          priv->total_send_bytes += ret;
        }

      resp.head.flags &= ~USRSOCK_MESSAGE_FLAG_REQ_IN_PROGRESS;

      wlen = write(fd, &resp, sizeof(resp));
      if (wlen < 0)
        return -errno;
      if (wlen != sizeof(resp))
        return -ENOSPC;
    }

  /* tsock is NULL when the request named an unknown socket id; there is
   * no socket to send an event for in that case.
   */

  if (tsock && !tsock->block_send)
    {
      /* Let kernel-side know that there is space for more send data. */

      wlen = tsock_send_event(fd, priv, tsock, USRSOCK_EVENT_SENDTO_READY);
      if (wlen < 0)
        return wlen;
    }

  return OK;
}
/* Handle a recvfrom request.
 *
 * The socket must exist and be connected.  When receive data is available
 * the result is min(max_buflen, recv_avail_bytes); otherwise the request is
 * answered with -EAGAIN.  A successful response is followed on fd by the
 * endpoint address (truncated to max_addrlen) and then by the data bytes
 * 'a', 'b', ... written one at a time, each consuming one available byte.
 *
 * With delay_all_responses an "in progress" data ack is written first and
 * the daemon mutex is dropped for 50 ms before the completion is written.
 * A RECVFROM_AVAIL event is sent afterwards while the socket still has
 * data left.
 *
 * Returns OK when all writes succeeded, -errno when write() fails,
 * -ENOSPC on a short write, or the error from tsock_send_event().
 */

static int recvfrom_request(int fd, FAR struct daemon_priv_s *priv,
                            FAR void *hdrbuf)
{
  FAR struct usrsock_request_recvfrom_s *req = hdrbuf;
  FAR struct test_socket_s *tsock;
  struct usrsock_message_datareq_ack_s resp = {};
  ssize_t wlen;
  size_t i;
  int ret = 0;
  size_t outbuflen;
  struct sockaddr_in endpointaddr;

  DEBUGASSERT(priv);
  DEBUGASSERT(req);

  /* Check if this socket exists. */

  tsock = test_socket_get(priv, req->usockid);
  if (!tsock)
    {
      ret = -EBADFD;
      goto prepare;
    }

  /* Check if this socket is connected. */

  if (!tsock->connected)
    {
      ret = -ENOTCONN;
      goto prepare;
    }

  get_endpoint_sockaddr(tsock->endp, &endpointaddr);

  /* Do we have recv data available? */

  if (tsock->recv_avail_bytes > 0)
    {
      outbuflen = req->max_buflen;
      if (outbuflen > tsock->recv_avail_bytes)
        {
          outbuflen = tsock->recv_avail_bytes;
        }
    }
  else
    {
      ret = -EAGAIN; /* blocked. */
      goto prepare;
    }

  ret = outbuflen;

prepare:

  /* Prepare response. */

  resp.reqack.xid = req->head.xid;
  resp.reqack.head.msgid = USRSOCK_MESSAGE_RESPONSE_DATA_ACK;
  resp.reqack.head.flags = 0;

  if (priv->conf->delay_all_responses)
    {
      resp.reqack.head.flags = USRSOCK_MESSAGE_FLAG_REQ_IN_PROGRESS;
      resp.reqack.result = -EINPROGRESS;
      resp.valuelen = 0;
      resp.valuelen_nontrunc = 0;

      /* Send ack response. */

      wlen = write(fd, &resp, sizeof(resp));
      if (wlen < 0)
        return -errno;
      if (wlen != sizeof(resp))
        return -ENOSPC;

      pthread_mutex_unlock(&daemon_mutex);
      usleep(50 * 1000);
      pthread_mutex_lock(&daemon_mutex);

      /* Previous write was acknowledgment to request, informing that request
       * is still in progress. Now write actual completion response.
       */

      resp.reqack.head.msgid = USRSOCK_MESSAGE_RESPONSE_DATA_ACK;
      resp.reqack.head.flags &= ~USRSOCK_MESSAGE_FLAG_REQ_IN_PROGRESS;
    }

  resp.reqack.head.flags = 0;
  resp.reqack.result = ret;
  if (ret >= 0)
    {
      priv->total_recv_bytes += ret;
      resp.valuelen_nontrunc = sizeof(endpointaddr);
      resp.valuelen = resp.valuelen_nontrunc;
      if (resp.valuelen > req->max_addrlen)
        resp.valuelen = req->max_addrlen;
    }
  else
    {
      resp.valuelen = 0;
      resp.valuelen_nontrunc = 0;
    }

  /* Send response. */

  wlen = write(fd, &resp, sizeof(resp));
  if (wlen < 0)
    return -errno;
  if (wlen != sizeof(resp))
    return -ENOSPC;

  if (resp.valuelen > 0)
    {
      /* Send address (value) */

      wlen = write(fd, &endpointaddr, resp.valuelen);
      if (wlen < 0)
        return -errno;
      if (wlen != resp.valuelen)
        return -ENOSPC;
    }

  if (resp.reqack.result > 0)
    {
      /* Send buffer */

      for (i = 0; i < resp.reqack.result; i++)
        {
          char tmp = 'a' + i;

          tsock->recv_avail_bytes--;
          wlen = write(fd, &tmp, 1);
          if (wlen < 0)
            return -errno;
          if (wlen != 1)
            return -ENOSPC;
        }

      if (tsock->recv_avail_bytes == 0)
        priv->sockets_recv_empty++;
    }

  /* tsock is NULL when the request named an unknown socket id; there is
   * no socket to send an event for in that case.
   */

  if (tsock && tsock->recv_avail_bytes > 0)
    {
      /* Let kernel-side know that there is more recv data. */

      wlen = tsock_send_event(fd, priv, tsock, USRSOCK_EVENT_RECVFROM_AVAIL);
      if (wlen < 0)
        return wlen;
    }

  return OK;
}
/* Handle a setsockopt request.
 *
 * Only SOL_SOCKET / SO_REUSEADDR with a value of at least sizeof(int) is
 * accepted; the value is read from fd and otherwise ignored.  Failures are
 * reported as -EBADFD (unknown socket), -ENOPROTOOPT (level or option),
 * -EINVAL (value too short) or -EFAULT (value could not be read).
 *
 * With delay_all_responses an "in progress" ack is written first, the
 * daemon mutex is dropped for 50 ms, and the completion ack follows.
 *
 * Returns OK when all writes succeeded, -errno when write() fails and
 * -ENOSPC on a short write.
 */

static int setsockopt_request(int fd, FAR struct daemon_priv_s *priv,
                              FAR void *hdrbuf)
{
  FAR struct usrsock_request_setsockopt_s *req = hdrbuf;
  struct usrsock_message_req_ack_s ack = {};
  ssize_t nwritten;
  ssize_t nread;
  int status;
  int value;

  DEBUGASSERT(priv);
  DEBUGASSERT(req);

  /* Validate the request; the first failing check decides the result. */

  if (test_socket_get(priv, req->usockid) == NULL)
    {
      status = -EBADFD;
    }
  else if (req->level != SOL_SOCKET)
    {
      usrsocktest_dbg("setsockopt: level=%d not supported\n", req->level);
      status = -ENOPROTOOPT;
    }
  else if (req->option != SO_REUSEADDR)
    {
      usrsocktest_dbg("setsockopt: option=%d not supported\n", req->option);
      status = -ENOPROTOOPT;
    }
  else if (req->valuelen < sizeof(value))
    {
      status = -EINVAL;
    }
  else
    {
      /* Fetch the option value that follows the request. */

      nread = read(fd, &value, sizeof(value));
      if (nread < 0 || nread < sizeof(value))
        {
          status = -EFAULT;
        }
      else
        {
          usrsocktest_dbg("setsockopt: option=%d value=%d\n", req->option, value);
          status = OK;
        }
    }

  ack.xid = req->head.xid;
  ack.head.msgid = USRSOCK_MESSAGE_RESPONSE_ACK;

  if (!priv->conf->delay_all_responses)
    {
      /* Immediate completion. */

      ack.head.flags = 0;
      ack.result = status;

      nwritten = write(fd, &ack, sizeof(ack));
      if (nwritten < 0)
        {
          return -errno;
        }

      return nwritten == sizeof(ack) ? OK : -ENOSPC;
    }

  /* Delayed completion: acknowledge first, complete 50 ms later. */

  ack.head.flags = USRSOCK_MESSAGE_FLAG_REQ_IN_PROGRESS;
  ack.result = -EINPROGRESS;

  nwritten = write(fd, &ack, sizeof(ack));
  if (nwritten < 0)
    {
      return -errno;
    }

  if (nwritten != sizeof(ack))
    {
      return -ENOSPC;
    }

  pthread_mutex_unlock(&daemon_mutex);
  usleep(50 * 1000);
  pthread_mutex_lock(&daemon_mutex);

  ack.result = status;
  ack.head.flags &= ~USRSOCK_MESSAGE_FLAG_REQ_IN_PROGRESS;

  nwritten = write(fd, &ack, sizeof(ack));
  if (nwritten < 0)
    {
      return -errno;
    }

  return nwritten == sizeof(ack) ? OK : -ENOSPC;
}
/* Handle a getsockopt request.
 *
 * Only SOL_SOCKET / SO_REUSEADDR with room for at least sizeof(int) is
 * accepted; the reported option value is always 0.  Failures are reported
 * as -EBADFD (unknown socket), -ENOPROTOOPT (level or option) or -EINVAL
 * (value buffer too short).
 *
 * With delay_all_responses an "in progress" data ack is written first and
 * the daemon mutex is dropped for 50 ms before the completion is written.
 * On success the completion ack is followed by the int value.
 *
 * Returns OK when all writes succeeded, -errno when write() fails and
 * -ENOSPC on a short write.
 */

static int getsockopt_request(int fd, FAR struct daemon_priv_s *priv,
                              FAR void *hdrbuf)
{
  FAR struct usrsock_request_getsockopt_s *req = hdrbuf;
  struct usrsock_message_datareq_ack_s ack = {};
  ssize_t nwritten;
  int status;
  int value;

  DEBUGASSERT(priv);
  DEBUGASSERT(req);

  /* Validate the request; the first failing check decides the result. */

  if (test_socket_get(priv, req->usockid) == NULL)
    {
      status = -EBADFD;
    }
  else if (req->level != SOL_SOCKET)
    {
      usrsocktest_dbg("getsockopt: level=%d not supported\n", req->level);
      status = -ENOPROTOOPT;
    }
  else if (req->option != SO_REUSEADDR)
    {
      usrsocktest_dbg("getsockopt: option=%d not supported\n", req->option);
      status = -ENOPROTOOPT;
    }
  else if (req->max_valuelen < sizeof(value))
    {
      status = -EINVAL;
    }
  else
    {
      value = 0;
      status = OK;
    }

  ack.reqack.xid = req->head.xid;
  ack.reqack.head.msgid = USRSOCK_MESSAGE_RESPONSE_DATA_ACK;

  if (priv->conf->delay_all_responses)
    {
      /* Acknowledge that the request is in progress, then wait. */

      ack.reqack.head.flags = USRSOCK_MESSAGE_FLAG_REQ_IN_PROGRESS;
      ack.reqack.result = -EINPROGRESS;
      ack.valuelen = 0;
      ack.valuelen_nontrunc = 0;

      nwritten = write(fd, &ack, sizeof(ack));
      if (nwritten < 0)
        {
          return -errno;
        }

      if (nwritten != sizeof(ack))
        {
          return -ENOSPC;
        }

      pthread_mutex_unlock(&daemon_mutex);
      usleep(50 * 1000);
      pthread_mutex_lock(&daemon_mutex);
    }

  /* Completion response; the value follows only on success. */

  ack.reqack.head.flags = 0;
  ack.reqack.result = status;
  if (status >= 0)
    {
      ack.valuelen = sizeof(value);
    }
  else
    {
      ack.valuelen = 0;
    }

  nwritten = write(fd, &ack, sizeof(ack));
  if (nwritten < 0)
    {
      return -errno;
    }

  if (nwritten != sizeof(ack))
    {
      return -ENOSPC;
    }

  if (ack.valuelen > 0)
    {
      nwritten = write(fd, &value, ack.valuelen);
      if (nwritten < 0)
        {
          return -errno;
        }

      if (nwritten != ack.valuelen)
        {
          return -ENOSPC;
        }
    }

  return OK;
}
/* Handle a getsockname request.
 *
 * For an existing socket the fixed local address 127.0.0.1:12345 is
 * reported, truncated to the caller's max_addrlen; an unknown socket id
 * yields -EBADFD.
 *
 * With delay_all_responses an "in progress" data ack is written first and
 * the daemon mutex is dropped for 50 ms before the completion is written.
 * On success the completion ack is followed by the address bytes.
 *
 * Returns OK when all writes succeeded, -errno when write() fails and
 * -ENOSPC on a short write.
 */

static int getsockname_request(int fd, FAR struct daemon_priv_s *priv,
                               FAR void *hdrbuf)
{
  FAR struct usrsock_request_getsockname_s *req = hdrbuf;
  FAR struct test_socket_s *tsock;
  struct usrsock_message_datareq_ack_s resp = {};
  ssize_t wlen;
  int ret = 0;
  struct sockaddr_in addr;

  DEBUGASSERT(priv);
  DEBUGASSERT(req);

  /* Up to sizeof(addr) bytes of this structure are written to fd below,
   * but only family, port and address are assigned.  Clear it first so
   * that no uninitialized stack bytes are sent.
   */

  memset(&addr, 0, sizeof(addr));

  /* Check if this socket exists. */

  tsock = test_socket_get(priv, req->usockid);
  if (!tsock)
    {
      ret = -EBADFD;
      goto prepare;
    }

  ret = inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr.s_addr);
  addr.sin_family = AF_INET;
  addr.sin_port = htons(12345);
  ret = ret == 1 ? 0 : -EINVAL;

prepare:

  /* Prepare response. */

  resp.reqack.xid = req->head.xid;
  resp.reqack.head.msgid = USRSOCK_MESSAGE_RESPONSE_DATA_ACK;
  resp.reqack.head.flags = 0;

  if (priv->conf->delay_all_responses)
    {
      resp.reqack.head.flags = USRSOCK_MESSAGE_FLAG_REQ_IN_PROGRESS;
      resp.reqack.result = -EINPROGRESS;
      resp.valuelen = 0;
      resp.valuelen_nontrunc = 0;

      /* Send ack response. */

      wlen = write(fd, &resp, sizeof(resp));
      if (wlen < 0)
        return -errno;
      if (wlen != sizeof(resp))
        return -ENOSPC;

      pthread_mutex_unlock(&daemon_mutex);
      usleep(50 * 1000);
      pthread_mutex_lock(&daemon_mutex);

      /* Previous write was acknowledgment to request, informing that request
       * is still in progress. Now write actual completion response.
       */

      resp.reqack.head.msgid = USRSOCK_MESSAGE_RESPONSE_DATA_ACK;
      resp.reqack.head.flags &= ~USRSOCK_MESSAGE_FLAG_REQ_IN_PROGRESS;
    }

  resp.reqack.head.flags = 0;
  resp.reqack.result = ret;
  if (ret >= 0)
    {
      resp.valuelen = sizeof(addr);
      resp.valuelen_nontrunc = sizeof(addr);
      if (resp.valuelen > req->max_addrlen)
        resp.valuelen = req->max_addrlen;
    }
  else
    {
      resp.valuelen = 0;
      resp.valuelen_nontrunc = 0;
    }

  /* Send response. */

  wlen = write(fd, &resp, sizeof(resp));
  if (wlen < 0)
    return -errno;
  if (wlen != sizeof(resp))
    return -ENOSPC;

  if (resp.valuelen > 0)
    {
      /* Send address (value) */

      wlen = write(fd, &addr, resp.valuelen);
      if (wlen < 0)
        return -errno;
      if (wlen != resp.valuelen)
        return -ENOSPC;
    }

  return OK;
}
/* Read one request from the usrsock descriptor and dispatch it.
 *
 * The common header is read first; its reqid selects the handler and the
 * full header size from a static table.  The rest of the header is then
 * read into the same buffer and handed to the handler.
 *
 * Returns the handler's result, -errno when reading the common header
 * fails, -EMSGSIZE on a short header read, -EIO for a request id without a
 * handler, or the error from read_req().
 */

static int handle_usrsock_request(int fd, FAR struct daemon_priv_s *priv)
{
  static const struct usrsock_handler_s
  {
    unsigned int hdrlen;
    int (CODE *fn)(int fd, FAR struct daemon_priv_s *priv, FAR void *req);
  } handlers[USRSOCK_REQUEST__MAX] =
  {
    [USRSOCK_REQUEST_SOCKET] =
    {
      sizeof(struct usrsock_request_socket_s),
      socket_request,
    },
    [USRSOCK_REQUEST_CLOSE] =
    {
      sizeof(struct usrsock_request_close_s),
      close_request,
    },
    [USRSOCK_REQUEST_CONNECT] =
    {
      sizeof(struct usrsock_request_connect_s),
      connect_request,
    },
    [USRSOCK_REQUEST_SENDTO] =
    {
      sizeof(struct usrsock_request_sendto_s),
      sendto_request,
    },
    [USRSOCK_REQUEST_RECVFROM] =
    {
      sizeof(struct usrsock_request_recvfrom_s),
      recvfrom_request,
    },
    [USRSOCK_REQUEST_SETSOCKOPT] =
    {
      sizeof(struct usrsock_request_setsockopt_s),
      setsockopt_request,
    },
    [USRSOCK_REQUEST_GETSOCKOPT] =
    {
      sizeof(struct usrsock_request_getsockopt_s),
      getsockopt_request,
    },
    [USRSOCK_REQUEST_GETSOCKNAME] =
    {
      sizeof(struct usrsock_request_getsockname_s),
      getsockname_request,
    },
  };

  uint8_t hdrbuf[16];
  FAR struct usrsock_request_common_s *common_hdr = (FAR void *)hdrbuf;
  FAR const struct usrsock_handler_s *handler = NULL;
  ssize_t nread;

  /* Fetch the part of the header shared by all requests. */

  nread = read(fd, common_hdr, sizeof(*common_hdr));
  if (nread < 0)
    {
      return -errno;
    }

  if (nread != sizeof(*common_hdr))
    {
      return -EMSGSIZE;
    }

  /* Pick the table entry for this request type, if there is one. */

  if (!(common_hdr->reqid >= USRSOCK_REQUEST__MAX))
    {
      handler = &handlers[common_hdr->reqid];
    }

  if (handler == NULL || !handler->fn)
    {
      usrsocktest_dbg("Unknown request type: %d\n", common_hdr->reqid);
      return -EIO;
    }

  /* The request-specific header must fit the local buffer. */

  assert(handler->hdrlen < sizeof(hdrbuf));

  nread = read_req(fd, common_hdr, hdrbuf, handler->hdrlen);
  if (nread < 0)
    {
      return nread;
    }

  return handler->fn(fd, priv, hdrbuf);
}
/* Per-socket action: lift send blocking.
 *
 * When the socket is send-blocked the flag is cleared and a SENDTO_READY
 * event is sent; otherwise nothing happens.
 *
 * Returns OK, or the negative error from tsock_send_event().
 */

static int unblock_sendto(int fd, FAR struct daemon_priv_s *priv,
                          FAR struct test_socket_s *tsock)
{
  int status;

  if (!tsock->block_send)
    {
      return OK;
    }

  tsock->block_send = false;

  status = tsock_send_event(fd, priv, tsock, USRSOCK_EVENT_SENDTO_READY);
  return status < 0 ? status : OK;
}
/* Per-socket action: refill an exhausted receive buffer.
 *
 * When the socket has no receive bytes left, the available count is reset
 * to the configured endpoint_recv_avail, the recv-empty counter is
 * decremented and a RECVFROM_AVAIL event is sent.  Sockets that still have
 * data are left alone.
 *
 * Returns OK, or the negative error from tsock_send_event().
 */

static int reset_recv_avail(int fd, FAR struct daemon_priv_s *priv,
                            FAR struct test_socket_s *tsock)
{
  int status;

  if (tsock->recv_avail_bytes != 0)
    {
      return OK;
    }

  priv->sockets_recv_empty--;
  tsock->recv_avail_bytes = priv->conf->endpoint_recv_avail;

  status = tsock_send_event(fd, priv, tsock, USRSOCK_EVENT_RECVFROM_AVAIL);
  return status < 0 ? status : OK;
}
/* Per-socket action: emulate the remote side closing the connection.
 *
 * A connected socket is marked disconnected, the connected and
 * remote-disconnected counters are adjusted and a REMOTE_CLOSED event is
 * sent.  Sockets that are not connected are left alone.
 *
 * Returns OK, or the negative error from tsock_send_event().
 */

static int disconnect_connection(int fd, FAR struct daemon_priv_s *priv,
                                 FAR struct test_socket_s *tsock)
{
  int status;

  if (!tsock->connected)
    {
      return OK;
    }

  tsock->connected = false;
  tsock->disconnected = true;
  priv->sockets_remote_disconnected++;
  priv->sockets_connected--;

  status = tsock_send_event(fd, priv, tsock, USRSOCK_EVENT_REMOTE_CLOSED);
  return status < 0 ? status : OK;
}
/* Per-socket action: complete a held-back connect.
 *
 * For a socket whose connect is being held back, the stored response is
 * written with the "in progress" flag cleared.  The socket becomes
 * connected when the stored result is OK.  Afterwards a SENDTO_READY /
 * RECVFROM_AVAIL event is sent when the socket state calls for it.
 *
 * Returns OK, -errno when write() fails, -ENOSPC on a short write, or the
 * error from tsock_send_event().
 */

static int establish_blocked_connection(int fd, FAR struct daemon_priv_s *priv,
                                        FAR struct test_socket_s *tsock)
{
  FAR struct usrsock_message_req_ack_s *pending;
  ssize_t nwritten;
  int events = 0;
  int status;

  if (!tsock->blocked_connect)
    {
      return OK;
    }

  pending = &tsock->pending_resp;

  if (pending->result == OK)
    {
      tsock->connected = true;
      priv->sockets_connected++;
    }

  /* The connect is no longer waiting. */

  priv->sockets_waiting_connect--;
  tsock->blocked_connect = false;

  pending->head.flags &= ~USRSOCK_MESSAGE_FLAG_REQ_IN_PROGRESS;

  nwritten = write(fd, pending, sizeof(*pending));
  if (nwritten < 0)
    {
      return -errno;
    }

  if (nwritten != sizeof(*pending))
    {
      return -ENOSPC;
    }

  /* Tell the kernel side what the socket is ready for. */

  if (!tsock->block_send)
    {
      events |= USRSOCK_EVENT_SENDTO_READY;
    }

  if (tsock->recv_avail_bytes > 0)
    {
      events |= USRSOCK_EVENT_RECVFROM_AVAIL;
    }

  if (events)
    {
      status = tsock_send_event(fd, priv, tsock, events);
      if (status < 0)
        {
          return status;
        }
    }

  return OK;
}
/* Per-socket action: fail a held-back connect.
 *
 * For a socket whose connect is being held back, the stored response is
 * rewritten to -ECONNREFUSED and written with the "in progress" flag
 * cleared.  The socket is marked as refused and the waiting / refused
 * counters are adjusted.
 *
 * Returns OK, -errno when write() fails, or -ENOSPC on a short write.
 */

static int fail_blocked_connection(int fd, FAR struct daemon_priv_s *priv,
                                   FAR struct test_socket_s *tsock)
{
  FAR struct usrsock_message_req_ack_s *pending;
  ssize_t nwritten;

  if (!tsock->blocked_connect)
    {
      return OK;
    }

  pending = &tsock->pending_resp;
  pending->result = -ECONNREFUSED;

  /* Move the socket from "waiting" to "refused". */

  tsock->connect_refused = true;
  priv->sockets_not_connected_refused++;
  tsock->blocked_connect = false;
  priv->sockets_waiting_connect--;

  pending->head.flags &= ~USRSOCK_MESSAGE_FLAG_REQ_IN_PROGRESS;

  nwritten = write(fd, pending, sizeof(*pending));
  if (nwritten < 0)
    {
      return -errno;
    }

  return nwritten == sizeof(*pending) ? OK : -ENOSPC;
}
/* Apply iter_fn to every opened socket in table order.
 *
 * Iteration stops at the first negative result, which is returned as-is;
 * OK is returned when every call succeeded or no socket is opened.
 */

static int for_each_connection(int fd, FAR struct daemon_priv_s *priv,
                               int (CODE *iter_fn)(
                                 int fd,
                                 FAR struct daemon_priv_s *priv,
                                 FAR struct test_socket_s *tsock))
{
  FAR struct test_socket_s *sock = priv->test_sockets;
  FAR struct test_socket_s *end = sock + ARRAY_SIZE(priv->test_sockets);

  for (; sock < end; sock++)
    {
      int status;

      if (!sock->opened)
        {
          continue;
        }

      status = iter_fn(fd, priv, sock);
      if (status < 0)
        {
          return status;
        }
    }

  return OK;
}
/* Daemon thread entry point.
 *
 * Opens /dev/usrsock and then loops, polling the usrsock descriptor (unless
 * do_not_poll_usrsock is set or the descriptor has been closed) together
 * with the read end of the command pipe.  Requests from the usrsock
 * descriptor are handled with the daemon mutex held.  Each byte read from
 * the command pipe is one command:
 *
 *   'S' stop the loop          's' clear the stop flag
 *   'E' complete held-back connects
 *   'F' fail held-back connects
 *   'D' disconnect connected sockets
 *   'W' lift send blocking     'r' refill empty receive buffers
 *   'K' close the usrsock descriptor
 *   '*' post wakewaitsem
 *
 * param is the daemon state (struct daemon_priv_s).  The thread's return
 * value is OK on a normal stop or a negated errno / handler error, cast to
 * a pointer.
 */
static FAR void *usrsocktest_daemon(FAR void *param)
{
FAR struct daemon_priv_s *priv = param;
bool stopped;
int ret;
int fd;
usrsocktest_dbg("\n");
priv->sockets_active = 0;
fd = open("/dev/usrsock", O_RDWR);
if (fd < 0)
{
ret = -errno;
goto errout;
}
do
{
struct pollfd pfd[2] = {};
int npfds = 0;
int usrsock_pfdpos = -1;
int pipe_pdfpos = -1;
stopped = false;
/* Wait for request from kernel side. */
pthread_mutex_lock(&daemon_mutex);
/* The usrsock descriptor is optional in the poll set; the command pipe is
 * always polled.
 */
if (!priv->do_not_poll_usrsock && fd >= 0)
{
pfd[npfds].fd = fd;
pfd[npfds].events = POLLIN;
usrsock_pfdpos = npfds++;
}
pfd[npfds].fd = priv->pipefd[0];
pfd[npfds].events = POLLIN;
pipe_pdfpos = npfds++;
pthread_mutex_unlock(&daemon_mutex);
/* Block without the mutex held until one of the descriptors is readable. */
ret = poll(pfd, npfds, -1);
if (ret < 0)
{
/* Error? */
ret = -errno;
goto errout;
}
if (usrsock_pfdpos >= 0 && (pfd[usrsock_pfdpos].revents & POLLIN))
{
pthread_mutex_lock(&daemon_mutex);
ret = handle_usrsock_request(fd, priv);
pthread_mutex_unlock(&daemon_mutex);
if (ret < 0)
goto errout;
}
if (pipe_pdfpos >= 0 && (pfd[pipe_pdfpos].revents & POLLIN))
{
char in;
if (read(pfd[pipe_pdfpos].fd, &in, 1) == 1)
{
pthread_mutex_lock(&daemon_mutex);
/* Commands that do not assign ret leave it at the non-negative value it
 * already holds at this point.
 */
switch (in)
{
case 'S':
stopped = true;
ret = 0;
break;
case 's':
stopped = false;
ret = 0;
break;
case 'E':
ret = for_each_connection(fd, priv,
&establish_blocked_connection);
break;
case 'F':
ret = for_each_connection(fd, priv,
&fail_blocked_connection);
break;
case 'D':
ret = for_each_connection(fd, priv,
&disconnect_connection);
break;
case 'W':
ret = for_each_connection(fd, priv,
&unblock_sendto);
break;
case 'r':
ret = for_each_connection(fd, priv,
&reset_recv_avail);
break;
case 'K':
/* Kill usrsockdev */
if (fd >= 0)
{
close(fd);
fd = -1;
}
break;
case '*':
sem_post(&priv->wakewaitsem);
break; /* woke thread. */
}
pthread_mutex_unlock(&daemon_mutex);
if (ret < 0)
goto errout;
}
}
usleep(1);
}
while (!stopped);
ret = OK;
errout:
/* The descriptor may already have been closed by the 'K' command. */
if (fd >= 0)
{
close(fd);
}
usrsocktest_dbg("ret: %d\n", ret);
return (FAR void *)(intptr_t)ret;
}
/* Copy a value out of the daemon state under the daemon mutex.
 *
 * priv - daemon state
 * dst  - destination buffer of at least len bytes
 * src  - address of the value; must lie inside *priv
 * len  - number of bytes to copy; must be non-zero
 *
 * Returns 0 on success, -EINVAL when [src, src + len) is not fully
 * contained in *priv or len is zero, and -ENODEV when the daemon has no
 * active configuration (not running).
 */

static int get_daemon_value(FAR struct daemon_priv_s *priv,
                            FAR void *dst, FAR const void *src, size_t len)
{
  uintptr_t base = (uintptr_t)priv;
  uintptr_t addr = (uintptr_t)src;
  int ret = 0;

  /* Reject ranges that start outside the daemon state, and also ranges
   * that start inside but would run past its end.
   */

  if (len == 0 ||
      addr < base ||
      addr >= base + sizeof(*priv) ||
      len > sizeof(*priv) - (size_t)(addr - base))
    {
      /* Not daemon value */

      return -EINVAL;
    }

  pthread_mutex_lock(&daemon_mutex);

  if (priv->conf == NULL)
    {
      /* Not running? */

      ret = -ENODEV;
      goto out;
    }

  memmove(dst, src, len);

out:
  pthread_mutex_unlock(&daemon_mutex);
  return ret;
}
/* Helper thread: write one command byte to the command pipe.
 *
 * priv points to the struct delayed_cmd_s describing the command.  With a
 * non-zero delay the start semaphore is posted before sleeping; with no
 * delay it is posted only after the byte has been written.
 *
 * Always returns NULL.
 */

static FAR void *delayed_cmd_thread(FAR void *priv)
{
  FAR struct delayed_cmd_s *dcmd = priv;
  uint16_t delay_msec = dcmd->delay_msec;

  if (delay_msec != 0)
    {
      /* Delayed command: report the start right away. */

      sem_post(&dcmd->startsem);
    }

  usleep(delay_msec * 1000);
  write(dcmd->pipefd, &dcmd->cmd, 1);

  if (delay_msec == 0)
    {
      /* Immediate command: report only once it has been issued. */

      sem_post(&dcmd->startsem);
    }

  return NULL;
}
/****************************************************************************
* Public Functions
****************************************************************************/
/****************************************************************************
 * Name: usrsocktest_daemon_start
 *
 * Description:
 *   Reset the daemon private data, create the command pipe and start the
 *   daemon thread.
 *
 * Input Parameters:
 *   conf - Daemon configuration.  Only the pointer is stored in
 *          priv->conf; the structure itself is not copied here.
 *
 * Returned Value:
 *   OK on success; -EALREADY if a configuration is still installed or the
 *   previous daemon thread has not been joined; a negated errno value if
 *   pipe(), pthread_attr_init() or pthread_create() fails.
 *
 ****************************************************************************/

int usrsocktest_daemon_start(FAR const struct usrsocktest_daemon_conf_s *conf)
{
  FAR struct daemon_priv_s *priv = &g_ub_daemon;
  pthread_attr_t attr;
  int ret;

  usrsocktest_dbg("\n");

  pthread_mutex_lock(&daemon_mutex);

  if (priv->conf != NULL || !priv->joined)
    {
      /* Already running? */

      ret = -EALREADY;
      goto out;
    }

  /* Clear daemon private data. */

  memset(priv, 0, sizeof(*priv));

  /* The memset above also cleared 'joined'.  Keep it set until the thread
   * is really about to be created; otherwise a failure below would leave
   * 'joined' false and every later start attempt would be rejected with
   * -EALREADY by the check above.
   */

  priv->joined = true;

  /* Allocate pipe for daemon commands. */

  ret = pipe(priv->pipefd);
  if (ret != OK)
    {
      ret = -errno;
      goto out;
    }

  ret = pthread_attr_init(&attr);
  if (ret != OK)
    {
      ret = -ret;
      goto errout_closepipe;
    }

  sem_init(&priv->wakewaitsem, 0, 0);
  priv->joined = false;
  priv->conf = conf;

  ret = pthread_create(&priv->tid, &attr, usrsocktest_daemon, priv);

  /* The attribute object is no longer needed once creation was attempted. */

  pthread_attr_destroy(&attr);

  if (ret != OK)
    {
      /* Roll back to the "not running" state. */

      sem_destroy(&priv->wakewaitsem);
      priv->joined = true;
      priv->conf = NULL;
      ret = -ret;
      goto errout_closepipe;
    }

errout_closepipe:
  if (ret != OK)
    {
      close(priv->pipefd[0]);
      close(priv->pipefd[1]);
    }

out:
  pthread_mutex_unlock(&daemon_mutex);
  usrsocktest_dbg("ret: %d\n", ret);
  return ret;
}
/****************************************************************************
 * Name: usrsocktest_daemon_stop
 *
 * Description:
 *   Join all delayed command threads, send the 'S' (stop) command to the
 *   daemon through the command pipe, join the daemon thread and release
 *   the resources held in the daemon private data.
 *
 * Returned Value:
 *   The exit value of the daemon thread on success; -ENODEV if no
 *   configuration is installed; a negated error number if joining the
 *   daemon thread fails.
 *
 ****************************************************************************/

int usrsocktest_daemon_stop(void)
{
  FAR struct daemon_priv_s *priv = &g_ub_daemon;
  FAR struct delayed_cmd_s *item;
  FAR pthread_addr_t retval;
  char stopped;
  int ret;
  int i;

  usrsocktest_dbg("\n");

  pthread_mutex_lock(&daemon_mutex);

  if (priv->conf == NULL)
    {
      /* Not running? */

      ret = -ENODEV;
      goto out;
    }

  /* Reap the delayed command threads.  The head of the queue is looked up
   * again on every pass because the mutex is dropped while joining, so an
   * entry appended in the meantime is reaped as well instead of being
   * missed through a successor pointer captured before the unlock.
   */

  while ((item = (void *)sq_peek(&priv->delayed_cmd_threads)) != NULL)
    {
      pthread_mutex_unlock(&daemon_mutex);
      pthread_join(item->tid, &retval);
      pthread_mutex_lock(&daemon_mutex);

      sq_rem(&item->node, &priv->delayed_cmd_threads);

      /* The thread has exited, nobody can touch its semaphore anymore. */

      sem_destroy(&item->startsem);
      free(item);
      usrsocktest_dcmd_malloc_cnt--;
    }

  pthread_mutex_unlock(&daemon_mutex);

  /* Ask the daemon to leave its main loop and wait for it to exit. */

  stopped = 'S';
  write(priv->pipefd[1], &stopped, 1);

  ret = pthread_join(priv->tid, &retval);

  pthread_mutex_lock(&daemon_mutex);

  if (ret != OK)
    {
      ret = -ret;
      goto out;
    }

  /* Release endpoints still attached to opened test sockets. */

  for (i = 0; i < ARRAY_SIZE(priv->test_sockets); i++)
    {
      if (priv->test_sockets[i].opened && priv->test_sockets[i].endp != NULL)
        {
          free(priv->test_sockets[i].endp);
          priv->test_sockets[i].endp = NULL;
          usrsocktest_endp_malloc_cnt--;
        }
    }

  priv->conf = NULL;
  close(priv->pipefd[0]);
  close(priv->pipefd[1]);
  sem_destroy(&priv->wakewaitsem);
  priv->joined = true;

  /* Report the exit value of the daemon thread to the caller. */

  ret = (intptr_t)retval;

out:
  pthread_mutex_unlock(&daemon_mutex);
  usrsocktest_dbg("ret: %d\n", ret);
  return ret;
}
int usrsocktest_daemon_get_num_active_sockets(void)
{
FAR struct daemon_priv_s *priv = &g_ub_daemon;
int ret, err;
err = get_daemon_value(priv, &ret, &priv->sockets_active, sizeof(ret));
if (err < 0)
return err;
return ret;
}
int usrsocktest_daemon_get_num_connected_sockets(void)
{
FAR struct daemon_priv_s *priv = &g_ub_daemon;
int ret, err;
err = get_daemon_value(priv, &ret, &priv->sockets_connected, sizeof(ret));
if (err < 0)
return err;
return ret;
}
int usrsocktest_daemon_get_num_waiting_connect_sockets(void)
{
FAR struct daemon_priv_s *priv = &g_ub_daemon;
int ret, err;
err = get_daemon_value(priv, &ret, &priv->sockets_waiting_connect, sizeof(ret));
if (err < 0)
return err;
return ret;
}
int usrsocktest_daemon_get_num_recv_empty_sockets(void)
{
FAR struct daemon_priv_s *priv = &g_ub_daemon;
int ret, err;
err = get_daemon_value(priv, &ret, &priv->sockets_recv_empty, sizeof(ret));
if (err < 0)
return err;
return ret;
}
ssize_t usrsocktest_daemon_get_send_bytes(void)
{
FAR struct daemon_priv_s *priv = &g_ub_daemon;
size_t ret;
int err;
err = get_daemon_value(priv, &ret, &priv->total_send_bytes, sizeof(ret));
if (err < 0)
return err;
return ret;
}
ssize_t usrsocktest_daemon_get_recv_bytes(void)
{
FAR struct daemon_priv_s *priv = &g_ub_daemon;
size_t ret;
int err;
err = get_daemon_value(priv, &ret, &priv->total_recv_bytes, sizeof(ret));
if (err < 0)
return err;
return ret;
}
int usrsocktest_daemon_get_num_unreachable_sockets(void)
{
FAR struct daemon_priv_s *priv = &g_ub_daemon;
int ret, err;
err = get_daemon_value(priv, &ret, &priv->sockets_not_connected_refused,
sizeof(ret));
if (err < 0)
return err;
return ret;
}
int usrsocktest_daemon_get_num_remote_disconnected_sockets(void)
{
FAR struct daemon_priv_s *priv = &g_ub_daemon;
int ret, err;
err = get_daemon_value(priv, &ret, &priv->sockets_remote_disconnected,
sizeof(ret));
if (err < 0)
return err;
return ret;
}
/****************************************************************************
 * Name: usrsocktest_daemon_pause_usrsock_handling
 *
 * Description:
 *   Set the daemon's 'do_not_poll_usrsock' flag, send the '*' wake-up
 *   command through the command pipe and wait on 'wakewaitsem' until the
 *   daemon has posted it.
 *
 * Input Parameters:
 *   pause - New value for the 'do_not_poll_usrsock' flag.
 *
 * Returned Value:
 *   OK on success; -ENODEV if no configuration is installed; a negated
 *   errno value if the wake-up command could not be written or waiting
 *   for the acknowledgement failed for a reason other than EINTR.
 *
 ****************************************************************************/

int usrsocktest_daemon_pause_usrsock_handling(bool pause)
{
  FAR struct daemon_priv_s *priv = &g_ub_daemon;
  ssize_t nwritten;
  int ret = OK;
  char cmd = '*';

  pthread_mutex_lock(&daemon_mutex);

  if (priv->conf == NULL)
    {
      /* Not running? */

      pthread_mutex_unlock(&daemon_mutex);
      return -ENODEV;
    }

  priv->do_not_poll_usrsock = pause;

  nwritten = write(priv->pipefd[1], &cmd, 1);
  if (nwritten != 1)
    {
      ret = (nwritten < 0) ? -errno : -EIO;
    }

  pthread_mutex_unlock(&daemon_mutex);

  if (ret < 0)
    {
      /* The daemon never received the wake-up command, so it will not
       * post the semaphore for it; waiting here would block forever.
       */

      return ret;
    }

  /* Wait for the acknowledgement, restarting the wait when it is
   * interrupted by a signal.
   */

  while (sem_wait(&priv->wakewaitsem) != OK)
    {
      if (errno != EINTR)
        {
          ret = -errno;
          break;
        }
    }

  return ret;
}
/****************************************************************************
 * Name: usrsocktest_send_delayed_command
 *
 * Description:
 *   Start a helper thread that writes the command byte 'cmd' to the daemon
 *   command pipe after 'delay_msec' milliseconds.  The call returns once
 *   the helper thread has posted the start semaphore; the command object
 *   is then appended to the daemon's 'delayed_cmd_threads' queue.
 *
 * Input Parameters:
 *   cmd        - Command byte to send to the daemon.
 *   delay_msec - Delay before the byte is written, in milliseconds.
 *
 * Returned Value:
 *   true if the helper thread was started; false if no configuration is
 *   installed, memory allocation failed or the thread could not be
 *   created.
 *
 ****************************************************************************/

bool usrsocktest_send_delayed_command(const char cmd, unsigned int delay_msec)
{
  FAR struct daemon_priv_s *priv = &g_ub_daemon;
  pthread_attr_t attr;
  FAR struct delayed_cmd_s *delayed_cmd;
  int ret;

  if (priv->conf == NULL)
    {
      /* Not running? */

      return false;
    }

  delayed_cmd = calloc(1, sizeof(*delayed_cmd));
  if (!delayed_cmd)
    {
      return false;
    }

  usrsocktest_dcmd_malloc_cnt++;

  delayed_cmd->delay_msec = delay_msec;
  delayed_cmd->cmd = cmd;
  delayed_cmd->pipefd = priv->pipefd[1];
  sem_init(&delayed_cmd->startsem, 0, 0);

  ret = pthread_attr_init(&attr);
  if (ret != OK)
    {
      goto errout_free;
    }

  ret = pthread_create(&delayed_cmd->tid, &attr, delayed_cmd_thread,
                       delayed_cmd);

  /* The attribute object is no longer needed once creation was attempted. */

  pthread_attr_destroy(&attr);

  if (ret != OK)
    {
      goto errout_free;
    }

  /* Wait until the helper thread reports that it has started. */

  while (sem_wait(&delayed_cmd->startsem) != OK);

  sq_addlast(&delayed_cmd->node, &priv->delayed_cmd_threads);

  return true;

errout_free:

  /* No thread is using the object: release the semaphore and the memory. */

  sem_destroy(&delayed_cmd->startsem);
  free(delayed_cmd);
  usrsocktest_dcmd_malloc_cnt--;
  return false;
}
/****************************************************************************
 * Name: usrsocktest_daemon_establish_waiting_connections
 *
 * Description:
 *   Send the 'E' command to the daemon without any delay.  The daemon
 *   handles 'E' by running establish_blocked_connection over its
 *   connections.
 *
 * Returned Value:
 *   The result of usrsocktest_send_delayed_command().
 *
 ****************************************************************************/

bool usrsocktest_daemon_establish_waiting_connections(void)
{
  const char establish_cmd = 'E';
  const unsigned int no_delay = 0;

  return usrsocktest_send_delayed_command(establish_cmd, no_delay);
}