From 6c6e1e7416e996690a5d2dbd117ed9c8bb1a45cd Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Wed, 3 Apr 2024 04:53:16 -0700 Subject: [PATCH] Add queue for BatchingProcessor (#5131) --- sdk/log/batch.go | 87 ++++++++++++++++++++++++++++++++ sdk/log/batch_test.go | 113 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 200 insertions(+) diff --git a/sdk/log/batch.go b/sdk/log/batch.go index a17f92f5ed8..bb85a2a34fb 100644 --- a/sdk/log/batch.go +++ b/sdk/log/batch.go @@ -4,7 +4,9 @@ package log // import "go.opentelemetry.io/otel/sdk/log" import ( + "container/ring" "context" + "sync" "time" ) @@ -76,6 +78,91 @@ func (b *BatchingProcessor) ForceFlush(ctx context.Context) error { return nil } +// queue holds a queue of logging records. +// +// When the queue becomes full, the oldest records in the queue are +// overwritten. +type queue struct { + sync.Mutex + + cap, len int + read, write *ring.Ring +} + +func newQueue(size int) *queue { + r := ring.New(size) + return &queue{ + cap: size, + read: r, + write: r, + } +} + +// Enqueue adds r to the queue. The queue size, including the addition of r, is +// returned. +// +// If enqueueing r will exceed the capacity of q, the oldest Record held in q +// will be dropped and r retained. +func (q *queue) Enqueue(r Record) int { + q.Lock() + defer q.Unlock() + + q.write.Value = r + q.write = q.write.Next() + + q.len++ + if q.len > q.cap { + // Overflow. Advance read to be the new "oldest". + q.len = q.cap + q.read = q.read.Next() + } + return q.len +} + +// TryDequeue attempts to dequeue up to len(buf) Records. The available Records +// will be assigned into buf and passed to write. If write fails, returning +// false, the Records will not be removed from the queue. If write succeeds, +// returning true, the dequeued Records are removed from the queue. The number +// of Records remaining in the queue are returned. +// +// When write is called the lock of q is held. The write function must not call +// other methods of this q that acquire the lock. +func (q *queue) TryDequeue(buf []Record, write func([]Record) bool) int { + q.Lock() + defer q.Unlock() + + origRead := q.read + + n := min(len(buf), q.len) + for i := 0; i < n; i++ { + buf[i] = q.read.Value.(Record) + q.read = q.read.Next() + } + + if write(buf[:n]) { + q.len -= n + } else { + q.read = origRead + } + return q.len +} + +// Flush returns all the Records held in the queue and resets it to be +// empty. +func (q *queue) Flush() []Record { + q.Lock() + defer q.Unlock() + + out := make([]Record, q.len) + for i := range out { + out[i] = q.read.Value.(Record) + q.read = q.read.Next() + } + q.len = 0 + + return out +} + type batchingConfig struct { maxQSize setting[int] expInterval setting[time.Duration] diff --git a/sdk/log/batch_test.go b/sdk/log/batch_test.go index 111e2dea374..adbdb1d8bce 100644 --- a/sdk/log/batch_test.go +++ b/sdk/log/batch_test.go @@ -4,13 +4,17 @@ package log // import "go.opentelemetry.io/otel/sdk/log" import ( + "slices" "strconv" + "sync" "testing" "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/log" ) func TestNewBatchingConfig(t *testing.T) { @@ -126,3 +130,112 @@ func TestNewBatchingConfig(t *testing.T) { }) } } + +func TestQueue(t *testing.T) { + var r Record + r.SetBody(log.BoolValue(true)) + + t.Run("newQueue", func(t *testing.T) { + const size = 1 + q := newQueue(size) + assert.Equal(t, q.len, 0) + assert.Equal(t, size, q.cap, "capacity") + assert.Equal(t, size, q.read.Len(), "read ring") + assert.Same(t, q.read, q.write, "different rings") + }) + + t.Run("Enqueue", func(t *testing.T) { + const size = 2 + q := newQueue(size) + + var notR Record + notR.SetBody(log.IntValue(10)) + + assert.Equal(t, 1, q.Enqueue(notR), "incomplete batch") + assert.Equal(t, 1, q.len, "length") + assert.Equal(t, size, q.cap, "capacity") + + assert.Equal(t, 2, q.Enqueue(r), "complete batch") + assert.Equal(t, 2, q.len, "length") + assert.Equal(t, size, q.cap, "capacity") + + assert.Equal(t, 2, q.Enqueue(r), "overflow batch") + assert.Equal(t, 2, q.len, "length") + assert.Equal(t, size, q.cap, "capacity") + + assert.Equal(t, []Record{r, r}, q.Flush(), "flushed Records") + }) + + t.Run("Flush", func(t *testing.T) { + const size = 2 + q := newQueue(size) + q.write.Value = r + q.write = q.write.Next() + q.len = 1 + + assert.Equal(t, []Record{r}, q.Flush(), "flushed") + }) + + t.Run("TryFlush", func(t *testing.T) { + const size = 3 + q := newQueue(size) + for i := 0; i < size-1; i++ { + q.write.Value = r + q.write = q.write.Next() + q.len++ + } + + buf := make([]Record, 1) + f := func([]Record) bool { return false } + assert.Equal(t, size-1, q.TryDequeue(buf, f), "not flushed") + require.Equal(t, size-1, q.len, "length") + require.NotSame(t, q.read, q.write, "read ring advanced") + + var flushed []Record + f = func(r []Record) bool { + flushed = append(flushed, r...) + return true + } + if assert.Equal(t, size-2, q.TryDequeue(buf, f), "did not flush len(buf)") { + assert.Equal(t, []Record{r}, flushed, "Records") + } + + buf = slices.Grow(buf, size) + flushed = flushed[:0] + if assert.Equal(t, 0, q.TryDequeue(buf, f), "did not flush len(queue)") { + assert.Equal(t, []Record{r}, flushed, "Records") + } + }) + + t.Run("ConcurrentSafe", func(t *testing.T) { + const goRoutines = 10 + + flushed := make(chan []Record, goRoutines) + out := make([]Record, 0, goRoutines) + done := make(chan struct{}) + go func() { + defer close(done) + for recs := range flushed { + out = append(out, recs...) + } + }() + + var wg sync.WaitGroup + wg.Add(goRoutines) + + b := newQueue(goRoutines) + for i := 0; i < goRoutines; i++ { + go func() { + defer wg.Done() + b.Enqueue(Record{}) + flushed <- b.Flush() + }() + } + + wg.Wait() + close(flushed) + <-done + + assert.Len(t, out, goRoutines, "flushed Records") + }) +}