bulkio: Implement CREATE REPLICATION STREAM.
Initial implementation of `CREATE REPLICATION STREAM`.

The implementation uses changefeed distflow processing, which has
been refactored to accommodate this new use case.

The replication stream expects to receive raw KVs.  This is
accomplished by implementing a native encoding in changefeeds:
the encoder emits raw bytes representing keys and values.

The plan hook runs a "core" style changefeed -- that is, it
expects the client to remain connected to receive changed rows.

Follow-on work will implement the replication stream resumer
as well as replication stream sinks.

Release Notes: None
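
For context, here is a minimal sketch of the decode side: how a stream consumer might recover each raw KV, assuming it simply mirrors the nativeKVConsumer encoding added in this commit. The consumer side is not part of this change, and decodeNativeRow is a hypothetical helper.

import (
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
)

// decodeNativeRow reverses the native encoding: keys arrive as raw bytes,
// and values are protobuf-marshaled roachpb.Value messages (see
// nativeKVConsumer.ConsumeEvent in changefeed_processors.go below).
func decodeNativeRow(keyBytes, valBytes []byte) (roachpb.KeyValue, error) {
	var v roachpb.Value
	if err := protoutil.Unmarshal(valBytes, &v); err != nil {
		return roachpb.KeyValue{}, err
	}
	// The value's MVCC timestamp travels inside roachpb.Value.Timestamp.
	return roachpb.KeyValue{Key: roachpb.Key(keyBytes), Value: v}, nil
}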
Yevgeniy Miretskiy committed Feb 17, 2021
1 parent 0eecc47 commit fd9cde8
Showing 19 changed files with 948 additions and 169 deletions.
1 change: 1 addition & 0 deletions pkg/BUILD.bazel
@@ -28,6 +28,7 @@ ALL_TESTS = [
"//pkg/ccl/streamingccl/streamclient:streamclient_test",
"//pkg/ccl/streamingccl/streamingest:streamingest_test",
"//pkg/ccl/streamingccl/streamingutils:streamingutils_test",
"//pkg/ccl/streamingccl/streamproducer:streamproducer_test",
"//pkg/ccl/utilccl/sampledataccl:sampledataccl_test",
"//pkg/ccl/utilccl:utilccl_test",
"//pkg/ccl/workloadccl/allccl:allccl_test",
1 change: 1 addition & 0 deletions pkg/ccl/BUILD.bazel
@@ -19,6 +19,7 @@ go_library(
"//pkg/ccl/storageccl/engineccl",
"//pkg/ccl/streamingccl/streamingest",
"//pkg/ccl/streamingccl/streamingutils",
"//pkg/ccl/streamingccl/streamproducer",
"//pkg/ccl/utilccl",
"//pkg/ccl/workloadccl",
],
1 change: 1 addition & 0 deletions pkg/ccl/ccl_init.go
@@ -27,6 +27,7 @@ import (
_ "github.com/cockroachdb/cockroach/pkg/ccl/storageccl/engineccl"
_ "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamingest"
_ "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamingutils"
_ "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamproducer"
_ "github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
_ "github.com/cockroachdb/cockroach/pkg/ccl/workloadccl"
)
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/BUILD.bazel
@@ -24,6 +24,7 @@ go_library(
"//pkg/base",
"//pkg/ccl/backupccl/backupbase",
"//pkg/ccl/changefeedccl/changefeedbase",
"//pkg/ccl/changefeedccl/changefeeddist",
"//pkg/ccl/changefeedccl/kvfeed",
"//pkg/ccl/utilccl",
"//pkg/docs",
@@ -57,7 +58,6 @@ go_library(
"//pkg/sql/flowinfra",
"//pkg/sql/pgwire/pgcode",
"//pkg/sql/pgwire/pgerror",
"//pkg/sql/physicalplan",
"//pkg/sql/privilege",
"//pkg/sql/roleoption",
"//pkg/sql/row",
137 changes: 3 additions & 134 deletions pkg/ccl/changefeedccl/changefeed_dist.go
@@ -11,17 +11,15 @@ package changefeedccl
import (
"context"

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeeddist"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkv"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/physicalplan"
"github.com/cockroachdb/cockroach/pkg/sql/rowexec"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

@@ -35,13 +33,6 @@ const (
changeFrontierProcName = `changefntr`
)

var changefeedResultTypes = []*types.T{
types.Bytes, // resolved span
types.String, // topic
types.Bytes, // key
types.Bytes, // value
}

// distChangefeedFlow plans and runs a distributed changefeed.
//
// One or more ChangeAggregator processors watch table data for changes. These
@@ -105,96 +96,8 @@ func distChangefeedFlow(
return err
}

// Changefeed flows handle transactional consistency themselves.
var noTxn *kv.Txn

dsp := execCtx.DistSQLPlanner()
evalCtx := execCtx.ExtendedEvalContext()
planCtx := dsp.NewPlanningCtx(ctx, evalCtx, nil /* planner */, noTxn, execCfg.Codec.ForSystemTenant() /* distribute */)

var spanPartitions []sql.SpanPartition
if details.SinkURI == `` {
// Sinkless feeds get one ChangeAggregator on the gateway.
spanPartitions = []sql.SpanPartition{{Node: dsp.GatewayID(), Spans: trackedSpans}}
} else {
// All other feeds get a ChangeAggregator local on the leaseholder.
spanPartitions, err = dsp.PartitionSpans(planCtx, trackedSpans)
if err != nil {
return err
}
}

corePlacement := make([]physicalplan.ProcessorCorePlacement, len(spanPartitions))
for i, sp := range spanPartitions {
// TODO(dan): Merge these watches with the span-level resolved
// timestamps from the job progress.
watches := make([]execinfrapb.ChangeAggregatorSpec_Watch, len(sp.Spans))
for watchIdx, nodeSpan := range sp.Spans {
watches[watchIdx] = execinfrapb.ChangeAggregatorSpec_Watch{
Span: nodeSpan,
InitialResolved: initialHighWater,
}
}

corePlacement[i].NodeID = sp.Node
corePlacement[i].Core.ChangeAggregator = &execinfrapb.ChangeAggregatorSpec{
Watches: watches,
Feed: details,
UserProto: execCtx.User().EncodeProto(),
}
}
// NB: This SpanFrontier processor depends on the set of tracked spans being
// static. Currently there is no way for them to change after the changefeed
// is created, even if it is paused and unpaused, but #28982 describes some
// ways that this might happen in the future.
changeFrontierSpec := execinfrapb.ChangeFrontierSpec{
TrackedSpans: trackedSpans,
Feed: details,
JobID: jobID,
UserProto: execCtx.User().EncodeProto(),
}

p := planCtx.NewPhysicalPlan()
p.AddNoInputStage(corePlacement, execinfrapb.PostProcessSpec{}, changefeedResultTypes, execinfrapb.Ordering{})
p.AddSingleGroupStage(
dsp.GatewayID(),
execinfrapb.ProcessorCoreUnion{ChangeFrontier: &changeFrontierSpec},
execinfrapb.PostProcessSpec{},
changefeedResultTypes,
)

p.PlanToStreamColMap = []int{1, 2, 3}
dsp.FinalizePlan(planCtx, p)

resultRows := makeChangefeedResultWriter(resultsCh)
recv := sql.MakeDistSQLReceiver(
ctx,
resultRows,
tree.Rows,
execCfg.RangeDescriptorCache,
noTxn,
nil, /* clockUpdater */
evalCtx.Tracing,
execCfg.ContentionRegistry,
)
defer recv.Release()

var finishedSetupFn func()
if details.SinkURI != `` {
// We abuse the job's results channel to make CREATE CHANGEFEED wait for
// this before returning to the user to ensure the setup went okay. Job
// resumption doesn't have the same hack, but at the moment ignores
// results and so is currently okay. Return nil instead of anything
// meaningful so that if we start doing anything with the results
// returned by resumed jobs, then it breaks instead of returning
// nonsense.
finishedSetupFn = func() { resultsCh <- tree.Datums(nil) }
}

// Copy the evalCtx, as dsp.Run() might change it.
evalCtxCopy := *evalCtx
dsp.Run(planCtx, noTxn, p, recv, &evalCtxCopy, finishedSetupFn)()
return resultRows.Err()
return changefeeddist.StartDistChangefeed(
ctx, execCtx, jobID, details, trackedSpans, initialHighWater, resultsCh)
}

func fetchSpansForTargets(
@@ -220,37 +123,3 @@
})
return spans, err
}

// changefeedResultWriter implements the `rowexec.resultWriter` that sends
// the received rows back over the given channel.
type changefeedResultWriter struct {
rowsCh chan<- tree.Datums
rowsAffected int
err error
}

func makeChangefeedResultWriter(rowsCh chan<- tree.Datums) *changefeedResultWriter {
return &changefeedResultWriter{rowsCh: rowsCh}
}

func (w *changefeedResultWriter) AddRow(ctx context.Context, row tree.Datums) error {
// Copy the row because it's not guaranteed to exist after this function
// returns.
row = append(tree.Datums(nil), row...)

select {
case <-ctx.Done():
return ctx.Err()
case w.rowsCh <- row:
return nil
}
}
func (w *changefeedResultWriter) IncrementRowsAffected(n int) {
w.rowsAffected += n
}
func (w *changefeedResultWriter) SetError(err error) {
w.err = err
}
func (w *changefeedResultWriter) Err() error {
return w.err
}
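
For reference, the presumed shape of the extracted helper, reconstructed from the call site above and from the changefeeddist dependencies listed later in this diff. Parameter names and the exec-context type are assumptions, since distflow.go itself is not shown:

// StartDistChangefeed plans and runs the distributed changefeed flow that
// previously lived inline in distChangefeedFlow (a sketch, not the
// authoritative definition).
func StartDistChangefeed(
	ctx context.Context,
	execCtx sql.JobExecContext,
	jobID int64,
	details jobspb.ChangefeedDetails,
	trackedSpans []roachpb.Span,
	initialHighWater hlc.Timestamp,
	resultsCh chan<- tree.Datums,
) error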
71 changes: 55 additions & 16 deletions pkg/ccl/changefeedccl/changefeed_processors.go
@@ -15,6 +15,7 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeeddist"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvfeed"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
@@ -23,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/lease"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
@@ -120,17 +122,6 @@ func (o *changeAggregatorLowerBoundOracle) inclusiveLowerBoundTS() hlc.Timestamp
var _ execinfra.Processor = &changeAggregator{}
var _ execinfra.RowSource = &changeAggregator{}

// Default frequency to flush sink.
// See comment in newChangeAggregatorProcessor for explanation on the value.
var defaultFlushFrequency = 5 * time.Second

// TestingSetDefaultFlushFrequency changes defaultFlushFrequency for tests.
// Returns function to restore flush frequency to its original value.
func TestingSetDefaultFlushFrequency(f time.Duration) func() {
defaultFlushFrequency = f
return func() { defaultFlushFrequency = 5 * time.Second }
}

func newChangeAggregatorProcessor(
flowCtx *execinfra.FlowCtx,
processorID int32,
@@ -149,7 +140,7 @@ func newChangeAggregatorProcessor(
if err := ca.Init(
ca,
post,
changefeedResultTypes,
changefeeddist.ChangefeedResultTypes,
flowCtx,
processorID,
output,
@@ -192,7 +183,7 @@ func newChangeAggregatorProcessor(
return nil, err
}
} else {
ca.flushFrequency = defaultFlushFrequency
ca.flushFrequency = changefeedbase.DefaultFlushFrequency
}
return ca, nil
}
@@ -257,8 +248,14 @@ func (ca *changeAggregator) Start(ctx context.Context) context.Context {
cfg := ca.flowCtx.Cfg

ca.eventProducer = &bufEventProducer{buf}
ca.eventConsumer = newKVEventToRowConsumer(ctx, cfg, ca.spanFrontier, kvfeedCfg.InitialHighWater,
ca.sink, ca.encoder, ca.spec.Feed, ca.knobs)

if ca.spec.Feed.Opts[changefeedbase.OptFormat] == string(changefeedbase.OptFormatNative) {
ca.eventConsumer = newNativeKVConsumer(ca.sink)
} else {
ca.eventConsumer = newKVEventToRowConsumer(ctx, cfg, ca.spanFrontier, kvfeedCfg.InitialHighWater,
ca.sink, ca.encoder, ca.spec.Feed, ca.knobs)
}

ca.startKVFeed(ctx, kvfeedCfg)

return ctx
@@ -729,6 +726,47 @@ func (c *kvEventToRowConsumer) eventToRow(
return r, nil
}

type nativeKVConsumer struct {
sink Sink
}

var _ kvEventConsumer = &nativeKVConsumer{}

func newNativeKVConsumer(sink Sink) kvEventConsumer {
return &nativeKVConsumer{sink: sink}
}

type noTopic struct{}

var _ TopicDescriptor = &noTopic{}

func (n noTopic) GetName() string {
return ""
}

func (n noTopic) GetID() descpb.ID {
return 0
}

func (n noTopic) GetVersion() descpb.DescriptorVersion {
return 0
}

// ConsumeEvent implements the kvEventConsumer interface.
func (c *nativeKVConsumer) ConsumeEvent(ctx context.Context, event kvfeed.Event) error {
if event.Type() != kvfeed.KVEvent {
return errors.AssertionFailedf("expected kv event, got %v", event.Type())
}
keyBytes := []byte(event.KV().Key)
val := event.KV().Value
valBytes, err := protoutil.Marshal(&val)
if err != nil {
return err
}

return c.sink.EmitRow(ctx, &noTopic{}, keyBytes, valBytes, val.Timestamp)
}

const (
emitAllResolved = 0
emitNoResolved = -1
@@ -887,6 +925,7 @@ func (cf *changeFrontier) Start(ctx context.Context) context.Context {
// The job registry has a set of metrics used to monitor the various jobs it
// runs. They're all stored as the `metric.Struct` interface because of
// dependency cycles.
// TODO(yevgeniy): Figure out how to inject replication stream metrics.
cf.metrics = cf.flowCtx.Cfg.JobRegistry.MetricsStruct().Changefeed.(*Metrics)
cf.sink = makeMetricsSink(cf.metrics, cf.sink)
cf.sink = &errorWrapperSink{wrapped: cf.sink}
@@ -1022,7 +1061,7 @@ func (cf *changeFrontier) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) {
}

func (cf *changeFrontier) noteResolvedSpan(d rowenc.EncDatum) error {
if err := d.EnsureDecoded(changefeedResultTypes[0], &cf.a); err != nil {
if err := d.EnsureDecoded(changefeeddist.ChangefeedResultTypes[0], &cf.a); err != nil {
return err
}
raw, ok := d.Datum.(*tree.DBytes)
7 changes: 5 additions & 2 deletions pkg/ccl/changefeedccl/changefeedbase/options.go
@@ -72,14 +72,17 @@ const (
// cursor is specified. This option is useful to create a changefeed which
// subscribes only to new messages.
OptNoInitialScan = `no_initial_scan`
// OptEmitAllResolvedTimestamps is a sentinel value indicating that all
// resolved timestamp events should be emitted.
OptEmitAllResolvedTimestamps = ``

OptEnvelopeKeyOnly EnvelopeType = `key_only`
OptEnvelopeRow EnvelopeType = `row`
OptEnvelopeDeprecatedRow EnvelopeType = `deprecated_row`
OptEnvelopeWrapped EnvelopeType = `wrapped`

OptFormatJSON FormatType = `json`
OptFormatAvro FormatType = `experimental_avro`
OptFormatJSON FormatType = `json`
OptFormatAvro FormatType = `experimental_avro`
OptFormatNative FormatType = `native`

SinkParamCACert = `ca_cert`
SinkParamClientCert = `client_cert`
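
As a usage sketch, a caller such as the replication-stream plan hook would request raw-KV output through the ordinary options map. This construction is hypothetical, since that hook is not part of the visible diff:

// Hypothetical: selecting the native (raw KV) encoding via the options
// map that changeAggregator.Start consults.
details := jobspb.ChangefeedDetails{
	Opts: map[string]string{
		changefeedbase.OptFormat: string(changefeedbase.OptFormatNative),
	},
}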
12 changes: 12 additions & 0 deletions pkg/ccl/changefeedccl/changefeedbase/settings.go
@@ -25,3 +25,15 @@ var TableDescriptorPollInterval = settings.RegisterDurationSetting(
1*time.Second,
settings.NonNegativeDuration,
)

// DefaultFlushFrequency is the default frequency at which the sink is flushed.
// See the comment in newChangeAggregatorProcessor for an explanation of the value.
var DefaultFlushFrequency = 5 * time.Second

// TestingSetDefaultFlushFrequency changes DefaultFlushFrequency for tests.
// It returns a function that restores the flush frequency to its original value.
func TestingSetDefaultFlushFrequency(f time.Duration) func() {
old := DefaultFlushFrequency
DefaultFlushFrequency = f
return func() { DefaultFlushFrequency = old }
}
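
A typical test-side use of the relocated hook, following the restore-closure pattern above:

// Speed up sink flushes for the duration of a test; the returned closure
// restores DefaultFlushFrequency when the test ends.
defer changefeedbase.TestingSetDefaultFlushFrequency(10 * time.Millisecond)()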
19 changes: 19 additions & 0 deletions pkg/ccl/changefeedccl/changefeeddist/BUILD.bazel
@@ -0,0 +1,19 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "changefeeddist",
srcs = ["distflow.go"],
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeeddist",
visibility = ["//visibility:public"],
deps = [
"//pkg/jobs/jobspb",
"//pkg/kv",
"//pkg/roachpb",
"//pkg/sql",
"//pkg/sql/execinfrapb",
"//pkg/sql/physicalplan",
"//pkg/sql/sem/tree",
"//pkg/sql/types",
"//pkg/util/hlc",
],
)