Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RTI and federate socket fixes #422

Merged
merged 4 commits into from
May 13, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 15 additions & 4 deletions core/federated/RTI/rti_remote.c
Original file line number Diff line number Diff line change
Expand Up @@ -386,11 +386,10 @@ void handle_timed_message(federate_info_t* sending_federate, unsigned char* buff
// issue a TAG before this message has been forwarded.
LF_MUTEX_LOCK(&rti_mutex);

// If the destination federate is no longer connected, issue a warning
// and return.
// If the destination federate is no longer connected, issue a warning,
// remove the message from the socket and return.
federate_info_t* fed = GET_FED_INFO(federate_id);
if (fed->enclave.state == NOT_CONNECTED) {
LF_MUTEX_UNLOCK(&rti_mutex);
lf_print_warning("RTI: Destination federate %d is no longer connected. Dropping message.", federate_id);
LF_PRINT_LOG("Fed status: next_event " PRINTF_TAG ", "
"completed " PRINTF_TAG ", "
Expand All @@ -401,6 +400,18 @@ void handle_timed_message(federate_info_t* sending_federate, unsigned char* buff
fed->enclave.last_granted.time - start_time, fed->enclave.last_granted.microstep,
fed->enclave.last_provisionally_granted.time - start_time,
fed->enclave.last_provisionally_granted.microstep);
// If the message was larger than the buffer, we must empty out the remainder also.
size_t total_bytes_read = bytes_read;
while (total_bytes_read < total_bytes_to_read) {
bytes_to_read = total_bytes_to_read - total_bytes_read;
if (bytes_to_read > FED_COM_BUFFER_SIZE) {
bytes_to_read = FED_COM_BUFFER_SIZE;
}
read_from_socket_fail_on_error(&sending_federate->socket, bytes_to_read, buffer, NULL,
erlingrj marked this conversation as resolved.
Show resolved Hide resolved
"RTI failed to clear message chunks.");
total_bytes_read += bytes_to_read;
}
LF_MUTEX_UNLOCK(&rti_mutex);
return;
}

Expand Down Expand Up @@ -1073,7 +1084,7 @@ void* federate_info_thread_TCP(void* fed) {
int read_failed = read_from_socket(my_fed->socket, 1, buffer);
if (read_failed) {
// Socket is closed
lf_print_warning("RTI: Socket to federate %d is closed. Exiting the thread.", my_fed->enclave.id);
lf_print_error("RTI: Socket to federate %d is closed. Exiting the thread.", my_fed->enclave.id);
my_fed->enclave.state = NOT_CONNECTED;
my_fed->socket = -1;
// FIXME: We need better error handling here, but do not stop execution here.
Expand Down
9 changes: 5 additions & 4 deletions core/federated/network/net_util.c
Original file line number Diff line number Diff line change
Expand Up @@ -95,17 +95,17 @@ int read_from_socket(int socket, size_t num_bytes, unsigned char* buffer) {
return -1;
}
ssize_t bytes_read = 0;
int retry_count = 0;
while (bytes_read < (ssize_t)num_bytes) {
ssize_t more = read(socket, buffer + bytes_read, num_bytes - (size_t)bytes_read);
if (more < 0 && retry_count++ < NUM_SOCKET_RETRIES && (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)) {
if (more < 0 && (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)) {
// Those error codes set by the socket indicates
// that we should try again (@see man errno).
lf_print_warning("Reading from socket failed. Will try again.");
LF_PRINT_DEBUG("Reading from socket %d failed with error: `%s`. Will try again.", socket, strerror(errno));
lf_sleep(DELAY_BETWEEN_SOCKET_RETRIES);
continue;
} else if (more < 0) {
// A more serious error occurred.
lf_print_error("Reading from socket %d failed. With error: `%s`", socket, strerror(errno));
return -1;
} else if (more == 0) {
// EOF received.
Expand Down Expand Up @@ -173,11 +173,12 @@ int write_to_socket(int socket, size_t num_bytes, unsigned char* buffer) {
// The error codes EAGAIN or EWOULDBLOCK indicate
// that we should try again (@see man errno).
// The error code EINTR means the system call was interrupted before completing.
LF_PRINT_DEBUG("Writing to socket was blocked. Will try again.");
LF_PRINT_DEBUG("Writing to socket %d was blocked. Will try again.", socket);
lf_sleep(DELAY_BETWEEN_SOCKET_RETRIES);
continue;
} else if (more < 0) {
// A more serious error occurred.
lf_print_error("Writing to socket %d failed. With error: `%s`", socket, strerror(errno));
return -1;
}
bytes_written += more;
Expand Down
Loading