/****************************************************************************
 * apps/examples/usrsocktest/usrsocktest_daemon.c
 *
 *   Copyright (C) 2015, 2017 Haltian Ltd. All rights reserved.
 *    Author: Jussi Kivilinna <jussi.kivilinna@haltian.com>
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in
 *    the documentation and/or other materials provided with the
 *    distribution.
 * 3. Neither the name NuttX nor the names of its contributors may be
 *    used to endorse or promote products derived from this software
 *    without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
 * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS
 * OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 *
 ****************************************************************************/

/****************************************************************************
 * Included Files
 ****************************************************************************/

#include <nuttx/config.h>

#include <stdint.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <debug.h>
#include <fcntl.h>
#include <errno.h>
#include <poll.h>
#include <pthread.h>

#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <nuttx/net/usrsock.h>

#include "defines.h"

/****************************************************************************
 * Definitions
 ****************************************************************************/

#ifndef dbg
  #define dbg _warn
#endif

#define usrsocktest_dbg(...) ((void)0)

#define TEST_SOCKET_SOCKID_BASE 10000U
#define TEST_SOCKET_COUNT 8

#ifndef ARRAY_SIZE
#  define ARRAY_SIZE(x) (sizeof(x) / sizeof((x)[0]))
#endif

#define noinline

/****************************************************************************
 * Private Types
 ****************************************************************************/

struct test_socket_s
{
  bool opened:1;
  bool connected:1;
  bool blocked_connect:1;
  bool block_send:1;
  bool connect_refused:1;
  bool disconnected:1;
  int recv_avail_bytes;
  FAR void *endp;
  struct usrsock_message_req_ack_s pending_resp;
};

struct delayed_cmd_s
{
  sq_entry_t node;
  pthread_t tid;
  sem_t startsem;
  int pipefd;
  uint16_t delay_msec;
  char cmd;
};

/****************************************************************************
 * Private Data
 ****************************************************************************/

static struct daemon_priv_s
{
  FAR const struct usrsocktest_daemon_conf_s *conf;
  pthread_t tid;
  bool joined;

  int pipefd[2];
  sem_t wakewaitsem;
  unsigned int sockets_active;
  unsigned int sockets_connected;
  unsigned int sockets_waiting_connect;
  unsigned int sockets_recv_empty;
  unsigned int sockets_not_connected_refused;
  unsigned int sockets_remote_disconnected;
  size_t total_send_bytes;
  size_t total_recv_bytes;
  bool do_not_poll_usrsock;

  struct test_socket_s test_sockets[TEST_SOCKET_COUNT];
  sq_queue_t delayed_cmd_threads;
} g_ub_daemon =
  {
    .joined = true,
    .conf = NULL,
  };

static pthread_mutex_t daemon_mutex = PTHREAD_MUTEX_INITIALIZER;

/****************************************************************************
 * Public Data
 ****************************************************************************/

const struct usrsocktest_daemon_conf_s usrsocktest_daemon_defconf =
    USRSOCKTEST_DAEMON_CONF_DEFAULTS;
struct usrsocktest_daemon_conf_s usrsocktest_daemon_config;

/****************************************************************************
 * Private Functions
 ****************************************************************************/

static int test_socket_alloc(FAR struct daemon_priv_s *priv)
{
  int i;

  for (i = 0; i < ARRAY_SIZE(priv->test_sockets); i++)
    {
      FAR struct test_socket_s *tsock = &priv->test_sockets[i];

      if (!tsock->opened)
        {
          memset(tsock, 0, sizeof(*tsock));
          tsock->opened = true;
          tsock->block_send = priv->conf->endpoint_block_send;
          tsock->recv_avail_bytes =
              priv->conf->endpoint_recv_avail_from_start ?
                  priv->conf->endpoint_recv_avail : 0;
          priv->sockets_active++;
          if (tsock->recv_avail_bytes == 0)
            priv->sockets_recv_empty++;
          return i + TEST_SOCKET_SOCKID_BASE;
        }
    }

  return -1;
}

static FAR struct test_socket_s *test_socket_get(
  FAR struct daemon_priv_s *priv,
  int sockid)
{
  if (sockid < TEST_SOCKET_SOCKID_BASE)
    {
      return NULL;
    }

  sockid -= TEST_SOCKET_SOCKID_BASE;
  if (sockid >= ARRAY_SIZE(priv->test_sockets))
    {
      return NULL;
    }

  return &priv->test_sockets[sockid];
}

static int test_socket_free(FAR struct daemon_priv_s *priv, int sockid)
{
  FAR struct test_socket_s *tsock = test_socket_get(priv, sockid);

  if (!tsock)
    {
      return -EBADFD;
    }

  if (!tsock->opened)
    {
      return -EFAULT;
    }

  if (tsock->connected)
    {
      priv->sockets_connected--;
      tsock->connected = false;
    }

  if (tsock->blocked_connect)
    {
      priv->sockets_waiting_connect--;
      tsock->blocked_connect = false;
    }

  if (tsock->endp)
    {
      free(tsock->endp);
      usrsocktest_endp_malloc_cnt--;
      tsock->endp = NULL;
    }

  if (tsock->recv_avail_bytes == 0)
    {
      priv->sockets_recv_empty--;
    }

  if (tsock->connect_refused)
    {
      priv->sockets_not_connected_refused--;
    }

  if (tsock->disconnected)
    {
      priv->sockets_remote_disconnected--;
    }

  tsock->opened = false;
  priv->sockets_active--;

  return 0;
}

static int tsock_send_event(int fd, FAR struct daemon_priv_s *priv,
                            FAR struct test_socket_s *tsock, int events)
{
  ssize_t wlen;
  int i;
  FAR struct usrsock_message_socket_event_s event = {
  };

  event.head.flags = USRSOCK_MESSAGE_FLAG_EVENT;
  event.head.msgid = USRSOCK_MESSAGE_SOCKET_EVENT;

  for (i = 0; i < ARRAY_SIZE(priv->test_sockets); i++)
    {
      if (tsock == &priv->test_sockets[i])
        break;
    }

  if (i == ARRAY_SIZE(priv->test_sockets))
    {
      return -EINVAL;
    }

  event.usockid = i + TEST_SOCKET_SOCKID_BASE;
  event.events = events;

  wlen = write(fd, &event, sizeof(event));
  if (wlen < 0)
    {
      return -errno;
    }

  if (wlen != sizeof(event))
    {
      return -ENOSPC;
    }

  return OK;
}

