Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Fix concurrent access to signer queue #8854

Merged
merged 5 commits into from
Jun 12, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rpc/src/v1/helpers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub use self::requests::{
TransactionRequest, FilledTransactionRequest, ConfirmationRequest, ConfirmationPayload, CallRequest,
};
pub use self::signing_queue::{
ConfirmationsQueue, ConfirmationReceiver, ConfirmationResult,
ConfirmationsQueue, ConfirmationReceiver, ConfirmationResult, ConfirmationSender,
SigningQueue, QueueEvent, DefaultAccount,
QUEUE_LIMIT as SIGNING_QUEUE_LIMIT,
};
Expand Down
86 changes: 44 additions & 42 deletions rpc/src/v1/helpers/signing_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,16 +75,17 @@ pub trait SigningQueue: Send + Sync {
/// `ConfirmationReceiver` is a `Future` awaiting for resolution of the given request.
fn add_request(&self, request: ConfirmationPayload, origin: Origin) -> Result<(U256, ConfirmationReceiver), QueueAddError>;

/// Removes a request from the queue.
/// Notifies possible token holders that request was rejected.
fn request_rejected(&self, id: U256) -> Option<ConfirmationRequest>;
fn request_rejected(&self, sender: ConfirmationSender) -> Option<ConfirmationRequest>;

/// Removes a request from the queue.
/// Notifies possible token holders that request was confirmed and given hash was assigned.
fn request_confirmed(&self, id: U256, result: ConfirmationResult) -> Option<ConfirmationRequest>;
fn request_confirmed(&self, sender: ConfirmationSender, result: ConfirmationResult) -> Option<ConfirmationRequest>;

/// Returns a request if it is contained in the queue.
fn peek(&self, id: &U256) -> Option<ConfirmationRequest>;
/// Put a request taken from `SigningQueue::take` back to the queue.
fn request_untouched(&self, sender: ConfirmationSender);

/// Returns and removes a request if it is contained in the queue.
fn take(&self, id: &U256) -> Option<ConfirmationSender>;

/// Return copy of all the requests in the queue.
fn requests(&self) -> Vec<ConfirmationRequest>;
Expand All @@ -96,9 +97,12 @@ pub trait SigningQueue: Send + Sync {
fn is_empty(&self) -> bool;
}

struct ConfirmationSender {
/// Confirmation request information with result notifier.
pub struct ConfirmationSender {
/// Confirmation request information.
pub request: ConfirmationRequest,

sender: oneshot::Sender<ConfirmationResult>,
request: ConfirmationRequest,
}

/// Receiving end of the Confirmation channel; can be used as a `Future` to await for `ConfirmationRequest`
Expand All @@ -122,38 +126,31 @@ impl ConfirmationsQueue {
/// Notifies consumer that the communcation is over.
/// No more events will be sent after this function is invoked.
pub fn finish(&self) {
self.notify(QueueEvent::Finish);
self.notify_message(QueueEvent::Finish);
self.on_event.write().clear();
}

/// Notifies `ConfirmationReceiver` holder about the result given a request.
fn notify_result(&self, sender: ConfirmationSender, result: Option<ConfirmationResult>) -> Option<ConfirmationRequest> {
// notify receiver about the event
self.notify_message(result.clone().map_or_else(
|| QueueEvent::RequestRejected(sender.request.id),
|_| QueueEvent::RequestConfirmed(sender.request.id)
));

// notify confirmation receiver about resolution
let result = result.ok_or(errors::request_rejected());
sender.sender.send(result);

Some(sender.request)
}

/// Notifies receiver about the event happening in this queue.
fn notify(&self, message: QueueEvent) {
fn notify_message(&self, message: QueueEvent) {
for listener in &*self.on_event.read() {
listener(message.clone())
}
}

/// Removes requests from this queue and notifies `ConfirmationReceiver` holder about the result.
/// Notifies also a receiver about that event.
fn remove(&self, id: U256, result: Option<ConfirmationResult>) -> Option<ConfirmationRequest> {
let sender = self.queue.write().remove(&id);

if let Some(sender) = sender {
// notify receiver about the event
self.notify(result.clone().map_or_else(
|| QueueEvent::RequestRejected(id),
|_| QueueEvent::RequestConfirmed(id)
));

// notify confirmation receiver about resolution
let result = result.ok_or(errors::request_rejected());
sender.sender.send(result);

Some(sender.request)
} else {
None
}
}
}

impl Drop for ConfirmationsQueue {
Expand Down Expand Up @@ -193,22 +190,26 @@ impl SigningQueue for ConfirmationsQueue {
(id, receiver)
};
// Notify listeners
self.notify(QueueEvent::NewRequest(id));
self.notify_message(QueueEvent::NewRequest(id));
Ok(res)
}

fn peek(&self, id: &U256) -> Option<ConfirmationRequest> {
self.queue.read().get(id).map(|sender| sender.request.clone())
fn take(&self, id: &U256) -> Option<ConfirmationSender> {
self.queue.write().remove(id)
}

fn request_rejected(&self, sender: ConfirmationSender) -> Option<ConfirmationRequest> {
debug!(target: "own_tx", "Signer: Request rejected ({:?}).", sender.request.id);
self.notify_result(sender, None)
}

fn request_rejected(&self, id: U256) -> Option<ConfirmationRequest> {
debug!(target: "own_tx", "Signer: Request rejected ({:?}).", id);
self.remove(id, None)
fn request_confirmed(&self, sender: ConfirmationSender, result: ConfirmationResult) -> Option<ConfirmationRequest> {
debug!(target: "own_tx", "Signer: Request confirmed ({:?}).", sender.request.id);
self.notify_result(sender, Some(result))
}

fn request_confirmed(&self, id: U256, result: ConfirmationResult) -> Option<ConfirmationRequest> {
debug!(target: "own_tx", "Signer: Transaction confirmed ({:?}).", id);
self.remove(id, Some(result))
fn request_untouched(&self, sender: ConfirmationSender) {
self.queue.write().insert(sender.request.id, sender);
}

fn requests(&self) -> Vec<ConfirmationRequest> {
Expand Down Expand Up @@ -260,7 +261,8 @@ mod test {

// when
let (id, future) = queue.add_request(request, Default::default()).unwrap();
queue.request_confirmed(id, Ok(ConfirmationResponse::SendTransaction(1.into())));
let sender = queue.take(&id).unwrap();
queue.request_confirmed(sender, Ok(ConfirmationResponse::SendTransaction(1.into())));

// then
let confirmation = future.wait().unwrap();
Expand Down
21 changes: 13 additions & 8 deletions rpc/src/v1/impls/signer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,11 @@ impl<D: Dispatcher + 'static> SignerClient<D> {
let dispatcher = self.dispatcher.clone();
let signer = self.signer.clone();

Box::new(signer.peek(&id).map(|confirmation| {
let mut payload = confirmation.payload.clone();
Box::new(signer.take(&id).map(|sender| {
let mut payload = sender.request.payload.clone();
// Modify payload
if let ConfirmationPayload::SendTransaction(ref mut request) = payload {
if let Some(sender) = modification.sender.clone() {
if let Some(sender) = modification.sender {
request.from = sender.into();
// Altering sender should always reset the nonce.
request.nonce = None;
Expand All @@ -109,7 +109,9 @@ impl<D: Dispatcher + 'static> SignerClient<D> {
Either::A(fut.into_future().then(move |result| {
// Execute
if let Ok(ref response) = result {
signer.request_confirmed(id, Ok((*response).clone()));
signer.request_confirmed(sender, Ok((*response).clone()));
} else {
signer.request_untouched(sender);
}

result
Expand Down Expand Up @@ -188,8 +190,9 @@ impl<D: Dispatcher + 'static> Signer for SignerClient<D> {
fn confirm_request_raw(&self, id: U256, bytes: Bytes) -> Result<ConfirmationResponse> {
let id = id.into();

self.signer.peek(&id).map(|confirmation| {
let result = match confirmation.payload {
self.signer.take(&id).map(|sender| {
let payload = sender.request.payload.clone();
let result = match payload {
ConfirmationPayload::SendTransaction(request) => {
Self::verify_transaction(bytes, request, |pending_transaction| {
self.dispatcher.dispatch_transaction(pending_transaction)
Expand Down Expand Up @@ -218,14 +221,16 @@ impl<D: Dispatcher + 'static> Signer for SignerClient<D> {
},
};
if let Ok(ref response) = result {
self.signer.request_confirmed(id, Ok(response.clone()));
self.signer.request_confirmed(sender, Ok(response.clone()));
} else {
self.signer.request_untouched(sender);
}
result
}).unwrap_or_else(|| Err(errors::invalid_params("Unknown RequestID", id)))
}

fn reject_request(&self, id: U256) -> Result<bool> {
let res = self.signer.request_rejected(id.into());
let res = self.signer.take(&id.into()).map(|sender| self.signer.request_rejected(sender));
Ok(res.is_some())
}

Expand Down
15 changes: 10 additions & 5 deletions rpc/src/v1/tests/mocked/signing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ fn should_add_sign_to_queue() {
::std::thread::spawn(move || loop {
if signer.requests().len() == 1 {
// respond
signer.request_confirmed(1.into(), Ok(ConfirmationResponse::Signature(0.into())));
let sender = signer.take(&1.into()).unwrap();
signer.request_confirmed(sender, Ok(ConfirmationResponse::Signature(0.into())));
break
}
::std::thread::sleep(Duration::from_millis(100))
Expand Down Expand Up @@ -187,7 +188,8 @@ fn should_check_status_of_request_when_its_resolved() {
"id": 1
}"#;
tester.io.handle_request_sync(&request).expect("Sent");
tester.signer.request_confirmed(1.into(), Ok(ConfirmationResponse::Signature(1.into())));
let sender = tester.signer.take(&1.into()).unwrap();
tester.signer.request_confirmed(sender, Ok(ConfirmationResponse::Signature(1.into())));

// This is not ideal, but we need to give futures some time to be executed, and they need to run in a separate thread
thread::sleep(Duration::from_millis(20));
Expand Down Expand Up @@ -258,7 +260,8 @@ fn should_add_transaction_to_queue() {
::std::thread::spawn(move || loop {
if signer.requests().len() == 1 {
// respond
signer.request_confirmed(1.into(), Ok(ConfirmationResponse::SendTransaction(0.into())));
let sender = signer.take(&1.into()).unwrap();
signer.request_confirmed(sender, Ok(ConfirmationResponse::SendTransaction(0.into())));
break
}
::std::thread::sleep(Duration::from_millis(100))
Expand Down Expand Up @@ -334,7 +337,8 @@ fn should_add_sign_transaction_to_the_queue() {
::std::thread::spawn(move || loop {
if signer.requests().len() == 1 {
// respond
signer.request_confirmed(1.into(), Ok(ConfirmationResponse::SignTransaction(
let sender = signer.take(&1.into()).unwrap();
signer.request_confirmed(sender, Ok(ConfirmationResponse::SignTransaction(
RichRawTransaction::from_signed(t.into(), 0x0, u64::max_value())
)));
break
Expand Down Expand Up @@ -440,7 +444,8 @@ fn should_add_decryption_to_the_queue() {
::std::thread::spawn(move || loop {
if signer.requests().len() == 1 {
// respond
signer.request_confirmed(1.into(), Ok(ConfirmationResponse::Decrypt(vec![0x1, 0x2].into())));
let sender = signer.take(&1.into()).unwrap();
signer.request_confirmed(sender, Ok(ConfirmationResponse::Decrypt(vec![0x1, 0x2].into())));
break
}
::std::thread::sleep(Duration::from_millis(10))
Expand Down