webclient: Implement non-blocking I/O

* webclient_perform
    * Add a new flag to use non-blocking mode (WEBCLIENT_FLAG_NON_BLOCKING)
    * Implement restarting

* Add a few associated API functions
    * webclient_get_poll_info: get the descriptor info for poll/select
    * webclient_abort: abort the operation (instead of restarting)
This commit is contained in:
YAMAMOTO Takashi 2021-02-17 17:17:42 +09:00 committed by Xiang Xiao
parent d4d2f13f89
commit 0bae950b63
2 changed files with 663 additions and 322 deletions

View File

@ -77,6 +77,39 @@
# endif
#endif
/* The following WEBCLIENT_FLAG_xxx constants are for
* webclient_context::flags.
*/
/* WEBCLIENT_FLAG_NON_BLOCKING tells webclient_perform() to
* use non-blocking I/O.
*
* If this flag is set, webclient_perform() returns -EAGAIN
* when it would otherwise block for network I/O. In that case,
* the application should either retry the operation later by calling
* webclient_perform() again, or abort it by calling webclient_abort().
* It can also use webclient_get_poll_info() to avoid busy-retrying.
*
* If this flag is set, it's the application's responsibility to
* implement a timeout.
*
* If the application specifies tls_ops, it's the application's
* responsibility to make the TLS implementation to use non-blocking I/O
* in addition to specifying this flag.
*
* Caveat: Even when this flag is set, the current implementation performs
* the name resolution in a blocking manner.
*/
#define WEBCLIENT_FLAG_NON_BLOCKING 1U
/* The following WEBCLIENT_FLAG_xxx constants are for
* webclient_poll_info::flags.
*/
#define WEBCLIENT_POLL_INFO_WANT_READ 1U
#define WEBCLIENT_POLL_INFO_WANT_WRITE 2U
/****************************************************************************
* Public types
****************************************************************************/
@ -181,6 +214,7 @@ typedef CODE int (*webclient_body_callback_t)(
FAR void *ctx);
struct webclient_tls_connection;
struct webclient_poll_info;
struct webclient_tls_ops
{
@ -196,6 +230,9 @@ struct webclient_tls_ops
FAR void *buf, size_t len);
CODE int (*close)(FAR void *ctx,
FAR struct webclient_tls_connection *conn);
CODE int (*get_poll_info)(FAR void *ctx,
FAR struct webclient_tls_connection *conn,
FAR struct webclient_poll_info *info);
};
struct webclient_context
@ -248,6 +285,7 @@ struct webclient_context
* tls_ops - A vector to implement TLS operations.
* NULL means no https support.
* tls_ctx - A user pointer to be passed to tls_ops as it is.
* flags - OR'ed WEBCLIENT_FLAG_xxx values.
*/
FAR char *buffer;
@ -261,6 +299,7 @@ struct webclient_context
FAR void *body_callback_arg;
FAR const struct webclient_tls_ops *tls_ops;
FAR void *tls_ctx;
unsigned int flags;
/* results
*
@ -275,6 +314,16 @@ struct webclient_context
unsigned int http_status;
FAR char *http_reason;
size_t http_reason_len;
struct wget_s *ws;
};
struct webclient_poll_info
{
/* A file descriptor to wait for i/o. */
int fd;
unsigned int flags; /* OR'ed WEBCLIENT_POLL_INFO_xxx flags */
};
/****************************************************************************
@ -329,9 +378,12 @@ int wget_post(FAR const char *url, FAR const char *posts, FAR char *buffer,
void webclient_set_defaults(FAR struct webclient_context *ctx);
int webclient_perform(FAR struct webclient_context *ctx);
void webclient_abort(FAR struct webclient_context *ctx);
void webclient_set_static_body(FAR struct webclient_context *ctx,
FAR const void *body,
size_t bodylen);
int webclient_get_poll_info(FAR struct webclient_context *ctx,
FAR struct webclient_poll_info *info);
#undef EXTERN
#ifdef __cplusplus

View File

@ -53,6 +53,8 @@
#include <nuttx/compiler.h>
#include <debug.h>
#include <assert.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <stdint.h>
@ -111,11 +113,6 @@
# define CONFIG_WEBCLIENT_MAX_REDIRECT 50
#endif
#define WEBCLIENT_STATE_STATUSLINE 0
#define WEBCLIENT_STATE_HEADERS 1
#define WEBCLIENT_STATE_DATA 2
#define WEBCLIENT_STATE_CLOSE 3
#define HTTPSTATUS_NONE 0
#define HTTPSTATUS_OK 1
#define HTTPSTATUS_MOVED 2
@ -128,15 +125,49 @@
#define WGET_MODE_GET 0
#define WGET_MODE_POST 1
/* The following CONN_ flags are for conn::flags.
*/
#define CONN_WANT_READ WEBCLIENT_POLL_INFO_WANT_READ
#define CONN_WANT_WRITE WEBCLIENT_POLL_INFO_WANT_WRITE
/****************************************************************************
* Private Types
****************************************************************************/
enum webclient_state_e
{
WEBCLIENT_STATE_SOCKET,
WEBCLIENT_STATE_CONNECT,
WEBCLIENT_STATE_PREPARE_REQUEST,
WEBCLIENT_STATE_SEND_REQUEST,
WEBCLIENT_STATE_SEND_REQUEST_BODY,
WEBCLIENT_STATE_STATUSLINE,
WEBCLIENT_STATE_HEADERS,
WEBCLIENT_STATE_DATA,
WEBCLIENT_STATE_CLOSE,
WEBCLIENT_STATE_DONE,
};
struct conn
{
bool tls;
/* for !tls */
int sockfd;
unsigned int flags;
/* for tls */
struct webclient_tls_connection *tls_conn;
};
struct wget_s
{
/* Internal status */
uint8_t state;
enum webclient_state_e state;
uint8_t httpstatus;
uint16_t port; /* The port number to use in the connection */
@ -160,13 +191,18 @@ struct wget_s
char scheme[sizeof("https") + 1];
char hostname[CONFIG_WEBCLIENT_MAXHOSTNAME];
char filename[CONFIG_WEBCLIENT_MAXFILENAME];
};
struct conn
{
bool tls;
int sockfd;
struct webclient_tls_connection *tls_conn;
bool need_conn_close;
struct conn conn;
unsigned int nredirect;
int redirected;
/* progress and todo for the current state (WEBCLIENT_STATE_) */
off_t state_offset;
size_t state_len;
FAR const void *data_buffer;
size_t data_len;
};
/****************************************************************************
@ -217,6 +253,11 @@ static ssize_t conn_send(struct webclient_context *ctx, struct conn *conn,
ssize_t ret = send(conn->sockfd, buffer, len, 0);
if (ret == -1)
{
if (errno == EAGAIN)
{
conn->flags |= CONN_WANT_WRITE;
}
return -errno;
}
@ -224,33 +265,6 @@ static ssize_t conn_send(struct webclient_context *ctx, struct conn *conn,
}
}
/****************************************************************************
* Name: conn_send_all
****************************************************************************/
static int conn_send_all(struct webclient_context *ctx, struct conn *conn,
const void *buffer, size_t len)
{
size_t left = len;
ssize_t ret;
while (left > 0)
{
ret = conn_send(ctx, conn, buffer, left);
if (ret < 0)
{
goto errout;
}
buffer += ret;
left -= ret;
}
return len;
errout:
return ret;
}
/****************************************************************************
* Name: conn_recv
****************************************************************************/
@ -267,6 +281,11 @@ static ssize_t conn_recv(struct webclient_context *ctx, struct conn *conn,
ssize_t ret = recv(conn->sockfd, buffer, len, 0);
if (ret == -1)
{
if (errno == EAGAIN)
{
conn->flags |= CONN_WANT_READ;
}
return -errno;
}
@ -746,11 +765,9 @@ int webclient_perform(FAR struct webclient_context *ctx)
{
struct wget_s *ws;
struct timeval tv;
bool redirected;
unsigned int nredirect = 0;
char *dest;
char *ep;
struct conn conn;
struct conn *conn;
FAR const struct webclient_tls_ops *tls_ops = ctx->tls_ops;
FAR const char *method = ctx->method;
FAR void *tls_ctx = ctx->tls_ctx;
@ -760,6 +777,8 @@ int webclient_perform(FAR struct webclient_context *ctx)
/* Initialize the state structure */
if (ctx->ws == NULL)
{
ws = calloc(1, sizeof(struct wget_s));
if (!ws)
{
@ -781,19 +800,28 @@ int webclient_perform(FAR struct webclient_context *ctx)
return ret;
}
ws->state = WEBCLIENT_STATE_SOCKET;
ctx->ws = ws;
}
ws = ctx->ws;
ninfo("hostname='%s' filename='%s'\n", ws->hostname, ws->filename);
/* The following sequence may repeat indefinitely if we are redirected */
conn = &ws->conn;
do
{
if (ws->state == WEBCLIENT_STATE_SOCKET)
{
if (!strcmp(ws->scheme, "https") && tls_ops != NULL)
{
conn.tls = true;
conn->tls = true;
}
else if (!strcmp(ws->scheme, "http"))
{
conn.tls = false;
conn->tls = false;
}
else
{
@ -811,8 +839,77 @@ int webclient_perform(FAR struct webclient_context *ctx)
ws->offset = 0;
ws->datend = 0;
ws->ndx = 0;
ws->redirected = 0;
if (conn.tls)
if (conn->tls)
{
#if defined(CONFIG_WEBCLIENT_NET_LOCAL)
if (ctx->unix_socket_path != NULL)
{
nerr("ERROR: TLS on AF_LOCAL socket is not implemented\n");
free(ws);
return -ENOTSUP;
}
#endif
}
else
{
int domain;
#if defined(CONFIG_WEBCLIENT_NET_LOCAL)
if (ctx->unix_socket_path != NULL)
{
domain = PF_LOCAL;
}
else
#endif
{
domain = PF_INET;
}
/* Create a socket */
conn->sockfd = socket(domain, SOCK_STREAM, 0);
if (conn->sockfd < 0)
{
ret = -errno;
nerr("ERROR: socket failed: %d\n", errno);
goto errout_with_errno;
}
ws->need_conn_close = true;
if ((ctx->flags & WEBCLIENT_FLAG_NON_BLOCKING) != 0)
{
int flags = fcntl(conn->sockfd, F_GETFL, 0);
ret = fcntl(conn->sockfd, F_SETFL, flags | O_NONBLOCK);
if (ret == -1)
{
ret = -errno;
nerr("ERROR: F_SETFL failed: %d\n", ret);
goto errout_with_errno;
}
}
else
{
/* Set send and receive timeout values */
tv.tv_sec = ctx->timeout_sec;
tv.tv_usec = 0;
setsockopt(conn->sockfd, SOL_SOCKET, SO_RCVTIMEO,
(FAR const void *)&tv, sizeof(struct timeval));
setsockopt(conn->sockfd, SOL_SOCKET, SO_SNDTIMEO,
(FAR const void *)&tv, sizeof(struct timeval));
}
}
ws->state = WEBCLIENT_STATE_CONNECT;
}
if (ws->state == WEBCLIENT_STATE_CONNECT)
{
if (conn->tls)
{
char port_str[sizeof("65535")];
@ -827,7 +924,12 @@ int webclient_perform(FAR struct webclient_context *ctx)
snprintf(port_str, sizeof(port_str), "%u", ws->port);
ret = tls_ops->connect(tls_ctx, ws->hostname, port_str,
ctx->timeout_sec, &conn.tls_conn);
ctx->timeout_sec,
&conn->tls_conn);
if (ret == 0)
{
ws->need_conn_close = true;
}
}
else
{
@ -835,15 +937,12 @@ int webclient_perform(FAR struct webclient_context *ctx)
struct sockaddr_un server_un;
#endif
struct sockaddr_in server_in;
int domain;
const struct sockaddr *server_address;
socklen_t server_address_len;
#if defined(CONFIG_WEBCLIENT_NET_LOCAL)
if (ctx->unix_socket_path != NULL)
{
domain = PF_LOCAL;
memset(&server_un, 0, sizeof(server_un));
server_un.sun_family = AF_LOCAL;
strncpy(server_un.sun_path, ctx->unix_socket_path,
@ -857,8 +956,6 @@ int webclient_perform(FAR struct webclient_context *ctx)
else
#endif
{
domain = PF_INET;
/* Get the server address from the host name */
server_in.sin_family = AF_INET;
@ -877,48 +974,35 @@ int webclient_perform(FAR struct webclient_context *ctx)
server_address_len = sizeof(struct sockaddr_in);
}
/* Create a socket */
conn.sockfd = socket(domain, SOCK_STREAM, 0);
if (conn.sockfd < 0)
{
/* socket failed. It will set the errno appropriately */
nerr("ERROR: socket failed: %d\n", errno);
free(ws);
return -errno;
}
/* Set send and receive timeout values */
tv.tv_sec = ctx->timeout_sec;
tv.tv_usec = 0;
setsockopt(conn.sockfd, SOL_SOCKET, SO_RCVTIMEO,
(FAR const void *)&tv, sizeof(struct timeval));
setsockopt(conn.sockfd, SOL_SOCKET, SO_SNDTIMEO,
(FAR const void *)&tv, sizeof(struct timeval));
/* Connect to server. First we have to set some fields in the
* 'server' address structure. The system will assign me an
* arbitrary local port that is not in use.
*/
ret = connect(conn.sockfd, server_address, server_address_len);
ret = connect(conn->sockfd, server_address,
server_address_len);
if (ret == -1)
{
ret = -errno;
close(conn.sockfd);
DEBUGASSERT(ret < 0);
if (ret == -EISCONN)
{
ret = 0;
}
}
}
if (ret < 0)
{
nerr("ERROR: connect failed: %d\n", errno);
free(ws);
return ret;
goto errout_with_errno;
}
ws->state = WEBCLIENT_STATE_PREPARE_REQUEST;
}
if (ws->state == WEBCLIENT_STATE_PREPARE_REQUEST)
{
/* Send the request */
dest = ws->buffer;
@ -967,18 +1051,62 @@ int webclient_perform(FAR struct webclient_context *ctx)
len = dest - ws->buffer;
ret = conn_send_all(ctx, &conn, ws->buffer, len);
ws->state = WEBCLIENT_STATE_SEND_REQUEST;
ws->state_offset = 0;
ws->state_len = len;
}
if (ret >= 0)
if (ws->state == WEBCLIENT_STATE_SEND_REQUEST)
{
size_t sent = 0;
ssize_t ssz;
while (sent < ctx->bodylen)
ssz = conn_send(ctx, conn,
ws->buffer + ws->state_offset,
ws->state_len);
if (ssz < 0)
{
ret = ssz;
nerr("ERROR: send failed: %d\n", -ret);
goto errout_with_errno;
}
if (ssz >= 0)
{
ws->state_offset += ssz;
ws->state_len -= ssz;
if (ws->state_len == 0)
{
ws->state = WEBCLIENT_STATE_SEND_REQUEST_BODY;
ws->state_offset = 0;
ws->state_len = ctx->bodylen;
ws->data_buffer = NULL;
ninfo("Sending %zu bytes request body\n", ws->state_len);
}
}
}
if (ws->state == WEBCLIENT_STATE_SEND_REQUEST_BODY)
{
/* In this state,
*
* ws->data_buffer the data given by the user
* ws->data_offset the byte offset in the entire body
* ws->data_len the byte size of the data in ws->data_buffer
* ws->state_offset the send offset in ws->data_buffer
* ws->state_len the number of remaining bytes to send (total)
*/
if (ws->state_len == 0)
{
ninfo("Finished sending request body\n");
ws->state = WEBCLIENT_STATE_STATUSLINE;
}
else if (ws->data_buffer == NULL)
{
FAR const void *input_buffer;
size_t input_buffer_size = ws->buflen;
size_t todo = ctx->bodylen - sent;
size_t todo = ws->state_len;
if (input_buffer_size > todo)
{
input_buffer_size = todo;
@ -996,33 +1124,56 @@ int webclient_perform(FAR struct webclient_context *ctx)
goto errout_with_errno;
}
ret = conn_send_all(ctx, &conn, input_buffer,
input_buffer_size);
if (ret < 0)
ninfo("Got %zu bytes body chunk / total remaining %zu bytes\n",
input_buffer_size, ws->state_len);
ws->data_buffer = input_buffer;
ws->data_len = input_buffer_size;
ws->state_offset = 0;
}
else
{
break;
}
size_t bytes_to_send = ws->data_len - ws->state_offset;
sent += input_buffer_size;
}
}
if (ret < 0)
DEBUGASSERT(bytes_to_send <= ws->state_len);
ssize_t ssz = conn_send(ctx, conn,
ws->data_buffer + ws->state_offset,
bytes_to_send);
if (ssz < 0)
{
ret = ssz;
nerr("ERROR: send failed: %d\n", -ret);
goto errout_with_errno;
}
ninfo("Sent %zd bytes request body at offset %ju "
"in the %zu bytes chunk, "
"total remaining %zu bytes\n",
ssz,
(uintmax_t)ws->state_offset,
ws->data_len,
ws->state_len);
ws->state_len -= ssz;
ws->state_offset += ssz;
DEBUGASSERT(ws->state_offset <= ws->data_len);
if (ws->state_offset == ws->data_len)
{
ws->data_buffer = NULL;
}
}
}
/* Now loop to get the file sent in response to the GET. This
* loop continues until either we read the end of file (nbytes == 0)
* or until we detect that we have been redirected.
*/
ws->state = WEBCLIENT_STATE_STATUSLINE;
redirected = false;
if (ws->state == WEBCLIENT_STATE_STATUSLINE ||
ws->state == WEBCLIENT_STATE_HEADERS ||
ws->state == WEBCLIENT_STATE_DATA)
{
for (; ; )
{
ws->datend = conn_recv(ctx, &conn, ws->buffer, ws->buflen);
ws->datend = conn_recv(ctx, conn, ws->buffer, ws->buflen);
if (ws->datend < 0)
{
ret = ws->datend;
@ -1039,6 +1190,8 @@ int webclient_perform(FAR struct webclient_context *ctx)
}
ninfo("Connection lost\n");
ws->state = WEBCLIENT_STATE_CLOSE;
ws->redirected = 0;
break;
}
@ -1097,30 +1250,58 @@ int webclient_perform(FAR struct webclient_context *ctx)
}
else
{
redirected = true;
nredirect++;
if (nredirect > CONFIG_WEBCLIENT_MAX_REDIRECT)
ws->nredirect++;
if (ws->nredirect > CONFIG_WEBCLIENT_MAX_REDIRECT)
{
nerr("ERROR: too many redirects (%u)\n", nredirect);
goto errout;
nerr("ERROR: too many redirects (%u)\n",
ws->nredirect);
ret = -ELOOP;
goto errout_with_errno;
}
ws->state = WEBCLIENT_STATE_CLOSE;
ws->redirected = 1;
break;
}
}
}
conn_close(ctx, &conn);
}
while (redirected);
if (ws->state == WEBCLIENT_STATE_CLOSE)
{
conn_close(ctx, conn);
ws->need_conn_close = false;
if (ws->redirected)
{
ws->state = WEBCLIENT_STATE_SOCKET;
}
else
{
ws->state = WEBCLIENT_STATE_DONE;
}
}
}
while (ws->state != WEBCLIENT_STATE_DONE);
free(ws);
return OK;
errout:
ret = -errno;
errout_with_errno:
conn_close(ctx, &conn);
if ((ctx->flags & WEBCLIENT_FLAG_NON_BLOCKING) != 0 &&
(ret == -EAGAIN || ret == -EINPROGRESS || ret == -EALREADY))
{
if (ret == -EINPROGRESS || ret == -EALREADY)
{
conn->flags |= CONN_WANT_WRITE;
}
return -EAGAIN;
}
if (ws->need_conn_close)
{
conn_close(ctx, conn);
}
free(ws);
return ret;
@ -1130,6 +1311,38 @@ errout_with_errno:
* Public Functions
****************************************************************************/
/****************************************************************************
* Name: webclient_abort
*
* Description:
* This function is used to free the resources used by webclient_perform()
* in case of non blocking I/O.
*
* After webclient_perform() returned -EAGAIN, the application can either
* retry the operation by calling webclient_perform() again or abort
* the operation by calling this function.
*
****************************************************************************/
void webclient_abort(FAR struct webclient_context *ctx)
{
struct wget_s *ws = ctx->ws;
if (ws == NULL)
{
return;
}
if (ws->need_conn_close)
{
struct conn *conn = &ws->conn;
conn_close(ctx, conn);
}
free(ws);
}
/****************************************************************************
* Name: web_post_str
****************************************************************************/
@ -1304,3 +1517,79 @@ void webclient_set_static_body(FAR struct webclient_context *ctx,
ctx->body_callback_arg = (void *)body; /* discard const */
ctx->bodylen = bodylen;
}
/****************************************************************************
* Name: webclient_get_poll_info
*
* Description:
* This function retrieves the information necessary
* to wait for events when using non blocking I/O.
*
* When using WEBCLIENT_FLAG_NON_BLOCKING, webclient_perform() can
* return -EAGAIN when it would otherwise block for I/O.
* In that case, the application can use this function to
* get the information necessary to wait for the I/O events
* using poll()/select(), by populating the the given
* webclient_poll_info structure with the information.
*
* The following is an example to use this function to handle EAGAIN.
*
* retry:
* ret = webclient_perform(&ctx);
* if (ret == -EAGAIN)
* {
* struct webclient_poll_info info;
* struct pollfd pfd;
*
* ret = webclient_get_poll_info(&ctx, &info);
* if (ret != 0)
* {
* ...
* }
*
* memset(&pfd, 0, sizeof(pfd));
* pfd.fd = info.fd;
* if ((info.flags & WEBCLIENT_POLL_INFO_WANT_READ) != 0)
* {
* pfd.events |= POLLIN;
* }
*
* if ((info.flags & WEBCLIENT_POLL_INFO_WANT_WRITE) != 0)
* {
* pfd.events |= POLLOUT;
* }
*
* ret = poll(&pfd, 1, -1);
* if (ret != 0)
* {
* ...
* }
*
* goto retry;
* }
*
****************************************************************************/
int webclient_get_poll_info(FAR struct webclient_context *ctx,
FAR struct webclient_poll_info *info)
{
struct wget_s *ws;
struct conn *conn;
ws = ctx->ws;
if (ws == NULL)
{
return -EINVAL;
}
conn = &ws->conn;
if (conn->tls)
{
return ctx->tls_ops->get_poll_info(ctx->tls_ctx, conn->tls_conn, info);
}
info->fd = conn->sockfd;
info->flags = conn->flags & (CONN_WANT_READ | CONN_WANT_WRITE);
conn->flags &= ~(CONN_WANT_READ | CONN_WANT_WRITE);
return 0;
}