Skip to content

Commit

Permalink
flowinfra: preserve flowRetryableError correctly across network
Browse files Browse the repository at this point in the history
This commit makes it so that `flowinfra.flowRetryableError` type is
correctly preserved across network. Previously, if the error originated
on the remote node, the coordinator node would receive
`errbase.opaqueLeaf` error since the decoder method wasn't registered
for the error, now the error is preserved correctly.

Release note: None
  • Loading branch information
yuzefovich committed Aug 3, 2022
1 parent 6e6a053 commit 3b6c868
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 3 deletions.
1 change: 1 addition & 0 deletions pkg/sql/flowinfra/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ go_library(
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_redact//:redact",
"@com_github_gogo_protobuf//proto",
"@io_opentelemetry_go_otel//attribute",
],
)
Expand Down
18 changes: 15 additions & 3 deletions pkg/sql/flowinfra/flow_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ package flowinfra

import (
"context"
"fmt"
"sync"
"time"

Expand All @@ -25,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
"github.com/gogo/protobuf/proto"
)

// errNoInboundStreamConnection is the error propagated through the flow when
Expand Down Expand Up @@ -239,8 +239,20 @@ type flowRetryableError struct {
cause error
}

func (e *flowRetryableError) Error() string {
return fmt.Sprintf("flow retryable error: %+v", e.cause)
var _ errors.Wrapper = &flowRetryableError{}

func (e *flowRetryableError) Error() string { return e.cause.Error() }
func (e *flowRetryableError) Cause() error { return e.cause }
func (e *flowRetryableError) Unwrap() error { return e.Cause() }

func decodeFlowRetryableError(
_ context.Context, cause error, _ string, _ []string, _ proto.Message,
) error {
return &flowRetryableError{cause: cause}
}

func init() {
errors.RegisterWrapperDecoder(errors.GetTypeKey((*flowRetryableError)(nil)), decodeFlowRetryableError)
}

// IsFlowRetryableError returns true if an error represents a retryable
Expand Down
1 change: 1 addition & 0 deletions pkg/testutils/lint/lint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1175,6 +1175,7 @@ func TestLint(t *testing.T) {
":!spanconfig/errors.go",
":!roachpb/replica_unavailable_error.go",
":!roachpb/ambiguous_result_error.go",
":!sql/flowinfra/flow_registry.go",
":!sql/pgwire/pgerror/constraint_name.go",
":!sql/pgwire/pgerror/severity.go",
":!sql/pgwire/pgerror/with_candidate_code.go",
Expand Down

0 comments on commit 3b6c868

Please sign in to comment.