Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Limit number of retries for light client queries #513

Merged
merged 1 commit into from
Sep 5, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions substrate/client/src/light/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ impl<Block, S, F, H, C> StateBackend<H, C> for OnDemandState<Block, S, F>
block: self.block,
header: header.expect("if block above guarantees that header is_some(); qed"),
key: key.to_vec(),
retry_count: None,
})
.into_future().wait()
}
Expand Down
1 change: 1 addition & 0 deletions substrate/client/src/light/blockchain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ impl<S, F, Block> BlockchainHeaderBackend<Block> for Blockchain<S, F> where Bloc
.remote_header(RemoteHeaderRequest {
cht_root: self.storage.cht_root(cht::SIZE, number)?,
block: number,
retry_count: None,
})
.into_future().wait()
.map(Some)
Expand Down
2 changes: 2 additions & 0 deletions substrate/client/src/light/call_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ impl<B, F, Block> CallExecutor<Block, KeccakHasher, RlpCodec> for RemoteCallExec
header: block_header,
method: method.into(),
call_data: call_data.to_vec(),
retry_count: None,
}).into_future().wait()
}

Expand Down Expand Up @@ -168,6 +169,7 @@ mod tests {
},
method: "authorities".into(),
call_data: vec![],
retry_count: None,
}, remote_execution_proof).unwrap();
}
}
10 changes: 10 additions & 0 deletions substrate/client/src/light/fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ pub struct RemoteCallRequest<Header: HeaderT> {
pub method: String,
/// Call data.
pub call_data: Vec<u8>,
/// Number of times to retry request. None means that default RETRY_COUNT is used.
pub retry_count: Option<usize>,
}

/// Remote canonical header request.
Expand All @@ -52,6 +54,8 @@ pub struct RemoteHeaderRequest<Header: HeaderT> {
pub cht_root: Header::Hash,
/// Number of the header to query.
pub block: Header::Number,
/// Number of times to retry request. None means that default RETRY_COUNT is used.
pub retry_count: Option<usize>,
}

/// Remote storage read request.
Expand All @@ -63,6 +67,8 @@ pub struct RemoteReadRequest<Header: HeaderT> {
pub header: Header,
/// Storage key to read.
pub key: Vec<u8>,
/// Number of times to retry request. None means that default RETRY_COUNT is used.
pub retry_count: Option<usize>,
}

/// Light client data fetcher. Implementations of this trait must check if remote data
Expand Down Expand Up @@ -264,6 +270,7 @@ pub mod tests {
block: remote_block_header.hash(),
header: remote_block_header,
key: b":auth:len".to_vec(),
retry_count: None,
}, remote_read_proof).unwrap().unwrap()[0], authorities_len as u8);
}

Expand All @@ -273,6 +280,7 @@ pub mod tests {
assert_eq!((&local_checker as &FetchChecker<Block>).check_header_proof(&RemoteHeaderRequest::<Header> {
cht_root: local_cht_root,
block: 1,
retry_count: None,
}, Some(remote_block_header.clone()), remote_header_proof).unwrap(), remote_block_header);
}

Expand All @@ -283,6 +291,7 @@ pub mod tests {
assert!((&local_checker as &FetchChecker<Block>).check_header_proof(&RemoteHeaderRequest::<Header> {
cht_root: Default::default(),
block: 1,
retry_count: None,
}, Some(remote_block_header.clone()), remote_header_proof).is_err());
}

Expand All @@ -293,6 +302,7 @@ pub mod tests {
assert!((&local_checker as &FetchChecker<Block>).check_header_proof(&RemoteHeaderRequest::<Header> {
cht_root: local_cht_root,
block: 1,
retry_count: None,
}, Some(remote_block_header.clone()), remote_header_proof).is_err());
}
}
128 changes: 114 additions & 14 deletions substrate/network/src/on_demand.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ use runtime_primitives::traits::{Block as BlockT, Header as HeaderT};

/// Remote request timeout.
const REQUEST_TIMEOUT: Duration = Duration::from_secs(15);
/// Default request retry count.
const RETRY_COUNT: usize = 1;

