diff --git a/.chloggen/implement_contract_checker.yaml b/.chloggen/implement_contract_checker.yaml new file mode 100755 index 00000000000..75f4f5da188 --- /dev/null +++ b/.chloggen/implement_contract_checker.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: receivertest + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: add support for metrics in contract checker + +# One or more tracking issues or pull requests related to the change +issues: [9551] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/receiver/receivertest/contract_checker.go b/receiver/receivertest/contract_checker.go index 7bd71d4c3a9..43bc31f9fd7 100644 --- a/receiver/receivertest/contract_checker.go +++ b/receiver/receivertest/contract_checker.go @@ -22,6 +22,7 @@ import ( "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/receiver" ) @@ -116,8 +117,7 @@ func checkConsumeContractScenario(params CheckConsumeContractParams, decisionFun case component.DataTypeTraces: receiver, err = params.Factory.CreateTracesReceiver(ctx, NewNopCreateSettings(), params.Config, consumer) case component.DataTypeMetrics: - // TODO: add metrics support to mockConsumer so that this case can be also implemented. - require.FailNow(params.T, "DataTypeMetrics is not implemented") + receiver, err = params.Factory.CreateMetricsReceiver(ctx, NewNopCreateSettings(), params.Config, consumer) default: require.FailNow(params.T, "must specify a valid DataType to test for") } @@ -390,7 +390,76 @@ func idSetFromLogs(data plog.Logs) (idSet, error) { return ds, nil } -// TODO: Implement mockConsumer.ConsumeMetrics() +func (m *mockConsumer) ConsumeMetrics(_ context.Context, data pmetric.Metrics) error { + ids, err := idSetFromMetrics(data) + require.NoError(m.t, err) + return m.consume(ids) +} + +// idSetFromLogs computes an idSet from given plog.Logs. The idSet will contain ids of all log records. +func idSetFromMetrics(data pmetric.Metrics) (idSet, error) { + ds := map[UniqueIDAttrVal]bool{} + rss := data.ResourceMetrics() + for i := 0; i < rss.Len(); i++ { + ils := rss.At(i).ScopeMetrics() + for j := 0; j < ils.Len(); j++ { + ss := ils.At(j).Metrics() + for k := 0; k < ss.Len(); k++ { + elem := ss.At(k) + switch elem.Type() { + case pmetric.MetricTypeGauge: + for l := 0; l < elem.Gauge().DataPoints().Len(); l++ { + dp := elem.Gauge().DataPoints().At(l) + if err := idSetFromDataPoint(ds, dp.Attributes()); err != nil { + return ds, err + } + } + case pmetric.MetricTypeSum: + for l := 0; l < elem.Sum().DataPoints().Len(); l++ { + dp := elem.Sum().DataPoints().At(l) + if err := idSetFromDataPoint(ds, dp.Attributes()); err != nil { + return ds, err + } + } + case pmetric.MetricTypeSummary: + for l := 0; l < elem.Summary().DataPoints().Len(); l++ { + dp := elem.Summary().DataPoints().At(l) + if err := idSetFromDataPoint(ds, dp.Attributes()); err != nil { + return ds, err + } + } + case pmetric.MetricTypeHistogram: + for l := 0; l < elem.Histogram().DataPoints().Len(); l++ { + dp := elem.Histogram().DataPoints().At(l) + if err := idSetFromDataPoint(ds, dp.Attributes()); err != nil { + return ds, err + } + } + case pmetric.MetricTypeExponentialHistogram: + for l := 0; l < elem.ExponentialHistogram().DataPoints().Len(); l++ { + dp := elem.ExponentialHistogram().DataPoints().At(l) + if err := idSetFromDataPoint(ds, dp.Attributes()); err != nil { + return ds, err + } + } + } + } + } + } + return ds, nil +} + +func idSetFromDataPoint(ds map[UniqueIDAttrVal]bool, attributes pcommon.Map) error { + key, exists := attributes.Get(UniqueIDAttrName) + if !exists { + return fmt.Errorf("invalid data element, attribute %q is missing", UniqueIDAttrName) + } + if key.Type() != pcommon.ValueTypeStr { + return fmt.Errorf("invalid data element, attribute %q is wrong type %v", UniqueIDAttrName, key.Type()) + } + ds[UniqueIDAttrVal(key.Str())] = true + return nil +} // consume the elements with the specified ids, regardless of the element data type. func (m *mockConsumer) consume(ids idSet) error { @@ -437,3 +506,42 @@ func CreateOneLogWithID(id UniqueIDAttrVal) plog.Logs { ) return data } + +func CreateEveryMetricTypeWithID(id UniqueIDAttrVal) pmetric.Metrics { + data := pmetric.NewMetrics() + gauge := data.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics() + gauge.AppendEmpty().SetEmptyGauge().DataPoints().AppendEmpty().Attributes().PutStr( + UniqueIDAttrName, + string(id), + ) + sum := data.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics() + sum.AppendEmpty().SetEmptySum().DataPoints().AppendEmpty().Attributes().PutStr( + UniqueIDAttrName, + string(id), + ) + summary := data.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics() + summary.AppendEmpty().SetEmptySummary().DataPoints().AppendEmpty().Attributes().PutStr( + UniqueIDAttrName, + string(id), + ) + histogram := data.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics() + histogram.AppendEmpty().SetEmptyHistogram().DataPoints().AppendEmpty().Attributes().PutStr( + UniqueIDAttrName, + string(id), + ) + exponentialHistogram := data.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics() + exponentialHistogram.AppendEmpty().SetEmptyExponentialHistogram().DataPoints().AppendEmpty().Attributes().PutStr( + UniqueIDAttrName, + string(id), + ) + return data +} + +func CreateOneSpanWithID(id UniqueIDAttrVal) ptrace.Traces { + data := ptrace.NewTraces() + data.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty().Attributes().PutStr( + UniqueIDAttrName, + string(id), + ) + return data +} diff --git a/receiver/receivertest/contract_checker_test.go b/receiver/receivertest/contract_checker_test.go index 359d40015df..dcdd40b1482 100644 --- a/receiver/receivertest/contract_checker_test.go +++ b/receiver/receivertest/contract_checker_test.go @@ -13,6 +13,8 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/receiver" ) @@ -20,7 +22,9 @@ import ( // We declare a trivial example receiver, a data generator and then use them in TestConsumeContract(). type exampleReceiver struct { - nextConsumer consumer.Logs + nextLogsConsumer consumer.Logs + nextTracesConsumer consumer.Traces + nextMetricsConsumer consumer.Metrics } func (s *exampleReceiver) Start(_ context.Context, _ component.Host) error { @@ -31,10 +35,48 @@ func (s *exampleReceiver) Shutdown(_ context.Context) error { return nil } -func (s *exampleReceiver) Receive(data plog.Logs) { +func (s *exampleReceiver) ReceiveLogs(data plog.Logs) { // This very simple implementation demonstrates how a single items receiving should happen. for { - err := s.nextConsumer.ConsumeLogs(context.Background(), data) + err := s.nextLogsConsumer.ConsumeLogs(context.Background(), data) + if err != nil { + // The next consumer returned an error. + if !consumererror.IsPermanent(err) { + // It is not a permanent error, so we must retry sending it again. In network-based + // receivers instead we can ask our sender to re-retry the same data again later. + // We may also pause here a bit if we don't want to hammer the next consumer. + continue + } + } + // If we are hear either the ConsumeLogs returned success or it returned a permanent error. + // In either case we don't need to retry the same data, we are done. + return + } +} + +func (s *exampleReceiver) ReceiveMetrics(data pmetric.Metrics) { + // This very simple implementation demonstrates how a single items receiving should happen. + for { + err := s.nextMetricsConsumer.ConsumeMetrics(context.Background(), data) + if err != nil { + // The next consumer returned an error. + if !consumererror.IsPermanent(err) { + // It is not a permanent error, so we must retry sending it again. In network-based + // receivers instead we can ask our sender to re-retry the same data again later. + // We may also pause here a bit if we don't want to hammer the next consumer. + continue + } + } + // If we are hear either the ConsumeLogs returned success or it returned a permanent error. + // In either case we don't need to retry the same data, we are done. + return + } +} + +func (s *exampleReceiver) ReceiveTraces(data ptrace.Traces) { + // This very simple implementation demonstrates how a single items receiving should happen. + for { + err := s.nextTracesConsumer.ConsumeTraces(context.Background(), data) if err != nil { // The next consumer returned an error. if !consumererror.IsPermanent(err) { @@ -52,30 +94,95 @@ func (s *exampleReceiver) Receive(data plog.Logs) { // A config for exampleReceiver. type exampleReceiverConfig struct { - generator *exampleGenerator + generator Generator } // A generator that can send data to exampleReceiver. -type exampleGenerator struct { +type exampleLogGenerator struct { t *testing.T receiver *exampleReceiver sequenceNum int64 } -func (g *exampleGenerator) Start() { +func (g *exampleLogGenerator) Start() { g.sequenceNum = 0 } -func (g *exampleGenerator) Stop() {} +func (g *exampleLogGenerator) Stop() {} -func (g *exampleGenerator) Generate() []UniqueIDAttrVal { +func (g *exampleLogGenerator) Generate() []UniqueIDAttrVal { // Make sure the id is atomically incremented. Generate() may be called concurrently. id := UniqueIDAttrVal(strconv.FormatInt(atomic.AddInt64(&g.sequenceNum, 1), 10)) data := CreateOneLogWithID(id) // Send the generated data to the receiver. - g.receiver.Receive(data) + g.receiver.ReceiveLogs(data) + + // And return the ids for bookkeeping by the test. + return []UniqueIDAttrVal{id} +} + +// A generator that can send data to exampleReceiver. +type exampleTraceGenerator struct { + t *testing.T + receiver *exampleReceiver + sequenceNum int64 +} + +func (g *exampleTraceGenerator) Start() { + g.sequenceNum = 0 +} + +func (g *exampleTraceGenerator) Stop() {} + +func (g *exampleTraceGenerator) Generate() []UniqueIDAttrVal { + // Make sure the id is atomically incremented. Generate() may be called concurrently. + id := UniqueIDAttrVal(strconv.FormatInt(atomic.AddInt64(&g.sequenceNum, 1), 10)) + + data := CreateOneSpanWithID(id) + + // Send the generated data to the receiver. + g.receiver.ReceiveTraces(data) + + // And return the ids for bookkeeping by the test. + return []UniqueIDAttrVal{id} +} + +func (g *exampleLogGenerator) GenerateTraces() []UniqueIDAttrVal { + // Make sure the id is atomically incremented. Generate() may be called concurrently. + id := UniqueIDAttrVal(strconv.FormatInt(atomic.AddInt64(&g.sequenceNum, 1), 10)) + + data := CreateOneSpanWithID(id) + + // Send the generated data to the receiver. + g.receiver.ReceiveTraces(data) + + // And return the ids for bookkeeping by the test. + return []UniqueIDAttrVal{id} +} + +// A generator that can send data to exampleReceiver. +type exampleMetricGenerator struct { + t *testing.T + receiver *exampleReceiver + sequenceNum int64 +} + +func (g *exampleMetricGenerator) Start() { + g.sequenceNum = 0 +} + +func (g *exampleMetricGenerator) Stop() {} + +func (g *exampleMetricGenerator) Generate() []UniqueIDAttrVal { + // Make sure the id is atomically incremented. Generate() may be called concurrently. + id := UniqueIDAttrVal(strconv.FormatInt(atomic.AddInt64(&g.sequenceNum, 1), 10)) + + data := CreateEveryMetricTypeWithID(id) + + // Send the generated data to the receiver. + g.receiver.ReceiveMetrics(data) // And return the ids for bookkeeping by the test. return []UniqueIDAttrVal{id} @@ -88,17 +195,31 @@ func newExampleFactory() receiver.Factory { return &exampleReceiverConfig{} }, receiver.WithLogs(createLog, component.StabilityLevelBeta), + receiver.WithMetrics(createMetric, component.StabilityLevelBeta), + receiver.WithTraces(createTrace, component.StabilityLevelBeta), ) } +func createTrace(_ context.Context, _ receiver.CreateSettings, cfg component.Config, consumer consumer.Traces) (receiver.Traces, error) { + rcv := &exampleReceiver{nextTracesConsumer: consumer} + cfg.(*exampleReceiverConfig).generator.(*exampleTraceGenerator).receiver = rcv + return rcv, nil +} + +func createMetric(_ context.Context, _ receiver.CreateSettings, cfg component.Config, consumer consumer.Metrics) (receiver.Metrics, error) { + rcv := &exampleReceiver{nextMetricsConsumer: consumer} + cfg.(*exampleReceiverConfig).generator.(*exampleMetricGenerator).receiver = rcv + return rcv, nil +} + func createLog( _ context.Context, _ receiver.CreateSettings, cfg component.Config, consumer consumer.Logs, ) (receiver.Logs, error) { - rcv := &exampleReceiver{nextConsumer: consumer} - cfg.(*exampleReceiverConfig).generator.receiver = rcv + rcv := &exampleReceiver{nextLogsConsumer: consumer} + cfg.(*exampleReceiverConfig).generator.(*exampleLogGenerator).receiver = rcv return rcv, nil } @@ -109,7 +230,7 @@ func TestConsumeContract(t *testing.T) { // Number of log records to send per scenario. const logsPerTest = 100 - generator := &exampleGenerator{t: t} + generator := &exampleLogGenerator{t: t} cfg := &exampleReceiverConfig{generator: generator} params := CheckConsumeContractParams{ @@ -124,3 +245,49 @@ func TestConsumeContract(t *testing.T) { // Run the contract checker. This will trigger test failures if any problems are found. CheckConsumeContract(params) } + +// TestConsumeMetricsContract is an example of testing of the receiver for the contract between the +// receiver and next consumer. +func TestConsumeMetricsContract(t *testing.T) { + + // Number of metric data points to send per scenario. + const metricsPerTest = 100 + + generator := &exampleMetricGenerator{t: t} + cfg := &exampleReceiverConfig{generator: generator} + + params := CheckConsumeContractParams{ + T: t, + Factory: newExampleFactory(), + DataType: component.DataTypeMetrics, + Config: cfg, + Generator: generator, + GenerateCount: metricsPerTest, + } + + // Run the contract checker. This will trigger test failures if any problems are found. + CheckConsumeContract(params) +} + +// TestConsumeTracesContract is an example of testing of the receiver for the contract between the +// receiver and next consumer. +func TestConsumeTracesContract(t *testing.T) { + + // Number of trace spans to send per scenario. + const spansPerTest = 100 + + generator := &exampleTraceGenerator{t: t} + cfg := &exampleReceiverConfig{generator: generator} + + params := CheckConsumeContractParams{ + T: t, + Factory: newExampleFactory(), + DataType: component.DataTypeTraces, + Config: cfg, + Generator: generator, + GenerateCount: spansPerTest, + } + + // Run the contract checker. This will trigger test failures if any problems are found. + CheckConsumeContract(params) +}