No queued retry on bad data (#91)
* No queued retry on bad data

Allows the queued-retry processor to identify permanent errors from the consumer attached to it. This way it can safely drop batches of bad data without burdening the system with retries (see the sketch below).

* Move errorkind package and use require package in tests
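
The change, in brief: senders wrap unrecoverable translation errors with errorkind.Permanent, and the queued processor checks errorkind.IsPermanent before deciding whether to re-enqueue a failed batch. Below is a minimal self-contained sketch of that decision logic; the `process` function and the error messages are illustrative stand-ins, not the actual collector code.

```go
package main

import (
	"errors"
	"fmt"
)

// permanent mirrors the errors/errorkind wrapper introduced in this
// commit: it marks an error as one that will recur for the same input.
type permanent struct{ error }

func Permanent(err error) error { return permanent{err} }

func IsPermanent(err error) bool {
	if err != nil {
		_, ok := err.(permanent)
		return ok
	}
	return false
}

// process mimics the queued processor's new behavior: drop the batch
// on permanent errors, re-enqueue for retry otherwise.
func process(err error) string {
	if IsPermanent(err) {
		return "drop batch (bad data, retrying cannot succeed)"
	}
	return "requeue batch (transient failure)"
}

func main() {
	fmt.Println(process(Permanent(errors.New("malformed span")))) // drop
	fmt.Println(process(errors.New("connection refused")))        // requeue
}
```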
Paulo Janotti authored Jul 3, 2019
1 parent 886e62f commit 31d6389
Showing 9 changed files with 200 additions and 13 deletions.
3 changes: 2 additions & 1 deletion cmd/occollector/app/sender/jaeger_proto_grpc_sender.go
@@ -24,6 +24,7 @@ import (

"github.com/open-telemetry/opentelemetry-service/consumer"
"github.com/open-telemetry/opentelemetry-service/data"
"github.com/open-telemetry/opentelemetry-service/errors/errorkind"
jaegertranslator "github.com/open-telemetry/opentelemetry-service/translator/trace/jaeger"
)

@@ -55,7 +56,7 @@ func (s *JaegerProtoGRPCSender) ConsumeTraceData(ctx context.Context, td data.Tr
protoBatch, err := jaegertranslator.OCProtoToJaegerProto(td)
if err != nil {
s.logger.Warn("Error translating OC proto batch to Jaeger proto", zap.Error(err))
return err
return errorkind.Permanent(err)
}

_, err = s.client.PostSpans(context.Background(), &jaegerproto.PostSpansRequest{Batch: *protoBatch})
3 changes: 2 additions & 1 deletion cmd/occollector/app/sender/jaeger_thrift_http_sender.go
@@ -28,6 +28,7 @@ import (

"github.com/open-telemetry/opentelemetry-service/consumer"
"github.com/open-telemetry/opentelemetry-service/data"
"github.com/open-telemetry/opentelemetry-service/errors/errorkind"
jaegertranslator "github.com/open-telemetry/opentelemetry-service/translator/trace/jaeger"
)

@@ -88,7 +89,7 @@ func (s *JaegerThriftHTTPSender) ConsumeTraceData(ctx context.Context, td data.T
// TODO: (@pjanotti) In case of failure the translation to Jaeger Thrift is going to be remade, cache it somehow.
tBatch, err := jaegertranslator.OCProtoToJaegerThrift(td)
if err != nil {
return err
return errorkind.Permanent(err)
}

body, err := serializeThrift(tBatch)
6 changes: 3 additions & 3 deletions cmd/occollector/app/sender/jaeger_thrift_tchannel_sender.go
@@ -17,12 +17,12 @@ package sender
import (
"context"

"go.uber.org/zap"

reporter "github.com/jaegertracing/jaeger/cmd/agent/app/reporter"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-service/consumer"
"github.com/open-telemetry/opentelemetry-service/data"
"github.com/open-telemetry/opentelemetry-service/errors/errorkind"
jaegertranslator "github.com/open-telemetry/opentelemetry-service/translator/trace/jaeger"
)

@@ -51,7 +51,7 @@ func (s *JaegerThriftTChannelSender) ConsumeTraceData(ctx context.Context, td da
// TODO: (@pjanotti) In case of failure the translation to Jaeger Thrift is going to be remade, cache it somehow.
tBatch, err := jaegertranslator.OCProtoToJaegerThrift(td)
if err != nil {
return err
return errorkind.Permanent(err)
}

if err := s.reporter.EmitBatch(tBatch); err != nil {
41 changes: 41 additions & 0 deletions errors/errorkind/errorkind.go
@@ -0,0 +1,41 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package errorkind provides wrappers to easily classify errors. This allows
// appropriate action by error handlers without the need to know each individual
// error type/instance.
package errorkind

// permanent is an error that will always be returned if its source
// receives the same inputs.
type permanent struct {
error
}

// Permanent wraps an error to indicate that it is a permanent error, i.e. an
// error that will always be returned if its source receives the same inputs.
func Permanent(err error) error {
return permanent{err}
}

// IsPermanent checks whether an error was wrapped with the Permanent function,
// which indicates that the error will always be returned when its source
// receives the same input.
func IsPermanent(err error) bool {
if err != nil {
_, isPermanent := err.(permanent)
return isPermanent
}
return false
}
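
One design note, hedged: IsPermanent relies on a direct type assertion, so the classification is only visible while the permanent wrapper is the outermost error; callers are expected to return it as-is rather than re-wrap it. A small illustrative sketch of that assumption (the error messages are made up):

```go
package main

import (
	"errors"
	"fmt"

	"github.com/open-telemetry/opentelemetry-service/errors/errorkind"
)

func main() {
	perm := errorkind.Permanent(errors.New("bad span batch"))
	fmt.Println(errorkind.IsPermanent(perm)) // true

	// Re-wrapping hides the marker: the type assertion does not unwrap,
	// so the permanent error must stay outermost to be detected.
	wrapped := fmt.Errorf("translation failed: %v", perm)
	fmt.Println(errorkind.IsPermanent(wrapped)) // false
}
```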
41 changes: 41 additions & 0 deletions errors/errorkind/errorkind_test.go
@@ -0,0 +1,41 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package errorkind provides wrappers to easily classify errors, allowing
// appropriate action by error handlers without the need to know each individual
// error type/instance.
package errorkind

import (
"errors"
"testing"
)

func TestPermanent(t *testing.T) {
err := errors.New("testError")
if IsPermanent(err) {
t.Fatalf("IsPermanent() = true, want false")
}
err = Permanent(err)
if !IsPermanent(err) {
t.Fatalf("IsPermanent() = false, want true")
}
}

func TestIsPermanent_NilError(t *testing.T) {
var err error
if IsPermanent(err) {
t.Fatalf("IsPermanent() = true, want false")
}
}
6 changes: 3 additions & 3 deletions exporter/zipkinexporter/zipkin.go
@@ -22,8 +22,6 @@ import (
"sync"
"time"

tracetranslator "github.com/open-telemetry/opentelemetry-service/translator/trace"

zipkinmodel "github.com/openzipkin/zipkin-go/model"
zipkinreporter "github.com/openzipkin/zipkin-go/reporter"
zipkinhttp "github.com/openzipkin/zipkin-go/reporter/http"
@@ -33,7 +31,9 @@ import (
commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
"github.com/open-telemetry/opentelemetry-service/consumer"
"github.com/open-telemetry/opentelemetry-service/data"
"github.com/open-telemetry/opentelemetry-service/errors/errorkind"
"github.com/open-telemetry/opentelemetry-service/observability"
"github.com/open-telemetry/opentelemetry-service/translator/trace"
spandatatranslator "github.com/open-telemetry/opentelemetry-service/translator/trace/spandata"
)

@@ -208,7 +208,7 @@ func (ze *zipkinExporter) ConsumeTraceData(ctx context.Context, td data.TraceDat
for _, span := range td.Spans {
sd, err := spandatatranslator.ProtoSpanToOCSpanData(span)
if err != nil {
return err
return errorkind.Permanent(err)
}
zs, err := ze.zipkinSpan(td.Node, sd)
if err == nil {
37 changes: 34 additions & 3 deletions internal/collector/processor/metrics.go
@@ -29,8 +29,18 @@ var (
TagServiceNameKey, _ = tag.NewKey("service")
TagExporterNameKey, _ = tag.NewKey("exporter")

StatReceivedSpanCount = stats.Int64("spans_received", "counts the number of spans received", stats.UnitDimensionless)
StatDroppedSpanCount = stats.Int64("spans_dropped", "counts the number of spans dropped", stats.UnitDimensionless)
StatReceivedSpanCount = stats.Int64(
"spans_received",
"counts the number of spans received",
stats.UnitDimensionless)
StatDroppedSpanCount = stats.Int64(
"spans_dropped",
"counts the number of spans dropped",
stats.UnitDimensionless)
StatBadBatchDroppedSpanCount = stats.Int64(
"bad_batch_spans_dropped",
"counts the number of spans dropped due to being in bad batches",
stats.UnitDimensionless)
)

// MetricTagKeys returns the metric tag keys according to the given telemetry level.
@@ -75,6 +85,13 @@ func MetricViews(level telemetry.Level) []*view.View {
TagKeys: tagKeys,
Aggregation: view.Count(),
}
droppedBadBatchesView := &view.View{
Name: "bad_batches_dropped",
Measure: StatBadBatchDroppedSpanCount,
Description: "The number of span batches with bad data that were dropped.",
TagKeys: tagKeys,
Aggregation: view.Count(),
}
receivedSpansView := &view.View{
Name: StatReceivedSpanCount.Name(),
Measure: StatReceivedSpanCount,
@@ -89,8 +106,22 @@
TagKeys: tagKeys,
Aggregation: view.Sum(),
}
droppedSpansFromBadBatchesView := &view.View{
Name: StatBadBatchDroppedSpanCount.Name(),
Measure: StatBadBatchDroppedSpanCount,
Description: "The number of spans dropped from span batches with bad data.",
TagKeys: tagKeys,
Aggregation: view.Sum(),
}

return []*view.View{receivedBatchesView, droppedBatchesView, receivedSpansView, droppedSpansView}
return []*view.View{
receivedBatchesView,
droppedBatchesView,
receivedSpansView,
droppedSpansView,
droppedBadBatchesView,
droppedSpansFromBadBatchesView,
}
}

// ServiceNameForNode gets the service name for a specified node. Used for metrics.
21 changes: 21 additions & 0 deletions internal/collector/processor/queued/queued_processor.go
@@ -27,6 +27,7 @@ import (

"github.com/open-telemetry/opentelemetry-service/consumer"
"github.com/open-telemetry/opentelemetry-service/data"
"github.com/open-telemetry/opentelemetry-service/errors/errorkind"
"github.com/open-telemetry/opentelemetry-service/internal/collector/processor"
"github.com/open-telemetry/opentelemetry-service/internal/collector/processor/nodebatcher"
"github.com/open-telemetry/opentelemetry-service/internal/collector/telemetry"
@@ -149,6 +150,26 @@ func (sp *queuedSpanProcessor) processItemFromQueue(item *queueItem) {

// There was an error
statsTags := processor.StatsTagsForBatch(sp.name, processor.ServiceNameForNode(item.td.Node), item.td.SourceFormat)

// Immediately drop data on permanent errors. In this context permanent
// errors indicate some kind of bad data.
if errorkind.IsPermanent(err) {
numSpans := len(item.td.Spans)
sp.logger.Warn(
"Unrecoverable bad data error",
zap.String("processor", sp.name),
zap.Int("#spans", numSpans),
zap.String("spanFormat", item.td.SourceFormat),
zap.Error(err))

stats.RecordWithTags(
context.Background(),
statsTags,
processor.StatBadBatchDroppedSpanCount.M(int64(numSpans)))

return
}

stats.RecordWithTags(context.Background(), statsTags, statFailedSendOps.M(1))
batchSize := len(item.td.Spans)
sp.logger.Warn("Sender failed", zap.String("processor", sp.name), zap.Error(err), zap.String("spanFormat", item.td.SourceFormat))
55 changes: 53 additions & 2 deletions internal/collector/processor/queued/queued_processor_test.go
@@ -16,16 +16,67 @@ package queued

import (
"context"
"errors"
"sync"
"sync/atomic"
"testing"

"github.com/open-telemetry/opentelemetry-service/consumer"
"time"

tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
"github.com/stretchr/testify/require"

"github.com/open-telemetry/opentelemetry-service/consumer"
"github.com/open-telemetry/opentelemetry-service/data"
"github.com/open-telemetry/opentelemetry-service/errors/errorkind"
)

func TestQueuedProcessor_noEnqueueOnPermanentError(t *testing.T) {
ctx := context.Background()
td := data.TraceData{
Spans: make([]*tracepb.Span, 7),
}

c := &waitGroupTraceConsumer{
consumeTraceDataError: errorkind.Permanent(errors.New("bad data")),
}

qp := NewQueuedSpanProcessor(
c,
Options.WithRetryOnProcessingFailures(true),
Options.WithBackoffDelay(time.Hour),
Options.WithNumWorkers(1),
Options.WithQueueSize(2),
).(*queuedSpanProcessor)

c.Add(1)
require.Nil(t, qp.ConsumeTraceData(ctx, td))
c.Wait()
<-time.After(50 * time.Millisecond)

require.Zero(t, qp.queue.Size())

c.consumeTraceDataError = errors.New("transient error")
c.Add(1)
// This is asynchronous so it should just enqueue, no errors expected.
require.Nil(t, qp.ConsumeTraceData(ctx, td))
c.Wait()
<-time.After(50 * time.Millisecond)

require.Equal(t, 1, qp.queue.Size())
}

type waitGroupTraceConsumer struct {
sync.WaitGroup
consumeTraceDataError error
}

var _ consumer.TraceConsumer = (*waitGroupTraceConsumer)(nil)

func (c *waitGroupTraceConsumer) ConsumeTraceData(ctx context.Context, td data.TraceData) error {
defer c.Done()
return c.consumeTraceDataError
}

func TestQueueProcessorHappyPath(t *testing.T) {
mockProc := newMockConcurrentSpanProcessor()
qp := NewQueuedSpanProcessor(mockProc)
