Skip to content

Commit

Permalink
util: Add generic queue.Queue[T] container
Browse files Browse the repository at this point in the history
Add generic `queue.Queue[T]` contaner.
Supports `Push()` operation.  Suports FIFO, and
LIFO consumption (`PopFront()`, `Pop()`).

This implementation allocates chunks of []T to
ammortize memory allocations.

Relase note: None
  • Loading branch information
Yevgeniy Miretskiy committed Apr 7, 2023
1 parent 7e9ab75 commit 64fef1a
Show file tree
Hide file tree
Showing 4 changed files with 371 additions and 0 deletions.
4 changes: 4 additions & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,7 @@ ALL_TESTS = [
"//pkg/util/pretty:pretty_test",
"//pkg/util/protoutil:protoutil_test",
"//pkg/util/quantile:quantile_test",
"//pkg/util/queue:queue_test",
"//pkg/util/quotapool:quotapool_test",
"//pkg/util/randident:randident_test",
"//pkg/util/randutil:randutil_test",
Expand Down Expand Up @@ -2223,6 +2224,8 @@ GO_TARGETS = [
"//pkg/util/protoutil:protoutil_test",
"//pkg/util/quantile:quantile",
"//pkg/util/quantile:quantile_test",
"//pkg/util/queue:queue",
"//pkg/util/queue:queue_test",
"//pkg/util/quotapool:quotapool",
"//pkg/util/quotapool:quotapool_test",
"//pkg/util/randident/randidentcfg:randidentcfg",
Expand Down Expand Up @@ -3312,6 +3315,7 @@ GET_X_DATA_TARGETS = [
"//pkg/util/pretty:get_x_data",
"//pkg/util/protoutil:get_x_data",
"//pkg/util/quantile:get_x_data",
"//pkg/util/queue:get_x_data",
"//pkg/util/quotapool:get_x_data",
"//pkg/util/randident:get_x_data",
"//pkg/util/randident/randidentcfg:get_x_data",
Expand Down
23 changes: 23 additions & 0 deletions pkg/util/queue/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data")
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "queue",
srcs = ["queue.go"],
importpath = "github.com/cockroachdb/cockroach/pkg/util/queue",
visibility = ["//visibility:public"],
)

go_test(
name = "queue_test",
srcs = ["queue_test.go"],
args = ["-test.timeout=295s"],
embed = [":queue"],
deps = [
"//pkg/util/leaktest",
"//pkg/util/randutil",
"@com_github_stretchr_testify//require",
],
)

get_x_data(name = "get_x_data")
131 changes: 131 additions & 0 deletions pkg/util/queue/queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
// Copyright 2023 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
//

package queue

import "math/bits"

// Queue is a generic queue container with amortized allocation cost.
// Zero initialized Queue[T] is safe to use.
type Queue[T any] struct {
head, tail *chunk[T]
len int
rPos int // read position in this queue relative to head.
wPos int // write position in this queue relative to tail.
}

// Len returns the number of elements in the queue.
func (q *Queue[T]) Len() int {
return q.len
}

// Empty returns true if the queue is empty.
func (q *Queue[T]) Empty() bool {
return q.len == 0
}

// Push adds element to this queue.
func (q *Queue[T]) Push(v T) {
if q.head == nil {
if q.tail != nil {
panic("expected empty queue tail")
}
c := q.newChunk()
q.head = &c
q.tail = q.head
q.rPos = 0
q.wPos = 0
}

if q.wPos == cap(q.tail.data) {
c := q.newChunk()
c.prev = q.tail
q.tail.next = &c
q.tail = &c
q.wPos = 0
}

q.tail.data[q.wPos] = v
q.wPos++
q.len++
}

// Pop returns the last element in the queue, or false if the queue is empty.
func (q *Queue[T]) Pop() (v T, ok bool) {
if q.len == 0 {
return v, false
}

q.wPos--
v = q.tail.data[q.wPos]
q.len--

if q.head == q.tail && q.rPos == q.wPos {
// Everything was consumed from the single chunk.
// Reset read/write positions.
q.rPos = 0
q.wPos = 0
} else if q.wPos == 0 {
if prev := q.tail.prev; prev != nil {
prev.next = nil
q.tail = prev
q.wPos = cap(prev.data)
}
}

return v, true
}

// PopFront returns the first element in the queue, or false if queue is empty.
func (q *Queue[T]) PopFront() (v T, ok bool) {
if q.len == 0 {
return v, false
}

v = q.head.data[q.rPos]
q.rPos++
q.len--

if q.head == q.tail && q.rPos == q.wPos {
// Everything was consumed from the single chunk.
// Reset read/write positions.
q.rPos = 0
q.wPos = 0
} else if q.rPos == cap(q.head.data) {
// Chunk is now empty, release it.
q.rPos = 0
q.head = q.head.next
}

return v, true
}

func (q *Queue[T]) newChunk() (c chunk[T]) {
c.data = make([]T, chunkSize(q.len))
return c
}

// returns next chunk size based on the current queue length.
func chunkSize(l int) int {
if l < 16 {
return 16
}
if l > 512 {
return 1024
}
return 1 << (64 - bits.LeadingZeros(uint(l)))
}

type chunk[T any] struct {
data []T
next *chunk[T]
prev *chunk[T]
}
213 changes: 213 additions & 0 deletions pkg/util/queue/queue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
// Copyright 2023 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package queue

import (
"fmt"
"math/rand"
"strconv"
"testing"

"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/stretchr/testify/require"
)

func countChunks[T any](q Queue[T]) (n int) {
for h := q.head; h != nil; h = h.next {
n++
}
return n
}

func pushPopTest[T any](t *testing.T, g func(i int) T) {
var q Queue[T]
require.Equal(t, 0, q.Len())
require.True(t, q.Empty())
require.Equal(t, 0, countChunks(q))

var vals [3]T
for i := range vals {
vals[i] = g(i)
q.Push(vals[i])
}

require.Equal(t, 3, q.Len())
require.False(t, q.Empty())
require.Equal(t, 1, countChunks(q))

v, ok := q.PopFront()
require.True(t, ok)
require.Equal(t, vals[0], v)

v, ok = q.Pop()
require.True(t, ok)
require.Equal(t, vals[2], v, vals)

v, ok = q.Pop()
require.True(t, ok)
require.Equal(t, vals[1], v)

// After popping the value, the underlying chunk remains.
require.Equal(t, 1, countChunks(q))
}

func corpusTest[T any](t *testing.T, n int, g func(i int) T) {
var q Queue[T]
require.Equal(t, 0, q.Len())
require.True(t, q.Empty())
require.Equal(t, 0, countChunks(q))

var corpus []T
if n > 0 || rand.Int()%2 == 0 { // when n is 0, keep corpus nil (50%), or initialized.
corpus = make([]T, n)
for i := 0; i < n; i++ {
corpus[i] = g(i)
}
}

expectChunks := func(n int) (c int) {
added := 0
for n > 0 {
s := chunkSize(added + 1)
added += s
n -= s
c++
}
return c
}

insertCorpus := func() {
for _, v := range corpus {
q.Push(v)
}
}

insertCorpus()
require.Equal(t, expectChunks(len(corpus)), countChunks(q))

n = 0
for !q.Empty() {
v, ok := q.PopFront()
require.True(t, ok)
require.Equal(t, corpus[n], v)
n++
}

require.Equal(t, 0, q.Len())
require.True(t, q.Empty())
}

func queueTestCases[T any](t *testing.T, g func(i int) T) {
t.Run("pushPop", func(t *testing.T) {
pushPopTest[T](t, g)
})

for _, n := range []int{0, 1, 15, 16, 17, 65, 256, rand.Intn(4 * 1024)} {
t.Run(fmt.Sprintf("corpus=%d", n), func(t *testing.T) {
corpusTest[T](t, n, g)
})
}
}

func TestQueue(t *testing.T) {
defer leaktest.AfterTest(t)()

t.Run("int", func(t *testing.T) {
queueTestCases[int](t, func(i int) int {
return rand.Int()
})
})

t.Run("string", func(t *testing.T) {
queueTestCases[string](t, func(i int) string {
return strconv.Itoa(rand.Int())
})
})

t.Run("[]int", func(t *testing.T) {
queueTestCases[[]int](t, func(i int) (res []int) {
for i := 0; i < rand.Intn(100); i++ {
res = append(res, rand.Int())
}
return res
})
})

t.Run("struct", func(t *testing.T) {
type rec struct {
i int
}
queueTestCases[rec](t, func(i int) rec {
return rec{i: i}
})
})
}

func queueBench[T any](b *testing.B, makeT func(i int, rnd *rand.Rand) T) {
for _, popChance := range []float64{0.5, 0.25, 0.1} {
b.Run(fmt.Sprintf("pop=%.2f%%", popChance*100), func(b *testing.B) {
b.ReportAllocs()
rnd, _ := randutil.NewTestRand()
var q Queue[T]
for i := 0; i < b.N; i++ {
if rnd.Float64() < popChance {
empty := q.Empty()
var ok bool
if rnd.Int()%2 == 0 {
_, ok = q.PopFront()
} else {
_, ok = q.Pop()
}
if !empty && !ok {
b.Fatal("expected value, got none")
}
} else {
q.Push(makeT(i, rnd))
}
}

c := countChunks(q)
avg := 0
if c > 0 {
avg = q.Len() / c
}
b.Logf("queue len %d; chunks %d (%d avg)", q.Len(), c, avg)
})
}
}

func BenchmarkQueue(b *testing.B) {
defer leaktest.AfterTest(b)()

b.Run("int", func(b *testing.B) {
queueBench[int](b, func(i int, rnd *rand.Rand) int {
return i
})
})
b.Run("*int", func(b *testing.B) {
queueBench[*int](b, func(i int, rnd *rand.Rand) *int {
p := new(int)
*p = i
return p
})
})

type record struct {
a int
slice []byte
}

b.Run("struct", func(b *testing.B) {
queueBench[record](b, func(i int, rnd *rand.Rand) record {
return record{a: i}
})
})
}

0 comments on commit 64fef1a

Please sign in to comment.