Skip to content

Commit

Permalink
Enable non-blocking socket mode in coreMQTT transport
Browse files Browse the repository at this point in the history
  • Loading branch information
paulbartell committed Feb 1, 2024
1 parent d37fd63 commit c71acfd
Showing 1 changed file with 163 additions and 63 deletions.
226 changes: 163 additions & 63 deletions libraries/coreMQTT/port/network_transport/network_transport.c
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
#include "esp_err.h"
#include "freertos/FreeRTOS.h"
#include "freertos/projdefs.h"
#include "freertos/semphr.h"
#include <string.h>
#include "esp_log.h"
#include "esp_tls.h"
#include "sys/socket.h"
#include "network_transport.h"
#include "sdkconfig.h"

TlsTransportStatus_t xTlsConnect( NetworkContext_t* pxNetworkContext )
{
TlsTransportStatus_t xRet = TLS_TRANSPORT_SUCCESS;
TlsTransportStatus_t xResult = TLS_TRANSPORT_CONNECT_FAILURE;

esp_tls_cfg_t xEspTlsConfig = {
.cacert_buf = (const unsigned char*) ( pxNetworkContext->pcServerRootCA ),
Expand All @@ -21,99 +24,196 @@ TlsTransportStatus_t xTlsConnect( NetworkContext_t* pxNetworkContext )
.ds_data = pxNetworkContext->ds_data,
.clientkey_buf = ( const unsigned char* )( pxNetworkContext->pcClientKey ),
.clientkey_bytes = pxNetworkContext->pcClientKeySize,
.timeout_ms = 1000,
.timeout_ms = 2000,
.non_block = false,
};

esp_tls_t* pxTls = esp_tls_init();

xSemaphoreTake(pxNetworkContext->xTlsContextSemaphore, portMAX_DELAY);
pxNetworkContext->pxTls = pxTls;

if (esp_tls_conn_new_sync( pxNetworkContext->pcHostname,
strlen( pxNetworkContext->pcHostname ),
pxNetworkContext->xPort,
&xEspTlsConfig, pxTls) <= 0)
if( xSemaphoreTake(pxNetworkContext->xTlsContextSemaphore, portMAX_DELAY) == pdTRUE )
{
if (pxNetworkContext->pxTls)
int lConnectResult = -1;
esp_tls_t * pxTls = esp_tls_init();

if( pxTls != NULL )
{
esp_tls_conn_destroy(pxNetworkContext->pxTls);
pxNetworkContext->pxTls = NULL;
pxNetworkContext->pxTls = pxTls;

lConnectResult = esp_tls_conn_new_sync( pxNetworkContext->pcHostname,
strlen( pxNetworkContext->pcHostname ),
pxNetworkContext->xPort,
&xEspTlsConfig, pxTls );

if( lConnectResult == 1 )
{

int lSockFd = -1;
if( esp_tls_get_conn_sockfd(pxNetworkContext->pxTls, &lSockFd) == ESP_OK)
{
int flags = fcntl(lSockFd, F_GETFL);

if( fcntl(lSockFd, F_SETFL, flags | O_NONBLOCK ) != -1)
{
xResult = TLS_TRANSPORT_SUCCESS;
}
}
}

if( xResult != TLS_TRANSPORT_SUCCESS )
{
esp_tls_conn_destroy( pxNetworkContext->pxTls );
pxNetworkContext->pxTls = NULL;
}
}
xRet = TLS_TRANSPORT_CONNECT_FAILURE;
( void ) xSemaphoreGive( pxNetworkContext->xTlsContextSemaphore );
}

xSemaphoreGive(pxNetworkContext->xTlsContextSemaphore);

return xRet;
return xResult;
}

TlsTransportStatus_t xTlsDisconnect( NetworkContext_t* pxNetworkContext )
{
BaseType_t xRet = TLS_TRANSPORT_SUCCESS;
BaseType_t xResult;

xSemaphoreTake(pxNetworkContext->xTlsContextSemaphore, portMAX_DELAY);
if (pxNetworkContext->pxTls != NULL &&
esp_tls_conn_destroy(pxNetworkContext->pxTls) < 0)
if( xSemaphoreTake(pxNetworkContext->xTlsContextSemaphore, portMAX_DELAY ) == pdTRUE )
{
xRet = TLS_TRANSPORT_DISCONNECT_FAILURE;
if( pxNetworkContext->pxTls == NULL )
{
xResult = TLS_TRANSPORT_SUCCESS;
}
else if(esp_tls_conn_destroy(pxNetworkContext->pxTls ) == 0)
{
xResult = TLS_TRANSPORT_SUCCESS;
}
else
{
xResult = TLS_TRANSPORT_DISCONNECT_FAILURE;
}

( void ) xSemaphoreGive( pxNetworkContext->xTlsContextSemaphore );
}
else
{
xResult = TLS_TRANSPORT_DISCONNECT_FAILURE;
}
pxNetworkContext->pxTls = NULL;
xSemaphoreGive(pxNetworkContext->xTlsContextSemaphore);

return xRet;
return xResult;
}

