Skip to content

Commit

Permalink
bulkio: Correctly check stream replication feature gate.
Browse files Browse the repository at this point in the history
Verify experimental stream replication feature is enabled only
after making sure that the statement we're attempting to execute
is a replication stream statement.

Release Justification: minor bug fix.
Release Notes: None
  • Loading branch information
Yevgeniy Miretskiy committed Feb 26, 2021
1 parent dac79a2 commit 496b012
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 9 deletions.
10 changes: 5 additions & 5 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ func streamIngestionJobDescription(
func ingestionPlanHook(
ctx context.Context, stmt tree.Statement, p sql.PlanHookState,
) (sql.PlanHookRowFn, colinfo.ResultColumns, []sql.PlanNode, bool, error) {
ingestionStmt, ok := stmt.(*tree.StreamIngestion)
if !ok {
return nil, nil, nil, false, nil
}

// Check if the experimental feature is enabled.
if !p.SessionData().EnableStreamReplication {
return nil, nil, nil, false, errors.WithTelemetry(
Expand All @@ -54,11 +59,6 @@ func ingestionPlanHook(
)
}

ingestionStmt, ok := stmt.(*tree.StreamIngestion)
if !ok {
return nil, nil, nil, false, nil
}

fromFn, err := p.TypeAsStringArray(ctx, tree.Exprs(ingestionStmt.From), "INGESTION")
if err != nil {
return nil, nil, nil, false, err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,10 @@ var replicationStreamHeader = colinfo.ResultColumns{
func createReplicationStreamHook(
ctx context.Context, stmt tree.Statement, p sql.PlanHookState,
) (sql.PlanHookRowFn, colinfo.ResultColumns, []sql.PlanNode, bool, error) {
stream, ok := stmt.(*tree.ReplicationStream)
if !ok {
return nil, nil, nil, false, nil
}
if !p.SessionData().EnableStreamReplication {
return nil, nil, nil, false, errors.WithTelemetry(
pgerror.WithCandidateCode(
Expand All @@ -180,10 +184,6 @@ func createReplicationStreamHook(
)
}

stream, ok := stmt.(*tree.ReplicationStream)
if !ok {
return nil, nil, nil, false, nil
}
eval, err := makeReplicationStreamEval(ctx, p, stream)
if err != nil {
return nil, nil, nil, false, err
Expand Down

0 comments on commit 496b012

Please sign in to comment.