fix: be more permissive of responses for the incorrect request_id (#3588)

Description
---
- add more instrumentation to tari_comms
- improve behaviour and logs around stream interruptions
- be more permissive of responses for the incorrect request_id
- reduce transaction_resend_period to 10 minutes from an hour @philipr-za 

Motivation and Context
---
Track more metrics to diagnose issues.
Formally recognise a stream interruption, which allows the session to continue, as distinct from a protocol violation, which terminates the session.
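
As a rough illustration of the stream-interruption handling and the more permissive request_id check, here is a minimal sketch of the general idea; `SessionAction`, `ResponseEnvelope` and `handle_response` are hypothetical names, not the actual tari_comms types or API.

```rust
// Hypothetical sketch only: names and types are illustrative, not tari_comms' real API.
enum SessionAction {
    /// Ignore the message and keep the session alive (e.g. a late response
    /// for a request that has already been abandoned).
    Continue,
    /// A genuine protocol violation; the session must be terminated.
    Terminate(String),
}

struct ResponseEnvelope {
    request_id: u32,
    is_well_formed: bool,
}

fn handle_response(expected_request_id: u32, resp: &ResponseEnvelope) -> SessionAction {
    if !resp.is_well_formed {
        // Malformed frames are still treated as protocol violations.
        return SessionAction::Terminate("malformed response frame".to_string());
    }
    if resp.request_id != expected_request_id {
        // Being permissive: log and discard the mismatched response instead of
        // aborting the whole session, then keep waiting for the expected one.
        eprintln!(
            "discarding response for request_id {} (expected {})",
            resp.request_id, expected_request_id
        );
        return SessionAction::Continue;
    }
    SessionAction::Continue
}
```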

How Has This Been Tested?
---
Existing tests, plus a stress test involving 2 wallets and a base node
sdbondi authored Nov 21, 2021
1 parent 91fe921 commit c0d625c
Showing 35 changed files with 529 additions and 175 deletions.
44 changes: 11 additions & 33 deletions RFC/src/RFC-0250_Covenants.md
@@ -206,105 +206,83 @@ little-endian 64-byte unsigned integer.

The output set is returned unaltered. This rule is implicit for an empty (0 byte) covenant.

```yaml
op_byte: 0x20
op_byte: 0x20<br>
args: []
```

##### and(A, B)

The intersection (\\(A \cap B\\)) of the resulting output set for covenant rules \\(A\\) and \\(B\\).

```yaml
op_byte: 0x21
op_byte: 0x21<br>
args: [Covenant, Covenant]
```

##### or(A, B)

The union (\\(A \cup B\\)) of the resulting output set for covenant rules \\(A\\) and \\(B\\).

```yaml
op_byte: 0x22
op_byte: 0x22<br>
args: [Covenant, Covenant]
```

##### xor(A, B)

The symmetric difference (\\(A \triangle B\\)) of the resulting output set for covenant rules \\(A\\) and \\(B\\).
That is, outputs that match either \\(A\\) or \\(B\\) but not both.

```yaml
op_byte: 0x23
op_byte: 0x23<br>
args: [Covenant, Covenant]
```

##### not(A)

Returns the complement of `A`. That is, all the elements of `A` are removed from the
resultant output set.

```yaml
op_byte: 0x24
op_byte: 0x24<br>
args: [Covenant]
```

##### empty()

Returns an empty set. This will always fail and, if used alone, prevents the UTXO from ever being spent.
A more useful way to use `empty` is in conjunction with a conditional, e.g. `if_else(Condition(older_rel(10)), A, empty)`.

```yaml
op_byte: 0x25
op_byte: 0x25<br>
args: []
```
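
The operators above are plain set operations over the output set. As a minimal sketch of those semantics (illustrative only, not the consensus implementation — `OutputSet` is modelled here as a set of output hashes rather than full transaction outputs):

```rust
use std::collections::HashSet;

// Illustrative model of an "output set" as a set of output hashes.
type OutputSet = HashSet<[u8; 32]>;

fn and(a: &OutputSet, b: &OutputSet) -> OutputSet {
    // Intersection: outputs matched by both A and B.
    a.intersection(b).copied().collect()
}

fn or(a: &OutputSet, b: &OutputSet) -> OutputSet {
    // Union: outputs matched by either A or B.
    a.union(b).copied().collect()
}

fn xor(a: &OutputSet, b: &OutputSet) -> OutputSet {
    // Symmetric difference: outputs matched by exactly one of A or B.
    a.symmetric_difference(b).copied().collect()
}

fn not(universe: &OutputSet, a: &OutputSet) -> OutputSet {
    // Complement of A relative to the full set of outputs being filtered.
    universe.difference(a).copied().collect()
}

fn empty() -> OutputSet {
    OutputSet::new()
}
```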

#### Filters

##### filter_output_hash_eq(hash)

Filters for a single output that matches the hash. This filter returns at most one output.

```yaml
op_byte: 0x30
op_byte: 0x30<br>
args: [Hash]
```

##### filter_fields_preserved(fields)

Filters for outputs where all the given fields in the input are preserved in the output.

```yaml
op_byte: 0x31
op_byte: 0x31<br>
args: [Fields]
```

##### filter_field_int_eq(field, int)

Filters for outputs whose field value matches the given integer value. If the given field cannot be cast
to an unsigned 64-bit integer, the transaction/block is rejected.

```yaml
op_byte: 0x32
op_byte: 0x32<br>
args: [Field, VarInt]
```
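
A rough sketch of the behaviour described above; `Output`, `Field`, `field_as_u64` and `CovenantError` are hypothetical stand-ins for illustration, not the actual tari-core types:

```rust
// Illustrative sketch of filter_field_int_eq semantics.
struct Output {/* ... */}

#[derive(Clone, Copy)]
enum Field {
    MaturityHeight, /* ... */
}

impl Output {
    /// Returns None if the field cannot be cast to an unsigned 64-bit integer.
    fn field_as_u64(&self, _field: Field) -> Option<u64> {
        Some(42) // placeholder value for the sketch
    }
}

enum CovenantError {
    FieldNotCastableToU64,
}

fn filter_field_int_eq(
    outputs: Vec<Output>,
    field: Field,
    value: u64,
) -> Result<Vec<Output>, CovenantError> {
    let mut matched = Vec::new();
    for output in outputs {
        match output.field_as_u64(field) {
            // Keep only outputs whose field equals the given integer value.
            Some(v) if v == value => matched.push(output),
            Some(_) => {},
            // If the field cannot be cast to u64, the transaction/block is rejected.
            None => return Err(CovenantError::FieldNotCastableToU64),
        }
    }
    Ok(matched)
}
```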

##### filter_fields_hashed_eq(fields, hash)

```yaml
op_byte: 0x33
op_byte: 0x33<br>
args: [Fields, VarInt]
```

##### filter_relative_height(height)

Checks whether the block height at which the current [UTXO] (i.e. the current input) was mined, plus `height`, is
greater than or equal to the current block height. If so, `identity()` is returned, otherwise `empty()`.

```yaml
op_byte: 0x34
op_byte: 0x34<br>
args: [VarInt]
```
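
A minimal sketch of that check, mirroring the wording above (the function name and the use of saturating arithmetic are illustrative assumptions, not the actual consensus code):

```rust
// Returns true when the identity() branch applies, i.e. the height at which the
// current input was mined plus `height` is still >= the current block height.
fn relative_height_passes(mined_height: u64, height: u64, current_height: u64) -> bool {
    mined_height.saturating_add(height) >= current_height
}

// e.g. an input mined at height 100 with height = 10 passes up to block 110:
// assert!(relative_height_passes(100, 10, 110));
// assert!(!relative_height_passes(100, 10, 111));
```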

#### Encoding / Decoding

6 changes: 5 additions & 1 deletion applications/daily_tests/washing_machine.js
@@ -129,15 +129,18 @@ function WashingMachine(options) {
`🚀 Launching washing machine (numTransactions = ${numTransactions}, numRounds = ${numRounds}, sleep = ${sleepAfterRound}s)`
);

debug(`Connecting to wallet1 at ${wallet1Grpc}...`);
await this.wallet1.connect(wallet1Grpc);
debug(`Connected.`);

debug("Compiling and starting applications...");
let wallet2Process = null;
// Start wallet2
if (wallet2Grpc) {
this.wallet2 = new WalletClient();
debug(`Connecting to wallet2 at ${wallet2Grpc}...`);
await this.wallet2.connect(wallet2Grpc);
} else {
debug("Compiling wallet2...");
const port = await getFreePort(20000, 25000);
wallet2Process = createGrpcWallet(
baseNodeSeed,
@@ -148,6 +151,7 @@ function WashingMachine(options) {
true
);
wallet2Process.baseDir = "./wallet";
debug("Starting wallet2...");
await wallet2Process.startNew();
this.wallet2 = await wallet2Process.connectClient();
}
@@ -288,16 +288,14 @@ fn parse_make_it_rain(mut args: SplitWhitespace) -> Result<Vec<ParsedArgument>,
parsed_args.push(ParsedArgument::PublicKey(pubkey));

// transaction type
let txn_type = args
.next()
.ok_or_else(|| ParseError::Empty("transaction type".to_string()))?;
let txn_type = args.next();
let negotiated = match txn_type {
"negotiated" => true,
"one_sided" => false,
Some("negotiated") | Some("interactive") => true,
Some("one_sided") | Some("one-sided") | Some("onesided") => false,
_ => {
println!("Invalid data provided for <transaction type>, must be 'negotiated' or 'one_sided'\n");
println!("Invalid data provided for <transaction type>, must be 'interactive' or 'one-sided'\n");
return Err(ParseError::Invalid(
"Invalid data provided for <transaction type>, must be 'negotiated' or 'one_sided'".to_string(),
"Invalid data provided for <transaction type>, must be 'interactive' or 'one-sided'".to_string(),
));
},
};
@@ -531,7 +529,7 @@ mod test {
Err(e) => match e {
ParseError::Invalid(e) => assert_eq!(
e,
"Invalid data provided for <transaction type>, must be 'negotiated' or 'one_sided'".to_string()
"Invalid data provided for <transaction type>, must be 'interactive' or 'one-sided'".to_string()
),
_ => panic!("Expected parsing <transaction type> to return an error here"),
},
@@ -315,7 +315,7 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {

let resp = match client.find_chain_split(request).await {
Ok(r) => r,
Err(RpcError::RequestFailed(err)) if err.status_code().is_not_found() => {
Err(RpcError::RequestFailed(err)) if err.as_status_code().is_not_found() => {
// This round we sent less hashes than the max, so the next round will not have any more hashes to
// send. Exit early in this case.
if block_hashes.len() < NUM_CHAIN_SPLIT_HEADERS {
2 changes: 1 addition & 1 deletion base_layer/core/src/base_node/sync/rpc/tests.rs
@@ -62,7 +62,7 @@ mod sync_blocks {
};
let req = rpc_request_mock.request_with_context(Default::default(), msg);
let err = service.sync_blocks(req).await.unwrap_err();
unpack_enum!(RpcStatusCode::NotFound = err.status_code());
unpack_enum!(RpcStatusCode::NotFound = err.as_status_code());
}

#[tokio::test]
4 changes: 2 additions & 2 deletions base_layer/core/src/mempool/rpc/test.rs
@@ -124,7 +124,7 @@ mod get_tx_state_by_excess_sig {
.await
.unwrap_err();

unpack_enum!(RpcStatusCode::BadRequest = status.status_code());
unpack_enum!(RpcStatusCode::BadRequest = status.as_status_code());
}
}

@@ -174,6 +174,6 @@ mod submit_transaction {
.await
.unwrap_err();

unpack_enum!(RpcStatusCode::BadRequest = status.status_code());
unpack_enum!(RpcStatusCode::BadRequest = status.as_status_code());
}
}
@@ -34,7 +34,7 @@ use crate::{
use log::*;
use std::{collections::HashMap, convert::TryInto, sync::Arc};
use tari_common_types::types::BlockHash;
use tari_comms::protocol::rpc::{RpcError::RequestFailed, RpcStatusCode::NotFound};
use tari_comms::protocol::rpc::RpcError::RequestFailed;
use tari_core::{
base_node::rpc::BaseNodeWalletRpcClient,
blocks::BlockHeader,
@@ -353,7 +353,7 @@ where
info!(target: LOG_TARGET, "Error asking base node for header:{}", rpc_error);
match &rpc_error {
RequestFailed(status) => {
if status.status_code() == NotFound {
if status.as_status_code().is_not_found() {
return Ok(None);
} else {
return Err(rpc_error.into());
2 changes: 1 addition & 1 deletion base_layer/wallet/src/transaction_service/config.rs
@@ -50,7 +50,7 @@ impl Default for TransactionServiceConfig {
direct_send_timeout: Duration::from_secs(20),
broadcast_send_timeout: Duration::from_secs(60),
low_power_polling_timeout: Duration::from_secs(300),
transaction_resend_period: Duration::from_secs(3600),
transaction_resend_period: Duration::from_secs(600),
resend_response_cooldown: Duration::from_secs(300),
pending_transaction_cancellation_timeout: Duration::from_secs(259200), // 3 Days
num_confirmations_required: 3,
@@ -327,7 +327,7 @@ where
warn!(target: LOG_TARGET, "Error asking base node for header:{}", rpc_error);
match &rpc_error {
RequestFailed(status) => {
if status.status_code() == NotFound {
if status.as_status_code() == NotFound {
return Ok(None);
} else {
return Err(rpc_error.into());
@@ -354,7 +354,7 @@ where TBackend: WalletBackend + 'static
// this returns the index of the vec of hashes we sent it, that is the last hash it knows of.
match client.find_chain_split(request).await {
Ok(_) => Ok(metadata.utxo_index + 1),
Err(RpcError::RequestFailed(err)) if err.status_code().is_not_found() => {
Err(RpcError::RequestFailed(err)) if err.as_status_code().is_not_found() => {
warn!(target: LOG_TARGET, "Reorg detected: {}", err);
// The node does not know of the last hash we scanned, thus we had a chain split.
// We now start at 0 again.
2 changes: 1 addition & 1 deletion comms/dht/examples/memory_net/utilities.rs
@@ -245,7 +245,7 @@ pub async fn network_connectivity_stats(nodes: &[TestNode], wallets: &[TestNode]
total += t;
avg += a;
println!(
"{} total connections on the network. ({} per node on average)",
"{} total connections on the network. ({} per peer on average)",
total,
avg / (wallets.len() + nodes.len())
);
2 changes: 1 addition & 1 deletion comms/dht/src/rpc/test.rs
@@ -159,7 +159,7 @@ mod get_closer_peers {
let node_id = NodeId::default();
let req = mock.request_with_context(node_id, req);
let err = service.get_closer_peers(req).await.unwrap_err();
assert_eq!(err.status_code(), RpcStatusCode::BadRequest);
assert_eq!(err.as_status_code(), RpcStatusCode::BadRequest);
}
}

2 changes: 1 addition & 1 deletion comms/rpc_macros/tests/macro.rs
@@ -147,7 +147,7 @@ async fn it_returns_an_error_for_invalid_method_nums() {
.await
.unwrap_err();

unpack_enum!(RpcStatusCode::UnsupportedMethod = err.status_code());
unpack_enum!(RpcStatusCode::UnsupportedMethod = err.as_status_code());
}

#[tokio::test]
4 changes: 4 additions & 0 deletions comms/src/connection_manager/dialer.rs
@@ -27,6 +27,7 @@ use crate::{
common,
dial_state::DialState,
manager::{ConnectionManagerConfig, ConnectionManagerEvent},
metrics,
peer_connection,
},
multiaddr::Multiaddr,
@@ -193,6 +194,7 @@
dial_result: Result<PeerConnection, ConnectionManagerError>,
) {
let node_id = dial_state.peer().node_id.clone();
metrics::pending_connections(Some(&node_id), ConnectionDirection::Outbound).inc();

let removed = self.cancel_signals.remove(&node_id);
drop(removed);
@@ -213,6 +215,8 @@
},
}

metrics::pending_connections(Some(&node_id), ConnectionDirection::Outbound).dec();

if self.pending_dial_requests.contains_key(&node_id) {
self.reply_to_pending_requests(&node_id, dial_result.clone());
}
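
The new `metrics::pending_connections(...)` calls above imply a labelled gauge keyed by peer and direction. A minimal sketch of what such a helper could look like, assuming a prometheus-style `IntGaugeVec` via the `prometheus` and `once_cell` crates — the actual tari_comms metrics module may be structured differently, and registry registration is omitted:

```rust
// Hypothetical sketch of a pending-connections gauge; names are illustrative.
use once_cell::sync::Lazy;
use prometheus::{IntGauge, IntGaugeVec, Opts};

static PENDING_CONNECTIONS: Lazy<IntGaugeVec> = Lazy::new(|| {
    IntGaugeVec::new(
        Opts::new(
            "comms_connection_manager_pending_connections",
            "Number of connections currently being established",
        ),
        &["peer", "direction"],
    )
    .expect("metric definition is valid")
});

pub fn pending_connections(peer: Option<&str>, direction: &str) -> IntGauge {
    PENDING_CONNECTIONS.with_label_values(&[peer.unwrap_or("unknown"), direction])
}

// Usage mirrors the diff above: increment when a dial or inbound attempt starts,
// decrement when it completes or fails.
// pending_connections(Some("node_id"), "outbound").inc();
// pending_connections(Some("node_id"), "outbound").dec();
```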
4 changes: 4 additions & 0 deletions comms/src/connection_manager/listener.rs
@@ -32,6 +32,7 @@ use crate::{
bounded_executor::BoundedExecutor,
connection_manager::{
liveness::LivenessSession,
metrics,
wire_mode::{WireMode, LIVENESS_WIRE_MODE},
},
multiaddr::Multiaddr,
@@ -239,6 +240,7 @@

let span = span!(Level::TRACE, "connection_mann::listener::inbound_task",);
let inbound_fut = async move {
metrics::pending_connections(None, ConnectionDirection::Inbound).inc();
match Self::read_wire_format(&mut socket, config.time_to_first_byte).await {
Ok(WireMode::Comms(byte)) if byte == config.network_info.network_byte => {
let this_node_id_str = node_identity.node_id().short_str();
@@ -325,6 +327,8 @@
);
},
}

metrics::pending_connections(None, ConnectionDirection::Inbound).dec();
}
.instrument(span);

20 changes: 17 additions & 3 deletions comms/src/connection_manager/manager.rs
@@ -29,6 +29,7 @@ use super::{
};
use crate::{
backoff::Backoff,
connection_manager::{metrics, ConnectionDirection},
multiplexing::Substream,
noise::NoiseConfig,
peer_manager::{NodeId, NodeIdentity},
@@ -397,10 +398,14 @@ where
node_id.short_str(),
proto_str
);
metrics::inbound_substream_counter(&node_id, &protocol).inc();
let notify_fut = self
.protocols
.notify(&protocol, ProtocolEvent::NewInboundSubstream(node_id, stream));
match time::timeout(Duration::from_secs(10), notify_fut).await {
Ok(Ok(_)) => {
debug!(target: LOG_TARGET, "Protocol notification for '{}' sent", proto_str);
},
Ok(Err(err)) => {
error!(
target: LOG_TARGET,
@@ -413,12 +418,21 @@
"Error sending NewSubstream notification for protocol '{}' because {}", proto_str, err
);
},
_ => {
debug!(target: LOG_TARGET, "Protocol notification for '{}' sent", proto_str);
},
}
},

PeerConnected(conn) => {
metrics::successful_connections(conn.peer_node_id(), conn.direction()).inc();
self.publish_event(PeerConnected(conn));
},
PeerConnectFailed(peer, err) => {
metrics::failed_connections(&peer, ConnectionDirection::Outbound).inc();
self.publish_event(PeerConnectFailed(peer, err));
},
PeerInboundConnectFailed(err) => {
metrics::failed_connections(&Default::default(), ConnectionDirection::Inbound).inc();
self.publish_event(PeerInboundConnectFailed(err));
},
event => {
self.publish_event(event);
},