int32_t espTlsTransportSend(NetworkContext_t* pxNetworkContext,
const void* pvData, size_t uxDataLen)
int32_t espTlsTransportSend( NetworkContext_t* pxNetworkContext,
const void* pvData, size_t uxDataLen)
{
if (pvData == NULL || uxDataLen == 0)
{
return -1;
}

int32_t lBytesSent = 0;
int lSockFd = -1;
int32_t lBytesSent = -1;

if(pxNetworkContext != NULL && pxNetworkContext->pxTls != NULL)
if( ( pvData != NULL ) &&
( uxDataLen > 0 ) &&
( pxNetworkContext != NULL ) &&
( pxNetworkContext->pxTls != NULL ) )
{
xSemaphoreTake(pxNetworkContext->xTlsContextSemaphore, portMAX_DELAY);
lBytesSent = esp_tls_conn_write(pxNetworkContext->pxTls, pvData, uxDataLen);
xSemaphoreGive(pxNetworkContext->xTlsContextSemaphore);
}
else
{
lBytesSent = -1;
TimeOut_t xTimeout;
TickType_t xTicksToWait = pdMS_TO_TICKS(10);

vTaskSetTimeOutState( &xTimeout );

if( xSemaphoreTake( pxNetworkContext->xTlsContextSemaphore, xTicksToWait ) == pdTRUE )
{
esp_err_t xError = esp_tls_get_conn_sockfd( pxNetworkContext->pxTls, &lSockFd );
if( xError == ESP_OK)
{
struct timeval timeout = { .tv_usec = 10000, .tv_sec = 0 };
lBytesSent = 0;
do
{
fd_set write_fds;
fd_set error_fds;
int lSelectResult = -1;

FD_ZERO( &write_fds );
FD_SET( lSockFd, &write_fds );

FD_ZERO( &error_fds );
FD_SET( lSockFd, &error_fds );

lSelectResult = select( lSockFd + 1, NULL, &write_fds, &error_fds, &timeout );

if( lSelectResult < 0 )
{
lBytesSent = -1;
break;
}
else if( lSelectResult > 0 )
{
ssize_t lResult = esp_tls_conn_write( pxNetworkContext->pxTls, pvData, uxDataLen );

if( lResult > 0 )
{
lBytesSent += ( int32_t ) lResult;
}
else if( ( lResult != MBEDTLS_ERR_SSL_WANT_WRITE ) &&
( lResult != MBEDTLS_ERR_SSL_WANT_READ ) )
{
lBytesSent = lResult;
}
else
{
/* Empty when lResult == 0 */
}

if( ( lBytesSent < 0 ) ||
( lBytesSent == uxDataLen ) )
{
break;
}
}
else
{
/* Empty when lSelectResult == 0 */
}
}
while( xTaskCheckForTimeOut( &xTimeout, &xTicksToWait ) == pdFALSE );
}
xSemaphoreGive(pxNetworkContext->xTlsContextSemaphore);
}
}

return lBytesSent;
}

int32_t espTlsTransportRecv(NetworkContext_t* pxNetworkContext,
void* pvData, size_t uxDataLen)
int32_t espTlsTransportRecv( NetworkContext_t* pxNetworkContext,
void* pvData, size_t uxDataLen)
{
if (pvData == NULL || uxDataLen == 0)
{
return -1;
}
int32_t lBytesRead = 0;
if(pxNetworkContext != NULL && pxNetworkContext->pxTls != NULL)
int32_t lBytesRead;

if( ( pvData != NULL ) &&
( uxDataLen > 0 ) &&
( pxNetworkContext != NULL ) &&
( pxNetworkContext->pxTls != NULL ) )
{
xSemaphoreTake(pxNetworkContext->xTlsContextSemaphore, portMAX_DELAY);
lBytesRead = esp_tls_conn_read(pxNetworkContext->pxTls, pvData, uxDataLen);
xSemaphoreGive(pxNetworkContext->xTlsContextSemaphore);
if( xSemaphoreTake( pxNetworkContext->xTlsContextSemaphore, portMAX_DELAY ) == pdTRUE )
{
ssize_t lResult = esp_tls_conn_read( pxNetworkContext->pxTls, pvData, ( size_t ) uxDataLen );

if( lResult > 0 )
{
lBytesRead = ( int32_t ) lResult;
}
else if( ( lResult == ESP_TLS_ERR_SSL_WANT_READ ) ||
( lResult == ESP_TLS_ERR_SSL_WANT_WRITE ) )
{
lBytesRead = 0;
}
else
{
lBytesRead = -1;
}

( void ) xSemaphoreGive( pxNetworkContext->xTlsContextSemaphore);
}
}
else
{
return -1; /* pxNetworkContext or pxTls uninitialised */
}
if (lBytesRead == ESP_TLS_ERR_SSL_WANT_WRITE || lBytesRead == ESP_TLS_ERR_SSL_WANT_READ) {
return 0;
}
if (lBytesRead < 0) {
return lBytesRead;
}
if (lBytesRead == 0) {
/* Connection closed */
return -1;
lBytesRead = -1;
}

return lBytesRead;
}

0 comments on commit c71acfd

Please sign in to comment.