Skip to content

Commit

Permalink
add global configuration interface for healthchecks
Browse files Browse the repository at this point in the history
Signed-off-by: onur-ozkan <[email protected]>
  • Loading branch information
onur-ozkan committed Sep 2, 2024
1 parent f550550 commit 9774f15
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 14 deletions.
44 changes: 44 additions & 0 deletions mm2src/mm2_core/src/mm_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use mm2_event_stream::{controller::Controller, Event, EventStreamConfiguration};
use mm2_metrics::{MetricsArc, MetricsOps};
use primitives::hash::H160;
use rand::Rng;
use serde::Deserialize;
use serde_json::{self as json, Value as Json};
use shared_ref_counter::{SharedRc, WeakRc};
use std::any::Any;
Expand Down Expand Up @@ -44,6 +45,39 @@ cfg_native! {
/// Default interval to export and record metrics to log.
const EXPORT_METRICS_INTERVAL: f64 = 5. * 60.;

/// Default values for [`HealthcheckConfig`], referenced both by its `serde(default = ...)`
/// attributes and by its `Default` implementation.
mod healthcheck_defaults {
    /// Default time, in milliseconds, during which further healthcheck messages from the
    /// same peer are skipped after one has been recorded.
    pub(crate) const fn default_healthcheck_blocking_ms() -> u64 { 750 }

    /// Default expiration value handed to `HealthcheckMessage::generate_message`.
    ///
    /// NOTE(review): the unit is presumably seconds — confirm against `generate_message`.
    pub(crate) const fn default_healthcheck_message_expiration() -> i64 { 10 }

    /// Default time, in milliseconds, to wait for the healthcheck result.
    ///
    /// The healthcheck RPC converts this value with `Duration::from_millis`, so it has to be
    /// expressed in milliseconds: `10_000` yields a 10-second wait. Returning `10` here would
    /// make the RPC give up after only 10 milliseconds.
    pub(crate) const fn default_result_channel_timeout() -> u64 { 10_000 }
}

/// Settings for the peer healthcheck feature.
///
/// The type is deserializable, and every field carries a `serde(default = ...)` attribute,
/// so any field missing from the input falls back to the matching function in
/// `healthcheck_defaults`.
#[derive(Debug, Deserialize)]
pub struct HealthcheckConfig {
/// Time, in milliseconds, for which a peer stays recorded after sending a healthcheck
/// message; further healthcheck messages from that peer are skipped while the record
/// is alive. Converted with `Duration::from_millis` by the consumer.
#[serde(default = "healthcheck_defaults::default_healthcheck_blocking_ms")]
pub blocking_ms_for_per_address: u64,
/// Expiration value passed to `HealthcheckMessage::generate_message` when the
/// healthcheck RPC builds its request.
/// NOTE(review): the unit is presumably seconds — confirm against `generate_message`.
#[serde(default = "healthcheck_defaults::default_healthcheck_message_expiration")]
pub message_expiration: i64,
/// Maximum time, in milliseconds, the healthcheck RPC waits for the result before
/// giving up. Converted with `Duration::from_millis` by the consumer.
#[serde(default = "healthcheck_defaults::default_result_channel_timeout")]
pub timeout: u64,
}

impl Default for HealthcheckConfig {
    /// Builds a configuration in which every field holds the value produced by its
    /// counterpart in `healthcheck_defaults` — the same functions serde falls back to
    /// when a field is absent from the deserialized input.
    fn default() -> Self {
        let blocking_ms_for_per_address = healthcheck_defaults::default_healthcheck_blocking_ms();
        let message_expiration = healthcheck_defaults::default_healthcheck_message_expiration();
        let timeout = healthcheck_defaults::default_result_channel_timeout();

        Self {
            blocking_ms_for_per_address,
            message_expiration,
            timeout,
        }
    }
}

/// MarketMaker state, shared between the various MarketMaker threads.
///
/// Every MarketMaker has one and only one instance of `MmCtx`.
Expand Down Expand Up @@ -148,6 +182,8 @@ pub struct MmCtx {
pub healthcheck_response_handler: AsyncMutex<ExpirableMap<String, oneshot::Sender<()>>>,
/// This is used to record healthcheck sender peers in an expirable manner to prevent brute-force attacks.
pub healthcheck_bruteforce_shield: AsyncMutex<ExpirableMap<String, ()>>,
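/// Healthcheck settings. Initialized with `HealthcheckConfig::default()` and replaced by the
/// `healthcheck_config` entry of the configuration when that entry is present.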
pub healthcheck_config: HealthcheckConfig,
}

impl MmCtx {
Expand Down Expand Up @@ -199,6 +235,7 @@ impl MmCtx {
async_sqlite_connection: Constructible::default(),
healthcheck_response_handler: AsyncMutex::new(ExpirableMap::default()),
healthcheck_bruteforce_shield: AsyncMutex::new(ExpirableMap::default()),
healthcheck_config: HealthcheckConfig::default(),
}
}

Expand Down Expand Up @@ -768,6 +805,13 @@ impl MmCtxBuilder {
.expect("Invalid json value in 'event_stream_configuration'.");
ctx.event_stream_configuration = Some(event_stream_configuration);
}

let healthcheck_config = &ctx.conf["healthcheck_config"];
if !healthcheck_config.is_null() {
let healthcheck_config: HealthcheckConfig =
json::from_value(healthcheck_config.clone()).expect("Invalid json value in 'healthcheck_config'.");
ctx.healthcheck_config = healthcheck_config;
}
}

#[cfg(target_arch = "wasm32")]
Expand Down
16 changes: 5 additions & 11 deletions mm2src/mm2_main/src/lp_healthcheck.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,6 @@ use crate::lp_network::broadcast_p2p_msg;

pub(crate) const PEER_HEALTHCHECK_PREFIX: TopicPrefix = "hcheck";

/// The default duration required to wait before sending another healthcheck
/// request to the same peer.
pub(crate) const HEALTHCHECK_BLOCKING_DURATION: Duration = Duration::from_millis(750);

#[derive(Debug, Deserialize, Serialize)]
#[cfg_attr(any(test, target_arch = "wasm32"), derive(PartialEq))]
pub(crate) struct HealthcheckMessage {
Expand Down Expand Up @@ -163,15 +159,12 @@ pub async fn peer_connection_healthcheck_rpc(
/// This is unrelated to the timeout logic.
const ADDRESS_RECORD_EXPIRATION: Duration = Duration::from_secs(60);

const RESULT_CHANNEL_TIMEOUT: Duration = Duration::from_secs(10);

const HEALTHCHECK_MESSAGE_EXPIRATION: i64 = 10;

let target_peer_id = PeerId::from_str(&req.peer_id)
.map_err(|e| HealthcheckRpcError::InvalidPeerAddress { reason: e.to_string() })?;

let message = HealthcheckMessage::generate_message(&ctx, target_peer_id, false, HEALTHCHECK_MESSAGE_EXPIRATION)
.map_err(|reason| HealthcheckRpcError::MessageGenerationFailed { reason })?;
let message =
HealthcheckMessage::generate_message(&ctx, target_peer_id, false, ctx.healthcheck_config.message_expiration)
.map_err(|reason| HealthcheckRpcError::MessageGenerationFailed { reason })?;

let encoded_message = message
.encode()
Expand All @@ -186,7 +179,8 @@ pub async fn peer_connection_healthcheck_rpc(

broadcast_p2p_msg(&ctx, peer_healthcheck_topic(&target_peer_id), encoded_message, None);

Ok(rx.timeout(RESULT_CHANNEL_TIMEOUT).await == Ok(Ok(())))
let timeout_duration = Duration::from_millis(ctx.healthcheck_config.timeout);
Ok(rx.timeout(timeout_duration).await == Ok(Ok(())))
}

#[cfg(any(test, target_arch = "wasm32"))]
Expand Down
10 changes: 7 additions & 3 deletions mm2src/mm2_main/src/lp_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use common::executor::SpawnFuture;
use common::{log, Future01CompatExt};
use derive_more::Display;
use futures::{channel::oneshot, StreamExt};
use instant::Instant;
use instant::{Duration, Instant};
use keys::KeyPair;
use mm2_core::mm_ctx::{MmArc, MmWeak};
use mm2_err_handle::prelude::*;
Expand All @@ -39,7 +39,7 @@ use serde::de;
use std::net::ToSocketAddrs;
use std::str::FromStr;

use crate::lp_healthcheck::{peer_healthcheck_topic, HealthcheckMessage, HEALTHCHECK_BLOCKING_DURATION};
use crate::lp_healthcheck::{peer_healthcheck_topic, HealthcheckMessage};
use crate::{lp_healthcheck, lp_ordermatch, lp_stats, lp_swap};

pub type P2PRequestResult<T> = Result<T, MmError<P2PRequestError>>;
Expand Down Expand Up @@ -241,7 +241,11 @@ async fn process_p2p_message(
let mut bruteforce_shield = ctx.healthcheck_bruteforce_shield.lock().await;
bruteforce_shield.clear_expired_entries();
if bruteforce_shield
.insert(sender_peer.clone(), (), HEALTHCHECK_BLOCKING_DURATION)
.insert(
sender_peer.clone(),
(),
Duration::from_millis(ctx.healthcheck_config.blocking_ms_for_per_address),
)
.is_some()
{
log::warn!("Peer '{sender_peer}' exceeded the healthcheck blocking time, skipping their message.");
Expand Down

0 comments on commit 9774f15

Please sign in to comment.