Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
limit retry count in OnDemand
Browse files Browse the repository at this point in the history
  • Loading branch information
svyatonik committed Sep 5, 2018
1 parent 256a1c2 commit 9b94a81
Show file tree
Hide file tree
Showing 5 changed files with 128 additions and 14 deletions.
1 change: 1 addition & 0 deletions substrate/client/src/light/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ impl<Block, S, F, H, C> StateBackend<H, C> for OnDemandState<Block, S, F>
block: self.block,
header: header.expect("if block above guarantees that header is_some(); qed"),
key: key.to_vec(),
retry_count: None,
})
.into_future().wait()
}
Expand Down
1 change: 1 addition & 0 deletions substrate/client/src/light/blockchain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ impl<S, F, Block> BlockchainHeaderBackend<Block> for Blockchain<S, F> where Bloc
.remote_header(RemoteHeaderRequest {
cht_root: self.storage.cht_root(cht::SIZE, number)?,
block: number,
retry_count: None,
})
.into_future().wait()
.map(Some)
Expand Down
2 changes: 2 additions & 0 deletions substrate/client/src/light/call_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ impl<B, F, Block> CallExecutor<Block, KeccakHasher, RlpCodec> for RemoteCallExec
header: block_header,
method: method.into(),
call_data: call_data.to_vec(),
retry_count: None,
}).into_future().wait()
}

Expand Down Expand Up @@ -168,6 +169,7 @@ mod tests {
},
method: "authorities".into(),
call_data: vec![],
retry_count: None,
}, remote_execution_proof).unwrap();
}
}
10 changes: 10 additions & 0 deletions substrate/client/src/light/fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ pub struct RemoteCallRequest<Header: HeaderT> {
pub method: String,
/// Call data.
pub call_data: Vec<u8>,
/// Number of times to retry request. None means that default RETRY_COUNT is used.
pub retry_count: Option<usize>,
}

/// Remote canonical header request.
Expand All @@ -52,6 +54,8 @@ pub struct RemoteHeaderRequest<Header: HeaderT> {
pub cht_root: Header::Hash,
/// Number of the header to query.
pub block: Header::Number,
/// Number of times to retry request. None means that default RETRY_COUNT is used.
pub retry_count: Option<usize>,
}

/// Remote storage read request.
Expand All @@ -63,6 +67,8 @@ pub struct RemoteReadRequest<Header: HeaderT> {
pub header: Header,
/// Storage key to read.
pub key: Vec<u8>,
/// Number of times to retry request. None means that default RETRY_COUNT is used.
pub retry_count: Option<usize>,
}

/// Light client data fetcher. Implementations of this trait must check if remote data
Expand Down Expand Up @@ -264,6 +270,7 @@ pub mod tests {
block: remote_block_header.hash(),
header: remote_block_header,
key: b":auth:len".to_vec(),
retry_count: None,
}, remote_read_proof).unwrap().unwrap()[0], authorities_len as u8);
}

Expand All @@ -273,6 +280,7 @@ pub mod tests {
assert_eq!((&local_checker as &FetchChecker<Block>).check_header_proof(&RemoteHeaderRequest::<Header> {
cht_root: local_cht_root,
block: 1,
retry_count: None,
}, Some(remote_block_header.clone()), remote_header_proof).unwrap(), remote_block_header);
}

Expand All @@ -283,6 +291,7 @@ pub mod tests {
assert!((&local_checker as &FetchChecker<Block>).check_header_proof(&RemoteHeaderRequest::<Header> {
cht_root: Default::default(),
block: 1,
retry_count: None,
}, Some(remote_block_header.clone()), remote_header_proof).is_err());
}

Expand All @@ -293,6 +302,7 @@ pub mod tests {
assert!((&local_checker as &FetchChecker<Block>).check_header_proof(&RemoteHeaderRequest::<Header> {
cht_root: local_cht_root,
block: 1,
retry_count: None,
}, Some(remote_block_header.clone()), remote_header_proof).is_err());
}
}
128 changes: 114 additions & 14 deletions substrate/network/src/on_demand.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ use runtime_primitives::traits::{Block as BlockT, Header as HeaderT};

/// Remote request timeout.
const REQUEST_TIMEOUT: Duration = Duration::from_secs(15);
/// Default request retry count.
const RETRY_COUNT: usize = 1;

