From f676eab561826a22ebfe7da172d0b3560f88020b Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Tue, 13 Feb 2024 15:20:22 -0800 Subject: [PATCH] sessiondatapb: correctly propagate streamer_enabled to remote nodes `streamer_enabled` session variable determines whether the Streamer API should be used for a particular DistSQL flow, and previously we incorrectly stored it in the local-only session data that doesn't get sent to remote nodes. As a result, remote nodes would get the Go default value `false` for this parameter, so we would end up using the Streamer API on the gateway and non-streamer on remote nodes within the same flow. This is now fixed (in backwards-compatible way - older binaries will keep the incorrect old behavior and newer binaries will respect the session variable). This bug was introduced about a year ago in ed3f640510fa2d993f9cf4508cd0f8b9c53733d5 (before that change we consulted the cluster setting). Epic: None Release note: None --- pkg/sql/colflow/draining_test.go | 3 +++ pkg/sql/internal.go | 2 +- pkg/sql/sessiondatapb/local_only_session_data.proto | 3 +-- pkg/sql/sessiondatapb/session_data.proto | 2 ++ 4 files changed, 7 insertions(+), 3 deletions(-) diff --git a/pkg/sql/colflow/draining_test.go b/pkg/sql/colflow/draining_test.go index 987a1426c282..2d593b7f7b7e 100644 --- a/pkg/sql/colflow/draining_test.go +++ b/pkg/sql/colflow/draining_test.go @@ -75,6 +75,9 @@ func TestDrainingAfterRemoteError(t *testing.T) { // Make sure that the query is fully distributed (i.e. all execution happens // on node 2). sqlDB.Exec(t, "SET distsql = always;") + // Disable the streamer to prevent this test from triggering the known race + // #119201. + sqlDB.Exec(t, "SET streamer_enabled = false;") // Sanity check that, indeed, node 2 is part of the physical plan. rows, err := conn.Query("EXPLAIN (VEC) SELECT sum(length(v)) FROM large, small WHERE small.k = large.k GROUP BY large.k;") diff --git a/pkg/sql/internal.go b/pkg/sql/internal.go index f9c767e78b22..dea77adecf2c 100644 --- a/pkg/sql/internal.go +++ b/pkg/sql/internal.go @@ -743,7 +743,7 @@ func applyInternalExecutorSessionExceptions(sd *sessiondata.SessionData) { // At the moment, we disable the usage of the Streamer API in the internal // executor to avoid possible concurrency with the "outer" query (which // might be using the RootTxn). - sd.LocalOnlySessionData.StreamerEnabled = false + sd.SessionData.StreamerEnabled = false } // applyOverrides overrides the respective fields from sd for all the fields set on o. diff --git a/pkg/sql/sessiondatapb/local_only_session_data.proto b/pkg/sql/sessiondatapb/local_only_session_data.proto index 22eb149d808b..c1af7cbd97cd 100644 --- a/pkg/sql/sessiondatapb/local_only_session_data.proto +++ b/pkg/sql/sessiondatapb/local_only_session_data.proto @@ -367,8 +367,7 @@ message LocalOnlySessionData { // Execution of these deallocated prepared statements will fail until they are // prepared again. int64 prepared_statements_cache_size = 97; - // StreamerEnabled controls whether the Streamer API can be used. - bool streamer_enabled = 98; + reserved 98; // DisableDropVirtualCluster causes errors when the client // attempts to drop virtual clusters or tenant records. bool disable_drop_virtual_cluster = 99; diff --git a/pkg/sql/sessiondatapb/session_data.proto b/pkg/sql/sessiondatapb/session_data.proto index 9ccbd64ad7ab..df6f93f9b0cd 100644 --- a/pkg/sql/sessiondatapb/session_data.proto +++ b/pkg/sql/sessiondatapb/session_data.proto @@ -132,6 +132,8 @@ message SessionData { // head-of-the-line request in case the "eager" memory usage limit has been // exceeded. double streamer_head_of_line_only_fraction = 30; + // StreamerEnabled controls whether the Streamer API can be used. + bool streamer_enabled = 32; } // DataConversionConfig contains the parameters that influence the output