-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathadaptive_pool.go
232 lines (203 loc) · 6.91 KB
/
adaptive_pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
package adaptivepool
import (
"bytes"
"math"
"sync"
"sync/atomic"
)
// ItemProvider handles and measures items for an [AdaptivePool]. It is how the
// pool creates, measures and cleans up items of type T.
type ItemProvider[T any] interface {
// Costof measures the cost of an item. [AdaptivePool.Put] drops items whose
// cost is less than one: they are neither put back in the pool nor fed into
// the statistics.
Costof(T) int
// New creates a new item holding no data, but pre-allocated to `prealloc`
// cost. If it is not possible to perform this preallocation, it is
// acceptable to return an item with a smaller preallocated cost.
New(prealloc int) T
// Reset clears leftover data from past uses and returns the resulting item.
// The providers in this file return the item they were given (resliced to
// zero length in the slice case).
Reset(T) T
}
// SliceProvider is a generic [ItemProvider] whose items are slices of T.
type SliceProvider[T any] struct{}

// Costof reports the cost of a slice, which is its capacity.
func (p SliceProvider[T]) Costof(v []T) int {
	cost := cap(v)
	return cost
}

// New allocates a slice with `len` zero and `cap` equal to `prealloc`.
func (p SliceProvider[T]) New(prealloc int) []T {
	fresh := make([]T, 0, prealloc)
	return fresh
}

// Reset zeroes every element of the backing array up to the slice capacity and
// returns the slice resliced to zero length. A nil slice is returned as is.
func (p SliceProvider[T]) Reset(v []T) []T {
	if v == nil {
		return v
	}
	// extend to full capacity so that elements beyond len are wiped too
	full := v[:cap(v)]
	clear(full)
	return full[:0]
}
// BytesBufferProvider is an [ItemProvider] whose items are [*bytes.Buffer]
// values.
type BytesBufferProvider struct{}

// Costof reports the capacity of the buffer, or zero for a nil buffer.
func (p BytesBufferProvider) Costof(v *bytes.Buffer) int {
	if v == nil {
		return 0
	}
	return v.Cap()
}

// Reset empties the buffer, zeroes its underlying storage and returns the same
// buffer. A nil buffer is returned as is.
func (p BytesBufferProvider) Reset(v *bytes.Buffer) *bytes.Buffer {
	if v == nil {
		return v
	}
	v.Reset()
	// the buffer is now empty, so re-extend the unread view to its capacity
	// in order to wipe the data left behind by previous uses
	backing := v.Bytes()
	backing = backing[:cap(backing)]
	clear(backing)
	return v
}

// New allocates a *bytes.Buffer with `Len` zero and `Cap` equal to
// `prealloc`.
func (p BytesBufferProvider) New(prealloc int) *bytes.Buffer {
	storage := make([]byte, 0, prealloc)
	return bytes.NewBuffer(storage)
}
// EstimatorStats provides a set of statistics based on observed item costs put
// in an [AdaptivePool]. It is the value handed to [Estimator] implementations.
type EstimatorStats struct {
Mean float64 // Arithmetic mean of the observed item costs.
StdDev float64 // Population standard deviation of the observed item costs.
// FIXME: it would be interesting to at least provide N. The current
// implementation keeps the read path lock-free by packing Mean and StdDev
// into a single 64-bit atomic value, so both actually carry float32
// precision. Adding more stats would probably require protecting the read
// path, which could also give Mean and StdDev their full precision back. We
// have this struct so that future iterations can solve for that
// independently of the rest of the code, allowing previous Estimator
// implementations to keep working.
}
// Estimator provides opinions for decisions made by an [AdaptivePool].
// Implementations should correctly handle [EstimatorStats.StdDev] being NaN.
type Estimator interface {
// Suggest returns a suggested item cost for a new item. [AdaptivePool.Get]
// passes the result to [ItemProvider.New] when the pool has no item to
// reuse.
Suggest(EstimatorStats) int
// Accept returns whether an item of the given cost should be accepted into
// the internal sync.Pool of an AdaptivePool, or otherwise just dropped for
// garbage collection. [AdaptivePool.Put] calls it with statistics that
// already include `itemCost`.
Accept(s EstimatorStats, itemCost int) bool
}
// NormalEstimator is an [Estimator] that assumes a Normal Distribution over the
// item costs put into an [AdaptivePool].
type NormalEstimator struct {
	Threshold float64 // Threshold must be non-negative.
	MinCost   int     // Minimum cost that will be suggested.
}

// Suggest returns the upper bound `mean + e.Threshold * stdDev`, rounded to the
// nearest integer, if `stdDev` is not `NaN`, or the rounded `mean` otherwise.
// The result is never less than `MinCost`. If the estimation itself is `NaN`
// then `MinCost` is returned, or zero if `MinCost` is negative.
func (e NormalEstimator) Suggest(s EstimatorStats) int {
	estimate := s.Mean
	if !math.IsNaN(s.StdDev) {
		estimate += e.Threshold * s.StdDev
	}
	if math.IsNaN(estimate) {
		// converting NaN to an integer yields an implementation-dependent
		// value, so settle for a well-defined, non-negative answer instead
		return max(e.MinCost, 0)
	}
	return max(e.MinCost, int(math.Round(estimate)))
}

