-
Notifications
You must be signed in to change notification settings - Fork 111
/
Copy pathserver.rs
136 lines (119 loc) · 4.88 KB
/
server.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
//! A JSON-RPC 1.0 & 2.0 endpoint for Zebra.
//!
//! This endpoint is compatible with clients that incorrectly send
//! `"jsonrpc" = 1.0` fields in JSON-RPC 1.0 requests,
//! such as `lightwalletd`.
//!
//! See the full list of
//! [Differences between JSON-RPC 1.0 and 2.0.](https://www.simple-is-better.org/rpc/#differences-between-1-0-and-2-0)
use std::panic;
use jsonrpc_core::{Compatibility, MetaIoHandler};
use jsonrpc_http_server::ServerBuilder;
use tokio::task::JoinHandle;
use tower::{buffer::Buffer, Service};
use tracing::*;
use tracing_futures::Instrument;
use zebra_chain::{chain_tip::ChainTip, parameters::Network};
use zebra_node_services::{mempool, BoxError};
use crate::{
config::Config,
methods::{Rpc, RpcImpl},
server::{compatibility::FixHttpRequestMiddleware, tracing_middleware::TracingMiddleware},
};
pub mod compatibility;
mod tracing_middleware;
#[cfg(test)]
mod tests;
/// Entry point for Zebra's JSON-RPC server.
///
/// This is a stateless unit type: it only groups the server startup
/// function, [`RpcServer::spawn`].
#[derive(Debug, Clone)]
pub struct RpcServer;
impl RpcServer {
/// Start a new RPC server endpoint
pub fn spawn<Version, Mempool, State, Tip>(
config: Config,
app_version: Version,
mempool: Buffer<Mempool, mempool::Request>,
state: State,
latest_chain_tip: Tip,
network: Network,
) -> (JoinHandle<()>, JoinHandle<()>)
where
Version: ToString,
Mempool: tower::Service<mempool::Request, Response = mempool::Response, Error = BoxError>
+ 'static,
Mempool::Future: Send,
State: Service<
zebra_state::ReadRequest,
Response = zebra_state::ReadResponse,
Error = zebra_state::BoxError,
> + Clone
+ Send
+ Sync
+ 'static,
State::Future: Send,
Tip: ChainTip + Clone + Send + Sync + 'static,
{
if let Some(listen_addr) = config.listen_addr {
info!("Trying to open RPC endpoint at {}...", listen_addr,);
// Initialize the rpc methods with the zebra version
let (rpc_impl, rpc_tx_queue_task_handle) = RpcImpl::new(
app_version,
network,
config.debug_force_finished_sync,
mempool,
state,
latest_chain_tip,
);
// Create handler compatible with V1 and V2 RPC protocols
let mut io: MetaIoHandler<(), _> =
MetaIoHandler::new(Compatibility::Both, TracingMiddleware);
io.extend_with(rpc_impl.to_delegate());
// If zero, automatically scale threads to the number of CPU cores
let mut parallel_cpu_threads = config.parallel_cpu_threads;
if parallel_cpu_threads == 0 {
parallel_cpu_threads = num_cpus::get();
}
// The server is a blocking task, which blocks on executor shutdown.
// So we need to create and spawn it on a std::thread, inside a tokio blocking task.
// (Otherwise tokio panics when we shut down the RPC server.)
let span = Span::current();
let server = move || {
span.in_scope(|| {
// Use a different tokio executor from the rest of Zebra,
// so that large RPCs and any task handling bugs don't impact Zebra.
//
// TODO:
// - return server.close_handle(), which can shut down the RPC server,
// and add it to the server tests
let server = ServerBuilder::new(io)
.threads(parallel_cpu_threads)
// TODO: disable this security check if we see errors from lightwalletd
//.allowed_hosts(DomainsValidation::Disabled)
.request_middleware(FixHttpRequestMiddleware)
.start_http(&listen_addr)
.expect("Unable to start RPC server");
info!("Opened RPC endpoint at {}", server.address());
server.wait();
info!("Stopping RPC endpoint");
})
};
(
tokio::task::spawn_blocking(|| {
let thread_handle = std::thread::spawn(server);
// Propagate panics from the inner std::thread to the outer tokio blocking task
match thread_handle.join() {
Ok(()) => (),
Err(panic_object) => panic::resume_unwind(panic_object),
}
}),
rpc_tx_queue_task_handle,
)
} else {
// There is no RPC port, so the RPC tasks do nothing.
(
tokio::task::spawn(futures::future::pending().in_current_span()),
tokio::task::spawn(futures::future::pending().in_current_span()),
)
}
}
}