/// On-demand service API.
pub trait OnDemandService<Block: BlockT>: Send + Sync {
Expand Down Expand Up @@ -85,6 +87,7 @@ struct OnDemandCore<B: BlockT, E: service::ExecuteInContext<B>> {
struct Request<Block: BlockT> {
id: u64,
timestamp: Instant,
retry_count: usize,
data: RequestData<Block>,
}

Expand Down Expand Up @@ -139,9 +142,9 @@ impl<B: BlockT, E> OnDemand<B, E> where
}

/// Schedule && dispatch all scheduled requests.
fn schedule_request<R>(&self, data: RequestData<B>, result: R) -> R {
fn schedule_request<R>(&self, retry_count: Option<usize>, data: RequestData<B>, result: R) -> R {
let mut core = self.core.lock();
core.insert(data);
core.insert(retry_count.unwrap_or(RETRY_COUNT), data);
core.dispatch();
result
}
Expand All @@ -158,21 +161,31 @@ impl<B: BlockT, E> OnDemand<B, E> where
},
};

let retry_request_data = match try_accept(request) {
Accept::Ok => None,
let retry_count = request.retry_count;
let (retry_count, retry_request_data) = match try_accept(request) {
Accept::Ok => (retry_count, None),
Accept::CheckFailed(error, retry_request_data) => {
io.report_peer(peer, Severity::Bad(&format!("Failed to check remote {} response from peer: {}", rtype, error)));
core.remove_peer(peer);
Some(retry_request_data)

if retry_count > 0 {
(retry_count - 1, Some(retry_request_data))
} else {
trace!(target: "sync", "Failed to get remote {} response for given number of retries", rtype);
retry_request_data.fail(client::error::ErrorKind::RemoteFetchFailed.into());
(0, None)
}
},
Accept::Unexpected(retry_request_data) => {
trace!(target: "sync", "Unexpected response to remote {} from peer {}", rtype, peer);
Some(retry_request_data)
io.report_peer(peer, Severity::Bad(&format!("Unexpected response to remote {} from peer", rtype)));
core.remove_peer(peer);

(retry_count, Some(retry_request_data))
},
};

if let Some(request_data) = retry_request_data {
core.insert(request_data);
core.insert(retry_count, request_data);
}

core.dispatch();
Expand Down Expand Up @@ -262,19 +275,19 @@ impl<B, E> Fetcher<B> for OnDemand<B, E> where

fn remote_header(&self, request: RemoteHeaderRequest<B::Header>) -> Self::RemoteHeaderResult {
let (sender, receiver) = channel();
self.schedule_request(RequestData::RemoteHeader(request, sender),
self.schedule_request(request.retry_count.clone(), RequestData::RemoteHeader(request, sender),
RemoteResponse { receiver })
}

fn remote_read(&self, request: RemoteReadRequest<B::Header>) -> Self::RemoteReadResult {
let (sender, receiver) = channel();
self.schedule_request(RequestData::RemoteRead(request, sender),
self.schedule_request(request.retry_count.clone(), RequestData::RemoteRead(request, sender),
RemoteResponse { receiver })
}

fn remote_call(&self, request: RemoteCallRequest<B::Header>) -> Self::RemoteCallResult {
let (sender, receiver) = channel();
self.schedule_request(RequestData::RemoteCall(request, sender),
self.schedule_request(request.retry_count.clone(), RequestData::RemoteCall(request, sender),
RemoteResponse { receiver })
}
}
Expand Down Expand Up @@ -314,13 +327,14 @@ impl<B, E> OnDemandCore<B, E> where
}
}

pub fn insert(&mut self, data: RequestData<B>) {
pub fn insert(&mut self, retry_count: usize, data: RequestData<B>) {
let request_id = self.next_request_id;
self.next_request_id += 1;

self.pending_requests.push_back(Request {
id: request_id,
timestamp: Instant::now(),
retry_count,
data,
});
}
Expand Down Expand Up @@ -385,6 +399,17 @@ impl<Block: BlockT> Request<Block> {
}
}

impl<Block: BlockT> RequestData<Block> {
pub fn fail(self, error: client::error::Error) {
// don't care if anyone is listening
match self {
RequestData::RemoteHeader(_, sender) => { let _ = sender.send(Err(error)); },
RequestData::RemoteCall(_, sender) => { let _ = sender.send(Err(error)); },
RequestData::RemoteRead(_, sender) => { let _ = sender.send(Err(error)); },
}
}
}

#[cfg(test)]
pub mod tests {
use std::collections::VecDeque;
Expand Down Expand Up @@ -503,6 +528,7 @@ pub mod tests {
header: dummy_header(),
method: "test".into(),
call_data: vec![],
retry_count: None,
});
assert_eq!(vec![1], on_demand.core.lock().idle_peers.iter().cloned().collect::<Vec<_>>());
assert_eq!(vec![0], on_demand.core.lock().active_peers.keys().cloned().collect::<Vec<_>>());
Expand All @@ -526,6 +552,7 @@ pub mod tests {
header: dummy_header(),
method: "test".into(),
call_data: vec![],
retry_count: None,
});
receive_call_response(&*on_demand, &mut network, 0, 1);
assert!(network.to_disconnect.contains(&0));
Expand All @@ -542,6 +569,7 @@ pub mod tests {
header: dummy_header(),
method: "test".into(),
call_data: vec![],
retry_count: Some(1),
});

