Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
limit number of retries for light client queries
Browse files Browse the repository at this point in the history
  • Loading branch information
svyatonik committed Aug 8, 2018
1 parent d0555c1 commit 1752ec4
Show file tree
Hide file tree
Showing 5 changed files with 143 additions and 18 deletions.
1 change: 1 addition & 0 deletions substrate/client/src/light/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ impl<Block, F> StateBackend for OnDemandState<Block, F> where Block: BlockT, F:
.remote_read(RemoteReadRequest {
block: self.block,
key: key.to_vec(),
retry_count: None,
})
.into_future().wait()
}
Expand Down
1 change: 1 addition & 0 deletions substrate/client/src/light/blockchain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ impl<S, F, Block> BlockchainHeaderBackend<Block> for Blockchain<S, F> where Bloc
self.fetcher().upgrade().ok_or(ClientErrorKind::NotAvailableOnLightClient)?
.remote_header(RemoteHeaderRequest {
block: number,
retry_count: None,
})
.into_future().wait()
.map(Some)
Expand Down
2 changes: 2 additions & 0 deletions substrate/client/src/light/call_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ impl<B, F, Block> CallExecutor<Block> for RemoteCallExecutor<B, F>
block: block_hash.clone(),
method: method.into(),
call_data: call_data.to_vec(),
retry_count: None,
}).into_future().wait()
}

Expand Down Expand Up @@ -160,6 +161,7 @@ mod tests {
block: test_client::runtime::Hash::default(),
method: "authorities".into(),
call_data: vec![],
retry_count: None,
}, remote_execution_proof).unwrap();
}
}
12 changes: 12 additions & 0 deletions substrate/client/src/light/fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,17 @@ pub struct RemoteCallRequest<Hash: ::std::fmt::Display> {
pub method: String,
/// Call data.
pub call_data: Vec<u8>,
/// Number of times to retry request. None means that default RETRY_COUNT is used.
pub retry_count: Option<usize>,
}

/// Remote canonical header request.
#[derive(Clone, Debug, Default, PartialEq, Eq, Hash)]
pub struct RemoteHeaderRequest<Number: ::std::fmt::Display> {
/// Number of the header to query.
pub block: Number,
/// Number of times to retry request. None means that default RETRY_COUNT is used.
pub retry_count: Option<usize>,
}

/// Remote storage read request.
Expand All @@ -55,6 +59,8 @@ pub struct RemoteReadRequest<Hash: ::std::fmt::Display> {
pub block: Hash,
/// Storage key to read.
pub key: Vec<u8>,
/// Number of times to retry request. None means that default RETRY_COUNT is used.
pub retry_count: Option<usize>,
}

/// Light client data fetcher. Implementations of this trait must check if remote data
Expand Down Expand Up @@ -281,6 +287,7 @@ pub mod tests {
assert_eq!(local_checker.check_read_proof(&RemoteReadRequest {
block: remote_block_hash,
key: b":auth:len".to_vec(),
retry_count: None,
}, remote_read_proof).unwrap().unwrap()[0], authorities_len as u8);
}

Expand All @@ -290,6 +297,7 @@ pub mod tests {
assert!(local_checker.check_read_proof(&RemoteReadRequest {
block: remote_block_hash,
key: b":auth:len".to_vec(),
retry_count: None,
}, remote_read_proof).is_err());
}

Expand All @@ -298,6 +306,7 @@ pub mod tests {
let (local_checker, remote_block_header, remote_header_proof) = prepare_for_header_proof_check(true);
assert_eq!(local_checker.check_header_proof(&RemoteHeaderRequest {
block: 1,
retry_count: None,
}, Some(remote_block_header.clone()), remote_header_proof).unwrap(), remote_block_header);
}

Expand All @@ -306,6 +315,7 @@ pub mod tests {
let (local_checker, _, remote_header_proof) = prepare_for_header_proof_check(true);
assert!(local_checker.check_header_proof(&RemoteHeaderRequest {
block: 1,
retry_count: None,
}, None, remote_header_proof).is_err());
}

