Skip to content

Commit

Permalink
bugfix: dont store sent/received fragments unless explicitly enabled
Browse files Browse the repository at this point in the history
  • Loading branch information
jstuczyn committed Oct 18, 2024
1 parent 15fd6a2 commit abc1dff
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 47 deletions.
87 changes: 54 additions & 33 deletions common/nymsphinx/chunking/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
// Copyright 2021 - Nym Technologies SA <[email protected]>
// SPDX-License-Identifier: Apache-2.0

use std::sync::LazyLock;

use crate::fragment::{linked_fragment_payload_max_len, unlinked_fragment_payload_max_len};
use dashmap::DashMap;
use fragment::{Fragment, FragmentHeader};
use fragment::FragmentHeader;
use nym_crypto::asymmetric::ed25519::PublicKey;
use serde::Serialize;
pub use set::split_into_sets;
Expand All @@ -29,6 +26,59 @@ pub mod fragment;
pub mod reconstruction;
pub mod set;

pub mod monitoring {
use crate::fragment::Fragment;
use crate::{ReceivedFragment, SentFragment};
use dashmap::DashMap;
use nym_crypto::asymmetric::ed25519::PublicKey;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::LazyLock;

pub static ENABLED: AtomicBool = AtomicBool::new(false);

pub static FRAGMENTS_RECEIVED: LazyLock<DashMap<i32, Vec<ReceivedFragment>>> =
LazyLock::new(DashMap::new);

pub static FRAGMENTS_SENT: LazyLock<DashMap<i32, Vec<SentFragment>>> =
LazyLock::new(DashMap::new);

pub fn enable() {
ENABLED.store(true, Ordering::Relaxed)
}

pub fn enabled() -> bool {
ENABLED.load(Ordering::Relaxed)
}

#[macro_export]
macro_rules! now {
() => {
match std::time::SystemTime::now().duration_since(std::time::SystemTime::UNIX_EPOCH) {
Ok(n) => n.as_secs(),
Err(_) => 0,
}
};
}

pub fn fragment_received(fragment: &Fragment) {
if enabled() {
let id = fragment.fragment_identifier().set_id();
let mut entry = FRAGMENTS_RECEIVED.entry(id).or_default();
let r = ReceivedFragment::new(fragment.header(), now!());
entry.push(r);
}
}

pub fn fragment_sent(fragment: &Fragment, client_nonce: i32, destination: PublicKey, hops: u8) {
if enabled() {
let id = fragment.fragment_identifier().set_id();
let mut entry = FRAGMENTS_SENT.entry(id).or_default();
let s = SentFragment::new(fragment.header(), now!(), client_nonce, destination, hops);
entry.push(s);
}
}
}

#[derive(Debug, Clone)]
pub struct FragmentMixParams {
destination: PublicKey,
Expand Down Expand Up @@ -112,35 +162,6 @@ impl ReceivedFragment {
}
}

pub static FRAGMENTS_RECEIVED: LazyLock<DashMap<i32, Vec<ReceivedFragment>>> =
LazyLock::new(DashMap::new);

pub static FRAGMENTS_SENT: LazyLock<DashMap<i32, Vec<SentFragment>>> = LazyLock::new(DashMap::new);

#[macro_export]
macro_rules! now {
() => {
match std::time::SystemTime::now().duration_since(std::time::SystemTime::UNIX_EPOCH) {
Ok(n) => n.as_secs(),
Err(_) => 0,
}
};
}

pub fn fragment_received(fragment: &Fragment) {
let id = fragment.fragment_identifier().set_id();
let mut entry = FRAGMENTS_RECEIVED.entry(id).or_default();
let r = ReceivedFragment::new(fragment.header(), now!());
entry.push(r);
}

pub fn fragment_sent(fragment: &Fragment, client_nonce: i32, destination: PublicKey, hops: u8) {
let id = fragment.fragment_identifier().set_id();
let mut entry = FRAGMENTS_SENT.entry(id).or_default();
let s = SentFragment::new(fragment.header(), now!(), client_nonce, destination, hops);
entry.push(s);
}

