Merge #108208

108208: rowexec: fix remote lookups when streamer is used r=yuzefovich a=yuzefovich

Previously, we could incorrectly pass a non-zero batch bytes limit when performing remote lookups while the join reader was powered by the streamer. This would lead to an internal error and is now fixed.
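
For context, a minimal sketch of the invariant the fix restores: a streamer-backed KV fetcher paces its own fetches (it sets TargetBytes on the BatchRequests it issues), so the caller must pass the zero "no limit" sentinel. Everything below is an illustrative stand-in rather than the actual CockroachDB code; only the error message is taken from the release note.

```go
package main

import "fmt"

// BytesLimit mirrors rowinfra.BytesLimit for illustration; 0 means "no limit".
type BytesLimit uint64

const NoBytesLimit BytesLimit = 0

// startStreamerScan is a hypothetical stand-in for the streamer-backed scan
// entry point: a non-zero caller-supplied limit is an internal error because
// the streamer sizes its own batches.
func startStreamerScan(bytesLimit BytesLimit) error {
	if bytesLimit != NoBytesLimit {
		return fmt.Errorf("unexpected non-zero bytes limit for txnKVStreamer")
	}
	// ... hand the spans to the streamer, which sets TargetBytes itself ...
	return nil
}

func main() {
	// Before the fix, the remote-lookup path could reach the streamer with a
	// non-zero limit and trip the guard above.
	fmt.Println(startStreamerScan(1 << 20))      // unexpected non-zero bytes limit for txnKVStreamer
	fmt.Println(startStreamerScan(NoBytesLimit)) // <nil>
}
```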

Fixes: #108206.

Release note (bug fix): CockroachDB previously could encounter an internal error `unexpected non-zero bytes limit for txnKVStreamer` when evaluating locality-optimized lookup joins that had to perform lookups in remote regions. The bug was introduced in 22.2 and is now fixed. A temporary workaround, without upgrading, is to run `SET streamer_enabled = false;`.
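
For anyone applying the workaround, a minimal session-level sequence (hedged: `streamer_enabled` is the session variable named above, and `RESET` restores its default):

```sql
-- Route lookup joins away from the streamer for this session only.
SET streamer_enabled = false;

-- ... run the affected locality-optimized lookup joins ...

-- After upgrading to a fixed version, restore the default.
RESET streamer_enabled;
```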

Co-authored-by: Yahor Yuzefovich <[email protected]>
craig[bot] and yuzefovich committed Aug 5, 2023
2 parents 5cfe9fe + c2b6d75 commit 695d4ec
Showing 2 changed files with 99 additions and 17 deletions.
@@ -3475,3 +3475,82 @@ SELECT pk FROM regional_by_row_table_virt GROUP BY v;
# can be proven to be monotonically increasing or decreasing.
statement error pq: column "pk" must appear in the GROUP BY clause or be used in an aggregate function
SELECT pk FROM regional_by_row_table_virt GROUP BY (a+10);

# Regression test for incorrectly setting bytes limit in the streamer on remote
# lookups (#108206).
statement ok
CREATE TABLE t108206_p (
id INT PRIMARY KEY,
p_id INT,
INDEX (p_id),
FAMILY (id, p_id)
) LOCALITY REGIONAL BY ROW;
CREATE TABLE t108206_c (
c_id INT PRIMARY KEY,
c_p_id INT,
INDEX (c_p_id),
FAMILY (c_id, c_p_id)
) LOCALITY REGIONAL BY ROW;
INSERT INTO t108206_p (crdb_region, id, p_id) VALUES ('ap-southeast-2', 1, 10), ('ca-central-1', 2, 20), ('us-east-1', 3, 30);
INSERT INTO t108206_c (crdb_region, c_id, c_p_id) VALUES ('ap-southeast-2', 10, 10), ('ca-central-1', 20, 20), ('us-east-1', 30, 30)

statement ok
SET tracing = on,kv,results; SELECT * FROM t108206_c WHERE EXISTS (SELECT * FROM t108206_p WHERE p_id = c_p_id) AND c_id = 20; SET tracing = off

# If the row is not found in the local region, the other regions are searched in
# parallel.
query T
SELECT message FROM [SHOW KV TRACE FOR SESSION] WITH ORDINALITY
WHERE message LIKE 'fetched:%' OR message LIKE 'output row%'
OR message LIKE 'Scan%'
ORDER BY ordinality ASC
----
Scan /Table/138/1/"@"/20/0
Scan /Table/138/1/"\x80"/20/0, /Table/138/1/"\xc0"/20/0
fetched: /t108206_c/t108206_c_pkey/?/20/c_p_id -> /20
Scan /Table/137/2/"@"/2{0-1}
Scan /Table/137/2/"\x80"/2{0-1}, /Table/137/2/"\xc0"/2{0-1}
fetched: /t108206_p/t108206_p_p_id_idx/'ca-central-1'/20/2 -> <undecoded>
output row: [20 20]

# Left join with locality optimized search enabled.
query T retry
SELECT * FROM [EXPLAIN (DISTSQL) SELECT * FROM t108206_c WHERE EXISTS (SELECT * FROM t108206_p WHERE p_id = c_p_id) AND c_id = 20] OFFSET 2
----
·
• lookup join (semi)
│ table: t108206_p@t108206_p_p_id_idx
│ lookup condition: (crdb_region = 'ap-southeast-2') AND (c_p_id = p_id)
│ remote lookup condition: (crdb_region IN ('ca-central-1', 'us-east-1')) AND (c_p_id = p_id)
└── • union all
│ limit: 1
├── • scan
│ missing stats
│ table: t108206_c@t108206_c_pkey
│ spans: [/'ap-southeast-2'/20 - /'ap-southeast-2'/20]
└── • scan
missing stats
table: t108206_c@t108206_c_pkey
spans: [/'ca-central-1'/20 - /'ca-central-1'/20] [/'us-east-1'/20 - /'us-east-1'/20]
·
Diagram: https://cockroachdb.github.io/distsqlplan/decode.html#eJysk1Fr6koQx9_vpxjmRb1scRPLRRYKlhq5KVZ7jXALVSRNpnZPYzZnd0Mtxe9-2Oix6tFyWk4eQjI7-9v__Gf2Dc33DAVGQT-4GsPf0BsNb-A-uLvtX4YDqHfDaBz912_AfoL1eNvn_8wS-P_fYBRAcOfyoH48q9hkFTOZwgUkM_fRgMtBF-rJOubzxhSGvV4UjMFHhrlKaRAvyKC4Rw-nDAutEjJGaRd6qxLCdImCM5R5UVoXnjJMlCYUb2ilzQgFjuOHjEYUp6SbHBmmZGOZVdhtEZ3t16x4pldkeKWycpEbAU4e2yhGhlERu2hzgpPJss0n2PR5k0Ocp-CBsk-kcbpiqEr7rsjYeE4ovJ0Swi4KvmJfq8L7g1V0NhWcVO0fqPZOqn4Xa0jLOIMyVzolTeme3unqSHkDdaaKpr9fWF8upAXvpDR-IM3_jKHXSuYbP1v7x45fCxLQD3pjiIKbEK6H4QDZ1uZia3NR2TmT6RIZ9pV6Lgv4pmQOKhdQ75zDBSxr57wmhOh4nHu8vRn5jg8X0Gk1kOGIFsoSZEd2u9u3rLV39zNY1pI94K_Ebc-Ldc91-jDTNJcqP2lk68DI1meMHJEpVG7ooMm_17Izz00DpXNaT5BRpU7oVqukyl3_DitQFUjJ2PWqv_4J82rJcydoihfbEd0leV8l8UOS_yGptUfiuyT_kNT6kHR-msSdY4-ZenFXWSDfPGdHXj8fdBviuXFti57US4V1Y25QPMaZIYY38TN1yZJeyFwaKxMUVpe0Wv31IwAA__-d1uWE

statement ok
SET vectorize=on

query T
EXPLAIN (VEC) SELECT * FROM child LEFT JOIN parent ON p_id = c_p_id WHERE c_id = 10
----
└ Node 1
└ *rowexec.joinReader
└ *colexec.limitOp
└ *colexec.SerialUnorderedSynchronizer
├ *colfetcher.ColBatchScan
└ *colfetcher.ColBatchScan

statement ok
RESET vectorize
pkg/sql/rowexec/joinreader.go (37 changes: 20 additions & 17 deletions)
@@ -819,6 +819,24 @@ func sortSpans(spans roachpb.Spans, spanIDs []int) {
 	}
 }
 
+func (jr *joinReader) getBatchBytesLimit() rowinfra.BytesLimit {
+	if jr.usesStreamer {
+		// The streamer itself sets the correct TargetBytes parameter on the
+		// BatchRequests.
+		return rowinfra.NoBytesLimit
+	}
+	if !jr.shouldLimitBatches {
+		// We deem it safe to not limit the batches in order to get the
+		// DistSender-level parallelism.
+		return rowinfra.NoBytesLimit
+	}
+	bytesLimit := jr.lookupBatchBytesLimit
+	if bytesLimit == 0 {
+		bytesLimit = rowinfra.GetDefaultBatchBytesLimit(jr.EvalCtx.TestingKnobs.ForceProductionValues)
+	}
+	return bytesLimit
+}
+
 // readInput reads the next batch of input rows and starts an index scan, which
 // for lookup join is the lookup of matching KVs for a batch of input rows.
 // It can sometimes emit a single row on behalf of the previous batch.
@@ -1016,19 +1034,8 @@ func (jr *joinReader) readInput() (
 	// modification here, but we want to be conscious about the memory
 	// accounting - we don't double count for any memory of spans because the
 	// joinReaderStrategy doesn't account for any memory used by the spans.
-	var bytesLimit rowinfra.BytesLimit
-	if !jr.usesStreamer {
-		if !jr.shouldLimitBatches {
-			bytesLimit = rowinfra.NoBytesLimit
-		} else {
-			bytesLimit = jr.lookupBatchBytesLimit
-			if jr.lookupBatchBytesLimit == 0 {
-				bytesLimit = rowinfra.GetDefaultBatchBytesLimit(jr.EvalCtx.TestingKnobs.ForceProductionValues)
-			}
-		}
-	}
 	if err = jr.fetcher.StartScan(
-		jr.Ctx(), spans, spanIDs, bytesLimit, rowinfra.NoRowLimit,
+		jr.Ctx(), spans, spanIDs, jr.getBatchBytesLimit(), rowinfra.NoRowLimit,
 	); err != nil {
 		jr.MoveToDraining(err)
 		return jrStateUnknown, nil, jr.DrainHelper()
@@ -1094,12 +1101,8 @@ func (jr *joinReader) fetchLookupRow() (joinReaderState, *execinfrapb.ProducerMetadata) {
 	}
 
 	log.VEventf(jr.Ctx(), 1, "scanning %d remote spans", len(spans))
-	bytesLimit := rowinfra.GetDefaultBatchBytesLimit(jr.EvalCtx.TestingKnobs.ForceProductionValues)
-	if !jr.shouldLimitBatches {
-		bytesLimit = rowinfra.NoBytesLimit
-	}
 	if err := jr.fetcher.StartScan(
-		jr.Ctx(), spans, spanIDs, bytesLimit, rowinfra.NoRowLimit,
+		jr.Ctx(), spans, spanIDs, jr.getBatchBytesLimit(), rowinfra.NoRowLimit,
 	); err != nil {
 		jr.MoveToDraining(err)
 		return jrStateUnknown, jr.DrainHelper()

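The shape of the fix is worth noting: before this commit, the remote-lookup path in fetchLookupRow chose a bytes limit without consulting jr.usesStreamer, which is how a non-zero limit reached the streamer; both call sites now go through the shared getBatchBytesLimit helper. Below is a standalone, hedged restatement of the decision table that helper encodes, with the joinReader fields flattened into plain parameters; the types and constants are illustrative stand-ins, not the actual CockroachDB code.

```go
package main

import "fmt"

// BytesLimit mirrors rowinfra.BytesLimit for illustration; 0 means "no limit".
type BytesLimit uint64

const NoBytesLimit BytesLimit = 0

// batchBytesLimit restates the logic of the getBatchBytesLimit helper from
// the diff above, with the joinReader fields passed in explicitly.
func batchBytesLimit(
	usesStreamer, shouldLimitBatches bool,
	lookupBatchBytesLimit, defaultLimit BytesLimit,
) BytesLimit {
	switch {
	case usesStreamer:
		// The streamer sets TargetBytes on its BatchRequests itself.
		return NoBytesLimit
	case !shouldLimitBatches:
		// Unlimited batches keep DistSender-level parallelism.
		return NoBytesLimit
	case lookupBatchBytesLimit != 0:
		// An explicit per-join override, if set, takes precedence.
		return lookupBatchBytesLimit
	default:
		return defaultLimit
	}
}

func main() {
	// The previously buggy case: a streamer-powered remote lookup now gets
	// NoBytesLimit instead of the default production limit.
	fmt.Println(batchBytesLimit(true, true, 0, 10<<20))  // 0
	fmt.Println(batchBytesLimit(false, true, 0, 10<<20)) // 10485760
}
```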