From 3b2a2be1b075344a92294c1248b09844f895ad72 Mon Sep 17 00:00:00 2001 From: Stephen Wakely Date: Wed, 31 May 2023 15:18:38 +0100 Subject: [PATCH] chore(observability): ensure `sent_event` and `received_event` metrics are estimated json size (#17465) This PR creates a newtype - [`JsonSize`](https://github.com/vectordotdev/vector/blob/stephen/event_json_size/lib/vector-common/src/json_size.rs) that is returned by the `EstimatedJsonEncodedSizeOf::estimated_json_encoded_size_of` trait function. The events that emit a `component_received_event_bytes_total` or `component_sent_event_bytes_total` event accept `JsonSize`. This allows us to use the compiler to ensure we are emitting the correct measurement. A number of components needed changing to ensure this worked. --------- Signed-off-by: Stephen Wakely --- .../src/internal_event/events_received.rs | 2 +- .../src/internal_event/events_sent.rs | 6 +- lib/vector-common/src/internal_event/mod.rs | 6 +- lib/vector-common/src/json_size.rs | 105 +++++ lib/vector-common/src/lib.rs | 2 + lib/vector-common/src/request_metadata.rs | 10 +- lib/vector-core/src/event/array.rs | 7 +- .../event/estimated_json_encoded_size_of.rs | 367 +++++++++--------- lib/vector-core/src/event/log_event.rs | 13 +- lib/vector-core/src/event/metric/mod.rs | 6 +- lib/vector-core/src/event/mod.rs | 4 +- lib/vector-core/src/event/trace.rs | 4 +- lib/vector-core/src/stream/driver.rs | 3 +- lib/vector-core/src/transform/mod.rs | 5 +- .../validators/component_spec/sources.rs | 9 +- src/internal_events/apache_metrics.rs | 9 +- src/internal_events/aws_ecs_metrics.rs | 9 +- src/internal_events/docker_logs.rs | 9 +- src/internal_events/exec.rs | 11 +- src/internal_events/file.rs | 10 +- src/internal_events/http.rs | 9 +- src/internal_events/http_client_source.rs | 9 +- src/internal_events/internal_logs.rs | 5 +- src/internal_events/kafka.rs | 9 +- src/internal_events/kubernetes_logs.rs | 11 +- src/internal_events/mongodb_metrics.rs | 11 +- 
src/internal_events/nginx_metrics.rs | 9 +- src/internal_events/socket.rs | 17 +- src/sinks/amqp/request_builder.rs | 5 + src/sinks/amqp/service.rs | 17 +- src/sinks/amqp/sink.rs | 3 +- src/sinks/aws_cloudwatch_logs/service.rs | 6 +- src/sinks/aws_cloudwatch_metrics/mod.rs | 6 +- src/sinks/aws_kinesis/service.rs | 8 +- src/sinks/aws_sqs/request_builder.rs | 2 +- src/sinks/aws_sqs/service.rs | 13 +- src/sinks/azure_blob/request_builder.rs | 4 +- src/sinks/azure_common/config.rs | 9 +- src/sinks/blackhole/sink.rs | 4 +- src/sinks/databend/service.rs | 2 +- src/sinks/datadog/events/service.rs | 6 +- src/sinks/datadog/logs/service.rs | 11 +- src/sinks/datadog/metrics/request_builder.rs | 12 +- src/sinks/datadog/metrics/service.rs | 11 +- src/sinks/datadog/traces/request_builder.rs | 13 +- src/sinks/datadog/traces/service.rs | 11 +- src/sinks/datadog_archives.rs | 4 +- src/sinks/elasticsearch/encoder.rs | 3 +- src/sinks/elasticsearch/request_builder.rs | 10 +- src/sinks/elasticsearch/retry.rs | 5 +- src/sinks/elasticsearch/service.rs | 9 +- src/sinks/influxdb/metrics.rs | 6 +- src/sinks/kafka/service.rs | 11 +- src/sinks/loki/event.rs | 11 +- src/sinks/opendal_common.rs | 9 +- src/sinks/prometheus/remote_write.rs | 5 +- src/sinks/pulsar/service.rs | 16 +- src/sinks/pulsar/sink.rs | 4 +- src/sinks/redis.rs | 3 +- src/sinks/s3_common/service.rs | 11 +- src/sinks/sematext/metrics.rs | 12 +- src/sinks/splunk_hec/common/response.rs | 3 +- src/sinks/splunk_hec/common/service.rs | 4 +- src/sinks/statsd/service.rs | 2 +- src/sinks/util/adaptive_concurrency/tests.rs | 5 +- src/sinks/util/batch.rs | 9 + src/sinks/util/buffer/mod.rs | 6 +- src/sinks/util/http.rs | 7 +- src/sinks/util/metadata.rs | 6 +- src/sinks/util/mod.rs | 7 +- src/sinks/util/processed_event.rs | 3 +- src/sinks/util/service.rs | 6 +- src/sinks/util/sink.rs | 47 ++- src/sinks/util/socket_bytes_sink.rs | 13 +- src/sinks/util/tcp.rs | 7 +- src/sinks/util/unix.rs | 7 +- src/sinks/vector/service.rs | 11 +- 
src/sinks/vector/sink.rs | 3 +- src/sources/dnstap/mod.rs | 69 ++-- src/sources/file.rs | 17 +- src/sources/file_descriptors/mod.rs | 2 +- src/sources/internal_logs.rs | 6 +- src/sources/internal_metrics.rs | 8 +- src/sources/postgresql_metrics.rs | 26 +- src/sources/util/framestream.rs | 9 +- src/sources/util/http_client.rs | 3 +- .../2023-07-04-0-31-0-upgrade-guide.md | 31 ++ 87 files changed, 807 insertions(+), 449 deletions(-) create mode 100644 lib/vector-common/src/json_size.rs create mode 100644 website/content/en/highlights/2023-07-04-0-31-0-upgrade-guide.md diff --git a/lib/vector-common/src/internal_event/events_received.rs b/lib/vector-common/src/internal_event/events_received.rs index 4021b3c578143..c25cc228c9fd1 100644 --- a/lib/vector-common/src/internal_event/events_received.rs +++ b/lib/vector-common/src/internal_event/events_received.rs @@ -18,6 +18,6 @@ crate::registered_event!( #[allow(clippy::cast_precision_loss)] self.events_count.record(count as f64); self.events.increment(count as u64); - self.event_bytes.increment(byte_size as u64); + self.event_bytes.increment(byte_size.get() as u64); } ); diff --git a/lib/vector-common/src/internal_event/events_sent.rs b/lib/vector-common/src/internal_event/events_sent.rs index 7d9986fdf63c4..d329562afe7fc 100644 --- a/lib/vector-common/src/internal_event/events_sent.rs +++ b/lib/vector-common/src/internal_event/events_sent.rs @@ -27,15 +27,15 @@ crate::registered_event!( match &self.output { Some(output) => { - trace!(message = "Events sent.", count = %count, byte_size = %byte_size, output = %output); + trace!(message = "Events sent.", count = %count, byte_size = %byte_size.get(), output = %output); } None => { - trace!(message = "Events sent.", count = %count, byte_size = %byte_size); + trace!(message = "Events sent.", count = %count, byte_size = %byte_size.get()); } } self.events.increment(count as u64); - self.event_bytes.increment(byte_size as u64); + self.event_bytes.increment(byte_size.get() as 
u64); } ); diff --git a/lib/vector-common/src/internal_event/mod.rs b/lib/vector-common/src/internal_event/mod.rs index 37d0dcfc634d9..7af70cc1322ee 100644 --- a/lib/vector-common/src/internal_event/mod.rs +++ b/lib/vector-common/src/internal_event/mod.rs @@ -16,6 +16,8 @@ pub use events_sent::{EventsSent, DEFAULT_OUTPUT}; pub use prelude::{error_stage, error_type}; pub use service::{CallError, PollReadyError}; +use crate::json_size::JsonSize; + pub trait InternalEvent: Sized { fn emit(self); @@ -106,9 +108,9 @@ pub struct ByteSize(pub usize); #[derive(Clone, Copy)] pub struct Count(pub usize); -/// Holds the tuple `(count_of_events, size_of_events_in_bytes)`. +/// Holds the tuple `(count_of_events, estimated_json_size_of_events)`. #[derive(Clone, Copy)] -pub struct CountByteSize(pub usize, pub usize); +pub struct CountByteSize(pub usize, pub JsonSize); // Wrapper types used to hold parameters for registering events diff --git a/lib/vector-common/src/json_size.rs b/lib/vector-common/src/json_size.rs new file mode 100644 index 0000000000000..746b6335716d1 --- /dev/null +++ b/lib/vector-common/src/json_size.rs @@ -0,0 +1,105 @@ +use std::{ + fmt, + iter::Sum, + ops::{Add, AddAssign, Sub}, +}; + +/// A newtype for the JSON size of an event. +/// Used to emit the `component_received_event_bytes_total` and +/// `component_sent_event_bytes_total` metrics. 
+#[derive(Clone, Copy, Default, Debug, PartialEq, Eq, PartialOrd, Ord)] +pub struct JsonSize(usize); + +impl fmt::Display for JsonSize { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.0) + } +} + +impl Sub for JsonSize { + type Output = JsonSize; + + #[inline] + fn sub(mut self, rhs: Self) -> Self::Output { + self.0 -= rhs.0; + self + } +} + +impl Add for JsonSize { + type Output = JsonSize; + + #[inline] + fn add(mut self, rhs: Self) -> Self::Output { + self.0 += rhs.0; + self + } +} + +impl AddAssign for JsonSize { + #[inline] + fn add_assign(&mut self, rhs: Self) { + self.0 += rhs.0; + } +} + +impl Sum for JsonSize { + #[inline] + fn sum>(iter: I) -> Self { + let mut accum = 0; + for val in iter { + accum += val.get(); + } + + JsonSize::new(accum) + } +} + +impl From for JsonSize { + #[inline] + fn from(value: usize) -> Self { + Self(value) + } +} + +impl JsonSize { + /// Create a new instance with the specified size. + #[must_use] + #[inline] + pub const fn new(size: usize) -> Self { + Self(size) + } + + /// Create a new instance with size 0. + #[must_use] + #[inline] + pub const fn zero() -> Self { + Self(0) + } + + /// Returns the contained size. 
+ #[must_use] + #[inline] + pub fn get(&self) -> usize { + self.0 + } +} + +#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)] +#[allow(clippy::module_name_repetitions)] +pub struct NonZeroJsonSize(JsonSize); + +impl NonZeroJsonSize { + #[must_use] + #[inline] + pub fn new(size: JsonSize) -> Option { + (size.0 > 0).then_some(NonZeroJsonSize(size)) + } +} + +impl From for JsonSize { + #[inline] + fn from(value: NonZeroJsonSize) -> Self { + value.0 + } +} diff --git a/lib/vector-common/src/lib.rs b/lib/vector-common/src/lib.rs index 832eaf0d5cdc5..d7d591323e07e 100644 --- a/lib/vector-common/src/lib.rs +++ b/lib/vector-common/src/lib.rs @@ -18,6 +18,8 @@ pub use vrl::btreemap; #[cfg(feature = "byte_size_of")] pub mod byte_size_of; +pub mod json_size; + pub mod config; #[cfg(feature = "conversion")] diff --git a/lib/vector-common/src/request_metadata.rs b/lib/vector-common/src/request_metadata.rs index be68c319dcadf..cce6124361b60 100644 --- a/lib/vector-common/src/request_metadata.rs +++ b/lib/vector-common/src/request_metadata.rs @@ -1,5 +1,7 @@ use std::ops::Add; +use crate::json_size::JsonSize; + /// Metadata for batch requests. #[derive(Clone, Copy, Debug, Default)] pub struct RequestMetadata { @@ -8,7 +10,7 @@ pub struct RequestMetadata { /// Size, in bytes, of the in-memory representation of all events in this batch request. events_byte_size: usize, /// Size, in bytes, of the estimated JSON-encoded representation of all events in this batch request. - events_estimated_json_encoded_byte_size: usize, + events_estimated_json_encoded_byte_size: JsonSize, /// Uncompressed size, in bytes, of the encoded events in this batch request. request_encoded_size: usize, /// On-the-wire size, in bytes, of the batch request itself after compression, etc. 
@@ -25,7 +27,7 @@ impl RequestMetadata { events_byte_size: usize, request_encoded_size: usize, request_wire_size: usize, - events_estimated_json_encoded_byte_size: usize, + events_estimated_json_encoded_byte_size: JsonSize, ) -> Self { Self { event_count, @@ -47,7 +49,7 @@ impl RequestMetadata { } #[must_use] - pub const fn events_estimated_json_encoded_byte_size(&self) -> usize { + pub const fn events_estimated_json_encoded_byte_size(&self) -> JsonSize { self.events_estimated_json_encoded_byte_size } @@ -64,7 +66,7 @@ impl RequestMetadata { /// Constructs a `RequestMetadata` by summation of the "batch" of `RequestMetadata` provided. #[must_use] pub fn from_batch>(metadata_iter: T) -> Self { - let mut metadata_sum = RequestMetadata::new(0, 0, 0, 0, 0); + let mut metadata_sum = RequestMetadata::new(0, 0, 0, 0, JsonSize::zero()); for metadata in metadata_iter { metadata_sum = metadata_sum + &metadata; diff --git a/lib/vector-core/src/event/array.rs b/lib/vector-core/src/event/array.rs index bc30573147d45..da87a6f6a8074 100644 --- a/lib/vector-core/src/event/array.rs +++ b/lib/vector-core/src/event/array.rs @@ -8,7 +8,10 @@ use futures::{stream, Stream}; #[cfg(test)] use quickcheck::{Arbitrary, Gen}; use vector_buffers::EventCount; -use vector_common::finalization::{AddBatchNotifier, BatchNotifier, EventFinalizers, Finalizable}; +use vector_common::{ + finalization::{AddBatchNotifier, BatchNotifier, EventFinalizers, Finalizable}, + json_size::JsonSize, +}; use super::{ EstimatedJsonEncodedSizeOf, Event, EventDataEq, EventFinalizer, EventMutRef, EventRef, @@ -253,7 +256,7 @@ impl ByteSizeOf for EventArray { } impl EstimatedJsonEncodedSizeOf for EventArray { - fn estimated_json_encoded_size_of(&self) -> usize { + fn estimated_json_encoded_size_of(&self) -> JsonSize { match self { Self::Logs(v) => v.estimated_json_encoded_size_of(), Self::Traces(v) => v.estimated_json_encoded_size_of(), diff --git a/lib/vector-core/src/event/estimated_json_encoded_size_of.rs 
b/lib/vector-core/src/event/estimated_json_encoded_size_of.rs index 8ee523d13f5cd..b671c8a817919 100644 --- a/lib/vector-core/src/event/estimated_json_encoded_size_of.rs +++ b/lib/vector-core/src/event/estimated_json_encoded_size_of.rs @@ -4,11 +4,12 @@ use bytes::Bytes; use chrono::{DateTime, Timelike, Utc}; use ordered_float::NotNan; use smallvec::SmallVec; +use vector_common::json_size::JsonSize; use vrl::value::Value; -const NULL_SIZE: usize = 4; -const TRUE_SIZE: usize = 4; -const FALSE_SIZE: usize = 5; +const NULL_SIZE: JsonSize = JsonSize::new(4); +const TRUE_SIZE: JsonSize = JsonSize::new(4); +const FALSE_SIZE: JsonSize = JsonSize::new(5); const BRACKETS_SIZE: usize = 2; const BRACES_SIZE: usize = 2; @@ -40,17 +41,17 @@ const EPOCH_RFC3339_9: &str = "1970-01-01T00:00:00.000000000Z"; /// /// Ideally, no allocations should take place in any implementation of this function. pub trait EstimatedJsonEncodedSizeOf { - fn estimated_json_encoded_size_of(&self) -> usize; + fn estimated_json_encoded_size_of(&self) -> JsonSize; } impl EstimatedJsonEncodedSizeOf for &T { - fn estimated_json_encoded_size_of(&self) -> usize { + fn estimated_json_encoded_size_of(&self) -> JsonSize { T::estimated_json_encoded_size_of(self) } } impl EstimatedJsonEncodedSizeOf for Option { - fn estimated_json_encoded_size_of(&self) -> usize { + fn estimated_json_encoded_size_of(&self) -> JsonSize { match self { Some(v) => v.estimated_json_encoded_size_of(), None => NULL_SIZE, @@ -61,13 +62,13 @@ impl EstimatedJsonEncodedSizeOf for Option { impl EstimatedJsonEncodedSizeOf for SmallVec<[T; N]> { - fn estimated_json_encoded_size_of(&self) -> usize { + fn estimated_json_encoded_size_of(&self) -> JsonSize { self.iter().map(T::estimated_json_encoded_size_of).sum() } } impl EstimatedJsonEncodedSizeOf for Value { - fn estimated_json_encoded_size_of(&self) -> usize { + fn estimated_json_encoded_size_of(&self) -> JsonSize { match self { Value::Timestamp(v) => v.estimated_json_encoded_size_of(), 
Value::Object(v) => v.estimated_json_encoded_size_of(), @@ -88,25 +89,25 @@ impl EstimatedJsonEncodedSizeOf for Value { /// This is the main reason why `EstimatedJsonEncodedSizeOf` is named as is, as most other types can /// be calculated exactly without a noticable performance penalty. impl EstimatedJsonEncodedSizeOf for str { - fn estimated_json_encoded_size_of(&self) -> usize { - QUOTES_SIZE + self.len() + fn estimated_json_encoded_size_of(&self) -> JsonSize { + JsonSize::new(QUOTES_SIZE + self.len()) } } impl EstimatedJsonEncodedSizeOf for String { - fn estimated_json_encoded_size_of(&self) -> usize { + fn estimated_json_encoded_size_of(&self) -> JsonSize { self.as_str().estimated_json_encoded_size_of() } } impl EstimatedJsonEncodedSizeOf for Bytes { - fn estimated_json_encoded_size_of(&self) -> usize { - QUOTES_SIZE + self.len() + fn estimated_json_encoded_size_of(&self) -> JsonSize { + JsonSize::new(QUOTES_SIZE + self.len()) } } impl EstimatedJsonEncodedSizeOf for bool { - fn estimated_json_encoded_size_of(&self) -> usize { + fn estimated_json_encoded_size_of(&self) -> JsonSize { if *self { TRUE_SIZE } else { @@ -116,19 +117,19 @@ impl EstimatedJsonEncodedSizeOf for bool { } impl EstimatedJsonEncodedSizeOf for f64 { - fn estimated_json_encoded_size_of(&self) -> usize { - ryu::Buffer::new().format_finite(*self).len() + fn estimated_json_encoded_size_of(&self) -> JsonSize { + ryu::Buffer::new().format_finite(*self).len().into() } } impl EstimatedJsonEncodedSizeOf for f32 { - fn estimated_json_encoded_size_of(&self) -> usize { - ryu::Buffer::new().format_finite(*self).len() + fn estimated_json_encoded_size_of(&self) -> JsonSize { + ryu::Buffer::new().format_finite(*self).len().into() } } impl EstimatedJsonEncodedSizeOf for NotNan { - fn estimated_json_encoded_size_of(&self) -> usize { + fn estimated_json_encoded_size_of(&self) -> JsonSize { self.into_inner().estimated_json_encoded_size_of() } } @@ -140,19 +141,19 @@ where K: AsRef, V: EstimatedJsonEncodedSizeOf, 
{ - fn estimated_json_encoded_size_of(&self) -> usize { + fn estimated_json_encoded_size_of(&self) -> JsonSize { let size = self.iter().fold(BRACES_SIZE, |acc, (k, v)| { - acc + k.as_ref().estimated_json_encoded_size_of() + acc + k.as_ref().estimated_json_encoded_size_of().get() + COLON_SIZE - + v.estimated_json_encoded_size_of() + + v.estimated_json_encoded_size_of().get() + COMMA_SIZE }); - if size > BRACES_SIZE { + JsonSize::new(if size > BRACES_SIZE { size - COMMA_SIZE } else { size - } + }) } } @@ -164,19 +165,19 @@ where V: EstimatedJsonEncodedSizeOf, S: ::std::hash::BuildHasher, { - fn estimated_json_encoded_size_of(&self) -> usize { + fn estimated_json_encoded_size_of(&self) -> JsonSize { let size = self.iter().fold(BRACES_SIZE, |acc, (k, v)| { - acc + k.as_ref().estimated_json_encoded_size_of() + acc + k.as_ref().estimated_json_encoded_size_of().get() + COLON_SIZE - + v.estimated_json_encoded_size_of() + + v.estimated_json_encoded_size_of().get() + COMMA_SIZE }); - if size > BRACES_SIZE { + JsonSize::new(if size > BRACES_SIZE { size - COMMA_SIZE } else { size - } + }) } } @@ -184,16 +185,16 @@ impl EstimatedJsonEncodedSizeOf for Vec where V: EstimatedJsonEncodedSizeOf, { - fn estimated_json_encoded_size_of(&self) -> usize { + fn estimated_json_encoded_size_of(&self) -> JsonSize { let size = self.iter().fold(BRACKETS_SIZE, |acc, v| { - acc + COMMA_SIZE + v.estimated_json_encoded_size_of() + acc + COMMA_SIZE + v.estimated_json_encoded_size_of().get() }); - if size > BRACKETS_SIZE { + JsonSize::new(if size > BRACKETS_SIZE { size - COMMA_SIZE } else { size - } + }) } } @@ -205,7 +206,7 @@ impl EstimatedJsonEncodedSizeOf for DateTime { /// /// - `chrono::SecondsFormat::AutoSi` is used to calculate nanoseconds precision. /// - `use_z` is `true` for the `chrono::DateTime#to_rfc3339_opts` function call. 
- fn estimated_json_encoded_size_of(&self) -> usize { + fn estimated_json_encoded_size_of(&self) -> JsonSize { let ns = self.nanosecond() % 1_000_000_000; let epoch = if ns == 0 { EPOCH_RFC3339_0 @@ -217,202 +218,218 @@ impl EstimatedJsonEncodedSizeOf for DateTime { EPOCH_RFC3339_9 }; - QUOTES_SIZE + epoch.len() + JsonSize::new(QUOTES_SIZE + epoch.len()) } } impl EstimatedJsonEncodedSizeOf for u8 { #[rustfmt::skip] - fn estimated_json_encoded_size_of(&self) -> usize { + fn estimated_json_encoded_size_of(&self) -> JsonSize { let v = *self; // 0 ..= 255 - if v < 10 { 1 - } else if v < 100 { 2 - } else { 3 } + JsonSize::new( + if v < 10 { 1 + } else if v < 100 { 2 + } else { 3 } + ) } } impl EstimatedJsonEncodedSizeOf for i8 { #[rustfmt::skip] - fn estimated_json_encoded_size_of(&self) -> usize { + fn estimated_json_encoded_size_of(&self) -> JsonSize { let v = *self; // -128 ..= 127 - if v < -99 { 4 - } else if v < -9 { 3 - } else if v < 0 { 2 - } else if v < 10 { 1 - } else if v < 100 { 2 - } else { 3 } + JsonSize::new( + if v < -99 { 4 + } else if v < -9 { 3 + } else if v < 0 { 2 + } else if v < 10 { 1 + } else if v < 100 { 2 + } else { 3 } + ) } } impl EstimatedJsonEncodedSizeOf for u16 { #[rustfmt::skip] - fn estimated_json_encoded_size_of(&self) -> usize { + fn estimated_json_encoded_size_of(&self) -> JsonSize { let v = *self; // 0 ..= 65_535 - if v < 10 { 1 - } else if v < 100 { 2 - } else if v < 1_000 { 3 - } else if v < 10_000 { 4 - } else { 5 } + JsonSize::new( + if v < 10 { 1 + } else if v < 100 { 2 + } else if v < 1_000 { 3 + } else if v < 10_000 { 4 + } else { 5 } + ) } } impl EstimatedJsonEncodedSizeOf for i16 { #[rustfmt::skip] - fn estimated_json_encoded_size_of(&self) -> usize { + fn estimated_json_encoded_size_of(&self) -> JsonSize { let v = *self; // -32_768 ..= 32_767 - if v < -9_999 { 6 - } else if v < -999 { 5 - } else if v < -99 { 4 - } else if v < -9 { 3 - } else if v < 0 { 2 - } else if v < 10 { 1 - } else if v < 100 { 2 - } else if v < 1_000 { 
3 - } else if v < 10_000 { 4 - } else { 5 } + JsonSize::new( + if v < -9_999 { 6 + } else if v < -999 { 5 + } else if v < -99 { 4 + } else if v < -9 { 3 + } else if v < 0 { 2 + } else if v < 10 { 1 + } else if v < 100 { 2 + } else if v < 1_000 { 3 + } else if v < 10_000 { 4 + } else { 5 } + ) } } impl EstimatedJsonEncodedSizeOf for u32 { #[rustfmt::skip] - fn estimated_json_encoded_size_of(&self) -> usize { + fn estimated_json_encoded_size_of(&self) -> JsonSize { let v = *self; // 0 ..= 4_294_967_295 - if v < 10 { 1 - } else if v < 100 { 2 - } else if v < 1_000 { 3 - } else if v < 10_000 { 4 - } else if v < 100_000 { 5 - } else if v < 1_000_000 { 6 - } else if v < 10_000_000 { 7 - } else if v < 100_000_000 { 8 - } else if v < 1_000_000_000 { 9 - } else { 10 } + JsonSize::new( + if v < 10 { 1 + } else if v < 100 { 2 + } else if v < 1_000 { 3 + } else if v < 10_000 { 4 + } else if v < 100_000 { 5 + } else if v < 1_000_000 { 6 + } else if v < 10_000_000 { 7 + } else if v < 100_000_000 { 8 + } else if v < 1_000_000_000 { 9 + } else { 10 } + ) } } impl EstimatedJsonEncodedSizeOf for i32 { #[rustfmt::skip] - fn estimated_json_encoded_size_of(&self) -> usize { + fn estimated_json_encoded_size_of(&self) -> JsonSize { let v = *self; // -2_147_483_648 ..= 2_147_483_647 - if v < -999_999_999 { 11 - } else if v < -99_999_999 { 10 - } else if v < -9_999_999 { 9 - } else if v < -999_999 { 8 - } else if v < -99_999 { 7 - } else if v < -9_999 { 6 - } else if v < -999 { 5 - } else if v < -99 { 4 - } else if v < -9 { 3 - } else if v < 0 { 2 - } else if v < 10 { 1 - } else if v < 100 { 2 - } else if v < 1_000 { 3 - } else if v < 10_000 { 4 - } else if v < 100_000 { 5 - } else if v < 1_000_000 { 6 - } else if v < 10_000_000 { 7 - } else if v < 100_000_000 { 8 - } else if v < 1_000_000_000 { 9 - } else { 10 } + JsonSize::new( + if v < -999_999_999 { 11 + } else if v < -99_999_999 { 10 + } else if v < -9_999_999 { 9 + } else if v < -999_999 { 8 + } else if v < -99_999 { 7 + } else if v 
< -9_999 { 6 + } else if v < -999 { 5 + } else if v < -99 { 4 + } else if v < -9 { 3 + } else if v < 0 { 2 + } else if v < 10 { 1 + } else if v < 100 { 2 + } else if v < 1_000 { 3 + } else if v < 10_000 { 4 + } else if v < 100_000 { 5 + } else if v < 1_000_000 { 6 + } else if v < 10_000_000 { 7 + } else if v < 100_000_000 { 8 + } else if v < 1_000_000_000 { 9 + } else { 10 } + ) } } impl EstimatedJsonEncodedSizeOf for u64 { #[rustfmt::skip] - fn estimated_json_encoded_size_of(&self) -> usize { + fn estimated_json_encoded_size_of(&self) -> JsonSize { let v = *self; // 0 ..= 18_446_744_073_709_551_615 - if v < 10 { 1 - } else if v < 100 { 2 - } else if v < 1_000 { 3 - } else if v < 10_000 { 4 - } else if v < 100_000 { 5 - } else if v < 1_000_000 { 6 - } else if v < 10_000_000 { 7 - } else if v < 100_000_000 { 8 - } else if v < 1_000_000_000 { 9 - } else if v < 10_000_000_000 { 10 - } else if v < 100_000_000_000 { 11 - } else if v < 1_000_000_000_000 { 12 - } else if v < 10_000_000_000_000 { 13 - } else if v < 100_000_000_000_000 { 14 - } else if v < 1_000_000_000_000_000 { 15 - } else if v < 10_000_000_000_000_000 { 16 - } else if v < 100_000_000_000_000_000 { 17 - } else if v < 1_000_000_000_000_000_000 { 18 - } else if v < 10_000_000_000_000_000_000 { 19 - } else { 20 } + JsonSize::new( + if v < 10 { 1 + } else if v < 100 { 2 + } else if v < 1_000 { 3 + } else if v < 10_000 { 4 + } else if v < 100_000 { 5 + } else if v < 1_000_000 { 6 + } else if v < 10_000_000 { 7 + } else if v < 100_000_000 { 8 + } else if v < 1_000_000_000 { 9 + } else if v < 10_000_000_000 { 10 + } else if v < 100_000_000_000 { 11 + } else if v < 1_000_000_000_000 { 12 + } else if v < 10_000_000_000_000 { 13 + } else if v < 100_000_000_000_000 { 14 + } else if v < 1_000_000_000_000_000 { 15 + } else if v < 10_000_000_000_000_000 { 16 + } else if v < 100_000_000_000_000_000 { 17 + } else if v < 1_000_000_000_000_000_000 { 18 + } else if v < 10_000_000_000_000_000_000 { 19 + } else { 20 } + ) } } 
impl EstimatedJsonEncodedSizeOf for i64 { #[rustfmt::skip] - fn estimated_json_encoded_size_of(&self) -> usize { + fn estimated_json_encoded_size_of(&self) -> JsonSize { let v = *self; // -9_223_372_036_854_775_808 ..= 9_223_372_036_854_775_807 - if v < -999_999_999_999_999_999 { 20 - } else if v < -99_999_999_999_999_999 { 19 - } else if v < -9_999_999_999_999_999 { 18 - } else if v < -999_999_999_999_999 { 17 - } else if v < -99_999_999_999_999 { 16 - } else if v < -9_999_999_999_999 { 15 - } else if v < -999_999_999_999 { 14 - } else if v < -99_999_999_999 { 13 - } else if v < -9_999_999_999 { 12 - } else if v < -999_999_999 { 11 - } else if v < -99_999_999 { 10 - } else if v < -9_999_999 { 9 - } else if v < -999_999 { 8 - } else if v < -99_999 { 7 - } else if v < -9_999 { 6 - } else if v < -999 { 5 - } else if v < -99 { 4 - } else if v < -9 { 3 - } else if v < 0 { 2 - } else if v < 10 { 1 - } else if v < 100 { 2 - } else if v < 1_000 { 3 - } else if v < 10_000 { 4 - } else if v < 100_000 { 5 - } else if v < 1_000_000 { 6 - } else if v < 10_000_000 { 7 - } else if v < 100_000_000 { 8 - } else if v < 1_000_000_000 { 9 - } else if v < 10_000_000_000 { 10 - } else if v < 100_000_000_000 { 11 - } else if v < 1_000_000_000_000 { 12 - } else if v < 10_000_000_000_000 { 13 - } else if v < 100_000_000_000_000 { 14 - } else if v < 1_000_000_000_000_000 { 15 - } else if v < 10_000_000_000_000_000 { 16 - } else if v < 100_000_000_000_000_000 { 17 - } else if v < 1_000_000_000_000_000_000 { 18 - } else { 19 } + JsonSize::new( + if v < -999_999_999_999_999_999 { 20 + } else if v < -99_999_999_999_999_999 { 19 + } else if v < -9_999_999_999_999_999 { 18 + } else if v < -999_999_999_999_999 { 17 + } else if v < -99_999_999_999_999 { 16 + } else if v < -9_999_999_999_999 { 15 + } else if v < -999_999_999_999 { 14 + } else if v < -99_999_999_999 { 13 + } else if v < -9_999_999_999 { 12 + } else if v < -999_999_999 { 11 + } else if v < -99_999_999 { 10 + } else if v < -9_999_999 
{ 9 + } else if v < -999_999 { 8 + } else if v < -99_999 { 7 + } else if v < -9_999 { 6 + } else if v < -999 { 5 + } else if v < -99 { 4 + } else if v < -9 { 3 + } else if v < 0 { 2 + } else if v < 10 { 1 + } else if v < 100 { 2 + } else if v < 1_000 { 3 + } else if v < 10_000 { 4 + } else if v < 100_000 { 5 + } else if v < 1_000_000 { 6 + } else if v < 10_000_000 { 7 + } else if v < 100_000_000 { 8 + } else if v < 1_000_000_000 { 9 + } else if v < 10_000_000_000 { 10 + } else if v < 100_000_000_000 { 11 + } else if v < 1_000_000_000_000 { 12 + } else if v < 10_000_000_000_000 { 13 + } else if v < 100_000_000_000_000 { 14 + } else if v < 1_000_000_000_000_000 { 15 + } else if v < 10_000_000_000_000_000 { 16 + } else if v < 100_000_000_000_000_000 { 17 + } else if v < 1_000_000_000_000_000_000 { 18 + } else { 19 } + ) } } impl EstimatedJsonEncodedSizeOf for usize { - fn estimated_json_encoded_size_of(&self) -> usize { + fn estimated_json_encoded_size_of(&self) -> JsonSize { (*self as u64).estimated_json_encoded_size_of() } } impl EstimatedJsonEncodedSizeOf for isize { - fn estimated_json_encoded_size_of(&self) -> usize { + fn estimated_json_encoded_size_of(&self) -> JsonSize { (*self as i64).estimated_json_encoded_size_of() } } @@ -453,7 +470,7 @@ mod tests { let got = v.estimated_json_encoded_size_of(); let want = serde_json::to_string(&v).unwrap(); - got == want.len() + got == want.len().into() } #[quickcheck] @@ -461,7 +478,7 @@ mod tests { let got = v.estimated_json_encoded_size_of(); let want = serde_json::to_string(&v).unwrap(); - got == want.len() + got == want.len().into() } #[quickcheck] @@ -469,7 +486,7 @@ mod tests { let got = v.estimated_json_encoded_size_of(); let want = serde_json::to_string(&v).unwrap(); - got == want.len() + got == want.len().into() } #[quickcheck] @@ -477,7 +494,7 @@ mod tests { let got = v.estimated_json_encoded_size_of(); let want = serde_json::to_string(&v).unwrap(); - got == want.len() + got == want.len().into() } #[quickcheck] 
@@ -485,7 +502,7 @@ mod tests { let got = v.estimated_json_encoded_size_of(); let want = serde_json::to_string(&v).unwrap(); - got == want.len() + got == want.len().into() } #[quickcheck] @@ -493,7 +510,7 @@ mod tests { let got = v.estimated_json_encoded_size_of(); let want = serde_json::to_string(&v).unwrap(); - got == want.len() + got == want.len().into() } #[quickcheck] @@ -501,7 +518,7 @@ mod tests { let got = v.estimated_json_encoded_size_of(); let want = serde_json::to_string(&v).unwrap(); - got == want.len() + got == want.len().into() } #[quickcheck] @@ -509,7 +526,7 @@ mod tests { let got = v.estimated_json_encoded_size_of(); let want = serde_json::to_string(&v).unwrap(); - got == want.len() + got == want.len().into() } #[quickcheck] @@ -517,7 +534,7 @@ mod tests { let got = v.estimated_json_encoded_size_of(); let want = serde_json::to_string(&v).unwrap(); - got == want.len() + got == want.len().into() } #[quickcheck] @@ -525,7 +542,7 @@ mod tests { let got = v.estimated_json_encoded_size_of(); let want = serde_json::to_string(&v).unwrap(); - got == want.len() + got == want.len().into() } #[quickcheck] @@ -538,7 +555,7 @@ mod tests { let got = v.estimated_json_encoded_size_of(); let want = serde_json::to_string(&v).unwrap(); - got == want.len() + got == want.len().into() } #[quickcheck] @@ -551,7 +568,7 @@ mod tests { let got = v.estimated_json_encoded_size_of(); let want = serde_json::to_string(&v).unwrap(); - got == want.len() + got == want.len().into() } #[quickcheck] @@ -563,7 +580,7 @@ mod tests { let got = v.estimated_json_encoded_size_of(); let want = serde_json::to_string(&v).unwrap(); - TestResult::from_bool(got == want.len()) + TestResult::from_bool(got == want.len().into()) } #[quickcheck] @@ -575,7 +592,7 @@ mod tests { let got = v.estimated_json_encoded_size_of(); let want = serde_json::to_string(&v).unwrap(); - TestResult::from_bool(got == want.len()) + TestResult::from_bool(got == want.len().into()) } #[quickcheck] @@ -583,7 +600,7 @@ mod 
tests { let got = v.estimated_json_encoded_size_of(); let want = serde_json::to_string(&v).unwrap(); - got == want.len() + got == want.len().into() } #[quickcheck] @@ -591,7 +608,7 @@ mod tests { let got = v.estimated_json_encoded_size_of(); let want = serde_json::to_string(&v).unwrap(); - got == want.len() + got == want.len().into() } #[quickcheck] @@ -599,7 +616,7 @@ mod tests { let got = v.estimated_json_encoded_size_of(); let want = serde_json::to_string(&v).unwrap(); - TestResult::from_bool(got == want.len()) + TestResult::from_bool(got == want.len().into()) } #[quickcheck] @@ -611,7 +628,7 @@ mod tests { let got = v.estimated_json_encoded_size_of(); let want = serde_json::to_string(&v).unwrap(); - TestResult::from_bool(got == want.len()) + TestResult::from_bool(got == want.len().into()) } fn is_inaccurately_counted_value(v: &Value) -> bool { diff --git a/lib/vector-core/src/event/log_event.rs b/lib/vector-core/src/event/log_event.rs index 353d0416fccaa..beb0afdec1e32 100644 --- a/lib/vector-core/src/event/log_event.rs +++ b/lib/vector-core/src/event/log_event.rs @@ -14,7 +14,10 @@ use crossbeam_utils::atomic::AtomicCell; use lookup::lookup_v2::TargetPath; use lookup::PathPrefix; use serde::{Deserialize, Serialize, Serializer}; -use vector_common::EventDataEq; +use vector_common::{ + json_size::{JsonSize, NonZeroJsonSize}, + EventDataEq, +}; use super::{ estimated_json_encoded_size_of::EstimatedJsonEncodedSizeOf, @@ -36,7 +39,7 @@ struct Inner { size_cache: AtomicCell>, #[serde(skip)] - json_encoded_size_cache: AtomicCell>, + json_encoded_size_cache: AtomicCell>, } impl Inner { @@ -73,12 +76,12 @@ impl ByteSizeOf for Inner { } impl EstimatedJsonEncodedSizeOf for Inner { - fn estimated_json_encoded_size_of(&self) -> usize { + fn estimated_json_encoded_size_of(&self) -> JsonSize { self.json_encoded_size_cache .load() .unwrap_or_else(|| { let size = self.fields.estimated_json_encoded_size_of(); - let size = NonZeroUsize::new(size).expect("Size cannot be zero"); + 
let size = NonZeroJsonSize::new(size).expect("Size cannot be zero"); self.json_encoded_size_cache.store(Some(size)); size @@ -204,7 +207,7 @@ impl Finalizable for LogEvent { } impl EstimatedJsonEncodedSizeOf for LogEvent { - fn estimated_json_encoded_size_of(&self) -> usize { + fn estimated_json_encoded_size_of(&self) -> JsonSize { self.inner.estimated_json_encoded_size_of() } } diff --git a/lib/vector-core/src/event/metric/mod.rs b/lib/vector-core/src/event/metric/mod.rs index 392e43a6cda74..141d3b28997d9 100644 --- a/lib/vector-core/src/event/metric/mod.rs +++ b/lib/vector-core/src/event/metric/mod.rs @@ -11,7 +11,7 @@ use std::{ }; use chrono::{DateTime, Utc}; -use vector_common::EventDataEq; +use vector_common::{json_size::JsonSize, EventDataEq}; use vector_config::configurable_component; use crate::{ @@ -463,10 +463,10 @@ impl ByteSizeOf for Metric { } impl EstimatedJsonEncodedSizeOf for Metric { - fn estimated_json_encoded_size_of(&self) -> usize { + fn estimated_json_encoded_size_of(&self) -> JsonSize { // TODO: For now we're using the in-memory representation of the metric, but we'll convert // this to actually calculate the JSON encoded size in the near future. 
- self.size_of() + self.size_of().into() } } diff --git a/lib/vector-core/src/event/mod.rs b/lib/vector-core/src/event/mod.rs index de9e01ec4c109..04522793e3436 100644 --- a/lib/vector-core/src/event/mod.rs +++ b/lib/vector-core/src/event/mod.rs @@ -19,7 +19,7 @@ pub use r#ref::{EventMutRef, EventRef}; use serde::{Deserialize, Serialize}; pub use trace::TraceEvent; use vector_buffers::EventCount; -use vector_common::{finalization, EventDataEq}; +use vector_common::{finalization, json_size::JsonSize, EventDataEq}; pub use vrl::value::Value; #[cfg(feature = "vrl")] pub use vrl_target::{TargetEvents, VrlTarget}; @@ -65,7 +65,7 @@ impl ByteSizeOf for Event { } impl EstimatedJsonEncodedSizeOf for Event { - fn estimated_json_encoded_size_of(&self) -> usize { + fn estimated_json_encoded_size_of(&self) -> JsonSize { match self { Event::Log(log_event) => log_event.estimated_json_encoded_size_of(), Event::Metric(metric_event) => metric_event.estimated_json_encoded_size_of(), diff --git a/lib/vector-core/src/event/trace.rs b/lib/vector-core/src/event/trace.rs index 8bc68f9880605..bd10a9e3aaca5 100644 --- a/lib/vector-core/src/event/trace.rs +++ b/lib/vector-core/src/event/trace.rs @@ -3,7 +3,7 @@ use std::{collections::BTreeMap, fmt::Debug}; use lookup::lookup_v2::TargetPath; use serde::{Deserialize, Serialize}; use vector_buffers::EventCount; -use vector_common::EventDataEq; +use vector_common::{json_size::JsonSize, EventDataEq}; use super::{ BatchNotifier, EstimatedJsonEncodedSizeOf, EventFinalizer, EventFinalizers, EventMetadata, @@ -109,7 +109,7 @@ impl ByteSizeOf for TraceEvent { } impl EstimatedJsonEncodedSizeOf for TraceEvent { - fn estimated_json_encoded_size_of(&self) -> usize { + fn estimated_json_encoded_size_of(&self) -> JsonSize { self.0.estimated_json_encoded_size_of() } } diff --git a/lib/vector-core/src/stream/driver.rs b/lib/vector-core/src/stream/driver.rs index 6376be29f5d32..093a7e0c4fad0 100644 --- a/lib/vector-core/src/stream/driver.rs +++ 
b/lib/vector-core/src/stream/driver.rs @@ -263,6 +263,7 @@ mod tests { use tower::Service; use vector_common::{ finalization::{BatchNotifier, EventFinalizer, EventFinalizers, EventStatus, Finalizable}, + json_size::JsonSize, request_metadata::RequestMetadata, }; use vector_common::{internal_event::CountByteSize, request_metadata::MetaDescriptive}; @@ -310,7 +311,7 @@ mod tests { } fn events_sent(&self) -> CountByteSize { - CountByteSize(1, 1) + CountByteSize(1, JsonSize::new(1)) } } diff --git a/lib/vector-core/src/transform/mod.rs b/lib/vector-core/src/transform/mod.rs index b506716a2cfd0..a60cd85c8200a 100644 --- a/lib/vector-core/src/transform/mod.rs +++ b/lib/vector-core/src/transform/mod.rs @@ -4,6 +4,7 @@ use futures::{Stream, StreamExt}; use vector_common::internal_event::{ self, register, CountByteSize, EventsSent, InternalEventHandle as _, Registered, DEFAULT_OUTPUT, }; +use vector_common::json_size::JsonSize; use vector_common::EventDataEq; use crate::{ @@ -247,7 +248,7 @@ impl TransformOutputs { if let Some(primary) = self.primary_output.as_mut() { let count = buf.primary_buffer.as_ref().map_or(0, OutputBuffer::len); let byte_size = buf.primary_buffer.as_ref().map_or( - 0, + JsonSize::new(0), EstimatedJsonEncodedSizeOf::estimated_json_encoded_size_of, ); buf.primary_buffer @@ -487,7 +488,7 @@ impl EventDataEq> for OutputBuffer { } impl EstimatedJsonEncodedSizeOf for OutputBuffer { - fn estimated_json_encoded_size_of(&self) -> usize { + fn estimated_json_encoded_size_of(&self) -> JsonSize { self.0 .iter() .map(EstimatedJsonEncodedSizeOf::estimated_json_encoded_size_of) diff --git a/src/components/validation/validators/component_spec/sources.rs b/src/components/validation/validators/component_spec/sources.rs index f3593116b795a..c25b217a399e4 100644 --- a/src/components/validation/validators/component_spec/sources.rs +++ b/src/components/validation/validators/component_spec/sources.rs @@ -1,6 +1,7 @@ use std::fmt::{Display, Formatter}; use bytes::BytesMut; 
+use vector_common::json_size::JsonSize; use vector_core::event::{Event, MetricKind}; use vector_core::EstimatedJsonEncodedSizeOf; @@ -163,7 +164,7 @@ fn validate_component_received_event_bytes_total( } } - let expected_bytes = inputs.iter().fold(0, |acc, i| { + let expected_bytes = inputs.iter().fold(JsonSize::new(0), |acc, i| { if let TestEvent::Passthrough(_) = i { let size = vec![i.clone().into_event()].estimated_json_encoded_size_of(); return acc + size; @@ -179,7 +180,7 @@ fn validate_component_received_event_bytes_total( expected_bytes, ); - if metric_bytes != expected_bytes as f64 { + if JsonSize::new(metric_bytes as usize) != expected_bytes { errs.push(format!( "{}: expected {} bytes, but received {}", SourceMetrics::EventsReceivedBytes, @@ -367,7 +368,7 @@ fn validate_component_sent_event_bytes_total( } } - let mut expected_bytes = 0; + let mut expected_bytes = JsonSize::zero(); for e in outputs { expected_bytes += vec![e].estimated_json_encoded_size_of(); } @@ -379,7 +380,7 @@ fn validate_component_sent_event_bytes_total( expected_bytes, ); - if metric_bytes != expected_bytes as f64 { + if JsonSize::new(metric_bytes as usize) != expected_bytes { errs.push(format!( "{}: expected {} bytes, but received {}.", SourceMetrics::SentEventBytesTotal, diff --git a/src/internal_events/apache_metrics.rs b/src/internal_events/apache_metrics.rs index 86b5bfbdfd61c..0e42463f960e8 100644 --- a/src/internal_events/apache_metrics.rs +++ b/src/internal_events/apache_metrics.rs @@ -1,6 +1,9 @@ use metrics::counter; -use vector_common::internal_event::{error_stage, error_type}; +use vector_common::{ + internal_event::{error_stage, error_type}, + json_size::JsonSize, +}; use vector_core::internal_event::InternalEvent; use super::prelude::http_error_code; @@ -8,7 +11,7 @@ use crate::sources::apache_metrics; #[derive(Debug)] pub struct ApacheMetricsEventsReceived<'a> { - pub byte_size: usize, + pub byte_size: JsonSize, pub count: usize, pub endpoint: &'a str, } @@ -22,7 +25,7 
@@ impl<'a> InternalEvent for ApacheMetricsEventsReceived<'a> { "endpoint" => self.endpoint.to_owned(), ); counter!( - "component_received_event_bytes_total", self.byte_size as u64, + "component_received_event_bytes_total", self.byte_size.get() as u64, "endpoint" => self.endpoint.to_owned(), ); } diff --git a/src/internal_events/aws_ecs_metrics.rs b/src/internal_events/aws_ecs_metrics.rs index 92340ba484110..2ae8081697a66 100644 --- a/src/internal_events/aws_ecs_metrics.rs +++ b/src/internal_events/aws_ecs_metrics.rs @@ -1,14 +1,17 @@ use std::borrow::Cow; use metrics::counter; -use vector_common::internal_event::{error_stage, error_type}; +use vector_common::{ + internal_event::{error_stage, error_type}, + json_size::JsonSize, +}; use vector_core::internal_event::InternalEvent; use super::prelude::{http_error_code, hyper_error_code}; #[derive(Debug)] pub struct AwsEcsMetricsEventsReceived<'a> { - pub byte_size: usize, + pub byte_size: JsonSize, pub count: usize, pub endpoint: &'a str, } @@ -27,7 +30,7 @@ impl<'a> InternalEvent for AwsEcsMetricsEventsReceived<'a> { "endpoint" => self.endpoint.to_string(), ); counter!( - "component_received_event_bytes_total", self.byte_size as u64, + "component_received_event_bytes_total", self.byte_size.get() as u64, "endpoint" => self.endpoint.to_string(), ); } diff --git a/src/internal_events/docker_logs.rs b/src/internal_events/docker_logs.rs index d126b656a6773..0447624b01630 100644 --- a/src/internal_events/docker_logs.rs +++ b/src/internal_events/docker_logs.rs @@ -1,12 +1,15 @@ use bollard::errors::Error; use chrono::ParseError; use metrics::counter; -use vector_common::internal_event::{error_stage, error_type}; +use vector_common::{ + internal_event::{error_stage, error_type}, + json_size::JsonSize, +}; use vector_core::internal_event::InternalEvent; #[derive(Debug)] pub struct DockerLogsEventsReceived<'a> { - pub byte_size: usize, + pub byte_size: JsonSize, pub container_id: &'a str, pub container_name: &'a str, } @@ 
-24,7 +27,7 @@ impl InternalEvent for DockerLogsEventsReceived<'_> { "container_name" => self.container_name.to_owned() ); counter!( - "component_received_event_bytes_total", self.byte_size as u64, + "component_received_event_bytes_total", self.byte_size.get() as u64, "container_name" => self.container_name.to_owned() ); } diff --git a/src/internal_events/exec.rs b/src/internal_events/exec.rs index f171ca374fbb1..4fd3461a1f822 100644 --- a/src/internal_events/exec.rs +++ b/src/internal_events/exec.rs @@ -3,8 +3,9 @@ use std::time::Duration; use crate::emit; use metrics::{counter, histogram}; use tokio::time::error::Elapsed; -use vector_common::internal_event::{ - error_stage, error_type, ComponentEventsDropped, UNINTENTIONAL, +use vector_common::{ + internal_event::{error_stage, error_type, ComponentEventsDropped, UNINTENTIONAL}, + json_size::JsonSize, }; use vector_core::internal_event::InternalEvent; @@ -14,7 +15,7 @@ use super::prelude::io_error_code; pub struct ExecEventsReceived<'a> { pub count: usize, pub command: &'a str, - pub byte_size: usize, + pub byte_size: JsonSize, } impl InternalEvent for ExecEventsReceived<'_> { @@ -22,7 +23,7 @@ impl InternalEvent for ExecEventsReceived<'_> { trace!( message = "Events received.", count = self.count, - byte_size = self.byte_size, + byte_size = self.byte_size.get(), command = %self.command, ); counter!( @@ -30,7 +31,7 @@ impl InternalEvent for ExecEventsReceived<'_> { "command" => self.command.to_owned(), ); counter!( - "component_received_event_bytes_total", self.byte_size as u64, + "component_received_event_bytes_total", self.byte_size.get() as u64, "command" => self.command.to_owned(), ); } diff --git a/src/internal_events/file.rs b/src/internal_events/file.rs index 6e76be2f5d398..aedac3d74afb2 100644 --- a/src/internal_events/file.rs +++ b/src/internal_events/file.rs @@ -6,6 +6,7 @@ use crate::emit; #[cfg(any(feature = "sources-file", feature = "sources-kubernetes_logs"))] pub use self::source::*; + use 
vector_common::internal_event::{error_stage, error_type}; #[derive(Debug)] @@ -86,7 +87,10 @@ mod source { use super::{FileOpen, InternalEvent}; use crate::emit; - use vector_common::internal_event::{error_stage, error_type}; + use vector_common::{ + internal_event::{error_stage, error_type}, + json_size::JsonSize, + }; #[derive(Debug)] pub struct FileBytesReceived<'a> { @@ -114,7 +118,7 @@ mod source { pub struct FileEventsReceived<'a> { pub count: usize, pub file: &'a str, - pub byte_size: usize, + pub byte_size: JsonSize, } impl InternalEvent for FileEventsReceived<'_> { @@ -130,7 +134,7 @@ mod source { "file" => self.file.to_owned(), ); counter!( - "component_received_event_bytes_total", self.byte_size as u64, + "component_received_event_bytes_total", self.byte_size.get() as u64, "file" => self.file.to_owned(), ); } diff --git a/src/internal_events/http.rs b/src/internal_events/http.rs index a016998b7655b..5243cf47628f5 100644 --- a/src/internal_events/http.rs +++ b/src/internal_events/http.rs @@ -3,7 +3,10 @@ use std::error::Error; use metrics::{counter, histogram}; use vector_core::internal_event::InternalEvent; -use vector_common::internal_event::{error_stage, error_type}; +use vector_common::{ + internal_event::{error_stage, error_type}, + json_size::JsonSize, +}; #[derive(Debug)] pub struct HttpBytesReceived<'a> { @@ -31,7 +34,7 @@ impl InternalEvent for HttpBytesReceived<'_> { #[derive(Debug)] pub struct HttpEventsReceived<'a> { pub count: usize, - pub byte_size: usize, + pub byte_size: JsonSize, pub http_path: &'a str, pub protocol: &'static str, } @@ -54,7 +57,7 @@ impl InternalEvent for HttpEventsReceived<'_> { ); counter!( "component_received_event_bytes_total", - self.byte_size as u64, + self.byte_size.get() as u64, "http_path" => self.http_path.to_string(), "protocol" => self.protocol, ); diff --git a/src/internal_events/http_client_source.rs b/src/internal_events/http_client_source.rs index b5e7ec2d8b68a..b5eb27e8a3fd9 100644 --- 
a/src/internal_events/http_client_source.rs +++ b/src/internal_events/http_client_source.rs @@ -1,12 +1,15 @@ use metrics::counter; -use vector_common::internal_event::{error_stage, error_type}; +use vector_common::{ + internal_event::{error_stage, error_type}, + json_size::JsonSize, +}; use vector_core::internal_event::InternalEvent; use super::prelude::http_error_code; #[derive(Debug)] pub struct HttpClientEventsReceived { - pub byte_size: usize, + pub byte_size: JsonSize, pub count: usize, pub url: String, } @@ -24,7 +27,7 @@ impl InternalEvent for HttpClientEventsReceived { "uri" => self.url.clone(), ); counter!( - "component_received_event_bytes_total", self.byte_size as u64, + "component_received_event_bytes_total", self.byte_size.get() as u64, "uri" => self.url.clone(), ); } diff --git a/src/internal_events/internal_logs.rs b/src/internal_events/internal_logs.rs index f78df4e2d32fd..5d6637bfec986 100644 --- a/src/internal_events/internal_logs.rs +++ b/src/internal_events/internal_logs.rs @@ -1,4 +1,5 @@ use metrics::counter; +use vector_common::json_size::JsonSize; use vector_core::internal_event::InternalEvent; #[derive(Debug)] @@ -18,7 +19,7 @@ impl InternalEvent for InternalLogsBytesReceived { #[derive(Debug)] pub struct InternalLogsEventsReceived { - pub byte_size: usize, + pub byte_size: JsonSize, pub count: usize, } @@ -28,7 +29,7 @@ impl InternalEvent for InternalLogsEventsReceived { counter!("component_received_events_total", self.count as u64); counter!( "component_received_event_bytes_total", - self.byte_size as u64 + self.byte_size.get() as u64 ); } } diff --git a/src/internal_events/kafka.rs b/src/internal_events/kafka.rs index ab20ec0d47cde..57c25d905c60f 100644 --- a/src/internal_events/kafka.rs +++ b/src/internal_events/kafka.rs @@ -1,7 +1,10 @@ use metrics::{counter, gauge}; use vector_core::{internal_event::InternalEvent, update_counter}; -use vector_common::internal_event::{error_stage, error_type}; +use vector_common::{ + 
internal_event::{error_stage, error_type}, + json_size::JsonSize, +}; #[derive(Debug)] pub struct KafkaBytesReceived<'a> { @@ -32,7 +35,7 @@ impl<'a> InternalEvent for KafkaBytesReceived<'a> { #[derive(Debug)] pub struct KafkaEventsReceived<'a> { - pub byte_size: usize, + pub byte_size: JsonSize, pub count: usize, pub topic: &'a str, pub partition: i32, @@ -50,7 +53,7 @@ impl<'a> InternalEvent for KafkaEventsReceived<'a> { counter!("component_received_events_total", self.count as u64, "topic" => self.topic.to_string(), "partition" => self.partition.to_string()); counter!( "component_received_event_bytes_total", - self.byte_size as u64, + self.byte_size.get() as u64, "topic" => self.topic.to_string(), "partition" => self.partition.to_string(), ); diff --git a/src/internal_events/kubernetes_logs.rs b/src/internal_events/kubernetes_logs.rs index a008fdb4499c1..aff0109295078 100644 --- a/src/internal_events/kubernetes_logs.rs +++ b/src/internal_events/kubernetes_logs.rs @@ -3,14 +3,15 @@ use vector_core::internal_event::InternalEvent; use crate::emit; use crate::event::Event; -use vector_common::internal_event::{ - error_stage, error_type, ComponentEventsDropped, UNINTENTIONAL, +use vector_common::{ + internal_event::{error_stage, error_type, ComponentEventsDropped, UNINTENTIONAL}, + json_size::JsonSize, }; #[derive(Debug)] pub struct KubernetesLogsEventsReceived<'a> { pub file: &'a str, - pub byte_size: usize, + pub byte_size: JsonSize, pub pod_info: Option, } @@ -34,13 +35,13 @@ impl InternalEvent for KubernetesLogsEventsReceived<'_> { let pod_namespace = pod_info.namespace; counter!("component_received_events_total", 1, "pod_name" => pod_name.clone(), "pod_namespace" => pod_namespace.clone()); - counter!("component_received_event_bytes_total", self.byte_size as u64, "pod_name" => pod_name, "pod_namespace" => pod_namespace); + counter!("component_received_event_bytes_total", self.byte_size.get() as u64, "pod_name" => pod_name, "pod_namespace" => pod_namespace); } 
None => { counter!("component_received_events_total", 1); counter!( "component_received_event_bytes_total", - self.byte_size as u64 + self.byte_size.get() as u64 ); } } diff --git a/src/internal_events/mongodb_metrics.rs b/src/internal_events/mongodb_metrics.rs index eb585fbccf6de..1e749dc5ba8c2 100644 --- a/src/internal_events/mongodb_metrics.rs +++ b/src/internal_events/mongodb_metrics.rs @@ -2,12 +2,15 @@ use metrics::counter; use mongodb::{bson, error::Error as MongoError}; use vector_core::internal_event::InternalEvent; -use vector_common::internal_event::{error_stage, error_type}; +use vector_common::{ + internal_event::{error_stage, error_type}, + json_size::JsonSize, +}; #[derive(Debug)] pub struct MongoDbMetricsEventsReceived<'a> { pub count: usize, - pub byte_size: usize, + pub byte_size: JsonSize, pub endpoint: &'a str, } @@ -17,7 +20,7 @@ impl<'a> InternalEvent for MongoDbMetricsEventsReceived<'a> { trace!( message = "Events received.", count = self.count, - byte_size = self.byte_size, + byte_size = self.byte_size.get(), endpoint = self.endpoint, ); counter!( @@ -25,7 +28,7 @@ impl<'a> InternalEvent for MongoDbMetricsEventsReceived<'a> { "endpoint" => self.endpoint.to_owned(), ); counter!( - "component_received_event_bytes_total", self.byte_size as u64, + "component_received_event_bytes_total", self.byte_size.get() as u64, "endpoint" => self.endpoint.to_owned(), ); } diff --git a/src/internal_events/nginx_metrics.rs b/src/internal_events/nginx_metrics.rs index 46da39b6288f4..eb5adcf8485d1 100644 --- a/src/internal_events/nginx_metrics.rs +++ b/src/internal_events/nginx_metrics.rs @@ -2,11 +2,14 @@ use metrics::counter; use vector_core::internal_event::InternalEvent; use crate::sources::nginx_metrics::parser::ParseError; -use vector_common::internal_event::{error_stage, error_type}; +use vector_common::{ + internal_event::{error_stage, error_type}, + json_size::JsonSize, +}; #[derive(Debug)] pub struct NginxMetricsEventsReceived<'a> { - pub byte_size: 
usize, + pub byte_size: JsonSize, pub count: usize, pub endpoint: &'a str, } @@ -24,7 +27,7 @@ impl<'a> InternalEvent for NginxMetricsEventsReceived<'a> { "endpoint" => self.endpoint.to_owned(), ); counter!( - "component_received_event_bytes_total", self.byte_size as u64, + "component_received_event_bytes_total", self.byte_size.get() as u64, "endpoint" => self.endpoint.to_owned(), ); } diff --git a/src/internal_events/socket.rs b/src/internal_events/socket.rs index 58c0c3b69f4fb..daa03a27991b5 100644 --- a/src/internal_events/socket.rs +++ b/src/internal_events/socket.rs @@ -1,5 +1,8 @@ use metrics::counter; -use vector_common::internal_event::{error_stage, error_type}; +use vector_common::{ + internal_event::{error_stage, error_type}, + json_size::JsonSize, +}; use vector_core::internal_event::{ComponentEventsDropped, InternalEvent, UNINTENTIONAL}; use crate::emit; @@ -45,7 +48,7 @@ impl InternalEvent for SocketBytesReceived { #[derive(Debug)] pub struct SocketEventsReceived { pub mode: SocketMode, - pub byte_size: usize, + pub byte_size: JsonSize, pub count: usize, } @@ -55,11 +58,11 @@ impl InternalEvent for SocketEventsReceived { trace!( message = "Events received.", count = self.count, - byte_size = self.byte_size, + byte_size = self.byte_size.get(), %mode, ); counter!("component_received_events_total", self.count as u64, "mode" => mode); - counter!("component_received_event_bytes_total", self.byte_size as u64, "mode" => mode); + counter!("component_received_event_bytes_total", self.byte_size.get() as u64, "mode" => mode); } } @@ -88,14 +91,14 @@ impl InternalEvent for SocketBytesSent { pub struct SocketEventsSent { pub mode: SocketMode, pub count: u64, - pub byte_size: usize, + pub byte_size: JsonSize, } impl InternalEvent for SocketEventsSent { fn emit(self) { - trace!(message = "Events sent.", count = %self.count, byte_size = %self.byte_size); + trace!(message = "Events sent.", count = %self.count, byte_size = %self.byte_size.get()); 
counter!("component_sent_events_total", self.count, "mode" => self.mode.as_str()); - counter!("component_sent_event_bytes_total", self.byte_size as u64, "mode" => self.mode.as_str()); + counter!("component_sent_event_bytes_total", self.byte_size.get() as u64, "mode" => self.mode.as_str()); } } diff --git a/src/sinks/amqp/request_builder.rs b/src/sinks/amqp/request_builder.rs index 313b37626aec8..ad8fe36565453 100644 --- a/src/sinks/amqp/request_builder.rs +++ b/src/sinks/amqp/request_builder.rs @@ -13,8 +13,10 @@ use lapin::BasicProperties; use std::io; use vector_common::{ finalization::{EventFinalizers, Finalizable}, + json_size::JsonSize, request_metadata::RequestMetadata, }; +use vector_core::EstimatedJsonEncodedSizeOf; use super::{encoder::AmqpEncoder, service::AmqpRequest, sink::AmqpEvent}; @@ -23,6 +25,7 @@ pub(super) struct AmqpMetadata { routing_key: String, properties: BasicProperties, finalizers: EventFinalizers, + event_json_size: JsonSize, } /// Build the request to send to `AMQP` by using the encoder to convert it into @@ -59,6 +62,7 @@ impl RequestBuilder for AmqpRequestBuilder { routing_key: input.routing_key, properties: input.properties, finalizers: input.event.take_finalizers(), + event_json_size: input.event.estimated_json_encoded_size_of(), }; (metadata, builder, input.event) @@ -78,6 +82,7 @@ impl RequestBuilder for AmqpRequestBuilder { amqp_metadata.properties, amqp_metadata.finalizers, metadata, + amqp_metadata.event_json_size, ) } } diff --git a/src/sinks/amqp/service.rs b/src/sinks/amqp/service.rs index 000c753ec97e2..ff1e71487298a 100644 --- a/src/sinks/amqp/service.rs +++ b/src/sinks/amqp/service.rs @@ -13,6 +13,7 @@ use tower::Service; use vector_common::{ finalization::{EventFinalizers, EventStatus, Finalizable}, internal_event::CountByteSize, + json_size::JsonSize, request_metadata::{MetaDescriptive, RequestMetadata}, }; use vector_core::stream::DriverResponse; @@ -26,6 +27,7 @@ pub(super) struct AmqpRequest { properties: 
BasicProperties, finalizers: EventFinalizers, metadata: RequestMetadata, + event_json_size: JsonSize, } impl AmqpRequest { @@ -36,6 +38,7 @@ impl AmqpRequest { properties: BasicProperties, finalizers: EventFinalizers, metadata: RequestMetadata, + event_json_size: JsonSize, ) -> Self { Self { body, @@ -44,6 +47,7 @@ impl AmqpRequest { properties, finalizers, metadata, + event_json_size, } } } @@ -63,6 +67,7 @@ impl MetaDescriptive for AmqpRequest { /// A successful response from `AMQP`. pub(super) struct AmqpResponse { byte_size: usize, + json_size: JsonSize, } impl DriverResponse for AmqpResponse { @@ -71,7 +76,7 @@ impl DriverResponse for AmqpResponse { } fn events_sent(&self) -> CountByteSize { - CountByteSize(1, self.byte_size) + CountByteSize(1, self.json_size) } fn bytes_sent(&self) -> Option { @@ -128,14 +133,20 @@ impl Service for AmqpService { Ok(result) => match result.await { Ok(lapin::publisher_confirm::Confirmation::Nack(_)) => { warn!("Received Negative Acknowledgement from AMQP server."); - Ok(AmqpResponse { byte_size }) + Ok(AmqpResponse { + json_size: req.event_json_size, + byte_size, + }) } Err(error) => { // TODO: In due course the caller could emit these on error. emit!(AmqpAcknowledgementError { error: &error }); Err(AmqpError::AmqpAcknowledgementFailed { error }) } - Ok(_) => Ok(AmqpResponse { byte_size }), + Ok(_) => Ok(AmqpResponse { + json_size: req.event_json_size, + byte_size, + }), }, Err(error) => { // TODO: In due course the caller could emit these on error. 
diff --git a/src/sinks/amqp/sink.rs b/src/sinks/amqp/sink.rs index 333b325dbeab4..ff0bdeb9d0042 100644 --- a/src/sinks/amqp/sink.rs +++ b/src/sinks/amqp/sink.rs @@ -12,6 +12,7 @@ use serde::Serialize; use std::sync::Arc; use tower::ServiceBuilder; use vector_buffers::EventCount; +use vector_common::json_size::JsonSize; use vector_core::{sink::StreamSink, ByteSizeOf, EstimatedJsonEncodedSizeOf}; use super::{ @@ -49,7 +50,7 @@ impl ByteSizeOf for AmqpEvent { } impl EstimatedJsonEncodedSizeOf for AmqpEvent { - fn estimated_json_encoded_size_of(&self) -> usize { + fn estimated_json_encoded_size_of(&self) -> JsonSize { self.event.estimated_json_encoded_size_of() } } diff --git a/src/sinks/aws_cloudwatch_logs/service.rs b/src/sinks/aws_cloudwatch_logs/service.rs index 505de36f77e8d..93ed0b52252a2 100644 --- a/src/sinks/aws_cloudwatch_logs/service.rs +++ b/src/sinks/aws_cloudwatch_logs/service.rs @@ -22,7 +22,7 @@ use tower::{ timeout::Timeout, Service, ServiceBuilder, ServiceExt, }; -use vector_common::request_metadata::MetaDescriptive; +use vector_common::{json_size::JsonSize, request_metadata::MetaDescriptive}; use vector_core::{internal_event::CountByteSize, stream::DriverResponse}; use crate::{ @@ -99,7 +99,7 @@ impl From> for CloudwatchError { #[derive(Debug)] pub struct CloudwatchResponse { events_count: usize, - events_byte_size: usize, + events_byte_size: JsonSize, } impl crate::sinks::util::sink::Response for CloudwatchResponse { @@ -158,7 +158,7 @@ impl Service for CloudwatchLogsPartitionSvc { fn call(&mut self, req: BatchCloudwatchRequest) -> Self::Future { let events_count = req.get_metadata().event_count(); - let events_byte_size = req.get_metadata().events_byte_size(); + let events_byte_size = req.get_metadata().events_estimated_json_encoded_byte_size(); let key = req.key; let events = req diff --git a/src/sinks/aws_cloudwatch_metrics/mod.rs b/src/sinks/aws_cloudwatch_metrics/mod.rs index e9bb6fe3c792f..accc041b54c32 100644 --- 
a/src/sinks/aws_cloudwatch_metrics/mod.rs +++ b/src/sinks/aws_cloudwatch_metrics/mod.rs @@ -15,7 +15,7 @@ use futures_util::{future, future::BoxFuture}; use std::task::{Context, Poll}; use tower::Service; use vector_config::configurable_component; -use vector_core::{sink::VectorSink, EstimatedJsonEncodedSizeOf}; +use vector_core::{sink::VectorSink, ByteSizeOf, EstimatedJsonEncodedSizeOf}; use crate::{ aws::{ @@ -236,7 +236,8 @@ impl CloudWatchMetricsSvc { .sink_map_err(|error| error!(message = "Fatal CloudwatchMetrics sink error.", %error)) .with_flat_map(move |event: Event| { stream::iter({ - let byte_size = event.estimated_json_encoded_size_of(); + let byte_size = event.allocated_bytes(); + let json_byte_size = event.estimated_json_encoded_size_of(); normalizer.normalize(event.into_metric()).map(|mut metric| { let namespace = metric .take_namespace() @@ -245,6 +246,7 @@ impl CloudWatchMetricsSvc { Ok(EncodedEvent::new( PartitionInnerBuffer::new(metric, namespace), byte_size, + json_byte_size, )) }) }) diff --git a/src/sinks/aws_kinesis/service.rs b/src/sinks/aws_kinesis/service.rs index a1058806288c4..9ceeb8c8d4938 100644 --- a/src/sinks/aws_kinesis/service.rs +++ b/src/sinks/aws_kinesis/service.rs @@ -7,7 +7,7 @@ use aws_smithy_client::SdkError; use aws_types::region::Region; use futures::future::BoxFuture; use tower::Service; -use vector_common::request_metadata::MetaDescriptive; +use vector_common::{json_size::JsonSize, request_metadata::MetaDescriptive}; use vector_core::{internal_event::CountByteSize, stream::DriverResponse}; use super::{ @@ -41,7 +41,7 @@ where pub struct KinesisResponse { count: usize, - events_byte_size: usize, + events_byte_size: JsonSize, } impl DriverResponse for KinesisResponse { @@ -72,7 +72,9 @@ where // Emission of internal events for errors and dropped events is handled upstream by the caller. 
fn call(&mut self, requests: BatchKinesisRequest) -> Self::Future { - let events_byte_size = requests.get_metadata().events_byte_size(); + let events_byte_size = requests + .get_metadata() + .events_estimated_json_encoded_byte_size(); let count = requests.get_metadata().event_count(); let records = requests diff --git a/src/sinks/aws_sqs/request_builder.rs b/src/sinks/aws_sqs/request_builder.rs index 22e1340be1a1f..03d30f34f3737 100644 --- a/src/sinks/aws_sqs/request_builder.rs +++ b/src/sinks/aws_sqs/request_builder.rs @@ -130,7 +130,7 @@ pub(crate) struct SendMessageEntry { pub message_deduplication_id: Option, pub queue_url: String, finalizers: EventFinalizers, - metadata: RequestMetadata, + pub metadata: RequestMetadata, } impl ByteSizeOf for SendMessageEntry { diff --git a/src/sinks/aws_sqs/service.rs b/src/sinks/aws_sqs/service.rs index 15ffd155e0072..38b20fddfe21f 100644 --- a/src/sinks/aws_sqs/service.rs +++ b/src/sinks/aws_sqs/service.rs @@ -4,6 +4,7 @@ use aws_sdk_sqs::{error::SendMessageError, types::SdkError, Client as SqsClient} use futures::{future::BoxFuture, TryFutureExt}; use tower::Service; use tracing::Instrument; +use vector_common::json_size::JsonSize; use vector_core::{ event::EventStatus, internal_event::CountByteSize, stream::DriverResponse, ByteSizeOf, }; @@ -44,7 +45,10 @@ impl Service for SqsService { .set_message_deduplication_id(entry.message_deduplication_id) .queue_url(entry.queue_url) .send() - .map_ok(|_| SendMessageResponse { byte_size }) + .map_ok(|_| SendMessageResponse { + byte_size, + json_byte_size: entry.metadata.events_estimated_json_encoded_byte_size(), + }) .instrument(info_span!("request").or_current()) .await }) @@ -53,6 +57,7 @@ impl Service for SqsService { pub(crate) struct SendMessageResponse { byte_size: usize, + json_byte_size: JsonSize, } impl DriverResponse for SendMessageResponse { @@ -61,6 +66,10 @@ impl DriverResponse for SendMessageResponse { } fn events_sent(&self) -> CountByteSize { - CountByteSize(1, 
self.byte_size) + CountByteSize(1, self.json_byte_size) + } + + fn bytes_sent(&self) -> Option { + Some(self.byte_size) } } diff --git a/src/sinks/azure_blob/request_builder.rs b/src/sinks/azure_blob/request_builder.rs index f3e85cc5a8e7c..2ec67a7c758eb 100644 --- a/src/sinks/azure_blob/request_builder.rs +++ b/src/sinks/azure_blob/request_builder.rs @@ -3,7 +3,7 @@ use chrono::Utc; use codecs::encoding::Framer; use uuid::Uuid; use vector_common::request_metadata::RequestMetadata; -use vector_core::ByteSizeOf; +use vector_core::EstimatedJsonEncodedSizeOf; use crate::{ codecs::{Encoder, Transformer}, @@ -51,7 +51,7 @@ impl RequestBuilder<(String, Vec)> for AzureBlobRequestOptions { let azure_metadata = AzureBlobMetadata { partition_key, count: events.len(), - byte_size: events.size_of(), + byte_size: events.estimated_json_encoded_size_of(), finalizers, }; diff --git a/src/sinks/azure_common/config.rs b/src/sinks/azure_common/config.rs index 2b3b663d89753..56ed8d210f694 100644 --- a/src/sinks/azure_common/config.rs +++ b/src/sinks/azure_common/config.rs @@ -8,7 +8,10 @@ use bytes::Bytes; use futures::FutureExt; use http::StatusCode; use snafu::Snafu; -use vector_common::request_metadata::{MetaDescriptive, RequestMetadata}; +use vector_common::{ + json_size::JsonSize, + request_metadata::{MetaDescriptive, RequestMetadata}, +}; use vector_core::{internal_event::CountByteSize, stream::DriverResponse}; use crate::{ @@ -41,7 +44,7 @@ impl MetaDescriptive for AzureBlobRequest { pub struct AzureBlobMetadata { pub partition_key: String, pub count: usize, - pub byte_size: usize, + pub byte_size: JsonSize, pub finalizers: EventFinalizers, } @@ -62,7 +65,7 @@ impl RetryLogic for AzureBlobRetryLogic { pub struct AzureBlobResponse { pub inner: PutBlockBlobResponse, pub count: usize, - pub events_byte_size: usize, + pub events_byte_size: JsonSize, pub byte_size: usize, } diff --git a/src/sinks/blackhole/sink.rs b/src/sinks/blackhole/sink.rs index 92f90377931b1..09fb5f6353d94 
100644 --- a/src/sinks/blackhole/sink.rs +++ b/src/sinks/blackhole/sink.rs @@ -92,10 +92,10 @@ impl StreamSink for BlackholeSink { _ = self.total_events.fetch_add(events.len(), Ordering::AcqRel); _ = self .total_raw_bytes - .fetch_add(message_len, Ordering::AcqRel); + .fetch_add(message_len.get(), Ordering::AcqRel); events_sent.emit(CountByteSize(events.len(), message_len)); - bytes_sent.emit(ByteSize(message_len)); + bytes_sent.emit(ByteSize(message_len.get())); } // Notify the reporting task to shutdown. diff --git a/src/sinks/databend/service.rs b/src/sinks/databend/service.rs index 23da26f560d41..473a2d3220ba3 100644 --- a/src/sinks/databend/service.rs +++ b/src/sinks/databend/service.rs @@ -85,7 +85,7 @@ impl DriverResponse for DatabendResponse { fn events_sent(&self) -> CountByteSize { CountByteSize( self.metadata.event_count(), - self.metadata.events_byte_size(), + self.metadata.events_estimated_json_encoded_byte_size(), ) } diff --git a/src/sinks/datadog/events/service.rs b/src/sinks/datadog/events/service.rs index 25669c45a9ff0..693929d62e961 100644 --- a/src/sinks/datadog/events/service.rs +++ b/src/sinks/datadog/events/service.rs @@ -8,7 +8,7 @@ use futures::{ use http::Request; use hyper::Body; use tower::{Service, ServiceExt}; -use vector_common::request_metadata::MetaDescriptive; +use vector_common::{json_size::JsonSize, request_metadata::MetaDescriptive}; use vector_core::{internal_event::CountByteSize, stream::DriverResponse}; use crate::{ @@ -23,7 +23,7 @@ use crate::{ pub struct DatadogEventsResponse { pub(self) event_status: EventStatus, pub http_status: http::StatusCode, - pub event_byte_size: usize, + pub event_byte_size: JsonSize, } impl DriverResponse for DatadogEventsResponse { @@ -90,7 +90,7 @@ impl Service for DatadogEventsService { Box::pin(async move { http_service.ready().await?; - let event_byte_size = req.get_metadata().events_byte_size(); + let event_byte_size = req.get_metadata().events_estimated_json_encoded_byte_size(); let 
http_response = http_service.call(req).await?; let event_status = if http_response.is_successful() { EventStatus::Delivered diff --git a/src/sinks/datadog/logs/service.rs b/src/sinks/datadog/logs/service.rs index 5401165df873f..06bc923ad3e36 100644 --- a/src/sinks/datadog/logs/service.rs +++ b/src/sinks/datadog/logs/service.rs @@ -14,7 +14,10 @@ use hyper::Body; use indexmap::IndexMap; use tower::Service; use tracing::Instrument; -use vector_common::request_metadata::{MetaDescriptive, RequestMetadata}; +use vector_common::{ + json_size::JsonSize, + request_metadata::{MetaDescriptive, RequestMetadata}, +}; use vector_core::{ event::{EventFinalizers, EventStatus, Finalizable}, internal_event::CountByteSize, @@ -65,7 +68,7 @@ impl MetaDescriptive for LogApiRequest { pub struct LogApiResponse { event_status: EventStatus, count: usize, - events_byte_size: usize, + events_byte_size: JsonSize, raw_byte_size: usize, } @@ -137,7 +140,9 @@ impl Service for LogApiService { }; let count = request.get_metadata().event_count(); - let events_byte_size = request.get_metadata().events_byte_size(); + let events_byte_size = request + .get_metadata() + .events_estimated_json_encoded_byte_size(); let raw_byte_size = request.uncompressed_size; let mut http_request = http_request.header(CONTENT_LENGTH, request.body.len()); diff --git a/src/sinks/datadog/metrics/request_builder.rs b/src/sinks/datadog/metrics/request_builder.rs index c6b287b39ccc1..64b1226b661bf 100644 --- a/src/sinks/datadog/metrics/request_builder.rs +++ b/src/sinks/datadog/metrics/request_builder.rs @@ -3,7 +3,10 @@ use serde_json::error::Category; use snafu::Snafu; use std::{num::NonZeroUsize, sync::Arc}; use vector_common::request_metadata::RequestMetadata; -use vector_core::event::{EventFinalizers, Finalizable, Metric}; +use vector_core::{ + event::{EventFinalizers, Finalizable, Metric}, + EstimatedJsonEncodedSizeOf, +}; use super::{ config::{DatadogMetricsEndpoint, DatadogMetricsEndpointConfiguration}, @@ -209,6 
+212,7 @@ impl IncrementalRequestBuilder<((Option>, DatadogMetricsEndpoint), Vec< if n > 0 { match encoder.finish() { Ok((payload, mut metrics, raw_bytes_written)) => { + let json_size = metrics.estimated_json_encoded_size_of(); let finalizers = metrics.take_finalizers(); let metadata = DDMetricsMetadata { api_key: api_key.as_ref().map(Arc::clone), @@ -219,7 +223,7 @@ impl IncrementalRequestBuilder<((Option>, DatadogMetricsEndpoint), Vec< let builder = RequestMetadataBuilder::new( metrics.len(), raw_bytes_written, - raw_bytes_written, + json_size, ); let bytes_len = NonZeroUsize::new(payload.len()) .expect("payload should never be zero length"); @@ -329,6 +333,7 @@ fn encode_now_or_never( encoder .finish() .map(|(payload, mut processed, raw_bytes_written)| { + let json_size = processed.estimated_json_encoded_size_of(); let finalizers = processed.take_finalizers(); let ddmetrics_metadata = DDMetricsMetadata { api_key, @@ -336,8 +341,7 @@ fn encode_now_or_never( finalizers, raw_bytes: raw_bytes_written, }; - let builder = - RequestMetadataBuilder::new(metrics_len, raw_bytes_written, raw_bytes_written); + let builder = RequestMetadataBuilder::new(metrics_len, raw_bytes_written, json_size); let bytes_len = NonZeroUsize::new(payload.len()).expect("payload should never be zero length"); let request_metadata = builder.with_request_size(bytes_len); diff --git a/src/sinks/datadog/metrics/service.rs b/src/sinks/datadog/metrics/service.rs index f107c7413bd9c..d15716b99d8ad 100644 --- a/src/sinks/datadog/metrics/service.rs +++ b/src/sinks/datadog/metrics/service.rs @@ -10,7 +10,10 @@ use http::{ use hyper::Body; use snafu::ResultExt; use tower::Service; -use vector_common::request_metadata::{MetaDescriptive, RequestMetadata}; +use vector_common::{ + json_size::JsonSize, + request_metadata::{MetaDescriptive, RequestMetadata}, +}; use vector_core::{ event::{EventFinalizers, EventStatus, Finalizable}, internal_event::CountByteSize, @@ -123,7 +126,7 @@ pub struct 
DatadogMetricsResponse { status_code: StatusCode, body: Bytes, batch_size: usize, - byte_size: usize, + byte_size: JsonSize, raw_byte_size: usize, } @@ -182,7 +185,9 @@ impl Service for DatadogMetricsService { let api_key = self.api_key.clone(); Box::pin(async move { - let byte_size = request.get_metadata().events_byte_size(); + let byte_size = request + .get_metadata() + .events_estimated_json_encoded_byte_size(); let batch_size = request.get_metadata().event_count(); let raw_byte_size = request.raw_bytes; diff --git a/src/sinks/datadog/traces/request_builder.rs b/src/sinks/datadog/traces/request_builder.rs index 80613314caef7..dbe57714995fc 100644 --- a/src/sinks/datadog/traces/request_builder.rs +++ b/src/sinks/datadog/traces/request_builder.rs @@ -9,7 +9,10 @@ use bytes::Bytes; use prost::Message; use snafu::Snafu; use vector_common::request_metadata::RequestMetadata; -use vector_core::event::{EventFinalizers, Finalizable}; +use vector_core::{ + event::{EventFinalizers, Finalizable}, + EstimatedJsonEncodedSizeOf, +}; use super::{ apm_stats::{compute_apm_stats, Aggregator}, @@ -122,6 +125,7 @@ impl IncrementalRequestBuilder<(PartitionKey, Vec)> for DatadogTracesRequ .for_each(|r| match r { Ok((payload, mut processed)) => { let uncompressed_size = payload.len(); + let json_size = processed.estimated_json_encoded_size_of(); let metadata = DDTracesMetadata { api_key: key .api_key @@ -139,11 +143,8 @@ impl IncrementalRequestBuilder<(PartitionKey, Vec)> for DatadogTracesRequ let bytes = compressor.into_inner().freeze(); // build RequestMetadata - let builder = RequestMetadataBuilder::new( - n, - uncompressed_size, - uncompressed_size, - ); + let builder = + RequestMetadataBuilder::new(n, uncompressed_size, json_size); let bytes_len = NonZeroUsize::new(bytes.len()) .expect("payload should never be zero length"); let request_metadata = builder.with_request_size(bytes_len); diff --git a/src/sinks/datadog/traces/service.rs b/src/sinks/datadog/traces/service.rs index 
509c909a87d4d..66e46b8075ca1 100644 --- a/src/sinks/datadog/traces/service.rs +++ b/src/sinks/datadog/traces/service.rs @@ -9,7 +9,10 @@ use http::{Request, StatusCode, Uri}; use hyper::Body; use snafu::ResultExt; use tower::Service; -use vector_common::request_metadata::{MetaDescriptive, RequestMetadata}; +use vector_common::{ + json_size::JsonSize, + request_metadata::{MetaDescriptive, RequestMetadata}, +}; use vector_core::{ event::{EventFinalizers, EventStatus, Finalizable}, internal_event::CountByteSize, @@ -91,7 +94,7 @@ pub struct TraceApiResponse { status_code: StatusCode, body: Bytes, batch_size: usize, - byte_size: usize, + byte_size: JsonSize, uncompressed_size: usize, } @@ -146,7 +149,9 @@ impl Service for TraceApiService { let client = self.client.clone(); Box::pin(async move { - let byte_size = request.get_metadata().events_byte_size(); + let byte_size = request + .get_metadata() + .events_estimated_json_encoded_byte_size(); let batch_size = request.get_metadata().event_count(); let uncompressed_size = request.uncompressed_size; let http_request = request.into_http_request().context(BuildRequestSnafu)?; diff --git a/src/sinks/datadog_archives.rs b/src/sinks/datadog_archives.rs index 6be169c12a62a..e439481c076a1 100644 --- a/src/sinks/datadog_archives.rs +++ b/src/sinks/datadog_archives.rs @@ -31,7 +31,7 @@ use vector_config::{configurable_component, NamedComponent}; use vector_core::{ config::AcknowledgementsConfig, event::{Event, EventFinalizers, Finalizable}, - schema, ByteSizeOf, + schema, EstimatedJsonEncodedSizeOf, }; use vrl::value::Kind; @@ -816,7 +816,7 @@ impl RequestBuilder<(String, Vec)> for DatadogAzureRequestBuilder { let metadata = AzureBlobMetadata { partition_key, count: events.len(), - byte_size: events.size_of(), + byte_size: events.estimated_json_encoded_size_of(), finalizers, }; let builder = RequestMetadataBuilder::from_events(&events); diff --git a/src/sinks/elasticsearch/encoder.rs b/src/sinks/elasticsearch/encoder.rs index 
0558e8c44684c..5d27ba891b596 100644 --- a/src/sinks/elasticsearch/encoder.rs +++ b/src/sinks/elasticsearch/encoder.rs @@ -2,6 +2,7 @@ use std::{io, io::Write}; use serde::Serialize; use vector_buffers::EventCount; +use vector_common::json_size::JsonSize; use vector_core::{event::Event, ByteSizeOf, EstimatedJsonEncodedSizeOf}; use crate::{ @@ -34,7 +35,7 @@ impl ByteSizeOf for ProcessedEvent { } impl EstimatedJsonEncodedSizeOf for ProcessedEvent { - fn estimated_json_encoded_size_of(&self) -> usize { + fn estimated_json_encoded_size_of(&self) -> JsonSize { self.log.estimated_json_encoded_size_of() } } diff --git a/src/sinks/elasticsearch/request_builder.rs b/src/sinks/elasticsearch/request_builder.rs index f6327f34ca50b..c5918df6e384c 100644 --- a/src/sinks/elasticsearch/request_builder.rs +++ b/src/sinks/elasticsearch/request_builder.rs @@ -1,6 +1,6 @@ use bytes::Bytes; -use vector_common::request_metadata::RequestMetadata; -use vector_core::ByteSizeOf; +use vector_common::{json_size::JsonSize, request_metadata::RequestMetadata}; +use vector_core::EstimatedJsonEncodedSizeOf; use crate::{ event::{EventFinalizers, Finalizable}, @@ -25,7 +25,7 @@ pub struct ElasticsearchRequestBuilder { pub struct Metadata { finalizers: EventFinalizers, batch_size: usize, - events_byte_size: usize, + events_byte_size: JsonSize, } impl RequestBuilder> for ElasticsearchRequestBuilder { @@ -50,9 +50,9 @@ impl RequestBuilder> for ElasticsearchRequestBuilder { ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) { let events_byte_size = events .iter() - .map(|x| x.log.size_of()) + .map(|x| x.log.estimated_json_encoded_size_of()) .reduce(|a, b| a + b) - .unwrap_or(0); + .unwrap_or(JsonSize::zero()); let metadata_builder = RequestMetadataBuilder::from_events(&events); diff --git a/src/sinks/elasticsearch/retry.rs b/src/sinks/elasticsearch/retry.rs index bd6035087c91e..4f5a6e0c73ed6 100644 --- a/src/sinks/elasticsearch/retry.rs +++ b/src/sinks/elasticsearch/retry.rs @@ -160,6 +160,7 @@ 
mod tests { use bytes::Bytes; use http::Response; use similar_asserts::assert_eq; + use vector_common::json_size::JsonSize; use super::*; use crate::event::EventStatus; @@ -179,7 +180,7 @@ mod tests { http_response: response, event_status: EventStatus::Rejected, batch_size: 1, - events_byte_size: 1, + events_byte_size: JsonSize::new(1), }), RetryAction::DontRetry(_) )); @@ -200,7 +201,7 @@ mod tests { http_response: response, event_status: EventStatus::Errored, batch_size: 1, - events_byte_size: 1, + events_byte_size: JsonSize::new(1), }), RetryAction::Retry(_) )); diff --git a/src/sinks/elasticsearch/service.rs b/src/sinks/elasticsearch/service.rs index b205f10f810e6..1baea684c07b2 100644 --- a/src/sinks/elasticsearch/service.rs +++ b/src/sinks/elasticsearch/service.rs @@ -11,7 +11,10 @@ use futures::future::BoxFuture; use http::{Response, Uri}; use hyper::{service::Service, Body, Request}; use tower::ServiceExt; -use vector_common::request_metadata::{MetaDescriptive, RequestMetadata}; +use vector_common::{ + json_size::JsonSize, + request_metadata::{MetaDescriptive, RequestMetadata}, +}; use vector_core::{internal_event::CountByteSize, stream::DriverResponse, ByteSizeOf}; use crate::sinks::elasticsearch::sign_request; @@ -31,7 +34,7 @@ pub struct ElasticsearchRequest { pub payload: Bytes, pub finalizers: EventFinalizers, pub batch_size: usize, - pub events_byte_size: usize, + pub events_byte_size: JsonSize, pub metadata: RequestMetadata, } @@ -146,7 +149,7 @@ pub struct ElasticsearchResponse { pub http_response: Response, pub event_status: EventStatus, pub batch_size: usize, - pub events_byte_size: usize, + pub events_byte_size: JsonSize, } impl DriverResponse for ElasticsearchResponse { diff --git a/src/sinks/influxdb/metrics.rs b/src/sinks/influxdb/metrics.rs index 41c8559963cc5..75bde8fde1238 100644 --- a/src/sinks/influxdb/metrics.rs +++ b/src/sinks/influxdb/metrics.rs @@ -7,7 +7,7 @@ use tower::Service; use vector_config::configurable_component; use 
vector_core::{ event::metric::{MetricSketch, MetricTags, Quantile}, - ByteSizeOf, + ByteSizeOf, EstimatedJsonEncodedSizeOf, }; use crate::{ @@ -184,9 +184,11 @@ impl InfluxDbSvc { .with_flat_map(move |event: Event| { stream::iter({ let byte_size = event.size_of(); + let json_size = event.estimated_json_encoded_size_of(); + normalizer .normalize(event.into_metric()) - .map(|metric| Ok(EncodedEvent::new(metric, byte_size))) + .map(|metric| Ok(EncodedEvent::new(metric, byte_size, json_size))) }) }) .sink_map_err(|error| error!(message = "Fatal influxdb sink error.", %error)); diff --git a/src/sinks/kafka/service.rs b/src/sinks/kafka/service.rs index d79ab6eb808b2..89a1fb5ce6827 100644 --- a/src/sinks/kafka/service.rs +++ b/src/sinks/kafka/service.rs @@ -9,7 +9,10 @@ use rdkafka::{ util::Timeout, }; use tower::Service; -use vector_common::request_metadata::{MetaDescriptive, RequestMetadata}; +use vector_common::{ + json_size::JsonSize, + request_metadata::{MetaDescriptive, RequestMetadata}, +}; use vector_core::{ internal_event::{ ByteSize, BytesSent, CountByteSize, InternalEventHandle as _, Protocol, Registered, @@ -37,7 +40,7 @@ pub struct KafkaRequestMetadata { } pub struct KafkaResponse { - event_byte_size: usize, + event_byte_size: JsonSize, } impl DriverResponse for KafkaResponse { @@ -90,7 +93,9 @@ impl Service for KafkaService { let this = self.clone(); Box::pin(async move { - let event_byte_size = request.get_metadata().events_byte_size(); + let event_byte_size = request + .get_metadata() + .events_estimated_json_encoded_byte_size(); let mut record = FutureRecord::to(&request.metadata.topic).payload(request.body.as_ref()); diff --git a/src/sinks/loki/event.rs b/src/sinks/loki/event.rs index 93d92b8498ab1..76389e2c64fb9 100644 --- a/src/sinks/loki/event.rs +++ b/src/sinks/loki/event.rs @@ -3,6 +3,7 @@ use std::{collections::HashMap, io}; use bytes::Bytes; use serde::{ser::SerializeSeq, Serialize}; use vector_buffers::EventCount; +use 
vector_common::json_size::JsonSize; use vector_core::{ event::{EventFinalizers, Finalizable}, ByteSizeOf, EstimatedJsonEncodedSizeOf, @@ -140,10 +141,10 @@ impl ByteSizeOf for LokiEvent { /// This implementation approximates the `Serialize` implementation below, without any allocations. impl EstimatedJsonEncodedSizeOf for LokiEvent { - fn estimated_json_encoded_size_of(&self) -> usize { - static BRACKETS_SIZE: usize = 2; - static COLON_SIZE: usize = 1; - static QUOTES_SIZE: usize = 2; + fn estimated_json_encoded_size_of(&self) -> JsonSize { + static BRACKETS_SIZE: JsonSize = JsonSize::new(2); + static COLON_SIZE: JsonSize = JsonSize::new(1); + static QUOTES_SIZE: JsonSize = JsonSize::new(2); BRACKETS_SIZE + QUOTES_SIZE @@ -185,7 +186,7 @@ impl ByteSizeOf for LokiRecord { } impl EstimatedJsonEncodedSizeOf for LokiRecord { - fn estimated_json_encoded_size_of(&self) -> usize { + fn estimated_json_encoded_size_of(&self) -> JsonSize { self.event.estimated_json_encoded_size_of() } } diff --git a/src/sinks/opendal_common.rs b/src/sinks/opendal_common.rs index dc3f1aefcd991..ba961607be438 100644 --- a/src/sinks/opendal_common.rs +++ b/src/sinks/opendal_common.rs @@ -19,13 +19,14 @@ use tower::Service; use tracing::Instrument; use vector_common::{ finalization::{EventStatus, Finalizable}, + json_size::JsonSize, request_metadata::{MetaDescriptive, RequestMetadata}, }; use vector_core::{ internal_event::CountByteSize, sink::StreamSink, stream::{BatcherSettings, DriverResponse}, - ByteSizeOf, + EstimatedJsonEncodedSizeOf, }; use crate::{ @@ -168,7 +169,7 @@ impl Finalizable for OpenDalRequest { pub struct OpenDalMetadata { pub partition_key: String, pub count: usize, - pub byte_size: usize, + pub byte_size: JsonSize, pub finalizers: EventFinalizers, } @@ -204,7 +205,7 @@ impl RequestBuilder<(String, Vec)> for OpenDalRequestBuilder { let opendal_metadata = OpenDalMetadata { partition_key, count: events.len(), - byte_size: events.size_of(), + byte_size: 
events.estimated_json_encoded_size_of(), finalizers, }; @@ -237,7 +238,7 @@ impl RequestBuilder<(String, Vec)> for OpenDalRequestBuilder { #[derive(Debug)] pub struct OpenDalResponse { pub count: usize, - pub events_byte_size: usize, + pub events_byte_size: JsonSize, pub byte_size: usize, } diff --git a/src/sinks/prometheus/remote_write.rs b/src/sinks/prometheus/remote_write.rs index 9d4d7ba9f37e5..2692253780c5a 100644 --- a/src/sinks/prometheus/remote_write.rs +++ b/src/sinks/prometheus/remote_write.rs @@ -10,7 +10,7 @@ use prost::Message; use snafu::{ResultExt, Snafu}; use tower::Service; use vector_config::configurable_component; -use vector_core::ByteSizeOf; +use vector_core::{ByteSizeOf, EstimatedJsonEncodedSizeOf}; use super::collector::{self, MetricCollector as _}; use crate::{ @@ -200,6 +200,8 @@ impl SinkConfig for RemoteWriteConfig { .partition_sink(HttpRetryLogic, service, buffer, batch.timeout) .with_flat_map(move |event: Event| { let byte_size = event.size_of(); + let json_size = event.estimated_json_encoded_size_of(); + stream::iter(normalizer.normalize(event.into_metric()).map(|event| { let tenant_id = tenant_id.as_ref().and_then(|template| { template @@ -217,6 +219,7 @@ impl SinkConfig for RemoteWriteConfig { Ok(EncodedEvent::new( PartitionInnerBuffer::new(event, key), byte_size, + json_size, )) })) }) diff --git a/src/sinks/pulsar/service.rs b/src/sinks/pulsar/service.rs index d723748fd1c7a..bb61dcee92ed3 100644 --- a/src/sinks/pulsar/service.rs +++ b/src/sinks/pulsar/service.rs @@ -14,7 +14,10 @@ use vector_core::stream::DriverResponse; use crate::event::{EventFinalizers, EventStatus, Finalizable}; use crate::internal_events::PulsarSendingError; use crate::sinks::pulsar::request_builder::PulsarMetadata; -use vector_common::request_metadata::{MetaDescriptive, RequestMetadata}; +use vector_common::{ + json_size::JsonSize, + request_metadata::{MetaDescriptive, RequestMetadata}, +}; #[derive(Clone)] pub(super) struct PulsarRequest { @@ -24,7 +27,8 @@ 
pub(super) struct PulsarRequest { } pub struct PulsarResponse { - event_byte_size: usize, + byte_size: usize, + event_byte_size: JsonSize, } impl DriverResponse for PulsarResponse { @@ -37,7 +41,7 @@ impl DriverResponse for PulsarResponse { } fn bytes_sent(&self) -> Option { - Some(self.event_byte_size) + Some(self.byte_size) } } @@ -102,6 +106,7 @@ impl Service for PulsarService { Box::pin(async move { let body = request.body.clone(); + let byte_size = request.body.len(); let mut properties = HashMap::new(); if let Some(props) = request.metadata.properties { @@ -134,7 +139,10 @@ impl Service for PulsarService { match fut { Ok(resp) => match resp.await { Ok(_) => Ok(PulsarResponse { - event_byte_size: request.request_metadata.events_byte_size(), + byte_size, + event_byte_size: request + .request_metadata + .events_estimated_json_encoded_byte_size(), }), Err(e) => { emit!(PulsarSendingError { diff --git a/src/sinks/pulsar/sink.rs b/src/sinks/pulsar/sink.rs index 0d7e13ec6c84d..c8ab0bcae256c 100644 --- a/src/sinks/pulsar/sink.rs +++ b/src/sinks/pulsar/sink.rs @@ -14,7 +14,7 @@ use crate::{ template::Template, }; use vector_buffers::EventCount; -use vector_common::byte_size_of::ByteSizeOf; +use vector_common::{byte_size_of::ByteSizeOf, json_size::JsonSize}; use vector_core::{ event::{EstimatedJsonEncodedSizeOf, LogEvent}, sink::StreamSink, @@ -76,7 +76,7 @@ impl ByteSizeOf for PulsarEvent { } impl EstimatedJsonEncodedSizeOf for PulsarEvent { - fn estimated_json_encoded_size_of(&self) -> usize { + fn estimated_json_encoded_size_of(&self) -> JsonSize { self.event.estimated_json_encoded_size_of() } } diff --git a/src/sinks/redis.rs b/src/sinks/redis.rs index ff08a98f683a9..084ff21ffd0e3 100644 --- a/src/sinks/redis.rs +++ b/src/sinks/redis.rs @@ -290,12 +290,13 @@ fn encode_event( transformer.transform(&mut event); let mut bytes = BytesMut::new(); + let byte_size = bytes.len(); // Errors are handled by `Encoder`. 
encoder.encode(event, &mut bytes).ok()?; let value = bytes.freeze(); - let event = EncodedEvent::new(RedisKvEntry { key, value }, event_byte_size); + let event = EncodedEvent::new(RedisKvEntry { key, value }, byte_size, event_byte_size); Some(event) } diff --git a/src/sinks/s3_common/service.rs b/src/sinks/s3_common/service.rs index 52cfc71f637d0..c9c12ac4bcb69 100644 --- a/src/sinks/s3_common/service.rs +++ b/src/sinks/s3_common/service.rs @@ -11,7 +11,10 @@ use futures::future::BoxFuture; use md5::Digest; use tower::Service; use tracing::Instrument; -use vector_common::request_metadata::{MetaDescriptive, RequestMetadata}; +use vector_common::{ + json_size::JsonSize, + request_metadata::{MetaDescriptive, RequestMetadata}, +}; use vector_core::{ event::{EventFinalizers, EventStatus, Finalizable}, internal_event::CountByteSize, @@ -53,7 +56,7 @@ pub struct S3Metadata { #[derive(Debug)] pub struct S3Response { count: usize, - events_byte_size: usize, + events_byte_size: JsonSize, } impl DriverResponse for S3Response { @@ -100,7 +103,9 @@ impl Service for S3Service { // Emission of internal events for errors and dropped events is handled upstream by the caller. 
fn call(&mut self, request: S3Request) -> Self::Future { let count = request.get_metadata().event_count(); - let events_byte_size = request.get_metadata().events_byte_size(); + let events_byte_size = request + .get_metadata() + .events_estimated_json_encoded_byte_size(); let options = request.options; diff --git a/src/sinks/sematext/metrics.rs b/src/sinks/sematext/metrics.rs index b362808264829..c67898f307d59 100644 --- a/src/sinks/sematext/metrics.rs +++ b/src/sinks/sematext/metrics.rs @@ -8,7 +8,7 @@ use indoc::indoc; use tower::Service; use vector_common::sensitive_string::SensitiveString; use vector_config::configurable_component; -use vector_core::EstimatedJsonEncodedSizeOf; +use vector_core::{ByteSizeOf, EstimatedJsonEncodedSizeOf}; use super::Region; use crate::{ @@ -185,10 +185,11 @@ impl SematextMetricsService { ) .with_flat_map(move |event: Event| { stream::iter({ - let byte_size = event.estimated_json_encoded_size_of(); + let byte_size = event.size_of(); + let json_byte_size = event.estimated_json_encoded_size_of(); normalizer .normalize(event.into_metric()) - .map(|item| Ok(EncodedEvent::new(item, byte_size))) + .map(|item| Ok(EncodedEvent::new(item, byte_size, json_byte_size))) }) }) .sink_map_err(|error| error!(message = "Fatal sematext metrics sink error.", %error)); @@ -256,7 +257,8 @@ fn encode_events( metrics: Vec, ) -> EncodedEvent { let mut output = BytesMut::new(); - let byte_size = metrics.estimated_json_encoded_size_of(); + let byte_size = metrics.size_of(); + let json_byte_size = metrics.estimated_json_encoded_size_of(); for metric in metrics.into_iter() { let (series, data, _metadata) = metric.into_parts(); let namespace = series @@ -292,7 +294,7 @@ fn encode_events( if !output.is_empty() { output.truncate(output.len() - 1); } - EncodedEvent::new(output.freeze(), byte_size) + EncodedEvent::new(output.freeze(), byte_size, json_byte_size) } fn to_fields(label: String, value: f64) -> HashMap { diff --git 
a/src/sinks/splunk_hec/common/response.rs b/src/sinks/splunk_hec/common/response.rs index 9c3f8e952a50e..65eaea0f12bcf 100644 --- a/src/sinks/splunk_hec/common/response.rs +++ b/src/sinks/splunk_hec/common/response.rs @@ -1,10 +1,11 @@ +use vector_common::json_size::JsonSize; use vector_core::internal_event::CountByteSize; use vector_core::{event::EventStatus, stream::DriverResponse}; pub struct HecResponse { pub event_status: EventStatus, pub events_count: usize, - pub events_byte_size: usize, + pub events_byte_size: JsonSize, } impl AsRef for HecResponse { diff --git a/src/sinks/splunk_hec/common/service.rs b/src/sinks/splunk_hec/common/service.rs index 7f44cfa90e4df..9492f11137dbe 100644 --- a/src/sinks/splunk_hec/common/service.rs +++ b/src/sinks/splunk_hec/common/service.rs @@ -114,7 +114,7 @@ where let ack_slot = self.current_ack_slot.take(); let events_count = req.get_metadata().event_count(); - let events_byte_size = req.get_metadata().events_byte_size(); + let events_byte_size = req.get_metadata().events_estimated_json_encoded_byte_size(); let response = self.inner.call(req); Box::pin(async move { @@ -338,7 +338,7 @@ mod tests { let body = Bytes::from("test-message"); let events_byte_size = body.len(); - let builder = RequestMetadataBuilder::new(1, events_byte_size, events_byte_size); + let builder = RequestMetadataBuilder::new(1, events_byte_size, events_byte_size.into()); let bytes_len = NonZeroUsize::new(events_byte_size).expect("payload should never be zero length"); let metadata = builder.with_request_size(bytes_len); diff --git a/src/sinks/statsd/service.rs b/src/sinks/statsd/service.rs index 4d3c3bc78dd09..5686dc22de1ea 100644 --- a/src/sinks/statsd/service.rs +++ b/src/sinks/statsd/service.rs @@ -49,7 +49,7 @@ impl DriverResponse for StatsdResponse { fn events_sent(&self) -> CountByteSize { CountByteSize( self.metadata.event_count(), - self.metadata.events_byte_size(), + self.metadata.events_estimated_json_encoded_byte_size(), ) } diff --git 
a/src/sinks/util/adaptive_concurrency/tests.rs b/src/sinks/util/adaptive_concurrency/tests.rs index 5da3259d117cb..33d735a76f187 100644 --- a/src/sinks/util/adaptive_concurrency/tests.rs +++ b/src/sinks/util/adaptive_concurrency/tests.rs @@ -25,6 +25,7 @@ use serde::Deserialize; use snafu::Snafu; use tokio::time::{self, sleep, Duration, Instant}; use tower::Service; +use vector_common::json_size::JsonSize; use vector_config::configurable_component; use super::controller::ControllerStatistics; @@ -184,7 +185,9 @@ impl SinkConfig for TestConfig { VecBuffer::new(batch_settings.size), batch_settings.timeout, ) - .with_flat_map(|event| stream::iter(Some(Ok(EncodedEvent::new(event, 0))))) + .with_flat_map(|event| { + stream::iter(Some(Ok(EncodedEvent::new(event, 0, JsonSize::zero())))) + }) .sink_map_err(|error| panic!("Fatal test sink error: {}", error)); let healthcheck = future::ok(()).boxed(); diff --git a/src/sinks/util/batch.rs b/src/sinks/util/batch.rs index 7eff19279670f..b4bd7e70fdd60 100644 --- a/src/sinks/util/batch.rs +++ b/src/sinks/util/batch.rs @@ -3,6 +3,7 @@ use std::{marker::PhantomData, num::NonZeroUsize, time::Duration}; use derivative::Derivative; use serde_with::serde_as; use snafu::Snafu; +use vector_common::json_size::JsonSize; use vector_config::configurable_component; use vector_core::stream::BatcherSettings; @@ -362,6 +363,7 @@ pub struct EncodedBatch { pub finalizers: EventFinalizers, pub count: usize, pub byte_size: usize, + pub json_byte_size: JsonSize, } /// This is a batch construct that stores an set of event finalizers alongside the batch itself. @@ -374,6 +376,7 @@ pub struct FinalizersBatch { // could be smaller due to aggregated items (ie metrics). 
count: usize, byte_size: usize, + json_byte_size: JsonSize, } impl From for FinalizersBatch { @@ -383,6 +386,7 @@ impl From for FinalizersBatch { finalizers: Default::default(), count: 0, byte_size: 0, + json_byte_size: JsonSize::zero(), } } } @@ -402,18 +406,21 @@ impl Batch for FinalizersBatch { item, finalizers, byte_size, + json_byte_size, } = item; match self.inner.push(item) { PushResult::Ok(full) => { self.finalizers.merge(finalizers); self.count += 1; self.byte_size += byte_size; + self.json_byte_size += json_byte_size; PushResult::Ok(full) } PushResult::Overflow(item) => PushResult::Overflow(EncodedEvent { item, finalizers, byte_size, + json_byte_size, }), } } @@ -428,6 +435,7 @@ impl Batch for FinalizersBatch { finalizers: Default::default(), count: 0, byte_size: 0, + json_byte_size: JsonSize::zero(), } } @@ -437,6 +445,7 @@ impl Batch for FinalizersBatch { finalizers: self.finalizers, count: self.count, byte_size: self.byte_size, + json_byte_size: self.json_byte_size, } } diff --git a/src/sinks/util/buffer/mod.rs b/src/sinks/util/buffer/mod.rs index aa9a22ff6b22f..69bd67d17b2f3 100644 --- a/src/sinks/util/buffer/mod.rs +++ b/src/sinks/util/buffer/mod.rs @@ -145,6 +145,7 @@ mod test { use bytes::{Buf, BytesMut}; use futures::{future, stream, SinkExt, StreamExt}; use tokio::time::Duration; + use vector_common::json_size::JsonSize; use super::{Buffer, Compression}; use crate::sinks::util::{BatchSettings, BatchSink, EncodedEvent}; @@ -179,7 +180,10 @@ mod test { buffered .sink_map_err(drop) - .send_all(&mut stream::iter(input).map(|item| Ok(EncodedEvent::new(item, 0)))) + .send_all( + &mut stream::iter(input) + .map(|item| Ok(EncodedEvent::new(item, 0, JsonSize::zero()))), + ) .await .unwrap(); diff --git a/src/sinks/util/http.rs b/src/sinks/util/http.rs index fcc9c3d0f8158..3f943db17eeb9 100644 --- a/src/sinks/util/http.rs +++ b/src/sinks/util/http.rs @@ -20,7 +20,7 @@ use snafu::{ResultExt, Snafu}; use tower::{Service, ServiceBuilder}; use 
tower_http::decompression::DecompressionLayer; use vector_config::configurable_component; -use vector_core::ByteSizeOf; +use vector_core::{ByteSizeOf, EstimatedJsonEncodedSizeOf}; use super::{ retries::{RetryAction, RetryLogic}, @@ -172,12 +172,14 @@ where fn start_send(mut self: Pin<&mut Self>, mut event: Event) -> Result<(), Self::Error> { let byte_size = event.size_of(); + let json_byte_size = event.estimated_json_encoded_size_of(); let finalizers = event.metadata_mut().take_finalizers(); if let Some(item) = self.encoder.encode_event(event) { *self.project().slot = Some(EncodedEvent { item, finalizers, byte_size, + json_byte_size, }); } @@ -323,11 +325,14 @@ where fn start_send(mut self: Pin<&mut Self>, mut event: Event) -> Result<(), Self::Error> { let finalizers = event.metadata_mut().take_finalizers(); let byte_size = event.size_of(); + let json_byte_size = event.estimated_json_encoded_size_of(); + if let Some(item) = self.encoder.encode_event(event) { *self.project().slot = Some(EncodedEvent { item, finalizers, byte_size, + json_byte_size, }); } diff --git a/src/sinks/util/metadata.rs b/src/sinks/util/metadata.rs index 975959e56aa15..d89b51140e5f6 100644 --- a/src/sinks/util/metadata.rs +++ b/src/sinks/util/metadata.rs @@ -3,7 +3,7 @@ use std::num::NonZeroUsize; use vector_buffers::EventCount; use vector_core::{ByteSizeOf, EstimatedJsonEncodedSizeOf}; -use vector_common::request_metadata::RequestMetadata; +use vector_common::{json_size::JsonSize, request_metadata::RequestMetadata}; use super::request_builder::EncodeResult; @@ -11,7 +11,7 @@ use super::request_builder::EncodeResult; pub struct RequestMetadataBuilder { event_count: usize, events_byte_size: usize, - events_estimated_json_encoded_byte_size: usize, + events_estimated_json_encoded_byte_size: JsonSize, } impl RequestMetadataBuilder { @@ -29,7 +29,7 @@ impl RequestMetadataBuilder { pub const fn new( event_count: usize, events_byte_size: usize, - events_estimated_json_encoded_byte_size: usize, + 
events_estimated_json_encoded_byte_size: JsonSize, ) -> Self { Self { event_count, diff --git a/src/sinks/util/mod.rs b/src/sinks/util/mod.rs index 591e822ac557c..a83b17673ddbc 100644 --- a/src/sinks/util/mod.rs +++ b/src/sinks/util/mod.rs @@ -47,6 +47,7 @@ pub use service::{ pub use sink::{BatchSink, PartitionBatchSink, StreamSink}; use snafu::Snafu; pub use uri::UriSerde; +use vector_common::json_size::JsonSize; use crate::event::EventFinalizers; @@ -63,16 +64,18 @@ pub struct EncodedEvent { pub item: I, pub finalizers: EventFinalizers, pub byte_size: usize, + pub json_byte_size: JsonSize, } impl EncodedEvent { /// Create a trivial input with no metadata. This method will be /// removed when all sinks are converted. - pub fn new(item: I, byte_size: usize) -> Self { + pub fn new(item: I, byte_size: usize, json_byte_size: JsonSize) -> Self { Self { item, finalizers: Default::default(), byte_size, + json_byte_size, } } @@ -89,6 +92,7 @@ impl EncodedEvent { item: I::from(that.item), finalizers: that.finalizers, byte_size: that.byte_size, + json_byte_size: that.json_byte_size, } } @@ -98,6 +102,7 @@ impl EncodedEvent { item: doit(self.item), finalizers: self.finalizers, byte_size: self.byte_size, + json_byte_size: self.json_byte_size, } } } diff --git a/src/sinks/util/processed_event.rs b/src/sinks/util/processed_event.rs index 0aa241efe9709..dd13df8bd3f21 100644 --- a/src/sinks/util/processed_event.rs +++ b/src/sinks/util/processed_event.rs @@ -1,4 +1,5 @@ use serde::Serialize; +use vector_common::json_size::JsonSize; use vector_core::{ event::{EventFinalizers, Finalizable, LogEvent, MaybeAsLogMut}, ByteSizeOf, EstimatedJsonEncodedSizeOf, @@ -44,7 +45,7 @@ impl EstimatedJsonEncodedSizeOf for ProcessedEvent where E: EstimatedJsonEncodedSizeOf, { - fn estimated_json_encoded_size_of(&self) -> usize { + fn estimated_json_encoded_size_of(&self) -> JsonSize { self.event.estimated_json_encoded_size_of() } } diff --git a/src/sinks/util/service.rs b/src/sinks/util/service.rs 
index 2872c4eb39f6d..c53c52b8713c4 100644 --- a/src/sinks/util/service.rs +++ b/src/sinks/util/service.rs @@ -438,6 +438,7 @@ mod tests { use futures::{future, stream, FutureExt, SinkExt, StreamExt}; use tokio::time::Duration; + use vector_common::json_size::JsonSize; use super::*; use crate::sinks::util::{ @@ -520,7 +521,10 @@ mod tests { let input = (0..20).map(|i| PartitionInnerBuffer::new(i, 0)); sink.sink_map_err(drop) - .send_all(&mut stream::iter(input).map(|item| Ok(EncodedEvent::new(item, 0)))) + .send_all( + &mut stream::iter(input) + .map(|item| Ok(EncodedEvent::new(item, 0, JsonSize::zero()))), + ) .await .unwrap(); diff --git a/src/sinks/util/sink.rs b/src/sinks/util/sink.rs index 000b3d82be265..3305573899acb 100644 --- a/src/sinks/util/sink.rs +++ b/src/sinks/util/sink.rs @@ -426,7 +426,8 @@ where items, finalizers, count, - byte_size, + json_byte_size, + .. } = batch; let (tx, rx) = oneshot::channel(); @@ -449,7 +450,7 @@ where finalizers.update_status(status); match status { EventStatus::Delivered => { - events_sent.emit(CountByteSize(count, byte_size)); + events_sent.emit(CountByteSize(count, json_byte_size)); // TODO: Emit a BytesSent event here too } EventStatus::Rejected => { @@ -575,8 +576,9 @@ mod tests { use bytes::Bytes; use futures::{future, stream, task::noop_waker_ref, SinkExt, StreamExt}; use tokio::{task::yield_now, time::Instant}; - use vector_common::finalization::{ - BatchNotifier, BatchStatus, EventFinalizer, EventFinalizers, + use vector_common::{ + finalization::{BatchNotifier, BatchStatus, EventFinalizer, EventFinalizers}, + json_size::JsonSize, }; use super::*; @@ -621,6 +623,7 @@ mod tests { EncodedEvent { item, finalizers, + json_byte_size: JsonSize::zero(), byte_size: 0, } } @@ -770,7 +773,10 @@ mod tests { buffered .sink_map_err(drop) - .send_all(&mut stream::iter(0..22).map(|item| Ok(EncodedEvent::new(item, 0)))) + .send_all( + &mut stream::iter(0..22) + .map(|item| Ok(EncodedEvent::new(item, 0, JsonSize::zero()))), + ) 
.await .unwrap(); @@ -806,7 +812,7 @@ mod tests { Poll::Ready(Ok(())) )); assert!(matches!( - buffered.start_send_unpin(EncodedEvent::new(0, 0)), + buffered.start_send_unpin(EncodedEvent::new(0, 0, JsonSize::zero())), Ok(()) )); assert!(matches!( @@ -814,7 +820,7 @@ mod tests { Poll::Ready(Ok(())) )); assert!(matches!( - buffered.start_send_unpin(EncodedEvent::new(1, 0)), + buffered.start_send_unpin(EncodedEvent::new(1, 0, JsonSize::zero())), Ok(()) )); @@ -845,7 +851,7 @@ mod tests { Poll::Ready(Ok(())) )); assert!(matches!( - buffered.start_send_unpin(EncodedEvent::new(0, 0)), + buffered.start_send_unpin(EncodedEvent::new(0, 0, JsonSize::zero())), Ok(()) )); assert!(matches!( @@ -853,7 +859,7 @@ mod tests { Poll::Ready(Ok(())) )); assert!(matches!( - buffered.start_send_unpin(EncodedEvent::new(1, 0)), + buffered.start_send_unpin(EncodedEvent::new(1, 0, JsonSize::zero())), Ok(()) )); @@ -887,7 +893,10 @@ mod tests { let sink = PartitionBatchSink::new(svc, VecBuffer::new(batch_settings.size), TIMEOUT); sink.sink_map_err(drop) - .send_all(&mut stream::iter(0..22).map(|item| Ok(EncodedEvent::new(item, 0)))) + .send_all( + &mut stream::iter(0..22) + .map(|item| Ok(EncodedEvent::new(item, 0, JsonSize::zero()))), + ) .await .unwrap(); @@ -920,7 +929,10 @@ mod tests { let input = vec![Partitions::A, Partitions::B]; sink.sink_map_err(drop) - .send_all(&mut stream::iter(input).map(|item| Ok(EncodedEvent::new(item, 0)))) + .send_all( + &mut stream::iter(input) + .map(|item| Ok(EncodedEvent::new(item, 0, JsonSize::zero()))), + ) .await .unwrap(); @@ -947,7 +959,10 @@ mod tests { let input = vec![Partitions::A, Partitions::B, Partitions::A, Partitions::B]; sink.sink_map_err(drop) - .send_all(&mut stream::iter(input).map(|item| Ok(EncodedEvent::new(item, 0)))) + .send_all( + &mut stream::iter(input) + .map(|item| Ok(EncodedEvent::new(item, 0, JsonSize::zero()))), + ) .await .unwrap(); @@ -984,7 +999,7 @@ mod tests { Poll::Ready(Ok(())) )); assert!(matches!( - 
sink.start_send_unpin(EncodedEvent::new(1, 0)), + sink.start_send_unpin(EncodedEvent::new(1, 0, JsonSize::zero())), Ok(()) )); assert!(matches!(sink.poll_flush_unpin(&mut cx), Poll::Pending)); @@ -1024,6 +1039,7 @@ mod tests { finalizers, count: items, byte_size: 1, + json_byte_size: JsonSize::new(1), } }; @@ -1085,7 +1101,10 @@ mod tests { let input = (0..20).map(|i| (0, i)).chain((0..20).map(|i| (1, i))); sink.sink_map_err(drop) - .send_all(&mut stream::iter(input).map(|item| Ok(EncodedEvent::new(item, 0)))) + .send_all( + &mut stream::iter(input) + .map(|item| Ok(EncodedEvent::new(item, 0, JsonSize::zero()))), + ) .await .unwrap(); diff --git a/src/sinks/util/socket_bytes_sink.rs b/src/sinks/util/socket_bytes_sink.rs index c826213fe5bce..7d668c0201fb4 100644 --- a/src/sinks/util/socket_bytes_sink.rs +++ b/src/sinks/util/socket_bytes_sink.rs @@ -10,7 +10,10 @@ use futures::Sink; use pin_project::{pin_project, pinned_drop}; use tokio::io::AsyncWrite; use tokio_util::codec::{BytesCodec, FramedWrite}; -use vector_common::finalization::{EventFinalizers, EventStatus}; +use vector_common::{ + finalization::{EventFinalizers, EventStatus}, + json_size::JsonSize, +}; use super::EncodedEvent; use crate::internal_events::{SocketBytesSent, SocketEventsSent, SocketMode}; @@ -55,7 +58,7 @@ where shutdown_check: Box::new(shutdown_check), state: State { events_total: 0, - event_bytes: 0, + event_bytes: JsonSize::zero(), bytes_total: 0, socket_mode, finalizers: Vec::new(), @@ -67,7 +70,7 @@ where struct State { socket_mode: SocketMode, events_total: usize, - event_bytes: usize, + event_bytes: JsonSize, bytes_total: usize, finalizers: Vec, } @@ -92,7 +95,7 @@ impl State { } self.events_total = 0; - self.event_bytes = 0; + self.event_bytes = JsonSize::zero(); self.bytes_total = 0; } } @@ -129,7 +132,7 @@ where let pinned = self.project(); pinned.state.finalizers.push(item.finalizers); pinned.state.events_total += 1; - pinned.state.event_bytes += item.byte_size; + 
pinned.state.event_bytes += item.json_byte_size; pinned.state.bytes_total += item.item.len(); let result = pinned.inner.start_send(item.item); diff --git a/src/sinks/util/tcp.rs b/src/sinks/util/tcp.rs index 932638f0fed01..1fd97fa744e99 100644 --- a/src/sinks/util/tcp.rs +++ b/src/sinks/util/tcp.rs @@ -17,8 +17,9 @@ use tokio::{ time::sleep, }; use tokio_util::codec::Encoder; +use vector_common::json_size::JsonSize; use vector_config::configurable_component; -use vector_core::ByteSizeOf; +use vector_core::{ByteSizeOf, EstimatedJsonEncodedSizeOf}; use crate::{ codecs::Transformer, @@ -275,6 +276,7 @@ where let mut encoder = self.encoder.clone(); let mut input = input.map(|mut event| { let byte_size = event.size_of(); + let json_byte_size = event.estimated_json_encoded_size_of(); let finalizers = event.metadata_mut().take_finalizers(); self.transformer.transform(&mut event); let mut bytes = BytesMut::new(); @@ -286,9 +288,10 @@ where item, finalizers, byte_size, + json_byte_size, } } else { - EncodedEvent::new(Bytes::new(), 0) + EncodedEvent::new(Bytes::new(), 0, JsonSize::zero()) } }); diff --git a/src/sinks/util/unix.rs b/src/sinks/util/unix.rs index a5f2b97596c39..c8a3a8b93603b 100644 --- a/src/sinks/util/unix.rs +++ b/src/sinks/util/unix.rs @@ -6,8 +6,9 @@ use futures::{stream::BoxStream, SinkExt, StreamExt}; use snafu::{ResultExt, Snafu}; use tokio::{net::UnixStream, time::sleep}; use tokio_util::codec::Encoder; +use vector_common::json_size::JsonSize; use vector_config::configurable_component; -use vector_core::ByteSizeOf; +use vector_core::{ByteSizeOf, EstimatedJsonEncodedSizeOf}; use crate::{ codecs::Transformer, @@ -151,6 +152,7 @@ where let mut input = input .map(|mut event| { let byte_size = event.size_of(); + let json_byte_size = event.estimated_json_encoded_size_of(); transformer.transform(&mut event); @@ -164,9 +166,10 @@ where item, finalizers, byte_size, + json_byte_size, } } else { - EncodedEvent::new(Bytes::new(), 0) + 
EncodedEvent::new(Bytes::new(), 0, JsonSize::zero()) } }) .peekable(); diff --git a/src/sinks/vector/service.rs b/src/sinks/vector/service.rs index bfa4602a0cbd9..a93a196a58dc9 100644 --- a/src/sinks/vector/service.rs +++ b/src/sinks/vector/service.rs @@ -8,7 +8,10 @@ use hyper_proxy::ProxyConnector; use prost::Message; use tonic::{body::BoxBody, IntoRequest}; use tower::Service; -use vector_common::request_metadata::{MetaDescriptive, RequestMetadata}; +use vector_common::{ + json_size::JsonSize, + request_metadata::{MetaDescriptive, RequestMetadata}, +}; use vector_core::{internal_event::CountByteSize, stream::DriverResponse}; use super::VectorSinkError; @@ -29,7 +32,7 @@ pub struct VectorService { pub struct VectorResponse { events_count: usize, - events_byte_size: usize, + events_byte_size: JsonSize, } impl DriverResponse for VectorResponse { @@ -104,7 +107,9 @@ impl Service for VectorService { let mut service = self.clone(); let byte_size = list.request.encoded_len(); let events_count = list.get_metadata().event_count(); - let events_byte_size = list.get_metadata().events_byte_size(); + let events_byte_size = list + .get_metadata() + .events_estimated_json_encoded_byte_size(); let future = async move { service diff --git a/src/sinks/vector/sink.rs b/src/sinks/vector/sink.rs index 6b8fe8af766a9..09eede27d1844 100644 --- a/src/sinks/vector/sink.rs +++ b/src/sinks/vector/sink.rs @@ -4,6 +4,7 @@ use async_trait::async_trait; use futures::{stream::BoxStream, StreamExt}; use prost::Message; use tower::Service; +use vector_common::json_size::JsonSize; use vector_core::{ stream::{BatcherSettings, DriverResponse}, ByteSizeOf, @@ -62,7 +63,7 @@ where let builder = RequestMetadataBuilder::new( event_collection.events.len(), event_collection.events_byte_size, - event_collection.events_byte_size, // this is fine as it isn't being used + JsonSize::new(event_collection.events_byte_size), // this is fine as it isn't being used ); let encoded_events = 
proto_vector::PushEventsRequest { diff --git a/src/sources/dnstap/mod.rs b/src/sources/dnstap/mod.rs index f2871689f1087..69a9d13728805 100644 --- a/src/sources/dnstap/mod.rs +++ b/src/sources/dnstap/mod.rs @@ -13,7 +13,7 @@ use super::util::framestream::{build_framestream_unix_source, FrameHandler}; use crate::{ config::{log_schema, DataType, SourceConfig, SourceContext, SourceOutput}, event::{Event, LogEvent}, - internal_events::DnstapParseError, + internal_events::{DnstapParseError, SocketEventsReceived, SocketMode}, Result, }; @@ -24,7 +24,10 @@ pub mod schema; use dnsmsg_parser::{dns_message, dns_message_parser}; use lookup::lookup_v2::{parse_value_path, OptionalValuePath}; pub use schema::DnstapEventSchema; -use vector_core::config::{LegacyKey, LogNamespace}; +use vector_core::{ + config::{LegacyKey, LogNamespace}, + EstimatedJsonEncodedSizeOf, +}; /// Configuration for the `dnstap` source. #[configurable_component(source("dnstap", "Collect DNS logs from a dnstap-compatible server."))] @@ -260,12 +263,38 @@ impl FrameHandler for DnstapFrameHandler { * Takes a data frame from the unix socket and turns it into a Vector Event. 
**/ fn handle_event(&self, received_from: Option, frame: Bytes) -> Option { - // SocketEventsReceived is emitted already - self.bytes_received.emit(ByteSize(frame.len())); let mut log_event = LogEvent::default(); + if let Some(host) = received_from { + self.log_namespace.insert_source_metadata( + DnstapConfig::NAME, + &mut log_event, + self.host_key.as_ref().map(LegacyKey::Overwrite), + path!("host"), + host, + ); + } + + if self.raw_data_only { + log_event.insert( + self.schema.dnstap_root_data_schema().raw_data(), + BASE64_STANDARD.encode(&frame), + ); + } else if let Err(err) = parse_dnstap_data(&self.schema, &mut log_event, frame) { + emit!(DnstapParseError { + error: format!("Dnstap protobuf decode error {:?}.", err) + }); + return None; + } + + emit!(SocketEventsReceived { + mode: SocketMode::Unix, + byte_size: log_event.estimated_json_encoded_size_of(), + count: 1 + }); + if self.log_namespace == LogNamespace::Vector { // The timestamp is inserted by the parser which caters for the Legacy namespace. 
self.log_namespace.insert_vector_metadata( @@ -283,37 +312,7 @@ impl FrameHandler for DnstapFrameHandler { DnstapConfig::NAME, ); - if let Some(host) = received_from { - self.log_namespace.insert_source_metadata( - DnstapConfig::NAME, - &mut log_event, - self.host_key.as_ref().map(LegacyKey::Overwrite), - path!("host"), - host, - ); - } - - if self.raw_data_only { - log_event.insert( - self.schema.dnstap_root_data_schema().raw_data(), - BASE64_STANDARD.encode(&frame), - ); - let event = Event::from(log_event); - Some(event) - } else { - match parse_dnstap_data(&self.schema, &mut log_event, frame) { - Err(err) => { - emit!(DnstapParseError { - error: format!("Dnstap protobuf decode error {:?}.", err) - }); - None - } - Ok(_) => { - let event = Event::from(log_event); - Some(event) - } - } - } + Some(Event::from(log_event)) } fn socket_path(&self) -> PathBuf { diff --git a/src/sources/file.rs b/src/sources/file.rs index 76bec5f106d9f..3e68e57a2b04e 100644 --- a/src/sources/file.rs +++ b/src/sources/file.rs @@ -18,7 +18,10 @@ use tokio::{sync::oneshot, task::spawn_blocking}; use tracing::{Instrument, Span}; use vector_common::finalizer::OrderedFinalizer; use vector_config::configurable_component; -use vector_core::config::{LegacyKey, LogNamespace}; +use vector_core::{ + config::{LegacyKey, LogNamespace}, + EstimatedJsonEncodedSizeOf, +}; use vrl::value::Kind; use super::util::{EncodingConfig, MultilineConfig}; @@ -739,12 +742,6 @@ fn create_event( meta: &EventMetadata, log_namespace: LogNamespace, ) -> LogEvent { - emit!(FileEventsReceived { - count: 1, - file, - byte_size: line.len(), - }); - let deserializer = BytesDeserializer::new(); let mut event = deserializer.parse_single(line, log_namespace); @@ -791,6 +788,12 @@ fn create_event( file, ); + emit!(FileEventsReceived { + count: 1, + file, + byte_size: event.estimated_json_encoded_size_of(), + }); + event } diff --git a/src/sources/file_descriptors/mod.rs b/src/sources/file_descriptors/mod.rs index 
32acfbede5e61..06710adec9c37 100644 --- a/src/sources/file_descriptors/mod.rs +++ b/src/sources/file_descriptors/mod.rs @@ -144,7 +144,7 @@ async fn process_stream( bytes_received.emit(ByteSize(byte_size)); events_received.emit(CountByteSize( events.len(), - events.estimated_json_encoded_size_of(), + events.estimated_json_encoded_size_of(), )); let now = Utc::now(); diff --git a/src/sources/internal_logs.rs b/src/sources/internal_logs.rs index bc923eef399c2..0d22b3b310208 100644 --- a/src/sources/internal_logs.rs +++ b/src/sources/internal_logs.rs @@ -155,12 +155,14 @@ async fn run( // any logs that don't break the loop, as that could cause an // infinite loop since it receives all such logs. while let Some(mut log) = rx.next().await { - let byte_size = log.estimated_json_encoded_size_of(); + // TODO: Should this actually be in memory size? + let byte_size = log.estimated_json_encoded_size_of().get(); + let json_byte_size = log.estimated_json_encoded_size_of(); // This event doesn't emit any log emit!(InternalLogsBytesReceived { byte_size }); emit!(InternalLogsEventsReceived { count: 1, - byte_size, + byte_size: json_byte_size, }); if let Ok(hostname) = &hostname { diff --git a/src/sources/internal_metrics.rs b/src/sources/internal_metrics.rs index bddf04240673d..03f447c7488d9 100644 --- a/src/sources/internal_metrics.rs +++ b/src/sources/internal_metrics.rs @@ -6,8 +6,7 @@ use tokio::time; use tokio_stream::wrappers::IntervalStream; use vector_common::internal_event::{CountByteSize, InternalEventHandle as _}; use vector_config::configurable_component; -use vector_core::config::LogNamespace; -use vector_core::EstimatedJsonEncodedSizeOf; +use vector_core::{config::LogNamespace, ByteSizeOf, EstimatedJsonEncodedSizeOf}; use crate::{ config::{log_schema, SourceConfig, SourceContext, SourceOutput}, @@ -166,10 +165,11 @@ impl<'a> InternalMetrics<'a> { let metrics = self.controller.capture_metrics(); let count = metrics.len(); - let byte_size = 
metrics.estimated_json_encoded_size_of(); + let byte_size = metrics.size_of(); + let json_size = metrics.estimated_json_encoded_size_of(); emit!(InternalMetricsBytesReceived { byte_size }); - events_received.emit(CountByteSize(count, byte_size)); + events_received.emit(CountByteSize(count, json_size)); let batch = metrics.into_iter().map(|mut metric| { // A metric starts out with a default "vector" namespace, but will be overridden diff --git a/src/sources/postgresql_metrics.rs b/src/sources/postgresql_metrics.rs index b7415c4217831..808028539e6f3 100644 --- a/src/sources/postgresql_metrics.rs +++ b/src/sources/postgresql_metrics.rs @@ -25,7 +25,10 @@ use tokio_postgres::{ Client, Config, Error as PgError, NoTls, Row, }; use tokio_stream::wrappers::IntervalStream; -use vector_common::internal_event::{CountByteSize, InternalEventHandle as _, Registered}; +use vector_common::{ + internal_event::{CountByteSize, InternalEventHandle as _, Registered}, + json_size::JsonSize, +}; use vector_config::configurable_component; use vector_core::config::LogNamespace; use vector_core::{metric_tags, ByteSizeOf, EstimatedJsonEncodedSizeOf}; @@ -566,20 +569,23 @@ impl PostgresqlMetrics { .await { Ok(result) => { - let (count, byte_size, received_byte_size) = - result.iter().fold((0, 0, 0), |res, (set, size)| { - ( - res.0 + set.len(), - res.1 + set.estimated_json_encoded_size_of(), - res.2 + size, - ) - }); + let (count, json_byte_size, received_byte_size) = + result + .iter() + .fold((0, JsonSize::zero(), 0), |res, (set, size)| { + ( + res.0 + set.len(), + res.1 + set.estimated_json_encoded_size_of(), + res.2 + size, + ) + }); emit!(EndpointBytesReceived { byte_size: received_byte_size, protocol: "tcp", endpoint: &self.endpoint, }); - self.events_received.emit(CountByteSize(count, byte_size)); + self.events_received + .emit(CountByteSize(count, json_byte_size)); self.client.set((client, client_version)); Ok(result.into_iter().flat_map(|(metrics, _)| metrics)) } diff --git 
a/src/sources/util/framestream.rs b/src/sources/util/framestream.rs index 8ed437d9bbaed..194dd48432074 100644 --- a/src/sources/util/framestream.rs +++ b/src/sources/util/framestream.rs @@ -28,9 +28,7 @@ use tracing::{field, Instrument}; use crate::{ event::Event, - internal_events::{ - SocketEventsReceived, SocketMode, UnixSocketError, UnixSocketFileDeleteError, - }, + internal_events::{UnixSocketError, UnixSocketFileDeleteError}, shutdown::ShutdownSignal, sources::Source, SourceSender, @@ -157,11 +155,6 @@ impl FrameStreamReader { } else { //data frame if self.state.control_state == ControlState::ReadingData { - emit!(SocketEventsReceived { - mode: SocketMode::Unix, - byte_size: frame.len(), - count: 1 - }); Some(frame) //return data frame } else { error!( diff --git a/src/sources/util/http_client.rs b/src/sources/util/http_client.rs index 9dbffb26e4f30..aa5cdb1b8db7e 100644 --- a/src/sources/util/http_client.rs +++ b/src/sources/util/http_client.rs @@ -15,6 +15,7 @@ use hyper::{Body, Request}; use std::time::{Duration, Instant}; use std::{collections::HashMap, future::ready}; use tokio_stream::wrappers::IntervalStream; +use vector_common::json_size::JsonSize; use crate::{ http::{Auth, HttpClient}, @@ -187,7 +188,7 @@ pub(crate) async fn call< // HttpClientEventsReceived event, we should // emit 0 when there aren't any usable // metrics. 
- 0 + JsonSize::zero() } else { events.estimated_json_encoded_size_of() }; diff --git a/website/content/en/highlights/2023-07-04-0-31-0-upgrade-guide.md b/website/content/en/highlights/2023-07-04-0-31-0-upgrade-guide.md new file mode 100644 index 0000000000000..51eada592553a --- /dev/null +++ b/website/content/en/highlights/2023-07-04-0-31-0-upgrade-guide.md @@ -0,0 +1,31 @@ +--- +date: "2023-07-04" +title: "0.31 Upgrade Guide" +description: "An upgrade guide that addresses breaking changes in 0.31.0" +authors: ["stephenwakely"] +release: "0.31.0" +hide_on_release_notes: false +badges: + type: breaking change +--- + +Vector's 0.31.0 release includes **breaking changes**: + +1. [`component_received_event_bytes_total` and `component_sent_event_bytes_total` consistently use estimated JSON size of the event](#event_json_size) + +We cover them below to help you upgrade quickly: + +## Upgrade guide + +### Breaking changes + +#### `component_received_event_bytes_total` and `component_sent_event_bytes_total` consistently use estimated JSON size of the event {#event_json_size} + +Prior to this version, metrics emitted by Vector were inconsistently measuring +the byte size of the events that were being sent and received. These metrics +have been updated for all components so they always emit an estimate of the size +the event would have if it were serialized to JSON. + +Measuring the events like this allows a consistent measurement to be applied +across all components regardless of how the source or sink serializes the event +when connecting to the external service.