Skip to content

Commit

Permalink
chore(observability): count byte_size after transforming event (vecto…
Browse files Browse the repository at this point in the history
…rdotdev#17941)

Before this the `component_sent_event_bytes_total` was emitting the json
byte size of the event including any fields dropped by `only_fields` and
`except_field` settings.

This changes it so the the count is made after transforming the event. 

A complication arose whereby if the service field was dropped we would
no longer have access to this value to emit the `service` tag with the
metrics. This also adds a `dropped_fields` field to the event metadata
where this value can be stored and accessed.

---------

Signed-off-by: Stephen Wakely <[email protected]>
  • Loading branch information
StephenWakely authored Jul 21, 2023
1 parent 7050b7e commit 0bf6abd
Show file tree
Hide file tree
Showing 29 changed files with 427 additions and 155 deletions.
16 changes: 12 additions & 4 deletions lib/vector-core/src/event/log_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,11 +292,19 @@ impl LogEvent {
}
}

/// Retrieves the value of a field based on it's meaning.
/// This will first check if the value has previously been dropped. It is worth being
/// aware that if the field has been dropped and then some how readded, we still fetch
/// the dropped value here.
pub fn get_by_meaning(&self, meaning: impl AsRef<str>) -> Option<&Value> {
self.metadata()
.schema_definition()
.meaning_path(meaning.as_ref())
.and_then(|path| self.get(path))
if let Some(dropped) = self.metadata().dropped_field(&meaning) {
Some(dropped)
} else {
self.metadata()
.schema_definition()
.meaning_path(meaning.as_ref())
.and_then(|path| self.get(path))
}
}

// TODO(Jean): Once the event API uses `Lookup`, the allocation here can be removed.
Expand Down
22 changes: 22 additions & 0 deletions lib/vector-core/src/event/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,14 @@ pub struct EventMetadata {
/// TODO(Jean): must not skip serialization to track schemas across restarts.
#[serde(default = "default_schema_definition", skip)]
schema_definition: Arc<schema::Definition>,

/// A store of values that may be dropped during the encoding process but may be needed
/// later on. The map is indexed by meaning.
/// Currently this is just used for the `service`. If the service field is dropped by `only_fields`
/// we need to ensure it is still available later on for emitting metrics tagged by the service.
/// This field could almost be keyed by `&'static str`, but because it needs to be deserializable
/// we have to use `String`.
dropped_fields: BTreeMap<String, Value>,
}

fn default_metadata_value() -> Value {
Expand Down Expand Up @@ -123,6 +131,19 @@ impl EventMetadata {
pub fn set_splunk_hec_token(&mut self, secret: Arc<str>) {
self.secrets.insert(SPLUNK_HEC_TOKEN, secret);
}

/// Adds the value to the dropped fields list.
/// There is currently no way to remove a field from this list, so if a field is dropped
/// and then the field is re-added with a new value - the dropped value will still be
/// retrieved.
pub fn add_dropped_field(&mut self, meaning: String, value: Value) {
self.dropped_fields.insert(meaning, value);
}

/// Fetches the dropped field by meaning.
pub fn dropped_field(&self, meaning: impl AsRef<str>) -> Option<&Value> {
self.dropped_fields.get(meaning.as_ref())
}
}

impl Default for EventMetadata {
Expand All @@ -134,6 +155,7 @@ impl Default for EventMetadata {
schema_definition: default_schema_definition(),
source_id: None,
upstream_id: None,
dropped_fields: BTreeMap::new(),
}
}
}
Expand Down
17 changes: 17 additions & 0 deletions lib/vector-core/src/schema/meaning.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
//! Constants for commonly used semantic meanings.
/// The service typically represents the application that generated the event.
pub const SERVICE: &str = "service";

/// The main text message of the event.
pub const MESSAGE: &str = "message";

/// The main timestamp of the event.
pub const TIMESTAMP: &str = "timestamp";

/// The hostname of the machine where the event was generated.
pub const HOST: &str = "host";

pub const SOURCE: &str = "source";
pub const SEVERITY: &str = "severity";
pub const TRACE_ID: &str = "trace_id";
1 change: 1 addition & 0 deletions lib/vector-core/src/schema/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
mod definition;
pub mod meaning;
mod requirement;

