nuttx/openamp/0007-rpmsg-bring-back-zero-copy-transfer.patch

471 lines
16 KiB
Diff
Raw Normal View History

From 4224f5f028aec8ebc13e1e203cbae3127b07605e Mon Sep 17 00:00:00 2001
From: anchao <anchao@pinecone.net>
Date: Mon, 10 Dec 2018 19:39:01 +0800
Subject: [PATCH 07/10] rpmsg: bring back zero copy transfer
Commit-id:
b16ca55 Adding RPMsg Extension layer implementing zero-copy send and receive.
Signed-off-by: anchao <anchao@pinecone.net>
---
lib/include/openamp/rpmsg.h | 178 ++++++++++++++++++++++++++++++++++++
lib/rpmsg/rpmsg.c | 58 ++++++++++++
lib/rpmsg/rpmsg_internal.h | 3 +
lib/rpmsg/rpmsg_virtio.c | 130 +++++++++++++++++++++++++-
4 files changed, 367 insertions(+), 2 deletions(-)
diff --git a/lib/include/openamp/rpmsg.h open-amp/lib/include/openamp/rpmsg.h
index 1279ea2..0b46ee1 100644
--- a/lib/include/openamp/rpmsg.h
+++ open-amp/lib/include/openamp/rpmsg.h
@@ -89,9 +89,20 @@ struct rpmsg_endpoint {
/**
* struct rpmsg_device_ops - RPMsg device operations
+ * @hold_rx_buffer: hold RPMsg RX buffer
+ * @release_rx_buffer: release RPMsg RX buffer
+ * @get_tx_payload_buffer: get RPMsg TX buffer
+ * @send_offchannel_nocopy: send RPMsg data without copy
* @send_offchannel_raw: send RPMsg data
*/
struct rpmsg_device_ops {
+ void (*hold_rx_buffer)(struct rpmsg_device *rdev, void *rxbuf);
+ void (*release_rx_buffer)(struct rpmsg_device *rdev, void *rxbuf);
+ void *(*get_tx_payload_buffer)(struct rpmsg_device *rdev,
+ uint32_t *len, int wait);
+ int (*send_offchannel_nocopy)(struct rpmsg_device *rdev,
+ uint32_t src, uint32_t dst,
+ const void *data, int len);
int (*send_offchannel_raw)(struct rpmsg_device *rdev,
uint32_t src, uint32_t dst,
const void *data, int size, int wait);
@@ -294,6 +305,173 @@ static inline int rpmsg_trysend_offchannel(struct rpmsg_endpoint *ept,
return rpmsg_send_offchannel_raw(ept, src, dst, data, len, false);
}
+/**
+ * @brief Holds the rx buffer for usage outside the receive callback.
+ *
+ * Calling this function prevents the RPMsg receive buffer from being released
+ * back to the pool of shmem buffers. This API can only be called at rx
+ * callback context (rpmsg_rx_cb_t). With this API, the application doesn't
+ * need to copy the message in rx callback. Instead, the rx buffer base address
+ * is saved in application context and further processed in application
+ * process. After the message is processed, the application can release the rx
+ * buffer for future reuse in vring by calling the rpmsg_release_rx_buffer()
+ * function.
+ *
+ * @ept: The rpmsg endpoint
+ * @rxbuf: RX buffer with message payload
+ *
+ * @see rpmsg_release_rx_buffer
+ */
+void rpmsg_hold_rx_buffer(struct rpmsg_endpoint *ept, void *rxbuf);
+
+/**
+ * @brief Releases the rx buffer for future reuse in vring.
+ *
+ * This API can be called at process context when the message in rx buffer is
+ * processed.
+ *
+ * @ept: the rpmsg endpoint
+ * @rxbuf: rx buffer with message payload
+ *
+ * @see rpmsg_hold_rx_buffer
+ */
+void rpmsg_release_rx_buffer(struct rpmsg_endpoint *ept, void *rxbuf);
+
+/**
+ * @brief Gets the tx buffer for message payload.
+ *
+ * This API can only be called at process context to get the tx buffer in vring.
+ * This way, the application can put its message directly into the vring tx
+ * buffer without copying it from an application buffer.
+ * It is the application's responsibility to correctly fill the allocated tx
+ * buffer with data and to pass correct parameters to the rpmsg_send_nocopy() or
+ * rpmsg_sendto_nocopy() function to perform the no-copy send.
+ *
+ * @ept: Pointer to rpmsg endpoint
+ * @len: Pointer to store tx buffer size
+ * @wait: Boolean, wait or not for buffer to become available
+ *
+ * @return The tx buffer address on success and NULL on failure
+ *
+ * @see rpmsg_send_offchannel_nocopy
+ * @see rpmsg_sendto_nocopy
+ * @see rpmsg_send_nocopy
+ */
+void *rpmsg_get_tx_payload_buffer(struct rpmsg_endpoint *ept,
+ uint32_t *len, int wait);
+
+/**
+ * rpmsg_send_offchannel_nocopy() - send a message in tx buffer reserved by
+ * rpmsg_get_tx_payload_buffer() across to the remote processor.
+ *
+ * This function sends data of length len to the remote dst address,
+ * and uses src as the source address.
+ * The message will be sent to the remote processor which the ept
+ * endpoint belongs to.
+ * The application has to take the responsibility for:
+ * 1. tx buffer reserved (rpmsg_get_tx_payload_buffer() )
+ * 2. filling the data to be sent into the pre-allocated tx buffer
+ * 3. not exceeding the buffer size when filling the data
+ * 4. data cache coherency
+ *
+ * After the rpmsg_send_offchannel_nocopy() function is issued, the tx buffer
+ * is no longer owned by the sending task and must not be touched anymore
+ * unless the rpmsg_send_offchannel_nocopy() function fails and returns an
+ * error. In that case the application should re-issue the
+ * rpmsg_send_offchannel_nocopy() call.
+ *
+ * @ept: The rpmsg endpoint
+ * @src: The rpmsg endpoint local address
+ * @dst: The rpmsg endpoint remote address
+ * @data: TX buffer with message filled
+ * @len: Length of payload
+ *
+ * @return number of bytes it has sent or negative error value on failure.
+ *
+ * @see rpmsg_get_tx_payload_buffer
+ * @see rpmsg_sendto_nocopy
+ * @see rpmsg_send_nocopy
+ */
+int rpmsg_send_offchannel_nocopy(struct rpmsg_endpoint *ept, uint32_t src,
+ uint32_t dst, const void *data, int len);
+
+/**
+ * @brief Sends a message in tx buffer allocated by rpmsg_get_tx_payload_buffer()
+ * across to the remote processor, specify dst.
+ *
+ * This function sends data of length len to the remote dst address.
+ * The message will be sent to the remote processor which the ept
+ * endpoint belongs to, using ept's source address.
+ * The application has to take the responsibility for:
+ * 1. tx buffer allocation (rpmsg_get_tx_payload_buffer() )
+ * 2. filling the data to be sent into the pre-allocated tx buffer
+ * 3. not exceeding the buffer size when filling the data
+ * 4. data cache coherency
+ *
+ * After the rpmsg_sendto_nocopy() function is issued, the tx buffer is no
+ * longer owned by the sending task and must not be touched anymore unless the
+ * rpmsg_sendto_nocopy() function fails and returns an error. In that case the
+ * application should re-issue the rpmsg_sendto_nocopy() call.
+ *
+ * @ept: The rpmsg endpoint
+ * @data: TX buffer with message filled
+ * @len: Length of payload
+ * @dst: Destination address
+ *
+ * @return number of bytes it has sent or negative error value on failure.
+ *
+ * @see rpmsg_get_tx_payload_buffer
+ * @see rpmsg_send_offchannel_nocopy
+ * @see rpmsg_send_nocopy
+ */
+static inline int rpmsg_sendto_nocopy(struct rpmsg_endpoint *ept,
+				      const void *data, int len, uint32_t dst)
+{
+	/*
+	 * ept->addr is read here, before rpmsg_send_offchannel_nocopy() gets a
+	 * chance to validate ept, so a NULL endpoint must be rejected first.
+	 * The error code matches the one the callee uses for a NULL endpoint.
+	 */
+	if (!ept)
+		return RPMSG_ERR_PARAM;
+
+	return rpmsg_send_offchannel_nocopy(ept, ept->addr, dst, data, len);
+}
+
+/**
+ * rpmsg_send_nocopy() - send a message in tx buffer reserved by
+ * rpmsg_get_tx_payload_buffer() across to the remote processor.
+ *
+ * This function sends data of length len on the ept endpoint.
+ * The message will be sent to the remote processor which the ept
+ * endpoint belongs to, using ept's source and destination addresses.
+ * The application has to take the responsibility for:
+ * 1. tx buffer reserved (rpmsg_get_tx_payload_buffer() )
+ * 2. filling the data to be sent into the pre-allocated tx buffer
+ * 3. not exceeding the buffer size when filling the data
+ * 4. data cache coherency
+ *
+ * After the rpmsg_send_nocopy() function is issued, the tx buffer is no
+ * longer owned by the sending task and must not be touched anymore unless the
+ * rpmsg_send_nocopy() function fails and returns an error. In that case the
+ * application should re-issue the rpmsg_send_nocopy() call.
+ *
+ * @ept: The rpmsg endpoint
+ * @data: TX buffer with message filled
+ * @len: Length of payload
+ *
+ * @return number of bytes it has sent or negative error value on failure.
+ *
+ * @see rpmsg_get_tx_payload_buffer
+ * @see rpmsg_send_offchannel_nocopy
+ * @see rpmsg_sendto_nocopy
+ */
+static inline int rpmsg_send_nocopy(struct rpmsg_endpoint *ept,
+				    const void *data, int len)
+{
+	int elapsed = 0;
+
+	/*
+	 * Poll is_rpmsg_ept_ready() until it reports the endpoint ready,
+	 * sleeping RPMSG_TICKS_PER_INTERVAL between polls and giving up once
+	 * RPMSG_TICK_COUNT has been accumulated.
+	 */
+	while (elapsed < RPMSG_TICK_COUNT) {
+		if (!is_rpmsg_ept_ready(ept)) {
+			metal_sleep_usec(RPMSG_TICKS_PER_INTERVAL);
+			elapsed += RPMSG_TICKS_PER_INTERVAL;
+			continue;
+		}
+
+		/* Ready: send using the endpoint's own src/dst addresses. */
+		return rpmsg_send_offchannel_nocopy(ept, ept->addr,
+						    ept->dest_addr, data, len);
+	}
+
+	/* The endpoint never became ready within the polling budget. */
+	return RPMSG_ERR_ADDR;
+}
+
/**
* rpmsg_init_ept - initialize rpmsg endpoint
*
diff --git a/lib/rpmsg/rpmsg.c open-amp/lib/rpmsg/rpmsg.c
index 9da38a1..0a8de1b 100644
--- a/lib/rpmsg/rpmsg.c
+++ open-amp/lib/rpmsg/rpmsg.c
@@ -144,6 +144,64 @@ int rpmsg_send_ns_message(struct rpmsg_endpoint *ept, unsigned long flags)
return RPMSG_SUCCESS;
}
+/*
+ * Forward a "hold this rx buffer" request to the device backend.
+ *
+ * Silently does nothing when ept, ept->rdev or rxbuf is NULL, or when the
+ * device does not provide a hold_rx_buffer operation.
+ */
+void rpmsg_hold_rx_buffer(struct rpmsg_endpoint *ept, void *rxbuf)
+{
+	struct rpmsg_device *rdev = ept ? ept->rdev : NULL;
+
+	if (!rdev || !rxbuf)
+		return;
+
+	if (rdev->ops.hold_rx_buffer)
+		rdev->ops.hold_rx_buffer(rdev, rxbuf);
+}
+
+/*
+ * Forward a "release this rx buffer" request to the device backend.
+ *
+ * Silently does nothing when ept, ept->rdev or rxbuf is NULL, or when the
+ * device does not provide a release_rx_buffer operation.
+ */
+void rpmsg_release_rx_buffer(struct rpmsg_endpoint *ept, void *rxbuf)
+{
+	struct rpmsg_device *rdev = ept ? ept->rdev : NULL;
+
+	if (!rdev || !rxbuf)
+		return;
+
+	if (rdev->ops.release_rx_buffer)
+		rdev->ops.release_rx_buffer(rdev, rxbuf);
+}
+
+/*
+ * Ask the device backend for a tx payload buffer.
+ *
+ * Returns whatever the backend's get_tx_payload_buffer operation returns, or
+ * NULL when ept, ept->rdev or len is NULL or the operation is not provided.
+ */
+void *rpmsg_get_tx_payload_buffer(struct rpmsg_endpoint *ept,
+				  uint32_t *len, int wait)
+{
+	struct rpmsg_device *rdev = ept ? ept->rdev : NULL;
+
+	if (!rdev || !len || !rdev->ops.get_tx_payload_buffer)
+		return NULL;
+
+	return rdev->ops.get_tx_payload_buffer(rdev, len, wait);
+}
+
+/*
+ * Validate the arguments and hand a pre-filled tx buffer to the device
+ * backend for sending.
+ *
+ * Returns the backend's result, or RPMSG_ERR_PARAM when an argument is
+ * invalid or the device provides no send_offchannel_nocopy operation.
+ */
+int rpmsg_send_offchannel_nocopy(struct rpmsg_endpoint *ept, uint32_t src,
+				 uint32_t dst, const void *data, int len)
+{
+	struct rpmsg_device *rdev;
+
+	/*
+	 * A negative length is rejected up front: the virtio backend stores
+	 * len straight into the message header's length field, so a negative
+	 * value would be published to the peer as a bogus payload size.
+	 */
+	if (!ept || !ept->rdev || !data || !dst || dst == RPMSG_ADDR_ANY ||
+	    len < 0)
+		return RPMSG_ERR_PARAM;
+
+	rdev = ept->rdev;
+
+	if (rdev->ops.send_offchannel_nocopy)
+		return rdev->ops.send_offchannel_nocopy(rdev, src, dst, data, len);
+
+	return RPMSG_ERR_PARAM;
+}
+
struct rpmsg_endpoint *rpmsg_get_endpoint(struct rpmsg_device *rdev,
const char *name, uint32_t addr,
uint32_t dest_addr)
diff --git a/lib/rpmsg/rpmsg_internal.h open-amp/lib/rpmsg/rpmsg_internal.h
index 9c46970..3db6b24 100644
--- a/lib/rpmsg/rpmsg_internal.h
+++ open-amp/lib/rpmsg/rpmsg_internal.h
@@ -35,6 +35,9 @@ extern "C" {
} while (0)
#endif
+/* Bit set in rpmsg_hdr.reserved to ask the rx path to keep the buffer */
+#define RPMSG_BUF_HELD (1U << 31)
+
+/*
+ * Map a payload pointer back to the rpmsg_hdr located sizeof(struct rpmsg_hdr)
+ * bytes before it. The whole expansion is parenthesized so the result can be
+ * used directly in postfix expressions such as RPMSG_LOCATE_HDR(p)->reserved.
+ */
+#define RPMSG_LOCATE_HDR(p) \
+	((struct rpmsg_hdr *)((char *)(p) - sizeof(struct rpmsg_hdr)))
#define RPMSG_LOCATE_DATA(p) ((unsigned char *)(p) + sizeof(struct rpmsg_hdr))
/**
* enum rpmsg_ns_flags - dynamic name service announcement flags
diff --git a/lib/rpmsg/rpmsg_virtio.c open-amp/lib/rpmsg/rpmsg_virtio.c
index 17f2998..a525066 100644
--- a/lib/rpmsg/rpmsg_virtio.c
+++ open-amp/lib/rpmsg/rpmsg_virtio.c
@@ -140,6 +140,7 @@ static void *rpmsg_virtio_get_tx_buffer(struct rpmsg_virtio_device *rvdev,
data = rpmsg_virtio_shm_pool_get_buffer(rvdev->shpool,
RPMSG_BUFFER_SIZE);
*len = RPMSG_BUFFER_SIZE;
+ *idx = 0;
}
}
#endif /*!VIRTIO_SLAVE_ONLY*/
@@ -253,6 +254,118 @@ static int _rpmsg_virtio_get_buffer_size(struct rpmsg_virtio_device *rvdev)
return length;
}
+/*
+ * Mark an rx buffer as held by writing RPMSG_BUF_HELD into the 'reserved'
+ * field of the header that precedes the payload. The rx callback checks this
+ * flag after the endpoint callback returns. rdev is not used here.
+ */
+static void rpmsg_virtio_hold_rx_buffer(struct rpmsg_device *rdev, void *rxbuf)
+{
+	struct rpmsg_hdr *hdr = RPMSG_LOCATE_HDR(rxbuf);
+
+	/* The whole field is overwritten, not just the flag bit. */
+	hdr->reserved = RPMSG_BUF_HELD;
+}
+
+/*
+ * Give a previously held rx buffer back to the receive virtqueue.
+ *
+ * The buffer index is read from the 'reserved' field of the header preceding
+ * rxbuf, where the rx callback stored it when the buffer was held.
+ *
+ * NOTE(review): no virtqueue_kick() is issued after returning the buffer;
+ * confirm whether the peer needs to be notified here.
+ */
+static void rpmsg_virtio_release_rx_buffer(struct rpmsg_device *rdev, void *rxbuf)
+{
+	struct rpmsg_virtio_device *rvdev =
+		metal_container_of(rdev, struct rpmsg_virtio_device, rdev);
+	struct rpmsg_hdr *hdr = RPMSG_LOCATE_HDR(rxbuf);
+	uint16_t desc_idx = hdr->reserved;
+	uint32_t buf_len;
+
+	/* Virtqueue access is serialized by the device lock. */
+	metal_mutex_acquire(&rdev->lock);
+	buf_len = virtqueue_get_buffer_length(rvdev->rvq, desc_idx);
+	rpmsg_virtio_return_buffer(rvdev, hdr, buf_len, desc_idx);
+	metal_mutex_release(&rdev->lock);
+}
+
+/*
+ * Reserve a tx buffer and return a pointer to its payload area.
+ *
+ * @rdev: rpmsg device embedded in a struct rpmsg_virtio_device
+ * @len:  out: usable payload size (buffer size minus the rpmsg header)
+ * @wait: non-zero to retry, sleeping between attempts, until a buffer is
+ *        available or the retry budget is exhausted; zero to try once
+ *
+ * Returns the payload address, or NULL if no buffer could be obtained.
+ */
+static void *rpmsg_virtio_get_tx_payload_buffer(struct rpmsg_device *rdev,
+						uint32_t *len, int wait)
+{
+	struct rpmsg_virtio_device *rvdev =
+		metal_container_of(rdev, struct rpmsg_virtio_device, rdev);
+	struct rpmsg_hdr *hdr;
+	uint16_t desc_idx;
+	int retries = wait ? RPMSG_TICK_COUNT / RPMSG_TICKS_PER_INTERVAL : 0;
+
+	for (;;) {
+		/* Hold the device lock only while touching the virtqueue. */
+		metal_mutex_acquire(&rdev->lock);
+		hdr = rpmsg_virtio_get_tx_buffer(rvdev, len, &desc_idx);
+		metal_mutex_release(&rdev->lock);
+
+		if (hdr)
+			break;
+		if (retries == 0)
+			return NULL;
+
+		metal_sleep_usec(RPMSG_TICKS_PER_INTERVAL);
+		retries--;
+	}
+
+	/* Remember the buffer index in 'reserved' for the later send call. */
+	hdr->reserved = desc_idx;
+
+	/* Report only the space left after the rpmsg header. */
+	*len -= sizeof(struct rpmsg_hdr);
+	return RPMSG_LOCATE_DATA(hdr);
+}
+
+/*
+ * Send a message whose payload already sits in a tx buffer obtained from
+ * rpmsg_virtio_get_tx_payload_buffer().
+ *
+ * @rdev: rpmsg device embedded in a struct rpmsg_virtio_device
+ * @src:  source address written into the message header
+ * @dst:  destination address written into the message header
+ * @data: payload pointer previously returned by the get-tx-buffer call
+ * @len:  payload length in bytes
+ *
+ * Returns the payload length that was queued. The length passed to the
+ * virtqueue is the full vring buffer length and is kept in a separate
+ * variable so that it does not replace the value reported to the caller.
+ */
+static int rpmsg_virtio_send_offchannel_nocopy(struct rpmsg_device *rdev,
+					       uint32_t src, uint32_t dst,
+					       const void *data, int len)
+{
+	struct rpmsg_virtio_device *rvdev;
+	struct rpmsg_hdr rp_hdr;
+	struct rpmsg_hdr *hdr;
+	uint32_t buff_len;
+	uint16_t idx;
+	int status;
+	struct metal_io_region *io;
+
+	/* Get the associated remote device for channel. */
+	rvdev = metal_container_of(rdev, struct rpmsg_virtio_device, rdev);
+
+	hdr = RPMSG_LOCATE_HDR(data);
+	/* The reserved field contains buffer index */
+	idx = hdr->reserved;
+
+	/* Initialize RPMSG header. */
+	rp_hdr.dst = dst;
+	rp_hdr.src = src;
+	rp_hdr.len = len;
+	rp_hdr.reserved = 0;
+	rp_hdr.flags = 0;
+
+	/* Write only the header; the payload is already in the buffer. */
+	io = rvdev->shbuf_io;
+	status = metal_io_block_write(io, metal_io_virt_to_offset(io, hdr),
+				      &rp_hdr, sizeof(rp_hdr));
+	RPMSG_ASSERT(status == sizeof(rp_hdr), "failed to write header\r\n");
+
+	metal_mutex_acquire(&rdev->lock);
+
+	/* Enqueue buffer on virtqueue. */
+	buff_len = virtqueue_get_buffer_length(rvdev->svq, idx);
+	status = rpmsg_virtio_enqueue_buffer(rvdev, hdr, buff_len, idx);
+	RPMSG_ASSERT(status == VQUEUE_SUCCESS, "failed to enqueue buffer\r\n");
+	/* Let the other side know that there is a job to process. */
+	virtqueue_kick(rvdev->svq);
+
+	metal_mutex_release(&rdev->lock);
+
+	return len;
+}
+
/**
* This function sends rpmsg "message" to remote device.
*
@@ -389,6 +502,8 @@ static void rpmsg_virtio_rx_callback(struct virtqueue *vq)
metal_mutex_release(&rdev->lock);
while (rp_hdr) {
+ rp_hdr->reserved = 0;
+
/* Get the channel node from the remote device channels list. */
metal_mutex_acquire(&rdev->lock);
ept = rpmsg_get_ept_from_addr(rdev, rp_hdr->dst);
@@ -411,8 +526,15 @@ static void rpmsg_virtio_rx_callback(struct virtqueue *vq)
metal_mutex_acquire(&rdev->lock);
- /* Return used buffers. */
- rpmsg_virtio_return_buffer(rvdev, rp_hdr, len, idx);
+ /* Check whether callback wants to hold buffer */
+ if (rp_hdr->reserved & RPMSG_BUF_HELD) {
+ /* 'rp_hdr->reserved' field is now used as storage for
+ * 'idx' to release buffer later */
+ rp_hdr->reserved = idx;
+ } else {
+ /* Return used buffers. */
+ rpmsg_virtio_return_buffer(rvdev, rp_hdr, len, idx);
+ }
rp_hdr = rpmsg_virtio_get_rx_buffer(rvdev, &len, &idx);
if (rp_hdr == NULL) {
@@ -522,6 +644,10 @@ int rpmsg_init_vdev(struct rpmsg_virtio_device *rvdev,
rvdev->vdev = vdev;
rdev->ns_bind_cb = ns_bind_cb;
vdev->priv = rvdev;
+ rdev->ops.hold_rx_buffer = rpmsg_virtio_hold_rx_buffer;
+ rdev->ops.release_rx_buffer = rpmsg_virtio_release_rx_buffer;
+ rdev->ops.get_tx_payload_buffer = rpmsg_virtio_get_tx_payload_buffer;
+ rdev->ops.send_offchannel_nocopy = rpmsg_virtio_send_offchannel_nocopy;
rdev->ops.send_offchannel_raw = rpmsg_virtio_send_offchannel_raw;
role = rpmsg_virtio_get_role(rvdev);
--
2.17.1