Remove internal goroutine loop for persistent queue (#8868)
Signed-off-by: Bogdan Drutu <[email protected]>
bogdandrutu authored Nov 14, 2023
1 parent 1a21d18 commit d0f9a78
Showing 5 changed files with 128 additions and 165 deletions.
.chloggen/rmloop.yaml: 13 additions & 0 deletions
@@ -0,0 +1,13 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: "enhancement"

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: "exporterhelper"

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Remove internal goroutine loop for persistent queue"

# One or more tracking issues or pull requests related to the change
issues: [8868]
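
For orientation (not part of the commit), here is a minimal, self-contained sketch of the consumer pattern the diff below moves to: instead of an internal goroutine forwarding stored items onto a request channel that consumers select on, each consumer calls a blocking get() that returns (request, ok) and exits once ok is false after shutdown. All names here (queue, items, get) are illustrative, not the collector's API.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// queue is a toy stand-in for the persistent queue; names are hypothetical.
type queue struct {
	items    chan string
	stopChan chan struct{}
}

// get blocks until an item is available or the queue is stopped.
// The second return value is false once the queue has been stopped.
func (q *queue) get() (string, bool) {
	select {
	case <-q.stopChan:
		return "", false
	case item := <-q.items:
		return item, true
	}
}

func main() {
	q := &queue{items: make(chan string, 10), stopChan: make(chan struct{})}
	q.items <- "first"
	q.items <- "second"

	var stopWG sync.WaitGroup
	for i := 0; i < 2; i++ { // numConsumers
		stopWG.Add(1)
		go func() {
			defer stopWG.Done()
			for {
				item, ok := q.get() // no internal forwarding loop in between
				if !ok {
					return // queue stopped
				}
				fmt.Println("consumed:", item)
			}
		}()
	}

	time.Sleep(100 * time.Millisecond) // let the toy consumers drain the two items
	close(q.stopChan)                  // shutdown: unblocks every consumer stuck in get()
	stopWG.Wait()
}
```

The trade-off mirrored here: shutdown is signaled by closing a channel that every blocked get() selects on, so no extra goroutine is needed just to shuttle items from storage to consumers.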
exporter/exporterhelper/internal/persistent_queue.go: 6 additions & 7 deletions
@@ -28,7 +28,6 @@ var (
// persistentQueue holds the queue backed by file storage
type persistentQueue struct {
stopWG sync.WaitGroup
stopChan chan struct{}
set exporter.CreateSettings
storageID component.ID
storage *persistentContiguousStorage
@@ -48,7 +47,6 @@ func NewPersistentQueue(capacity int, numConsumers int, storageID component.ID,
storageID: storageID,
marshaler: marshaler,
unmarshaler: unmarshaler,
stopChan: make(chan struct{}),
}
}

@@ -64,12 +62,11 @@ func (pq *persistentQueue) Start(ctx context.Context, host component.Host, set Q
go func() {
defer pq.stopWG.Done()
for {
select {
case req := <-pq.storage.get():
set.Callback(req)
case <-pq.stopChan:
req, found := pq.storage.get()
if !found {
return
}
set.Callback(req)
}
}()
}
@@ -85,7 +82,9 @@ func (pq *persistentQueue) Produce(_ context.Context, item any) bool {

// Shutdown stops accepting items, shuts down the queue and closes the persistent queue
func (pq *persistentQueue) Shutdown(ctx context.Context) error {
close(pq.stopChan)
if pq.storage != nil {
close(pq.storage.stopChan)
}
pq.stopWG.Wait()
return stopStorage(pq.storage, ctx)
}
exporter/exporterhelper/internal/persistent_queue_test.go: 2 additions & 10 deletions
@@ -54,7 +54,7 @@ func TestPersistentQueue_Capacity(t *testing.T) {
require.NoError(t, err)

// Stop consumer to imitate queue overflow
close(pq.(*persistentQueue).stopChan)
close(pq.(*persistentQueue).storage.stopChan)
pq.(*persistentQueue).stopWG.Wait()

assert.Equal(t, 0, pq.Size())
@@ -63,19 +63,11 @@ func TestPersistentQueue_Capacity(t *testing.T) {

for i := 0; i < 10; i++ {
result := pq.Produce(context.Background(), req)
if i < 6 {
if i < 5 {
assert.True(t, result)
} else {
assert.False(t, result)
}

// Let's make sure the loop picks the first element into the channel,
// so the capacity could be used in full
if i == 0 {
assert.Eventually(t, func() bool {
return pq.Size() == 0
}, 5*time.Second, 10*time.Millisecond)
}
}
assert.Equal(t, 5, pq.Size())
assert.NoError(t, stopStorage(pq.(*persistentQueue).storage, context.Background()))
exporter/exporterhelper/internal/persistent_storage.go: 36 additions & 49 deletions
@@ -50,8 +50,6 @@ type persistentContiguousStorage struct {
stopChan chan struct{}
capacity uint64

reqChan chan QueueRequest

mu sync.Mutex
readIndex itemIndex
writeIndex itemIndex
@@ -88,7 +86,6 @@ func newPersistentContiguousStorage(ctx context.Context, client storage.Client,
marshaler: marshaler,
capacity: capacity,
putChan: make(chan struct{}, capacity),
reqChan: make(chan QueueRequest),
stopChan: make(chan struct{}),
itemsCount: &atomic.Uint64{},
}
@@ -105,9 +102,6 @@ func newPersistentContiguousStorage(ctx context.Context, client storage.Client,
pcs.putChan <- struct{}{}
}

// start the loop which moves items from storage to the outbound channel
go pcs.loop()

return pcs
}

@@ -156,34 +150,28 @@ func (pcs *persistentContiguousStorage) enqueueNotDispatchedReqs(reqs []any) {
}
}

// loop is the main loop that handles fetching items from the persistent buffer
func (pcs *persistentContiguousStorage) loop() {
// get returns the next request from the queue, blocking until one is available or the queue is stopped; the second return value is false once the queue has been stopped
func (pcs *persistentContiguousStorage) get() (QueueRequest, bool) {
for {
select {
case <-pcs.stopChan:
return
return QueueRequest{}, false
case <-pcs.putChan:
req, found := pcs.getNextItem(context.Background())
if found {
pcs.reqChan <- req
req := pcs.getNextItem(context.Background())
if req.Request != nil {
return req, true
}
}
}
}

// get returns the request channel that all the requests will be send on
func (pcs *persistentContiguousStorage) get() <-chan QueueRequest {
return pcs.reqChan
}

// size returns the number of currently available items, which were not picked by consumers yet
func (pcs *persistentContiguousStorage) size() uint64 {
return pcs.itemsCount.Load()
}

func (pcs *persistentContiguousStorage) stop(ctx context.Context) error {
pcs.logger.Debug("Stopping persistentContiguousStorage")
close(pcs.stopChan)
return pcs.client.Close(ctx)
}

@@ -222,47 +210,46 @@ func (pcs *persistentContiguousStorage) put(req any) error {
}

// getNextItem pulls the next available item from the persistent storage; if none is found, returns an empty QueueRequest
func (pcs *persistentContiguousStorage) getNextItem(ctx context.Context) (QueueRequest, bool) {
func (pcs *persistentContiguousStorage) getNextItem(ctx context.Context) QueueRequest {
pcs.mu.Lock()
defer pcs.mu.Unlock()

if pcs.readIndex != pcs.writeIndex {
index := pcs.readIndex
// Increase here, so even if errors happen below, it always iterates
pcs.readIndex++
pcs.itemsCount.Store(uint64(pcs.writeIndex - pcs.readIndex))
if pcs.readIndex == pcs.writeIndex {
return QueueRequest{}
}
index := pcs.readIndex
// Increase here, so even if errors happen below, it always iterates
pcs.readIndex++
pcs.itemsCount.Store(uint64(pcs.writeIndex - pcs.readIndex))

pcs.updateReadIndex(ctx)
pcs.itemDispatchingStart(ctx, index)
pcs.updateReadIndex(ctx)
pcs.itemDispatchingStart(ctx, index)

req := newQueueRequest(context.Background(), nil)
itemKey := getItemKey(index)
buf, err := pcs.client.Get(ctx, itemKey)
if err == nil {
req.Request, err = pcs.unmarshaler(buf)
}

if err != nil || req.Request == nil {
// We need to make sure that currently dispatched items list is cleaned
if err := pcs.itemDispatchingFinish(ctx, index); err != nil {
pcs.logger.Error("Error deleting item from queue", zap.Error(err))
}
req := newQueueRequest(context.Background(), nil)
itemKey := getItemKey(index)
buf, err := pcs.client.Get(ctx, itemKey)
if err == nil {
req.Request, err = pcs.unmarshaler(buf)
}

return QueueRequest{}, false
if err != nil || req.Request == nil {
// We need to make sure that currently dispatched items list is cleaned
if err := pcs.itemDispatchingFinish(ctx, index); err != nil {
pcs.logger.Error("Error deleting item from queue", zap.Error(err))
}

// If all went well so far, cleanup will be handled by callback
req.onProcessingFinishedFunc = func() {
pcs.mu.Lock()
defer pcs.mu.Unlock()
if err := pcs.itemDispatchingFinish(ctx, index); err != nil {
pcs.logger.Error("Error deleting item from queue", zap.Error(err))
}
}
return req, true
return QueueRequest{}
}

return QueueRequest{}, false
// If all went well so far, cleanup will be handled by callback
req.onProcessingFinishedFunc = func() {
pcs.mu.Lock()
defer pcs.mu.Unlock()
if err := pcs.itemDispatchingFinish(ctx, index); err != nil {
pcs.logger.Error("Error deleting item from queue", zap.Error(err))
}
}
return req
}

// retrieveNotDispatchedReqs gets the items for which sending was not finished, cleans the storage
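
As a rough illustration of the storage side of this change (hypothetical names, not the real persistentContiguousStorage), a blocking get() can be built directly on a buffered token channel that put() feeds and a stop channel that shutdown closes, which is what makes the forwarding loop unnecessary:

```go
package main

import (
	"errors"
	"fmt"
)

// store is a toy in-memory stand-in for the persistent storage; all names are hypothetical.
type store struct {
	data     []string      // stands in for items persisted by the storage client
	putChan  chan struct{} // one buffered token per stored item, sized to the queue capacity
	stopChan chan struct{} // closed on shutdown to unblock get()
}

func newStore(capacity int) *store {
	return &store{
		putChan:  make(chan struct{}, capacity),
		stopChan: make(chan struct{}),
	}
}

// put stores an item and deposits a token, failing fast when the queue is full.
func (s *store) put(item string) error {
	select {
	case s.putChan <- struct{}{}:
	default:
		return errors.New("queue is full")
	}
	s.data = append(s.data, item)
	return nil
}

// get blocks until a token arrives or the store is stopped; the bool is false after stop.
func (s *store) get() (string, bool) {
	for {
		select {
		case <-s.stopChan:
			return "", false
		case <-s.putChan:
			if item, ok := s.next(); ok {
				return item, true
			}
			// A token without a readable item (e.g. a corrupted entry) is simply skipped.
		}
	}
}

func (s *store) next() (string, bool) {
	if len(s.data) == 0 {
		return "", false
	}
	item := s.data[0]
	s.data = s.data[1:]
	return item, true
}

func main() {
	s := newStore(2)
	_ = s.put("first")
	_ = s.put("second")
	fmt.Println(s.get()) // first true
	fmt.Println(s.get()) // second true
	close(s.stopChan)    // shutdown unblocks any consumer still waiting in get()
	fmt.Println(s.get()) // "" false
}
```

Unlike this single-threaded toy, the real storage also guards its read and write indexes with a mutex, as the getNextItem hunk above shows.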