Skip to content

Commit

Permalink
rowexec: fix remote lookups when streamer is used
Browse files Browse the repository at this point in the history
Previously, we could incorrectly pass non-zero batch bytes limit when
performing the remote lookups when the join reader is powered by the
streamer. This would lead to an internal error and is now fixed.

Release note (bug fix): CockroachDB previously could encounter an
internal error `unexpected non-zero bytes limit for txnKVStreamer` when
evaluating locality-optimized lookup joins in case it had to perform the
remote regions' lookup. The bug was introduced in 22.2 and is now fixed.
A temporary workaround that does not require upgrading is to run
`SET streamer_enabled = false;`.
  • Loading branch information
yuzefovich committed Aug 4, 2023
1 parent 8c0426c commit c2b6d75
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3475,3 +3475,82 @@ SELECT pk FROM regional_by_row_table_virt GROUP BY v;
# can be proven to be monotonically increasing or decreasing.
statement error pq: column "pk" must appear in the GROUP BY clause or be used in an aggregate function
SELECT pk FROM regional_by_row_table_virt GROUP BY (a+10);

# Regression test for incorrectly setting bytes limit in the streamer on remote
# lookups (#108206).
statement ok
CREATE TABLE t108206_p (
id INT PRIMARY KEY,
p_id INT,
INDEX (p_id),
FAMILY (id, p_id)
) LOCALITY REGIONAL BY ROW;
CREATE TABLE t108206_c (
c_id INT PRIMARY KEY,
c_p_id INT,
INDEX (c_p_id),
FAMILY (c_id, c_p_id)
) LOCALITY REGIONAL BY ROW;
INSERT INTO t108206_p (crdb_region, id, p_id) VALUES ('ap-southeast-2', 1, 10), ('ca-central-1', 2, 20), ('us-east-1', 3, 30);
INSERT INTO t108206_c (crdb_region, c_id, c_p_id) VALUES ('ap-southeast-2', 10, 10), ('ca-central-1', 20, 20), ('us-east-1', 30, 30)

statement ok
SET tracing = on,kv,results; SELECT * FROM t108206_c WHERE EXISTS (SELECT * FROM t108206_p WHERE p_id = c_p_id) AND c_id = 20; SET tracing = off

# If the row is not found in the local region, the other regions are searched in
# parallel.
query T
SELECT message FROM [SHOW KV TRACE FOR SESSION] WITH ORDINALITY
WHERE message LIKE 'fetched:%' OR message LIKE 'output row%'
OR message LIKE 'Scan%'
ORDER BY ordinality ASC
----
Scan /Table/138/1/"@"/20/0
Scan /Table/138/1/"\x80"/20/0, /Table/138/1/"\xc0"/20/0
fetched: /t108206_c/t108206_c_pkey/?/20/c_p_id -> /20
Scan /Table/137/2/"@"/2{0-1}
Scan /Table/137/2/"\x80"/2{0-1}, /Table/137/2/"\xc0"/2{0-1}
fetched: /t108206_p/t108206_p_p_id_idx/'ca-central-1'/20/2 -> <undecoded>
output row: [20 20]

# Left join with locality optimized search enabled.
query T retry
SELECT * FROM [EXPLAIN (DISTSQL) SELECT * FROM t108206_c WHERE EXISTS (SELECT * FROM t108206_p WHERE p_id = c_p_id) AND c_id = 20] OFFSET 2
----
·
• lookup join (semi)
│ table: t108206_p@t108206_p_p_id_idx
│ lookup condition: (crdb_region = 'ap-southeast-2') AND (c_p_id = p_id)
│ remote lookup condition: (crdb_region IN ('ca-central-1', 'us-east-1')) AND (c_p_id = p_id)
└── • union all
│ limit: 1
├── • scan
│ missing stats
│ table: t108206_c@t108206_c_pkey
│ spans: [/'ap-southeast-2'/20 - /'ap-southeast-2'/20]
└── • scan
missing stats
table: t108206_c@t108206_c_pkey
spans: [/'ca-central-1'/20 - /'ca-central-1'/20] [/'us-east-1'/20 - /'us-east-1'/20]
·
Diagram: https://cockroachdb.github.io/distsqlplan/decode.html#eJysk1Fr6koQx9_vpxjmRb1scRPLRRYKlhq5KVZ7jXALVSRNpnZPYzZnd0Mtxe9-2Oix6tFyWk4eQjI7-9v__Gf2Dc33DAVGQT-4GsPf0BsNb-A-uLvtX4YDqHfDaBz912_AfoL1eNvn_8wS-P_fYBRAcOfyoH48q9hkFTOZwgUkM_fRgMtBF-rJOubzxhSGvV4UjMFHhrlKaRAvyKC4Rw-nDAutEjJGaRd6qxLCdImCM5R5UVoXnjJMlCYUb2ilzQgFjuOHjEYUp6SbHBmmZGOZVdhtEZ3t16x4pldkeKWycpEbAU4e2yhGhlERu2hzgpPJss0n2PR5k0Ocp-CBsk-kcbpiqEr7rsjYeE4ovJ0Swi4KvmJfq8L7g1V0NhWcVO0fqPZOqn4Xa0jLOIMyVzolTeme3unqSHkDdaaKpr9fWF8upAXvpDR-IM3_jKHXSuYbP1v7x45fCxLQD3pjiIKbEK6H4QDZ1uZia3NR2TmT6RIZ9pV6Lgv4pmQOKhdQ75zDBSxr57wmhOh4nHu8vRn5jg8X0Gk1kOGIFsoSZEd2u9u3rLV39zNY1pI94K_Ebc-Ldc91-jDTNJcqP2lk68DI1meMHJEpVG7ooMm_17Izz00DpXNaT5BRpU7oVqukyl3_DitQFUjJ2PWqv_4J82rJcydoihfbEd0leV8l8UOS_yGptUfiuyT_kNT6kHR-msSdY4-ZenFXWSDfPGdHXj8fdBviuXFti57US4V1Y25QPMaZIYY38TN1yZJeyFwaKxMUVpe0Wv31IwAA__-d1uWE

