Skip to content

Commit

Permalink
Tests pass
Browse files Browse the repository at this point in the history
  • Loading branch information
jmacd committed May 18, 2020
1 parent 57c12e6 commit e2126bf
Show file tree
Hide file tree
Showing 12 changed files with 113 additions and 132 deletions.
5 changes: 2 additions & 3 deletions exporters/metric/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"go.opentelemetry.io/otel/sdk/metric/controller/push"
integrator "go.opentelemetry.io/otel/sdk/metric/integrator/simple"
"go.opentelemetry.io/otel/sdk/metric/selector/simple"
"go.opentelemetry.io/otel/sdk/resource"
)

// Exporter is an implementation of metric.Exporter that sends metrics to
Expand Down Expand Up @@ -169,8 +168,7 @@ func NewExportPipeline(config Config, period time.Duration) (*push.Controller, h
}

// Export exports the provide metric record to prometheus.
func (e *Exporter) Export(_ context.Context, _ *resource.Resource, checkpointSet export.CheckpointSet) error {
// TODO: Use the resource value in this exporter.
func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet) error {
e.snapshot = checkpointSet
return nil
}
Expand Down Expand Up @@ -211,6 +209,7 @@ func (c *collector) Collect(ch chan<- prometheus.Metric) {
err := c.exp.snapshot.ForEach(func(record export.Record) error {
agg := record.Aggregator()
numberKind := record.Descriptor().NumberKind()
// TODO: Use the resource value in this record.
labels := labelValues(record.Labels())
desc := c.toDesc(&record)

Expand Down
4 changes: 2 additions & 2 deletions exporters/metric/prometheus/prometheus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestPrometheusExporter(t *testing.T) {
}

var expected []string
checkpointSet := test.NewCheckpointSet()
checkpointSet := test.NewCheckpointSet(nil)

counter := metric.NewDescriptor(
"counter", metric.CounterKind, metric.Float64NumberKind)
Expand Down Expand Up @@ -117,7 +117,7 @@ func TestPrometheusExporter(t *testing.T) {
}

func compareExport(t *testing.T, exporter *prometheus.Exporter, checkpointSet *test.CheckpointSet, expected []string) {
err := exporter.Export(context.Background(), nil, checkpointSet)
err := exporter.Export(context.Background(), checkpointSet)
require.Nil(t, err)

rec := httptest.NewRecorder()
Expand Down
44 changes: 22 additions & 22 deletions exporters/metric/stdout/stdout_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,11 @@ type testFixture struct {
ctx context.Context
exporter *stdout.Exporter
output *bytes.Buffer
resource *resource.Resource
}

func newFixture(t *testing.T, resource *resource.Resource, config stdout.Config) testFixture {
var testResource = resource.New(kv.String("R", "V"))

func newFixture(t *testing.T, config stdout.Config) testFixture {
buf := &bytes.Buffer{}
config.Writer = buf
config.DoNotPrintTime = true
Expand All @@ -60,7 +61,6 @@ func newFixture(t *testing.T, resource *resource.Resource, config stdout.Config)
ctx: context.Background(),
exporter: exp,
output: buf,
resource: resource,
}
}

Expand Down Expand Up @@ -95,7 +95,7 @@ func TestStdoutTimestamp(t *testing.T) {

before := time.Now()

checkpointSet := test.NewCheckpointSet()
checkpointSet := test.NewCheckpointSet(testResource)

ctx := context.Background()
desc := metric.NewDescriptor("test.name", metric.ObserverKind, metric.Int64NumberKind)
Expand Down Expand Up @@ -139,9 +139,9 @@ func TestStdoutTimestamp(t *testing.T) {
}

func TestStdoutCounterFormat(t *testing.T) {
fix := newFixture(t, nil, stdout.Config{})
fix := newFixture(t, stdout.Config{})

checkpointSet := test.NewCheckpointSet()
checkpointSet := test.NewCheckpointSet(testResource)

desc := metric.NewDescriptor("test.name", metric.CounterKind, metric.Int64NumberKind)
cagg := sum.New()
Expand All @@ -152,13 +152,13 @@ func TestStdoutCounterFormat(t *testing.T) {

fix.Export(checkpointSet)

require.Equal(t, `{"updates":[{"name":"test.name{A=B,C=D}","sum":123}]}`, fix.Output())
require.Equal(t, `{"updates":[{"name":"test.name{R=V,A=B,C=D}","sum":123}]}`, fix.Output())
}

func TestStdoutLastValueFormat(t *testing.T) {
fix := newFixture(t, nil, stdout.Config{})
fix := newFixture(t, stdout.Config{})

checkpointSet := test.NewCheckpointSet()
checkpointSet := test.NewCheckpointSet(testResource)

desc := metric.NewDescriptor("test.name", metric.ObserverKind, metric.Float64NumberKind)
lvagg := lastvalue.New()
Expand All @@ -169,13 +169,13 @@ func TestStdoutLastValueFormat(t *testing.T) {

fix.Export(checkpointSet)

require.Equal(t, `{"updates":[{"name":"test.name{A=B,C=D}","last":123.456}]}`, fix.Output())
require.Equal(t, `{"updates":[{"name":"test.name{R=V,A=B,C=D}","last":123.456}]}`, fix.Output())
}

func TestStdoutMinMaxSumCount(t *testing.T) {
fix := newFixture(t, nil, stdout.Config{})
fix := newFixture(t, stdout.Config{})

checkpointSet := test.NewCheckpointSet()
checkpointSet := test.NewCheckpointSet(testResource)

desc := metric.NewDescriptor("test.name", metric.ValueRecorderKind, metric.Float64NumberKind)
magg := minmaxsumcount.New(&desc)
Expand All @@ -187,15 +187,15 @@ func TestStdoutMinMaxSumCount(t *testing.T) {

fix.Export(checkpointSet)

require.Equal(t, `{"updates":[{"name":"test.name{A=B,C=D}","min":123.456,"max":876.543,"sum":999.999,"count":2}]}`, fix.Output())
require.Equal(t, `{"updates":[{"name":"test.name{R=V,A=B,C=D}","min":123.456,"max":876.543,"sum":999.999,"count":2}]}`, fix.Output())
}

func TestStdoutValueRecorderFormat(t *testing.T) {
fix := newFixture(t, nil, stdout.Config{
fix := newFixture(t, stdout.Config{
PrettyPrint: true,
})

checkpointSet := test.NewCheckpointSet()
checkpointSet := test.NewCheckpointSet(testResource)

desc := metric.NewDescriptor("test.name", metric.ValueRecorderKind, metric.Float64NumberKind)
magg := array.New()
Expand All @@ -213,7 +213,7 @@ func TestStdoutValueRecorderFormat(t *testing.T) {
require.Equal(t, `{
"updates": [
{
"name": "test.name{A=B,C=D}",
"name": "test.name{R=V,A=B,C=D}",
"min": 0.5,
"max": 999.5,
"sum": 500000,
Expand Down Expand Up @@ -247,9 +247,9 @@ func TestStdoutNoData(t *testing.T) {
t.Run(name, func(t *testing.T) {
t.Parallel()

fix := newFixture(t, nil, stdout.Config{})
fix := newFixture(t, stdout.Config{})

checkpointSet := test.NewCheckpointSet()
checkpointSet := test.NewCheckpointSet(testResource)

magg := tc
magg.Checkpoint(fix.ctx, &desc)
Expand All @@ -264,9 +264,9 @@ func TestStdoutNoData(t *testing.T) {
}

func TestStdoutLastValueNotSet(t *testing.T) {
fix := newFixture(t, nil, stdout.Config{})
fix := newFixture(t, stdout.Config{})

checkpointSet := test.NewCheckpointSet()
checkpointSet := test.NewCheckpointSet(testResource)

desc := metric.NewDescriptor("test.name", metric.ObserverKind, metric.Float64NumberKind)
lvagg := lastvalue.New()
Expand Down Expand Up @@ -314,9 +314,9 @@ func TestStdoutResource(t *testing.T) {
}

for _, tc := range testCases {
fix := newFixture(t, tc.res, stdout.Config{})
fix := newFixture(t, stdout.Config{})

checkpointSet := test.NewCheckpointSet()
checkpointSet := test.NewCheckpointSet(tc.res)

desc := metric.NewDescriptor("test.name", metric.ObserverKind, metric.Float64NumberKind)
lvagg := lastvalue.New()
Expand Down
8 changes: 4 additions & 4 deletions exporters/otlp/internal/transform/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ type result struct {

// CheckpointSet transforms all records contained in a checkpoint into
// batched OTLP ResourceMetrics.
func CheckpointSet(ctx context.Context, resource *resource.Resource, cps export.CheckpointSet, numWorkers uint) ([]*metricpb.ResourceMetrics, error) {
func CheckpointSet(ctx context.Context, cps export.CheckpointSet, numWorkers uint) ([]*metricpb.ResourceMetrics, error) {
records, errc := source(ctx, cps)

// Start a fixed number of goroutines to transform records.
Expand All @@ -71,7 +71,7 @@ func CheckpointSet(ctx context.Context, resource *resource.Resource, cps export.
for i := uint(0); i < numWorkers; i++ {
go func() {
defer wg.Done()
transformer(ctx, resource, records, transformed)
transformer(ctx, records, transformed)
}()
}
go func() {
Expand Down Expand Up @@ -116,15 +116,15 @@ func source(ctx context.Context, cps export.CheckpointSet) (<-chan export.Record

// transformer transforms records read from the passed in chan into
// OTLP Metrics which are sent on the out chan.
func transformer(ctx context.Context, resource *resource.Resource, in <-chan export.Record, out chan<- result) {
func transformer(ctx context.Context, in <-chan export.Record, out chan<- result) {
for r := range in {
m, err := Record(r)
// Propagate errors, but do not send empty results.
if err == nil && m == nil {
continue
}
res := result{
Resource: resource,
Resource: r.Resource(),
Library: r.Descriptor().LibraryName(),
Metric: m,
Err: err,
Expand Down
5 changes: 2 additions & 3 deletions exporters/otlp/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"go.opentelemetry.io/otel/exporters/otlp/internal/transform"
metricsdk "go.opentelemetry.io/otel/sdk/export/metric"
tracesdk "go.opentelemetry.io/otel/sdk/export/trace"
"go.opentelemetry.io/otel/sdk/resource"
)

type Exporter struct {
Expand Down Expand Up @@ -212,7 +211,7 @@ func (e *Exporter) Stop() error {
// Export implements the "go.opentelemetry.io/otel/sdk/export/metric".Exporter
// interface. It transforms and batches metric Records into OTLP Metrics and
// transmits them to the configured collector.
func (e *Exporter) Export(parent context.Context, resource *resource.Resource, cps metricsdk.CheckpointSet) error {
func (e *Exporter) Export(parent context.Context, cps metricsdk.CheckpointSet) error {
// Unify the parent context Done signal with the exporter stopCh.
ctx, cancel := context.WithCancel(parent)
defer cancel()
Expand All @@ -224,7 +223,7 @@ func (e *Exporter) Export(parent context.Context, resource *resource.Resource, c
}
}(ctx, cancel)

rms, err := transform.CheckpointSet(ctx, resource, cps, e.c.numWorkers)
rms, err := transform.CheckpointSet(ctx, cps, e.c.numWorkers)
if err != nil {
return err
}
Expand Down
11 changes: 4 additions & 7 deletions exporters/otlp/otlp_metric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -659,11 +659,10 @@ func runMetricExportTest(t *testing.T, exp *Exporter, rs []record, expected []me

equiv := r.resource.Equivalent()
resources[equiv] = r.resource
recs[equiv] = append(recs[equiv], metricsdk.NewRecord(&desc, &labs, agg))
recs[equiv] = append(recs[equiv], metricsdk.NewRecord(&desc, &labs, r.resource, agg))
}
for equiv, records := range recs {
resource := resources[equiv]
assert.NoError(t, exp.Export(context.Background(), resource, checkpointSet{records: records}))
for _, records := range recs {
assert.NoError(t, exp.Export(context.Background(), checkpointSet{records: records}))
}

// assert.ElementsMatch does not equate nested slices of different order,
Expand Down Expand Up @@ -713,8 +712,6 @@ func TestEmptyMetricExport(t *testing.T) {
exp.metricExporter = msc
exp.started = true

resource := resource.New(kv.String("R", "S"))

for _, test := range []struct {
records []metricsdk.Record
want []metricpb.ResourceMetrics
Expand All @@ -729,7 +726,7 @@ func TestEmptyMetricExport(t *testing.T) {
},
} {
msc.Reset()
require.NoError(t, exp.Export(context.Background(), resource, checkpointSet{records: test.records}))
require.NoError(t, exp.Export(context.Background(), checkpointSet{records: test.records}))
assert.Equal(t, test.want, msc.ResourceMetrics())
}
}
1 change: 0 additions & 1 deletion sdk/metric/controller/push/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ type Controller struct {
lock sync.Mutex
collectLock sync.Mutex
accumulator *sdk.Accumulator
resource *resource.Resource
uniq metric.MeterImpl
named map[string]metric.Meter
errorHandler sdk.ErrorHandler
Expand Down
25 changes: 18 additions & 7 deletions sdk/metric/controller/push/push_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"github.com/benbjohnson/clock"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/otel/api/kv"
"go.opentelemetry.io/otel/api/label"
"go.opentelemetry.io/otel/api/metric"
"go.opentelemetry.io/otel/exporters/metric/test"
export "go.opentelemetry.io/otel/sdk/export/metric"
Expand All @@ -42,6 +44,8 @@ type testIntegrator struct {
finishes int
}

var testResource = resource.New(kv.String("R", "V"))

type testExporter struct {
t *testing.T
lock sync.Mutex
Expand All @@ -68,7 +72,7 @@ var _ push.Clock = mockClock{}
var _ push.Ticker = mockTicker{}

func newFixture(t *testing.T) testFixture {
checkpointSet := test.NewCheckpointSet()
checkpointSet := test.NewCheckpointSet(testResource)

integrator := &testIntegrator{
t: t,
Expand Down Expand Up @@ -115,7 +119,7 @@ func (b *testIntegrator) getCounts() (checkpoints, finishes int) {
return b.checkpoints, b.finishes
}

func (e *testExporter) Export(_ context.Context, _ *resource.Resource, checkpointSet export.CheckpointSet) error {
func (e *testExporter) Export(_ context.Context, checkpointSet export.CheckpointSet) error {
e.lock.Lock()
defer e.lock.Unlock()
e.exports++
Expand Down Expand Up @@ -213,6 +217,7 @@ func TestPushTicker(t *testing.T) {
require.Equal(t, 1, exports)
require.Equal(t, 1, len(records))
require.Equal(t, "counter", records[0].Descriptor().Name())
require.Equal(t, "R=V", records[0].Resource().Encoded(label.DefaultEncoder()))

sum, err := records[0].Aggregator().(aggregator.Sum).Sum()
require.Equal(t, int64(3), sum.AsInt64())
Expand All @@ -232,6 +237,7 @@ func TestPushTicker(t *testing.T) {
require.Equal(t, 2, exports)
require.Equal(t, 1, len(records))
require.Equal(t, "counter", records[0].Descriptor().Name())
require.Equal(t, "R=V", records[0].Resource().Encoded(label.DefaultEncoder()))

sum, err = records[0].Aggregator().(aggregator.Sum).Sum()
require.Equal(t, int64(7), sum.AsInt64())
Expand All @@ -256,8 +262,8 @@ func TestPushExportError(t *testing.T) {
expectedDescriptors []string
expectedError error
}{
{"errNone", nil, []string{"counter1", "counter2"}, nil},
{"errNoData", aggregator.ErrNoData, []string{"counter2"}, nil},
{"errNone", nil, []string{"counter1{R=V,X=Y}", "counter2{R=V,}"}, nil},
{"errNoData", aggregator.ErrNoData, []string{"counter2{R=V,}"}, nil},
{"errUnexpected", errAggregator, []string{}, errAggregator},
}
for _, tt := range tests {
Expand Down Expand Up @@ -287,7 +293,7 @@ func TestPushExportError(t *testing.T) {
p.Start()
runtime.Gosched()

counter1.Add(ctx, 3)
counter1.Add(ctx, 3, kv.String("X", "Y"))
counter2.Add(ctx, 5)

require.Equal(t, 0, fix.exporter.exports)
Expand All @@ -311,11 +317,16 @@ func TestPushExportError(t *testing.T) {
lock.Unlock()
require.Equal(t, len(tt.expectedDescriptors), len(records))
for _, r := range records {
require.Contains(t, tt.expectedDescriptors, r.Descriptor().Name())
require.Contains(t, tt.expectedDescriptors,
fmt.Sprintf("%s{%s,%s}",
r.Descriptor().Name(),
r.Resource().Encoded(label.DefaultEncoder()),
r.Labels().Encoded(label.DefaultEncoder()),
),
)
}

p.Stop()

})
}
}
Loading

0 comments on commit e2126bf

Please sign in to comment.