diff --git a/internal/quic/gate.go b/internal/quic/gate.go new file mode 100644 index 000000000..efb28daf8 --- /dev/null +++ b/internal/quic/gate.go @@ -0,0 +1,104 @@ +// Copyright 2023 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +//go:build go1.21 + +package quic + +import "context" + +// An gate is a monitor (mutex + condition variable) with one bit of state. +// +// The condition may be either set or unset. +// Lock operations may be unconditional, or wait for the condition to be set. +// Unlock operations record the new state of the condition. +type gate struct { + // When unlocked, exactly one of set or unset contains a value. + // When locked, neither chan contains a value. + set chan struct{} + unset chan struct{} +} + +func newGate() gate { + g := gate{ + set: make(chan struct{}, 1), + unset: make(chan struct{}, 1), + } + g.unset <- struct{}{} + return g +} + +// lock acquires the gate unconditionally. +// It reports whether the condition is set. +func (g *gate) lock() (set bool) { + select { + case <-g.set: + return true + case <-g.unset: + return false + } +} + +// waitAndLock waits until the condition is set before acquiring the gate. +func (g *gate) waitAndLock() { + <-g.set +} + +// waitAndLockContext waits until the condition is set before acquiring the gate. +// If the context expires, waitAndLockContext returns an error and does not acquire the gate. +func (g *gate) waitAndLockContext(ctx context.Context) error { + select { + case <-g.set: + return nil + default: + } + select { + case <-g.set: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +// waitWithLock releases an acquired gate until the condition is set. +// The caller must have previously acquired the gate. +// Upon return from waitWithLock, the gate will still be held. +// If waitWithLock returns nil, the condition is set. +func (g *gate) waitWithLock(ctx context.Context) error { + g.unlock(false) + err := g.waitAndLockContext(ctx) + if err != nil { + if g.lock() { + // The condition was set in between the context expiring + // and us reacquiring the gate. + err = nil + } + } + return err +} + +// lockIfSet acquires the gate if and only if the condition is set. +func (g *gate) lockIfSet() (acquired bool) { + select { + case <-g.set: + return true + default: + return false + } +} + +// unlock sets the condition and releases the gate. +func (g *gate) unlock(set bool) { + if set { + g.set <- struct{}{} + } else { + g.unset <- struct{}{} + } +} + +// unlock sets the condition to the result of f and releases the gate. +// Useful in defers. +func (g *gate) unlockFunc(f func() bool) { + g.unlock(f()) +} diff --git a/internal/quic/gate_test.go b/internal/quic/gate_test.go new file mode 100644 index 000000000..0122e3986 --- /dev/null +++ b/internal/quic/gate_test.go @@ -0,0 +1,142 @@ +// Copyright 2023 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +//go:build go1.21 + +package quic + +import ( + "context" + "testing" + "time" +) + +func TestGateLockAndUnlock(t *testing.T) { + g := newGate() + if set := g.lock(); set { + t.Errorf("g.lock() of never-locked gate: true, want false") + } + unlockedc := make(chan struct{}) + donec := make(chan struct{}) + go func() { + defer close(donec) + set := g.lock() + select { + case <-unlockedc: + default: + t.Errorf("g.lock() succeeded while gate was held") + } + if !set { + t.Errorf("g.lock() of set gate: false, want true") + } + g.unlock(false) + }() + time.Sleep(1 * time.Millisecond) + close(unlockedc) + g.unlock(true) + <-donec + if set := g.lock(); set { + t.Errorf("g.lock() of unset gate: true, want false") + } +} + +func TestGateWaitAndLock(t *testing.T) { + g := newGate() + set := false + go func() { + for i := 0; i < 3; i++ { + g.lock() + g.unlock(false) + time.Sleep(1 * time.Millisecond) + } + g.lock() + set = true + g.unlock(true) + }() + g.waitAndLock() + if !set { + t.Errorf("g.waitAndLock() returned before gate was set") + } +} + +func TestGateWaitAndLockContext(t *testing.T) { + g := newGate() + // waitAndLockContext is canceled + ctx, cancel := context.WithCancel(context.Background()) + go func() { + time.Sleep(1 * time.Millisecond) + cancel() + }() + if err := g.waitAndLockContext(ctx); err != context.Canceled { + t.Errorf("g.waitAndLockContext() = %v, want context.Canceled", err) + } + // waitAndLockContext succeeds + set := false + go func() { + time.Sleep(1 * time.Millisecond) + g.lock() + set = true + g.unlock(true) + }() + if err := g.waitAndLockContext(context.Background()); err != nil { + t.Errorf("g.waitAndLockContext() = %v, want nil", err) + } + if !set { + t.Errorf("g.waitAndLockContext() returned before gate was set") + } + g.unlock(true) + // waitAndLockContext succeeds when the gate is set and the context is canceled + if err := g.waitAndLockContext(ctx); err != nil { + t.Errorf("g.waitAndLockContext() = %v, want nil", err) + } +} + +func TestGateWaitWithLock(t *testing.T) { + g := newGate() + // waitWithLock is canceled + ctx, cancel := context.WithCancel(context.Background()) + go func() { + time.Sleep(1 * time.Millisecond) + cancel() + }() + g.lock() + if err := g.waitWithLock(ctx); err != context.Canceled { + t.Errorf("g.waitWithLock() = %v, want context.Canceled", err) + } + // waitWithLock succeeds + set := false + go func() { + g.lock() + set = true + g.unlock(true) + }() + time.Sleep(1 * time.Millisecond) + if err := g.waitWithLock(context.Background()); err != nil { + t.Errorf("g.waitWithLock() = %v, want nil", err) + } + if !set { + t.Errorf("g.waitWithLock() returned before gate was set") + } +} + +func TestGateLockIfSet(t *testing.T) { + g := newGate() + if locked := g.lockIfSet(); locked { + t.Errorf("g.lockIfSet() of unset gate = %v, want false", locked) + } + g.lock() + g.unlock(true) + if locked := g.lockIfSet(); !locked { + t.Errorf("g.lockIfSet() of set gate = %v, want true", locked) + } +} + +func TestGateUnlockFunc(t *testing.T) { + g := newGate() + go func() { + g.lock() + defer g.unlockFunc(func() bool { return true }) + }() + g.waitAndLock() +} diff --git a/internal/quic/queue.go b/internal/quic/queue.go new file mode 100644 index 000000000..9bb71ca3f --- /dev/null +++ b/internal/quic/queue.go @@ -0,0 +1,65 @@ +// Copyright 2023 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +//go:build go1.21 + +package quic + +import "context" + +// A queue is an unbounded queue of some item (new connections and streams). +type queue[T any] struct { + // The gate condition is set if the queue is non-empty or closed. + gate gate + err error + q []T +} + +func newQueue[T any]() queue[T] { + return queue[T]{gate: newGate()} +} + +// close closes the queue, causing pending and future pop operations +// to return immediately with err. +func (q *queue[T]) close(err error) { + q.gate.lock() + defer q.unlock() + if q.err == nil { + q.err = err + } +} + +// put appends an item to the queue. +// It returns true if the item was added, false if the queue is closed. +func (q *queue[T]) put(v T) bool { + q.gate.lock() + defer q.unlock() + if q.err != nil { + return false + } + q.q = append(q.q, v) + return true +} + +// get removes the first item from the queue, blocking until ctx is done, an item is available, +// or the queue is closed. +func (q *queue[T]) get(ctx context.Context) (T, error) { + var zero T + if err := q.gate.waitAndLockContext(ctx); err != nil { + return zero, err + } + defer q.unlock() + if q.err != nil { + return zero, q.err + } + v := q.q[0] + copy(q.q[:], q.q[1:]) + q.q[len(q.q)-1] = zero + q.q = q.q[:len(q.q)-1] + return v, nil +} + +func (q *queue[T]) unlock() { + q.gate.unlock(q.err != nil || len(q.q) > 0) +} diff --git a/internal/quic/queue_test.go b/internal/quic/queue_test.go new file mode 100644 index 000000000..8debeff11 --- /dev/null +++ b/internal/quic/queue_test.go @@ -0,0 +1,59 @@ +// Copyright 2023 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +//go:build go1.21 + +package quic + +import ( + "context" + "io" + "testing" + "time" +) + +func TestQueue(t *testing.T) { + nonblocking, cancel := context.WithCancel(context.Background()) + cancel() + + q := newQueue[int]() + if got, err := q.get(nonblocking); err != context.Canceled { + t.Fatalf("q.get() = %v, %v, want nil, contex.Canceled", got, err) + } + + if !q.put(1) { + t.Fatalf("q.put(1) = false, want true") + } + if !q.put(2) { + t.Fatalf("q.put(2) = false, want true") + } + if got, err := q.get(nonblocking); got != 1 || err != nil { + t.Fatalf("q.get() = %v, %v, want 1, nil", got, err) + } + if got, err := q.get(nonblocking); got != 2 || err != nil { + t.Fatalf("q.get() = %v, %v, want 2, nil", got, err) + } + if got, err := q.get(nonblocking); err != context.Canceled { + t.Fatalf("q.get() = %v, %v, want nil, contex.Canceled", got, err) + } + + go func() { + time.Sleep(1 * time.Millisecond) + q.put(3) + }() + if got, err := q.get(context.Background()); got != 3 || err != nil { + t.Fatalf("q.get() = %v, %v, want 3, nil", got, err) + } + + if !q.put(4) { + t.Fatalf("q.put(2) = false, want true") + } + q.close(io.EOF) + if got, err := q.get(context.Background()); got != 0 || err != io.EOF { + t.Fatalf("q.get() = %v, %v, want 0, io.EOF", got, err) + } + if q.put(5) { + t.Fatalf("q.put(5) = true, want false") + } +}