Skip to content

Commit

Permalink
Remove the number of items from returned values in Push funcs (#2684)
Browse files Browse the repository at this point in the history
Even in code for metrics we did not have a consistent implementation,
also most important thing was not used anymore in observability helper.

Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu authored Mar 13, 2021
1 parent 59a20a1 commit 03904de
Show file tree
Hide file tree
Showing 20 changed files with 194 additions and 313 deletions.
6 changes: 3 additions & 3 deletions exporter/exporterhelper/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type request interface {
context() context.Context
// setContext updates the Context of the requests.
setContext(context.Context)
export(ctx context.Context) (int, error)
export(ctx context.Context) error
// Returns a new request that contains the items left to be sent.
onPartialError(consumererror.PartialError) request
// Returns the count of spans/metric points or log records.
Expand All @@ -54,7 +54,7 @@ type request interface {

// requestSender is an abstraction of a sender for a request independent of the type of the data (traces, metrics, logs).
type requestSender interface {
send(req request) (int, error)
send(req request) error
}

// baseRequest is a base implementation for the request.
Expand Down Expand Up @@ -205,7 +205,7 @@ type timeoutSender struct {
}

// send implements the requestSender interface
func (ts *timeoutSender) send(req request) (int, error) {
func (ts *timeoutSender) send(req request) error {
// Intentionally don't overwrite the context inside the request, because in case of retries deadline will not be
// updated because this deadline most likely is before the next one.
ctx := req.context()
Expand Down
12 changes: 6 additions & 6 deletions exporter/exporterhelper/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,14 @@ var (
TypeVal: typeStr,
NameVal: typeStr,
}
nopTracesExporter, _ = NewTraceExporter(defaultCfg, zap.NewNop(), func(ctx context.Context, td pdata.Traces) (droppedSpans int, err error) {
return 0, nil
nopTracesExporter, _ = NewTraceExporter(defaultCfg, zap.NewNop(), func(ctx context.Context, td pdata.Traces) error {
return nil
})
nopMetricsExporter, _ = NewMetricsExporter(defaultCfg, zap.NewNop(), func(ctx context.Context, md pdata.Metrics) (droppedTimeSeries int, err error) {
return 0, nil
nopMetricsExporter, _ = NewMetricsExporter(defaultCfg, zap.NewNop(), func(ctx context.Context, md pdata.Metrics) error {
return nil
})
nopLogsExporter, _ = NewLogsExporter(defaultCfg, zap.NewNop(), func(ctx context.Context, md pdata.Logs) (droppedTimeSeries int, err error) {
return 0, nil
nopLogsExporter, _ = NewLogsExporter(defaultCfg, zap.NewNop(), func(ctx context.Context, md pdata.Logs) error {
return nil
})
)

Expand Down
13 changes: 6 additions & 7 deletions exporter/exporterhelper/logshelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (

// PushLogs is a helper function that is similar to ConsumeLogs but also returns
// the number of dropped logs.
type PushLogs func(ctx context.Context, md pdata.Logs) (droppedTimeSeries int, err error)
type PushLogs func(ctx context.Context, md pdata.Logs) error

type logsRequest struct {
baseRequest
Expand All @@ -49,7 +49,7 @@ func (req *logsRequest) onPartialError(partialErr consumererror.PartialError) re
return newLogsRequest(req.ctx, partialErr.GetLogs(), req.pusher)
}

func (req *logsRequest) export(ctx context.Context) (int, error) {
func (req *logsRequest) export(ctx context.Context) error {
return req.pusher(ctx, req.ld)
}

Expand All @@ -63,8 +63,7 @@ type logsExporter struct {
}

func (lexp *logsExporter) ConsumeLogs(ctx context.Context, ld pdata.Logs) error {
_, err := lexp.sender.send(newLogsRequest(ctx, ld, lexp.pusher))
return err
return lexp.sender.send(newLogsRequest(ctx, ld, lexp.pusher))
}

// NewLogsExporter creates an LogsExporter that records observability metrics and wraps every request with a Span.
Expand Down Expand Up @@ -105,9 +104,9 @@ type logsExporterWithObservability struct {
nextSender requestSender
}

func (lewo *logsExporterWithObservability) send(req request) (int, error) {
func (lewo *logsExporterWithObservability) send(req request) error {
req.setContext(lewo.obsrep.StartLogsExportOp(req.context()))
numDroppedLogs, err := lewo.nextSender.send(req)
err := lewo.nextSender.send(req)
lewo.obsrep.EndLogsExportOp(req.context(), req.count(), err)
return numDroppedLogs, err
return err
}
41 changes: 13 additions & 28 deletions exporter/exporterhelper/logshelper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,13 @@ func TestLogsRequest(t *testing.T) {
}

func TestLogsExporter_InvalidName(t *testing.T) {
le, err := NewLogsExporter(nil, zap.NewNop(), newPushLogsData(0, nil))
le, err := NewLogsExporter(nil, zap.NewNop(), newPushLogsData(nil))
require.Nil(t, le)
require.Equal(t, errNilConfig, err)
}

func TestLogsExporter_NilLogger(t *testing.T) {
le, err := NewLogsExporter(fakeLogsExporterConfig, nil, newPushLogsData(0, nil))
le, err := NewLogsExporter(fakeLogsExporterConfig, nil, newPushLogsData(nil))
require.Nil(t, le)
require.Equal(t, errNilLogger, err)
}
Expand All @@ -76,7 +76,7 @@ func TestLogsExporter_NilPushLogsData(t *testing.T) {

func TestLogsExporter_Default(t *testing.T) {
ld := testdata.GenerateLogDataEmpty()
le, err := NewLogsExporter(fakeLogsExporterConfig, zap.NewNop(), newPushLogsData(0, nil))
le, err := NewLogsExporter(fakeLogsExporterConfig, zap.NewNop(), newPushLogsData(nil))
assert.NotNil(t, le)
assert.NoError(t, err)

Expand All @@ -87,22 +87,14 @@ func TestLogsExporter_Default(t *testing.T) {
func TestLogsExporter_Default_ReturnError(t *testing.T) {
ld := testdata.GenerateLogDataEmpty()
want := errors.New("my_error")
le, err := NewLogsExporter(fakeLogsExporterConfig, zap.NewNop(), newPushLogsData(0, want))
le, err := NewLogsExporter(fakeLogsExporterConfig, zap.NewNop(), newPushLogsData(want))
require.Nil(t, err)
require.NotNil(t, le)
require.Equal(t, want, le.ConsumeLogs(context.Background(), ld))
}

func TestLogsExporter_WithRecordLogs(t *testing.T) {
le, err := NewLogsExporter(fakeLogsExporterConfig, zap.NewNop(), newPushLogsData(0, nil))
require.Nil(t, err)
require.NotNil(t, le)

checkRecordedMetricsForLogsExporter(t, le, nil)
}

func TestLogsExporter_WithRecordLogs_NonZeroDropped(t *testing.T) {
le, err := NewLogsExporter(fakeLogsExporterConfig, zap.NewNop(), newPushLogsData(1, nil))
le, err := NewLogsExporter(fakeLogsExporterConfig, zap.NewNop(), newPushLogsData(nil))
require.Nil(t, err)
require.NotNil(t, le)

Expand All @@ -111,30 +103,23 @@ func TestLogsExporter_WithRecordLogs_NonZeroDropped(t *testing.T) {

func TestLogsExporter_WithRecordLogs_ReturnError(t *testing.T) {
want := errors.New("my_error")
le, err := NewLogsExporter(fakeLogsExporterConfig, zap.NewNop(), newPushLogsData(0, want))
le, err := NewLogsExporter(fakeLogsExporterConfig, zap.NewNop(), newPushLogsData(want))
require.Nil(t, err)
require.NotNil(t, le)

checkRecordedMetricsForLogsExporter(t, le, want)
}

func TestLogsExporter_WithSpan(t *testing.T) {
le, err := NewLogsExporter(fakeLogsExporterConfig, zap.NewNop(), newPushLogsData(0, nil))
require.Nil(t, err)
require.NotNil(t, le)
checkWrapSpanForLogsExporter(t, le, nil, 1)
}

func TestLogsExporter_WithSpan_NonZeroDropped(t *testing.T) {
le, err := NewLogsExporter(fakeLogsExporterConfig, zap.NewNop(), newPushLogsData(1, nil))
le, err := NewLogsExporter(fakeLogsExporterConfig, zap.NewNop(), newPushLogsData(nil))
require.Nil(t, err)
require.NotNil(t, le)
checkWrapSpanForLogsExporter(t, le, nil, 1)
}

func TestLogsExporter_WithSpan_ReturnError(t *testing.T) {
want := errors.New("my_error")
le, err := NewLogsExporter(fakeLogsExporterConfig, zap.NewNop(), newPushLogsData(0, want))
le, err := NewLogsExporter(fakeLogsExporterConfig, zap.NewNop(), newPushLogsData(want))
require.Nil(t, err)
require.NotNil(t, le)
checkWrapSpanForLogsExporter(t, le, want, 1)
Expand All @@ -144,7 +129,7 @@ func TestLogsExporter_WithShutdown(t *testing.T) {
shutdownCalled := false
shutdown := func(context.Context) error { shutdownCalled = true; return nil }

le, err := NewLogsExporter(fakeLogsExporterConfig, zap.NewNop(), newPushLogsData(0, nil), WithShutdown(shutdown))
le, err := NewLogsExporter(fakeLogsExporterConfig, zap.NewNop(), newPushLogsData(nil), WithShutdown(shutdown))
assert.NotNil(t, le)
assert.NoError(t, err)

Expand All @@ -156,16 +141,16 @@ func TestLogsExporter_WithShutdown_ReturnError(t *testing.T) {
want := errors.New("my_error")
shutdownErr := func(context.Context) error { return want }

le, err := NewLogsExporter(fakeLogsExporterConfig, zap.NewNop(), newPushLogsData(0, nil), WithShutdown(shutdownErr))
le, err := NewLogsExporter(fakeLogsExporterConfig, zap.NewNop(), newPushLogsData(nil), WithShutdown(shutdownErr))
assert.NotNil(t, le)
assert.NoError(t, err)

assert.Equal(t, le.Shutdown(context.Background()), want)
}

func newPushLogsData(droppedTimeSeries int, retError error) PushLogs {
return func(ctx context.Context, td pdata.Logs) (int, error) {
return droppedTimeSeries, retError
func newPushLogsData(retError error) PushLogs {
return func(ctx context.Context, td pdata.Logs) error {
return retError
}
}

Expand Down
22 changes: 7 additions & 15 deletions exporter/exporterhelper/metricshelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (

// PushMetrics is a helper function that is similar to ConsumeMetrics but also returns
// the number of dropped metrics.
type PushMetrics func(ctx context.Context, md pdata.Metrics) (droppedTimeSeries int, err error)
type PushMetrics func(ctx context.Context, md pdata.Metrics) error

type metricsRequest struct {
baseRequest
Expand All @@ -49,7 +49,7 @@ func (req *metricsRequest) onPartialError(partialErr consumererror.PartialError)
return newMetricsRequest(req.ctx, partialErr.GetMetrics(), req.pusher)
}

func (req *metricsRequest) export(ctx context.Context) (int, error) {
func (req *metricsRequest) export(ctx context.Context) error {
return req.pusher(ctx, req.md)
}

Expand All @@ -67,8 +67,7 @@ func (mexp *metricsExporter) ConsumeMetrics(ctx context.Context, md pdata.Metric
if mexp.baseExporter.convertResourceToTelemetry {
md = convertResourceToLabels(md)
}
_, err := mexp.sender.send(newMetricsRequest(ctx, md, mexp.pusher))
return err
return mexp.sender.send(newMetricsRequest(ctx, md, mexp.pusher))
}

// NewMetricsExporter creates an MetricsExporter that records observability metrics and wraps every request with a Span.
Expand Down Expand Up @@ -109,16 +108,9 @@ type metricsSenderWithObservability struct {
nextSender requestSender
}

func (mewo *metricsSenderWithObservability) send(req request) (int, error) {
func (mewo *metricsSenderWithObservability) send(req request) error {
req.setContext(mewo.obsrep.StartMetricsExportOp(req.context()))
_, err := mewo.nextSender.send(req)

// TODO: this is not ideal: it should come from the next function itself.
// temporarily loading it from internal format. Once full switch is done
// to new metrics will remove this.
mReq := req.(*metricsRequest)
numReceivedMetrics, numPoints := mReq.md.MetricAndDataPointCount()

mewo.obsrep.EndMetricsExportOp(req.context(), numPoints, err)
return numReceivedMetrics, err
err := mewo.nextSender.send(req)
mewo.obsrep.EndMetricsExportOp(req.context(), req.count(), err)
return err
}
45 changes: 15 additions & 30 deletions exporter/exporterhelper/metricshelper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,13 @@ func TestMetricsRequest(t *testing.T) {
}

func TestMetricsExporter_InvalidName(t *testing.T) {
me, err := NewMetricsExporter(nil, zap.NewNop(), newPushMetricsData(0, nil))
me, err := NewMetricsExporter(nil, zap.NewNop(), newPushMetricsData(nil))
require.Nil(t, me)
require.Equal(t, errNilConfig, err)
}

func TestMetricsExporter_NilLogger(t *testing.T) {
me, err := NewMetricsExporter(fakeMetricsExporterConfig, nil, newPushMetricsData(0, nil))
me, err := NewMetricsExporter(fakeMetricsExporterConfig, nil, newPushMetricsData(nil))
require.Nil(t, me)
require.Equal(t, errNilLogger, err)
}
Expand All @@ -76,7 +76,7 @@ func TestMetricsExporter_NilPushMetricsData(t *testing.T) {

func TestMetricsExporter_Default(t *testing.T) {
md := testdata.GenerateMetricsEmpty()
me, err := NewMetricsExporter(fakeMetricsExporterConfig, zap.NewNop(), newPushMetricsData(0, nil))
me, err := NewMetricsExporter(fakeMetricsExporterConfig, zap.NewNop(), newPushMetricsData(nil))
assert.NotNil(t, me)
assert.NoError(t, err)

Expand All @@ -87,22 +87,14 @@ func TestMetricsExporter_Default(t *testing.T) {
func TestMetricsExporter_Default_ReturnError(t *testing.T) {
md := testdata.GenerateMetricsEmpty()
want := errors.New("my_error")
me, err := NewMetricsExporter(fakeMetricsExporterConfig, zap.NewNop(), newPushMetricsData(0, want))
me, err := NewMetricsExporter(fakeMetricsExporterConfig, zap.NewNop(), newPushMetricsData(want))
require.Nil(t, err)
require.NotNil(t, me)
require.Equal(t, want, me.ConsumeMetrics(context.Background(), md))
}

func TestMetricsExporter_WithRecordMetrics(t *testing.T) {
me, err := NewMetricsExporter(fakeMetricsExporterConfig, zap.NewNop(), newPushMetricsData(0, nil))
require.Nil(t, err)
require.NotNil(t, me)

checkRecordedMetricsForMetricsExporter(t, me, nil)
}

func TestMetricsExporter_WithRecordMetrics_NonZeroDropped(t *testing.T) {
me, err := NewMetricsExporter(fakeMetricsExporterConfig, zap.NewNop(), newPushMetricsData(1, nil))
me, err := NewMetricsExporter(fakeMetricsExporterConfig, zap.NewNop(), newPushMetricsData(nil))
require.Nil(t, err)
require.NotNil(t, me)

Expand All @@ -111,30 +103,23 @@ func TestMetricsExporter_WithRecordMetrics_NonZeroDropped(t *testing.T) {

func TestMetricsExporter_WithRecordMetrics_ReturnError(t *testing.T) {
want := errors.New("my_error")
me, err := NewMetricsExporter(fakeMetricsExporterConfig, zap.NewNop(), newPushMetricsData(0, want))
me, err := NewMetricsExporter(fakeMetricsExporterConfig, zap.NewNop(), newPushMetricsData(want))
require.Nil(t, err)
require.NotNil(t, me)

checkRecordedMetricsForMetricsExporter(t, me, want)
}

func TestMetricsExporter_WithSpan(t *testing.T) {
me, err := NewMetricsExporter(fakeMetricsExporterConfig, zap.NewNop(), newPushMetricsData(0, nil))
require.Nil(t, err)
require.NotNil(t, me)
checkWrapSpanForMetricsExporter(t, me, nil, 1)
}

func TestMetricsExporter_WithSpan_NonZeroDropped(t *testing.T) {
me, err := NewMetricsExporter(fakeMetricsExporterConfig, zap.NewNop(), newPushMetricsData(1, nil))
me, err := NewMetricsExporter(fakeMetricsExporterConfig, zap.NewNop(), newPushMetricsData(nil))
require.Nil(t, err)
require.NotNil(t, me)
checkWrapSpanForMetricsExporter(t, me, nil, 1)
}

func TestMetricsExporter_WithSpan_ReturnError(t *testing.T) {
want := errors.New("my_error")
me, err := NewMetricsExporter(fakeMetricsExporterConfig, zap.NewNop(), newPushMetricsData(0, want))
me, err := NewMetricsExporter(fakeMetricsExporterConfig, zap.NewNop(), newPushMetricsData(want))
require.Nil(t, err)
require.NotNil(t, me)
checkWrapSpanForMetricsExporter(t, me, want, 1)
Expand All @@ -144,7 +129,7 @@ func TestMetricsExporter_WithShutdown(t *testing.T) {
shutdownCalled := false
shutdown := func(context.Context) error { shutdownCalled = true; return nil }

me, err := NewMetricsExporter(fakeMetricsExporterConfig, zap.NewNop(), newPushMetricsData(0, nil), WithShutdown(shutdown))
me, err := NewMetricsExporter(fakeMetricsExporterConfig, zap.NewNop(), newPushMetricsData(nil), WithShutdown(shutdown))
assert.NotNil(t, me)
assert.NoError(t, err)

Expand All @@ -154,7 +139,7 @@ func TestMetricsExporter_WithShutdown(t *testing.T) {

func TestMetricsExporter_WithResourceToTelemetryConversionDisabled(t *testing.T) {
md := testdata.GenerateMetricsTwoMetrics()
me, err := NewMetricsExporter(fakeMetricsExporterConfig, zap.NewNop(), newPushMetricsData(0, nil), WithResourceToTelemetryConversion(defaultResourceToTelemetrySettings()))
me, err := NewMetricsExporter(fakeMetricsExporterConfig, zap.NewNop(), newPushMetricsData(nil), WithResourceToTelemetryConversion(defaultResourceToTelemetrySettings()))
assert.NotNil(t, me)
assert.NoError(t, err)

Expand All @@ -164,7 +149,7 @@ func TestMetricsExporter_WithResourceToTelemetryConversionDisabled(t *testing.T)

func TestMetricsExporter_WithResourceToTelemetryConversionEbabled(t *testing.T) {
md := testdata.GenerateMetricsTwoMetrics()
me, err := NewMetricsExporter(fakeMetricsExporterConfig, zap.NewNop(), newPushMetricsData(0, nil), WithResourceToTelemetryConversion(ResourceToTelemetrySettings{Enabled: true}))
me, err := NewMetricsExporter(fakeMetricsExporterConfig, zap.NewNop(), newPushMetricsData(nil), WithResourceToTelemetryConversion(ResourceToTelemetrySettings{Enabled: true}))
assert.NotNil(t, me)
assert.NoError(t, err)

Expand All @@ -176,16 +161,16 @@ func TestMetricsExporter_WithShutdown_ReturnError(t *testing.T) {
want := errors.New("my_error")
shutdownErr := func(context.Context) error { return want }

me, err := NewMetricsExporter(fakeMetricsExporterConfig, zap.NewNop(), newPushMetricsData(0, nil), WithShutdown(shutdownErr))
me, err := NewMetricsExporter(fakeMetricsExporterConfig, zap.NewNop(), newPushMetricsData(nil), WithShutdown(shutdownErr))
assert.NotNil(t, me)
assert.NoError(t, err)

assert.Equal(t, me.Shutdown(context.Background()), want)
}

func newPushMetricsData(droppedTimeSeries int, retError error) PushMetrics {
return func(ctx context.Context, td pdata.Metrics) (int, error) {
return droppedTimeSeries, retError
func newPushMetricsData(retError error) PushMetrics {
return func(ctx context.Context, td pdata.Metrics) error {
return retError
}
}

Expand Down
Loading

0 comments on commit 03904de

Please sign in to comment.