on_demand.on_connect(0, Roles::FULL);
Expand All @@ -561,6 +589,72 @@ pub mod tests {
assert!(network.to_disconnect.contains(&0));
}

#[test]
fn disconnects_from_peer_on_wrong_response_type() {
let (_x, on_demand) = dummy(false);
let queue = RwLock::new(VecDeque::new());
let mut network = TestIo::new(&queue, None);
on_demand.on_connect(0, Roles::FULL);

on_demand.remote_call(RemoteCallRequest {
block: Default::default(),
header: dummy_header(),
method: "test".into(),
call_data: vec![],
retry_count: Some(1),
});

on_demand.on_remote_read_response(&mut network, 0, message::RemoteReadResponse {
id: 0,
proof: vec![vec![2]],
});
assert!(network.to_disconnect.contains(&0));
assert_eq!(on_demand.core.lock().pending_requests.len(), 1);
}

#[test]
fn receives_remote_failure_after_retry_count_failures() {
use parking_lot::{Condvar, Mutex};

let retry_count = 2;
let (_x, on_demand) = dummy(false);
let queue = RwLock::new(VecDeque::new());
let mut network = TestIo::new(&queue, None);
for i in 0..retry_count+1 {
on_demand.on_connect(i, Roles::FULL);
}

let sync = Arc::new((Mutex::new(0), Mutex::new(0), Condvar::new()));
let thread_sync = sync.clone();

let response = on_demand.remote_call(RemoteCallRequest {
block: Default::default(),
header: dummy_header(),
method: "test".into(),
call_data: vec![],
retry_count: Some(retry_count)
});
let thread = ::std::thread::spawn(move || {
let &(ref current, ref finished_at, ref finished) = &*thread_sync;
let _ = response.wait().unwrap_err();
*finished_at.lock() = *current.lock();
finished.notify_one();
});

let &(ref current, ref finished_at, ref finished) = &*sync;
for i in 0..retry_count+1 {
let mut current = current.lock();
*current = *current + 1;
receive_call_response(&*on_demand, &mut network, i, i as u64);
}

let mut finished_at = finished_at.lock();
assert!(!finished.wait_for(&mut finished_at, ::std::time::Duration::from_millis(1000)).timed_out());
assert_eq!(*finished_at, retry_count + 1);

thread.join().unwrap();
}

#[test]
fn receives_remote_call_response() {
let (_x, on_demand) = dummy(true);
Expand All @@ -573,6 +667,7 @@ pub mod tests {
header: dummy_header(),
method: "test".into(),
call_data: vec![],
retry_count: None,
});
let thread = ::std::thread::spawn(move || {
let result = response.wait().unwrap();
Expand All @@ -593,7 +688,8 @@ pub mod tests {
let response = on_demand.remote_read(RemoteReadRequest {
header: dummy_header(),
block: Default::default(),
key: b":key".to_vec()
key: b":key".to_vec(),
retry_count: None,
});
let thread = ::std::thread::spawn(move || {
let result = response.wait().unwrap();
Expand All @@ -614,7 +710,11 @@ pub mod tests {
let mut network = TestIo::new(&queue, None);
on_demand.on_connect(0, Roles::FULL);

let response = on_demand.remote_header(RemoteHeaderRequest { cht_root: Default::default(), block: 1 });
let response = on_demand.remote_header(RemoteHeaderRequest {
cht_root: Default::default(),
block: 1,
retry_count: None,
});
let thread = ::std::thread::spawn(move || {
let result = response.wait().unwrap();
assert_eq!(result.hash(), "80729accb7bb10ff9c637a10e8bb59f21c52571aa7b46544c5885ca89ed190f4".into());
Expand Down