From 1752ec451b1d210e90607a591988636970151562 Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Wed, 8 Aug 2018 13:02:15 +0300 Subject: [PATCH] limit number of retries for light client queries --- substrate/client/src/light/backend.rs | 1 + substrate/client/src/light/blockchain.rs | 1 + substrate/client/src/light/call_executor.rs | 2 + substrate/client/src/light/fetcher.rs | 12 ++ substrate/network/src/on_demand.rs | 145 +++++++++++++++++--- 5 files changed, 143 insertions(+), 18 deletions(-) diff --git a/substrate/client/src/light/backend.rs b/substrate/client/src/light/backend.rs index 5d6d33f7a7a1c..4307c8ca95d45 100644 --- a/substrate/client/src/light/backend.rs +++ b/substrate/client/src/light/backend.rs @@ -158,6 +158,7 @@ impl StateBackend for OnDemandState where Block: BlockT, F: .remote_read(RemoteReadRequest { block: self.block, key: key.to_vec(), + retry_count: None, }) .into_future().wait() } diff --git a/substrate/client/src/light/blockchain.rs b/substrate/client/src/light/blockchain.rs index 443f92f78c197..27d6aaa54248e 100644 --- a/substrate/client/src/light/blockchain.rs +++ b/substrate/client/src/light/blockchain.rs @@ -99,6 +99,7 @@ impl BlockchainHeaderBackend for Blockchain where Bloc self.fetcher().upgrade().ok_or(ClientErrorKind::NotAvailableOnLightClient)? .remote_header(RemoteHeaderRequest { block: number, + retry_count: None, }) .into_future().wait() .map(Some) diff --git a/substrate/client/src/light/call_executor.rs b/substrate/client/src/light/call_executor.rs index 40d8665510100..eaab1532e5570 100644 --- a/substrate/client/src/light/call_executor.rs +++ b/substrate/client/src/light/call_executor.rs @@ -65,6 +65,7 @@ impl CallExecutor for RemoteCallExecutor block: block_hash.clone(), method: method.into(), call_data: call_data.to_vec(), + retry_count: None, }).into_future().wait() } @@ -160,6 +161,7 @@ mod tests { block: test_client::runtime::Hash::default(), method: "authorities".into(), call_data: vec![], + retry_count: None, }, remote_execution_proof).unwrap(); } } diff --git a/substrate/client/src/light/fetcher.rs b/substrate/client/src/light/fetcher.rs index ee8874adad98c..2c3b34013bfc2 100644 --- a/substrate/client/src/light/fetcher.rs +++ b/substrate/client/src/light/fetcher.rs @@ -39,6 +39,8 @@ pub struct RemoteCallRequest { pub method: String, /// Call data. pub call_data: Vec, + /// Number of times to retry request. None means that default RETRY_COUNT is used. + pub retry_count: Option, } /// Remote canonical header request. @@ -46,6 +48,8 @@ pub struct RemoteCallRequest { pub struct RemoteHeaderRequest { /// Number of the header to query. pub block: Number, + /// Number of times to retry request. None means that default RETRY_COUNT is used. + pub retry_count: Option, } /// Remote storage read request. @@ -55,6 +59,8 @@ pub struct RemoteReadRequest { pub block: Hash, /// Storage key to read. pub key: Vec, + /// Number of times to retry request. None means that default RETRY_COUNT is used. + pub retry_count: Option, } /// Light client data fetcher. Implementations of this trait must check if remote data @@ -281,6 +287,7 @@ pub mod tests { assert_eq!(local_checker.check_read_proof(&RemoteReadRequest { block: remote_block_hash, key: b":auth:len".to_vec(), + retry_count: None, }, remote_read_proof).unwrap().unwrap()[0], authorities_len as u8); } @@ -290,6 +297,7 @@ pub mod tests { assert!(local_checker.check_read_proof(&RemoteReadRequest { block: remote_block_hash, key: b":auth:len".to_vec(), + retry_count: None, }, remote_read_proof).is_err()); } @@ -298,6 +306,7 @@ pub mod tests { let (local_checker, remote_block_header, remote_header_proof) = prepare_for_header_proof_check(true); assert_eq!(local_checker.check_header_proof(&RemoteHeaderRequest { block: 1, + retry_count: None, }, Some(remote_block_header.clone()), remote_header_proof).unwrap(), remote_block_header); } @@ -306,6 +315,7 @@ pub mod tests { let (local_checker, _, remote_header_proof) = prepare_for_header_proof_check(true); assert!(local_checker.check_header_proof(&RemoteHeaderRequest { block: 1, + retry_count: None, }, None, remote_header_proof).is_err()); } @@ -314,6 +324,7 @@ pub mod tests { let (local_checker, remote_block_header, remote_header_proof) = prepare_for_header_proof_check(false); assert!(local_checker.check_header_proof(&RemoteHeaderRequest { block: 1, + retry_count: None, }, Some(remote_block_header.clone()), remote_header_proof).is_err()); } @@ -323,6 +334,7 @@ pub mod tests { remote_block_header.number = 100; assert!(local_checker.check_header_proof(&RemoteHeaderRequest { block: 1, + retry_count: None, }, Some(remote_block_header.clone()), remote_header_proof).is_err()); } } diff --git a/substrate/network/src/on_demand.rs b/substrate/network/src/on_demand.rs index 9c032b3822a5e..65a6ff284de56 100644 --- a/substrate/network/src/on_demand.rs +++ b/substrate/network/src/on_demand.rs @@ -35,6 +35,8 @@ use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, NumberFor}; /// Remote request timeout. const REQUEST_TIMEOUT: Duration = Duration::from_secs(15); +/// Default request retry count. +const RETRY_COUNT: usize = 1; /// On-demand service API. pub trait OnDemandService: Send + Sync { @@ -85,6 +87,7 @@ struct OnDemandCore> { struct Request { id: u64, timestamp: Instant, + retry_count: usize, data: RequestData, } @@ -139,9 +142,9 @@ impl OnDemand where } /// Schedule && dispatch all scheduled requests. - fn schedule_request(&self, data: RequestData, result: R) -> R { + fn schedule_request(&self, retry_count: Option, data: RequestData, result: R) -> R { let mut core = self.core.lock(); - core.insert(data); + core.insert(retry_count.unwrap_or(RETRY_COUNT), data); core.dispatch(); result } @@ -158,21 +161,31 @@ impl OnDemand where }, }; - let retry_request_data = match try_accept(request) { - Accept::Ok => None, + let retry_count = request.retry_count; + let (retry_count, retry_request_data) = match try_accept(request) { + Accept::Ok => (retry_count, None), Accept::CheckFailed(error, retry_request_data) => { io.report_peer(peer, Severity::Bad(&format!("Failed to check remote {} response from peer: {}", rtype, error))); core.remove_peer(peer); - Some(retry_request_data) + + if retry_count > 0 { + (retry_count - 1, Some(retry_request_data)) + } else { + trace!(target: "sync", "Failed to get remote {} response for given number of retries", rtype); + retry_request_data.fail(client::error::ErrorKind::RemoteFetchFailed.into()); + (0, None) + } }, Accept::Unexpected(retry_request_data) => { - trace!(target: "sync", "Unexpected response to remote {} from peer {}", rtype, peer); - Some(retry_request_data) + io.report_peer(peer, Severity::Bad(&format!("Unexpected response to remote {} from peer", rtype))); + core.remove_peer(peer); + + (retry_count, Some(retry_request_data)) }, }; if let Some(request_data) = retry_request_data { - core.insert(request_data); + core.insert(retry_count, request_data); } core.dispatch(); @@ -262,19 +275,19 @@ impl Fetcher for OnDemand where fn remote_header(&self, request: RemoteHeaderRequest>) -> Self::RemoteHeaderResult { let (sender, receiver) = channel(); - self.schedule_request(RequestData::RemoteHeader(request, sender), + self.schedule_request(request.retry_count.clone(), RequestData::RemoteHeader(request, sender), RemoteResponse { receiver }) } fn remote_read(&self, request: RemoteReadRequest) -> Self::RemoteReadResult { let (sender, receiver) = channel(); - self.schedule_request(RequestData::RemoteRead(request, sender), + self.schedule_request(request.retry_count.clone(), RequestData::RemoteRead(request, sender), RemoteResponse { receiver }) } fn remote_call(&self, request: RemoteCallRequest) -> Self::RemoteCallResult { let (sender, receiver) = channel(); - self.schedule_request(RequestData::RemoteCall(request, sender), + self.schedule_request(request.retry_count.clone(), RequestData::RemoteCall(request, sender), RemoteResponse { receiver }) } } @@ -314,13 +327,14 @@ impl OnDemandCore where } } - pub fn insert(&mut self, data: RequestData) { + pub fn insert(&mut self, retry_count: usize, data: RequestData) { let request_id = self.next_request_id; self.next_request_id += 1; self.pending_requests.push_back(Request { id: request_id, timestamp: Instant::now(), + retry_count, data, }); } @@ -385,6 +399,17 @@ impl Request { } } +impl RequestData { + pub fn fail(self, error: client::error::Error) { + // don't care if anyone is listening + match self { + RequestData::RemoteHeader(_, sender) => { let _ = sender.send(Err(error)); }, + RequestData::RemoteCall(_, sender) => { let _ = sender.send(Err(error)); }, + RequestData::RemoteRead(_, sender) => { let _ = sender.send(Err(error)); }, + } + } +} + #[cfg(test)] pub mod tests { use std::collections::VecDeque; @@ -488,7 +513,12 @@ pub mod tests { assert_eq!(vec![0, 1], on_demand.core.lock().idle_peers.iter().cloned().collect::>()); assert!(on_demand.core.lock().active_peers.is_empty()); - on_demand.remote_call(RemoteCallRequest { block: Default::default(), method: "test".into(), call_data: vec![] }); + on_demand.remote_call(RemoteCallRequest { + block: Default::default(), + method: "test".into(), + call_data: vec![], + retry_count: None, + }); assert_eq!(vec![1], on_demand.core.lock().idle_peers.iter().cloned().collect::>()); assert_eq!(vec![0], on_demand.core.lock().active_peers.keys().cloned().collect::>()); @@ -506,7 +536,12 @@ pub mod tests { let mut network = TestIo::new(&queue, None); on_demand.on_connect(0, Roles::FULL); - on_demand.remote_call(RemoteCallRequest { block: Default::default(), method: "test".into(), call_data: vec![] }); + on_demand.remote_call(RemoteCallRequest { + block: Default::default(), + method: "test".into(), + call_data: vec![], + retry_count: None, + }); receive_call_response(&*on_demand, &mut network, 0, 1); assert!(network.to_disconnect.contains(&0)); assert_eq!(on_demand.core.lock().pending_requests.len(), 1); @@ -517,7 +552,12 @@ pub mod tests { let (_x, on_demand) = dummy(false); let queue = RwLock::new(VecDeque::new()); let mut network = TestIo::new(&queue, None); - on_demand.remote_call(RemoteCallRequest { block: Default::default(), method: "test".into(), call_data: vec![] }); + on_demand.remote_call(RemoteCallRequest { + block: Default::default(), + method: "test".into(), + call_data: vec![], + retry_count: Some(1), + }); on_demand.on_connect(0, Roles::FULL); receive_call_response(&*on_demand, &mut network, 0, 0); @@ -536,6 +576,66 @@ pub mod tests { assert!(network.to_disconnect.contains(&0)); } + #[test] + fn disconnects_from_peer_on_wrong_response_type() { + let (_x, on_demand) = dummy(false); + let queue = RwLock::new(VecDeque::new()); + let mut network = TestIo::new(&queue, None); + on_demand.on_connect(0, Roles::FULL); + + on_demand.remote_call(RemoteCallRequest { + block: Default::default(), + method: "test".into(), + call_data: vec![], + retry_count: Some(1), + }); + + on_demand.on_remote_read_response(&mut network, 0, message::RemoteReadResponse { + id: 0, + proof: vec![vec![2]], + }); + assert!(network.to_disconnect.contains(&0)); + assert_eq!(on_demand.core.lock().pending_requests.len(), 1); + } + + #[test] + fn receives_remote_failure_after_retry_count_failures() { + use parking_lot::{Condvar, Mutex}; + + let retry_count = 2; + let (_x, on_demand) = dummy(false); + let queue = RwLock::new(VecDeque::new()); + let mut network = TestIo::new(&queue, None); + for i in 0..retry_count+1 { + on_demand.on_connect(i, Roles::FULL); + } + + let sync = Arc::new((Mutex::new(0), Mutex::new(0), Condvar::new())); + let thread_sync = sync.clone(); + + let response = on_demand.remote_call(RemoteCallRequest { block: Default::default(), method: "test".into(), + call_data: vec![], retry_count: Some(retry_count) }); + let thread = ::std::thread::spawn(move || { + let &(ref current, ref finished_at, ref finished) = &*thread_sync; + let _ = response.wait().unwrap_err(); + *finished_at.lock() = *current.lock(); + finished.notify_one(); + }); + + let &(ref current, ref finished_at, ref finished) = &*sync; + for i in 0..retry_count+1 { + let mut current = current.lock(); + *current = *current + 1; + receive_call_response(&*on_demand, &mut network, i, i as u64); + } + + let mut finished_at = finished_at.lock(); + assert!(!finished.wait_for(&mut finished_at, ::std::time::Duration::from_millis(1000)).timed_out()); + assert_eq!(*finished_at, retry_count + 1); + + thread.join().unwrap(); + } + #[test] fn receives_remote_call_response() { let (_x, on_demand) = dummy(true); @@ -543,7 +643,12 @@ pub mod tests { let mut network = TestIo::new(&queue, None); on_demand.on_connect(0, Roles::FULL); - let response = on_demand.remote_call(RemoteCallRequest { block: Default::default(), method: "test".into(), call_data: vec![] }); + let response = on_demand.remote_call(RemoteCallRequest { + block: Default::default(), + method: "test".into(), + call_data: vec![], + retry_count: None, + }); let thread = ::std::thread::spawn(move || { let result = response.wait().unwrap(); assert_eq!(result.return_data, vec![42]); @@ -560,7 +665,11 @@ pub mod tests { let mut network = TestIo::new(&queue, None); on_demand.on_connect(0, Roles::FULL); - let response = on_demand.remote_read(RemoteReadRequest { block: Default::default(), key: b":key".to_vec() }); + let response = on_demand.remote_read(RemoteReadRequest { + block: Default::default(), + key: b":key".to_vec(), + retry_count: None, + }); let thread = ::std::thread::spawn(move || { let result = response.wait().unwrap(); assert_eq!(result, Some(vec![41])); @@ -580,7 +689,7 @@ pub mod tests { let mut network = TestIo::new(&queue, None); on_demand.on_connect(0, Roles::FULL); - let response = on_demand.remote_header(RemoteHeaderRequest { block: 1 }); + let response = on_demand.remote_header(RemoteHeaderRequest { block: 1, retry_count: None, }); let thread = ::std::thread::spawn(move || { let result = response.wait().unwrap(); assert_eq!(result.hash(), "80729accb7bb10ff9c637a10e8bb59f21c52571aa7b46544c5885ca89ed190f4".into());