-
Notifications
You must be signed in to change notification settings - Fork 110
/
Copy pathisolated.rs
138 lines (120 loc) · 4.38 KB
/
isolated.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
//! Code for creating isolated connections to specific peers.
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use futures::future::{FutureExt, TryFutureExt};
use tokio::net::TcpStream;
use tower::{
util::{BoxService, Oneshot},
Service,
};
use crate::{peer, BoxError, Config, Request, Response};
use peer::ConnectedAddr;
/// Use the provided TCP connection to create a Zcash connection completely
/// isolated from all other node state.
///
/// The connection pool returned by `init` should be used for all requests that
/// don't require isolated state or use of an existing TCP connection. However,
/// this low-level API is useful for custom network crawlers or Tor connections.
///
/// In addition to being completely isolated from all other node state, this
/// method also aims to be minimally distinguishable from other clients.
///
/// Note that this method does not implement any timeout behavior, so callers may
/// want to layer it with a timeout as appropriate for their application.
///
/// # Inputs
///
/// - `conn`: an existing TCP connection to use. Passing an existing TCP
/// connection allows this method to be used with clearnet or Tor transports.
///
/// - `user_agent`: a valid BIP14 user-agent, e.g., the empty string.
///
/// # Bug
///
/// `connect_isolated` only works on `Mainnet`, see #1687.
pub fn connect_isolated(
conn: TcpStream,
user_agent: String,
) -> impl Future<
Output = Result<
BoxService<Request, Response, Box<dyn std::error::Error + Send + Sync + 'static>>,
Box<dyn std::error::Error + Send + Sync + 'static>,
>,
> {
let handshake = peer::Handshake::builder()
.with_config(Config::default())
.with_inbound_service(tower::service_fn(|_req| async move {
Ok::<Response, Box<dyn std::error::Error + Send + Sync + 'static>>(Response::Nil)
}))
.with_user_agent(user_agent)
.finish()
.expect("provided mandatory builder parameters");
// Don't send any metadata about the connection
let connected_addr = ConnectedAddr::new_isolated();
Oneshot::new(handshake, (conn, connected_addr))
.map_ok(|client| BoxService::new(Wrapper(client)))
}
// This can be deleted when a new version of Tower with map_err is released.
struct Wrapper(peer::Client);
impl Service<Request> for Wrapper {
type Response = Response;
type Error = BoxError;
type Future =
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.0.poll_ready(cx).map_err(Into::into)
}
fn call(&mut self, req: Request) -> Self::Future {
self.0.call(req).map_err(Into::into).boxed()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn connect_isolated_sends_minimally_distinguished_version_message() {
use crate::{
protocol::external::{Codec, Message},
types::PeerServices,
};
use futures::stream::StreamExt;
use tokio_util::codec::Framed;
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let listen_addr = listener.local_addr().unwrap();
let conn = tokio::net::TcpStream::connect(listen_addr).await.unwrap();
tokio::spawn(connect_isolated(conn, "".to_string()));
let (conn, _) = listener.accept().await.unwrap();
let mut stream = Framed::new(conn, Codec::builder().finish());
if let Message::Version {
services,
timestamp,
address_from,
user_agent,
start_height,
relay,
..
} = stream
.next()
.await
.expect("stream item")
.expect("item is Ok(msg)")
{
// Check that the version message sent by connect_isolated
// has the fields specified in the Stolon RFC.
assert_eq!(services, PeerServices::empty());
assert_eq!(timestamp.timestamp() % (5 * 60), 0);
assert_eq!(
address_from,
(PeerServices::empty(), "0.0.0.0:8233".parse().unwrap())
);
assert_eq!(user_agent, "");
assert_eq!(start_height.0, 0);
assert!(!relay);
} else {
panic!("handshake did not send version message");
}
}
}