Skip to content

Commit

Permalink
quic: add pipe type
Browse files Browse the repository at this point in the history
Streams (including CRYPTO streams) are an ordered byte sequence.

Both outgoing and incoming streams require random access to a portion
of that sequence. Outbound packets may be lost, requiring us to
resend the data in the lost packet. Inbound packets may arrive out
of order.

Add a "pipe" type as a building block for both inbound and outbound
streams. A pipe is a window into a portion of a stream, permitting
random read and write access within that window (unlike bufio.Reader
or bufio.Writer).

Pipes are implemented as a linked list of blocks.

Block sizes are uniform and allocations are pooled,
avoiding non-pool allocations in the steady state.

Pipe memory consumption is proportional to the current window,
and goes to zero when the window has been fully consumed
(unlike bytes.Buffer).

For golang/go#58547

Change-Id: I0c16707552c9c46f31055daea2396590a924fc60
Reviewed-on: https://go-review.googlesource.com/c/net/+/510615
Run-TryBot: Damien Neil <[email protected]>
Reviewed-by: Jonathan Amsterdam <[email protected]>
TryBot-Result: Gopher Robot <[email protected]>
  • Loading branch information
neild committed Jul 18, 2023
1 parent 8db2ead commit d0912d4
Show file tree
Hide file tree
Showing 2 changed files with 244 additions and 0 deletions.
149 changes: 149 additions & 0 deletions internal/quic/pipe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
// Copyright 2023 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

//go:build go1.21

package quic

import (
"sync"
)

// A pipe is a byte buffer used in implementing streams.
//
// A pipe contains a window of stream data.
// Random access reads and writes are supported within the window.
// Writing past the end of the window extends it.
// Data may be discarded from the start of the pipe, advancing the window.
type pipe struct {
start int64
end int64
head *pipebuf
tail *pipebuf
}

type pipebuf struct {
off int64
b []byte
next *pipebuf
}

func (pb *pipebuf) end() int64 {
return pb.off + int64(len(pb.b))
}

var pipebufPool = sync.Pool{
New: func() any {
return &pipebuf{
b: make([]byte, 4096),
}
},
}

func newPipebuf() *pipebuf {
return pipebufPool.Get().(*pipebuf)
}

func (b *pipebuf) recycle() {
b.off = 0
b.next = nil
pipebufPool.Put(b)
}

// writeAt writes len(b) bytes to the pipe at offset off.
//
// Writes to offsets before p.start are discarded.
// Writes to offsets after p.end extend the pipe window.
func (p *pipe) writeAt(b []byte, off int64) {
end := off + int64(len(b))
if end > p.end {
p.end = end
} else if end <= p.start {
return
}

if off < p.start {
// Discard the portion of b which falls before p.start.
trim := p.start - off
b = b[trim:]
off = p.start
}

if p.head == nil {
p.head = newPipebuf()
p.head.off = p.start
p.tail = p.head
}
pb := p.head
if off >= p.tail.off {
// Common case: Writing past the end of the pipe.
pb = p.tail
}
for {
pboff := off - pb.off
if pboff < int64(len(pb.b)) {
n := copy(pb.b[pboff:], b)
if n == len(b) {
return
}
off += int64(n)
b = b[n:]
}
if pb.next == nil {
pb.next = newPipebuf()
pb.next.off = pb.off + int64(len(pb.b))
p.tail = pb.next
}
pb = pb.next
}
}

// copy copies len(b) bytes into b starting from off.
// The pipe must contain [off, off+len(b)).
func (p *pipe) copy(off int64, b []byte) {
dst := b[:0]
p.read(off, len(b), func(c []byte) error {
dst = append(dst, c...)
return nil
})
}

// read calls f with the data in [off, off+n)
// The data may be provided sequentially across multiple calls to f.
func (p *pipe) read(off int64, n int, f func([]byte) error) error {
if off < p.start {
panic("invalid read range")
}
for pb := p.head; pb != nil && n > 0; pb = pb.next {
if off >= pb.end() {
continue
}
b := pb.b[off-pb.off:]
if len(b) > n {
b = b[:n]
}
off += int64(len(b))
n -= len(b)
if err := f(b); err != nil {
return err
}
}
if n > 0 {
panic("invalid read range")
}
return nil
}

// discardBefore discards all data prior to off.
func (p *pipe) discardBefore(off int64) {
for p.head != nil && p.head.end() < off {
head := p.head
p.head = p.head.next
head.recycle()
}
if p.head == nil {
p.tail = nil
}
p.start = off
}
95 changes: 95 additions & 0 deletions internal/quic/pipe_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Copyright 2023 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

//go:build go1.21

package quic

import (
"bytes"
"math/rand"
"testing"
)

func TestPipeWrites(t *testing.T) {
type writeOp struct {
start, end int64
}
type discardBeforeOp struct {
off int64
}
type op any
src := make([]byte, 65536)
rand.New(rand.NewSource(0)).Read(src)
for _, test := range []struct {
desc string
ops []op
}{{
desc: "sequential writes",
ops: []op{
writeOp{0, 1024},
writeOp{1024, 4096},
writeOp{4096, 65536},
},
}, {
desc: "disordered overlapping writes",
ops: []op{
writeOp{2000, 8000},
writeOp{0, 3000},
writeOp{7000, 12000},
},
}, {
desc: "write to discarded region",
ops: []op{
writeOp{0, 65536},
discardBeforeOp{32768},
writeOp{0, 1000},
writeOp{3000, 5000},
writeOp{0, 32768},
},
}, {
desc: "write overlaps discarded region",
ops: []op{
discardBeforeOp{10000},
writeOp{0, 20000},
},
}, {
desc: "discard everything",
ops: []op{
writeOp{0, 10000},
discardBeforeOp{10000},
writeOp{10000, 20000},
},
}} {
var p pipe
var wantset rangeset[int64]
var wantStart, wantEnd int64
for i, o := range test.ops {
switch o := o.(type) {
case writeOp:
p.writeAt(src[o.start:o.end], o.start)
wantset.add(o.start, o.end)
wantset.sub(0, wantStart)
if o.end > wantEnd {
wantEnd = o.end
}
case discardBeforeOp:
p.discardBefore(o.off)
wantset.sub(0, o.off)
wantStart = o.off
}
if p.start != wantStart || p.end != wantEnd {
t.Errorf("%v: after %#v p contains [%v,%v), want [%v,%v)", test.desc, test.ops[:i+1], p.start, p.end, wantStart, wantEnd)
}
for _, r := range wantset {
want := src[r.start:][:r.size()]
got := make([]byte, r.size())
p.copy(r.start, got)
if !bytes.Equal(got, want) {
t.Errorf("%v after %#v, mismatch in data in %v", test.desc, test.ops[:i+1], r)
}
}
}
}
}

0 comments on commit d0912d4

Please sign in to comment.