Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
61037: streamingest: add AOST to stream ingestion job r=adityamaru,dt a=pbardea

To be rebased on the PR that introduces the core changefeed stream client.


Release justification: low-risk (very experimental feature), high reward
(AOST stream ingestion)

Release note: None

61197: execinfra: fix starting of processors r=yuzefovich a=yuzefovich

Almost all processors embed `ProcessorBase` and call `StartInternal` on
it. That function derives a new context, starts a tracing span if
tracing is enabled on the parent ctx, and returns the derived context.
Previously, at all callsites the inputs to the processors would get
`Start`ed before `StartInternal` was called, but this doesn't make
sense: the consumer's ctx and span should encompass the producer's
(input's) ctx and span, so this commit switches all callsites to the
following layout:
```
  ctx = p.StartInternal(ctx, name)
  input.Start(ctx) // if there are any inputs
```

Release justification: bug fix.

Release note: None

61198: storage: add a scan benchmark with resolved intents r=sumeerbhola a=sumeerbhola

It stresses the case where many versions were written
transactionally and all were resolved.

This is the benchmark mentioned in
cockroachdb/pebble#1067
to justify the Pebble seek noop optimization.

Release justification: Non-production code change that
adds a benchmark.

Release note: None

Co-authored-by: Paul Bardea <[email protected]>
Co-authored-by: Yahor Yuzefovich <[email protected]>
Co-authored-by: sumeerbhola <[email protected]>
  • Loading branch information
4 people committed Mar 1, 2021
4 parents 09514e9 + ca795b7 + 1032aea + de2f6f9 commit 0fe3d67
Show file tree
Hide file tree
Showing 33 changed files with 121 additions and 117 deletions.
3 changes: 2 additions & 1 deletion docs/generated/sql/bnf/restore.bnf
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,5 @@ restore_stmt ::=
| 'RESTORE' ( ( 'TABLE' | ) table_pattern ( ( ',' table_pattern ) )* | 'DATABASE' database_name ( ( ',' database_name ) )* ) 'FROM' subdirectory 'IN' full_backup_location ( | partitioned_backup_location ( ',' partitioned_backup_location )*) 'WITH' restore_options_list
| 'RESTORE' ( ( 'TABLE' | ) table_pattern ( ( ',' table_pattern ) )* | 'DATABASE' database_name ( ( ',' database_name ) )* ) 'FROM' subdirectory 'IN' full_backup_location ( | partitioned_backup_location ( ',' partitioned_backup_location )*) 'WITH' 'OPTIONS' '(' restore_options_list ')'
| 'RESTORE' ( ( 'TABLE' | ) table_pattern ( ( ',' table_pattern ) )* | 'DATABASE' database_name ( ( ',' database_name ) )* ) 'FROM' subdirectory 'IN' full_backup_location ( | partitioned_backup_location ( ',' partitioned_backup_location )*)
| 'RESTORE' ( ( 'TABLE' | ) table_pattern ( ( ',' table_pattern ) )* | 'DATABASE' database_name ( ( ',' database_name ) )* ) 'FROM' 'REPLICATION' 'STREAM' 'FROM' subdirectory_opt_list
| 'RESTORE' ( ( 'TABLE' | ) table_pattern ( ( ',' table_pattern ) )* | 'DATABASE' database_name ( ( ',' database_name ) )* ) 'FROM' 'REPLICATION' 'STREAM' 'FROM' subdirectory_opt_list 'AS' 'OF' 'SYSTEM' 'TIME' timestamp
| 'RESTORE' ( ( 'TABLE' | ) table_pattern ( ( ',' table_pattern ) )* | 'DATABASE' database_name ( ( ',' database_name ) )* ) 'FROM' 'REPLICATION' 'STREAM' 'FROM' subdirectory_opt_list
2 changes: 1 addition & 1 deletion docs/generated/sql/bnf/stmt_block.bnf
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ restore_stmt ::=
| 'RESTORE' 'FROM' string_or_placeholder 'IN' list_of_string_or_placeholder_opt_list opt_as_of_clause opt_with_restore_options
| 'RESTORE' targets 'FROM' list_of_string_or_placeholder_opt_list opt_as_of_clause opt_with_restore_options
| 'RESTORE' targets 'FROM' string_or_placeholder 'IN' list_of_string_or_placeholder_opt_list opt_as_of_clause opt_with_restore_options
| 'RESTORE' targets 'FROM' 'REPLICATION' 'STREAM' 'FROM' string_or_placeholder_opt_list
| 'RESTORE' targets 'FROM' 'REPLICATION' 'STREAM' 'FROM' string_or_placeholder_opt_list opt_as_of_clause

resume_stmt ::=
resume_jobs_stmt
Expand Down
6 changes: 1 addition & 5 deletions pkg/ccl/backupccl/restore_data_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,8 @@ func newRestoreDataProcessor(

// Start is part of the RowSource interface.
func (rd *restoreDataProcessor) Start(ctx context.Context) {
rd.input.Start(ctx)
ctx = rd.StartInternal(ctx, restoreDataProcName)
// Go around "this value of ctx is never used" linter error. We do it this
// way instead of omitting the assignment to ctx above so that if in the
// future other initialization is added, the correct ctx is used.
_ = ctx
rd.input.Start(ctx)
}

// Next is part of the RowSource interface.
Expand Down
3 changes: 1 addition & 2 deletions pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -906,11 +906,10 @@ func newChangeFrontierProcessor(

// Start is part of the RowSource interface.
func (cf *changeFrontier) Start(ctx context.Context) {
cf.input.Start(ctx)

// StartInternal called at the beginning of the function because there are
// early returns if errors are detected.
ctx = cf.StartInternal(ctx, changeFrontierProcName)
cf.input.Start(ctx)

// Pass a nil oracle because this sink is only used to emit resolved timestamps
// but the oracle is only used when emitting row updates.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,8 @@ func newStreamIngestionFrontierProcessor(

// Start is part of the RowSource interface.
func (sf *streamIngestionFrontier) Start(ctx context.Context) {
sf.input.Start(ctx)
ctx = sf.StartInternal(ctx, streamIngestionFrontierProcName)
// Go around "this value of ctx is never used" linter error. We do it this
// way instead of omitting the assignment to ctx above so that if in the
// future other initialization is added, the correct ctx is used.
_ = ctx
sf.input.Start(ctx)
}

// Next is part of the RowSource interface.
Expand Down
5 changes: 3 additions & 2 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type streamIngestionResumer struct {
func ingest(
ctx context.Context,
execCtx sql.JobExecContext,
startTime hlc.Timestamp,
streamAddress streamingccl.StreamAddress,
progress jobspb.Progress,
jobID jobspb.JobID,
Expand All @@ -49,7 +50,7 @@ func ingest(
// KVs. We can skip to ingesting after this resolved ts. Plumb the
// initialHighwatermark to the ingestion processor spec based on what we read
// from the job progress.
var initialHighWater hlc.Timestamp
initialHighWater := startTime
if h := progress.GetHighWater(); h != nil && !h.IsEmpty() {
initialHighWater = *h
}
Expand Down Expand Up @@ -80,7 +81,7 @@ func (s *streamIngestionResumer) Resume(resumeCtx context.Context, execCtx inter
p := execCtx.(sql.JobExecContext)

// Start ingesting KVs from the replication stream.
err := ingest(resumeCtx, p, details.StreamAddress, s.job.Progress(), s.job.ID())
err := ingest(resumeCtx, p, details.StartTime, details.StreamAddress, s.job.Progress(), s.job.ID())
if err != nil {
return err
}
Expand Down
13 changes: 11 additions & 2 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
)
Expand Down Expand Up @@ -92,11 +93,19 @@ func ingestionPlanHook(
// TODO(adityamaru): Add privileges checks. Probably the same as RESTORE.

prefix := keys.MakeTenantPrefix(ingestionStmt.Targets.Tenant)
startTime := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()}
if ingestionStmt.AsOf.Expr != nil {
var err error
startTime, err = p.EvalAsOfTimestamp(ctx, ingestionStmt.AsOf)
if err != nil {
return err
}
}

streamIngestionDetails := jobspb.StreamIngestionDetails{
StreamAddress: streamingccl.StreamAddress(from[0]),
Span: roachpb.Span{Key: prefix, EndKey: prefix.PrefixEnd()},
// TODO: Figure out what the initial ts should be.
StartTime: hlc.Timestamp{},
StartTime: startTime,
}

jobDescription, err := streamIngestionJobDescription(p, ingestionStmt)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func distStreamIngestionPlanSpecs(
if i < len(nodes) {
spec := &execinfrapb.StreamIngestionDataSpec{
JobID: int64(jobID),
StartTime: initialHighWater,
StreamAddress: streamAddress,
PartitionAddresses: make([]streamingccl.PartitionAddress, 0),
}
Expand Down
6 changes: 1 addition & 5 deletions pkg/sql/colexec/materializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,12 +233,8 @@ func (m *Materializer) Child(nth int, verbose bool) execinfra.OpNode {

// Start is part of the execinfra.RowSource interface.
func (m *Materializer) Start(ctx context.Context) {
m.drainHelper.Start(ctx)
ctx = m.ProcessorBase.StartInternal(ctx, materializerProcName)
// Go around "this value of ctx is never used" linter error. We do it this
// way instead of omitting the assignment to ctx above so that if in the
// future other initialization is added, the correct ctx is used.
_ = ctx
m.drainHelper.Start(ctx)
// We can encounter an expected error during Init (e.g. an operator
// attempts to allocate a batch, but the memory budget limit has been
// reached), so we need to wrap it with a catcher.
Expand Down
6 changes: 1 addition & 5 deletions pkg/sql/execinfra/metadata_test_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,12 +140,8 @@ func (mtr *MetadataTestReceiver) checkRowNumMetadata() *execinfrapb.ProducerMeta

// Start is part of the RowSource interface.
func (mtr *MetadataTestReceiver) Start(ctx context.Context) {
mtr.input.Start(ctx)
ctx = mtr.StartInternal(ctx, metadataTestReceiverProcName)
// Go around "this value of ctx is never used" linter error. We do it this
// way instead of omitting the assignment to ctx above so that if in the
// future other initialization is added, the correct ctx is used.
_ = ctx
mtr.input.Start(ctx)
}

// Next is part of the RowSource interface.
Expand Down
6 changes: 1 addition & 5 deletions pkg/sql/execinfra/metadata_test_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,8 @@ func NewMetadataTestSender(

// Start is part of the RowSource interface.
func (mts *MetadataTestSender) Start(ctx context.Context) {
mts.input.Start(ctx)
ctx = mts.StartInternal(ctx, metadataTestSenderProcName)
// Go around "this value of ctx is never used" linter error. We do it this
// way instead of omitting the assignment to ctx above so that if in the
// future other initialization is added, the correct ctx is used.
_ = ctx
mts.input.Start(ctx)
}

// Next is part of the RowSource interface.
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/execinfra/processorsbase.go
Original file line number Diff line number Diff line change
Expand Up @@ -870,6 +870,7 @@ func ProcessorSpan(ctx context.Context, name string) (context.Context, *tracing.
// It is likely that this method is called from RowSource.Start implementation,
// and the recommended layout is the following:
// ctx = pb.StartInternal(ctx, name)
// < inputs >.Start(ctx) // if there are any inputs-RowSources to pb
// < other initialization >
// so that the caller doesn't mistakenly use old ctx object.
func (pb *ProcessorBase) StartInternal(ctx context.Context, name string) context.Context {
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/parser/parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1729,6 +1729,8 @@ func TestParse(t *testing.T) {

{`RESTORE TENANT 123 FROM REPLICATION STREAM FROM 'bar'`},
{`RESTORE TENANT 123 FROM REPLICATION STREAM FROM $1`},
{`RESTORE TENANT 123 FROM REPLICATION STREAM FROM 'bar' AS OF SYSTEM TIME '1'`},
{`RESTORE TENANT 123 FROM REPLICATION STREAM FROM $1 AS OF SYSTEM TIME '1'`},

// Currently, we only support TENANT as a target. We have grammar rules for
// all targets supported by RESTORE but these will error out during the
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/parser/sql.y
Original file line number Diff line number Diff line change
Expand Up @@ -2639,11 +2639,12 @@ restore_stmt:
Options: *($8.restoreOptions()),
}
}
| RESTORE targets FROM REPLICATION STREAM FROM string_or_placeholder_opt_list
| RESTORE targets FROM REPLICATION STREAM FROM string_or_placeholder_opt_list opt_as_of_clause
{
$$.val = &tree.StreamIngestion{
Targets: $2.targetList(),
From: $7.stringOrPlaceholderOptList(),
AsOf: $8.asOfClause(),
}
}
| RESTORE error // SHOW HELP: RESTORE
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/rowexec/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,8 +337,8 @@ func (ag *orderedAggregator) Start(ctx context.Context) {
}

func (ag *aggregatorBase) start(ctx context.Context, procName string) {
ag.input.Start(ctx)
ctx = ag.StartInternal(ctx, procName)
ag.input.Start(ctx)
ag.cancelChecker = cancelchecker.NewCancelChecker(ctx)
ag.runningState = aggAccumulating
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/rowexec/bulk_row_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ func newBulkRowWriterProcessor(

// Start is part of the RowSource interface.
func (sp *bulkRowWriter) Start(ctx context.Context) {
sp.input.Start(ctx)
ctx = sp.StartInternal(ctx, "bulkRowWriter")
sp.input.Start(ctx)
err := sp.work(ctx)
sp.MoveToDraining(err)
}
Expand Down
6 changes: 1 addition & 5 deletions pkg/sql/rowexec/countrows.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,8 @@ func newCountAggregator(
}

func (ag *countAggregator) Start(ctx context.Context) {
ag.input.Start(ctx)
ctx = ag.StartInternal(ctx, countRowsProcName)
// Go around "this value of ctx is never used" linter error. We do it this
// way instead of omitting the assignment to ctx above so that if in the
// future other initialization is added, the correct ctx is used.
_ = ctx
ag.input.Start(ctx)
}

func (ag *countAggregator) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) {
Expand Down
12 changes: 2 additions & 10 deletions pkg/sql/rowexec/distinct.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,22 +142,14 @@ func newDistinct(

// Start is part of the RowSource interface.
func (d *distinct) Start(ctx context.Context) {
d.input.Start(ctx)
ctx = d.StartInternal(ctx, distinctProcName)
// Go around "this value of ctx is never used" linter error. We do it this
// way instead of omitting the assignment to ctx above so that if in the
// future other initialization is added, the correct ctx is used.
_ = ctx
d.input.Start(ctx)
}

// Start is part of the RowSource interface.
func (d *sortedDistinct) Start(ctx context.Context) {
d.input.Start(ctx)
ctx = d.StartInternal(ctx, sortedDistinctProcName)
// Go around "this value of ctx is never used" linter error. We do it this
// way instead of omitting the assignment to ctx above so that if in the
// future other initialization is added, the correct ctx is used.
_ = ctx
d.input.Start(ctx)
}

func (d *distinct) matchLastGroupKey(row rowenc.EncDatumRow) (bool, error) {
Expand Down
6 changes: 1 addition & 5 deletions pkg/sql/rowexec/filterer.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,8 @@ func newFiltererProcessor(

// Start is part of the RowSource interface.
func (f *filtererProcessor) Start(ctx context.Context) {
f.input.Start(ctx)
ctx = f.StartInternal(ctx, filtererProcName)
// Go around "this value of ctx is never used" linter error. We do it this
// way instead of omitting the assignment to ctx above so that if in the
// future other initialization is added, the correct ctx is used.
_ = ctx
f.input.Start(ctx)
}

// Next is part of the RowSource interface.
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/rowexec/hashjoiner.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,9 @@ func newHashJoiner(

// Start is part of the RowSource interface.
func (h *hashJoiner) Start(ctx context.Context) {
ctx = h.StartInternal(ctx, hashJoinerProcName)
h.leftSource.Start(ctx)
h.rightSource.Start(ctx)
ctx = h.StartInternal(ctx, hashJoinerProcName)
h.cancelChecker = cancelchecker.NewCancelChecker(ctx)
h.runningState = hjBuilding
}
Expand Down
6 changes: 1 addition & 5 deletions pkg/sql/rowexec/inverted_filterer.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,12 +288,8 @@ func (ifr *invertedFilterer) emitRow() (

// Start is part of the RowSource interface.
func (ifr *invertedFilterer) Start(ctx context.Context) {
ifr.input.Start(ctx)
ctx = ifr.StartInternal(ctx, invertedFiltererProcName)
// Go around "this value of ctx is never used" linter error. We do it this
// way instead of omitting the assignment to ctx above so that if in the
// future other initialization is added, the correct ctx is used.
_ = ctx
ifr.input.Start(ctx)
ifr.runningState = ifrReadingInput
}

Expand Down
6 changes: 1 addition & 5 deletions pkg/sql/rowexec/inverted_joiner.go
Original file line number Diff line number Diff line change
Expand Up @@ -734,12 +734,8 @@ func (ij *invertedJoiner) transformToTableRow(indexRow rowenc.EncDatumRow) {

// Start is part of the RowSource interface.
func (ij *invertedJoiner) Start(ctx context.Context) {
ij.input.Start(ctx)
ctx = ij.StartInternal(ctx, invertedJoinerProcName)
// Go around "this value of ctx is never used" linter error. We do it this
// way instead of omitting the assignment to ctx above so that if in the
// future other initialization is added, the correct ctx is used.
_ = ctx
ij.input.Start(ctx)
ij.runningState = ijReadingInput
}

Expand Down
6 changes: 1 addition & 5 deletions pkg/sql/rowexec/joinreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -700,12 +700,8 @@ func (jr *joinReader) emitRow() (

// Start is part of the RowSource interface.
func (jr *joinReader) Start(ctx context.Context) {
jr.input.Start(ctx)
ctx = jr.StartInternal(ctx, joinReaderProcName)
// Go around "this value of ctx is never used" linter error. We do it this
// way instead of omitting the assignment to ctx above so that if in the
// future other initialization is added, the correct ctx is used.
_ = ctx
jr.input.Start(ctx)
jr.runningState = jrReadingInput
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/rowexec/mergejoiner.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,8 @@ func newMergeJoiner(

// Start is part of the RowSource interface.
func (m *mergeJoiner) Start(ctx context.Context) {
m.streamMerger.start(ctx)
ctx = m.StartInternal(ctx, mergeJoinerProcName)
m.streamMerger.start(ctx)
m.cancelChecker = cancelchecker.NewCancelChecker(ctx)
}

Expand Down
6 changes: 1 addition & 5 deletions pkg/sql/rowexec/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,8 @@ func newNoopProcessor(

// Start is part of the RowSource interface.
func (n *noopProcessor) Start(ctx context.Context) {
n.input.Start(ctx)
ctx = n.StartInternal(ctx, noopProcName)
// Go around "this value of ctx is never used" linter error. We do it this
// way instead of omitting the assignment to ctx above so that if in the
// future other initialization is added, the correct ctx is used.
_ = ctx
n.input.Start(ctx)
}

// Next is part of the RowSource interface.
Expand Down
6 changes: 1 addition & 5 deletions pkg/sql/rowexec/ordinality.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,8 @@ func newOrdinalityProcessor(

// Start is part of the RowSource interface.
func (o *ordinalityProcessor) Start(ctx context.Context) {
o.input.Start(ctx)
ctx = o.StartInternal(ctx, ordinalityProcName)
// Go around "this value of ctx is never used" linter error. We do it this
// way instead of omitting the assignment to ctx above so that if in the
// future other initialization is added, the correct ctx is used.
_ = ctx
o.input.Start(ctx)
}

// Next is part of the RowSource interface.
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/rowexec/project_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,8 @@ func newProjectSetProcessor(

// Start is part of the RowSource interface.
func (ps *projectSetProcessor) Start(ctx context.Context) {
ps.input.Start(ctx)
ctx = ps.StartInternal(ctx, projectSetProcName)
ps.input.Start(ctx)
ps.cancelChecker = cancelchecker.NewCancelChecker(ctx)
}

Expand Down
12 changes: 4 additions & 8 deletions pkg/sql/rowexec/sample_aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,18 +181,14 @@ func (s *sampleAggregator) pushTrailingMeta(ctx context.Context) {

// Run is part of the Processor interface.
func (s *sampleAggregator) Run(ctx context.Context) {
s.input.Start(ctx)
ctx = s.StartInternal(ctx, sampleAggregatorProcName)
// Go around "this value of ctx is never used" linter error. We do it this
// way instead of omitting the assignment to ctx above so that if in the
// future other initialization is added, the correct ctx is used.
_ = ctx
s.input.Start(ctx)

earlyExit, err := s.mainLoop(s.Ctx)
earlyExit, err := s.mainLoop(ctx)
if err != nil {
execinfra.DrainAndClose(s.Ctx, s.Out.Output(), err, s.pushTrailingMeta, s.input)
execinfra.DrainAndClose(ctx, s.Out.Output(), err, s.pushTrailingMeta, s.input)
} else if !earlyExit {
s.pushTrailingMeta(s.Ctx)
s.pushTrailingMeta(ctx)
s.input.ConsumerClosed()
s.Out.Close()
}
Expand Down
Loading

0 comments on commit 0fe3d67

Please sign in to comment.