Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rpc-v2/tx/tests: Add transaction broadcast tests and check propagated tx status #3193

Merged
merged 20 commits into from
Feb 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
886d3ab
tx/tests: Move tests to dedicated module
lexnv Feb 13, 2024
52c3afb
tx/tests: Add mock transaction pool to gain access to tx status
lexnv Feb 13, 2024
58b3f5e
tx/tests: Add task executor mock
lexnv Feb 13, 2024
8fc2339
tx/tests: Add setup file to use middlewares
lexnv Feb 13, 2024
9a54386
tx/tests: Use the testing infrastructure
lexnv Feb 13, 2024
d98f713
tx/tests: Ensure a future tx is propagated
lexnv Feb 13, 2024
c93a386
tx/tests: Check stop cannot be called after broadcast finishes
lexnv Feb 13, 2024
274b47a
tx/tests: Fix race between InBlock and Finalized events
lexnv Feb 13, 2024
94bae37
tx-pool: Extend basic pool with options
lexnv Feb 13, 2024
233f789
tx/tests: Use options as default
lexnv Feb 13, 2024
f8177ab
tx/tests: Check that an invalid tx is resubmited in the future
lexnv Feb 13, 2024
7004451
chainHead/tests: Do not panic on dropped receivers from mocks
lexnv Feb 13, 2024
5c76f29
test-utils: Inject tx priority to the test pool api
lexnv Feb 13, 2024
2d764ef
tx/tests: Check immediately dropped tx are resubmitted
lexnv Feb 13, 2024
373ae27
tx/tests: Decrease timeout from 60 secs to 5 secs
lexnv Feb 15, 2024
6466cf0
Merge remote-tracking branch 'origin/master' into lexnv/broadcast-tx-…
lexnv Feb 15, 2024
72dd3ae
tx/tests: Check number of active spawned tasks
lexnv Feb 27, 2024
661458a
Update substrate/client/rpc-spec-v2/src/transaction/tests/transaction…
lexnv Feb 27, 2024
c06012a
Update substrate/test-utils/runtime/transaction-pool/src/lib.rs
lexnv Feb 27, 2024
34933c3
Merge branch 'master' into lexnv/broadcast-tx-tests
lexnv Feb 28, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions substrate/client/rpc-spec-v2/src/chain_head/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ impl<Client> ChainHeadMockClient<Client> {
BlockImportNotification::new(header.hash(), BlockOrigin::Own, header, true, None, sink);

for sink in self.import_sinks.lock().iter_mut() {
sink.unbounded_send(notification.clone()).unwrap();
let _ = sink.unbounded_send(notification.clone());
}
}

Expand All @@ -83,7 +83,7 @@ impl<Client> ChainHeadMockClient<Client> {
let notification = FinalityNotification::from_summary(summary, sink);

for sink in self.finality_sinks.lock().iter_mut() {
sink.unbounded_send(notification.clone()).unwrap();
let _ = sink.unbounded_send(notification.clone());
}
}
}
Expand Down
238 changes: 0 additions & 238 deletions substrate/client/rpc-spec-v2/src/transaction/tests.rs

This file was deleted.

100 changes: 100 additions & 0 deletions substrate/client/rpc-spec-v2/src/transaction/tests/executor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// This file is part of Substrate.

// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use sp_core::{testing::TaskExecutor, traits::SpawnNamed};
use std::sync::{atomic::AtomicUsize, Arc};
use tokio::sync::mpsc;

/// Wrap the `TaskExecutor` to know when the broadcast future is dropped.
#[derive(Clone)]
pub struct TaskExecutorBroadcast {
executor: TaskExecutor,
sender: mpsc::UnboundedSender<()>,
num_tasks: Arc<AtomicUsize>,
}

/// The channel that receives events when the broadcast futures are dropped.
pub type TaskExecutorRecv = mpsc::UnboundedReceiver<()>;

/// The state of the `TaskExecutorBroadcast`.
pub struct TaskExecutorState {
pub recv: TaskExecutorRecv,
pub num_tasks: Arc<AtomicUsize>,
}

impl TaskExecutorState {
pub fn num_tasks(&self) -> usize {
self.num_tasks.load(std::sync::atomic::Ordering::Acquire)
}
}

impl TaskExecutorBroadcast {
/// Construct a new `TaskExecutorBroadcast` and a receiver to know when the broadcast futures
/// are dropped.
pub fn new() -> (Self, TaskExecutorState) {
let (sender, recv) = mpsc::unbounded_channel();
let num_tasks = Arc::new(AtomicUsize::new(0));

(
Self { executor: TaskExecutor::new(), sender, num_tasks: num_tasks.clone() },
TaskExecutorState { recv, num_tasks },
)
}
}

impl SpawnNamed for TaskExecutorBroadcast {
fn spawn(
&self,
name: &'static str,
group: Option<&'static str>,
future: futures::future::BoxFuture<'static, ()>,
) {
let sender = self.sender.clone();
let num_tasks = self.num_tasks.clone();

let future = Box::pin(async move {
num_tasks.fetch_add(1, std::sync::atomic::Ordering::AcqRel);
future.await;
num_tasks.fetch_sub(1, std::sync::atomic::Ordering::AcqRel);

let _ = sender.send(());
});

self.executor.spawn(name, group, future)
}

fn spawn_blocking(
&self,
name: &'static str,
group: Option<&'static str>,
future: futures::future::BoxFuture<'static, ()>,
) {
let sender = self.sender.clone();
let num_tasks = self.num_tasks.clone();

let future = Box::pin(async move {
num_tasks.fetch_add(1, std::sync::atomic::Ordering::AcqRel);
future.await;
num_tasks.fetch_sub(1, std::sync::atomic::Ordering::AcqRel);

let _ = sender.send(());
});

self.executor.spawn_blocking(name, group, future)
}
}
Loading
Loading