statement ok
SET vectorize=on

query T
EXPLAIN (VEC) SELECT * FROM child LEFT JOIN parent ON p_id = c_p_id WHERE c_id = 10
----
└ Node 1
└ *rowexec.joinReader
└ *colexec.limitOp
└ *colexec.SerialUnorderedSynchronizer
├ *colfetcher.ColBatchScan
└ *colfetcher.ColBatchScan

statement ok
RESET vectorize
37 changes: 20 additions & 17 deletions pkg/sql/rowexec/joinreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -819,6 +819,24 @@ func sortSpans(spans roachpb.Spans, spanIDs []int) {
}
}

// getBatchBytesLimit returns the bytes limit to use for the next batch of
// lookups performed by the join reader's fetcher.
func (jr *joinReader) getBatchBytesLimit() rowinfra.BytesLimit {
	switch {
	case jr.usesStreamer:
		// The streamer itself sets the correct TargetBytes parameter on the
		// BatchRequests.
		return rowinfra.NoBytesLimit
	case !jr.shouldLimitBatches:
		// We deem it safe to not limit the batches in order to get the
		// DistSender-level parallelism.
		return rowinfra.NoBytesLimit
	case jr.lookupBatchBytesLimit != 0:
		// An explicit limit has been configured, so honor it.
		return jr.lookupBatchBytesLimit
	default:
		// Fall back to the default batch bytes limit.
		return rowinfra.GetDefaultBatchBytesLimit(jr.EvalCtx.TestingKnobs.ForceProductionValues)
	}
}

// readInput reads the next batch of input rows and starts an index scan, which
// for lookup join is the lookup of matching KVs for a batch of input rows.
// It can sometimes emit a single row on behalf of the previous batch.
Expand Down Expand Up @@ -1016,19 +1034,8 @@ func (jr *joinReader) readInput() (
// modification here, but we want to be conscious about the memory
// accounting - we don't double count for any memory of spans because the
// joinReaderStrategy doesn't account for any memory used by the spans.
var bytesLimit rowinfra.BytesLimit
if !jr.usesStreamer {
if !jr.shouldLimitBatches {
bytesLimit = rowinfra.NoBytesLimit
} else {
bytesLimit = jr.lookupBatchBytesLimit
if jr.lookupBatchBytesLimit == 0 {
bytesLimit = rowinfra.GetDefaultBatchBytesLimit(jr.EvalCtx.TestingKnobs.ForceProductionValues)
}
}
}
if err = jr.fetcher.StartScan(
jr.Ctx(), spans, spanIDs, bytesLimit, rowinfra.NoRowLimit,
jr.Ctx(), spans, spanIDs, jr.getBatchBytesLimit(), rowinfra.NoRowLimit,
); err != nil {
jr.MoveToDraining(err)
return jrStateUnknown, nil, jr.DrainHelper()
Expand Down Expand Up @@ -1094,12 +1101,8 @@ func (jr *joinReader) fetchLookupRow() (joinReaderState, *execinfrapb.ProducerMe
}

log.VEventf(jr.Ctx(), 1, "scanning %d remote spans", len(spans))
bytesLimit := rowinfra.GetDefaultBatchBytesLimit(jr.EvalCtx.TestingKnobs.ForceProductionValues)
if !jr.shouldLimitBatches {
bytesLimit = rowinfra.NoBytesLimit
}
if err := jr.fetcher.StartScan(
jr.Ctx(), spans, spanIDs, bytesLimit, rowinfra.NoRowLimit,
jr.Ctx(), spans, spanIDs, jr.getBatchBytesLimit(), rowinfra.NoRowLimit,
); err != nil {
jr.MoveToDraining(err)
return jrStateUnknown, jr.DrainHelper()
Expand Down

0 comments on commit c2b6d75

Please sign in to comment.