Skip to content

Commit

Permalink
streamingest: add AOST to stream ingestion job
Browse files Browse the repository at this point in the history
Release justification: low risk (the feature is still very experimental), high
reward (adds AS OF SYSTEM TIME support to stream ingestion).

Release note: None
  • Loading branch information
pbardea committed Mar 1, 2021
1 parent 91a58f4 commit ca795b7
Show file tree
Hide file tree
Showing 8 changed files with 27 additions and 7 deletions.
3 changes: 2 additions & 1 deletion docs/generated/sql/bnf/restore.bnf
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,5 @@ restore_stmt ::=
| 'RESTORE' ( ( 'TABLE' | ) table_pattern ( ( ',' table_pattern ) )* | 'DATABASE' database_name ( ( ',' database_name ) )* ) 'FROM' subdirectory 'IN' full_backup_location ( | partitioned_backup_location ( ',' partitioned_backup_location )*) 'WITH' restore_options_list
| 'RESTORE' ( ( 'TABLE' | ) table_pattern ( ( ',' table_pattern ) )* | 'DATABASE' database_name ( ( ',' database_name ) )* ) 'FROM' subdirectory 'IN' full_backup_location ( | partitioned_backup_location ( ',' partitioned_backup_location )*) 'WITH' 'OPTIONS' '(' restore_options_list ')'
| 'RESTORE' ( ( 'TABLE' | ) table_pattern ( ( ',' table_pattern ) )* | 'DATABASE' database_name ( ( ',' database_name ) )* ) 'FROM' subdirectory 'IN' full_backup_location ( | partitioned_backup_location ( ',' partitioned_backup_location )*)
| 'RESTORE' ( ( 'TABLE' | ) table_pattern ( ( ',' table_pattern ) )* | 'DATABASE' database_name ( ( ',' database_name ) )* ) 'FROM' 'REPLICATION' 'STREAM' 'FROM' subdirectory_opt_list
| 'RESTORE' ( ( 'TABLE' | ) table_pattern ( ( ',' table_pattern ) )* | 'DATABASE' database_name ( ( ',' database_name ) )* ) 'FROM' 'REPLICATION' 'STREAM' 'FROM' subdirectory_opt_list 'AS' 'OF' 'SYSTEM' 'TIME' timestamp
| 'RESTORE' ( ( 'TABLE' | ) table_pattern ( ( ',' table_pattern ) )* | 'DATABASE' database_name ( ( ',' database_name ) )* ) 'FROM' 'REPLICATION' 'STREAM' 'FROM' subdirectory_opt_list
2 changes: 1 addition & 1 deletion docs/generated/sql/bnf/stmt_block.bnf
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ restore_stmt ::=
| 'RESTORE' 'FROM' string_or_placeholder 'IN' list_of_string_or_placeholder_opt_list opt_as_of_clause opt_with_restore_options
| 'RESTORE' targets 'FROM' list_of_string_or_placeholder_opt_list opt_as_of_clause opt_with_restore_options
| 'RESTORE' targets 'FROM' string_or_placeholder 'IN' list_of_string_or_placeholder_opt_list opt_as_of_clause opt_with_restore_options
| 'RESTORE' targets 'FROM' 'REPLICATION' 'STREAM' 'FROM' string_or_placeholder_opt_list
| 'RESTORE' targets 'FROM' 'REPLICATION' 'STREAM' 'FROM' string_or_placeholder_opt_list opt_as_of_clause

