Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ref(system): Spawn with custom task ID #4262

Merged
merged 31 commits into from
Nov 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
3fab72e
rm start_in
jjbayer Nov 13, 2024
ff88466
wip: easy cases
jjbayer Nov 14, 2024
45f580a
spawn
jjbayer Nov 14, 2024
4b2c4f9
clean
jjbayer Nov 14, 2024
20e403c
wip: service runner
jjbayer Nov 14, 2024
d246af0
update usage
jjbayer Nov 14, 2024
ffa83ca
fix remaining 2
jjbayer Nov 14, 2024
97cd873
lint
jjbayer Nov 14, 2024
ced514b
Merge remote-tracking branch 'origin/master' into joris/join
jjbayer Nov 14, 2024
a9a5b6d
doc
jjbayer Nov 14, 2024
0a28e44
lint
jjbayer Nov 14, 2024
d6bbdcc
health check
jjbayer Nov 14, 2024
d0b4af9
changelog
jjbayer Nov 14, 2024
e885bda
Update relay-server/src/services/projects/source/mod.rs
jjbayer Nov 14, 2024
2260462
start_with
jjbayer Nov 14, 2024
61a8c14
naming
jjbayer Nov 14, 2024
5803f9e
Merge remote-tracking branch 'origin/master' into joris/panic-unhealthy
jjbayer Nov 14, 2024
ad28282
merge
jjbayer Nov 14, 2024
82729a8
Revert "health check"
jjbayer Nov 14, 2024
f92a26d
Merge remote-tracking branch 'origin/joris/join' into joris/join
jjbayer Nov 14, 2024
740bc33
push
jjbayer Nov 14, 2024
d46f370
Revert "changelog"
jjbayer Nov 15, 2024
1af06f1
Merge remote-tracking branch 'origin/master' into joris/join
jjbayer Nov 18, 2024
491b7e1
ref(system): Spawn with custom task ID
jjbayer Nov 18, 2024
bea8db8
lint
jjbayer Nov 18, 2024
b7b835c
ref: for_service()
jjbayer Nov 18, 2024
b36a988
lint
jjbayer Nov 18, 2024
8352ac9
Merge branch 'master' into joris/spawn-id
jjbayer Nov 19, 2024
4ea90ca
dedup changelog
jjbayer Nov 19, 2024
6fb2d29
fix import
jjbayer Nov 21, 2024
766090d
lint
jjbayer Nov 21, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 83 additions & 36 deletions relay-system/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,76 +2,95 @@ use futures::Future;
use tokio::task::JoinHandle;

use crate::statsd::SystemCounters;
use crate::Service;

/// Spawns a new asynchronous task, returning a [`JoinHandle`] for it.
/// Spawns an instrumented task with an automatically generated [`TaskId`].
///
/// This is in instrumented spawn variant of Tokio's [`tokio::spawn`].
/// Returns a [`JoinHandle`].
#[macro_export]
macro_rules! spawn {
($future:expr) => {{
static _TASK_ID: ::std::sync::OnceLock<$crate::TaskId> = ::std::sync::OnceLock::new();
let task_id = _TASK_ID.get_or_init(|| (*::std::panic::Location::caller()).into());
$crate::_spawn_inner(task_id, $future)
static _PARTS: ::std::sync::OnceLock<(String, String, String)> =
::std::sync::OnceLock::new();
let (id, file, line) = _PARTS.get_or_init(|| {
let caller = *::std::panic::Location::caller();
let id = format!("{}:{}", caller.file(), caller.line());
(id, caller.file().to_owned(), caller.line().to_string())
});
$crate::spawn(
$crate::TaskId::_from_location(id.as_str(), file.as_str(), line.as_str()),
$future,
)
}};
}

#[doc(hidden)]
/// Spawns a new asynchronous task, returning a [`JoinHandle`] for it.
///
/// This is in instrumented spawn variant of Tokio's [`tokio::spawn`].
#[allow(clippy::disallowed_methods)]
pub fn _spawn_inner<F>(task_id: &'static TaskId, future: F) -> JoinHandle<F::Output>
pub fn spawn<F>(task_id: TaskId, future: F) -> JoinHandle<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
tokio::spawn(Task::new(task_id, future))
}

/// An internal id for a spawned task.
#[doc(hidden)]
/// An identifier for tasks spawned by [`spawn()`], used to log metrics.
pub struct TaskId {
id: String,
file: String,
line: String,
id: &'static str,
file: Option<&'static str>,
line: Option<&'static str>,
}

