Skip to content

Commit

Permalink
basic functionality works
Browse files Browse the repository at this point in the history
Signed-off-by: Eliza Weisman <[email protected]>
  • Loading branch information
hawkw committed Dec 28, 2021
1 parent 8bc1637 commit 94f541e
Show file tree
Hide file tree
Showing 6 changed files with 260 additions and 433 deletions.
66 changes: 16 additions & 50 deletions console-subscriber/src/aggregator/id_data.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
use super::{shrink::ShrinkMap, DroppedAt, Id, ToProto};
use super::{shrink::ShrinkMap, Id, ToProto};
use crate::stats::{DroppedAt, Unsent};
use std::collections::HashMap;
use std::ops::{Deref, DerefMut};
use std::time::{Duration, SystemTime};

pub(crate) struct IdData<T> {
data: ShrinkMap<Id, (T, bool)>,
data: ShrinkMap<Id, T>,
}

pub(crate) struct Updating<'a, T>(&'a mut (T, bool));

pub(crate) enum Include {
All,
UpdatedOnly,
Expand All @@ -19,31 +17,19 @@ pub(crate) enum Include {
impl<T> Default for IdData<T> {
fn default() -> Self {
IdData {
data: ShrinkMap::<Id, (T, bool)>::new(),
data: ShrinkMap::<Id, T>::new(),
}
}
}

impl<T> IdData<T> {
pub(crate) fn update_or_default(&mut self, id: Id) -> Updating<'_, T>
where
T: Default,
{
Updating(self.data.entry(id).or_default())
}

pub(crate) fn update(&mut self, id: &Id) -> Option<Updating<'_, T>> {
self.data.get_mut(id).map(Updating)
}

impl<T: Unsent> IdData<T> {
pub(crate) fn insert(&mut self, id: Id, data: T) {
self.data.insert(id, (data, true));
self.data.insert(id, data);
}

pub(crate) fn since_last_update(&mut self) -> impl Iterator<Item = (&Id, &mut T)> {
self.data.iter_mut().filter_map(|(id, (data, dirty))| {
if *dirty {
*dirty = false;
self.data.iter_mut().filter_map(|(id, data)| {
if data.take_unsent() {
Some((id, data))
} else {
None
Expand All @@ -52,11 +38,11 @@ impl<T> IdData<T> {
}

pub(crate) fn all(&self) -> impl Iterator<Item = (&Id, &T)> {
self.data.iter().map(|(id, (data, _))| (id, data))
self.data.iter()
}

pub(crate) fn get(&self, id: &Id) -> Option<&T> {
self.data.get(id).map(|(data, _)| data)
self.data.get(id)
}

pub(crate) fn as_proto(&mut self, include: Include) -> HashMap<u64, T::Output>
Expand All @@ -75,7 +61,7 @@ impl<T> IdData<T> {
}
}

pub(crate) fn drop_closed<R: DroppedAt>(
pub(crate) fn drop_closed<R: DroppedAt + Unsent>(
&mut self,
stats: &mut IdData<R>,
now: SystemTime,
Expand All @@ -92,18 +78,19 @@ impl<T> IdData<T> {
// drop closed entities
tracing::trace!(?retention, has_watchers, "dropping closed");

stats.data.retain_and_shrink(|id, (stats, dirty)| {
stats.data.retain_and_shrink(|id, stats| {
if let Some(dropped_at) = stats.dropped_at() {
let dropped_for = now.duration_since(dropped_at).unwrap_or_default();
let dirty = stats.is_unsent();
let should_drop =
// if there are any clients watching, retain all dirty tasks regardless of age
(*dirty && has_watchers)
(dirty && has_watchers)
|| dropped_for > retention;
tracing::trace!(
stats.id = ?id,
stats.dropped_at = ?dropped_at,
stats.dropped_for = ?dropped_for,
stats.dirty = *dirty,
stats.dirty = dirty,
should_drop,
);
return !should_drop;
Expand All @@ -114,27 +101,6 @@ impl<T> IdData<T> {

// drop closed entities which no longer have stats.
self.data
.retain_and_shrink(|id, (_, _)| stats.data.contains_key(id));
}
}

// === impl Updating ===

impl<'a, T> Deref for Updating<'a, T> {
type Target = T;
fn deref(&self) -> &Self::Target {
&self.0 .0
}
}

impl<'a, T> DerefMut for Updating<'a, T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0 .0
}
}

impl<'a, T> Drop for Updating<'a, T> {
fn drop(&mut self) {
self.0 .1 = true;
.retain_and_shrink(|id, _| stats.data.contains_key(id));
}
}
Loading

0 comments on commit 94f541e

Please sign in to comment.