Skip to content

Commit

Permalink
sampling: prevent duplicate publication of remotely tail-sampled trac…
Browse files Browse the repository at this point in the history
…e events (#4438)

* eventstorage: add Delete{Transaction,Span} methods

* sampling: delete events from local storage

Delete events from local storage when receiving
a remote sampling decision. This prevents events
from being published multiple times, e.g. due to
the server being restarted and observing remote
sampling decisions again.
  • Loading branch information
axw authored Nov 18, 2020
1 parent 33e1a92 commit f0dc152
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 2 deletions.
22 changes: 22 additions & 0 deletions x-pack/apm-server/sampling/eventstorage/sharded.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,16 @@ func (s *ShardedReadWriter) IsTraceSampled(traceID string) (bool, error) {
return s.getWriter(traceID).IsTraceSampled(traceID)
}

// DeleteTransaction removes tx through the locked Writer that getWriter
// selects for the transaction's trace ID, returning that Writer's error.
func (s *ShardedReadWriter) DeleteTransaction(tx *model.Transaction) error {
	shard := s.getWriter(tx.TraceID)
	return shard.DeleteTransaction(tx)
}

// DeleteSpan removes span through the locked Writer that getWriter
// selects for the span's trace ID, returning that Writer's error.
func (s *ShardedReadWriter) DeleteSpan(span *model.Span) error {
	shard := s.getWriter(span.TraceID)
	return shard.DeleteSpan(span)
}

// getWriter returns an event storage writer for the given trace ID.
//
// This method is idempotent, which is necessary to avoid transaction
Expand Down Expand Up @@ -133,3 +143,15 @@ func (rw *lockedReadWriter) IsTraceSampled(traceID string) (bool, error) {
defer rw.mu.Unlock()
return rw.rw.IsTraceSampled(traceID)
}

// DeleteTransaction calls DeleteTransaction on the wrapped rw.rw while
// holding rw.mu, and returns its error unchanged.
func (rw *lockedReadWriter) DeleteTransaction(tx *model.Transaction) error {
rw.mu.Lock()
defer rw.mu.Unlock() // released only after the delegated call has returned
return rw.rw.DeleteTransaction(tx)
}

// DeleteSpan calls DeleteSpan on the wrapped rw.rw while holding rw.mu,
// and returns its error unchanged.
func (rw *lockedReadWriter) DeleteSpan(span *model.Span) error {
rw.mu.Lock()
defer rw.mu.Unlock() // released only after the delegated call has returned
return rw.rw.DeleteSpan(span)
}
12 changes: 12 additions & 0 deletions x-pack/apm-server/sampling/eventstorage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,18 @@ func (rw *ReadWriter) writeEntry(e *badger.Entry) error {
return rw.txn.SetEntry(e)
}

// DeleteTransaction deletes the transaction from storage.
//
// The entry is addressed by the key "<trace ID>:<transaction ID>", and the
// delete is issued on the ReadWriter's current transaction (rw.txn).
func (rw *ReadWriter) DeleteTransaction(tx *model.Transaction) error {
	key := make([]byte, 0, len(tx.TraceID)+1+len(tx.ID))
	key = append(key, tx.TraceID...)
	key = append(key, ':')
	key = append(key, tx.ID...)
	return rw.txn.Delete(key)
}

// DeleteSpan deletes the span from storage.
//
// The entry is addressed by the key "<trace ID>:<span ID>", and the delete
// is issued on the ReadWriter's current transaction (rw.txn).
func (rw *ReadWriter) DeleteSpan(span *model.Span) error {
	key := make([]byte, 0, len(span.TraceID)+1+len(span.ID))
	key = append(key, span.TraceID...)
	key = append(key, ':')
	key = append(key, span.ID...)
	return rw.txn.Delete(key)
}

// ReadEvents reads events with the given trace ID from storage into a batch.
//
// ReadEvents may implicitly commit the current transaction when the number
Expand Down
20 changes: 20 additions & 0 deletions x-pack/apm-server/sampling/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,12 +408,14 @@ func (p *Processor) Run() error {
// and just waiting as long as it takes here.
var events model.Batch
for {
var remoteDecision bool
var traceID string
select {
case <-ctx.Done():
return ctx.Err()
case traceID = <-remoteSampledTraceIDs:
p.logger.Debug("received remotely sampled trace ID")
remoteDecision = true
case traceID = <-localSampledTraceIDs:
}
if err := p.storage.WriteTraceSampled(traceID, true); err != nil {
Expand All @@ -425,6 +427,24 @@ func (p *Processor) Run() error {
transformables := events.Transformables()
if len(transformables) > 0 {
p.logger.Debugf("reporting %d events", len(transformables))
if remoteDecision {
// Remote decisions may be received multiple times,
// e.g. if this server restarts and resubscribes to
// remote sampling decisions before they have been
// deleted. We delete events from local storage so
// we don't publish duplicates; delivery is therefore
// at-most-once, not guaranteed.
for _, tx := range events.Transactions {
if err := p.storage.DeleteTransaction(tx); err != nil {
return errors.Wrap(err, "failed to delete transaction from local storage")
}
}
for _, span := range events.Spans {
if err := p.storage.DeleteSpan(span); err != nil {
return errors.Wrap(err, "failed to delete span from local storage")
}
}
}
if err := p.config.Reporter(ctx, publish.PendingReq{
Transformables: transformables,
Trace: true,
Expand Down
11 changes: 9 additions & 2 deletions x-pack/apm-server/sampling/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,11 +322,16 @@ func TestProcessRemoteTailSampling(t *testing.T) {
require.NoError(t, err)
assert.Empty(t, out)

// Simulate receiving remote sampling decisions multiple times,
// to show that we don't report duplicate events.
subscriberChan <- traceID2
subscriberChan <- traceID1
subscriberChan <- traceID2
subscriberChan <- traceID1

var events []transform.Transformable
select {
case <-reported:
case events = <-reported:
case <-time.After(10 * time.Second):
t.Fatal("timed out waiting for reporting")
}
Expand All @@ -346,6 +351,8 @@ func TestProcessRemoteTailSampling(t *testing.T) {
expectedMonitoring.Ints["sampling.events.dropped"] = 0
assertMonitoring(t, processor, expectedMonitoring, `sampling.events.*`)

assert.Equal(t, trace1Events.Transformables(), events)

withBadger(t, config.StorageDir, func(db *badger.DB) {
storage := eventstorage.New(db, eventstorage.JSONCodec{}, time.Minute)
reader := storage.NewReadWriter()
Expand All @@ -362,7 +369,7 @@ func TestProcessRemoteTailSampling(t *testing.T) {
var batch model.Batch
err = reader.ReadEvents(traceID1, &batch)
assert.NoError(t, err)
assert.Equal(t, trace1Events, batch)
assert.Zero(t, batch) // events are deleted from local storage

batch = model.Batch{}
err = reader.ReadEvents(traceID2, &batch)
Expand Down

0 comments on commit f0dc152

Please sign in to comment.