Skip to content

Commit

Permalink
test(subscriber): add initial integration tests
Browse files Browse the repository at this point in the history
The `console-subscriber` crate has no integration tests. There are some
unit tests, but without very high coverage of features.

Recently, we've found or fixed a few errors which probably could have
been caught by a medium level of integration testing.

However, testing `console-subscriber` isn't straight forward. It is
effectively a tracing subscriber (or layer) on one end, and a gRPC
server on the other end.

This change adds enough of a testing framework to write some initial
integration tests. It is the first step towards closing #450.

Each test comprises 2 parts:
- One or more "expcted tasks"
- A future which will be driven to completion on a dedicated Tokio runtime.

Behind the scenes, a console subscriber layer is created and it's server
part is connected to a duplex stream. The client of the duplex stream
then records incoming updates and reconstructs "actual tasks". The layer
itself is set as the default subscriber for the duration of `block_on`
which is used to drive the provided future to completioin.

The expected tasks have a set of "matches", which is how we find the
actual task that we want to validate against. Currently, the only value
we match on is the task's name.

The expected tasks also have a set of expectations. These are other
fields on the actual task which are validated once a matching task is
found. Currently, the two fields which can have expectations set on them
are the `wakes` and `self_wakes` fields.

So, to construct an expected task, which will match a task with the name
`"my-task"` and then validate that the matched task gets woken once, the
code would be:

```rust
ExpectedTask::default()
    .match_name("my-task")
    .expect_wakes(1);
```

A future which passes this test could be:

```rust
async {
    task::Builder::new()
        .name("my-task")
        .spawn(async {
            tokio::time::sleep(std::time::Duration::ZERO).await
        })
}
```

The full test would then look like:

```rust
fn wakes_once() {
    let expected_task = ExpectedTask::default()
        .match_name("my-task")
        .expect_wakes(1);

    let future = async {
        task::Builder::new()
            .name("my-task")
            .spawn(async {
                tokio::time::sleep(std::time::Duration::ZERO).await
            })
    };

    assert_task(expected_task, future);
}
```

The PR depends on 2 others:
 - #447 which fixes an error in the logic that determines whether a task
   is retained in the aggregator or not.
 - #451 which exposes the server parts and is necessary to allow us to
   connect the instrument server and client via a duplex channel.

This change contains some initial tests for wakes and self wakes which
would have caught the error fixed in #430. Additionally there are tests
for the functionality of the testing framework itself.
  • Loading branch information
hds committed Jul 18, 2023
1 parent 7c8e80a commit b68ef76
Show file tree
Hide file tree
Showing 10 changed files with 1,079 additions and 16 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions console-subscriber/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ crossbeam-channel = "0.5"

[dev-dependencies]
tokio = { version = "^1.21", features = ["full", "rt-multi-thread"] }
tower = "0.4"
futures = "0.3"

