-
Notifications
You must be signed in to change notification settings - Fork 3.9k
/
Copy pathinternal_result_channel.go
216 lines (190 loc) · 6.48 KB
/
internal_result_channel.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package sql
import (
"context"
"sync"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/errors"
)
// ieResultReader is used to read internalExecutor results.
// It is managed by the rowsIterator.
type ieResultReader interface {
// firstResult returns the first result. The return values carry the same
// semantics as of nextResult. This method assumes that the writer is not
// currently blocked and waits for the initial result to be written.
firstResult(ctx context.Context) (_ ieIteratorResult, done bool, err error)
// nextResult returns the nextResult. Done will always be true if err
// is non-nil. Err will be non-nil if either close has been called or
// the passed context is finished.
nextResult(ctx context.Context) (_ ieIteratorResult, done bool, err error)
// close ensures that the either writer has finished writing. In the case
// of an asynchronous channel, close will drain the writer's channel. In the
// case of the synchronous channel, it will ensure that the writer receives
// an error when it wakes.
close() error
}
// ieResultWriter is used by the internalExecutor to write results to an
// iterator.
type ieResultWriter interface {
// addResult adds a result. It may block until the next result is requested
// by the reader, depending on the synchronization strategy.
addResult(ctx context.Context, result ieIteratorResult) error
// finish is used to indicate that the writer is done writing rows.
finish()
}
var asyncIEResultChannelBufferSize = util.ConstantWithMetamorphicTestRange(
"async-IE-result-channel-buffer-size",
32, /* defaultValue */
1, /* min */
32, /* max */
)
// newAsyncIEResultChannel returns an ieResultChannel which does not attempt to
// synchronize the writer with the reader.
func newAsyncIEResultChannel() *ieResultChannel {
return &ieResultChannel{
dataCh: make(chan ieIteratorResult, asyncIEResultChannelBufferSize),
doneCh: make(chan struct{}),
}
}
// ieResultChannel is used to coordinate passing results from an
// internalExecutor to its corresponding iterator. It can be constructed to
// ensure that there is no concurrency between the reader and writer.
type ieResultChannel struct {
// dataCh is the channel on which the connExecutor goroutine sends the rows
// (in addResult) and will block on waitCh after each send. The iterator
// goroutine blocks on dataCh until there is something to receive (rows or
// other metadata) and will return the data to the caller. On the next call
// to Next(), the iterator goroutine unblocks the producer and will block
// itself again. dataCh will be closed (in finish()) when the connExecutor
// goroutine exits its run() loop whereas waitCh is closed when closing the
// iterator.
dataCh chan ieIteratorResult
// waitCh is nil for async ieResultChannels. It is never closed. In all places
// where the caller may interact with it the doneCh is also used. This policy
// is in place to make it safe to unblock both the reader and the writer
// without any hazards of a blocked reader attempting to send on a closed
// channel.
waitCh chan struct{}
// doneCh is used to indicate that the ReadWriter has been closed.
// doneCh is closed under the doneOnce. The doneCh is only used for the
// ieResultChannel. This is crucial to ensure that a synchronous writer
// does not attempt to continue to operate after the reader has called close.
doneCh chan struct{}
doneErr error
doneOnce sync.Once
}
// newSyncIEResultChannel is used to ensure that in execution scenarios which
// do not permit concurrency that there is none. It works by blocking the
// writing goroutine immediately upon sending on the data channel and only
// unblocking it after the reader signals.
func newSyncIEResultChannel() *ieResultChannel {
return &ieResultChannel{
dataCh: make(chan ieIteratorResult),
waitCh: make(chan struct{}),
doneCh: make(chan struct{}),
}
}
func (i *ieResultChannel) firstResult(
ctx context.Context,
) (_ ieIteratorResult, done bool, err error) {
select {
case <-ctx.Done():
return ieIteratorResult{}, true, ctx.Err()
case <-i.doneCh:
return ieIteratorResult{}, true, ctx.Err()
case res, ok := <-i.dataCh:
if !ok {
return ieIteratorResult{}, true, ctx.Err()
}
return res, false, nil
}
}
func (i *ieResultChannel) maybeUnblockWriter(ctx context.Context) (done bool, err error) {
if i.async() {
return false, nil
}
select {
case <-ctx.Done():
return true, ctx.Err()
case <-i.doneCh:
return true, ctx.Err()
case i.waitCh <- struct{}{}:
return false, nil
}
}
func (i *ieResultChannel) async() bool {
return i.waitCh == nil
}
func (i *ieResultChannel) nextResult(
ctx context.Context,
) (_ ieIteratorResult, done bool, err error) {
if done, err = i.maybeUnblockWriter(ctx); done {
return ieIteratorResult{}, done, err
}
return i.firstResult(ctx)
}
func (i *ieResultChannel) close() error {
i.doneOnce.Do(func() {
close(i.doneCh)
for {
res, done, err := i.nextResult(context.TODO())
if i.doneErr == nil {
if res.err != nil {
i.doneErr = res.err
} else if err != nil {
i.doneErr = err
}
}
if done {
return
}
}
})
return i.doneErr
}
// errIEResultChannelClosed is returned by the writer when the reader has
// closed ieResultChannel. The error indicates to the writer to shut down
// the query execution, but the reader won't propagate it further.
var errIEResultChannelClosed = errors.New("ieResultReader closed")
func (i *ieResultChannel) addResult(ctx context.Context, result ieIteratorResult) error {
select {
case <-ctx.Done():
return ctx.Err()
case <-i.doneCh:
// Prefer the context error if there is one.
if ctxErr := ctx.Err(); ctxErr != nil {
return ctxErr
}
return errIEResultChannelClosed
case i.dataCh <- result:
}
return i.maybeBlock(ctx)
}
func (i *ieResultChannel) maybeBlock(ctx context.Context) error {
if i.async() {
return nil
}
select {
case <-ctx.Done():
return ctx.Err()
case <-i.doneCh:
// Prefer the context error if there is one.
if ctxErr := ctx.Err(); ctxErr != nil {
return ctxErr
}
return errIEResultChannelClosed
case <-i.waitCh:
return nil
}
}
func (i *ieResultChannel) finish() {
close(i.dataCh)
}