-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
outbox.go
354 lines (325 loc) · 11.4 KB
/
outbox.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
// Copyright 2019 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package colrpc
import (
"bytes"
"context"
"io"
"sync/atomic"
"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/col/colserde"
"github.com/cockroachdb/cockroach/pkg/col/coltypes"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/sql/colexec"
"github.com/cockroachdb/cockroach/pkg/sql/colexec/execerror"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
"google.golang.org/grpc"
)
// flowStreamClient is a utility interface used to mock out the RPC layer.
type flowStreamClient interface {
Send(*execinfrapb.ProducerMessage) error
Recv() (*execinfrapb.ConsumerSignal, error)
CloseSend() error
}
// Dialer is used for dialing based on node IDs. It extracts out the single
// method that Outbox.Run needs from nodedialer.Dialer so that we can mock it
// in tests outside of this package.
type Dialer interface {
Dial(context.Context, roachpb.NodeID, rpc.ConnectionClass) (*grpc.ClientConn, error)
}
// Outbox is used to push data from local flows to a remote endpoint. Run may
// be called with the necessary information to establish a connection to a
// given remote endpoint.
type Outbox struct {
colexec.OneInputNode
typs []coltypes.T
// batch is the last batch received from the input.
batch coldata.Batch
converter *colserde.ArrowBatchConverter
serializer *colserde.RecordBatchSerializer
// draining is an atomic that represents whether the Outbox is draining.
draining uint32
metadataSources []execinfrapb.MetadataSource
// closers is a slice of Closers that need to be Closed on termination.
closers []colexec.IdempotentCloser
scratch struct {
buf *bytes.Buffer
msg *execinfrapb.ProducerMessage
}
// A copy of Run's caller ctx, with no StreamID tag.
// Used to pass a clean context to the input.Next.
runnerCtx context.Context
}
// NewOutbox creates a new Outbox.
func NewOutbox(
allocator *colexec.Allocator,
input colexec.Operator,
typs []coltypes.T,
metadataSources []execinfrapb.MetadataSource,
toClose []colexec.IdempotentCloser,
) (*Outbox, error) {
c, err := colserde.NewArrowBatchConverter(typs)
if err != nil {
return nil, err
}
s, err := colserde.NewRecordBatchSerializer(typs)
if err != nil {
return nil, err
}
o := &Outbox{
// Add a deselector as selection vectors are not serialized (nor should they
// be).
OneInputNode: colexec.NewOneInputNode(colexec.NewDeselectorOp(allocator, input, typs)),
typs: typs,
converter: c,
serializer: s,
metadataSources: metadataSources,
closers: toClose,
}
o.scratch.buf = &bytes.Buffer{}
o.scratch.msg = &execinfrapb.ProducerMessage{}
return o, nil
}
func (o *Outbox) close(ctx context.Context) {
for _, closer := range o.closers {
if err := closer.IdempotentClose(ctx); err != nil {
if log.V(1) {
log.Infof(ctx, "error closing Closer: %v", err)
}
}
}
}
// Run starts an outbox by connecting to the provided node and pushing
// coldata.Batches over the stream after sending a header with the provided flow
// and stream ID. Note that an extra goroutine is spawned so that Recv may be
// called concurrently wrt the Send goroutine to listen for drain signals.
// If an io.EOF is received while sending, the outbox will call cancelFn to
// indicate an unexpected termination of the stream.
// If an error is encountered that cannot be sent over the stream, the error
// will be logged but not returned.
// There are several ways the bidirectional FlowStream RPC may terminate.
// 1) Execution is finished. In this case, the upstream operator signals
// termination by returning a zero-length batch. The Outbox will drain its
// metadata sources, send the metadata, and then call CloseSend on the
// stream. The Outbox will wait until its Recv goroutine receives a non-nil
// error to not leak resources.
// 2) A cancellation happened. This can come from the provided context or the
// remote reader. Refer to tests for expected behavior.
// 3) A drain signal was received from the server (consumer). In this case, the
// Outbox goes through the same steps as 1).
func (o *Outbox) Run(
ctx context.Context,
dialer Dialer,
nodeID roachpb.NodeID,
flowID execinfrapb.FlowID,
streamID execinfrapb.StreamID,
cancelFn context.CancelFunc,
) {
o.runnerCtx = ctx
ctx = logtags.AddTag(ctx, "streamID", streamID)
log.VEventf(ctx, 2, "Outbox Dialing %s", nodeID)
var stream execinfrapb.DistSQL_FlowStreamClient
if err := func() error {
conn, err := dialer.Dial(ctx, nodeID, rpc.DefaultClass)
if err != nil {
log.Warningf(
ctx,
"Outbox Dial connection error, distributed query will fail: %+v",
err,
)
return err
}
client := execinfrapb.NewDistSQLClient(conn)
stream, err = client.FlowStream(ctx)
if err != nil {
log.Warningf(
ctx,
"Outbox FlowStream connection error, distributed query will fail: %+v",
err,
)
return err
}
log.VEvent(ctx, 2, "Outbox sending header")
// Send header message to establish the remote server (consumer).
if err := stream.Send(
&execinfrapb.ProducerMessage{Header: &execinfrapb.ProducerHeader{FlowID: flowID, StreamID: streamID}},
); err != nil {
log.Warningf(
ctx,
"Outbox Send header error, distributed query will fail: %+v",
err,
)
return err
}
return nil
}(); err != nil {
// error during stream set up.
o.close(ctx)
return
}
log.VEvent(ctx, 2, "Outbox starting normal operation")
o.runWithStream(ctx, stream, cancelFn)
log.VEvent(ctx, 2, "Outbox exiting")
}
// handleStreamErr is a utility method used to handle an error when calling
// a method on a flowStreamClient. If err is an io.EOF, cancelFn is called. The
// given error is logged with the associated opName.
func (o *Outbox) handleStreamErr(
ctx context.Context, opName string, err error, cancelFn context.CancelFunc,
) {
if err == io.EOF {
if log.V(1) {
log.Infof(ctx, "Outbox calling cancelFn after %s EOF", opName)
}
cancelFn()
} else {
if log.V(1) {
log.Warningf(ctx, "Outbox %s connection error: %+v", opName, err)
}
}
}
func (o *Outbox) moveToDraining(ctx context.Context) {
if atomic.CompareAndSwapUint32(&o.draining, 0, 1) {
log.VEvent(ctx, 2, "Outbox moved to draining")
}
}
// sendBatches reads from the Outbox's input in a loop and sends the
// coldata.Batches over the stream. A boolean is returned, indicating whether
// execution completed gracefully (either received a zero-length batch or a
// drain signal) as well as an error which is non-nil if an error was
// encountered AND the error should be sent over the stream as metadata. The for
// loop continues iterating until one of the following conditions becomes true:
// 1) A zero-length batch is received from the input. This indicates graceful
// termination. true, nil is returned.
// 2) Outbox.draining is observed to be true. This is also considered graceful
// termination. true, nil is returned.
// 3) An error unrelated to the stream occurs (e.g. while deserializing a
// coldata.Batch). false, err is returned. This err should be sent over the
// stream as metadata.
// 4) An error related to the stream occurs. In this case, the error is logged
// but not returned, as there is no way to propagate this error anywhere
// meaningful. false, nil is returned. NOTE: io.EOF is a special case. This
// indicates non-graceful termination initiated by the remote Inbox. cancelFn
// will be called in this case.
func (o *Outbox) sendBatches(
ctx context.Context, stream flowStreamClient, cancelFn context.CancelFunc,
) (terminatedGracefully bool, _ error) {
nextBatch := func() {
if o.runnerCtx == nil {
o.runnerCtx = ctx
}
o.batch = o.Input().Next(o.runnerCtx)
}
serializeBatch := func() {
o.scratch.buf.Reset()
d, err := o.converter.BatchToArrow(o.batch)
if err != nil {
execerror.VectorizedInternalPanic(errors.Wrap(err, "Outbox BatchToArrow data serialization error"))
}
if _, _, err := o.serializer.Serialize(o.scratch.buf, d); err != nil {
execerror.VectorizedInternalPanic(errors.Wrap(err, "Outbox Serialize data error"))
}
}
for {
if atomic.LoadUint32(&o.draining) == 1 {
return true, nil
}
if err := execerror.CatchVectorizedRuntimeError(nextBatch); err != nil {
if log.V(1) {
log.Warningf(ctx, "Outbox Next error: %+v", err)
}
return false, err
}
if o.batch.Length() == 0 {
return true, nil
}
if err := execerror.CatchVectorizedRuntimeError(serializeBatch); err != nil {
log.Errorf(ctx, "%+v", err)
return false, err
}
o.scratch.msg.Data.RawBytes = o.scratch.buf.Bytes()
// o.scratch.msg can be reused as soon as Send returns since it returns as
// soon as the message is written to the control buffer. The message is
// marshaled (bytes are copied) before writing.
if err := stream.Send(o.scratch.msg); err != nil {
o.handleStreamErr(ctx, "Send (batches)", err, cancelFn)
return false, nil
}
}
}
// sendMetadata drains the Outbox.metadataSources and sends the metadata over
// the given stream, returning the Send error, if any. sendMetadata also sends
// errToSend as metadata if non-nil.
func (o *Outbox) sendMetadata(ctx context.Context, stream flowStreamClient, errToSend error) error {
msg := &execinfrapb.ProducerMessage{}
if errToSend != nil {
msg.Data.Metadata = append(
msg.Data.Metadata, execinfrapb.LocalMetaToRemoteProducerMeta(ctx, execinfrapb.ProducerMetadata{Err: errToSend}),
)
}
for _, src := range o.metadataSources {
for _, meta := range src.DrainMeta(ctx) {
msg.Data.Metadata = append(msg.Data.Metadata, execinfrapb.LocalMetaToRemoteProducerMeta(ctx, meta))
}
}
if len(msg.Data.Metadata) == 0 {
return nil
}
return stream.Send(msg)
}
// runWithStream should be called after sending the ProducerHeader on the
// stream. It implements the behavior described in Run.
func (o *Outbox) runWithStream(
ctx context.Context, stream flowStreamClient, cancelFn context.CancelFunc,
) {
o.Input().Init()
waitCh := make(chan struct{})
go func() {
for {
msg, err := stream.Recv()
if err != nil {
if err != io.EOF {
if log.V(1) {
log.Warningf(ctx, "Outbox Recv connection error: %+v", err)
}
}
break
}
switch {
case msg.Handshake != nil:
log.VEventf(ctx, 2, "Outbox received handshake: %v", msg.Handshake)
case msg.DrainRequest != nil:
o.moveToDraining(ctx)
}
}
close(waitCh)
}()
terminatedGracefully, errToSend := o.sendBatches(ctx, stream, cancelFn)
if terminatedGracefully || errToSend != nil {
o.moveToDraining(ctx)
if err := o.sendMetadata(ctx, stream, errToSend); err != nil {
o.handleStreamErr(ctx, "Send (metadata)", err, cancelFn)
} else {
// Close the stream. Note that if this block isn't reached, the stream
// is unusable.
// The receiver goroutine will read from the stream until io.EOF is
// returned.
if err := stream.CloseSend(); err != nil {
o.handleStreamErr(ctx, "CloseSend", err, cancelFn)
}
}
}
o.close(ctx)
<-waitCh
}