[package.metadata.docs.rs]
Expand Down
8 changes: 4 additions & 4 deletions console-subscriber/src/aggregator/id_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,18 +104,18 @@ impl<T: Unsent> IdData<T> {
if let Some(dropped_at) = stats.dropped_at() {
let dropped_for = now.checked_duration_since(dropped_at).unwrap_or_default();
let dirty = stats.is_unsent();
let should_drop =
let should_retain =
// if there are any clients watching, retain all dirty tasks regardless of age
(dirty && has_watchers)
|| dropped_for > retention;
|| dropped_for <= retention;
tracing::trace!(
stats.id = ?id,
stats.dropped_at = ?dropped_at,
stats.dropped_for = ?dropped_for,
stats.dirty = dirty,
should_drop,
should_retain,
);
return !should_drop;
return should_retain;
}

true
Expand Down
125 changes: 113 additions & 12 deletions console-subscriber/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#![doc = include_str!("../README.md")]
use console_api as proto;
use proto::resources::resource;
use proto::{instrument::instrument_server::InstrumentServer, resources::resource};
use serde::Serialize;
use std::{
cell::RefCell,
Expand All @@ -15,7 +15,10 @@ use std::{
use thread_local::ThreadLocal;
#[cfg(unix)]
use tokio::net::UnixListener;
use tokio::sync::{mpsc, oneshot};
use tokio::{
sync::{mpsc, oneshot},
task::JoinHandle,
};
#[cfg(unix)]
use tokio_stream::wrappers::UnixListenerStream;
use tracing_core::{
Expand Down Expand Up @@ -933,18 +936,15 @@ impl Server {
///
/// [`tonic`]: https://docs.rs/tonic/
pub async fn serve_with(
mut self,
self,
mut builder: tonic::transport::Server,
) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
let aggregate = self
.aggregator
.take()
.expect("cannot start server multiple times");
let aggregate = spawn_named(aggregate.run(), "console::aggregate");
let addr = self.addr.clone();
let router = builder.add_service(
proto::instrument::instrument_server::InstrumentServer::new(self),
);
let ServerParts {
instrument_server: service,
aggregator_handle: aggregate,
} = self.into_parts();
let router = builder.add_service(service);
let res = match addr {
ServerAddr::Tcp(addr) => {
let serve = router.serve(addr);
Expand All @@ -957,9 +957,110 @@ impl Server {
spawn_named(serve, "console::serve").await
}
};
aggregate.abort();
drop(aggregate);
res?.map_err(Into::into)
}

/// Returns the parts needed to spawn a gRPC server and keep the aggregation
/// worker running.
///
/// Note that a server spawned in this way will overwrite any value set by
/// [`Builder::server_addr`] as the user becomes responsible for defining
/// the address when calling [`Router::serve`].
///
/// # Examples
///
/// The parts can be used to serve the instrument server together with
/// other endpoints from the same gRPC server.
///
/// ```
/// use console_subscriber::{ConsoleLayer, ServerParts};
///
/// # let runtime = tokio::runtime::Builder::new_current_thread()
/// # .enable_all()
/// # .build()
/// # .unwrap();
/// # runtime.block_on(async {
/// let (console_layer, server) = ConsoleLayer::builder().build();
/// let ServerParts {
/// instrument_server,
/// aggregator_handle,
/// ..
/// } = server.into_parts();
///
/// let router = tonic::transport::Server::builder()
/// //.add_service(some_other_service)
/// .add_service(instrument_server);
/// let serve = router.serve(std::net::SocketAddr::new(
/// std::net::IpAddr::V4(std::net::Ipv4Addr::new(127, 0, 0, 1)),
/// 6669,
/// ));
///
/// // Finally, spawn the server.
/// tokio::spawn(serve);
/// # // Avoid a warning that `console_layer` and `aggregator_handle` are unused.
/// # drop(console_layer);
/// # drop(aggregator_handle);
/// # });
/// ```
///
/// [`Router::serve`]: fn@tonic::transport::server::Router::serve
pub fn into_parts(mut self) -> ServerParts {
let aggregate = self
.aggregator
.take()
.expect("cannot start server multiple times");
let aggregate = spawn_named(aggregate.run(), "console::aggregate");

let service = proto::instrument::instrument_server::InstrumentServer::new(self);

ServerParts {
instrument_server: service,
aggregator_handle: AggregatorHandle {
join_handle: aggregate,
},
}
}
}

/// Server Parts
///
/// This struct contains the parts returned by [`Server::into_parts`]. It may contain
/// further parts in the future, an as such is marked as `non_exhaustive`.
///
/// The `InstrumentServer<Server>` can be used to construct a router which
/// can be added to a [`tonic`] gRPC server.
///
/// The [`AggregatorHandle`] must be kept until after the server has been
/// shut down.
///
/// See the [`Server::into_parts`] documentation for usage.
#[non_exhaustive]
pub struct ServerParts {
/// The instrument server.
///
/// See the documentation for [`InstrumentServer`] for details.
pub instrument_server: InstrumentServer<Server>,

/// The aggregate handle.
///
/// See the documentation for [`AggregatorHandle`] for details.
pub aggregator_handle: AggregatorHandle,
}

/// Aggregator handle.
///
/// This object is returned from [`Server::into_parts`] and must be
/// kept as long as the `InstrumentServer<Server>` - which is also
/// returned - is in use.
pub struct AggregatorHandle {
join_handle: JoinHandle<()>,
}

impl Drop for AggregatorHandle {
fn drop(&mut self) {
self.join_handle.abort();
}
}

#[tonic::async_trait]
Expand Down
184 changes: 184 additions & 0 deletions console-subscriber/tests/framework.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
//! Framework tests
//!
//! The tests in this module are here to verify the testing framework itself.
//! As such, some of these tests may be repeated elsewhere (where we wish to
//! actually test the functionality of `console-subscriber`) and others are
//! negative tests that should panic.
use std::time::Duration;

use tokio::{task, time::sleep};

mod support;
use support::{assert_task, assert_tasks, ExpectedTask, MAIN_TASK_NAME};

#[test]
fn expect_present() {
let expected_task = ExpectedTask::default()
.match_default_name()
.expect_present();

let future = async {
sleep(Duration::ZERO).await;
};

assert_task(expected_task, future);
}

#[test]
#[should_panic(expected = "Test failed: Task validation failed:
- Task<name=main>: no expectations set, if you want to just expect that a matching task is present, use `expect_present()`
")]
fn fail_no_expectations() {
let expected_task = ExpectedTask::default().match_default_name();

let future = async {
sleep(Duration::ZERO).await;
};

assert_task(expected_task, future);
}

#[test]
fn wakes() {
let expected_task = ExpectedTask::default().match_default_name().expect_wakes(1);

let future = async {
sleep(Duration::ZERO).await;
};

assert_task(expected_task, future);
}

#[test]
#[should_panic(expected = "Test failed: Task validation failed:
- Task<name=main>: expected `wakes` to be 5, but actual was 1
")]
fn fail_wakes() {
let expected_task = ExpectedTask::default().match_default_name().expect_wakes(5);

let future = async {
sleep(Duration::ZERO).await;
};

assert_task(expected_task, future);
}

#[test]
fn self_wakes() {
let expected_task = ExpectedTask::default()
.match_default_name()
.expect_self_wakes(1);

let future = async { task::yield_now().await };

assert_task(expected_task, future);
}

#[test]
#[should_panic(expected = "Test failed: Task validation failed:
- Task<name=main>: expected `self_wakes` to be 1, but actual was 0
")]
fn fail_self_wake() {
let expected_task = ExpectedTask::default()
.match_default_name()
.expect_self_wakes(1);

let future = async {
sleep(Duration::ZERO).await;
};

assert_task(expected_task, future);
}

#[test]
fn test_spawned_task() {
let expected_task = ExpectedTask::default()
.match_name("another-name".into())
.expect_present();

let future = async {
task::Builder::new()
.name("another-name")
.spawn(async { task::yield_now().await })
};

assert_task(expected_task, future);
}

#[test]
#[should_panic(expected = "Test failed: Task validation failed:
- Task<name=wrong-name>: no matching actual task was found
")]
fn fail_wrong_task_name() {
let expected_task = ExpectedTask::default().match_name("wrong-name".into());

let future = async { task::yield_now().await };

assert_task(expected_task, future);
}

#[test]
fn multiple_tasks() {
let expected_tasks = vec![
ExpectedTask::default()
.match_name("task-1".into())
.expect_wakes(1),
ExpectedTask::default()
.match_name("task-2".into())
.expect_wakes(1),
];

let future = async {
let task1 = task::Builder::new()
.name("task-1")
.spawn(async { task::yield_now().await })
.unwrap();
let task2 = task::Builder::new()
.name("task-2")
.spawn(async { task::yield_now().await })
.unwrap();

tokio::try_join! {
task1,
task2,
}
.unwrap();
};

assert_tasks(expected_tasks, future);
}

#[test]
#[should_panic(expected = "Test failed: Task validation failed:
- Task<name=task-2>: expected `wakes` to be 2, but actual was 1
")]
fn fail_1_of_2_expected_tasks() {
let expected_tasks = vec![
ExpectedTask::default()
.match_name("task-1".into())
.expect_wakes(1),
ExpectedTask::default()
.match_name("task-2".into())
.expect_wakes(2),
];

let future = async {
let task1 = task::Builder::new()
.name("task-1")
.spawn(async { task::yield_now().await })
.unwrap();
let task2 = task::Builder::new()
.name("task-2")
.spawn(async { task::yield_now().await })
.unwrap();

tokio::try_join! {
task1,
task2,
}
.unwrap();
};

assert_tasks(expected_tasks, future);
}
Loading

0 comments on commit b68ef76

Please sign in to comment.