Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update FileDataProvider to use the new model.Unmarshaler, avoid InternalRep usage #3398

Merged
merged 1 commit into from
Jun 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 24 additions & 46 deletions testbed/testbed/data_providers.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,20 @@
package testbed

import (
"io/ioutil"
"log"
"os"
"path/filepath"
"strconv"
"time"

"github.com/gogo/protobuf/jsonpb"
"github.com/gogo/protobuf/proto"
"go.uber.org/atomic"

"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/internal"
otlplogscol "go.opentelemetry.io/collector/internal/data/protogen/collector/logs/v1"
otlpmetricscol "go.opentelemetry.io/collector/internal/data/protogen/collector/metrics/v1"
otlptracecol "go.opentelemetry.io/collector/internal/data/protogen/collector/trace/v1"
"go.opentelemetry.io/collector/internal/goldendataset"
"go.opentelemetry.io/collector/internal/idutils"
"go.opentelemetry.io/collector/internal/otlp"
)

// DataProvider defines the interface for generators of test data used to drive various end-to-end tests.
Expand Down Expand Up @@ -257,7 +253,9 @@ func (dp *goldenDataProvider) GenerateLogs() (pdata.Logs, bool) {
// expects just a single JSON message in the entire file).
type FileDataProvider struct {
dataItemsGenerated *atomic.Uint64
message proto.Message
logs pdata.Logs
metrics pdata.Metrics
traces pdata.Traces
ItemsPerBatch int
}

Expand All @@ -268,70 +266,50 @@ func NewFileDataProvider(filePath string, dataType config.DataType) (*FileDataPr
if err != nil {
return nil, err
}
var buf []byte
buf, err = ioutil.ReadAll(file)
if err != nil {
return nil, err
}

var message proto.Message
var dataPointCount int

dp := &FileDataProvider{}
// Load the message from the file and count the data points.

switch dataType {
case config.TracesDataType:
var msg otlptracecol.ExportTraceServiceRequest
if err := protobufJSONUnmarshaler.Unmarshal(file, &msg); err != nil {
if dp.traces, err = otlp.NewJSONTracesUnmarshaler().Unmarshal(buf); err != nil {
return nil, err
}
message = &msg

md := pdata.TracesFromInternalRep(internal.TracesFromOtlp(&msg))
dataPointCount = md.SpanCount()

dp.ItemsPerBatch = dp.traces.SpanCount()
case config.MetricsDataType:
var msg otlpmetricscol.ExportMetricsServiceRequest
if err := protobufJSONUnmarshaler.Unmarshal(file, &msg); err != nil {
if dp.metrics, err = otlp.NewJSONMetricsUnmarshaler().Unmarshal(buf); err != nil {
return nil, err
}
message = &msg

md := pdata.MetricsFromInternalRep(internal.MetricsFromOtlp(&msg))
_, dataPointCount = md.MetricAndDataPointCount()

_, dp.ItemsPerBatch = dp.metrics.MetricAndDataPointCount()
case config.LogsDataType:
var msg otlplogscol.ExportLogsServiceRequest
if err := protobufJSONUnmarshaler.Unmarshal(file, &msg); err != nil {
if dp.logs, err = otlp.NewJSONLogsUnmarshaler().Unmarshal(buf); err != nil {
return nil, err
}
message = &msg

md := pdata.LogsFromInternalRep(internal.LogsFromOtlp(&msg))
dataPointCount = md.LogRecordCount()
dp.ItemsPerBatch = dp.logs.LogRecordCount()
}

return &FileDataProvider{
message: message,
ItemsPerBatch: dataPointCount,
}, nil
return dp, nil
}

func (dp *FileDataProvider) SetLoadGeneratorCounters(dataItemsGenerated *atomic.Uint64) {
dp.dataItemsGenerated = dataItemsGenerated
}

// Marshaler configuration used for marhsaling Protobuf to JSON. Use default config.
var protobufJSONUnmarshaler = &jsonpb.Unmarshaler{}

func (dp *FileDataProvider) GenerateTraces() (pdata.Traces, bool) {
// TODO: implement similar to GenerateMetrics.
return pdata.NewTraces(), true
dp.dataItemsGenerated.Add(uint64(dp.ItemsPerBatch))
return dp.traces, false
}

func (dp *FileDataProvider) GenerateMetrics() (pdata.Metrics, bool) {
md := pdata.MetricsFromInternalRep(internal.MetricsFromOtlp(dp.message.(*otlpmetricscol.ExportMetricsServiceRequest)))
_, dataPointCount := md.MetricAndDataPointCount()
dp.dataItemsGenerated.Add(uint64(dataPointCount))
return md, false
dp.dataItemsGenerated.Add(uint64(dp.ItemsPerBatch))
return dp.metrics, false
}

func (dp *FileDataProvider) GenerateLogs() (pdata.Logs, bool) {
// TODO: implement similar to GenerateMetrics.
return pdata.NewLogs(), true
dp.dataItemsGenerated.Add(uint64(dp.ItemsPerBatch))
return dp.logs, false
}
66 changes: 66 additions & 0 deletions testbed/tests/metric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,14 @@ package tests
// coded in this file or use scenarios from perf_scenarios.go.

import (
"path"
"path/filepath"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/testbed/testbed"
)

Expand Down Expand Up @@ -96,3 +102,63 @@ func TestMetric10kDPS(t *testing.T) {
}

}

func TestMetricsFromFile(t *testing.T) {
// This test demonstrates usage of NewFileDataProvider to generate load using
// previously recorded data.

resultDir, err := filepath.Abs(path.Join("results", t.Name()))
require.NoError(t, err)

// Use metrics previously recorded using "file" exporter and "k8scluster" receiver.
dataProvider, err := testbed.NewFileDataProvider("testdata/k8s-metrics.json", config.MetricsDataType)
assert.NoError(t, err)

options := testbed.LoadOptions{
DataItemsPerSecond: 1_000,
Parallel: 1,
// ItemsPerBatch is based on the data from the file.
ItemsPerBatch: dataProvider.ItemsPerBatch,
}
agentProc := &testbed.ChildProcess{}

sender := testbed.NewOTLPMetricDataSender(testbed.DefaultHost, testbed.GetAvailablePort(t))
receiver := testbed.NewOTLPDataReceiver(testbed.GetAvailablePort(t))

configStr := createConfigYaml(t, sender, receiver, resultDir, nil, nil)
configCleanup, err := agentProc.PrepareConfig(configStr)
require.NoError(t, err)
defer configCleanup()

tc := testbed.NewTestCase(
t,
dataProvider,
sender,
receiver,
agentProc,
&testbed.PerfTestValidator{},
performanceResultsSummary,
)
defer tc.Stop()

tc.SetResourceLimits(testbed.ResourceSpec{
ExpectedMaxCPU: 120,
ExpectedMaxRAM: 70,
})
tc.StartBackend()
tc.StartAgent("--log-level=debug")

tc.StartLoad(options)

tc.Sleep(tc.Duration)

tc.StopLoad()

tc.WaitFor(func() bool { return tc.LoadGenerator.DataItemsSent() > 0 }, "load generator started")
tc.WaitFor(func() bool { return tc.LoadGenerator.DataItemsSent() == tc.MockBackend.DataItemsReceived() },
"all data items received")

tc.StopAgent()

tc.ValidateData()
}
61 changes: 0 additions & 61 deletions testbed/tests/trace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/internal/idutils"
"go.opentelemetry.io/collector/testbed/testbed"
Expand Down Expand Up @@ -472,63 +471,3 @@ func TestTraceAttributesProcessor(t *testing.T) {
})
}
}