static FAR void *find_endpoint(FAR struct daemon_priv_s *priv,
                               in_addr_t ipaddr)
{
  FAR struct sockaddr_in *endpaddr;
  int ok;

  endpaddr = malloc(sizeof(*endpaddr));
  usrsocktest_endp_malloc_cnt++;
  assert(endpaddr);

  ok = inet_pton(AF_INET, priv->conf->endpoint_addr,
                 &endpaddr->sin_addr.s_addr);
  endpaddr->sin_family = AF_INET;
  endpaddr->sin_port = htons(priv->conf->endpoint_port);
  assert(ok);

  if (endpaddr->sin_addr.s_addr == ipaddr)
    {
      return endpaddr;
    }

  free(endpaddr);
  usrsocktest_endp_malloc_cnt--;
  return NULL;
}

static bool endpoint_connect(FAR struct daemon_priv_s *priv, FAR void *endp,
                             uint16_t port)
{
  FAR struct sockaddr_in *endpaddr = endp;

  if (endpaddr->sin_port == port)
    {
      return true;
    }
  else
    {
      return false;
    }
}

static void get_endpoint_sockaddr(FAR void *endp,
                                  FAR struct sockaddr_in *endpaddr)
{
  *endpaddr = *(FAR struct sockaddr_in *)endp;
}

static ssize_t
read_req(int fd, FAR const struct usrsock_request_common_s *common_hdr,
         FAR void *req, size_t reqsize)
{
  ssize_t rlen;
  int err;

  rlen = read(fd, (uint8_t *)req + sizeof(*common_hdr),
              reqsize - sizeof(*common_hdr));
  if (rlen < 0)
    {
      err = errno;
      usrsocktest_dbg(
          "Error reading %d bytes of request: ret=%d, errno=%d\n",
          reqsize - sizeof(*common_hdr), (int)rlen, errno);
      return -err;
    }

  if (rlen + sizeof(*common_hdr) != reqsize)
    {
      return -EMSGSIZE;
    }

  return rlen;
}

static int socket_request(int fd, FAR struct daemon_priv_s *priv,
                          FAR void *hdrbuf)
{
  FAR struct usrsock_request_socket_s *req = hdrbuf;
  int socketid;
  ssize_t wlen;
  struct usrsock_message_req_ack_s resp = {
  };

  /* Validate input. */

  if (req->domain != priv->conf->supported_domain)
    {
      socketid = -EAFNOSUPPORT;
    }
  else if (req->type != priv->conf->supported_type ||
           req->protocol != priv->conf->supported_protocol)
    {
      socketid = -EPROTONOSUPPORT;
    }
  else if (priv->sockets_active >= priv->conf->max_sockets)
    {
      socketid = -EMFILE;
    }
  else
    {
      /* Allocate socket. */

      socketid = test_socket_alloc(priv);
      if (socketid < 0)
        socketid = -ENFILE;
    }

  /* Prepare response. */

  resp.head.msgid = USRSOCK_MESSAGE_RESPONSE_ACK;
  resp.head.flags = 0;
  resp.xid = req->head.xid;
  resp.result = socketid;

  /* Send response. */

  wlen = write(fd, &resp, sizeof(resp));
  if (wlen < 0)
    {
      return -errno;
    }

  if (wlen != sizeof(resp))
    {
      return -ENOSPC;
    }

  return OK;
}

static int close_request(int fd, FAR struct daemon_priv_s *priv,
                         FAR void *hdrbuf)
{
  FAR struct usrsock_request_close_s *req = hdrbuf;
  ssize_t wlen;
  int ret;
  struct usrsock_message_req_ack_s resp = {
  };

  /* Check if this socket exists. */

  ret = test_socket_free(priv, req->usockid);

  /* Prepare response. */

  resp.head.msgid = USRSOCK_MESSAGE_RESPONSE_ACK;
  resp.xid = req->head.xid;
  if (priv->conf->delay_all_responses)
    {
      resp.head.flags = USRSOCK_MESSAGE_FLAG_REQ_IN_PROGRESS;
      resp.result = -EAGAIN;
    }
  else
    {
      resp.head.flags = 0;
      resp.result = ret;
    }

  /* Send response. */

  wlen = write(fd, &resp, sizeof(resp));
  if (wlen < 0)
    {
      return -errno;
    }

  if (wlen != sizeof(resp))
    {
      return -ENOSPC;
    }

  if (priv->conf->delay_all_responses)
    {
      pthread_mutex_unlock(&daemon_mutex);
      usleep(50 * 1000);
      pthread_mutex_lock(&daemon_mutex);

      /* Previous write was acknowledgment to request, informing that request
       * is still in progress. Now write actual completion response.
       */

      resp.result = ret;
      resp.head.flags &= ~USRSOCK_MESSAGE_FLAG_REQ_IN_PROGRESS;

      wlen = write(fd, &resp, sizeof(resp));
      if (wlen < 0)
        {
          return -errno;
        }

      if (wlen != sizeof(resp))
        {
          return -ENOSPC;
        }
    }

  return OK;
}

