Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bugfix: client memory leak #4991

Merged
merged 1 commit into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 54 additions & 33 deletions common/nymsphinx/chunking/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
// Copyright 2021 - Nym Technologies SA <[email protected]>
// SPDX-License-Identifier: Apache-2.0

use std::sync::LazyLock;

use crate::fragment::{linked_fragment_payload_max_len, unlinked_fragment_payload_max_len};
use dashmap::DashMap;
use fragment::{Fragment, FragmentHeader};
use fragment::FragmentHeader;
use nym_crypto::asymmetric::ed25519::PublicKey;
use serde::Serialize;
pub use set::split_into_sets;
Expand All @@ -29,6 +26,59 @@ pub mod fragment;
pub mod reconstruction;
pub mod set;

pub mod monitoring {
use crate::fragment::Fragment;
use crate::{ReceivedFragment, SentFragment};
use dashmap::DashMap;
use nym_crypto::asymmetric::ed25519::PublicKey;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::LazyLock;

pub static ENABLED: AtomicBool = AtomicBool::new(false);

pub static FRAGMENTS_RECEIVED: LazyLock<DashMap<i32, Vec<ReceivedFragment>>> =
LazyLock::new(DashMap::new);

pub static FRAGMENTS_SENT: LazyLock<DashMap<i32, Vec<SentFragment>>> =
LazyLock::new(DashMap::new);

pub fn enable() {
ENABLED.store(true, Ordering::Relaxed)
}

pub fn enabled() -> bool {
ENABLED.load(Ordering::Relaxed)
}

#[macro_export]
macro_rules! now {
() => {
match std::time::SystemTime::now().duration_since(std::time::SystemTime::UNIX_EPOCH) {
Ok(n) => n.as_secs(),
Err(_) => 0,
}
};
}

pub fn fragment_received(fragment: &Fragment) {
if enabled() {
let id = fragment.fragment_identifier().set_id();
let mut entry = FRAGMENTS_RECEIVED.entry(id).or_default();
let r = ReceivedFragment::new(fragment.header(), now!());
entry.push(r);
}
}

pub fn fragment_sent(fragment: &Fragment, client_nonce: i32, destination: PublicKey, hops: u8) {
if enabled() {
let id = fragment.fragment_identifier().set_id();
let mut entry = FRAGMENTS_SENT.entry(id).or_default();
let s = SentFragment::new(fragment.header(), now!(), client_nonce, destination, hops);
entry.push(s);
}
}
}

#[derive(Debug, Clone)]
pub struct FragmentMixParams {
destination: PublicKey,
Expand Down Expand Up @@ -112,35 +162,6 @@ impl ReceivedFragment {
}
}

pub static FRAGMENTS_RECEIVED: LazyLock<DashMap<i32, Vec<ReceivedFragment>>> =
LazyLock::new(DashMap::new);

pub static FRAGMENTS_SENT: LazyLock<DashMap<i32, Vec<SentFragment>>> = LazyLock::new(DashMap::new);

#[macro_export]
macro_rules! now {
() => {
match std::time::SystemTime::now().duration_since(std::time::SystemTime::UNIX_EPOCH) {
Ok(n) => n.as_secs(),
Err(_) => 0,
}
};
}

pub fn fragment_received(fragment: &Fragment) {
let id = fragment.fragment_identifier().set_id();
let mut entry = FRAGMENTS_RECEIVED.entry(id).or_default();
let r = ReceivedFragment::new(fragment.header(), now!());
entry.push(r);
}

pub fn fragment_sent(fragment: &Fragment, client_nonce: i32, destination: PublicKey, hops: u8) {
let id = fragment.fragment_identifier().set_id();
let mut entry = FRAGMENTS_SENT.entry(id).or_default();
let s = SentFragment::new(fragment.header(), now!(), client_nonce, destination, hops);
entry.push(s);
}

/// The idea behind the process of chunking is to incur as little data overhead as possible due
/// to very computationally costly sphinx encapsulation procedure.
///
Expand Down
4 changes: 2 additions & 2 deletions common/nymsphinx/chunking/src/reconstruction.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright 2021 - Nym Technologies SA <[email protected]>
// SPDX-License-Identifier: Apache-2.0
use crate::fragment::Fragment;
use crate::{fragment_received, ChunkingError};
use crate::{monitoring, ChunkingError};
use log::*;
use std::collections::HashMap;

Expand Down Expand Up @@ -110,7 +110,7 @@ impl ReconstructionBuffer {
}
});

fragment_received(&fragment);
monitoring::fragment_received(&fragment);

