From c2b6d75885fb0fe7d2d70488e8d04226d65764a6 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Fri, 4 Aug 2023 12:58:51 -0700 Subject: [PATCH] rowexec: fix remote lookups when streamer is used Previously, we could incorrectly pass non-zero batch bytes limit when performing the remote lookups when the join reader is powered by the streamer. This would lead to an internal error and is now fixed. Release note (bug fix): CockroachDB previously could encounter an internal error `unexpected non-zero bytes limit for txnKVStreamer` when evaluating locality-optimized lookup joins in case it had to perform the remote regions' lookup. The bug was introduced in 22.2 and is now fixed. Temporary workaround without upgrading is to run `SET streamer_enabled = false;`. --- .../logic_test/regional_by_row_query_behavior | 79 +++++++++++++++++++ pkg/sql/rowexec/joinreader.go | 37 +++++---- 2 files changed, 99 insertions(+), 17 deletions(-) diff --git a/pkg/ccl/logictestccl/testdata/logic_test/regional_by_row_query_behavior b/pkg/ccl/logictestccl/testdata/logic_test/regional_by_row_query_behavior index 73773d16e348..5d337eefccd1 100644 --- a/pkg/ccl/logictestccl/testdata/logic_test/regional_by_row_query_behavior +++ b/pkg/ccl/logictestccl/testdata/logic_test/regional_by_row_query_behavior @@ -3475,3 +3475,82 @@ SELECT pk FROM regional_by_row_table_virt GROUP BY v; # can be proven to be monotonically increasing or decreasing. statement error pq: column "pk" must appear in the GROUP BY clause or be used in an aggregate function SELECT pk FROM regional_by_row_table_virt GROUP BY (a+10); + +# Regression test for incorrectly setting bytes limit in the streamer on remote +# lookups (#108206). +statement ok +CREATE TABLE t108206_p ( + id INT PRIMARY KEY, + p_id INT, + INDEX (p_id), + FAMILY (id, p_id) +) LOCALITY REGIONAL BY ROW; +CREATE TABLE t108206_c ( + c_id INT PRIMARY KEY, + c_p_id INT, + INDEX (c_p_id), + FAMILY (c_id, c_p_id) +) LOCALITY REGIONAL BY ROW; +INSERT INTO t108206_p (crdb_region, id, p_id) VALUES ('ap-southeast-2', 1, 10), ('ca-central-1', 2, 20), ('us-east-1', 3, 30); +INSERT INTO t108206_c (crdb_region, c_id, c_p_id) VALUES ('ap-southeast-2', 10, 10), ('ca-central-1', 20, 20), ('us-east-1', 30, 30) + +statement ok +SET tracing = on,kv,results; SELECT * FROM t108206_c WHERE EXISTS (SELECT * FROM t108206_p WHERE p_id = c_p_id) AND c_id = 20; SET tracing = off + +# If the row is not found in the local region, the other regions are searched in +# parallel. +query T +SELECT message FROM [SHOW KV TRACE FOR SESSION] WITH ORDINALITY + WHERE message LIKE 'fetched:%' OR message LIKE 'output row%' + OR message LIKE 'Scan%' + ORDER BY ordinality ASC +---- +Scan /Table/138/1/"@"/20/0 +Scan /Table/138/1/"\x80"/20/0, /Table/138/1/"\xc0"/20/0 +fetched: /t108206_c/t108206_c_pkey/?/20/c_p_id -> /20 +Scan /Table/137/2/"@"/2{0-1} +Scan /Table/137/2/"\x80"/2{0-1}, /Table/137/2/"\xc0"/2{0-1} +fetched: /t108206_p/t108206_p_p_id_idx/'ca-central-1'/20/2 -> +output row: [20 20] + +# Left join with locality optimized search enabled. +query T retry +SELECT * FROM [EXPLAIN (DISTSQL) SELECT * FROM t108206_c WHERE EXISTS (SELECT * FROM t108206_p WHERE p_id = c_p_id) AND c_id = 20] OFFSET 2 +---- +· +• lookup join (semi) +│ table: t108206_p@t108206_p_p_id_idx +│ lookup condition: (crdb_region = 'ap-southeast-2') AND (c_p_id = p_id) +│ remote lookup condition: (crdb_region IN ('ca-central-1', 'us-east-1')) AND (c_p_id = p_id) +│ +└── • union all + │ limit: 1 + │ + ├── • scan + │ missing stats + │ table: t108206_c@t108206_c_pkey + │ spans: [/'ap-southeast-2'/20 - /'ap-southeast-2'/20] + │ + └── • scan + missing stats + table: t108206_c@t108206_c_pkey + spans: [/'ca-central-1'/20 - /'ca-central-1'/20] [/'us-east-1'/20 - /'us-east-1'/20] +· +Diagram: https://cockroachdb.github.io/distsqlplan/decode.html#eJysk1Fr6koQx9_vpxjmRb1scRPLRRYKlhq5KVZ7jXALVSRNpnZPYzZnd0Mtxe9-2Oix6tFyWk4eQjI7-9v__Gf2Dc33DAVGQT-4GsPf0BsNb-A-uLvtX4YDqHfDaBz912_AfoL1eNvn_8wS-P_fYBRAcOfyoH48q9hkFTOZwgUkM_fRgMtBF-rJOubzxhSGvV4UjMFHhrlKaRAvyKC4Rw-nDAutEjJGaRd6qxLCdImCM5R5UVoXnjJMlCYUb2ilzQgFjuOHjEYUp6SbHBmmZGOZVdhtEZ3t16x4pldkeKWycpEbAU4e2yhGhlERu2hzgpPJss0n2PR5k0Ocp-CBsk-kcbpiqEr7rsjYeE4ovJ0Swi4KvmJfq8L7g1V0NhWcVO0fqPZOqn4Xa0jLOIMyVzolTeme3unqSHkDdaaKpr9fWF8upAXvpDR-IM3_jKHXSuYbP1v7x45fCxLQD3pjiIKbEK6H4QDZ1uZia3NR2TmT6RIZ9pV6Lgv4pmQOKhdQ75zDBSxr57wmhOh4nHu8vRn5jg8X0Gk1kOGIFsoSZEd2u9u3rLV39zNY1pI94K_Ebc-Ldc91-jDTNJcqP2lk68DI1meMHJEpVG7ooMm_17Izz00DpXNaT5BRpU7oVqukyl3_DitQFUjJ2PWqv_4J82rJcydoihfbEd0leV8l8UOS_yGptUfiuyT_kNT6kHR-msSdY4-ZenFXWSDfPGdHXj8fdBviuXFti57US4V1Y25QPMaZIYY38TN1yZJeyFwaKxMUVpe0Wv31IwAA__-d1uWE + +statement ok +SET vectorize=on + +query T +EXPLAIN (VEC) SELECT * FROM child LEFT JOIN parent ON p_id = c_p_id WHERE c_id = 10 +---- +│ +└ Node 1 + └ *rowexec.joinReader + └ *colexec.limitOp + └ *colexec.SerialUnorderedSynchronizer + ├ *colfetcher.ColBatchScan + └ *colfetcher.ColBatchScan + +statement ok +RESET vectorize diff --git a/pkg/sql/rowexec/joinreader.go b/pkg/sql/rowexec/joinreader.go index 9f22f9fd040c..b71b18cb20b5 100644 --- a/pkg/sql/rowexec/joinreader.go +++ b/pkg/sql/rowexec/joinreader.go @@ -819,6 +819,24 @@ func sortSpans(spans roachpb.Spans, spanIDs []int) { } } +func (jr *joinReader) getBatchBytesLimit() rowinfra.BytesLimit { + if jr.usesStreamer { + // The streamer itself sets the correct TargetBytes parameter on the + // BatchRequests. + return rowinfra.NoBytesLimit + } + if !jr.shouldLimitBatches { + // We deem it safe to not limit the batches in order to get the + // DistSender-level parallelism. + return rowinfra.NoBytesLimit + } + bytesLimit := jr.lookupBatchBytesLimit + if bytesLimit == 0 { + bytesLimit = rowinfra.GetDefaultBatchBytesLimit(jr.EvalCtx.TestingKnobs.ForceProductionValues) + } + return bytesLimit +} + // readInput reads the next batch of input rows and starts an index scan, which // for lookup join is the lookup of matching KVs for a batch of input rows. // It can sometimes emit a single row on behalf of the previous batch. @@ -1016,19 +1034,8 @@ func (jr *joinReader) readInput() ( // modification here, but we want to be conscious about the memory // accounting - we don't double count for any memory of spans because the // joinReaderStrategy doesn't account for any memory used by the spans. - var bytesLimit rowinfra.BytesLimit - if !jr.usesStreamer { - if !jr.shouldLimitBatches { - bytesLimit = rowinfra.NoBytesLimit - } else { - bytesLimit = jr.lookupBatchBytesLimit - if jr.lookupBatchBytesLimit == 0 { - bytesLimit = rowinfra.GetDefaultBatchBytesLimit(jr.EvalCtx.TestingKnobs.ForceProductionValues) - } - } - } if err = jr.fetcher.StartScan( - jr.Ctx(), spans, spanIDs, bytesLimit, rowinfra.NoRowLimit, + jr.Ctx(), spans, spanIDs, jr.getBatchBytesLimit(), rowinfra.NoRowLimit, ); err != nil { jr.MoveToDraining(err) return jrStateUnknown, nil, jr.DrainHelper() @@ -1094,12 +1101,8 @@ func (jr *joinReader) fetchLookupRow() (joinReaderState, *execinfrapb.ProducerMe } log.VEventf(jr.Ctx(), 1, "scanning %d remote spans", len(spans)) - bytesLimit := rowinfra.GetDefaultBatchBytesLimit(jr.EvalCtx.TestingKnobs.ForceProductionValues) - if !jr.shouldLimitBatches { - bytesLimit = rowinfra.NoBytesLimit - } if err := jr.fetcher.StartScan( - jr.Ctx(), spans, spanIDs, bytesLimit, rowinfra.NoRowLimit, + jr.Ctx(), spans, spanIDs, jr.getBatchBytesLimit(), rowinfra.NoRowLimit, ); err != nil { jr.MoveToDraining(err) return jrStateUnknown, jr.DrainHelper()