static int connect_request(int fd, FAR struct daemon_priv_s *priv,
                           FAR void *hdrbuf)
{
  FAR struct usrsock_request_connect_s *req = hdrbuf;
  struct sockaddr_in addr;
  FAR struct test_socket_s *tsock;
  ssize_t wlen;
  ssize_t rlen;
  int ret = 0;
  struct usrsock_message_req_ack_s resp = {
  };

  DEBUGASSERT(priv);
  DEBUGASSERT(req);

  /* Check if this socket exists. */

  tsock = test_socket_get(priv, req->usockid);
  if (!tsock)
    {
      ret = -EBADFD;
      goto prepare;
    }

  /* Check if this socket is already connected. */

  if (tsock->connected)
    {
      ret = -EISCONN;
      goto prepare;
    }

  /* Check if address size ok. */

  if (req->addrlen > sizeof(addr))
    {
      ret = -EFAULT;
      goto prepare;
    }

  /* Read address. */

  rlen = read(fd, &addr, sizeof(addr));
  if (rlen < 0 || rlen < req->addrlen)
    {
      ret = -EFAULT;
      goto prepare;
    }

  /* Check address family. */

  if (addr.sin_family != priv->conf->supported_domain)
    {
      ret = -EAFNOSUPPORT;
      goto prepare;
    }

  /* Check if there is endpoint with target address */

  tsock->endp = find_endpoint(priv, addr.sin_addr.s_addr);
  if (!tsock->endp)
    {
      ret = -ENETUNREACH;
      goto prepare;
    }

  /* Check if there is port open at endpoint */

  if (!endpoint_connect(priv, tsock->endp, addr.sin_port))
    {
      free(tsock->endp);
      usrsocktest_endp_malloc_cnt--;
      tsock->endp = NULL;
      ret = -ECONNREFUSED;
      goto prepare;
    }

  ret = OK;

prepare:

  /* Prepare response. */

  resp.xid = req->head.xid;
  resp.head.msgid = USRSOCK_MESSAGE_RESPONSE_ACK;
  resp.head.flags = 0;

  if (priv->conf->endpoint_block_connect)
    {
      resp.head.flags = USRSOCK_MESSAGE_FLAG_REQ_IN_PROGRESS;
      resp.result = ret;

      /* Mark connection as blocked */

      priv->sockets_waiting_connect++;
      tsock->blocked_connect = true;
      tsock->pending_resp = resp;
    }
  else if (priv->conf->delay_all_responses)
    {
      resp.head.flags = USRSOCK_MESSAGE_FLAG_REQ_IN_PROGRESS;
      resp.result = -EINPROGRESS;
      tsock->blocked_connect = false;
    }
  else
    {
      if (ret == OK)
        {
          priv->sockets_connected++;
          tsock->connected = true;
        }

      resp.head.flags = 0;
      resp.result = ret;
      tsock->blocked_connect = false;
    }

  /* Send response. */

  wlen = write(fd, &resp, sizeof(resp));
  if (wlen < 0)
    {
      return -errno;
    }

  if (wlen != sizeof(resp))
    {
      return -ENOSPC;
    }

  if (priv->conf->endpoint_block_connect)
    {
      tsock->pending_resp.head.flags &=
        ~USRSOCK_MESSAGE_FLAG_REQ_IN_PROGRESS;
    }
  else
    {
      int events;

      if (priv->conf->delay_all_responses)
        {
          pthread_mutex_unlock(&daemon_mutex);
          usleep(50 * 1000);
          pthread_mutex_lock(&daemon_mutex);

          /* Previous write was acknowledgment to request, informing that
           * request is still in progress. Now write actual completion
           * response.
           */

          resp.result = ret;

          if (ret == OK)
            {
              priv->sockets_connected++;
              tsock->connected = true;
            }

          resp.head.flags &= ~USRSOCK_MESSAGE_FLAG_REQ_IN_PROGRESS;

          wlen = write(fd, &resp, sizeof(resp));
          if (wlen < 0)
            {
              return -errno;
            }

          if (wlen != sizeof(resp))
            {
              return -ENOSPC;
            }
        }

      events = 0;
      if (!tsock->block_send)
        {
          events |= USRSOCK_EVENT_SENDTO_READY;
        }

      if (tsock->recv_avail_bytes > 0)
        {
          events |= USRSOCK_EVENT_RECVFROM_AVAIL;
        }

      if (events)
        {
          wlen = tsock_send_event(fd, priv, tsock, events);
          if (wlen < 0)
            {
              return wlen;
            }
        }
    }

  return OK;
}

static int sendto_request(int fd, FAR struct daemon_priv_s *priv,
                          FAR void *hdrbuf)
{
  FAR struct usrsock_request_sendto_s *req = hdrbuf;
  FAR struct test_socket_s *tsock;
  ssize_t wlen;
  ssize_t rlen;
  int ret = 0;
  uint8_t sendbuf[16];
  int sendbuflen = 0;
  struct usrsock_message_req_ack_s resp = {
  };

  DEBUGASSERT(priv);
  DEBUGASSERT(req);

  /* Check if this socket exists. */

  tsock = test_socket_get(priv, req->usockid);
  if (!tsock)
    {
      ret = -EBADFD;
      goto prepare;
    }

  /* Check if this socket is connected. */

  if (!tsock->connected)
    {
      ret = -ENOTCONN;
      goto prepare;
    }

  /* Check if address size non-zero. */

  if (req->addrlen > 0)
    {
      ret = -EISCONN; /* connection-mode socket do not accept address */
      goto prepare;
    }

  /* Can send? */

  if (!tsock->block_send)
    {
      /* Check if request has data. */

      if (req->buflen > 0)
        {
          sendbuflen = req->buflen;
          if (sendbuflen > sizeof(sendbuf))
            sendbuflen = sizeof(sendbuf);

          /* Read data. */

          rlen = read(fd, sendbuf, sendbuflen);
          if (rlen < 0 || rlen < sendbuflen)
            {
              ret = -EFAULT;
              goto prepare;
            }

          /* Debug print */

          usrsocktest_dbg("got %d bytes of data: '%.*s'\n",
                          sendbuflen, sendbuflen, sendbuf);
        }
    }
  else
    {
      ret = -EAGAIN; /* blocked. */
      goto prepare;
    }

  ret = sendbuflen;

prepare:

  /* Prepare response. */

  resp.xid = req->head.xid;
  resp.head.msgid = USRSOCK_MESSAGE_RESPONSE_ACK;
  resp.head.flags = 0;

  if (priv->conf->delay_all_responses)
    {
      resp.head.flags = USRSOCK_MESSAGE_FLAG_REQ_IN_PROGRESS;
      resp.result = -EINPROGRESS;
    }
  else
    {
      if (ret > 0)
        {
          priv->total_send_bytes += ret;
        }

      resp.head.flags = 0;
      resp.result = ret;
    }

  /* Send response. */

  wlen = write(fd, &resp, sizeof(resp));
  if (wlen < 0)
    {
      return -errno;
    }

  if (wlen != sizeof(resp))
    {
      return -ENOSPC;
    }

  if (priv->conf->delay_all_responses)
    {
      pthread_mutex_unlock(&daemon_mutex);
      usleep(50 * 1000);
      pthread_mutex_lock(&daemon_mutex);

      /* Previous write was acknowledgment to request, informing that request
       * is still in progress. Now write actual completion response.
       */

      resp.result = ret;

      if (ret > 0)
        {
          priv->total_send_bytes += ret;
        }

      resp.head.flags &= ~USRSOCK_MESSAGE_FLAG_REQ_IN_PROGRESS;

      wlen = write(fd, &resp, sizeof(resp));
      if (wlen < 0)
        {
          return -errno;
        }

      if (wlen != sizeof(resp))
        {
          return -ENOSPC;
        }
    }

  if (!tsock->block_send)
    {
      /* Let kernel-side know that there is space for more send data. */

      wlen = tsock_send_event(fd, priv, tsock, USRSOCK_EVENT_SENDTO_READY);
      if (wlen < 0)
        {
          return wlen;
        }
    }

  return OK;
}

