Merge #60483
60483: bulkio: Implement `CREATE REPLICATION STREAM` r=miretskiy a=miretskiy

Initial implementation of `CREATE REPLICATION STREAM`.

The implementation uses the changefeed distflow processing, which has
been refactored to accommodate this new use case.
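
To make the shared entry point concrete, here is a compile-checked sketch; every type below is a simplified stand-in for the real `jobspb`/`roachpb`/`hlc`/`tree` parameters, and only the call shape mirrors the `changefeeddist.StartDistChangefeed` call site visible in the diff below.

```go
// Sketch only: stand-in types approximate the real changefeeddist API.
package changefeeddist

import "context"

type (
	ChangefeedDetails struct{ SinkURI string }     // stand-in for jobspb.ChangefeedDetails
	Span              struct{ Key, EndKey []byte } // stand-in for roachpb.Span
	Timestamp         struct{ WallTime int64 }     // stand-in for hlc.Timestamp
	Datums            []interface{}                // stand-in for tree.Datums
)

// StartDistChangefeed plans one ChangeAggregator per span partition, wires
// them into a single ChangeFrontier, and runs the DistSQL flow. Both CREATE
// CHANGEFEED and CREATE REPLICATION STREAM can funnel through an entry
// point shaped like this.
func StartDistChangefeed(
	ctx context.Context,
	jobID int64,
	details ChangefeedDetails,
	trackedSpans []Span,
	initialHighWater Timestamp,
	resultsCh chan<- Datums,
) error {
	if details.SinkURI == "" {
		// Sinkless ("core") feeds keep a single aggregator on the gateway
		// and stream rows back over resultsCh to the connected client.
	}
	return nil // planning elided in this sketch
}
```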

The replication stream expects to receive raw KVs.  This is
accomplished by implementing a native encoding in changefeeds:
the encoder emits raw bytes representing keys and values.
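
As a sketch of what "native" encoding means here: the encoder does no JSON/Avro envelope work and simply hands the KV bytes back. The `Encoder` interface below is a simplified stand-in for the unexported changefeedccl one; all names are illustrative.

```go
// Sketch only: a pared-down Encoder to illustrate raw KV pass-through.
package main

import "fmt"

// Row carries one changed KV captured by the kvfeed.
type Row struct {
	Key   []byte // raw CockroachDB key bytes
	Value []byte // raw MVCC value bytes
}

// Encoder turns a changed row into sink-ready key/value payloads.
type Encoder interface {
	EncodeKey(r Row) ([]byte, error)
	EncodeValue(r Row) ([]byte, error)
}

// rawEncoder is the "native" strategy: emit the bytes untouched so a
// replication stream consumer can re-ingest them verbatim.
type rawEncoder struct{}

func (rawEncoder) EncodeKey(r Row) ([]byte, error)   { return r.Key, nil }
func (rawEncoder) EncodeValue(r Row) ([]byte, error) { return r.Value, nil }

func main() {
	var e Encoder = rawEncoder{}
	k, _ := e.EncodeKey(Row{Key: []byte("k1"), Value: []byte("v1")})
	fmt.Printf("%q\n", k) // "k1" — bytes pass through unchanged
}
```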

The plan hook runs a "core"-style changefeed -- that is, it
expects the client to remain connected to receive changed rows.
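
A hypothetical client loop for such a core-style stream might look like the following; the connection string, target table, and three-column row shape (topic, key, value) are assumptions for illustration, not a documented API.

```go
// Sketch only: consuming a core-style stream over a long-lived connection.
package main

import (
	"database/sql"
	"log"

	_ "github.com/lib/pq" // CockroachDB speaks the Postgres wire protocol
)

func main() {
	db, err := sql.Open("postgres",
		"postgresql://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// The statement keeps producing rows for as long as we stay connected.
	rows, err := db.Query(`CREATE REPLICATION STREAM FOR TABLE t`)
	if err != nil {
		log.Fatal(err)
	}
	defer rows.Close()

	for rows.Next() {
		var topic string
		var key, value []byte
		if err := rows.Scan(&topic, &key, &value); err != nil {
			log.Fatal(err)
		}
		log.Printf("raw kv: %q -> %q", key, value)
	}
	if err := rows.Err(); err != nil {
		log.Fatal(err) // the stream ended abnormally
	}
}
```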

Follow-on work will implement the replication stream resumer
as well as replication stream sinks.

The other commits in this PR add the SQL grammar definitions and make
minor tweaks to the CDC code to enable configuration for the
streaming use case.
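
For reference, these are the kinds of statements the new grammar is meant to accept (see the stmt_block.bnf diff below); the sink URI and cursor value here are placeholders:

```sql
-- Core-style: no sink, rows stream back to this session.
CREATE REPLICATION STREAM FOR TABLE t;

-- With a sink and the new replication options (values are made up):
CREATE REPLICATION STREAM FOR TABLE t
  INTO 'kafka://host:9092'
  WITH CURSOR = '1613689999000000000.0000000000', DETACHED;
```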

Informs #57422

Release Notes: None

Co-authored-by: Yevgeniy Miretskiy <[email protected]>
craig[bot] and Yevgeniy Miretskiy committed Feb 18, 2021
2 parents 19726fa + fd9cde8 commit 95c0fe5
Showing 31 changed files with 1,268 additions and 248 deletions.
47 changes: 32 additions & 15 deletions docs/generated/sql/bnf/stmt_block.bnf
@@ -135,6 +135,8 @@ create_stmt ::=
   | create_ddl_stmt
   | create_stats_stmt
   | create_schedule_for_backup_stmt
+  | create_changefeed_stmt
+  | create_replication_stream_stmt
   | create_extension_stmt
 
 delete_stmt ::=
@@ -426,8 +428,7 @@ create_role_stmt ::=
   | 'CREATE' role_or_group_or_user 'IF' 'NOT' 'EXISTS' string_or_placeholder opt_role_options
 
 create_ddl_stmt ::=
-  create_changefeed_stmt
-  | create_database_stmt
+  create_database_stmt
   | create_index_stmt
   | create_schema_stmt
   | create_table_stmt
@@ -442,6 +443,12 @@ create_stats_stmt ::=
 create_schedule_for_backup_stmt ::=
   'CREATE' 'SCHEDULE' opt_description 'FOR' 'BACKUP' opt_backup_targets 'INTO' string_or_placeholder_opt_list opt_with_backup_options cron_expr opt_full_backup_clause opt_with_schedule_options
 
+create_changefeed_stmt ::=
+  'CREATE' 'CHANGEFEED' 'FOR' changefeed_targets opt_changefeed_sink opt_with_options
+
+create_replication_stream_stmt ::=
+  'CREATE' 'REPLICATION' 'STREAM' 'FOR' targets opt_changefeed_sink opt_with_replication_options
+
 create_extension_stmt ::=
   'CREATE' 'EXTENSION' 'IF' 'NOT' 'EXISTS' name
   | 'CREATE' 'EXTENSION' name
@@ -828,6 +835,7 @@ unreserved_keyword ::=
   | 'CSV'
   | 'CUBE'
   | 'CURRENT'
+  | 'CURSOR'
   | 'CYCLE'
   | 'DATA'
   | 'DATABASE'
@@ -1308,9 +1316,6 @@ for_schedules_clause ::=
   'FOR' 'SCHEDULES' select_stmt
   | 'FOR' 'SCHEDULE' a_expr
 
-create_changefeed_stmt ::=
-  'CREATE' 'CHANGEFEED' 'FOR' changefeed_targets opt_changefeed_sink opt_with_options
-
 create_database_stmt ::=
   'CREATE' 'DATABASE' database_name opt_with opt_template_clause opt_encoding_clause opt_lc_collate_clause opt_lc_ctype_clause opt_connection_limit opt_primary_region_clause opt_regions_list opt_survival_goal_clause
   | 'CREATE' 'DATABASE' 'IF' 'NOT' 'EXISTS' database_name opt_with opt_template_clause opt_encoding_clause opt_lc_collate_clause opt_lc_ctype_clause opt_connection_limit opt_primary_region_clause opt_regions_list opt_survival_goal_clause
@@ -1381,6 +1386,18 @@ opt_with_schedule_options ::=
   | 'WITH' 'SCHEDULE' 'OPTIONS' '(' kv_option_list ')'
   |
 
+changefeed_targets ::=
+  single_table_pattern_list
+  | 'TABLE' single_table_pattern_list
+
+opt_changefeed_sink ::=
+  'INTO' string_or_placeholder
+
+opt_with_replication_options ::=
+  'WITH' replication_options_list
+  | 'WITH' 'OPTIONS' '(' replication_options_list ')'
+  |
+
 with_clause ::=
   'WITH' cte_list
   | 'WITH' 'RECURSIVE' cte_list
@@ -1790,13 +1807,6 @@ sub_type ::=
   | 'SOME'
   | 'ALL'
 
-changefeed_targets ::=
-  single_table_pattern_list
-  | 'TABLE' single_table_pattern_list
-
-opt_changefeed_sink ::=
-  'INTO' string_or_placeholder
-
 opt_template_clause ::=
   'TEMPLATE' opt_equal non_reserved_word_or_sconst
   |
@@ -1912,6 +1922,12 @@ opt_sequence_option_list ::=
   sequence_option_list
   |
 
+single_table_pattern_list ::=
+  ( table_name ) ( ( ',' table_name ) )*
+
+replication_options_list ::=
+  ( replication_options ) ( ( ',' replication_options ) )*
+
 cte_list ::=
   ( common_table_expr ) ( ( ',' common_table_expr ) )*
 
@@ -2298,9 +2314,6 @@ math_op ::=
   | 'GREATER_EQUALS'
   | 'NOT_EQUALS'
 
-single_table_pattern_list ::=
-  ( table_name ) ( ( ',' table_name ) )*
-
 opt_equal ::=
   '='
   |
@@ -2345,6 +2358,10 @@ create_as_table_defs ::=
 enum_val_list ::=
   ( 'SCONST' ) ( ( ',' 'SCONST' ) )*
 
+replication_options ::=
+  'CURSOR' '=' a_expr
+  | 'DETACHED'
+
 common_table_expr ::=
   table_alias_name opt_column_list 'AS' '(' preparable_stmt ')'
   | table_alias_name opt_column_list 'AS' materialize_clause '(' preparable_stmt ')'
1 change: 1 addition & 0 deletions pkg/BUILD.bazel
@@ -28,6 +28,7 @@ ALL_TESTS = [
     "//pkg/ccl/streamingccl/streamclient:streamclient_test",
     "//pkg/ccl/streamingccl/streamingest:streamingest_test",
     "//pkg/ccl/streamingccl/streamingutils:streamingutils_test",
+    "//pkg/ccl/streamingccl/streamproducer:streamproducer_test",
     "//pkg/ccl/utilccl/sampledataccl:sampledataccl_test",
     "//pkg/ccl/utilccl:utilccl_test",
     "//pkg/ccl/workloadccl/allccl:allccl_test",
1 change: 1 addition & 0 deletions pkg/ccl/BUILD.bazel
@@ -19,6 +19,7 @@ go_library(
         "//pkg/ccl/storageccl/engineccl",
         "//pkg/ccl/streamingccl/streamingest",
         "//pkg/ccl/streamingccl/streamingutils",
+        "//pkg/ccl/streamingccl/streamproducer",
         "//pkg/ccl/utilccl",
         "//pkg/ccl/workloadccl",
     ],
1 change: 1 addition & 0 deletions pkg/ccl/ccl_init.go
@@ -27,6 +27,7 @@ import (
   _ "github.com/cockroachdb/cockroach/pkg/ccl/storageccl/engineccl"
   _ "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamingest"
   _ "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamingutils"
+  _ "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamproducer"
   _ "github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
   _ "github.com/cockroachdb/cockroach/pkg/ccl/workloadccl"
 )
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/BUILD.bazel
@@ -24,6 +24,7 @@ go_library(
         "//pkg/base",
         "//pkg/ccl/backupccl/backupbase",
         "//pkg/ccl/changefeedccl/changefeedbase",
+        "//pkg/ccl/changefeedccl/changefeeddist",
         "//pkg/ccl/changefeedccl/kvfeed",
         "//pkg/ccl/utilccl",
         "//pkg/docs",
@@ -57,7 +58,6 @@ go_library(
         "//pkg/sql/flowinfra",
         "//pkg/sql/pgwire/pgcode",
         "//pkg/sql/pgwire/pgerror",
-        "//pkg/sql/physicalplan",
         "//pkg/sql/privilege",
         "//pkg/sql/roleoption",
         "//pkg/sql/row",
3 changes: 1 addition & 2 deletions pkg/ccl/changefeedccl/bench_test.go
@@ -25,7 +25,6 @@ import (
   "github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
   "github.com/cockroachdb/cockroach/pkg/keys"
   "github.com/cockroachdb/cockroach/pkg/roachpb"
-  "github.com/cockroachdb/cockroach/pkg/sql/catalog"
   "github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkv"
   "github.com/cockroachdb/cockroach/pkg/sql/catalog/lease"
   "github.com/cockroachdb/cockroach/pkg/sql/distsql"
@@ -132,7 +131,7 @@ func makeBenchSink() *benchSink {
 }
 
 func (s *benchSink) EmitRow(
-  ctx context.Context, table catalog.TableDescriptor, key, value []byte, updated hlc.Timestamp,
+  ctx context.Context, topicDescr TopicDescriptor, key, value []byte, updated hlc.Timestamp,
 ) error {
   return s.emit(int64(len(key) + len(value)))
 }
137 changes: 3 additions & 134 deletions pkg/ccl/changefeedccl/changefeed_dist.go
@@ -11,17 +11,15 @@ package changefeedccl
 import (
   "context"
 
+  "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeeddist"
   "github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
   "github.com/cockroachdb/cockroach/pkg/keys"
-  "github.com/cockroachdb/cockroach/pkg/kv"
   "github.com/cockroachdb/cockroach/pkg/roachpb"
   "github.com/cockroachdb/cockroach/pkg/sql"
   "github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkv"
   "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
-  "github.com/cockroachdb/cockroach/pkg/sql/physicalplan"
   "github.com/cockroachdb/cockroach/pkg/sql/rowexec"
   "github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
-  "github.com/cockroachdb/cockroach/pkg/sql/types"
   "github.com/cockroachdb/cockroach/pkg/util/hlc"
 )
 
@@ -35,13 +33,6 @@ const (
   changeFrontierProcName = `changefntr`
 )
 
-var changefeedResultTypes = []*types.T{
-  types.Bytes,  // resolved span
-  types.String, // topic
-  types.Bytes,  // key
-  types.Bytes,  // value
-}
-
 // distChangefeedFlow plans and runs a distributed changefeed.
 //
 // One or more ChangeAggregator processors watch table data for changes. These
@@ -105,96 +96,8 @@ func distChangefeedFlow(
     return err
   }
 
-  // Changefeed flows handle transactional consistency themselves.
-  var noTxn *kv.Txn
-
-  dsp := execCtx.DistSQLPlanner()
-  evalCtx := execCtx.ExtendedEvalContext()
-  planCtx := dsp.NewPlanningCtx(ctx, evalCtx, nil /* planner */, noTxn, execCfg.Codec.ForSystemTenant() /* distribute */)
-
-  var spanPartitions []sql.SpanPartition
-  if details.SinkURI == `` {
-    // Sinkless feeds get one ChangeAggregator on the gateway.
-    spanPartitions = []sql.SpanPartition{{Node: dsp.GatewayID(), Spans: trackedSpans}}
-  } else {
-    // All other feeds get a ChangeAggregator local on the leaseholder.
-    spanPartitions, err = dsp.PartitionSpans(planCtx, trackedSpans)
-    if err != nil {
-      return err
-    }
-  }
-
-  corePlacement := make([]physicalplan.ProcessorCorePlacement, len(spanPartitions))
-  for i, sp := range spanPartitions {
-    // TODO(dan): Merge these watches with the span-level resolved
-    // timestamps from the job progress.
-    watches := make([]execinfrapb.ChangeAggregatorSpec_Watch, len(sp.Spans))
-    for watchIdx, nodeSpan := range sp.Spans {
-      watches[watchIdx] = execinfrapb.ChangeAggregatorSpec_Watch{
-        Span:            nodeSpan,
-        InitialResolved: initialHighWater,
-      }
-    }
-
-    corePlacement[i].NodeID = sp.Node
-    corePlacement[i].Core.ChangeAggregator = &execinfrapb.ChangeAggregatorSpec{
-      Watches:   watches,
-      Feed:      details,
-      UserProto: execCtx.User().EncodeProto(),
-    }
-  }
-  // NB: This SpanFrontier processor depends on the set of tracked spans being
-  // static. Currently there is no way for them to change after the changefeed
-  // is created, even if it is paused and unpaused, but #28982 describes some
-  // ways that this might happen in the future.
-  changeFrontierSpec := execinfrapb.ChangeFrontierSpec{
-    TrackedSpans: trackedSpans,
-    Feed:         details,
-    JobID:        jobID,
-    UserProto:    execCtx.User().EncodeProto(),
-  }
-
-  p := planCtx.NewPhysicalPlan()
-  p.AddNoInputStage(corePlacement, execinfrapb.PostProcessSpec{}, changefeedResultTypes, execinfrapb.Ordering{})
-  p.AddSingleGroupStage(
-    dsp.GatewayID(),
-    execinfrapb.ProcessorCoreUnion{ChangeFrontier: &changeFrontierSpec},
-    execinfrapb.PostProcessSpec{},
-    changefeedResultTypes,
-  )
-
-  p.PlanToStreamColMap = []int{1, 2, 3}
-  dsp.FinalizePlan(planCtx, p)
-
-  resultRows := makeChangefeedResultWriter(resultsCh)
-  recv := sql.MakeDistSQLReceiver(
-    ctx,
-    resultRows,
-    tree.Rows,
-    execCfg.RangeDescriptorCache,
-    noTxn,
-    nil, /* clockUpdater */
-    evalCtx.Tracing,
-    execCfg.ContentionRegistry,
-  )
-  defer recv.Release()
-
-  var finishedSetupFn func()
-  if details.SinkURI != `` {
-    // We abuse the job's results channel to make CREATE CHANGEFEED wait for
-    // this before returning to the user to ensure the setup went okay. Job
-    // resumption doesn't have the same hack, but at the moment ignores
-    // results and so is currently okay. Return nil instead of anything
-    // meaningful so that if we start doing anything with the results
-    // returned by resumed jobs, then it breaks instead of returning
-    // nonsense.
-    finishedSetupFn = func() { resultsCh <- tree.Datums(nil) }
-  }
-
-  // Copy the evalCtx, as dsp.Run() might change it.
-  evalCtxCopy := *evalCtx
-  dsp.Run(planCtx, noTxn, p, recv, &evalCtxCopy, finishedSetupFn)()
-  return resultRows.Err()
+  return changefeeddist.StartDistChangefeed(
+    ctx, execCtx, jobID, details, trackedSpans, initialHighWater, resultsCh)
 }
 
 func fetchSpansForTargets(
@@ -220,37 +123,3 @@ func fetchSpansForTargets(
   })
   return spans, err
 }
-
-// changefeedResultWriter implements the `rowexec.resultWriter` that sends
-// the received rows back over the given channel.
-type changefeedResultWriter struct {
-  rowsCh       chan<- tree.Datums
-  rowsAffected int
-  err          error
-}
-
-func makeChangefeedResultWriter(rowsCh chan<- tree.Datums) *changefeedResultWriter {
-  return &changefeedResultWriter{rowsCh: rowsCh}
-}
-
-func (w *changefeedResultWriter) AddRow(ctx context.Context, row tree.Datums) error {
-  // Copy the row because it's not guaranteed to exist after this function
-  // returns.
-  row = append(tree.Datums(nil), row...)
-
-  select {
-  case <-ctx.Done():
-    return ctx.Err()
-  case w.rowsCh <- row:
-    return nil
-  }
-}
-func (w *changefeedResultWriter) IncrementRowsAffected(n int) {
-  w.rowsAffected += n
-}
-func (w *changefeedResultWriter) SetError(err error) {
-  w.err = err
-}
-func (w *changefeedResultWriter) Err() error {
-  return w.err
-}