Skip to content

Commit

Permalink
fixups
Browse files Browse the repository at this point in the history
  • Loading branch information
mattayes committed Dec 21, 2023
1 parent 0e0085e commit f8fbefb
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 39 deletions.
69 changes: 37 additions & 32 deletions opwindow.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,20 @@ import (
// OpWindow provides back-pressure for both depth (i.e., number of entries in queue) and width (i.e., number of entries in a microbatch).
// OpWindow is safe for concurrent use. Its zero value is not safe to use, use NewOpWindow().
type OpWindow struct {
mu sync.Mutex
emptyCond sync.Cond
fullCond sync.Cond
mu sync.Mutex
q list.List // *queueItem
m map[ID]*queueItem

// These are selectable sync.Cond: use blocking read for Wait() and non-blocking write for Signal().
queueHasItems chan struct{}
queueHasSpace chan struct{}

once sync.Once
done chan struct{}

depth int
width int
windowedBy time.Duration

q list.List // *queueItem
m map[ID]*queueItem
}

// NewOpWindow creates a new OpWindow.
Expand All @@ -35,14 +36,14 @@ type OpWindow struct {
// windowedBy: window size.
func NewOpWindow(depth, width int, windowedBy time.Duration) *OpWindow {
q := &OpWindow{
done: make(chan struct{}),
depth: depth,
width: width,
windowedBy: windowedBy,
m: make(map[ID]*queueItem),
queueHasItems: make(chan struct{}, 1),
queueHasSpace: make(chan struct{}, 1),
done: make(chan struct{}),
depth: depth,
width: width,
windowedBy: windowedBy,
m: make(map[ID]*queueItem),
}
q.emptyCond.L = &q.mu
q.fullCond.L = &q.mu
q.q.Init()
return q
}
Expand All @@ -54,41 +55,36 @@ func (q *OpWindow) Close() {

q.once.Do(func() {
close(q.done)
// alert all dequeue calls that they should wake up and return.
q.emptyCond.Broadcast()
q.fullCond.Broadcast()
// HACK (2023-12) (mh): Set depth to zero so new entries are rejected.
q.depth = 0
})
}

// Enqueue op into queue, blocking until first of: op is enqueued, ID has hit max width, context is done, or queue is closed.
func (q *OpWindow) Enqueue(ctx context.Context, id ID, op *Op) error {
select {
case <-q.done:
return ErrQueueClosed
default:
}

q.mu.Lock()
defer q.mu.Unlock()
q.mu.Lock() // locked on returns below

for {
item, ok := q.m[id]
if ok {
if len(item.OpSet.set) >= q.width {
q.mu.Unlock()
return ErrQueueSaturatedWidth
}
item.OpSet.append(op)
q.mu.Unlock()
return nil
}

if q.q.Len() >= q.depth {
q.mu.Unlock()
select {
case <-ctx.Done():
return fmt.Errorf("%w: %w", ErrQueueSaturatedDepth, ctx.Err())
case <-q.done:
return ErrQueueClosed
default:
q.fullCond.Wait()
case <-q.queueHasSpace:
q.mu.Lock()
continue
}
}
Expand All @@ -100,29 +96,34 @@ func (q *OpWindow) Enqueue(ctx context.Context, id ID, op *Op) error {
}
q.m[id] = item
q.q.PushBack(item)
q.mu.Unlock()

select {
case q.queueHasItems <- struct{}{}:
default:
}

q.emptyCond.Signal()
return nil
}
}

// Dequeue removes and returns the oldest OpSet whose window has passed from the queue,
// blocking until first of: OpSet is ready, context is canceled, or queue is closed.
func (q *OpWindow) Dequeue(ctx context.Context) (*OpSet, error) {
q.mu.Lock()
defer q.mu.Unlock()
q.mu.Lock() // unlocked on returns below

var item *queueItem
for item == nil {
elem := q.q.Front()
if elem == nil {
q.mu.Unlock()
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-q.done:
return nil, ErrQueueClosed
default:
q.emptyCond.Wait()
case <-q.queueHasItems:
q.mu.Lock()
continue
}

Expand All @@ -146,9 +147,13 @@ func (q *OpWindow) Dequeue(ctx context.Context) (*OpSet, error) {

ops := item.OpSet
delete(q.m, item.ID)
q.mu.Unlock()
item = nil // gc

q.fullCond.Signal()
select {
case q.queueHasSpace <- struct{}{}:
default:
}
return ops, nil
}

Expand Down
48 changes: 41 additions & 7 deletions opwindow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,22 +159,56 @@ func TestOpWindowErrQueueSaturatedDepth(t *testing.T) {
op2 := cg.Add(2, &tsMsg{234, now})

window := NewOpWindow(1, 1, time.Millisecond)
err := window.Enqueue(context.Background(), op1.Key, op1)
ctx := context.Background()
err := window.Enqueue(ctx, op1.Key, op1)
require.NoError(t, err)

ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) // let it run for a sec for coverage ¯\_(ツ)_/¯
ctx1, cancel := context.WithTimeout(ctx, time.Millisecond) // let it run for a sec for coverage ¯\_(ツ)_/¯
defer cancel()
ctx, cancel = context.WithCancel(context.Background())
defer cancel()
go func() {
<-ctx.Done()
// pretend we dequeued but were full again
window.fullCond.Signal()
<-ctx1.Done()
window.queueHasSpace <- struct{}{} // emulate queue having space then filling back up
cancel()
}()

err = window.Enqueue(ctx, op2.Key, op2)
require.ErrorIs(t, err, ErrQueueSaturatedDepth)

_, err = window.Dequeue(context.Background())
_, err = window.Dequeue(ctx)
require.NoError(t, err)

err = window.Enqueue(context.Background(), op2.Key, op2)
err = window.Enqueue(ctx, op2.Key, op2)
require.NoError(t, err)
}

func TestOpWindowErrQueueSaturatedDepthClose(t *testing.T) {
t.Parallel()
cg := NewCallGroup(func(map[ID]*Response) {})
now := time.Now()
op1 := cg.Add(1, &tsMsg{123, now})
op2 := cg.Add(2, &tsMsg{234, now})

window := NewOpWindow(1, 1, time.Millisecond)
ctx := context.Background()
err := window.Enqueue(ctx, op1.Key, op1)
require.NoError(t, err)

go func() {
time.Sleep(time.Millisecond)
window.Close()
}()

err = window.Enqueue(ctx, op2.Key, op2)
require.ErrorIs(t, err, ErrQueueClosed)
}

func TestOpWindowDequeueEmptyQueue(t *testing.T) {
t.Parallel()
window := NewOpWindow(1, 1, time.Hour)
ctx, cancel := context.WithCancel(context.Background())
cancel()
_, err := window.Dequeue(ctx)
require.ErrorIs(t, err, ctx.Err())
}

0 comments on commit f8fbefb

Please sign in to comment.