/// On-demand service API.
pub trait OnDemandService<Block: BlockT>: Send + Sync {
Expand Down Expand Up @@ -85,6 +87,7 @@ struct OnDemandCore<B: BlockT, E: service::ExecuteInContext<B>> {
/// A single scheduled on-demand request together with its bookkeeping data.
struct Request<Block: BlockT> {
	// Monotonically increasing id (assigned from `next_request_id` on insert),
	// used to match incoming responses to this request.
	id: u64,
	// Instant at which the request was inserted into the pending queue.
	// NOTE(review): presumably compared against REQUEST_TIMEOUT by the
	// dispatch/maintenance code — that code is outside this hunk; confirm.
	timestamp: Instant,
	// Remaining number of retries; decremented on each failed response check,
	// and when it reaches zero the request is failed with RemoteFetchFailed.
	retry_count: usize,
	// The request payload plus the one-shot sender used to deliver the result
	// (or the final error) to the waiting caller.
	data: RequestData<Block>,
}

Expand Down Expand Up @@ -139,9 +142,9 @@ impl<B: BlockT, E> OnDemand<B, E> where
}

/// Schedule && dispatch all scheduled requests.
fn schedule_request<R>(&self, data: RequestData<B>, result: R) -> R {
fn schedule_request<R>(&self, retry_count: Option<usize>, data: RequestData<B>, result: R) -> R {
let mut core = self.core.lock();
core.insert(data);
core.insert(retry_count.unwrap_or(RETRY_COUNT), data);
core.dispatch();
result
}
Expand All @@ -158,21 +161,31 @@ impl<B: BlockT, E> OnDemand<B, E> where
},
};

let retry_request_data = match try_accept(request) {
Accept::Ok => None,
let retry_count = request.retry_count;
let (retry_count, retry_request_data) = match try_accept(request) {
Accept::Ok => (retry_count, None),
Accept::CheckFailed(error, retry_request_data) => {
io.report_peer(peer, Severity::Bad(&format!("Failed to check remote {} response from peer: {}", rtype, error)));
core.remove_peer(peer);
Some(retry_request_data)

if retry_count > 0 {
(retry_count - 1, Some(retry_request_data))
} else {
trace!(target: "sync", "Failed to get remote {} response for given number of retries", rtype);
retry_request_data.fail(client::error::ErrorKind::RemoteFetchFailed.into());
(0, None)
}
},
Accept::Unexpected(retry_request_data) => {
trace!(target: "sync", "Unexpected response to remote {} from peer {}", rtype, peer);
Some(retry_request_data)
io.report_peer(peer, Severity::Bad(&format!("Unexpected response to remote {} from peer", rtype)));
core.remove_peer(peer);

(retry_count, Some(retry_request_data))
},
};

if let Some(request_data) = retry_request_data {
core.insert(request_data);
core.insert(retry_count, request_data);
}

core.dispatch();
Expand Down Expand Up @@ -262,19 +275,19 @@ impl<B, E> Fetcher<B> for OnDemand<B, E> where

fn remote_header(&self, request: RemoteHeaderRequest<B::Header>) -> Self::RemoteHeaderResult {
let (sender, receiver) = channel();
self.schedule_request(RequestData::RemoteHeader(request, sender),
self.schedule_request(request.retry_count.clone(), RequestData::RemoteHeader(request, sender),
RemoteResponse { receiver })
}

fn remote_read(&self, request: RemoteReadRequest<B::Header>) -> Self::RemoteReadResult {
let (sender, receiver) = channel();
self.schedule_request(RequestData::RemoteRead(request, sender),
self.schedule_request(request.retry_count.clone(), RequestData::RemoteRead(request, sender),
RemoteResponse { receiver })
}

fn remote_call(&self, request: RemoteCallRequest<B::Header>) -> Self::RemoteCallResult {
let (sender, receiver) = channel();
self.schedule_request(RequestData::RemoteCall(request, sender),
self.schedule_request(request.retry_count.clone(), RequestData::RemoteCall(request, sender),
RemoteResponse { receiver })
}
}
Expand Down Expand Up @@ -314,13 +327,14 @@ impl<B, E> OnDemandCore<B, E> where
}
}

pub fn insert(&mut self, data: RequestData<B>) {
pub fn insert(&mut self, retry_count: usize, data: RequestData<B>) {
let request_id = self.next_request_id;
self.next_request_id += 1;

self.pending_requests.push_back(Request {
id: request_id,
timestamp: Instant::now(),
retry_count,
data,
});
}
Expand Down Expand Up @@ -385,6 +399,17 @@ impl<Block: BlockT> Request<Block> {
}
}

impl<Block: BlockT> RequestData<Block> {
pub fn fail(self, error: client::error::Error) {
// don't care if anyone is listening
match self {
RequestData::RemoteHeader(_, sender) => { let _ = sender.send(Err(error)); },
RequestData::RemoteCall(_, sender) => { let _ = sender.send(Err(error)); },
RequestData::RemoteRead(_, sender) => { let _ = sender.send(Err(error)); },
}
}
}

#[cfg(test)]
pub mod tests {
use std::collections::VecDeque;
Expand Down Expand Up @@ -503,6 +528,7 @@ pub mod tests {
header: dummy_header(),
method: "test".into(),
call_data: vec![],
retry_count: None,
});
assert_eq!(vec![1], on_demand.core.lock().idle_peers.iter().cloned().collect::<Vec<_>>());
assert_eq!(vec![0], on_demand.core.lock().active_peers.keys().cloned().collect::<Vec<_>>());
Expand All @@ -526,6 +552,7 @@ pub mod tests {
header: dummy_header(),
method: "test".into(),
call_data: vec![],
retry_count: None,
});
receive_call_response(&*on_demand, &mut network, 0, 1);
assert!(network.to_disconnect.contains(&0));
Expand All @@ -542,6 +569,7 @@ pub mod tests {
header: dummy_header(),
method: "test".into(),
call_data: vec![],
retry_count: Some(1),
});

