diff --git a/pkg/ccl/logictestccl/testdata/logic_test/regional_by_row_query_behavior b/pkg/ccl/logictestccl/testdata/logic_test/regional_by_row_query_behavior
index 73773d16e348..5d337eefccd1 100644
--- a/pkg/ccl/logictestccl/testdata/logic_test/regional_by_row_query_behavior
+++ b/pkg/ccl/logictestccl/testdata/logic_test/regional_by_row_query_behavior
@@ -3475,3 +3475,82 @@ SELECT pk FROM regional_by_row_table_virt GROUP BY v;
 # can be proven to be monotonically increasing or decreasing.
 statement error pq: column "pk" must appear in the GROUP BY clause or be used in an aggregate function
 SELECT pk FROM regional_by_row_table_virt GROUP BY (a+10);
+
+# Regression test for incorrectly setting the bytes limit in the streamer on
+# remote lookups (#108206).
+statement ok
+CREATE TABLE t108206_p (
+  id INT PRIMARY KEY,
+  p_id INT,
+  INDEX (p_id),
+  FAMILY (id, p_id)
+) LOCALITY REGIONAL BY ROW;
+CREATE TABLE t108206_c (
+  c_id INT PRIMARY KEY,
+  c_p_id INT,
+  INDEX (c_p_id),
+  FAMILY (c_id, c_p_id)
+) LOCALITY REGIONAL BY ROW;
+INSERT INTO t108206_p (crdb_region, id, p_id) VALUES ('ap-southeast-2', 1, 10), ('ca-central-1', 2, 20), ('us-east-1', 3, 30);
+INSERT INTO t108206_c (crdb_region, c_id, c_p_id) VALUES ('ap-southeast-2', 10, 10), ('ca-central-1', 20, 20), ('us-east-1', 30, 30)
+
+statement ok
+SET tracing = on,kv,results; SELECT * FROM t108206_c WHERE EXISTS (SELECT * FROM t108206_p WHERE p_id = c_p_id) AND c_id = 20; SET tracing = off
+
+# If the row is not found in the local region, the other regions are searched
+# in parallel.
+query T
+SELECT message FROM [SHOW KV TRACE FOR SESSION] WITH ORDINALITY
+ WHERE message LIKE 'fetched:%' OR message LIKE 'output row%'
+  OR message LIKE 'Scan%'
+ ORDER BY ordinality ASC
+----
+Scan /Table/138/1/"@"/20/0
+Scan /Table/138/1/"\x80"/20/0, /Table/138/1/"\xc0"/20/0
+fetched: /t108206_c/t108206_c_pkey/?/20/c_p_id -> /20
+Scan /Table/137/2/"@"/2{0-1}
+Scan /Table/137/2/"\x80"/2{0-1}, /Table/137/2/"\xc0"/2{0-1}
+fetched: /t108206_p/t108206_p_p_id_idx/'ca-central-1'/20/2 ->
+output row: [20 20]
+
+# Semi join with locality optimized search enabled.
+query T retry
+SELECT * FROM [EXPLAIN (DISTSQL) SELECT * FROM t108206_c WHERE EXISTS (SELECT * FROM t108206_p WHERE p_id = c_p_id) AND c_id = 20] OFFSET 2
+----
+·
+• lookup join (semi)
+│ table: t108206_p@t108206_p_p_id_idx
+│ lookup condition: (crdb_region = 'ap-southeast-2') AND (c_p_id = p_id)
+│ remote lookup condition: (crdb_region IN ('ca-central-1', 'us-east-1')) AND (c_p_id = p_id)
+│
+└── • union all
+    │ limit: 1
+    │
+    ├── • scan
+    │     missing stats
+    │     table: t108206_c@t108206_c_pkey
+    │     spans: [/'ap-southeast-2'/20 - /'ap-southeast-2'/20]
+    │
+    └── • scan
+          missing stats
+          table: t108206_c@t108206_c_pkey
+          spans: [/'ca-central-1'/20 - /'ca-central-1'/20] [/'us-east-1'/20 - /'us-east-1'/20]
+·
+Diagram: https://cockroachdb.github.io/distsqlplan/decode.html#eJysk1Fr6koQx9_vpxjmRb1scRPLRRYKlhq5KVZ7jXALVSRNpnZPYzZnd0Mtxe9-2Oix6tFyWk4eQjI7-9v__Gf2Dc33DAVGQT-4GsPf0BsNb-A-uLvtX4YDqHfDaBz912_AfoL1eNvn_8wS-P_fYBRAcOfyoH48q9hkFTOZwgUkM_fRgMtBF-rJOubzxhSGvV4UjMFHhrlKaRAvyKC4Rw-nDAutEjJGaRd6qxLCdImCM5R5UVoXnjJMlCYUb2ilzQgFjuOHjEYUp6SbHBmmZGOZVdhtEZ3t16x4pldkeKWycpEbAU4e2yhGhlERu2hzgpPJss0n2PR5k0Ocp-CBsk-kcbpiqEr7rsjYeE4ovJ0Swi4KvmJfq8L7g1V0NhWcVO0fqPZOqn4Xa0jLOIMyVzolTeme3unqSHkDdaaKpr9fWF8upAXvpDR-IM3_jKHXSuYbP1v7x45fCxLQD3pjiIKbEK6H4QDZ1uZia3NR2TmT6RIZ9pV6Lgv4pmQOKhdQ75zDBSxr57wmhOh4nHu8vRn5jg8X0Gk1kOGIFsoSZEd2u9u3rLV39zNY1pI94K_Ebc-Ldc91-jDTNJcqP2lk68DI1meMHJEpVG7ooMm_17Izz00DpXNaT5BRpU7oVqukyl3_DitQFUjJ2PWqv_4J82rJcydoihfbEd0leV8l8UOS_yGptUfiuyT_kNT6kHR-msSdY4-ZenFXWSDfPGdHXj8fdBviuXFti57US4V1Y25QPMaZIYY38TN1yZJeyFwaKxMUVpe0Wv31IwAA__-d1uWE
+
+statement ok
+SET vectorize=on
+
+query T
+EXPLAIN (VEC) SELECT * FROM child LEFT JOIN parent ON p_id = c_p_id WHERE c_id = 10
+----
+│
+└ Node 1
+  └ *rowexec.joinReader
+    └ *colexec.limitOp
+      └ *colexec.SerialUnorderedSynchronizer
+        ├ *colfetcher.ColBatchScan
+        └ *colfetcher.ColBatchScan
+
+statement ok
+RESET vectorize
diff --git a/pkg/sql/rowexec/joinreader.go b/pkg/sql/rowexec/joinreader.go
index 9f22f9fd040c..b71b18cb20b5 100644
--- a/pkg/sql/rowexec/joinreader.go
+++ b/pkg/sql/rowexec/joinreader.go
@@ -819,6 +819,24 @@ func sortSpans(spans roachpb.Spans, spanIDs []int) {
 	}
 }
 
+func (jr *joinReader) getBatchBytesLimit() rowinfra.BytesLimit {
+	if jr.usesStreamer {
+		// The streamer itself sets the correct TargetBytes parameter on the
+		// BatchRequests.
+		return rowinfra.NoBytesLimit
+	}
+	if !jr.shouldLimitBatches {
+		// We deem it safe to not limit the batches in order to get the
+		// DistSender-level parallelism.
+		return rowinfra.NoBytesLimit
+	}
+	bytesLimit := jr.lookupBatchBytesLimit
+	if bytesLimit == 0 {
+		bytesLimit = rowinfra.GetDefaultBatchBytesLimit(jr.EvalCtx.TestingKnobs.ForceProductionValues)
+	}
+	return bytesLimit
+}
+
 // readInput reads the next batch of input rows and starts an index scan, which
 // for lookup join is the lookup of matching KVs for a batch of input rows.
 // It can sometimes emit a single row on behalf of the previous batch.
@@ -1016,19 +1034,8 @@ func (jr *joinReader) readInput() (
 	// modification here, but we want to be conscious about the memory
 	// accounting - we don't double count for any memory of spans because the
 	// joinReaderStrategy doesn't account for any memory used by the spans.
-	var bytesLimit rowinfra.BytesLimit
-	if !jr.usesStreamer {
-		if !jr.shouldLimitBatches {
-			bytesLimit = rowinfra.NoBytesLimit
-		} else {
-			bytesLimit = jr.lookupBatchBytesLimit
-			if jr.lookupBatchBytesLimit == 0 {
-				bytesLimit = rowinfra.GetDefaultBatchBytesLimit(jr.EvalCtx.TestingKnobs.ForceProductionValues)
-			}
-		}
-	}
 	if err = jr.fetcher.StartScan(
-		jr.Ctx(), spans, spanIDs, bytesLimit, rowinfra.NoRowLimit,
+		jr.Ctx(), spans, spanIDs, jr.getBatchBytesLimit(), rowinfra.NoRowLimit,
 	); err != nil {
 		jr.MoveToDraining(err)
 		return jrStateUnknown, nil, jr.DrainHelper()
@@ -1094,12 +1101,8 @@ func (jr *joinReader) fetchLookupRow() (joinReaderState, *execinfrapb.ProducerMe
 	}
 	log.VEventf(jr.Ctx(), 1, "scanning %d remote spans", len(spans))
-	bytesLimit := rowinfra.GetDefaultBatchBytesLimit(jr.EvalCtx.TestingKnobs.ForceProductionValues)
-	if !jr.shouldLimitBatches {
-		bytesLimit = rowinfra.NoBytesLimit
-	}
 	if err := jr.fetcher.StartScan(
-		jr.Ctx(), spans, spanIDs, bytesLimit, rowinfra.NoRowLimit,
+		jr.Ctx(), spans, spanIDs, jr.getBatchBytesLimit(), rowinfra.NoRowLimit,
 	); err != nil {
 		jr.MoveToDraining(err)
 		return jrStateUnknown, jr.DrainHelper()
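
For context, the sketch below restates the bytes-limit rule that getBatchBytesLimit now applies at both lookup call sites (the local lookup in readInput and the remote lookup in fetchLookupRow). It is a standalone approximation, not CockroachDB code: the bytesLimit type, lookupConfig struct, and defaultBatchBytesLimit value are hypothetical stand-ins for rowinfra.BytesLimit, the joinReader fields, and rowinfra.GetDefaultBatchBytesLimit.

package main

import "fmt"

// bytesLimit, noBytesLimit, and defaultBatchBytesLimit are hypothetical
// stand-ins for rowinfra.BytesLimit, rowinfra.NoBytesLimit, and the value
// returned by rowinfra.GetDefaultBatchBytesLimit.
type bytesLimit int64

const (
	noBytesLimit           bytesLimit = 0
	defaultBatchBytesLimit bytesLimit = 10 << 20 // assumed default, not the real value
)

// lookupConfig mirrors the three joinReader fields that drive the decision.
type lookupConfig struct {
	usesStreamer          bool       // the streamer sets TargetBytes on its own BatchRequests
	shouldLimitBatches    bool       // false keeps DistSender-level parallelism
	lookupBatchBytesLimit bytesLimit // testing override; 0 means "use the default"
}

// batchBytesLimit applies one rule for both the local and the remote lookup,
// which is what the new getBatchBytesLimit helper guarantees.
func batchBytesLimit(cfg lookupConfig) bytesLimit {
	if cfg.usesStreamer || !cfg.shouldLimitBatches {
		return noBytesLimit
	}
	if cfg.lookupBatchBytesLimit != 0 {
		return cfg.lookupBatchBytesLimit
	}
	return defaultBatchBytesLimit
}

func main() {
	// The remote-lookup path previously forced a default limit even when the
	// streamer was in use; under the shared rule it gets no limit.
	fmt.Println(batchBytesLimit(lookupConfig{usesStreamer: true, shouldLimitBatches: true}))
	// Without the streamer, and with batch limiting required, the default applies.
	fmt.Println(batchBytesLimit(lookupConfig{shouldLimitBatches: true}))
}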