Skip to content

Commit

Permalink
Merge branch 'main' into kafkarecvpdata
Browse files Browse the repository at this point in the history
  • Loading branch information
bogdandrutu authored Jun 10, 2021
2 parents 3fbf5a8 + 5efd8fc commit 6d6245f
Show file tree
Hide file tree
Showing 17 changed files with 267 additions and 283 deletions.
12 changes: 6 additions & 6 deletions consumer/pdata/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,17 +48,17 @@ func TestLogRecordCount(t *testing.T) {

func TestLogRecordCountWithEmpty(t *testing.T) {
assert.Zero(t, NewLogs().LogRecordCount())
assert.Zero(t, LogsFromInternalRep(internal.LogsFromOtlp(&otlpcollectorlog.ExportLogsServiceRequest{
assert.Zero(t, Logs{orig: &otlpcollectorlog.ExportLogsServiceRequest{
ResourceLogs: []*otlplogs.ResourceLogs{{}},
})).LogRecordCount())
assert.Zero(t, LogsFromInternalRep(internal.LogsFromOtlp(&otlpcollectorlog.ExportLogsServiceRequest{
}}.LogRecordCount())
assert.Zero(t, Logs{orig: &otlpcollectorlog.ExportLogsServiceRequest{
ResourceLogs: []*otlplogs.ResourceLogs{
{
InstrumentationLibraryLogs: []*otlplogs.InstrumentationLibraryLogs{{}},
},
},
})).LogRecordCount())
assert.Equal(t, 1, LogsFromInternalRep(internal.LogsFromOtlp(&otlpcollectorlog.ExportLogsServiceRequest{
}}.LogRecordCount())
assert.Equal(t, 1, Logs{orig: &otlpcollectorlog.ExportLogsServiceRequest{
ResourceLogs: []*otlplogs.ResourceLogs{
{
InstrumentationLibraryLogs: []*otlplogs.InstrumentationLibraryLogs{
Expand All @@ -68,7 +68,7 @@ func TestLogRecordCountWithEmpty(t *testing.T) {
},
},
},
})).LogRecordCount())
}}.LogRecordCount())
}

func TestToFromLogProto(t *testing.T) {
Expand Down
46 changes: 20 additions & 26 deletions consumer/pdata/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,19 +174,13 @@ func TestMetricCount(t *testing.T) {
assert.EqualValues(t, 6, md.MetricCount())
}

func TestMetricSize(t *testing.T) {
md := NewMetrics()
assert.Equal(t, 0, md.OtlpProtoSize())
rms := md.ResourceMetrics()
metric := rms.AppendEmpty().InstrumentationLibraryMetrics().AppendEmpty().Metrics().AppendEmpty()
metric.SetDataType(MetricDataTypeHistogram)
doubleHistogram := metric.Histogram()
pt := doubleHistogram.DataPoints().AppendEmpty()
pt.SetCount(123)
pt.SetSum(123)
otlp := internal.MetricsToOtlp(md.InternalRep())
size := otlp.Size()
bytes, err := otlp.Marshal()
func TestMetricsSize(t *testing.T) {
assert.Equal(t, 0, NewMetrics().OtlpProtoSize())

md := generateMetricsEmptyDataPoints()
orig := md.orig
size := orig.Size()
bytes, err := orig.Marshal()
require.NoError(t, err)
assert.Equal(t, size, md.OtlpProtoSize())
assert.Equal(t, len(bytes), md.OtlpProtoSize())
Expand Down Expand Up @@ -296,7 +290,7 @@ func TestMetricAndDataPointCountWithNilDataPoints(t *testing.T) {
}

func TestOtlpToInternalReadOnly(t *testing.T) {
metricData := MetricsFromInternalRep(internal.MetricsFromOtlp(&otlpcollectormetrics.ExportMetricsServiceRequest{
md := Metrics{orig: &otlpcollectormetrics.ExportMetricsServiceRequest{
ResourceMetrics: []*otlpmetrics.ResourceMetrics{
{
Resource: generateTestProtoResource(),
Expand All @@ -308,8 +302,8 @@ func TestOtlpToInternalReadOnly(t *testing.T) {
},
},
},
}))
resourceMetrics := metricData.ResourceMetrics()
}}
resourceMetrics := md.ResourceMetrics()
assert.EqualValues(t, 1, resourceMetrics.Len())

resourceMetric := resourceMetrics.At(0)
Expand Down Expand Up @@ -384,7 +378,7 @@ func TestOtlpToInternalReadOnly(t *testing.T) {
}

func TestOtlpToFromInternalReadOnly(t *testing.T) {
metricData := MetricsFromInternalRep(internal.MetricsFromOtlp(&otlpcollectormetrics.ExportMetricsServiceRequest{
md := MetricsFromInternalRep(internal.MetricsFromOtlp(&otlpcollectormetrics.ExportMetricsServiceRequest{
ResourceMetrics: []*otlpmetrics.ResourceMetrics{
{
Resource: generateTestProtoResource(),
Expand All @@ -410,7 +404,7 @@ func TestOtlpToFromInternalReadOnly(t *testing.T) {
},
},
},
}, internal.MetricsToOtlp(metricData.InternalRep()))
}, internal.MetricsToOtlp(md.InternalRep()))
}

func TestOtlpToFromInternalIntGaugeMutating(t *testing.T) {
Expand Down Expand Up @@ -971,23 +965,23 @@ func generateTestProtoDoubleHistogramMetric() *otlpmetrics.Metric {
}

func generateMetricsEmptyResource() Metrics {
return MetricsFromInternalRep(internal.MetricsFromOtlp(&otlpcollectormetrics.ExportMetricsServiceRequest{
return Metrics{orig: &otlpcollectormetrics.ExportMetricsServiceRequest{
ResourceMetrics: []*otlpmetrics.ResourceMetrics{{}},
}))
}}
}

func generateMetricsEmptyInstrumentation() Metrics {
return MetricsFromInternalRep(internal.MetricsFromOtlp(&otlpcollectormetrics.ExportMetricsServiceRequest{
return Metrics{orig: &otlpcollectormetrics.ExportMetricsServiceRequest{
ResourceMetrics: []*otlpmetrics.ResourceMetrics{
{
InstrumentationLibraryMetrics: []*otlpmetrics.InstrumentationLibraryMetrics{{}},
},
},
}))
}}
}

func generateMetricsEmptyMetrics() Metrics {
return MetricsFromInternalRep(internal.MetricsFromOtlp(&otlpcollectormetrics.ExportMetricsServiceRequest{
return Metrics{orig: &otlpcollectormetrics.ExportMetricsServiceRequest{
ResourceMetrics: []*otlpmetrics.ResourceMetrics{
{
InstrumentationLibraryMetrics: []*otlpmetrics.InstrumentationLibraryMetrics{
Expand All @@ -997,11 +991,11 @@ func generateMetricsEmptyMetrics() Metrics {
},
},
},
}))
}}
}

func generateMetricsEmptyDataPoints() Metrics {
return MetricsFromInternalRep(internal.MetricsFromOtlp(&otlpcollectormetrics.ExportMetricsServiceRequest{
return Metrics{orig: &otlpcollectormetrics.ExportMetricsServiceRequest{
ResourceMetrics: []*otlpmetrics.ResourceMetrics{
{
InstrumentationLibraryMetrics: []*otlpmetrics.InstrumentationLibraryMetrics{
Expand All @@ -1021,5 +1015,5 @@ func generateMetricsEmptyDataPoints() Metrics {
},
},
},
}))
}}
}
10 changes: 5 additions & 5 deletions consumer/pdata/traces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,14 @@ func TestSpanCount(t *testing.T) {
assert.EqualValues(t, 6, md.SpanCount())
}

func TestSize(t *testing.T) {
func TestTracesSize(t *testing.T) {
assert.Equal(t, 0, NewTraces().OtlpProtoSize())
td := NewTraces()
assert.Equal(t, 0, td.OtlpProtoSize())
rms := td.ResourceSpans()
rms.AppendEmpty().InstrumentationLibrarySpans().AppendEmpty().Spans().AppendEmpty().SetName("foo")
otlp := internal.TracesToOtlp(td.InternalRep())
size := otlp.Size()
bytes, err := otlp.Marshal()
orig := td.orig
size := orig.Size()
bytes, err := orig.Marshal()
require.NoError(t, err)
assert.Equal(t, size, td.OtlpProtoSize())
assert.Equal(t, len(bytes), td.OtlpProtoSize())
Expand Down
89 changes: 45 additions & 44 deletions exporter/loggingexporter/logging_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,53 +26,77 @@ import (
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/internal/model"
"go.opentelemetry.io/collector/internal/otlptext"
)

type loggingExporter struct {
logger *zap.Logger
debug bool
logger *zap.Logger
debug bool
logsMarshaler model.LogsMarshaler
metricsMarshaler model.MetricsMarshaler
tracesMarshaler model.TracesMarshaler
}

func (s *loggingExporter) pushTraces(
_ context.Context,
td pdata.Traces,
) error {

func (s *loggingExporter) pushTraces(_ context.Context, td pdata.Traces) error {
s.logger.Info("TracesExporter", zap.Int("#spans", td.SpanCount()))

if !s.debug {
return nil
}

s.logger.Debug(otlptext.Traces(td))

buf, err := s.tracesMarshaler.Marshal(td)
if err != nil {
return err
}
s.logger.Debug(string(buf))
return nil
}

func (s *loggingExporter) pushMetrics(
_ context.Context,
md pdata.Metrics,
) error {
func (s *loggingExporter) pushMetrics(_ context.Context, md pdata.Metrics) error {
s.logger.Info("MetricsExporter", zap.Int("#metrics", md.MetricCount()))

if !s.debug {
return nil
}

s.logger.Debug(otlptext.Metrics(md))
buf, err := s.metricsMarshaler.Marshal(md)
if err != nil {
return err
}
s.logger.Debug(string(buf))
return nil
}

func (s *loggingExporter) pushLogs(_ context.Context, ld pdata.Logs) error {
s.logger.Info("LogsExporter", zap.Int("#logs", ld.LogRecordCount()))

if !s.debug {
return nil
}

buf, err := s.logsMarshaler.Marshal(ld)
if err != nil {
return err
}
s.logger.Debug(string(buf))
return nil
}

func newLoggingExporter(level string, logger *zap.Logger) *loggingExporter {
return &loggingExporter{
debug: strings.ToLower(level) == "debug",
logger: logger,
logsMarshaler: otlptext.NewTextLogsMarshaler(),
metricsMarshaler: otlptext.NewTextMetricsMarshaler(),
tracesMarshaler: otlptext.NewTextTracesMarshaler(),
}
}

// newTracesExporter creates an exporter.TracesExporter that just drops the
// received data and logs debugging messages.
func newTracesExporter(config config.Exporter, level string, logger *zap.Logger) (component.TracesExporter, error) {
s := &loggingExporter{
debug: strings.ToLower(level) == "debug",
logger: logger,
}

s := newLoggingExporter(level, logger)
return exporterhelper.NewTracesExporter(
config,
logger,
Expand All @@ -89,11 +113,7 @@ func newTracesExporter(config config.Exporter, level string, logger *zap.Logger)
// newMetricsExporter creates an exporter.MetricsExporter that just drops the
// received data and logs debugging messages.
func newMetricsExporter(config config.Exporter, level string, logger *zap.Logger) (component.MetricsExporter, error) {
s := &loggingExporter{
debug: strings.ToLower(level) == "debug",
logger: logger,
}

s := newLoggingExporter(level, logger)
return exporterhelper.NewMetricsExporter(
config,
logger,
Expand All @@ -110,11 +130,7 @@ func newMetricsExporter(config config.Exporter, level string, logger *zap.Logger
// newLogsExporter creates an exporter.LogsExporter that just drops the
// received data and logs debugging messages.
func newLogsExporter(config config.Exporter, level string, logger *zap.Logger) (component.LogsExporter, error) {
s := &loggingExporter{
debug: strings.ToLower(level) == "debug",
logger: logger,
}

s := newLoggingExporter(level, logger)
return exporterhelper.NewLogsExporter(
config,
logger,
Expand All @@ -128,21 +144,6 @@ func newLogsExporter(config config.Exporter, level string, logger *zap.Logger) (
)
}

func (s *loggingExporter) pushLogs(
_ context.Context,
ld pdata.Logs,
) error {
s.logger.Info("LogsExporter", zap.Int("#logs", ld.LogRecordCount()))

if !s.debug {
return nil
}

s.logger.Debug(otlptext.Logs(ld))

return nil
}

func loggerSync(logger *zap.Logger) func(context.Context) error {
return func(context.Context) error {
// Currently Sync() return a different error depending on the OS.
Expand Down
38 changes: 38 additions & 0 deletions exporter/loggingexporter/logging_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package loggingexporter

import (
"context"
"errors"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -62,3 +63,40 @@ func TestLoggingLogsExporterNoErrors(t *testing.T) {

assert.NoError(t, lle.Shutdown(context.Background()))
}

func TestLoggingExporterErrors(t *testing.T) {
le := newLoggingExporter("Debug", zap.NewNop())
require.NotNil(t, le)

errWant := errors.New("my error")
le.tracesMarshaler = &errTracesMarshaler{err: errWant}
le.metricsMarshaler = &errMetricsMarshaler{err: errWant}
le.logsMarshaler = &errLogsMarshaler{err: errWant}
assert.Equal(t, errWant, le.pushTraces(context.Background(), pdata.NewTraces()))
assert.Equal(t, errWant, le.pushMetrics(context.Background(), pdata.NewMetrics()))
assert.Equal(t, errWant, le.pushLogs(context.Background(), pdata.NewLogs()))
}

type errLogsMarshaler struct {
err error
}

func (e errLogsMarshaler) Marshal(pdata.Logs) ([]byte, error) {
return nil, e.err
}

type errMetricsMarshaler struct {
err error
}

func (e errMetricsMarshaler) Marshal(pdata.Metrics) ([]byte, error) {
return nil, e.err
}

type errTracesMarshaler struct {
err error
}

func (e errTracesMarshaler) Marshal(pdata.Traces) ([]byte, error) {
return nil, e.err
}
12 changes: 6 additions & 6 deletions internal/otlp/unmarshaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,32 +18,32 @@ import (
"go.opentelemetry.io/collector/internal/model"
)

// NewJSONTracesUnmarshaler returns a model.TracesUnmarshaler. Unmarshalls from OTLP json bytes.
// NewJSONTracesUnmarshaler returns a model.TracesUnmarshaler. Unmarshals from OTLP json bytes.
func NewJSONTracesUnmarshaler() model.TracesUnmarshaler {
return model.NewTracesUnmarshaler(newJSONDecoder(), newToTranslator())
}

// NewJSONMetricsUnmarshaler returns a model.MetricsUnmarshaler. Unmarshalls from OTLP json bytes.
// NewJSONMetricsUnmarshaler returns a model.MetricsUnmarshaler. Unmarshals from OTLP json bytes.
func NewJSONMetricsUnmarshaler() model.MetricsUnmarshaler {
return model.NewMetricsUnmarshaler(newJSONDecoder(), newToTranslator())
}

// NewJSONLogsUnmarshaler returns a model.LogsUnmarshaler. Unmarshalls from OTLP json bytes.
// NewJSONLogsUnmarshaler returns a model.LogsUnmarshaler. Unmarshals from OTLP json bytes.
func NewJSONLogsUnmarshaler() model.LogsUnmarshaler {
return model.NewLogsUnmarshaler(newJSONDecoder(), newToTranslator())
}

// NewProtobufTracesUnmarshaler returns a model.TracesUnmarshaler. Unmarshalls from OTLP binary protobuf bytes.
// NewProtobufTracesUnmarshaler returns a model.TracesUnmarshaler. Unmarshals from OTLP binary protobuf bytes.
func NewProtobufTracesUnmarshaler() model.TracesUnmarshaler {
return model.NewTracesUnmarshaler(newPbDecoder(), newToTranslator())
}

// NewProtobufMetricsUnmarshaler returns a model.MetricsUnmarshaler. Unmarshalls from OTLP binary protobuf bytes.
// NewProtobufMetricsUnmarshaler returns a model.MetricsUnmarshaler. Unmarshals from OTLP binary protobuf bytes.
func NewProtobufMetricsUnmarshaler() model.MetricsUnmarshaler {
return model.NewMetricsUnmarshaler(newPbDecoder(), newToTranslator())
}

// NewProtobufLogsUnmarshaler returns a model.LogsUnmarshaler. Unmarshalls from OTLP binary protobuf bytes.
// NewProtobufLogsUnmarshaler returns a model.LogsUnmarshaler. Unmarshals from OTLP binary protobuf bytes.
func NewProtobufLogsUnmarshaler() model.LogsUnmarshaler {
return model.NewLogsUnmarshaler(newPbDecoder(), newToTranslator())
}
Loading

0 comments on commit 6d6245f

Please sign in to comment.