net/local: make the call return of each process consistent with linux

move the accept logic into connect flow.

In order to successfully establish a blocking connection between
the client and server on the same thread.

nonblock is not affected, and the block connect is now the same
as the nonblock flow, other apis are not affected.

Signed-off-by: zhanghongyu <zhanghongyu@xiaomi.com>
This commit is contained in:
zhanghongyu 2023-12-19 17:37:08 +08:00 committed by Xiang Xiao
parent 81fccf96db
commit d50b1778f7
9 changed files with 188 additions and 203 deletions

View File

@ -80,7 +80,6 @@ enum local_state_s
/* SOCK_STREAM peers only */ /* SOCK_STREAM peers only */
LOCAL_STATE_ACCEPT, /* Client waiting for a connection */ LOCAL_STATE_ACCEPT, /* Client waiting for a connection */
LOCAL_STATE_CONNECTING, /* Non-blocking connect */
LOCAL_STATE_CONNECTED, /* Peer connected */ LOCAL_STATE_CONNECTED, /* Peer connected */
LOCAL_STATE_DISCONNECTED /* Peer disconnected */ LOCAL_STATE_DISCONNECTED /* Peer disconnected */
}; };
@ -144,7 +143,6 @@ struct local_conn_s
/* SOCK_STREAM fields common to both client and server */ /* SOCK_STREAM fields common to both client and server */
sem_t lc_waitsem; /* Use to wait for a connection to be accepted */ sem_t lc_waitsem; /* Use to wait for a connection to be accepted */
sem_t lc_donesem; /* Use to wait for client connected done */
FAR struct socket *lc_psock; /* A reference to the socket structure */ FAR struct socket *lc_psock; /* A reference to the socket structure */
/* The following is a list if poll structures of threads waiting for /* The following is a list if poll structures of threads waiting for
@ -169,13 +167,12 @@ struct local_conn_s
dq_queue_t lc_waiters; /* List of connections waiting to be accepted */ dq_queue_t lc_waiters; /* List of connections waiting to be accepted */
} server; } server;
/* Fields unique to the connecting client side */ /* Fields unique to the connecting accept side */
struct struct
{ {
volatile int lc_result; /* Result of the connection operation (client) */
dq_entry_t lc_waiter; /* Linked to the lc_waiters lists */ dq_entry_t lc_waiter; /* Linked to the lc_waiters lists */
} client; } accept;
} u; } u;
#endif /* CONFIG_NET_LOCAL_STREAM */ #endif /* CONFIG_NET_LOCAL_STREAM */
}; };
@ -214,6 +211,20 @@ struct socket; /* Forward reference */
FAR struct local_conn_s *local_alloc(void); FAR struct local_conn_s *local_alloc(void);
/****************************************************************************
* Name: local_alloc_accept
*
* Description:
* Called when a client calls connect and can find the appropriate
* connection in LISTEN. In that case, this function will create
* a new connection and initialize it.
*
****************************************************************************/
int local_alloc_accept(FAR struct local_conn_s *server,
FAR struct local_conn_s *client,
FAR struct local_conn_s **accept);
/**************************************************************************** /****************************************************************************
* Name: local_free * Name: local_free
* *
@ -699,6 +710,16 @@ int local_set_pollthreshold(FAR struct local_conn_s *conn,
unsigned long threshold); unsigned long threshold);
#endif #endif
/****************************************************************************
* Name: local_set_nonblocking
*
* Description:
* Set the local conntion to nonblocking mode
*
****************************************************************************/
int local_set_nonblocking(FAR struct local_conn_s *conn);
#undef EXTERN #undef EXTERN
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -62,7 +62,7 @@ static int local_waitlisten(FAR struct local_conn_s *server)
} }
} }
/* There is a client waiting for the connection */ /* There is an accept conn waiting to be processed */
return OK; return OK;
} }
@ -100,11 +100,10 @@ int local_accept(FAR struct socket *psock, FAR struct sockaddr *addr,
int flags) int flags)
{ {
FAR struct local_conn_s *server; FAR struct local_conn_s *server;
FAR struct local_conn_s *client;
FAR struct local_conn_s *conn; FAR struct local_conn_s *conn;
FAR dq_entry_t *waiter; FAR dq_entry_t *waiter;
bool nonblock = !!(flags & SOCK_NONBLOCK); bool nonblock = !!(flags & SOCK_NONBLOCK);
int ret; int ret = OK;
/* Some sanity checks */ /* Some sanity checks */
@ -133,7 +132,7 @@ int local_accept(FAR struct socket *psock, FAR struct sockaddr *addr,
for (; ; ) for (; ; )
{ {
/* Are there pending connections. Remove the client from the /* Are there pending connections. Remove the accpet from the
* head of the waiting list. * head of the waiting list.
*/ */
@ -141,116 +140,35 @@ int local_accept(FAR struct socket *psock, FAR struct sockaddr *addr,
if (waiter) if (waiter)
{ {
client = container_of(waiter, struct local_conn_s, conn = container_of(waiter, struct local_conn_s,
u.client.lc_waiter); u.accept.lc_waiter);
local_addref(client); /* Decrement the number of pending accpets */
/* Decrement the number of pending clients */
DEBUGASSERT(server->u.server.lc_pending > 0); DEBUGASSERT(server->u.server.lc_pending > 0);
server->u.server.lc_pending--; server->u.server.lc_pending--;
/* Create a new connection structure for the server side of the /* Setup the accpet socket structure */
* connection.
*/
conn = local_alloc(); conn->lc_psock = newsock;
if (!conn)
{
nerr("ERROR: Failed to allocate new connection structure\n");
ret = -ENOMEM;
}
else
{
/* Initialize the new connection structure */
local_addref(conn);
conn->lc_proto = SOCK_STREAM;
conn->lc_type = LOCAL_TYPE_PATHNAME;
conn->lc_state = LOCAL_STATE_CONNECTED;
conn->lc_psock = psock;
conn->lc_peer = client;
client->lc_peer = conn;
strlcpy(conn->lc_path, client->lc_path, sizeof(conn->lc_path));
conn->lc_instance_id = client->lc_instance_id;
/* Open the server-side write-only FIFO. This should not
* block.
*/
ret = local_open_server_tx(conn, nonblock);
if (ret < 0)
{
nerr("ERROR: Failed to open write-only FIFOs for %s: %d\n",
conn->lc_path, ret);
}
}
/* Do we have a connection? Is the write-side FIFO opened? */
if (ret == OK)
{
DEBUGASSERT(conn->lc_outfile.f_inode != NULL);
/* Open the server-side read-only FIFO. This should not
* block because the client side has already opening it
* for writing.
*/
ret = local_open_server_rx(conn, nonblock);
if (ret < 0)
{
nerr("ERROR: Failed to open read-only FIFOs for %s: %d\n",
conn->lc_path, ret);
}
}
/* Do we have a connection? Are the FIFOs opened? */
if (ret == OK)
{
DEBUGASSERT(conn->lc_infile.f_inode != NULL);
/* Return the address family */
if (addr != NULL)
{
ret = local_getaddr(client, addr, addrlen);
}
}
if (ret == OK)
{
/* Setup the client socket structure */
newsock->s_domain = psock->s_domain; newsock->s_domain = psock->s_domain;
newsock->s_type = SOCK_STREAM; newsock->s_type = SOCK_STREAM;
newsock->s_sockif = psock->s_sockif; newsock->s_sockif = psock->s_sockif;
newsock->s_conn = (FAR void *)conn; newsock->s_conn = (FAR void *)conn;
}
/* Signal the client with the result of the connection */ /* Return the address family */
client->u.client.lc_result = ret; if (addr != NULL)
if (client->lc_state == LOCAL_STATE_CONNECTING)
{ {
client->lc_state = LOCAL_STATE_CONNECTED; ret = local_getaddr(conn, addr, addrlen);
_SO_SETERRNO(client->lc_psock, ret);
local_event_pollnotify(client, POLLOUT);
} }
nxsem_post(&client->lc_waitsem); if (ret == OK && nonblock)
if (ret == OK)
{ {
ret = net_sem_wait(&client->lc_donesem); ret = local_set_nonblocking(conn);
} }
local_subref(client);
return ret; return ret;
} }

View File

@ -120,7 +120,6 @@ FAR struct local_conn_s *local_alloc(void)
#ifdef CONFIG_NET_LOCAL_STREAM #ifdef CONFIG_NET_LOCAL_STREAM
nxsem_init(&conn->lc_waitsem, 0, 0); nxsem_init(&conn->lc_waitsem, 0, 0);
nxsem_init(&conn->lc_donesem, 0, 0);
#endif #endif
@ -147,6 +146,87 @@ FAR struct local_conn_s *local_alloc(void)
return conn; return conn;
} }
/****************************************************************************
* Name: local_alloc_accept
*
* Description:
* Called when a client calls connect and can find the appropriate
* connection in LISTEN. In that case, this function will create
* a new connection and initialize it.
*
****************************************************************************/
int local_alloc_accept(FAR struct local_conn_s *server,
FAR struct local_conn_s *client,
FAR struct local_conn_s **accept)
{
FAR struct local_conn_s *conn;
int ret;
/* Create a new connection structure for the server side of the
* connection.
*/
conn = local_alloc();
if (conn == NULL)
{
nerr("ERROR: Failed to allocate new connection structure\n");
return -ENOMEM;
}
/* Initialize the new connection structure */
local_addref(conn);
conn->lc_proto = SOCK_STREAM;
conn->lc_type = LOCAL_TYPE_PATHNAME;
conn->lc_state = LOCAL_STATE_CONNECTED;
conn->lc_peer = client;
client->lc_peer = conn;
strlcpy(conn->lc_path, client->lc_path, sizeof(conn->lc_path));
conn->lc_instance_id = client->lc_instance_id;
/* Open the server-side write-only FIFO. This should not
* block.
*/
ret = local_open_server_tx(conn, false);
if (ret < 0)
{
nerr("ERROR: Failed to open write-only FIFOs for %s: %d\n",
conn->lc_path, ret);
goto err;
}
/* Do we have a connection? Is the write-side FIFO opened? */
DEBUGASSERT(conn->lc_outfile.f_inode != NULL);
/* Open the server-side read-only FIFO. This should not
* block because the client side has already opening it
* for writing.
*/
ret = local_open_server_rx(conn, false);
if (ret < 0)
{
nerr("ERROR: Failed to open read-only FIFOs for %s: %d\n",
conn->lc_path, ret);
goto err;
}
/* Do we have a connection? Are the FIFOs opened? */
DEBUGASSERT(conn->lc_infile.f_inode != NULL);
*accept = conn;
return OK;
err:
local_free(conn);
return ret;
}
/**************************************************************************** /****************************************************************************
* Name: local_free() * Name: local_free()
* *
@ -212,7 +292,6 @@ void local_free(FAR struct local_conn_s *conn)
local_release_fifos(conn); local_release_fifos(conn);
#ifdef CONFIG_NET_LOCAL_STREAM #ifdef CONFIG_NET_LOCAL_STREAM
nxsem_destroy(&conn->lc_waitsem); nxsem_destroy(&conn->lc_waitsem);
nxsem_destroy(&conn->lc_donesem);
#endif #endif
/* Destory sem associated with the connection */ /* Destory sem associated with the connection */

View File

@ -68,6 +68,7 @@ static int inline local_stream_connect(FAR struct local_conn_s *client,
FAR struct local_conn_s *server, FAR struct local_conn_s *server,
bool nonblock) bool nonblock)
{ {
FAR struct local_conn_s *conn;
int ret; int ret;
int sval; int sval;
@ -86,11 +87,6 @@ static int inline local_stream_connect(FAR struct local_conn_s *client,
return -ECONNREFUSED; return -ECONNREFUSED;
} }
/* Increment the number of pending server connection s */
server->u.server.lc_pending++;
DEBUGASSERT(server->u.server.lc_pending != 0);
/* Create the FIFOs needed for the connection */ /* Create the FIFOs needed for the connection */
ret = local_create_fifos(client); ret = local_create_fifos(client);
@ -117,40 +113,16 @@ static int inline local_stream_connect(FAR struct local_conn_s *client,
DEBUGASSERT(client->lc_outfile.f_inode != NULL); DEBUGASSERT(client->lc_outfile.f_inode != NULL);
/* Set the busy "result" before giving the semaphore. */ ret = local_alloc_accept(server, client, &conn);
client->u.client.lc_result = -EBUSY;
client->lc_state = LOCAL_STATE_ACCEPT;
/* Add ourself to the list of waiting connections and notify the server. */
dq_addlast(&client->u.client.lc_waiter, &server->u.server.lc_waiters);
local_event_pollnotify(server, POLLIN);
if (nxsem_get_value(&server->lc_waitsem, &sval) >= 0 && sval < 1)
{
nxsem_post(&server->lc_waitsem);
}
/* Wait for the server to accept the connections */
if (!nonblock)
{
do
{
net_sem_wait_uninterruptible(&client->lc_waitsem);
ret = client->u.client.lc_result;
}
while (ret == -EBUSY);
/* Did we successfully connect? */
if (ret < 0) if (ret < 0)
{ {
nerr("ERROR: Failed to connect: %d\n", ret); nerr("ERROR: Failed to alloc accept conn %s: %d\n",
client->lc_path, ret);
goto errout_with_outfd; goto errout_with_outfd;
} }
}
client->lc_state = LOCAL_STATE_ACCEPT;
/* Yes.. open the read-only FIFO */ /* Yes.. open the read-only FIFO */
@ -159,21 +131,32 @@ static int inline local_stream_connect(FAR struct local_conn_s *client,
{ {
nerr("ERROR: Failed to open write-only FIFOs for %s: %d\n", nerr("ERROR: Failed to open write-only FIFOs for %s: %d\n",
client->lc_path, ret); client->lc_path, ret);
goto errout_with_outfd;
goto errout_with_conn;
} }
DEBUGASSERT(client->lc_infile.f_inode != NULL); DEBUGASSERT(client->lc_infile.f_inode != NULL);
nxsem_post(&client->lc_donesem); /* Increment the number of pending server connections */
if (!nonblock) server->u.server.lc_pending++;
DEBUGASSERT(server->u.server.lc_pending != 0);
/* Add ourself to the list of waiting connections and notify the server. */
dq_addlast(&conn->u.accept.lc_waiter, &server->u.server.lc_waiters);
local_event_pollnotify(server, POLLIN);
if (nxsem_get_value(&server->lc_waitsem, &sval) >= 0 && sval < 1)
{ {
client->lc_state = LOCAL_STATE_CONNECTED; nxsem_post(&server->lc_waitsem);
return ret;
} }
client->lc_state = LOCAL_STATE_CONNECTING; client->lc_state = LOCAL_STATE_CONNECTED;
return -EINPROGRESS; return ret;
errout_with_conn:
local_free(conn);
errout_with_outfd: errout_with_outfd:
file_close(&client->lc_outfile); file_close(&client->lc_outfile);

View File

@ -738,4 +738,30 @@ int local_open_sender(FAR struct local_conn_s *conn, FAR const char *path,
} }
#endif /* CONFIG_NET_LOCAL_DGRAM */ #endif /* CONFIG_NET_LOCAL_DGRAM */
/****************************************************************************
* Name: local_set_nonblocking
*
* Description:
* Set the local conntion to nonblocking mode
*
****************************************************************************/
int local_set_nonblocking(FAR struct local_conn_s *conn)
{
int nonblock = 1;
int ret;
/* Set the conn to nonblocking mode */
ret = file_ioctl(&conn->lc_infile, FIONBIO, &nonblock);
ret |= file_ioctl(&conn->lc_outfile, FIONBIO, &nonblock);
if (ret < 0)
{
nerr("ERROR: Failed to set the conn to nonblocking mode: %d\n", ret);
}
return ret;
}
#endif /* CONFIG_NET && CONFIG_NET_LOCAL */ #endif /* CONFIG_NET && CONFIG_NET_LOCAL */

View File

@ -171,8 +171,7 @@ int local_pollsetup(FAR struct socket *psock, FAR struct pollfd *fds)
} }
#ifdef CONFIG_NET_LOCAL_STREAM #ifdef CONFIG_NET_LOCAL_STREAM
if (conn->lc_state == LOCAL_STATE_LISTENING || if (conn->lc_state == LOCAL_STATE_LISTENING)
conn->lc_state == LOCAL_STATE_CONNECTING)
{ {
return local_event_pollsetup(conn, fds, true); return local_event_pollsetup(conn, fds, true);
} }
@ -324,8 +323,7 @@ int local_pollteardown(FAR struct socket *psock, FAR struct pollfd *fds)
} }
#ifdef CONFIG_NET_LOCAL_STREAM #ifdef CONFIG_NET_LOCAL_STREAM
if (conn->lc_state == LOCAL_STATE_LISTENING || if (conn->lc_state == LOCAL_STATE_LISTENING)
conn->lc_state == LOCAL_STATE_CONNECTING)
{ {
return local_event_pollsetup(conn, fds, false); return local_event_pollsetup(conn, fds, false);
} }

View File

@ -240,11 +240,6 @@ psock_stream_recvfrom(FAR struct socket *psock, FAR void *buf, size_t len,
if (conn->lc_state != LOCAL_STATE_CONNECTED || if (conn->lc_state != LOCAL_STATE_CONNECTED ||
conn->lc_infile.f_inode == NULL) conn->lc_infile.f_inode == NULL)
{ {
if (conn->lc_state == LOCAL_STATE_CONNECTING)
{
return -EAGAIN;
}
nerr("ERROR: not connected\n"); nerr("ERROR: not connected\n");
return -ENOTCONN; return -ENOTCONN;
} }

View File

@ -66,39 +66,11 @@ int local_release(FAR struct local_conn_s *conn)
DEBUGASSERT(conn->lc_state != LOCAL_STATE_ACCEPT); DEBUGASSERT(conn->lc_state != LOCAL_STATE_ACCEPT);
if (conn->lc_state == LOCAL_STATE_CONNECTING)
{
FAR struct local_conn_s *server = NULL;
FAR struct local_conn_s *client;
FAR dq_entry_t *waiter = NULL;
while ((server = local_nextconn(server)) && waiter == NULL)
{
if (server->lc_state == LOCAL_STATE_LISTENING)
{
for (waiter = dq_peek(&server->u.server.lc_waiters);
waiter;
waiter = dq_next(&client->u.client.lc_waiter))
{
if (&conn->u.client.lc_waiter == waiter)
{
dq_rem(waiter, &server->u.server.lc_waiters);
server->u.server.lc_pending--;
break;
}
client = container_of(waiter, struct local_conn_s,
u.client.lc_waiter);
}
}
}
}
/* Is the socket is listening socket (SOCK_STREAM server) */ /* Is the socket is listening socket (SOCK_STREAM server) */
else if (conn->lc_state == LOCAL_STATE_LISTENING) if (conn->lc_state == LOCAL_STATE_LISTENING)
{ {
FAR struct local_conn_s *client; FAR struct local_conn_s *accept;
FAR dq_entry_t *waiter; FAR dq_entry_t *waiter;
DEBUGASSERT(conn->lc_proto == SOCK_STREAM); DEBUGASSERT(conn->lc_proto == SOCK_STREAM);
@ -107,13 +79,11 @@ int local_release(FAR struct local_conn_s *conn)
for (waiter = dq_peek(&conn->u.server.lc_waiters); for (waiter = dq_peek(&conn->u.server.lc_waiters);
waiter; waiter;
waiter = dq_next(&client->u.client.lc_waiter)) waiter = dq_next(&accept->u.accept.lc_waiter))
{ {
client = container_of(waiter, struct local_conn_s, accept = container_of(waiter, struct local_conn_s,
u.client.lc_waiter); u.accept.lc_waiter);
client->u.client.lc_result = -ENOTCONN; local_subref(accept);
nxsem_post(&client->lc_waitsem);
local_event_pollnotify(client, POLLOUT);
} }
conn->u.server.lc_pending = 0; conn->u.server.lc_pending = 0;

View File

@ -189,11 +189,6 @@ static ssize_t local_send(FAR struct socket *psock,
if (peer->lc_state != LOCAL_STATE_CONNECTED || if (peer->lc_state != LOCAL_STATE_CONNECTED ||
peer->lc_outfile.f_inode == NULL) peer->lc_outfile.f_inode == NULL)
{ {
if (peer->lc_state == LOCAL_STATE_CONNECTING)
{
return -EAGAIN;
}
nerr("ERROR: not connected\n"); nerr("ERROR: not connected\n");
return -ENOTCONN; return -ENOTCONN;
} }