Skip to content

Commit

Permalink
feat(utxo): prioritize electrum connections (#1966)
Browse files Browse the repository at this point in the history
Refactors electrum client to add min/max connection controls, with server priority based on list order. Electrum client can now operate in single-server mode (1,1) to reduce resource usage (especially beneficial for mobile) or multi-server (legacy) mode for reliability. Higher priority servers automatically replace lower priority ones when reconnecting during periodic retries or when connection count drops below minimum.
  • Loading branch information
rozhkovdmitrii authored Oct 25, 2024
1 parent df3d39c commit f487947
Show file tree
Hide file tree
Showing 42 changed files with 3,418 additions and 2,491 deletions.
61 changes: 17 additions & 44 deletions mm2src/adex_cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 8 additions & 7 deletions mm2src/adex_cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,32 +7,33 @@ description = "Provides a CLI interface and facilitates interoperating to komodo
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
anyhow = { version = "=1.0.42", features = ["std"] }
async-trait = "=0.1.52"
anyhow = { version = "1.0", features = ["std"] }
async-trait = "0.1"
clap = { version = "4.2", features = ["derive"] }
common = { path = "../common" }
derive_more = "0.99"
directories = "5.0"
env_logger = "0.7.1"
env_logger = "0.9.3"
http = "0.2"
hyper = { version = "0.14.26", features = ["client", "http2", "tcp"] }
hyper-rustls = "0.24.0"
gstuff = { version = "=0.7.4" , features = [ "nightly" ]}
hyper-rustls = "0.24"
gstuff = { version = "0.7" , features = [ "nightly" ]}
inquire = "0.6"
itertools = "0.10"
log = "0.4.21"
mm2_net = { path = "../mm2_net" }
mm2_number = { path = "../mm2_number" }
mm2_rpc = { path = "../mm2_rpc"}
mm2_core = { path = "../mm2_core" }
passwords = "3.1"
rpc = { path = "../mm2_bitcoin/rpc" }
rustls = { version = "0.21", features = [ "dangerous_configuration" ] }
serde = "1.0"
serde_json = { version = "1", features = ["preserve_order", "raw_value"] }
sysinfo = "0.28"
tiny-bip39 = "0.8.0"
tokio = { version = "=1.25.0", features = [ "macros" ] }
uuid = { version = "=1.2.2", features = ["fast-rng", "serde", "v4"] }
tokio = { version = "1.20.0", features = [ "macros" ] }
uuid = { version = "1.2.2", features = ["fast-rng", "serde", "v4"] }

[target.'cfg(windows)'.dependencies]
winapi = { version = "0.3.3", features = ["processthreadsapi", "winnt"] }
4 changes: 2 additions & 2 deletions mm2src/adex_cli/src/adex_proc/adex_proc_impl.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use anyhow::{anyhow, bail, Result};
use log::{error, info, warn};
use log::{debug, error, info, warn};
use mm2_rpc::data::legacy::{BalanceResponse, CoinInitResponse, GetEnabledResponse, Mm2RpcResult, MmVersionResponse,
OrderbookRequest, OrderbookResponse, SellBuyRequest, SellBuyResponse, Status};
use serde_json::{json, Value as Json};
Expand Down Expand Up @@ -38,7 +38,7 @@ impl<T: Transport, P: ResponseHandler, C: AdexConfig + 'static> AdexProc<'_, '_,

let activation_scheme = get_activation_scheme()?;
let activation_method = activation_scheme.get_activation_method(asset)?;

debug!("Got activation scheme for the coin: {}, {:?}", asset, activation_method);
let enable = Command::builder()
.flatten_data(activation_method)
.userpass(self.get_rpc_password()?)
Expand Down
5 changes: 5 additions & 0 deletions mm2src/adex_cli/src/rpc_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ pub(crate) struct ElectrumRequest {
#[serde(skip_serializing_if = "Vec::is_empty")]
pub(super) servers: Vec<Server>,
#[serde(skip_serializing_if = "Option::is_none")]
min_connected: Option<usize>,
#[serde(skip_serializing_if = "Option::is_none")]
max_connected: Option<usize>,
#[serde(skip_serializing_if = "Option::is_none")]
mm2: Option<u8>,
#[serde(default)]
tx_history: bool,
Expand All @@ -62,4 +66,5 @@ pub(super) struct Server {
protocol: ElectrumProtocol,
#[serde(default)]
disable_cert_verification: bool,
pub timeout_sec: Option<u64>,
}
22 changes: 11 additions & 11 deletions mm2src/coins/eth/web3_transport/http_transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,10 @@ async fn send_request(request: Call, transport: HttpTransport) -> Result<Json, E
request_bytes.len(),
common::PROXY_REQUEST_EXPIRATION_SEC,
)
.map_err(|e| request_failed_error(request.clone(), Web3RpcError::Internal(e.to_string())))?;
.map_err(|e| request_failed_error(&request, Web3RpcError::Internal(e.to_string())))?;

let proxy_sign_serialized = serde_json::to_string(&proxy_sign)
.map_err(|e| request_failed_error(request.clone(), Web3RpcError::Internal(e.to_string())))?;
.map_err(|e| request_failed_error(&request, Web3RpcError::Internal(e.to_string())))?;

req.headers_mut()
.insert(X_AUTH_PAYLOAD, proxy_sign_serialized.parse().unwrap());
Expand All @@ -145,22 +145,22 @@ async fn send_request(request: Call, transport: HttpTransport) -> Result<Json, E
transport.node.uri, REQUEST_TIMEOUT_S, method, id
);
warn!("{}", error);
return Err(request_failed_error(request, Web3RpcError::Transport(error)));
return Err(request_failed_error(&request, Web3RpcError::Transport(error)));
},
};