/// The idea behind the process of chunking is to incur as little data overhead as possible due
/// to very computationally costly sphinx encapsulation procedure.
///
Expand Down
4 changes: 2 additions & 2 deletions common/nymsphinx/chunking/src/reconstruction.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright 2021 - Nym Technologies SA <[email protected]>
// SPDX-License-Identifier: Apache-2.0
use crate::fragment::Fragment;
use crate::{fragment_received, ChunkingError};
use crate::{monitoring, ChunkingError};
use log::*;
use std::collections::HashMap;

Expand Down Expand Up @@ -110,7 +110,7 @@ impl ReconstructionBuffer {
}
});

fragment_received(&fragment);
monitoring::fragment_received(&fragment);

let fragment_index = fragment.current_fragment() as usize - 1;
if self.fragments[fragment_index].is_some() {
Expand Down
4 changes: 2 additions & 2 deletions common/nymsphinx/src/preparer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use nym_sphinx_addressing::clients::Recipient;
use nym_sphinx_addressing::nodes::NymNodeRoutingAddress;
use nym_sphinx_anonymous_replies::reply_surb::ReplySurb;
use nym_sphinx_chunking::fragment::{Fragment, FragmentIdentifier};
use nym_sphinx_chunking::fragment_sent;
use nym_sphinx_forwarding::packet::MixPacket;
use nym_sphinx_params::packet_sizes::PacketSize;
use nym_sphinx_params::{PacketType, ReplySurbKeyDigestAlgorithm, DEFAULT_NUM_MIX_HOPS};
Expand All @@ -21,6 +20,7 @@ use nym_topology::{NymTopology, NymTopologyError};
use rand::{CryptoRng, Rng, SeedableRng};
use rand_chacha::ChaCha20Rng;

use nym_sphinx_chunking::monitoring;
use std::time::Duration;

pub(crate) mod payload;
Expand Down Expand Up @@ -206,7 +206,7 @@ pub trait FragmentPreparer {

let destination = packet_recipient.gateway();
let hops = mix_hops.unwrap_or(self.num_mix_hops());
fragment_sent(&fragment, self.nonce(), *destination, hops);
monitoring::fragment_sent(&fragment, self.nonce(), *destination, hops);

let non_reply_overhead = encryption::PUBLIC_KEY_SIZE;
let expected_plaintext = match packet_type {
Expand Down
14 changes: 7 additions & 7 deletions nym-network-monitor/src/accounting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::collections::{HashMap, HashSet};
use anyhow::Result;
use futures::{stream::FuturesUnordered, StreamExt};
use log::{debug, info};
use nym_sphinx::chunking::{SentFragment, FRAGMENTS_RECEIVED, FRAGMENTS_SENT};
use nym_sphinx::chunking::{monitoring, SentFragment};
use nym_topology::{gateway, mix, NymTopology};
use nym_types::monitoring::{MonitorMessage, NodeResult};
use nym_validator_client::nym_api::routes::{API_VERSION, STATUS, SUBMIT_GATEWAY, SUBMIT_NODE};
Expand Down Expand Up @@ -115,8 +115,8 @@ impl NetworkAccount {
}

pub fn empty_buffers() {
FRAGMENTS_SENT.clear();
FRAGMENTS_RECEIVED.clear();
monitoring::FRAGMENTS_SENT.clear();
monitoring::FRAGMENTS_RECEIVED.clear();
}

fn new() -> Self {
Expand All @@ -125,7 +125,7 @@ impl NetworkAccount {
topology,
..Default::default()
};
for fragment_set in FRAGMENTS_SENT.iter() {
for fragment_set in monitoring::FRAGMENTS_SENT.iter() {
let sent_fragments = fragment_set
.value()
.first()
Expand All @@ -138,7 +138,7 @@ impl NetworkAccount {
sent_fragments
);

let recv = FRAGMENTS_RECEIVED.get(fragment_set.key());
let recv = monitoring::FRAGMENTS_RECEIVED.get(fragment_set.key());
let recv_fragments = recv.as_ref().map(|r| r.value().len()).unwrap_or(0);
debug!(
"RECV Fragment set {} has {} fragments",
Expand Down Expand Up @@ -170,7 +170,7 @@ impl NetworkAccount {
}

fn hydrate_all_fragments(&mut self) -> Result<()> {
for fragment_set in FRAGMENTS_SENT.iter() {
for fragment_set in monitoring::FRAGMENTS_SENT.iter() {
let fragment_set_id = fragment_set.key();
for fragment in fragment_set.value() {
let route = self.hydrate_route(fragment.clone())?;
Expand Down Expand Up @@ -205,7 +205,7 @@ impl NetworkAccount {
fn find_missing_fragments(&mut self) {
let mut missing_fragments_map = HashMap::new();
for fragment_set_id in &self.incomplete_fragment_sets {
if let Some(fragment_ref) = FRAGMENTS_RECEIVED.get(fragment_set_id) {
if let Some(fragment_ref) = monitoring::FRAGMENTS_RECEIVED.get(fragment_set_id) {
if let Some(ref_fragment) = fragment_ref.value().first() {
let ref_header = ref_fragment.header();
let ref_id_set = (0..ref_header.total_fragments()).collect::<HashSet<u8>>();
Expand Down
6 changes: 3 additions & 3 deletions nym-network-monitor/src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use axum::{
use futures::StreamExt;
use log::{debug, error, warn};
use nym_sdk::mixnet::MixnetMessageSender;
use nym_sphinx::chunking::{ReceivedFragment, SentFragment, FRAGMENTS_RECEIVED, FRAGMENTS_SENT};
use nym_sphinx::chunking::{monitoring, ReceivedFragment, SentFragment};
use petgraph::{dot::Dot, Graph};
use rand::{distributions::Alphanumeric, seq::SliceRandom, Rng};
use serde::Serialize;
Expand Down Expand Up @@ -113,7 +113,7 @@ pub async fn graph_handler() -> Result<String, StatusCode> {
)]
pub async fn sent_handler() -> Json<FragmentsSent> {
Json(FragmentsSent(
(*FRAGMENTS_SENT)
(*monitoring::FRAGMENTS_SENT)
.clone()
.into_iter()
.collect::<HashMap<_, _>>(),
Expand All @@ -129,7 +129,7 @@ pub async fn sent_handler() -> Json<FragmentsSent> {
)]
pub async fn recv_handler() -> Json<FragmentsReceived> {
Json(FragmentsReceived(
(*FRAGMENTS_RECEIVED)
(*monitoring::FRAGMENTS_RECEIVED)
.clone()
.into_iter()
.collect::<HashMap<_, _>>(),
Expand Down
4 changes: 4 additions & 0 deletions nym-network-monitor/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use nym_crypto::asymmetric::ed25519::PrivateKey;
use nym_network_defaults::setup_env;
use nym_network_defaults::var_names::NYM_API;
use nym_sdk::mixnet::{self, MixnetClient};
use nym_sphinx::chunking::monitoring;
use nym_topology::{HardcodedTopologyProvider, NymTopology};
use std::fs::File;
use std::io::Write;
Expand Down Expand Up @@ -154,6 +155,9 @@ async fn main() -> Result<()> {

setup_env(args.env); // Defaults to mainnet if empty

// enable monitoring client-side
monitoring::enable();

let cancel_token = CancellationToken::new();
let server_cancel_token = cancel_token.clone();
let clients = Arc::new(RwLock::new(VecDeque::with_capacity(args.n_clients)));
Expand Down

0 comments on commit abc1dff

Please sign in to comment.