Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(subscriber): only send *new* tasks/resources/etc over the event channel #238

Merged
merged 17 commits into from
Jan 12, 2022
Merged
21 changes: 21 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 5 additions & 2 deletions console-subscriber/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,16 @@ tracing = "0.1.26"
tracing-subscriber = { version = "0.3.0", default-features = false, features = ["fmt", "registry"] }
futures = { version = "0.3", default-features = false }
hdrhistogram = { version = "7.3.0", default-features = false, features = ["serialization"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
# The parking_lot dependency is renamed, because we want our `parking_lot`
# feature to also enable `tracing-subscriber`'s parking_lot feature flag.
parking_lot_crate = { package = "parking_lot", version = "0.11", optional = true }
humantime = "2.1.0"

# Required for recording:
serde = { version = "1", features = ["derive"] }
serde_json = "1"
crossbeam-channel = "0.5"

[dev-dependencies]
tokio = { version = "^1.7", features = ["full", "rt-multi-thread"] }
futures = "0.3"
Expand Down
66 changes: 16 additions & 50 deletions console-subscriber/src/aggregator/id_data.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
use super::{shrink::ShrinkMap, DroppedAt, Id, ToProto};
use super::{shrink::ShrinkMap, Id, ToProto};
use crate::stats::{DroppedAt, Unsent};
use std::collections::HashMap;
use std::ops::{Deref, DerefMut};
use std::time::{Duration, SystemTime};

pub(crate) struct IdData<T> {
data: ShrinkMap<Id, (T, bool)>,
data: ShrinkMap<Id, T>,
}

pub(crate) struct Updating<'a, T>(&'a mut (T, bool));

pub(crate) enum Include {
All,
UpdatedOnly,
Expand All @@ -19,31 +17,19 @@ pub(crate) enum Include {
impl<T> Default for IdData<T> {
fn default() -> Self {
IdData {
data: ShrinkMap::<Id, (T, bool)>::new(),
data: ShrinkMap::<Id, T>::new(),
}
}
}

impl<T> IdData<T> {
pub(crate) fn update_or_default(&mut self, id: Id) -> Updating<'_, T>
where
T: Default,
{
Updating(self.data.entry(id).or_default())
}

pub(crate) fn update(&mut self, id: &Id) -> Option<Updating<'_, T>> {
self.data.get_mut(id).map(Updating)
}

impl<T: Unsent> IdData<T> {
pub(crate) fn insert(&mut self, id: Id, data: T) {
self.data.insert(id, (data, true));
self.data.insert(id, data);
}

pub(crate) fn since_last_update(&mut self) -> impl Iterator<Item = (&Id, &mut T)> {
self.data.iter_mut().filter_map(|(id, (data, dirty))| {
if *dirty {
*dirty = false;
self.data.iter_mut().filter_map(|(id, data)| {
if data.take_unsent() {
Some((id, data))
} else {
None
Expand All @@ -52,11 +38,11 @@ impl<T> IdData<T> {
}

pub(crate) fn all(&self) -> impl Iterator<Item = (&Id, &T)> {
self.data.iter().map(|(id, (data, _))| (id, data))
self.data.iter()
}

pub(crate) fn get(&self, id: &Id) -> Option<&T> {
self.data.get(id).map(|(data, _)| data)
self.data.get(id)
}

pub(crate) fn as_proto(&mut self, include: Include) -> HashMap<u64, T::Output>
Expand All @@ -75,7 +61,7 @@ impl<T> IdData<T> {
}
}

pub(crate) fn drop_closed<R: DroppedAt>(
pub(crate) fn drop_closed<R: DroppedAt + Unsent>(
&mut self,
stats: &mut IdData<R>,
now: SystemTime,
Expand All @@ -92,18 +78,19 @@ impl<T> IdData<T> {
// drop closed entities
tracing::trace!(?retention, has_watchers, "dropping closed");

stats.data.retain_and_shrink(|id, (stats, dirty)| {
stats.data.retain_and_shrink(|id, stats| {
if let Some(dropped_at) = stats.dropped_at() {
let dropped_for = now.duration_since(dropped_at).unwrap_or_default();
let dirty = stats.is_unsent();
let should_drop =
// if there are any clients watching, retain all dirty tasks regardless of age
(*dirty && has_watchers)
(dirty && has_watchers)
|| dropped_for > retention;
tracing::trace!(
stats.id = ?id,
stats.dropped_at = ?dropped_at,
stats.dropped_for = ?dropped_for,
stats.dirty = *dirty,
stats.dirty = dirty,
should_drop,
);
return !should_drop;
Expand All @@ -114,27 +101,6 @@ impl<T> IdData<T> {

// drop closed entities which no longer have stats.
self.data
.retain_and_shrink(|id, (_, _)| stats.data.contains_key(id));
}
}

// === impl Updating ===

impl<'a, T> Deref for Updating<'a, T> {
type Target = T;
fn deref(&self) -> &Self::Target {
&self.0 .0
}
}

impl<'a, T> DerefMut for Updating<'a, T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0 .0
}
}

impl<'a, T> Drop for Updating<'a, T> {
fn drop(&mut self) {
self.0 .1 = true;
.retain_and_shrink(|id, _| stats.data.contains_key(id));
}
}
Loading