Skip to content

Commit

Permalink
colrpc: enhance warnings from the outbox
Browse files Browse the repository at this point in the history
This commit marks several string constants as "safe" from the
redactability perspective so that the warnings logged by the outboxes
are more helpful. Additionally, several minor nits around error
formatting are addressed.

Release note: None
  • Loading branch information
yuzefovich committed Aug 13, 2021
1 parent 8adbb79 commit 9ac3f70
Show file tree
Hide file tree
Showing 7 changed files with 22 additions and 14 deletions.
1 change: 1 addition & 0 deletions pkg/sql/colexec/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ go_library(
"//pkg/util/tracing",
"@com_github_cockroachdb_apd_v2//:apd", # keep
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_redact//:redact",
"@com_github_marusama_semaphore//:semaphore",
],
)
Expand Down
8 changes: 4 additions & 4 deletions pkg/sql/colexec/colbuilder/execplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,9 +555,9 @@ func (r opResult) createAndWrapRowSource(
if args.ProcessorConstructor == nil {
return errors.New("processorConstructor is nil")
}
log.VEventf(ctx, 1, "planning a row-execution processor in the vectorized flow because %v", causeToWrap)
log.VEventf(ctx, 1, "planning a row-execution processor in the vectorized flow: %v", causeToWrap)
if err := canWrap(flowCtx.EvalCtx.SessionData.VectorizeMode, spec); err != nil {
log.VEventf(ctx, 1, "planning a wrapped processor failed because %v", err)
log.VEventf(ctx, 1, "planning a wrapped processor failed: %v", err)
// Return the original error for why we don't support this spec
// natively since it is more interesting.
return causeToWrap
Expand Down Expand Up @@ -698,7 +698,7 @@ func NewColOperator(
}
result.OpMonitors = result.OpMonitors[:0]
if returnedErr != nil {
log.VEventf(ctx, 1, "vectorized planning failed with %v", returnedErr)
log.VEventf(ctx, 1, "vectorized planning failed: %v", returnedErr)
}
}
if panicErr != nil {
Expand Down Expand Up @@ -1522,7 +1522,7 @@ func NewColOperator(
streamingAllocator, r.Root, i, castedIdx, actual, expected, evalCtx,
)
if err != nil {
return r, errors.AssertionFailedf("unexpectedly couldn't plan a cast although IsCastSupported returned true: %v", err)
return r, errors.NewAssertionErrorWithWrappedErrf(err, "unexpectedly couldn't plan a cast although IsCastSupported returned true")
}
projection[i] = uint32(castedIdx)
typesWithCasts = append(typesWithCasts, expected)
Expand Down
7 changes: 4 additions & 3 deletions pkg/sql/colexec/hash_based_partitioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
"github.com/marusama/semaphore"
)

Expand Down Expand Up @@ -120,7 +121,7 @@ type hashBasedPartitioner struct {
colexecop.CloserHelper

unlimitedAllocator *colmem.Allocator
name string
name redact.SafeString
state hashBasedPartitionerState
inputs []colexecop.Operator
inputTypes [][]*types.T
Expand Down Expand Up @@ -209,7 +210,7 @@ func newHashBasedPartitioner(
unlimitedAllocator *colmem.Allocator,
flowCtx *execinfra.FlowCtx,
args *colexecargs.NewColOperatorArgs,
name string,
name redact.SafeString,
inputs []colexecop.Operator,
inputTypes [][]*types.T,
hashCols [][]uint32,
Expand Down Expand Up @@ -536,7 +537,7 @@ StateChanged:
if partitionInfo.memSize <= op.maxPartitionSizeToProcessUsingMain {
log.VEventf(op.Ctx, 2,
`%s processes partition with idx %d of size %s using the "main" strategy`,
op.name, partitionIdx, humanizeutil.IBytes(partitionInfo.memSize),
op.name, partitionIdx, redact.SafeString(humanizeutil.IBytes(partitionInfo.memSize)),
)
for i := range op.partitionedInputs {
op.partitionedInputs[i].partitionIdx = partitionIdx
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/colflow/colrpc/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ go_library(
"@com_github_apache_arrow_go_arrow//array",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_logtags//:logtags",
"@com_github_cockroachdb_redact//:redact",
],
)

Expand Down
15 changes: 10 additions & 5 deletions pkg/sql/colflow/colrpc/outbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ package colrpc
import (
"bytes"
"context"
"fmt"
"io"
"sync/atomic"
"time"
Expand All @@ -32,6 +31,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
"github.com/cockroachdb/redact"
)

// flowStreamClient is a utility interface used to mock out the RPC layer.
Expand Down Expand Up @@ -222,7 +222,10 @@ func (o *Outbox) Run(
// called, for all other errors flowCtxCancel is. The given error is logged with
// the associated opName.
func handleStreamErr(
ctx context.Context, opName string, err error, flowCtxCancel, outboxCtxCancel context.CancelFunc,
ctx context.Context,
opName redact.SafeString,
err error,
flowCtxCancel, outboxCtxCancel context.CancelFunc,
) {
if err == io.EOF {
if log.V(1) {
Expand All @@ -235,7 +238,7 @@ func handleStreamErr(
}
}

func (o *Outbox) moveToDraining(ctx context.Context, reason string) {
func (o *Outbox) moveToDraining(ctx context.Context, reason redact.RedactableString) {
if atomic.CompareAndSwapUint32(&o.draining, 0, 1) {
log.VEventf(ctx, 2, "Outbox moved to draining (%s)", reason)
}
Expand Down Expand Up @@ -406,9 +409,11 @@ func (o *Outbox) runWithStream(

terminatedGracefully, errToSend := o.sendBatches(ctx, stream, flowCtxCancel, outboxCtxCancel)
if terminatedGracefully || errToSend != nil {
reason := "terminated gracefully"
var reason redact.RedactableString
if errToSend != nil {
reason = fmt.Sprintf("encountered error when sending batches: %v", errToSend)
reason = redact.Sprintf("encountered error when sending batches: %v", errToSend)
} else {
reason = redact.Sprint(redact.SafeString("terminated gracefully"))
}
o.moveToDraining(ctx, reason)
if err := o.sendMetadata(ctx, stream, errToSend); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colflow/vectorized_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ func (f *vectorizedFlow) GetPath(ctx context.Context) string {
f.tempStorage.path = filepath.Join(f.Cfg.TempStoragePath, tempDirName)
log.VEventf(ctx, 1, "flow %s spilled to disk, stack trace: %s", f.ID, util.GetSmallTrace(2))
if err := f.Cfg.TempFS.MkdirAll(f.tempStorage.path); err != nil {
colexecerror.InternalError(errors.Errorf("unable to create temporary storage directory: %v", err))
colexecerror.InternalError(errors.Wrap(err, "unable to create temporary storage directory"))
}
return f.tempStorage.path
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/distsql/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func (ds *ServerImpl) Drain(
ctx context.Context, flowDrainWait time.Duration, reporter func(int, redact.SafeString),
) {
if err := ds.setDraining(true); err != nil {
log.Warningf(ctx, "unable to gossip distsql draining state: %s", err)
log.Warningf(ctx, "unable to gossip distsql draining state: %v", err)
}

flowWait := flowDrainWait
Expand Down

0 comments on commit 9ac3f70

Please sign in to comment.