From c0d625c1630da1b1b25414ecdc99bd78eccf8bba Mon Sep 17 00:00:00 2001 From: Stan Bondi Date: Sun, 21 Nov 2021 12:42:21 +0200 Subject: [PATCH] fix: be more permissive of responses for the incorrect request_id (#3588) Description --- - add more intrumentation to tari_comms - improve behaviour and logs around stream interruptions - be more permissive of responses for the incorrect request_id - reduce transaction_resend_period to 10 minutes from an hour @philipr-za Motivation and Context --- Track more metrics to diagnose issues. Formally recognise a stream interruption and continue the session vs a protocol violation which terminates the session. How Has This Been Tested? --- Existing test, stress test involving 2 wallets and a base node --- RFC/src/RFC-0250_Covenants.md | 44 +++------ applications/daily_tests/washing_machine.js | 6 +- .../src/automation/command_parser.rs | 14 ++- .../sync/header_sync/synchronizer.rs | 2 +- .../core/src/base_node/sync/rpc/tests.rs | 2 +- base_layer/core/src/mempool/rpc/test.rs | 4 +- .../tasks/txo_validation_task.rs | 4 +- .../wallet/src/transaction_service/config.rs | 2 +- .../transaction_validation_protocol.rs | 2 +- .../src/utxo_scanner_service/utxo_scanning.rs | 2 +- comms/dht/examples/memory_net/utilities.rs | 2 +- comms/dht/src/rpc/test.rs | 2 +- comms/rpc_macros/tests/macro.rs | 2 +- comms/src/connection_manager/dialer.rs | 4 + comms/src/connection_manager/listener.rs | 4 + comms/src/connection_manager/manager.rs | 20 +++- comms/src/connection_manager/metrics.rs | 82 ++++++++++++++++ comms/src/connection_manager/mod.rs | 1 + comms/src/connectivity/manager.rs | 5 +- comms/src/connectivity/metrics.rs | 21 ++-- comms/src/multiplexing/metrics.rs | 32 ++++++ comms/src/multiplexing/mod.rs | 3 + comms/src/multiplexing/yamux.rs | 11 ++- comms/src/protocol/messaging/inbound.rs | 14 +-- comms/src/protocol/messaging/metrics.rs | 74 ++++++++++++++ comms/src/protocol/messaging/mod.rs | 1 + comms/src/protocol/messaging/outbound.rs | 7 +- comms/src/protocol/rpc/client/metrics.rs | 81 +++++++++++---- comms/src/protocol/rpc/client/mod.rs | 43 +++++--- comms/src/protocol/rpc/server/error.rs | 30 +++++- comms/src/protocol/rpc/server/metrics.rs | 61 ++++++++---- comms/src/protocol/rpc/server/mod.rs | 98 +++++++++++++------ comms/src/protocol/rpc/status.rs | 12 ++- .../protocol/rpc/test/comms_integration.rs | 2 +- comms/src/protocol/rpc/test/smoke.rs | 10 +- 35 files changed, 529 insertions(+), 175 deletions(-) create mode 100644 comms/src/connection_manager/metrics.rs create mode 100644 comms/src/multiplexing/metrics.rs create mode 100644 comms/src/protocol/messaging/metrics.rs diff --git a/RFC/src/RFC-0250_Covenants.md b/RFC/src/RFC-0250_Covenants.md index d2e90367db..e3eb65cc15 100644 --- a/RFC/src/RFC-0250_Covenants.md +++ b/RFC/src/RFC-0250_Covenants.md @@ -206,58 +206,46 @@ little-endian 64-byte unsigned integer. The output set is returned unaltered. This rule is implicit for an empty (0 byte) covenant. -```yaml -op_byte: 0x20 +op_byte: 0x20
 args: []
-```
 
 ##### and(A, B)
 
 The intersection (\\(A \cap B\\)) of the resulting output set for covenant rules \\(A\\) and \\(B\\).
 
-```yaml
-op_byte: 0x21
+op_byte: 0x21
 args: [Covenant, Covenant]
-```
 
 ##### or(A, B)
 
 The union (\\(A \cup B\\)) of the resulting output set for covenant rules \\(A\\) and \\(B\\).
 
-```yaml
-op_byte: 0x22
+op_byte: 0x22
 args: [Covenant, Covenant]
-```
 
 ##### xor(A, B)
 
 The symmetric difference (\\(A \triangle B\\)) of the resulting output set for covenant rules \\(A\\) and \\(B\\).
 That is, outputs that match either \\(A\\) or \\(B\\) but not both.
 
-```yaml
-op_byte: 0x23
+op_byte: 0x23
 args: [Covenant, Covenant]
-```
 
 ##### not(A)
 
 Returns the complement of `A`. That is, all the elements of `A` are removed from the resultant output set.
 
-```yaml
-op_byte: 0x24
+op_byte: 0x24
 args: [Covenant]
-```
 
 ##### empty()
 
 Returns an empty set. This will always fail and, if used alone, prevents the UTXO from ever being spent. A more useful
 reason to use `empty` is in conjunction with a conditional e.g. `if_else(Condition(older_rel(10)), A, empty)`
 
-```yaml
-op_byte: 0x25
+op_byte: 0x25
 args: []
-```
 
 #### Filters
@@ -265,46 +253,36 @@
 
 ##### filter_output_hash_eq(hash)
 
 Filters for a single output that matches the hash. This filter only returns zero or one outputs.
 
-```yaml
-op_byte: 0x30
+op_byte: 0x30
 args: [Hash]
-```
 
 ##### filter_fields_preserved(fields)
 
 Filters for outputs where all given fields in the input are preserved in the output.
 
-```yaml
-op_byte: 0x31
+op_byte: 0x31
 args: [Fields]
-```
 
 ##### filter_field_int_eq(field, int)
 
 Filters for outputs whose field value matches the given integer value. If the given field cannot be cast to an
 unsigned 64-bit integer, the transaction/block is rejected.
 
-```yaml
-op_byte: 0x32
+op_byte: 0x32
 args: [Field, VarInt]
-```
 
 ##### filter_fields_hashed_eq(fields, hash)
 
-```yaml
-op_byte: 0x33
+op_byte: 0x33
 args: [Fields, VarInt]
-```
 
 ##### filter_relative_height(height)
 
 Checks that the block height at which the current [UTXO] (i.e. the current input) was mined plus `height` is greater
 than or equal to the current block height. If so, the `identity()` is returned, otherwise `empty()`.
 
-```yaml
-op_byte: 0x34
+op_byte: 0x34
args: [VarInt] -``` #### Encoding / Decoding diff --git a/applications/daily_tests/washing_machine.js b/applications/daily_tests/washing_machine.js index fc66b8137b..c32144aa57 100644 --- a/applications/daily_tests/washing_machine.js +++ b/applications/daily_tests/washing_machine.js @@ -129,15 +129,18 @@ function WashingMachine(options) { `🚀 Launching washing machine (numTransactions = ${numTransactions}, numRounds = ${numRounds}, sleep = ${sleepAfterRound}s)` ); + debug(`Connecting to wallet1 at ${wallet1Grpc}...`); await this.wallet1.connect(wallet1Grpc); + debug(`Connected.`); - debug("Compiling and starting applications..."); let wallet2Process = null; // Start wallet2 if (wallet2Grpc) { this.wallet2 = new WalletClient(); + debug(`Connecting to wallet2 at ${wallet2Grpc}...`); await this.wallet2.connect(wallet2Grpc); } else { + debug("Compiling wallet2..."); const port = await getFreePort(20000, 25000); wallet2Process = createGrpcWallet( baseNodeSeed, @@ -148,6 +151,7 @@ function WashingMachine(options) { true ); wallet2Process.baseDir = "./wallet"; + debug("Starting wallet2..."); await wallet2Process.startNew(); this.wallet2 = await wallet2Process.connectClient(); } diff --git a/applications/tari_console_wallet/src/automation/command_parser.rs b/applications/tari_console_wallet/src/automation/command_parser.rs index 0ed8acf4a6..275c9720b4 100644 --- a/applications/tari_console_wallet/src/automation/command_parser.rs +++ b/applications/tari_console_wallet/src/automation/command_parser.rs @@ -288,16 +288,14 @@ fn parse_make_it_rain(mut args: SplitWhitespace) -> Result, parsed_args.push(ParsedArgument::PublicKey(pubkey)); // transaction type - let txn_type = args - .next() - .ok_or_else(|| ParseError::Empty("transaction type".to_string()))?; + let txn_type = args.next(); let negotiated = match txn_type { - "negotiated" => true, - "one_sided" => false, + Some("negotiated") | Some("interactive") => true, + Some("one_sided") | Some("one-sided") | Some("onesided") => false, _ => { - println!("Invalid data provided for , must be 'negotiated' or 'one_sided'\n"); + println!("Invalid data provided for , must be 'interactive' or 'one-sided'\n"); return Err(ParseError::Invalid( - "Invalid data provided for , must be 'negotiated' or 'one_sided'".to_string(), + "Invalid data provided for , must be 'interactive' or 'one-sided'".to_string(), )); }, }; @@ -531,7 +529,7 @@ mod test { Err(e) => match e { ParseError::Invalid(e) => assert_eq!( e, - "Invalid data provided for , must be 'negotiated' or 'one_sided'".to_string() + "Invalid data provided for , must be 'interactive' or 'one-sided'".to_string() ), _ => panic!("Expected parsing to return an error here"), }, diff --git a/base_layer/core/src/base_node/sync/header_sync/synchronizer.rs b/base_layer/core/src/base_node/sync/header_sync/synchronizer.rs index ca594087b6..3748a557f5 100644 --- a/base_layer/core/src/base_node/sync/header_sync/synchronizer.rs +++ b/base_layer/core/src/base_node/sync/header_sync/synchronizer.rs @@ -315,7 +315,7 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> { let resp = match client.find_chain_split(request).await { Ok(r) => r, - Err(RpcError::RequestFailed(err)) if err.status_code().is_not_found() => { + Err(RpcError::RequestFailed(err)) if err.as_status_code().is_not_found() => { // This round we sent less hashes than the max, so the next round will not have any more hashes to // send. Exit early in this case. 
if block_hashes.len() < NUM_CHAIN_SPLIT_HEADERS { diff --git a/base_layer/core/src/base_node/sync/rpc/tests.rs b/base_layer/core/src/base_node/sync/rpc/tests.rs index 52fdfd0285..055dcf2203 100644 --- a/base_layer/core/src/base_node/sync/rpc/tests.rs +++ b/base_layer/core/src/base_node/sync/rpc/tests.rs @@ -62,7 +62,7 @@ mod sync_blocks { }; let req = rpc_request_mock.request_with_context(Default::default(), msg); let err = service.sync_blocks(req).await.unwrap_err(); - unpack_enum!(RpcStatusCode::NotFound = err.status_code()); + unpack_enum!(RpcStatusCode::NotFound = err.as_status_code()); } #[tokio::test] diff --git a/base_layer/core/src/mempool/rpc/test.rs b/base_layer/core/src/mempool/rpc/test.rs index a9cbb2ee49..3f11646463 100644 --- a/base_layer/core/src/mempool/rpc/test.rs +++ b/base_layer/core/src/mempool/rpc/test.rs @@ -124,7 +124,7 @@ mod get_tx_state_by_excess_sig { .await .unwrap_err(); - unpack_enum!(RpcStatusCode::BadRequest = status.status_code()); + unpack_enum!(RpcStatusCode::BadRequest = status.as_status_code()); } } @@ -174,6 +174,6 @@ mod submit_transaction { .await .unwrap_err(); - unpack_enum!(RpcStatusCode::BadRequest = status.status_code()); + unpack_enum!(RpcStatusCode::BadRequest = status.as_status_code()); } } diff --git a/base_layer/wallet/src/output_manager_service/tasks/txo_validation_task.rs b/base_layer/wallet/src/output_manager_service/tasks/txo_validation_task.rs index c4680cfe45..363530f6fd 100644 --- a/base_layer/wallet/src/output_manager_service/tasks/txo_validation_task.rs +++ b/base_layer/wallet/src/output_manager_service/tasks/txo_validation_task.rs @@ -34,7 +34,7 @@ use crate::{ use log::*; use std::{collections::HashMap, convert::TryInto, sync::Arc}; use tari_common_types::types::BlockHash; -use tari_comms::protocol::rpc::{RpcError::RequestFailed, RpcStatusCode::NotFound}; +use tari_comms::protocol::rpc::RpcError::RequestFailed; use tari_core::{ base_node::rpc::BaseNodeWalletRpcClient, blocks::BlockHeader, @@ -353,7 +353,7 @@ where info!(target: LOG_TARGET, "Error asking base node for header:{}", rpc_error); match &rpc_error { RequestFailed(status) => { - if status.status_code() == NotFound { + if status.as_status_code().is_not_found() { return Ok(None); } else { return Err(rpc_error.into()); diff --git a/base_layer/wallet/src/transaction_service/config.rs b/base_layer/wallet/src/transaction_service/config.rs index 42e6f6478b..85ebedb71f 100644 --- a/base_layer/wallet/src/transaction_service/config.rs +++ b/base_layer/wallet/src/transaction_service/config.rs @@ -50,7 +50,7 @@ impl Default for TransactionServiceConfig { direct_send_timeout: Duration::from_secs(20), broadcast_send_timeout: Duration::from_secs(60), low_power_polling_timeout: Duration::from_secs(300), - transaction_resend_period: Duration::from_secs(3600), + transaction_resend_period: Duration::from_secs(600), resend_response_cooldown: Duration::from_secs(300), pending_transaction_cancellation_timeout: Duration::from_secs(259200), // 3 Days num_confirmations_required: 3, diff --git a/base_layer/wallet/src/transaction_service/protocols/transaction_validation_protocol.rs b/base_layer/wallet/src/transaction_service/protocols/transaction_validation_protocol.rs index eff2c9f0cc..639a246e7a 100644 --- a/base_layer/wallet/src/transaction_service/protocols/transaction_validation_protocol.rs +++ b/base_layer/wallet/src/transaction_service/protocols/transaction_validation_protocol.rs @@ -327,7 +327,7 @@ where warn!(target: LOG_TARGET, "Error asking base node for header:{}", rpc_error); match 
&rpc_error { RequestFailed(status) => { - if status.status_code() == NotFound { + if status.as_status_code() == NotFound { return Ok(None); } else { return Err(rpc_error.into()); diff --git a/base_layer/wallet/src/utxo_scanner_service/utxo_scanning.rs b/base_layer/wallet/src/utxo_scanner_service/utxo_scanning.rs index bd3796479d..e6fb3f1011 100644 --- a/base_layer/wallet/src/utxo_scanner_service/utxo_scanning.rs +++ b/base_layer/wallet/src/utxo_scanner_service/utxo_scanning.rs @@ -354,7 +354,7 @@ where TBackend: WalletBackend + 'static // this returns the index of the vec of hashes we sent it, that is the last hash it knows of. match client.find_chain_split(request).await { Ok(_) => Ok(metadata.utxo_index + 1), - Err(RpcError::RequestFailed(err)) if err.status_code().is_not_found() => { + Err(RpcError::RequestFailed(err)) if err.as_status_code().is_not_found() => { warn!(target: LOG_TARGET, "Reorg detected: {}", err); // The node does not know of the last hash we scanned, thus we had a chain split. // We now start at 0 again. diff --git a/comms/dht/examples/memory_net/utilities.rs b/comms/dht/examples/memory_net/utilities.rs index 9cf6340ebc..c42e433272 100644 --- a/comms/dht/examples/memory_net/utilities.rs +++ b/comms/dht/examples/memory_net/utilities.rs @@ -245,7 +245,7 @@ pub async fn network_connectivity_stats(nodes: &[TestNode], wallets: &[TestNode] total += t; avg += a; println!( - "{} total connections on the network. ({} per node on average)", + "{} total connections on the network. ({} per peer on average)", total, avg / (wallets.len() + nodes.len()) ); diff --git a/comms/dht/src/rpc/test.rs b/comms/dht/src/rpc/test.rs index cd70d65a0f..ef250217a2 100644 --- a/comms/dht/src/rpc/test.rs +++ b/comms/dht/src/rpc/test.rs @@ -159,7 +159,7 @@ mod get_closer_peers { let node_id = NodeId::default(); let req = mock.request_with_context(node_id, req); let err = service.get_closer_peers(req).await.unwrap_err(); - assert_eq!(err.status_code(), RpcStatusCode::BadRequest); + assert_eq!(err.as_status_code(), RpcStatusCode::BadRequest); } } diff --git a/comms/rpc_macros/tests/macro.rs b/comms/rpc_macros/tests/macro.rs index 71f0b7053f..15d11f5876 100644 --- a/comms/rpc_macros/tests/macro.rs +++ b/comms/rpc_macros/tests/macro.rs @@ -147,7 +147,7 @@ async fn it_returns_an_error_for_invalid_method_nums() { .await .unwrap_err(); - unpack_enum!(RpcStatusCode::UnsupportedMethod = err.status_code()); + unpack_enum!(RpcStatusCode::UnsupportedMethod = err.as_status_code()); } #[tokio::test] diff --git a/comms/src/connection_manager/dialer.rs b/comms/src/connection_manager/dialer.rs index f3e04a9eeb..147ffb9299 100644 --- a/comms/src/connection_manager/dialer.rs +++ b/comms/src/connection_manager/dialer.rs @@ -27,6 +27,7 @@ use crate::{ common, dial_state::DialState, manager::{ConnectionManagerConfig, ConnectionManagerEvent}, + metrics, peer_connection, }, multiaddr::Multiaddr, @@ -193,6 +194,7 @@ where dial_result: Result, ) { let node_id = dial_state.peer().node_id.clone(); + metrics::pending_connections(Some(&node_id), ConnectionDirection::Outbound).inc(); let removed = self.cancel_signals.remove(&node_id); drop(removed); @@ -213,6 +215,8 @@ where }, } + metrics::pending_connections(Some(&node_id), ConnectionDirection::Outbound).dec(); + if self.pending_dial_requests.contains_key(&node_id) { self.reply_to_pending_requests(&node_id, dial_result.clone()); } diff --git a/comms/src/connection_manager/listener.rs b/comms/src/connection_manager/listener.rs index c79e192795..2e96471e75 100644 --- 
a/comms/src/connection_manager/listener.rs +++ b/comms/src/connection_manager/listener.rs @@ -32,6 +32,7 @@ use crate::{ bounded_executor::BoundedExecutor, connection_manager::{ liveness::LivenessSession, + metrics, wire_mode::{WireMode, LIVENESS_WIRE_MODE}, }, multiaddr::Multiaddr, @@ -239,6 +240,7 @@ where let span = span!(Level::TRACE, "connection_mann::listener::inbound_task",); let inbound_fut = async move { + metrics::pending_connections(None, ConnectionDirection::Inbound).inc(); match Self::read_wire_format(&mut socket, config.time_to_first_byte).await { Ok(WireMode::Comms(byte)) if byte == config.network_info.network_byte => { let this_node_id_str = node_identity.node_id().short_str(); @@ -325,6 +327,8 @@ where ); }, } + + metrics::pending_connections(None, ConnectionDirection::Inbound).dec(); } .instrument(span); diff --git a/comms/src/connection_manager/manager.rs b/comms/src/connection_manager/manager.rs index d218a883a3..b1ad6d4c96 100644 --- a/comms/src/connection_manager/manager.rs +++ b/comms/src/connection_manager/manager.rs @@ -29,6 +29,7 @@ use super::{ }; use crate::{ backoff::Backoff, + connection_manager::{metrics, ConnectionDirection}, multiplexing::Substream, noise::NoiseConfig, peer_manager::{NodeId, NodeIdentity}, @@ -397,10 +398,14 @@ where node_id.short_str(), proto_str ); + metrics::inbound_substream_counter(&node_id, &protocol).inc(); let notify_fut = self .protocols .notify(&protocol, ProtocolEvent::NewInboundSubstream(node_id, stream)); match time::timeout(Duration::from_secs(10), notify_fut).await { + Ok(Ok(_)) => { + debug!(target: LOG_TARGET, "Protocol notification for '{}' sent", proto_str); + }, Ok(Err(err)) => { error!( target: LOG_TARGET, @@ -413,12 +418,21 @@ where "Error sending NewSubstream notification for protocol '{}' because {}", proto_str, err ); }, - _ => { - debug!(target: LOG_TARGET, "Protocol notification for '{}' sent", proto_str); - }, } }, + PeerConnected(conn) => { + metrics::successful_connections(conn.peer_node_id(), conn.direction()).inc(); + self.publish_event(PeerConnected(conn)); + }, + PeerConnectFailed(peer, err) => { + metrics::failed_connections(&peer, ConnectionDirection::Outbound).inc(); + self.publish_event(PeerConnectFailed(peer, err)); + }, + PeerInboundConnectFailed(err) => { + metrics::failed_connections(&Default::default(), ConnectionDirection::Inbound).inc(); + self.publish_event(PeerInboundConnectFailed(err)); + }, event => { self.publish_event(event); }, diff --git a/comms/src/connection_manager/metrics.rs b/comms/src/connection_manager/metrics.rs new file mode 100644 index 0000000000..b7ac2b857b --- /dev/null +++ b/comms/src/connection_manager/metrics.rs @@ -0,0 +1,82 @@ +// Copyright 2021, The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. 
+// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use crate::{connection_manager::ConnectionDirection, peer_manager::NodeId, protocol::ProtocolId}; +use once_cell::sync::Lazy; +use tari_metrics::{IntCounter, IntCounterVec, IntGauge, IntGaugeVec}; + +pub fn pending_connections(peer: Option<&NodeId>, direction: ConnectionDirection) -> IntGauge { + static METER: Lazy = Lazy::new(|| { + tari_metrics::register_int_gauge_vec( + "comms::connections::pending", + "Number of active connections by direction", + &["peer_id", "direction"], + ) + .unwrap() + }); + + METER.with_label_values(&[ + peer.map(ToString::to_string) + .unwrap_or_else(|| "unknown".to_string()) + .as_str(), + direction.as_str(), + ]) +} + +pub fn successful_connections(peer: &NodeId, direction: ConnectionDirection) -> IntCounter { + static METER: Lazy = Lazy::new(|| { + tari_metrics::register_int_counter_vec( + "comms::connections::success", + "Number of active connections by direction", + &["peer_id", "direction"], + ) + .unwrap() + }); + + METER.with_label_values(&[peer.to_string().as_str(), direction.as_str()]) +} + +pub fn failed_connections(peer: &NodeId, direction: ConnectionDirection) -> IntCounter { + static METER: Lazy = Lazy::new(|| { + tari_metrics::register_int_counter_vec( + "comms::connections::failed", + "Number of active connections by direction", + &["peer_id", "direction"], + ) + .unwrap() + }); + + METER.with_label_values(&[peer.to_string().as_str(), direction.as_str()]) +} + +pub fn inbound_substream_counter(peer: &NodeId, protocol: &ProtocolId) -> IntCounter { + static METER: Lazy = Lazy::new(|| { + tari_metrics::register_int_counter_vec( + "comms::connections::inbound_substream_request_count", + "Number of substream requests", + &["peer_id", "protocol"], + ) + .unwrap() + }); + + METER.with_label_values(&[peer.to_string().as_str(), String::from_utf8_lossy(protocol).as_ref()]) +} diff --git a/comms/src/connection_manager/mod.rs b/comms/src/connection_manager/mod.rs index 92ef4e18f1..fd0f7a3312 100644 --- a/comms/src/connection_manager/mod.rs +++ b/comms/src/connection_manager/mod.rs @@ -23,6 +23,7 @@ mod dial_state; mod dialer; mod listener; +mod metrics; mod common; pub use common::validate_peer_addresses; diff --git a/comms/src/connectivity/manager.rs b/comms/src/connectivity/manager.rs index d8043050e7..ad75db44b0 100644 --- a/comms/src/connectivity/manager.rs +++ b/comms/src/connectivity/manager.rs @@ -510,10 +510,7 @@ impl ConnectivityManagerActor { _ => {}, } }, - #[cfg(feature = "metrics")] - NewInboundSubstream(node_id, protocol, _) => { - super::metrics::substream_request_count(node_id, protocol).inc(); - }, + _ => {}, } diff --git a/comms/src/connectivity/metrics.rs b/comms/src/connectivity/metrics.rs index 56bd8f781d..1470b4c9c7 100644 --- 
a/comms/src/connectivity/metrics.rs +++ b/comms/src/connectivity/metrics.rs @@ -20,32 +20,23 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -use crate::{connection_manager::ConnectionDirection, peer_manager::NodeId, protocol::ProtocolId}; +use crate::connection_manager::ConnectionDirection; use once_cell::sync::Lazy; use tari_metrics::{IntGauge, IntGaugeVec}; pub fn connections(direction: ConnectionDirection) -> IntGauge { static METER: Lazy = Lazy::new(|| { - tari_metrics::register_int_gauge_vec("comms::connections", "Number of active connections by direction", &[ - "direction", - ]) + tari_metrics::register_int_gauge_vec( + "comms::connectivity::num_connections", + "Number of active connections by direction", + &["direction"], + ) .unwrap() }); METER.with_label_values(&[direction.as_str()]) } -pub fn substream_request_count(peer: &NodeId, protocol: &ProtocolId) -> IntGauge { - static METER: Lazy = Lazy::new(|| { - tari_metrics::register_int_gauge_vec("comms::substream_request_count", "Number of substream requests", &[ - "peer", "protocol", - ]) - .unwrap() - }); - - METER.with_label_values(&[peer.to_string().as_str(), String::from_utf8_lossy(protocol).as_ref()]) -} - pub fn uptime() -> IntGauge { static METER: Lazy = Lazy::new(|| tari_metrics::register_int_gauge("comms::uptime", "Comms uptime").unwrap()); diff --git a/comms/src/multiplexing/metrics.rs b/comms/src/multiplexing/metrics.rs new file mode 100644 index 0000000000..823501aa57 --- /dev/null +++ b/comms/src/multiplexing/metrics.rs @@ -0,0 +1,32 @@ +// Copyright 2021, The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
+ +use once_cell::sync::Lazy; +use tari_metrics::IntCounter; + +pub static TOTAL_BYTES_READ: Lazy = Lazy::new(|| { + tari_metrics::register_int_counter("comms::substream::total_bytes_read", "The total inbound bytes").unwrap() +}); + +pub static TOTAL_BYTES_WRITTEN: Lazy = Lazy::new(|| { + tari_metrics::register_int_counter("comms::substream::total_bytes_written", "The total outbound bytes").unwrap() +}); diff --git a/comms/src/multiplexing/mod.rs b/comms/src/multiplexing/mod.rs index 8ac082d5c4..1732ea142b 100644 --- a/comms/src/multiplexing/mod.rs +++ b/comms/src/multiplexing/mod.rs @@ -20,5 +20,8 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +#[cfg(feature = "metrics")] +mod metrics; + mod yamux; pub use self::yamux::{ConnectionError, Control, IncomingSubstreams, Substream, Yamux}; diff --git a/comms/src/multiplexing/yamux.rs b/comms/src/multiplexing/yamux.rs index 811f485390..a9cf697c31 100644 --- a/comms/src/multiplexing/yamux.rs +++ b/comms/src/multiplexing/yamux.rs @@ -220,12 +220,21 @@ impl StreamId for Substream { impl tokio::io::AsyncRead for Substream { fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll> { - Pin::new(&mut self.stream).poll_read(cx, buf) + match Pin::new(&mut self.stream).poll_read(cx, buf) { + Poll::Ready(Ok(())) => { + #[cfg(feature = "metrics")] + super::metrics::TOTAL_BYTES_READ.inc_by(buf.filled().len() as u64); + Poll::Ready(Ok(())) + }, + res => res, + } } } impl tokio::io::AsyncWrite for Substream { fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll> { + #[cfg(feature = "metrics")] + super::metrics::TOTAL_BYTES_WRITTEN.inc_by(buf.len() as u64); Pin::new(&mut self.stream).poll_write(cx, buf) } diff --git a/comms/src/protocol/messaging/inbound.rs b/comms/src/protocol/messaging/inbound.rs index e65f81f8ec..9445445629 100644 --- a/comms/src/protocol/messaging/inbound.rs +++ b/comms/src/protocol/messaging/inbound.rs @@ -20,12 +20,8 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
-use crate::{ - message::InboundMessage, - peer_manager::NodeId, - protocol::messaging::{MessagingEvent, MessagingProtocol}, - rate_limit::RateLimit, -}; +use super::{metrics, MessagingEvent, MessagingProtocol}; +use crate::{message::InboundMessage, peer_manager::NodeId, rate_limit::RateLimit}; use futures::{future::Either, StreamExt}; use log::*; use std::{sync::Arc, time::Duration}; @@ -67,6 +63,7 @@ impl InboundMessaging { pub async fn run(self, socket: S) where S: AsyncRead + AsyncWrite + Unpin { let peer = &self.peer; + metrics::num_sessions().inc(); debug!( target: LOG_TARGET, "Starting inbound messaging protocol for peer '{}'", @@ -82,9 +79,11 @@ impl InboundMessaging { }; tokio::pin!(stream); + let inbound_count = metrics::inbound_message_count(&self.peer); while let Some(result) = stream.next().await { match result { Ok(Ok(raw_msg)) => { + inbound_count.inc(); let msg_len = raw_msg.len(); let inbound_msg = InboundMessage::new(peer.clone(), raw_msg.freeze()); debug!( @@ -112,6 +111,7 @@ impl InboundMessaging { let _ = self.messaging_events_tx.send(Arc::new(event)); }, Ok(Err(err)) => { + metrics::error_count(peer).inc(); error!( target: LOG_TARGET, "Failed to receive from peer '{}' because '{}'", @@ -122,6 +122,7 @@ impl InboundMessaging { }, Err(_) => { + metrics::error_count(peer).inc(); debug!( target: LOG_TARGET, "Inbound messaging for peer '{}' has stopped because it was inactive for {:.0?}", @@ -134,6 +135,7 @@ impl InboundMessaging { } } + metrics::num_sessions().dec(); debug!( target: LOG_TARGET, "Inbound messaging handler exited for peer `{}`", diff --git a/comms/src/protocol/messaging/metrics.rs b/comms/src/protocol/messaging/metrics.rs new file mode 100644 index 0000000000..e43fb4a7d6 --- /dev/null +++ b/comms/src/protocol/messaging/metrics.rs @@ -0,0 +1,74 @@ +// Copyright 2021, The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
+ +use crate::peer_manager::NodeId; +use once_cell::sync::Lazy; +use tari_metrics::{IntCounter, IntCounterVec, IntGauge}; + +pub fn num_sessions() -> IntGauge { + static METER: Lazy = Lazy::new(|| { + tari_metrics::register_int_gauge( + "comms::messaging::num_sessions", + "The number of active messaging sessions", + ) + .unwrap() + }); + + METER.clone() +} + +pub fn outbound_message_count(peer: &NodeId) -> IntCounter { + static METER: Lazy = Lazy::new(|| { + tari_metrics::register_int_counter_vec( + "comms::messaging::outbound_message_count", + "The number of handshakes per peer", + &["peer_id"], + ) + .unwrap() + }); + + METER.with_label_values(&[peer.to_string().as_str()]) +} + +pub fn inbound_message_count(peer: &NodeId) -> IntCounter { + static METER: Lazy = Lazy::new(|| { + tari_metrics::register_int_counter_vec( + "comms::messaging::inbound_message_count", + "The number of handshakes per peer", + &["peer_id"], + ) + .unwrap() + }); + + METER.with_label_values(&[peer.to_string().as_str()]) +} + +pub fn error_count(peer: &NodeId) -> IntCounter { + static METER: Lazy = Lazy::new(|| { + tari_metrics::register_int_counter_vec("comms::messaging::errors", "The number of errors per peer", &[ + "peer_id", + ]) + .unwrap() + }); + + METER.with_label_values(&[peer.to_string().as_str()]) +} diff --git a/comms/src/protocol/messaging/mod.rs b/comms/src/protocol/messaging/mod.rs index 1fa347b3a2..7fd5703e50 100644 --- a/comms/src/protocol/messaging/mod.rs +++ b/comms/src/protocol/messaging/mod.rs @@ -29,6 +29,7 @@ pub use extension::MessagingProtocolExtension; mod error; mod forward; mod inbound; +mod metrics; mod outbound; mod protocol; pub use protocol::{ diff --git a/comms/src/protocol/messaging/outbound.rs b/comms/src/protocol/messaging/outbound.rs index a2c4288b4e..98fc30a9e2 100644 --- a/comms/src/protocol/messaging/outbound.rs +++ b/comms/src/protocol/messaging/outbound.rs @@ -20,7 +20,7 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
-use super::{error::MessagingProtocolError, MessagingEvent, MessagingProtocol, SendFailReason}; +use super::{error::MessagingProtocolError, metrics, MessagingEvent, MessagingProtocol, SendFailReason}; use crate::{ connection_manager::{NegotiatedSubstream, PeerConnection}, connectivity::{ConnectivityError, ConnectivityRequester}, @@ -71,6 +71,7 @@ impl OutboundMessaging { "comms::messaging::outbound", node_id = self.peer_node_id.to_string().as_str() ); + metrics::num_sessions().inc(); async move { debug!( target: LOG_TARGET, @@ -112,6 +113,7 @@ impl OutboundMessaging { ); }, Err(err) => { + metrics::error_count(&peer_node_id).inc(); error!( target: LOG_TARGET, "Outbound messaging protocol failed for peer {}: {}", peer_node_id, err @@ -119,6 +121,7 @@ impl OutboundMessaging { }, } + metrics::num_sessions().dec(); let _ = messaging_events_tx .send(MessagingEvent::OutboundProtocolExited(peer_node_id)) .await; @@ -291,7 +294,9 @@ impl OutboundMessaging { None => Either::Right(stream.map(Ok)), }; + let outbound_count = metrics::outbound_message_count(&self.peer_node_id); let stream = stream.map(|msg| { + outbound_count.inc(); msg.map(|mut out_msg| { event!(Level::DEBUG, "Message buffered for sending {}", out_msg); out_msg.reply_success(); diff --git a/comms/src/protocol/rpc/client/metrics.rs b/comms/src/protocol/rpc/client/metrics.rs index ee77a6b91f..98f99e4492 100644 --- a/comms/src/protocol/rpc/client/metrics.rs +++ b/comms/src/protocol/rpc/client/metrics.rs @@ -22,69 +22,108 @@ use crate::{peer_manager::NodeId, protocol::ProtocolId}; use once_cell::sync::Lazy; -use tari_metrics::{Histogram, HistogramVec, IntGauge, IntGaugeVec}; +use tari_metrics::{Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec}; -pub fn sessions_counter(node_id: &NodeId, protocol: &ProtocolId) -> IntGauge { +pub fn num_sessions(peer: &NodeId, protocol: &ProtocolId) -> IntGauge { static METER: Lazy = Lazy::new(|| { tari_metrics::register_int_gauge_vec( "comms::rpc::client::num_sessions", - "The number of active clients per node per protocol", - &["peer", "protocol"], + "The number of active clients per peer per protocol", + &["peer_id", "protocol"], ) .unwrap() }); - METER.with_label_values(&[node_id.to_string().as_str(), String::from_utf8_lossy(protocol).as_ref()]) + METER.with_label_values(&[peer.to_string().as_str(), String::from_utf8_lossy(protocol).as_ref()]) } -pub fn handshake_errors(node_id: &NodeId, protocol: &ProtocolId) -> IntGauge { - static METER: Lazy = Lazy::new(|| { - tari_metrics::register_int_gauge_vec( +pub fn handshake_counter(peer: &NodeId, protocol: &ProtocolId) -> IntCounter { + static METER: Lazy = Lazy::new(|| { + tari_metrics::register_int_counter_vec( + "comms::rpc::client::handshake_count", + "The number of handshakes per peer per protocol", + &["peer_id", "protocol"], + ) + .unwrap() + }); + + METER.with_label_values(&[peer.to_string().as_str(), String::from_utf8_lossy(protocol).as_ref()]) +} + +pub fn handshake_errors(peer: &NodeId, protocol: &ProtocolId) -> IntCounter { + static METER: Lazy = Lazy::new(|| { + tari_metrics::register_int_counter_vec( "comms::rpc::client::handshake_errors", - "The number of handshake errors per node per protocol", - &["peer", "protocol"], + "The number of handshake errors per peer per protocol", + &["peer_id", "protocol"], + ) + .unwrap() + }); + + METER.with_label_values(&[peer.to_string().as_str(), String::from_utf8_lossy(protocol).as_ref()]) +} + +pub fn client_errors(peer: &NodeId, protocol: &ProtocolId) -> IntCounter { + static 
METER: Lazy = Lazy::new(|| { + tari_metrics::register_int_counter_vec( + "comms::rpc::client::error_count", + "The number of client errors per peer per protocol", + &["peer_id", "protocol"], + ) + .unwrap() + }); + + METER.with_label_values(&[peer.to_string().as_str(), String::from_utf8_lossy(protocol).as_ref()]) +} + +pub fn client_timeouts(peer: &NodeId, protocol: &ProtocolId) -> IntCounter { + static METER: Lazy = Lazy::new(|| { + tari_metrics::register_int_counter_vec( + "comms::rpc::client::error_timeouts", + "The number of client timeouts per peer per protocol", + &["peer_id", "protocol"], ) .unwrap() }); - METER.with_label_values(&[node_id.to_string().as_str(), String::from_utf8_lossy(protocol).as_ref()]) + METER.with_label_values(&[peer.to_string().as_str(), String::from_utf8_lossy(protocol).as_ref()]) } -pub fn request_response_latency(node_id: &NodeId, protocol: &ProtocolId) -> Histogram { +pub fn request_response_latency(peer: &NodeId, protocol: &ProtocolId) -> Histogram { static METER: Lazy = Lazy::new(|| { tari_metrics::register_histogram_vec( "comms::rpc::client::request_response_latency", "A histogram of request to first response latency", - &["peer", "protocol"], + &["peer_id", "protocol"], ) .unwrap() }); - METER.with_label_values(&[node_id.to_string().as_str(), String::from_utf8_lossy(protocol).as_ref()]) + METER.with_label_values(&[peer.to_string().as_str(), String::from_utf8_lossy(protocol).as_ref()]) } -pub fn outbound_request_bytes(node_id: &NodeId, protocol: &ProtocolId) -> Histogram { +pub fn outbound_request_bytes(peer: &NodeId, protocol: &ProtocolId) -> Histogram { static METER: Lazy = Lazy::new(|| { tari_metrics::register_histogram_vec( "comms::rpc::client::outbound_request_bytes", - "Avg. request bytes per node per protocol", - &["peer", "protocol"], + "Avg. request bytes per peer per protocol", + &["peer_id", "protocol"], ) .unwrap() }); - METER.with_label_values(&[node_id.to_string().as_str(), String::from_utf8_lossy(protocol).as_ref()]) + METER.with_label_values(&[peer.to_string().as_str(), String::from_utf8_lossy(protocol).as_ref()]) } -pub fn inbound_response_bytes(node_id: &NodeId, protocol: &ProtocolId) -> Histogram { +pub fn inbound_response_bytes(peer: &NodeId, protocol: &ProtocolId) -> Histogram { static METER: Lazy = Lazy::new(|| { tari_metrics::register_histogram_vec( "comms::rpc::client::inbound_response_bytes", "Avg. 
response bytes per peer per protocol", - &["peer", "protocol"], + &["peer_id", "protocol"], ) .unwrap() }); - METER.with_label_values(&[node_id.to_string().as_str(), String::from_utf8_lossy(protocol).as_ref()]) + METER.with_label_values(&[peer.to_string().as_str(), String::from_utf8_lossy(protocol).as_ref()]) } diff --git a/comms/src/protocol/rpc/client/mod.rs b/comms/src/protocol/rpc/client/mod.rs index 9bec5975ab..b69caaa1db 100644 --- a/comms/src/protocol/rpc/client/mod.rs +++ b/comms/src/protocol/rpc/client/mod.rs @@ -462,8 +462,10 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + StreamId if let Some(r) = self.ready_tx.take() { let _ = r.send(Ok(())); } + metrics::handshake_counter(&self.node_id, &self.protocol_id).inc(); }, Err(err) => { + metrics::handshake_errors(&self.node_id, &self.protocol_id).inc(); if let Some(r) = self.ready_tx.take() { let _ = r.send(Err(err.into())); } @@ -472,8 +474,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + StreamId }, } - let session_counter = metrics::sessions_counter(&self.node_id, &self.protocol_id); - session_counter.inc(); + metrics::num_sessions(&self.node_id, &self.protocol_id).inc(); loop { tokio::select! { biased; @@ -484,6 +485,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + StreamId match req { Some(req) => { if let Err(err) = self.handle_request(req).await { + metrics::client_errors(&self.node_id, &self.protocol_id).inc(); error!(target: LOG_TARGET, "(stream={}) Unexpected error: {}. Worker is terminating.", self.stream_id(), err); break; } @@ -493,21 +495,23 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + StreamId } } } - session_counter.dec(); + metrics::num_sessions(&self.node_id, &self.protocol_id).dec(); if let Err(err) = self.framed.close().await { debug!( target: LOG_TARGET, - "(stream={}) IO Error when closing substream: {}", + "(stream: {}, peer: {}) IO Error when closing substream: {}", self.stream_id(), + self.node_id, err ); } debug!( target: LOG_TARGET, - "(stream={}) RpcClientWorker ({}) terminated.", + "(stream: {}, peer: {}) RpcClientWorker ({}) terminated.", self.stream_id(), + self.node_id, self.protocol_name() ); } @@ -552,6 +556,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + StreamId self.stream_id(), start.elapsed() ); + metrics::client_timeouts(&self.node_id, &self.protocol_id).inc(); let _ = reply.send(Err(RpcStatus::timed_out("Response timed out"))); return Ok(()); }, @@ -583,7 +588,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + StreamId Ok(()) } - #[tracing::instrument(name = "rpc_do_request_response", skip(self, reply, request), fields(request_method = ?request.method, request_size = request.message.len()))] + #[tracing::instrument(name = "rpc_do_request_response", skip(self, reply, request), fields(request_method = ?request.method, request_body_size = request.message.len()))] async fn do_request_response( &mut self, request: BaseRequest, @@ -629,6 +634,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + StreamId let mut metrics_timer = Some(latency.start_timer()); if let Err(err) = self.send_request(req).await { warn!(target: LOG_TARGET, "{}", err); + metrics::client_errors(&self.node_id, &self.protocol_id).inc(); let _ = response_tx.send(Err(err.into())); return Ok(()); } @@ -637,7 +643,9 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + StreamId if self.shutdown_signal.is_triggered() { debug!( target: LOG_TARGET, - "[{}, stream_id: {}, req_id: {}] Client connector closed. 
Quitting stream early", + "[peer: {}, protocol: {}, stream_id: {}, req_id: {}] Client connector closed. Quitting stream \ + early", + self.node_id, self.protocol_name(), self.stream_id(), request_id @@ -671,7 +679,17 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + StreamId "Request {} (method={}) timed out", request_id, method, ); event!(Level::ERROR, "Response timed out"); - if !response_tx.is_closed() { + metrics::client_timeouts(&self.node_id, &self.protocol_id).inc(); + if response_tx.is_closed() { + let req = proto::rpc::RpcRequest { + request_id: request_id as u32, + method, + flags: RpcMessageFlags::FIN.bits().into(), + ..Default::default() + }; + + self.send_request(req).await?; + } else { let _ = response_tx.send(Err(RpcStatus::timed_out("Response timed out"))).await; } break; @@ -713,9 +731,8 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + StreamId let req = proto::rpc::RpcRequest { request_id: request_id as u32, method, - deadline: self.config.deadline.map(|t| t.as_secs()).unwrap_or(0), flags: RpcMessageFlags::FIN.bits().into(), - payload: vec![], + ..Default::default() }; self.send_request(req).await?; @@ -773,7 +790,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + StreamId break resp; }, Err(RpcError::ResponseIdDidNotMatchRequest { actual, expected }) - if actual.saturating_add(1) == request_id => + if actual.wrapping_add(1) == request_id => { warn!( target: LOG_TARGET, @@ -783,7 +800,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + StreamId // Be lenient for a number of messages that may have been buffered to come through for the previous // request. - const MAX_ALLOWED_IGNORED: usize = 5; + const MAX_ALLOWED_IGNORED: usize = 20; if num_ignored > MAX_ALLOWED_IGNORED { return Err(RpcError::ResponseIdDidNotMatchRequest { actual, expected }); } @@ -798,7 +815,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + StreamId fn next_request_id(&mut self) -> u16 { let next_id = self.next_request_id; // request_id is allowed to wrap around back to 0 - self.next_request_id = self.next_request_id.checked_add(1).unwrap_or(0); + self.next_request_id = self.next_request_id.wrapping_add(1); next_id } diff --git a/comms/src/protocol/rpc/server/error.rs b/comms/src/protocol/rpc/server/error.rs index f5cc145de2..7259e987fb 100644 --- a/comms/src/protocol/rpc/server/error.rs +++ b/comms/src/protocol/rpc/server/error.rs @@ -20,7 +20,7 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
-use crate::protocol::rpc::handshake::RpcHandshakeError; +use crate::{proto, protocol::rpc::handshake::RpcHandshakeError}; use prost::DecodeError; use std::io; use tokio::sync::oneshot; @@ -42,7 +42,15 @@ pub enum RpcServerError { #[error("Service not found for protocol `{0}`")] ProtocolServiceNotFound(String), #[error("Unexpected incoming message")] - UnexpectedIncomingMessage, + UnexpectedIncomingMessage(proto::rpc::RpcRequest), + #[error("Unexpected incoming MALFORMED message")] + UnexpectedIncomingMessageMalformed, + #[error("Client interrupted stream")] + ClientInterruptedStream, + #[error("Service call exceeded deadline")] + ServiceCallExceededDeadline, + #[error("Stream read exceeded deadline")] + ReadStreamExceededDeadline, } impl From for RpcServerError { @@ -50,3 +58,21 @@ impl From for RpcServerError { RpcServerError::RequestCanceled } } + +impl RpcServerError { + pub fn to_debug_string(&self) -> String { + use RpcServerError::*; + match self { + DecodeError(_) => "DecodeError".to_string(), + Io(err) => { + format!("Io({:?})", err.kind()) + }, + HandshakeError(_) => "HandshakeError".to_string(), + ProtocolServiceNotFound(_) => "ProtocolServiceNotFound".to_string(), + UnexpectedIncomingMessage(_) => "UnexpectedIncomingMessage".to_string(), + err => { + format!("{:?}", err) + }, + } + } +} diff --git a/comms/src/protocol/rpc/server/metrics.rs b/comms/src/protocol/rpc/server/metrics.rs index cb0c1d7d2a..ed11db7e9e 100644 --- a/comms/src/protocol/rpc/server/metrics.rs +++ b/comms/src/protocol/rpc/server/metrics.rs @@ -20,16 +20,22 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -use crate::{peer_manager::NodeId, protocol::ProtocolId}; +use crate::{ + peer_manager::NodeId, + protocol::{ + rpc::{RpcServerError, RpcStatusCode}, + ProtocolId, + }, +}; use once_cell::sync::Lazy; use tari_metrics::{Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec}; -pub fn sessions_counter(node_id: &NodeId, protocol: &ProtocolId) -> IntGauge { +pub fn num_sessions(node_id: &NodeId, protocol: &ProtocolId) -> IntGauge { static METER: Lazy = Lazy::new(|| { tari_metrics::register_int_gauge_vec( "comms::rpc::server::num_sessions", - "The number of active server sessions per node per protocol", - &["peer", "protocol"], + "The number of active server sessions per peer per protocol", + &["peer_id", "protocol"], ) .unwrap() }); @@ -37,12 +43,12 @@ pub fn sessions_counter(node_id: &NodeId, protocol: &ProtocolId) -> IntGauge { METER.with_label_values(&[node_id.to_string().as_str(), String::from_utf8_lossy(protocol).as_ref()]) } -pub fn handshake_error_counter(node_id: &NodeId, protocol: &ProtocolId) -> IntGauge { - static METER: Lazy = Lazy::new(|| { - tari_metrics::register_int_gauge_vec( - "comms::rpc::server::handshake_errors", - "The number of handshake errors per node per protocol", - &["peer", "protocol"], +pub fn handshake_error_counter(node_id: &NodeId, protocol: &ProtocolId) -> IntCounter { + static METER: Lazy = Lazy::new(|| { + tari_metrics::register_int_counter_vec( + "comms::rpc::server::handshake_error_count", + "The number of handshake errors per peer per protocol", + &["peer_id", "protocol"], ) .unwrap() }); @@ -50,25 +56,46 @@ pub fn handshake_error_counter(node_id: &NodeId, protocol: &ProtocolId) -> IntGa METER.with_label_values(&[node_id.to_string().as_str(), String::from_utf8_lossy(protocol).as_ref()]) } -pub fn 
error_counter(node_id: &NodeId, protocol: &ProtocolId) -> IntCounter { +pub fn error_counter(node_id: &NodeId, protocol: &ProtocolId, err: &RpcServerError) -> IntCounter { static METER: Lazy = Lazy::new(|| { tari_metrics::register_int_counter_vec( "comms::rpc::server::error_count", - "The number of RPC errors per node per protocol", - &["peer", "protocol"], + "The number of RPC errors per peer per protocol", + &["peer_id", "protocol", "error"], ) .unwrap() }); - METER.with_label_values(&[node_id.to_string().as_str(), String::from_utf8_lossy(protocol).as_ref()]) + METER.with_label_values(&[ + node_id.to_string().as_str(), + String::from_utf8_lossy(protocol).as_ref(), + err.to_debug_string().as_str(), + ]) +} + +pub fn status_error_counter(node_id: &NodeId, protocol: &ProtocolId, status_code: RpcStatusCode) -> IntCounter { + static METER: Lazy = Lazy::new(|| { + tari_metrics::register_int_counter_vec( + "comms::rpc::server::status_error_count", + "The number of RPC errors by status code per peer per protocol", + &["peer_id", "protocol", "status"], + ) + .unwrap() + }); + + METER.with_label_values(&[ + node_id.to_string().as_str(), + String::from_utf8_lossy(protocol).as_ref(), + status_code.to_debug_string().as_str(), + ]) } pub fn inbound_requests_bytes(node_id: &NodeId, protocol: &ProtocolId) -> Histogram { static METER: Lazy = Lazy::new(|| { tari_metrics::register_histogram_vec( "comms::rpc::server::inbound_request_bytes", - "Avg. request bytes per node per protocol", - &["peer", "protocol"], + "Avg. request bytes per peer per protocol", + &["peer_id", "protocol"], ) .unwrap() }); @@ -81,7 +108,7 @@ pub fn outbound_response_bytes(node_id: &NodeId, protocol: &ProtocolId) -> Histo tari_metrics::register_histogram_vec( "comms::rpc::server::outbound_response_bytes", "Avg. 
response bytes per peer per protocol", - &["peer", "protocol"], + &["peer_id", "protocol"], ) .unwrap() }); diff --git a/comms/src/protocol/rpc/server/mod.rs b/comms/src/protocol/rpc/server/mod.rs index 31fa62ee5a..ed02b82149 100644 --- a/comms/src/protocol/rpc/server/mod.rs +++ b/comms/src/protocol/rpc/server/mod.rs @@ -70,12 +70,12 @@ use prost::Message; use std::{ borrow::Cow, future::Future, + io, pin::Pin, sync::Arc, task::Poll, time::{Duration, Instant}, }; -use tari_metrics::IntCounter; use tokio::{sync::mpsc, time}; use tokio_stream::Stream; use tower::Service; @@ -386,10 +386,10 @@ where let node_id = node_id.clone(); self.executor .try_spawn(async move { - let sessions_counter = metrics::sessions_counter(&node_id, &service.protocol); - sessions_counter.inc(); + let num_sessions = metrics::num_sessions(&node_id, &service.protocol); + num_sessions.inc(); service.start().await; - sessions_counter.dec(); + num_sessions.dec(); }) .map_err(|_| RpcServerError::MaximumSessionsReached)?; @@ -405,7 +405,6 @@ struct ActivePeerRpcService { framed: CanonicalFraming, comms_provider: TCommsProvider, logging_context_string: Arc, - error_counter: IntCounter, } impl ActivePeerRpcService @@ -421,7 +420,6 @@ where framed: CanonicalFraming, comms_provider: TCommsProvider, ) -> Self { - let error_counter = metrics::error_counter(&node_id, &protocol); Self { logging_context_string: Arc::new(format!( "stream_id: {}, peer: {}, protocol: {}", @@ -436,7 +434,6 @@ where service, framed, comms_provider, - error_counter, } } @@ -446,7 +443,7 @@ where "({}) Rpc server started.", self.logging_context_string, ); if let Err(err) = self.run().await { - self.error_counter.inc(); + metrics::error_counter(&self.node_id, &self.protocol, &err).inc(); error!( target: LOG_TARGET, "({}) Rpc server exited with an error: {}", self.logging_context_string, err @@ -470,7 +467,13 @@ where err ); } - self.error_counter.inc(); + error!( + target: LOG_TARGET, + "(peer: {}, protocol: {}) Failed to handle request: {}", + self.node_id, + self.protocol_name(), + err + ); return Err(err); } let elapsed = start.elapsed(); @@ -489,7 +492,6 @@ where "({}) Failed to close substream after socket error: {}", self.logging_context_string, err ); } - self.error_counter.inc(); return Err(err.into()); }, } @@ -524,8 +526,8 @@ where flags: RpcMessageFlags::FIN.bits().into(), payload: status.to_details_bytes(), }; + metrics::status_error_counter(&self.node_id, &self.protocol, status.as_status_code()).inc(); self.framed.send(bad_request.to_encoded_bytes().into()).await?; - self.error_counter.inc(); return Ok(()); } @@ -573,13 +575,17 @@ where Err(_) => { warn!( target: LOG_TARGET, - "RPC service was not able to complete within the deadline ({:.0?}). Request aborted for peer '{}' \ - ({}).", + "{} RPC service was not able to complete within the deadline ({:.0?}). 
Request aborted", + self.logging_context_string, deadline, - self.node_id, - self.protocol_name() ); - self.error_counter.inc(); + + metrics::error_counter( + &self.node_id, + &self.protocol, + &RpcServerError::ServiceCallExceededDeadline, + ) + .inc(); return Ok(()); }, }; @@ -589,12 +595,9 @@ where self.process_body(request_id, deadline, body).await?; }, Err(err) => { - debug!( + error!( target: LOG_TARGET, - "(peer: {}, protocol: {}) Service returned an error: {}", - self.node_id, - self.protocol_name(), - err + "{} Service returned an error: {}", self.logging_context_string, err ); let resp = proto::rpc::RpcResponse { request_id, @@ -603,7 +606,7 @@ where payload: err.to_details_bytes(), }; - self.error_counter.inc(); + metrics::status_error_counter(&self.node_id, &self.protocol, err.as_status_code()).inc(); self.framed.send(resp.to_encoded_bytes().into()).await?; }, } @@ -623,18 +626,33 @@ where ) -> Result<(), RpcServerError> { let response_bytes = metrics::outbound_response_bytes(&self.node_id, &self.protocol); trace!(target: LOG_TARGET, "Service call succeeded"); + + let node_id = self.node_id.clone(); + let protocol = self.protocol.clone(); let mut stream = body .into_message() .map(|result| into_response(request_id, result)) - .flat_map(|message| stream::iter(ChunkedResponseIter::new(message))) + .flat_map(move |message| { + if !message.status.is_ok() { + metrics::status_error_counter(&node_id, &protocol, message.status).inc(); + } + stream::iter(ChunkedResponseIter::new(message)) + }) .map(|resp| Bytes::from(resp.to_encoded_bytes())); loop { // Check if the client interrupted the outgoing stream if let Err(err) = self.check_interruptions().await { - // Debug level because there are many valid reasons to interrupt a stream - debug!(target: LOG_TARGET, "Stream interrupted: {}", err); - break; + match err { + err @ RpcServerError::ClientInterruptedStream => { + debug!(target: LOG_TARGET, "Stream was interrupted: {}", err); + break; + }, + err => { + error!(target: LOG_TARGET, "Stream was interrupted: {}", err); + return Err(err); + }, + } } let next_item = log_timing( @@ -667,7 +685,12 @@ where deadline ); - self.error_counter.inc(); + metrics::error_counter( + &self.node_id, + &self.protocol, + &RpcServerError::ReadStreamExceededDeadline, + ) + .inc(); break; }, } @@ -677,7 +700,22 @@ where async fn check_interruptions(&mut self) -> Result<(), RpcServerError> { let check = future::poll_fn(|cx| match Pin::new(&mut self.framed).poll_next(cx) { - Poll::Ready(Some(Ok(_))) => Poll::Ready(Some(RpcServerError::UnexpectedIncomingMessage)), + Poll::Ready(Some(Ok(mut msg))) => { + let decoded_msg = match proto::rpc::RpcRequest::decode(&mut msg) { + Ok(msg) => msg, + Err(err) => { + error!(target: LOG_TARGET, "Client send MALFORMED response: {}", err); + return Poll::Ready(Some(RpcServerError::UnexpectedIncomingMessageMalformed)); + }, + }; + let msg_flags = RpcMessageFlags::from_bits_truncate(decoded_msg.flags as u8); + if msg_flags.is_fin() { + Poll::Ready(Some(RpcServerError::ClientInterruptedStream)) + } else { + Poll::Ready(Some(RpcServerError::UnexpectedIncomingMessage(decoded_msg))) + } + }, + Poll::Ready(Some(Err(err))) if err.kind() == io::ErrorKind::WouldBlock => Poll::Ready(None), Poll::Ready(Some(Err(err))) => Poll::Ready(Some(RpcServerError::from(err))), Poll::Ready(None) => Poll::Ready(Some(RpcServerError::StreamClosedByRemote)), Poll::Pending => Poll::Ready(None), @@ -722,7 +760,7 @@ fn into_response(request_id: u32, result: Result) -> RpcRe } RpcResponse { request_id, - 
status: RpcStatus::ok().status_code(), + status: RpcStatus::ok().as_status_code(), flags, payload: msg.into_bytes().unwrap_or_else(Bytes::new), } @@ -731,7 +769,7 @@ fn into_response(request_id: u32, result: Result) -> RpcRe debug!(target: LOG_TARGET, "Body contained an error: {}", err); RpcResponse { request_id, - status: err.status_code(), + status: err.as_status_code(), flags: RpcMessageFlags::FIN, payload: Bytes::from(err.to_details_bytes()), } diff --git a/comms/src/protocol/rpc/status.rs b/comms/src/protocol/rpc/status.rs index 9194d36479..d2a8e20e6d 100644 --- a/comms/src/protocol/rpc/status.rs +++ b/comms/src/protocol/rpc/status.rs @@ -114,10 +114,10 @@ impl RpcStatus { } pub fn as_code(&self) -> u32 { - self.code as u32 + self.code.as_u32() } - pub fn status_code(&self) -> RpcStatusCode { + pub fn as_status_code(&self) -> RpcStatusCode { self.code } @@ -212,6 +212,14 @@ impl RpcStatusCode { pub fn is_timeout(self) -> bool { self == Self::Timeout } + + pub fn as_u32(&self) -> u32 { + *self as u32 + } + + pub fn to_debug_string(&self) -> String { + format!("{:?}", self) + } } impl From for RpcStatusCode { diff --git a/comms/src/protocol/rpc/test/comms_integration.rs b/comms/src/protocol/rpc/test/comms_integration.rs index f43f921081..025bfcc2b9 100644 --- a/comms/src/protocol/rpc/test/comms_integration.rs +++ b/comms/src/protocol/rpc/test/comms_integration.rs @@ -87,6 +87,6 @@ async fn run_service() { mock_state.set_response_err(RpcStatus::bad_request("Insert 💾")); let err = client.request_response::<_, ()>((), 0.into()).await.unwrap_err(); unpack_enum!(RpcError::RequestFailed(status) = err); - unpack_enum!(RpcStatusCode::BadRequest = status.status_code()); + unpack_enum!(RpcStatusCode::BadRequest = status.as_status_code()); assert_eq!(mock_state.call_count(), 2); } diff --git a/comms/src/protocol/rpc/test/smoke.rs b/comms/src/protocol/rpc/test/smoke.rs index 0fc1f05256..44eb8820ec 100644 --- a/comms/src/protocol/rpc/test/smoke.rs +++ b/comms/src/protocol/rpc/test/smoke.rs @@ -152,7 +152,7 @@ async fn request_response_errors_and_streaming() { let err = client.return_error().await.unwrap_err(); unpack_enum!(RpcError::RequestFailed(status) = err); - assert_eq!(status.status_code(), RpcStatusCode::NotImplemented); + assert_eq!(status.as_status_code(), RpcStatusCode::NotImplemented); assert_eq!(status.details(), "I haven't gotten to this yet :("); let stream = client.streaming_error("Gurglesplurb".to_string()).await.unwrap(); @@ -164,7 +164,7 @@ async fn request_response_errors_and_streaming() { .into_iter() .collect::>() .unwrap_err(); - assert_eq!(status.status_code(), RpcStatusCode::BadRequest); + assert_eq!(status.as_status_code(), RpcStatusCode::BadRequest); assert_eq!(status.details(), "What does 'Gurglesplurb' mean?"); let stream = client.streaming_error2().await.unwrap(); @@ -174,7 +174,7 @@ async fn request_response_errors_and_streaming() { assert_eq!(first_reply, "This is ok"); let second_reply = results.get(1).unwrap().as_ref().unwrap_err(); - assert_eq!(second_reply.status_code(), RpcStatusCode::BadRequest); + assert_eq!(second_reply.as_status_code(), RpcStatusCode::BadRequest); assert_eq!(second_reply.details(), "This is a problem"); let pk_hex = client.get_public_key_hex().await.unwrap(); @@ -262,7 +262,7 @@ async fn response_too_big() { .await .unwrap_err(); unpack_enum!(RpcError::RequestFailed(status) = err); - unpack_enum!(RpcStatusCode::MalformedResponse = status.status_code()); + unpack_enum!(RpcStatusCode::MalformedResponse = status.as_status_code()); // Check that 
the exact frame size boundary works and that the session is still going let _ = client @@ -314,7 +314,7 @@ async fn timeout() { let err = client.say_hello(Default::default()).await.unwrap_err(); unpack_enum!(RpcError::RequestFailed(status) = err); - assert_eq!(status.status_code(), RpcStatusCode::Timeout); + assert_eq!(status.as_status_code(), RpcStatusCode::Timeout); *delay.write().await = Duration::from_secs(0);