-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
server.go
609 lines (562 loc) · 21.1 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
// Copyright 2016 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package distsql
import (
"context"
"io"
"sync"
"time"
"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/sql/colflow"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/flowinfra"
"github.com/cockroachdb/cockroach/pkg/sql/lex"
"github.com/cockroachdb/cockroach/pkg/sql/rowflow"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
"github.com/opentracing/opentracing-go"
)
// minFlowDrainWait is the minimum amount of time a draining server allows for
// any incoming flows to be registered. It acts as a grace period in which the
// draining server waits for its gossiped draining state to be received by other
// nodes.
const minFlowDrainWait = 1 * time.Second
// distSQLGossipIssueNo is the number handed to Gossip.Deprecated() at every
// gossip access made by this file.
// NOTE(review): presumably the tracking issue for DistSQL's remaining gossip
// usage - confirm.
const distSQLGossipIssueNo = 47900
// noteworthyMemoryUsageBytes is passed to mon.MakeMonitor for both the
// server-wide "distsql" monitor and the per-flow "flow" monitors. It is read
// from the COCKROACH_NOTEWORTHY_DISTSQL_MEMORY_USAGE environment variable and
// defaults to 1MB.
var noteworthyMemoryUsageBytes = envutil.EnvOrDefaultInt64("COCKROACH_NOTEWORTHY_DISTSQL_MEMORY_USAGE", 1024*1024 /* 1MB */)
// ServerImpl implements the server for the distributed SQL APIs.
type ServerImpl struct {
// ServerConfig is embedded, so its fields (Gossip, NodeID, Settings, Metrics,
// Stopper, ...) are accessible directly on the server.
execinfra.ServerConfig
// flowRegistry is handed to every flow created by setupFlow, is used to
// connect inbound streams in flowStreamInt and is drained by Drain.
flowRegistry *flowinfra.FlowRegistry
// flowScheduler is started by Start and receives the flows set up through
// the SetupFlow RPC.
flowScheduler *flowinfra.FlowScheduler
// memMonitor is the "distsql" memory monitor started in NewServer. It is the
// parent monitor for flows set up through SetupFlow and RunSyncFlow.
memMonitor mon.BytesMonitor
// regexpCache is shared by the EvalContexts that setupFlow builds from
// incoming requests.
regexpCache *tree.RegexpCache
}
// Compile-time check that *ServerImpl satisfies execinfrapb.DistSQLServer.
var _ execinfrapb.DistSQLServer = &ServerImpl{}
// NewServer builds a ServerImpl from the given configuration. The server's
// "distsql" memory monitor is started as a child of cfg.ParentMemoryMonitor
// before the server is returned; the flow scheduler is created but only
// started by Start.
func NewServer(ctx context.Context, cfg execinfra.ServerConfig) *ServerImpl {
	srv := &ServerImpl{
		ServerConfig:  cfg,
		regexpCache:   tree.NewRegexpCache(512),
		flowRegistry:  flowinfra.NewFlowRegistry(cfg.NodeID.Get()),
		flowScheduler: flowinfra.NewFlowScheduler(cfg.AmbientContext, cfg.Stopper, cfg.Settings, cfg.Metrics),
	}
	// The monitor is assigned straight from the constructor call so that the
	// value is never copied from a named variable.
	srv.memMonitor = mon.MakeMonitor(
		"distsql",
		mon.MemoryResource,
		cfg.Metrics.CurBytesCount,
		cfg.Metrics.MaxBytesHist,
		-1, /* increment: use default block size */
		noteworthyMemoryUsageBytes,
		cfg.Settings,
	)
	srv.memMonitor.Start(ctx, cfg.ParentMemoryMonitor, mon.BoundAccount{})
	return srv
}
// Start gossips this node's DistSQL version range, marks the node as not
// draining and starts the flow scheduler. It panics if either gossip write
// fails.
func (ds *ServerImpl) Start() {
	// Advertise the versions we accept so that other nodes don't plan
	// incompatible flows for us.
	g := ds.ServerConfig.Gossip.Deprecated(distSQLGossipIssueNo)
	versionKey := gossip.MakeDistSQLNodeVersionKey(ds.ServerConfig.NodeID.Get())
	versionInfo := &execinfrapb.DistSQLVersionGossipInfo{
		Version:            execinfra.Version,
		MinAcceptedVersion: execinfra.MinAcceptedVersion,
	}
	err := g.AddInfoProto(versionKey, versionInfo, 0 /* ttl - no expiration */)
	if err != nil {
		panic(err)
	}

	err = ds.setDraining(false)
	if err != nil {
		panic(err)
	}

	ds.flowScheduler.Start()
}
// Drain gossips that this node is draining and then drains the server's
// flowRegistry. See flowRegistry.Drain for more details.
//
// flowDrainWait is forwarded to the registry as the flow wait; the minimum
// wait defaults to minFlowDrainWait. Both are zeroed when the DrainFast
// testing knob is set. A failure to gossip the draining state is only logged.
func (ds *ServerImpl) Drain(
	ctx context.Context, flowDrainWait time.Duration, reporter func(int, string),
) {
	if err := ds.setDraining(true); err != nil {
		log.Warningf(ctx, "unable to gossip distsql draining state: %s", err)
	}

	registryWait, graceWait := flowDrainWait, minFlowDrainWait
	switch {
	case ds.ServerConfig.TestingKnobs.DrainFast:
		registryWait, graceWait = 0, 0
	case len(ds.Gossip.Deprecated(distSQLGossipIssueNo).Outgoing()) == 0:
		// If there is only one node in the cluster (us), there's no need to
		// wait a minimum time for the draining state to be gossiped.
		graceWait = 0
	}

	ds.flowRegistry.Drain(registryWait, graceWait, reporter)
}
// setDraining gossips the given draining state for this node, without an
// expiration, and returns the error from the gossip write, if any.
func (ds *ServerImpl) setDraining(drain bool) error {
	g := ds.ServerConfig.Gossip.Deprecated(distSQLGossipIssueNo)
	key := gossip.MakeDistSQLDrainingKey(ds.ServerConfig.NodeID.Get())
	info := &execinfrapb.DistSQLDrainingInfo{Draining: drain}
	return g.AddInfoProto(key, info, 0 /* ttl - no expiration */)
}
// FlowVerIsCompatible reports whether flowVer lies within the inclusive range
// [minAcceptedVersion, serverVersion], i.e. whether a flow of that version can
// be run by a server accepting that range.
func FlowVerIsCompatible(
	flowVer, minAcceptedVersion, serverVersion execinfrapb.DistSQLVersion,
) bool {
	if flowVer < minAcceptedVersion {
		// Older than the oldest version we still accept.
		return false
	}
	return flowVer <= serverVersion
}
// setupFlow creates a Flow.
//
// Args:
// localState: Specifies if the flow runs entirely on this node and, if it does,
//   specifies the txn and other attributes.
//
// The returned context is never nil. On success it contains a span that must
// be finished through Flow.Cleanup, which also closes the flow's memory
// monitor. If an error occurs after the span and the monitor were created but
// before a Flow exists, both are released here and the returned context
// carries no span.
func (ds *ServerImpl) setupFlow(
	ctx context.Context,
	parentSpan opentracing.Span,
	parentMonitor *mon.BytesMonitor,
	req *execinfrapb.SetupFlowRequest,
	syncFlowConsumer execinfra.RowReceiver,
	localState LocalState,
) (context.Context, flowinfra.Flow, error) {
	if !FlowVerIsCompatible(req.Version, execinfra.MinAcceptedVersion, execinfra.Version) {
		err := errors.Errorf(
			"version mismatch in flow request: %d; this node accepts %d through %d",
			req.Version, execinfra.MinAcceptedVersion, execinfra.Version,
		)
		log.Warning(ctx, err)
		return ctx, nil, err
	}

	nodeID := ds.ServerConfig.NodeID.Get()
	if nodeID == 0 {
		// No resources have been acquired yet; hand the caller's context back
		// rather than a nil one.
		return ctx, nil, errors.AssertionFailedf("setupFlow called before the NodeID was resolved")
	}

	const opName = "flow"
	var sp opentracing.Span
	if parentSpan == nil {
		sp = ds.Tracer.(*tracing.Tracer).StartRootSpan(
			opName, logtags.FromContext(ctx), tracing.NonRecordableSpan)
	} else if localState.IsLocal {
		// If we're a local flow, we don't need a "follows from" relationship: we're
		// going to run this flow synchronously.
		// TODO(andrei): localState.IsLocal is not quite the right thing to use.
		//  If that field is unset, we might still want to create a child span if
		//  this flow is run synchronously.
		sp = tracing.StartChildSpan(opName, parentSpan, logtags.FromContext(ctx), false /* separateRecording */)
	} else {
		// We use FollowsFrom because the flow's span outlives the SetupFlow request.
		// TODO(andrei): We should use something more efficient than StartSpan; we
		// should use AmbientContext.AnnotateCtxWithSpan() but that interface
		// doesn't currently support FollowsFrom relationships.
		sp = ds.Tracer.StartSpan(
			opName,
			opentracing.FollowsFrom(parentSpan.Context()),
			tracing.LogTagsFromCtx(ctx),
		)
	}
	// sp will be Finish()ed by Flow.Cleanup().
	ctx = opentracing.ContextWithSpan(ctx, sp)

	// The monitor opened here is closed in Flow.Cleanup().
	monitor := mon.MakeMonitor(
		"flow",
		mon.MemoryResource,
		ds.Metrics.CurBytesCount,
		ds.Metrics.MaxBytesHist,
		-1, /* use default block size */
		noteworthyMemoryUsageBytes,
		ds.Settings,
	)
	monitor.Start(ctx, parentMonitor, mon.BoundAccount{})

	// abortSetup is used on every error path on which Flow.Cleanup will not be
	// called: it closes the memory monitor, finishes the span and returns a
	// context that no longer references the finished span. It reads the
	// current value of ctx, so it also works after ctx has been reassigned.
	abortSetup := func(err error) (context.Context, flowinfra.Flow, error) {
		monitor.Stop(ctx)
		tracing.FinishSpan(sp)
		return opentracing.ContextWithSpan(ctx, nil), nil, err
	}

	makeLeaf := func(req *execinfrapb.SetupFlowRequest) (*kv.Txn, error) {
		tis := req.LeafTxnInputState
		if tis == nil {
			// This must be a flow running for some bulk-io operation that doesn't use
			// a txn.
			return nil, nil
		}
		if tis.Txn.Status != roachpb.PENDING {
			return nil, errors.AssertionFailedf("cannot create flow in non-PENDING txn: %s",
				tis.Txn)
		}
		// The flow will run in a LeafTxn because we do not want each distributed
		// Txn to heartbeat the transaction.
		return kv.NewLeafTxn(ctx, ds.DB, req.Flow.Gateway, tis), nil
	}

	var evalCtx *tree.EvalContext
	var leafTxn *kv.Txn
	if localState.EvalContext != nil {
		evalCtx = localState.EvalContext
		evalCtx.Mon = &monitor
	} else {
		if localState.IsLocal {
			return abortSetup(errors.AssertionFailedf(
				"EvalContext expected to be populated when IsLocal is set"))
		}

		location, err := timeutil.TimeZoneStringToLocation(
			req.EvalContext.Location,
			timeutil.TimeZoneStringToLocationISO8601Standard,
		)
		if err != nil {
			return abortSetup(err)
		}

		var be lex.BytesEncodeFormat
		switch req.EvalContext.BytesEncodeFormat {
		case execinfrapb.BytesEncodeFormat_HEX:
			be = lex.BytesEncodeHex
		case execinfrapb.BytesEncodeFormat_ESCAPE:
			be = lex.BytesEncodeEscape
		case execinfrapb.BytesEncodeFormat_BASE64:
			be = lex.BytesEncodeBase64
		default:
			return abortSetup(errors.AssertionFailedf("unknown byte encode format: %s",
				errors.Safe(req.EvalContext.BytesEncodeFormat)))
		}
		sd := &sessiondata.SessionData{
			ApplicationName: req.EvalContext.ApplicationName,
			Database:        req.EvalContext.Database,
			User:            req.EvalContext.User,
			SearchPath:      sessiondata.MakeSearchPath(req.EvalContext.SearchPath).WithTemporarySchemaName(req.EvalContext.TemporarySchemaName),
			SequenceState:   sessiondata.NewSequenceState(),
			DataConversion: sessiondata.DataConversionConfig{
				Location:          location,
				BytesEncodeFormat: be,
				ExtraFloatDigits:  int(req.EvalContext.ExtraFloatDigits),
			},
			VectorizeMode: sessiondata.VectorizeExecMode(req.EvalContext.Vectorize),
		}
		ie := &lazyInternalExecutor{
			newInternalExecutor: func() sqlutil.InternalExecutor {
				return ds.SessionBoundInternalExecutorFactory(ctx, sd)
			},
		}

		// It's important to populate evalCtx.Txn early. We'll write it again in the
		// f.SetTxn() call below, but by then it will already have been captured by
		// processors.
		leafTxn, err = makeLeaf(req)
		if err != nil {
			return abortSetup(err)
		}
		evalCtx = &tree.EvalContext{
			Settings:    ds.ServerConfig.Settings,
			SessionData: sd,
			ClusterID:   ds.ServerConfig.ClusterID.Get(),
			ClusterName: ds.ServerConfig.ClusterName,
			NodeID:      nodeID,
			ReCache:     ds.regexpCache,
			Mon:         &monitor,
			// Most processors will override this Context with their own context in
			// ProcessorBase. StartInternal().
			Context:            ctx,
			Planner:            &sqlbase.DummyEvalPlanner{},
			SessionAccessor:    &sqlbase.DummySessionAccessor{},
			PrivilegedAccessor: &sqlbase.DummyPrivilegedAccessor{},
			Sequence:           &sqlbase.DummySequenceOperators{},
			ClientNoticeSender: &sqlbase.DummyClientNoticeSender{},
			InternalExecutor:   ie,
			Txn:                leafTxn,
		}
		evalCtx.SetStmtTimestamp(timeutil.Unix(0 /* sec */, req.EvalContext.StmtTimestampNanos))
		evalCtx.SetTxnTimestamp(timeutil.Unix(0 /* sec */, req.EvalContext.TxnTimestampNanos))
		var haveSequences bool
		for _, seq := range req.EvalContext.SeqState.Seqs {
			evalCtx.SessionData.SequenceState.RecordValue(seq.SeqID, seq.LatestVal)
			haveSequences = true
		}
		if haveSequences {
			evalCtx.SessionData.SequenceState.SetLastSequenceIncremented(
				*req.EvalContext.SeqState.LastSeqIncremented)
		}
	}

	// TODO(radu): we should sanity check some of these fields.
	flowCtx := execinfra.FlowCtx{
		AmbientContext: ds.AmbientContext,
		Cfg:            &ds.ServerConfig,
		ID:             req.Flow.FlowID,
		EvalCtx:        evalCtx,
		NodeID:         nodeID,
		TraceKV:        req.TraceKV,
		Local:          localState.IsLocal,
	}

	// req always contains the desired vectorize mode, regardless of whether we
	// have non-nil localState.EvalContext. We don't want to update EvalContext
	// itself when the vectorize mode needs to be changed because we would need
	// to restore the original value which can have data races under stress.
	isVectorized := sessiondata.VectorizeExecMode(req.EvalContext.Vectorize) != sessiondata.VectorizeOff
	f := newFlow(flowCtx, ds.flowRegistry, syncFlowConsumer, localState.LocalProcs, isVectorized)
	opt := flowinfra.FuseNormally
	if localState.IsLocal {
		// If there's no remote flows, fuse everything. This is needed in order for
		// us to be able to use the RootTxn for the flow 's execution; the RootTxn
		// doesn't allow for concurrent operations. Local flows with mutations need
		// to use the RootTxn.
		opt = flowinfra.FuseAggressively
	}

	var err error
	if ctx, err = f.Setup(ctx, &req.Flow, opt); err != nil {
		log.Errorf(ctx, "error setting up flow: %s", err)
		// Flow.Cleanup will not be called, so we have to close the memory monitor
		// and finish the span manually.
		return abortSetup(err)
	}
	if !f.IsLocal() {
		flowCtx.AddLogTag("f", f.GetFlowCtx().ID.Short())
		// NOTE(review): the annotated context returned by AnnotateCtx is
		// discarded, so the tag added above does not reach the returned ctx.
		// Confirm whether `ctx = flowCtx.AnnotateCtx(ctx)` was intended.
		flowCtx.AnnotateCtx(ctx)
		telemetry.Inc(sqltelemetry.DistSQLExecCounter)
	}
	if f.IsVectorized() {
		telemetry.Inc(sqltelemetry.VecExecCounter)
	}

	// Figure out what txn the flow needs to run in, if any. For gateway flows
	// that have no remote flows and also no concurrency, the txn comes from
	// localState.Txn. Otherwise, we create a txn based on the request's
	// LeafTxnInputState.
	var txn *kv.Txn
	if localState.IsLocal && !f.ConcurrentExecution() {
		txn = localState.Txn
	} else {
		// If I haven't created the leaf already, do it now.
		if leafTxn == nil {
			leafTxn, err = makeLeaf(req)
			if err != nil {
				// NOTE(review): the flow has already been set up here, so the
				// monitor and the span are not released through abortSetup.
				// Confirm whether Flow.Cleanup may be called on a flow that
				// never ran before adding cleanup on this path.
				return ctx, nil, err
			}
		}
		txn = leafTxn
	}
	// TODO(andrei): We're about to overwrite f.EvalCtx.Txn, but the existing
	// field has already been captured by various processors and operators that
	// have already made a copy of the EvalCtx. In case this is not the gateway,
	// we had already set the LeafTxn on the EvalCtx above, so it's OK. In case
	// this is the gateway, if we're running with the RootTxn, then again it was
	// set above so it's fine. If we're using a LeafTxn on the gateway, though,
	// then the processors have erroneously captured the Root. See #41992.
	f.SetTxn(txn)

	return ctx, f, nil
}
// newFlow wraps a freshly created FlowBase in either a vectorized flow or a
// row-based flow, depending on isVectorized.
func newFlow(
	flowCtx execinfra.FlowCtx,
	flowReg *flowinfra.FlowRegistry,
	syncFlowConsumer execinfra.RowReceiver,
	localProcessors []execinfra.LocalProcessor,
	isVectorized bool,
) flowinfra.Flow {
	flowBase := flowinfra.NewFlowBase(flowCtx, flowReg, syncFlowConsumer, localProcessors)
	if !isVectorized {
		return rowflow.NewRowBasedFlow(flowBase)
	}
	return colflow.NewVectorizedFlow(flowBase)
}
// SetupSyncFlow sets up (but does not start) a synchronous flow whose sync
// response stream is connected to the given RowReceiver. The flow is set up
// with an annotated version of the given context; the span found in the given
// context, if any, is used as the parent span.
//
// On error, the returned context and flow are nil. On success, the returned
// context contains a span that must be finished through Flow.Cleanup.
func (ds *ServerImpl) SetupSyncFlow(
	ctx context.Context,
	parentMonitor *mon.BytesMonitor,
	req *execinfrapb.SetupFlowRequest,
	output execinfra.RowReceiver,
) (context.Context, flowinfra.Flow, error) {
	annotatedCtx := ds.AnnotateCtx(ctx)
	parentSpan := opentracing.SpanFromContext(ctx)
	flowCtx, f, err := ds.setupFlow(
		annotatedCtx, parentSpan, parentMonitor, req, output, LocalState{},
	)
	if err != nil {
		return nil, nil, err
	}
	return flowCtx, f, nil
}
// LocalState carries information that is required to set up a flow with wrapped
// planNodes.
type LocalState struct {
// EvalContext, when non-nil, is used by setupFlow as the flow's EvalContext
// (with its Mon field pointed at the flow's memory monitor) instead of one
// built from the request. setupFlow returns an assertion error if IsLocal is
// set while EvalContext is nil.
EvalContext *tree.EvalContext
// IsLocal is set if the flow is running on the gateway and there are no
// remote flows.
IsLocal bool
// Txn is filled in on the gateway only. It is the RootTxn that the query is running in.
// This will be used directly by the flow if the flow has no concurrency and IsLocal is set.
// If there is concurrency, a LeafTxn will be created.
Txn *kv.Txn
/////////////////////////////////////////////
// Fields below are empty if IsLocal == false
/////////////////////////////////////////////
// LocalProcs is an array of planNodeToRowSource processors. It's in order and
// will be indexed into by the RowSourceIdx field in LocalPlanNodeSpec.
LocalProcs []execinfra.LocalProcessor
}
// SetupLocalSyncFlow sets up a synchronous flow on the current (planning)
// node; the gateway uses it for the flows local to it. It differs from
// SetupSyncFlow in that the caller supplies the LocalState and the context is
// passed to setupFlow as-is.
//
// On error, the returned context and flow are nil.
func (ds *ServerImpl) SetupLocalSyncFlow(
	ctx context.Context,
	parentMonitor *mon.BytesMonitor,
	req *execinfrapb.SetupFlowRequest,
	output execinfra.RowReceiver,
	localState LocalState,
) (context.Context, flowinfra.Flow, error) {
	parentSpan := opentracing.SpanFromContext(ctx)
	flowCtx, f, err := ds.setupFlow(ctx, parentSpan, parentMonitor, req, output, localState)
	if err != nil {
		return nil, nil, err
	}
	return flowCtx, f, nil
}
// RunSyncFlow is part of the DistSQLServer interface.
//
// It expects the first message on the stream to carry a SetupFlowRequest,
// sets up the corresponding synchronous flow with an outbox writing to the
// stream, and runs the flow to completion inside a stopper task. The returned
// error is either a setup/stopper error or the outbox's error.
func (ds *ServerImpl) RunSyncFlow(stream execinfrapb.DistSQL_RunSyncFlowServer) error {
	// Set up the outgoing mailbox for the stream.
	outbox := flowinfra.NewOutboxSyncFlowStream(stream)

	first, err := stream.Recv()
	if err != nil {
		return err
	}
	req := first.SetupFlowRequest
	if req == nil {
		return errors.AssertionFailedf("first message in RunSyncFlow doesn't contain SetupFlowRequest")
	}

	ctx, f, err := ds.SetupSyncFlow(stream.Context(), &ds.memMonitor, req, outbox)
	if err != nil {
		return err
	}
	outbox.SetFlowCtx(f.GetFlowCtx())

	// runFlow executes the flow synchronously and cleans it up afterwards.
	runFlow := func(taskCtx context.Context) {
		runCtx, cancel := contextutil.WithCancel(taskCtx)
		defer cancel()
		f.AddStartable(outbox)
		ds.Metrics.FlowStart()
		if runErr := f.Run(runCtx, func() {}); runErr != nil {
			// Errors are expected to be delivered to the consumer, not
			// returned from Run.
			log.Fatalf(runCtx, "unexpected error from syncFlow.Start(): %s "+
				"The error should have gone to the consumer.", runErr)
		}
		f.Cleanup(runCtx)
		ds.Metrics.FlowStop()
	}
	if err := ds.Stopper.RunTask(ctx, "distsql.ServerImpl: sync flow", runFlow); err != nil {
		return err
	}
	return outbox.Err()
}
// SetupFlow is part of the DistSQLServer interface.
//
// It sets up the flow described by req and hands it to the flow scheduler.
// Setup and scheduling errors are reported inside the SimpleResponse; the
// returned error is always nil.
func (ds *ServerImpl) SetupFlow(
	ctx context.Context, req *execinfrapb.SetupFlowRequest,
) (*execinfrapb.SimpleResponse, error) {
	log.VEventf(ctx, 1, "received SetupFlow request from n%v for flow %v", req.Flow.Gateway, req.Flow.FlowID)
	parentSpan := opentracing.SpanFromContext(ctx)

	// Note: the passed context will be canceled when this RPC completes, so we
	// can't associate it with the flow.
	ctx = ds.AnnotateCtx(context.Background())
	flowCtx, f, err := ds.setupFlow(ctx, parentSpan, &ds.memMonitor, req, nil /* syncFlowConsumer */, LocalState{})
	// setupFlow can return a nil context on some of its error paths. Only
	// adopt the returned context when there is one, so that a nil context is
	// never passed to the calls below.
	if flowCtx != nil {
		ctx = flowCtx
	}
	if err == nil {
		err = ds.flowScheduler.ScheduleFlow(ctx, f)
	}
	if err != nil {
		// We return flow deployment errors in the response so that they are
		// packaged correctly over the wire. If we return them directly to this
		// function, they become part of an rpc error.
		return &execinfrapb.SimpleResponse{Error: execinfrapb.NewError(ctx, err)}, nil
	}
	return &execinfrapb.SimpleResponse{}, nil
}
// flowStreamInt implements FlowStream: it reads the header message from the
// stream, connects the stream to the flow named by the header through the
// flow registry, and then runs the stream strategy returned by the registry.
// An assertion error is returned if the stream ends before a header arrives
// or if the first message has no header.
func (ds *ServerImpl) flowStreamInt(
	ctx context.Context, stream execinfrapb.DistSQL_FlowStreamServer,
) error {
	// Receive the first message.
	firstMsg, err := stream.Recv()
	switch {
	case err == io.EOF:
		return errors.AssertionFailedf("missing header message")
	case err != nil:
		return err
	case firstMsg.Header == nil:
		return errors.AssertionFailedf("no header in first message")
	}

	flowID, streamID := firstMsg.Header.FlowID, firstMsg.Header.StreamID
	if log.V(1) {
		log.Infof(ctx, "connecting inbound stream %s/%d", flowID.Short(), streamID)
	}

	timeout := flowinfra.SettingFlowStreamTimeout.Get(&ds.Settings.SV)
	f, strategy, cleanup, err := ds.flowRegistry.ConnectInboundStream(
		ctx, flowID, streamID, stream, timeout,
	)
	if err != nil {
		return err
	}
	defer cleanup()

	log.VEventf(ctx, 1, "connected inbound stream %s/%d", flowID.Short(), streamID)
	return strategy.Run(f.AnnotateCtx(ctx), stream, firstMsg, f)
}
// FlowStream is part of the DistSQLServer interface. It delegates to
// flowStreamInt with an annotated stream context and returns its error
// unchanged.
func (ds *ServerImpl) FlowStream(stream execinfrapb.DistSQL_FlowStreamServer) error {
	ctx := ds.AnnotateCtx(stream.Context())
	err := ds.flowStreamInt(ctx, stream)
	if err == nil {
		return nil
	}
	// flowStreamInt may return an error during normal operation (e.g. a flow
	// was canceled as part of a graceful teardown). Log this error at the INFO
	// level behind a verbose flag for visibility.
	if log.V(2) {
		log.Info(ctx, err)
	}
	return err
}
// lazyInternalExecutor is a sqlutil.InternalExecutor that initializes
// itself only on the first call to QueryRow or QueryRowEx. Methods promoted
// from the embedded interface do not trigger the initialization.
type lazyInternalExecutor struct {
// Set when an internal executor has been initialized.
sqlutil.InternalExecutor
// Used for initializing the internal executor exactly once.
once sync.Once
// newInternalExecutor must be set when instantiating a lazyInternalExecutor,
// it provides an internal executor to use when necessary.
newInternalExecutor func() sqlutil.InternalExecutor
}
// Compile-time check that *lazyInternalExecutor satisfies
// sqlutil.InternalExecutor.
var _ sqlutil.InternalExecutor = &lazyInternalExecutor{}
// QueryRowEx creates the wrapped internal executor if neither QueryRowEx nor
// QueryRow has done so yet (both go through ie.once) and forwards the call to
// it.
func (ie *lazyInternalExecutor) QueryRowEx(
	ctx context.Context,
	opName string,
	txn *kv.Txn,
	opts sqlbase.InternalExecutorSessionDataOverride,
	stmt string,
	qargs ...interface{},
) (tree.Datums, error) {
	initExecutor := func() {
		ie.InternalExecutor = ie.newInternalExecutor()
	}
	ie.once.Do(initExecutor)
	return ie.InternalExecutor.QueryRowEx(ctx, opName, txn, opts, stmt, qargs...)
}
// QueryRow creates the wrapped internal executor if neither QueryRow nor
// QueryRowEx has done so yet (both go through ie.once) and forwards the call
// to it.
func (ie *lazyInternalExecutor) QueryRow(
	ctx context.Context, opName string, txn *kv.Txn, stmt string, qargs ...interface{},
) (tree.Datums, error) {
	initExecutor := func() {
		ie.InternalExecutor = ie.newInternalExecutor()
	}
	ie.once.Do(initExecutor)
	return ie.InternalExecutor.QueryRow(ctx, opName, txn, stmt, qargs...)
}