Skip to content

Commit

Permalink
colexec: create new message to send metadata in unordered synchronizer
Browse files Browse the repository at this point in the history
This commit fixes a race condition where a metadata message would be
double-freed and therefore the same object returned to two different goroutines
from a sync.Pool.

The root cause of this issue was that input goroutines in the parallel
unordered synchronizer use a single message that is sent repeatedly over a
channel instead of multiple messages to avoid allocations. A scenario could
occur where an input would drain metadata and set its message's metadata field
while its message was still unread in the channel. The message would then be
sent on the channel again, and the synchronizer's DrainMeta method would read
the first message with the metadata field set, followed by the same message a
second time. This results in returning the same metadata message twice to the
distsql receiver, which would release the same metadata twice.

The solution is to instead allocate a new message when draining, which will
leave message already present in the channel untouched.

Release note: None (no release with bug)
  • Loading branch information
asubiotto committed Aug 19, 2020
1 parent ebd5c73 commit 43710e5
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 4 deletions.
12 changes: 9 additions & 3 deletions pkg/sql/colexec/parallel_unordered_synchronizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@ import (
"github.com/cockroachdb/errors"
)

// unorderedSynchronizerMsg is a light wrapper over a coldata.Batch sent over a
// channel so that the main goroutine can know which input this message
// originated from.
// unorderedSynchronizerMsg is a light wrapper over a coldata.Batch or metadata
// sent over a channel so that the main goroutine can know which input this
// message originated from.
// Note that either a batch or metadata must be sent, but not both.
type unorderedSynchronizerMsg struct {
inputIdx int
b coldata.Batch
Expand Down Expand Up @@ -237,6 +238,11 @@ func (s *ParallelUnorderedSynchronizer) init(ctx context.Context) {
// In case of a zero-length batch, proceed to drain the input.
fallthrough
case parallelUnorderedSynchronizerStateDraining:
// Create a new message for metadata. The previous message cannot be
// overwritten since it might still be in the channel.
msg = &unorderedSynchronizerMsg{
inputIdx: inputIdx,
}
if input.MetadataSources != nil {
msg.meta = input.MetadataSources.DrainMeta(ctx)
}
Expand Down
1 change: 0 additions & 1 deletion pkg/sql/rowexec/processors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -852,7 +852,6 @@ func TestUncertaintyErrorIsReturned(t *testing.T) {
testutils.RunTrueAndFalse(t, "vectorize", func(t *testing.T, vectorize bool) {
vectorizeOpt := "off"
if vectorize {
skip.WithIssue(t, 52948)
vectorizeOpt = "on"
}
for _, testCase := range testCases {
Expand Down

0 comments on commit 43710e5

Please sign in to comment.