let fragment_index = fragment.current_fragment() as usize - 1;
if self.fragments[fragment_index].is_some() {
Expand Down
4 changes: 2 additions & 2 deletions common/nymsphinx/src/preparer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use nym_sphinx_addressing::clients::Recipient;
use nym_sphinx_addressing::nodes::NymNodeRoutingAddress;
use nym_sphinx_anonymous_replies::reply_surb::ReplySurb;
use nym_sphinx_chunking::fragment::{Fragment, FragmentIdentifier};
use nym_sphinx_chunking::fragment_sent;
use nym_sphinx_forwarding::packet::MixPacket;
use nym_sphinx_params::packet_sizes::PacketSize;
use nym_sphinx_params::{PacketType, ReplySurbKeyDigestAlgorithm, DEFAULT_NUM_MIX_HOPS};
Expand All @@ -21,6 +20,7 @@ use nym_topology::{NymTopology, NymTopologyError};
use rand::{CryptoRng, Rng, SeedableRng};
use rand_chacha::ChaCha20Rng;

use nym_sphinx_chunking::monitoring;
use std::time::Duration;

pub(crate) mod payload;
Expand Down Expand Up @@ -206,7 +206,7 @@ pub trait FragmentPreparer {

let destination = packet_recipient.gateway();
let hops = mix_hops.unwrap_or(self.num_mix_hops());
fragment_sent(&fragment, self.nonce(), *destination, hops);
monitoring::fragment_sent(&fragment, self.nonce(), *destination, hops);

let non_reply_overhead = encryption::PUBLIC_KEY_SIZE;
let expected_plaintext = match packet_type {
Expand Down
14 changes: 7 additions & 7 deletions nym-network-monitor/src/accounting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::collections::{HashMap, HashSet};
use anyhow::Result;
use futures::{stream::FuturesUnordered, StreamExt};
use log::{debug, info};
use nym_sphinx::chunking::{SentFragment, FRAGMENTS_RECEIVED, FRAGMENTS_SENT};
use nym_sphinx::chunking::{monitoring, SentFragment};
use nym_topology::{gateway, mix, NymTopology};
use nym_types::monitoring::{MonitorMessage, NodeResult};
use nym_validator_client::nym_api::routes::{API_VERSION, STATUS, SUBMIT_GATEWAY, SUBMIT_NODE};
Expand Down Expand Up @@ -115,8 +115,8 @@ impl NetworkAccount {
}

pub fn empty_buffers() {
FRAGMENTS_SENT.clear();
FRAGMENTS_RECEIVED.clear();
monitoring::FRAGMENTS_SENT.clear();
monitoring::FRAGMENTS_RECEIVED.clear();
}

fn new() -> Self {
Expand All @@ -125,7 +125,7 @@ impl NetworkAccount {
topology,
..Default::default()
};
for fragment_set in FRAGMENTS_SENT.iter() {
for fragment_set in monitoring::FRAGMENTS_SENT.iter() {
let sent_fragments = fragment_set
.value()
.first()
Expand All @@ -138,7 +138,7 @@ impl NetworkAccount {
sent_fragments
);

let recv = FRAGMENTS_RECEIVED.get(fragment_set.key());
let recv = monitoring::FRAGMENTS_RECEIVED.get(fragment_set.key());
let recv_fragments = recv.as_ref().map(|r| r.value().len()).unwrap_or(0);
debug!(
"RECV Fragment set {} has {} fragments",
Expand Down Expand Up @@ -170,7 +170,7 @@ impl NetworkAccount {
}

fn hydrate_all_fragments(&mut self) -> Result<()> {
for fragment_set in FRAGMENTS_SENT.iter() {
for fragment_set in monitoring::FRAGMENTS_SENT.iter() {
let fragment_set_id = fragment_set.key();
for fragment in fragment_set.value() {
let route = self.hydrate_route(fragment.clone())?;
Expand Down Expand Up @@ -205,7 +205,7 @@ impl NetworkAccount {
fn find_missing_fragments(&mut self) {
let mut missing_fragments_map = HashMap::new();
for fragment_set_id in &self.incomplete_fragment_sets {
if let Some(fragment_ref) = FRAGMENTS_RECEIVED.get(fragment_set_id) {
if let Some(fragment_ref) = monitoring::FRAGMENTS_RECEIVED.get(fragment_set_id) {
if let Some(ref_fragment) = fragment_ref.value().first() {
let ref_header = ref_fragment.header();
let ref_id_set = (0..ref_header.total_fragments()).collect::<HashSet<u8>>();
Expand Down
6 changes: 3 additions & 3 deletions nym-network-monitor/src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use axum::{
use futures::StreamExt;
use log::{debug, error, warn};
use nym_sdk::mixnet::MixnetMessageSender;
use nym_sphinx::chunking::{ReceivedFragment, SentFragment, FRAGMENTS_RECEIVED, FRAGMENTS_SENT};
use nym_sphinx::chunking::{monitoring, ReceivedFragment, SentFragment};
use petgraph::{dot::Dot, Graph};
use rand::{distributions::Alphanumeric, seq::SliceRandom, Rng};
use serde::Serialize;
Expand Down Expand Up @@ -113,7 +113,7 @@ pub async fn graph_handler() -> Result<String, StatusCode> {
)]
pub async fn sent_handler() -> Json<FragmentsSent> {
Json(FragmentsSent(
(*FRAGMENTS_SENT)
(*monitoring::FRAGMENTS_SENT)
.clone()
.into_iter()
.collect::<HashMap<_, _>>(),
Expand All @@ -129,7 +129,7 @@ pub async fn sent_handler() -> Json<FragmentsSent> {
)]
pub async fn recv_handler() -> Json<FragmentsReceived> {
Json(FragmentsReceived(
(*FRAGMENTS_RECEIVED)
(*monitoring::FRAGMENTS_RECEIVED)
.clone()
.into_iter()
.collect::<HashMap<_, _>>(),
Expand Down
4 changes: 4 additions & 0 deletions nym-network-monitor/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use nym_crypto::asymmetric::ed25519::PrivateKey;
use nym_network_defaults::setup_env;
use nym_network_defaults::var_names::NYM_API;
use nym_sdk::mixnet::{self, MixnetClient};
use nym_sphinx::chunking::monitoring;
use nym_topology::{HardcodedTopologyProvider, NymTopology};
use std::fs::File;
use std::io::Write;
Expand Down Expand Up @@ -154,6 +155,9 @@ async fn main() -> Result<()> {

setup_env(args.env); // Defaults to mainnet if empty

// enable monitoring client-side
monitoring::enable();

let cancel_token = CancellationToken::new();
let server_cancel_token = cancel_token.clone();
let clients = Arc::new(RwLock::new(VecDeque::with_capacity(args.n_clients)));
Expand Down
Loading