Skip to content

Commit

Permalink
fix: skip encoding empty sketches (vectordotdev#18530)
Browse files Browse the repository at this point in the history
Signed-off-by: Luke Steensen <[email protected]>
  • Loading branch information
lukesteensen authored Sep 11, 2023
1 parent 7295f22 commit ae0aa11
Showing 1 changed file with 54 additions and 21 deletions.
75 changes: 54 additions & 21 deletions src/sinks/datadog/metrics/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,11 @@ fn sketch_to_proto_message(
ddsketch: &AgentDDSketch,
default_namespace: &Option<Arc<str>>,
log_schema: &'static LogSchema,
) -> ddmetric_proto::sketch_payload::Sketch {
) -> Option<ddmetric_proto::sketch_payload::Sketch> {
if ddsketch.is_empty() {
return None;
}

let name = get_namespaced_name(metric, default_namespace);
let ts = encode_timestamp(metric.timestamp());
let mut tags = metric.tags().cloned().unwrap_or_default();
Expand Down Expand Up @@ -418,7 +422,7 @@ fn sketch_to_proto_message(
let k = bins.into_iter().map(Into::into).collect();
let n = counts.into_iter().map(Into::into).collect();

ddmetric_proto::sketch_payload::Sketch {
Some(ddmetric_proto::sketch_payload::Sketch {
metric: name,
tags,
host,
Expand All @@ -433,7 +437,7 @@ fn sketch_to_proto_message(
k,
n,
}],
}
})
}

fn encode_sketch_incremental<B>(
Expand All @@ -459,16 +463,21 @@ where
// for `SketchPayload` with a single sketch looks just like as if we literally wrote out a
// single value for the given field.

let sketch_proto = sketch_to_proto_message(metric, ddsketch, default_namespace, log_schema);

// Manually write the field tag for `sketches` and then encode the sketch payload directly as a
// length-delimited message.
prost::encoding::encode_key(
get_sketch_payload_sketches_field_number(),
prost::encoding::WireType::LengthDelimited,
buf,
);
sketch_proto.encode_length_delimited(buf)
if let Some(sketch_proto) =
sketch_to_proto_message(metric, ddsketch, default_namespace, log_schema)
{
// Manually write the field tag for `sketches` and then encode the sketch payload directly as a
// length-delimited message.
prost::encoding::encode_key(
get_sketch_payload_sketches_field_number(),
prost::encoding::WireType::LengthDelimited,
buf,
);
sketch_proto.encode_length_delimited(buf)
} else {
// If the sketch was empty, that's fine too
Ok(())
}
}

fn get_namespaced_name(metric: &Metric, default_namespace: &Option<Arc<str>>) -> String {
Expand Down Expand Up @@ -790,15 +799,11 @@ mod tests {
};
match sketch {
MetricSketch::AgentDDSketch(ddsketch) => {
// Don't encode any empty sketches.
if ddsketch.is_empty() {
continue;
if let Some(sketch) =
sketch_to_proto_message(metric, ddsketch, default_namespace, log_schema)
{
sketches.push(sketch);
}

let sketch =
sketch_to_proto_message(metric, ddsketch, default_namespace, log_schema);

sketches.push(sketch);
}
}
}
Expand Down Expand Up @@ -927,6 +932,34 @@ mod tests {
assert_eq!(expected, processed.pop().unwrap());
}

#[test]
fn encode_empty_sketch() {
// This is a simple test where we ensure that a single metric, with the default limits, can
// be encoded without hitting any errors.
let mut encoder = DatadogMetricsEncoder::new(DatadogMetricsEndpoint::Sketches, None)
.expect("default payload size limits should be valid");
let sketch = Metric::new(
"empty",
MetricKind::Incremental,
AgentDDSketch::with_agent_defaults().into(),
)
.with_timestamp(Some(ts()));
let expected = sketch.clone();

// Encode the sketch.
let result = encoder.try_encode(sketch);
assert!(result.is_ok());
assert_eq!(result.unwrap(), None);

// Finish the payload, make sure we got what we came for.
let result = encoder.finish();
assert!(result.is_ok());

let (_payload, mut processed) = result.unwrap();
assert_eq!(processed.len(), 1);
assert_eq!(expected, processed.pop().unwrap());
}

#[test]
fn encode_multiple_sketch_metrics_normal_vs_incremental() {
// This tests our incremental sketch encoding against the more straightforward approach of
Expand Down

0 comments on commit ae0aa11

Please sign in to comment.