static int recvfrom_request(int fd, FAR struct daemon_priv_s *priv,
                            FAR void *hdrbuf)
{
  FAR struct usrsock_request_recvfrom_s *req = hdrbuf;
  FAR struct test_socket_s *tsock;
  ssize_t wlen;
  size_t i;
  int ret = 0;
  size_t outbuflen;
  struct sockaddr_in endpointaddr;
  struct usrsock_message_datareq_ack_s resp = {
  };

  DEBUGASSERT(priv);
  DEBUGASSERT(req);

  /* Check if this socket exists. */

  tsock = test_socket_get(priv, req->usockid);
  if (!tsock)
    {
      ret = -EBADFD;
      goto prepare;
    }

  /* Check if this socket is connected. */

  if (!tsock->connected)
    {
      ret = -ENOTCONN;
      goto prepare;
    }

  get_endpoint_sockaddr(tsock->endp, &endpointaddr);

  /* Do we have recv data available? */

  if (tsock->recv_avail_bytes > 0)
    {
      outbuflen = req->max_buflen;

      if (outbuflen > tsock->recv_avail_bytes)
        {
          outbuflen = tsock->recv_avail_bytes;
        }
    }
  else
    {
      ret = -EAGAIN; /* blocked. */
      goto prepare;
    }

  ret = outbuflen;

prepare:

  /* Prepare response. */

  resp.reqack.xid = req->head.xid;
  resp.reqack.head.msgid = USRSOCK_MESSAGE_RESPONSE_DATA_ACK;
  resp.reqack.head.flags = 0;

  if (priv->conf->delay_all_responses)
    {
      resp.reqack.head.flags = USRSOCK_MESSAGE_FLAG_REQ_IN_PROGRESS;
      resp.reqack.result = -EINPROGRESS;
      resp.valuelen = 0;
      resp.valuelen_nontrunc = 0;

      /* Send ack response. */

      wlen = write(fd, &resp, sizeof(resp));
      if (wlen < 0)
        {
          return -errno;
        }

      if (wlen != sizeof(resp))
        {
          return -ENOSPC;
        }

      pthread_mutex_unlock(&daemon_mutex);
      usleep(50 * 1000);
      pthread_mutex_lock(&daemon_mutex);

      /* Previous write was acknowledgment to request, informing that request
       * is still in progress. Now write actual completion response.
       */

      resp.reqack.head.msgid = USRSOCK_MESSAGE_RESPONSE_DATA_ACK;
      resp.reqack.head.flags &= ~USRSOCK_MESSAGE_FLAG_REQ_IN_PROGRESS;
    }

  resp.reqack.head.flags = 0;
  resp.reqack.result = ret;
  if (ret >= 0)
    {
      priv->total_recv_bytes += ret;
      resp.valuelen_nontrunc = sizeof(endpointaddr);
      resp.valuelen = resp.valuelen_nontrunc;
      if (resp.valuelen > req->max_addrlen)
        resp.valuelen = req->max_addrlen;
    }
  else
    {
      resp.valuelen = 0;
      resp.valuelen_nontrunc = 0;
    }

  /* Send response. */

  wlen = write(fd, &resp, sizeof(resp));
  if (wlen < 0)
    {
      return -errno;
    }

  if (wlen != sizeof(resp))
    {
      return -ENOSPC;
    }

  if (resp.valuelen > 0)
    {
      /* Send address (value) */

      wlen = write(fd, &endpointaddr, resp.valuelen);
      if (wlen < 0)
        {
          return -errno;
        }

      if (wlen != resp.valuelen)
        {
          return -ENOSPC;
        }
    }

  if (resp.reqack.result > 0)
    {
      /* Send buffer */

      for (i = 0; i < resp.reqack.result; i++)
        {
          char tmp = 'a' + i;

          /* Check if MSG_PEEK flag is specified. */

          if ((req->flags & MSG_PEEK) != MSG_PEEK)
            {
              tsock->recv_avail_bytes--;
            }

          wlen = write(fd, &tmp, 1);
          if (wlen < 0)
            {
              return -errno;
            }

          if (wlen != 1)
            {
              return -ENOSPC;
            }
        }

      if (tsock->recv_avail_bytes == 0)
        {
          priv->sockets_recv_empty++;
        }
    }

  if (tsock->recv_avail_bytes > 0)
    {
      /* Let kernel-side know that there is more recv data. */

      wlen = tsock_send_event(fd, priv, tsock, USRSOCK_EVENT_RECVFROM_AVAIL);
      if (wlen < 0)
        {
          return wlen;
        }
    }

  return OK;
}

static int setsockopt_request(int fd, FAR struct daemon_priv_s *priv,
                              FAR void *hdrbuf)
{
  FAR struct usrsock_request_setsockopt_s *req = hdrbuf;
  FAR struct test_socket_s *tsock;
  ssize_t wlen;
  ssize_t rlen;
  int ret = 0;
  int value;
  struct usrsock_message_req_ack_s resp = {
  };

  DEBUGASSERT(priv);
  DEBUGASSERT(req);

  /* Check if this socket exists. */

  tsock = test_socket_get(priv, req->usockid);
  if (!tsock)
    {
      ret = -EBADFD;
      goto prepare;
    }

  if (req->level != SOL_SOCKET)
    {
      usrsocktest_dbg("setsockopt: level=%d not supported\n", req->level);
      ret = -ENOPROTOOPT;
      goto prepare;
    }

  if (req->option != SO_REUSEADDR)
    {
      usrsocktest_dbg("setsockopt: option=%d not supported\n", req->option);
      ret = -ENOPROTOOPT;
      goto prepare;
    }

  if (req->valuelen < sizeof(value))
    {
      ret = -EINVAL;
      goto prepare;
    }

  /* Read value. */

  rlen = read(fd, &value, sizeof(value));
  if (rlen < 0 || rlen < sizeof(value))
    {
      ret = -EFAULT;
      goto prepare;
    }

  /* Debug print */

  usrsocktest_dbg("setsockopt: option=%d value=%d\n", req->option, value);

  ret = OK;

prepare:

  /* Prepare response. */

  resp.xid = req->head.xid;
  resp.head.msgid = USRSOCK_MESSAGE_RESPONSE_ACK;
  resp.head.flags = 0;

  if (priv->conf->delay_all_responses)
    {
      resp.head.flags = USRSOCK_MESSAGE_FLAG_REQ_IN_PROGRESS;
      resp.result = -EINPROGRESS;
    }
  else
    {
      resp.head.flags = 0;
      resp.result = ret;
    }

  /* Send response. */

  wlen = write(fd, &resp, sizeof(resp));
  if (wlen < 0)
    {
      return -errno;
    }

  if (wlen != sizeof(resp))
    {
      return -ENOSPC;
    }

  if (priv->conf->delay_all_responses)
    {
      pthread_mutex_unlock(&daemon_mutex);
      usleep(50 * 1000);
      pthread_mutex_lock(&daemon_mutex);

      /* Previous write was acknowledgment to request, informing that request
       * is still in progress. Now write actual completion response.
       */

      resp.result = ret;
      resp.head.flags &= ~USRSOCK_MESSAGE_FLAG_REQ_IN_PROGRESS;

      wlen = write(fd, &resp, sizeof(resp));
      if (wlen < 0)
        {
          return -errno;
        }

      if (wlen != sizeof(resp))
        {
          return -ENOSPC;
        }
    }

  return OK;
}