Expand All @@ -314,6 +324,7 @@ pub mod tests {
let (local_checker, remote_block_header, remote_header_proof) = prepare_for_header_proof_check(false);
assert!(local_checker.check_header_proof(&RemoteHeaderRequest {
block: 1,
retry_count: None,
}, Some(remote_block_header.clone()), remote_header_proof).is_err());
}

Expand All @@ -323,6 +334,7 @@ pub mod tests {
remote_block_header.number = 100;
assert!(local_checker.check_header_proof(&RemoteHeaderRequest {
block: 1,
retry_count: None,
}, Some(remote_block_header.clone()), remote_header_proof).is_err());
}
}
145 changes: 127 additions & 18 deletions substrate/network/src/on_demand.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, NumberFor};

/// Remote request timeout.
const REQUEST_TIMEOUT: Duration = Duration::from_secs(15);
/// Default request retry count.
const RETRY_COUNT: usize = 1;

/// On-demand service API.
pub trait OnDemandService<Block: BlockT>: Send + Sync {
Expand Down Expand Up @@ -85,6 +87,7 @@ struct OnDemandCore<B: BlockT, E: service::ExecuteInContext<B>> {
struct Request<Block: BlockT> {
id: u64,
timestamp: Instant,
retry_count: usize,
data: RequestData<Block>,
}

Expand Down Expand Up @@ -139,9 +142,9 @@ impl<B: BlockT, E> OnDemand<B, E> where
}

/// Schedule && dispatch all scheduled requests.
fn schedule_request<R>(&self, data: RequestData<B>, result: R) -> R {
fn schedule_request<R>(&self, retry_count: Option<usize>, data: RequestData<B>, result: R) -> R {
let mut core = self.core.lock();
core.insert(data);
core.insert(retry_count.unwrap_or(RETRY_COUNT), data);
core.dispatch();
result
}
Expand All @@ -158,21 +161,31 @@ impl<B: BlockT, E> OnDemand<B, E> where
},
};

