diff --git a/docs/generated/sql/bnf/restore.bnf b/docs/generated/sql/bnf/restore.bnf index e83146e7eebe..ada90bea0a3b 100644 --- a/docs/generated/sql/bnf/restore.bnf +++ b/docs/generated/sql/bnf/restore.bnf @@ -23,4 +23,5 @@ restore_stmt ::= | 'RESTORE' ( ( 'TABLE' | ) table_pattern ( ( ',' table_pattern ) )* | 'DATABASE' database_name ( ( ',' database_name ) )* ) 'FROM' subdirectory 'IN' full_backup_location ( | partitioned_backup_location ( ',' partitioned_backup_location )*) 'WITH' restore_options_list | 'RESTORE' ( ( 'TABLE' | ) table_pattern ( ( ',' table_pattern ) )* | 'DATABASE' database_name ( ( ',' database_name ) )* ) 'FROM' subdirectory 'IN' full_backup_location ( | partitioned_backup_location ( ',' partitioned_backup_location )*) 'WITH' 'OPTIONS' '(' restore_options_list ')' | 'RESTORE' ( ( 'TABLE' | ) table_pattern ( ( ',' table_pattern ) )* | 'DATABASE' database_name ( ( ',' database_name ) )* ) 'FROM' subdirectory 'IN' full_backup_location ( | partitioned_backup_location ( ',' partitioned_backup_location )*) - | 'RESTORE' ( ( 'TABLE' | ) table_pattern ( ( ',' table_pattern ) )* | 'DATABASE' database_name ( ( ',' database_name ) )* ) 'FROM' 'REPLICATION' 'STREAM' 'FROM' subdirectory_opt_list + | 'RESTORE' ( ( 'TABLE' | ) table_pattern ( ( ',' table_pattern ) )* | 'DATABASE' database_name ( ( ',' database_name ) )* ) 'FROM' 'REPLICATION' 'STREAM' 'FROM' subdirectory_opt_list 'AS' 'OF' 'SYSTEM' 'TIME' timestamp + | 'RESTORE' ( ( 'TABLE' | ) table_pattern ( ( ',' table_pattern ) )* | 'DATABASE' database_name ( ( ',' database_name ) )* ) 'FROM' 'REPLICATION' 'STREAM' 'FROM' subdirectory_opt_list diff --git a/docs/generated/sql/bnf/stmt_block.bnf b/docs/generated/sql/bnf/stmt_block.bnf index 699ae80495fd..ba81f09c1851 100644 --- a/docs/generated/sql/bnf/stmt_block.bnf +++ b/docs/generated/sql/bnf/stmt_block.bnf @@ -180,7 +180,7 @@ restore_stmt ::= | 'RESTORE' 'FROM' string_or_placeholder 'IN' list_of_string_or_placeholder_opt_list opt_as_of_clause opt_with_restore_options | 'RESTORE' targets 'FROM' list_of_string_or_placeholder_opt_list opt_as_of_clause opt_with_restore_options | 'RESTORE' targets 'FROM' string_or_placeholder 'IN' list_of_string_or_placeholder_opt_list opt_as_of_clause opt_with_restore_options - | 'RESTORE' targets 'FROM' 'REPLICATION' 'STREAM' 'FROM' string_or_placeholder_opt_list + | 'RESTORE' targets 'FROM' 'REPLICATION' 'STREAM' 'FROM' string_or_placeholder_opt_list opt_as_of_clause resume_stmt ::= resume_jobs_stmt diff --git a/pkg/ccl/backupccl/restore_data_processor.go b/pkg/ccl/backupccl/restore_data_processor.go index 4aaf04c71903..ed73198ca318 100644 --- a/pkg/ccl/backupccl/restore_data_processor.go +++ b/pkg/ccl/backupccl/restore_data_processor.go @@ -80,12 +80,8 @@ func newRestoreDataProcessor( // Start is part of the RowSource interface. func (rd *restoreDataProcessor) Start(ctx context.Context) { - rd.input.Start(ctx) ctx = rd.StartInternal(ctx, restoreDataProcName) - // Go around "this value of ctx is never used" linter error. We do it this - // way instead of omitting the assignment to ctx above so that if in the - // future other initialization is added, the correct ctx is used. - _ = ctx + rd.input.Start(ctx) } // Next is part of the RowSource interface. diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index 433af3fde802..daf4a8df76f2 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -906,11 +906,10 @@ func newChangeFrontierProcessor( // Start is part of the RowSource interface. func (cf *changeFrontier) Start(ctx context.Context) { - cf.input.Start(ctx) - // StartInternal called at the beginning of the function because there are // early returns if errors are detected. ctx = cf.StartInternal(ctx, changeFrontierProcName) + cf.input.Start(ctx) // Pass a nil oracle because this sink is only used to emit resolved timestamps // but the oracle is only used when emitting row updates. diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go index 6045b08c8b68..d36ac399e85f 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go @@ -88,12 +88,8 @@ func newStreamIngestionFrontierProcessor( // Start is part of the RowSource interface. func (sf *streamIngestionFrontier) Start(ctx context.Context) { - sf.input.Start(ctx) ctx = sf.StartInternal(ctx, streamIngestionFrontierProcName) - // Go around "this value of ctx is never used" linter error. We do it this - // way instead of omitting the assignment to ctx above so that if in the - // future other initialization is added, the correct ctx is used. - _ = ctx + sf.input.Start(ctx) } // Next is part of the RowSource interface. diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go index 924bca16aa1f..8bbf83eca36d 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go @@ -30,6 +30,7 @@ type streamIngestionResumer struct { func ingest( ctx context.Context, execCtx sql.JobExecContext, + startTime hlc.Timestamp, streamAddress streamingccl.StreamAddress, progress jobspb.Progress, jobID jobspb.JobID, @@ -49,7 +50,7 @@ func ingest( // KVs. We can skip to ingesting after this resolved ts. Plumb the // initialHighwatermark to the ingestion processor spec based on what we read // from the job progress. - var initialHighWater hlc.Timestamp + initialHighWater := startTime if h := progress.GetHighWater(); h != nil && !h.IsEmpty() { initialHighWater = *h } @@ -80,7 +81,7 @@ func (s *streamIngestionResumer) Resume(resumeCtx context.Context, execCtx inter p := execCtx.(sql.JobExecContext) // Start ingesting KVs from the replication stream. - err := ingest(resumeCtx, p, details.StreamAddress, s.job.Progress(), s.job.ID()) + err := ingest(resumeCtx, p, details.StartTime, details.StreamAddress, s.job.Progress(), s.job.ID()) if err != nil { return err } diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go index 3c631a6adec0..a22af70728de 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go @@ -24,6 +24,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/errors" ) @@ -92,11 +93,19 @@ func ingestionPlanHook( // TODO(adityamaru): Add privileges checks. Probably the same as RESTORE. prefix := keys.MakeTenantPrefix(ingestionStmt.Targets.Tenant) + startTime := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()} + if ingestionStmt.AsOf.Expr != nil { + var err error + startTime, err = p.EvalAsOfTimestamp(ctx, ingestionStmt.AsOf) + if err != nil { + return err + } + } + streamIngestionDetails := jobspb.StreamIngestionDetails{ StreamAddress: streamingccl.StreamAddress(from[0]), Span: roachpb.Span{Key: prefix, EndKey: prefix.PrefixEnd()}, - // TODO: Figure out what the initial ts should be. - StartTime: hlc.Timestamp{}, + StartTime: startTime, } jobDescription, err := streamIngestionJobDescription(p, ingestionStmt) diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_planning.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_planning.go index a239e97f9942..009b070ac8a9 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_planning.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_planning.go @@ -45,6 +45,7 @@ func distStreamIngestionPlanSpecs( if i < len(nodes) { spec := &execinfrapb.StreamIngestionDataSpec{ JobID: int64(jobID), + StartTime: initialHighWater, StreamAddress: streamAddress, PartitionAddresses: make([]streamingccl.PartitionAddress, 0), } diff --git a/pkg/sql/colexec/materializer.go b/pkg/sql/colexec/materializer.go index eb5f5a85885f..5545858ff043 100644 --- a/pkg/sql/colexec/materializer.go +++ b/pkg/sql/colexec/materializer.go @@ -233,12 +233,8 @@ func (m *Materializer) Child(nth int, verbose bool) execinfra.OpNode { // Start is part of the execinfra.RowSource interface. func (m *Materializer) Start(ctx context.Context) { - m.drainHelper.Start(ctx) ctx = m.ProcessorBase.StartInternal(ctx, materializerProcName) - // Go around "this value of ctx is never used" linter error. We do it this - // way instead of omitting the assignment to ctx above so that if in the - // future other initialization is added, the correct ctx is used. - _ = ctx + m.drainHelper.Start(ctx) // We can encounter an expected error during Init (e.g. an operator // attempts to allocate a batch, but the memory budget limit has been // reached), so we need to wrap it with a catcher. diff --git a/pkg/sql/execinfra/metadata_test_receiver.go b/pkg/sql/execinfra/metadata_test_receiver.go index 04fbb49950e3..24b71a4075ad 100644 --- a/pkg/sql/execinfra/metadata_test_receiver.go +++ b/pkg/sql/execinfra/metadata_test_receiver.go @@ -140,12 +140,8 @@ func (mtr *MetadataTestReceiver) checkRowNumMetadata() *execinfrapb.ProducerMeta // Start is part of the RowSource interface. func (mtr *MetadataTestReceiver) Start(ctx context.Context) { - mtr.input.Start(ctx) ctx = mtr.StartInternal(ctx, metadataTestReceiverProcName) - // Go around "this value of ctx is never used" linter error. We do it this - // way instead of omitting the assignment to ctx above so that if in the - // future other initialization is added, the correct ctx is used. - _ = ctx + mtr.input.Start(ctx) } // Next is part of the RowSource interface. diff --git a/pkg/sql/execinfra/metadata_test_sender.go b/pkg/sql/execinfra/metadata_test_sender.go index 8394d58a48db..a7b383390e99 100644 --- a/pkg/sql/execinfra/metadata_test_sender.go +++ b/pkg/sql/execinfra/metadata_test_sender.go @@ -75,12 +75,8 @@ func NewMetadataTestSender( // Start is part of the RowSource interface. func (mts *MetadataTestSender) Start(ctx context.Context) { - mts.input.Start(ctx) ctx = mts.StartInternal(ctx, metadataTestSenderProcName) - // Go around "this value of ctx is never used" linter error. We do it this - // way instead of omitting the assignment to ctx above so that if in the - // future other initialization is added, the correct ctx is used. - _ = ctx + mts.input.Start(ctx) } // Next is part of the RowSource interface. diff --git a/pkg/sql/execinfra/processorsbase.go b/pkg/sql/execinfra/processorsbase.go index b885bb1a7609..5b10633407e2 100644 --- a/pkg/sql/execinfra/processorsbase.go +++ b/pkg/sql/execinfra/processorsbase.go @@ -870,6 +870,7 @@ func ProcessorSpan(ctx context.Context, name string) (context.Context, *tracing. // It is likely that this method is called from RowSource.Start implementation, // and the recommended layout is the following: // ctx = pb.StartInternal(ctx, name) +// < inputs >.Start(ctx) // if there are any inputs-RowSources to pb // < other initialization > // so that the caller doesn't mistakenly use old ctx object. func (pb *ProcessorBase) StartInternal(ctx context.Context, name string) context.Context { diff --git a/pkg/sql/parser/parse_test.go b/pkg/sql/parser/parse_test.go index 13dda0f380f5..cb5dd7f3d199 100644 --- a/pkg/sql/parser/parse_test.go +++ b/pkg/sql/parser/parse_test.go @@ -1729,6 +1729,8 @@ func TestParse(t *testing.T) { {`RESTORE TENANT 123 FROM REPLICATION STREAM FROM 'bar'`}, {`RESTORE TENANT 123 FROM REPLICATION STREAM FROM $1`}, + {`RESTORE TENANT 123 FROM REPLICATION STREAM FROM 'bar' AS OF SYSTEM TIME '1'`}, + {`RESTORE TENANT 123 FROM REPLICATION STREAM FROM $1 AS OF SYSTEM TIME '1'`}, // Currently, we only support TENANT as a target. We have grammar rules for // all targets supported by RESTORE but these will error out during the diff --git a/pkg/sql/parser/sql.y b/pkg/sql/parser/sql.y index 729b45232a93..c3e0868c7860 100644 --- a/pkg/sql/parser/sql.y +++ b/pkg/sql/parser/sql.y @@ -2639,11 +2639,12 @@ restore_stmt: Options: *($8.restoreOptions()), } } -| RESTORE targets FROM REPLICATION STREAM FROM string_or_placeholder_opt_list +| RESTORE targets FROM REPLICATION STREAM FROM string_or_placeholder_opt_list opt_as_of_clause { $$.val = &tree.StreamIngestion{ Targets: $2.targetList(), From: $7.stringOrPlaceholderOptList(), + AsOf: $8.asOfClause(), } } | RESTORE error // SHOW HELP: RESTORE diff --git a/pkg/sql/rowexec/aggregator.go b/pkg/sql/rowexec/aggregator.go index b4a6f6cdfa0a..dcceac67e14c 100644 --- a/pkg/sql/rowexec/aggregator.go +++ b/pkg/sql/rowexec/aggregator.go @@ -337,8 +337,8 @@ func (ag *orderedAggregator) Start(ctx context.Context) { } func (ag *aggregatorBase) start(ctx context.Context, procName string) { - ag.input.Start(ctx) ctx = ag.StartInternal(ctx, procName) + ag.input.Start(ctx) ag.cancelChecker = cancelchecker.NewCancelChecker(ctx) ag.runningState = aggAccumulating } diff --git a/pkg/sql/rowexec/bulk_row_writer.go b/pkg/sql/rowexec/bulk_row_writer.go index 37f0d4b94a29..9271a2500b00 100644 --- a/pkg/sql/rowexec/bulk_row_writer.go +++ b/pkg/sql/rowexec/bulk_row_writer.go @@ -76,8 +76,8 @@ func newBulkRowWriterProcessor( // Start is part of the RowSource interface. func (sp *bulkRowWriter) Start(ctx context.Context) { - sp.input.Start(ctx) ctx = sp.StartInternal(ctx, "bulkRowWriter") + sp.input.Start(ctx) err := sp.work(ctx) sp.MoveToDraining(err) } diff --git a/pkg/sql/rowexec/countrows.go b/pkg/sql/rowexec/countrows.go index 324b51b38512..c915e84469f3 100644 --- a/pkg/sql/rowexec/countrows.go +++ b/pkg/sql/rowexec/countrows.go @@ -68,12 +68,8 @@ func newCountAggregator( } func (ag *countAggregator) Start(ctx context.Context) { - ag.input.Start(ctx) ctx = ag.StartInternal(ctx, countRowsProcName) - // Go around "this value of ctx is never used" linter error. We do it this - // way instead of omitting the assignment to ctx above so that if in the - // future other initialization is added, the correct ctx is used. - _ = ctx + ag.input.Start(ctx) } func (ag *countAggregator) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) { diff --git a/pkg/sql/rowexec/distinct.go b/pkg/sql/rowexec/distinct.go index 8920d93b9e2d..4b1dc4733f66 100644 --- a/pkg/sql/rowexec/distinct.go +++ b/pkg/sql/rowexec/distinct.go @@ -142,22 +142,14 @@ func newDistinct( // Start is part of the RowSource interface. func (d *distinct) Start(ctx context.Context) { - d.input.Start(ctx) ctx = d.StartInternal(ctx, distinctProcName) - // Go around "this value of ctx is never used" linter error. We do it this - // way instead of omitting the assignment to ctx above so that if in the - // future other initialization is added, the correct ctx is used. - _ = ctx + d.input.Start(ctx) } // Start is part of the RowSource interface. func (d *sortedDistinct) Start(ctx context.Context) { - d.input.Start(ctx) ctx = d.StartInternal(ctx, sortedDistinctProcName) - // Go around "this value of ctx is never used" linter error. We do it this - // way instead of omitting the assignment to ctx above so that if in the - // future other initialization is added, the correct ctx is used. - _ = ctx + d.input.Start(ctx) } func (d *distinct) matchLastGroupKey(row rowenc.EncDatumRow) (bool, error) { diff --git a/pkg/sql/rowexec/filterer.go b/pkg/sql/rowexec/filterer.go index b133c4e6c918..caa5d20eb5ae 100644 --- a/pkg/sql/rowexec/filterer.go +++ b/pkg/sql/rowexec/filterer.go @@ -71,12 +71,8 @@ func newFiltererProcessor( // Start is part of the RowSource interface. func (f *filtererProcessor) Start(ctx context.Context) { - f.input.Start(ctx) ctx = f.StartInternal(ctx, filtererProcName) - // Go around "this value of ctx is never used" linter error. We do it this - // way instead of omitting the assignment to ctx above so that if in the - // future other initialization is added, the correct ctx is used. - _ = ctx + f.input.Start(ctx) } // Next is part of the RowSource interface. diff --git a/pkg/sql/rowexec/hashjoiner.go b/pkg/sql/rowexec/hashjoiner.go index 5ae8ea6bd592..c15f3d626005 100644 --- a/pkg/sql/rowexec/hashjoiner.go +++ b/pkg/sql/rowexec/hashjoiner.go @@ -158,9 +158,9 @@ func newHashJoiner( // Start is part of the RowSource interface. func (h *hashJoiner) Start(ctx context.Context) { + ctx = h.StartInternal(ctx, hashJoinerProcName) h.leftSource.Start(ctx) h.rightSource.Start(ctx) - ctx = h.StartInternal(ctx, hashJoinerProcName) h.cancelChecker = cancelchecker.NewCancelChecker(ctx) h.runningState = hjBuilding } diff --git a/pkg/sql/rowexec/inverted_filterer.go b/pkg/sql/rowexec/inverted_filterer.go index dd9ae84ee904..3222993e6419 100644 --- a/pkg/sql/rowexec/inverted_filterer.go +++ b/pkg/sql/rowexec/inverted_filterer.go @@ -288,12 +288,8 @@ func (ifr *invertedFilterer) emitRow() ( // Start is part of the RowSource interface. func (ifr *invertedFilterer) Start(ctx context.Context) { - ifr.input.Start(ctx) ctx = ifr.StartInternal(ctx, invertedFiltererProcName) - // Go around "this value of ctx is never used" linter error. We do it this - // way instead of omitting the assignment to ctx above so that if in the - // future other initialization is added, the correct ctx is used. - _ = ctx + ifr.input.Start(ctx) ifr.runningState = ifrReadingInput } diff --git a/pkg/sql/rowexec/inverted_joiner.go b/pkg/sql/rowexec/inverted_joiner.go index 9ee66480e1c4..bed24816f59a 100644 --- a/pkg/sql/rowexec/inverted_joiner.go +++ b/pkg/sql/rowexec/inverted_joiner.go @@ -734,12 +734,8 @@ func (ij *invertedJoiner) transformToTableRow(indexRow rowenc.EncDatumRow) { // Start is part of the RowSource interface. func (ij *invertedJoiner) Start(ctx context.Context) { - ij.input.Start(ctx) ctx = ij.StartInternal(ctx, invertedJoinerProcName) - // Go around "this value of ctx is never used" linter error. We do it this - // way instead of omitting the assignment to ctx above so that if in the - // future other initialization is added, the correct ctx is used. - _ = ctx + ij.input.Start(ctx) ij.runningState = ijReadingInput } diff --git a/pkg/sql/rowexec/joinreader.go b/pkg/sql/rowexec/joinreader.go index 9e40508cb063..2cc85bd3c52b 100644 --- a/pkg/sql/rowexec/joinreader.go +++ b/pkg/sql/rowexec/joinreader.go @@ -700,12 +700,8 @@ func (jr *joinReader) emitRow() ( // Start is part of the RowSource interface. func (jr *joinReader) Start(ctx context.Context) { - jr.input.Start(ctx) ctx = jr.StartInternal(ctx, joinReaderProcName) - // Go around "this value of ctx is never used" linter error. We do it this - // way instead of omitting the assignment to ctx above so that if in the - // future other initialization is added, the correct ctx is used. - _ = ctx + jr.input.Start(ctx) jr.runningState = jrReadingInput } diff --git a/pkg/sql/rowexec/mergejoiner.go b/pkg/sql/rowexec/mergejoiner.go index 15ce6a7e5250..e000ae613eb3 100644 --- a/pkg/sql/rowexec/mergejoiner.go +++ b/pkg/sql/rowexec/mergejoiner.go @@ -115,8 +115,8 @@ func newMergeJoiner( // Start is part of the RowSource interface. func (m *mergeJoiner) Start(ctx context.Context) { - m.streamMerger.start(ctx) ctx = m.StartInternal(ctx, mergeJoinerProcName) + m.streamMerger.start(ctx) m.cancelChecker = cancelchecker.NewCancelChecker(ctx) } diff --git a/pkg/sql/rowexec/noop.go b/pkg/sql/rowexec/noop.go index 4b2cd3913f57..8c2dc8577a8c 100644 --- a/pkg/sql/rowexec/noop.go +++ b/pkg/sql/rowexec/noop.go @@ -64,12 +64,8 @@ func newNoopProcessor( // Start is part of the RowSource interface. func (n *noopProcessor) Start(ctx context.Context) { - n.input.Start(ctx) ctx = n.StartInternal(ctx, noopProcName) - // Go around "this value of ctx is never used" linter error. We do it this - // way instead of omitting the assignment to ctx above so that if in the - // future other initialization is added, the correct ctx is used. - _ = ctx + n.input.Start(ctx) } // Next is part of the RowSource interface. diff --git a/pkg/sql/rowexec/ordinality.go b/pkg/sql/rowexec/ordinality.go index 55e350b1481a..52af5e2fdf71 100644 --- a/pkg/sql/rowexec/ordinality.go +++ b/pkg/sql/rowexec/ordinality.go @@ -76,12 +76,8 @@ func newOrdinalityProcessor( // Start is part of the RowSource interface. func (o *ordinalityProcessor) Start(ctx context.Context) { - o.input.Start(ctx) ctx = o.StartInternal(ctx, ordinalityProcName) - // Go around "this value of ctx is never used" linter error. We do it this - // way instead of omitting the assignment to ctx above so that if in the - // future other initialization is added, the correct ctx is used. - _ = ctx + o.input.Start(ctx) } // Next is part of the RowSource interface. diff --git a/pkg/sql/rowexec/project_set.go b/pkg/sql/rowexec/project_set.go index e03cfca3307b..31b5a9434430 100644 --- a/pkg/sql/rowexec/project_set.go +++ b/pkg/sql/rowexec/project_set.go @@ -116,8 +116,8 @@ func newProjectSetProcessor( // Start is part of the RowSource interface. func (ps *projectSetProcessor) Start(ctx context.Context) { - ps.input.Start(ctx) ctx = ps.StartInternal(ctx, projectSetProcName) + ps.input.Start(ctx) ps.cancelChecker = cancelchecker.NewCancelChecker(ctx) } diff --git a/pkg/sql/rowexec/sample_aggregator.go b/pkg/sql/rowexec/sample_aggregator.go index 22a03c4b13fa..b63fcab44494 100644 --- a/pkg/sql/rowexec/sample_aggregator.go +++ b/pkg/sql/rowexec/sample_aggregator.go @@ -181,18 +181,14 @@ func (s *sampleAggregator) pushTrailingMeta(ctx context.Context) { // Run is part of the Processor interface. func (s *sampleAggregator) Run(ctx context.Context) { - s.input.Start(ctx) ctx = s.StartInternal(ctx, sampleAggregatorProcName) - // Go around "this value of ctx is never used" linter error. We do it this - // way instead of omitting the assignment to ctx above so that if in the - // future other initialization is added, the correct ctx is used. - _ = ctx + s.input.Start(ctx) - earlyExit, err := s.mainLoop(s.Ctx) + earlyExit, err := s.mainLoop(ctx) if err != nil { - execinfra.DrainAndClose(s.Ctx, s.Out.Output(), err, s.pushTrailingMeta, s.input) + execinfra.DrainAndClose(ctx, s.Out.Output(), err, s.pushTrailingMeta, s.input) } else if !earlyExit { - s.pushTrailingMeta(s.Ctx) + s.pushTrailingMeta(ctx) s.input.ConsumerClosed() s.Out.Close() } diff --git a/pkg/sql/rowexec/sampler.go b/pkg/sql/rowexec/sampler.go index 45d6ba5c9784..4b00ebeb9c74 100644 --- a/pkg/sql/rowexec/sampler.go +++ b/pkg/sql/rowexec/sampler.go @@ -218,18 +218,14 @@ func (s *samplerProcessor) pushTrailingMeta(ctx context.Context) { // Run is part of the Processor interface. func (s *samplerProcessor) Run(ctx context.Context) { - s.input.Start(ctx) ctx = s.StartInternal(ctx, samplerProcName) - // Go around "this value of ctx is never used" linter error. We do it this - // way instead of omitting the assignment to ctx above so that if in the - // future other initialization is added, the correct ctx is used. - _ = ctx + s.input.Start(ctx) - earlyExit, err := s.mainLoop(s.Ctx) + earlyExit, err := s.mainLoop(ctx) if err != nil { - execinfra.DrainAndClose(s.Ctx, s.Out.Output(), err, s.pushTrailingMeta, s.input) + execinfra.DrainAndClose(ctx, s.Out.Output(), err, s.pushTrailingMeta, s.input) } else if !earlyExit { - s.pushTrailingMeta(s.Ctx) + s.pushTrailingMeta(ctx) s.input.ConsumerClosed() s.Out.Close() } diff --git a/pkg/sql/rowexec/sorter.go b/pkg/sql/rowexec/sorter.go index 754e1e2f4641..44a9fe60f01b 100644 --- a/pkg/sql/rowexec/sorter.go +++ b/pkg/sql/rowexec/sorter.go @@ -227,12 +227,8 @@ func newSortAllProcessor( // Start is part of the RowSource interface. func (s *sortAllProcessor) Start(ctx context.Context) { - s.input.Start(ctx) ctx = s.StartInternal(ctx, sortAllProcName) - // Go around "this value of ctx is never used" linter error. We do it this - // way instead of omitting the assignment to ctx above so that if in the - // future other initialization is added, the correct ctx is used. - _ = ctx + s.input.Start(ctx) valid, err := s.fill() if !valid || err != nil { @@ -343,8 +339,8 @@ func newSortTopKProcessor( // Start is part of the RowSource interface. func (s *sortTopKProcessor) Start(ctx context.Context) { - s.input.Start(ctx) ctx = s.StartInternal(ctx, sortTopKProcName) + s.input.Start(ctx) // The execution loop for the SortTopK processor is similar to that of the // SortAll processor; the difference is that we push rows into a max-heap @@ -526,12 +522,8 @@ func (s *sortChunksProcessor) fill() (bool, error) { // Start is part of the RowSource interface. func (s *sortChunksProcessor) Start(ctx context.Context) { - s.input.Start(ctx) ctx = s.StartInternal(ctx, sortChunksProcName) - // Go around "this value of ctx is never used" linter error. We do it this - // way instead of omitting the assignment to ctx above so that if in the - // future other initialization is added, the correct ctx is used. - _ = ctx + s.input.Start(ctx) } // Next is part of the RowSource interface. diff --git a/pkg/sql/rowexec/windower.go b/pkg/sql/rowexec/windower.go index 72812329b417..8fcaefb878a6 100644 --- a/pkg/sql/rowexec/windower.go +++ b/pkg/sql/rowexec/windower.go @@ -205,8 +205,8 @@ func newWindower( // Start is part of the RowSource interface. func (w *windower) Start(ctx context.Context) { - w.input.Start(ctx) ctx = w.StartInternal(ctx, windowerProcName) + w.input.Start(ctx) w.cancelChecker = cancelchecker.NewCancelChecker(ctx) w.runningState = windowerAccumulating } diff --git a/pkg/sql/sem/tree/stream_ingestion.go b/pkg/sql/sem/tree/stream_ingestion.go index 1607f391aac1..aeaad0ea713f 100644 --- a/pkg/sql/sem/tree/stream_ingestion.go +++ b/pkg/sql/sem/tree/stream_ingestion.go @@ -14,6 +14,7 @@ package tree type StreamIngestion struct { Targets TargetList From StringOrPlaceholderOptList + AsOf AsOfClause } var _ Statement = &StreamIngestion{} @@ -25,4 +26,8 @@ func (node *StreamIngestion) Format(ctx *FmtCtx) { ctx.WriteString(" ") ctx.WriteString("FROM REPLICATION STREAM FROM ") ctx.FormatNode(&node.From) + if node.AsOf.Expr != nil { + ctx.WriteString(" ") + ctx.FormatNode(&node.AsOf) + } } diff --git a/pkg/storage/bench_test.go b/pkg/storage/bench_test.go index 188df3cbacd2..aa64153d59f3 100644 --- a/pkg/storage/bench_test.go +++ b/pkg/storage/bench_test.go @@ -141,7 +141,7 @@ func BenchmarkExportToSst(b *testing.B) { const numIntentKeys = 1000 func setupKeysWithIntent( - b *testing.B, eng Engine, numVersions int, numFlushedVersions int, + b *testing.B, eng Engine, numVersions int, numFlushedVersions int, resolveAll bool, ) roachpb.LockUpdate { txnIDCount := 2 * numVersions val := []byte("value") @@ -179,7 +179,7 @@ func setupKeysWithIntent( Txn: txn.TxnMeta, Status: roachpb.COMMITTED, } - if i < numVersions { + if i < numVersions || resolveAll { batch := eng.NewBatch() for j := 0; j < numIntentKeys; j++ { key := makeKey(nil, j) @@ -198,6 +198,8 @@ func setupKeysWithIntent( return lockUpdate } +// BenchmarkIntentScan compares separated and interleaved intents, when +// reading the intent and latest version for a range of keys. func BenchmarkIntentScan(b *testing.B) { skip.UnderShort(b, "setting up unflushed data takes too long") for _, sep := range []bool{false, true} { @@ -209,7 +211,7 @@ func BenchmarkIntentScan(b *testing.B) { eng := setupMVCCInMemPebbleWithSettings( b, makeSettingsForSeparatedIntents(false, sep)) numFlushedVersions := (percentFlushed * numVersions) / 100 - setupKeysWithIntent(b, eng, numVersions, numFlushedVersions) + setupKeysWithIntent(b, eng, numVersions, numFlushedVersions, false /* resolveAll */) lower := makeKey(nil, 0) iter := eng.NewMVCCIterator(MVCCKeyAndIntentsIterKind, IterOptions{ LowerBound: lower, @@ -258,6 +260,58 @@ func BenchmarkIntentScan(b *testing.B) { } } +// BenchmarkScanAllIntentsResolved compares separated and interleaved intents, +// when reading the latest version for a range of keys, when all the intents +// have been resolved. +func BenchmarkScanAllIntentsResolved(b *testing.B) { + skip.UnderShort(b, "setting up unflushed data takes too long") + for _, sep := range []bool{false, true} { + b.Run(fmt.Sprintf("separated=%t", sep), func(b *testing.B) { + for _, numVersions := range []int{200} { + b.Run(fmt.Sprintf("versions=%d", numVersions), func(b *testing.B) { + for _, percentFlushed := range []int{0, 50, 90, 100} { + b.Run(fmt.Sprintf("percent-flushed=%d", percentFlushed), func(b *testing.B) { + eng := setupMVCCInMemPebbleWithSettings( + b, makeSettingsForSeparatedIntents(false, sep)) + numFlushedVersions := (percentFlushed * numVersions) / 100 + setupKeysWithIntent(b, eng, numVersions, numFlushedVersions, true /* resolveAll */) + lower := makeKey(nil, 0) + iter := eng.NewMVCCIterator(MVCCKeyAndIntentsIterKind, IterOptions{ + LowerBound: lower, + UpperBound: makeKey(nil, numIntentKeys), + }) + iter.SeekGE(MVCCKey{Key: lower}) + var buf []byte + b.ResetTimer() + for i := 0; i < b.N; i++ { + valid, err := iter.Valid() + if err != nil { + b.Fatal(err) + } + if !valid { + iter.SeekGE(MVCCKey{Key: lower}) + } else { + // Read latest version. + k := iter.UnsafeKey() + if !k.IsValue() { + b.Fatalf("expected value %s", k.String()) + } + // Skip to next key. + buf = append(buf[:0], k.Key...) + buf = roachpb.BytesNext(buf) + iter.SeekGE(MVCCKey{Key: buf}) + } + } + }) + } + }) + } + }) + } +} + +// BenchmarkIntentResolution compares separated and interleaved intents, when +// doing intent resolution for individual intents. func BenchmarkIntentResolution(b *testing.B) { skip.UnderShort(b, "setting up unflushed data takes too long") for _, sep := range []bool{false, true} { @@ -269,7 +323,7 @@ func BenchmarkIntentResolution(b *testing.B) { eng := setupMVCCInMemPebbleWithSettings( b, makeSettingsForSeparatedIntents(false, sep)) numFlushedVersions := (percentFlushed * numVersions) / 100 - lockUpdate := setupKeysWithIntent(b, eng, numVersions, numFlushedVersions) + lockUpdate := setupKeysWithIntent(b, eng, numVersions, numFlushedVersions, false /* resolveAll */) keys := make([]roachpb.Key, numIntentKeys) for i := range keys { keys[i] = makeKey(nil, i) @@ -298,6 +352,8 @@ func BenchmarkIntentResolution(b *testing.B) { } } +// BenchmarkIntentRangeResolution compares separated and interleaved intents, +// when doing ranged intent resolution. func BenchmarkIntentRangeResolution(b *testing.B) { skip.UnderShort(b, "setting up unflushed data takes too long") for _, sep := range []bool{false, true} { @@ -309,7 +365,7 @@ func BenchmarkIntentRangeResolution(b *testing.B) { eng := setupMVCCInMemPebbleWithSettings( b, makeSettingsForSeparatedIntents(false, sep)) numFlushedVersions := (percentFlushed * numVersions) / 100 - lockUpdate := setupKeysWithIntent(b, eng, numVersions, numFlushedVersions) + lockUpdate := setupKeysWithIntent(b, eng, numVersions, numFlushedVersions, false /* resolveAll */) keys := make([]roachpb.Key, numIntentKeys+1) for i := range keys { keys[i] = makeKey(nil, i)