Skip to content

Commit

Permalink
Add metric origins for sketches for MarshalSplitCompress path (#20990)
Browse files Browse the repository at this point in the history
  • Loading branch information
rayz authored Nov 21, 2023
1 parent 10fcf1f commit 5ae7e92
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 9 deletions.
2 changes: 2 additions & 0 deletions pkg/aggregator/time_sampler.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ func (s *TimeSampler) newSketchSeries(ck ckey.ContextKey, points []metrics.Sketc
Interval: s.interval,
Points: points,
ContextKey: ck,
Source: ctx.source,
NoIndex: ctx.noIndex,
}

return ss
Expand Down
5 changes: 4 additions & 1 deletion pkg/metrics/sketch_series.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ import (
"bytes"
"encoding/json"

"github.com/DataDog/opentelemetry-mapping-go/pkg/quantile"

"github.com/DataDog/datadog-agent/pkg/aggregator/ckey"
"github.com/DataDog/datadog-agent/pkg/tagset"
"github.com/DataDog/opentelemetry-mapping-go/pkg/quantile"
)

// A SketchSeries is a timeseries of quantile sketches.
Expand All @@ -22,6 +23,8 @@ type SketchSeries struct {
Interval int64 `json:"interval"`
Points []SketchPoint `json:"points"`
ContextKey ckey.ContextKey `json:"-"`
NoIndex bool `json:"-"` // This is only used by api V2
Source MetricSource `json:"-"` // This is only used by api V2
}

// String returns the JSON representation of a SketchSeries as a string
Expand Down
11 changes: 10 additions & 1 deletion pkg/metrics/test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ import (

"github.com/stretchr/testify/assert"

"github.com/DataDog/opentelemetry-mapping-go/pkg/quantile"

"github.com/DataDog/datadog-agent/pkg/aggregator/ckey"
"github.com/DataDog/datadog-agent/pkg/tagset"
"github.com/DataDog/opentelemetry-mapping-go/pkg/quantile"
)

// AssertPointsEqual evaluate if two list of point are equal (order doesn't matters).
Expand Down Expand Up @@ -156,6 +157,14 @@ type SketchesSourceTest struct {
currentIndex int
}

// NewSketchesSourceTestWithSketch populates values with a single test sketch
func NewSketchesSourceTestWithSketch() *SketchesSourceTest {
return &SketchesSourceTest{
currentIndex: -1,
values: SketchSeriesList{&SketchSeries{Name: "fakename", Host: "fakehost"}},
}
}

func NewSketchesSourceTest() *SketchesSourceTest {
return &SketchesSourceTest{
currentIndex: -1,
Expand Down
47 changes: 47 additions & 0 deletions pkg/serializer/internal/metrics/sketch_series_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ func (sl SketchSeriesList) MarshalSplitCompress(bufferContext *marshaler.BufferC
// const sketchDistributions = 3
const sketchTags = 4
const sketchDogsketches = 7
const sketchMetadata = 8
// const distributionTs = 1
// const distributionCnt = 2
// const distributionMin = 3
Expand All @@ -86,6 +87,30 @@ func (sl SketchSeriesList) MarshalSplitCompress(bufferContext *marshaler.BufferC
const dogsketchK = 7
const dogsketchN = 8

const sketchMetadataOrigin = 1
// |------| 'Metadata' message
// |-----| 'origin' field index
const sketchMetadataOriginMetricType = 3
// |------| 'Metadata' message
// |----| 'origin' message
// |--------| 'metric_type' field index
const metryTypeNotIndexed = 9
// |-----------------| 'metric_type_agent_hidden' field index

const sketchMetadataOriginOriginProduct = 4
// |----| 'Origin' message
// |-----------| 'origin_product' field index
const sketchMetadataOriginOriginCategory = 5
// |----| 'Origin' message
// |-----------| 'origin_category' field index
const sketchMetadataOriginOriginService = 6
// |----| 'Origin' message
// |-----------| 'origin_service' field index
const serieMetadataOriginOriginProductAgentType = 10
// |----| 'Origin' message
// |-----------| 'OriginProduct' enum
// |-------| 'Agent' enum value

// the backend accepts payloads up to specific compressed / uncompressed
// sizes, but prefers small uncompressed payloads.
maxPayloadSize := config.Datadog.GetInt("serializer_max_payload_size")
Expand Down Expand Up @@ -216,6 +241,28 @@ func (sl SketchSeriesList) MarshalSplitCompress(bufferContext *marshaler.BufferC
return err
}
}
err = ps.Embedded(sketchMetadata, func(ps *molecule.ProtoStream) error {
return ps.Embedded(sketchMetadataOrigin, func(ps *molecule.ProtoStream) error {
if ss.NoIndex {
err = ps.Int32(sketchMetadataOriginMetricType, metryTypeNotIndexed)
if err != nil {
return err
}
}
err = ps.Int32(sketchMetadataOriginOriginProduct, serieMetadataOriginOriginProductAgentType)
if err != nil {
return err
}
err = ps.Int32(sketchMetadataOriginOriginCategory, MetricSourceToOriginCategory(ss.Source))
if err != nil {
return err
}
return ps.Int32(sketchMetadataOriginOriginService, MetricSourceToOriginService(ss.Source))
})
})
if err != nil {
return err
}

return nil
})
Expand Down
5 changes: 0 additions & 5 deletions pkg/serializer/internal/metrics/sketch_series_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,6 @@ func TestSketchSeriesMarshalSplitCompress(t *testing.T) {
sl.Append(Makeseries(i))
}

serializer1 := SketchSeriesList{SketchesSource: sl}
payload, _ := serializer1.Marshal()
sl.Reset()
serializer2 := SketchSeriesList{SketchesSource: sl}
payloads, err := serializer2.MarshalSplitCompress(marshaler.NewBufferContext())
Expand All @@ -169,9 +167,6 @@ func TestSketchSeriesMarshalSplitCompress(t *testing.T) {
decompressed, _ := io.ReadAll(r)
r.Close()

// Check that we encoded the protobuf correctly
assert.Equal(t, decompressed, payload)

pl := new(gogen.SketchPayload)
err = pl.Unmarshal(decompressed)
require.NoError(t, err)
Expand Down
7 changes: 5 additions & 2 deletions pkg/serializer/serializer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,11 +322,14 @@ func TestSendSeries(t *testing.T) {
func TestSendSketch(t *testing.T) {
f := &forwarder.MockedForwarder{}

matcher := createProtoscopeMatcher(`2: {}`)
matcher := createProtoscopeMatcher(`
1: { 1: {"fakename"} 2: {"fakehost"} 8: { 1: { 4: 10 }}}
2: {}
`)
f.On("SubmitSketchSeries", matcher, protobufExtraHeadersWithCompression).Return(nil).Times(1)

s := NewSerializer(f, nil)
err := s.SendSketch(metrics.NewSketchesSourceTest())
err := s.SendSketch(metrics.NewSketchesSourceTestWithSketch())
require.Nil(t, err)
f.AssertExpectations(t)
}
Expand Down

0 comments on commit 5ae7e92

Please sign in to comment.