let retry_request_data = match try_accept(request) {
Accept::Ok => None,
let retry_count = request.retry_count;
let (retry_count, retry_request_data) = match try_accept(request) {
Accept::Ok => (retry_count, None),
Accept::CheckFailed(error, retry_request_data) => {
io.report_peer(peer, Severity::Bad(&format!("Failed to check remote {} response from peer: {}", rtype, error)));
core.remove_peer(peer);
Some(retry_request_data)

if retry_count > 0 {
(retry_count - 1, Some(retry_request_data))
} else {
trace!(target: "sync", "Failed to get remote {} response for given number of retries", rtype);
retry_request_data.fail(client::error::ErrorKind::RemoteFetchFailed.into());
(0, None)
}
},
Accept::Unexpected(retry_request_data) => {
trace!(target: "sync", "Unexpected response to remote {} from peer {}", rtype, peer);
Some(retry_request_data)
io.report_peer(peer, Severity::Bad(&format!("Unexpected response to remote {} from peer", rtype)));
core.remove_peer(peer);

(retry_count, Some(retry_request_data))
},
};

if let Some(request_data) = retry_request_data {
core.insert(request_data);
core.insert(retry_count, request_data);
}

core.dispatch();
Expand Down Expand Up @@ -262,19 +275,19 @@ impl<B, E> Fetcher<B> for OnDemand<B, E> where

fn remote_header(&self, request: RemoteHeaderRequest<NumberFor<B>>) -> Self::RemoteHeaderResult {
let (sender, receiver) = channel();
self.schedule_request(RequestData::RemoteHeader(request, sender),
self.schedule_request(request.retry_count.clone(), RequestData::RemoteHeader(request, sender),
RemoteResponse { receiver })
}

fn remote_read(&self, request: RemoteReadRequest<B::Hash>) -> Self::RemoteReadResult {
let (sender, receiver) = channel();
self.schedule_request(RequestData::RemoteRead(request, sender),
self.schedule_request(request.retry_count.clone(), RequestData::RemoteRead(request, sender),
RemoteResponse { receiver })
}

fn remote_call(&self, request: RemoteCallRequest<B::Hash>) -> Self::RemoteCallResult {
let (sender, receiver) = channel();
self.schedule_request(RequestData::RemoteCall(request, sender),
self.schedule_request(request.retry_count.clone(), RequestData::RemoteCall(request, sender),
RemoteResponse { receiver })
}
}
Expand Down Expand Up @@ -314,13 +327,14 @@ impl<B, E> OnDemandCore<B, E> where
}
}

pub fn insert(&mut self, data: RequestData<B>) {
pub fn insert(&mut self, retry_count: usize, data: RequestData<B>) {
let request_id = self.next_request_id;
self.next_request_id += 1;

self.pending_requests.push_back(Request {
id: request_id,
timestamp: Instant::now(),
retry_count,
data,
});
}
Expand Down Expand Up @@ -385,6 +399,17 @@ impl<Block: BlockT> Request<Block> {
}
}

impl<Block: BlockT> RequestData<Block> {
pub fn fail(self, error: client::error::Error) {
// don't care if anyone is listening
match self {
RequestData::RemoteHeader(_, sender) => { let _ = sender.send(Err(error)); },
RequestData::RemoteCall(_, sender) => { let _ = sender.send(Err(error)); },
RequestData::RemoteRead(_, sender) => { let _ = sender.send(Err(error)); },
}
}
}

#[cfg(test)]
pub mod tests {
use std::collections::VecDeque;
Expand Down Expand Up @@ -488,7 +513,12 @@ pub mod tests {
assert_eq!(vec![0, 1], on_demand.core.lock().idle_peers.iter().cloned().collect::<Vec<_>>());
assert!(on_demand.core.lock().active_peers.is_empty());

on_demand.remote_call(RemoteCallRequest { block: Default::default(), method: "test".into(), call_data: vec![] });
on_demand.remote_call(RemoteCallRequest {
block: Default::default(),
method: "test".into(),
call_data: vec![],
retry_count: None,
});
assert_eq!(vec![1], on_demand.core.lock().idle_peers.iter().cloned().collect::<Vec<_>>());
assert_eq!(vec![0], on_demand.core.lock().active_peers.keys().cloned().collect::<Vec<_>>());

Expand All @@ -506,7 +536,12 @@ pub mod tests {
let mut network = TestIo::new(&queue, None);
on_demand.on_connect(0, Roles::FULL);

on_demand.remote_call(RemoteCallRequest { block: Default::default(), method: "test".into(), call_data: vec![] });
on_demand.remote_call(RemoteCallRequest {
block: Default::default(),
method: "test".into(),
call_data: vec![],
retry_count: None,
});
receive_call_response(&*on_demand, &mut network, 0, 1);
assert!(network.to_disconnect.contains(&0));
assert_eq!(on_demand.core.lock().pending_requests.len(), 1);
Expand All @@ -517,7 +552,12 @@ pub mod tests {
let (_x, on_demand) = dummy(false);
let queue = RwLock::new(VecDeque::new());
let mut network = TestIo::new(&queue, None);
on_demand.remote_call(RemoteCallRequest { block: Default::default(), method: "test".into(), call_data: vec![] });
on_demand.remote_call(RemoteCallRequest {
block: Default::default(),
method: "test".into(),
call_data: vec![],
retry_count: Some(1),
});

on_demand.on_connect(0, Roles::FULL);
receive_call_response(&*on_demand, &mut network, 0, 0);
Expand All @@ -536,14 +576,79 @@ pub mod tests {
assert!(network.to_disconnect.contains(&0));
}

#[test]
fn disconnects_from_peer_on_wrong_response_type() {
let (_x, on_demand) = dummy(false);
let queue = RwLock::new(VecDeque::new());
let mut network = TestIo::new(&queue, None);
on_demand.on_connect(0, Roles::FULL);

on_demand.remote_call(RemoteCallRequest {
block: Default::default(),
method: "test".into(),
call_data: vec![],
retry_count: Some(1),
});

on_demand.on_remote_read_response(&mut network, 0, message::RemoteReadResponse {
id: 0,
proof: vec![vec![2]],
});
assert!(network.to_disconnect.contains(&0));
assert_eq!(on_demand.core.lock().pending_requests.len(), 1);
}

#[test]
fn receives_remote_failure_after_retry_count_failures() {
use parking_lot::{Condvar, Mutex};

let retry_count = 2;
let (_x, on_demand) = dummy(false);
let queue = RwLock::new(VecDeque::new());
let mut network = TestIo::new(&queue, None);
for i in 0..retry_count+1 {
on_demand.on_connect(i, Roles::FULL);
}

let sync = Arc::new((Mutex::new(0), Mutex::new(0), Condvar::new()));
let thread_sync = sync.clone();

let response = on_demand.remote_call(RemoteCallRequest { block: Default::default(), method: "test".into(),
call_data: vec![], retry_count: Some(retry_count) });
let thread = ::std::thread::spawn(move || {
let &(ref current, ref finished_at, ref finished) = &*thread_sync;
let _ = response.wait().unwrap_err();
*finished_at.lock() = *current.lock();
finished.notify_one();
});

let &(ref current, ref finished_at, ref finished) = &*sync;
for i in 0..retry_count+1 {
let mut current = current.lock();
*current = *current + 1;
receive_call_response(&*on_demand, &mut network, i, i as u64);
}

let mut finished_at = finished_at.lock();
assert!(!finished.wait_for(&mut finished_at, ::std::time::Duration::from_millis(1000)).timed_out());
assert_eq!(*finished_at, retry_count + 1);

thread.join().unwrap();
}

#[test]
fn receives_remote_call_response() {
let (_x, on_demand) = dummy(true);
let queue = RwLock::new(VecDeque::new());
let mut network = TestIo::new(&queue, None);
on_demand.on_connect(0, Roles::FULL);

let response = on_demand.remote_call(RemoteCallRequest { block: Default::default(), method: "test".into(), call_data: vec![] });
let response = on_demand.remote_call(RemoteCallRequest {
block: Default::default(),
method: "test".into(),
call_data: vec![],
retry_count: None,
});
let thread = ::std::thread::spawn(move || {
let result = response.wait().unwrap();
assert_eq!(result.return_data, vec![42]);
Expand All @@ -560,7 +665,11 @@ pub mod tests {
let mut network = TestIo::new(&queue, None);
on_demand.on_connect(0, Roles::FULL);

let response = on_demand.remote_read(RemoteReadRequest { block: Default::default(), key: b":key".to_vec() });
let response = on_demand.remote_read(RemoteReadRequest {
block: Default::default(),
key: b":key".to_vec(),
retry_count: None,
});
let thread = ::std::thread::spawn(move || {
let result = response.wait().unwrap();
assert_eq!(result, Some(vec![41]));
Expand All @@ -580,7 +689,7 @@ pub mod tests {
let mut network = TestIo::new(&queue, None);
on_demand.on_connect(0, Roles::FULL);

let response = on_demand.remote_header(RemoteHeaderRequest { block: 1 });
let response = on_demand.remote_header(RemoteHeaderRequest { block: 1, retry_count: None, });
let thread = ::std::thread::spawn(move || {
let result = response.wait().unwrap();
assert_eq!(result.hash(), "80729accb7bb10ff9c637a10e8bb59f21c52571aa7b46544c5885ca89ed190f4".into());
Expand Down

0 comments on commit 1752ec4

Please sign in to comment.