Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add uxr_run_session_until_confirm_delivery_one_stream API #285

Draft
wants to merge 6 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 28 additions & 2 deletions include/uxr/client/core/session/session.h
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,15 @@ UXRDLLAPI uxrStreamId uxr_create_input_reliable_stream(
UXRDLLAPI void uxr_flash_output_streams(
uxrSession* session);

/**
* @brief Flashes one output stream seding the data through the transport.
* @param session A uxrSession structure previously initialized.
* @param stream_id A uxrStreamId structure previously initialized.
*/
UXRDLLAPI void uxr_flash_one_output_stream(
uxrSession* session,
const uxrStreamId stream_id);

/**
* @brief Keeps communication between the Client and the Agent.
* This function involves the following actions:
Expand Down Expand Up @@ -466,12 +475,29 @@ UXRDLLAPI bool uxr_run_session_until_timeout(
* The aforementioned actions will be performed in a loop until a the `timeout` is exceeded
* or the output reliable streams confirm the delivery of all their messages.
* @param session A uxrSession structure previously initialized.
* @param timeout The waiting time in milliseconds.
* @param timeout_ms The waiting time in milliseconds.
* @return `true` if all output reliable streams confirm the delivery of their messages. `false` in other case.
*/
UXRDLLAPI bool uxr_run_session_until_confirm_delivery(
uxrSession* session,
int timeout);
int timeout_ms);

/**
* @brief Keeps communication between the Client and the Agent.
* This function involves the following actions:
* 1. flushing one output streams sending the data through the transport,
* 2. listening messages from the Agent calling the associated callback (topic and status).
* The aforementioned actions will be performed in a loop until a the `timeout` is exceeded
* or the output reliable streams confirm the delivery of all their messages.
* @param session A uxrSession structure previously initialized.
* @param stream A uxrStreamId previously initialized.
* @param timeout_ms The waiting time in milliseconds.
* @return `true` if given output reliable stream confirms the delivery of his messages. `false` in other case.
*/
UXRDLLAPI bool uxr_run_session_until_confirm_delivery_one_stream(
uxrSession* session,
const uxrStreamId stream,
int timeout_ms);

/**
* @brief Keeps communication between the Client and the Agent.
Expand Down
88 changes: 85 additions & 3 deletions src/c/core/session/session.c
Original file line number Diff line number Diff line change
Expand Up @@ -389,18 +389,57 @@ bool uxr_run_session_until_confirm_delivery(

uxr_flash_output_streams(session);

bool timeout = false;
while (!uxr_output_streams_confirmed(&session->streams) && !timeout)
int64_t start_timestamp = uxr_millis();
int remaining_time = timeout_ms;

do
{
timeout = !listen_message_reliably(session, timeout_ms);
listen_message_reliably(session, remaining_time);
remaining_time = timeout_ms - (int)(uxr_millis() - start_timestamp);
}
while (remaining_time > 0 && !uxr_output_streams_confirmed(&session->streams));

bool ret = uxr_output_streams_confirmed(&session->streams);

UXR_UNLOCK_SESSION(session);
return ret;
}

bool uxr_run_session_until_confirm_delivery_one_stream(
uxrSession* session,
const uxrStreamId stream_id,
int timeout_ms)
{
if (stream_id.direction != UXR_OUTPUT_STREAM ||
stream_id.type != UXR_RELIABLE_STREAM ||
stream_id.index >= session->streams.output_reliable_size)
{
return false;
}

UXR_LOCK_SESSION(session);

const uxrOutputReliableStream* stream =
&session->streams.output_reliable[stream_id.index];

uxr_flash_one_output_stream(session, stream_id);

int64_t start_timestamp = uxr_millis();
int remaining_time = timeout_ms;

do
{
listen_message_reliably(session, remaining_time);
remaining_time = timeout_ms - (int)(uxr_millis() - start_timestamp);
}
while (remaining_time > 0 && !uxr_output_one_stream_confirmed(stream));

bool ret = uxr_output_one_stream_confirmed(stream);

UXR_UNLOCK_SESSION(session);
return ret;
}

bool uxr_run_session_until_all_status(
uxrSession* session,
int timeout_ms,
Expand Down Expand Up @@ -604,6 +643,49 @@ void uxr_flash_output_streams(
}
}

void uxr_flash_one_output_stream(
uxrSession* session,
const uxrStreamId stream_id)
{
UXR_HANDLE_SHARED_MEMORY();

if (stream_id.direction == UXR_OUTPUT_STREAM)
{
if (stream_id.type == UXR_BEST_EFFORT_STREAM)
{
uxrOutputBestEffortStream* stream = &session->streams.output_best_effort[stream_id.index];

uint8_t* buffer; size_t length; uxrSeqNum seq_num;

UXR_LOCK_STREAM_ID(session, stream_id);

if (uxr_prepare_best_effort_buffer_to_send(stream, &buffer, &length, &seq_num))
{
uxr_stamp_session_header(&session->info, stream_id.raw, seq_num, buffer);
send_message(session, buffer, length);
}

UXR_UNLOCK_STREAM_ID(session, stream_id);
}
else if (stream_id.type == UXR_RELIABLE_STREAM)
{
uxrOutputReliableStream* stream = &session->streams.output_reliable[stream_id.index];

uint8_t* buffer; size_t length; uxrSeqNum seq_num;

UXR_LOCK_STREAM_ID(session, stream_id);

while (uxr_prepare_next_reliable_buffer_to_send(stream, &buffer, &length, &seq_num))
{
uxr_stamp_session_header(&session->info, stream_id.raw, seq_num, buffer);
send_message(session, buffer, length);
}

UXR_UNLOCK_STREAM_ID(session, stream_id);
}
}
}

//==================================================================
// PRIVATE
//==================================================================
Expand Down
10 changes: 10 additions & 0 deletions src/c/core/session/stream/stream_storage.c
Original file line number Diff line number Diff line change
Expand Up @@ -148,3 +148,13 @@ bool uxr_output_streams_confirmed(
}
return up_to_date;
}

bool uxr_output_one_stream_confirmed(
const uxrOutputReliableStream* stream)
{
bool up_to_date = true;
UXR_LOCK((uxrMutex*) &stream->mutex);
up_to_date = uxr_is_output_up_to_date(stream);
UXR_UNLOCK((uxrMutex*) &stream->mutex);
return up_to_date;
}
3 changes: 3 additions & 0 deletions src/c/core/session/stream/stream_storage_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ uxrInputReliableStream* uxr_get_input_reliable_stream(
bool uxr_output_streams_confirmed(
const uxrStreamStorage* storage);

bool uxr_output_one_stream_confirmed(
const uxrOutputReliableStream* storage);

#ifdef __cplusplus
}
#endif // ifdef __cplusplus
Expand Down