Skip to content

Commit

Permalink
Merge pull request cockroachdb#114152 from miretskiy/backport22.2-106421
Browse files Browse the repository at this point in the history
release-22.2: changefeedccl: Improve error handling
  • Loading branch information
miretskiy authored Nov 11, 2023
2 parents da4a3b1 + 8dfabf2 commit 0cfb560
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 0 deletions.
2 changes: 2 additions & 0 deletions pkg/ccl/changefeedccl/cdcevent/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ go_library(
"//pkg/util/encoding",
"//pkg/util/hlc",
"//pkg/util/iterutil",
"//pkg/util/log",
"//pkg/util/protoutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_redact//:redact",
],
Expand Down
28 changes: 28 additions & 0 deletions pkg/ccl/changefeedccl/cdcevent/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"fmt"

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
Expand All @@ -26,6 +27,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/cache"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/iterutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
)
Expand Down Expand Up @@ -400,6 +403,31 @@ func NewEventDecoder(
// DecodeKV decodes key value at specified schema timestamp.
func (d *eventDecoder) DecodeKV(
ctx context.Context, kv roachpb.KeyValue, schemaTS hlc.Timestamp,
) (Row, error) {
r, err := d.decodeKV(ctx, kv, schemaTS)
if err == nil {
return r, nil
}

// Failure to decode roachpb.KeyValue we received from rangefeed is pretty bad.
// At this point, we only have guesses why this happened (schema change? data corruption?).
// Retrying this error however is likely to produce exactly the same result.
// So, be loud and treat this error as a terminal changefeed error.
kvBytes, marshalErr := protoutil.Marshal(&kv)
if marshalErr != nil {
// That's mighty surprising. Just shove error message into kvBytes.
kvBytes = []byte(fmt.Sprintf("marshalErr<%s>", marshalErr.Error()))
}
err = changefeedbase.WithTerminalError(errors.Wrapf(err,
"error decoding key %s@%s (hex_kv: %x)",
keys.PrettyPrint(nil, kv.Key), kv.Value.Timestamp, kvBytes))
log.Errorf(ctx, "terminal error decoding KV: %v", err)
return Row{}, err
}

// decodeKV decodes key value at specified schema timestamp.
func (d *eventDecoder) decodeKV(
ctx context.Context, kv roachpb.KeyValue, schemaTS hlc.Timestamp,
) (Row, error) {
if err := d.initForKey(ctx, kv.Key, schemaTS); err != nil {
return Row{}, err
Expand Down

0 comments on commit 0cfb560

Please sign in to comment.