Skip to content

Commit

Permalink
Merge pull request #1 from alexisbatyk/1195-store-contract-data-on-fn…
Browse files Browse the repository at this point in the history
…-save_record-when-pushing-a-new-event

Improved transactions data sent in network monitor
  • Loading branch information
alexisbatyk authored Aug 20, 2024
2 parents 1247a06 + aa66d0f commit 686c93c
Showing 1 changed file with 98 additions and 23 deletions.
121 changes: 98 additions & 23 deletions crates/fdev/src/network_metrics_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ async fn run_server(
changes,
peer_data: DashMap::new(),
transactions_data: DashMap::new(),
_contract_data: DashMap::new(),
contract_data: DashMap::new(),
}));

tracing::info!("Starting metrics server on port {port}");
Expand Down Expand Up @@ -206,6 +206,52 @@ async fn pull_interface(ws: WebSocket, state: Arc<ServerState>) -> anyhow::Resul
);
tx.send(Message::Binary(msg)).await?;
}

for transaction in state.transactions_data.iter() {
tracing::info!("sending transaction data");
for change in transaction.value() {
let msg = match change {
Change::PutRequest {
tx_id,
key,
requester,
target,
timestamp,
contract_location,
} => {
tracing::info!("sending put request");
ContractChange::put_request_msg(
tx_id.clone(),
key.to_string(),
requester.to_string(),
target.to_string(),
*timestamp,
*contract_location,
)
}
Change::PutSuccess {
tx_id,
key,
target,
timestamp,
contract_location,
} => {
tracing::info!("sending put success");
ContractChange::put_success_msg(
tx_id.clone(),
key.to_string(),
target.to_string(),
target.to_string(),
*timestamp,
*contract_location,
)
}
_ => continue,
};
tx.send(Message::Binary(msg)).await?;
}
}

let mut changes = state.changes.subscribe();
while let Ok(msg) = changes.recv().await {
match msg {
Expand Down Expand Up @@ -269,15 +315,21 @@ async fn pull_interface(ws: WebSocket, state: Arc<ServerState>) -> anyhow::Resul
struct ServerState {
changes: tokio::sync::broadcast::Sender<Change>,
peer_data: DashMap<PeerId, PeerData>,
transactions_data: DashMap<String, Vec<String>>,
_contract_data: DashMap<String, Vec<String>>,
transactions_data: DashMap<String, Vec<Change>>,
contract_data: DashMap<String, ContractData>,
}

struct PeerData {
connections: Vec<(PeerId, f64)>,
location: f64,
}

struct ContractData {
location: f64,
connections: Vec<PeerId>,
key: String,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) enum Change {
AddedConnection {
Expand Down Expand Up @@ -420,18 +472,20 @@ impl ServerState {
}

if let Some(mut entry) = self.transactions_data.get_mut(&tx_id) {
tracing::error!(
"found an already included in logs transaction when it should create it."
);
tracing::error!("this tx should not be included on transactions_data");

unreachable!();
} else {
self.transactions_data.insert(
tx_id.clone(),
vec![format!(
"tx_id {} key {} req {} target {} state PutRequest",
tx_id, key, requester, target
)],
vec![Change::PutRequest {
tx_id: tx_id.clone(),
key: key.clone(),
requester: requester.clone(),
target: target.clone(),
timestamp,
contract_location,
}],
);
}

Expand Down Expand Up @@ -466,21 +520,42 @@ impl ServerState {
return Err(anyhow::anyhow!("requester is empty"));
}

if let Some(mut entry) = self.transactions_data.get_mut(&tx_id) {
entry.push(format!(
"tx_id {} key {} req {} target {} state PutSuccess",
tx_id, key, requester, target
));
} else {
tracing::error!("transaction data not found for this tx when it should.");
unreachable!()
match self.transactions_data.entry(tx_id.clone()) {
dashmap::mapref::entry::Entry::Occupied(mut occ) => {
tracing::info!("found transaction data, adding PutSuccess to changes");
let changes = occ.get_mut();
changes.push(Change::PutSuccess {
tx_id: tx_id.clone(),
key: key.clone(),
target: target.clone(),
timestamp,
contract_location,
});
//connections.sort_unstable_by(|a, b| a.cmp(&b.0));
//connections.dedup();
}
dashmap::mapref::entry::Entry::Vacant(_vac) => {
// this should not happen
tracing::error!("this tx should be included on transactions_data. It should exists a PutRequest before the PutSuccess.");
unreachable!();
}
}

// if let Some(mut entry) = self.contract_data.get_mut(&key) {
// entry.push();
// } else {
// self.contract_data.insert();
// }
match self.contract_data.entry(key.clone()) {
dashmap::mapref::entry::Entry::Occupied(mut occ) => {
let connections = &mut occ.get_mut().connections;
connections.push(PeerId::from_str(&target)?);
//connections.sort_unstable_by(|a, b| a.cmp(&b.0));
//connections.dedup();
}
dashmap::mapref::entry::Entry::Vacant(vac) => {
vac.insert(ContractData {
connections: vec![PeerId::from_str(&target)?],
location: contract_location,
key: key.clone(),
});
}
}

tracing::debug!(%tx_id, %key, %requester, %target, "checking values from save_record -- putsuccess");

Expand Down

0 comments on commit 686c93c

Please sign in to comment.