Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
65751: changefeedccl: Refactor errors and error wrapper event buffer. r=miretskiy a=miretskiy

This is a pure code move/refactoring to address comments from cockroachdb#65746:
  * Move errors to changefeedbase so that they are accessible.
  * Move error wrapper buffer to buffer.go
  * Unexport mem buffer.

Release Notes: None

Co-authored-by: Yevgeniy Miretskiy <[email protected]>
  • Loading branch information
craig[bot] and Yevgeniy Miretskiy committed Jun 2, 2021
2 parents 650e491 + f508eb3 commit eb046e1
Show file tree
Hide file tree
Showing 13 changed files with 71 additions and 68 deletions.
1 change: 0 additions & 1 deletion pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ go_library(
"changefeed_stmt.go",
"doc.go",
"encoder.go",
"errors.go",
"metrics.go",
"name.go",
"rowfetcher_cache.go",
Expand Down
19 changes: 9 additions & 10 deletions pkg/ccl/changefeedccl/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,16 +216,15 @@ func createBenchmarkChangefeed(
}
_, withDiff := details.Opts[changefeedbase.OptDiff]
kvfeedCfg := kvfeed.Config{
Settings: settings,
DB: s.DB(),
Clock: feedClock,
Gossip: gossip.MakeOptionalGossip(s.GossipI().(*gossip.Gossip)),
Spans: spans,
Targets: details.Targets,
Sink: buf,
EventBufferFactory: func() kvfeed.EventBuffer {
return kvfeed.MakeMemBuffer(mm.MakeBoundAccount(), &metrics.KVFeedMetrics)
},
Settings: settings,
DB: s.DB(),
Clock: feedClock,
Gossip: gossip.MakeOptionalGossip(s.GossipI().(*gossip.Gossip)),
Spans: spans,
Targets: details.Targets,
Sink: buf,
Metrics: &metrics.KVFeedMetrics,
MM: mm,
InitialHighWater: initialHighWater,
WithDiff: withDiff,
NeedsInitialScan: needsInitialScan,
Expand Down
42 changes: 6 additions & 36 deletions pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ func (ca *changeAggregator) Start(ctx context.Context) {
ca.spec.User(), kvFeedMemMon.MakeBoundAccount(), ca.spec.JobID)

if err != nil {
err = MarkRetryableError(err)
err = changefeedbase.MarkRetryableError(err)
// Early abort in the case that there is an error creating the sink.
ca.MoveToDraining(err)
ca.cancel()
Expand Down Expand Up @@ -355,33 +355,6 @@ func (f doNothingSchemaFeed) Pop(
return nil, nil
}

type errorWrapperEventBuffer struct {
kvfeed.EventBuffer
}

func (e errorWrapperEventBuffer) AddKV(
ctx context.Context, kv roachpb.KeyValue, prevVal roachpb.Value, backfillTimestamp hlc.Timestamp,
) error {
if err := e.EventBuffer.AddKV(ctx, kv, prevVal, backfillTimestamp); err != nil {
return MarkRetryableError(err)
}
return nil
}

func (e errorWrapperEventBuffer) AddResolved(
ctx context.Context,
span roachpb.Span,
ts hlc.Timestamp,
boundaryType jobspb.ResolvedSpan_BoundaryType,
) error {
if err := e.EventBuffer.AddResolved(ctx, span, ts, boundaryType); err != nil {
return MarkRetryableError(err)
}
return nil
}

var _ kvfeed.EventBuffer = (*errorWrapperEventBuffer)(nil)

func makeKVFeedCfg(
cfg *execinfra.ServerConfig,
mm *mon.BytesMonitor,
Expand All @@ -397,11 +370,7 @@ func makeKVFeedCfg(
spec.Feed.Opts[changefeedbase.OptSchemaChangePolicy])
_, withDiff := spec.Feed.Opts[changefeedbase.OptDiff]
initialHighWater, needsInitialScan := getKVFeedInitialParameters(spec)
bf := func() kvfeed.EventBuffer {
return &errorWrapperEventBuffer{
EventBuffer: kvfeed.MakeMemBuffer(mm.MakeBoundAccount(), &metrics.KVFeedMetrics),
}
}

kvfeedCfg := kvfeed.Config{
Sink: buf,
Settings: cfg.Settings,
Expand All @@ -411,7 +380,8 @@ func makeKVFeedCfg(
Gossip: cfg.Gossip,
Spans: spans,
Targets: spec.Feed.Targets,
EventBufferFactory: bf,
Metrics: &metrics.KVFeedMetrics,
MM: mm,
InitialHighWater: initialHighWater,
WithDiff: withDiff,
NeedsInitialScan: needsInitialScan,
Expand Down Expand Up @@ -1052,7 +1022,7 @@ func (cf *changeFrontier) Start(ctx context.Context) {
cf.spec.User(), mm.MakeBoundAccount(), cf.spec.JobID)

if err != nil {
err = MarkRetryableError(err)
err = changefeedbase.MarkRetryableError(err)
cf.MoveToDraining(err)
return
}
Expand Down Expand Up @@ -1170,7 +1140,7 @@ func (cf *changeFrontier) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetad
if cf.EvalCtx.Settings.Version.IsActive(
cf.Ctx, clusterversion.ChangefeedsSupportPrimaryIndexChanges,
) {
err = MarkRetryableError(err)
err = changefeedbase.MarkRetryableError(err)
} else {
err = errors.Wrap(err, "primary key change occurred")
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ func changefeedPlanHook(
if err != nil {
telemetry.Count(`changefeed.core.error`)
}
return MaybeStripRetryableErrorMarker(err)
return changefeedbase.MaybeStripRetryableErrorMarker(err)
}

// Changefeeds are based on the Rangefeed abstraction, which requires the
Expand Down Expand Up @@ -322,7 +322,7 @@ func changefeedPlanHook(
jobspb.InvalidJobID,
)
if err != nil {
return MaybeStripRetryableErrorMarker(err)
return changefeedbase.MaybeStripRetryableErrorMarker(err)
}
if err := canarySink.Close(); err != nil {
return err
Expand Down Expand Up @@ -607,7 +607,7 @@ func (b *changefeedResumer) Resume(ctx context.Context, execCtx interface{}) err
}
}

if !IsRetryableError(err) {
if !changefeedbase.IsRetryableError(err) {
if ctx.Err() != nil {
return ctx.Err()
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2016,7 +2016,7 @@ func TestChangefeedRetryableError(t *testing.T) {
knobs.BeforeEmitRow = func(_ context.Context) error {
switch atomic.LoadInt64(&failEmit) {
case 1:
return MarkRetryableError(fmt.Errorf("synthetic retryable error"))
return changefeedbase.MarkRetryableError(fmt.Errorf("synthetic retryable error"))
case 2:
return fmt.Errorf("synthetic terminal error")
default:
Expand Down Expand Up @@ -3302,7 +3302,7 @@ func TestChangefeedMemBufferCapacityErrorRetryable(t *testing.T) {
// This function is invoked form a different go routine -- and calling
// t.Fatal will likely deadlock the test.
distErrCh <- err
return MaybeStripRetryableErrorMarker(err)
return changefeedbase.MaybeStripRetryableErrorMarker(err)
}

sqlDB := sqlutils.MakeSQLRunner(db)
Expand Down Expand Up @@ -3350,7 +3350,7 @@ func TestChangefeedMemBufferCapacityErrorRetryable(t *testing.T) {

err := <-distErrCh
require.Regexp(t, `memory budget exceeded`, err)
require.True(t, IsRetryableError(err))
require.True(t, changefeedbase.IsRetryableError(err))
}
}

Expand Down Expand Up @@ -3443,7 +3443,7 @@ func TestChangefeedRestartDuringBackfill(t *testing.T) {
require.NoError(t, feedJob.Pause())

// Make extra sure that the zombie changefeed can't write any more data.
beforeEmitRowCh <- MarkRetryableError(errors.New(`nope don't write it`))
beforeEmitRowCh <- changefeedbase.MarkRetryableError(errors.New(`nope don't write it`))

// Insert some data that we should only see out of the changefeed after it
// re-runs the backfill.
Expand Down
2 changes: 2 additions & 0 deletions pkg/ccl/changefeedccl/changefeedbase/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ go_library(
name = "changefeedbase",
srcs = [
"avro.go",
"errors.go",
"options.go",
"settings.go",
"validate.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase",
visibility = ["//visibility:public"],
deps = [
"//pkg/ccl/utilccl",
"//pkg/jobs/jobspb",
"//pkg/keys",
"//pkg/settings",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package changefeedccl
package changefeedbase

import (
"fmt"
Expand Down
32 changes: 29 additions & 3 deletions pkg/ccl/changefeedccl/kvfeed/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
Expand Down Expand Up @@ -231,9 +232,7 @@ type memBuffer struct {
}
}

// MakeMemBuffer returns an EventBuffer backed by memory, limited
// as specified by bound account.
func MakeMemBuffer(acc mon.BoundAccount, metrics *Metrics) EventBuffer {
func makeMemBuffer(acc mon.BoundAccount, metrics *Metrics) EventBuffer {
b := &memBuffer{
metrics: metrics,
signalCh: make(chan struct{}, 1),
Expand Down Expand Up @@ -363,3 +362,30 @@ func (b *memBuffer) getRow(ctx context.Context) (tree.Datums, error) {
}
}
}

type errorWrapperEventBuffer struct {
EventBuffer
}

func (e errorWrapperEventBuffer) AddKV(
ctx context.Context, kv roachpb.KeyValue, prevVal roachpb.Value, backfillTimestamp hlc.Timestamp,
) error {
if err := e.EventBuffer.AddKV(ctx, kv, prevVal, backfillTimestamp); err != nil {
return changefeedbase.MarkRetryableError(err)
}
return nil
}

func (e errorWrapperEventBuffer) AddResolved(
ctx context.Context,
span roachpb.Span,
ts hlc.Timestamp,
boundaryType jobspb.ResolvedSpan_BoundaryType,
) error {
if err := e.EventBuffer.AddResolved(ctx, span, ts, boundaryType); err != nil {
return changefeedbase.MarkRetryableError(err)
}
return nil
}

var _ EventBuffer = (*errorWrapperEventBuffer)(nil)
10 changes: 8 additions & 2 deletions pkg/ccl/changefeedccl/kvfeed/kv_feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/span"
"github.com/cockroachdb/errors"
)
Expand All @@ -42,7 +43,8 @@ type Config struct {
Spans []roachpb.Span
Targets jobspb.ChangefeedTargets
Sink EventBufferWriter
EventBufferFactory func() EventBuffer
Metrics *Metrics
MM *mon.BytesMonitor
WithDiff bool
SchemaChangeEvents changefeedbase.SchemaChangeEventClass
SchemaChangePolicy changefeedbase.SchemaChangePolicy
Expand Down Expand Up @@ -78,14 +80,18 @@ func Run(ctx context.Context, cfg Config) error {
pff = rangefeedFactory(distSender.RangeFeed)
}

bf := func() EventBuffer {
return &errorWrapperEventBuffer{makeMemBuffer(cfg.MM.MakeBoundAccount(), cfg.Metrics)}
}

f := newKVFeed(
cfg.Sink, cfg.Spans,
cfg.SchemaChangeEvents, cfg.SchemaChangePolicy,
cfg.NeedsInitialScan, cfg.WithDiff,
cfg.InitialHighWater,
cfg.Codec,
cfg.SchemaFeed,
sc, pff, cfg.EventBufferFactory)
sc, pff, bf)

g := ctxgroup.WithContext(ctx)
g.GoCtx(cfg.SchemaFeed.Run)
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/kvfeed/kv_feed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func TestKVFeed(t *testing.T) {
)
metrics := MakeMetrics(time.Minute)
bufferFactory := func() EventBuffer {
return MakeMemBuffer(mm.MakeBoundAccount(), &metrics)
return makeMemBuffer(mm.MakeBoundAccount(), &metrics)
}
scans := make(chan physicalConfig)
sf := scannerFunc(func(ctx context.Context, sink EventBufferWriter, cfg physicalConfig) error {
Expand Down
5 changes: 3 additions & 2 deletions pkg/ccl/changefeedccl/rowfetcher_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ package changefeedccl
import (
"context"

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand Down Expand Up @@ -86,7 +87,7 @@ func (c *rowFetcherCache) TableDescForKey(
if err != nil {
// Manager can return all kinds of errors during chaos, but based on
// its usage, none of them should ever be terminal.
return nil, MarkRetryableError(err)
return nil, changefeedbase.MarkRetryableError(err)
}
tableDesc = desc.Desc().(catalog.TableDescriptor)
// Immediately release the lease, since we only need it for the exact
Expand All @@ -110,7 +111,7 @@ func (c *rowFetcherCache) TableDescForKey(
}); err != nil {
// Manager can return all kinds of errors during chaos, but based on
// its usage, none of them should ever be terminal.
return nil, MarkRetryableError(err)
return nil, changefeedbase.MarkRetryableError(err)
}
// Immediately release the lease, since we only need it for the exact
// timestamp requested.
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/schema_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func (r *confluentSchemaRegistry) doWithRetry(ctx context.Context, fn func() err
}
log.VInfof(ctx, 2, "retrying schema registry operation: %s", err.Error())
}
return MarkRetryableError(err)
return changefeedbase.MarkRetryableError(err)
}

func gracefulClose(ctx context.Context, toClose io.ReadCloser) {
Expand Down
8 changes: 4 additions & 4 deletions pkg/ccl/changefeedccl/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func (s errorWrapperSink) EmitRow(
ctx context.Context, topicDescr TopicDescriptor, key, value []byte, updated hlc.Timestamp,
) error {
if err := s.wrapped.EmitRow(ctx, topicDescr, key, value, updated); err != nil {
return MarkRetryableError(err)
return changefeedbase.MarkRetryableError(err)
}
return nil
}
Expand All @@ -164,23 +164,23 @@ func (s errorWrapperSink) EmitResolvedTimestamp(
ctx context.Context, encoder Encoder, resolved hlc.Timestamp,
) error {
if err := s.wrapped.EmitResolvedTimestamp(ctx, encoder, resolved); err != nil {
return MarkRetryableError(err)
return changefeedbase.MarkRetryableError(err)
}
return nil
}

// Flush implements Sink interface.
func (s errorWrapperSink) Flush(ctx context.Context) error {
if err := s.wrapped.Flush(ctx); err != nil {
return MarkRetryableError(err)
return changefeedbase.MarkRetryableError(err)
}
return nil
}

// Close implements Sink interface.
func (s errorWrapperSink) Close() error {
if err := s.wrapped.Close(); err != nil {
return MarkRetryableError(err)
return changefeedbase.MarkRetryableError(err)
}
return nil
}
Expand Down

0 comments on commit eb046e1

Please sign in to comment.