Skip to content

Commit

Permalink
Add non-blocking bounded queue (ava-labs#2657)
Browse files Browse the repository at this point in the history
  • Loading branch information
Dan Laine authored Mar 15, 2023
1 parent 6f31b05 commit 6f58c6d
Show file tree
Hide file tree
Showing 2 changed files with 232 additions and 0 deletions.
90 changes: 90 additions & 0 deletions utils/buffer/bounded_nonblocking_queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright (C) 2019-2022, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package buffer

import "errors"

var (
_ Queue[struct{}] = (*boundedQueue[struct{}])(nil)

errInvalidMaxSize = errors.New("maxSize must be greater than 0")
)

// A FIFO queue.
type Queue[T any] interface {
// Pushes [elt] onto the queue.
// If the queue is full, the oldest element is evicted to make space.
Push(T)
// Pops the oldest element from the queue.
// Returns false if the queue is empty.
Pop() (T, bool)
// Returns the oldest element without removing it.
// Returns false if the queue is empty.
Peek() (T, bool)
// Returns the element at the given index without removing it.
// Index(0) returns the oldest element.
// Index(Len() - 1) returns the newest element.
// Returns false if there is no element at that index.
Index(int) (T, bool)
// Returns the number of elements in the queue.
Len() int
// Returns the queue elements from oldest to newest.
// This is an O(n) operation and should be used sparingly.
List() []T
}

// Keeps up to [maxSize] entries in an ordered buffer
// and calls [onEvict] on any item that is evicted.
// Not safe for concurrent use.
type boundedQueue[T any] struct {
deque Deque[T]
maxSize int
onEvict func(T)
}

// Returns a new bounded, non-blocking queue that holds up to [maxSize] elements.
// When an element is evicted, [onEvict] is called with the evicted element.
// If [onEvict] is nil, this is a no-op.
// [maxSize] must be >= 1.
// Not safe for concurrent use.
func NewBoundedQueue[T any](maxSize int, onEvict func(T)) (Queue[T], error) {
if maxSize < 1 {
return nil, errInvalidMaxSize
}
return &boundedQueue[T]{
deque: NewUnboundedDeque[T](maxSize + 1), // +1 so we never resize
maxSize: maxSize,
onEvict: onEvict,
}, nil
}

func (b *boundedQueue[T]) Push(elt T) {
if b.deque.Len() == b.maxSize {
evicted, _ := b.deque.PopLeft()
if b.onEvict != nil {
b.onEvict(evicted)
}
}
_ = b.deque.PushRight(elt)
}

func (b *boundedQueue[T]) Pop() (T, bool) {
return b.deque.PopLeft()
}

func (b *boundedQueue[T]) Peek() (T, bool) {
return b.deque.PeekLeft()
}

func (b *boundedQueue[T]) Index(i int) (T, bool) {
return b.deque.Index(i)
}

func (b *boundedQueue[T]) Len() int {
return b.deque.Len()
}

func (b *boundedQueue[T]) List() []T {
return b.deque.List()
}
142 changes: 142 additions & 0 deletions utils/buffer/bounded_nonblocking_queue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
// Copyright (C) 2019-2022, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package buffer

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestNewBoundedQueue(t *testing.T) {
require := require.New(t)

// Case: maxSize < 1
_, err := NewBoundedQueue[bool](0, nil)
require.Error(err)

// Case: maxSize == 1 and nil onEvict
b, err := NewBoundedQueue[bool](1, nil)
require.NoError(err)

// Put 2 elements to make sure we don't panic on evict
b.Push(true)
b.Push(true)
}

func TestBoundedQueue(t *testing.T) {
require := require.New(t)

maxSize := 3
evicted := []int{}
onEvict := func(elt int) {
evicted = append(evicted, elt)
}
b, err := NewBoundedQueue(maxSize, onEvict)
require.NoError(err)

require.Equal(0, b.Len())

// Fill the queue
for i := 0; i < maxSize; i++ {
b.Push(i)
require.Equal(i+1, b.Len())
got, ok := b.Peek()
require.True(ok)
require.Equal(0, got)
got, ok = b.Index(i)
require.True(ok)
require.Equal(i, got)
require.Len(b.List(), i+1)
}
require.Equal([]int{}, evicted)
require.Len(b.List(), maxSize)
// Queue is [0, 1, 2]

// Empty the queue
for i := 0; i < maxSize; i++ {
got, ok := b.Pop()
require.True(ok)
require.Equal(i, got)
require.Equal(maxSize-i-1, b.Len())
require.Len(b.List(), maxSize-i-1)
}

// Queue is empty

_, ok := b.Pop()
require.False(ok)
_, ok = b.Peek()
require.False(ok)
_, ok = b.Index(0)
require.False(ok)
require.Equal(0, b.Len())
require.Empty(b.List())

// Fill the queue again
for i := 0; i < maxSize; i++ {
b.Push(i)
require.Equal(i+1, b.Len())
}

// Queue is [0, 1, 2]

// Putting another element should evict the oldest.
b.Push(maxSize)

// Queue is [1, 2, 3]

require.Equal(maxSize, b.Len())
require.Len(b.List(), maxSize)
got, ok := b.Peek()
require.True(ok)
require.Equal(1, got)
got, ok = b.Index(0)
require.True(ok)
require.Equal(1, got)
got, ok = b.Index(maxSize - 1)
require.True(ok)
require.Equal(maxSize, got)
require.Equal([]int{0}, evicted)

// Put 2 more elements
b.Push(maxSize + 1)
b.Push(maxSize + 2)

// Queue is [3, 4, 5]

require.Equal(maxSize, b.Len())
require.Equal([]int{0, 1, 2}, evicted)
got, ok = b.Peek()
require.True(ok)
require.Equal(3, got)
require.Equal([]int{3, 4, 5}, b.List())

for i := maxSize; i < 2*maxSize; i++ {
got, ok := b.Index(i - maxSize)
require.True(ok)
require.Equal(i, got)
}

// Empty the queue
for i := 0; i < maxSize; i++ {
got, ok := b.Pop()
require.True(ok)
require.Equal(i+3, got)
require.Equal(maxSize-i-1, b.Len())
require.Len(b.List(), maxSize-i-1)
}

// Queue is empty

require.Empty(b.List())
require.Equal(0, b.Len())
require.Equal([]int{0, 1, 2}, evicted)
_, ok = b.Pop()
require.False(ok)
_, ok = b.Peek()
require.False(ok)
_, ok = b.Index(0)
require.False(ok)
}

0 comments on commit 6f58c6d

Please sign in to comment.