static int getsockopt_request(int fd, FAR struct daemon_priv_s *priv,
                              FAR void *hdrbuf)
{
  FAR struct usrsock_request_getsockopt_s *req = hdrbuf;
  FAR struct test_socket_s *tsock;
  ssize_t wlen;
  int ret = 0;
  int value;
  struct usrsock_message_datareq_ack_s resp = {
  };

  DEBUGASSERT(priv);
  DEBUGASSERT(req);

  /* Check if this socket exists. */

  tsock = test_socket_get(priv, req->usockid);
  if (!tsock)
    {
      ret = -EBADFD;
      goto prepare;
    }

  if (req->level != SOL_SOCKET)
    {
      usrsocktest_dbg("getsockopt: level=%d not supported\n", req->level);
      ret = -ENOPROTOOPT;
      goto prepare;
    }

  if (req->option != SO_REUSEADDR)
    {
      usrsocktest_dbg("getsockopt: option=%d not supported\n", req->option);
      ret = -ENOPROTOOPT;
      goto prepare;
    }

  if (req->max_valuelen < sizeof(value))
    {
      ret = -EINVAL;
      goto prepare;
    }

  value = 0;
  ret = OK;

prepare:

  /* Prepare response. */

  resp.reqack.xid = req->head.xid;
  resp.reqack.head.msgid = USRSOCK_MESSAGE_RESPONSE_DATA_ACK;
  resp.reqack.head.flags = 0;

  if (priv->conf->delay_all_responses)
    {
      resp.reqack.head.flags = USRSOCK_MESSAGE_FLAG_REQ_IN_PROGRESS;
      resp.reqack.result = -EINPROGRESS;
      resp.valuelen = 0;
      resp.valuelen_nontrunc = 0;

      /* Send ack response. */

      wlen = write(fd, &resp, sizeof(resp));
      if (wlen < 0)
        {
          return -errno;
        }

      if (wlen != sizeof(resp))
        {
          return -ENOSPC;
        }

      pthread_mutex_unlock(&daemon_mutex);
      usleep(50 * 1000);
      pthread_mutex_lock(&daemon_mutex);

      /* Previous write was acknowledgment to request, informing that request
       * is still in progress. Now write actual completion response.
       */

      resp.reqack.head.msgid = USRSOCK_MESSAGE_RESPONSE_DATA_ACK;
      resp.reqack.head.flags &= ~USRSOCK_MESSAGE_FLAG_REQ_IN_PROGRESS;
    }

  resp.reqack.head.flags = 0;
  resp.reqack.result = ret;
  if (ret >= 0)
    {
      resp.valuelen = sizeof(value);
    }
  else
    {
      resp.valuelen = 0;
    }

  /* Send response. */

  wlen = write(fd, &resp, sizeof(resp));
  if (wlen < 0)
    {
      return -errno;
    }

  if (wlen != sizeof(resp))
    {
      return -ENOSPC;
    }

  if (resp.valuelen > 0)
    {
      /* Send address (value) */

      wlen = write(fd, &value, resp.valuelen);
      if (wlen < 0)
        {
          return -errno;
        }

      if (wlen != resp.valuelen)
        {
          return -ENOSPC;
        }
    }

  return OK;
}

static int getsockname_request(int fd, FAR struct daemon_priv_s *priv,
                               FAR void *hdrbuf)
{
  FAR struct usrsock_request_getsockname_s *req = hdrbuf;
  FAR struct test_socket_s *tsock;
  ssize_t wlen;
  int ret = 0;
  struct sockaddr_in addr;
  struct usrsock_message_datareq_ack_s resp = {
  };

  DEBUGASSERT(priv);
  DEBUGASSERT(req);

  /* Check if this socket exists. */

  tsock = test_socket_get(priv, req->usockid);
  if (!tsock)
    {
      ret = -EBADFD;
      goto prepare;
    }

  ret = inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr.s_addr);
  addr.sin_family = AF_INET;
  addr.sin_port = htons(12345);
  ret = ret == 1 ? 0 : -EINVAL;

prepare:

  /* Prepare response. */

  resp.reqack.xid = req->head.xid;
  resp.reqack.head.msgid = USRSOCK_MESSAGE_RESPONSE_DATA_ACK;
  resp.reqack.head.flags = 0;

  if (priv->conf->delay_all_responses)
    {
      resp.reqack.head.flags = USRSOCK_MESSAGE_FLAG_REQ_IN_PROGRESS;
      resp.reqack.result = -EINPROGRESS;
      resp.valuelen = 0;
      resp.valuelen_nontrunc = 0;

      /* Send ack response. */

      wlen = write(fd, &resp, sizeof(resp));
      if (wlen < 0)
        {
          return -errno;
        }

      if (wlen != sizeof(resp))
        {
          return -ENOSPC;
        }

      pthread_mutex_unlock(&daemon_mutex);
      usleep(50 * 1000);
      pthread_mutex_lock(&daemon_mutex);

      /* Previous write was acknowledgment to request, informing that request
       * is still in progress. Now write actual completion response.
       */

      resp.reqack.head.msgid = USRSOCK_MESSAGE_RESPONSE_DATA_ACK;
      resp.reqack.head.flags &= ~USRSOCK_MESSAGE_FLAG_REQ_IN_PROGRESS;
    }

  resp.reqack.head.flags = 0;
  resp.reqack.result = ret;
  if (ret >= 0)
    {
      resp.valuelen = sizeof(addr);
      resp.valuelen_nontrunc = sizeof(addr);
      if (resp.valuelen > req->max_addrlen)
        {
          resp.valuelen = req->max_addrlen;
        }
    }
  else
    {
      resp.valuelen = 0;
      resp.valuelen_nontrunc = 0;
    }

  /* Send response. */

  wlen = write(fd, &resp, sizeof(resp));
  if (wlen < 0)
    {
      return -errno;
    }

  if (wlen != sizeof(resp))
    {
      return -ENOSPC;
    }

  if (resp.valuelen > 0)
    {
      /* Send address (value) */

      wlen = write(fd, &addr, resp.valuelen);
      if (wlen < 0)
        {
          return -errno;
        }

      if (wlen != resp.valuelen)
        {
          return -ENOSPC;
        }
    }

  return OK;
}

