-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
Copy pathsingleflight.go
382 lines (345 loc) · 12.4 KB
/
singleflight.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
// Copyright 2019 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
// Copyright 2013 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in licenses/BSD-golang.txt.
// This code originated in Go's internal/singleflight package.
// Package singleflight provides a duplicate function call suppression
// mechanism.
package singleflight
import (
"context"
"fmt"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
"go.opentelemetry.io/otel/attribute"
)
// call is an in-flight or completed singleflight.Do/DoChan call. A single call
// is shared by the flight's leader and by every caller that joins the flight.
type call struct {
// opName and key identify the flight. They are used to build the operation
// name reported in the error returned when waiting for the result is
// interrupted by ctx cancelation.
opName, key string
// c is closed when the call completes, signaling all waiters.
c chan struct{}
// sp is the tracing span of the flight leader. Nil if the leader does not
// have a span. This span's recording is captured as `rec` below, and might
// get copied into the traces of other flight members. A non-leader caller
// with a recording trace will enable recording on this span dynamically.
sp *tracing.Span
/////////////////////////////////////////////////////////////////////////////
// These fields are written once before the channel is closed and are only
// read after the channel is closed.
/////////////////////////////////////////////////////////////////////////////
// val and err are the results returned by the flight's function.
val interface{}
err error
// rec is the call's recording, if any of the callers that joined the call
// requested the trace to be recording.
rec tracing.Trace
/////////////////////////////////////////////////////////////////////////////
// These fields are read and written with the singleflight mutex held before
// the channel is closed, and are read but not written after the channel is
// closed.
/////////////////////////////////////////////////////////////////////////////
// dups counts the callers, other than the leader, that joined this call.
dups int
}
// newCall builds the bookkeeping record for a new flight identified by opName
// and key. The returned call has an open completion channel and no results
// yet.
func newCall(opName, key string) *call {
	return &call{
		opName: opName,
		key:    key,
		c:      make(chan struct{}),
	}
}
// maybeStartRecording raises the recording type of the flight's span to mode
// when the span currently records at a lower level. The recording type is
// never lowered. The callers in this file invoke it with the Group's mutex
// held.
func (c *call) maybeStartRecording(mode tracingpb.RecordingType) {
	if current := c.sp.RecordingType(); current >= mode {
		// Already recording at least as verbosely as requested.
		return
	}
	c.sp.SetRecordingType(mode)
}
// Group represents a class of work and forms a namespace in
// which units of work can be executed with duplicate suppression.
type Group struct {
// opName is used as the operation name of the spans produced by this Group
// for every flight.
opName string
// tagName represents the name of the tag containing the key for each flight.
// If not set, the spans do not get such a tag.
tagName string
// m maps a flight's key to its in-flight call. Entries are added by Do and
// DoChan and removed by doCall (when the flight finishes) and by Forget.
mu syncutil.Mutex // protects m
m map[string]*call // lazily initialized
}
// NoTags can be passed to NewGroup as the tagName to indicate that the tracing
// spans created for operations should not have the operation key as a tag. In
// particular, in cases where a single dummy key is used with a Group, having it
// as a tag is not necessary.
//
// NoTags is the empty string, which is the tagName value for which doCall
// skips tagging the flight's span.
const NoTags = ""
// NewGroup returns a new, empty Group.
//
// opName becomes the operation name of the tracing span opened for every
// flight run through the Group.
// tagName is the name of the span tag that carries the flight's key. Passing
// NoTags disables that tag.
func NewGroup(opName, tagName string) *Group {
	g := new(Group)
	g.opName = opName
	g.tagName = tagName
	return g
}
// Result holds the outcome of a flight as delivered to one particular caller.
// It is what Future.Result returns.
type Result struct {
// Val represents the call's return value. It is nil if waiting for the
// result was interrupted.
Val interface{}
// Err represents the call's error, if any. If waiting for the result was
// interrupted by ctx cancelation, Err wraps the ctx's error instead.
Err error
// Shared is set if the result has been shared with multiple callers. It is
// not set when waiting for the result was interrupted.
Shared bool
// Leader is set if the caller was the flight leader (the caller that
// triggered the flight).
Leader bool
}
// Do runs fn and returns its results, making sure that at most one execution
// is in flight for a given key at any time. A caller that arrives while a
// flight for key is already running does not run fn: it blocks until that
// flight completes and receives the flight's results.
//
// The shared return value reports whether the results were given to more than
// one caller. It is always true for a caller that joined an existing flight.
//
// NOTE: The flight runs with the leader's ctx. If fn reacts to ctx cancelation
// by interrupting the work and returning an error, canceling the leader's ctx
// propagates that error to every caller that joined the flight. See DoChan for
// more control over this behavior.
func (g *Group) Do(
	ctx context.Context, key string, fn func(context.Context) (interface{}, error),
) (v interface{}, shared bool, err error) {
	g.mu.Lock()
	if g.m == nil {
		g.m = make(map[string]*call)
	}

	flight, joined := g.m[key]
	if !joined {
		// Nobody is working on this key: register a new flight and run it
		// synchronously as its leader.
		flight = newCall(g.opName, key)
		g.m[key] = flight
		g.mu.Unlock()

		leaderOpts := DoOpts{
			Stop:               nil,
			InheritCancelation: true,
		}
		g.doCall(ctx, flight, key, leaderOpts, fn)
		return flight.val, flight.dups > 0, flight.err
	}

	// Somebody else leads a flight for this key: register as a duplicate and,
	// while still holding the mutex, ask for the leader's span to record at
	// least as verbosely as ours.
	flight.dups++
	flight.maybeStartRecording(tracing.SpanFromContext(ctx).RecordingType())
	g.mu.Unlock()

	log.Eventf(ctx, "waiting on singleflight %s:%s owned by another leader. Starting to record the leader's flight.", g.opName, key)
	// Wait for the leader to finish. This wait is not interrupted by ctx.
	<-flight.c
	log.Eventf(ctx, "waiting on singleflight %s:%s owned by another leader... done", g.opName, key)

	// Go through result() rather than reading the fields directly, so that the
	// flight's trace gets imported into ctx.
	res := flight.result(ctx, false /* leader */)
	return res.Val, true, res.Err
}
// DoOpts groups options for the DoChan() method.
type DoOpts struct {
// Stop, if not nil, is used both to create the flight in a stopper task and
// to cancel the flight's ctx on stopper quiescence.
Stop *stop.Stopper
// InheritCancelation controls whether the cancelation of the caller's ctx
// affects the flight. If set, the flight closure gets the caller's ctx. If
// not set, the closure runs in a different ctx which does not inherit the
// caller's cancelation. It is common to not want the flight to inherit the
// caller's cancelation, so that a canceled leader does not propagate an error
// to everybody else that joined the same flight.
InheritCancelation bool
}
// Future is the return type of the DoChan() call. It is a handle on a flight
// through which the caller can wait for, and retrieve, the flight's result.
type Future struct {
// call is the flight this Future refers to.
call *call
// leader records whether the caller holding this Future started the flight.
leader bool
}
// makeFuture wraps c in a Future. leader says whether the Future is handed to
// the caller that started the flight.
func makeFuture(c *call, leader bool) Future {
	var f Future
	f.call = c
	f.leader = leader
	return f
}
// C returns the channel that is closed once the flight behind this Future has
// completed. After the channel is closed, Result returns without blocking.
func (f Future) C() <-chan struct{} {
	var done <-chan struct{} = f.call.c
	return done
}
// Result returns the flight's result. If the flight has not completed yet
// (i.e. the channel returned by C() is not closed), Result blocks until it
// does; canceling ctx unblocks it, in which case the returned Result carries
// an error wrapping ctx's error.
//
// If this caller is not the flight's leader and ctx has a recording tracing
// span, the flight's recording (when it is not empty) is imported into that
// span.
func (f Future) Result(ctx context.Context) Result {
	res := f.call.result(ctx, f.leader)
	return res
}
// result returns the call's outcome, blocking until the call completes or ctx
// is done, whichever comes first.
//
// If ctx is done before the call completes, the returned Result has a nil Val,
// Shared unset, and an Err that wraps ctx.Err(). Otherwise it carries the
// call's val and err, and Shared is set if any caller joined the flight.
//
// For a non-leader whose ctx has a recording span, the call's recording is
// imported into that span when the call completed and the recording is not
// empty.
func (c *call) result(ctx context.Context, leader bool) Result {
	// First poll the completion channel without blocking: a completed call
	// always takes priority over a canceled ctx, so that a context error is
	// never returned when the result is available.
	finished := false
	select {
	case <-c.c:
		finished = true
	default:
	}

	if !finished {
		select {
		case <-c.c:
		case <-ctx.Done():
			if !leader {
				log.Eventf(ctx, "waiting for singleflight interrupted: %v", ctx.Err())
			}
			op := fmt.Sprintf("%s:%s", c.opName, c.key)
			return Result{
				Val:    nil,
				Err:    errors.Wrapf(ctx.Err(), "interrupted during singleflight %s", op),
				Shared: false,
				Leader: leader,
			}
		}
	}

	// The call is done. A non-leader that is recording gets a copy of the
	// flight's trace.
	if !leader {
		callerSpan := tracing.SpanFromContext(ctx)
		if callerSpan.RecordingType() != tracingpb.RecordingOff {
			flightRec := c.rec
			if !flightRec.Empty() {
				callerSpan.ImportTrace(flightRec.PartialClone())
			}
		}
	}

	return Result{
		Val:    c.val,
		Err:    c.err,
		Shared: c.dups > 0,
		Leader: leader,
	}
}
// DoChan is like Do, but instead of blocking it returns a Future through which
// the results can be retrieved once they are ready. The boolean return value
// reports whether the caller is the flight's leader, i.e. whether the caller's
// fn is the one that will be run. If a flight for key is already in progress,
// the caller joins it and fn is not called.
//
// The leader's flight runs in a new goroutine; opts controls details about how
// it runs.
//
// NOTE(review): this comment used to require calling Close() on the returned
// Future when the caller does not wait for the result. No such method is
// defined in this file; confirm whether that requirement still applies.
//
// NOTE: DoChan makes it possible to initiate or join a flight while holding a
// lock without holding it for the duration of the flight. A common usage
// pattern is:
//  1. Check some datastructure to see if it contains the value you're looking
//     for.
//  2. If it doesn't, initiate or join a flight to produce it.
//
// Step one is expected to be done while holding a lock. Modifying the
// datastructure in the callback is expected to need to take the same lock. Once
// a caller proceeds to step two, it likely wants to keep the lock until
// DoChan() has returned, in order to ensure that a flight is only started
// before any modifications to the datastructure occurred (relative to the state
// observed in step one). Were the lock to be released before calling DoChan(),
// a previous flight might modify the datastructure before our flight began.
//
// Another reason for using DoChan over Do is that the caller can be canceled
// without propagating the cancelation to other innocent callers that joined
// the same flight; the caller can listen for its own cancelation in parallel
// to the Future's channel. See DoOpts.InheritCancelation.
func (g *Group) DoChan(
	ctx context.Context, key string, opts DoOpts, fn func(context.Context) (interface{}, error),
) (Future, bool) {
	g.mu.Lock()
	if g.m == nil {
		g.m = make(map[string]*call)
	}

	flight, inProgress := g.m[key]
	if !inProgress {
		// No flight for this key: register one and start it asynchronously
		// with the caller as its leader.
		flight = newCall(g.opName, key)
		g.m[key] = flight
		g.mu.Unlock()

		go g.doCall(ctx, flight, key, opts, fn)
		return makeFuture(flight, true /* leader */), true
	}

	// Join the existing flight. While still holding the mutex, ask for the
	// leader's span to record at least as verbosely as the caller's.
	flight.dups++
	flight.maybeStartRecording(tracing.SpanFromContext(ctx).RecordingType())
	g.mu.Unlock()

	log.Eventf(ctx, "joining singleflight %s:%s owned by another leader", g.opName, key)
	return makeFuture(flight, false /* leader */), false
}
// doCall runs the single flight for key on behalf of its leader. It runs fn,
// stores fn's results and the flight's trace recording on c, unregisters the
// flight from the Group and finally closes c.c, which wakes up every caller
// waiting on the flight.
//
// ctx is the leader's ctx. c is the flight's call, which the caller must have
// already registered in g.m under key. opts controls the ctx fn runs with and
// whether fn runs as a stopper task. fn is the work to perform.
func (g *Group) doCall(
	ctx context.Context,
	c *call,
	key string,
	opts DoOpts,
	fn func(ctx context.Context) (interface{}, error),
) {
	// Prepare the ctx for the call.
	// Open a child span for the flight. Note that this child span might outlive
	// its parent if the caller doesn't wait for the result of this flight. It's
	// common for the caller to not always wait, particularly if
	// opts.InheritCancelation == false (i.e. if the caller can be canceled
	// independently of the flight).
	ctx, sp := tracing.ChildSpan(ctx, g.opName)
	if g.tagName != "" {
		sp.SetTag(g.tagName, attribute.StringValue(key))
	}
	// Safety net: finish the span if we leave before the regular completion
	// path below has finished it (and cleared sp).
	defer func() {
		if sp != nil {
			sp.Finish()
		}
	}()

	// Put a reference to the span in the call so that non-leaders can enable
	// recording on it. The call is already visible in g.m, and joiners read
	// c.sp (through maybeStartRecording) while holding g.mu, so the write must
	// happen under the same mutex to not race with them.
	g.mu.Lock()
	c.sp = sp
	g.mu.Unlock()

	if !opts.InheritCancelation {
		// Detach from the caller's cancelation: start from a fresh background
		// ctx and copy over only the log tags and the span.
		ctx = logtags.AddTags(context.Background(), logtags.FromContext(ctx))
		ctx = tracing.ContextWithSpan(ctx, sp)
	}

	if opts.Stop != nil {
		var cancel func()
		ctx, cancel = opts.Stop.WithCancelOnQuiesce(ctx)
		defer cancel()

		if err := opts.Stop.RunTask(ctx, g.opName+":"+key, func(ctx context.Context) {
			c.val, c.err = fn(ctx)
		}); err != nil {
			// The stopper refused to run the task; report that as the flight's
			// error.
			c.err = err
		}
	} else {
		c.val, c.err = fn(ctx)
	}

	g.mu.Lock()
	// Unregister the flight, but only if the map entry is still ours. If
	// Forget(key) was called while we were running, a newer flight might have
	// been registered under the same key, and deleting its entry would let yet
	// another caller start a duplicate flight alongside it.
	if g.m[key] == c {
		delete(g.m, key)
	}
	c.rec = sp.FinishAndGetTraceRecording(sp.RecordingType())
	sp = nil // Inhibit the deferred closing of the span.
	// Publish the results to all waiters.
	close(c.c)
	g.mu.Unlock()
}
// Reference Forget from a package-level expression. Presumably this keeps
// unused-code linters quiet, since nothing visible in this file calls Forget.
var _ = (*Group).Forget
// Forget removes key's entry from the Group. Subsequent calls to Do or DoChan
// for this key start a new flight and run their function, rather than joining
// an earlier flight that may still be in progress. Forgetting a key that has no
// entry is a no-op.
func (g *Group) Forget(key string) {
	g.mu.Lock()
	defer g.mu.Unlock()
	delete(g.m, key)
}
// NumCalls returns the number of callers currently attached to the in-flight
// call for key: the leader plus every caller that joined it. It returns 0 if
// no call is registered for key.
func (g *Group) NumCalls(key string) int {
	g.mu.Lock()
	defer g.mu.Unlock()

	c, inFlight := g.m[key]
	if !inFlight {
		return 0
	}
	// One for the leader, plus the callers that joined.
	return 1 + c.dups
}