Skip to content

Commit

Permalink
fix(comms/messaging): fix possible deadlock in outbound pipeline (#4657)
Browse files Browse the repository at this point in the history
Description
---
- Fixes a rare possible deadlock when broadcasting many messages, caused by the bounded internal channel in the outbound pipeline
- Reduces the number of inbound and outbound pipeline workers
- Greatly reduces the buffer size between inbound messaging and the inbound pipeline to allow for substream backpressure
- Adds a "last resort" timeout to the inbound and outbound pipelines

Motivation and Context
---

The outbound pipeline could deadlock when all pipeline workers were busy and the outbound sink service's channel was full, causing the pipeline to wait for both a free executor slot and a free slot to send on the channel.

How Has This Been Tested?
---
Memorynet, Manually: wallet stress tests (2 x wallets, 2 x base nodes), checked SAF message exchange
  • Loading branch information
sdbondi authored Sep 12, 2022
1 parent c01471a commit 3fcc6a0
Show file tree
Hide file tree
Showing 24 changed files with 166 additions and 139 deletions.
7 changes: 2 additions & 5 deletions base_layer/p2p/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,6 @@ pub struct P2pConfig {
/// The maximum number of concurrent outbound tasks allowed before back-pressure is applied to outbound messaging
/// queue
pub max_concurrent_outbound_tasks: usize,
/// The size of the buffer (channel) which holds pending outbound message requests
pub outbound_buffer_size: usize,
/// Configuration for DHT
pub dht: DhtConfig,
/// Set to true to allow peers to provide test addresses (loopback, memory etc.). If set to false, memory
Expand Down Expand Up @@ -131,9 +129,8 @@ impl Default for P2pConfig {
transport: Default::default(),
datastore_path: PathBuf::from("peer_db"),
peer_database_name: "peers".to_string(),
max_concurrent_inbound_tasks: 50,
max_concurrent_outbound_tasks: 100,
outbound_buffer_size: 100,
max_concurrent_inbound_tasks: 4,
max_concurrent_outbound_tasks: 4,
dht: DhtConfig {
database_url: DbConnectionUrl::file("dht.sqlite"),
..Default::default()
Expand Down
4 changes: 1 addition & 3 deletions base_layer/p2p/src/initialization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,6 @@ pub async fn initialize_local_test_comms<P: AsRef<Path>>(
let dht_outbound_layer = dht.outbound_middleware_layer();
let (event_sender, _) = broadcast::channel(100);
let pipeline = pipeline::Builder::new()
.outbound_buffer_size(10)
.with_outbound_pipeline(outbound_rx, |sink| {
ServiceBuilder::new().layer(dht_outbound_layer).service(sink)
})
Expand Down Expand Up @@ -333,7 +332,7 @@ async fn configure_comms_and_dht(
let node_identity = comms.node_identity();
let shutdown_signal = comms.shutdown_signal();
// Create outbound channel
let (outbound_tx, outbound_rx) = mpsc::channel(config.outbound_buffer_size);
let (outbound_tx, outbound_rx) = mpsc::channel(config.dht.outbound_buffer_size);

let mut dht = Dht::builder();
dht.with_config(config.dht.clone()).with_outbound_sender(outbound_tx);
Expand All @@ -350,7 +349,6 @@ async fn configure_comms_and_dht(

// Hook up DHT messaging middlewares
let messaging_pipeline = pipeline::Builder::new()
.outbound_buffer_size(config.outbound_buffer_size)
.with_outbound_pipeline(outbound_rx, |sink| {
ServiceBuilder::new().layer(dht_outbound_layer).service(sink)
})
Expand Down
1 change: 0 additions & 1 deletion base_layer/wallet/tests/contacts_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ pub fn setup_contacts_service<T: ContactsBackend + 'static>(
peer_database_name: random::string(8),
max_concurrent_inbound_tasks: 10,
max_concurrent_outbound_tasks: 10,
outbound_buffer_size: 100,
dht: DhtConfig {
discovery_request_timeout: Duration::from_secs(1),
auto_join: true,
Expand Down
2 changes: 0 additions & 2 deletions base_layer/wallet/tests/wallet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,6 @@ async fn create_wallet(
peer_database_name: random::string(8),
max_concurrent_inbound_tasks: 10,
max_concurrent_outbound_tasks: 10,
outbound_buffer_size: 100,
dht: DhtConfig {
discovery_request_timeout: Duration::from_secs(1),
auto_join: true,
Expand Down Expand Up @@ -672,7 +671,6 @@ async fn test_import_utxo() {
peer_database_name: random::string(8),
max_concurrent_inbound_tasks: 10,
max_concurrent_outbound_tasks: 10,
outbound_buffer_size: 10,
dht: Default::default(),
allow_test_addresses: true,
listener_liveness_allowlist_cidrs: StringList::new(),
Expand Down
1 change: 0 additions & 1 deletion base_layer/wallet_ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3899,7 +3899,6 @@ pub unsafe extern "C" fn comms_config_create(
peer_database_name: database_name_string,
max_concurrent_inbound_tasks: 25,
max_concurrent_outbound_tasks: 50,
outbound_buffer_size: 50,
dht: DhtConfig {
discovery_request_timeout: Duration::from_secs(discovery_timeout_in_secs),
database_url: DbConnectionUrl::File(dht_database_path),
Expand Down
7 changes: 2 additions & 5 deletions common/config/presets/c_base_node.toml
Original file line number Diff line number Diff line change
Expand Up @@ -157,13 +157,10 @@ track_reorgs = true
#peer_database_name = "peers"

# The maximum number of concurrent Inbound tasks allowed before back-pressure is applied to peers
#max_concurrent_inbound_tasks = 50
#max_concurrent_inbound_tasks = 4

# The maximum number of concurrent outbound tasks allowed before back-pressure is applied to outbound messaging queue
#max_concurrent_outbound_tasks = 100

# The size of the buffer (channel) which holds pending outbound message requests
#outbound_buffer_size = 100
#max_concurrent_outbound_tasks = 4

# Set to true to allow peers to provide test addresses (loopback, memory etc.). If set to false, memory
# addresses, loopback, local-link (i.e addresses used in local tests) will not be accepted from peers. This
Expand Down
7 changes: 2 additions & 5 deletions common/config/presets/d_console_wallet.toml
Original file line number Diff line number Diff line change
Expand Up @@ -189,13 +189,10 @@ event_channel_size = 3500
#peer_database_name = "peers"

# The maximum number of concurrent Inbound tasks allowed before back-pressure is applied to peers
#max_concurrent_inbound_tasks = 50
#max_concurrent_inbound_tasks = 4

# The maximum number of concurrent outbound tasks allowed before back-pressure is applied to outbound messaging queue
#max_concurrent_outbound_tasks = 100

# The size of the buffer (channel) which holds pending outbound message requests
#outbound_buffer_size = 100
#max_concurrent_outbound_tasks = 4

# Set to true to allow peers to provide test addresses (loopback, memory etc.). If set to false, memory
# addresses, loopback, local-link (i.e addresses used in local tests) will not be accepted from peers. This
Expand Down
16 changes: 3 additions & 13 deletions comms/core/src/pipeline/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,14 @@ use crate::{
};

const DEFAULT_MAX_CONCURRENT_TASKS: usize = 50;
const DEFAULT_OUTBOUND_BUFFER_SIZE: usize = 50;

type OutboundMessageSinkService = SinkService<mpsc::Sender<OutboundMessage>>;
type OutboundMessageSinkService = SinkService<mpsc::UnboundedSender<OutboundMessage>>;

/// Message pipeline builder
#[derive(Default)]
pub struct Builder<TInSvc, TOutSvc, TOutReq> {
max_concurrent_inbound_tasks: usize,
max_concurrent_outbound_tasks: Option<usize>,
outbound_buffer_size: usize,
inbound: Option<TInSvc>,
outbound_rx: Option<mpsc::Receiver<TOutReq>>,
outbound_pipeline_factory: Option<Box<dyn FnOnce(OutboundMessageSinkService) -> TOutSvc>>,
Expand All @@ -50,7 +48,6 @@ impl Builder<(), (), ()> {
Self {
max_concurrent_inbound_tasks: DEFAULT_MAX_CONCURRENT_TASKS,
max_concurrent_outbound_tasks: None,
outbound_buffer_size: DEFAULT_OUTBOUND_BUFFER_SIZE,
inbound: None,
outbound_rx: None,
outbound_pipeline_factory: None,
Expand All @@ -69,11 +66,6 @@ impl<TInSvc, TOutSvc, TOutReq> Builder<TInSvc, TOutSvc, TOutReq> {
self
}

pub fn outbound_buffer_size(mut self, buf_size: usize) -> Self {
self.outbound_buffer_size = buf_size;
self
}

pub fn with_outbound_pipeline<F, S, R>(self, receiver: mpsc::Receiver<R>, factory: F) -> Builder<TInSvc, S, R>
where
// Factory function takes in a SinkService and returns a new composed service
Expand All @@ -87,7 +79,6 @@ impl<TInSvc, TOutSvc, TOutReq> Builder<TInSvc, TOutSvc, TOutReq> {
max_concurrent_inbound_tasks: self.max_concurrent_inbound_tasks,
max_concurrent_outbound_tasks: self.max_concurrent_outbound_tasks,
inbound: self.inbound,
outbound_buffer_size: self.outbound_buffer_size,
}
}

Expand All @@ -100,7 +91,6 @@ impl<TInSvc, TOutSvc, TOutReq> Builder<TInSvc, TOutSvc, TOutReq> {
max_concurrent_outbound_tasks: self.max_concurrent_outbound_tasks,
outbound_rx: self.outbound_rx,
outbound_pipeline_factory: self.outbound_pipeline_factory,
outbound_buffer_size: self.outbound_buffer_size,
}
}
}
Expand All @@ -111,7 +101,7 @@ where
TInSvc: Service<InboundMessage> + Clone + Send + 'static,
{
fn build_outbound(&mut self) -> Result<OutboundPipelineConfig<TOutReq, TOutSvc>, PipelineBuilderError> {
let (out_sender, out_receiver) = mpsc::channel(self.outbound_buffer_size);
let (out_sender, out_receiver) = mpsc::unbounded_channel();

let in_receiver = self
.outbound_rx
Expand Down Expand Up @@ -157,7 +147,7 @@ pub struct OutboundPipelineConfig<TInItem, TPipeline> {
/// Messages read from this stream are passed to the pipeline
pub in_receiver: mpsc::Receiver<TInItem>,
/// Receiver of `OutboundMessage`s coming from the pipeline
pub out_receiver: mpsc::Receiver<OutboundMessage>,
pub out_receiver: mpsc::UnboundedReceiver<OutboundMessage>,
/// The pipeline (`tower::Service`) to run for each in_stream message
pub pipeline: TPipeline,
}
Expand Down
22 changes: 18 additions & 4 deletions comms/core/src/pipeline/inbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::{fmt::Display, time::Instant};
use std::{
fmt::Display,
time::{Duration, Instant},
};

use futures::future::FusedFuture;
use log::*;
use tari_shutdown::ShutdownSignal;
use tokio::sync::mpsc;
use tokio::{sync::mpsc, time};
use tower::{Service, ServiceExt};

use crate::bounded_executor::BoundedExecutor;
Expand Down Expand Up @@ -103,8 +106,19 @@ where
.spawn(async move {
let timer = Instant::now();
trace!(target: LOG_TARGET, "Start inbound pipeline {}", id);
if let Err(err) = service.oneshot(item).await {
warn!(target: LOG_TARGET, "Inbound pipeline returned an error: '{}'", err);
match time::timeout(Duration::from_secs(30), service.oneshot(item)).await {
Ok(Ok(_)) => {},
Ok(Err(err)) => {
warn!(target: LOG_TARGET, "Inbound pipeline returned an error: '{}'", err);
},
Err(_) => {
error!(
target: LOG_TARGET,
"Inbound pipeline {} timed out and was aborted. THIS SHOULD NOT HAPPEN: there was a \
deadlock or excessive delay in processing this pipeline.",
id
);
},
}
trace!(
target: LOG_TARGET,
Expand Down
31 changes: 22 additions & 9 deletions comms/core/src/pipeline/outbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::{fmt::Display, time::Instant};
use std::{
fmt::Display,
time::{Duration, Instant},
};

use futures::future::Either;
use log::*;
use tokio::sync::mpsc;
use tokio::{sync::mpsc, time};
use tower::{Service, ServiceExt};

use crate::{
Expand Down Expand Up @@ -93,16 +96,26 @@ where
let pipeline = self.config.pipeline.clone();
let id = current_id;
current_id = (current_id + 1) % u64::MAX;

self.executor
.spawn(async move {
let timer = Instant::now();
trace!(target: LOG_TARGET, "Start outbound pipeline {}", id);
if let Err(err) = pipeline.oneshot(msg).await {
error!(
target: LOG_TARGET,
"Outbound pipeline {} returned an error: '{}'", id, err
);
match time::timeout(Duration::from_secs(30), pipeline.oneshot(msg)).await {
Ok(Ok(_)) => {},
Ok(Err(err)) => {
error!(
target: LOG_TARGET,
"Outbound pipeline {} returned an error: '{}'", id, err
);
},
Err(_) => {
error!(
target: LOG_TARGET,
"Outbound pipeline {} timed out and was aborted. THIS SHOULD NOT HAPPEN: \
there was a deadlock or excessive delay in processing this pipeline.",
id
);
},
}

trace!(
Expand Down Expand Up @@ -174,7 +187,7 @@ mod test {
)
.await
.unwrap();
let (out_tx, out_rx) = mpsc::channel(NUM_ITEMS);
let (out_tx, out_rx) = mpsc::unbounded_channel();
let (msg_tx, mut msg_rx) = mpsc::channel(NUM_ITEMS);
let executor = Handle::current();

Expand Down
21 changes: 20 additions & 1 deletion comms/core/src/pipeline/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::task::Poll;
use std::{future, task::Poll};

use futures::{future::BoxFuture, task::Context, FutureExt};
use tower::Service;
Expand Down Expand Up @@ -59,3 +59,22 @@ where T: Send + 'static
.boxed()
}
}
/// `tower::Service` implementation for a `SinkService` that wraps a tokio unbounded sender.
///
/// The item is handed to the channel synchronously inside `call`, so the returned future is already resolved
/// and this service never signals back-pressure through `poll_ready`.
impl<T> Service<T> for SinkService<tokio::sync::mpsc::UnboundedSender<T>>
where T: Send + 'static
{
    type Error = PipelineError;
    type Future = future::Ready<Result<Self::Response, Self::Error>>;
    type Response = ();

    /// Always reports ready; no readiness check is performed on the underlying channel.
    fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    /// Sends `item` on the wrapped channel and returns an already-completed future holding the outcome.
    ///
    /// A failed send is reported as a "sink closed in sink service" error; the underlying send error (and the
    /// item it carries) is discarded.
    fn call(&mut self, item: T) -> Self::Future {
        // `UnboundedSender::send` takes `&self`, so the sender can be used in place instead of being cloned
        // for every message.
        let result = self
            .0
            .send(item)
            .map_err(|_| anyhow::anyhow!("sink closed in sink service"));
        future::ready(result)
    }
}
6 changes: 3 additions & 3 deletions comms/core/src/protocol/messaging/extension.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ use crate::{
runtime::task,
};

/// Buffer size for inbound messages from _all_ peers. This should be large enough to buffer quite a few incoming
/// messages before creating backpressure on peers speaking the messaging protocol.
pub const INBOUND_MESSAGE_BUFFER_SIZE: usize = 100;
/// Buffer size for inbound messages from _all_ peers. If the message consumer is slow to get through this queue,
/// sending peers will start to experience backpressure (this is a good thing).
pub const INBOUND_MESSAGE_BUFFER_SIZE: usize = 10;
/// Buffer size for notifications that a peer wants to speak /tari/messaging. This buffer is used for all peers, but a low
/// value is ok because these events happen once (or less) per connecting peer. E.g. a value of 10 would allow 10
/// peers to concurrently request to speak /tari/messaging.
Expand Down
Loading

0 comments on commit 3fcc6a0

Please sign in to comment.