static int handle_usrsock_request(int fd, FAR struct daemon_priv_s *priv)
{
  static const struct
  {
    unsigned int hdrlen;
    int (CODE *fn)(int fd, FAR struct daemon_priv_s *priv, FAR void *req);
  }

  handlers[USRSOCK_REQUEST__MAX] =
    {
      [USRSOCK_REQUEST_SOCKET] =
        {
          sizeof(struct usrsock_request_socket_s),
          socket_request,
        },

      [USRSOCK_REQUEST_CLOSE] =
        {
          sizeof(struct usrsock_request_close_s),
          close_request,
        },

      [USRSOCK_REQUEST_CONNECT] =
        {
          sizeof(struct usrsock_request_connect_s),
          connect_request,
        },

      [USRSOCK_REQUEST_SENDTO] =
        {
          sizeof(struct usrsock_request_sendto_s),
          sendto_request,
        },

      [USRSOCK_REQUEST_RECVFROM] =
        {
          sizeof(struct usrsock_request_recvfrom_s),
          recvfrom_request,
        },

      [USRSOCK_REQUEST_SETSOCKOPT] =
        {
          sizeof(struct usrsock_request_setsockopt_s),
          setsockopt_request,
        },

      [USRSOCK_REQUEST_GETSOCKOPT] =
        {
          sizeof(struct usrsock_request_getsockopt_s),
          getsockopt_request,
        },

      [USRSOCK_REQUEST_GETSOCKNAME] =
        {
          sizeof(struct usrsock_request_getsockname_s),
          getsockname_request,
        },
    };

  uint8_t hdrbuf[16];
  FAR struct usrsock_request_common_s *common_hdr = (FAR void *)hdrbuf;
  ssize_t rlen;

  rlen = read(fd, common_hdr, sizeof(*common_hdr));
  if (rlen < 0)
    {
      return -errno;
    }

  if (rlen != sizeof(*common_hdr))
    {
      return -EMSGSIZE;
    }

  if (common_hdr->reqid >= USRSOCK_REQUEST__MAX ||
      !handlers[common_hdr->reqid].fn)
    {
      usrsocktest_dbg("Unknown request type: %d\n", common_hdr->reqid);
      return -EIO;
    }

  assert(handlers[common_hdr->reqid].hdrlen < sizeof(hdrbuf));

  rlen = read_req(fd, common_hdr, hdrbuf,
                  handlers[common_hdr->reqid].hdrlen);
  if (rlen < 0)
    {
      return rlen;
    }

  return handlers[common_hdr->reqid].fn(fd, priv, hdrbuf);
}

static int unblock_sendto(int fd, FAR struct daemon_priv_s *priv,
                          FAR struct test_socket_s *tsock)
{
  if (tsock->block_send)
    {
      int ret;

      tsock->block_send = false;

      ret = tsock_send_event(fd, priv, tsock, USRSOCK_EVENT_SENDTO_READY);
      if (ret < 0)
        {
          return ret;
        }
    }

  return OK;
}

static int reset_recv_avail(int fd, FAR struct daemon_priv_s *priv,
                            FAR struct test_socket_s *tsock)
{
  if (tsock->recv_avail_bytes == 0)
    {
      int ret;

      priv->sockets_recv_empty--;

      tsock->recv_avail_bytes = priv->conf->endpoint_recv_avail;

      ret = tsock_send_event(fd, priv, tsock,
                             USRSOCK_EVENT_RECVFROM_AVAIL);
      if (ret < 0)
        {
          return ret;
        }
    }

  return OK;
}

static int disconnect_connection(int fd, FAR struct daemon_priv_s *priv,
                                 FAR struct test_socket_s *tsock)
{
  if (tsock->connected)
    {
      int ret;

      tsock->disconnected = true;
      tsock->connected = false;

      priv->sockets_connected--;
      priv->sockets_remote_disconnected++;

      ret = tsock_send_event(fd, priv, tsock, USRSOCK_EVENT_REMOTE_CLOSED);
      if (ret < 0)
        {
          return ret;
        }
    }

  return OK;
}

static int establish_blocked_connection(int fd,
                                        FAR struct daemon_priv_s *priv,
                                        FAR struct test_socket_s *tsock)
{
  if (tsock->blocked_connect)
    {
      FAR struct usrsock_message_req_ack_s *resp = &tsock->pending_resp;
      ssize_t wlen;
      int events;

      if (resp->result == OK)
        {
          priv->sockets_connected++;
          tsock->connected = true;
        }

      tsock->blocked_connect = false;

      priv->sockets_waiting_connect--;
      resp->head.flags &= ~USRSOCK_MESSAGE_FLAG_REQ_IN_PROGRESS;

      wlen = write(fd, resp, sizeof(*resp));
      if (wlen < 0)
        {
          return -errno;
        }

      if (wlen != sizeof(*resp))
        {
          return -ENOSPC;
        }

      events = 0;
      if (!tsock->block_send)
        {
          events |= USRSOCK_EVENT_SENDTO_READY;
        }

      if (tsock->recv_avail_bytes > 0)
        {
          events |= USRSOCK_EVENT_RECVFROM_AVAIL;
        }

      if (events)
        {
          wlen = tsock_send_event(fd, priv, tsock, events);
          if (wlen < 0)
            {
              return wlen;
            }
        }
    }

  return OK;
}

static int fail_blocked_connection(int fd, FAR struct daemon_priv_s *priv,
                                   FAR struct test_socket_s *tsock)
{
  if (tsock->blocked_connect)
    {
      FAR struct usrsock_message_req_ack_s *resp = &tsock->pending_resp;
      ssize_t wlen;

      resp->result = -ECONNREFUSED;
      priv->sockets_not_connected_refused++;
      tsock->connect_refused = true;
      tsock->blocked_connect = false;

      priv->sockets_waiting_connect--;
      resp->head.flags &= ~USRSOCK_MESSAGE_FLAG_REQ_IN_PROGRESS;

      wlen = write(fd, resp, sizeof(*resp));
      if (wlen < 0)
        {
          return -errno;
        }

      if (wlen != sizeof(*resp))
        {
          return -ENOSPC;
        }
    }

  return OK;
}

static int for_each_connection(int fd, FAR struct daemon_priv_s *priv,
                               int (CODE *iter_fn)(
                                   int fd,
                                   FAR struct daemon_priv_s *priv,
                                   FAR struct test_socket_s *tsock))
{
  int i;

  for (i = 0; i < ARRAY_SIZE(priv->test_sockets); i++)
    {
      FAR struct test_socket_s *tsock = &priv->test_sockets[i];

      if (tsock->opened)
        {
          int ret = iter_fn(fd, priv, tsock);
          if (ret < 0)
            {
              return ret;
            }
        }
    }

  return OK;
}

