Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Make unbounded channels size warning exact (part 2) #13504

Merged
merged 9 commits into from
Mar 7, 2023
14 changes: 13 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion client/api/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ impl<Block: BlockT> UnpinHandleInner<Block> {

impl<Block: BlockT> Drop for UnpinHandleInner<Block> {
fn drop(&mut self) {
if let Err(err) = self.unpin_worker_sender.unbounded_send(self.hash) {
if let Err(err) = self.unpin_worker_sender.try_send(self.hash) {
log::debug!(target: "db", "Unable to unpin block with hash: {}, error: {:?}", self.hash, err);
};
}
Expand Down
71 changes: 37 additions & 34 deletions client/consensus/common/src/import_queue/basic_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,8 @@ impl<B: BlockT> BasicQueueHandle<B> {
}

pub fn close(&mut self) {
self.justification_sender.close_channel();
self.block_import_sender.close_channel();
self.justification_sender.close();
self.block_import_sender.close();
}
}

Expand All @@ -130,9 +130,7 @@ impl<B: BlockT> ImportQueueService<B> for BasicQueueHandle<B> {
}

trace!(target: LOG_TARGET, "Scheduling {} blocks for import", blocks.len());
let res = self
.block_import_sender
.unbounded_send(worker_messages::ImportBlocks(origin, blocks));
let res = self.block_import_sender.try_send(worker_messages::ImportBlocks(origin, blocks));

if res.is_err() {
log::error!(
Expand All @@ -150,9 +148,12 @@ impl<B: BlockT> ImportQueueService<B> for BasicQueueHandle<B> {
justifications: Justifications,
) {
for justification in justifications {
let res = self.justification_sender.unbounded_send(
worker_messages::ImportJustification(who, hash, number, justification),
);
let res = self.justification_sender.try_send(worker_messages::ImportJustification(
who,
hash,
number,
justification,
));

if res.is_err() {
log::error!(
Expand Down Expand Up @@ -597,11 +598,11 @@ mod tests {
fn prioritizes_finality_work_over_block_import() {
let (result_sender, mut result_port) = buffered_link::buffered_link(100_000);

let (worker, mut finality_sender, mut block_import_sender) =
let (worker, finality_sender, block_import_sender) =
BlockImportWorker::new(result_sender, (), Box::new(()), Some(Box::new(())), None);
futures::pin_mut!(worker);

let mut import_block = |n| {
let import_block = |n| {
let header = Header {
parent_hash: Hash::random(),
number: n,
Expand All @@ -612,35 +613,37 @@ mod tests {

let hash = header.hash();

block_on(block_import_sender.send(worker_messages::ImportBlocks(
BlockOrigin::Own,
vec![IncomingBlock {
hash,
header: Some(header),
body: None,
indexed_body: None,
justifications: None,
origin: None,
allow_missing_state: false,
import_existing: false,
state: None,
skip_execution: false,
}],
)))
.unwrap();
block_import_sender
.try_send(worker_messages::ImportBlocks(
BlockOrigin::Own,
vec![IncomingBlock {
hash,
header: Some(header),
body: None,
indexed_body: None,
justifications: None,
origin: None,
allow_missing_state: false,
import_existing: false,
state: None,
skip_execution: false,
}],
))
.unwrap();

hash
};

let mut import_justification = || {
let import_justification = || {
let hash = Hash::random();
block_on(finality_sender.send(worker_messages::ImportJustification(
libp2p::PeerId::random(),
hash,
1,
(*b"TEST", Vec::new()),
)))
.unwrap();
finality_sender
.try_send(worker_messages::ImportJustification(
libp2p::PeerId::random(),
hash,
1,
(*b"TEST", Vec::new()),
))
.unwrap();

hash
};
Expand Down
12 changes: 5 additions & 7 deletions client/consensus/common/src/import_queue/buffered_link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ use super::BlockImportResult;
/// can be used to buffer commands, and the receiver can be used to poll said commands and transfer
/// them to another link. `queue_size_warning` sets the warning threshold of the channel queue size.
pub fn buffered_link<B: BlockT>(
queue_size_warning: i64,
queue_size_warning: usize,
) -> (BufferedLinkSender<B>, BufferedLinkReceiver<B>) {
let (tx, rx) = tracing_unbounded("mpsc_buffered_link", queue_size_warning);
let tx = BufferedLinkSender { tx };
Expand Down Expand Up @@ -97,7 +97,7 @@ impl<B: BlockT> Link<B> for BufferedLinkSender<B> {
) {
let _ = self
.tx
.unbounded_send(BlockImportWorkerMsg::BlocksProcessed(imported, count, results));
.try_send(BlockImportWorkerMsg::BlocksProcessed(imported, count, results));
}

fn justification_imported(
Expand All @@ -108,13 +108,11 @@ impl<B: BlockT> Link<B> for BufferedLinkSender<B> {
success: bool,
) {
let msg = BlockImportWorkerMsg::JustificationImported(who, *hash, number, success);
let _ = self.tx.unbounded_send(msg);
let _ = self.tx.try_send(msg);
}

fn request_justification(&mut self, hash: &B::Hash, number: NumberFor<B>) {
let _ = self
.tx
.unbounded_send(BlockImportWorkerMsg::RequestJustification(*hash, number));
let _ = self.tx.try_send(BlockImportWorkerMsg::RequestJustification(*hash, number));
}
}

Expand Down Expand Up @@ -166,7 +164,7 @@ impl<B: BlockT> BufferedLinkReceiver<B> {
}

/// Close the channel.
pub fn close(&mut self) {
pub fn close(&mut self) -> bool {
self.rx.get_mut().close()
}
}
Expand Down
2 changes: 1 addition & 1 deletion client/consensus/grandpa/src/communication/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1424,7 +1424,7 @@ impl<Block: BlockT> GossipValidator<Block> {
}

fn report(&self, who: PeerId, cost_benefit: ReputationChange) {
let _ = self.report_sender.unbounded_send(PeerReport { who, cost_benefit });
let _ = self.report_sender.try_send(PeerReport { who, cost_benefit });
}

pub(super) fn do_validate(
Expand Down
2 changes: 1 addition & 1 deletion client/consensus/grandpa/src/communication/periodic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ impl<B: BlockT> NeighborPacketSender<B> {
who: Vec<sc_network::PeerId>,
neighbor_packet: NeighborPacket<NumberFor<B>>,
) {
if let Err(err) = self.0.unbounded_send((who, neighbor_packet)) {
if let Err(err) = self.0.try_send((who, neighbor_packet)) {
debug!(target: LOG_TARGET, "Failed to send neighbor packet: {:?}", err);
}
}
Expand Down
20 changes: 10 additions & 10 deletions client/consensus/grandpa/src/communication/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ impl NetworkPeers for TestNetwork {
}

fn report_peer(&self, who: PeerId, cost_benefit: ReputationChange) {
let _ = self.sender.unbounded_send(Event::Report(who, cost_benefit));
let _ = self.sender.try_send(Event::Report(who, cost_benefit));
}

fn disconnect_peer(&self, _who: PeerId, _protocol: ProtocolName) {}
Expand Down Expand Up @@ -136,14 +136,14 @@ impl NetworkEventStream for TestNetwork {
_name: &'static str,
) -> Pin<Box<dyn Stream<Item = NetworkEvent> + Send>> {
let (tx, rx) = tracing_unbounded("test", 100_000);
let _ = self.sender.unbounded_send(Event::EventStream(tx));
let _ = self.sender.try_send(Event::EventStream(tx));
Box::pin(rx)
}
}

impl NetworkNotification for TestNetwork {
fn write_notification(&self, target: PeerId, _protocol: ProtocolName, message: Vec<u8>) {
let _ = self.sender.unbounded_send(Event::WriteNotification(target, message));
let _ = self.sender.try_send(Event::WriteNotification(target, message));
}

fn notification_sender(
Expand All @@ -157,7 +157,7 @@ impl NetworkNotification for TestNetwork {

impl NetworkBlock<Hash, NumberFor<Block>> for TestNetwork {
fn announce_block(&self, hash: Hash, _data: Option<Vec<u8>>) {
let _ = self.sender.unbounded_send(Event::Announce(hash));
let _ = self.sender.try_send(Event::Announce(hash));
}

fn new_best_block_imported(&self, _hash: Hash, _number: NumberFor<Block>) {
Expand Down Expand Up @@ -365,14 +365,14 @@ fn good_commit_leads_to_relay() {
let send_message = tester.filter_network_events(move |event| match event {
Event::EventStream(sender) => {
// Add the sending peer and send the commit
let _ = sender.unbounded_send(NetworkEvent::NotificationStreamOpened {
let _ = sender.try_send(NetworkEvent::NotificationStreamOpened {
remote: sender_id,
protocol: grandpa_protocol_name::NAME.into(),
negotiated_fallback: None,
role: ObservedRole::Full,
});

let _ = sender.unbounded_send(NetworkEvent::NotificationsReceived {
let _ = sender.try_send(NetworkEvent::NotificationsReceived {
remote: sender_id,
messages: vec![(
grandpa_protocol_name::NAME.into(),
Expand All @@ -382,7 +382,7 @@ fn good_commit_leads_to_relay() {

// Add a random peer which will be the recipient of this message
let receiver_id = PeerId::random();
let _ = sender.unbounded_send(NetworkEvent::NotificationStreamOpened {
let _ = sender.try_send(NetworkEvent::NotificationStreamOpened {
remote: receiver_id,
protocol: grandpa_protocol_name::NAME.into(),
negotiated_fallback: None,
Expand All @@ -400,7 +400,7 @@ fn good_commit_leads_to_relay() {

let msg = gossip::GossipMessage::<Block>::Neighbor(update);

sender.unbounded_send(NetworkEvent::NotificationsReceived {
sender.try_send(NetworkEvent::NotificationsReceived {
remote: receiver_id,
messages: vec![(
grandpa_protocol_name::NAME.into(),
Expand Down Expand Up @@ -514,13 +514,13 @@ fn bad_commit_leads_to_report() {
let sender_id = id;
let send_message = tester.filter_network_events(move |event| match event {
Event::EventStream(sender) => {
let _ = sender.unbounded_send(NetworkEvent::NotificationStreamOpened {
let _ = sender.try_send(NetworkEvent::NotificationStreamOpened {
remote: sender_id,
protocol: grandpa_protocol_name::NAME.into(),
negotiated_fallback: None,
role: ObservedRole::Full,
});
let _ = sender.unbounded_send(NetworkEvent::NotificationsReceived {
let _ = sender.try_send(NetworkEvent::NotificationsReceived {
remote: sender_id,
messages: vec![(
grandpa_protocol_name::NAME.into(),
Expand Down
11 changes: 4 additions & 7 deletions client/consensus/grandpa/src/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -497,9 +497,7 @@ where
.map_err(|e| ConsensusError::ClientImport(e.to_string()))?;
let new_set =
NewAuthoritySet { canon_number: number, canon_hash: hash, set_id, authorities };
let _ = self
.send_voter_commands
.unbounded_send(VoterCommand::ChangeAuthorities(new_set));
let _ = self.send_voter_commands.try_send(VoterCommand::ChangeAuthorities(new_set));
Ok(ImportResult::Imported(aux))
},
Ok(r) => Ok(r),
Expand Down Expand Up @@ -608,7 +606,7 @@ where

// Send the pause signal after import but BEFORE sending a `ChangeAuthorities` message.
if do_pause {
let _ = self.send_voter_commands.unbounded_send(VoterCommand::Pause(
let _ = self.send_voter_commands.try_send(VoterCommand::Pause(
"Forced change scheduled after inactivity".to_string(),
));
}
Expand All @@ -628,8 +626,7 @@ where
// they should import the block and discard the justification, and they will
// then request a justification from sync if it's necessary (which they should
// then be able to successfully validate).
let _ =
self.send_voter_commands.unbounded_send(VoterCommand::ChangeAuthorities(new));
let _ = self.send_voter_commands.try_send(VoterCommand::ChangeAuthorities(new));

// we must clear all pending justifications requests, presumably they won't be
// finalized hence why this forced changes was triggered
Expand Down Expand Up @@ -808,7 +805,7 @@ where
);

// send the command to the voter
let _ = self.send_voter_commands.unbounded_send(command);
let _ = self.send_voter_commands.try_send(command);
},
Err(CommandOrError::Error(e)) =>
return Err(match e {
Expand Down
8 changes: 4 additions & 4 deletions client/consensus/grandpa/src/until_imported.rs
Original file line number Diff line number Diff line change
Expand Up @@ -596,7 +596,7 @@ mod tests {
let (tx, _rx) = tracing_unbounded("unpin-worker-channel", 10_000);
self.known_blocks.lock().insert(hash, number);
self.sender
.unbounded_send(BlockImportNotification::<Block>::new(
.try_send(BlockImportNotification::<Block>::new(
hash,
BlockOrigin::File,
header,
Expand Down Expand Up @@ -692,7 +692,7 @@ mod tests {
None,
);

global_tx.unbounded_send(msg).unwrap();
global_tx.try_send(msg).unwrap();

let work = until_imported.into_future();

Expand Down Expand Up @@ -720,7 +720,7 @@ mod tests {
None,
);

global_tx.unbounded_send(msg).unwrap();
global_tx.try_send(msg).unwrap();

// NOTE: needs to be cloned otherwise it is moved to the stream and
// dropped too early.
Expand Down Expand Up @@ -930,7 +930,7 @@ mod tests {
|| voter::CommunicationIn::Commit(0, unknown_commit.clone(), voter::Callback::Blank);

// we send the commit message and spawn the until_imported stream
global_tx.unbounded_send(unknown_commit()).unwrap();
global_tx.try_send(unknown_commit()).unwrap();

let threads_pool = futures::executor::ThreadPool::new().unwrap();
threads_pool.spawn_ok(until_imported.into_future().map(|_| ()));
Expand Down
Loading