diff --git a/RFC/src/RFC-0250_Covenants.md b/RFC/src/RFC-0250_Covenants.md
index d2e90367db..e3eb65cc15 100644
--- a/RFC/src/RFC-0250_Covenants.md
+++ b/RFC/src/RFC-0250_Covenants.md
@@ -206,58 +206,46 @@ little-endian 64-bit unsigned integer.
The output set is returned unaltered. This rule is implicit for an empty (0 byte) covenant.
-```yaml
-op_byte: 0x20
+op_byte: 0x20
args: []
-```
##### and(A, B)
The intersection (\\(A \cap B\\)) of the resulting output set for covenant rules \\(A\\) and \\(B\\).
-```yaml
-op_byte: 0x21
+op_byte: 0x21
args: [Covenant, Covenant]
-```
##### or(A, B)
The union (\\(A \cup B\\)) of the resulting output set for covenant rules \\(A\\) and \\(B\\).
-```yaml
-op_byte: 0x22
+op_byte: 0x22
args: [Covenant, Covenant]
-```
##### xor(A, B)
The symmetric difference (\\(A \triangle B\\)) of the resulting output set for covenant rules \\(A\\) and \\(B\\).
That is, outputs that match either \\(A\\) or \\(B\\), but not both.
-```yaml
-op_byte: 0x23
+op_byte: 0x23
args: [Covenant, Covenant]
-```
##### not(A)
Returns the complement of `A`. That is, all the elements of `A` are removed from the
resulting output set.
-```yaml
-op_byte: 0x24
+op_byte: 0x24
args: [Covenant]
-```
##### empty()
Returns an empty set. This will always fail and, if used alone, prevents the UTXO from ever being spent.
A more useful way to use `empty` is in conjunction with a conditional, e.g. `if_else(Condition(older_rel(10)), A, empty)`.
-```yaml
-op_byte: 0x25
+op_byte: 0x25
args: []
-```
#### Filters
@@ -265,46 +253,36 @@ args: []
Filters for a single output that matches the given hash. This filter returns at most one output.
-```yaml
-op_byte: 0x30
+op_byte: 0x30
args: [Hash]
-```
##### filter_fields_preserved(fields)
Filter for outputs where all given fields in the input are preserved in the output.
-```yaml
-op_byte: 0x31
+op_byte: 0x31
args: [Fields]
-```
##### filter_field_int_eq(field, int)
Filters for outputs whose field value matches the given integer value. If the given field cannot be cast
to an unsigned 64-bit integer, the transaction/block is rejected.
-```yaml
-op_byte: 0x32
+op_byte: 0x32
args: [Field, VarInt]
-```
##### filter_fields_hashed_eq(fields, hash)
-```yaml
-op_byte: 0x33
+op_byte: 0x33
args: [Fields, VarInt]
-```
##### filter_relative_height(height)
Checks that the block height at which the current [UTXO] (i.e. the current input) was mined, plus `height`, is greater than or
equal to the current block height. If so, `identity()` is returned, otherwise `empty()`.
-```yaml
-op_byte: 0x34
+op_byte: 0x34
args: [VarInt]
-```
#### Encoding / Decoding
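
To make the op-byte listing above concrete, here is a rough sketch (not the normative encoding, which is specified under Encoding / Decoding) of how a composed covenant such as `and(filter_fields_preserved(fields), filter_relative_height(10))` could be laid out as a prefix-ordered byte stream. Only the op bytes (`0x21`, `0x31`, `0x34`) come from the tables above; the field count, field tag and the one-byte VarInt are illustrative assumptions.

```rust
// Illustrative only: serialises and(filter_fields_preserved([features]),
// filter_relative_height(10)) by walking the covenant tree in prefix order.
fn encode_example_covenant() -> Vec<u8> {
    let mut buf = Vec::new();
    buf.push(0x21); // and(A, B)
    buf.push(0x31); // A: filter_fields_preserved(fields)
    buf.push(0x01); //    assumed: count of fields that follow
    buf.push(0xE0); //    assumed: tag byte identifying the "features" field
    buf.push(0x34); // B: filter_relative_height(height)
    buf.push(0x0A); //    assumed: height = 10 as a one-byte VarInt
    buf
}

fn main() {
    println!("{:02x?}", encode_example_covenant()); // [21, 31, 01, e0, 34, 0a]
}
```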
diff --git a/applications/daily_tests/washing_machine.js b/applications/daily_tests/washing_machine.js
index fc66b8137b..c32144aa57 100644
--- a/applications/daily_tests/washing_machine.js
+++ b/applications/daily_tests/washing_machine.js
@@ -129,15 +129,18 @@ function WashingMachine(options) {
`🚀 Launching washing machine (numTransactions = ${numTransactions}, numRounds = ${numRounds}, sleep = ${sleepAfterRound}s)`
);
+ debug(`Connecting to wallet1 at ${wallet1Grpc}...`);
await this.wallet1.connect(wallet1Grpc);
+ debug(`Connected.`);
- debug("Compiling and starting applications...");
let wallet2Process = null;
// Start wallet2
if (wallet2Grpc) {
this.wallet2 = new WalletClient();
+ debug(`Connecting to wallet2 at ${wallet2Grpc}...`);
await this.wallet2.connect(wallet2Grpc);
} else {
+ debug("Compiling wallet2...");
const port = await getFreePort(20000, 25000);
wallet2Process = createGrpcWallet(
baseNodeSeed,
@@ -148,6 +151,7 @@ function WashingMachine(options) {
true
);
wallet2Process.baseDir = "./wallet";
+ debug("Starting wallet2...");
await wallet2Process.startNew();
this.wallet2 = await wallet2Process.connectClient();
}
diff --git a/applications/tari_console_wallet/src/automation/command_parser.rs b/applications/tari_console_wallet/src/automation/command_parser.rs
index 0ed8acf4a6..275c9720b4 100644
--- a/applications/tari_console_wallet/src/automation/command_parser.rs
+++ b/applications/tari_console_wallet/src/automation/command_parser.rs
@@ -288,16 +288,14 @@ fn parse_make_it_rain(mut args: SplitWhitespace) -> Result<Vec<ParsedArgument>,
parsed_args.push(ParsedArgument::PublicKey(pubkey));
// transaction type
- let txn_type = args
- .next()
- .ok_or_else(|| ParseError::Empty("transaction type".to_string()))?;
+ let txn_type = args.next();
let negotiated = match txn_type {
- "negotiated" => true,
- "one_sided" => false,
+ Some("negotiated") | Some("interactive") => true,
+ Some("one_sided") | Some("one-sided") | Some("onesided") => false,
_ => {
- println!("Invalid data provided for , must be 'negotiated' or 'one_sided'\n");
+ println!("Invalid data provided for , must be 'interactive' or 'one-sided'\n");
return Err(ParseError::Invalid(
- "Invalid data provided for , must be 'negotiated' or 'one_sided'".to_string(),
+ "Invalid data provided for , must be 'interactive' or 'one-sided'".to_string(),
));
},
};
@@ -531,7 +529,7 @@ mod test {
Err(e) => match e {
ParseError::Invalid(e) => assert_eq!(
e,
- "Invalid data provided for , must be 'negotiated' or 'one_sided'".to_string()
+ "Invalid data provided for , must be 'interactive' or 'one-sided'".to_string()
),
_ => panic!("Expected parsing to return an error here"),
},
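
The parser change above reduces to matching an optional token against several accepted spellings. A self-contained sketch of that pattern, with a hypothetical function name and a simplified error type (not the wallet's actual API):

```rust
// Hypothetical helper mirroring the alias handling in parse_make_it_rain.
fn parse_negotiated(token: Option<&str>) -> Result<bool, String> {
    match token {
        Some("negotiated") | Some("interactive") => Ok(true),
        Some("one_sided") | Some("one-sided") | Some("onesided") => Ok(false),
        _ => Err("Invalid data provided for <transaction type>, must be 'interactive' or 'one-sided'".to_string()),
    }
}

fn main() {
    assert_eq!(parse_negotiated(Some("interactive")), Ok(true));
    assert_eq!(parse_negotiated(Some("onesided")), Ok(false));
    assert!(parse_negotiated(Some("bogus")).is_err());
    assert!(parse_negotiated(None).is_err()); // a missing token is still rejected
}
```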
diff --git a/base_layer/core/src/base_node/sync/header_sync/synchronizer.rs b/base_layer/core/src/base_node/sync/header_sync/synchronizer.rs
index ca594087b6..3748a557f5 100644
--- a/base_layer/core/src/base_node/sync/header_sync/synchronizer.rs
+++ b/base_layer/core/src/base_node/sync/header_sync/synchronizer.rs
@@ -315,7 +315,7 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {
let resp = match client.find_chain_split(request).await {
Ok(r) => r,
- Err(RpcError::RequestFailed(err)) if err.status_code().is_not_found() => {
+ Err(RpcError::RequestFailed(err)) if err.as_status_code().is_not_found() => {
// This round we sent less hashes than the max, so the next round will not have any more hashes to
// send. Exit early in this case.
if block_hashes.len() < NUM_CHAIN_SPLIT_HEADERS {
diff --git a/base_layer/core/src/base_node/sync/rpc/tests.rs b/base_layer/core/src/base_node/sync/rpc/tests.rs
index 52fdfd0285..055dcf2203 100644
--- a/base_layer/core/src/base_node/sync/rpc/tests.rs
+++ b/base_layer/core/src/base_node/sync/rpc/tests.rs
@@ -62,7 +62,7 @@ mod sync_blocks {
};
let req = rpc_request_mock.request_with_context(Default::default(), msg);
let err = service.sync_blocks(req).await.unwrap_err();
- unpack_enum!(RpcStatusCode::NotFound = err.status_code());
+ unpack_enum!(RpcStatusCode::NotFound = err.as_status_code());
}
#[tokio::test]
diff --git a/base_layer/core/src/mempool/rpc/test.rs b/base_layer/core/src/mempool/rpc/test.rs
index a9cbb2ee49..3f11646463 100644
--- a/base_layer/core/src/mempool/rpc/test.rs
+++ b/base_layer/core/src/mempool/rpc/test.rs
@@ -124,7 +124,7 @@ mod get_tx_state_by_excess_sig {
.await
.unwrap_err();
- unpack_enum!(RpcStatusCode::BadRequest = status.status_code());
+ unpack_enum!(RpcStatusCode::BadRequest = status.as_status_code());
}
}
@@ -174,6 +174,6 @@ mod submit_transaction {
.await
.unwrap_err();
- unpack_enum!(RpcStatusCode::BadRequest = status.status_code());
+ unpack_enum!(RpcStatusCode::BadRequest = status.as_status_code());
}
}
diff --git a/base_layer/wallet/src/output_manager_service/tasks/txo_validation_task.rs b/base_layer/wallet/src/output_manager_service/tasks/txo_validation_task.rs
index c4680cfe45..363530f6fd 100644
--- a/base_layer/wallet/src/output_manager_service/tasks/txo_validation_task.rs
+++ b/base_layer/wallet/src/output_manager_service/tasks/txo_validation_task.rs
@@ -34,7 +34,7 @@ use crate::{
use log::*;
use std::{collections::HashMap, convert::TryInto, sync::Arc};
use tari_common_types::types::BlockHash;
-use tari_comms::protocol::rpc::{RpcError::RequestFailed, RpcStatusCode::NotFound};
+use tari_comms::protocol::rpc::RpcError::RequestFailed;
use tari_core::{
base_node::rpc::BaseNodeWalletRpcClient,
blocks::BlockHeader,
@@ -353,7 +353,7 @@ where
info!(target: LOG_TARGET, "Error asking base node for header:{}", rpc_error);
match &rpc_error {
RequestFailed(status) => {
- if status.status_code() == NotFound {
+ if status.as_status_code().is_not_found() {
return Ok(None);
} else {
return Err(rpc_error.into());
diff --git a/base_layer/wallet/src/transaction_service/config.rs b/base_layer/wallet/src/transaction_service/config.rs
index 42e6f6478b..85ebedb71f 100644
--- a/base_layer/wallet/src/transaction_service/config.rs
+++ b/base_layer/wallet/src/transaction_service/config.rs
@@ -50,7 +50,7 @@ impl Default for TransactionServiceConfig {
direct_send_timeout: Duration::from_secs(20),
broadcast_send_timeout: Duration::from_secs(60),
low_power_polling_timeout: Duration::from_secs(300),
- transaction_resend_period: Duration::from_secs(3600),
+ transaction_resend_period: Duration::from_secs(600),
resend_response_cooldown: Duration::from_secs(300),
pending_transaction_cancellation_timeout: Duration::from_secs(259200), // 3 Days
num_confirmations_required: 3,
diff --git a/base_layer/wallet/src/transaction_service/protocols/transaction_validation_protocol.rs b/base_layer/wallet/src/transaction_service/protocols/transaction_validation_protocol.rs
index eff2c9f0cc..639a246e7a 100644
--- a/base_layer/wallet/src/transaction_service/protocols/transaction_validation_protocol.rs
+++ b/base_layer/wallet/src/transaction_service/protocols/transaction_validation_protocol.rs
@@ -327,7 +327,7 @@ where
warn!(target: LOG_TARGET, "Error asking base node for header:{}", rpc_error);
match &rpc_error {
RequestFailed(status) => {
- if status.status_code() == NotFound {
+ if status.as_status_code() == NotFound {
return Ok(None);
} else {
return Err(rpc_error.into());
diff --git a/base_layer/wallet/src/utxo_scanner_service/utxo_scanning.rs b/base_layer/wallet/src/utxo_scanner_service/utxo_scanning.rs
index bd3796479d..e6fb3f1011 100644
--- a/base_layer/wallet/src/utxo_scanner_service/utxo_scanning.rs
+++ b/base_layer/wallet/src/utxo_scanner_service/utxo_scanning.rs
@@ -354,7 +354,7 @@ where TBackend: WalletBackend + 'static
// this returns the index of the vec of hashes we sent it, that is the last hash it knows of.
match client.find_chain_split(request).await {
Ok(_) => Ok(metadata.utxo_index + 1),
- Err(RpcError::RequestFailed(err)) if err.status_code().is_not_found() => {
+ Err(RpcError::RequestFailed(err)) if err.as_status_code().is_not_found() => {
warn!(target: LOG_TARGET, "Reorg detected: {}", err);
// The node does not know of the last hash we scanned, thus we had a chain split.
// We now start at 0 again.
diff --git a/comms/dht/examples/memory_net/utilities.rs b/comms/dht/examples/memory_net/utilities.rs
index 9cf6340ebc..c42e433272 100644
--- a/comms/dht/examples/memory_net/utilities.rs
+++ b/comms/dht/examples/memory_net/utilities.rs
@@ -245,7 +245,7 @@ pub async fn network_connectivity_stats(nodes: &[TestNode], wallets: &[TestNode]
total += t;
avg += a;
println!(
- "{} total connections on the network. ({} per node on average)",
+ "{} total connections on the network. ({} per peer on average)",
total,
avg / (wallets.len() + nodes.len())
);
diff --git a/comms/dht/src/rpc/test.rs b/comms/dht/src/rpc/test.rs
index cd70d65a0f..ef250217a2 100644
--- a/comms/dht/src/rpc/test.rs
+++ b/comms/dht/src/rpc/test.rs
@@ -159,7 +159,7 @@ mod get_closer_peers {
let node_id = NodeId::default();
let req = mock.request_with_context(node_id, req);
let err = service.get_closer_peers(req).await.unwrap_err();
- assert_eq!(err.status_code(), RpcStatusCode::BadRequest);
+ assert_eq!(err.as_status_code(), RpcStatusCode::BadRequest);
}
}
diff --git a/comms/rpc_macros/tests/macro.rs b/comms/rpc_macros/tests/macro.rs
index 71f0b7053f..15d11f5876 100644
--- a/comms/rpc_macros/tests/macro.rs
+++ b/comms/rpc_macros/tests/macro.rs
@@ -147,7 +147,7 @@ async fn it_returns_an_error_for_invalid_method_nums() {
.await
.unwrap_err();
- unpack_enum!(RpcStatusCode::UnsupportedMethod = err.status_code());
+ unpack_enum!(RpcStatusCode::UnsupportedMethod = err.as_status_code());
}
#[tokio::test]
diff --git a/comms/src/connection_manager/dialer.rs b/comms/src/connection_manager/dialer.rs
index f3e04a9eeb..147ffb9299 100644
--- a/comms/src/connection_manager/dialer.rs
+++ b/comms/src/connection_manager/dialer.rs
@@ -27,6 +27,7 @@ use crate::{
common,
dial_state::DialState,
manager::{ConnectionManagerConfig, ConnectionManagerEvent},
+ metrics,
peer_connection,
},
multiaddr::Multiaddr,
@@ -193,6 +194,7 @@ where
dial_result: Result<PeerConnection, ConnectionManagerError>,
) {
let node_id = dial_state.peer().node_id.clone();
+ metrics::pending_connections(Some(&node_id), ConnectionDirection::Outbound).inc();
let removed = self.cancel_signals.remove(&node_id);
drop(removed);
@@ -213,6 +215,8 @@ where
},
}
+ metrics::pending_connections(Some(&node_id), ConnectionDirection::Outbound).dec();
+
if self.pending_dial_requests.contains_key(&node_id) {
self.reply_to_pending_requests(&node_id, dial_result.clone());
}
diff --git a/comms/src/connection_manager/listener.rs b/comms/src/connection_manager/listener.rs
index c79e192795..2e96471e75 100644
--- a/comms/src/connection_manager/listener.rs
+++ b/comms/src/connection_manager/listener.rs
@@ -32,6 +32,7 @@ use crate::{
bounded_executor::BoundedExecutor,
connection_manager::{
liveness::LivenessSession,
+ metrics,
wire_mode::{WireMode, LIVENESS_WIRE_MODE},
},
multiaddr::Multiaddr,
@@ -239,6 +240,7 @@ where
let span = span!(Level::TRACE, "connection_mann::listener::inbound_task",);
let inbound_fut = async move {
+ metrics::pending_connections(None, ConnectionDirection::Inbound).inc();
match Self::read_wire_format(&mut socket, config.time_to_first_byte).await {
Ok(WireMode::Comms(byte)) if byte == config.network_info.network_byte => {
let this_node_id_str = node_identity.node_id().short_str();
@@ -325,6 +327,8 @@ where
);
},
}
+
+ metrics::pending_connections(None, ConnectionDirection::Inbound).dec();
}
.instrument(span);
diff --git a/comms/src/connection_manager/manager.rs b/comms/src/connection_manager/manager.rs
index d218a883a3..b1ad6d4c96 100644
--- a/comms/src/connection_manager/manager.rs
+++ b/comms/src/connection_manager/manager.rs
@@ -29,6 +29,7 @@ use super::{
};
use crate::{
backoff::Backoff,
+ connection_manager::{metrics, ConnectionDirection},
multiplexing::Substream,
noise::NoiseConfig,
peer_manager::{NodeId, NodeIdentity},
@@ -397,10 +398,14 @@ where
node_id.short_str(),
proto_str
);
+ metrics::inbound_substream_counter(&node_id, &protocol).inc();
let notify_fut = self
.protocols
.notify(&protocol, ProtocolEvent::NewInboundSubstream(node_id, stream));
match time::timeout(Duration::from_secs(10), notify_fut).await {
+ Ok(Ok(_)) => {
+ debug!(target: LOG_TARGET, "Protocol notification for '{}' sent", proto_str);
+ },
Ok(Err(err)) => {
error!(
target: LOG_TARGET,
@@ -413,12 +418,21 @@ where
"Error sending NewSubstream notification for protocol '{}' because {}", proto_str, err
);
},
- _ => {
- debug!(target: LOG_TARGET, "Protocol notification for '{}' sent", proto_str);
- },
}
},
+ PeerConnected(conn) => {
+ metrics::successful_connections(conn.peer_node_id(), conn.direction()).inc();
+ self.publish_event(PeerConnected(conn));
+ },
+ PeerConnectFailed(peer, err) => {
+ metrics::failed_connections(&peer, ConnectionDirection::Outbound).inc();
+ self.publish_event(PeerConnectFailed(peer, err));
+ },
+ PeerInboundConnectFailed(err) => {
+ metrics::failed_connections(&Default::default(), ConnectionDirection::Inbound).inc();
+ self.publish_event(PeerInboundConnectFailed(err));
+ },
event => {
self.publish_event(event);
},
diff --git a/comms/src/connection_manager/metrics.rs b/comms/src/connection_manager/metrics.rs
new file mode 100644
index 0000000000..b7ac2b857b
--- /dev/null
+++ b/comms/src/connection_manager/metrics.rs
@@ -0,0 +1,82 @@
+// Copyright 2021, The Tari Project
+//
+// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
+// following conditions are met:
+//
+// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
+// disclaimer.
+//
+// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
+// following disclaimer in the documentation and/or other materials provided with the distribution.
+//
+// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
+// products derived from this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
+// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
+// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
+// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+use crate::{connection_manager::ConnectionDirection, peer_manager::NodeId, protocol::ProtocolId};
+use once_cell::sync::Lazy;
+use tari_metrics::{IntCounter, IntCounterVec, IntGauge, IntGaugeVec};
+
+pub fn pending_connections(peer: Option<&NodeId>, direction: ConnectionDirection) -> IntGauge {
+ static METER: Lazy<IntGaugeVec> = Lazy::new(|| {
+ tari_metrics::register_int_gauge_vec(
+ "comms::connections::pending",
+ "Number of active connections by direction",
+ &["peer_id", "direction"],
+ )
+ .unwrap()
+ });
+
+ METER.with_label_values(&[
+ peer.map(ToString::to_string)
+ .unwrap_or_else(|| "unknown".to_string())
+ .as_str(),
+ direction.as_str(),
+ ])
+}
+
+pub fn successful_connections(peer: &NodeId, direction: ConnectionDirection) -> IntCounter {
+ static METER: Lazy<IntCounterVec> = Lazy::new(|| {
+ tari_metrics::register_int_counter_vec(
+ "comms::connections::success",
+ "Number of active connections by direction",
+ &["peer_id", "direction"],
+ )
+ .unwrap()
+ });
+
+ METER.with_label_values(&[peer.to_string().as_str(), direction.as_str()])
+}
+
+pub fn failed_connections(peer: &NodeId, direction: ConnectionDirection) -> IntCounter {
+ static METER: Lazy<IntCounterVec> = Lazy::new(|| {
+ tari_metrics::register_int_counter_vec(
+ "comms::connections::failed",
+ "Number of active connections by direction",
+ &["peer_id", "direction"],
+ )
+ .unwrap()
+ });
+
+ METER.with_label_values(&[peer.to_string().as_str(), direction.as_str()])
+}
+
+pub fn inbound_substream_counter(peer: &NodeId, protocol: &ProtocolId) -> IntCounter {
+ static METER: Lazy<IntCounterVec> = Lazy::new(|| {
+ tari_metrics::register_int_counter_vec(
+ "comms::connections::inbound_substream_request_count",
+ "Number of substream requests",
+ &["peer_id", "protocol"],
+ )
+ .unwrap()
+ });
+
+ METER.with_label_values(&[peer.to_string().as_str(), String::from_utf8_lossy(protocol).as_ref()])
+}
diff --git a/comms/src/connection_manager/mod.rs b/comms/src/connection_manager/mod.rs
index 92ef4e18f1..fd0f7a3312 100644
--- a/comms/src/connection_manager/mod.rs
+++ b/comms/src/connection_manager/mod.rs
@@ -23,6 +23,7 @@
mod dial_state;
mod dialer;
mod listener;
+mod metrics;
mod common;
pub use common::validate_peer_addresses;
diff --git a/comms/src/connectivity/manager.rs b/comms/src/connectivity/manager.rs
index d8043050e7..ad75db44b0 100644
--- a/comms/src/connectivity/manager.rs
+++ b/comms/src/connectivity/manager.rs
@@ -510,10 +510,7 @@ impl ConnectivityManagerActor {
_ => {},
}
},
- #[cfg(feature = "metrics")]
- NewInboundSubstream(node_id, protocol, _) => {
- super::metrics::substream_request_count(node_id, protocol).inc();
- },
+
_ => {},
}
diff --git a/comms/src/connectivity/metrics.rs b/comms/src/connectivity/metrics.rs
index 56bd8f781d..1470b4c9c7 100644
--- a/comms/src/connectivity/metrics.rs
+++ b/comms/src/connectivity/metrics.rs
@@ -20,32 +20,23 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-use crate::{connection_manager::ConnectionDirection, peer_manager::NodeId, protocol::ProtocolId};
+use crate::connection_manager::ConnectionDirection;
use once_cell::sync::Lazy;
use tari_metrics::{IntGauge, IntGaugeVec};
pub fn connections(direction: ConnectionDirection) -> IntGauge {
static METER: Lazy<IntGaugeVec> = Lazy::new(|| {
- tari_metrics::register_int_gauge_vec("comms::connections", "Number of active connections by direction", &[
- "direction",
- ])
+ tari_metrics::register_int_gauge_vec(
+ "comms::connectivity::num_connections",
+ "Number of active connections by direction",
+ &["direction"],
+ )
.unwrap()
});
METER.with_label_values(&[direction.as_str()])
}
-pub fn substream_request_count(peer: &NodeId, protocol: &ProtocolId) -> IntGauge {
- static METER: Lazy<IntGaugeVec> = Lazy::new(|| {
- tari_metrics::register_int_gauge_vec("comms::substream_request_count", "Number of substream requests", &[
- "peer", "protocol",
- ])
- .unwrap()
- });
-
- METER.with_label_values(&[peer.to_string().as_str(), String::from_utf8_lossy(protocol).as_ref()])
-}
-
pub fn uptime() -> IntGauge {
static METER: Lazy<IntGauge> =
Lazy::new(|| tari_metrics::register_int_gauge("comms::uptime", "Comms uptime").unwrap());
diff --git a/comms/src/multiplexing/metrics.rs b/comms/src/multiplexing/metrics.rs
new file mode 100644
index 0000000000..823501aa57
--- /dev/null
+++ b/comms/src/multiplexing/metrics.rs
@@ -0,0 +1,32 @@
+// Copyright 2021, The Tari Project
+//
+// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
+// following conditions are met:
+//
+// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
+// disclaimer.
+//
+// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
+// following disclaimer in the documentation and/or other materials provided with the distribution.
+//
+// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
+// products derived from this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
+// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
+// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
+// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+use once_cell::sync::Lazy;
+use tari_metrics::IntCounter;
+
+pub static TOTAL_BYTES_READ: Lazy<IntCounter> = Lazy::new(|| {
+ tari_metrics::register_int_counter("comms::substream::total_bytes_read", "The total inbound bytes").unwrap()
+});
+
+pub static TOTAL_BYTES_WRITTEN: Lazy<IntCounter> = Lazy::new(|| {
+ tari_metrics::register_int_counter("comms::substream::total_bytes_written", "The total outbound bytes").unwrap()
+});
diff --git a/comms/src/multiplexing/mod.rs b/comms/src/multiplexing/mod.rs
index 8ac082d5c4..1732ea142b 100644
--- a/comms/src/multiplexing/mod.rs
+++ b/comms/src/multiplexing/mod.rs
@@ -20,5 +20,8 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+#[cfg(feature = "metrics")]
+mod metrics;
+
mod yamux;
pub use self::yamux::{ConnectionError, Control, IncomingSubstreams, Substream, Yamux};
diff --git a/comms/src/multiplexing/yamux.rs b/comms/src/multiplexing/yamux.rs
index 811f485390..a9cf697c31 100644
--- a/comms/src/multiplexing/yamux.rs
+++ b/comms/src/multiplexing/yamux.rs
@@ -220,12 +220,21 @@ impl StreamId for Substream {
impl tokio::io::AsyncRead for Substream {
fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<io::Result<()>> {
- Pin::new(&mut self.stream).poll_read(cx, buf)
+ match Pin::new(&mut self.stream).poll_read(cx, buf) {
+ Poll::Ready(Ok(())) => {
+ #[cfg(feature = "metrics")]
+ super::metrics::TOTAL_BYTES_READ.inc_by(buf.filled().len() as u64);
+ Poll::Ready(Ok(()))
+ },
+ res => res,
+ }
}
}
impl tokio::io::AsyncWrite for Substream {
fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
+ #[cfg(feature = "metrics")]
+ super::metrics::TOTAL_BYTES_WRITTEN.inc_by(buf.len() as u64);
Pin::new(&mut self.stream).poll_write(cx, buf)
}
diff --git a/comms/src/protocol/messaging/inbound.rs b/comms/src/protocol/messaging/inbound.rs
index e65f81f8ec..9445445629 100644
--- a/comms/src/protocol/messaging/inbound.rs
+++ b/comms/src/protocol/messaging/inbound.rs
@@ -20,12 +20,8 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-use crate::{
- message::InboundMessage,
- peer_manager::NodeId,
- protocol::messaging::{MessagingEvent, MessagingProtocol},
- rate_limit::RateLimit,
-};
+use super::{metrics, MessagingEvent, MessagingProtocol};
+use crate::{message::InboundMessage, peer_manager::NodeId, rate_limit::RateLimit};
use futures::{future::Either, StreamExt};
use log::*;
use std::{sync::Arc, time::Duration};
@@ -67,6 +63,7 @@ impl InboundMessaging {
pub async fn run<S>(self, socket: S)
where S: AsyncRead + AsyncWrite + Unpin {
let peer = &self.peer;
+ metrics::num_sessions().inc();
debug!(
target: LOG_TARGET,
"Starting inbound messaging protocol for peer '{}'",
@@ -82,9 +79,11 @@ impl InboundMessaging {
};
tokio::pin!(stream);
+ let inbound_count = metrics::inbound_message_count(&self.peer);
while let Some(result) = stream.next().await {
match result {
Ok(Ok(raw_msg)) => {
+ inbound_count.inc();
let msg_len = raw_msg.len();
let inbound_msg = InboundMessage::new(peer.clone(), raw_msg.freeze());
debug!(
@@ -112,6 +111,7 @@ impl InboundMessaging {
let _ = self.messaging_events_tx.send(Arc::new(event));
},
Ok(Err(err)) => {
+ metrics::error_count(peer).inc();
error!(
target: LOG_TARGET,
"Failed to receive from peer '{}' because '{}'",
@@ -122,6 +122,7 @@ impl InboundMessaging {
},
Err(_) => {
+ metrics::error_count(peer).inc();
debug!(
target: LOG_TARGET,
"Inbound messaging for peer '{}' has stopped because it was inactive for {:.0?}",
@@ -134,6 +135,7 @@ impl InboundMessaging {
}
}
+ metrics::num_sessions().dec();
debug!(
target: LOG_TARGET,
"Inbound messaging handler exited for peer `{}`",
diff --git a/comms/src/protocol/messaging/metrics.rs b/comms/src/protocol/messaging/metrics.rs
new file mode 100644
index 0000000000..e43fb4a7d6
--- /dev/null
+++ b/comms/src/protocol/messaging/metrics.rs
@@ -0,0 +1,74 @@
+// Copyright 2021, The Tari Project
+//
+// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
+// following conditions are met:
+//
+// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
+// disclaimer.
+//
+// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
+// following disclaimer in the documentation and/or other materials provided with the distribution.
+//
+// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
+// products derived from this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
+// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
+// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
+// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+use crate::peer_manager::NodeId;
+use once_cell::sync::Lazy;
+use tari_metrics::{IntCounter, IntCounterVec, IntGauge};
+
+pub fn num_sessions() -> IntGauge {
+ static METER: Lazy<IntGauge> = Lazy::new(|| {
+ tari_metrics::register_int_gauge(
+ "comms::messaging::num_sessions",
+ "The number of active messaging sessions",
+ )
+ .unwrap()
+ });
+
+ METER.clone()
+}
+
+pub fn outbound_message_count(peer: &NodeId) -> IntCounter {
+ static METER: Lazy<IntCounterVec> = Lazy::new(|| {
+ tari_metrics::register_int_counter_vec(
+ "comms::messaging::outbound_message_count",
+ "The number of handshakes per peer",
+ &["peer_id"],
+ )
+ .unwrap()
+ });
+
+ METER.with_label_values(&[peer.to_string().as_str()])
+}
+
+pub fn inbound_message_count(peer: &NodeId) -> IntCounter {
+ static METER: Lazy<IntCounterVec> = Lazy::new(|| {
+ tari_metrics::register_int_counter_vec(
+ "comms::messaging::inbound_message_count",
+ "The number of handshakes per peer",
+ &["peer_id"],
+ )
+ .unwrap()
+ });
+
+ METER.with_label_values(&[peer.to_string().as_str()])
+}
+
+pub fn error_count(peer: &NodeId) -> IntCounter {
+ static METER: Lazy<IntCounterVec> = Lazy::new(|| {
+ tari_metrics::register_int_counter_vec("comms::messaging::errors", "The number of errors per peer", &[
+ "peer_id",
+ ])
+ .unwrap()
+ });
+
+ METER.with_label_values(&[peer.to_string().as_str()])
+}
diff --git a/comms/src/protocol/messaging/mod.rs b/comms/src/protocol/messaging/mod.rs
index 1fa347b3a2..7fd5703e50 100644
--- a/comms/src/protocol/messaging/mod.rs
+++ b/comms/src/protocol/messaging/mod.rs
@@ -29,6 +29,7 @@ pub use extension::MessagingProtocolExtension;
mod error;
mod forward;
mod inbound;
+mod metrics;
mod outbound;
mod protocol;
pub use protocol::{
diff --git a/comms/src/protocol/messaging/outbound.rs b/comms/src/protocol/messaging/outbound.rs
index a2c4288b4e..98fc30a9e2 100644
--- a/comms/src/protocol/messaging/outbound.rs
+++ b/comms/src/protocol/messaging/outbound.rs
@@ -20,7 +20,7 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-use super::{error::MessagingProtocolError, MessagingEvent, MessagingProtocol, SendFailReason};
+use super::{error::MessagingProtocolError, metrics, MessagingEvent, MessagingProtocol, SendFailReason};
use crate::{
connection_manager::{NegotiatedSubstream, PeerConnection},
connectivity::{ConnectivityError, ConnectivityRequester},
@@ -71,6 +71,7 @@ impl OutboundMessaging {
"comms::messaging::outbound",
node_id = self.peer_node_id.to_string().as_str()
);
+ metrics::num_sessions().inc();
async move {
debug!(
target: LOG_TARGET,
@@ -112,6 +113,7 @@ impl OutboundMessaging {
);
},
Err(err) => {
+ metrics::error_count(&peer_node_id).inc();
error!(
target: LOG_TARGET,
"Outbound messaging protocol failed for peer {}: {}", peer_node_id, err
@@ -119,6 +121,7 @@ impl OutboundMessaging {
},
}
+ metrics::num_sessions().dec();
let _ = messaging_events_tx
.send(MessagingEvent::OutboundProtocolExited(peer_node_id))
.await;
@@ -291,7 +294,9 @@ impl OutboundMessaging {
None => Either::Right(stream.map(Ok)),
};
+ let outbound_count = metrics::outbound_message_count(&self.peer_node_id);
let stream = stream.map(|msg| {
+ outbound_count.inc();
msg.map(|mut out_msg| {
event!(Level::DEBUG, "Message buffered for sending {}", out_msg);
out_msg.reply_success();
diff --git a/comms/src/protocol/rpc/client/metrics.rs b/comms/src/protocol/rpc/client/metrics.rs
index ee77a6b91f..98f99e4492 100644
--- a/comms/src/protocol/rpc/client/metrics.rs
+++ b/comms/src/protocol/rpc/client/metrics.rs
@@ -22,69 +22,108 @@
use crate::{peer_manager::NodeId, protocol::ProtocolId};
use once_cell::sync::Lazy;
-use tari_metrics::{Histogram, HistogramVec, IntGauge, IntGaugeVec};
+use tari_metrics::{Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec};
-pub fn sessions_counter(node_id: &NodeId, protocol: &ProtocolId) -> IntGauge {
+pub fn num_sessions(peer: &NodeId, protocol: &ProtocolId) -> IntGauge {
static METER: Lazy<IntGaugeVec> = Lazy::new(|| {
tari_metrics::register_int_gauge_vec(
"comms::rpc::client::num_sessions",
- "The number of active clients per node per protocol",
- &["peer", "protocol"],
+ "The number of active clients per peer per protocol",
+ &["peer_id", "protocol"],
)
.unwrap()
});
- METER.with_label_values(&[node_id.to_string().as_str(), String::from_utf8_lossy(protocol).as_ref()])
+ METER.with_label_values(&[peer.to_string().as_str(), String::from_utf8_lossy(protocol).as_ref()])
}
-pub fn handshake_errors(node_id: &NodeId, protocol: &ProtocolId) -> IntGauge {
- static METER: Lazy<IntGaugeVec> = Lazy::new(|| {
- tari_metrics::register_int_gauge_vec(
+pub fn handshake_counter(peer: &NodeId, protocol: &ProtocolId) -> IntCounter {
+ static METER: Lazy<IntCounterVec> = Lazy::new(|| {
+ tari_metrics::register_int_counter_vec(
+ "comms::rpc::client::handshake_count",
+ "The number of handshakes per peer per protocol",
+ &["peer_id", "protocol"],
+ )
+ .unwrap()
+ });
+
+ METER.with_label_values(&[peer.to_string().as_str(), String::from_utf8_lossy(protocol).as_ref()])
+}
+
+pub fn handshake_errors(peer: &NodeId, protocol: &ProtocolId) -> IntCounter {
+ static METER: Lazy<IntCounterVec> = Lazy::new(|| {
+ tari_metrics::register_int_counter_vec(
"comms::rpc::client::handshake_errors",
- "The number of handshake errors per node per protocol",
- &["peer", "protocol"],
+ "The number of handshake errors per peer per protocol",
+ &["peer_id", "protocol"],
+ )
+ .unwrap()
+ });
+
+ METER.with_label_values(&[peer.to_string().as_str(), String::from_utf8_lossy(protocol).as_ref()])
+}
+
+pub fn client_errors(peer: &NodeId, protocol: &ProtocolId) -> IntCounter {
+ static METER: Lazy<IntCounterVec> = Lazy::new(|| {
+ tari_metrics::register_int_counter_vec(
+ "comms::rpc::client::error_count",
+ "The number of client errors per peer per protocol",
+ &["peer_id", "protocol"],
+ )
+ .unwrap()
+ });
+
+ METER.with_label_values(&[peer.to_string().as_str(), String::from_utf8_lossy(protocol).as_ref()])
+}
+
+pub fn client_timeouts(peer: &NodeId, protocol: &ProtocolId) -> IntCounter {
+ static METER: Lazy<IntCounterVec> = Lazy::new(|| {
+ tari_metrics::register_int_counter_vec(
+ "comms::rpc::client::error_timeouts",
+ "The number of client timeouts per peer per protocol",
+ &["peer_id", "protocol"],
)
.unwrap()
});
- METER.with_label_values(&[node_id.to_string().as_str(), String::from_utf8_lossy(protocol).as_ref()])
+ METER.with_label_values(&[peer.to_string().as_str(), String::from_utf8_lossy(protocol).as_ref()])
}
-pub fn request_response_latency(node_id: &NodeId, protocol: &ProtocolId) -> Histogram {
+pub fn request_response_latency(peer: &NodeId, protocol: &ProtocolId) -> Histogram {
static METER: Lazy<HistogramVec> = Lazy::new(|| {
tari_metrics::register_histogram_vec(
"comms::rpc::client::request_response_latency",
"A histogram of request to first response latency",
- &["peer", "protocol"],
+ &["peer_id", "protocol"],
)
.unwrap()
});
- METER.with_label_values(&[node_id.to_string().as_str(), String::from_utf8_lossy(protocol).as_ref()])
+ METER.with_label_values(&[peer.to_string().as_str(), String::from_utf8_lossy(protocol).as_ref()])
}
-pub fn outbound_request_bytes(node_id: &NodeId, protocol: &ProtocolId) -> Histogram {
+pub fn outbound_request_bytes(peer: &NodeId, protocol: &ProtocolId) -> Histogram {
static METER: Lazy<HistogramVec> = Lazy::new(|| {
tari_metrics::register_histogram_vec(
"comms::rpc::client::outbound_request_bytes",
- "Avg. request bytes per node per protocol",
- &["peer", "protocol"],
+ "Avg. request bytes per peer per protocol",
+ &["peer_id", "protocol"],
)
.unwrap()
});
- METER.with_label_values(&[node_id.to_string().as_str(), String::from_utf8_lossy(protocol).as_ref()])
+ METER.with_label_values(&[peer.to_string().as_str(), String::from_utf8_lossy(protocol).as_ref()])
}
-pub fn inbound_response_bytes(node_id: &NodeId, protocol: &ProtocolId) -> Histogram {
+pub fn inbound_response_bytes(peer: &NodeId, protocol: &ProtocolId) -> Histogram {
static METER: Lazy<HistogramVec> = Lazy::new(|| {
tari_metrics::register_histogram_vec(
"comms::rpc::client::inbound_response_bytes",
"Avg. response bytes per peer per protocol",
- &["peer", "protocol"],
+ &["peer_id", "protocol"],
)
.unwrap()
});
- METER.with_label_values(&[node_id.to_string().as_str(), String::from_utf8_lossy(protocol).as_ref()])
+ METER.with_label_values(&[peer.to_string().as_str(), String::from_utf8_lossy(protocol).as_ref()])
}
diff --git a/comms/src/protocol/rpc/client/mod.rs b/comms/src/protocol/rpc/client/mod.rs
index 9bec5975ab..b69caaa1db 100644
--- a/comms/src/protocol/rpc/client/mod.rs
+++ b/comms/src/protocol/rpc/client/mod.rs
@@ -462,8 +462,10 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + StreamId
if let Some(r) = self.ready_tx.take() {
let _ = r.send(Ok(()));
}
+ metrics::handshake_counter(&self.node_id, &self.protocol_id).inc();
},
Err(err) => {
+ metrics::handshake_errors(&self.node_id, &self.protocol_id).inc();
if let Some(r) = self.ready_tx.take() {
let _ = r.send(Err(err.into()));
}
@@ -472,8 +474,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + StreamId
},
}
- let session_counter = metrics::sessions_counter(&self.node_id, &self.protocol_id);
- session_counter.inc();
+ metrics::num_sessions(&self.node_id, &self.protocol_id).inc();
loop {
tokio::select! {
biased;
@@ -484,6 +485,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + StreamId
match req {
Some(req) => {
if let Err(err) = self.handle_request(req).await {
+ metrics::client_errors(&self.node_id, &self.protocol_id).inc();
error!(target: LOG_TARGET, "(stream={}) Unexpected error: {}. Worker is terminating.", self.stream_id(), err);
break;
}
@@ -493,21 +495,23 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + StreamId
}
}
}
- session_counter.dec();
+ metrics::num_sessions(&self.node_id, &self.protocol_id).dec();
if let Err(err) = self.framed.close().await {
debug!(
target: LOG_TARGET,
- "(stream={}) IO Error when closing substream: {}",
+ "(stream: {}, peer: {}) IO Error when closing substream: {}",
self.stream_id(),
+ self.node_id,
err
);
}
debug!(
target: LOG_TARGET,
- "(stream={}) RpcClientWorker ({}) terminated.",
+ "(stream: {}, peer: {}) RpcClientWorker ({}) terminated.",
self.stream_id(),
+ self.node_id,
self.protocol_name()
);
}
@@ -552,6 +556,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + StreamId
self.stream_id(),
start.elapsed()
);
+ metrics::client_timeouts(&self.node_id, &self.protocol_id).inc();
let _ = reply.send(Err(RpcStatus::timed_out("Response timed out")));
return Ok(());
},
@@ -583,7 +588,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + StreamId
Ok(())
}
- #[tracing::instrument(name = "rpc_do_request_response", skip(self, reply, request), fields(request_method = ?request.method, request_size = request.message.len()))]
+ #[tracing::instrument(name = "rpc_do_request_response", skip(self, reply, request), fields(request_method = ?request.method, request_body_size = request.message.len()))]
async fn do_request_response(
&mut self,
request: BaseRequest<Bytes>,
@@ -629,6 +634,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + StreamId
let mut metrics_timer = Some(latency.start_timer());
if let Err(err) = self.send_request(req).await {
warn!(target: LOG_TARGET, "{}", err);
+ metrics::client_errors(&self.node_id, &self.protocol_id).inc();
let _ = response_tx.send(Err(err.into()));
return Ok(());
}
@@ -637,7 +643,9 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + StreamId
if self.shutdown_signal.is_triggered() {
debug!(
target: LOG_TARGET,
- "[{}, stream_id: {}, req_id: {}] Client connector closed. Quitting stream early",
+ "[peer: {}, protocol: {}, stream_id: {}, req_id: {}] Client connector closed. Quitting stream \
+ early",
+ self.node_id,
self.protocol_name(),
self.stream_id(),
request_id
@@ -671,7 +679,17 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + StreamId
"Request {} (method={}) timed out", request_id, method,
);
event!(Level::ERROR, "Response timed out");
- if !response_tx.is_closed() {
+ metrics::client_timeouts(&self.node_id, &self.protocol_id).inc();
+ if response_tx.is_closed() {
+ let req = proto::rpc::RpcRequest {
+ request_id: request_id as u32,
+ method,
+ flags: RpcMessageFlags::FIN.bits().into(),
+ ..Default::default()
+ };
+
+ self.send_request(req).await?;
+ } else {
let _ = response_tx.send(Err(RpcStatus::timed_out("Response timed out"))).await;
}
break;
@@ -713,9 +731,8 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + StreamId
let req = proto::rpc::RpcRequest {
request_id: request_id as u32,
method,
- deadline: self.config.deadline.map(|t| t.as_secs()).unwrap_or(0),
flags: RpcMessageFlags::FIN.bits().into(),
- payload: vec![],
+ ..Default::default()
};
self.send_request(req).await?;
@@ -773,7 +790,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + StreamId
break resp;
},
Err(RpcError::ResponseIdDidNotMatchRequest { actual, expected })
- if actual.saturating_add(1) == request_id =>
+ if actual.wrapping_add(1) == request_id =>
{
warn!(
target: LOG_TARGET,
@@ -783,7 +800,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + StreamId
// Be lenient for a number of messages that may have been buffered to come through for the previous
// request.
- const MAX_ALLOWED_IGNORED: usize = 5;
+ const MAX_ALLOWED_IGNORED: usize = 20;
if num_ignored > MAX_ALLOWED_IGNORED {
return Err(RpcError::ResponseIdDidNotMatchRequest { actual, expected });
}
@@ -798,7 +815,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + StreamId
fn next_request_id(&mut self) -> u16 {
let next_id = self.next_request_id;
// request_id is allowed to wrap around back to 0
- self.next_request_id = self.next_request_id.checked_add(1).unwrap_or(0);
+ self.next_request_id = self.next_request_id.wrapping_add(1);
next_id
}
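
The switch from `checked_add(1).unwrap_or(0)` to `wrapping_add(1)` keeps the same wrap-to-zero behaviour for the `u16` request id, and the matching `actual.wrapping_add(1) == request_id` check means the "response for the previous request" leniency also holds across the 65535 to 0 boundary. A tiny standalone check of both facts:

```rust
fn main() {
    // The counter wraps u16::MAX back to 0, exactly as before, without the Option round-trip.
    let mut next_request_id: u16 = u16::MAX;
    let issued = next_request_id;
    next_request_id = next_request_id.wrapping_add(1);
    assert_eq!(issued, 65535);
    assert_eq!(next_request_id, 0);

    // The "response belongs to the previous request" check also survives the wrap.
    let actual: u16 = 65535; // id carried by a late response
    let current: u16 = 0;    // id of the request we are now waiting on
    assert_eq!(actual.wrapping_add(1), current);
}
```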
diff --git a/comms/src/protocol/rpc/server/error.rs b/comms/src/protocol/rpc/server/error.rs
index f5cc145de2..7259e987fb 100644
--- a/comms/src/protocol/rpc/server/error.rs
+++ b/comms/src/protocol/rpc/server/error.rs
@@ -20,7 +20,7 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-use crate::protocol::rpc::handshake::RpcHandshakeError;
+use crate::{proto, protocol::rpc::handshake::RpcHandshakeError};
use prost::DecodeError;
use std::io;
use tokio::sync::oneshot;
@@ -42,7 +42,15 @@ pub enum RpcServerError {
#[error("Service not found for protocol `{0}`")]
ProtocolServiceNotFound(String),
#[error("Unexpected incoming message")]
- UnexpectedIncomingMessage,
+ UnexpectedIncomingMessage(proto::rpc::RpcRequest),
+ #[error("Unexpected incoming MALFORMED message")]
+ UnexpectedIncomingMessageMalformed,
+ #[error("Client interrupted stream")]
+ ClientInterruptedStream,
+ #[error("Service call exceeded deadline")]
+ ServiceCallExceededDeadline,
+ #[error("Stream read exceeded deadline")]
+ ReadStreamExceededDeadline,
}
impl From<oneshot::error::RecvError> for RpcServerError {
@@ -50,3 +58,21 @@ impl From<oneshot::error::RecvError> for RpcServerError {
RpcServerError::RequestCanceled
}
}
+
+impl RpcServerError {
+ pub fn to_debug_string(&self) -> String {
+ use RpcServerError::*;
+ match self {
+ DecodeError(_) => "DecodeError".to_string(),
+ Io(err) => {
+ format!("Io({:?})", err.kind())
+ },
+ HandshakeError(_) => "HandshakeError".to_string(),
+ ProtocolServiceNotFound(_) => "ProtocolServiceNotFound".to_string(),
+ UnexpectedIncomingMessage(_) => "UnexpectedIncomingMessage".to_string(),
+ err => {
+ format!("{:?}", err)
+ },
+ }
+ }
+}
diff --git a/comms/src/protocol/rpc/server/metrics.rs b/comms/src/protocol/rpc/server/metrics.rs
index cb0c1d7d2a..ed11db7e9e 100644
--- a/comms/src/protocol/rpc/server/metrics.rs
+++ b/comms/src/protocol/rpc/server/metrics.rs
@@ -20,16 +20,22 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-use crate::{peer_manager::NodeId, protocol::ProtocolId};
+use crate::{
+ peer_manager::NodeId,
+ protocol::{
+ rpc::{RpcServerError, RpcStatusCode},
+ ProtocolId,
+ },
+};
use once_cell::sync::Lazy;
use tari_metrics::{Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec};
-pub fn sessions_counter(node_id: &NodeId, protocol: &ProtocolId) -> IntGauge {
+pub fn num_sessions(node_id: &NodeId, protocol: &ProtocolId) -> IntGauge {
static METER: Lazy<IntGaugeVec> = Lazy::new(|| {
tari_metrics::register_int_gauge_vec(
"comms::rpc::server::num_sessions",
- "The number of active server sessions per node per protocol",
- &["peer", "protocol"],
+ "The number of active server sessions per peer per protocol",
+ &["peer_id", "protocol"],
)
.unwrap()
});
@@ -37,12 +43,12 @@ pub fn sessions_counter(node_id: &NodeId, protocol: &ProtocolId) -> IntGauge {
METER.with_label_values(&[node_id.to_string().as_str(), String::from_utf8_lossy(protocol).as_ref()])
}
-pub fn handshake_error_counter(node_id: &NodeId, protocol: &ProtocolId) -> IntGauge {
- static METER: Lazy<IntGaugeVec> = Lazy::new(|| {
- tari_metrics::register_int_gauge_vec(
- "comms::rpc::server::handshake_errors",
- "The number of handshake errors per node per protocol",
- &["peer", "protocol"],
+pub fn handshake_error_counter(node_id: &NodeId, protocol: &ProtocolId) -> IntCounter {
+ static METER: Lazy<IntCounterVec> = Lazy::new(|| {
+ tari_metrics::register_int_counter_vec(
+ "comms::rpc::server::handshake_error_count",
+ "The number of handshake errors per peer per protocol",
+ &["peer_id", "protocol"],
)
.unwrap()
});
@@ -50,25 +56,46 @@ pub fn handshake_error_counter(node_id: &NodeId, protocol: &ProtocolId) -> IntGa
METER.with_label_values(&[node_id.to_string().as_str(), String::from_utf8_lossy(protocol).as_ref()])
}
-pub fn error_counter(node_id: &NodeId, protocol: &ProtocolId) -> IntCounter {
+pub fn error_counter(node_id: &NodeId, protocol: &ProtocolId, err: &RpcServerError) -> IntCounter {
static METER: Lazy<IntCounterVec> = Lazy::new(|| {
tari_metrics::register_int_counter_vec(
"comms::rpc::server::error_count",
- "The number of RPC errors per node per protocol",
- &["peer", "protocol"],
+ "The number of RPC errors per peer per protocol",
+ &["peer_id", "protocol", "error"],
)
.unwrap()
});
- METER.with_label_values(&[node_id.to_string().as_str(), String::from_utf8_lossy(protocol).as_ref()])
+ METER.with_label_values(&[
+ node_id.to_string().as_str(),
+ String::from_utf8_lossy(protocol).as_ref(),
+ err.to_debug_string().as_str(),
+ ])
+}
+
+pub fn status_error_counter(node_id: &NodeId, protocol: &ProtocolId, status_code: RpcStatusCode) -> IntCounter {
+ static METER: Lazy<IntCounterVec> = Lazy::new(|| {
+ tari_metrics::register_int_counter_vec(
+ "comms::rpc::server::status_error_count",
+ "The number of RPC errors by status code per peer per protocol",
+ &["peer_id", "protocol", "status"],
+ )
+ .unwrap()
+ });
+
+ METER.with_label_values(&[
+ node_id.to_string().as_str(),
+ String::from_utf8_lossy(protocol).as_ref(),
+ status_code.to_debug_string().as_str(),
+ ])
}
pub fn inbound_requests_bytes(node_id: &NodeId, protocol: &ProtocolId) -> Histogram {
static METER: Lazy<HistogramVec> = Lazy::new(|| {
tari_metrics::register_histogram_vec(
"comms::rpc::server::inbound_request_bytes",
- "Avg. request bytes per node per protocol",
- &["peer", "protocol"],
+ "Avg. request bytes per peer per protocol",
+ &["peer_id", "protocol"],
)
.unwrap()
});
@@ -81,7 +108,7 @@ pub fn outbound_response_bytes(node_id: &NodeId, protocol: &ProtocolId) -> Histo
tari_metrics::register_histogram_vec(
"comms::rpc::server::outbound_response_bytes",
"Avg. response bytes per peer per protocol",
- &["peer", "protocol"],
+ &["peer_id", "protocol"],
)
.unwrap()
});
diff --git a/comms/src/protocol/rpc/server/mod.rs b/comms/src/protocol/rpc/server/mod.rs
index 31fa62ee5a..ed02b82149 100644
--- a/comms/src/protocol/rpc/server/mod.rs
+++ b/comms/src/protocol/rpc/server/mod.rs
@@ -70,12 +70,12 @@ use prost::Message;
use std::{
borrow::Cow,
future::Future,
+ io,
pin::Pin,
sync::Arc,
task::Poll,
time::{Duration, Instant},
};
-use tari_metrics::IntCounter;
use tokio::{sync::mpsc, time};
use tokio_stream::Stream;
use tower::Service;
@@ -386,10 +386,10 @@ where
let node_id = node_id.clone();
self.executor
.try_spawn(async move {
- let sessions_counter = metrics::sessions_counter(&node_id, &service.protocol);
- sessions_counter.inc();
+ let num_sessions = metrics::num_sessions(&node_id, &service.protocol);
+ num_sessions.inc();
service.start().await;
- sessions_counter.dec();
+ num_sessions.dec();
})
.map_err(|_| RpcServerError::MaximumSessionsReached)?;
@@ -405,7 +405,6 @@ struct ActivePeerRpcService<TSvc, TCommsProvider> {
framed: CanonicalFraming<Substream>,
comms_provider: TCommsProvider,
logging_context_string: Arc<String>,
- error_counter: IntCounter,
}
impl<TSvc, TCommsProvider> ActivePeerRpcService<TSvc, TCommsProvider>
@@ -421,7 +420,6 @@ where
framed: CanonicalFraming<Substream>,
comms_provider: TCommsProvider,
) -> Self {
- let error_counter = metrics::error_counter(&node_id, &protocol);
Self {
logging_context_string: Arc::new(format!(
"stream_id: {}, peer: {}, protocol: {}",
@@ -436,7 +434,6 @@ where
service,
framed,
comms_provider,
- error_counter,
}
}
@@ -446,7 +443,7 @@ where
"({}) Rpc server started.", self.logging_context_string,
);
if let Err(err) = self.run().await {
- self.error_counter.inc();
+ metrics::error_counter(&self.node_id, &self.protocol, &err).inc();
error!(
target: LOG_TARGET,
"({}) Rpc server exited with an error: {}", self.logging_context_string, err
@@ -470,7 +467,13 @@ where
err
);
}
- self.error_counter.inc();
+ error!(
+ target: LOG_TARGET,
+ "(peer: {}, protocol: {}) Failed to handle request: {}",
+ self.node_id,
+ self.protocol_name(),
+ err
+ );
return Err(err);
}
let elapsed = start.elapsed();
@@ -489,7 +492,6 @@ where
"({}) Failed to close substream after socket error: {}", self.logging_context_string, err
);
}
- self.error_counter.inc();
return Err(err.into());
},
}
@@ -524,8 +526,8 @@ where
flags: RpcMessageFlags::FIN.bits().into(),
payload: status.to_details_bytes(),
};
+ metrics::status_error_counter(&self.node_id, &self.protocol, status.as_status_code()).inc();
self.framed.send(bad_request.to_encoded_bytes().into()).await?;
- self.error_counter.inc();
return Ok(());
}
@@ -573,13 +575,17 @@ where
Err(_) => {
warn!(
target: LOG_TARGET,
- "RPC service was not able to complete within the deadline ({:.0?}). Request aborted for peer '{}' \
- ({}).",
+ "{} RPC service was not able to complete within the deadline ({:.0?}). Request aborted",
+ self.logging_context_string,
deadline,
- self.node_id,
- self.protocol_name()
);
- self.error_counter.inc();
+
+ metrics::error_counter(
+ &self.node_id,
+ &self.protocol,
+ &RpcServerError::ServiceCallExceededDeadline,
+ )
+ .inc();
return Ok(());
},
};
@@ -589,12 +595,9 @@ where
self.process_body(request_id, deadline, body).await?;
},
Err(err) => {
- debug!(
+ error!(
target: LOG_TARGET,
- "(peer: {}, protocol: {}) Service returned an error: {}",
- self.node_id,
- self.protocol_name(),
- err
+ "{} Service returned an error: {}", self.logging_context_string, err
);
let resp = proto::rpc::RpcResponse {
request_id,
@@ -603,7 +606,7 @@ where
payload: err.to_details_bytes(),
};
- self.error_counter.inc();
+ metrics::status_error_counter(&self.node_id, &self.protocol, err.as_status_code()).inc();
self.framed.send(resp.to_encoded_bytes().into()).await?;
},
}
@@ -623,18 +626,33 @@ where
) -> Result<(), RpcServerError> {
let response_bytes = metrics::outbound_response_bytes(&self.node_id, &self.protocol);
trace!(target: LOG_TARGET, "Service call succeeded");
+
+ let node_id = self.node_id.clone();
+ let protocol = self.protocol.clone();
let mut stream = body
.into_message()
.map(|result| into_response(request_id, result))
- .flat_map(|message| stream::iter(ChunkedResponseIter::new(message)))
+ .flat_map(move |message| {
+ if !message.status.is_ok() {
+ metrics::status_error_counter(&node_id, &protocol, message.status).inc();
+ }
+ stream::iter(ChunkedResponseIter::new(message))
+ })
.map(|resp| Bytes::from(resp.to_encoded_bytes()));
loop {
// Check if the client interrupted the outgoing stream
if let Err(err) = self.check_interruptions().await {
- // Debug level because there are many valid reasons to interrupt a stream
- debug!(target: LOG_TARGET, "Stream interrupted: {}", err);
- break;
+ match err {
+ err @ RpcServerError::ClientInterruptedStream => {
+ debug!(target: LOG_TARGET, "Stream was interrupted: {}", err);
+ break;
+ },
+ err => {
+ error!(target: LOG_TARGET, "Stream was interrupted: {}", err);
+ return Err(err);
+ },
+ }
}
let next_item = log_timing(
@@ -667,7 +685,12 @@ where
deadline
);
- self.error_counter.inc();
+ metrics::error_counter(
+ &self.node_id,
+ &self.protocol,
+ &RpcServerError::ReadStreamExceededDeadline,
+ )
+ .inc();
break;
},
}
@@ -677,7 +700,22 @@ where
async fn check_interruptions(&mut self) -> Result<(), RpcServerError> {
let check = future::poll_fn(|cx| match Pin::new(&mut self.framed).poll_next(cx) {
- Poll::Ready(Some(Ok(_))) => Poll::Ready(Some(RpcServerError::UnexpectedIncomingMessage)),
+ Poll::Ready(Some(Ok(mut msg))) => {
+ let decoded_msg = match proto::rpc::RpcRequest::decode(&mut msg) {
+ Ok(msg) => msg,
+ Err(err) => {
+ error!(target: LOG_TARGET, "Client sent a MALFORMED request: {}", err);
+ return Poll::Ready(Some(RpcServerError::UnexpectedIncomingMessageMalformed));
+ },
+ };
+ let msg_flags = RpcMessageFlags::from_bits_truncate(decoded_msg.flags as u8);
+ if msg_flags.is_fin() {
+ Poll::Ready(Some(RpcServerError::ClientInterruptedStream))
+ } else {
+ Poll::Ready(Some(RpcServerError::UnexpectedIncomingMessage(decoded_msg)))
+ }
+ },
+ Poll::Ready(Some(Err(err))) if err.kind() == io::ErrorKind::WouldBlock => Poll::Ready(None),
Poll::Ready(Some(Err(err))) => Poll::Ready(Some(RpcServerError::from(err))),
Poll::Ready(None) => Poll::Ready(Some(RpcServerError::StreamClosedByRemote)),
Poll::Pending => Poll::Ready(None),
@@ -722,7 +760,7 @@ fn into_response(request_id: u32, result: Result) -> RpcRe
}
RpcResponse {
request_id,
- status: RpcStatus::ok().status_code(),
+ status: RpcStatus::ok().as_status_code(),
flags,
payload: msg.into_bytes().unwrap_or_else(Bytes::new),
}
@@ -731,7 +769,7 @@ fn into_response(request_id: u32, result: Result) -> RpcRe
debug!(target: LOG_TARGET, "Body contained an error: {}", err);
RpcResponse {
request_id,
- status: err.status_code(),
+ status: err.as_status_code(),
flags: RpcMessageFlags::FIN,
payload: Bytes::from(err.to_details_bytes()),
}
diff --git a/comms/src/protocol/rpc/status.rs b/comms/src/protocol/rpc/status.rs
index 9194d36479..d2a8e20e6d 100644
--- a/comms/src/protocol/rpc/status.rs
+++ b/comms/src/protocol/rpc/status.rs
@@ -114,10 +114,10 @@ impl RpcStatus {
}
pub fn as_code(&self) -> u32 {
- self.code as u32
+ self.code.as_u32()
}
- pub fn status_code(&self) -> RpcStatusCode {
+ pub fn as_status_code(&self) -> RpcStatusCode {
self.code
}
@@ -212,6 +212,14 @@ impl RpcStatusCode {
pub fn is_timeout(self) -> bool {
self == Self::Timeout
}
+
+ pub fn as_u32(&self) -> u32 {
+ *self as u32
+ }
+
+ pub fn to_debug_string(&self) -> String {
+ format!("{:?}", self)
+ }
}
impl From for RpcStatusCode {
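
`as_u32` and `to_debug_string` rely on `RpcStatusCode` being a plain field-less enum: the value casts directly to its discriminant, and its `Debug` name doubles as a low-cardinality metric label. A small sketch with an illustrative enum (names and discriminants are not the real Tari values):

```rust
#[derive(Debug, Clone, Copy)]
enum StatusCode {
    Ok = 0,
    BadRequest = 1,
    Timeout = 2,
}

impl StatusCode {
    fn as_u32(&self) -> u32 {
        *self as u32 // valid because the enum has no fields
    }

    fn to_debug_string(&self) -> String {
        format!("{:?}", self)
    }
}

fn main() {
    let code = StatusCode::Timeout;
    assert_eq!(code.as_u32(), 2);
    assert_eq!(code.to_debug_string(), "Timeout"); // usable as a metric label
}
```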
diff --git a/comms/src/protocol/rpc/test/comms_integration.rs b/comms/src/protocol/rpc/test/comms_integration.rs
index f43f921081..025bfcc2b9 100644
--- a/comms/src/protocol/rpc/test/comms_integration.rs
+++ b/comms/src/protocol/rpc/test/comms_integration.rs
@@ -87,6 +87,6 @@ async fn run_service() {
mock_state.set_response_err(RpcStatus::bad_request("Insert 💾"));
let err = client.request_response::<_, ()>((), 0.into()).await.unwrap_err();
unpack_enum!(RpcError::RequestFailed(status) = err);
- unpack_enum!(RpcStatusCode::BadRequest = status.status_code());
+ unpack_enum!(RpcStatusCode::BadRequest = status.as_status_code());
assert_eq!(mock_state.call_count(), 2);
}
diff --git a/comms/src/protocol/rpc/test/smoke.rs b/comms/src/protocol/rpc/test/smoke.rs
index 0fc1f05256..44eb8820ec 100644
--- a/comms/src/protocol/rpc/test/smoke.rs
+++ b/comms/src/protocol/rpc/test/smoke.rs
@@ -152,7 +152,7 @@ async fn request_response_errors_and_streaming() {
let err = client.return_error().await.unwrap_err();
unpack_enum!(RpcError::RequestFailed(status) = err);
- assert_eq!(status.status_code(), RpcStatusCode::NotImplemented);
+ assert_eq!(status.as_status_code(), RpcStatusCode::NotImplemented);
assert_eq!(status.details(), "I haven't gotten to this yet :(");
let stream = client.streaming_error("Gurglesplurb".to_string()).await.unwrap();
@@ -164,7 +164,7 @@ async fn request_response_errors_and_streaming() {
.into_iter()
.collect::<Result<Vec<_>, _>>()
.unwrap_err();
- assert_eq!(status.status_code(), RpcStatusCode::BadRequest);
+ assert_eq!(status.as_status_code(), RpcStatusCode::BadRequest);
assert_eq!(status.details(), "What does 'Gurglesplurb' mean?");
let stream = client.streaming_error2().await.unwrap();
@@ -174,7 +174,7 @@ async fn request_response_errors_and_streaming() {
assert_eq!(first_reply, "This is ok");
let second_reply = results.get(1).unwrap().as_ref().unwrap_err();
- assert_eq!(second_reply.status_code(), RpcStatusCode::BadRequest);
+ assert_eq!(second_reply.as_status_code(), RpcStatusCode::BadRequest);
assert_eq!(second_reply.details(), "This is a problem");
let pk_hex = client.get_public_key_hex().await.unwrap();
@@ -262,7 +262,7 @@ async fn response_too_big() {
.await
.unwrap_err();
unpack_enum!(RpcError::RequestFailed(status) = err);
- unpack_enum!(RpcStatusCode::MalformedResponse = status.status_code());
+ unpack_enum!(RpcStatusCode::MalformedResponse = status.as_status_code());
// Check that the exact frame size boundary works and that the session is still going
let _ = client
@@ -314,7 +314,7 @@ async fn timeout() {
let err = client.say_hello(Default::default()).await.unwrap_err();
unpack_enum!(RpcError::RequestFailed(status) = err);
- assert_eq!(status.status_code(), RpcStatusCode::Timeout);
+ assert_eq!(status.as_status_code(), RpcStatusCode::Timeout);
*delay.write().await = Duration::from_secs(0);