static FAR void *usrsocktest_daemon(FAR void *param)
{
  FAR struct daemon_priv_s *priv = param;
  bool stopped;
  int ret;
  int fd;

  usrsocktest_dbg("\n");

  priv->sockets_active = 0;

  fd = open("/dev/usrsock", O_RDWR);
  if (fd < 0)
    {
      ret = -errno;
      goto errout;
    }

  do
    {
      int npfds = 0;
      int usrsock_pfdpos = -1;
      int pipe_pdfpos = -1;
      struct pollfd pfd[2] = {
      };

      stopped = false;

      /* Wait for request from kernel side. */

      pthread_mutex_lock(&daemon_mutex);
      if (!priv->do_not_poll_usrsock && fd >= 0)
        {
          pfd[npfds].fd = fd;
          pfd[npfds].events = POLLIN;
          usrsock_pfdpos = npfds++;
        }

      pfd[npfds].fd = priv->pipefd[0];
      pfd[npfds].events = POLLIN;
      pipe_pdfpos = npfds++;
      pthread_mutex_unlock(&daemon_mutex);

      ret = poll(pfd, npfds, -1);
      if (ret < 0)
        {
          /* Error? */

          ret = -errno;
          goto errout;
        }

      if (usrsock_pfdpos >= 0 && (pfd[usrsock_pfdpos].revents & POLLIN))
        {
          pthread_mutex_lock(&daemon_mutex);
          ret = handle_usrsock_request(fd, priv);
          pthread_mutex_unlock(&daemon_mutex);
          if (ret < 0)
            {
              goto errout;
            }
        }

      if (pipe_pdfpos >= 0 && (pfd[pipe_pdfpos].revents & POLLIN))
        {
          char in;

          if (read(pfd[pipe_pdfpos].fd, &in, 1) == 1)
            {
              pthread_mutex_lock(&daemon_mutex);

              switch (in)
                {
                case 'S':
                  stopped = true;
                  ret = 0;
                  break;
                case 's':
                  stopped = false;
                  ret = 0;
                  break;
                case 'E':
                  ret = for_each_connection(fd, priv,
                                            &establish_blocked_connection);
                  break;
                case 'F':
                  ret = for_each_connection(fd, priv,
                                            &fail_blocked_connection);
                  break;
                case 'D':
                  ret = for_each_connection(fd, priv,
                                            &disconnect_connection);
                  break;
                case 'W':
                  ret = for_each_connection(fd, priv,
                                            &unblock_sendto);
                  break;
                case 'r':
                  ret = for_each_connection(fd, priv,
                                            &reset_recv_avail);
                  break;
                case 'K':

                  /* Kill usrsockdev */

                  if (fd >= 0)
                    {
                      close(fd);
                      fd = -1;
                    }

                  break;
                case '*':
                  sem_post(&priv->wakewaitsem);
                  break; /* woke thread. */
                }

              pthread_mutex_unlock(&daemon_mutex);

              if (ret < 0)
                {
                  goto errout;
                }
            }
        }

      usleep(1);
    }
  while (!stopped);

  ret = OK;
errout:
  if (fd >= 0)
    {
      close(fd);
    }

  usrsocktest_dbg("ret: %d\n", ret);

  return (FAR void *)(intptr_t)ret;
}

static int get_daemon_value(FAR struct daemon_priv_s *priv,
                            FAR void *dst, FAR const void *src, size_t len)
{
  int ret = 0;

  if ((uintptr_t)src < (uintptr_t)priv ||
      (uintptr_t)src >= (uintptr_t)priv + sizeof(*priv) || len <= 0)
    {
      /* Not daemon value */

      return -EINVAL;
    }

  pthread_mutex_lock(&daemon_mutex);

  if (priv->conf == NULL)
    {
      /* Not running? */

      ret = -ENODEV;
      goto out;
    }

  memmove(dst, src, len);

out:
  pthread_mutex_unlock(&daemon_mutex);

  return ret;
}

static FAR void *delayed_cmd_thread(FAR void *priv)
{
  FAR struct delayed_cmd_s *cmd = priv;

  if (cmd->delay_msec)
    {
      sem_post(&cmd->startsem);
    }

  usleep(cmd->delay_msec * 1000);

  write(cmd->pipefd, &cmd->cmd, 1);

  if (!cmd->delay_msec)
    {
      sem_post(&cmd->startsem);
    }

  return NULL;
}

/****************************************************************************
 * Public Functions
 ****************************************************************************/

int usrsocktest_daemon_start(
  FAR const struct usrsocktest_daemon_conf_s *conf)
{
  FAR struct daemon_priv_s *priv = &g_ub_daemon;
  pthread_attr_t attr;
  int ret;

  usrsocktest_dbg("\n");

  pthread_mutex_lock(&daemon_mutex);

  if (priv->conf != NULL || !priv->joined)
    {
      /* Already running? */

      ret = -EALREADY;
      goto out;
    }

  /* Clear daemon private data. */

  memset(priv, 0, sizeof(*priv));

  /* Allocate pipe for daemon commands. */

  ret = pipe(priv->pipefd);
  if (ret != OK)
    {
      ret = -errno;
      goto out;
    }

  ret = pthread_attr_init(&attr);
  if (ret != OK)
    {
      ret = -ret;
      goto errout_closepipe;
    }

  sem_init(&priv->wakewaitsem, 0, 0);

  priv->joined = false;
  priv->conf = conf;

  ret = pthread_create(&priv->tid, &attr, usrsocktest_daemon, priv);
  if (ret != OK)
    {
      sem_destroy(&priv->wakewaitsem);
      priv->joined = true;
      priv->conf = NULL;
      ret = -ret;
      goto errout_closepipe;
    }

errout_closepipe:
  if (ret != OK)
    {
      close(priv->pipefd[0]);
      close(priv->pipefd[1]);
    }

out:
  pthread_mutex_unlock(&daemon_mutex);
  usrsocktest_dbg("ret: %d\n", ret);
  return ret;
}

