Skip to content

Commit

Permalink
chore(gcp_stackdriver_metrics sink): rewrite to stream based sink (ve…
Browse files Browse the repository at this point in the history
…ctordotdev#18749)

* Move to stream based sink

Signed-off-by: Stephen Wakely <[email protected]>

* Added tests

Signed-off-by: Stephen Wakely <[email protected]>

* Aggregate metrics

Signed-off-by: Stephen Wakely <[email protected]>

* Remove aggregation

Signed-off-by: Stephen Wakely <[email protected]>

* Feedback from Kyle

Signed-off-by: Stephen Wakely <[email protected]>

---------

Signed-off-by: Stephen Wakely <[email protected]>
  • Loading branch information
StephenWakely authored Oct 12, 2023
1 parent f2efb1a commit 92268e4
Show file tree
Hide file tree
Showing 9 changed files with 703 additions and 317 deletions.
8 changes: 4 additions & 4 deletions src/sinks/gcp/chronicle_unstructured.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ impl ChronicleUnstructuredConfig {
.settings(request, GcsRetryLogic)
.service(ChronicleService::new(client, base_url, creds));

let request_settings = RequestSettings::new(self)?;
let request_settings = ChronicleRequestBuilder::new(self)?;

let sink = GcsSink::new(svc, request_settings, partitioner, batch_settings, "http");

Expand Down Expand Up @@ -362,7 +362,7 @@ impl Encoder<(String, Vec<Event>)> for ChronicleEncoder {
// request. All possible values are pre-computed for direct use in
// producing a request.
#[derive(Clone, Debug)]
struct RequestSettings {
struct ChronicleRequestBuilder {
encoder: ChronicleEncoder,
}

Expand All @@ -382,7 +382,7 @@ impl AsRef<[u8]> for ChronicleRequestPayload {
}
}

impl RequestBuilder<(String, Vec<Event>)> for RequestSettings {
impl RequestBuilder<(String, Vec<Event>)> for ChronicleRequestBuilder {
type Metadata = EventFinalizers;
type Events = (String, Vec<Event>);
type Encoder = ChronicleEncoder;
Expand Down Expand Up @@ -423,7 +423,7 @@ impl RequestBuilder<(String, Vec<Event>)> for RequestSettings {
}
}

impl RequestSettings {
impl ChronicleRequestBuilder {
fn new(config: &ChronicleUnstructuredConfig) -> crate::Result<Self> {
let transformer = config.encoding.transformer();
let serializer = config.encoding.config().build()?;
Expand Down
9 changes: 4 additions & 5 deletions src/sinks/gcp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ pub mod chronicle_unstructured;
pub mod cloud_storage;
pub mod pubsub;
pub mod stackdriver;
pub mod stackdriver_metrics;

/// A monitored resource.
///
Expand Down Expand Up @@ -103,18 +102,18 @@ pub struct GcpResource {

#[derive(Serialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct GcpSerie<'a> {
pub struct GcpSerie {
pub metric: GcpMetric,
pub resource: GcpResource,
pub metric_kind: GcpMetricKind,
pub value_type: GcpValueType,
pub points: &'a [GcpPoint],
pub points: Vec<GcpPoint>,
}

#[derive(Serialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct GcpSeries<'a> {
time_series: &'a [GcpSerie<'a>],
time_series: &'a [GcpSerie],
}

fn serialize_int64_value<S>(value: &Option<i64>, serializer: S) -> Result<S::Ok, S::Error>
Expand Down Expand Up @@ -180,7 +179,7 @@ mod tests {
},
metric_kind: GcpMetricKind::Gauge,
value_type: GcpValueType::Int64,
points: &[GcpPoint {
points: vec![GcpPoint {
interval: GcpInterval {
start_time: None,
end_time,
Expand Down
168 changes: 168 additions & 0 deletions src/sinks/gcp/stackdriver/metrics/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
use bytes::Bytes;
use goauth::scopes::Scope;
use http::{Request, Uri};

use crate::{
gcp::{GcpAuthConfig, GcpAuthenticator},
http::HttpClient,
sinks::{
gcp,
prelude::*,
util::http::{http_response_retry_logic, HttpService, HttpServiceRequestBuilder},
},
};

use super::{
request_builder::{StackdriverMetricsEncoder, StackdriverMetricsRequestBuilder},
sink::StackdriverMetricsSink,
};

/// Configuration for the `gcp_stackdriver_metrics` sink.
#[configurable_component(sink(
"gcp_stackdriver_metrics",
"Deliver metrics to GCP's Cloud Monitoring system."
))]
#[derive(Clone, Debug, Default)]
pub struct StackdriverConfig {
#[serde(skip, default = "default_endpoint")]
pub(super) endpoint: String,

/// The project ID to which to publish metrics.
///
/// See the [Google Cloud Platform project management documentation][project_docs] for more details.
///
/// [project_docs]: https://cloud.google.com/resource-manager/docs/creating-managing-projects
pub(super) project_id: String,

/// The monitored resource to associate the metrics with.
pub(super) resource: gcp::GcpTypedResource,

#[serde(flatten)]
pub(super) auth: GcpAuthConfig,

/// The default namespace to use for metrics that do not have one.
///
/// Metrics with the same name can only be differentiated by their namespace, and not all
/// metrics have their own namespace.
#[serde(default = "default_metric_namespace_value")]
pub(super) default_namespace: String,

#[configurable(derived)]
#[serde(default)]
pub(super) request: TowerRequestConfig,

#[configurable(derived)]
#[serde(default)]
pub(super) batch: BatchConfig<StackdriverMetricsDefaultBatchSettings>,

#[configurable(derived)]
pub(super) tls: Option<TlsConfig>,

#[configurable(derived)]
#[serde(
default,
deserialize_with = "crate::serde::bool_or_struct",
skip_serializing_if = "crate::serde::skip_serializing_if_default"
)]
pub(super) acknowledgements: AcknowledgementsConfig,
}

fn default_metric_namespace_value() -> String {
"namespace".to_string()
}

fn default_endpoint() -> String {
"https://monitoring.googleapis.com".to_string()
}

impl_generate_config_from_default!(StackdriverConfig);

#[async_trait::async_trait]
#[typetag::serde(name = "gcp_stackdriver_metrics")]
impl SinkConfig for StackdriverConfig {
async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
let auth = self.auth.build(Scope::MonitoringWrite).await?;

let healthcheck = healthcheck().boxed();
let started = chrono::Utc::now();
let tls_settings = TlsSettings::from_options(&self.tls)?;
let client = HttpClient::new(tls_settings, cx.proxy())?;

let batch_settings = self.batch.validate()?.into_batcher_settings()?;

let request_builder = StackdriverMetricsRequestBuilder {
encoder: StackdriverMetricsEncoder {
default_namespace: self.default_namespace.clone(),
started,
resource: self.resource.clone(),
},
};

let request_limits = self.request.unwrap_with(
&TowerRequestConfig::default()
.rate_limit_duration_secs(1)
.rate_limit_num(1000),
);

let uri: Uri = format!(
"{}/v3/projects/{}/timeSeries",
self.endpoint, self.project_id
)
.parse()?;

auth.spawn_regenerate_token();

let stackdriver_metrics_service_request_builder =
StackdriverMetricsServiceRequestBuilder { uri, auth };

let service = HttpService::new(client, stackdriver_metrics_service_request_builder);

let service = ServiceBuilder::new()
.settings(request_limits, http_response_retry_logic())
.service(service);

let sink = StackdriverMetricsSink::new(service, batch_settings, request_builder);

Ok((VectorSink::from_event_streamsink(sink), healthcheck))
}

fn input(&self) -> Input {
Input::metric()
}

fn acknowledgements(&self) -> &AcknowledgementsConfig {
&self.acknowledgements
}
}

#[derive(Clone, Copy, Debug, Default)]
pub struct StackdriverMetricsDefaultBatchSettings;

impl SinkBatchSettings for StackdriverMetricsDefaultBatchSettings {
const MAX_EVENTS: Option<usize> = Some(1);
const MAX_BYTES: Option<usize> = None;
const TIMEOUT_SECS: f64 = 1.0;
}

#[derive(Debug, Clone)]
pub(super) struct StackdriverMetricsServiceRequestBuilder {
pub(super) uri: Uri,
pub(super) auth: GcpAuthenticator,
}

impl HttpServiceRequestBuilder for StackdriverMetricsServiceRequestBuilder {
fn build(&self, body: Bytes) -> Request<Bytes> {
let mut request = Request::post(self.uri.clone())
.header("Content-Type", "application/json")
.body(body)
.unwrap();

self.auth.apply(&mut request);

request
}
}

async fn healthcheck() -> crate::Result<()> {
Ok(())
}
9 changes: 9 additions & 0 deletions src/sinks/gcp/stackdriver/metrics/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
//! GCP Cloud Monitoring (formerly Stackdriver Metrics) sink.
//! Sends metrics to [GPC Cloud Monitoring][cloud monitoring].
//!
//! [cloud monitoring]: https://cloud.google.com/monitoring/docs/monitoring-overview
mod config;
mod request_builder;
mod sink;
#[cfg(test)]
mod tests;
138 changes: 138 additions & 0 deletions src/sinks/gcp/stackdriver/metrics/request_builder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
use std::io;

use bytes::Bytes;
use chrono::Utc;
use vector_core::event::{Metric, MetricValue};

use crate::sinks::{gcp, prelude::*, util::http::HttpRequest};

#[derive(Clone, Debug)]
pub(super) struct StackdriverMetricsRequestBuilder {
pub(super) encoder: StackdriverMetricsEncoder,
}

impl RequestBuilder<Vec<Metric>> for StackdriverMetricsRequestBuilder {
type Metadata = EventFinalizers;
type Events = Vec<Metric>;
type Encoder = StackdriverMetricsEncoder;
type Payload = Bytes;
type Request = HttpRequest;
type Error = io::Error;

fn compression(&self) -> Compression {
Compression::None
}

fn encoder(&self) -> &Self::Encoder {
&self.encoder
}

fn split_input(
&self,
mut events: Vec<Metric>,
) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
let finalizers = events.take_finalizers();
let builder = RequestMetadataBuilder::from_events(&events);
(finalizers, builder, events)
}

fn build_request(
&self,
metadata: Self::Metadata,
request_metadata: RequestMetadata,
payload: EncodeResult<Self::Payload>,
) -> Self::Request {
HttpRequest::new(payload.into_payload(), metadata, request_metadata)
}
}

#[derive(Clone, Debug)]
pub struct StackdriverMetricsEncoder {
pub(super) default_namespace: String,
pub(super) started: chrono::DateTime<Utc>,
pub(super) resource: gcp::GcpTypedResource,
}

impl encoding::Encoder<Vec<Metric>> for StackdriverMetricsEncoder {
/// Create the object defined [here][api_docs].
///
/// [api_docs]: https://cloud.google.com/monitoring/api/ref_v3/rest/v3/projects.timeSeries/create
fn encode_input(
&self,
input: Vec<Metric>,
writer: &mut dyn io::Write,
) -> io::Result<(usize, GroupedCountByteSize)> {
let mut byte_size = telemetry().create_request_count_byte_size();
let time_series = input
.into_iter()
.map(|metric| {
byte_size.add_event(&metric, metric.estimated_json_encoded_size_of());

let (series, data, _metadata) = metric.into_parts();
let namespace = series
.name
.namespace
.unwrap_or_else(|| self.default_namespace.clone());
let metric_type = format!(
"custom.googleapis.com/{}/metrics/{}",
namespace, series.name.name
);

let end_time = data.time.timestamp.unwrap_or_else(chrono::Utc::now);

let (point_value, interval, metric_kind) = match &data.value {
MetricValue::Counter { value } => {
let interval = gcp::GcpInterval {
start_time: Some(self.started),
end_time,
};

(*value, interval, gcp::GcpMetricKind::Cumulative)
}
MetricValue::Gauge { value } => {
let interval = gcp::GcpInterval {
start_time: None,
end_time,
};

(*value, interval, gcp::GcpMetricKind::Gauge)
}
_ => {
unreachable!("sink has filtered out all metrics that aren't counter or gauge by this point")
},
};
let metric_labels = series
.tags
.unwrap_or_default()
.into_iter_single()
.collect::<std::collections::HashMap<_, _>>();

gcp::GcpSerie {
metric: gcp::GcpMetric {
r#type: metric_type,
labels: metric_labels,
},
resource: gcp::GcpResource {
r#type: self.resource.r#type.clone(),
labels: self.resource.labels.clone(),
},
metric_kind,
value_type: gcp::GcpValueType::Int64,
points: vec![gcp::GcpPoint {
interval,
value: gcp::GcpPointValue {
int64_value: Some(point_value as i64),
},
}],
}
})
.collect::<Vec<_>>();

let series = gcp::GcpSeries {
time_series: &time_series,
};

let body = crate::serde::json::to_bytes(&series).unwrap().freeze();
writer.write_all(&body).map(|()| (body.len(), byte_size))
}
}
Loading

0 comments on commit 92268e4

Please sign in to comment.