From 496b012d34e6475778e622def5c26f909928b544 Mon Sep 17 00:00:00 2001 From: Yevgeniy Miretskiy Date: Fri, 26 Feb 2021 10:01:22 -0500 Subject: [PATCH] bulkio: Correctly check stream replication feature gate. Verify experimental stream replication feature is enabled only after making sure that the statement we're attempting to execute is a replication stream statement. Release Justification: minor bug fix. Release Notes: None --- .../streamingest/stream_ingestion_planning.go | 10 +++++----- .../streamproducer/replication_stream_planning.go | 8 ++++---- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go index e87f88fb7940..aab0806be75e 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go @@ -40,6 +40,11 @@ func streamIngestionJobDescription( func ingestionPlanHook( ctx context.Context, stmt tree.Statement, p sql.PlanHookState, ) (sql.PlanHookRowFn, colinfo.ResultColumns, []sql.PlanNode, bool, error) { + ingestionStmt, ok := stmt.(*tree.StreamIngestion) + if !ok { + return nil, nil, nil, false, nil + } + // Check if the experimental feature is enabled. if !p.SessionData().EnableStreamReplication { return nil, nil, nil, false, errors.WithTelemetry( @@ -54,11 +59,6 @@ func ingestionPlanHook( ) } - ingestionStmt, ok := stmt.(*tree.StreamIngestion) - if !ok { - return nil, nil, nil, false, nil - } - fromFn, err := p.TypeAsStringArray(ctx, tree.Exprs(ingestionStmt.From), "INGESTION") if err != nil { return nil, nil, nil, false, err diff --git a/pkg/ccl/streamingccl/streamproducer/replication_stream_planning.go b/pkg/ccl/streamingccl/streamproducer/replication_stream_planning.go index e95cb43903ed..111cc215cad7 100644 --- a/pkg/ccl/streamingccl/streamproducer/replication_stream_planning.go +++ b/pkg/ccl/streamingccl/streamproducer/replication_stream_planning.go @@ -167,6 +167,10 @@ var replicationStreamHeader = colinfo.ResultColumns{ func createReplicationStreamHook( ctx context.Context, stmt tree.Statement, p sql.PlanHookState, ) (sql.PlanHookRowFn, colinfo.ResultColumns, []sql.PlanNode, bool, error) { + stream, ok := stmt.(*tree.ReplicationStream) + if !ok { + return nil, nil, nil, false, nil + } if !p.SessionData().EnableStreamReplication { return nil, nil, nil, false, errors.WithTelemetry( pgerror.WithCandidateCode( @@ -180,10 +184,6 @@ func createReplicationStreamHook( ) } - stream, ok := stmt.(*tree.ReplicationStream) - if !ok { - return nil, nil, nil, false, nil - } eval, err := makeReplicationStreamEval(ctx, p, stream) if err != nil { return nil, nil, nil, false, err