on_demand.on_connect(0, Roles::FULL);
Expand All @@ -561,6 +589,72 @@ pub mod tests {
assert!(network.to_disconnect.contains(&0));
}

#[test]
fn disconnects_from_peer_on_wrong_response_type() {
	// Single connected full peer; the on-demand service has one outstanding
	// remote-call request with a single retry allowed.
	let (_client, on_demand) = dummy(false);
	let request_queue = RwLock::new(VecDeque::new());
	let mut net = TestIo::new(&request_queue, None);
	on_demand.on_connect(0, Roles::FULL);

	on_demand.remote_call(RemoteCallRequest {
		block: Default::default(),
		header: dummy_header(),
		method: "test".into(),
		call_data: vec![],
		retry_count: Some(1),
	});

	// Answer the *call* request with a *read* response: the peer must be
	// scheduled for disconnect and the request put back into the queue.
	on_demand.on_remote_read_response(&mut net, 0, message::RemoteReadResponse {
		id: 0,
		proof: vec![vec![2]],
	});
	assert!(net.to_disconnect.contains(&0));
	assert_eq!(on_demand.core.lock().pending_requests.len(), 1);
}

#[test]
fn receives_remote_failure_after_retry_count_failures() {
	use parking_lot::{Condvar, Mutex};

	// With retry_count = 2 the request may be answered (unsuccessfully) by up
	// to retry_count + 1 = 3 distinct peers before the caller sees an error.
	let retry_count = 2;
	// dummy(false): responses fail the check, forcing a retry each time —
	// TODO(review) confirm the `false` flag's meaning against `dummy`'s definition.
	let (_x, on_demand) = dummy(false);
	let queue = RwLock::new(VecDeque::new());
	let mut network = TestIo::new(&queue, None);
	for i in 0..retry_count+1 {
		on_demand.on_connect(i, Roles::FULL);
	}

	// Shared state between this thread and the waiter thread:
	// .0 = number of responses delivered so far,
	// .1 = value of .0 observed at the moment the future resolved,
	// .2 = condvar signalled when the future resolves.
	let sync = Arc::new((Mutex::new(0), Mutex::new(0), Condvar::new()));
	let thread_sync = sync.clone();

	let response = on_demand.remote_call(RemoteCallRequest {
		block: Default::default(),
		header: dummy_header(),
		method: "test".into(),
		call_data: vec![],
		retry_count: Some(retry_count)
	});
	// Waiter thread: blocks on the response future, records how many
	// responses had been fed in when it resolved, then signals completion.
	let thread = ::std::thread::spawn(move || {
		let &(ref current, ref finished_at, ref finished) = &*thread_sync;
		// The request must ultimately fail (RemoteFetchFailed).
		let _ = response.wait().unwrap_err();
		*finished_at.lock() = *current.lock();
		finished.notify_one();
	});

	// Feed one bad response per peer; each consumes one retry.
	let &(ref current, ref finished_at, ref finished) = &*sync;
	for i in 0..retry_count+1 {
		let mut current = current.lock();
		*current = *current + 1;
		receive_call_response(&*on_demand, &mut network, i, i as u64);
	}

	// The future must resolve within the timeout, and only after ALL
	// retry_count + 1 responses were delivered — not earlier.
	let mut finished_at = finished_at.lock();
	assert!(!finished.wait_for(&mut finished_at, ::std::time::Duration::from_millis(1000)).timed_out());
	assert_eq!(*finished_at, retry_count + 1);

	thread.join().unwrap();
}

#[test]
fn receives_remote_call_response() {
let (_x, on_demand) = dummy(true);
Expand All @@ -573,6 +667,7 @@ pub mod tests {
header: dummy_header(),
method: "test".into(),
call_data: vec![],
retry_count: None,
});
let thread = ::std::thread::spawn(move || {
let result = response.wait().unwrap();
Expand All @@ -593,7 +688,8 @@ pub mod tests {
let response = on_demand.remote_read(RemoteReadRequest {
header: dummy_header(),
block: Default::default(),
key: b":key".to_vec()
key: b":key".to_vec(),
retry_count: None,
});
let thread = ::std::thread::spawn(move || {
let result = response.wait().unwrap();
Expand All @@ -614,7 +710,11 @@ pub mod tests {
let mut network = TestIo::new(&queue, None);
on_demand.on_connect(0, Roles::FULL);

let response = on_demand.remote_header(RemoteHeaderRequest { cht_root: Default::default(), block: 1 });
let response = on_demand.remote_header(RemoteHeaderRequest {
cht_root: Default::default(),
block: 1,
retry_count: None,
});
let thread = ::std::thread::spawn(move || {
let result = response.wait().unwrap();
assert_eq!(result.hash(), "80729accb7bb10ff9c637a10e8bb59f21c52571aa7b46544c5885ca89ed190f4".into());
Expand Down

0 comments on commit 9b94a81

Please sign in to comment.