Skip to content

Commit

Permalink
Merge #61187
Browse files Browse the repository at this point in the history
61187: bulkio: Correctly check stream replication feature gate. r=miretskiy a=miretskiy

Verify experimental stream replication feature is enabled only
after making sure that the statement we're attempting to execute
is a replication stream statement.

Release Justification: minor bug fix.
Release Notes: None

Co-authored-by: Yevgeniy Miretskiy <[email protected]>
  • Loading branch information
craig[bot] and Yevgeniy Miretskiy committed Feb 26, 2021
2 parents ed15112 + 496b012 commit 7df2237
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 9 deletions.
10 changes: 5 additions & 5 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ func streamIngestionJobDescription(
func ingestionPlanHook(
ctx context.Context, stmt tree.Statement, p sql.PlanHookState,
) (sql.PlanHookRowFn, colinfo.ResultColumns, []sql.PlanNode, bool, error) {
ingestionStmt, ok := stmt.(*tree.StreamIngestion)
if !ok {
return nil, nil, nil, false, nil
}

// Check if the experimental feature is enabled.
if !p.SessionData().EnableStreamReplication {
return nil, nil, nil, false, errors.WithTelemetry(
Expand All @@ -54,11 +59,6 @@ func ingestionPlanHook(
)
}

ingestionStmt, ok := stmt.(*tree.StreamIngestion)
if !ok {
return nil, nil, nil, false, nil
}

fromFn, err := p.TypeAsStringArray(ctx, tree.Exprs(ingestionStmt.From), "INGESTION")
if err != nil {
return nil, nil, nil, false, err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,10 @@ var replicationStreamHeader = colinfo.ResultColumns{
func createReplicationStreamHook(
ctx context.Context, stmt tree.Statement, p sql.PlanHookState,
) (sql.PlanHookRowFn, colinfo.ResultColumns, []sql.PlanNode, bool, error) {
stream, ok := stmt.(*tree.ReplicationStream)
if !ok {
return nil, nil, nil, false, nil
}
if !p.SessionData().EnableStreamReplication {
return nil, nil, nil, false, errors.WithTelemetry(
pgerror.WithCandidateCode(
Expand All @@ -180,10 +184,6 @@ func createReplicationStreamHook(
)
}

stream, ok := stmt.(*tree.ReplicationStream)
if !ok {
return nil, nil, nil, false, nil
}
eval, err := makeReplicationStreamEval(ctx, p, stream)
if err != nil {
return nil, nil, nil, false, err
Expand Down

0 comments on commit 7df2237

Please sign in to comment.