Skip to content

Commit

Permalink
Upgrade to release v1.0.0-RC3 (trace) and v0.23 (metrics) (#1091)
Browse files Browse the repository at this point in the history
* WIP metrics update

Signed-off-by: Anthony J Mirabella <[email protected]>

* fix cortex

* one dogstatsd test

* fix tests w/ an ordered reader

* checkpoint

* checkpoint2

* checkpoint3

* tests pass

* precommit

* OC lint

Co-authored-by: Anthony J Mirabella <[email protected]>
  • Loading branch information
jmacd and Aneurysm9 authored Sep 8, 2021
1 parent f650585 commit b2a1760
Show file tree
Hide file tree
Showing 101 changed files with 728 additions and 669 deletions.
83 changes: 47 additions & 36 deletions exporters/metric/cortex/cortex.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,21 +38,28 @@ import (
controller "go.opentelemetry.io/otel/sdk/metric/controller/basic"
processor "go.opentelemetry.io/otel/sdk/metric/processor/basic"
"go.opentelemetry.io/otel/sdk/metric/selector/simple"
"go.opentelemetry.io/otel/sdk/resource"
)

// Exporter forwards metrics to a Cortex instance
type Exporter struct {
config Config
}

type exportData struct {
export.Record

Resource *resource.Resource
}

// ExportKindFor returns CumulativeExporter so the Processor correctly aggregates data
func (e *Exporter) ExportKindFor(*apimetric.Descriptor, aggregation.Kind) metric.ExportKind {
return metric.CumulativeExportKind
}

// Export forwards metrics to Cortex from the SDK
func (e *Exporter) Export(_ context.Context, checkpointSet metric.CheckpointSet) error {
timeseries, err := e.ConvertToTimeSeries(checkpointSet)
func (e *Exporter) Export(_ context.Context, res *resource.Resource, checkpointSet metric.CheckpointSet) error {
timeseries, err := e.ConvertToTimeSeries(res, checkpointSet)
if err != nil {
return err
}
Expand Down Expand Up @@ -121,13 +128,17 @@ func InstallNewPipeline(config Config, options ...controller.Option) (*controlle
// ConvertToTimeSeries converts a CheckpointSet to a slice of TimeSeries pointers
// Based on the aggregation type, ConvertToTimeSeries will call helper functions like
// convertFromSum to generate the correct number of TimeSeries.
func (e *Exporter) ConvertToTimeSeries(checkpointSet export.CheckpointSet) ([]*prompb.TimeSeries, error) {
func (e *Exporter) ConvertToTimeSeries(res *resource.Resource, checkpointSet export.CheckpointSet) ([]*prompb.TimeSeries, error) {
var aggError error
var timeSeries []*prompb.TimeSeries

// Iterate over each record in the checkpoint set and convert to TimeSeries
aggError = checkpointSet.ForEach(e, func(record metric.Record) error {
// Convert based on aggregation type
edata := exportData{
Resource: res,
Record: record,
}
agg := record.Aggregation()

// The following section uses loose type checking to determine how to
Expand All @@ -137,33 +148,33 @@ func (e *Exporter) ConvertToTimeSeries(checkpointSet export.CheckpointSet) ([]*p
// See the Aggregator Kind for more information
// https://github.com/open-telemetry/opentelemetry-go/blob/main/sdk/export/metric/aggregation/aggregation.go#L123-L138
if histogram, ok := agg.(aggregation.Histogram); ok {
tSeries, err := convertFromHistogram(record, histogram)
tSeries, err := convertFromHistogram(edata, histogram)
if err != nil {
return err
}
timeSeries = append(timeSeries, tSeries...)
} else if sum, ok := agg.(aggregation.Sum); ok {
tSeries, err := convertFromSum(record, sum)
tSeries, err := convertFromSum(edata, sum)
if err != nil {
return err
}
timeSeries = append(timeSeries, tSeries)
if minMaxSumCount, ok := agg.(aggregation.MinMaxSumCount); ok {
tSeries, err := convertFromMinMaxSumCount(record, minMaxSumCount)
tSeries, err := convertFromMinMaxSumCount(edata, minMaxSumCount)
if err != nil {
return err
}
timeSeries = append(timeSeries, tSeries...)
}
} else if lastValue, ok := agg.(aggregation.LastValue); ok {
tSeries, err := convertFromLastValue(record, lastValue)
tSeries, err := convertFromLastValue(edata, lastValue)
if err != nil {
return err
}
timeSeries = append(timeSeries, tSeries)
} else {
// Report to the user when no conversion was found
fmt.Printf("No conversion found for record: %s\n", record.Descriptor().Name())
fmt.Printf("No conversion found for record: %s\n", edata.Descriptor().Name())
}

return nil
Expand All @@ -178,13 +189,13 @@ func (e *Exporter) ConvertToTimeSeries(checkpointSet export.CheckpointSet) ([]*p
}

// createTimeSeries is a helper function to create a timeseries from a value and attributes
func createTimeSeries(record metric.Record, value number.Number, valueNumberKind number.Kind, extraAttributes ...attribute.KeyValue) *prompb.TimeSeries {
func createTimeSeries(edata exportData, value number.Number, valueNumberKind number.Kind, extraAttributes ...attribute.KeyValue) *prompb.TimeSeries {
sample := prompb.Sample{
Value: value.CoerceToFloat64(valueNumberKind),
Timestamp: int64(time.Nanosecond) * record.EndTime().UnixNano() / int64(time.Millisecond),
Timestamp: int64(time.Nanosecond) * edata.EndTime().UnixNano() / int64(time.Millisecond),
}

attributes := createLabelSet(record, extraAttributes...)
attributes := createLabelSet(edata, extraAttributes...)

return &prompb.TimeSeries{
Samples: []prompb.Sample{sample},
Expand All @@ -193,7 +204,7 @@ func createTimeSeries(record metric.Record, value number.Number, valueNumberKind
}

// convertFromSum returns a single TimeSeries based on a Record with a Sum aggregation
func convertFromSum(record metric.Record, sum aggregation.Sum) (*prompb.TimeSeries, error) {
func convertFromSum(edata exportData, sum aggregation.Sum) (*prompb.TimeSeries, error) {
// Get Sum value
value, err := sum.Sum()
if err != nil {
Expand All @@ -202,56 +213,56 @@ func convertFromSum(record metric.Record, sum aggregation.Sum) (*prompb.TimeSeri

// Create TimeSeries. Note that Prometheus-based backends such as Cortex require
// the metric name to be carried in a reserved label named "__name__". This is
name := sanitize(record.Descriptor().Name())
numberKind := record.Descriptor().NumberKind()
tSeries := createTimeSeries(record, value, numberKind, attribute.String("__name__", name))
name := sanitize(edata.Descriptor().Name())
numberKind := edata.Descriptor().NumberKind()
tSeries := createTimeSeries(edata, value, numberKind, attribute.String("__name__", name))

return tSeries, nil
}

// convertFromLastValue returns a single TimeSeries based on a Record with a LastValue aggregation
func convertFromLastValue(record metric.Record, lastValue aggregation.LastValue) (*prompb.TimeSeries, error) {
func convertFromLastValue(edata exportData, lastValue aggregation.LastValue) (*prompb.TimeSeries, error) {
// Get value
value, _, err := lastValue.LastValue()
if err != nil {
return nil, err
}

// Create TimeSeries
name := sanitize(record.Descriptor().Name())
numberKind := record.Descriptor().NumberKind()
tSeries := createTimeSeries(record, value, numberKind, attribute.String("__name__", name))
name := sanitize(edata.Descriptor().Name())
numberKind := edata.Descriptor().NumberKind()
tSeries := createTimeSeries(edata, value, numberKind, attribute.String("__name__", name))

return tSeries, nil
}

// convertFromMinMaxSumCount returns 4 TimeSeries for the min, max, sum, and count from the mmsc aggregation
func convertFromMinMaxSumCount(record metric.Record, minMaxSumCount aggregation.MinMaxSumCount) ([]*prompb.TimeSeries, error) {
numberKind := record.Descriptor().NumberKind()
func convertFromMinMaxSumCount(edata exportData, minMaxSumCount aggregation.MinMaxSumCount) ([]*prompb.TimeSeries, error) {
numberKind := edata.Descriptor().NumberKind()

// Convert Min
min, err := minMaxSumCount.Min()
if err != nil {
return nil, err
}
name := sanitize(record.Descriptor().Name() + "_min")
minTimeSeries := createTimeSeries(record, min, numberKind, attribute.String("__name__", name))
name := sanitize(edata.Descriptor().Name() + "_min")
minTimeSeries := createTimeSeries(edata, min, numberKind, attribute.String("__name__", name))

// Convert Max
max, err := minMaxSumCount.Max()
if err != nil {
return nil, err
}
name = sanitize(record.Descriptor().Name() + "_max")
maxTimeSeries := createTimeSeries(record, max, numberKind, attribute.String("__name__", name))
name = sanitize(edata.Descriptor().Name() + "_max")
maxTimeSeries := createTimeSeries(edata, max, numberKind, attribute.String("__name__", name))

// Convert Count
count, err := minMaxSumCount.Count()
if err != nil {
return nil, err
}
name = sanitize(record.Descriptor().Name() + "_count")
countTimeSeries := createTimeSeries(record, number.NewInt64Number(int64(count)), number.Int64Kind, attribute.String("__name__", name))
name = sanitize(edata.Descriptor().Name() + "_count")
countTimeSeries := createTimeSeries(edata, number.NewInt64Number(int64(count)), number.Int64Kind, attribute.String("__name__", name))

// Return all timeSeries
tSeries := []*prompb.TimeSeries{
Expand All @@ -262,17 +273,17 @@ func convertFromMinMaxSumCount(record metric.Record, minMaxSumCount aggregation.
}

// convertFromHistogram returns len(histogram.Buckets) timeseries for a histogram aggregation
func convertFromHistogram(record metric.Record, histogram aggregation.Histogram) ([]*prompb.TimeSeries, error) {
func convertFromHistogram(edata exportData, histogram aggregation.Histogram) ([]*prompb.TimeSeries, error) {
var timeSeries []*prompb.TimeSeries
metricName := sanitize(record.Descriptor().Name())
numberKind := record.Descriptor().NumberKind()
metricName := sanitize(edata.Descriptor().Name())
numberKind := edata.Descriptor().NumberKind()

// Create Sum TimeSeries
sum, err := histogram.Sum()
if err != nil {
return nil, err
}
sumTimeSeries := createTimeSeries(record, sum, numberKind, attribute.String("__name__", metricName+"_sum"))
sumTimeSeries := createTimeSeries(edata, sum, numberKind, attribute.String("__name__", metricName+"_sum"))
timeSeries = append(timeSeries, sumTimeSeries)

// Handle Histogram buckets
Expand All @@ -294,7 +305,7 @@ func convertFromHistogram(record metric.Record, histogram aggregation.Histogram)
boundaryStr := strconv.FormatFloat(boundary, 'f', -1, 64)

// Create timeSeries and append
boundaryTimeSeries := createTimeSeries(record, number.NewFloat64Number(totalCount), number.Float64Kind, attribute.String("__name__", metricName), attribute.String("le", boundaryStr))
boundaryTimeSeries := createTimeSeries(edata, number.NewFloat64Number(totalCount), number.Float64Kind, attribute.String("__name__", metricName), attribute.String("le", boundaryStr))
timeSeries = append(timeSeries, boundaryTimeSeries)
}

Expand All @@ -304,9 +315,9 @@ func convertFromHistogram(record metric.Record, histogram aggregation.Histogram)
// Create a timeSeries for the +inf bucket and total count
// These are the same and are both required by Prometheus-based backends

upperBoundTimeSeries := createTimeSeries(record, number.NewFloat64Number(totalCount), number.Float64Kind, attribute.String("__name__", metricName), attribute.String("le", "+inf"))
upperBoundTimeSeries := createTimeSeries(edata, number.NewFloat64Number(totalCount), number.Float64Kind, attribute.String("__name__", metricName), attribute.String("le", "+inf"))

countTimeSeries := createTimeSeries(record, number.NewFloat64Number(totalCount), number.Float64Kind, attribute.String("__name__", metricName+"_count"))
countTimeSeries := createTimeSeries(edata, number.NewFloat64Number(totalCount), number.Float64Kind, attribute.String("__name__", metricName+"_count"))

timeSeries = append(timeSeries, upperBoundTimeSeries)
timeSeries = append(timeSeries, countTimeSeries)
Expand All @@ -316,13 +327,13 @@ func convertFromHistogram(record metric.Record, histogram aggregation.Histogram)

// createLabelSet combines attributes from a Record, resource, and extra attributes to create a
// slice of prompb.Label.
func createLabelSet(record metric.Record, extraAttributes ...attribute.KeyValue) []*prompb.Label {
func createLabelSet(edata exportData, extraAttributes ...attribute.KeyValue) []*prompb.Label {
// Map ensure no duplicate label names.
labelMap := map[string]prompb.Label{}

// mergeAttributes merges Record and Resource attributes into a single set, giving precedence
// to the record's attributes.
mi := attribute.NewMergeIterator(record.Labels(), record.Resource().Set())
mi := attribute.NewMergeIterator(edata.Labels(), edata.Resource.Set())
for mi.Next() {
attribute := mi.Label()
key := string(attribute.Key)
Expand Down
18 changes: 12 additions & 6 deletions exporters/metric/cortex/cortex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ import (
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
)

var testResource = resource.NewWithAttributes(semconv.SchemaURL, attribute.String("R", "V"))

// ValidConfig is a Config struct that should cause no errors.
var validConfig = Config{
Endpoint: "/api/prom/push",
Expand Down Expand Up @@ -67,9 +69,6 @@ var validConfig = Config{
Quantiles: []float64{0, 0.25, 0.5, 0.75, 1},
}

var testResource = resource.NewWithAttributes(semconv.SchemaURL, attribute.String("R", "V"))
var mockTime = int64(time.Nanosecond) * time.Time{}.UnixNano() / int64(time.Millisecond)

func TestExportKindFor(t *testing.T) {
exporter := Exporter{}
got := exporter.ExportKindFor(nil, aggregation.Kind(rune(0)))
Expand All @@ -88,6 +87,8 @@ func TestConvertToTimeSeries(t *testing.T) {
},
}

startTime := time.Now()

// Test conversions based on aggregation type
tests := []struct {
name string
Expand Down Expand Up @@ -121,9 +122,11 @@ func TestConvertToTimeSeries(t *testing.T) {
},
}

endTime := time.Now()

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := exporter.ConvertToTimeSeries(tt.input)
got, err := exporter.ConvertToTimeSeries(testResource, tt.input)
want := tt.want

// Check for errors and for the correct number of timeseries.
Expand All @@ -148,10 +151,13 @@ func TestConvertToTimeSeries(t *testing.T) {
wantAttributes[attribute.String()] = true
}
for _, sample := range got[i].Samples {
gotSamples[sample.String()] = true
gotSamples[fmt.Sprint(sample.Value)] = true

assert.LessOrEqual(t, toMillis(startTime), sample.Timestamp)
assert.GreaterOrEqual(t, toMillis(endTime), sample.Timestamp)
}
for _, sample := range want[i].Samples {
wantSamples[sample.String()] = true
wantSamples[fmt.Sprint(sample.Value)] = true
}
}
assert.Equal(t, wantAttributes, gotAttributes)
Expand Down
8 changes: 4 additions & 4 deletions exporters/metric/cortex/example/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ replace (
)

require (
go.opentelemetry.io/contrib/exporters/metric/cortex v0.22.0
go.opentelemetry.io/contrib/exporters/metric/cortex/utils v0.22.0
go.opentelemetry.io/contrib/exporters/metric/cortex v0.23.0
go.opentelemetry.io/contrib/exporters/metric/cortex/utils v0.23.0
go.opentelemetry.io/otel v1.0.0-RC3
go.opentelemetry.io/otel/metric v0.22.0
go.opentelemetry.io/otel/metric v0.23.0
go.opentelemetry.io/otel/sdk v1.0.0-RC3
go.opentelemetry.io/otel/sdk/metric v0.22.0
go.opentelemetry.io/otel/sdk/metric v0.23.0
)
20 changes: 8 additions & 12 deletions exporters/metric/cortex/example/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -255,22 +255,18 @@ go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk=
go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E=
go.opentelemetry.io/otel v1.0.0-RC1/go.mod h1:x9tRa9HK4hSSq7jf2TKbqFbtt58/TGk0f9XiEYISI1I=
go.opentelemetry.io/otel v1.0.0-RC3 h1:kvwiyEkiUT/JaadXzVLI/R1wDO934A7r3Bs2wEe6wqA=
go.opentelemetry.io/otel v1.0.0-RC3/go.mod h1:Ka5j3ua8tZs4Rkq4Ex3hwgBgOchyPVq5S6P2lz//nKQ=
go.opentelemetry.io/otel/internal/metric v0.22.0 h1:Q9bS02XRykSRIbggaU4hVF9oWOP9PyILu26zJWoKmk0=
go.opentelemetry.io/otel/internal/metric v0.22.0/go.mod h1:7qVuMihW/ktMonEfOvBXuh6tfMvvEyoIDgeJNRloYbQ=
go.opentelemetry.io/otel/metric v0.22.0 h1:/qv10BzznqEifrXBwsTT370OCN1PRgt+mnjzMwxJKrQ=
go.opentelemetry.io/otel/metric v0.22.0/go.mod h1:KcsUkBiYGW003DJ+ugd2aqIRIfjabD9jeOUXqsAtrq0=
go.opentelemetry.io/otel/oteltest v1.0.0-RC1/go.mod h1:+eoIG0gdEOaPNftuy1YScLr1Gb4mL/9lpDkZ0JjMRq4=
go.opentelemetry.io/otel/sdk v1.0.0-RC1/go.mod h1:kj6yPn7Pgt5ByRuwesbaWcRLA+V7BSDg3Hf8xRvsvf8=
go.opentelemetry.io/otel/internal/metric v0.23.0 h1:mPfzm9Iqhw7G2nDBmUAjFTfPqLZPbOW2k7QI57ITbaI=
go.opentelemetry.io/otel/internal/metric v0.23.0/go.mod h1:z+RPiDJe30YnCrOhFGivwBS+DU1JU/PiLKkk4re2DNY=
go.opentelemetry.io/otel/metric v0.23.0 h1:mYCcDxi60P4T27/0jchIDFa1WHEfQeU3zH9UEMpnj2c=
go.opentelemetry.io/otel/metric v0.23.0/go.mod h1:G/Nn9InyNnIv7J6YVkQfpc0JCfKBNJaERBGw08nqmVQ=
go.opentelemetry.io/otel/sdk v1.0.0-RC3 h1:iRMkET+EmJUn5mW0hJzygBraXRmrUwzbOtNvTCh/oKs=
go.opentelemetry.io/otel/sdk v1.0.0-RC3/go.mod h1:78H6hyg2fka0NYT9fqGuFLvly2yCxiBXDJAgLKo/2Us=
go.opentelemetry.io/otel/sdk/export/metric v0.22.0 h1:6huidwh9LZi/+lvFw7EQ+m+pVmlfhOMd9s9PmTXAgeo=
go.opentelemetry.io/otel/sdk/export/metric v0.22.0/go.mod h1:a14rf2CiHSn9xjB6cHuv0HoZGl5C4w2PAgl+Lja1VzU=
go.opentelemetry.io/otel/sdk/metric v0.22.0 h1:ZBagqeLlTgEmvxtaN3GkvmbmG+XWKDwS+amr8EsSMDo=
go.opentelemetry.io/otel/sdk/metric v0.22.0/go.mod h1:LzkI0G0z6KhEagqmzgk3bw/dglE2Tk2OXs455UMcI0s=
go.opentelemetry.io/otel/trace v1.0.0-RC1/go.mod h1:86UHmyHWFEtWjfWPSbu0+d0Pf9Q6e1U+3ViBOc+NXAg=
go.opentelemetry.io/otel/sdk/export/metric v0.23.0 h1:7NeoKPPx6NdZBVHLEp/LY5Lq85Ff1WNZnuJkuRy+azw=
go.opentelemetry.io/otel/sdk/export/metric v0.23.0/go.mod h1:SuMiREmKVRIwFKq73zvGTvwFpxb/ZAYkMfyqMoOtDqs=
go.opentelemetry.io/otel/sdk/metric v0.23.0 h1:xlZhPbiue1+jjSFEth94q9QCmX8Q24mOtue9IAmlVyI=
go.opentelemetry.io/otel/sdk/metric v0.23.0/go.mod h1:wa0sKK13eeIFW+0OFjcC3S1i7FTRRiLAXe1kjBVbhwg=
go.opentelemetry.io/otel/trace v1.0.0-RC3 h1:9F0ayEvlxv8BmNmPbU005WK7hC+7KbOazCPZjNa1yME=
go.opentelemetry.io/otel/trace v1.0.0-RC3/go.mod h1:VUt2TUYd8S2/ZRX09ZDFZQwn2RqfMB5MzO17jBojGxo=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
Expand Down
10 changes: 5 additions & 5 deletions exporters/metric/cortex/example/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,16 @@ func main() {
meter := pusher.MeterProvider().Meter("example")

// Create instruments.
recorder := metric.Must(meter).NewInt64ValueRecorder(
"example.valuerecorder",
histogram := metric.Must(meter).NewInt64Histogram(
"example.histogram",
metric.WithDescription("Records values"),
)

counter := metric.Must(meter).NewInt64Counter(
"example.counter",
metric.WithDescription("Counts things"),
)
fmt.Println("Success: Created Int64ValueRecorder and Int64Counter instruments!")
fmt.Println("Success: Created Int64Histogram and Int64Counter instruments!")

// Record random values to the instruments in a loop
fmt.Println("Starting to write data to the instruments!")
Expand All @@ -74,9 +74,9 @@ func main() {
time.Sleep(1 * time.Second)
randomValue := random.Intn(100)
value := int64(randomValue * 10)
recorder.Record(ctx, value, attribute.String("key", "value"))
histogram.Record(ctx, value, attribute.String("key", "value"))
counter.Add(ctx, int64(randomValue), attribute.String("key", "value"))
fmt.Printf("Adding %d to counter and recording %d in recorder\n", randomValue, value)
fmt.Printf("Adding %d to counter and recording %d in histogram\n", randomValue, value)
}

}
Expand Down
6 changes: 3 additions & 3 deletions exporters/metric/cortex/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ require (
github.com/prometheus/prometheus v2.5.0+incompatible
github.com/stretchr/testify v1.7.0
go.opentelemetry.io/otel v1.0.0-RC3
go.opentelemetry.io/otel/metric v0.22.0
go.opentelemetry.io/otel/metric v0.23.0
go.opentelemetry.io/otel/sdk v1.0.0-RC3
go.opentelemetry.io/otel/sdk/export/metric v0.22.0
go.opentelemetry.io/otel/sdk/metric v0.22.0
go.opentelemetry.io/otel/sdk/export/metric v0.23.0
go.opentelemetry.io/otel/sdk/metric v0.23.0
)
Loading

0 comments on commit b2a1760

Please sign in to comment.