int usrsocktest_daemon_stop(void)
{
  FAR struct daemon_priv_s *priv = &g_ub_daemon;
  FAR struct delayed_cmd_s *item;
  FAR struct delayed_cmd_s *next;
  FAR pthread_addr_t retval;
  char stopped;
  int ret;
  int i;

  usrsocktest_dbg("\n");

  pthread_mutex_lock(&daemon_mutex);

  if (priv->conf == NULL)
    {
      /* Not running? */

      ret = -ENODEV;
      goto out;
    }

  item = (void *)sq_peek(&priv->delayed_cmd_threads);
  while (item)
    {
      next = (void *)sq_next(&item->node);

      pthread_mutex_unlock(&daemon_mutex);
      pthread_join(item->tid, &retval);
      pthread_mutex_lock(&daemon_mutex);
      sq_rem(&item->node, &priv->delayed_cmd_threads);
      free(item);
      usrsocktest_dcmd_malloc_cnt--;

      item = next;
    }

  pthread_mutex_unlock(&daemon_mutex);
  stopped = 'S';
  write(priv->pipefd[1], &stopped, 1);

  ret = pthread_join(priv->tid, &retval);
  pthread_mutex_lock(&daemon_mutex);
  if (ret != OK)
    {
      ret = -ret;
      goto out;
    }

  for (i = 0; i < ARRAY_SIZE(priv->test_sockets); i++)
    {
      if (priv->test_sockets[i].opened && priv->test_sockets[i].endp != NULL)
        {
          free(priv->test_sockets[i].endp);
          priv->test_sockets[i].endp = NULL;
          usrsocktest_endp_malloc_cnt--;
        }
    }

  priv->conf = NULL;
  close(priv->pipefd[0]);
  close(priv->pipefd[1]);
  sem_destroy(&priv->wakewaitsem);

  priv->joined = true;
  ret = (intptr_t)retval;

out:
  pthread_mutex_unlock(&daemon_mutex);
  usrsocktest_dbg("ret: %d\n", ret);
  return ret;
}

int usrsocktest_daemon_get_num_active_sockets(void)
{
  FAR struct daemon_priv_s *priv = &g_ub_daemon;
  int ret;
  int err;

  err = get_daemon_value(priv, &ret, &priv->sockets_active, sizeof(ret));
  if (err < 0)
    {
      return err;
    }

  return ret;
}

int usrsocktest_daemon_get_num_connected_sockets(void)
{
  FAR struct daemon_priv_s *priv = &g_ub_daemon;
  int ret;
  int err;

  err = get_daemon_value(priv, &ret, &priv->sockets_connected, sizeof(ret));
  if (err < 0)
    {
      return err;
    }

  return ret;
}

int usrsocktest_daemon_get_num_waiting_connect_sockets(void)
{
  FAR struct daemon_priv_s *priv = &g_ub_daemon;
  int ret;
  int err;

  err = get_daemon_value(priv, &ret, &priv->sockets_waiting_connect,
                         sizeof(ret));
  if (err < 0)
    {
      return err;
    }

  return ret;
}

int usrsocktest_daemon_get_num_recv_empty_sockets(void)
{
  FAR struct daemon_priv_s *priv = &g_ub_daemon;
  int ret;
  int err;

  err = get_daemon_value(priv, &ret, &priv->sockets_recv_empty, sizeof(ret));
  if (err < 0)
    {
      return err;
    }

  return ret;
}

ssize_t usrsocktest_daemon_get_send_bytes(void)
{
  FAR struct daemon_priv_s *priv = &g_ub_daemon;
  size_t ret;
  int err;

  err = get_daemon_value(priv, &ret, &priv->total_send_bytes, sizeof(ret));
  if (err < 0)
    {
      return err;
    }

  return ret;
}

ssize_t usrsocktest_daemon_get_recv_bytes(void)
{
  FAR struct daemon_priv_s *priv = &g_ub_daemon;
  size_t ret;
  int err;

  err = get_daemon_value(priv, &ret, &priv->total_recv_bytes, sizeof(ret));
  if (err < 0)
    {
      return err;
    }

  return ret;
}

int usrsocktest_daemon_get_num_unreachable_sockets(void)
{
  FAR struct daemon_priv_s *priv = &g_ub_daemon;
  int ret;
  int err;

  err = get_daemon_value(priv, &ret, &priv->sockets_not_connected_refused,
                         sizeof(ret));
  if (err < 0)
    {
      return err;
    }

  return ret;
}

int usrsocktest_daemon_get_num_remote_disconnected_sockets(void)
{
  FAR struct daemon_priv_s *priv = &g_ub_daemon;
  int ret;
  int err;

  err = get_daemon_value(priv, &ret, &priv->sockets_remote_disconnected,
                         sizeof(ret));
  if (err < 0)
    {
      return err;
    }

  return ret;
}

int usrsocktest_daemon_pause_usrsock_handling(bool pause)
{
  FAR struct daemon_priv_s *priv = &g_ub_daemon;
  int ret;
  char cmd = '*';

  pthread_mutex_lock(&daemon_mutex);

  if (priv->conf == NULL)
    {
      /* Not running? */

      pthread_mutex_unlock(&daemon_mutex);
      return -ENODEV;
    }

  priv->do_not_poll_usrsock = pause;

  write(priv->pipefd[1], &cmd, 1);
  ret = OK;

  pthread_mutex_unlock(&daemon_mutex);

  sem_wait(&priv->wakewaitsem);

  return ret;
}

bool usrsocktest_send_delayed_command(const char cmd,
                                      unsigned int delay_msec)
{
  FAR struct daemon_priv_s *priv = &g_ub_daemon;
  pthread_attr_t attr;
  FAR struct delayed_cmd_s *delayed_cmd;
  int ret;

  if (priv->conf == NULL)
    {
      /* Not running? */

      return false;
    }

  delayed_cmd = calloc(1, sizeof(*delayed_cmd));
  if (!delayed_cmd)
    {
      return false;
    }

  usrsocktest_dcmd_malloc_cnt++;

  delayed_cmd->delay_msec = delay_msec;
  delayed_cmd->cmd = cmd;
  delayed_cmd->pipefd = priv->pipefd[1];
  sem_init(&delayed_cmd->startsem, 0, 0);

  ret = pthread_attr_init(&attr);
  if (ret != OK)
    {
      free(delayed_cmd);
      usrsocktest_dcmd_malloc_cnt--;
      return false;
    }

  ret = pthread_create(&delayed_cmd->tid, &attr, delayed_cmd_thread,
                       delayed_cmd);
  if (ret != OK)
    {
      free(delayed_cmd);
      usrsocktest_dcmd_malloc_cnt--;
      return false;
    }

  while (sem_wait(&delayed_cmd->startsem) != OK);

  sq_addlast(&delayed_cmd->node, &priv->delayed_cmd_threads);

  return true;
}

bool usrsocktest_daemon_establish_waiting_connections(void)
{
  return usrsocktest_send_delayed_command('E', 0);
}