Skip to content

Commit

Permalink
enhancement(observability): add fixed tag option to `RegisteredEventC…
Browse files Browse the repository at this point in the history
…ache` (#17814)

@bruceg highlighted the fact that there was no straight forward way to
add fixed tags to the `RegisteredEventCache`.

This adds the option to do that. Very quickly, supposing the
`TaggedEventsSent` had two tags, `something` and `service`. `something`
never changes so can be passed in when the event cache is first created.
The code for this would look similar to:

```rust
crate::registered_event!(
    TaggedEventsSent {
        something: String,
        service: OptionalTag<String>,
    } => {
        events: Counter = {
            register_counter!("component_sent_events_total", &make_tags(&self.something, &self.service))
        },
        event_bytes: Counter = {
            register_counter!("component_sent_event_bytes_total", &make_tags(&self.something, &self.service))
        },
    }

    fn emit(&self, data: CountByteSize) {
       ...
    }

    fn register(something: String, tags: EventCountTags) {
        super::register(TaggedEventsSent::new(
            something,
            tags,
        ))
    }
);

...

        let events_sent = RegisteredEventCache::default(String::from("something"));
        ....
        events_sent.emit(service, *size);
```

---------

Signed-off-by: Stephen Wakely <[email protected]>
  • Loading branch information
StephenWakely authored Jul 4, 2023
1 parent 911477a commit bc86222
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 12 deletions.
71 changes: 65 additions & 6 deletions lib/vector-common/src/internal_event/cached_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ use super::{InternalEventHandle, RegisterInternalEvent};
/// new event is emitted for a previously unseen set of tags an event is registered
/// and stored in the cache.
#[derive(Derivative)]
#[derivative(Clone(bound = ""), Default(bound = ""))]
pub struct RegisteredEventCache<Event: RegisterTaggedInternalEvent> {
#[derivative(Clone(bound = "T: Clone"))]
pub struct RegisteredEventCache<T, Event: RegisterTaggedInternalEvent> {
fixed_tags: T,
cache: Arc<
RwLock<
BTreeMap<
Expand All @@ -36,16 +37,31 @@ pub trait RegisterTaggedInternalEvent: RegisterInternalEvent {
/// that will be used when registering the event.
type Tags;

fn register(tags: Self::Tags) -> <Self as RegisterInternalEvent>::Handle;
/// The type that contains data necessary to extract the tags that will
/// be fixed and only need setting up front when the cache is first created.
type Fixed;

fn register(fixed: Self::Fixed, tags: Self::Tags) -> <Self as RegisterInternalEvent>::Handle;
}

impl<Event, EventHandle, Data, Tags> RegisteredEventCache<Event>
impl<Event, EventHandle, Data, Tags, FixedTags> RegisteredEventCache<FixedTags, Event>
where
Data: Sized,
EventHandle: InternalEventHandle<Data = Data>,
Tags: Ord + Clone,
Event: RegisterInternalEvent<Handle = EventHandle> + RegisterTaggedInternalEvent<Tags = Tags>,
FixedTags: Clone,
Event: RegisterInternalEvent<Handle = EventHandle>
+ RegisterTaggedInternalEvent<Tags = Tags, Fixed = FixedTags>,
{
/// Create a new event cache with a set of fixed tags. These tags are passed to
/// all registered events.
pub fn new(fixed_tags: FixedTags) -> Self {
Self {
fixed_tags,
cache: Arc::default(),
}
}

/// Emits the event with the given tags.
/// It will register the event and store in the cache if this has not already
/// been done.
Expand All @@ -58,7 +74,10 @@ where
if let Some(event) = read.get(tags) {
event.emit(value);
} else {
let event = <Event as RegisterTaggedInternalEvent>::register(tags.clone());
let event = <Event as RegisterTaggedInternalEvent>::register(
self.fixed_tags.clone(),
tags.clone(),
);
event.emit(value);

// Ensure the read lock is dropped so we can write.
Expand All @@ -67,3 +86,43 @@ where
}
}
}

#[cfg(test)]
mod tests {
#![allow(unreachable_pub)]
use metrics::{register_counter, Counter};

use super::*;

crate::registered_event!(
TestEvent {
fixed: String,
dynamic: String,
} => {
event: Counter = {
register_counter!("test_event_total", "fixed" => self.fixed, "dynamic" => self.dynamic)
},
}

fn emit(&self, count: u64) {
self.event.increment(count);
}

fn register(fixed: String, dynamic: String) {
crate::internal_event::register(TestEvent {
fixed,
dynamic,
})
}
);

#[test]
fn test_fixed_tag() {
let event: RegisteredEventCache<String, TestEvent> =
RegisteredEventCache::new("fixed".to_string());

for tag in 1..=5 {
event.emit(&format!("dynamic{tag}"), tag);
}
}
}
2 changes: 1 addition & 1 deletion lib/vector-common/src/internal_event/events_sent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ crate::registered_event!(
self.event_bytes.increment(byte_size.get() as u64);
}

fn register(tags: EventCountTags) {
fn register(_fixed: (), tags: EventCountTags) {
super::register(TaggedEventsSent::new(
tags,
))
Expand Down
4 changes: 3 additions & 1 deletion lib/vector-common/src/internal_event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ macro_rules! registered_event {
fn emit(&$slf:ident, $data_name:ident: $data:ident)
$emit_body:block

$(fn register($tags_name:ident: $tags:ty)
$(fn register($fixed_name:ident: $fixed_tags:ty, $tags_name:ident: $tags:ty)
$register_body:block)?
) => {
paste::paste!{
Expand Down Expand Up @@ -251,8 +251,10 @@ macro_rules! registered_event {

$(impl $crate::internal_event::cached_event::RegisterTaggedInternalEvent for $event {
type Tags = $tags;
type Fixed = $fixed_tags;

fn register(
$fixed_name: $fixed_tags,
$tags_name: $tags,
) -> <Self as $crate::internal_event::RegisterInternalEvent>::Handle {
$register_body
Expand Down
4 changes: 2 additions & 2 deletions lib/vector-common/src/request_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,9 @@ impl GroupedCountByteSize {
}

/// Emits our counts to a `RegisteredEvent` cached event.
pub fn emit_event<T, H>(&self, event_cache: &RegisteredEventCache<T>)
pub fn emit_event<T, H>(&self, event_cache: &RegisteredEventCache<(), T>)
where
T: RegisterTaggedInternalEvent<Tags = EventCountTags, Handle = H>,
T: RegisterTaggedInternalEvent<Tags = EventCountTags, Fixed = (), Handle = H>,
H: InternalEventHandle<Data = CountByteSize>,
{
match self {
Expand Down
4 changes: 2 additions & 2 deletions lib/vector-core/src/stream/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ where
pin!(batched_input);

let bytes_sent = protocol.map(|protocol| register(BytesSent { protocol }));
let events_sent = RegisteredEventCache::default();
let events_sent = RegisteredEventCache::new(());

loop {
// Core behavior of the loop:
Expand Down Expand Up @@ -203,7 +203,7 @@ where
finalizers: EventFinalizers,
event_count: usize,
bytes_sent: &Option<Registered<BytesSent>>,
events_sent: &RegisteredEventCache<TaggedEventsSent>,
events_sent: &RegisteredEventCache<(), TaggedEventsSent>,
) {
match result {
Err(error) => {
Expand Down

0 comments on commit bc86222

Please sign in to comment.