diff --git a/CHANGELOG.md b/CHANGELOG.md index 9decc17f33a7..79b78076fa34 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ## [Unreleased] +### Added + +- Integration tests for more OTel Collector Attribute types. (#1062) + ### Changed - Rename `sdk/metric/processor/test` to `sdk/metric/processor/processortest` diff --git a/exporters/otlp/otlp_integration_test.go b/exporters/otlp/otlp_integration_test.go index 5af4f7306a66..36e23226ee56 100644 --- a/exporters/otlp/otlp_integration_test.go +++ b/exporters/otlp/otlp_integration_test.go @@ -426,3 +426,149 @@ func TestNewExporter_withHeaders(t *testing.T) { require.Len(t, headers.Get("header1"), 1) assert.Equal(t, "value1", headers.Get("header1")[0]) } + +func TestNewExporter_withMultipleAttributeTypes(t *testing.T) { + mc := runMockCol(t) + + defer func() { + _ = mc.stop() + }() + + <-time.After(5 * time.Millisecond) + + exp, _ := otlp.NewExporter( + otlp.WithInsecure(), + otlp.WithReconnectionPeriod(50*time.Millisecond), + otlp.WithAddress(mc.address), + ) + + defer func() { + _ = exp.Stop() + }() + + tp, err := sdktrace.NewProvider( + sdktrace.WithConfig(sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}), + sdktrace.WithBatcher(exp, // add following two options to ensure flush + sdktrace.WithBatchTimeout(15*time.Millisecond), + sdktrace.WithMaxExportBatchSize(10), + )) + assert.NoError(t, err) + + tr := tp.Tracer("test-tracer") + testKvs := []label.KeyValue{ + label.Int("Int", 1), + label.Int32("Int32", int32(2)), + label.Int64("Int64", int64(3)), + label.Float32("Float32", float32(1.11)), + label.Float64("Float64", 2.22), + label.Bool("Bool", true), + label.String("String", "test"), + } + _, span := tr.Start(context.Background(), "AlwaysSample") + span.SetAttributes(testKvs...) + span.End() + + selector := simple.NewWithExactDistribution() + processor := processor.New(selector, metricsdk.PassThroughExporter) + pusher := push.New(processor, exp) + pusher.Start() + + // Flush and close. + pusher.Stop() + + // Wait >2 cycles. + <-time.After(40 * time.Millisecond) + + // Now shutdown the exporter + if err := exp.Stop(); err != nil { + t.Fatalf("failed to stop the exporter: %v", err) + } + + // Shutdown the collector too so that we can begin + // verification checks of expected data back. + _ = mc.stop() + + // Now verify that we only got one span + rss := mc.getSpans() + if got, want := len(rss), 1; got != want { + t.Fatalf("resource span count: got %d, want %d\n", got, want) + } + + expected := []*commonpb.KeyValue{ + { + Key: "Int", + Value: &commonpb.AnyValue{ + Value: &commonpb.AnyValue_IntValue{ + IntValue: 1, + }, + }, + }, + { + Key: "Int32", + Value: &commonpb.AnyValue{ + Value: &commonpb.AnyValue_IntValue{ + IntValue: 2, + }, + }, + }, + { + Key: "Int64", + Value: &commonpb.AnyValue{ + Value: &commonpb.AnyValue_IntValue{ + IntValue: 3, + }, + }, + }, + { + Key: "Float32", + Value: &commonpb.AnyValue{ + Value: &commonpb.AnyValue_DoubleValue{ + DoubleValue: 1.11, + }, + }, + }, + { + Key: "Float64", + Value: &commonpb.AnyValue{ + Value: &commonpb.AnyValue_DoubleValue{ + DoubleValue: 2.22, + }, + }, + }, + { + Key: "Bool", + Value: &commonpb.AnyValue{ + Value: &commonpb.AnyValue_BoolValue{ + BoolValue: true, + }, + }, + }, + { + Key: "String", + Value: &commonpb.AnyValue{ + Value: &commonpb.AnyValue_StringValue{ + StringValue: "test", + }, + }, + }, + } + + // Verify attributes + if !assert.Len(t, rss[0].Attributes, len(expected)) { + t.Fatalf("attributes count: got %d, want %d\n", len(rss[0].Attributes), len(expected)) + } + for i, actual := range rss[0].Attributes { + if a, ok := actual.Value.Value.(*commonpb.AnyValue_DoubleValue); ok { + e, ok := expected[i].Value.Value.(*commonpb.AnyValue_DoubleValue) + if !ok { + t.Errorf("expected AnyValue_DoubleValue, got %T", expected[i].Value.Value) + continue + } + if !assert.InDelta(t, e.DoubleValue, a.DoubleValue, 0.01) { + continue + } + e.DoubleValue = a.DoubleValue + } + assert.Equal(t, expected[i], actual) + } +}