From ca795b7356c37890f793a3c339f4a58aa02a0c8f Mon Sep 17 00:00:00 2001 From: Paul Bardea Date: Tue, 23 Feb 2021 21:55:21 -0500 Subject: [PATCH] streamingest: add AOST to stream ingestion job Release justification: low-risk (very experimental feature), high reward (AOST stream ingestion) Release note: None --- docs/generated/sql/bnf/restore.bnf | 3 ++- docs/generated/sql/bnf/stmt_block.bnf | 2 +- .../streamingest/stream_ingestion_job.go | 5 +++-- .../streamingest/stream_ingestion_planning.go | 13 +++++++++++-- .../stream_ingestion_processor_planning.go | 1 + pkg/sql/parser/parse_test.go | 2 ++ pkg/sql/parser/sql.y | 3 ++- pkg/sql/sem/tree/stream_ingestion.go | 5 +++++ 8 files changed, 27 insertions(+), 7 deletions(-) diff --git a/docs/generated/sql/bnf/restore.bnf b/docs/generated/sql/bnf/restore.bnf index e83146e7eebe..ada90bea0a3b 100644 --- a/docs/generated/sql/bnf/restore.bnf +++ b/docs/generated/sql/bnf/restore.bnf @@ -23,4 +23,5 @@ restore_stmt ::= | 'RESTORE' ( ( 'TABLE' | ) table_pattern ( ( ',' table_pattern ) )* | 'DATABASE' database_name ( ( ',' database_name ) )* ) 'FROM' subdirectory 'IN' full_backup_location ( | partitioned_backup_location ( ',' partitioned_backup_location )*) 'WITH' restore_options_list | 'RESTORE' ( ( 'TABLE' | ) table_pattern ( ( ',' table_pattern ) )* | 'DATABASE' database_name ( ( ',' database_name ) )* ) 'FROM' subdirectory 'IN' full_backup_location ( | partitioned_backup_location ( ',' partitioned_backup_location )*) 'WITH' 'OPTIONS' '(' restore_options_list ')' | 'RESTORE' ( ( 'TABLE' | ) table_pattern ( ( ',' table_pattern ) )* | 'DATABASE' database_name ( ( ',' database_name ) )* ) 'FROM' subdirectory 'IN' full_backup_location ( | partitioned_backup_location ( ',' partitioned_backup_location )*) - | 'RESTORE' ( ( 'TABLE' | ) table_pattern ( ( ',' table_pattern ) )* | 'DATABASE' database_name ( ( ',' database_name ) )* ) 'FROM' 'REPLICATION' 'STREAM' 'FROM' subdirectory_opt_list + | 'RESTORE' ( ( 'TABLE' | ) table_pattern ( ( ',' table_pattern ) )* | 'DATABASE' database_name ( ( ',' database_name ) )* ) 'FROM' 'REPLICATION' 'STREAM' 'FROM' subdirectory_opt_list 'AS' 'OF' 'SYSTEM' 'TIME' timestamp + | 'RESTORE' ( ( 'TABLE' | ) table_pattern ( ( ',' table_pattern ) )* | 'DATABASE' database_name ( ( ',' database_name ) )* ) 'FROM' 'REPLICATION' 'STREAM' 'FROM' subdirectory_opt_list diff --git a/docs/generated/sql/bnf/stmt_block.bnf b/docs/generated/sql/bnf/stmt_block.bnf index 699ae80495fd..ba81f09c1851 100644 --- a/docs/generated/sql/bnf/stmt_block.bnf +++ b/docs/generated/sql/bnf/stmt_block.bnf @@ -180,7 +180,7 @@ restore_stmt ::= | 'RESTORE' 'FROM' string_or_placeholder 'IN' list_of_string_or_placeholder_opt_list opt_as_of_clause opt_with_restore_options | 'RESTORE' targets 'FROM' list_of_string_or_placeholder_opt_list opt_as_of_clause opt_with_restore_options | 'RESTORE' targets 'FROM' string_or_placeholder 'IN' list_of_string_or_placeholder_opt_list opt_as_of_clause opt_with_restore_options - | 'RESTORE' targets 'FROM' 'REPLICATION' 'STREAM' 'FROM' string_or_placeholder_opt_list + | 'RESTORE' targets 'FROM' 'REPLICATION' 'STREAM' 'FROM' string_or_placeholder_opt_list opt_as_of_clause resume_stmt ::= resume_jobs_stmt diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go index 924bca16aa1f..8bbf83eca36d 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go @@ -30,6 +30,7 @@ type streamIngestionResumer struct { func ingest( ctx context.Context, execCtx sql.JobExecContext, + startTime hlc.Timestamp, streamAddress streamingccl.StreamAddress, progress jobspb.Progress, jobID jobspb.JobID, @@ -49,7 +50,7 @@ func ingest( // KVs. We can skip to ingesting after this resolved ts. Plumb the // initialHighwatermark to the ingestion processor spec based on what we read // from the job progress. - var initialHighWater hlc.Timestamp + initialHighWater := startTime if h := progress.GetHighWater(); h != nil && !h.IsEmpty() { initialHighWater = *h } @@ -80,7 +81,7 @@ func (s *streamIngestionResumer) Resume(resumeCtx context.Context, execCtx inter p := execCtx.(sql.JobExecContext) // Start ingesting KVs from the replication stream. - err := ingest(resumeCtx, p, details.StreamAddress, s.job.Progress(), s.job.ID()) + err := ingest(resumeCtx, p, details.StartTime, details.StreamAddress, s.job.Progress(), s.job.ID()) if err != nil { return err } diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go index 3c631a6adec0..a22af70728de 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go @@ -24,6 +24,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/errors" ) @@ -92,11 +93,19 @@ func ingestionPlanHook( // TODO(adityamaru): Add privileges checks. Probably the same as RESTORE. prefix := keys.MakeTenantPrefix(ingestionStmt.Targets.Tenant) + startTime := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()} + if ingestionStmt.AsOf.Expr != nil { + var err error + startTime, err = p.EvalAsOfTimestamp(ctx, ingestionStmt.AsOf) + if err != nil { + return err + } + } + streamIngestionDetails := jobspb.StreamIngestionDetails{ StreamAddress: streamingccl.StreamAddress(from[0]), Span: roachpb.Span{Key: prefix, EndKey: prefix.PrefixEnd()}, - // TODO: Figure out what the initial ts should be. - StartTime: hlc.Timestamp{}, + StartTime: startTime, } jobDescription, err := streamIngestionJobDescription(p, ingestionStmt) diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_planning.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_planning.go index a239e97f9942..009b070ac8a9 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_planning.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_planning.go @@ -45,6 +45,7 @@ func distStreamIngestionPlanSpecs( if i < len(nodes) { spec := &execinfrapb.StreamIngestionDataSpec{ JobID: int64(jobID), + StartTime: initialHighWater, StreamAddress: streamAddress, PartitionAddresses: make([]streamingccl.PartitionAddress, 0), } diff --git a/pkg/sql/parser/parse_test.go b/pkg/sql/parser/parse_test.go index 13dda0f380f5..cb5dd7f3d199 100644 --- a/pkg/sql/parser/parse_test.go +++ b/pkg/sql/parser/parse_test.go @@ -1729,6 +1729,8 @@ func TestParse(t *testing.T) { {`RESTORE TENANT 123 FROM REPLICATION STREAM FROM 'bar'`}, {`RESTORE TENANT 123 FROM REPLICATION STREAM FROM $1`}, + {`RESTORE TENANT 123 FROM REPLICATION STREAM FROM 'bar' AS OF SYSTEM TIME '1'`}, + {`RESTORE TENANT 123 FROM REPLICATION STREAM FROM $1 AS OF SYSTEM TIME '1'`}, // Currently, we only support TENANT as a target. We have grammar rules for // all targets supported by RESTORE but these will error out during the diff --git a/pkg/sql/parser/sql.y b/pkg/sql/parser/sql.y index 729b45232a93..c3e0868c7860 100644 --- a/pkg/sql/parser/sql.y +++ b/pkg/sql/parser/sql.y @@ -2639,11 +2639,12 @@ restore_stmt: Options: *($8.restoreOptions()), } } -| RESTORE targets FROM REPLICATION STREAM FROM string_or_placeholder_opt_list +| RESTORE targets FROM REPLICATION STREAM FROM string_or_placeholder_opt_list opt_as_of_clause { $$.val = &tree.StreamIngestion{ Targets: $2.targetList(), From: $7.stringOrPlaceholderOptList(), + AsOf: $8.asOfClause(), } } | RESTORE error // SHOW HELP: RESTORE diff --git a/pkg/sql/sem/tree/stream_ingestion.go b/pkg/sql/sem/tree/stream_ingestion.go index 1607f391aac1..aeaad0ea713f 100644 --- a/pkg/sql/sem/tree/stream_ingestion.go +++ b/pkg/sql/sem/tree/stream_ingestion.go @@ -14,6 +14,7 @@ package tree type StreamIngestion struct { Targets TargetList From StringOrPlaceholderOptList + AsOf AsOfClause } var _ Statement = &StreamIngestion{} @@ -25,4 +26,8 @@ func (node *StreamIngestion) Format(ctx *FmtCtx) { ctx.WriteString(" ") ctx.WriteString("FROM REPLICATION STREAM FROM ") ctx.FormatNode(&node.From) + if node.AsOf.Expr != nil { + ctx.WriteString(" ") + ctx.FormatNode(&node.AsOf) + } }