colrpc: fix tracing when an inbox receives an error
Currently, whenever an inbox encounters an error in `Next`, regardless
of the origin of the error, we close the inbox, which results in shutting
down the gRPC stream. However, such behavior is intended to occur only
in the case of an ungraceful termination of the gRPC stream itself; for
other error cases (like an error sent from the outbox or an internal
error in the inbox) we want to keep the stream alive in order to be able
to drain it later. As a consequence of the current incorrect behavior,
whenever a remote node encounters an error, we do not collect the
remote metadata, so the trace is not populated (if tracing is enabled).
This bug was introduced during a large refactor in 3771c58 and is now
fixed.
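
As a minimal, hypothetical Go sketch of that decision (the inbox type and
onError helper below are illustrative stand-ins, not the actual colrpc.Inbox
API): only an ungraceful termination of the gRPC stream closes the inbox,
while any other error leaves the stream open so the remote metadata can still
be drained.

package main

import "fmt"

type inbox struct {
	closed bool
}

func (i *inbox) close() { i.closed = true }

// onError mimics the intended behavior: only an ungraceful termination of
// the gRPC stream itself closes the inbox; any other error (sent by the
// outbox or raised internally) keeps the stream alive so that the remote
// metadata, including trace spans, can still be drained.
func (i *inbox) onError(err error, streamBroken bool) {
	if streamBroken {
		i.close()
		fmt.Println("stream terminated ungracefully, inbox closed:", err)
		return
	}
	fmt.Println("query error, stream stays open for draining:", err)
}

func main() {
	ib := &inbox{}
	ib.onError(fmt.Errorf("out of memory on remote node"), false /* streamBroken */)
	fmt.Println("closed:", ib.closed) // false: the trace can still be drained
	ib.onError(fmt.Errorf("rpc error: connection reset"), true /* streamBroken */)
	fmt.Println("closed:", ib.closed) // true: nothing left to drain
}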

Another thing this commit fixes is that all metadata objects sent
together with an error by the outbox are now correctly buffered to be
returned when draining. Previously, they would simply get lost, but
that didn't matter since the inbox was marked as "done", so draining was
mistakenly a no-op.
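
As a hedged sketch of this buffering behavior (simplified, hypothetical types;
the real code works with execinfrapb metadata and the Inbox's DrainMeta
method), the idea is to propagate the first error right away and hold on to
everything else, including any further errors, until draining:

package main

import (
	"errors"
	"fmt"
)

type meta struct {
	err   error  // set if this metadata object carries an error
	trace string // e.g. remote span data needed to populate the trace
}

type inbox struct {
	bufferedMeta []meta
}

// onMetadata propagates the first error it sees and buffers everything else,
// including any additional errors, so that drainMeta can return it later.
func (i *inbox) onMetadata(msg []meta) error {
	var firstErr error
	for _, m := range msg {
		if m.err != nil && firstErr == nil {
			firstErr = m.err
			continue
		}
		i.bufferedMeta = append(i.bufferedMeta, m)
	}
	return firstErr
}

// drainMeta returns (and clears) the buffered metadata.
func (i *inbox) drainMeta() []meta {
	buf := i.bufferedMeta
	i.bufferedMeta = nil
	return buf
}

func main() {
	ib := &inbox{}
	msg := []meta{
		{trace: "remote span data"},
		{err: errors.New("out of temporary storage")},
	}
	if err := ib.onMetadata(msg); err != nil {
		fmt.Println("propagate immediately:", err)
	}
	// The consumer transitions to draining and still receives the span data.
	fmt.Println("drained:", ib.drainMeta())
}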

Release note (bug fix): Previously, whenever a distributed query
resulted in an error on a remote node, the trace would be incomplete.
This is now fixed.
yuzefovich committed Mar 31, 2022
1 parent a0632ce commit 773d9ca
Showing 5 changed files with 157 additions and 8 deletions.
1 change: 1 addition & 0 deletions pkg/sql/colflow/BUILD.bazel
@@ -62,6 +62,7 @@ go_test(
srcs = [
"colbatch_scan_test.go",
"dep_test.go",
"draining_test.go",
"main_test.go",
"routers_test.go",
"stats_test.go",
9 changes: 9 additions & 0 deletions pkg/sql/colflow/colrpc/colrpc_test.go
@@ -648,6 +648,15 @@ func TestOutboxInboxMetadataPropagation(t *testing.T) {
return len(meta) == 1 && errors.Is(meta[0].Err, expectedError)
},
test: func(ctx context.Context, inbox *Inbox) []execinfrapb.ProducerMetadata {
defer func() {
// Make sure that the error is not propagated for the second
// time.
//
// We still need to drain to simulate what happens in
// production - there, the consumer of the inbox would
// transition into draining upon receiving the error.
require.True(t, len(inbox.DrainMeta()) == 0)
}()
for {
var b coldata.Batch
if err := colexecerror.CatchVectorizedRuntimeError(func() {
36 changes: 28 additions & 8 deletions pkg/sql/colflow/colrpc/inbox.go
@@ -328,13 +328,17 @@ func (i *Inbox) Next() coldata.Batch {
return coldata.ZeroBatch
}

var ungracefulStreamTermination bool
defer func() {
// Catch any panics that occur and close the Inbox in order to not leak
// the goroutine listening for context cancellation. The Inbox must
// still be closed during normal termination.
if panicObj := recover(); panicObj != nil {
// Only close the Inbox here in case of an ungraceful termination.
i.close()
if ungracefulStreamTermination {
// Only close the Inbox here in case of an ungraceful
// termination.
i.close()
}
err := logcrash.PanicAsError(0, panicObj)
log.VEventf(i.Ctx, 1, "Inbox encountered an error in Next: %v", err)
// Note that here we use InternalError to propagate the error
@@ -363,21 +367,37 @@
// to handle it.
err = pgerror.Wrap(err, pgcode.InternalConnectionFailure, "inbox communication error")
i.errCh <- err
ungracefulStreamTermination = true
colexecerror.ExpectedError(err)
}
if len(m.Data.Metadata) != 0 {
// If an error was encountered, it needs to be propagated
// immediately. All other metadata will simply be buffered and
// returned in DrainMeta.
var receivedErr error
for _, rpm := range m.Data.Metadata {
meta, ok := execinfrapb.RemoteProducerMetaToLocalMeta(i.Ctx, rpm)
if !ok {
continue
}
if meta.Err != nil {
// If an error was encountered, it needs to be propagated
// immediately. All other metadata will simply be buffered
// and returned in DrainMeta.
colexecerror.ExpectedError(meta.Err)
if meta.Err != nil && receivedErr == nil {
receivedErr = meta.Err
} else {
// Note that if multiple errors are sent in a single
// message, then we'll propagate the first one right away
// (via a panic below) and will buffer the rest to be
// returned in DrainMeta. The caller will catch the panic
// and will transition to draining, so this all works out.
//
// We choose this way of handling multiple errors rather
// than something like errors.CombineErrors() since we want
// to keep errors unchanged (e.g. roachpb.ErrPriority() will
// be called on each error in the DistSQLReceiver).
i.bufferedMeta = append(i.bufferedMeta, meta)
}
i.bufferedMeta = append(i.bufferedMeta, meta)
}
if receivedErr != nil {
colexecerror.ExpectedError(receivedErr)
}
// Continue until we get the next batch or EOF.
continue
6 changes: 6 additions & 0 deletions pkg/sql/colflow/colrpc/inbox_test.go
@@ -139,8 +139,10 @@ func TestInboxNextPanicDoesntLeakGoroutines(t *testing.T) {
m := &execinfrapb.ProducerMessage{}
m.Data.RawBytes = []byte("garbage")

// Simulate the client (outbox) that sends only a single piece of metadata.
go func() {
_ = rpcLayer.client.Send(m)
_ = rpcLayer.client.CloseSend()
}()

// inbox.Next should panic given that the deserializer will encounter garbage
@@ -150,6 +152,10 @@ func TestInboxNextPanicDoesntLeakGoroutines(t *testing.T) {
inbox.Next()
})

// Upon catching the panic and converting it into an error, the caller
// transitions to draining.
inbox.DrainMeta()

// We require no error from the stream handler as nothing was canceled. The
// panic is bubbled up through the Next chain on the Inbox's host.
require.NoError(t, <-streamHandlerErrCh)
113 changes: 113 additions & 0 deletions pkg/sql/colflow/draining_test.go
@@ -0,0 +1,113 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package colflow_test

import (
"context"
"math"
"strings"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/stretchr/testify/require"
)

// TestDrainingAfterRemoteError verifies that the draining is fully completed if
// an error occurs on a remote node. The verification is done by checking that
// the trace from a distributed query contains spans of the remote processors.
func TestDrainingAfterRemoteError(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()

// Create a disk monitor for the temp storage only with 1 byte of space.
// This ensures that the query will run into "out of temporary storage"
// error.
diskMonitor := mon.NewMonitor(
"test-disk",
mon.DiskResource,
nil, /* curCount */
nil, /* maxHist */
-1, /* increment: use default block size */
math.MaxInt64,
cluster.MakeTestingClusterSettings(),
)
diskMonitor.Start(ctx, nil /* pool */, mon.MakeStandaloneBudget(1))

// Set up a two node cluster.
tempStorageConfig := base.TempStorageConfig{InMemory: true, Mon: diskMonitor}
args := base.TestClusterArgs{
ServerArgs: base.TestServerArgs{TempStorageConfig: tempStorageConfig},
ReplicationMode: base.ReplicationManual,
}
tc := testcluster.StartTestCluster(t, 2 /* nodes */, args)
defer tc.Stopper().Stop(ctx)

// Create two tables, one with small values, and another with large rows.
// Relocate the range for the small table to node 2.
conn := tc.ServerConn(0)
sqlDB := sqlutils.MakeSQLRunner(conn)
sqlDB.Exec(t, "CREATE TABLE small (k INT PRIMARY KEY);")
sqlDB.Exec(t, "INSERT INTO small SELECT generate_series(1, 100);")
sqlDB.Exec(t, "ANALYZE small;")
sqlDB.Exec(t, "ALTER TABLE small EXPERIMENTAL_RELOCATE VALUES (ARRAY[2], 2)")
sqlDB.Exec(t, "SELECT count(*) FROM small;")
sqlDB.Exec(t, "CREATE TABLE large (k INT PRIMARY KEY, v STRING);")
sqlDB.Exec(t, "INSERT INTO large SELECT generate_series(1, 100), repeat('a', 100000);")
sqlDB.Exec(t, "ANALYZE large;")

// Make sure that the query is fully distributed (i.e. all execution happens
// on node 2).
sqlDB.Exec(t, "SET distsql = always;")

// Sanity check that, indeed, node 2 is part of the physical plan.
rows, err := conn.Query("EXPLAIN (VEC) SELECT sum(length(v)) FROM large, small WHERE small.k = large.k GROUP BY large.k;")
require.NoError(t, err)
defer func() {
require.NoError(t, rows.Close())
}()
var foundNode2 bool
for rows.Next() {
var line string
require.NoError(t, rows.Scan(&line))
if strings.Contains(line, "Node 2") {
foundNode2 = true
}
}
require.True(t, foundNode2, "expected that most of the work is done on node 2")

// Lower the workmem setting so that the join reader gets a memory error
// first, followed by the temp disk storage error.
sqlDB.Exec(t, "SET distsql_workmem = '8MiB';")
// Enable the tracing since we'll check that it contains spans for the join
// reader.
sqlDB.Exec(t, "SET tracing = on;")

// Perform a query that uses the join reader when ordering has to be
// maintained. Ensure that it encounters the error that we expect.
sqlDB.ExpectErr(t, ".*test-disk.*", "SELECT sum(length(v)) FROM large, small WHERE small.k = large.k GROUP BY large.k;")
sqlDB.Exec(t, "SET tracing = off;")

// Now, the crux of the test - verify that the spans for the join reader on
// the remote node have been imported into the trace on the gateway. If we
// see no such spans, then the draining wasn't fully performed.
row := conn.QueryRow(`SELECT count(*) FROM [SHOW TRACE FOR SESSION] WHERE operation = 'join reader'`)
var count int
require.NoError(t, row.Scan(&count))
require.True(t, count > 0, "expected to find some spans for join reader in the trace")
}
