-
-
Notifications
You must be signed in to change notification settings - Fork 111
/
arena.go
475 lines (417 loc) · 13.5 KB
/
arena.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
package capnp
import (
"errors"
"sync"
"capnproto.org/go/capnp/v3/exp/bufferpool"
"capnproto.org/go/capnp/v3/internal/str"
)
// An Arena loads and allocates segments for a Message.
type Arena interface {
	// NumSegments returns the number of segments in the arena.
	// This must not be larger than 1<<32.
	NumSegments() int64

	// Segment returns the segment identified with the specified id. This
	// may return nil if the segment with the specified ID does not exist.
	Segment(id SegmentID) *Segment

	// Allocate selects a segment to place a new object in, creating a
	// segment or growing the capacity of a previously loaded segment if
	// necessary. If Allocate does not return an error, then the returned
	// segment may store up to minsz bytes starting at the returned address
	// offset.
	//
	// Some allocators may specifically choose to grow the passed seg (if
	// non nil), but that is not a requirement.
	//
	// If Allocate creates a new segment, the ID must be one larger than
	// the last segment's ID or zero if it is the first segment.
	//
	// If Allocate returns a previously loaded segment, then the arena is
	// responsible for preserving the existing data.
	Allocate(minsz Size, msg *Message, seg *Segment) (*Segment, address, error)

	// Release all resources associated with the Arena. Callers MUST NOT
	// use the Arena after it has been released.
	//
	// Calling Release() is OPTIONAL, but may reduce allocations.
	//
	// Implementations MAY use Release() as a signal to return resources
	// to free lists, or otherwise reuse the Arena. However, they MUST
	// NOT assume Release() will be called.
	Release()
}
// singleSegmentPool is a pool of *SingleSegmentArena. SingleSegment(nil)
// draws arenas from it and SingleSegmentArena.Release returns them,
// reducing allocations for callers that build many messages.
var singleSegmentPool = sync.Pool{
	New: func() any {
		return &SingleSegmentArena{}
	},
}
// SingleSegmentArena is an Arena implementation that stores message data
// in a contiguous slice. Allocation is performed by first allocating a
// new slice and copying existing data. SingleSegment arena does not fail
// unless the caller attempts to access another segment.
type SingleSegmentArena struct {
	seg Segment
	// bp is the buffer pool associated with this arena if it was
	// initialized for writing; nil marks the arena as read-only and
	// causes Allocate to fail.
	bp *bufferpool.Pool
	// fromPool determines if this should return to the pool when released.
	fromPool bool
}
// zeroSlice overwrites every byte of b with zero. It is used before
// returning buffers to a pool so stale message bytes cannot leak to
// the buffer's next user.
func zeroSlice(b []byte) {
	for i := 0; i < len(b); i++ {
		b[i] = 0
	}
}
// SingleSegment constructs a SingleSegmentArena from b. b MAY be nil.
// Callers MAY use b to populate the segment for reading, or to reserve
// memory of a specific size.
func SingleSegment(b []byte) Arena {
	if b != nil {
		// Caller supplied the backing buffer; no pool involvement.
		return &SingleSegmentArena{seg: Segment{data: b}}
	}
	// No buffer given: reuse a pooled arena set up for writing.
	arena := singleSegmentPool.Get().(*SingleSegmentArena)
	arena.fromPool = true
	arena.bp = &bufferpool.Default
	return arena
}
// NumSegments returns the number of segments in the arena, which for a
// single-segment arena is always exactly 1.
func (ssa *SingleSegmentArena) NumSegments() int64 {
	return 1
}
// Segment returns the arena's sole segment when id is 0, and nil for
// every other id.
func (ssa *SingleSegmentArena) Segment(id SegmentID) *Segment {
	if id == 0 {
		return &ssa.seg
	}
	return nil
}
// Allocate returns an address in the arena's single segment with room
// for at least sz bytes. The segment is extended in place when it has
// spare capacity; otherwise the data is moved into a larger buffer
// obtained from the arena's buffer pool.
func (ssa *SingleSegmentArena) Allocate(sz Size, msg *Message, seg *Segment) (*Segment, address, error) {
	// Only the arena's own segment may be passed as a preferred segment.
	if seg != nil && seg != &ssa.seg {
		return nil, 0, errors.New("segment is not associated with arena")
	}
	data := ssa.seg.data
	if len(data)%int(wordSize) != 0 {
		return nil, 0, errors.New("segment size is not a multiple of word size")
	}
	ssa.seg.BindTo(msg)
	// Fast path: spare capacity exists, so extend in place. The new
	// object starts at the previous end of the segment.
	if hasCapacity(data, sz) {
		addr := address(len(ssa.seg.data))
		ssa.seg.data = ssa.seg.data[:len(ssa.seg.data)+int(sz)]
		return &ssa.seg, addr, nil
	}
	// Slow path: compute the growth increment (at least sz, possibly
	// more to amortize future allocations).
	inc, err := nextAlloc(int64(len(data)), int64(maxAllocSize()), sz)
	if err != nil {
		return nil, 0, err
	}
	// A nil buffer pool means the arena was initialized for reading only.
	if ssa.bp == nil {
		return nil, 0, errors.New("cannot allocate on read-only SingleSegmentArena")
	}
	addr := address(len(ssa.seg.data))
	// Fetch a larger buffer, copy the existing data across, then zero
	// the old buffer before returning it to the pool so stale message
	// bytes cannot leak to its next user.
	ssa.seg.data = ssa.bp.Get(cap(data) + inc)[:len(data)+int(sz)]
	copy(ssa.seg.data, data)
	zeroSlice(data)
	ssa.bp.Put(data)
	return &ssa.seg, addr, nil
}
// String returns a short human-readable description of the arena.
func (ssa *SingleSegmentArena) String() string {
	length := str.Itod(len(ssa.seg.data))
	capacity := str.Itod(cap(ssa.seg.data))
	return "single-segment arena [len=" + length + " cap=" + capacity + "]"
}
// Release returns this arena to an internal sync.Pool of arenas that
// can be re-used. Any time SingleSegment(nil) is called, arenas from
// this pool will be used if available, which can help reduce memory
// allocations.
//
// All segments will be zeroed before re-use.
//
// Calling Release is optional; if not done the garbage collector
// will release the memory per usual.
func (ssa *SingleSegmentArena) Release() {
	if ssa.bp != nil {
		// Scrub the buffer before handing it back to the buffer pool.
		zeroSlice(ssa.seg.data)
		ssa.bp.Put(ssa.seg.data)
	}
	ssa.seg.data = nil
	ssa.seg.BindTo(nil)
	if !ssa.fromPool {
		return
	}
	// Clear the flag first so a second Release cannot double-insert.
	ssa.fromPool = false
	singleSegmentPool.Put(ssa)
}
// MultiSegmentArena is an arena that stores object data across multiple
// []byte buffers, allocating new buffers of exponentially-increasing
// size when full. This avoids the potentially-expensive slice copying
// of SingleSegment.
type MultiSegmentArena struct {
	segs []Segment
	// bp is the buffer pool associated with this arena's segments if it
	// was initialized for writing; nil marks the arena as read-only and
	// causes Allocate to fail.
	bp *bufferpool.Pool
	// fromPool is true if this msa instance was obtained from the
	// multiSegmentPool and should be returned there upon release.
	fromPool bool
}
// MultiSegment returns a new arena that allocates new segments when
// they are full. b MAY be nil. Callers MAY use b to populate the
// buffer for reading or to reserve memory of a specific size.
func MultiSegment(b [][]byte) *MultiSegmentArena {
	if b != nil {
		// Caller supplied the segment buffers directly.
		return multiSegment(b)
	}
	// No buffers given: reuse a pooled arena.
	arena := multiSegmentPool.Get().(*MultiSegmentArena)
	arena.fromPool = true
	return arena
}
// Release returns this arena to an internal sync.Pool of arenas that
// can be re-used. Any time MultiSegment(nil) is called, arenas from
// this pool will be used if available, which can help reduce memory
// allocations.
//
// All segments will be zeroed before re-use.
//
// Calling Release is optional; if not done the garbage collector
// will release the memory per usual.
func (msa *MultiSegmentArena) Release() {
	for i := range msa.segs {
		s := &msa.segs[i]
		if msa.bp != nil {
			// Scrub each buffer before handing it back to the pool.
			zeroSlice(s.data)
			msa.bp.Put(s.data)
		}
		s.data = nil
		s.BindTo(nil)
	}
	if msa.segs != nil {
		// Keep the backing array for reuse; drop only the length.
		msa.segs = msa.segs[:0]
	}
	if !msa.fromPool {
		return
	}
	// Clear the flag first so a second Release cannot double-insert.
	msa.fromPool = false
	multiSegmentPool.Put(msa)
}
// multiSegment builds a MultiSegmentArena without consulting the pool.
// A nil b yields an empty, writable arena backed by the default buffer
// pool; otherwise each element of b becomes one read segment.
func multiSegment(b [][]byte) *MultiSegmentArena {
	if b == nil {
		return &MultiSegmentArena{
			bp:   &bufferpool.Default,
			segs: make([]Segment, 0, 5), // Typical size.
		}
	}
	segs := make([]Segment, len(b))
	for i, data := range b {
		segs[i].data = data
		segs[i].id = SegmentID(i)
	}
	return &MultiSegmentArena{segs: segs}
}
// multiSegmentPool is a pool of *MultiSegmentArena. MultiSegment(nil)
// draws arenas from it and MultiSegmentArena.Release returns them.
var multiSegmentPool = sync.Pool{
	New: func() any {
		return multiSegment(nil)
	},
}
// demux slices data into this arena's segments as described by the
// stream header. It assumes that len(data) >= hdr.totalSize().
// Existing segment slots are reused where possible.
func (msa *MultiSegmentArena) demux(hdr streamHeader, data []byte) error {
	maxSeg := hdr.maxSegment()
	if int64(maxSeg) > int64(maxInt-1) {
		return errors.New("number of segments overflows int")
	}
	// Grow list of existing segments as needed.
	numSegs := int(maxSeg + 1)
	if cap(msa.segs) >= numSegs {
		msa.segs = msa.segs[:numSegs]
	} else {
		inc := numSegs - len(msa.segs)
		msa.segs = append(msa.segs, make([]Segment, inc)...)
	}
	for i := SegmentID(0); i <= maxSeg; i++ {
		sz, err := hdr.segmentSize(SegmentID(i))
		if err != nil {
			return err
		}
		// The full slice expression caps each segment at its own size,
		// so a later append to one segment cannot overwrite the next
		// segment's bytes in the shared backing array.
		msa.segs[i].data, data = data[:sz:sz], data[sz:]
		msa.segs[i].id = i
	}
	return nil
}
// NumSegments returns the number of segments currently in the arena.
func (msa *MultiSegmentArena) NumSegments() int64 {
	return int64(len(msa.segs))
}
// Segment returns the segment with the given id, or nil when the arena
// holds no such segment.
func (msa *MultiSegmentArena) Segment(id SegmentID) *Segment {
	if int(id) < len(msa.segs) {
		return &msa.segs[id]
	}
	return nil
}
// Allocate returns a segment with room for at least sz bytes. It
// prefers, in order: the caller-provided seg (when it has capacity and
// belongs to this arena), any existing segment with spare capacity,
// and finally a brand-new segment backed by the arena's buffer pool.
func (msa *MultiSegmentArena) Allocate(sz Size, msg *Message, seg *Segment) (*Segment, address, error) {
	// Prefer allocating in seg if it has capacity.
	if seg != nil && hasCapacity(seg.data, sz) {
		// Double check this segment is part of this arena.
		contains := false
		for i := range msa.segs {
			if &msa.segs[i] == seg {
				contains = true
				break
			}
		}
		if !contains {
			// This is a usage error.
			return nil, 0, errors.New("preferred segment is not part of the arena")
		}
		// Double check this segment is for this message.
		if seg.Message() != nil && seg.Message() != msg {
			return nil, 0, errors.New("attempt to allocate in segment for different message")
		}
		// Extend the segment in place; the new object starts at the
		// previous end of the segment's data.
		addr := address(len(seg.data))
		newLen := int(addr) + int(sz)
		seg.data = seg.data[:newLen]
		seg.BindTo(msg)
		return seg, addr, nil
	}
	// Scan existing segments for spare capacity, accumulating the total
	// capacity so far for the growth computation below.
	var total int64
	for i := range msa.segs {
		data := msa.segs[i].data
		if hasCapacity(data, sz) {
			// Found segment with spare capacity.
			addr := address(len(msa.segs[i].data))
			newLen := int(addr) + int(sz)
			msa.segs[i].data = msa.segs[i].data[:newLen]
			msa.segs[i].BindTo(msg)
			return &msa.segs[i], addr, nil
		}
		if total += int64(cap(data)); total < 0 {
			// Overflow.
			return nil, 0, errors.New("alloc " + str.Utod(sz) + " bytes: message too large")
		}
	}
	// Check for read-only arena.
	if msa.bp == nil {
		return nil, 0, errors.New("cannot allocate segment in read-only multi-segment arena")
	}
	// If this is the very first segment and the requested allocation
	// size is zero, modify the requested size to at least one word.
	//
	// FIXME: this is to maintain compatibility to existing behavior and
	// tests in NewMessage(), which assumes this. Remove once arenas
	// enforce the contract of always having at least one segment.
	compatFirstSegLenZeroAddSize := Size(0)
	if len(msa.segs) == 0 && sz == 0 {
		compatFirstSegLenZeroAddSize = wordSize
	}
	// Determine actual allocation size (may be greater than sz).
	n, err := nextAlloc(total, 1<<63-1, sz+compatFirstSegLenZeroAddSize)
	if err != nil {
		return nil, 0, err
	}
	// We have determined this will be a new segment. Get the backing
	// buffer for it.
	buf := msa.bp.Get(n)
	buf = buf[:sz]
	// Setup the segment. New IDs are assigned densely: one past the
	// last existing segment (or zero for the first).
	id := SegmentID(len(msa.segs))
	msa.segs = append(msa.segs, Segment{
		data: buf,
		id:   id,
	})
	res := &msa.segs[int(id)]
	res.BindTo(msg)
	return res, 0, nil
}
// String returns a short human-readable description of the arena.
func (msa *MultiSegmentArena) String() string {
	count := str.Itod(len(msa.segs))
	return "multi-segment arena [" + count + " segments]"
}
// nextAlloc computes how much more space to allocate given the number
// of bytes allocated in the entire message and the requested number of
// bytes. It will always return a multiple of wordSize. max must be a
// multiple of wordSize. The sum of curr and the returned size will
// never exceed max.
func nextAlloc(curr, max int64, req Size) (int, error) {
	if req == 0 {
		// Nothing requested, nothing to grow.
		return 0, nil
	}
	if req > maxAllocSize() {
		return 0, errors.New("alloc " + req.String() + ": too large")
	}
	padreq := req.padToWord()
	want := curr + int64(padreq)
	// want <= curr catches signed overflow of the addition above.
	if want <= curr || want > max {
		return 0, errors.New("alloc " + req.String() + ": message size overflow")
	}
	new := curr
	double := new + new
	switch {
	case want < 1024:
		// Small total: grow to roughly 1 KiB, rounded up to a word.
		next := (1024 - curr + 7) &^ 7
		if next < curr {
			return int((curr + 7) &^ 7), nil
		}
		return int(next), nil
	case want > double:
		// The request alone exceeds doubling; allocate exactly the
		// padded request.
		return int(padreq), nil
	default:
		// Grow geometrically in 25% steps until the target is reached.
		for 0 < new && new < want {
			new += new / 4
		}
		if new <= 0 {
			// The growth loop overflowed; fall back to the padded request.
			return int(padreq), nil
		}
		delta := new - curr
		if delta > int64(maxAllocSize()) {
			return int(maxAllocSize()), nil
		}
		// Round the increment up to a whole word.
		return int((delta + 7) &^ 7), nil
	}
}
// hasCapacity reports whether b has at least sz bytes of spare capacity
// beyond its current length.
func hasCapacity(b []byte, sz Size) bool {
	spare := cap(b) - len(b)
	return Size(spare) >= sz
}
// ReadOnlySingleSegment is an Arena implementation backed by a single
// segment whose data can be read but never allocated into.
type ReadOnlySingleSegment struct {
	seg Segment
}
// NumSegments returns the number of segments in the arena, which for
// this arena is always exactly 1.
func (r *ReadOnlySingleSegment) NumSegments() int64 {
	return 1
}
// Segment returns the arena's single segment when id is 0; any other
// id yields nil.
func (r *ReadOnlySingleSegment) Segment(id SegmentID) *Segment {
	if id != 0 {
		return nil
	}
	return &r.seg
}
// Allocate implements Arena.Allocate. It always fails, because the
// arena is read-only: no data may ever be allocated in it. The minsz,
// msg, and seg arguments are ignored.
func (r *ReadOnlySingleSegment) Allocate(minsz Size, msg *Message, seg *Segment) (*Segment, address, error) {
	return nil, 0, errors.New("readOnly segment cannot allocate data")
}
// Release drops the arena's reference to its underlying data. Callers
// MUST NOT use the Arena after it has been released, except to install
// fresh data via ReplaceData.
//
// Calling Release() is OPTIONAL, but may reduce allocations.
func (r *ReadOnlySingleSegment) Release() {
	r.seg.data = nil
}
// ReplaceData replaces the current data of the arena. This should ONLY be
// called on an empty or released arena, or else it panics.
func (r *ReadOnlySingleSegment) ReplaceData(b []byte) {
	// Overwriting live data would silently invalidate anything still
	// reading the old segment, so treat it as a programmer error.
	if r.seg.data != nil {
		panic("replacing data on unreleased ReadOnlyArena")
	}
	r.seg.data = b
}
// NewReadOnlySingleSegment creates a new read only arena with the given data.
func NewReadOnlySingleSegment(b []byte) *ReadOnlySingleSegment {
	var arena ReadOnlySingleSegment
	arena.seg.data = b
	return &arena
}