Skip to content

Commit

Permalink
Framed transport header data stored in MessageBuffer
Browse files Browse the repository at this point in the history
Signed-off-by: Cervenka Dusan <[email protected]>
  • Loading branch information
Hadatko committed Aug 28, 2023
1 parent 26bd600 commit 4f2a982
Show file tree
Hide file tree
Showing 14 changed files with 137 additions and 60 deletions.
7 changes: 4 additions & 3 deletions erpc_c/infra/erpc_client_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ void ClientManager::verifyReply(RequestContext &request)

// Some transport layers change the request's message buffer pointer (for things like zero
// copy support), so inCodec must be reset to work with correct buffer.
request.getCodec()->reset();
request.getCodec()->reset(m_transport->reserveHeaderSize());

// Extract the reply header.
request.getCodec()->startReadMessage(msgType, service, requestNumber, sequence);
Expand All @@ -186,13 +186,14 @@ Codec *ClientManager::createBufferAndCodec(void)
{
Codec *codec = m_codecFactory->create();
MessageBuffer message;
uint8_t reservedMessageSpace = m_transport->reserveHeaderSize();

if (codec != NULL)
{
message = m_messageFactory->create();
message = m_messageFactory->create(reservedMessageSpace);
if (NULL != message.get())
{
codec->setBuffer(message);
codec->setBuffer(message, reservedMessageSpace);
}
else
{
Expand Down
4 changes: 2 additions & 2 deletions erpc_c/infra/erpc_client_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,8 @@ class ClientManager : public ClientServerCommon
#endif

protected:
uint32_t m_sequence; //!< Sequence number.
client_error_handler_t m_errorHandler; //!< Pointer to function error handler.
uint32_t m_sequence; //!< Sequence number.
client_error_handler_t m_errorHandler; //!< Pointer to function error handler.
#if ERPC_NESTED_CALLS
Server *m_server; //!< Server used for nested calls.
Thread::thread_id_t m_serverThreadId; //!< Thread in which server run function is called.
Expand Down
22 changes: 12 additions & 10 deletions erpc_c/infra/erpc_codec.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,7 @@ class Codec
* This function initializes object attributes.
*/
Codec(void)
: m_buffer()
, m_cursor()
: m_cursor()
, m_status(kErpcStatus_Success)
{
}
Expand All @@ -75,24 +74,28 @@ class Codec
*
* @return Pointer to used message buffer.
*/
MessageBuffer *getBuffer(void) { return &m_buffer; }
MessageBuffer *getBuffer(void) { return m_cursor.getBuffer(); }

/*!
* @brief Prototype for set message buffer used for read and write data.
*
* @param[in] buf Message buffer to set.
* @param[in] skip How many bytes to skip from reading.
*/
virtual void setBuffer(MessageBuffer &buf)
virtual void setBuffer(MessageBuffer &buf, uint8_t skip = 0)
{
m_buffer = buf;
m_cursor.set(&m_buffer);
m_cursor.setBuffer(&buf, skip);
m_status = kErpcStatus_Success;
}

/*! @brief Reset the codec to initial state. */
virtual void reset(void)
/*!
* @brief Reset the codec to initial state.
*
* @param[in] skip How many bytes to skip from reading.
*/
virtual void reset(uint8_t skip = 0)
{
m_cursor.set(&m_buffer);
m_cursor.setBuffer(m_cursor.getBuffer(), skip);
m_status = kErpcStatus_Success;
}

Expand Down Expand Up @@ -428,7 +431,6 @@ class Codec
virtual void readCallback(funPtr callbacks1, funPtr *callback2) = 0;

protected:
MessageBuffer m_buffer; /*!< Message buffer object */
MessageBuffer::Cursor m_cursor; /*!< Copy data to message buffers. */
erpc_status_t m_status; /*!< Status of serialized data. */
};
Expand Down
70 changes: 48 additions & 22 deletions erpc_c/infra/erpc_framed_transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ FramedTransport::FramedTransport(void)

FramedTransport::~FramedTransport(void) {}

uint8_t FramedTransport::reserveHeaderSize(void)
{
return sizeof(FramedTransport::Header::m_crcHeader) + sizeof(FramedTransport::Header::m_messageSize) +
sizeof(FramedTransport::Header::m_crcBody);
}

void FramedTransport::setCrc16(Crc16 *crcImpl)
{
erpc_assert(crcImpl);
Expand All @@ -47,36 +53,51 @@ Crc16 *FramedTransport::getCrc16(void)

erpc_status_t FramedTransport::receive(MessageBuffer *message)
{
Header h;
Header h = { 0, 0, 0 };
erpc_status_t retVal;
uint16_t computedCrc;
uint8_t offset = 0;

erpc_assert((m_crcImpl != NULL) && ("Uninitialized Crc16 object." != NULL));

if (message->getLength() < reserveHeaderSize())
{
retVal = kErpcStatus_MemoryError;
}
else
{
#if !ERPC_THREADS_IS(NONE)
Mutex::Guard lock(m_receiveLock);
#endif

// Receive header first.
retVal = underlyingReceive((uint8_t *)&h, sizeof(h));
retVal = underlyingReceive(message->get(), sizeof(h));
static_cast<void>(memcpy(&h.m_crcHeader, message->get(), sizeof(h.m_crcHeader)));
offset = sizeof(h.m_crcHeader);
static_cast<void>(memcpy(&h.m_messageSize, &message->get()[offset], sizeof(h.m_messageSize)));
offset += sizeof(h.m_messageSize);
static_cast<void>(memcpy(&h.m_crcBody, &message->get()[offset], sizeof(h.m_crcBody)));
offset += sizeof(h.m_crcBody);

if (retVal == kErpcStatus_Success)
{
ERPC_READ_AGNOSTIC_16(h.m_crcHeader);
ERPC_READ_AGNOSTIC_16(h.m_messageSize);
ERPC_READ_AGNOSTIC_16(h.m_crc);
ERPC_READ_AGNOSTIC_16(h.m_crcBody);

// received size can't be zero.
if (h.m_messageSize == 0U)
computedCrc =
m_crcImpl->computeCRC16(reinterpret_cast<const uint8_t *>(&h.m_messageSize), sizeof(h.m_messageSize)) +
m_crcImpl->computeCRC16(reinterpret_cast<const uint8_t *>(&h.m_crcBody), sizeof(h.m_crcBody));
if (computedCrc != h.m_crcHeader)
{
retVal = kErpcStatus_ReceiveFailed;
retVal = kErpcStatus_CrcCheckFailed;
}
}

if (retVal == kErpcStatus_Success)
{
// received size can't be larger then buffer length.
if (h.m_messageSize > message->getLength())
if ((h.m_messageSize + reserveHeaderSize()) > message->getLength())
{
retVal = kErpcStatus_ReceiveFailed;
}
Expand All @@ -85,17 +106,17 @@ erpc_status_t FramedTransport::receive(MessageBuffer *message)
if (retVal == kErpcStatus_Success)
{
// Receive rest of the message now we know its size.
retVal = underlyingReceive(message->get(), h.m_messageSize);
retVal = underlyingReceive(&message->get()[offset], h.m_messageSize);
}
}

if (retVal == kErpcStatus_Success)
{
// Verify CRC.
computedCrc = m_crcImpl->computeCRC16(message->get(), h.m_messageSize);
if (computedCrc == h.m_crc)
computedCrc = m_crcImpl->computeCRC16(&message->get()[offset], h.m_messageSize);
if (computedCrc == h.m_crcBody)
{
message->setUsed(h.m_messageSize);
message->setUsed(h.m_messageSize + reserveHeaderSize());
}
else
{
Expand All @@ -111,27 +132,32 @@ erpc_status_t FramedTransport::send(MessageBuffer *message)
erpc_status_t ret;
uint16_t messageLength;
Header h;
uint8_t offset;

erpc_assert((m_crcImpl != NULL) && ("Uninitialized Crc16 object." != NULL));

#if !ERPC_THREADS_IS(NONE)
Mutex::Guard lock(m_sendLock);
#endif

messageLength = message->getUsed();
messageLength = message->getUsed() - reserveHeaderSize();

// Send header first.
h.m_messageSize = messageLength;
h.m_crc = m_crcImpl->computeCRC16(message->get(), messageLength);
h.m_crcBody = m_crcImpl->computeCRC16(message->get(), messageLength);
h.m_crcHeader =
m_crcImpl->computeCRC16(reinterpret_cast<const uint8_t *>(&h.m_messageSize), sizeof(h.m_messageSize)) +
m_crcImpl->computeCRC16(reinterpret_cast<const uint8_t *>(&h.m_crcBody), sizeof(h.m_crcBody));

ERPC_WRITE_AGNOSTIC_16(h.m_crcHeader);
ERPC_WRITE_AGNOSTIC_16(h.m_messageSize);
ERPC_WRITE_AGNOSTIC_16(h.m_crc);

ret = underlyingSend((uint8_t *)&h, sizeof(h));
if (ret == kErpcStatus_Success)
{
ret = underlyingSend(message->get(), messageLength);
}
static_cast<void>(memcpy(message->get(), reinterpret_cast<const uint8_t *>(&h.m_crcHeader), sizeof(h.m_crcHeader)));
offset = sizeof(h.m_crcHeader);
static_cast<void>(
memcpy(&message->get()[offset], reinterpret_cast<const uint8_t *>(&h.m_messageSize), sizeof(h.m_messageSize)));
offset += sizeof(h.m_messageSize);
static_cast<void>(
memcpy(&message->get()[offset], reinterpret_cast<const uint8_t *>(&h.m_crcBody), sizeof(h.m_crcBody)));

ret = underlyingSend(message->get(), message->getUsed());

return ret;
}
22 changes: 15 additions & 7 deletions erpc_c/infra/erpc_framed_transport.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,14 @@ namespace erpc {
class FramedTransport : public Transport
{
public:
/*! @brief Contents of the header that prefixes each message. */
struct Header
{
uint16_t m_crcHeader; //!< CRC-16 over this header structure data
uint16_t m_messageSize; //!< Size in bytes of the message, excluding the header.
uint16_t m_crcBody; //!< CRC-16 over the message data.
};

/*!
* @brief Constructor.
*/
Expand All @@ -66,6 +74,13 @@ class FramedTransport : public Transport
*/
virtual ~FramedTransport(void);

/**
* @brief Size of data placed in MessageBuffer before serializing eRPC data.
*
* @return uint8_t Amount of bytes, reserved before serialized data.
*/
virtual uint8_t reserveHeaderSize(void) override;

/*!
* @brief Receives an entire message.
*
Expand Down Expand Up @@ -95,13 +110,6 @@ class FramedTransport : public Transport
*/
virtual erpc_status_t send(MessageBuffer *message) override;

/*! @brief Contents of the header that prefixes each message. */
struct Header
{
uint16_t m_messageSize; //!< Size in bytes of the message, excluding the header.
uint16_t m_crc; //!< CRC-16 over the message data.
};

/*!
* @brief This functions sets the CRC-16 implementation.
*
Expand Down
22 changes: 18 additions & 4 deletions erpc_c/infra/erpc_message_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,15 +100,20 @@ void MessageBuffer::swap(MessageBuffer *other)
m_buf = temp.m_buf;
}

void MessageBuffer::Cursor::set(MessageBuffer *buffer)
void MessageBuffer::Cursor::setBuffer(MessageBuffer *buffer, uint8_t reserved)
{
erpc_assert(buffer != NULL);

m_buffer = buffer;
// RPMSG when nested calls are enabled can set NULL buffer.
// erpc_assert(buffer->get() && "Data buffer wasn't set to MessageBuffer.");
// receive function should return err if it couldn't set data buffer.
m_pos = buffer->get();
m_pos = buffer->get() + reserved;
}

MessageBuffer *MessageBuffer::Cursor::getBuffer(void)
{
return m_buffer;
}

uint8_t &MessageBuffer::Cursor::operator[](int index)
Expand Down Expand Up @@ -221,8 +226,17 @@ erpc_status_t MessageBuffer::Cursor::write(const void *data, uint32_t length)
return err;
}

erpc_status_t MessageBufferFactory::prepareServerBufferForSend(MessageBuffer *message)
MessageBuffer MessageBufferFactory::create(uint8_t reserveHeaderSize)
{
MessageBuffer messageBuffer = create();

messageBuffer.setUsed(reserveHeaderSize);

return messageBuffer;
}

erpc_status_t MessageBufferFactory::prepareServerBufferForSend(MessageBuffer *message, uint8_t reserveHeaderSize)
{
message->setUsed(0);
message->setUsed(reserveHeaderSize);
return kErpcStatus_Success;
}
22 changes: 20 additions & 2 deletions erpc_c/infra/erpc_message_buffer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -217,8 +217,14 @@ class MessageBuffer
* @brief Set message buffer.
*
* @param[in] buffer Message buffer to set.
* @param[in] reserved Moved cursor position outside of reserved memory.
*/
void set(MessageBuffer *buffer);
void setBuffer(MessageBuffer *buffer, uint8_t reserved = 0);

/*!
* @brief Get message buffer.
*/
MessageBuffer *getBuffer(void);

/*!
* @brief Return position in buffer.
Expand Down Expand Up @@ -368,6 +374,17 @@ class MessageBufferFactory
*/
virtual MessageBuffer create(void) = 0;

/*!
* @brief This function creates new message buffer with reserved bytes at the beginning
*
* Reserved bytes can be used by transport to write transport related header file data.
*
* @param[in] reserveHeaderSize Reserved amount of bytes at the beginning of message buffer.
*
* @return New created MessageBuffer.
*/
MessageBuffer create(uint8_t reserveHeaderSize);

/*!
* @brief This function informs server if it has to create buffer for received message.
*
Expand All @@ -382,8 +399,9 @@ class MessageBufferFactory
* In case of using new buffer function has to free given buffer.
*
* @param[in] message MessageBuffer which can be reused.
* @param[in] reserveHeaderSize Reserved amount of bytes at the beginning of message buffer.
*/
virtual erpc_status_t prepareServerBufferForSend(MessageBuffer *message);
virtual erpc_status_t prepareServerBufferForSend(MessageBuffer *message, uint8_t reserveHeaderSize = 0);

/*!
* @brief This function disposes message buffer.
Expand Down
2 changes: 1 addition & 1 deletion erpc_c/infra/erpc_simple_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ erpc_status_t SimpleServer::runInternalBegin(Codec **codec, MessageBuffer &buff,

if (err == kErpcStatus_Success)
{
(*codec)->setBuffer(buff);
(*codec)->setBuffer(buff, m_transport->reserveHeaderSize());

err = readHeadOfMessage(*codec, msgType, serviceId, methodId, sequence);
if (err != kErpcStatus_Success)
Expand Down
7 changes: 7 additions & 0 deletions erpc_c/infra/erpc_transport.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,13 @@ class Transport
*/
virtual ~Transport(void) {}

/**
* @brief Size of data placed in MessageBuffer before serializing eRPC data.
*
* @return uint8_t Amount of bytes, reserved before serialized data.
*/
virtual uint8_t reserveHeaderSize(void) { return 0; }

/*!
* @brief Prototype for receiving message.
*
Expand Down
3 changes: 2 additions & 1 deletion erpc_c/setup/erpc_arbitrated_client_setup.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ extern "C" {
*
* @return erpc_client_t Pointer to client structure.
*/
erpc_client_t erpc_arbitrated_client_init(erpc_transport_t transport, erpc_mbf_t message_buffer_factory, erpc_transport_t *arbitrator);
erpc_client_t erpc_arbitrated_client_init(erpc_transport_t transport, erpc_mbf_t message_buffer_factory,
erpc_transport_t *arbitrator);

/*!
* @brief This function sets error handler function.
Expand Down
Loading

0 comments on commit 4f2a982

Please sign in to comment.