impl From<std::panic::Location<'_>> for TaskId {
fn from(value: std::panic::Location<'_>) -> Self {
impl TaskId {
/// Create a task ID based on the service's name.
pub fn for_service<S: Service>() -> Self {
Self {
id: S::name(),
file: None,
line: None,
}
}

#[doc(hidden)]
pub fn _from_location(id: &'static str, file: &'static str, line: &'static str) -> Self {
Self {
id: format!("{}:{}", value.file(), value.line()),
file: value.file().to_owned(),
line: value.line().to_string(),
id,
file: Some(file),
line: Some(line),
}
}

fn emit_metric(&self, metric: SystemCounters) {
let Self { id, file, line } = self;
relay_statsd::metric!(
counter(metric) += 1,
id = id,
file = file.unwrap_or_default(),
line = line.unwrap_or_default()
);
}
}

pin_project_lite::pin_project! {
/// Wraps a future and emits related task metrics.
struct Task<T> {
id: &'static TaskId,
id: TaskId,
#[pin]
inner: T,
}

impl<T> PinnedDrop for Task<T> {
fn drop(this: Pin<&mut Self>) {
let this = this.project();
relay_statsd::metric!(
counter(SystemCounters::RuntimeTaskTerminated) += 1,
id = this.id.id.as_str(),
file = this.id.file.as_str(),
line = this.id.line.as_str(),
);
this.id.emit_metric(SystemCounters::RuntimeTaskTerminated);
}
}
}

impl<T> Task<T> {
fn new(id: &'static TaskId, inner: T) -> Self {
relay_statsd::metric!(
counter(SystemCounters::RuntimeTaskCreated) += 1,
id = id.id.as_str(),
file = id.file.as_str(),
line = id.line.as_str(),
);
fn new(id: TaskId, inner: T) -> Self {
id.emit_metric(SystemCounters::RuntimeTaskCreated);
Self { id, inner }
}
}
Expand All @@ -92,6 +111,8 @@ impl<T: Future> Future for Task<T> {
mod tests {
use insta::assert_debug_snapshot;

use crate::{Service, TaskId};

#[test]
fn test_spawn_spawns_a_future() {
let rt = tokio::runtime::Builder::new_current_thread()
Expand All @@ -107,15 +128,41 @@ mod tests {
#[cfg(not(windows))]
assert_debug_snapshot!(captures, @r###"
[
"runtime.task.spawn.created:1|c|#id:relay-system/src/runtime.rs:103,file:relay-system/src/runtime.rs,line:103",
"runtime.task.spawn.terminated:1|c|#id:relay-system/src/runtime.rs:103,file:relay-system/src/runtime.rs,line:103",
"runtime.task.spawn.created:1|c|#id:relay-system/src/runtime.rs:124,file:relay-system/src/runtime.rs,line:124",
"runtime.task.spawn.terminated:1|c|#id:relay-system/src/runtime.rs:124,file:relay-system/src/runtime.rs,line:124",
]
"###);
#[cfg(windows)]
assert_debug_snapshot!(captures, @r###"
[
"runtime.task.spawn.created:1|c|#id:relay-system\\src\\runtime.rs:103,file:relay-system\\src\\runtime.rs,line:103",
"runtime.task.spawn.terminated:1|c|#id:relay-system\\src\\runtime.rs:103,file:relay-system\\src\\runtime.rs,line:103",
"runtime.task.spawn.created:1|c|#id:relay-system\\src\\runtime.rs:124,file:relay-system\\src\\runtime.rs,line:124",
"runtime.task.spawn.terminated:1|c|#id:relay-system\\src\\runtime.rs:124,file:relay-system\\src\\runtime.rs,line:124",
]
"###);
}

#[test]
fn test_spawn_with_custom_id() {
struct Foo;
impl Service for Foo {
type Interface = ();
async fn run(self, _rx: crate::Receiver<Self::Interface>) {}
}

let rt = tokio::runtime::Builder::new_current_thread()
.build()
.unwrap();

let captures = relay_statsd::with_capturing_test_client(|| {
rt.block_on(async {
let _ = crate::spawn(TaskId::for_service::<Foo>(), async {}).await;
})
});

assert_debug_snapshot!(captures, @r###"
[
"runtime.task.spawn.created:1|c|#id:relay_system::runtime::tests::test_spawn_with_custom_id::Foo,file:,line:",
"runtime.task.spawn.terminated:1|c|#id:relay_system::runtime::tests::test_spawn_with_custom_id::Foo,file:,line:",
]
"###);
}
Expand Down
7 changes: 4 additions & 3 deletions relay-system/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ use tokio::sync::{mpsc, oneshot};
use tokio::task::JoinHandle;
use tokio::time::MissedTickBehavior;

use crate::spawn;
use crate::statsd::SystemGauges;
use crate::{spawn, TaskId};

/// Interval for recording backlog metrics on service channels.
const BACKLOG_INTERVAL: Duration = Duration::from_secs(1);
Expand Down Expand Up @@ -1009,7 +1009,7 @@ pub trait Service: Sized {
/// for tests.
fn start_detached(self) -> Addr<Self::Interface> {
let (addr, rx) = channel(Self::name());
spawn!(self.run(rx));
spawn(TaskId::for_service::<Self>(), self.run(rx));
addr
}

Expand Down Expand Up @@ -1043,7 +1043,8 @@ impl ServiceRunner {

/// Starts a service and starts tracking its join handle, given a predefined receiver.
pub fn start_with<S: Service>(&mut self, service: S, rx: Receiver<S::Interface>) {
self.0.push(spawn!(service.run(rx)));
self.0
.push(spawn(TaskId::for_service::<S>(), service.run(rx)));
}

/// Awaits until all services have finished.
Expand Down
2 changes: 1 addition & 1 deletion relay-system/src/statsd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use relay_statsd::{CounterMetric, GaugeMetric};
pub enum SystemCounters {
/// Number of runtime tasks created/spawned.
///
/// Every call to [`spawn`](`crate::spawn`) increases this counter by one.
/// Every call to [`spawn`](`crate::spawn()`) increases this counter by one.
///
/// This metric is tagged with:
/// - `id`: A unique identifier for the task, derived from its location in code.
Expand Down
Loading