From a9e25d6da47ef0c9126e76c2a49a651f642593b3 Mon Sep 17 00:00:00 2001 From: Eric Wang Date: Thu, 7 Jan 2021 11:59:14 -0500 Subject: [PATCH 1/7] sql: add qualification prefix for user-defined schema names Fixes https://github.com/cockroachdb/cockroach/issues/57738. Previously, event logs were not capturing the qualified schema names for create_schema, drop_schema, rename_schema and alter_schema_owner events. This PR changes the event logs to use the qualified schema names. Tests were also updated to reflect these changes. Release note (bug fix): add qualification prefix for user-defined schema names. --- pkg/sql/alter_schema.go | 28 +++++++++++++------ pkg/sql/create_schema.go | 10 +++++-- pkg/sql/drop_schema.go | 11 ++++++-- .../logictest/testdata/logic_test/event_log | 20 ++++++------- pkg/sql/resolver.go | 19 +++++++++++++ 5 files changed, 64 insertions(+), 24 deletions(-) diff --git a/pkg/sql/alter_schema.go b/pkg/sql/alter_schema.go index a4e9d0b73184..1150dd6b076a 100644 --- a/pkg/sql/alter_schema.go +++ b/pkg/sql/alter_schema.go @@ -93,18 +93,27 @@ func (p *planner) AlterSchema(ctx context.Context, n *tree.AlterSchema) (planNod func (n *alterSchemaNode) startExec(params runParams) error { switch t := n.n.Cmd.(type) { case *tree.AlterSchemaRename: - oldName := n.desc.Name newName := string(t.NewName) + + oldQualifiedSchemaName, err := params.p.getQualifiedSchemaName(params.ctx, n.desc) + if err != nil { + return err + } + if err := params.p.renameSchema( params.ctx, n.db, n.desc, newName, tree.AsStringWithFQNames(n.n, params.Ann()), ); err != nil { return err } + + newQualifiedSchemaName, err := params.p.getQualifiedSchemaName(params.ctx, n.desc) + if err != nil { + return err + } + return params.p.logEvent(params.ctx, n.desc.ID, &eventpb.RenameSchema{ - // TODO(knz): This name is insufficiently qualified. - // See: https://github.com/cockroachdb/cockroach/issues/57738 - SchemaName: oldName, - NewSchemaName: newName, + SchemaName: oldQualifiedSchemaName.String(), + NewSchemaName: newQualifiedSchemaName.String(), }) case *tree.AlterSchemaOwner: newOwner := t.Owner @@ -158,12 +167,15 @@ func (p *planner) checkCanAlterSchemaAndSetNewOwner( privs := scDesc.GetPrivileges() privs.SetOwner(newOwner) + qualifiedSchemaName, err := p.getQualifiedSchemaName(ctx, scDesc) + if err != nil { + return err + } + return p.logEvent(ctx, scDesc.GetID(), &eventpb.AlterSchemaOwner{ - // TODO(knz): This name is insufficiently qualified. - // See: https://github.com/cockroachdb/cockroach/issues/57738 - SchemaName: scDesc.GetName(), + SchemaName: qualifiedSchemaName.String(), Owner: newOwner.Normalized(), }) } diff --git a/pkg/sql/create_schema.go b/pkg/sql/create_schema.go index a254ed1eacaf..134756ae4650 100644 --- a/pkg/sql/create_schema.go +++ b/pkg/sql/create_schema.go @@ -167,12 +167,16 @@ func (p *planner) createUserDefinedSchema(params runParams, n *tree.CreateSchema ); err != nil { return err } + + qualifiedSchemaName, err := p.getQualifiedSchemaName(params.ctx, desc) + if err != nil { + return err + } + return params.p.logEvent(params.ctx, desc.GetID(), - // TODO(knz): This is missing some details about the database. 
- // See: https://github.com/cockroachdb/cockroach/issues/57738 &eventpb.CreateSchema{ - SchemaName: schemaName, + SchemaName: qualifiedSchemaName.String(), Owner: privs.Owner().Normalized(), }) } diff --git a/pkg/sql/drop_schema.go b/pkg/sql/drop_schema.go index 73a9efcb5629..78bfd0dd7c18 100644 --- a/pkg/sql/drop_schema.go +++ b/pkg/sql/drop_schema.go @@ -169,11 +169,16 @@ func (n *dropSchemaNode) startExec(params runParams) error { // in the same transaction as table descriptor update. for _, schemaToDelete := range n.d.schemasToDelete { sc := schemaToDelete.schema + qualifiedSchemaName, err := p.getQualifiedSchemaName(params.ctx, sc.Desc) + if err != nil { + return err + } + if err := params.p.logEvent(params.ctx, sc.ID, - // TODO(knz): This is missing some details about the database. - // See: https://github.com/cockroachdb/cockroach/issues/57738 - &eventpb.DropSchema{SchemaName: sc.Name}); err != nil { + &eventpb.DropSchema{ + SchemaName: qualifiedSchemaName.String(), + }); err != nil { return err } } diff --git a/pkg/sql/logictest/testdata/logic_test/event_log b/pkg/sql/logictest/testdata/logic_test/event_log index 8182ef6b3210..992ce32e131e 100644 --- a/pkg/sql/logictest/testdata/logic_test/event_log +++ b/pkg/sql/logictest/testdata/logic_test/event_log @@ -657,9 +657,9 @@ FROM system.eventlog WHERE "eventType" = 'create_schema' ORDER BY "timestamp", info ---- -1 {"EventType": "create_schema", "Owner": "root", "SchemaName": "sc", "Statement": "CREATE SCHEMA \"\".sc", "User": "root"} -1 {"EventType": "create_schema", "Owner": "root", "SchemaName": "s", "Statement": "CREATE SCHEMA \"\".s", "User": "root"} -1 {"EventType": "create_schema", "Owner": "u", "SchemaName": "u", "Statement": "CREATE SCHEMA AUTHORIZATION u", "User": "root"} +1 {"EventType": "create_schema", "Owner": "root", "SchemaName": "test.sc", "Statement": "CREATE SCHEMA \"\".sc", "User": "root"} +1 {"EventType": "create_schema", "Owner": "root", "SchemaName": "test.s", "Statement": "CREATE SCHEMA \"\".s", "User": "root"} +1 {"EventType": "create_schema", "Owner": "u", "SchemaName": "test.u", "Statement": "CREATE SCHEMA AUTHORIZATION u", "User": "root"} statement ok ALTER SCHEMA u RENAME TO t @@ -669,7 +669,7 @@ SELECT "reportingID", info::JSONB - 'Timestamp' - 'DescriptorID' FROM system.eventlog WHERE "eventType" = 'rename_schema' ---- -1 {"EventType": "rename_schema", "NewSchemaName": "t", "SchemaName": "u", "Statement": "ALTER SCHEMA \"\".u RENAME TO t", "User": "root"} +1 {"EventType": "rename_schema", "NewSchemaName": "test.t", "SchemaName": "test.u", "Statement": "ALTER SCHEMA \"\".u RENAME TO t", "User": "root"} statement ok DROP SCHEMA s, t @@ -683,10 +683,10 @@ FROM system.eventlog WHERE "eventType" = 'drop_schema' ORDER BY "timestamp", info ---- -1 {"EventType": "drop_schema", "SchemaName": "eventlogtonewname", "Statement": "DROP SCHEMA \"\".eventlogtonewname", "User": "root"} -1 {"EventType": "drop_schema", "SchemaName": "sc", "Statement": "DROP SCHEMA \"\".sc", "User": "root"} -1 {"EventType": "drop_schema", "SchemaName": "s", "Statement": "DROP SCHEMA \"\".s, \"\".t", "User": "root"} -1 {"EventType": "drop_schema", "SchemaName": "t", "Statement": "DROP SCHEMA \"\".s, \"\".t", "User": "root"} +1 {"EventType": "drop_schema", "SchemaName": "test.eventlogtonewname", "Statement": "DROP SCHEMA \"\".eventlogtonewname", "User": "root"} +1 {"EventType": "drop_schema", "SchemaName": "test.sc", "Statement": "DROP SCHEMA \"\".sc", "User": "root"} +1 {"EventType": "drop_schema", "SchemaName": "test.s", "Statement": "DROP 
SCHEMA \"\".s, \"\".t", "User": "root"} +1 {"EventType": "drop_schema", "SchemaName": "test.t", "Statement": "DROP SCHEMA \"\".s, \"\".t", "User": "root"} subtest eventlog_setting_disable @@ -801,7 +801,7 @@ SELECT "reportingID", "eventType", info::JSONB - 'Timestamp' - 'DescriptorID' ORDER BY "timestamp", info ---- 1 alter_database_owner {"DatabaseName": "atest", "EventType": "alter_database_owner", "Owner": "u", "Statement": "ALTER DATABASE atest OWNER TO u", "User": "root"} -1 alter_schema_owner {"EventType": "alter_schema_owner", "Owner": "u", "SchemaName": "sc", "Statement": "ALTER SCHEMA atest.sc OWNER TO u", "User": "root"} +1 alter_schema_owner {"EventType": "alter_schema_owner", "Owner": "u", "SchemaName": "atest.sc", "Statement": "ALTER SCHEMA atest.sc OWNER TO u", "User": "root"} 1 alter_table_owner {"EventType": "alter_table_owner", "Owner": "u", "Statement": "ALTER TABLE atest.sc.t OWNER TO u", "TableName": "atest.sc.t", "User": "root"} 1 alter_type_owner {"EventType": "alter_type_owner", "Owner": "u", "Statement": "ALTER TYPE atest.sc.ty OWNER TO u", "TypeName": "ty", "User": "root"} 1 alter_type_owner {"EventType": "alter_type_owner", "Owner": "u", "Statement": "ALTER TYPE atest.sc.ty OWNER TO u", "TypeName": "_ty", "User": "root"} @@ -840,7 +840,7 @@ SELECT "reportingID", "eventType", info::JSONB - 'Timestamp' - 'DescriptorID' ORDER BY "timestamp", info ---- 1 alter_database_owner {"DatabaseName": "atest", "EventType": "alter_database_owner", "Owner": "v", "Statement": "REASSIGN OWNED BY testuser TO v", "User": "root"} -1 alter_schema_owner {"EventType": "alter_schema_owner", "Owner": "v", "SchemaName": "sc", "Statement": "REASSIGN OWNED BY testuser TO v", "User": "root"} +1 alter_schema_owner {"EventType": "alter_schema_owner", "Owner": "v", "SchemaName": "atest.sc", "Statement": "REASSIGN OWNED BY testuser TO v", "User": "root"} 1 alter_table_owner {"EventType": "alter_table_owner", "Owner": "v", "Statement": "REASSIGN OWNED BY testuser TO v", "TableName": "atest.sc.t", "User": "root"} 1 alter_table_owner {"EventType": "alter_table_owner", "Owner": "v", "Statement": "REASSIGN OWNED BY testuser TO v", "TableName": "atest.sc.v", "User": "root"} 1 alter_table_owner {"EventType": "alter_table_owner", "Owner": "v", "Statement": "REASSIGN OWNED BY testuser TO v", "TableName": "atest.sc.s", "User": "root"} diff --git a/pkg/sql/resolver.go b/pkg/sql/resolver.go index dbeb77d8e771..d1e8d82c3932 100644 --- a/pkg/sql/resolver.go +++ b/pkg/sql/resolver.go @@ -423,6 +423,25 @@ func (p *planner) getQualifiedTableName( return &tbName, nil } +// getQualifiedSchemaName returns the database-qualified name of the +// schema represented by the provided descriptor. +func (p *planner) getQualifiedSchemaName( + ctx context.Context, desc catalog.SchemaDescriptor, +) (*tree.ObjectNamePrefix, error) { + dbDesc, err := p.Descriptors().GetImmutableDatabaseByID(ctx, p.txn, desc.GetParentID(), tree.DatabaseLookupFlags{ + Required: true, + }) + if err != nil { + return nil, err + } + return &tree.ObjectNamePrefix{ + CatalogName: tree.Name(dbDesc.GetName()), + SchemaName: tree.Name(desc.GetName()), + ExplicitCatalog: true, + ExplicitSchema: true, + }, nil +} + // findTableContainingIndex returns the descriptor of a table // containing the index of the given name. // This is used by expandMutableIndexName(). 
From bab2c61ece7fdf52e23da50e30ef60d018df51aa Mon Sep 17 00:00:00 2001 From: Andrew Kimball Date: Wed, 20 Jan 2021 16:34:30 -0700 Subject: [PATCH 2/7] opt: add opttester command to check rule applications and memo size Previously, it was difficult to test that the number of rule applications and/or memo groups remains reasonable during optimization of a query. This patch adds the `check-size` command, which outputs the number of rules applied and memo groups created during optimization of the query. The `rule-limit` and `group-limit` flags can be used to throw an error if the number of rule applications or memo groups exceed some limit. Release note: None Fixes #59192 --- pkg/sql/opt/testutils/opttester/opt_tester.go | 77 +++++ pkg/sql/opt/xform/testdata/rules/join_order | 266 +----------------- 2 files changed, 82 insertions(+), 261 deletions(-) diff --git a/pkg/sql/opt/testutils/opttester/opt_tester.go b/pkg/sql/opt/testutils/opttester/opt_tester.go index 617dd5e551a2..f92955f0870b 100644 --- a/pkg/sql/opt/testutils/opttester/opt_tester.go +++ b/pkg/sql/opt/testutils/opttester/opt_tester.go @@ -211,6 +211,14 @@ type Flags struct { // optsteps command with a split diff where the before and after expressions // are printed in their entirety. The default value is false. OptStepsSplitDiff bool + + // RuleApplicationLimit is used by the check-size command to check whether + // more than RuleApplicationLimit rules are applied during optimization. + RuleApplicationLimit int64 + + // MemoGroupLimit is used by the check-size command to check whether + // more than MemoGroupLimit memo groups are constructed during optimization. + MemoGroupLimit int64 } // New constructs a new instance of the OptTester for the given SQL statement. @@ -331,6 +339,13 @@ func New(catalog cat.Catalog, sql string) *OptTester { // // Injects table statistics from a json file. // +// - check-size [rule-limit=...] [group-limit=...] +// +// Fully optimizes the given query and outputs the number of rules applied +// and memo groups created. If the rule-limit or group-limit flags are set, +// check-size will result in a test error if the rule application or memo +// group count exceeds the corresponding limit. +// // Supported flags: // // - format: controls the formatting of expressions for build, opt, and @@ -411,6 +426,12 @@ func New(catalog cat.Catalog, sql string) *OptTester { // a split diff where the before and after expressions are printed in their // entirety. This is only used by the optsteps command. // +// - rule-limit: used with check-size to set a max limit on the number of rules +// that can be applied before a testing error is returned. +// +// - group-limit: used with check-size to set a max limit on the number of +// groups that can be added to the memo before a testing error is returned. +// func (ot *OptTester) RunCommand(tb testing.TB, d *datadriven.TestData) string { // Allow testcases to override the flags. 
for _, a := range d.CmdArgs { @@ -602,6 +623,13 @@ func (ot *OptTester) RunCommand(tb testing.TB, d *datadriven.TestData) string { } return result + case "check-size": + result, err := ot.CheckSize() + if err != nil { + d.Fatalf(tb, "%+v", err) + } + return result + default: d.Fatalf(tb, "unsupported command: %s", d.Cmd) return "" @@ -876,6 +904,26 @@ func (f *Flags) Set(arg datadriven.CmdArg) error { case "split-diff": f.OptStepsSplitDiff = true + case "rule-limit": + if len(arg.Vals) != 1 { + return fmt.Errorf("rule-limit requires one argument") + } + limit, err := strconv.ParseInt(arg.Vals[0], 10, 64) + if err != nil { + return err + } + f.RuleApplicationLimit = limit + + case "group-limit": + if len(arg.Vals) != 1 { + return fmt.Errorf("group-limit requires one argument") + } + limit, err := strconv.ParseInt(arg.Vals[0], 10, 64) + if err != nil { + return err + } + f.MemoGroupLimit = limit + default: return fmt.Errorf("unknown argument: %s", arg.Key) } @@ -1541,6 +1589,35 @@ func (ot *OptTester) makeStat( } } +// CheckSize optimizes the given query and tracks the number of rule +// applications that take place and the number of groups added to the memo. +// If either of these values exceeds the given limits (if any), an error is +// returned. +func (ot *OptTester) CheckSize() (string, error) { + o := ot.makeOptimizer() + var ruleApplications int64 + o.NotifyOnAppliedRule( + func(ruleName opt.RuleName, source, target opt.Expr) { + ruleApplications++ + }, + ) + var groups int64 + o.Memo().NotifyOnNewGroup(func(expr opt.Expr) { + groups++ + }) + if _, err := ot.optimizeExpr(o); err != nil { + return "", err + } + if ot.Flags.RuleApplicationLimit > 0 && ruleApplications > ot.Flags.RuleApplicationLimit { + return "", fmt.Errorf( + "rule applications exceeded limit: %d applications", ruleApplications) + } + if ot.Flags.MemoGroupLimit > 0 && groups > ot.Flags.MemoGroupLimit { + return "", fmt.Errorf("memo groups exceeded limit: %d groups", groups) + } + return fmt.Sprintf("Rules Applied: %d\nGroups Added: %d\n", ruleApplications, groups), nil +} + func (ot *OptTester) buildExpr(factory *norm.Factory) error { stmt, err := parser.ParseOne(ot.sql) if err != nil { diff --git a/pkg/sql/opt/xform/testdata/rules/join_order b/pkg/sql/opt/xform/testdata/rules/join_order index aa6ac6089f0a..81e0487162c0 100644 --- a/pkg/sql/opt/xform/testdata/rules/join_order +++ b/pkg/sql/opt/xform/testdata/rules/join_order @@ -2367,269 +2367,13 @@ inner-join (hash) ---- # Regression test for #59076. Do not reorder on the inner join produced by -# CommuteSemiJoin when it matches on an already-reordered semi join. The -# following test output should only have 6 join trees. -reorderjoins format=hide-all +# CommuteSemiJoin when it matches on an already-reordered semi join because +# doing so can lead to an exponential blowup in the size of the memo. 
+check-size rule-limit=200 group-limit=100 SELECT * FROM cy WHERE EXISTS (SELECT 1 FROM dz WHERE z = y) AND EXISTS (SELECT 1 FROM bx WHERE x = y) AND EXISTS (SELECT 1 FROM abc WHERE a = y) ---- ----- --------------------------------------------------------------------------------- -----Join Tree #1---- -semi-join (hash) - ├── scan cy - ├── scan abc - └── filters - └── a = y - -----Vertexes---- -A: -scan cy - -B: -scan abc - -----Edges---- -a = y [semi] - -----Joining AB---- -A B refs [AB] [semi] - -Joins Considered: 1 --------------------------------------------------------------------------------- -----Join Tree #2---- -inner-join (hash) - ├── scan cy - ├── scan abc - └── filters - └── a = y - -----Vertexes---- -A: -scan cy - -B: -scan abc - -----Edges---- -a = y [inner] - -----Joining AB---- -A B refs [AB] [inner] -B A refs [AB] [inner] - -Joins Considered: 2 --------------------------------------------------------------------------------- -----Join Tree #3---- -semi-join (hash) - ├── semi-join (hash) - │ ├── scan cy - │ ├── scan abc - │ └── filters - │ └── a = y - ├── scan bx - └── filters - └── x = y - -----Vertexes---- -A: -scan cy - -B: -scan abc - -C: -scan bx - -----Edges---- -a = y [semi] -x = y [semi] - -----Joining AB---- -A B refs [AB] [semi] -----Joining AC---- -A C refs [AC] [semi] -----Joining ABC---- -AC B refs [AB] [semi] -AB C refs [AC] [semi] - -Joins Considered: 4 --------------------------------------------------------------------------------- -----Join Tree #4---- -inner-join (hash) - ├── semi-join (hash) - │ ├── scan cy - │ ├── scan abc - │ └── filters - │ └── a = y - ├── distinct-on - │ └── scan bx - └── filters - └── x = y - -----Vertexes---- -A: -scan cy - -B: -scan abc - -C: -distinct-on - └── scan bx - -----Edges---- -a = y [semi] -x = y [inner] - -----Joining AB---- -A B refs [AB] [semi] -----Joining AC---- -A C refs [AC] [inner] -C A refs [AC] [inner] -----Joining ABC---- -AC B refs [AB] [semi] -AB C refs [AC] [inner] -C AB refs [AC] [inner] - -Joins Considered: 6 --------------------------------------------------------------------------------- -----Join Tree #5---- -semi-join (hash) - ├── semi-join (hash) - │ ├── semi-join (hash) - │ │ ├── scan cy - │ │ ├── scan abc - │ │ └── filters - │ │ └── a = y - │ ├── scan bx - │ └── filters - │ └── x = y - ├── scan dz - └── filters - └── z = y - -----Vertexes---- -A: -scan cy - -B: -scan abc - -C: -scan bx - -D: -scan dz - -----Edges---- -a = y [semi] -x = y [semi] -z = y [semi] - -----Joining AB---- -A B refs [AB] [semi] -----Joining AC---- -A C refs [AC] [semi] -----Joining ABC---- -AC B refs [AB] [semi] -AB C refs [AC] [semi] -----Joining AD---- -A D refs [AD] [semi] -----Joining ABD---- -AD B refs [AB] [semi] -AB D refs [AD] [semi] -----Joining ACD---- -AD C refs [AC] [semi] -AC D refs [AD] [semi] -----Joining ABCD---- -ACD B refs [AB] [semi] -ABD C refs [AC] [semi] -ABC D refs [AD] [semi] - -Joins Considered: 12 --------------------------------------------------------------------------------- -----Join Tree #6---- -inner-join (hash) - ├── semi-join (hash) - │ ├── semi-join (hash) - │ │ ├── scan cy - │ │ ├── scan abc - │ │ └── filters - │ │ └── a = y - │ ├── scan bx - │ └── filters - │ └── x = y - ├── distinct-on - │ └── scan dz - └── filters - └── z = y - -----Vertexes---- -A: -scan cy - -B: -scan abc - -C: -scan bx - -D: -distinct-on - └── scan dz - -----Edges---- -a = y [semi] -x = y [semi] -z = y [inner] - -----Joining AB---- -A B refs [AB] [semi] -----Joining AC---- -A C refs [AC] [semi] -----Joining ABC---- 
-AC B refs [AB] [semi] -AB C refs [AC] [semi] -----Joining AD---- -A D refs [AD] [inner] -D A refs [AD] [inner] -----Joining ABD---- -AD B refs [AB] [semi] -AB D refs [AD] [inner] -D AB refs [AD] [inner] -----Joining ACD---- -AD C refs [AC] [semi] -AC D refs [AD] [inner] -D AC refs [AD] [inner] -----Joining ABCD---- -ACD B refs [AB] [semi] -ABD C refs [AC] [semi] -ABC D refs [AD] [inner] -D ABC refs [AD] [inner] - -Joins Considered: 16 --------------------------------------------------------------------------------- -----Final Plan---- -project - └── semi-join (hash) - ├── project - │ └── inner-join (hash) - │ ├── inner-join (hash) - │ │ ├── scan cy - │ │ ├── distinct-on - │ │ │ └── scan dz - │ │ └── filters - │ │ └── z = y - │ ├── distinct-on - │ │ └── scan bx - │ └── filters - │ └── x = y - ├── scan abc - └── filters - └── a = y --------------------------------------------------------------------------------- ----- ----- +Rules Applied: 149 +Groups Added: 76 From 4d419c85ac888dcaace18a4cbcd62b6ac729b135 Mon Sep 17 00:00:00 2001 From: Paul Bardea Date: Sun, 17 Jan 2021 20:10:45 -0500 Subject: [PATCH 3/7] streamingest: refactor stream ingestion processor This commit refactors the stream ingestion processor to do all of its work during the Next() call rather than starting a parallel producer goroutine. This was not needed since there is no pipeline of stages to process in this processor. Release note: None --- .../stream_ingestion_processor.go | 170 ++++++++++-------- 1 file changed, 93 insertions(+), 77 deletions(-) diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go index 69894836beeb..288274d1d5dc 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go @@ -65,7 +65,8 @@ type streamIngestionProcessor struct { // that it can be forwarded through the DistSQL flow. ingestionErr error - progressCh chan jobspb.ResolvedSpan + // eventCh is the merged event channel of all of the partition event streams. + eventCh chan partitionEvent } // partitionEvent augments a normal event with the partition it came from. @@ -93,12 +94,26 @@ func newStreamIngestionDataProcessor( output: output, curBatch: make([]storage.MVCCKeyValue, 0), client: streamclient.NewStreamClient(), - // TODO: This channel size was chosen arbitrarily. 
- progressCh: make(chan jobspb.ResolvedSpan, 10), + } + + evalCtx := flowCtx.EvalCtx + db := flowCtx.Cfg.DB + var err error + sip.batcher, err = bulk.MakeStreamSSTBatcher(sip.Ctx, db, evalCtx.Settings, + func() int64 { return storageccl.MaxImportBatchSize(evalCtx.Settings) }) + if err != nil { + return nil, errors.Wrap(err, "making sst batcher") } if err := sip.Init(sip, post, streamIngestionResultTypes, flowCtx, processorID, output, nil, /* memMonitor */ - execinfra.ProcStateOpts{}); err != nil { + execinfra.ProcStateOpts{ + InputsToDrain: []execinfra.RowSource{}, + TrailingMetaCallback: func(context.Context) []execinfrapb.ProducerMetadata { + sip.close() + return nil + }, + }, + ); err != nil { return nil, err } @@ -109,26 +124,17 @@ func newStreamIngestionDataProcessor( func (sip *streamIngestionProcessor) Start(ctx context.Context) context.Context { ctx = sip.StartInternal(ctx, streamIngestionProcessorName) - go func() { - defer close(sip.progressCh) - - startTime := timeutil.Unix(0 /* sec */, sip.spec.StartTime.WallTime) - eventChs := make(map[streamingccl.PartitionAddress]chan streamingccl.Event) - for _, partitionAddress := range sip.spec.PartitionAddresses { - eventCh, err := sip.client.ConsumePartition(partitionAddress, startTime) - if err != nil { - sip.ingestionErr = err - return - } - eventChs[partitionAddress] = eventCh + startTime := timeutil.Unix(0 /* sec */, sip.spec.StartTime.WallTime) + eventChs := make(map[streamingccl.PartitionAddress]chan streamingccl.Event) + for _, partitionAddress := range sip.spec.PartitionAddresses { + eventCh, err := sip.client.ConsumePartition(partitionAddress, startTime) + if err != nil { + sip.ingestionErr = errors.Wrapf(err, "consuming partition %v", partitionAddress) } - eventCh := merge(sip.Ctx, eventChs) + eventChs[partitionAddress] = eventCh + } + sip.eventCh = merge(ctx, eventChs) - if err := sip.startIngestion(eventCh); err != nil { - sip.ingestionErr = err - return - } - }() return ctx } @@ -138,9 +144,14 @@ func (sip *streamIngestionProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.Pr return nil, sip.DrainHelper() } - progressUpdate, ok := <-sip.progressCh - if ok { - progressBytes, err := protoutil.Marshal(&progressUpdate) + progressUpdate, err := sip.consumeEvents() + if err != nil { + sip.MoveToDraining(err) + return nil, sip.DrainHelper() + } + + if progressUpdate != nil { + progressBytes, err := protoutil.Marshal(progressUpdate) if err != nil { sip.MoveToDraining(err) return nil, sip.DrainHelper() @@ -162,62 +173,15 @@ func (sip *streamIngestionProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.Pr // ConsumerClosed is part of the RowSource interface. 
func (sip *streamIngestionProcessor) ConsumerClosed() { - if sip.batcher != nil { - sip.batcher.Close() - } - - sip.InternalClose() + sip.close() } -func (sip *streamIngestionProcessor) startIngestion(eventCh chan partitionEvent) error { - var err error - evalCtx := sip.flowCtx.EvalCtx - db := sip.flowCtx.Cfg.DB - - sip.batcher, err = bulk.MakeStreamSSTBatcher(sip.Ctx, db, evalCtx.Settings, - func() int64 { return storageccl.MaxImportBatchSize(evalCtx.Settings) }) - if err != nil { - sip.ingestionErr = err - } - - for event := range eventCh { - switch event.Type() { - case streamingccl.KVEvent: - kv := event.GetKV() - mvccKey := storage.MVCCKey{ - Key: kv.Key, - Timestamp: kv.Value.Timestamp, - } - sip.curBatch = append(sip.curBatch, storage.MVCCKeyValue{Key: mvccKey, Value: kv.Value.RawBytes}) - case streamingccl.CheckpointEvent: - // TODO: In addition to flushing when receiving a checkpoint event, we - // should also flush when we've buffered sufficient KVs. A buffering adder - // would save us here. - // TODO: Add a setting to control the max flush-rate. This would be a time - // interval to allow us to limit the number of flushes we do on - // checkpoints. - resolvedTimePtr := event.GetResolved() - if resolvedTimePtr == nil { - return errors.New("checkpoint event was expected to have a resolved timestamp") - } - resolvedTime := *resolvedTimePtr - if err := sip.flush(); err != nil { - return err - } - spanStartKey := roachpb.Key(event.partition) - sip.progressCh <- jobspb.ResolvedSpan{ - Span: roachpb.Span{ - Key: spanStartKey, - EndKey: spanStartKey.Next(), - }, - Timestamp: resolvedTime, - } - default: - return errors.Newf("unknown streaming event type %v", event.Type()) +func (sip *streamIngestionProcessor) close() { + if sip.InternalClose() { + if sip.batcher != nil { + sip.batcher.Close() } } - - return nil } func (sip *streamIngestionProcessor) flush() error { @@ -276,6 +240,58 @@ func merge( return merged } +// consumeEvents handles processing events on the merged event queue and returns +// once a checkpoint event has been emitted so that it can inform the downstream +// frontier processor to consider updating the frontier. +// +// It should only make a claim that about the resolved timestamp of a partition +// increasing after it has flushed all KV events previously received by that +// partition. +func (sip *streamIngestionProcessor) consumeEvents() (*jobspb.ResolvedSpan, error) { + for event := range sip.eventCh { + switch event.Type() { + case streamingccl.KVEvent: + kv := event.GetKV() + if kv == nil { + return nil, errors.New("kv event expected to have kv") + } + mvccKey := storage.MVCCKey{ + Key: kv.Key, + Timestamp: kv.Value.Timestamp, + } + sip.curBatch = append(sip.curBatch, storage.MVCCKeyValue{Key: mvccKey, Value: kv.Value.RawBytes}) + case streamingccl.CheckpointEvent: + // TODO: In addition to flushing when receiving a checkpoint event, we + // should also flush when we've buffered sufficient KVs. A buffering adder + // would save us here. + // + // TODO: Add a setting to control the max flush-rate. This would be a time + // interval to allow us to limit the number of flushes we do on + // checkpoints. + resolvedTimePtr := event.GetResolved() + if resolvedTimePtr == nil { + return nil, errors.New("checkpoint event expected to have a resolved timestamp") + } + resolvedTime := *resolvedTimePtr + if err := sip.flush(); err != nil { + return nil, errors.Wrap(err, "flushing") + } + + // Each partition is represented by a span defined by the + // partition address. 
+ spanStartKey := roachpb.Key(event.partition) + return &jobspb.ResolvedSpan{ + Span: roachpb.Span{Key: spanStartKey, EndKey: spanStartKey.Next()}, + Timestamp: resolvedTime, + }, nil + default: + return nil, errors.Newf("unknown streaming event type %v", event.Type()) + } + } + + return nil, nil +} + func init() { rowexec.NewStreamIngestionDataProcessor = newStreamIngestionDataProcessor } From be9e8cc296acb3da606815bb0a92a039ba175699 Mon Sep 17 00:00:00 2001 From: Paul Bardea Date: Sun, 17 Jan 2021 18:51:26 -0500 Subject: [PATCH 4/7] streamclient: close event channels on context cancellation Stream clients now take in a context when opening an event stream for a given partition. To close the event stream returned by the client, the given context should be cancelled. Release note: None --- pkg/ccl/streamingccl/streamclient/client.go | 8 +++++- .../streamingccl/streamclient/client_test.go | 26 +++++++++++++++++-- .../streamclient/stream_client.go | 7 ++++- .../stream_ingestion_processor.go | 20 +++++++------- .../stream_ingestion_processor_test.go | 2 +- 5 files changed, 48 insertions(+), 15 deletions(-) diff --git a/pkg/ccl/streamingccl/streamclient/client.go b/pkg/ccl/streamingccl/streamclient/client.go index 6637c8d9d24e..51fec66532d8 100644 --- a/pkg/ccl/streamingccl/streamclient/client.go +++ b/pkg/ccl/streamingccl/streamclient/client.go @@ -9,6 +9,7 @@ package streamclient import ( + "context" "time" "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl" @@ -24,5 +25,10 @@ type Client interface { // ConsumePartition returns a channel on which we can start listening for // events from a given partition that occur after a startTime. - ConsumePartition(address streamingccl.PartitionAddress, startTime time.Time) (chan streamingccl.Event, error) + // + // Canceling the context will stop reading the partition and close the event + // channel. + // TODO: Add an error channel so that the client can report any errors + // encountered while reading the stream. + ConsumePartition(ctx context.Context, address streamingccl.PartitionAddress, startTime time.Time) (chan streamingccl.Event, error) } diff --git a/pkg/ccl/streamingccl/streamclient/client_test.go b/pkg/ccl/streamingccl/streamclient/client_test.go index 2dc2105cae47..174325bb9001 100644 --- a/pkg/ccl/streamingccl/streamclient/client_test.go +++ b/pkg/ccl/streamingccl/streamclient/client_test.go @@ -9,6 +9,7 @@ package streamclient import ( + "context" "testing" "time" @@ -35,7 +36,7 @@ func (sc testStreamClient) GetTopology( // ConsumePartition implements the Client interface. func (sc testStreamClient) ConsumePartition( - _ streamingccl.PartitionAddress, _ time.Time, + _ context.Context, _ streamingccl.PartitionAddress, _ time.Time, ) (chan streamingccl.Event, error) { sampleKV := roachpb.KeyValue{ Key: []byte("key_1"), @@ -56,6 +57,7 @@ func (sc testStreamClient) ConsumePartition( // TestExampleClientUsage serves as documentation to indicate how a stream // client could be used. 
func TestExampleClientUsage(t *testing.T) { + ctx := context.Background() client := testStreamClient{} sa := streamingccl.StreamAddress("s3://my_bucket/my_stream") topology, err := client.GetTopology(sa) @@ -65,7 +67,7 @@ func TestExampleClientUsage(t *testing.T) { numReceivedEvents := 0 for _, partition := range topology.Partitions { - eventCh, err := client.ConsumePartition(partition, startTimestamp) + eventCh, err := client.ConsumePartition(ctx, partition, startTimestamp) require.NoError(t, err) // This example looks for the closing of the channel to terminate the test, @@ -83,3 +85,23 @@ func TestExampleClientUsage(t *testing.T) { // We expect 4 events, 2 from each partition. require.Equal(t, 4, numReceivedEvents) } + +// Ensure that all implementations specified in this test properly close the +// eventChannel when the given context is canceled. +func TestImplementationsCloseChannel(t *testing.T) { + // TODO: Add SQL client and file client here when implemented. + impls := []Client{ + &client{}, + } + + for _, impl := range impls { + ctx, cancel := context.WithCancel(context.Background()) + eventCh, err := impl.ConsumePartition(ctx, "test://53/", timeutil.Now()) + require.NoError(t, err) + + // Ensure that the eventCh closes when the context is canceled. + cancel() + for range eventCh { + } + } +} diff --git a/pkg/ccl/streamingccl/streamclient/stream_client.go b/pkg/ccl/streamingccl/streamclient/stream_client.go index fa2196955d4f..b37803b76245 100644 --- a/pkg/ccl/streamingccl/streamclient/stream_client.go +++ b/pkg/ccl/streamingccl/streamclient/stream_client.go @@ -9,6 +9,7 @@ package streamclient import ( + "context" "time" "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl" @@ -33,8 +34,12 @@ func (m *client) GetTopology(_ streamingccl.StreamAddress) (streamingccl.Topolog // ConsumePartition implements the Client interface. 
func (m *client) ConsumePartition( - _ streamingccl.PartitionAddress, _ time.Time, + ctx context.Context, _ streamingccl.PartitionAddress, _ time.Time, ) (chan streamingccl.Event, error) { eventCh := make(chan streamingccl.Event) + go func() { + <-ctx.Done() + close(eventCh) + }() return eventCh, nil } diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go index 288274d1d5dc..09b35e8eb80d 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go @@ -127,7 +127,7 @@ func (sip *streamIngestionProcessor) Start(ctx context.Context) context.Context startTime := timeutil.Unix(0 /* sec */, sip.spec.StartTime.WallTime) eventChs := make(map[streamingccl.PartitionAddress]chan streamingccl.Event) for _, partitionAddress := range sip.spec.PartitionAddresses { - eventCh, err := sip.client.ConsumePartition(partitionAddress, startTime) + eventCh, err := sip.client.ConsumePartition(ctx, partitionAddress, startTime) if err != nil { sip.ingestionErr = errors.Wrapf(err, "consuming partition %v", partitionAddress) } @@ -216,17 +216,17 @@ func merge( for partition, eventCh := range partitionStreams { go func(partition streamingccl.PartitionAddress, eventCh <-chan streamingccl.Event) { defer wg.Done() - for { + for event := range eventCh { + pe := partitionEvent{ + Event: event, + partition: partition, + } + select { - case event, ok := <-eventCh: - if !ok { - return - } - merged <- partitionEvent{ - Event: event, - partition: partition, - } + case merged <- pe: case <-ctx.Done(): + // TODO: Add ctx.Err() to an error channel once ConsumePartition + // supports an error ch. return } } diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go index b1550359a710..e86f0ba3e7dc 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go @@ -49,7 +49,7 @@ func (m *mockStreamClient) GetTopology( // ConsumePartition implements the StreamClient interface. func (m *mockStreamClient) ConsumePartition( - _ streamingccl.PartitionAddress, _ time.Time, + _ context.Context, _ streamingccl.PartitionAddress, _ time.Time, ) (chan streamingccl.Event, error) { eventCh := make(chan streamingccl.Event, len(m.partitionEvents)) From baf89289c7f3301aac7016aea2d99feba43f2a07 Mon Sep 17 00:00:00 2001 From: Paul Bardea Date: Sun, 17 Jan 2021 19:29:36 -0500 Subject: [PATCH 5/7] streamclient: add random stream client This commit introduces a new stream client implementation that generates events of a specific schema for a table ID that is specified by the stream URI. Properties of the stream, such as the frequency of the events and the range of the randomly generated KVs can be controlled with the appropriate parameters specified in the stream address. To use the new stream client the `NewStreamClient` constructor has been modified to accept a stream address. The stream address allows the client to determine which client implementation should be used. Further, the addition of this client exposed a bug in the SST batcher which rejects batches that modify the same key more than once, even if disallowShadowing is set to false. 
Release note: None --- pkg/ccl/streamingccl/addresses.go | 7 + pkg/ccl/streamingccl/streamclient/BUILD.bazel | 16 +- pkg/ccl/streamingccl/streamclient/client.go | 22 ++ .../streamingccl/streamclient/client_test.go | 7 + .../streamclient/random_stream_client.go | 256 +++++++++++++++ .../streamclient/stream_client.go | 5 - pkg/ccl/streamingccl/streamingest/BUILD.bazel | 3 + .../streamingest/stream_ingestion_job.go | 12 +- .../stream_ingestion_processor.go | 8 +- .../stream_ingestion_processor_test.go | 229 +++++++++++--- pkg/kv/bulk/sst_batcher.go | 11 +- pkg/kv/kvserver/kvserverbase/bulk_adder.go | 6 +- pkg/sql/execinfrapb/processors_bulk_io.pb.go | 293 ++++++++++-------- pkg/sql/execinfrapb/processors_bulk_io.proto | 2 + 14 files changed, 689 insertions(+), 188 deletions(-) create mode 100644 pkg/ccl/streamingccl/streamclient/random_stream_client.go diff --git a/pkg/ccl/streamingccl/addresses.go b/pkg/ccl/streamingccl/addresses.go index 3b38112aa959..8153fd8d3f66 100644 --- a/pkg/ccl/streamingccl/addresses.go +++ b/pkg/ccl/streamingccl/addresses.go @@ -8,10 +8,17 @@ package streamingccl +import "net/url" + // StreamAddress is the location of the stream. The topology of a stream should // be resolvable given a stream address. type StreamAddress string +// URL parses the stream address as a URL. +func (sa StreamAddress) URL() (*url.URL, error) { + return url.Parse(string(sa)) +} + // PartitionAddress is the address where the stream client should be able to // read the events produced by a partition of a stream. // diff --git a/pkg/ccl/streamingccl/streamclient/BUILD.bazel b/pkg/ccl/streamingccl/streamclient/BUILD.bazel index ed0faeea4338..33b33eae133a 100644 --- a/pkg/ccl/streamingccl/streamclient/BUILD.bazel +++ b/pkg/ccl/streamingccl/streamclient/BUILD.bazel @@ -4,11 +4,25 @@ go_library( name = "streamclient", srcs = [ "client.go", + "random_stream_client.go", "stream_client.go", ], importpath = "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient", visibility = ["//visibility:public"], - deps = ["//pkg/ccl/streamingccl"], + deps = [ + "//pkg/ccl/streamingccl", + "//pkg/keys", + "//pkg/roachpb", + "//pkg/sql", + "//pkg/sql/catalog/descpb", + "//pkg/sql/catalog/systemschema", + "//pkg/sql/catalog/tabledesc", + "//pkg/sql/rowenc", + "//pkg/sql/sem/tree", + "//pkg/util/hlc", + "//pkg/util/syncutil", + "//pkg/util/timeutil", + ], ) go_test( diff --git a/pkg/ccl/streamingccl/streamclient/client.go b/pkg/ccl/streamingccl/streamclient/client.go index 51fec66532d8..df6302ed36d1 100644 --- a/pkg/ccl/streamingccl/streamclient/client.go +++ b/pkg/ccl/streamingccl/streamclient/client.go @@ -32,3 +32,25 @@ type Client interface { // encountered while reading the stream. ConsumePartition(ctx context.Context, address streamingccl.PartitionAddress, startTime time.Time) (chan streamingccl.Event, error) } + +// NewStreamClient creates a new stream client based on the stream +// address. 
+func NewStreamClient(streamAddress streamingccl.StreamAddress) (Client, error) { + var streamClient Client + streamURL, err := streamAddress.URL() + if err != nil { + return streamClient, err + } + + switch streamURL.Scheme { + case TestScheme: + streamClient, err = newRandomStreamClient(streamURL) + if err != nil { + return streamClient, err + } + default: + streamClient = &client{} + } + + return streamClient, nil +} diff --git a/pkg/ccl/streamingccl/streamclient/client_test.go b/pkg/ccl/streamingccl/streamclient/client_test.go index 174325bb9001..054cab14a3cf 100644 --- a/pkg/ccl/streamingccl/streamclient/client_test.go +++ b/pkg/ccl/streamingccl/streamclient/client_test.go @@ -10,6 +10,7 @@ package streamclient import ( "context" + "net/url" "testing" "time" @@ -89,9 +90,15 @@ func TestExampleClientUsage(t *testing.T) { // Ensure that all implementations specified in this test properly close the // eventChannel when the given context is canceled. func TestImplementationsCloseChannel(t *testing.T) { + streamURL, err := url.Parse("test://52") + require.NoError(t, err) + randomClient, err := newRandomStreamClient(streamURL) + require.NoError(t, err) + // TODO: Add SQL client and file client here when implemented. impls := []Client{ &client{}, + randomClient, } for _, impl := range impls { diff --git a/pkg/ccl/streamingccl/streamclient/random_stream_client.go b/pkg/ccl/streamingccl/streamclient/random_stream_client.go new file mode 100644 index 000000000000..61e7a5b34bdb --- /dev/null +++ b/pkg/ccl/streamingccl/streamclient/random_stream_client.go @@ -0,0 +1,256 @@ +// Copyright 2020 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package streamclient + +import ( + "context" + "math/rand" + "net/url" + "strconv" + "time" + + "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" + "github.com/cockroachdb/cockroach/pkg/sql/rowenc" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" +) + +const ( + // RandomStreamSchema is the schema of the KVs emitted by the random stream + // client. + RandomStreamSchema = "CREATE TABLE test (k INT PRIMARY KEY, v INT)" + + // TestScheme is the URI scheme used to create a test load. + TestScheme = "test" + // ValueRangeKey controls the range of the randomly generated values produced + // by this workload. The workload will generate between 0 and this value. + ValueRangeKey = "VALUE_RANGE" + // KVFrequency is the frequency in nanoseconds that the stream will emit + // randomly generated KV events. + KVFrequency = "KV_FREQUENCY" + // KVsPerCheckpoint controls approximately how many KV events should be emitted + // between checkpoint events. 
+ KVsPerCheckpoint = "KVS_PER_CHECKPOINT" +) + +// randomStreamConfig specifies the variables that controls the rate and type of +// events that the generated stream emits. +type randomStreamConfig struct { + valueRange int + kvFrequency time.Duration + kvsPerCheckpoint int +} + +func parseRandomStreamConfig(streamURL *url.URL) (randomStreamConfig, error) { + c := randomStreamConfig{ + valueRange: 100, + kvFrequency: 10 * time.Microsecond, + kvsPerCheckpoint: 100, + } + + var err error + if valueRangeStr := streamURL.Query().Get(ValueRangeKey); valueRangeStr != "" { + c.valueRange, err = strconv.Atoi(valueRangeStr) + if err != nil { + return c, err + } + } + + if kvFreqStr := streamURL.Query().Get(KVFrequency); kvFreqStr != "" { + kvFreq, err := strconv.Atoi(kvFreqStr) + c.kvFrequency = time.Duration(kvFreq) + if err != nil { + return c, err + } + } + + if kvsPerCheckpointStr := streamURL.Query().Get(KVsPerCheckpoint); kvsPerCheckpointStr != "" { + c.kvsPerCheckpoint, err = strconv.Atoi(kvsPerCheckpointStr) + if err != nil { + return c, err + } + } + + return c, nil +} + +// randomStreamClient is a temporary stream client implementation that generates +// random events. +// +// It expects a table with the schema `RandomStreamSchema` to already exist, +// with table ID `` to be used in the URI. Opening the stream client +// on the URI 'test://' will generate random events into this table. +// +// TODO: Move this over to a _test file in the ingestion package when there is a +// real stream client implementation. +type randomStreamClient struct { + baseDesc *tabledesc.Mutable + config randomStreamConfig + + // interceptors can be registered to peek at every event generated by this + // client. + mu struct { + syncutil.Mutex + + interceptors []func(streamingccl.Event) + } +} + +var _ Client = &randomStreamClient{} + +// newRandomStreamClient returns a stream client that generates a random set of +// events on a table with an integer key and integer value for the table with +// the given ID. +func newRandomStreamClient(streamURL *url.URL) (Client, error) { + tableID, err := strconv.Atoi(streamURL.Host) + if err != nil { + return nil, err + } + testTable, err := sql.CreateTestTableDescriptor( + context.Background(), + 50, /* defaultdb */ + descpb.ID(tableID), + RandomStreamSchema, + systemschema.JobsTable.Privileges, + ) + if err != nil { + return nil, err + } + + streamConfig, err := parseRandomStreamConfig(streamURL) + if err != nil { + return nil, err + } + + return &randomStreamClient{ + baseDesc: testTable, + config: streamConfig, + }, nil +} + +// GetTopology implements the Client interface. +func (m *randomStreamClient) GetTopology( + _ streamingccl.StreamAddress, +) (streamingccl.Topology, error) { + panic("not yet implemented") +} + +// ConsumePartition implements the Client interface. +func (m *randomStreamClient) ConsumePartition( + ctx context.Context, _ streamingccl.PartitionAddress, startTime time.Time, +) (chan streamingccl.Event, error) { + eventCh := make(chan streamingccl.Event) + now := timeutil.Now() + if startTime.After(now) { + panic("cannot start random stream client event stream in the future") + } + lastResolvedTime := startTime + + go func() { + defer close(eventCh) + + // rand is not thread safe, so create a random source for each partition. 
+ r := rand.New(rand.NewSource(timeutil.Now().UnixNano())) + kvInterval := m.config.kvFrequency + resolvedInterval := kvInterval * time.Duration(m.config.kvsPerCheckpoint) + + kvTimer := timeutil.NewTimer() + kvTimer.Reset(0) + defer kvTimer.Stop() + + resolvedTimer := timeutil.NewTimer() + resolvedTimer.Reset(0) + defer resolvedTimer.Stop() + + for { + var event streamingccl.Event + select { + case <-kvTimer.C: + kvTimer.Read = true + event = streamingccl.MakeKVEvent(m.makeRandomKey(r, lastResolvedTime)) + kvTimer.Reset(kvInterval) + case <-resolvedTimer.C: + resolvedTimer.Read = true + resolvedTime := timeutil.Now() + hlcResolvedTime := hlc.Timestamp{WallTime: resolvedTime.UnixNano()} + event = streamingccl.MakeCheckpointEvent(hlcResolvedTime) + lastResolvedTime = resolvedTime + resolvedTimer.Reset(resolvedInterval) + } + + // TODO: Consider keeping an in-memory copy so that tests can verify + // that the data we've ingested is correct. + select { + case eventCh <- event: + case <-ctx.Done(): + return + } + + if len(m.mu.interceptors) > 0 { + m.mu.Lock() + for _, interceptor := range m.mu.interceptors { + if interceptor != nil { + interceptor(event) + } + } + m.mu.Unlock() + } + } + }() + + return eventCh, nil +} + +func (m *randomStreamClient) makeRandomKey(r *rand.Rand, minTs time.Time) roachpb.KeyValue { + tableDesc := m.baseDesc + + // Create a key holding a random integer. + k, err := rowenc.TestingMakePrimaryIndexKey(tableDesc, r.Intn(m.config.valueRange)) + if err != nil { + panic(err) + } + k = keys.MakeFamilyKey(k, uint32(tableDesc.Families[0].ID)) + + // Create a value holding a random integer. + valueDatum := tree.NewDInt(tree.DInt(r.Intn(m.config.valueRange))) + valueBuf, err := rowenc.EncodeTableValue( + []byte(nil), tableDesc.Columns[1].ID, valueDatum, []byte(nil)) + if err != nil { + panic(err) + } + var v roachpb.Value + v.SetTuple(valueBuf) + v.ClearChecksum() + v.InitChecksum(k) + + // Generate a timestamp between minTs and now(). + randOffset := int(timeutil.Now().UnixNano()) - int(minTs.UnixNano()) + newTimestamp := rand.Intn(randOffset) + int(minTs.UnixNano()) + v.Timestamp = hlc.Timestamp{WallTime: int64(newTimestamp)} + + return roachpb.KeyValue{ + Key: k, + Value: v, + } +} + +// RegisterInterception implements streamingest.interceptableStreamClient. +func (m *randomStreamClient) RegisterInterception(f func(event streamingccl.Event)) { + m.mu.Lock() + defer m.mu.Unlock() + m.mu.interceptors = append(m.mu.interceptors, f) +} diff --git a/pkg/ccl/streamingccl/streamclient/stream_client.go b/pkg/ccl/streamingccl/streamclient/stream_client.go index b37803b76245..81ebd251a09f 100644 --- a/pkg/ccl/streamingccl/streamclient/stream_client.go +++ b/pkg/ccl/streamingccl/streamclient/stream_client.go @@ -20,11 +20,6 @@ type client struct{} var _ Client = &client{} -// NewStreamClient returns a new mock stream client. -func NewStreamClient() Client { - return &client{} -} - // GetTopology implements the Client interface. 
func (m *client) GetTopology(_ streamingccl.StreamAddress) (streamingccl.Topology, error) { return streamingccl.Topology{ diff --git a/pkg/ccl/streamingccl/streamingest/BUILD.bazel b/pkg/ccl/streamingccl/streamingest/BUILD.bazel index f78786673122..e10347b69c98 100644 --- a/pkg/ccl/streamingccl/streamingest/BUILD.bazel +++ b/pkg/ccl/streamingccl/streamingest/BUILD.bazel @@ -52,6 +52,7 @@ go_test( "//pkg/jobs", "//pkg/jobs/jobspb", "//pkg/keys", + "//pkg/kv", "//pkg/roachpb", "//pkg/security", "//pkg/security/securitytest", @@ -60,12 +61,14 @@ go_test( "//pkg/sql/execinfra", "//pkg/sql/execinfrapb", "//pkg/sql/sem/tree", + "//pkg/testutils", "//pkg/testutils/distsqlutils", "//pkg/testutils/serverutils", "//pkg/testutils/sqlutils", "//pkg/testutils/testcluster", "//pkg/util/hlc", "//pkg/util/leaktest", + "//pkg/util/log", "//pkg/util/protoutil", "//pkg/util/randutil", "//pkg/util/timeutil", diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go index 541c82b9fd8b..8e4931a11432 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go @@ -30,13 +30,15 @@ type streamIngestionResumer struct { func ingest( ctx context.Context, execCtx sql.JobExecContext, - streamAddress streamingccl.PartitionAddress, + streamAddress streamingccl.StreamAddress, job *jobs.Job, ) error { // Initialize a stream client and resolve topology. - client := streamclient.NewStreamClient() - sa := streamingccl.StreamAddress(streamAddress) - topology, err := client.GetTopology(sa) + client, err := streamclient.NewStreamClient(streamAddress) + if err != nil { + return err + } + topology, err := client.GetTopology(streamAddress) if err != nil { return err } @@ -73,7 +75,7 @@ func (s *streamIngestionResumer) Resume( details := s.job.Details().(jobspb.StreamIngestionDetails) p := execCtx.(sql.JobExecContext) - err := ingest(ctx, p, streamingccl.PartitionAddress(details.StreamAddress), s.job) + err := ingest(ctx, p, details.StreamAddress, s.job) if err != nil { return err } diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go index 09b35e8eb80d..05b5786100c9 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go @@ -88,17 +88,21 @@ func newStreamIngestionDataProcessor( post *execinfrapb.PostProcessSpec, output execinfra.RowReceiver, ) (execinfra.Processor, error) { + streamClient, err := streamclient.NewStreamClient(spec.StreamAddress) + if err != nil { + return nil, err + } + sip := &streamIngestionProcessor{ flowCtx: flowCtx, spec: spec, output: output, curBatch: make([]storage.MVCCKeyValue, 0), - client: streamclient.NewStreamClient(), + client: streamClient, } evalCtx := flowCtx.EvalCtx db := flowCtx.Cfg.DB - var err error sip.batcher, err = bulk.MakeStreamSSTBatcher(sip.Ctx, db, evalCtx.Settings, func() int64 { return storageccl.MaxImportBatchSize(evalCtx.Settings) }) if err != nil { diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go index e86f0ba3e7dc..581ee81a7420 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go @@ -11,7 +11,7 @@ package streamingest import ( "context" "fmt" - "sync" + 
"strconv" "testing" "time" @@ -19,19 +19,30 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl" "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/distsqlutils" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/stretchr/testify/require" ) +type interceptableStreamClient interface { + streamclient.Client + + RegisterInterception(func(event streamingccl.Event)) +} + // mockStreamClient will always return the given slice of events when consuming // a stream partition. type mockStreamClient struct { @@ -61,14 +72,157 @@ func (m *mockStreamClient) ConsumePartition( return eventCh, nil } +// Close implements the StreamClient interface. +func (m *mockStreamClient) Close() {} + func TestStreamIngestionProcessor(t *testing.T) { defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() tc := testcluster.StartTestCluster(t, 3 /* nodes */, base.TestClusterArgs{}) - defer tc.Stopper().Stop(context.Background()) + defer tc.Stopper().Stop(ctx) kvDB := tc.Server(0).DB() + v := roachpb.MakeValueFromString("value_1") + v.Timestamp = hlc.Timestamp{WallTime: 1} + sampleKV := roachpb.KeyValue{Key: roachpb.Key("key_1"), Value: v} + mockClient := &mockStreamClient{ + partitionEvents: []streamingccl.Event{ + streamingccl.MakeKVEvent(sampleKV), + streamingccl.MakeKVEvent(sampleKV), + streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 1}), + streamingccl.MakeKVEvent(sampleKV), + streamingccl.MakeKVEvent(sampleKV), + streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 4}), + }, + } + + startTime := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()} + out, err := runStreamIngestionProcessor(ctx, t, kvDB, "some://stream", startTime, + nil /* interceptors */, mockClient) + require.NoError(t, err) + + // Compare the set of results since the ordering is not guaranteed. + expectedRows := map[string]struct{}{ + "partition1{-\\x00} 0.000000001,0": {}, + "partition1{-\\x00} 0.000000004,0": {}, + "partition2{-\\x00} 0.000000001,0": {}, + "partition2{-\\x00} 0.000000004,0": {}, + } + actualRows := make(map[string]struct{}) + for { + row := out.NextNoMeta(t) + if row == nil { + break + } + datum := row[0].Datum + protoBytes, ok := datum.(*tree.DBytes) + require.True(t, ok) + + var resolvedSpan jobspb.ResolvedSpan + require.NoError(t, protoutil.Unmarshal([]byte(*protoBytes), &resolvedSpan)) + + actualRows[fmt.Sprintf("%s %s", resolvedSpan.Span, resolvedSpan.Timestamp)] = struct{}{} + } + + require.Equal(t, expectedRows, actualRows) +} + +// TestRandomClientGeneration tests the ingestion processor against a random +// stream workload. 
+func TestRandomClientGeneration(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + + makeTestStreamURI := func( + tableID string, + valueRange, kvsPerResolved int, + kvFrequency time.Duration, + ) string { + return "test://" + tableID + "?VALUE_RANGE=" + strconv.Itoa(valueRange) + + "&KV_FREQUENCY=" + strconv.Itoa(int(kvFrequency)) + + "&KVS_PER_RESOLVED=" + strconv.Itoa(kvsPerResolved) + } + + tc := testcluster.StartTestCluster(t, 3 /* nodes */, base.TestClusterArgs{}) + defer tc.Stopper().Stop(ctx) + kvDB := tc.Server(0).DB() + conn := tc.Conns[0] + sqlDB := sqlutils.MakeSQLRunner(conn) + + // Create the expected table for the random stream to ingest into. + sqlDB.Exec(t, streamclient.RandomStreamSchema) + tableID := sqlDB.QueryStr(t, `SELECT id FROM system.namespace WHERE name = 'test'`)[0][0] + + // TODO: Consider testing variations on these parameters. + valueRange := 100 + kvsPerResolved := 1_000 + kvFrequency := 50 * time.Nanosecond + streamAddr := makeTestStreamURI(tableID, valueRange, kvsPerResolved, kvFrequency) + + startTime := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()} + + ctx, cancel := context.WithCancel(ctx) + // Cancel the flow after emitting 1000 checkpoint events from the client. + cancelAfterCheckpoints := makeCheckpointEventCounter(1_000, cancel) + out, err := runStreamIngestionProcessor(ctx, t, kvDB, streamAddr, startTime, + cancelAfterCheckpoints, nil /* mockClient */) + require.NoError(t, err) + + p1Key := roachpb.Key("partition1") + p2Key := roachpb.Key("partition2") + p1Span := roachpb.Span{Key: p1Key, EndKey: p1Key.Next()} + p2Span := roachpb.Span{Key: p2Key, EndKey: p2Key.Next()} + numResolvedEvents := 0 + for { + row, meta := out.Next() + if meta != nil { + // The flow may fail with a context cancellation error if the processor + // was cut of during flushing. + if !testutils.IsError(meta.Err, "context canceled") { + t.Fatalf("unexpected meta error %v", meta.Err) + } + } + if row == nil { + break + } + datum := row[0].Datum + protoBytes, ok := datum.(*tree.DBytes) + require.True(t, ok) + + var resolvedSpan jobspb.ResolvedSpan + require.NoError(t, protoutil.Unmarshal([]byte(*protoBytes), &resolvedSpan)) + + if resolvedSpan.Span.String() != p1Span.String() && resolvedSpan.Span.String() != p2Span.String() { + t.Fatalf("expected resolved span %v to be either %v or %v", resolvedSpan.Span, p1Span, p2Span) + } + + // All resolved timestamp events should be greater than the start time. + require.Less(t, startTime.WallTime, resolvedSpan.Timestamp.WallTime) + numResolvedEvents++ + } + + // Check that some rows have been ingested and that we've emitted some resolved events. 
+ numRows, err := strconv.Atoi(sqlDB.QueryStr(t, `SELECT count(*) FROM defaultdb.test`)[0][0]) + require.NoError(t, err) + require.Greater(t, numRows, 0, "at least 1 row ingested expected") + + require.Greater(t, numResolvedEvents, 0, "at least 1 resolved event expected") +} + +func runStreamIngestionProcessor( + ctx context.Context, + t *testing.T, + kvDB *kv.DB, + streamAddr string, + startTime hlc.Timestamp, + interceptEvents func(streamingccl.Event), + mockClient streamclient.Client, +) (*distsqlutils.RowBuffer, error) { st := cluster.MakeTestingClusterSettings() evalCtx := tree.MakeTestingEvalContext(st) @@ -84,64 +238,55 @@ func TestStreamIngestionProcessor(t *testing.T) { EvalCtx: &evalCtx, } - var wg sync.WaitGroup out := &distsqlutils.RowBuffer{} post := execinfrapb.PostProcessSpec{} var spec execinfrapb.StreamIngestionDataSpec - spec.PartitionAddresses = []streamingccl.PartitionAddress{"s3://my_streams/stream/partition1", "s3://my_streams/stream/partition2"} - proc, err := newStreamIngestionDataProcessor(&flowCtx, 0 /* processorID */, spec, &post, out) + spec.StreamAddress = streamingccl.StreamAddress(streamAddr) + + spec.PartitionAddresses = []streamingccl.PartitionAddress{"partition1", "partition2"} + spec.StartTime = startTime + processorID := int32(0) + proc, err := newStreamIngestionDataProcessor(&flowCtx, processorID, spec, &post, out) require.NoError(t, err) sip, ok := proc.(*streamIngestionProcessor) if !ok { t.Fatal("expected the processor that's created to be a split and scatter processor") } - // Inject a mock client. - v := roachpb.MakeValueFromString("value_1") - v.Timestamp = hlc.Timestamp{WallTime: 1} - sampleKV := roachpb.KeyValue{Key: roachpb.Key("key_1"), Value: v} - sip.client = &mockStreamClient{ - partitionEvents: []streamingccl.Event{ - streamingccl.MakeKVEvent(sampleKV), - streamingccl.MakeKVEvent(sampleKV), - streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 1}), - streamingccl.MakeKVEvent(sampleKV), - streamingccl.MakeKVEvent(sampleKV), - streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 4}), - }, + if mockClient != nil { + sip.client = mockClient + } + + if interceptableClient, ok := sip.client.(interceptableStreamClient); ok { + interceptableClient.RegisterInterception(interceptEvents) + // TODO: Inject an interceptor here that keeps track of generated events so + // we can compare. + } else if interceptEvents != nil { + t.Fatalf("interceptor specified, but client %T does not implement interceptableStreamClient", + sip.client) } - sip.Run(context.Background()) - wg.Wait() + sip.Run(ctx) // Ensure that all the outputs are properly closed. if !out.ProducerClosed() { t.Fatalf("output RowReceiver not closed") } + return out, err +} - // Compare the set of results since the ordering is not guaranteed. - expectedRows := map[string]struct{}{ - "s3://my_streams/stream/partition1{-\\x00} 0.000000001,0": {}, - "s3://my_streams/stream/partition1{-\\x00} 0.000000004,0": {}, - "s3://my_streams/stream/partition2{-\\x00} 0.000000001,0": {}, - "s3://my_streams/stream/partition2{-\\x00} 0.000000004,0": {}, - } - actualRows := make(map[string]struct{}) - for { - row := out.NextNoMeta(t) - if row == nil { - break +// makeCheckpointEventCounter runs f after seeing `threshold` number of +// checkpoint events. 
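+// Note that f is invoked on every checkpoint event observed after the
+// threshold is crossed, not just once; the caller above passes a context
+// cancel func (makeCheckpointEventCounter(1_000, cancel)), for which repeated
+// invocations are harmless.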
+func makeCheckpointEventCounter(threshold int, f func()) func(streamingccl.Event) {
+	numCheckpointEventsGenerated := 0
+	return func(event streamingccl.Event) {
+		switch event.Type() {
+		case streamingccl.CheckpointEvent:
+			numCheckpointEventsGenerated++
+			if numCheckpointEventsGenerated > threshold {
+				f()
+			}
+		}
-		datum := row[0].Datum
-		protoBytes, ok := datum.(*tree.DBytes)
-		require.True(t, ok)
-
-		var resolvedSpan jobspb.ResolvedSpan
-		require.NoError(t, protoutil.Unmarshal([]byte(*protoBytes), &resolvedSpan))
-
-		actualRows[fmt.Sprintf("%s %s", resolvedSpan.Span, resolvedSpan.Timestamp)] = struct{}{}
 	}
-
-	require.Equal(t, expectedRows, actualRows)
 }
diff --git a/pkg/kv/bulk/sst_batcher.go b/pkg/kv/bulk/sst_batcher.go
index edbc30c735fe..09d0e94e7d66 100644
--- a/pkg/kv/bulk/sst_batcher.go
+++ b/pkg/kv/bulk/sst_batcher.go
@@ -64,12 +64,17 @@ type SSTBatcher struct {
 	// which are the same, will all correspond to the same kv in the inverted
 	// index. The method which generates these kvs does not dedup, thus we rely on
 	// the SSTBatcher to dedup them (by skipping), rather than throwing a
-	// DuplicateKeyError. This is also true when used with IMPORT. Import
+	// DuplicateKeyError.
+	// This is also true when used with IMPORT. Import
 	// generally prohibits the ingestion of KVs which will shadow existing data,
 	// with the exception of duplicates having the same value and timestamp. To
 	// maintain uniform behavior, duplicates in the same batch with equal values
 	// will not raise a DuplicateKeyError.
 	skipDuplicates bool
+	// ingestAll can only be set when disallowShadowing and skipDuplicates are
+	// false. When set, the batcher never returns a DuplicateKeyError and
+	// continues ingesting all data provided to it.
+	ingestAll bool

 	// The rest of the fields accumulated state as opposed to configuration. Some,
 	// like totalRows, are accumulated _across_ batches and are not reset between
@@ -117,7 +122,7 @@ func MakeSSTBatcher(
 func MakeStreamSSTBatcher(
 	ctx context.Context, db SSTSender, settings *cluster.Settings, flushBytes func() int64,
 ) (*SSTBatcher, error) {
-	b := &SSTBatcher{db: db, settings: settings, maxSize: flushBytes, disallowShadowing: false, skipDuplicates: true}
+	b := &SSTBatcher{db: db, settings: settings, maxSize: flushBytes, ingestAll: true}
 	err := b.Reset(ctx)
 	return b, err
 }
@@ -143,7 +148,7 @@ func (b *SSTBatcher) updateMVCCStats(key storage.MVCCKey, value []byte) {
 // keys -- like RESTORE where we want the restored data to look the like backup.
 // Keys must be added in order.
 func (b *SSTBatcher) AddMVCCKey(ctx context.Context, key storage.MVCCKey, value []byte) error {
-	if len(b.batchEndKey) > 0 && bytes.Equal(b.batchEndKey, key.Key) {
+	if len(b.batchEndKey) > 0 && bytes.Equal(b.batchEndKey, key.Key) && !b.ingestAll {
 		if b.skipDuplicates && bytes.Equal(b.batchEndValue, value) {
 			return nil
 		}
diff --git a/pkg/kv/kvserver/kvserverbase/bulk_adder.go b/pkg/kv/kvserver/kvserverbase/bulk_adder.go
index 563054da7d3d..4f8f686093a8 100644
--- a/pkg/kv/kvserver/kvserverbase/bulk_adder.go
+++ b/pkg/kv/kvserver/kvserverbase/bulk_adder.go
@@ -47,10 +47,10 @@ type BulkAdderOptions struct {
 	// BulkAdder buffer if the memory monitor permits.
 	StepBufferSize int64

-	// SkipLocalDuplicates configures handling of duplicate keys within a local
-	// sorted batch. When true if the same key/value pair is added more than once
+	// SkipDuplicates configures handling of duplicate keys within a local sorted
+	// batch.
When true if the same key/value pair is added more than once // subsequent additions will be ignored instead of producing an error. If an - // attempt to add the same key has a differnet value, it is always an error. + // attempt to add the same key has a different value, it is always an error. // Once a batch is flushed – explicitly or automatically – local duplicate // detection does not apply. SkipDuplicates bool diff --git a/pkg/sql/execinfrapb/processors_bulk_io.pb.go b/pkg/sql/execinfrapb/processors_bulk_io.pb.go index d84221ebeb82..4560f871184b 100644 --- a/pkg/sql/execinfrapb/processors_bulk_io.pb.go +++ b/pkg/sql/execinfrapb/processors_bulk_io.pb.go @@ -72,7 +72,7 @@ func (x *FileCompression) UnmarshalJSON(data []byte) error { return nil } func (FileCompression) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_processors_bulk_io_fd790481d663b450, []int{0} + return fileDescriptor_processors_bulk_io_791817333cf840e0, []int{0} } type BackfillerSpec_Type int32 @@ -111,7 +111,7 @@ func (x *BackfillerSpec_Type) UnmarshalJSON(data []byte) error { return nil } func (BackfillerSpec_Type) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_processors_bulk_io_fd790481d663b450, []int{0, 0} + return fileDescriptor_processors_bulk_io_791817333cf840e0, []int{0, 0} } // BackfillerSpec is the specification for a "schema change backfiller". @@ -143,7 +143,7 @@ func (m *BackfillerSpec) Reset() { *m = BackfillerSpec{} } func (m *BackfillerSpec) String() string { return proto.CompactTextString(m) } func (*BackfillerSpec) ProtoMessage() {} func (*BackfillerSpec) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_bulk_io_fd790481d663b450, []int{0} + return fileDescriptor_processors_bulk_io_791817333cf840e0, []int{0} } func (m *BackfillerSpec) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -183,7 +183,7 @@ func (m *JobProgress) Reset() { *m = JobProgress{} } func (m *JobProgress) String() string { return proto.CompactTextString(m) } func (*JobProgress) ProtoMessage() {} func (*JobProgress) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_bulk_io_fd790481d663b450, []int{1} + return fileDescriptor_processors_bulk_io_791817333cf840e0, []int{1} } func (m *JobProgress) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -246,7 +246,7 @@ func (m *ReadImportDataSpec) Reset() { *m = ReadImportDataSpec{} } func (m *ReadImportDataSpec) String() string { return proto.CompactTextString(m) } func (*ReadImportDataSpec) ProtoMessage() {} func (*ReadImportDataSpec) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_bulk_io_fd790481d663b450, []int{2} + return fileDescriptor_processors_bulk_io_791817333cf840e0, []int{2} } func (m *ReadImportDataSpec) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -284,7 +284,7 @@ func (m *ReadImportDataSpec_ImportTable) Reset() { *m = ReadImportDataSp func (m *ReadImportDataSpec_ImportTable) String() string { return proto.CompactTextString(m) } func (*ReadImportDataSpec_ImportTable) ProtoMessage() {} func (*ReadImportDataSpec_ImportTable) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_bulk_io_fd790481d663b450, []int{2, 0} + return fileDescriptor_processors_bulk_io_791817333cf840e0, []int{2, 0} } func (m *ReadImportDataSpec_ImportTable) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -315,13 +315,15 @@ type StreamIngestionDataSpec struct { PartitionAddresses []github_com_cockroachdb_cockroach_pkg_ccl_streamingccl.PartitionAddress 
`protobuf:"bytes,1,rep,name=partition_addresses,json=partitionAddresses,customtype=github.com/cockroachdb/cockroach/pkg/ccl/streamingccl.PartitionAddress" json:"partition_addresses"` // The processor will ingest events from StartTime onwards. StartTime hlc.Timestamp `protobuf:"bytes,2,opt,name=start_time,json=startTime" json:"start_time"` + // StreamAddress locate the stream so that a stream client can be initialized. + StreamAddress github_com_cockroachdb_cockroach_pkg_ccl_streamingccl.StreamAddress `protobuf:"bytes,3,opt,name=stream_address,json=streamAddress,customtype=github.com/cockroachdb/cockroach/pkg/ccl/streamingccl.StreamAddress" json:"stream_address"` } func (m *StreamIngestionDataSpec) Reset() { *m = StreamIngestionDataSpec{} } func (m *StreamIngestionDataSpec) String() string { return proto.CompactTextString(m) } func (*StreamIngestionDataSpec) ProtoMessage() {} func (*StreamIngestionDataSpec) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_bulk_io_fd790481d663b450, []int{3} + return fileDescriptor_processors_bulk_io_791817333cf840e0, []int{3} } func (m *StreamIngestionDataSpec) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -367,7 +369,7 @@ func (m *BackupDataSpec) Reset() { *m = BackupDataSpec{} } func (m *BackupDataSpec) String() string { return proto.CompactTextString(m) } func (*BackupDataSpec) ProtoMessage() {} func (*BackupDataSpec) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_bulk_io_fd790481d663b450, []int{4} + return fileDescriptor_processors_bulk_io_791817333cf840e0, []int{4} } func (m *BackupDataSpec) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -406,7 +408,7 @@ func (m *RestoreSpanEntry) Reset() { *m = RestoreSpanEntry{} } func (m *RestoreSpanEntry) String() string { return proto.CompactTextString(m) } func (*RestoreSpanEntry) ProtoMessage() {} func (*RestoreSpanEntry) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_bulk_io_fd790481d663b450, []int{5} + return fileDescriptor_processors_bulk_io_791817333cf840e0, []int{5} } func (m *RestoreSpanEntry) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -444,7 +446,7 @@ func (m *RestoreDataSpec) Reset() { *m = RestoreDataSpec{} } func (m *RestoreDataSpec) String() string { return proto.CompactTextString(m) } func (*RestoreDataSpec) ProtoMessage() {} func (*RestoreDataSpec) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_bulk_io_fd790481d663b450, []int{6} + return fileDescriptor_processors_bulk_io_791817333cf840e0, []int{6} } func (m *RestoreDataSpec) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -478,7 +480,7 @@ func (m *SplitAndScatterSpec) Reset() { *m = SplitAndScatterSpec{} } func (m *SplitAndScatterSpec) String() string { return proto.CompactTextString(m) } func (*SplitAndScatterSpec) ProtoMessage() {} func (*SplitAndScatterSpec) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_bulk_io_fd790481d663b450, []int{7} + return fileDescriptor_processors_bulk_io_791817333cf840e0, []int{7} } func (m *SplitAndScatterSpec) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -511,7 +513,7 @@ func (m *SplitAndScatterSpec_RestoreEntryChunk) Reset() { *m = SplitAndS func (m *SplitAndScatterSpec_RestoreEntryChunk) String() string { return proto.CompactTextString(m) } func (*SplitAndScatterSpec_RestoreEntryChunk) ProtoMessage() {} func (*SplitAndScatterSpec_RestoreEntryChunk) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_bulk_io_fd790481d663b450, []int{7, 0} + return 
fileDescriptor_processors_bulk_io_791817333cf840e0, []int{7, 0} } func (m *SplitAndScatterSpec_RestoreEntryChunk) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -558,7 +560,7 @@ func (m *CSVWriterSpec) Reset() { *m = CSVWriterSpec{} } func (m *CSVWriterSpec) String() string { return proto.CompactTextString(m) } func (*CSVWriterSpec) ProtoMessage() {} func (*CSVWriterSpec) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_bulk_io_fd790481d663b450, []int{8} + return fileDescriptor_processors_bulk_io_791817333cf840e0, []int{8} } func (m *CSVWriterSpec) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -593,7 +595,7 @@ func (m *BulkRowWriterSpec) Reset() { *m = BulkRowWriterSpec{} } func (m *BulkRowWriterSpec) String() string { return proto.CompactTextString(m) } func (*BulkRowWriterSpec) ProtoMessage() {} func (*BulkRowWriterSpec) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_bulk_io_fd790481d663b450, []int{9} + return fileDescriptor_processors_bulk_io_791817333cf840e0, []int{9} } func (m *BulkRowWriterSpec) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -933,6 +935,10 @@ func (m *StreamIngestionDataSpec) MarshalTo(dAtA []byte) (int, error) { return 0, err } i += n7 + dAtA[i] = 0x1a + i++ + i = encodeVarintProcessorsBulkIo(dAtA, i, uint64(len(m.StreamAddress))) + i += copy(dAtA[i:], m.StreamAddress) return i, nil } @@ -1442,6 +1448,8 @@ func (m *StreamIngestionDataSpec) Size() (n int) { } l = m.StartTime.Size() n += 1 + l + sovProcessorsBulkIo(uint64(l)) + l = len(m.StreamAddress) + n += 1 + l + sovProcessorsBulkIo(uint64(l)) return n } @@ -2660,6 +2668,35 @@ func (m *StreamIngestionDataSpec) Unmarshal(dAtA []byte) error { return err } iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field StreamAddress", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowProcessorsBulkIo + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthProcessorsBulkIo + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.StreamAddress = github_com_cockroachdb_cockroach_pkg_ccl_streamingccl.StreamAddress(dAtA[iNdEx:postIndex]) + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipProcessorsBulkIo(dAtA[iNdEx:]) @@ -4135,119 +4172,121 @@ var ( ) func init() { - proto.RegisterFile("sql/execinfrapb/processors_bulk_io.proto", fileDescriptor_processors_bulk_io_fd790481d663b450) + proto.RegisterFile("sql/execinfrapb/processors_bulk_io.proto", fileDescriptor_processors_bulk_io_791817333cf840e0) } -var fileDescriptor_processors_bulk_io_fd790481d663b450 = []byte{ - // 1755 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x58, 0xcf, 0x6e, 0x1b, 0xb9, - 0x19, 0xf7, 0x48, 0x23, 0x59, 0xfa, 0x14, 0xdb, 0x32, 0x93, 0xdd, 0x9d, 0x1a, 0xa8, 0x6d, 0x68, - 0xd7, 0xa9, 0x9a, 0x22, 0x12, 0x36, 0x69, 0x8b, 0xa0, 0xed, 0x6e, 0x1a, 0xc9, 0x71, 0x56, 0xce, - 0x6e, 0xe2, 0x8e, 0x62, 0x2f, 0xb0, 0x68, 0x31, 0xa0, 0x66, 0x68, 0x99, 0xd1, 0x68, 0x38, 0x26, - 0x39, 0x4e, 0x94, 0x4b, 0x0b, 0xf4, 0xd4, 0x5b, 0x1f, 0xa1, 0x6f, 0xd0, 0xbe, 0x43, 0x2f, 0x39, - 0xee, 0x71, 0xd1, 0x83, 0xd1, 0x3a, 0x6f, 0xd1, 0x53, 0x41, 0x72, 0x46, 0x1e, 0x3b, 0xb6, 0x63, - 0x37, 0xc8, 0x45, 0x99, 
0x90, 0xdf, 0xef, 0xc7, 0xef, 0xff, 0x47, 0x1a, 0x9a, 0x62, 0x3f, 0x6c, - 0x93, 0x97, 0xc4, 0xa7, 0xd1, 0x2e, 0xc7, 0xf1, 0xa0, 0x1d, 0x73, 0xe6, 0x13, 0x21, 0x18, 0x17, - 0xde, 0x20, 0x09, 0x47, 0x1e, 0x65, 0xad, 0x98, 0x33, 0xc9, 0x90, 0xe3, 0x33, 0x7f, 0xc4, 0x19, - 0xf6, 0xf7, 0x5a, 0x62, 0x3f, 0x6c, 0x05, 0x54, 0x48, 0xb1, 0x1f, 0xf2, 0x24, 0x5a, 0xfa, 0xf8, - 0x39, 0x1b, 0x88, 0xb6, 0xfa, 0x89, 0x07, 0xfa, 0x1f, 0x83, 0x58, 0x72, 0xb4, 0x74, 0x3c, 0x68, - 0x53, 0x76, 0x7b, 0x97, 0xf1, 0x31, 0x96, 0xd9, 0xce, 0xa7, 0xea, 0x54, 0x1f, 0x4b, 0x1c, 0xb2, - 0x61, 0x3b, 0x20, 0xc2, 0x8f, 0x07, 0x6d, 0x21, 0x79, 0xe2, 0xcb, 0x84, 0x93, 0x20, 0x15, 0x5a, - 0xbb, 0x48, 0x35, 0x2c, 0x48, 0x76, 0x4a, 0x22, 0x69, 0xd8, 0xde, 0x0b, 0xfd, 0xb6, 0xa4, 0x63, - 0x22, 0x24, 0x1e, 0xc7, 0xe9, 0xce, 0x8d, 0x21, 0x1b, 0x32, 0xfd, 0xd9, 0x56, 0x5f, 0xe9, 0x2a, - 0xca, 0xb4, 0x0a, 0xb0, 0xc4, 0xe9, 0xda, 0x62, 0xb6, 0x86, 0x63, 0x6a, 0x96, 0x1a, 0x7f, 0x2f, - 0xc2, 0x7c, 0x07, 0xfb, 0xa3, 0x5d, 0x1a, 0x86, 0x84, 0xf7, 0x63, 0xe2, 0xa3, 0x47, 0x60, 0xcb, - 0x49, 0x4c, 0x1c, 0x6b, 0xd5, 0x6a, 0xce, 0xdf, 0xb9, 0xdd, 0x3a, 0xcf, 0x21, 0xad, 0x93, 0xb8, - 0xd6, 0xb3, 0x49, 0x4c, 0x3a, 0xf6, 0xeb, 0xc3, 0x95, 0x19, 0x57, 0x13, 0xa0, 0x0e, 0x94, 0x24, - 0x1e, 0x84, 0xc4, 0x29, 0xac, 0x5a, 0xcd, 0xda, 0x9d, 0x9b, 0xa7, 0x98, 0xc4, 0x7e, 0xa8, 0xed, - 0x7b, 0xa6, 0x64, 0xd6, 0x89, 0xf0, 0x39, 0x8d, 0x25, 0xe3, 0x29, 0x85, 0x81, 0xa2, 0x87, 0x50, - 0x12, 0x31, 0x8e, 0x84, 0x53, 0x5c, 0x2d, 0x36, 0x6b, 0x77, 0x7e, 0x7a, 0xbe, 0x36, 0x9a, 0xc6, - 0x25, 0x38, 0x50, 0xea, 0xe0, 0x28, 0xa3, 0xd1, 0x68, 0xf4, 0x39, 0x54, 0x82, 0x84, 0x63, 0x49, - 0x59, 0xe4, 0xd8, 0xab, 0x56, 0xb3, 0xd8, 0xf9, 0x48, 0x6d, 0xff, 0xf7, 0x70, 0x65, 0x4e, 0xb9, - 0xb3, 0xb5, 0x9e, 0x6e, 0xba, 0x53, 0x31, 0xf4, 0x29, 0x80, 0xbf, 0x97, 0x44, 0x23, 0x4f, 0xd0, - 0x57, 0xc4, 0x29, 0x69, 0x90, 0xe1, 0xac, 0xea, 0xf5, 0x3e, 0x7d, 0x45, 0xd0, 0x7d, 0xa8, 0x70, - 0x82, 0x83, 0x07, 0xe2, 0xe9, 0xae, 0x33, 0xab, 0xad, 0xfc, 0x71, 0x4e, 0x43, 0x15, 0xb2, 0xd6, - 0x5e, 0xe8, 0xb7, 0x9e, 0x65, 0x21, 0x4b, 0x19, 0xa6, 0xa0, 0xc6, 0x2d, 0xb0, 0x95, 0xdf, 0x50, - 0x0d, 0x66, 0x7b, 0xd1, 0x01, 0x0e, 0x69, 0x50, 0x9f, 0x41, 0x00, 0xe5, 0x2e, 0x0b, 0x93, 0x71, - 0x54, 0xb7, 0x50, 0x15, 0x4a, 0xbd, 0x28, 0x20, 0x2f, 0xeb, 0x85, 0x4d, 0xbb, 0x52, 0xae, 0xcf, - 0x36, 0x5e, 0x40, 0x6d, 0x93, 0x0d, 0xb6, 0x38, 0x1b, 0x72, 0x22, 0x04, 0xfa, 0x0c, 0xca, 0xcf, - 0xd9, 0xc0, 0xa3, 0x81, 0x8e, 0x57, 0xb1, 0x33, 0xa7, 0x0e, 0x38, 0x3a, 0x5c, 0x29, 0x6d, 0xb2, - 0x41, 0x6f, 0xdd, 0x2d, 0x3d, 0x67, 0x83, 0x5e, 0x80, 0x9a, 0x70, 0xcd, 0x67, 0x91, 0xe4, 0x74, - 0x90, 0x68, 0x1f, 0xa8, 0x88, 0x14, 0x52, 0x65, 0x4e, 0xec, 0x20, 0x07, 0x6c, 0x11, 0x32, 0xe9, - 0x14, 0x57, 0xad, 0x66, 0x29, 0x0b, 0xa7, 0x5a, 0x69, 0xbc, 0xae, 0x00, 0x52, 0xfe, 0xed, 0x8d, - 0x63, 0xc6, 0xe5, 0x3a, 0x96, 0x58, 0xa7, 0xcb, 0x1a, 0xd4, 0x04, 0x1e, 0xc7, 0x21, 0x31, 0x8e, - 0x2a, 0xe4, 0x70, 0x60, 0x36, 0xb4, 0xa7, 0x1e, 0x41, 0x25, 0x4e, 0x75, 0x76, 0xca, 0xda, 0x53, - 0x6b, 0xe7, 0xc7, 0x32, 0x67, 0x60, 0xe6, 0xb1, 0x0c, 0x8c, 0x1e, 0x41, 0x31, 0xe1, 0xd4, 0x99, - 0xd5, 0xf9, 0xf0, 0x8b, 0xf3, 0x39, 0xde, 0x56, 0xb5, 0xb5, 0xcd, 0xe9, 0xc3, 0x48, 0xf2, 0x89, - 0xab, 0x18, 0xd0, 0x17, 0x50, 0x36, 0xe5, 0xea, 0x54, 0xb4, 0x3e, 0x2b, 0x39, 0xae, 0xb4, 0x50, - 0x5a, 0xbd, 0xa7, 0x1b, 0x34, 0x24, 0x1b, 0x5a, 0x2c, 0xd5, 0x24, 0x05, 0xa1, 0x1d, 0x28, 0xeb, - 0x14, 0x15, 0x4e, 0x55, 0xab, 0x72, 0xef, 0x4a, 0xaa, 0xe8, 0x6c, 0x15, 0x5a, 0x1b, 0xcd, 0x6b, - 0xb9, 0x29, 0x1b, 0xba, 0x0f, 0x3f, 0x12, 0x23, 
0x1a, 0x7b, 0x63, 0x2a, 0x04, 0x8d, 0x86, 0xde, - 0x2e, 0xe3, 0x84, 0x0e, 0x23, 0x6f, 0x44, 0x26, 0xc2, 0x81, 0x55, 0xab, 0x59, 0x49, 0x15, 0xf9, - 0x58, 0x89, 0x7d, 0x63, 0xa4, 0x36, 0x8c, 0xd0, 0x63, 0x32, 0x11, 0xe8, 0x16, 0xcc, 0xbd, 0xc0, - 0x61, 0xa8, 0xf2, 0xfa, 0x09, 0x8e, 0x98, 0x70, 0x6a, 0xb9, 0xdc, 0x3d, 0xb9, 0x85, 0xee, 0xc0, - 0x22, 0xd7, 0x25, 0xb3, 0x85, 0x39, 0x0e, 0x43, 0x12, 0x52, 0x31, 0x76, 0xe6, 0x72, 0x21, 0x7c, - 0x7b, 0x1b, 0x7d, 0x07, 0xc0, 0x89, 0x48, 0xc6, 0xc4, 0x8b, 0x99, 0x70, 0xe6, 0xb5, 0xf1, 0xbf, - 0xbe, 0x92, 0xf1, 0xae, 0x86, 0x6f, 0x31, 0x63, 0xbf, 0x5b, 0xe5, 0xd9, 0xff, 0x11, 0x01, 0x48, - 0x04, 0xe1, 0x9e, 0x6e, 0x4e, 0xce, 0xc2, 0xaa, 0xd5, 0xac, 0x76, 0x36, 0xd2, 0x4a, 0xfd, 0x72, - 0x48, 0xe5, 0x5e, 0x32, 0x68, 0xf9, 0x6c, 0xdc, 0x9e, 0x9e, 0x16, 0x0c, 0x8e, 0xbf, 0xdb, 0xf1, - 0x68, 0xd8, 0x16, 0xc4, 0x4f, 0x38, 0x95, 0x93, 0x56, 0xff, 0x77, 0x5f, 0x6f, 0x0b, 0xc2, 0x23, - 0x3c, 0x26, 0x5b, 0x8a, 0xcd, 0xad, 0x2a, 0x66, 0xfd, 0xb9, 0x94, 0x40, 0xcd, 0xa8, 0xa4, 0xc3, - 0x80, 0x7e, 0x0b, 0xb6, 0xea, 0xce, 0xba, 0x82, 0xae, 0xd6, 0xa7, 0x2c, 0x57, 0x23, 0xd1, 0x67, - 0x00, 0x12, 0xf3, 0x21, 0x91, 0x5d, 0x16, 0x0a, 0xa7, 0xb0, 0x5a, 0x6c, 0x56, 0xd3, 0xfd, 0xdc, - 0xfa, 0x92, 0x80, 0x5a, 0x2e, 0xee, 0xa8, 0x0e, 0xc5, 0x11, 0x99, 0xe8, 0x53, 0xab, 0xae, 0xfa, - 0x44, 0x4f, 0xa0, 0x74, 0x80, 0xc3, 0x24, 0xeb, 0x98, 0x57, 0x4b, 0xa9, 0x9c, 0x45, 0xae, 0xa1, - 0xf9, 0x55, 0xe1, 0x9e, 0xb5, 0xf4, 0x4b, 0xa8, 0x64, 0x79, 0x9f, 0x3f, 0xb1, 0x64, 0x4e, 0xbc, - 0x91, 0x3f, 0xb1, 0x9a, 0xc7, 0xfd, 0x06, 0xe6, 0x4f, 0xc6, 0xe9, 0x5d, 0xe8, 0x62, 0x0e, 0xbd, - 0x69, 0x57, 0x2c, 0xdd, 0xb1, 0x8a, 0x75, 0x7b, 0xd3, 0xae, 0xd8, 0xf5, 0xd2, 0xa6, 0x5d, 0x29, - 0xd5, 0xcb, 0x9b, 0x76, 0xe5, 0x5a, 0x7d, 0xae, 0x71, 0x68, 0xc1, 0x27, 0x7d, 0xc9, 0x09, 0x1e, - 0xf7, 0xa2, 0x21, 0x11, 0xaa, 0xf1, 0x4c, 0xfb, 0xc9, 0x1f, 0xe1, 0x7a, 0x8c, 0xb9, 0xa4, 0x6a, - 0xd1, 0xc3, 0x41, 0xa0, 0x8a, 0x9e, 0x08, 0xc7, 0xd2, 0x3e, 0x7d, 0xa2, 0x72, 0xe1, 0x5f, 0x87, - 0x2b, 0x1b, 0x97, 0xca, 0x05, 0xdf, 0x0f, 0xd5, 0xbc, 0x25, 0x78, 0x4c, 0xa3, 0xa1, 0xef, 0x87, - 0xad, 0xad, 0x8c, 0xf8, 0x81, 0xe1, 0x75, 0x51, 0x7c, 0x6a, 0x85, 0x08, 0xd4, 0x01, 0x10, 0x12, - 0x73, 0xe9, 0xa9, 0x32, 0x49, 0x23, 0x71, 0xa9, 0xae, 0x5e, 0xd5, 0x30, 0xb5, 0xda, 0xf8, 0xe7, - 0xac, 0x19, 0xab, 0x49, 0x3c, 0xb5, 0xeb, 0x6e, 0x36, 0xc9, 0x2c, 0x5d, 0x31, 0x9f, 0x9c, 0xd1, - 0x6d, 0xde, 0x9e, 0x5b, 0x5f, 0x41, 0x9d, 0x46, 0x92, 0xb3, 0x20, 0xf1, 0x49, 0xe0, 0x19, 0x7c, - 0xe1, 0x32, 0xf8, 0x85, 0x63, 0x58, 0x5f, 0x33, 0xdd, 0x85, 0x5a, 0x40, 0x76, 0x71, 0x12, 0x4a, - 0x4f, 0xb5, 0xcf, 0xa2, 0x2e, 0x2d, 0x94, 0x0e, 0x0b, 0x58, 0x37, 0x5b, 0xdb, 0x6e, 0xcf, 0x85, - 0x54, 0x6c, 0x9b, 0x53, 0xf4, 0x67, 0x0b, 0xae, 0x27, 0x9c, 0x0a, 0x6f, 0x30, 0xf1, 0x42, 0xe6, - 0xe3, 0x90, 0xca, 0x89, 0x37, 0x3a, 0x70, 0x6c, 0xad, 0xc2, 0x97, 0x17, 0x5f, 0x0d, 0x8e, 0x6d, - 0x57, 0x8d, 0x57, 0x74, 0x26, 0x5f, 0xa7, 0x0c, 0x8f, 0x0f, 0x4c, 0xdf, 0xbb, 0x71, 0x74, 0xb8, - 0x52, 0xdf, 0x76, 0x7b, 0xf9, 0xad, 0x1d, 0xb7, 0x9e, 0x9c, 0x12, 0x46, 0x2e, 0xd4, 0xc6, 0x07, - 0xbe, 0xef, 0xed, 0xd2, 0x50, 0x12, 0xae, 0x47, 0xf1, 0xfc, 0x89, 0x88, 0x64, 0xf6, 0x7f, 0xb3, - 0xd3, 0xed, 0x6e, 0x68, 0xa1, 0x63, 0xcb, 0x8e, 0xd7, 0x5c, 0x50, 0x2c, 0xe6, 0x1b, 0x7d, 0x05, - 0x40, 0x22, 0x9f, 0x4f, 0x62, 0x3d, 0x0e, 0xcd, 0x40, 0x6a, 0x9e, 0x41, 0xa9, 0xda, 0xff, 0xc3, - 0xa9, 0xe0, 0x53, 0xfd, 0x2b, 0xdc, 0x1c, 0x16, 0x3d, 0x85, 0xc5, 0x81, 0xb6, 0xd6, 0xcb, 0x65, - 0xcd, 0x15, 0xee, 0x02, 0x0b, 0x06, 0xdd, 0xcf, 0x72, 0x07, 0x3d, 0x86, 
0x74, 0xc9, 0x23, 0x51, - 0x60, 0xe8, 0x2a, 0x97, 0xa7, 0x9b, 0x33, 0xd8, 0x87, 0x51, 0xa0, 0xc9, 0xb6, 0xa1, 0x1c, 0x8f, - 0x3c, 0x1a, 0x64, 0x53, 0xea, 0xee, 0xa5, 0x63, 0xb6, 0x35, 0xea, 0x05, 0xe9, 0x80, 0xaa, 0xaa, - 0xfb, 0xc4, 0xd6, 0xe3, 0xde, 0xba, 0x70, 0x4b, 0xb1, 0x5a, 0x3e, 0xd5, 0xa7, 0xe1, 0x43, 0xf5, - 0xe9, 0x2e, 0x7c, 0x74, 0x66, 0xea, 0x9c, 0xd1, 0x3a, 0xcf, 0x6f, 0x64, 0xf7, 0x00, 0x8e, 0x6d, - 0xc9, 0x23, 0xed, 0x33, 0x90, 0x95, 0x1c, 0xb2, 0xf1, 0x0f, 0x0b, 0xea, 0x2e, 0x11, 0x92, 0x71, - 0xa2, 0x8a, 0xc8, 0x10, 0x7c, 0x0e, 0xb6, 0xaa, 0xc3, 0x74, 0x58, 0xbc, 0xa3, 0x0c, 0xb5, 0x28, - 0x7a, 0x00, 0xa5, 0x5d, 0xaa, 0x6e, 0x0a, 0xa6, 0x74, 0xd7, 0xce, 0xba, 0x68, 0xe8, 0xe6, 0xed, - 0x92, 0xfd, 0x84, 0x08, 0xa9, 0xb3, 0x2e, 0x6b, 0x04, 0x1a, 0x89, 0x6e, 0x42, 0x2d, 0xbb, 0x01, - 0xf5, 0x82, 0x97, 0xba, 0x7c, 0xb3, 0x91, 0x9e, 0xdf, 0x68, 0xfc, 0xa9, 0x08, 0x0b, 0xa9, 0xca, - 0xd3, 0xce, 0xb3, 0x01, 0xd7, 0xb8, 0x59, 0x32, 0xd9, 0x64, 0x5d, 0x3e, 0x9b, 0x6a, 0x29, 0x50, - 0xe7, 0xd2, 0xc9, 0x9a, 0x29, 0xbc, 0x47, 0xcd, 0xf4, 0xa0, 0xcc, 0x89, 0xbe, 0xd0, 0x98, 0x6b, - 0xfd, 0xcf, 0xde, 0xe9, 0x91, 0xf4, 0x76, 0x3f, 0x22, 0x93, 0xec, 0x1a, 0x66, 0x08, 0xd4, 0x35, - 0x2c, 0x4d, 0x70, 0xd3, 0x94, 0x7e, 0x7e, 0xd1, 0xcc, 0x3c, 0xe1, 0x97, 0x0b, 0x33, 0xfc, 0x3d, - 0xb2, 0xe6, 0x6f, 0x05, 0xb8, 0xde, 0x8f, 0x43, 0x2a, 0x1f, 0x44, 0x41, 0xdf, 0xc7, 0x52, 0xa6, - 0xef, 0xaa, 0x3f, 0x40, 0x59, 0x3f, 0x1c, 0xb2, 0x09, 0x70, 0xff, 0x7c, 0x4d, 0xcf, 0x80, 0x67, - 0xda, 0x6b, 0x7d, 0xba, 0x8a, 0x27, 0x73, 0x84, 0x21, 0xcd, 0xf9, 0xb4, 0xf0, 0x9e, 0x3e, 0x5d, - 0xf2, 0x60, 0xf1, 0xad, 0xd3, 0xd0, 0x26, 0xcc, 0x12, 0xf5, 0x4e, 0x20, 0x99, 0xfe, 0xb7, 0xde, - 0xe9, 0xe9, 0x69, 0xd1, 0xa4, 0xfc, 0x19, 0x41, 0xe3, 0x2f, 0x45, 0x98, 0xeb, 0xf6, 0x77, 0xbe, - 0xe5, 0x34, 0x73, 0xce, 0x4d, 0x35, 0x9e, 0x84, 0xa4, 0x91, 0x79, 0xa3, 0xe9, 0xc2, 0xce, 0x72, - 0x30, 0xb7, 0x81, 0x7e, 0x02, 0xd7, 0x54, 0xa7, 0xf0, 0x62, 0xed, 0x18, 0x93, 0x85, 0x53, 0x41, - 0xdd, 0x43, 0xcc, 0x06, 0xfa, 0x02, 0x66, 0x99, 0xc9, 0x3c, 0x5d, 0x2c, 0xb5, 0x33, 0x07, 0x46, - 0xb7, 0xbf, 0x93, 0xa6, 0x67, 0xa6, 0x61, 0x8a, 0x39, 0x7e, 0xfd, 0x71, 0xf6, 0x42, 0xa4, 0x4f, - 0xc6, 0xfc, 0xeb, 0xcf, 0x65, 0x2f, 0x04, 0xfa, 0x3d, 0x2c, 0xfa, 0x6c, 0x1c, 0xab, 0xda, 0x53, - 0x97, 0x15, 0x9f, 0x05, 0xc4, 0x4f, 0xc7, 0xd3, 0x05, 0x0f, 0x55, 0x55, 0x1e, 0xdd, 0x63, 0x58, - 0x4a, 0x5b, 0xcf, 0x31, 0x75, 0x15, 0xd1, 0xa9, 0x1e, 0x5b, 0xfe, 0x40, 0x3d, 0xb6, 0xf1, 0x2d, - 0x2c, 0x76, 0x92, 0x50, 0x19, 0x94, 0x0b, 0xc7, 0xf4, 0xe9, 0x6e, 0xfd, 0xdf, 0x4f, 0xf7, 0x5b, - 0x6b, 0xb0, 0x70, 0xca, 0x54, 0x54, 0x01, 0xfb, 0x09, 0x8b, 0x48, 0x7d, 0x46, 0x7d, 0x3d, 0x7a, - 0x45, 0xe3, 0xba, 0xd5, 0xb9, 0xfd, 0xfa, 0x3f, 0xcb, 0x33, 0xaf, 0x8f, 0x96, 0xad, 0xef, 0x8f, - 0x96, 0xad, 0x1f, 0x8e, 0x96, 0xad, 0x7f, 0x1f, 0x2d, 0x5b, 0x7f, 0x7d, 0xb3, 0x3c, 0xf3, 0xfd, - 0x9b, 0xe5, 0x99, 0x1f, 0xde, 0x2c, 0xcf, 0x7c, 0x57, 0xcb, 0xfd, 0x75, 0xe4, 0x7f, 0x01, 0x00, - 0x00, 0xff, 0xff, 0x8d, 0xad, 0xd6, 0x98, 0xca, 0x11, 0x00, 0x00, +var fileDescriptor_processors_bulk_io_791817333cf840e0 = []byte{ + // 1783 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x58, 0x4f, 0x6f, 0x1b, 0xb9, + 0x15, 0xf7, 0x48, 0x23, 0x59, 0x7a, 0x8a, 0x1d, 0x99, 0xc9, 0xee, 0x4e, 0x0d, 0xd4, 0x36, 0xb4, + 0xeb, 0x54, 0x4d, 0x11, 0x09, 0x9b, 0xb4, 0x45, 0xd0, 0x76, 0x37, 0x8d, 0xe4, 0x38, 0x2b, 0x7b, + 0x37, 0x71, 0x47, 0xb1, 0x17, 0x58, 0xb4, 0x18, 0x50, 0x33, 0xb4, 0xcc, 0x68, 0x34, 0x1c, 0x93, + 0x1c, 0x27, 
0xca, 0xa5, 0x05, 0x7a, 0xea, 0xad, 0x1f, 0xa1, 0xdf, 0xa0, 0xfd, 0x0e, 0xbd, 0xe4, + 0xb8, 0xe8, 0x69, 0xd1, 0x83, 0xd1, 0x3a, 0xdf, 0xa2, 0xa7, 0x82, 0xe4, 0x8c, 0x3c, 0x76, 0x6c, + 0xc7, 0xde, 0x60, 0x2f, 0x36, 0x45, 0xbe, 0xdf, 0x8f, 0xef, 0x3d, 0xbe, 0x3f, 0xe4, 0x40, 0x53, + 0xec, 0x87, 0x6d, 0xf2, 0x92, 0xf8, 0x34, 0xda, 0xe5, 0x38, 0x1e, 0xb4, 0x63, 0xce, 0x7c, 0x22, + 0x04, 0xe3, 0xc2, 0x1b, 0x24, 0xe1, 0xc8, 0xa3, 0xac, 0x15, 0x73, 0x26, 0x19, 0x72, 0x7c, 0xe6, + 0x8f, 0x38, 0xc3, 0xfe, 0x5e, 0x4b, 0xec, 0x87, 0xad, 0x80, 0x0a, 0x29, 0xf6, 0x43, 0x9e, 0x44, + 0x8b, 0x1f, 0x3e, 0x67, 0x03, 0xd1, 0x56, 0x7f, 0xe2, 0x81, 0xfe, 0x67, 0x10, 0x8b, 0x8e, 0x96, + 0x8e, 0x07, 0x6d, 0xca, 0xee, 0xec, 0x32, 0x3e, 0xc6, 0x32, 0x5b, 0xf9, 0x58, 0xed, 0xea, 0x63, + 0x89, 0x43, 0x36, 0x6c, 0x07, 0x44, 0xf8, 0xf1, 0xa0, 0x2d, 0x24, 0x4f, 0x7c, 0x99, 0x70, 0x12, + 0xa4, 0x42, 0xab, 0x17, 0xa9, 0x86, 0x05, 0xc9, 0x76, 0x49, 0x24, 0x0d, 0xdb, 0x7b, 0xa1, 0xdf, + 0x96, 0x74, 0x4c, 0x84, 0xc4, 0xe3, 0x38, 0x5d, 0xb9, 0x39, 0x64, 0x43, 0xa6, 0x87, 0x6d, 0x35, + 0x4a, 0x67, 0x51, 0xa6, 0x55, 0x80, 0x25, 0x4e, 0xe7, 0x16, 0xb2, 0x39, 0x1c, 0x53, 0x33, 0xd5, + 0xf8, 0x7b, 0x11, 0xe6, 0x3b, 0xd8, 0x1f, 0xed, 0xd2, 0x30, 0x24, 0xbc, 0x1f, 0x13, 0x1f, 0x3d, + 0x06, 0x5b, 0x4e, 0x62, 0xe2, 0x58, 0x2b, 0x56, 0x73, 0xfe, 0xee, 0x9d, 0xd6, 0x79, 0x0e, 0x69, + 0x9d, 0xc4, 0xb5, 0x9e, 0x4d, 0x62, 0xd2, 0xb1, 0x5f, 0x1f, 0x2e, 0xcf, 0xb8, 0x9a, 0x00, 0x75, + 0xa0, 0x24, 0xf1, 0x20, 0x24, 0x4e, 0x61, 0xc5, 0x6a, 0xd6, 0xee, 0xde, 0x3a, 0xc5, 0x24, 0xf6, + 0x43, 0x6d, 0xdf, 0x33, 0x25, 0xb3, 0x46, 0x84, 0xcf, 0x69, 0x2c, 0x19, 0x4f, 0x29, 0x0c, 0x14, + 0x3d, 0x82, 0x92, 0x88, 0x71, 0x24, 0x9c, 0xe2, 0x4a, 0xb1, 0x59, 0xbb, 0xfb, 0xd3, 0xf3, 0xb5, + 0xd1, 0x34, 0x2e, 0xc1, 0x81, 0x52, 0x07, 0x47, 0x19, 0x8d, 0x46, 0xa3, 0x4f, 0xa1, 0x12, 0x24, + 0x1c, 0x4b, 0xca, 0x22, 0xc7, 0x5e, 0xb1, 0x9a, 0xc5, 0xce, 0x07, 0x6a, 0xf9, 0x7f, 0x87, 0xcb, + 0x73, 0xca, 0x9d, 0xad, 0xb5, 0x74, 0xd1, 0x9d, 0x8a, 0xa1, 0x8f, 0x01, 0xfc, 0xbd, 0x24, 0x1a, + 0x79, 0x82, 0xbe, 0x22, 0x4e, 0x49, 0x83, 0x0c, 0x67, 0x55, 0xcf, 0xf7, 0xe9, 0x2b, 0x82, 0x1e, + 0x40, 0x85, 0x13, 0x1c, 0x3c, 0x14, 0x4f, 0x77, 0x9d, 0x59, 0x6d, 0xe5, 0x8f, 0x73, 0x1a, 0xaa, + 0x23, 0x6b, 0xed, 0x85, 0x7e, 0xeb, 0x59, 0x76, 0x64, 0x29, 0xc3, 0x14, 0xd4, 0xb8, 0x0d, 0xb6, + 0xf2, 0x1b, 0xaa, 0xc1, 0x6c, 0x2f, 0x3a, 0xc0, 0x21, 0x0d, 0xea, 0x33, 0x08, 0xa0, 0xdc, 0x65, + 0x61, 0x32, 0x8e, 0xea, 0x16, 0xaa, 0x42, 0xa9, 0x17, 0x05, 0xe4, 0x65, 0xbd, 0xb0, 0x61, 0x57, + 0xca, 0xf5, 0xd9, 0xc6, 0x0b, 0xa8, 0x6d, 0xb0, 0xc1, 0x16, 0x67, 0x43, 0x4e, 0x84, 0x40, 0x9f, + 0x40, 0xf9, 0x39, 0x1b, 0x78, 0x34, 0xd0, 0xe7, 0x55, 0xec, 0xcc, 0xa9, 0x0d, 0x8e, 0x0e, 0x97, + 0x4b, 0x1b, 0x6c, 0xd0, 0x5b, 0x73, 0x4b, 0xcf, 0xd9, 0xa0, 0x17, 0xa0, 0x26, 0x5c, 0xf3, 0x59, + 0x24, 0x39, 0x1d, 0x24, 0xda, 0x07, 0xea, 0x44, 0x0a, 0xa9, 0x32, 0x27, 0x56, 0x90, 0x03, 0xb6, + 0x08, 0x99, 0x74, 0x8a, 0x2b, 0x56, 0xb3, 0x94, 0x1d, 0xa7, 0x9a, 0x69, 0xbc, 0xae, 0x00, 0x52, + 0xfe, 0xed, 0x8d, 0x63, 0xc6, 0xe5, 0x1a, 0x96, 0x58, 0x87, 0xcb, 0x2a, 0xd4, 0x04, 0x1e, 0xc7, + 0x21, 0x31, 0x8e, 0x2a, 0xe4, 0x70, 0x60, 0x16, 0xb4, 0xa7, 0x1e, 0x43, 0x25, 0x4e, 0x75, 0x76, + 0xca, 0xda, 0x53, 0xab, 0xe7, 0x9f, 0x65, 0xce, 0xc0, 0xcc, 0x63, 0x19, 0x18, 0x3d, 0x86, 0x62, + 0xc2, 0xa9, 0x33, 0xab, 0xe3, 0xe1, 0x17, 0xe7, 0x73, 0xbc, 0xad, 0x6a, 0x6b, 0x9b, 0xd3, 0x47, + 0x91, 0xe4, 0x13, 0x57, 0x31, 0xa0, 0xcf, 0xa0, 0x6c, 0xd2, 0xd5, 0xa9, 0x68, 0x7d, 0x96, 0x73, + 0x5c, 0x69, 0xa2, 0xb4, 0x7a, 0x4f, 
0xd7, 0x69, 0x48, 0xd6, 0xb5, 0x58, 0xaa, 0x49, 0x0a, 0x42, + 0x3b, 0x50, 0xd6, 0x21, 0x2a, 0x9c, 0xaa, 0x56, 0xe5, 0xfe, 0x95, 0x54, 0xd1, 0xd1, 0x2a, 0xb4, + 0x36, 0x9a, 0xd7, 0x72, 0x53, 0x36, 0xf4, 0x00, 0x7e, 0x24, 0x46, 0x34, 0xf6, 0xc6, 0x54, 0x08, + 0x1a, 0x0d, 0xbd, 0x5d, 0xc6, 0x09, 0x1d, 0x46, 0xde, 0x88, 0x4c, 0x84, 0x03, 0x2b, 0x56, 0xb3, + 0x92, 0x2a, 0xf2, 0xa1, 0x12, 0xfb, 0xca, 0x48, 0xad, 0x1b, 0xa1, 0x4d, 0x32, 0x11, 0xe8, 0x36, + 0xcc, 0xbd, 0xc0, 0x61, 0xa8, 0xe2, 0xfa, 0x09, 0x8e, 0x98, 0x70, 0x6a, 0xb9, 0xd8, 0x3d, 0xb9, + 0x84, 0xee, 0xc2, 0x02, 0xd7, 0x29, 0xb3, 0x85, 0x39, 0x0e, 0x43, 0x12, 0x52, 0x31, 0x76, 0xe6, + 0x72, 0x47, 0xf8, 0xf6, 0x32, 0xfa, 0x06, 0x80, 0x13, 0x91, 0x8c, 0x89, 0x17, 0x33, 0xe1, 0xcc, + 0x6b, 0xe3, 0x7f, 0x7d, 0x25, 0xe3, 0x5d, 0x0d, 0xdf, 0x62, 0xc6, 0x7e, 0xb7, 0xca, 0xb3, 0xdf, + 0x88, 0x00, 0x24, 0x82, 0x70, 0x4f, 0x17, 0x27, 0xe7, 0xfa, 0x8a, 0xd5, 0xac, 0x76, 0xd6, 0xd3, + 0x4c, 0xfd, 0x7c, 0x48, 0xe5, 0x5e, 0x32, 0x68, 0xf9, 0x6c, 0xdc, 0x9e, 0xee, 0x16, 0x0c, 0x8e, + 0xc7, 0xed, 0x78, 0x34, 0x6c, 0x0b, 0xe2, 0x27, 0x9c, 0xca, 0x49, 0xab, 0xff, 0xbb, 0x2f, 0xb7, + 0x05, 0xe1, 0x11, 0x1e, 0x93, 0x2d, 0xc5, 0xe6, 0x56, 0x15, 0xb3, 0x1e, 0x2e, 0x26, 0x50, 0x33, + 0x2a, 0xe9, 0x63, 0x40, 0xbf, 0x05, 0x5b, 0x55, 0x67, 0x9d, 0x41, 0x57, 0xab, 0x53, 0x96, 0xab, + 0x91, 0xe8, 0x13, 0x00, 0x89, 0xf9, 0x90, 0xc8, 0x2e, 0x0b, 0x85, 0x53, 0x58, 0x29, 0x36, 0xab, + 0xe9, 0x7a, 0x6e, 0x7e, 0x51, 0x40, 0x2d, 0x77, 0xee, 0xa8, 0x0e, 0xc5, 0x11, 0x99, 0xe8, 0x5d, + 0xab, 0xae, 0x1a, 0xa2, 0x27, 0x50, 0x3a, 0xc0, 0x61, 0x92, 0x55, 0xcc, 0xab, 0x85, 0x54, 0xce, + 0x22, 0xd7, 0xd0, 0xfc, 0xaa, 0x70, 0xdf, 0x5a, 0xfc, 0x25, 0x54, 0xb2, 0xb8, 0xcf, 0xef, 0x58, + 0x32, 0x3b, 0xde, 0xcc, 0xef, 0x58, 0xcd, 0xe3, 0x7e, 0x03, 0xf3, 0x27, 0xcf, 0xe9, 0x5d, 0xe8, + 0x62, 0x0e, 0xbd, 0x61, 0x57, 0x2c, 0x5d, 0xb1, 0x8a, 0x75, 0x7b, 0xc3, 0xae, 0xd8, 0xf5, 0xd2, + 0x86, 0x5d, 0x29, 0xd5, 0xcb, 0x1b, 0x76, 0xe5, 0x5a, 0x7d, 0xae, 0xf1, 0xaf, 0x02, 0x7c, 0xd4, + 0x97, 0x9c, 0xe0, 0x71, 0x2f, 0x1a, 0x12, 0xa1, 0x0a, 0xcf, 0xb4, 0x9e, 0xfc, 0x11, 0x6e, 0xc4, + 0x98, 0x4b, 0xaa, 0x26, 0x3d, 0x1c, 0x04, 0x2a, 0xe9, 0x89, 0x70, 0x2c, 0xed, 0xd3, 0x27, 0x2a, + 0x16, 0xfe, 0x7d, 0xb8, 0xbc, 0x7e, 0xa9, 0x58, 0xf0, 0xfd, 0x50, 0xf5, 0x5b, 0x82, 0xc7, 0x34, + 0x1a, 0xfa, 0x7e, 0xd8, 0xda, 0xca, 0x88, 0x1f, 0x1a, 0x5e, 0x17, 0xc5, 0xa7, 0x66, 0x88, 0x40, + 0x1d, 0x00, 0x21, 0x31, 0x97, 0x9e, 0x4a, 0x93, 0xf4, 0x24, 0x2e, 0x55, 0xd5, 0xab, 0x1a, 0xa6, + 0x66, 0x11, 0x87, 0x79, 0xb3, 0x71, 0x66, 0x81, 0xae, 0xa7, 0xd5, 0xce, 0x66, 0xaa, 0x7f, 0xf7, + 0xfb, 0xe9, 0x6f, 0x7c, 0x96, 0x29, 0x3f, 0x27, 0xf2, 0x3f, 0x1b, 0xff, 0x9c, 0x35, 0xad, 0x3c, + 0x89, 0xa7, 0xbe, 0xbc, 0x97, 0x75, 0x4f, 0x4b, 0x67, 0xe9, 0x47, 0x67, 0x54, 0xb8, 0xb7, 0x7b, + 0xe5, 0x17, 0x50, 0xa7, 0x91, 0xe4, 0x2c, 0x48, 0x7c, 0x12, 0x78, 0x06, 0x5f, 0xb8, 0x0c, 0xfe, + 0xfa, 0x31, 0xac, 0xaf, 0x99, 0xee, 0x41, 0x2d, 0x20, 0xbb, 0x38, 0x09, 0xa5, 0xa7, 0x4a, 0xb6, + 0x71, 0x01, 0x4a, 0x1b, 0x14, 0xac, 0x99, 0xa5, 0x6d, 0xb7, 0xe7, 0x42, 0x2a, 0xb6, 0xcd, 0x29, + 0xfa, 0xb3, 0x05, 0x37, 0x12, 0x4e, 0x85, 0x37, 0x98, 0x78, 0x21, 0xf3, 0x71, 0x48, 0xe5, 0xc4, + 0x1b, 0x1d, 0x38, 0xb6, 0x56, 0xe1, 0xf3, 0x8b, 0xaf, 0x23, 0xc7, 0xb6, 0xab, 0x62, 0x2f, 0x3a, + 0x93, 0x2f, 0x53, 0x86, 0xcd, 0x03, 0x53, 0x6b, 0x6f, 0x1e, 0x1d, 0x2e, 0xd7, 0xb7, 0xdd, 0x5e, + 0x7e, 0x69, 0xc7, 0xad, 0x27, 0xa7, 0x84, 0x91, 0x0b, 0xb5, 0xf1, 0x81, 0xef, 0x7b, 0xbb, 0x34, + 0x94, 0x84, 0xeb, 0xf6, 0x3f, 0x7f, 0x22, 0x0a, 0x32, 0xfb, 
0xbf, 0xda, 0xe9, 0x76, 0xd7, 0xb5, + 0xd0, 0xb1, 0x65, 0xc7, 0x73, 0x2e, 0x28, 0x16, 0x33, 0x46, 0x5f, 0x00, 0x90, 0xc8, 0xe7, 0x93, + 0x58, 0xb7, 0x60, 0xd3, 0x04, 0x9b, 0x67, 0x50, 0xaa, 0x96, 0xf3, 0x68, 0x2a, 0xf8, 0x54, 0xff, + 0x15, 0x6e, 0x0e, 0x8b, 0x9e, 0xc2, 0xc2, 0x40, 0x5b, 0xeb, 0xe5, 0x22, 0xf5, 0x0a, 0xf7, 0x8f, + 0xeb, 0x06, 0xdd, 0x9f, 0xc6, 0xeb, 0x26, 0xa4, 0x53, 0x1e, 0x89, 0x02, 0x43, 0x57, 0xb9, 0x3c, + 0xdd, 0x9c, 0xc1, 0x3e, 0x8a, 0x02, 0x4d, 0xb6, 0x0d, 0xe5, 0x78, 0xe4, 0xd1, 0x20, 0xeb, 0x8c, + 0xf7, 0x2e, 0x7d, 0x66, 0x5b, 0xa3, 0x5e, 0x90, 0x36, 0xc5, 0xaa, 0xba, 0xc3, 0x6c, 0x6d, 0xf6, + 0xd6, 0x84, 0x5b, 0x8a, 0xd5, 0xf4, 0xa9, 0xde, 0x00, 0x3f, 0x54, 0x6f, 0xe8, 0xc2, 0x07, 0x67, + 0x86, 0xce, 0x19, 0xe5, 0xfa, 0xfc, 0xe2, 0x79, 0x1f, 0xe0, 0xd8, 0x96, 0x3c, 0xd2, 0x3e, 0x03, + 0x59, 0xc9, 0x21, 0x1b, 0xff, 0xb0, 0xa0, 0xee, 0x12, 0x21, 0x19, 0x27, 0x2a, 0x89, 0x0c, 0xc1, + 0xa7, 0x60, 0xab, 0x3c, 0x4c, 0x1b, 0xd4, 0x3b, 0xd2, 0x50, 0x8b, 0xa2, 0x87, 0x50, 0xda, 0xa5, + 0xea, 0x76, 0x62, 0x52, 0x77, 0xf5, 0xac, 0xcb, 0x8d, 0x6e, 0x18, 0x2e, 0xd9, 0x4f, 0x88, 0x90, + 0x3a, 0xea, 0xb2, 0x42, 0xa0, 0x91, 0xe8, 0x16, 0xd4, 0xb2, 0x5b, 0x57, 0x2f, 0x78, 0xa9, 0xd3, + 0x37, 0xbb, 0x46, 0xe4, 0x17, 0x1a, 0x7f, 0x2a, 0xc2, 0xf5, 0x54, 0xe5, 0x69, 0xe5, 0x59, 0x87, + 0x6b, 0xdc, 0x4c, 0x99, 0x68, 0xb2, 0x2e, 0x1f, 0x4d, 0xb5, 0x14, 0xa8, 0x63, 0xe9, 0x64, 0xce, + 0x14, 0xde, 0x23, 0x67, 0x7a, 0x50, 0xe6, 0x44, 0x5f, 0xa2, 0xcc, 0x53, 0xe2, 0x67, 0xef, 0xf4, + 0x48, 0xfa, 0xa2, 0x18, 0x91, 0x49, 0x76, 0xf5, 0x33, 0x04, 0xea, 0xea, 0x97, 0x06, 0xb8, 0x29, + 0x4a, 0x3f, 0xbf, 0xa8, 0x4f, 0x9f, 0xf0, 0xcb, 0x85, 0x11, 0xfe, 0x1e, 0x51, 0xf3, 0xb7, 0x02, + 0xdc, 0xe8, 0xc7, 0x21, 0x95, 0x0f, 0xa3, 0xa0, 0xef, 0x63, 0x29, 0xd3, 0xb7, 0xdc, 0x1f, 0xa0, + 0xac, 0x1f, 0x2b, 0x59, 0x07, 0x78, 0x70, 0xbe, 0xa6, 0x67, 0xc0, 0x33, 0xed, 0xb5, 0x3e, 0x5d, + 0xc5, 0x93, 0x39, 0xc2, 0x90, 0xe6, 0x7c, 0x5a, 0x78, 0x4f, 0x9f, 0x2e, 0x7a, 0xb0, 0xf0, 0xd6, + 0x6e, 0x68, 0x03, 0x66, 0x89, 0x7a, 0x9b, 0x90, 0x4c, 0xff, 0xdb, 0xef, 0xf4, 0xf4, 0x34, 0x69, + 0x52, 0xfe, 0x8c, 0xa0, 0xf1, 0x97, 0x22, 0xcc, 0x75, 0xfb, 0x3b, 0x5f, 0x73, 0x9a, 0x39, 0xe7, + 0x96, 0x6a, 0x4f, 0x42, 0xd2, 0xc8, 0xbc, 0x0b, 0x75, 0x62, 0x67, 0x31, 0x98, 0x5b, 0x40, 0x3f, + 0x81, 0x6b, 0xaa, 0x52, 0x78, 0xb1, 0x76, 0x8c, 0x89, 0xc2, 0xa9, 0xa0, 0xae, 0x21, 0x66, 0x01, + 0x7d, 0x06, 0xb3, 0xcc, 0x44, 0x9e, 0x4e, 0x96, 0xda, 0x99, 0x0d, 0xa3, 0xdb, 0xdf, 0x49, 0xc3, + 0x33, 0xd3, 0x30, 0xc5, 0x1c, 0xbf, 0x38, 0x39, 0x7b, 0x21, 0xd2, 0x67, 0x6a, 0xfe, 0xc5, 0xe9, + 0xb2, 0x17, 0x02, 0xfd, 0x1e, 0x16, 0x7c, 0x36, 0x8e, 0x55, 0xee, 0xa9, 0x0b, 0x92, 0xcf, 0x02, + 0xe2, 0xa7, 0xed, 0xe9, 0x82, 0xc7, 0xb1, 0x4a, 0x8f, 0xee, 0x31, 0x2c, 0xa5, 0xad, 0xe7, 0x98, + 0xba, 0x8a, 0xe8, 0x54, 0x8d, 0x2d, 0xff, 0x40, 0x35, 0xb6, 0xf1, 0x35, 0x2c, 0x74, 0x92, 0x50, + 0x19, 0x94, 0x3b, 0x8e, 0xe9, 0xe7, 0x02, 0xeb, 0x7b, 0x7f, 0x2e, 0xb8, 0xbd, 0x0a, 0xd7, 0x4f, + 0x99, 0x8a, 0x2a, 0x60, 0x3f, 0x61, 0x11, 0xa9, 0xcf, 0xa8, 0xd1, 0xe3, 0x57, 0x34, 0xae, 0x5b, + 0x9d, 0x3b, 0xaf, 0xff, 0xbb, 0x34, 0xf3, 0xfa, 0x68, 0xc9, 0xfa, 0xf6, 0x68, 0xc9, 0xfa, 0xee, + 0x68, 0xc9, 0xfa, 0xcf, 0xd1, 0x92, 0xf5, 0xd7, 0x37, 0x4b, 0x33, 0xdf, 0xbe, 0x59, 0x9a, 0xf9, + 0xee, 0xcd, 0xd2, 0xcc, 0x37, 0xb5, 0xdc, 0x17, 0x99, 0xff, 0x07, 0x00, 0x00, 0xff, 0xff, 0xc0, + 0xb0, 0xb4, 0x69, 0x3e, 0x12, 0x00, 0x00, } diff --git a/pkg/sql/execinfrapb/processors_bulk_io.proto b/pkg/sql/execinfrapb/processors_bulk_io.proto index 7008638e3645..6027e9a4472d 
100644
--- a/pkg/sql/execinfrapb/processors_bulk_io.proto
+++ b/pkg/sql/execinfrapb/processors_bulk_io.proto
@@ -140,6 +140,8 @@ message StreamIngestionDataSpec {
   repeated string partition_addresses = 1 [(gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl.PartitionAddress",(gogoproto.nullable) = false];
   // The processor will ingest events from StartTime onwards.
   optional util.hlc.Timestamp start_time = 2 [(gogoproto.nullable) = false];
+  // StreamAddress locates the stream so that a stream client can be initialized.
+  optional string stream_address = 3 [(gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl.StreamAddress",(gogoproto.nullable) = false];
 }

 message BackupDataSpec {

From 23dbf054f75ade4de598b6c23612ad011a020ffb Mon Sep 17 00:00:00 2001
From: Paul Bardea
Date: Sun, 17 Jan 2021 19:38:07 -0500
Subject: [PATCH 6/7] streamclient: rename client to mockClient

Release note: None
---
 pkg/ccl/streamingccl/streamclient/client.go        |  2 +-
 pkg/ccl/streamingccl/streamclient/client_test.go   |  2 +-
 pkg/ccl/streamingccl/streamclient/stream_client.go | 10 +++++-----
 3 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/pkg/ccl/streamingccl/streamclient/client.go b/pkg/ccl/streamingccl/streamclient/client.go
index df6302ed36d1..1c7b574e266c 100644
--- a/pkg/ccl/streamingccl/streamclient/client.go
+++ b/pkg/ccl/streamingccl/streamclient/client.go
@@ -49,7 +49,7 @@ func NewStreamClient(streamAddress streamingccl.StreamAddress) (Client, error) {
 			return streamClient, err
 		}
 	default:
-		streamClient = &client{}
+		streamClient = &mockClient{}
 	}

 	return streamClient, nil
diff --git a/pkg/ccl/streamingccl/streamclient/client_test.go b/pkg/ccl/streamingccl/streamclient/client_test.go
index 054cab14a3cf..f19c9c28941a 100644
--- a/pkg/ccl/streamingccl/streamclient/client_test.go
+++ b/pkg/ccl/streamingccl/streamclient/client_test.go
@@ -97,7 +97,7 @@ func TestImplementationsCloseChannel(t *testing.T) {

 	// TODO: Add SQL client and file client here when implemented.
 	impls := []Client{
-		&client{},
+		&mockClient{},
 		randomClient,
 	}

diff --git a/pkg/ccl/streamingccl/streamclient/stream_client.go b/pkg/ccl/streamingccl/streamclient/stream_client.go
index 81ebd251a09f..25af66666c4b 100644
--- a/pkg/ccl/streamingccl/streamclient/stream_client.go
+++ b/pkg/ccl/streamingccl/streamclient/stream_client.go
@@ -15,20 +15,20 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
 )

-// client is a mock stream client.
-type client struct{}
+// mockClient is a mock stream client.
+type mockClient struct{}

-var _ Client = &client{}
+var _ Client = &mockClient{}

 // GetTopology implements the Client interface.
-func (m *client) GetTopology(_ streamingccl.StreamAddress) (streamingccl.Topology, error) {
+func (m *mockClient) GetTopology(_ streamingccl.StreamAddress) (streamingccl.Topology, error) {
 	return streamingccl.Topology{
 		Partitions: []streamingccl.PartitionAddress{"some://address"},
 	}, nil
 }

 // ConsumePartition implements the Client interface.
-func (m *client) ConsumePartition(
+func (m *mockClient) ConsumePartition(
 	ctx context.Context, _ streamingccl.PartitionAddress, _ time.Time,
 ) (chan streamingccl.Event, error) {
 	eventCh := make(chan streamingccl.Event)

From be6163f95269508e142bd4440c409a26be105173 Mon Sep 17 00:00:00 2001
From: Paul Bardea
Date: Sun, 17 Jan 2021 20:08:57 -0500
Subject: [PATCH 7/7] streamclient: change client example from Test to Example

Release note: None
---
 .../streamingccl/streamclient/client_test.go | 47 +++++++++++--------
 1 file changed, 28 insertions(+), 19 deletions(-)

diff --git a/pkg/ccl/streamingccl/streamclient/client_test.go b/pkg/ccl/streamingccl/streamclient/client_test.go
index f19c9c28941a..4aea27ee9979 100644
--- a/pkg/ccl/streamingccl/streamclient/client_test.go
+++ b/pkg/ccl/streamingccl/streamclient/client_test.go
@@ -10,6 +10,7 @@ package streamclient

 import (
 	"context"
+	"fmt"
 	"net/url"
 	"testing"
 	"time"
@@ -42,49 +43,57 @@ func (sc testStreamClient) ConsumePartition(
 	sampleKV := roachpb.KeyValue{
 		Key: []byte("key_1"),
 		Value: roachpb.Value{
-			RawBytes:  []byte("value 1"),
+			RawBytes:  []byte("value_1"),
 			Timestamp: hlc.Timestamp{WallTime: 1},
 		},
 	}

-	events := make(chan streamingccl.Event, 100)
+	events := make(chan streamingccl.Event, 2)
 	events <- streamingccl.MakeKVEvent(sampleKV)
-	events <- streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: timeutil.Now().UnixNano()})
+	events <- streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 100})
 	close(events)

 	return events, nil
 }

-// TestExampleClientUsage serves as documentation to indicate how a stream
+// ExampleClient serves as documentation to indicate how a stream
 // client could be used.
-func TestExampleClientUsage(t *testing.T) {
-	ctx := context.Background()
+func ExampleClient() {
 	client := testStreamClient{}
-	sa := streamingccl.StreamAddress("s3://my_bucket/my_stream")
-	topology, err := client.GetTopology(sa)
-	require.NoError(t, err)
+	topology, err := client.GetTopology("s3://my_bucket/my_stream")
+	if err != nil {
+		panic(err)
+	}

 	startTimestamp := timeutil.Now()
-	numReceivedEvents := 0

 	for _, partition := range topology.Partitions {
-		eventCh, err := client.ConsumePartition(ctx, partition, startTimestamp)
-		require.NoError(t, err)
+		eventCh, err := client.ConsumePartition(context.Background(), partition, startTimestamp)
+		if err != nil {
+			panic(err)
+		}

 		// This example looks for the closing of the channel to terminate the test,
 		// but an ingestion job should look for another event such as the user
 		// cutting over to the new cluster to move to the next stage.
-		for {
-			_, ok := <-eventCh
-			if !ok {
-				break
+		for event := range eventCh {
+			switch event.Type() {
+			case streamingccl.KVEvent:
+				kv := event.GetKV()
+				fmt.Printf("%s->%s@%d\n", kv.Key.String(), string(kv.Value.RawBytes), kv.Value.Timestamp.WallTime)
+			case streamingccl.CheckpointEvent:
+				fmt.Printf("resolved %d\n", event.GetResolved().WallTime)
+			default:
+				panic(fmt.Sprintf("unexpected event type %v", event.Type()))
 			}
-			numReceivedEvents++
 		}
 	}

-	// We expect 4 events, 2 from each partition.
-	require.Equal(t, 4, numReceivedEvents)
+	// Output:
+	// "key_1"->value_1@1
+	// resolved 100
+	// "key_1"->value_1@1
+	// resolved 100
 }

// Ensure that all implementations specified in this test properly close the