let (status, _headers, body) = match res {
Ok(r) => r,
Err(err) => {
return Err(request_failed_error(request, Web3RpcError::Transport(err.to_string())));
return Err(request_failed_error(&request, Web3RpcError::Transport(err.to_string())));
},
};

transport.event_handlers.on_incoming_response(&body);

if !status.is_success() {
return Err(request_failed_error(
request,
&request,
Web3RpcError::Transport(format!(
"Server: '{}', response !200: {}, {}",
transport.node.uri,
Expand All @@ -174,7 +174,7 @@ async fn send_request(request: Call, transport: HttpTransport) -> Result<Json, E
Ok(r) => r,
Err(err) => {
return Err(request_failed_error(
request,
&request,
Web3RpcError::InvalidResponse(format!("Server: '{}', error: {}", transport.node.uri, err)),
));
},
Expand All @@ -195,10 +195,10 @@ async fn send_request(request: Call, transport: HttpTransport) -> Result<Json, E
request_bytes.len(),
common::PROXY_REQUEST_EXPIRATION_SEC,
)
.map_err(|e| request_failed_error(request.clone(), Web3RpcError::Internal(e.to_string())))?;
.map_err(|e| request_failed_error(&request, Web3RpcError::Internal(e.to_string())))?;

let proxy_sign_serialized = serde_json::to_string(&proxy_sign)
.map_err(|e| request_failed_error(request.clone(), Web3RpcError::Internal(e.to_string())))?;
.map_err(|e| request_failed_error(&request, Web3RpcError::Internal(e.to_string())))?;

Some(proxy_sign_serialized)
} else {
Expand All @@ -215,11 +215,11 @@ async fn send_request(request: Call, transport: HttpTransport) -> Result<Json, E
{
Ok(response_json) => Ok(response_json),
Err(Error::Transport(e)) => Err(request_failed_error(
request,
&request,
Web3RpcError::Transport(format!("Server: '{}', error: {}", transport.node.uri, e)),
)),
Err(e) => Err(request_failed_error(
request,
&request,
Web3RpcError::InvalidResponse(format!("Server: '{}', error: {}", transport.node.uri, e)),
)),
}
Expand Down Expand Up @@ -275,7 +275,7 @@ async fn send_request_once(
}
}

fn request_failed_error(request: Call, error: Web3RpcError) -> Error {
fn request_failed_error(request: &Call, error: Web3RpcError) -> Error {
let error = format!("request {:?} failed: {}", request, error);
Error::Transport(TransportError::Message(error))
}
2 changes: 1 addition & 1 deletion mm2src/coins/eth/web3_transport/websocket_transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ async fn send_request(

let mut tx = transport.controller_channel.tx.lock().await;

let (notification_sender, notification_receiver) = futures::channel::oneshot::channel::<Vec<u8>>();
let (notification_sender, notification_receiver) = oneshot::channel::<Vec<u8>>();

event_handlers.on_outgoing_request(&request_bytes);

Expand Down
6 changes: 3 additions & 3 deletions mm2src/coins/lightning/ln_platform.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use super::*;
use crate::lightning::ln_errors::{SaveChannelClosingError, SaveChannelClosingResult};
use crate::utxo::rpc_clients::{BestBlock as RpcBestBlock, BlockHashOrHeight, ConfirmedTransactionInfo,
ElectrumBlockHeader, ElectrumClient, ElectrumNonce, EstimateFeeMethod,
UtxoRpcClientEnum, UtxoRpcResult};
use crate::lightning::ln_utils::RpcBestBlock;
use crate::utxo::rpc_clients::{BlockHashOrHeight, ConfirmedTransactionInfo, ElectrumBlockHeader, ElectrumClient,
ElectrumNonce, EstimateFeeMethod, UtxoRpcClientEnum, UtxoRpcResult};
use crate::utxo::spv::SimplePaymentVerification;
use crate::utxo::utxo_standard::UtxoStandardCoin;
use crate::utxo::GetConfirmedTxError;
Expand Down
Loading

0 comments on commit f487947

Please sign in to comment.