From e8e8a9c2a8111afb0aeb1fe474a5d3eecc62a176 Mon Sep 17 00:00:00 2001 From: Bruce Guenter Date: Fri, 20 Oct 2023 13:43:36 -0600 Subject: [PATCH 1/2] chore(deps): Refactor `vector-core::stream` into its own package --- Cargo.lock | 21 +++++++-- Cargo.toml | 2 + lib/vector-core/Cargo.toml | 3 -- lib/vector-core/src/lib.rs | 1 - lib/vector-core/src/time.rs | 4 +- lib/vector-stream/Cargo.toml | 19 ++++++++ .../src}/batcher/config.rs | 0 .../src}/batcher/data.rs | 0 .../src}/batcher/limiter.rs | 4 +- .../src}/batcher/mod.rs | 0 .../src}/concurrent_map.rs | 0 .../stream => vector-stream/src}/driver.rs | 6 +-- .../src}/expiration_map.rs | 0 .../src}/futures_unordered_count.rs | 0 .../mod.rs => vector-stream/src/lib.rs} | 3 ++ .../src}/partitioned_batcher.rs | 17 +++---- src/sinks/appsignal/service.rs | 2 +- src/sinks/aws_cloudwatch_logs/service.rs | 2 +- src/sinks/aws_cloudwatch_logs/sink.rs | 7 +-- src/sinks/aws_kinesis/config.rs | 2 +- src/sinks/aws_s_s/service.rs | 3 +- src/sinks/azure_common/config.rs | 2 +- src/sinks/databend/service.rs | 2 +- src/sinks/datadog/events/service.rs | 2 +- src/sinks/datadog/logs/service.rs | 6 +-- src/sinks/datadog/metrics/service.rs | 6 +-- src/sinks/datadog/metrics/sink.rs | 2 +- src/sinks/datadog/traces/service.rs | 6 +-- src/sinks/datadog/traces/sink.rs | 10 +---- src/sinks/elasticsearch/service.rs | 6 +-- src/sinks/gcs_common/service.rs | 2 +- src/sinks/greptimedb/batch.rs | 6 +-- src/sinks/http/batch.rs | 5 +-- src/sinks/prelude.rs | 44 +++++++++---------- src/sinks/s3_common/service.rs | 6 +-- src/sinks/splunk_hec/common/response.rs | 3 +- src/sinks/statsd/batch.rs | 3 +- src/sinks/statsd/service.rs | 2 +- src/sinks/statsd/sink.rs | 7 +-- src/sinks/util/batch.rs | 2 +- src/sinks/util/builder.rs | 8 ++-- src/sinks/util/http.rs | 5 +-- src/sinks/vector/service.rs | 2 +- src/sinks/vector/sink.rs | 7 +-- .../kubernetes_logs/partial_events_merger.rs | 7 ++- src/transforms/reduce/mod.rs | 2 +- 46 files changed, 128 insertions(+), 121 deletions(-) create mode 100644 lib/vector-stream/Cargo.toml rename lib/{vector-core/src/stream => vector-stream/src}/batcher/config.rs (100%) rename lib/{vector-core/src/stream => vector-stream/src}/batcher/data.rs (100%) rename lib/{vector-core/src/stream => vector-stream/src}/batcher/limiter.rs (97%) rename lib/{vector-core/src/stream => vector-stream/src}/batcher/mod.rs (100%) rename lib/{vector-core/src/stream => vector-stream/src}/concurrent_map.rs (100%) rename lib/{vector-core/src/stream => vector-stream/src}/driver.rs (99%) rename lib/{vector-core/src/stream => vector-stream/src}/expiration_map.rs (100%) rename lib/{vector-core/src/stream => vector-stream/src}/futures_unordered_count.rs (100%) rename lib/{vector-core/src/stream/mod.rs => vector-stream/src/lib.rs} (90%) rename lib/{vector-core/src/stream => vector-stream/src}/partitioned_batcher.rs (98%) diff --git a/Cargo.lock b/Cargo.lock index c0215803d27cc..7b58d53791191 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9861,6 +9861,7 @@ dependencies = [ "vector-config-macros", "vector-core", "vector-lookup", + "vector-stream", "vector-vrl-functions", "vrl", "warp", @@ -10027,7 +10028,6 @@ name = "vector-core" version = "0.1.0" dependencies = [ "async-graphql 5.0.10", - "async-stream", "async-trait", "base64 0.21.4", "bitmask-enum", @@ -10088,12 +10088,10 @@ dependencies = [ "tokio-util", "toml 0.8.2", "tonic 0.10.2", - "tower", "tracing 0.1.37", "tracing-core 0.1.30", "tracing-log", "tracing-subscriber", - "twox-hash", "typetag", "url", "vector-buffers", @@ -10115,6 +10113,23 @@ dependencies = [ "vrl", ] +[[package]] +name = "vector-stream" +version = "0.1.0" +dependencies = [ + "async-stream", + "futures 0.3.28", + "futures-util", + "pin-project", + "tokio", + "tokio-util", + "tower", + "tracing 0.1.37", + "twox-hash", + "vector-common", + "vector-core", +] + [[package]] name = "vector-vrl-cli" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index ef08f56229f77..69fd0c5ff6d8f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -110,6 +110,7 @@ members = [ "lib/vector-config-macros", "lib/vector-core", "lib/vector-lookup", + "lib/vector-stream", "lib/vector-vrl/cli", "lib/vector-vrl/functions", "lib/vector-vrl/tests", @@ -144,6 +145,7 @@ vector-config = { path = "lib/vector-config" } vector-config-common = { path = "lib/vector-config-common" } vector-config-macros = { path = "lib/vector-config-macros" } vector-core = { path = "lib/vector-core", default-features = false, features = ["vrl"] } +vector-stream = { path = "lib/vector-stream" } vector-vrl-functions = { path = "lib/vector-vrl/functions" } loki-logproto = { path = "lib/loki-logproto", optional = true } diff --git a/lib/vector-core/Cargo.toml b/lib/vector-core/Cargo.toml index 0527cdbd8682c..817b6928744fe 100644 --- a/lib/vector-core/Cargo.toml +++ b/lib/vector-core/Cargo.toml @@ -7,7 +7,6 @@ publish = false [dependencies] async-graphql = { version = "5.0.10", default-features = false, features = ["playground" ], optional = true } -async-stream = { version = "0.3.5", default-features = false } async-trait = { version = "0.1", default-features = false } bitmask-enum = { version = "2.2.2", default-features = false } bytes = { version = "1.5.0", default-features = false, features = ["serde"] } @@ -53,13 +52,11 @@ tokio-stream = { version = "0.1", default-features = false, features = ["time"], tokio-util = { version = "0.7.0", default-features = false, features = ["time"] } toml = { version = "0.8.2", default-features = false } tonic = { version = "0.10", default-features = false, features = ["transport"] } -tower = { version = "0.4", default-features = false, features = ["util"] } tracing = { version = "0.1.34", default-features = false } tracing-core = { version = "0.1.26", default-features = false } tracing-log = { version = "0.1.3", default-features = false } tracing-subscriber = { version = "0.3.17", default-features = false, features = ["std"] } typetag = { version = "0.2.13", default-features = false } -twox-hash = { version = "1.6.3", default-features = false } url = { version = "2", default-features = false } vector-buffers = { path = "../vector-buffers", default-features = false } vector-common = { path = "../vector-common" } diff --git a/lib/vector-core/src/lib.rs b/lib/vector-core/src/lib.rs index a6b0488b018ab..399b270945110 100644 --- a/lib/vector-core/src/lib.rs +++ b/lib/vector-core/src/lib.rs @@ -36,7 +36,6 @@ pub mod schema; pub mod serde; pub mod sink; pub mod source; -pub mod stream; pub mod tcp; #[cfg(test)] mod test_util; diff --git a/lib/vector-core/src/time.rs b/lib/vector-core/src/time.rs index e4e4e4975ebe4..83e6203f8a95a 100644 --- a/lib/vector-core/src/time.rs +++ b/lib/vector-core/src/time.rs @@ -8,7 +8,7 @@ use std::task::{Context, Poll}; /// this trait represents the minimum functionality required to describe management of keyed timers /// for the types implemented in this crate that require such behavior. /// -/// Users can look at `vector_core::stream::batcher::ExpirationQueue` for a concrete implementation. +/// Users can look at `vector_stream::batcher::ExpirationQueue` for a concrete implementation. pub trait KeyedTimer { /// Clear the timer. /// @@ -32,6 +32,6 @@ pub trait KeyedTimer { /// Unlike a typical stream, returning `None` only indicates that the queue /// is empty, not that the queue will never return anything else in the future. /// - /// Used primarily for property testing vis-á-vis `vector_core::stream::batcher::Batcher`. + /// Used primarily for property testing vis-á-vis `vector_stream::batcher::Batcher`. fn poll_expired(&mut self, cx: &mut Context) -> Poll>; } diff --git a/lib/vector-stream/Cargo.toml b/lib/vector-stream/Cargo.toml new file mode 100644 index 0000000000000..102c260cac7c3 --- /dev/null +++ b/lib/vector-stream/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "vector-stream" +version = "0.1.0" +authors = ["Vector Contributors "] +edition = "2021" +publish = false + +[dependencies] +async-stream = { version = "0.3.5", default-features = false } +futures = { version = "0.3.28", default-features = false, features = ["std"] } +futures-util = { version = "0.3.28", default-features = false, features = ["std"] } +pin-project.workspace = true +tokio = { version = "1.33.0", default-features = false, features = ["net"] } +tokio-util = { version = "0.7.0", default-features = false, features = ["time"] } +tower = { version = "0.4", default-features = false, features = ["util"] } +tracing = { version = "0.1.34", default-features = false } +twox-hash = { version = "1.6.3", default-features = false } +vector-common = { path = "../vector-common" } +vector-core = { path = "../vector-core" } diff --git a/lib/vector-core/src/stream/batcher/config.rs b/lib/vector-stream/src/batcher/config.rs similarity index 100% rename from lib/vector-core/src/stream/batcher/config.rs rename to lib/vector-stream/src/batcher/config.rs diff --git a/lib/vector-core/src/stream/batcher/data.rs b/lib/vector-stream/src/batcher/data.rs similarity index 100% rename from lib/vector-core/src/stream/batcher/data.rs rename to lib/vector-stream/src/batcher/data.rs diff --git a/lib/vector-core/src/stream/batcher/limiter.rs b/lib/vector-stream/src/batcher/limiter.rs similarity index 97% rename from lib/vector-core/src/stream/batcher/limiter.rs rename to lib/vector-stream/src/batcher/limiter.rs index 6a04506857201..873c5bdd323b1 100644 --- a/lib/vector-core/src/stream/batcher/limiter.rs +++ b/lib/vector-stream/src/batcher/limiter.rs @@ -1,4 +1,6 @@ -use crate::{stream::batcher::data::BatchData, ByteSizeOf}; +use vector_core::ByteSizeOf; + +use crate::batcher::data::BatchData; pub trait BatchLimiter { type ItemMetadata; diff --git a/lib/vector-core/src/stream/batcher/mod.rs b/lib/vector-stream/src/batcher/mod.rs similarity index 100% rename from lib/vector-core/src/stream/batcher/mod.rs rename to lib/vector-stream/src/batcher/mod.rs diff --git a/lib/vector-core/src/stream/concurrent_map.rs b/lib/vector-stream/src/concurrent_map.rs similarity index 100% rename from lib/vector-core/src/stream/concurrent_map.rs rename to lib/vector-stream/src/concurrent_map.rs diff --git a/lib/vector-core/src/stream/driver.rs b/lib/vector-stream/src/driver.rs similarity index 99% rename from lib/vector-core/src/stream/driver.rs rename to lib/vector-stream/src/driver.rs index 73db71d33f31b..3d0094846fa81 100644 --- a/lib/vector-core/src/stream/driver.rs +++ b/lib/vector-stream/src/driver.rs @@ -9,13 +9,13 @@ use vector_common::internal_event::{ RegisteredEventCache, SharedString, TaggedEventsSent, }; use vector_common::request_metadata::{GroupedCountByteSize, MetaDescriptive}; - -use super::FuturesUnorderedCount; -use crate::{ +use vector_core::{ event::{EventFinalizers, EventStatus, Finalizable}, internal_event::emit, }; +use super::FuturesUnorderedCount; + pub trait DriverResponse { fn event_status(&self) -> EventStatus; fn events_sent(&self) -> &GroupedCountByteSize; diff --git a/lib/vector-core/src/stream/expiration_map.rs b/lib/vector-stream/src/expiration_map.rs similarity index 100% rename from lib/vector-core/src/stream/expiration_map.rs rename to lib/vector-stream/src/expiration_map.rs diff --git a/lib/vector-core/src/stream/futures_unordered_count.rs b/lib/vector-stream/src/futures_unordered_count.rs similarity index 100% rename from lib/vector-core/src/stream/futures_unordered_count.rs rename to lib/vector-stream/src/futures_unordered_count.rs diff --git a/lib/vector-core/src/stream/mod.rs b/lib/vector-stream/src/lib.rs similarity index 90% rename from lib/vector-core/src/stream/mod.rs rename to lib/vector-stream/src/lib.rs index 64e4edc3247f9..9d2a2d81e45fc 100644 --- a/lib/vector-core/src/stream/mod.rs +++ b/lib/vector-stream/src/lib.rs @@ -9,3 +9,6 @@ pub use concurrent_map::ConcurrentMap; pub use driver::{Driver, DriverResponse}; use futures_unordered_count::FuturesUnorderedCount; pub use partitioned_batcher::{BatcherSettings, ExpirationQueue, PartitionedBatcher}; + +#[macro_use] +extern crate tracing; diff --git a/lib/vector-core/src/stream/partitioned_batcher.rs b/lib/vector-stream/src/partitioned_batcher.rs similarity index 98% rename from lib/vector-core/src/stream/partitioned_batcher.rs rename to lib/vector-stream/src/partitioned_batcher.rs index f7d0d55b9556f..fabe192b78406 100644 --- a/lib/vector-core/src/stream/partitioned_batcher.rs +++ b/lib/vector-stream/src/partitioned_batcher.rs @@ -11,20 +11,15 @@ use futures::stream::{Fuse, Stream, StreamExt}; use pin_project::pin_project; use tokio_util::time::{delay_queue::Key, DelayQueue}; use twox_hash::XxHash64; +use vector_core::{partition::Partitioner, time::KeyedTimer, ByteSizeOf}; -use crate::{ - partition::Partitioner, - stream::batcher::{ - config::BatchConfigParts, - data::BatchReduce, - limiter::{ByteSizeOfItemSize, ItemBatchSize, SizeLimit}, - }, - time::KeyedTimer, - ByteSizeOf, +use crate::batcher::{ + config::BatchConfigParts, + data::BatchReduce, + limiter::{ByteSizeOfItemSize, ItemBatchSize, SizeLimit}, + BatchConfig, }; -use super::batcher::BatchConfig; - /// A `KeyedTimer` based on `DelayQueue`. pub struct ExpirationQueue { /// The timeout to give each new key entry diff --git a/src/sinks/appsignal/service.rs b/src/sinks/appsignal/service.rs index 959f573f752d0..188972eae7963 100644 --- a/src/sinks/appsignal/service.rs +++ b/src/sinks/appsignal/service.rs @@ -13,7 +13,7 @@ use vector_common::{ finalization::EventStatus, request_metadata::GroupedCountByteSize, request_metadata::MetaDescriptive, sensitive_string::SensitiveString, }; -use vector_core::stream::DriverResponse; +use vector_stream::DriverResponse; use crate::{ http::HttpClient, diff --git a/src/sinks/aws_cloudwatch_logs/service.rs b/src/sinks/aws_cloudwatch_logs/service.rs index 9b02493536017..0df88c82bf565 100644 --- a/src/sinks/aws_cloudwatch_logs/service.rs +++ b/src/sinks/aws_cloudwatch_logs/service.rs @@ -26,7 +26,7 @@ use vector_common::{ finalization::EventStatus, request_metadata::{GroupedCountByteSize, MetaDescriptive}, }; -use vector_core::stream::DriverResponse; +use vector_stream::DriverResponse; use crate::sinks::{ aws_cloudwatch_logs::{ diff --git a/src/sinks/aws_cloudwatch_logs/sink.rs b/src/sinks/aws_cloudwatch_logs/sink.rs index 5d71f8d0b605e..b734774bfbb9b 100644 --- a/src/sinks/aws_cloudwatch_logs/sink.rs +++ b/src/sinks/aws_cloudwatch_logs/sink.rs @@ -5,11 +5,8 @@ use chrono::{Duration, Utc}; use futures::{future, stream::BoxStream, StreamExt}; use tower::Service; use vector_common::request_metadata::{MetaDescriptive, RequestMetadata}; -use vector_core::{ - partition::Partitioner, - sink::StreamSink, - stream::{BatcherSettings, DriverResponse}, -}; +use vector_core::{partition::Partitioner, sink::StreamSink}; +use vector_stream::{BatcherSettings, DriverResponse}; use crate::{ event::{Event, EventFinalizers, Finalizable}, diff --git a/src/sinks/aws_kinesis/config.rs b/src/sinks/aws_kinesis/config.rs index 1b3292afd29dd..f294025b23d60 100644 --- a/src/sinks/aws_kinesis/config.rs +++ b/src/sinks/aws_kinesis/config.rs @@ -1,7 +1,7 @@ use lookup::lookup_v2::ConfigValuePath; use std::marker::PhantomData; -use vector_core::stream::BatcherSettings; +use vector_stream::BatcherSettings; use crate::{ aws::{AwsAuthentication, RegionOrEndpoint}, diff --git a/src/sinks/aws_s_s/service.rs b/src/sinks/aws_s_s/service.rs index 8d5fd53c17dbd..4d4d1f88832f1 100644 --- a/src/sinks/aws_s_s/service.rs +++ b/src/sinks/aws_s_s/service.rs @@ -5,7 +5,8 @@ use aws_sdk_sqs::types::SdkError; use futures::future::BoxFuture; use tower::Service; use vector_common::request_metadata::GroupedCountByteSize; -use vector_core::{event::EventStatus, stream::DriverResponse, ByteSizeOf}; +use vector_core::{event::EventStatus, ByteSizeOf}; +use vector_stream::DriverResponse; use super::{client::Client, request_builder::SendMessageEntry}; diff --git a/src/sinks/azure_common/config.rs b/src/sinks/azure_common/config.rs index f23a8f0bde3b1..1fa8494221e3e 100644 --- a/src/sinks/azure_common/config.rs +++ b/src/sinks/azure_common/config.rs @@ -12,7 +12,7 @@ use vector_common::{ json_size::JsonSize, request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata}, }; -use vector_core::stream::DriverResponse; +use vector_stream::DriverResponse; use crate::{ event::{EventFinalizers, EventStatus, Finalizable}, diff --git a/src/sinks/databend/service.rs b/src/sinks/databend/service.rs index 05a8b4629a767..463c880749b3d 100644 --- a/src/sinks/databend/service.rs +++ b/src/sinks/databend/service.rs @@ -10,7 +10,7 @@ use snafu::Snafu; use tower::Service; use vector_common::finalization::{EventFinalizers, EventStatus, Finalizable}; use vector_common::request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata}; -use vector_core::stream::DriverResponse; +use vector_stream::DriverResponse; use crate::{internal_events::EndpointBytesSent, sinks::util::retries::RetryLogic}; diff --git a/src/sinks/datadog/events/service.rs b/src/sinks/datadog/events/service.rs index 0dfeb4c5b9f69..89035116390db 100644 --- a/src/sinks/datadog/events/service.rs +++ b/src/sinks/datadog/events/service.rs @@ -9,7 +9,7 @@ use http::Request; use hyper::Body; use tower::{Service, ServiceExt}; use vector_common::request_metadata::{GroupedCountByteSize, MetaDescriptive}; -use vector_core::stream::DriverResponse; +use vector_stream::DriverResponse; use crate::{ event::EventStatus, diff --git a/src/sinks/datadog/logs/service.rs b/src/sinks/datadog/logs/service.rs index 09e9a81490370..6c80fbc8a482d 100644 --- a/src/sinks/datadog/logs/service.rs +++ b/src/sinks/datadog/logs/service.rs @@ -15,10 +15,8 @@ use indexmap::IndexMap; use tower::Service; use tracing::Instrument; use vector_common::request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata}; -use vector_core::{ - event::{EventFinalizers, EventStatus, Finalizable}, - stream::DriverResponse, -}; +use vector_core::event::{EventFinalizers, EventStatus, Finalizable}; +use vector_stream::DriverResponse; use crate::{ http::HttpClient, diff --git a/src/sinks/datadog/metrics/service.rs b/src/sinks/datadog/metrics/service.rs index 7f62e6ddaefd5..f51728cb2cf18 100644 --- a/src/sinks/datadog/metrics/service.rs +++ b/src/sinks/datadog/metrics/service.rs @@ -11,10 +11,8 @@ use hyper::Body; use snafu::ResultExt; use tower::Service; use vector_common::request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata}; -use vector_core::{ - event::{EventFinalizers, EventStatus, Finalizable}, - stream::DriverResponse, -}; +use vector_core::event::{EventFinalizers, EventStatus, Finalizable}; +use vector_stream::DriverResponse; use crate::{ http::{BuildRequestSnafu, CallRequestSnafu, HttpClient}, diff --git a/src/sinks/datadog/metrics/sink.rs b/src/sinks/datadog/metrics/sink.rs index 0e5fc962b0386..329b46ea9745c 100644 --- a/src/sinks/datadog/metrics/sink.rs +++ b/src/sinks/datadog/metrics/sink.rs @@ -12,8 +12,8 @@ use vector_core::{ event::{Event, Metric, MetricValue}, partition::Partitioner, sink::StreamSink, - stream::{BatcherSettings, DriverResponse}, }; +use vector_stream::{BatcherSettings, DriverResponse}; use super::{ config::DatadogMetricsEndpoint, normalizer::DatadogMetricsNormalizer, diff --git a/src/sinks/datadog/traces/service.rs b/src/sinks/datadog/traces/service.rs index 5128d855edb0f..01f820f841431 100644 --- a/src/sinks/datadog/traces/service.rs +++ b/src/sinks/datadog/traces/service.rs @@ -10,10 +10,8 @@ use hyper::Body; use snafu::ResultExt; use tower::Service; use vector_common::request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata}; -use vector_core::{ - event::{EventFinalizers, EventStatus, Finalizable}, - stream::DriverResponse, -}; +use vector_core::event::{EventFinalizers, EventStatus, Finalizable}; +use vector_stream::DriverResponse; use crate::{ http::{BuildRequestSnafu, CallRequestSnafu, HttpClient, HttpError}, diff --git a/src/sinks/datadog/traces/sink.rs b/src/sinks/datadog/traces/sink.rs index 384d7ef635022..57735ca1699d1 100644 --- a/src/sinks/datadog/traces/sink.rs +++ b/src/sinks/datadog/traces/sink.rs @@ -7,17 +7,11 @@ use futures_util::{ }; use tokio::sync::oneshot::{channel, Sender}; use tower::Service; +use vector_core::{config::log_schema, event::Event, partition::Partitioner, sink::StreamSink}; +use vector_stream::{BatcherSettings, DriverResponse}; use vrl::event_path; use vrl::path::PathPrefix; -use vector_core::{ - config::log_schema, - event::Event, - partition::Partitioner, - sink::StreamSink, - stream::{BatcherSettings, DriverResponse}, -}; - use crate::{ internal_events::DatadogTracesEncodingError, sinks::{datadog::traces::request_builder::DatadogTracesRequestBuilder, util::SinkBuilderExt}, diff --git a/src/sinks/elasticsearch/service.rs b/src/sinks/elasticsearch/service.rs index 909b5a35acd0b..95bc4fdd1be60 100644 --- a/src/sinks/elasticsearch/service.rs +++ b/src/sinks/elasticsearch/service.rs @@ -13,8 +13,10 @@ use vector_common::{ json_size::JsonSize, request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata}, }; -use vector_core::{stream::DriverResponse, ByteSizeOf}; +use vector_core::ByteSizeOf; +use vector_stream::DriverResponse; +use super::{ElasticsearchCommon, ElasticsearchConfig}; use crate::{ event::{EventFinalizers, EventStatus, Finalizable}, http::HttpClient, @@ -25,8 +27,6 @@ use crate::{ }, }; -use super::{ElasticsearchCommon, ElasticsearchConfig}; - #[derive(Clone, Debug)] pub struct ElasticsearchRequest { pub payload: Bytes, diff --git a/src/sinks/gcs_common/service.rs b/src/sinks/gcs_common/service.rs index 9a75203629dfb..455df2eb919fe 100644 --- a/src/sinks/gcs_common/service.rs +++ b/src/sinks/gcs_common/service.rs @@ -9,7 +9,7 @@ use http::{ use hyper::Body; use tower::Service; use vector_common::request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata}; -use vector_core::stream::DriverResponse; +use vector_stream::DriverResponse; use crate::{ event::{EventFinalizers, EventStatus, Finalizable}, diff --git a/src/sinks/greptimedb/batch.rs b/src/sinks/greptimedb/batch.rs index af4ef01096dfb..d5bf70c47505f 100644 --- a/src/sinks/greptimedb/batch.rs +++ b/src/sinks/greptimedb/batch.rs @@ -1,7 +1,5 @@ -use vector_core::{ - event::{Metric, MetricValue}, - stream::batcher::limiter::ItemBatchSize, -}; +use vector_core::event::{Metric, MetricValue}; +use vector_stream::batcher::limiter::ItemBatchSize; use super::request_builder::{ DISTRIBUTION_QUANTILES, DISTRIBUTION_STAT_FIELD_COUNT, SUMMARY_STAT_FIELD_COUNT, diff --git a/src/sinks/http/batch.rs b/src/sinks/http/batch.rs index 38718631fadd7..244d90e20df44 100644 --- a/src/sinks/http/batch.rs +++ b/src/sinks/http/batch.rs @@ -1,9 +1,8 @@ //! Batch settings for the `http` sink. use codecs::encoding::Framer; -use vector_core::{ - event::Event, stream::batcher::limiter::ItemBatchSize, ByteSizeOf, EstimatedJsonEncodedSizeOf, -}; +use vector_core::{event::Event, ByteSizeOf, EstimatedJsonEncodedSizeOf}; +use vector_stream::batcher::limiter::ItemBatchSize; use crate::codecs::Encoder; diff --git a/src/sinks/prelude.rs b/src/sinks/prelude.rs index d118545537604..100811537062c 100644 --- a/src/sinks/prelude.rs +++ b/src/sinks/prelude.rs @@ -1,6 +1,28 @@ //! Prelude module for sinks which will re-export the symbols that most //! stream based sinks are likely to use. +pub use async_trait::async_trait; +pub use futures::{future, future::BoxFuture, stream::BoxStream, FutureExt, StreamExt}; +pub use tower::{Service, ServiceBuilder}; +pub use vector_buffers::EventCount; +pub use vector_common::{ + finalization::{EventFinalizers, EventStatus, Finalizable}, + internal_event::{CountByteSize, TaggedEventsSent}, + json_size::JsonSize, + request_metadata::{GetEventCountTags, GroupedCountByteSize, MetaDescriptive, RequestMetadata}, +}; +pub use vector_config::configurable_component; +pub use vector_core::{ + config::{telemetry, AcknowledgementsConfig, Input}, + event::Value, + partition::Partitioner, + schema::Requirement, + sink::{StreamSink, VectorSink}, + tls::TlsSettings, + ByteSizeOf, EstimatedJsonEncodedSizeOf, +}; +pub use vector_stream::{BatcherSettings, DriverResponse}; + pub use crate::{ codecs::{Encoder, EncodingConfig, Transformer}, components::validation::{ @@ -27,25 +49,3 @@ pub use crate::{ template::{Template, TemplateParseError}, tls::TlsConfig, }; -pub use async_trait::async_trait; -pub use futures::{future, future::BoxFuture, stream::BoxStream, FutureExt, StreamExt}; -pub use tower::{Service, ServiceBuilder}; -pub use vector_buffers::EventCount; -pub use vector_common::{ - finalization::{EventFinalizers, EventStatus, Finalizable}, - internal_event::{CountByteSize, TaggedEventsSent}, - json_size::JsonSize, - request_metadata::{GetEventCountTags, GroupedCountByteSize, MetaDescriptive, RequestMetadata}, -}; -pub use vector_config::configurable_component; - -pub use vector_core::{ - config::{telemetry, AcknowledgementsConfig, Input}, - event::Value, - partition::Partitioner, - schema::Requirement, - sink::{StreamSink, VectorSink}, - stream::{BatcherSettings, DriverResponse}, - tls::TlsSettings, - ByteSizeOf, EstimatedJsonEncodedSizeOf, -}; diff --git a/src/sinks/s3_common/service.rs b/src/sinks/s3_common/service.rs index 5ad3bea516d5c..a13ebdfb001b1 100644 --- a/src/sinks/s3_common/service.rs +++ b/src/sinks/s3_common/service.rs @@ -12,10 +12,8 @@ use md5::Digest; use tower::Service; use tracing::Instrument; use vector_common::request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata}; -use vector_core::{ - event::{EventFinalizers, EventStatus, Finalizable}, - stream::DriverResponse, -}; +use vector_core::event::{EventFinalizers, EventStatus, Finalizable}; +use vector_stream::DriverResponse; use super::config::S3Options; use super::partitioner::S3PartitionKey; diff --git a/src/sinks/splunk_hec/common/response.rs b/src/sinks/splunk_hec/common/response.rs index 16a7b74abc1ab..413d0b7f5e7f1 100644 --- a/src/sinks/splunk_hec/common/response.rs +++ b/src/sinks/splunk_hec/common/response.rs @@ -1,5 +1,6 @@ use vector_common::request_metadata::GroupedCountByteSize; -use vector_core::{event::EventStatus, stream::DriverResponse}; +use vector_core::event::EventStatus; +use vector_stream::DriverResponse; pub struct HecResponse { pub event_status: EventStatus, diff --git a/src/sinks/statsd/batch.rs b/src/sinks/statsd/batch.rs index 23504e146ca46..9aefaed21f5e0 100644 --- a/src/sinks/statsd/batch.rs +++ b/src/sinks/statsd/batch.rs @@ -1,4 +1,5 @@ -use vector_core::{event::Metric, stream::batcher::limiter::ItemBatchSize}; +use vector_core::event::Metric; +use vector_stream::batcher::limiter::ItemBatchSize; // This accounts for the separators, the metric type string, the length of the value itself. It can // never be too small, as the above values will always take at least 4 bytes. diff --git a/src/sinks/statsd/service.rs b/src/sinks/statsd/service.rs index 5ab5e6092e7ed..f04ac83ad9f8f 100644 --- a/src/sinks/statsd/service.rs +++ b/src/sinks/statsd/service.rs @@ -6,7 +6,7 @@ use vector_common::{ finalization::{EventFinalizers, EventStatus, Finalizable}, request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata}, }; -use vector_core::stream::DriverResponse; +use vector_stream::DriverResponse; /// Generalized request for sending metrics to a StatsD endpoint. #[derive(Clone, Debug)] diff --git a/src/sinks/statsd/sink.rs b/src/sinks/statsd/sink.rs index f5c77e91b4f86..3b79f177022ea 100644 --- a/src/sinks/statsd/sink.rs +++ b/src/sinks/statsd/sink.rs @@ -7,11 +7,8 @@ use futures_util::{ }; use tower::Service; use vector_common::internal_event::Protocol; -use vector_core::{ - event::Event, - sink::StreamSink, - stream::{BatcherSettings, DriverResponse}, -}; +use vector_core::{event::Event, sink::StreamSink}; +use vector_stream::{BatcherSettings, DriverResponse}; use crate::sinks::util::SinkBuilderExt; diff --git a/src/sinks/util/batch.rs b/src/sinks/util/batch.rs index 13f911abacffd..f0a9a896b8131 100644 --- a/src/sinks/util/batch.rs +++ b/src/sinks/util/batch.rs @@ -5,7 +5,7 @@ use serde_with::serde_as; use snafu::Snafu; use vector_common::json_size::JsonSize; use vector_config::configurable_component; -use vector_core::stream::BatcherSettings; +use vector_stream::BatcherSettings; use super::EncodedEvent; use crate::{event::EventFinalizers, internal_events::LargeEventDroppedError}; diff --git a/src/sinks/util/builder.rs b/src/sinks/util/builder.rs index a25f0badb6f70..7e5fb0d8ca954 100644 --- a/src/sinks/util/builder.rs +++ b/src/sinks/util/builder.rs @@ -15,12 +15,12 @@ use tower::Service; use vector_core::{ event::{Finalizable, Metric}, partition::Partitioner, - stream::{ - batcher::{config::BatchConfig, Batcher}, - ConcurrentMap, Driver, DriverResponse, ExpirationQueue, PartitionedBatcher, - }, ByteSizeOf, }; +use vector_stream::{ + batcher::{config::BatchConfig, Batcher}, + ConcurrentMap, Driver, DriverResponse, ExpirationQueue, PartitionedBatcher, +}; use super::{ buffer::metrics::MetricNormalize, IncrementalRequestBuilder, Normalizer, RequestBuilder, diff --git a/src/sinks/util/http.rs b/src/sinks/util/http.rs index 435a28c9f685f..eb67ca9772ba3 100644 --- a/src/sinks/util/http.rs +++ b/src/sinks/util/http.rs @@ -20,9 +20,8 @@ use snafu::{ResultExt, Snafu}; use tower::{Service, ServiceBuilder}; use tower_http::decompression::DecompressionLayer; use vector_config::configurable_component; -use vector_core::{ - stream::batcher::limiter::ItemBatchSize, ByteSizeOf, EstimatedJsonEncodedSizeOf, -}; +use vector_core::{ByteSizeOf, EstimatedJsonEncodedSizeOf}; +use vector_stream::batcher::limiter::ItemBatchSize; use super::{ retries::{RetryAction, RetryLogic}, diff --git a/src/sinks/vector/service.rs b/src/sinks/vector/service.rs index a89271d4f928d..5e8b278866e02 100644 --- a/src/sinks/vector/service.rs +++ b/src/sinks/vector/service.rs @@ -9,7 +9,7 @@ use prost::Message; use tonic::{body::BoxBody, IntoRequest}; use tower::Service; use vector_common::request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata}; -use vector_core::stream::DriverResponse; +use vector_stream::DriverResponse; use super::VectorSinkError; use crate::{ diff --git a/src/sinks/vector/sink.rs b/src/sinks/vector/sink.rs index fe77de6837883..8d6527326d5b4 100644 --- a/src/sinks/vector/sink.rs +++ b/src/sinks/vector/sink.rs @@ -5,11 +5,8 @@ use futures::{stream::BoxStream, StreamExt}; use prost::Message; use tower::Service; use vector_common::request_metadata::GroupedCountByteSize; -use vector_core::{ - config::telemetry, - stream::{BatcherSettings, DriverResponse}, - ByteSizeOf, EstimatedJsonEncodedSizeOf, -}; +use vector_core::{config::telemetry, ByteSizeOf, EstimatedJsonEncodedSizeOf}; +use vector_stream::{BatcherSettings, DriverResponse}; use super::service::VectorRequest; use crate::{ diff --git a/src/sources/kubernetes_logs/partial_events_merger.rs b/src/sources/kubernetes_logs/partial_events_merger.rs index 3c94f05c5cd53..1151993b9050b 100644 --- a/src/sources/kubernetes_logs/partial_events_merger.rs +++ b/src/sources/kubernetes_logs/partial_events_merger.rs @@ -1,9 +1,12 @@ #![deny(missing_docs)] +use bytes::BytesMut; use futures::{Stream, StreamExt}; +use lookup::OwnedTargetPath; use std::collections::HashMap; use std::time::{Duration, Instant}; use vector_core::config::LogNamespace; +use vector_stream::expiration_map::{map_with_expiration, Emitter}; use vrl::owned_value_path; use crate::event; @@ -15,10 +18,6 @@ const FILE_KEY: &str = "file"; const EXPIRATION_TIME: Duration = Duration::from_secs(30); -use bytes::BytesMut; -use lookup::OwnedTargetPath; -use vector_core::stream::expiration_map::{map_with_expiration, Emitter}; - struct PartialEventMergeState { buckets: HashMap, } diff --git a/src/transforms/reduce/mod.rs b/src/transforms/reduce/mod.rs index ba0ad8ffbd0be..d1f318b990218 100644 --- a/src/transforms/reduce/mod.rs +++ b/src/transforms/reduce/mod.rs @@ -28,7 +28,7 @@ use crate::config::schema::Definition; use crate::event::Value; pub use merge_strategy::*; use vector_core::config::LogNamespace; -use vector_core::stream::expiration_map::{map_with_expiration, Emitter}; +use vector_stream::expiration_map::{map_with_expiration, Emitter}; use vrl::value::kind::Collection; use vrl::value::Kind; From a8758d8e1c201ae5ea51dbccf1ce53a16501a72a Mon Sep 17 00:00:00 2001 From: Bruce Guenter Date: Fri, 20 Oct 2023 15:10:50 -0600 Subject: [PATCH 2/2] Fixes --- Cargo.lock | 3 +++ lib/vector-stream/Cargo.toml | 5 +++++ lib/vector-stream/src/batcher/mod.rs | 2 +- lib/vector-stream/src/driver.rs | 3 ++- lib/vector-stream/src/lib.rs | 12 ++++++++++++ lib/vector-stream/src/partitioned_batcher.rs | 10 ++++------ 6 files changed, 27 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7b58d53791191..1c58ab181ba99 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10121,6 +10121,9 @@ dependencies = [ "futures 0.3.28", "futures-util", "pin-project", + "proptest", + "rand 0.8.5", + "rand_distr", "tokio", "tokio-util", "tower", diff --git a/lib/vector-stream/Cargo.toml b/lib/vector-stream/Cargo.toml index 102c260cac7c3..a5e7cfc926a0b 100644 --- a/lib/vector-stream/Cargo.toml +++ b/lib/vector-stream/Cargo.toml @@ -17,3 +17,8 @@ tracing = { version = "0.1.34", default-features = false } twox-hash = { version = "1.6.3", default-features = false } vector-common = { path = "../vector-common" } vector-core = { path = "../vector-core" } + +[dev-dependencies] +proptest = "1.3" +rand = "0.8.5" +rand_distr = "0.4.3" diff --git a/lib/vector-stream/src/batcher/mod.rs b/lib/vector-stream/src/batcher/mod.rs index bfb20e95e4160..cd2cb6aea1012 100644 --- a/lib/vector-stream/src/batcher/mod.rs +++ b/lib/vector-stream/src/batcher/mod.rs @@ -119,7 +119,7 @@ mod test { use futures::stream; use super::*; - use crate::stream::BatcherSettings; + use crate::BatcherSettings; #[tokio::test] async fn item_limit() { diff --git a/lib/vector-stream/src/driver.rs b/lib/vector-stream/src/driver.rs index 3d0094846fa81..3de357bdb134c 100644 --- a/lib/vector-stream/src/driver.rs +++ b/lib/vector-stream/src/driver.rs @@ -293,7 +293,7 @@ mod tests { } impl Finalizable for DelayRequest { - fn take_finalizers(&mut self) -> crate::event::EventFinalizers { + fn take_finalizers(&mut self) -> vector_core::event::EventFinalizers { std::mem::take(&mut self.1) } } @@ -365,6 +365,7 @@ mod tests { let upper = self.upper_bound_us; // Generate a value between 10ms and 500ms, with a long tail shape to the distribution. + #[allow(clippy::cast_sign_loss)] // Value will be positive anyways self.jitter .sample_iter(&mut self.jitter_gen) .map(|n| n * lower as f64) diff --git a/lib/vector-stream/src/lib.rs b/lib/vector-stream/src/lib.rs index 9d2a2d81e45fc..6014bb60e5d5e 100644 --- a/lib/vector-stream/src/lib.rs +++ b/lib/vector-stream/src/lib.rs @@ -1,3 +1,15 @@ +#![deny(warnings)] +#![deny(clippy::all)] +#![deny(clippy::pedantic)] +#![deny(unreachable_pub)] +#![deny(unused_allocation)] +#![deny(unused_extern_crates)] +#![deny(unused_assignments)] +#![deny(unused_comparisons)] +#![allow(clippy::module_name_repetitions)] +#![allow(clippy::must_use_candidate)] +#![allow(clippy::type_complexity)] + pub mod batcher; mod concurrent_map; mod driver; diff --git a/lib/vector-stream/src/partitioned_batcher.rs b/lib/vector-stream/src/partitioned_batcher.rs index fabe192b78406..0d73cc5267098 100644 --- a/lib/vector-stream/src/partitioned_batcher.rs +++ b/lib/vector-stream/src/partitioned_batcher.rs @@ -346,6 +346,7 @@ where } } +#[allow(clippy::cast_sign_loss)] #[cfg(test)] mod test { use std::{ @@ -360,14 +361,11 @@ mod test { use pin_project::pin_project; use proptest::prelude::*; use tokio::{pin, time::advance}; + use vector_core::{partition::Partitioner, time::KeyedTimer}; use crate::{ - partition::Partitioner, - stream::{ - partitioned_batcher::{ExpirationQueue, PartitionedBatcher}, - BatcherSettings, - }, - time::KeyedTimer, + partitioned_batcher::{ExpirationQueue, PartitionedBatcher}, + BatcherSettings, }; #[derive(Debug)]