forked from glycerine/rbuf
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpbuf.go
172 lines (151 loc) · 4.37 KB
/
pbuf.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
package rbuf
// copyright (c) 2016, Jason E. Aten
// license: MIT
import "io"
// PointerRingBuf:
//
// a fixed-size circular ring buffer of interface{}
//
type PointerRingBuf struct {
A []interface{}
N int // MaxView, the total size of A, whether or not in use.
Beg int // start of in-use data in A
Readable int // number of pointers available in A (in use)
}
// constructor. NewPointerRingBuf will allocate internally
// a slice of size maxViewInBytes.
func NewPointerRingBuf(maxViewInBytes int) *PointerRingBuf {
n := maxViewInBytes
r := &PointerRingBuf{
N: n,
Beg: 0,
Readable: 0,
}
r.A = make([]interface{}, n, n)
return r
}
// TwoContig returns all readable pointers, but in two separate slices,
// to avoid copying. The two slices are from the same buffer, but
// are not contiguous. Either or both may be empty slices.
func (b *PointerRingBuf) TwoContig(makeCopy bool) (first []interface{}, second []interface{}) {
extent := b.Beg + b.Readable
if extent <= b.N {
// we fit contiguously in this buffer without wrapping to the other.
// Let second stay an empty slice.
return b.A[b.Beg:(b.Beg + b.Readable)], second
}
return b.A[b.Beg:b.N], b.A[0:(extent % b.N)]
}
// ReadPtrs():
//
// from bytes.Buffer.Read(): Read reads the next len(p) interface{}
// pointers from the buffer or until the buffer is drained. The return
// value n is the number of bytes read. If the buffer has no data
// to return, err is io.EOF (unless len(p) is zero); otherwise it is nil.
func (b *PointerRingBuf) ReadPtrs(p []interface{}) (n int, err error) {
return b.readAndMaybeAdvance(p, true)
}
// ReadWithoutAdvance(): if you want to Read the data and leave
// it in the buffer, so as to peek ahead for example.
func (b *PointerRingBuf) ReadWithoutAdvance(p []interface{}) (n int, err error) {
return b.readAndMaybeAdvance(p, false)
}
func (b *PointerRingBuf) readAndMaybeAdvance(p []interface{}, doAdvance bool) (n int, err error) {
if len(p) == 0 {
return 0, nil
}
if b.Readable == 0 {
return 0, io.EOF
}
extent := b.Beg + b.Readable
if extent <= b.N {
n += copy(p, b.A[b.Beg:extent])
} else {
n += copy(p, b.A[b.Beg:b.N])
if n < len(p) {
n += copy(p[n:], b.A[0:(extent%b.N)])
}
}
if doAdvance {
b.Advance(n)
}
return
}
//
// WritePtrs writes len(p) interface{} values from p to
// the underlying data stream.
// It returns the number of bytes written from p (0 <= n <= len(p))
// and any error encountered that caused the write to stop early.
// Write must return a non-nil error if it returns n < len(p).
//
func (b *PointerRingBuf) WritePtrs(p []interface{}) (n int, err error) {
for {
if len(p) == 0 {
// nothing (left) to copy in; notice we shorten our
// local copy p (below) as we read from it.
return
}
writeCapacity := b.N - b.Readable
if writeCapacity <= 0 {
// we are all full up already.
return n, io.ErrShortWrite
}
if len(p) > writeCapacity {
err = io.ErrShortWrite
// leave err set and
// keep going, write what we can.
}
writeStart := (b.Beg + b.Readable) % b.N
upperLim := intMin(writeStart+writeCapacity, b.N)
k := copy(b.A[writeStart:upperLim], p)
n += k
b.Readable += k
p = p[k:]
// we can fill from b.A[0:something] from
// p's remainder, so loop
}
}
// Reset quickly forgets any data stored in the ring buffer. The
// data is still there, but the ring buffer will ignore it and
// overwrite those buffers as new data comes in.
func (b *PointerRingBuf) Reset() {
b.Beg = 0
b.Readable = 0
}
// Advance(): non-standard, but better than Next(),
// because we don't have to unwrap our buffer and pay the cpu time
// for the copy that unwrapping may need.
// Useful in conjuction/after ReadWithoutAdvance() above.
func (b *PointerRingBuf) Advance(n int) {
if n <= 0 {
return
}
if n > b.Readable {
n = b.Readable
}
b.Readable -= n
b.Beg = (b.Beg + n) % b.N
}
// Adopt(): non-standard.
//
// For efficiency's sake, (possibly) take ownership of
// already allocated slice offered in me.
//
// If me is large we will adopt it, and we will potentially then
// write to the me buffer.
// If we already have a bigger buffer, copy me into the existing
// buffer instead.
func (b *PointerRingBuf) Adopt(me []interface{}) {
n := len(me)
if n > b.N {
b.A = me
b.N = n
b.Beg = 0
b.Readable = n
} else {
// we already have a larger buffer, reuse it.
copy(b.A, me)
b.Beg = 0
b.Readable = n
}
}