diff --git a/docs/ARCHITECTURE.md b/docs/ARCHITECTURE.md index ec487e7c..60a9d67c 100644 --- a/docs/ARCHITECTURE.md +++ b/docs/ARCHITECTURE.md @@ -63,16 +63,20 @@ simulation, we relay on the following: and a `Listener` that propagates this signal. 2. The (`Trigger`, `Listener`) pair are used with channels: if a channel errors out across `send()` or `recv()`, shutdown is triggered. There is - no reliance on channel mechanics, i.e. errors generated when all senders - are and/or a receiver is dropped. + no reliance on channel mechanics, i.e. that receiving channels will error + out when all of their sending channels are dropped. 3. All events are handled in a `tokio::select` to allow waiting on multiple asynchronous tasks at once. These selects should be `biased` on the exit case (ie, the `Listener` being triggered) so that we prioritize exit above generating more events. 4. Additionally, we `select!` on shutdown signal on `send()`/`recv()` -for all channels to guarantee this: - - A task's receiver exiting while one or more corresponding senders - (in different tasks) are actively sending, doesn't result in the - sending tasks erroring due to channel `SendError`. Any sender's - inability to `send()` due to a dropped receiver triggers a clean - shutdown across all listening tasks. +for all channels to guarantee shutdown: + - When the signal to shutdown is received, it is possible that a + task responsible for consuming on the `Receiver` channel exits when + multiple tasks are still attempting to send to it. + - By using `select` with all `send()` instructions, we ensure that + the senders will exit cleanly, rather than block waiting on a + receiver that has already exited to consume its send. + - An alternative to this approach would be to use `receiver.close()` + and drain all items from the channel (resulting in unblocking the + senders). diff --git a/sim-lib/src/lib.rs b/sim-lib/src/lib.rs index 2d20568b..9a7649c8 100644 --- a/sim-lib/src/lib.rs +++ b/sim-lib/src/lib.rs @@ -1053,8 +1053,17 @@ async fn consume_events( } }; - if sender.send(outcome.clone()).await.is_err() { - return Err(SimulationError::MpscChannelError(format!("Error sending simulation output {outcome:?}."))); + select!{ + biased; + _ = listener.clone() => { + return Ok(()) + } + send_result = sender.send(outcome.clone()) => { + if send_result.is_err() { + return Err(SimulationError::MpscChannelError( + format!("Error sending simulation output {outcome:?}."))); + } + } } } } @@ -1118,8 +1127,17 @@ async fn produce_events {destination}."))); + select!{ + biased; + _ = listener.clone() => { + return Ok(()); + }, + send_result = sender.send(event.clone()) => { + if send_result.is_err(){ + return Err(SimulationError::MpscChannelError( + format!("Stopped activity producer for {amount}: {source} -> {destination}."))); + } + }, } current_count += 1; @@ -1314,10 +1332,18 @@ async fn produce_simulation_results( } }, SimulationOutput::SendPaymentFailure(payment, result) => { - if results.send((payment, result.clone())).await.is_err() { - break Err(SimulationError::MpscChannelError( - format!("Failed to send payment result: {result} for payment {:?} dispatched at {:?}.", payment.hash, payment.dispatch_time), - )); + select!{ + _ = listener.clone() => { + return Ok(()); + }, + send_result = results.send((payment, result.clone())) => { + if send_result.is_err(){ + break Err(SimulationError::MpscChannelError( + format!("Failed to send payment result: {result} for payment {:?} dispatched at {:?}.", + payment.hash, payment.dispatch_time), + )); + } + }, } } }; @@ -1384,7 +1410,8 @@ async fn track_payment_result( }, send_payment_result = results.send((payment, res.clone())) => { if send_payment_result.is_err() { - return Err(SimulationError::MpscChannelError(format!("Failed to send payment result {res} for payment {payment}."))) + return Err(SimulationError::MpscChannelError( + format!("Failed to send payment result {res} for payment {payment}."))) } } }