pub use definition::Definition;
Expand Down
129 changes: 123 additions & 6 deletions src/codecs/encoding/transformer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use lookup::{
use serde::{Deserialize, Deserializer};
use vector_config::configurable_component;
use vector_core::event::{LogEvent, MaybeAsLogMut};
use vector_core::schema::meaning;
use vrl::value::Value;

use crate::{event::Event, serde::skip_serializing_if_default};
Expand Down Expand Up @@ -128,20 +129,52 @@ impl Transformer {

fn apply_only_fields(&self, log: &mut LogEvent) {
if let Some(only_fields) = self.only_fields.as_ref() {
let old_value = std::mem::replace(log.value_mut(), Value::Object(BTreeMap::new()));
let mut old_value = std::mem::replace(log.value_mut(), Value::Object(BTreeMap::new()));

for field in only_fields {
if let Some(value) = old_value.get(field) {
log.insert((PathPrefix::Event, field), value.clone());
if let Some(value) = old_value.remove(field, true) {
log.insert((PathPrefix::Event, field), value);
}
}

// We may need the service field to apply tags to emitted metrics after the log message has been pruned. If there
// is a service meaning, we move this value to `dropped_fields` in the metadata.
// If the field is still in the new log message after pruning it will have been removed from `old_value` above.
let service_path = log
.metadata()
.schema_definition()
.meaning_path(meaning::SERVICE);
if let Some(service_path) = service_path {
let mut new_log = LogEvent::from(old_value);
if let Some(service) = new_log.remove(service_path) {
log.metadata_mut()
.add_dropped_field(meaning::SERVICE.to_string(), service);
}
}
}
}

fn apply_except_fields(&self, log: &mut LogEvent) {
use lookup::path::TargetPath;

if let Some(except_fields) = self.except_fields.as_ref() {
let service_path = log
.metadata()
.schema_definition()
.meaning_path(meaning::SERVICE)
.map(|path| path.value_path().to_string());

for field in except_fields {
log.remove(field.as_str());
let value = log.remove(field.as_str());

// If we are removing the service field we need to store this in a `dropped_fields` list as we may need to
// refer to this later when emitting metrics.
if let Some(v) = value {
if matches!(service_path.as_ref(), Some(path) if path == field) {
log.metadata_mut()
.add_dropped_field(meaning::SERVICE.to_string(), v);
}
}
}
}
}
Expand Down Expand Up @@ -213,10 +246,15 @@ pub enum TimestampFormat {
#[cfg(test)]
mod tests {
use indoc::indoc;
use vector_core::config::log_schema;
use lookup::path::parse_target_path;
use vector_common::btreemap;
use vector_core::config::{log_schema, LogNamespace};
use vrl::value::Kind;

use crate::config::schema;

use super::*;
use std::collections::BTreeMap;
use std::{collections::BTreeMap, sync::Arc};

#[test]
fn serialize() {
Expand Down Expand Up @@ -374,4 +412,83 @@ mod tests {
"#});
assert!(config.is_err())
}

#[test]
fn only_fields_with_service() {
let transformer: Transformer = toml::from_str(r#"only_fields = ["message"]"#).unwrap();
let mut log = LogEvent::default();
{
log.insert("message", 1);
log.insert("thing.service", "carrot");
}

let schema = schema::Definition::new_with_default_metadata(
Kind::object(btreemap! {
"thing" => Kind::object(btreemap! {
"service" => Kind::bytes(),
})
}),
[LogNamespace::Vector],
);

let schema = schema.with_meaning(parse_target_path("thing.service").unwrap(), "service");

let mut event = Event::from(log);

event
.metadata_mut()
.set_schema_definition(&Arc::new(schema));

transformer.transform(&mut event);
assert!(event.as_mut_log().contains("message"));

// Event no longer contains the service field.
assert!(!event.as_mut_log().contains("thing.service"));

// But we can still get the service by meaning.
assert_eq!(
&Value::from("carrot"),
event.as_log().get_by_meaning("service").unwrap()
);
}

#[test]
fn except_fields_with_service() {
let transformer: Transformer =
toml::from_str(r#"except_fields = ["thing.service"]"#).unwrap();
let mut log = LogEvent::default();
{
log.insert("message", 1);
log.insert("thing.service", "carrot");
}

let schema = schema::Definition::new_with_default_metadata(
Kind::object(btreemap! {
"thing" => Kind::object(btreemap! {
"service" => Kind::bytes(),
})
}),
[LogNamespace::Vector],
);

let schema = schema.with_meaning(parse_target_path("thing.service").unwrap(), "service");

let mut event = Event::from(log);

event
.metadata_mut()
.set_schema_definition(&Arc::new(schema));

transformer.transform(&mut event);
assert!(event.as_mut_log().contains("message"));

// Event no longer contains the service field.
assert!(!event.as_mut_log().contains("thing.service"));

// But we can still get the service by meaning.
assert_eq!(
&Value::from("carrot"),
event.as_log().get_by_meaning("service").unwrap()
);
}
}
13 changes: 11 additions & 2 deletions src/sinks/amqp/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::sinks::prelude::*;
use bytes::BytesMut;
use std::io;
use tokio_util::codec::Encoder as _;
use vector_core::config::telemetry;

#[derive(Clone, Debug)]
pub(super) struct AmqpEncoder {
Expand All @@ -11,9 +12,17 @@ pub(super) struct AmqpEncoder {
}

impl encoding::Encoder<Event> for AmqpEncoder {
fn encode_input(&self, mut input: Event, writer: &mut dyn io::Write) -> io::Result<usize> {
fn encode_input(
&self,
mut input: Event,
writer: &mut dyn io::Write,
) -> io::Result<(usize, GroupedCountByteSize)> {
let mut body = BytesMut::new();
self.transformer.transform(&mut input);

let mut byte_size = telemetry().create_request_count_byte_size();
byte_size.add_event(&input, input.estimated_json_encoded_size_of());

let mut encoder = self.encoder.clone();
encoder
.encode(input, &mut body)
Expand All @@ -22,6 +31,6 @@ impl encoding::Encoder<Event> for AmqpEncoder {
let body = body.freeze();
write_all(writer, 1, body.as_ref())?;

Ok(body.len())
Ok((body.len(), byte_size))
}
}
24 changes: 19 additions & 5 deletions src/sinks/azure_blob/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ use codecs::{
encoding::{Framer, FramingConfig},
NewlineDelimitedEncoder, TextSerializerConfig,
};
use vector_core::partition::Partitioner;
use vector_common::request_metadata::GroupedCountByteSize;
use vector_core::{partition::Partitioner, EstimatedJsonEncodedSizeOf};

use super::config::AzureBlobSinkConfig;
use super::request_builder::AzureBlobRequestOptions;
Expand Down Expand Up @@ -68,10 +69,13 @@ fn azure_blob_build_request_without_compression() {
compression,
};

let mut byte_size = GroupedCountByteSize::new_untagged();
byte_size.add_event(&log, log.estimated_json_encoded_size_of());

let (metadata, request_metadata_builder, _events) =
request_options.split_input((key, vec![log]));

let payload = EncodeResult::uncompressed(Bytes::new());
let payload = EncodeResult::uncompressed(Bytes::new(), byte_size);
let request_metadata = request_metadata_builder.build(&payload);
let request = request_options.build_request(metadata, request_metadata, payload);

Expand Down Expand Up @@ -112,10 +116,14 @@ fn azure_blob_build_request_with_compression() {
),
compression,
};

let mut byte_size = GroupedCountByteSize::new_untagged();
byte_size.add_event(&log, log.estimated_json_encoded_size_of());

let (metadata, request_metadata_builder, _events) =
request_options.split_input((key, vec![log]));

let payload = EncodeResult::uncompressed(Bytes::new());
let payload = EncodeResult::uncompressed(Bytes::new(), byte_size);
let request_metadata = request_metadata_builder.build(&payload);
let request = request_options.build_request(metadata, request_metadata, payload);

Expand Down Expand Up @@ -157,10 +165,13 @@ fn azure_blob_build_request_with_time_format() {
compression,
};

let mut byte_size = GroupedCountByteSize::new_untagged();
byte_size.add_event(&log, log.estimated_json_encoded_size_of());

let (metadata, request_metadata_builder, _events) =
request_options.split_input((key, vec![log]));

let payload = EncodeResult::uncompressed(Bytes::new());
let payload = EncodeResult::uncompressed(Bytes::new(), byte_size);
let request_metadata = request_metadata_builder.build(&payload);
let request = request_options.build_request(metadata, request_metadata, payload);

Expand Down Expand Up @@ -205,10 +216,13 @@ fn azure_blob_build_request_with_uuid() {
compression,
};

let mut byte_size = GroupedCountByteSize::new_untagged();
byte_size.add_event(&log, log.estimated_json_encoded_size_of());

let (metadata, request_metadata_builder, _events) =
request_options.split_input((key, vec![log]));

let payload = EncodeResult::uncompressed(Bytes::new());
let payload = EncodeResult::uncompressed(Bytes::new(), byte_size);
let request_metadata = request_metadata_builder.build(&payload);
let request = request_options.build_request(metadata, request_metadata, payload);

Expand Down
16 changes: 8 additions & 8 deletions src/sinks/datadog/logs/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{convert::TryFrom, sync::Arc};
use indoc::indoc;
use tower::ServiceBuilder;
use vector_config::configurable_component;
use vector_core::config::proxy::ProxyConfig;
use vector_core::{config::proxy::ProxyConfig, schema::meaning};
use vrl::value::Kind;

use super::{service::LogApiRetry, sink::LogSinkBuilder};
Expand Down Expand Up @@ -176,13 +176,13 @@ impl SinkConfig for DatadogLogsConfig {

fn input(&self) -> Input {
let requirement = schema::Requirement::empty()
.required_meaning("message", Kind::bytes())
.required_meaning("timestamp", Kind::timestamp())
.optional_meaning("host", Kind::bytes())
.optional_meaning("source", Kind::bytes())
.optional_meaning("severity", Kind::bytes())
.optional_meaning("service", Kind::bytes())
.optional_meaning("trace_id", Kind::bytes());
.required_meaning(meaning::MESSAGE, Kind::bytes())
.required_meaning(meaning::TIMESTAMP, Kind::timestamp())
.optional_meaning(meaning::HOST, Kind::bytes())
.optional_meaning(meaning::SOURCE, Kind::bytes())
.optional_meaning(meaning::SEVERITY, Kind::bytes())
.optional_meaning(meaning::SERVICE, Kind::bytes())
.optional_meaning(meaning::TRACE_ID, Kind::bytes());

Input::log().with_schema_requirement(requirement)
}
Expand Down
Loading

0 comments on commit 0bf6abd

Please sign in to comment.