diff --git a/testbed/testbed/data_providers.go b/testbed/testbed/data_providers.go index aafa46254a0..aa37666c612 100644 --- a/testbed/testbed/data_providers.go +++ b/testbed/testbed/data_providers.go @@ -15,24 +15,20 @@ package testbed import ( + "io/ioutil" "log" "os" "path/filepath" "strconv" "time" - "github.com/gogo/protobuf/jsonpb" - "github.com/gogo/protobuf/proto" "go.uber.org/atomic" "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/consumer/pdata" - "go.opentelemetry.io/collector/internal" - otlplogscol "go.opentelemetry.io/collector/internal/data/protogen/collector/logs/v1" - otlpmetricscol "go.opentelemetry.io/collector/internal/data/protogen/collector/metrics/v1" - otlptracecol "go.opentelemetry.io/collector/internal/data/protogen/collector/trace/v1" "go.opentelemetry.io/collector/internal/goldendataset" "go.opentelemetry.io/collector/internal/idutils" + "go.opentelemetry.io/collector/internal/otlp" ) // DataProvider defines the interface for generators of test data used to drive various end-to-end tests. @@ -257,7 +253,9 @@ func (dp *goldenDataProvider) GenerateLogs() (pdata.Logs, bool) { // expects just a single JSON message in the entire file). type FileDataProvider struct { dataItemsGenerated *atomic.Uint64 - message proto.Message + logs pdata.Logs + metrics pdata.Metrics + traces pdata.Traces ItemsPerBatch int } @@ -268,70 +266,50 @@ func NewFileDataProvider(filePath string, dataType config.DataType) (*FileDataPr if err != nil { return nil, err } + var buf []byte + buf, err = ioutil.ReadAll(file) + if err != nil { + return nil, err + } - var message proto.Message - var dataPointCount int - + dp := &FileDataProvider{} // Load the message from the file and count the data points. - switch dataType { case config.TracesDataType: - var msg otlptracecol.ExportTraceServiceRequest - if err := protobufJSONUnmarshaler.Unmarshal(file, &msg); err != nil { + if dp.traces, err = otlp.NewJSONTracesUnmarshaler().Unmarshal(buf); err != nil { return nil, err } - message = &msg - - md := pdata.TracesFromInternalRep(internal.TracesFromOtlp(&msg)) - dataPointCount = md.SpanCount() - + dp.ItemsPerBatch = dp.traces.SpanCount() case config.MetricsDataType: - var msg otlpmetricscol.ExportMetricsServiceRequest - if err := protobufJSONUnmarshaler.Unmarshal(file, &msg); err != nil { + if dp.metrics, err = otlp.NewJSONMetricsUnmarshaler().Unmarshal(buf); err != nil { return nil, err } - message = &msg - - md := pdata.MetricsFromInternalRep(internal.MetricsFromOtlp(&msg)) - _, dataPointCount = md.MetricAndDataPointCount() - + _, dp.ItemsPerBatch = dp.metrics.MetricAndDataPointCount() case config.LogsDataType: - var msg otlplogscol.ExportLogsServiceRequest - if err := protobufJSONUnmarshaler.Unmarshal(file, &msg); err != nil { + if dp.logs, err = otlp.NewJSONLogsUnmarshaler().Unmarshal(buf); err != nil { return nil, err } - message = &msg - - md := pdata.LogsFromInternalRep(internal.LogsFromOtlp(&msg)) - dataPointCount = md.LogRecordCount() + dp.ItemsPerBatch = dp.logs.LogRecordCount() } - return &FileDataProvider{ - message: message, - ItemsPerBatch: dataPointCount, - }, nil + return dp, nil } func (dp *FileDataProvider) SetLoadGeneratorCounters(dataItemsGenerated *atomic.Uint64) { dp.dataItemsGenerated = dataItemsGenerated } -// Marshaler configuration used for marhsaling Protobuf to JSON. Use default config. -var protobufJSONUnmarshaler = &jsonpb.Unmarshaler{} - func (dp *FileDataProvider) GenerateTraces() (pdata.Traces, bool) { - // TODO: implement similar to GenerateMetrics. - return pdata.NewTraces(), true + dp.dataItemsGenerated.Add(uint64(dp.ItemsPerBatch)) + return dp.traces, false } func (dp *FileDataProvider) GenerateMetrics() (pdata.Metrics, bool) { - md := pdata.MetricsFromInternalRep(internal.MetricsFromOtlp(dp.message.(*otlpmetricscol.ExportMetricsServiceRequest))) - _, dataPointCount := md.MetricAndDataPointCount() - dp.dataItemsGenerated.Add(uint64(dataPointCount)) - return md, false + dp.dataItemsGenerated.Add(uint64(dp.ItemsPerBatch)) + return dp.metrics, false } func (dp *FileDataProvider) GenerateLogs() (pdata.Logs, bool) { - // TODO: implement similar to GenerateMetrics. - return pdata.NewLogs(), true + dp.dataItemsGenerated.Add(uint64(dp.ItemsPerBatch)) + return dp.logs, false } diff --git a/testbed/tests/metric_test.go b/testbed/tests/metric_test.go index 58859a5b436..d26ea534561 100644 --- a/testbed/tests/metric_test.go +++ b/testbed/tests/metric_test.go @@ -18,8 +18,14 @@ package tests // coded in this file or use scenarios from perf_scenarios.go. import ( + "path" + "path/filepath" "testing" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/testbed/testbed" ) @@ -96,3 +102,63 @@ func TestMetric10kDPS(t *testing.T) { } } + +func TestMetricsFromFile(t *testing.T) { + // This test demonstrates usage of NewFileDataProvider to generate load using + // previously recorded data. + + resultDir, err := filepath.Abs(path.Join("results", t.Name())) + require.NoError(t, err) + + // Use metrics previously recorded using "file" exporter and "k8scluster" receiver. + dataProvider, err := testbed.NewFileDataProvider("testdata/k8s-metrics.json", config.MetricsDataType) + assert.NoError(t, err) + + options := testbed.LoadOptions{ + DataItemsPerSecond: 1_000, + Parallel: 1, + // ItemsPerBatch is based on the data from the file. + ItemsPerBatch: dataProvider.ItemsPerBatch, + } + agentProc := &testbed.ChildProcess{} + + sender := testbed.NewOTLPMetricDataSender(testbed.DefaultHost, testbed.GetAvailablePort(t)) + receiver := testbed.NewOTLPDataReceiver(testbed.GetAvailablePort(t)) + + configStr := createConfigYaml(t, sender, receiver, resultDir, nil, nil) + configCleanup, err := agentProc.PrepareConfig(configStr) + require.NoError(t, err) + defer configCleanup() + + tc := testbed.NewTestCase( + t, + dataProvider, + sender, + receiver, + agentProc, + &testbed.PerfTestValidator{}, + performanceResultsSummary, + ) + defer tc.Stop() + + tc.SetResourceLimits(testbed.ResourceSpec{ + ExpectedMaxCPU: 120, + ExpectedMaxRAM: 70, + }) + tc.StartBackend() + tc.StartAgent("--log-level=debug") + + tc.StartLoad(options) + + tc.Sleep(tc.Duration) + + tc.StopLoad() + + tc.WaitFor(func() bool { return tc.LoadGenerator.DataItemsSent() > 0 }, "load generator started") + tc.WaitFor(func() bool { return tc.LoadGenerator.DataItemsSent() == tc.MockBackend.DataItemsReceived() }, + "all data items received") + + tc.StopAgent() + + tc.ValidateData() +} diff --git a/testbed/tests/trace_test.go b/testbed/tests/trace_test.go index 22846894451..70294d68b37 100644 --- a/testbed/tests/trace_test.go +++ b/testbed/tests/trace_test.go @@ -29,7 +29,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/internal/idutils" "go.opentelemetry.io/collector/testbed/testbed" @@ -472,63 +471,3 @@ func TestTraceAttributesProcessor(t *testing.T) { }) } } - -func TestMetricsFromFile(t *testing.T) { - // This test demonstrates usage of NewFileDataProvider to generate load using - // previously recorded data. - - resultDir, err := filepath.Abs(path.Join("results", t.Name())) - require.NoError(t, err) - - // Use metrics previously recorded using "file" exporter and "k8scluster" receiver. - dataProvider, err := testbed.NewFileDataProvider("testdata/k8s-metrics.json", config.MetricsDataType) - assert.NoError(t, err) - - options := testbed.LoadOptions{ - DataItemsPerSecond: 1_000, - Parallel: 1, - // ItemsPerBatch is based on the data from the file. - ItemsPerBatch: dataProvider.ItemsPerBatch, - } - agentProc := &testbed.ChildProcess{} - - sender := testbed.NewOTLPMetricDataSender(testbed.DefaultHost, testbed.GetAvailablePort(t)) - receiver := testbed.NewOTLPDataReceiver(testbed.GetAvailablePort(t)) - - configStr := createConfigYaml(t, sender, receiver, resultDir, nil, nil) - configCleanup, err := agentProc.PrepareConfig(configStr) - require.NoError(t, err) - defer configCleanup() - - tc := testbed.NewTestCase( - t, - dataProvider, - sender, - receiver, - agentProc, - &testbed.PerfTestValidator{}, - performanceResultsSummary, - ) - defer tc.Stop() - - tc.SetResourceLimits(testbed.ResourceSpec{ - ExpectedMaxCPU: 120, - ExpectedMaxRAM: 70, - }) - tc.StartBackend() - tc.StartAgent("--log-level=debug") - - tc.StartLoad(options) - - tc.Sleep(tc.Duration) - - tc.StopLoad() - - tc.WaitFor(func() bool { return tc.LoadGenerator.DataItemsSent() > 0 }, "load generator started") - tc.WaitFor(func() bool { return tc.LoadGenerator.DataItemsSent() == tc.MockBackend.DataItemsReceived() }, - "all data items received") - - tc.StopAgent() - - tc.ValidateData() -}