resume_stmt ::=
resume_jobs_stmt
Expand Down
5 changes: 3 additions & 2 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type streamIngestionResumer struct {
func ingest(
ctx context.Context,
execCtx sql.JobExecContext,
startTime hlc.Timestamp,
streamAddress streamingccl.StreamAddress,
progress jobspb.Progress,
jobID jobspb.JobID,
Expand All @@ -49,7 +50,7 @@ func ingest(
// KVs. We can skip to ingesting after this resolved ts. Plumb the initial
// high-water mark to the ingestion processor spec: it defaults to startTime
// and is replaced by the high-water mark read from the job progress, if any.
var initialHighWater hlc.Timestamp
initialHighWater := startTime
if h := progress.GetHighWater(); h != nil && !h.IsEmpty() {
initialHighWater = *h
}
Expand Down Expand Up @@ -80,7 +81,7 @@ func (s *streamIngestionResumer) Resume(resumeCtx context.Context, execCtx inter
p := execCtx.(sql.JobExecContext)

// Start ingesting KVs from the replication stream.
err := ingest(resumeCtx, p, details.StreamAddress, s.job.Progress(), s.job.ID())
err := ingest(resumeCtx, p, details.StartTime, details.StreamAddress, s.job.Progress(), s.job.ID())
if err != nil {
return err
}
Expand Down
13 changes: 11 additions & 2 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
)
Expand Down Expand Up @@ -92,11 +93,19 @@ func ingestionPlanHook(
// TODO(adityamaru): Add privileges checks. Probably the same as RESTORE.

prefix := keys.MakeTenantPrefix(ingestionStmt.Targets.Tenant)
startTime := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()}
if ingestionStmt.AsOf.Expr != nil {
var err error
startTime, err = p.EvalAsOfTimestamp(ctx, ingestionStmt.AsOf)
if err != nil {
return err
}
}

streamIngestionDetails := jobspb.StreamIngestionDetails{
StreamAddress: streamingccl.StreamAddress(from[0]),
Span: roachpb.Span{Key: prefix, EndKey: prefix.PrefixEnd()},
// TODO: Figure out what the initial ts should be.
StartTime: hlc.Timestamp{},
StartTime: startTime,
}

jobDescription, err := streamIngestionJobDescription(p, ingestionStmt)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func distStreamIngestionPlanSpecs(
if i < len(nodes) {
spec := &execinfrapb.StreamIngestionDataSpec{
JobID: int64(jobID),
StartTime: initialHighWater,
StreamAddress: streamAddress,
PartitionAddresses: make([]streamingccl.PartitionAddress, 0),
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/parser/parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1729,6 +1729,8 @@ func TestParse(t *testing.T) {

{`RESTORE TENANT 123 FROM REPLICATION STREAM FROM 'bar'`},
{`RESTORE TENANT 123 FROM REPLICATION STREAM FROM $1`},
{`RESTORE TENANT 123 FROM REPLICATION STREAM FROM 'bar' AS OF SYSTEM TIME '1'`},
{`RESTORE TENANT 123 FROM REPLICATION STREAM FROM $1 AS OF SYSTEM TIME '1'`},

// Currently, we only support TENANT as a target. We have grammar rules for
// all targets supported by RESTORE but these will error out during the
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/parser/sql.y
Original file line number Diff line number Diff line change
Expand Up @@ -2639,11 +2639,12 @@ restore_stmt:
Options: *($8.restoreOptions()),
}
}
| RESTORE targets FROM REPLICATION STREAM FROM string_or_placeholder_opt_list
| RESTORE targets FROM REPLICATION STREAM FROM string_or_placeholder_opt_list opt_as_of_clause
{
$$.val = &tree.StreamIngestion{
Targets: $2.targetList(),
From: $7.stringOrPlaceholderOptList(),
AsOf: $8.asOfClause(),
}
}
| RESTORE error // SHOW HELP: RESTORE
Expand Down
5 changes: 5 additions & 0 deletions pkg/sql/sem/tree/stream_ingestion.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ package tree
type StreamIngestion struct {
// Targets is the target list the parser reads right after RESTORE.
Targets TargetList
// From is the string-or-placeholder list the parser reads after
// REPLICATION STREAM FROM.
From StringOrPlaceholderOptList
// AsOf is the optional AS OF SYSTEM TIME clause. A nil AsOf.Expr is treated
// as "no clause given": Format omits it and the plan hook keeps its default
// start time.
AsOf AsOfClause
}

var _ Statement = &StreamIngestion{}
Expand All @@ -25,4 +26,8 @@ func (node *StreamIngestion) Format(ctx *FmtCtx) {
ctx.WriteString(" ")
ctx.WriteString("FROM REPLICATION STREAM FROM ")
ctx.FormatNode(&node.From)
if node.AsOf.Expr != nil {
ctx.WriteString(" ")
ctx.FormatNode(&node.AsOf)
}
}

0 comments on commit ca795b7

Please sign in to comment.