func TestMetricsFromFile(t *testing.T) {
// This test demonstrates usage of NewFileDataProvider to generate load using
// previously recorded data.

resultDir, err := filepath.Abs(path.Join("results", t.Name()))
require.NoError(t, err)

// Use metrics previously recorded using "file" exporter and "k8scluster" receiver.
dataProvider, err := testbed.NewFileDataProvider("testdata/k8s-metrics.json", config.MetricsDataType)
assert.NoError(t, err)

options := testbed.LoadOptions{
DataItemsPerSecond: 1_000,
Parallel: 1,
// ItemsPerBatch is based on the data from the file.
ItemsPerBatch: dataProvider.ItemsPerBatch,
}
agentProc := &testbed.ChildProcess{}

sender := testbed.NewOTLPMetricDataSender(testbed.DefaultHost, testbed.GetAvailablePort(t))
receiver := testbed.NewOTLPDataReceiver(testbed.GetAvailablePort(t))

configStr := createConfigYaml(t, sender, receiver, resultDir, nil, nil)
configCleanup, err := agentProc.PrepareConfig(configStr)
require.NoError(t, err)
defer configCleanup()

tc := testbed.NewTestCase(
t,
dataProvider,
sender,
receiver,
agentProc,
&testbed.PerfTestValidator{},
performanceResultsSummary,
)
defer tc.Stop()

tc.SetResourceLimits(testbed.ResourceSpec{
ExpectedMaxCPU: 120,
ExpectedMaxRAM: 70,
})
tc.StartBackend()
tc.StartAgent("--log-level=debug")

tc.StartLoad(options)

tc.Sleep(tc.Duration)

tc.StopLoad()

tc.WaitFor(func() bool { return tc.LoadGenerator.DataItemsSent() > 0 }, "load generator started")
tc.WaitFor(func() bool { return tc.LoadGenerator.DataItemsSent() == tc.MockBackend.DataItemsReceived() },
"all data items received")

tc.StopAgent()

tc.ValidateData()
}