From 9b94a81319e091d7372a01910a7303508fc5b5c4 Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Wed, 5 Sep 2018 09:54:04 +0300 Subject: [PATCH] limit retry count in OnDemand --- substrate/client/src/light/backend.rs | 1 + substrate/client/src/light/blockchain.rs | 1 + substrate/client/src/light/call_executor.rs | 2 + substrate/client/src/light/fetcher.rs | 10 ++ substrate/network/src/on_demand.rs | 128 +++++++++++++++++--- 5 files changed, 128 insertions(+), 14 deletions(-) diff --git a/substrate/client/src/light/backend.rs b/substrate/client/src/light/backend.rs index 3da10c7cf4604..54291521bb0da 100644 --- a/substrate/client/src/light/backend.rs +++ b/substrate/client/src/light/backend.rs @@ -196,6 +196,7 @@ impl StateBackend for OnDemandState block: self.block, header: header.expect("if block above guarantees that header is_some(); qed"), key: key.to_vec(), + retry_count: None, }) .into_future().wait() } diff --git a/substrate/client/src/light/blockchain.rs b/substrate/client/src/light/blockchain.rs index 9f788e7f2c9bd..11f3070aa6d4d 100644 --- a/substrate/client/src/light/blockchain.rs +++ b/substrate/client/src/light/blockchain.rs @@ -101,6 +101,7 @@ impl BlockchainHeaderBackend for Blockchain where Bloc .remote_header(RemoteHeaderRequest { cht_root: self.storage.cht_root(cht::SIZE, number)?, block: number, + retry_count: None, }) .into_future().wait() .map(Some) diff --git a/substrate/client/src/light/call_executor.rs b/substrate/client/src/light/call_executor.rs index 8d3cd98f9f84e..bc1436aa5625c 100644 --- a/substrate/client/src/light/call_executor.rs +++ b/substrate/client/src/light/call_executor.rs @@ -73,6 +73,7 @@ impl CallExecutor for RemoteCallExec header: block_header, method: method.into(), call_data: call_data.to_vec(), + retry_count: None, }).into_future().wait() } @@ -168,6 +169,7 @@ mod tests { }, method: "authorities".into(), call_data: vec![], + retry_count: None, }, remote_execution_proof).unwrap(); } } diff --git a/substrate/client/src/light/fetcher.rs b/substrate/client/src/light/fetcher.rs index b3944afa508d9..89416a2beee8d 100644 --- a/substrate/client/src/light/fetcher.rs +++ b/substrate/client/src/light/fetcher.rs @@ -43,6 +43,8 @@ pub struct RemoteCallRequest { pub method: String, /// Call data. pub call_data: Vec, + /// Number of times to retry request. None means that default RETRY_COUNT is used. + pub retry_count: Option, } /// Remote canonical header request. @@ -52,6 +54,8 @@ pub struct RemoteHeaderRequest { pub cht_root: Header::Hash, /// Number of the header to query. pub block: Header::Number, + /// Number of times to retry request. None means that default RETRY_COUNT is used. + pub retry_count: Option, } /// Remote storage read request. @@ -63,6 +67,8 @@ pub struct RemoteReadRequest { pub header: Header, /// Storage key to read. pub key: Vec, + /// Number of times to retry request. None means that default RETRY_COUNT is used. + pub retry_count: Option, } /// Light client data fetcher. Implementations of this trait must check if remote data @@ -264,6 +270,7 @@ pub mod tests { block: remote_block_header.hash(), header: remote_block_header, key: b":auth:len".to_vec(), + retry_count: None, }, remote_read_proof).unwrap().unwrap()[0], authorities_len as u8); } @@ -273,6 +280,7 @@ pub mod tests { assert_eq!((&local_checker as &FetchChecker).check_header_proof(&RemoteHeaderRequest::
{ cht_root: local_cht_root, block: 1, + retry_count: None, }, Some(remote_block_header.clone()), remote_header_proof).unwrap(), remote_block_header); } @@ -283,6 +291,7 @@ pub mod tests { assert!((&local_checker as &FetchChecker).check_header_proof(&RemoteHeaderRequest::
{ cht_root: Default::default(), block: 1, + retry_count: None, }, Some(remote_block_header.clone()), remote_header_proof).is_err()); } @@ -293,6 +302,7 @@ pub mod tests { assert!((&local_checker as &FetchChecker).check_header_proof(&RemoteHeaderRequest::
{ cht_root: local_cht_root, block: 1, + retry_count: None, }, Some(remote_block_header.clone()), remote_header_proof).is_err()); } } diff --git a/substrate/network/src/on_demand.rs b/substrate/network/src/on_demand.rs index a066926237987..7883b5fa14360 100644 --- a/substrate/network/src/on_demand.rs +++ b/substrate/network/src/on_demand.rs @@ -35,6 +35,8 @@ use runtime_primitives::traits::{Block as BlockT, Header as HeaderT}; /// Remote request timeout. const REQUEST_TIMEOUT: Duration = Duration::from_secs(15); +/// Default request retry count. +const RETRY_COUNT: usize = 1; /// On-demand service API. pub trait OnDemandService: Send + Sync { @@ -85,6 +87,7 @@ struct OnDemandCore> { struct Request { id: u64, timestamp: Instant, + retry_count: usize, data: RequestData, } @@ -139,9 +142,9 @@ impl OnDemand where } /// Schedule && dispatch all scheduled requests. - fn schedule_request(&self, data: RequestData, result: R) -> R { + fn schedule_request(&self, retry_count: Option, data: RequestData, result: R) -> R { let mut core = self.core.lock(); - core.insert(data); + core.insert(retry_count.unwrap_or(RETRY_COUNT), data); core.dispatch(); result } @@ -158,21 +161,31 @@ impl OnDemand where }, }; - let retry_request_data = match try_accept(request) { - Accept::Ok => None, + let retry_count = request.retry_count; + let (retry_count, retry_request_data) = match try_accept(request) { + Accept::Ok => (retry_count, None), Accept::CheckFailed(error, retry_request_data) => { io.report_peer(peer, Severity::Bad(&format!("Failed to check remote {} response from peer: {}", rtype, error))); core.remove_peer(peer); - Some(retry_request_data) + + if retry_count > 0 { + (retry_count - 1, Some(retry_request_data)) + } else { + trace!(target: "sync", "Failed to get remote {} response for given number of retries", rtype); + retry_request_data.fail(client::error::ErrorKind::RemoteFetchFailed.into()); + (0, None) + } }, Accept::Unexpected(retry_request_data) => { - trace!(target: "sync", "Unexpected response to remote {} from peer {}", rtype, peer); - Some(retry_request_data) + io.report_peer(peer, Severity::Bad(&format!("Unexpected response to remote {} from peer", rtype))); + core.remove_peer(peer); + + (retry_count, Some(retry_request_data)) }, }; if let Some(request_data) = retry_request_data { - core.insert(request_data); + core.insert(retry_count, request_data); } core.dispatch(); @@ -262,19 +275,19 @@ impl Fetcher for OnDemand where fn remote_header(&self, request: RemoteHeaderRequest) -> Self::RemoteHeaderResult { let (sender, receiver) = channel(); - self.schedule_request(RequestData::RemoteHeader(request, sender), + self.schedule_request(request.retry_count.clone(), RequestData::RemoteHeader(request, sender), RemoteResponse { receiver }) } fn remote_read(&self, request: RemoteReadRequest) -> Self::RemoteReadResult { let (sender, receiver) = channel(); - self.schedule_request(RequestData::RemoteRead(request, sender), + self.schedule_request(request.retry_count.clone(), RequestData::RemoteRead(request, sender), RemoteResponse { receiver }) } fn remote_call(&self, request: RemoteCallRequest) -> Self::RemoteCallResult { let (sender, receiver) = channel(); - self.schedule_request(RequestData::RemoteCall(request, sender), + self.schedule_request(request.retry_count.clone(), RequestData::RemoteCall(request, sender), RemoteResponse { receiver }) } } @@ -314,13 +327,14 @@ impl OnDemandCore where } } - pub fn insert(&mut self, data: RequestData) { + pub fn insert(&mut self, retry_count: usize, data: RequestData) { let request_id = self.next_request_id; self.next_request_id += 1; self.pending_requests.push_back(Request { id: request_id, timestamp: Instant::now(), + retry_count, data, }); } @@ -385,6 +399,17 @@ impl Request { } } +impl RequestData { + pub fn fail(self, error: client::error::Error) { + // don't care if anyone is listening + match self { + RequestData::RemoteHeader(_, sender) => { let _ = sender.send(Err(error)); }, + RequestData::RemoteCall(_, sender) => { let _ = sender.send(Err(error)); }, + RequestData::RemoteRead(_, sender) => { let _ = sender.send(Err(error)); }, + } + } +} + #[cfg(test)] pub mod tests { use std::collections::VecDeque; @@ -503,6 +528,7 @@ pub mod tests { header: dummy_header(), method: "test".into(), call_data: vec![], + retry_count: None, }); assert_eq!(vec![1], on_demand.core.lock().idle_peers.iter().cloned().collect::>()); assert_eq!(vec![0], on_demand.core.lock().active_peers.keys().cloned().collect::>()); @@ -526,6 +552,7 @@ pub mod tests { header: dummy_header(), method: "test".into(), call_data: vec![], + retry_count: None, }); receive_call_response(&*on_demand, &mut network, 0, 1); assert!(network.to_disconnect.contains(&0)); @@ -542,6 +569,7 @@ pub mod tests { header: dummy_header(), method: "test".into(), call_data: vec![], + retry_count: Some(1), }); on_demand.on_connect(0, Roles::FULL); @@ -561,6 +589,72 @@ pub mod tests { assert!(network.to_disconnect.contains(&0)); } + #[test] + fn disconnects_from_peer_on_wrong_response_type() { + let (_x, on_demand) = dummy(false); + let queue = RwLock::new(VecDeque::new()); + let mut network = TestIo::new(&queue, None); + on_demand.on_connect(0, Roles::FULL); + + on_demand.remote_call(RemoteCallRequest { + block: Default::default(), + header: dummy_header(), + method: "test".into(), + call_data: vec![], + retry_count: Some(1), + }); + + on_demand.on_remote_read_response(&mut network, 0, message::RemoteReadResponse { + id: 0, + proof: vec![vec![2]], + }); + assert!(network.to_disconnect.contains(&0)); + assert_eq!(on_demand.core.lock().pending_requests.len(), 1); + } + + #[test] + fn receives_remote_failure_after_retry_count_failures() { + use parking_lot::{Condvar, Mutex}; + + let retry_count = 2; + let (_x, on_demand) = dummy(false); + let queue = RwLock::new(VecDeque::new()); + let mut network = TestIo::new(&queue, None); + for i in 0..retry_count+1 { + on_demand.on_connect(i, Roles::FULL); + } + + let sync = Arc::new((Mutex::new(0), Mutex::new(0), Condvar::new())); + let thread_sync = sync.clone(); + + let response = on_demand.remote_call(RemoteCallRequest { + block: Default::default(), + header: dummy_header(), + method: "test".into(), + call_data: vec![], + retry_count: Some(retry_count) + }); + let thread = ::std::thread::spawn(move || { + let &(ref current, ref finished_at, ref finished) = &*thread_sync; + let _ = response.wait().unwrap_err(); + *finished_at.lock() = *current.lock(); + finished.notify_one(); + }); + + let &(ref current, ref finished_at, ref finished) = &*sync; + for i in 0..retry_count+1 { + let mut current = current.lock(); + *current = *current + 1; + receive_call_response(&*on_demand, &mut network, i, i as u64); + } + + let mut finished_at = finished_at.lock(); + assert!(!finished.wait_for(&mut finished_at, ::std::time::Duration::from_millis(1000)).timed_out()); + assert_eq!(*finished_at, retry_count + 1); + + thread.join().unwrap(); + } + #[test] fn receives_remote_call_response() { let (_x, on_demand) = dummy(true); @@ -573,6 +667,7 @@ pub mod tests { header: dummy_header(), method: "test".into(), call_data: vec![], + retry_count: None, }); let thread = ::std::thread::spawn(move || { let result = response.wait().unwrap(); @@ -593,7 +688,8 @@ pub mod tests { let response = on_demand.remote_read(RemoteReadRequest { header: dummy_header(), block: Default::default(), - key: b":key".to_vec() + key: b":key".to_vec(), + retry_count: None, }); let thread = ::std::thread::spawn(move || { let result = response.wait().unwrap(); @@ -614,7 +710,11 @@ pub mod tests { let mut network = TestIo::new(&queue, None); on_demand.on_connect(0, Roles::FULL); - let response = on_demand.remote_header(RemoteHeaderRequest { cht_root: Default::default(), block: 1 }); + let response = on_demand.remote_header(RemoteHeaderRequest { + cht_root: Default::default(), + block: 1, + retry_count: None, + }); let thread = ::std::thread::spawn(move || { let result = response.wait().unwrap(); assert_eq!(result.hash(), "80729accb7bb10ff9c637a10e8bb59f21c52571aa7b46544c5885ca89ed190f4".into());