// Accept will return true if `stdDev` is `NaN` or if `itemCost` is in the
// inclusive range `mean ± e.Threshold * stdDev`.
func (e NormalEstimator) Accept(s EstimatorStats, itemCost int) bool {
	if math.IsNaN(s.StdDev) {
		return true
	}
	cost := float64(itemCost)
	margin := e.Threshold * s.StdDev
	return s.Mean-margin <= cost && cost <= s.Mean+margin
}
// AdaptivePool uses an [Estimator] to more effectively use an internal
// [sync.Pool] that holds items created by an [ItemProvider].
type AdaptivePool[T any] struct {
// pool holds the reusable items; init sets it to a *sync.Pool.
pool pool
// provider creates, measures and resets the items.
provider ItemProvider[T]
// estimator suggests the cost of newly created items and decides which
// returned items are kept.
estimator Estimator
// reading is lock-free, and actually uses 32bit floating points to store
// mean and stdDev in a single 64bit atomic value: mean in the low 32 bits and
// stdDev in the high 32 bits
rStats atomic.Uint64
// statsMu is held by writeThenRead while it updates stats and republishes
// rStats.
statsMu sync.RWMutex
// stats accumulates the cost of the items passed to Put.
stats Stats
}
// New allocates an AdaptivePool that uses `p` to handle its items and `e` to
// make its decisions. See [Stats.SetMaxN] for a description of the `maxN`
// argument.
func New[T any](p ItemProvider[T], e Estimator, maxN float64) *AdaptivePool[T] {
	ap := &AdaptivePool[T]{}
	return ap.init(p, e, maxN)
}
// init wires the provider and the estimator into the pool, configures the
// statistics with `maxN`, installs a fresh sync.Pool as the backing store, and
// returns the receiver so that calls can be chained.
func (p *AdaptivePool[T]) init(
	pp ItemProvider[T],
	e Estimator,
	maxN float64,
) *AdaptivePool[T] {
	p.provider, p.estimator = pp, e
	p.stats.SetMaxN(maxN)
	p.pool = &sync.Pool{}
	return p
}
// Get returns an item from the pool. If the pool has none to offer, a new item
// is created by the ItemProvider with the cost suggested by the Estimator for
// the current statistics.
func (p *AdaptivePool[T]) Get() T {
	pooled := p.pool.Get()
	if pooled != nil {
		return pooled.(T)
	}

	// lock-free read of the latest published mean and standard deviation
	mean32, stdDev32 := decodeBits(p.rStats.Load())
	var st EstimatorStats
	st.Mean = float64(mean32)
	st.StdDev = float64(stdDev32)

	return p.provider.New(p.estimator.Suggest(st))
}
// GetWithCost returns an item of at least the specified cost, taking it from
// the pool if possible and allocating it from the ItemProvider otherwise.
func (p *AdaptivePool[T]) GetWithCost(cost int) T {
	item, ok := p.pool.Get().(T)
	if !ok || p.provider.Costof(item) < cost {
		// either the pool was empty or what it offered is too small; a pooled
		// item that is too small is left for garbage collection and a new one
		// of the appropriate cost is allocated directly
		return p.provider.New(cost)
	}
	return item
}
// Put resets the item, updates the internal statistics with its cost and puts
// it back into the pool if [Estimator.Accept] allows it. Items with a
// non-positive cost are immediately dropped, without being fed into the
// statistics.
func (p *AdaptivePool[T]) Put(x T) {
	// Reset hands back the item to keep using (e.g. a slice resliced to zero
	// length), so its result must replace x. This also allows an
	// implementation to hijack the item if they wanted to, and then return a
	// zero-cost item that is dropped below.
	x = p.provider.Reset(x)

	cost := p.provider.Costof(x)
	if cost < 1 {
		return
	}

	mean, stdDev := p.writeThenRead(cost)
	st := EstimatorStats{
		Mean:   mean,
		StdDev: stdDev,
	}
	if p.estimator.Accept(st, cost) {
		p.pool.Put(x)
	}
}
// writeThenRead pushes the cost `s` into the statistics while holding statsMu,
// publishes the updated mean and standard deviation to rStats for lock-free
// readers, and returns those same two values.
func (p *AdaptivePool[T]) writeThenRead(s int) (mean, stdDev float64) {
	p.statsMu.Lock()
	defer p.statsMu.Unlock()

	p.stats.Push(float64(s))
	mean32 := float32(p.stats.Mean())
	stdDev32 := float32(p.stats.StdDev())
	p.rStats.Store(encodeBits(mean32, stdDev32))

	// reduced precision on purpose: these are exactly the values that readers
	// will later decode from rStats
	mean, stdDev = float64(mean32), float64(stdDev32)
	return mean, stdDev
}
// encodeBits packs two float32 values into a single uint64: the IEEE 754 bits
// of `lo` take the low 32 bits and those of `hi` take the high 32 bits.
func encodeBits(lo, hi float32) uint64 {
	low := uint64(math.Float32bits(lo))
	high := uint64(math.Float32bits(hi)) << 32
	return high | low
}
// decodeBits is the inverse of encodeBits: it interprets the low 32 bits of
// `u64` as the float32 `lo` and the high 32 bits as the float32 `hi`.
func decodeBits(u64 uint64) (lo, hi float32) {
	// converting to uint32 keeps only the low 32 bits
	lo = math.Float32frombits(uint32(u64))
	hi = math.Float32frombits(uint32(u64 >> 32))
	return lo, hi
}
// pool is the subset of the [sync.Pool] API used by [AdaptivePool]. The pool
// is held through this interface rather than as a concrete *sync.Pool.
type pool interface {
// Get returns an item from the pool, or nil if none is available.
Get() any
// Put hands an item over to the pool for later reuse.
Put(any)
}