Skip to content

Commit

Permalink
feat: configurable handshake session timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
ozwaldorf committed May 21, 2024
1 parent 7ab47e7 commit d010e0e
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 5 deletions.
7 changes: 7 additions & 0 deletions core/handshake/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::net::SocketAddr;
use std::path::PathBuf;
use std::time::Duration;

use serde::{Deserialize, Serialize};

Expand All @@ -8,10 +9,15 @@ use crate::transports;
#[derive(Serialize, Deserialize, Clone)]
#[serde(default)]
pub struct HandshakeConfig {
/// List of transports to enable
#[serde(rename = "transport")]
pub transports: Vec<TransportConfig>,
/// Shared tranport http address
pub http_address: SocketAddr,
/// Optional http configuration
pub https: Option<HttpsConfig>,
/// Timeout for disconnected sessions
pub timeout: Duration,
}

impl Default for HandshakeConfig {
Expand All @@ -25,6 +31,7 @@ impl Default for HandshakeConfig {
],
http_address: ([0, 0, 0, 0], 4220).into(),
https: None,
timeout: Duration::from_secs(1),
}
}
}
Expand Down
8 changes: 5 additions & 3 deletions core/handshake/src/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ impl<C: Collection> Handshake<C> {
let config = config.get::<Self>();
let provider = service_executor.get_provider();
let pk = keystore.get_ed25519_pk();
let ctx = Context::new(provider, waiter);
let ctx = Context::new(provider, waiter, config.timeout);
let handle = Handle::new();

Self {
Expand Down Expand Up @@ -133,6 +133,7 @@ pub struct Context<P: ExecutorProviderInterface> {
pub(crate) shutdown: ShutdownWaiter,
connection_counter: Arc<AtomicU64>,
connections: Arc<DashMap<u64, ConnectionEntry>>,
timeout: Duration,
}

struct ConnectionEntry {
Expand All @@ -146,12 +147,13 @@ struct ConnectionEntry {
}

impl<P: ExecutorProviderInterface> Context<P> {
pub fn new(provider: P, waiter: ShutdownWaiter) -> Self {
pub fn new(provider: P, waiter: ShutdownWaiter, timeout: Duration) -> Self {
Self {
provider,
shutdown: waiter,
connection_counter: AtomicU64::new(0).into(),
connections: DashMap::new().into(),
timeout,
}
}

Expand Down Expand Up @@ -215,7 +217,7 @@ impl<P: ExecutorProviderInterface> Context<P> {
},
);

Proxy::new(connection_id, socket, rx, self.clone()).spawn(Some(
Proxy::new(connection_id, socket, rx, self.clone(), self.timeout).spawn(Some(
State::OnlyPrimaryConnection((sender, receiver).into()),
));
},
Expand Down
11 changes: 9 additions & 2 deletions core/handshake/src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ pub struct Proxy<P: ExecutorProviderInterface> {
/// payload through the transport. And randomly inserting in some other frame in the middle of
/// an active length delimited message before reaching the promised length breaks many things.
queued_primary_response: VecDeque<ResponseFrame>,
timeout: Duration,
}

pub type IsPrimary = bool;
Expand All @@ -65,6 +66,7 @@ impl<P: ExecutorProviderInterface> Proxy<P> {
socket: UnixStream,
connection_rx: Receiver<(IsPrimary, TransportPair)>,
context: Context<P>,
timeout: Duration,
) -> Self {
Self {
context,
Expand All @@ -76,6 +78,7 @@ impl<P: ExecutorProviderInterface> Proxy<P> {
discard_bytes: false,
is_primary_the_current_sender: false,
queued_primary_response: VecDeque::new(),
timeout,
}
}

Expand Down Expand Up @@ -123,7 +126,7 @@ impl<P: ExecutorProviderInterface> Proxy<P> {

#[inline]
async fn run_with_no_connection(&mut self) -> State {
match tokio::time::timeout(Duration::from_secs(10), self.connection_rx.recv()).await {
match tokio::time::timeout(self.timeout, self.connection_rx.recv()).await {
Ok(Ok((is_primary, pair))) => {
if is_primary {
State::OnlyPrimaryConnection(pair)
Expand Down Expand Up @@ -592,7 +595,11 @@ mod tests {

async fn start_mock_node<P: ExecutorProviderInterface>(id: u16) -> Result<ShutdownController> {
let shutdown = ShutdownController::default();
let context = Context::new(MockServiceProvider, shutdown.waiter());
let context = Context::new(
MockServiceProvider,
shutdown.waiter(),
Duration::from_secs(1),
);
let (transport, _) =
MockTransport::bind::<P>(shutdown.waiter(), MockTransportConfig { port: id }).await?;
transport.spawn_listener_task(context);
Expand Down

0 comments on commit d010e0e

Please sign in to comment.