From f0dc15289f1220d422677933ebd1f9d679d57be7 Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Wed, 18 Nov 2020 17:35:03 +0800 Subject: [PATCH] sampling: prevent duplicate publication of remotely tail-sampled trace events (#4438) * eventstorage: add Delete{Transaction,Span} methods * sampling: delete events from local storage Delete events from local storage when receiving a remote sampling decision. This prevents events from being published multiple times, e.g. due to the server being restarted and observing remote sampling decisions again. --- .../sampling/eventstorage/sharded.go | 22 +++++++++++++++++++ .../sampling/eventstorage/storage.go | 12 ++++++++++ x-pack/apm-server/sampling/processor.go | 20 +++++++++++++++++ x-pack/apm-server/sampling/processor_test.go | 11 ++++++++-- 4 files changed, 63 insertions(+), 2 deletions(-) diff --git a/x-pack/apm-server/sampling/eventstorage/sharded.go b/x-pack/apm-server/sampling/eventstorage/sharded.go index c0cb1089a75..afc433a5bb4 100644 --- a/x-pack/apm-server/sampling/eventstorage/sharded.go +++ b/x-pack/apm-server/sampling/eventstorage/sharded.go @@ -76,6 +76,16 @@ func (s *ShardedReadWriter) IsTraceSampled(traceID string) (bool, error) { return s.getWriter(traceID).IsTraceSampled(traceID) } +// DeleteTransaction calls Writer.DeleteTransaction, using a sharded, locked, Writer. +func (s *ShardedReadWriter) DeleteTransaction(tx *model.Transaction) error { + return s.getWriter(tx.TraceID).DeleteTransaction(tx) +} + +// DeleteSpan calls Writer.DeleteSpan, using a sharded, locked, Writer. +func (s *ShardedReadWriter) DeleteSpan(span *model.Span) error { + return s.getWriter(span.TraceID).DeleteSpan(span) +} + // getWriter returns an event storage writer for the given trace ID. // // This method is idempotent, which is necessary to avoid transaction @@ -133,3 +143,15 @@ func (rw *lockedReadWriter) IsTraceSampled(traceID string) (bool, error) { defer rw.mu.Unlock() return rw.rw.IsTraceSampled(traceID) } + +func (rw *lockedReadWriter) DeleteTransaction(tx *model.Transaction) error { + rw.mu.Lock() + defer rw.mu.Unlock() + return rw.rw.DeleteTransaction(tx) +} + +func (rw *lockedReadWriter) DeleteSpan(span *model.Span) error { + rw.mu.Lock() + defer rw.mu.Unlock() + return rw.rw.DeleteSpan(span) +} diff --git a/x-pack/apm-server/sampling/eventstorage/storage.go b/x-pack/apm-server/sampling/eventstorage/storage.go index 1e6648f2d89..a9716e34e53 100644 --- a/x-pack/apm-server/sampling/eventstorage/storage.go +++ b/x-pack/apm-server/sampling/eventstorage/storage.go @@ -175,6 +175,18 @@ func (rw *ReadWriter) writeEntry(e *badger.Entry) error { return rw.txn.SetEntry(e) } +// DeleteTransaction deletes the transaction from storage. +func (rw *ReadWriter) DeleteTransaction(tx *model.Transaction) error { + key := append(append([]byte(tx.TraceID), ':'), tx.ID...) + return rw.txn.Delete(key) +} + +// DeleteSpan deletes the span from storage. +func (rw *ReadWriter) DeleteSpan(span *model.Span) error { + key := append(append([]byte(span.TraceID), ':'), span.ID...) + return rw.txn.Delete(key) +} + // ReadEvents reads events with the given trace ID from storage into a batch. // // ReadEvents may implicitly commit the current transaction when the number diff --git a/x-pack/apm-server/sampling/processor.go b/x-pack/apm-server/sampling/processor.go index 87d15cf54eb..4e378c5d503 100644 --- a/x-pack/apm-server/sampling/processor.go +++ b/x-pack/apm-server/sampling/processor.go @@ -408,12 +408,14 @@ func (p *Processor) Run() error { // and just waiting as long as it takes here. var events model.Batch for { + var remoteDecision bool var traceID string select { case <-ctx.Done(): return ctx.Err() case traceID = <-remoteSampledTraceIDs: p.logger.Debug("received remotely sampled trace ID") + remoteDecision = true case traceID = <-localSampledTraceIDs: } if err := p.storage.WriteTraceSampled(traceID, true); err != nil { @@ -425,6 +427,24 @@ func (p *Processor) Run() error { transformables := events.Transformables() if len(transformables) > 0 { p.logger.Debugf("reporting %d events", len(transformables)) + if remoteDecision { + // Remote decisions may be received multiple times, + // e.g. if this server restarts and resubscribes to + // remote sampling decisions before they have been + // deleted. We delete events from local storage so + // we don't publish duplicates; delivery is therefore + // at-most-once, not guaranteed. + for _, tx := range events.Transactions { + if err := p.storage.DeleteTransaction(tx); err != nil { + return errors.Wrap(err, "failed to delete transaction from local storage") + } + } + for _, span := range events.Spans { + if err := p.storage.DeleteSpan(span); err != nil { + return errors.Wrap(err, "failed to delete span from local storage") + } + } + } if err := p.config.Reporter(ctx, publish.PendingReq{ Transformables: transformables, Trace: true, diff --git a/x-pack/apm-server/sampling/processor_test.go b/x-pack/apm-server/sampling/processor_test.go index ed7fffe0128..a010a4ad0f9 100644 --- a/x-pack/apm-server/sampling/processor_test.go +++ b/x-pack/apm-server/sampling/processor_test.go @@ -322,11 +322,16 @@ func TestProcessRemoteTailSampling(t *testing.T) { require.NoError(t, err) assert.Empty(t, out) + // Simulate receiving remote sampling decisions multiple times, + // to show that we don't report duplicate events. + subscriberChan <- traceID2 + subscriberChan <- traceID1 subscriberChan <- traceID2 subscriberChan <- traceID1 + var events []transform.Transformable select { - case <-reported: + case events = <-reported: case <-time.After(10 * time.Second): t.Fatal("timed out waiting for reporting") } @@ -346,6 +351,8 @@ func TestProcessRemoteTailSampling(t *testing.T) { expectedMonitoring.Ints["sampling.events.dropped"] = 0 assertMonitoring(t, processor, expectedMonitoring, `sampling.events.*`) + assert.Equal(t, trace1Events.Transformables(), events) + withBadger(t, config.StorageDir, func(db *badger.DB) { storage := eventstorage.New(db, eventstorage.JSONCodec{}, time.Minute) reader := storage.NewReadWriter() @@ -362,7 +369,7 @@ func TestProcessRemoteTailSampling(t *testing.T) { var batch model.Batch err = reader.ReadEvents(traceID1, &batch) assert.NoError(t, err) - assert.Equal(t, trace1Events, batch) + assert.Zero(t, batch) // events are deleted from local storage batch = model.Batch{} err = reader.ReadEvents(traceID2, &batch)