From e2233e080ef0ad62682f1725e17abb07e939ed50 Mon Sep 17 00:00:00 2001
From: anchao
Date: Mon, 10 Dec 2018 19:39:01 +0800
Subject: [PATCH 07/10] rpmsg: bring back zero copy transfer

Commit-id: b16ca55
Adding RPMsg Extension layer implementing zero-copy send and
receive.

Signed-off-by: anchao
---
 lib/include/openamp/rpmsg.h | 178 ++++++++++++++++++++++++++++++++++++
 lib/rpmsg/rpmsg.c           |  59 ++++++++++++
 lib/rpmsg/rpmsg_internal.h  |   3 +
 lib/rpmsg/rpmsg_virtio.c    | 141 +++++++++++++++++++++++++-
 4 files changed, 379 insertions(+), 2 deletions(-)

diff --git a/lib/include/openamp/rpmsg.h open-amp/lib/include/openamp/rpmsg.h
index 1279ea2..0b46ee1 100644
--- a/lib/include/openamp/rpmsg.h
+++ open-amp/lib/include/openamp/rpmsg.h
@@ -89,9 +89,20 @@ struct rpmsg_endpoint {
 
 /**
  * struct rpmsg_device_ops - RPMsg device operations
+ * @hold_rx_buffer: hold RPMsg RX buffer
+ * @release_rx_buffer: release RPMsg RX buffer
+ * @get_tx_payload_buffer: get RPMsg TX buffer
+ * @send_offchannel_nocopy: send RPMsg data without copy
  * @send_offchannel_raw: send RPMsg data
  */
 struct rpmsg_device_ops {
+	void (*hold_rx_buffer)(struct rpmsg_device *rdev, void *rxbuf);
+	void (*release_rx_buffer)(struct rpmsg_device *rdev, void *rxbuf);
+	void *(*get_tx_payload_buffer)(struct rpmsg_device *rdev,
+				       uint32_t *len, int wait);
+	int (*send_offchannel_nocopy)(struct rpmsg_device *rdev,
+				      uint32_t src, uint32_t dst,
+				      const void *data, int len);
 	int (*send_offchannel_raw)(struct rpmsg_device *rdev,
 				   uint32_t src, uint32_t dst,
 				   const void *data, int size, int wait);
@@ -294,6 +305,173 @@ static inline int rpmsg_trysend_offchannel(struct rpmsg_endpoint *ept,
 	return rpmsg_send_offchannel_raw(ept, src, dst, data, len, false);
 }
 
+/**
+ * @brief Holds the rx buffer for usage outside the receive callback.
+ *
+ * Calling this function prevents the RPMsg receive buffer from being released
+ * back to the pool of shmem buffers. This API can only be called at rx
+ * callback context (rpmsg_rx_cb_t). With this API, the application doesn't
+ * need to copy the message in rx callback. Instead, the rx buffer base address
+ * is saved in application context and further processed in application
+ * process. After the message is processed, the application can release the rx
+ * buffer for future reuse in vring by calling the rpmsg_release_rx_buffer()
+ * function.
+ *
+ * @ept: the rpmsg endpoint
+ * @rxbuf: rx buffer with message payload
+ *
+ * @see rpmsg_release_rx_buffer
+ */
+void rpmsg_hold_rx_buffer(struct rpmsg_endpoint *ept, void *rxbuf);
+
+/**
+ * @brief Releases the rx buffer for future reuse in vring.
+ *
+ * This API can be called at process context when the message in rx buffer is
+ * processed.
+ *
+ * @ept: the rpmsg endpoint
+ * @rxbuf: rx buffer with message payload
+ *
+ * @see rpmsg_hold_rx_buffer
+ */
+void rpmsg_release_rx_buffer(struct rpmsg_endpoint *ept, void *rxbuf);
+
+/**
+ * @brief Gets the tx buffer for message payload.
+ *
+ * This API can only be called at process context to get the tx buffer in vring.
+ * By this way, the application can directly put its message into the vring tx
+ * buffer without copy from an application buffer.
+ * It is the application responsibility to correctly fill the allocated tx
+ * buffer by data and passing correct parameters to the rpmsg_send_nocopy() or
+ * rpmsg_sendto_nocopy() function to perform data no-copy-send mechanism.
+ *
+ * @ept: Pointer to rpmsg endpoint
+ * @len: Pointer to store tx buffer size
+ * @wait: Boolean, wait or not for buffer to become available
+ *
+ * @return The tx buffer address on success and NULL on failure
+ *
+ * @see rpmsg_send_offchannel_nocopy
+ * @see rpmsg_sendto_nocopy
+ * @see rpmsg_send_nocopy
+ */
+void *rpmsg_get_tx_payload_buffer(struct rpmsg_endpoint *ept,
+				  uint32_t *len, int wait);
+
+/**
+ * rpmsg_send_offchannel_nocopy() - send a message in tx buffer reserved by
+ * rpmsg_get_tx_payload_buffer() across to the remote processor.
+ *
+ * This function sends buf of length len to the remote dst address,
+ * and uses src as the source address.
+ * The message will be sent to the remote processor which the ept
+ * endpoint belongs to.
+ * The application has to take the responsibility for:
+ *  1. tx buffer reserved (rpmsg_get_tx_payload_buffer() )
+ *  2. filling the data to be sent into the pre-allocated tx buffer
+ *  3. not exceeding the buffer size when filling the data
+ *  4. data cache coherency
+ *
+ * After the rpmsg_send_offchannel_nocopy() function is issued the tx buffer is
+ * no more owned by the sending task and must not be touched anymore unless the
+ * rpmsg_send_offchannel_nocopy() function fails and returns an error. In that
+ * case the application should try to re-issue the rpmsg_send_offchannel_nocopy()
+ * again.
+ *
+ * @ept: The rpmsg endpoint
+ * @src: The rpmsg endpoint local address
+ * @dst: The rpmsg endpoint remote address
+ * @data: TX buffer with message filled
+ * @len: Length of payload
+ *
+ * @return number of bytes it has sent or negative error value on failure.
+ *
+ * @see rpmsg_get_tx_payload_buffer
+ * @see rpmsg_sendto_nocopy
+ * @see rpmsg_send_nocopy
+ */
+int rpmsg_send_offchannel_nocopy(struct rpmsg_endpoint *ept, uint32_t src,
+				 uint32_t dst, const void *data, int len);
+
+/**
+ * @brief Sends a message in tx buffer allocated by rpmsg_get_tx_payload_buffer()
+ * across to the remote processor, specify dst.
+ *
+ * This function sends buf of length len to the remote dst address.
+ * The message will be sent to the remote processor which the ept
+ * endpoint belongs to, using ept's source address.
+ * The application has to take the responsibility for:
+ *  1. tx buffer allocation (rpmsg_get_tx_payload_buffer() )
+ *  2. filling the data to be sent into the pre-allocated tx buffer
+ *  3. not exceeding the buffer size when filling the data
+ *  4. data cache coherency
+ *
+ * After the rpmsg_sendto_nocopy() function is issued the tx buffer is no more
+ * owned by the sending task and must not be touched anymore unless the
+ * rpmsg_sendto_nocopy() function fails and returns an error. In that case the
+ * application should try to re-issue the rpmsg_sendto_nocopy() again.
+ *
+ * @ept: The rpmsg endpoint
+ * @data: TX buffer with message filled
+ * @len: Length of payload
+ * @dst: Destination address
+ *
+ * @return number of bytes it has sent or negative error value on failure.
+ *
+ * @see rpmsg_get_tx_payload_buffer
+ * @see rpmsg_send_offchannel_nocopy
+ * @see rpmsg_send_nocopy
+ */
+static inline int rpmsg_sendto_nocopy(struct rpmsg_endpoint *ept,
+				      const void *data, int len, uint32_t dst)
+{
+	return rpmsg_send_offchannel_nocopy(ept, ept->addr, dst, data, len);
+}
+
+/**
+ * rpmsg_send_nocopy() - send a message in tx buffer reserved by
+ * rpmsg_get_tx_payload_buffer() across to the remote processor.
+ *
+ * This function sends buf of length len on the ept endpoint.
+ * The message will be sent to the remote processor which the ept
+ * endpoint belongs to, using ept's source and destination addresses.
+ * The application has to take the responsibility for:
+ *  1. tx buffer reserved (rpmsg_get_tx_payload_buffer() )
+ *  2. filling the data to be sent into the pre-allocated tx buffer
+ *  3. not exceeding the buffer size when filling the data
+ *  4. data cache coherency
+ *
+ * After the rpmsg_send_nocopy() function is issued the tx buffer is no more
+ * owned by the sending task and must not be touched anymore unless the
+ * rpmsg_send_nocopy() function fails and returns an error. In that case the
+ * application should try to re-issue the rpmsg_send_nocopy() again.
+ *
+ * @ept: The rpmsg endpoint
+ * @data: TX buffer with message filled
+ * @len: Length of payload
+ *
+ * @return number of bytes it has sent or negative error value on failure.
+ *
+ * @see rpmsg_get_tx_payload_buffer
+ * @see rpmsg_send_offchannel_nocopy
+ * @see rpmsg_sendto_nocopy
+ */
+static inline int rpmsg_send_nocopy(struct rpmsg_endpoint *ept, const void *data, int len)
+{
+	int tc = 0;
+
+	for (; tc < RPMSG_TICK_COUNT; tc += RPMSG_TICKS_PER_INTERVAL) {
+		if (is_rpmsg_ept_ready(ept))
+			return rpmsg_send_offchannel_nocopy(ept, ept->addr,
+							    ept->dest_addr, data, len);
+		metal_sleep_usec(RPMSG_TICKS_PER_INTERVAL);
+	}
+
+	return RPMSG_ERR_ADDR;
+}
+
 /**
  * rpmsg_init_ept - initialize rpmsg endpoint
  *
diff --git a/lib/rpmsg/rpmsg.c open-amp/lib/rpmsg/rpmsg.c
index 9da38a1..0a8de1b 100644
--- a/lib/rpmsg/rpmsg.c
+++ open-amp/lib/rpmsg/rpmsg.c
@@ -144,6 +144,65 @@ int rpmsg_send_ns_message(struct rpmsg_endpoint *ept, unsigned long flags)
 	return RPMSG_SUCCESS;
 }
 
+void rpmsg_hold_rx_buffer(struct rpmsg_endpoint *ept, void *rxbuf)
+{
+	struct rpmsg_device *rdev;
+
+	if (!ept || !ept->rdev || !rxbuf)
+		return;
+
+	rdev = ept->rdev;
+
+	if (rdev->ops.hold_rx_buffer)
+		rdev->ops.hold_rx_buffer(rdev, rxbuf);
+}
+
+void rpmsg_release_rx_buffer(struct rpmsg_endpoint *ept, void *rxbuf)
+{
+	struct rpmsg_device *rdev;
+
+	if (!ept || !ept->rdev || !rxbuf)
+		return;
+
+	rdev = ept->rdev;
+
+	if (rdev->ops.release_rx_buffer)
+		rdev->ops.release_rx_buffer(rdev, rxbuf);
+}
+
+void *rpmsg_get_tx_payload_buffer(struct rpmsg_endpoint *ept,
+				  uint32_t *len, int wait)
+{
+	struct rpmsg_device *rdev;
+
+	if (!ept || !ept->rdev || !len)
+		return NULL;
+
+	rdev = ept->rdev;
+
+	if (rdev->ops.get_tx_payload_buffer)
+		return rdev->ops.get_tx_payload_buffer(rdev, len, wait);
+
+	return NULL;
+}
+
+int rpmsg_send_offchannel_nocopy(struct rpmsg_endpoint *ept, uint32_t src,
+				 uint32_t dst, const void *data, int len)
+{
+	struct rpmsg_device *rdev;
+
+	if (!ept || !ept->rdev || !data || !dst || dst == RPMSG_ADDR_ANY)
+		return RPMSG_ERR_PARAM;
+
+	rdev = ept->rdev;
+
+	if (rdev->ops.send_offchannel_nocopy)
+		return rdev->ops.send_offchannel_nocopy(rdev, src, dst,
+							data, len);
+
+	return RPMSG_ERR_PARAM;
+}
+
 struct rpmsg_endpoint *rpmsg_get_endpoint(struct rpmsg_device *rdev,
 					  const char *name, uint32_t addr,
 					  uint32_t dest_addr)
diff --git a/lib/rpmsg/rpmsg_internal.h open-amp/lib/rpmsg/rpmsg_internal.h
index 9c46970..3db6b24 100644
--- a/lib/rpmsg/rpmsg_internal.h
+++ open-amp/lib/rpmsg/rpmsg_internal.h
@@ -35,6 +35,9 @@ extern "C" {
 	} while (0)
 #endif
 
+#define RPMSG_BUF_HELD (1U << 31) /* Flag to suggest to hold the buffer */
+
+#define RPMSG_LOCATE_HDR(p) ((struct rpmsg_hdr *)((char *)(p) - sizeof(struct rpmsg_hdr)))
 #define RPMSG_LOCATE_DATA(p) ((unsigned char *)(p) + sizeof(struct rpmsg_hdr))
 /**
  * enum rpmsg_ns_flags - dynamic name service announcement flags
diff --git a/lib/rpmsg/rpmsg_virtio.c open-amp/lib/rpmsg/rpmsg_virtio.c
index 44b46d4..195189d 100644
--- a/lib/rpmsg/rpmsg_virtio.c
+++ open-amp/lib/rpmsg/rpmsg_virtio.c
@@ -140,6 +140,7 @@ static void *rpmsg_virtio_get_tx_buffer(struct rpmsg_virtio_device *rvdev,
 			data = rpmsg_virtio_shm_pool_get_buffer(rvdev->shpool,
 							RPMSG_BUFFER_SIZE);
 			*len = RPMSG_BUFFER_SIZE;
+			*idx = 0;
 		}
 	}
 #endif /*!VIRTIO_SLAVE_ONLY*/
@@ -253,6 +254,129 @@ static int _rpmsg_virtio_get_buffer_size(struct rpmsg_virtio_device *rvdev)
 	return length;
 }
 
+/* Mark an RX buffer as held so the RX callback loop does not return it. */
+static void rpmsg_virtio_hold_rx_buffer(struct rpmsg_device *rdev, void *rxbuf)
+{
+	struct rpmsg_hdr *rp_hdr;
+
+	rp_hdr = RPMSG_LOCATE_HDR(rxbuf);
+
+	/* Set held status to keep buffer */
+	rp_hdr->reserved = RPMSG_BUF_HELD;
+}
+
+/* Give a previously held RX buffer back to the receive virtqueue. */
+static void rpmsg_virtio_release_rx_buffer(struct rpmsg_device *rdev, void *rxbuf)
+{
+	struct rpmsg_virtio_device *rvdev;
+	struct rpmsg_hdr *rp_hdr;
+	uint16_t idx;
+	uint32_t len;
+
+	rvdev = metal_container_of(rdev, struct rpmsg_virtio_device, rdev);
+	rp_hdr = RPMSG_LOCATE_HDR(rxbuf);
+	/* The reserved field contains buffer index */
+	idx = rp_hdr->reserved;
+
+	metal_mutex_acquire(&rdev->lock);
+	/* Return buffer on virtqueue. */
+	len = virtqueue_get_buffer_length(rvdev->rvq, idx);
+	rpmsg_virtio_return_buffer(rvdev, rp_hdr, len, idx);
+	metal_mutex_release(&rdev->lock);
+}
+
+/*
+ * Reserve a TX buffer and return a pointer to its payload area; *len is set
+ * to the payload capacity. Returns NULL if no buffer could be obtained.
+ */
+static void *rpmsg_virtio_get_tx_payload_buffer(struct rpmsg_device *rdev,
+						uint32_t *len, int wait)
+{
+	struct rpmsg_virtio_device *rvdev;
+	struct rpmsg_hdr *rp_hdr;
+	uint16_t idx;
+	int tick_count;
+
+	/* Get the associated remote device for channel. */
+	rvdev = metal_container_of(rdev, struct rpmsg_virtio_device, rdev);
+
+	if (wait)
+		tick_count = RPMSG_TICK_COUNT / RPMSG_TICKS_PER_INTERVAL;
+	else
+		tick_count = 0;
+
+	while (1) {
+		/* Lock the device to enable exclusive access to virtqueues */
+		metal_mutex_acquire(&rdev->lock);
+		rp_hdr = rpmsg_virtio_get_tx_buffer(rvdev, len, &idx);
+		metal_mutex_release(&rdev->lock);
+		if (rp_hdr || !tick_count)
+			break;
+		metal_sleep_usec(RPMSG_TICKS_PER_INTERVAL);
+		tick_count--;
+	}
+
+	if (!rp_hdr)
+		return NULL;
+
+	/* Store the index into the reserved field to be used when sending */
+	rp_hdr->reserved = idx;
+
+	/* Actual data buffer size is vring buffer size minus rpmsg header length */
+	*len -= sizeof(struct rpmsg_hdr);
+	return RPMSG_LOCATE_DATA(rp_hdr);
+}
+
+/*
+ * Send a payload already placed in a buffer obtained from
+ * rpmsg_virtio_get_tx_payload_buffer(). Returns the payload length.
+ */
+static int rpmsg_virtio_send_offchannel_nocopy(struct rpmsg_device *rdev,
+						uint32_t src, uint32_t dst,
+						const void *data, int len)
+{
+	struct rpmsg_virtio_device *rvdev;
+	struct rpmsg_hdr rp_hdr;
+	struct rpmsg_hdr *hdr;
+	uint32_t buff_len;
+	uint16_t idx;
+	int status;
+	struct metal_io_region *io;
+
+	/* Get the associated remote device for channel. */
+	rvdev = metal_container_of(rdev, struct rpmsg_virtio_device, rdev);
+
+	hdr = RPMSG_LOCATE_HDR(data);
+	/* The reserved field contains buffer index */
+	idx = hdr->reserved;
+
+	/* Initialize RPMSG header. */
+	rp_hdr.dst = dst;
+	rp_hdr.src = src;
+	rp_hdr.len = len;
+	rp_hdr.reserved = 0;
+	rp_hdr.flags = 0;
+
+	/* Write the rpmsg header in front of the already-filled payload. */
+	io = rvdev->shbuf_io;
+	status = metal_io_block_write(io, metal_io_virt_to_offset(io, hdr),
+				      &rp_hdr, sizeof(rp_hdr));
+	RPMSG_ASSERT(status == sizeof(rp_hdr), "failed to write header\r\n");
+
+	metal_mutex_acquire(&rdev->lock);
+
+	/* Enqueue the whole buffer; 'len' stays the payload length. */
+	buff_len = virtqueue_get_buffer_length(rvdev->svq, idx);
+	status = rpmsg_virtio_enqueue_buffer(rvdev, hdr, buff_len, idx);
+	RPMSG_ASSERT(status == VQUEUE_SUCCESS, "failed to enqueue buffer\r\n");
+	/* Let the other side know that there is a job to process. */
+	virtqueue_kick(rvdev->svq);
+
+	metal_mutex_release(&rdev->lock);
+
+	return len;
+}
+
 /**
  * This function sends rpmsg "message" to remote device.
  *
@@ -389,6 +513,8 @@ static void rpmsg_virtio_rx_callback(struct virtqueue *vq)
 	metal_mutex_release(&rdev->lock);
 
 	while (rp_hdr) {
+		rp_hdr->reserved = 0;
+
 		/* Get the channel node from the remote device channels list. */
 		metal_mutex_acquire(&rdev->lock);
 		ept = rpmsg_get_ept_from_addr(rdev, rp_hdr->dst);
@@ -411,8 +537,15 @@ static void rpmsg_virtio_rx_callback(struct virtqueue *vq)
 
 		metal_mutex_acquire(&rdev->lock);
 
-		/* Return used buffers. */
-		rpmsg_virtio_return_buffer(rvdev, rp_hdr, len, idx);
+		/* Check whether callback wants to hold buffer */
+		if (rp_hdr->reserved & RPMSG_BUF_HELD) {
+			/* 'rp_hdr->reserved' field is now used as storage for
+			 * 'idx' to release buffer later */
+			rp_hdr->reserved = idx;
+		} else {
+			/* Return used buffers. */
+			rpmsg_virtio_return_buffer(rvdev, rp_hdr, len, idx);
+		}
 
 		rp_hdr = rpmsg_virtio_get_rx_buffer(rvdev, &len, &idx);
 		if (!rp_hdr) {
@@ -522,6 +655,10 @@ int rpmsg_init_vdev(struct rpmsg_virtio_device *rvdev,
 	rvdev->vdev = vdev;
 	rdev->ns_bind_cb = ns_bind_cb;
 	vdev->priv = rvdev;
+	rdev->ops.hold_rx_buffer = rpmsg_virtio_hold_rx_buffer;
+	rdev->ops.release_rx_buffer = rpmsg_virtio_release_rx_buffer;
+	rdev->ops.get_tx_payload_buffer = rpmsg_virtio_get_tx_payload_buffer;
+	rdev->ops.send_offchannel_nocopy = rpmsg_virtio_send_offchannel_nocopy;
 	rdev->ops.send_offchannel_raw = rpmsg_virtio_send_offchannel_raw;
 	role = rpmsg_virtio_get_role(rvdev);
 
-- 
2.17.1