/**************************************************************************** * net/rpmsg/rpmsg_sockif.c * * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. The * ASF licenses this file to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance with the * License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the * License for the specific language governing permissions and limitations * under the License. * ****************************************************************************/ /**************************************************************************** * Included Files ****************************************************************************/ #include #include #include #include #include #include #include #include #include #include #include #include #include "socket/socket.h" /**************************************************************************** * Pre-processor Definitions ****************************************************************************/ #ifndef MIN # define MIN(a, b) ((a) < (b) ? (a) : (b)) #endif #define RPMSG_SOCKET_CMD_SYNC 1 #define RPMSG_SOCKET_CMD_DATA 2 #define RPMSG_SOCKET_NAME_PREFIX "rpmsg-socket:" #define RPMSG_SOCKET_NAME_PREFIX_LEN 13 static_assert(RPMSG_SOCKET_NAME_SIZE + RPMSG_SOCKET_NAME_PREFIX_LEN <= RPMSG_NAME_SIZE, "socket name size config error"); /**************************************************************************** * Private Types ****************************************************************************/ begin_packed_struct struct rpmsg_socket_sync_s { uint32_t cmd; uint32_t size; } end_packed_struct; begin_packed_struct struct rpmsg_socket_data_s { uint32_t cmd; uint32_t pos; /* Act data len, don't include len itself when SOCK_DGRAM */ uint32_t len; char data[0]; } end_packed_struct; struct rpmsg_socket_conn_s { struct rpmsg_endpoint ept; struct sockaddr_rpmsg rpaddr; uint16_t crefs; FAR struct pollfd *fds[CONFIG_NET_RPMSG_NPOLLWAITERS]; sem_t sendsem; sem_t sendlock; sem_t recvsem; sem_t recvlock; FAR void *recvdata; uint32_t recvlen; FAR struct circbuf_s recvbuf; FAR struct socket *psock; FAR struct rpmsg_socket_conn_s *next; /* server listen-scoket listening: backlog > 0; * server listen-scoket closed: backlog = -1; * others: backlog = 0; */ int backlog; /* Flow control, descript send side */ uint32_t sendsize; uint32_t sendpos; uint32_t ackpos; /* Flow control, descript recv side */ uint32_t recvpos; uint32_t lastpos; }; /**************************************************************************** * Private Function Prototypes ****************************************************************************/ static int rpmsg_socket_setup(FAR struct socket *psock, int protocol); static sockcaps_t rpmsg_socket_sockcaps(FAR struct socket *psock); static void rpmsg_socket_addref(FAR struct socket *psock); static int rpmsg_socket_bind(FAR struct socket *psock, FAR const struct sockaddr *addr, socklen_t addrlen); static int rpmsg_socket_getsockname(FAR struct socket *psock, FAR struct sockaddr *addr, FAR socklen_t *addrlen); static int rpmsg_socket_getconnname(FAR struct socket *psock, FAR struct sockaddr *addr, FAR socklen_t *addrlen); static int rpmsg_socket_listen(FAR struct socket *psock, int backlog); static int rpmsg_socket_connect(FAR struct socket *psock, FAR const struct sockaddr *addr, socklen_t addrlen); static int rpmsg_socket_accept(FAR struct socket *psock, FAR struct sockaddr *addr, FAR socklen_t *addrlen, FAR struct socket *newsock); static int rpmsg_socket_poll(FAR struct socket *psock, FAR struct pollfd *fds, bool setup); static ssize_t rpmsg_socket_sendmsg(FAR struct socket *psock, FAR struct msghdr *msg, int flags); static ssize_t rpmsg_socket_recvmsg(FAR struct socket *psock, FAR struct msghdr *msg, int flags); static int rpmsg_socket_close(FAR struct socket *psock); /**************************************************************************** * Public Data ****************************************************************************/ const struct sock_intf_s g_rpmsg_sockif = { rpmsg_socket_setup, /* si_setup */ rpmsg_socket_sockcaps, /* si_sockcaps */ rpmsg_socket_addref, /* si_addref */ rpmsg_socket_bind, /* si_bind */ rpmsg_socket_getsockname, /* si_getsockname */ rpmsg_socket_getconnname, /* si_getconnname */ rpmsg_socket_listen, /* si_listen */ rpmsg_socket_connect, /* si_connect */ rpmsg_socket_accept, /* si_accept */ rpmsg_socket_poll, /* si_poll */ rpmsg_socket_sendmsg, /* si_sendmsg */ rpmsg_socket_recvmsg, /* si_recvmsg */ rpmsg_socket_close /* si_close */ }; /**************************************************************************** * Private Data ****************************************************************************/ static unsigned int g_rpmsg_id; /**************************************************************************** * Private Functions ****************************************************************************/ static inline void rpmsg_socket_lock(FAR sem_t *sem) { net_lockedwait_uninterruptible(sem); } static inline void rpmsg_socket_unlock(FAR sem_t *sem) { nxsem_post(sem); } static inline void rpmsg_socket_post(FAR sem_t *sem) { int semcount; nxsem_get_value(sem, &semcount); if (semcount < 1) { nxsem_post(sem); } } static void rpmsg_socket_pollnotify(FAR struct rpmsg_socket_conn_s *conn, pollevent_t eventset) { int i; for (i = 0; i < CONFIG_NET_RPMSG_NPOLLWAITERS; i++) { FAR struct pollfd *fds = conn->fds[i]; if (fds) { fds->revents |= (fds->events & eventset); if (fds->revents != 0) { rpmsg_socket_post(fds->sem); } } } } static FAR struct rpmsg_socket_conn_s *rpmsg_socket_alloc(void) { FAR struct rpmsg_socket_conn_s *conn; conn = kmm_zalloc(sizeof(struct rpmsg_socket_conn_s)); if (!conn) { return NULL; } circbuf_init(&conn->recvbuf, NULL, 0); nxsem_init(&conn->sendlock, 0, 1); nxsem_init(&conn->recvlock, 0, 1); nxsem_init(&conn->sendsem, 0, 0); nxsem_init(&conn->recvsem, 0, 0); nxsem_set_protocol(&conn->sendsem, SEM_PRIO_NONE); nxsem_set_protocol(&conn->recvsem, SEM_PRIO_NONE); conn->crefs = 1; return conn; } static void rpmsg_socket_free(FAR struct rpmsg_socket_conn_s *conn) { circbuf_uninit(&conn->recvbuf); nxsem_destroy(&conn->recvlock); nxsem_destroy(&conn->sendlock); nxsem_destroy(&conn->sendsem); nxsem_destroy(&conn->recvsem); kmm_free(conn); } static int rpmsg_socket_wakeup(FAR struct rpmsg_socket_conn_s *conn) { struct rpmsg_socket_data_s msg; uint32_t space; int ret = 0; if (!conn->ept.rdev) { return ret; } rpmsg_socket_lock(&conn->recvlock); space = conn->recvpos - conn->lastpos; if (space > circbuf_size(&conn->recvbuf) / 2) { conn->lastpos = conn->recvpos; msg.cmd = RPMSG_SOCKET_CMD_DATA; msg.pos = conn->recvpos; msg.len = 0; ret = 1; } rpmsg_socket_unlock(&conn->recvlock); return ret ? rpmsg_send(&conn->ept, &msg, sizeof(msg)) : 0; } static inline uint32_t rpmsg_socket_get_space( FAR struct rpmsg_socket_conn_s *conn) { return conn->sendsize - (conn->sendpos - conn->ackpos); } static int rpmsg_socket_ept_cb(FAR struct rpmsg_endpoint *ept, FAR void *data, size_t len, uint32_t src, FAR void *priv) { FAR struct rpmsg_socket_conn_s *conn = ept->priv; FAR struct rpmsg_socket_sync_s *head = data; if (head->cmd == RPMSG_SOCKET_CMD_SYNC) { conn->sendsize = head->size; conn->psock->s_flags |= _SF_CONNECTED; _SO_SETERRNO(conn->psock, OK); rpmsg_socket_post(&conn->sendsem); rpmsg_socket_pollnotify(conn, POLLOUT); } else { FAR struct rpmsg_socket_data_s *msg = data; FAR uint8_t *buf = (FAR uint8_t *)msg->data; rpmsg_socket_lock(&conn->sendlock); conn->ackpos = msg->pos; if (rpmsg_socket_get_space(conn) > 0) { rpmsg_socket_post(&conn->sendsem); rpmsg_socket_pollnotify(conn, POLLOUT); } rpmsg_socket_unlock(&conn->sendlock); if (len > sizeof(*msg)) { len -= sizeof(*msg); DEBUGASSERT(len == msg->len || len == msg->len + sizeof(uint32_t)); rpmsg_socket_lock(&conn->recvlock); if (conn->recvdata) { conn->recvlen = MIN(conn->recvlen, msg->len); if (len == msg->len) { /* SOCK_STREAM */ conn->recvpos += conn->recvlen; memcpy(conn->recvdata, buf, conn->recvlen); buf += conn->recvlen; len -= conn->recvlen; } else { /* SOCK_DGRAM */ conn->recvpos += len; memcpy(conn->recvdata, buf + sizeof(uint32_t), conn->recvlen); len = 0; } conn->recvdata = NULL; nxsem_post(&conn->recvsem); } if (len > 0) { ssize_t written; written = circbuf_write(&conn->recvbuf, buf, len); if (written != len) { nerr("circbuf_write overflow, %d, %d\n", written, len); } rpmsg_socket_pollnotify(conn, POLLIN); } rpmsg_socket_unlock(&conn->recvlock); } } return 0; } static inline void rpmsg_socket_destroy_ept( FAR struct rpmsg_socket_conn_s *conn) { if (!conn) { return; } rpmsg_socket_lock(&conn->recvlock); if (conn->ept.rdev) { if (conn->backlog) { /* Listen socket */ conn->backlog = -1; } rpmsg_destroy_ept(&conn->ept); rpmsg_socket_post(&conn->sendsem); rpmsg_socket_post(&conn->recvsem); rpmsg_socket_pollnotify(conn, POLLIN | POLLOUT); } rpmsg_socket_unlock(&conn->recvlock); } static void rpmsg_socket_ns_bound(struct rpmsg_endpoint *ept) { FAR struct rpmsg_socket_conn_s *conn = ept->priv; struct rpmsg_socket_sync_s msg; msg.cmd = RPMSG_SOCKET_CMD_SYNC; msg.size = circbuf_size(&conn->recvbuf); rpmsg_send(&conn->ept, &msg, sizeof(msg)); } static void rpmsg_socket_ns_unbind(FAR struct rpmsg_endpoint *ept) { rpmsg_socket_destroy_ept(ept->priv); } static void rpmsg_socket_device_created(FAR struct rpmsg_device *rdev, FAR void *priv) { FAR struct rpmsg_socket_conn_s *conn = priv; char buf[RPMSG_NAME_SIZE]; if (conn->ept.rdev) { return; } if (strcmp(conn->rpaddr.rp_cpu, rpmsg_get_cpuname(rdev)) == 0) { conn->ept.priv = conn; conn->ept.ns_bound_cb = rpmsg_socket_ns_bound; snprintf(buf, sizeof(buf), "%s%s", RPMSG_SOCKET_NAME_PREFIX, conn->rpaddr.rp_name); rpmsg_create_ept(&conn->ept, rdev, buf, RPMSG_ADDR_ANY, RPMSG_ADDR_ANY, rpmsg_socket_ept_cb, rpmsg_socket_ns_unbind); } } static void rpmsg_socket_device_destroy(FAR struct rpmsg_device *rdev, FAR void *priv) { FAR struct rpmsg_socket_conn_s *conn = priv; if (strcmp(conn->rpaddr.rp_cpu, rpmsg_get_cpuname(rdev)) == 0) { rpmsg_socket_destroy_ept(conn); } } static void rpmsg_socket_ns_bind(FAR struct rpmsg_device *rdev, FAR void *priv, FAR const char *name, uint32_t dest) { FAR struct rpmsg_socket_conn_s *server = priv; FAR struct rpmsg_socket_conn_s *tmp; FAR struct rpmsg_socket_conn_s *new; char buf[RPMSG_NAME_SIZE]; int cnt = 0; int ret; snprintf(buf, sizeof(buf), "%s%s", RPMSG_SOCKET_NAME_PREFIX, server->rpaddr.rp_name); if (strncmp(name, buf, strlen(buf))) { return; } if (strlen(server->rpaddr.rp_cpu) && strcmp(server->rpaddr.rp_cpu, rpmsg_get_cpuname(rdev))) { /* Bind specific CPU, then only listen that CPU */ return; } new = rpmsg_socket_alloc(); if (!new) { return; } ret = circbuf_resize(&new->recvbuf, CONFIG_NET_RPMSG_RXBUF_SIZE); if (ret < 0) { rpmsg_socket_free(new); return; } new->ept.priv = new; ret = rpmsg_create_ept(&new->ept, rdev, name, RPMSG_ADDR_ANY, dest, rpmsg_socket_ept_cb, rpmsg_socket_ns_unbind); if (ret < 0) { rpmsg_socket_free(new); return; } rpmsg_socket_ns_bound(&new->ept); strcpy(new->rpaddr.rp_cpu, rpmsg_get_cpuname(rdev)); strcpy(new->rpaddr.rp_name, name + RPMSG_SOCKET_NAME_PREFIX_LEN); rpmsg_socket_lock(&server->recvlock); for (tmp = server; tmp->next; tmp = tmp->next) { if (++cnt >= server->backlog) { /* Reject the connection */ rpmsg_socket_unlock(&server->recvlock); rpmsg_destroy_ept(&new->ept); rpmsg_socket_free(new); return; } } tmp->next = new; new->psock = server->psock; rpmsg_socket_unlock(&server->recvlock); rpmsg_socket_post(&server->recvsem); rpmsg_socket_pollnotify(server, POLLIN); } static int rpmsg_socket_getaddr(FAR struct rpmsg_socket_conn_s *conn, FAR struct sockaddr *addr, FAR socklen_t *addrlen) { if (!addr || *addrlen < sizeof(struct sockaddr_rpmsg)) { return -EINVAL; } memcpy(addr, &conn->rpaddr, sizeof(struct sockaddr_rpmsg)); *addrlen = sizeof(struct sockaddr_rpmsg); return 0; } static int rpmsg_socket_setaddr(FAR struct rpmsg_socket_conn_s *conn, FAR const struct sockaddr *addr, socklen_t addrlen, bool suffix) { FAR struct sockaddr_rpmsg *rpaddr = (FAR struct sockaddr_rpmsg *)addr; if (rpaddr->rp_family != AF_RPMSG || addrlen != sizeof(struct sockaddr_rpmsg)) { return -EINVAL; } memcpy(&conn->rpaddr, rpaddr, sizeof(struct sockaddr_rpmsg)); if (suffix) { size_t len; rpaddr = &conn->rpaddr; len = strlen(rpaddr->rp_name); snprintf(&rpaddr->rp_name[len], sizeof(rpaddr->rp_name) - len - 1, ":%u", g_rpmsg_id++); } return 0; } static int rpmsg_socket_setup(FAR struct socket *psock, int protocol) { FAR struct rpmsg_socket_conn_s *conn; conn = rpmsg_socket_alloc(); if (!conn) { return -ENOMEM; } psock->s_conn = conn; conn->psock = psock; return 0; } static sockcaps_t rpmsg_socket_sockcaps(FAR struct socket *psock) { return SOCKCAP_NONBLOCKING; } static void rpmsg_socket_addref(FAR struct socket *psock) { FAR struct rpmsg_socket_conn_s *conn = psock->s_conn; conn->crefs++; } static int rpmsg_socket_bind(FAR struct socket *psock, FAR const struct sockaddr *addr, socklen_t addrlen) { return rpmsg_socket_setaddr(psock->s_conn, addr, addrlen, false); } static int rpmsg_socket_getsockname(FAR struct socket *psock, FAR struct sockaddr *addr, FAR socklen_t *addrlen) { int ret; ret = rpmsg_socket_getaddr(psock->s_conn, addr, addrlen); if (ret >= 0) { strncpy(((struct sockaddr_rpmsg *)addr)->rp_cpu, CONFIG_RPTUN_LOCAL_CPUNAME, RPMSG_SOCKET_CPU_SIZE); } return ret; } static int rpmsg_socket_getconnname(FAR struct socket *psock, FAR struct sockaddr *addr, FAR socklen_t *addrlen) { return rpmsg_socket_getaddr(psock->s_conn, addr, addrlen); } static int rpmsg_socket_listen(FAR struct socket *psock, int backlog) { FAR struct rpmsg_socket_conn_s *server = psock->s_conn; if (psock->s_type != SOCK_STREAM) { return -ENOSYS; } if (!_SS_ISBOUND(psock->s_flags) || backlog <= 0) { return -EINVAL; } server->backlog = backlog; return rpmsg_register_callback(server, NULL, NULL, rpmsg_socket_ns_bind); } static int rpmsg_socket_connect_internal(FAR struct socket *psock) { FAR struct rpmsg_socket_conn_s *conn = psock->s_conn; int ret; ret = circbuf_resize(&conn->recvbuf, CONFIG_NET_RPMSG_RXBUF_SIZE); if (ret < 0) { return ret; } ret = rpmsg_register_callback(conn, rpmsg_socket_device_created, rpmsg_socket_device_destroy, NULL); if (ret < 0) { return ret; } if (conn->sendsize == 0) { if (_SS_ISNONBLOCK(psock->s_flags)) { return -EINPROGRESS; } ret = net_timedwait(&conn->sendsem, _SO_TIMEOUT(psock->s_rcvtimeo)); if (ret < 0) { rpmsg_unregister_callback(conn, rpmsg_socket_device_created, rpmsg_socket_device_destroy, NULL); } } return ret; } static int rpmsg_socket_connect(FAR struct socket *psock, FAR const struct sockaddr *addr, socklen_t addrlen) { FAR struct rpmsg_socket_conn_s *conn = psock->s_conn; int ret; if (_SS_ISCONNECTED(psock->s_flags)) { return -EISCONN; } ret = rpmsg_socket_setaddr(conn, addr, addrlen, psock->s_type == SOCK_STREAM); if (ret < 0) { return ret; } return rpmsg_socket_connect_internal(psock); } static int rpmsg_socket_accept(FAR struct socket *psock, FAR struct sockaddr *addr, FAR socklen_t *addrlen, FAR struct socket *newsock) { FAR struct rpmsg_socket_conn_s *server = psock->s_conn; int ret = 0; if (server->backlog == -1) { return -ECONNRESET; } if (!_SS_ISLISTENING(psock->s_flags)) { return -EINVAL; } while (1) { FAR struct rpmsg_socket_conn_s *conn = NULL; rpmsg_socket_lock(&server->recvlock); if (server->next) { conn = server->next; server->next = conn->next; conn->next = NULL; } rpmsg_socket_unlock(&server->recvlock); if (conn) { rpmsg_register_callback(conn, rpmsg_socket_device_created, rpmsg_socket_device_destroy, NULL); if (conn->sendsize == 0) { net_lockedwait(&conn->sendsem); } newsock->s_domain = psock->s_domain; newsock->s_sockif = psock->s_sockif; newsock->s_type = SOCK_STREAM; newsock->s_conn = conn; rpmsg_socket_getaddr(conn, addr, addrlen); break; } else { if (_SS_ISNONBLOCK(psock->s_flags)) { ret = -EAGAIN; break; } else { ret = net_lockedwait(&server->recvsem); if (server->backlog == -1) { ret = -ECONNRESET; } if (ret < 0) { break; } } } } return ret; } static int rpmsg_socket_poll(FAR struct socket *psock, FAR struct pollfd *fds, bool setup) { FAR struct rpmsg_socket_conn_s *conn = psock->s_conn; pollevent_t eventset = 0; int ret = 0; int i; if (setup) { for (i = 0; i < CONFIG_NET_RPMSG_NPOLLWAITERS; i++) { /* Find an available slot */ if (!conn->fds[i]) { /* Bind the poll structure and this slot */ conn->fds[i] = fds; fds->priv = &conn->fds[i]; break; } } if (i >= CONFIG_NET_RPMSG_NPOLLWAITERS) { fds->priv = NULL; ret = -EBUSY; goto errout; } /* Immediately notify on any of the requested events */ if (_SS_ISLISTENING(psock->s_flags)) { if (conn->backlog == -1) { ret = -ECONNRESET; goto errout; } if (conn->next) { eventset |= (fds->events & POLLIN); } } else if (_SS_ISCONNECTED(psock->s_flags)) { if (!conn->ept.rdev) { ret = -ECONNRESET; goto errout; } rpmsg_socket_lock(&conn->sendlock); if (rpmsg_socket_get_space(conn) > 0) { eventset |= (fds->events & POLLOUT); } rpmsg_socket_unlock(&conn->sendlock); rpmsg_socket_lock(&conn->recvlock); if (!circbuf_is_empty(&conn->recvbuf)) { eventset |= (fds->events & POLLIN); } rpmsg_socket_unlock(&conn->recvlock); } else if (!_SS_ISCONNECTED(psock->s_flags) && _SS_ISNONBLOCK(psock->s_flags)) { ret = OK; } else { ret = -EPERM; goto errout; } if (eventset) { rpmsg_socket_pollnotify(conn, eventset); } } else if (fds->priv != NULL) { for (i = 0; i < CONFIG_NET_RPMSG_NPOLLWAITERS; i++) { if (fds == conn->fds[i]) { conn->fds[i] = NULL; fds->priv = NULL; break; } } } errout: return ret; } static uint32_t rpmsg_socket_get_iovlen(FAR const struct iovec *buf, size_t iovcnt) { uint32_t len = 0; while (iovcnt--) { len += (buf++)->iov_len; } return len; } static ssize_t rpmsg_socket_send_continuous(FAR struct socket *psock, FAR const struct iovec *buf, size_t iovcnt) { FAR struct rpmsg_socket_conn_s *conn = psock->s_conn; uint32_t len = rpmsg_socket_get_iovlen(buf, iovcnt); uint32_t written = 0; uint32_t offset = 0; int ret = 0; while (written < len) { FAR struct rpmsg_socket_data_s *msg; uint32_t block_written = 0; uint32_t ipcsize; uint32_t block; rpmsg_socket_lock(&conn->sendlock); block = MIN(len - written, rpmsg_socket_get_space(conn)); rpmsg_socket_unlock(&conn->sendlock); if (block == 0) { if (!_SS_ISNONBLOCK(psock->s_flags)) { ret = net_timedwait(&conn->sendsem, _SO_TIMEOUT(psock->s_sndtimeo)); if (!conn->ept.rdev) { ret = -ECONNRESET; } if (ret < 0) { break; } } else { ret = -EAGAIN; break; } continue; } msg = rpmsg_get_tx_payload_buffer(&conn->ept, &ipcsize, true); if (!msg) { ret = -EINVAL; break; } rpmsg_socket_lock(&conn->sendlock); block = MIN(len - written, rpmsg_socket_get_space(conn)); block = MIN(block, ipcsize - sizeof(*msg)); msg->cmd = RPMSG_SOCKET_CMD_DATA; msg->pos = conn->recvpos; msg->len = block; while (block_written < block) { uint32_t chunk = MIN(block - block_written, buf->iov_len - offset); memcpy(msg->data + block_written, (FAR const char *)buf->iov_base + offset, chunk); offset += chunk; if (offset == buf->iov_len) { buf++; offset = 0; } block_written += chunk; } conn->lastpos = conn->recvpos; conn->sendpos += msg->len; rpmsg_socket_unlock(&conn->sendlock); ret = rpmsg_send_nocopy(&conn->ept, msg, block + sizeof(*msg)); if (ret < 0) { break; } written += block; } return written ? written : ret; } static ssize_t rpmsg_socket_send_single(FAR struct socket *psock, FAR const struct iovec *buf, size_t iovcnt) { FAR struct rpmsg_socket_conn_s *conn = psock->s_conn; FAR struct rpmsg_socket_data_s *msg; uint32_t len = rpmsg_socket_get_iovlen(buf, iovcnt); uint32_t total = len + sizeof(*msg) + sizeof(uint32_t); uint32_t written = 0; uint32_t ipcsize; uint32_t space; char *msgpos; int ret; if (total > conn->sendsize) { return -EFBIG; } while (1) { rpmsg_socket_lock(&conn->sendlock); space = rpmsg_socket_get_space(conn); rpmsg_socket_unlock(&conn->sendlock); if (space >= total - sizeof(*msg)) break; if (!_SS_ISNONBLOCK(psock->s_flags)) { ret = net_timedwait(&conn->sendsem, _SO_TIMEOUT(psock->s_sndtimeo)); if (!conn->ept.rdev) { ret = -ECONNRESET; } if (ret < 0) { return ret; } } else { return -EAGAIN; } } msg = rpmsg_get_tx_payload_buffer(&conn->ept, &ipcsize, true); if (!msg) { return -EINVAL; } rpmsg_socket_lock(&conn->sendlock); space = rpmsg_socket_get_space(conn); total = MIN(total, space + sizeof(*msg)); total = MIN(total, ipcsize); len = total - sizeof(*msg) - sizeof(uint32_t); /* SOCK_DGRAM need write len to buffer */ msg->cmd = RPMSG_SOCKET_CMD_DATA; msg->pos = conn->recvpos; msg->len = len; memcpy(msg->data, &len, sizeof(uint32_t)); msgpos = msg->data + sizeof(uint32_t); while (written < len) { if (len - written < buf->iov_len) { memcpy(msgpos, buf->iov_base, len - written); written = len; } else { memcpy(msgpos, buf->iov_base, buf->iov_len); written += buf->iov_len; msgpos += buf->iov_len; buf++; } } conn->lastpos = conn->recvpos; conn->sendpos += len + sizeof(uint32_t); rpmsg_socket_unlock(&conn->sendlock); ret = rpmsg_send_nocopy(&conn->ept, msg, total); return ret > 0 ? len : ret; } static ssize_t rpmsg_socket_send_internal(FAR struct socket *psock, FAR const void *buf, size_t len, int flags) { FAR struct rpmsg_socket_conn_s *conn = psock->s_conn; if (!conn->ept.rdev) { /* return ECONNRESET if lower IPC closed */ return -ECONNRESET; } if (psock->s_type == SOCK_STREAM) { return rpmsg_socket_send_continuous(psock, buf, len); } else { return rpmsg_socket_send_single(psock, buf, len); } } static ssize_t rpmsg_socket_sendmsg(FAR struct socket *psock, FAR struct msghdr *msg, int flags) { FAR const struct iovec *buf = msg->msg_iov; size_t len = msg->msg_iovlen; FAR const struct sockaddr *to = msg->msg_name; socklen_t tolen = msg->msg_namelen; ssize_t ret; if (!_SS_ISCONNECTED(psock->s_flags)) { if (to == NULL) { return -ENOTCONN; } ret = rpmsg_socket_connect(psock, to, tolen); if (ret < 0) { return ret; } } return rpmsg_socket_send_internal(psock, buf, len, flags); } static ssize_t rpmsg_socket_recvmsg(FAR struct socket *psock, FAR struct msghdr *msg, int flags) { FAR void *buf = msg->msg_iov->iov_base; size_t len = msg->msg_iov->iov_len; FAR struct sockaddr *from = msg->msg_name; FAR socklen_t *fromlen = &msg->msg_namelen; FAR struct rpmsg_socket_conn_s *conn = psock->s_conn; ssize_t ret; if (psock->s_type != SOCK_STREAM && _SS_ISBOUND(psock->s_flags) && !_SS_ISCONNECTED(psock->s_flags)) { ret = rpmsg_socket_connect_internal(psock); if (ret < 0) { return ret; } } if (!_SS_ISCONNECTED(psock->s_flags)) { return -EISCONN; } rpmsg_socket_lock(&conn->recvlock); if (psock->s_type != SOCK_STREAM) { uint32_t datalen; ret = circbuf_read(&conn->recvbuf, &datalen, sizeof(uint32_t)); if (ret > 0) { ret = circbuf_read(&conn->recvbuf, buf, MIN(datalen, len)); if (ret > 0 && datalen > ret) { circbuf_skip(&conn->recvbuf, datalen - ret); } conn->recvpos += datalen + sizeof(uint32_t); } } else { ret = circbuf_read(&conn->recvbuf, buf, len); conn->recvpos += ret > 0 ? ret : 0; } if (ret > 0) { goto out; } if (!conn->ept.rdev) { /* return EOF if lower IPC closed */ ret = 0; goto out; } if (_SS_ISNONBLOCK(psock->s_flags)) { ret = -EAGAIN; goto out; } conn->recvdata = buf; conn->recvlen = len; rpmsg_socket_unlock(&conn->recvlock); ret = net_timedwait(&conn->recvsem, _SO_TIMEOUT(psock->s_rcvtimeo)); if (!conn->ept.rdev) { ret = -ECONNRESET; } rpmsg_socket_lock(&conn->recvlock); if (!conn->recvdata) { ret = conn->recvlen; } else { conn->recvdata = NULL; } out: rpmsg_socket_unlock(&conn->recvlock); if (ret > 0) { rpmsg_socket_wakeup(conn); rpmsg_socket_getaddr(conn, from, fromlen); } return ret; } static int rpmsg_socket_close(FAR struct socket *psock) { FAR struct rpmsg_socket_conn_s *conn = psock->s_conn; if (conn->crefs > 1) { conn->crefs--; return 0; } if (conn->backlog) { rpmsg_unregister_callback(conn, NULL, NULL, rpmsg_socket_ns_bind); } else { rpmsg_unregister_callback(conn, rpmsg_socket_device_created, rpmsg_socket_device_destroy, NULL); } rpmsg_socket_destroy_ept(conn); rpmsg_socket_free(conn); return 0; }