Skip to content

Commit

Permalink
rowexec: increase the batch size for join reader ordering strategy
Browse files Browse the repository at this point in the history
This commit increases the default value of
`sql.distsql.join_reader_ordering_strategy.batch_size` cluster setting
(which determines the input row batch size for the lookup joins when
ordering needs to be maintained) from 10KiB to 100KiB since the previous
number is likely to have been too conservative. We have seen support
cases (cockroachlabs/support#1627) where
bumping up this setting was needed to get reasonable performance, we
also change this setting to 100KiB in our TPC-E setup
(https://github.com/cockroachlabs/tpc-e/blob/d47d3ea5ce71ecb1be5e0e466a8aa7c94af63d17/tier-a/src/schema.rs#L404).

I did some historical digging, and I believe that the number 10KiB was
chosen somewhat arbitrarily with no real justification in #48058. That
PR changed how we measure the input row batches from the number of rows
to the memory footprint of the input rows. Prior to that change we had
100 rows limit, so my guess that 10KiB was somewhat dependent on that
number.

The reason we don't want this batch size to be too large is that we
buffer all looked up rows in a disk-backed container, so if too many
responses come back (because we included too many input rows in the
batch), that container has to spill to disk. To make sure we don't
regress in such scenarios this commit teaches the join reader to lower
the batch size bytes limit if the container does spill to disk, until
10 KiB which is treated as the minimum.

Release note: None
  • Loading branch information
yuzefovich committed Jul 14, 2022
1 parent 750b231 commit e9f1c05
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 8 deletions.
2 changes: 1 addition & 1 deletion pkg/sql/logictest/testdata/logic_test/information_schema
Original file line number Diff line number Diff line change
Expand Up @@ -4657,7 +4657,7 @@ integer_datetimes on
intervalstyle postgres
intervalstyle_enabled on
is_superuser on
join_reader_ordering_strategy_batch_size 10 KiB
join_reader_ordering_strategy_batch_size 100 KiB
large_full_scan_rows 1000
lc_collate C.UTF-8
lc_ctype C.UTF-8
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/logictest/testdata/logic_test/pg_catalog
Original file line number Diff line number Diff line change
Expand Up @@ -4186,7 +4186,7 @@ inject_retry_errors_enabled off NULL
integer_datetimes on NULL NULL NULL string
intervalstyle postgres NULL NULL NULL string
is_superuser on NULL NULL NULL string
join_reader_ordering_strategy_batch_size 10 KiB NULL NULL NULL string
join_reader_ordering_strategy_batch_size 100 KiB NULL NULL NULL string
large_full_scan_rows 1000 NULL NULL NULL string
lc_collate C.UTF-8 NULL NULL NULL string
lc_ctype C.UTF-8 NULL NULL NULL string
Expand Down Expand Up @@ -4310,7 +4310,7 @@ inject_retry_errors_enabled off NULL
integer_datetimes on NULL user NULL on on
intervalstyle postgres NULL user NULL postgres postgres
is_superuser on NULL user NULL on on
join_reader_ordering_strategy_batch_size 10 KiB NULL user NULL 10 KiB 10 KiB
join_reader_ordering_strategy_batch_size 100 KiB NULL user NULL 100 KiB 100 KiB
large_full_scan_rows 1000 NULL user NULL 1000 1000
lc_collate C.UTF-8 NULL user NULL C.UTF-8 C.UTF-8
lc_ctype C.UTF-8 NULL user NULL C.UTF-8 C.UTF-8
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/logictest/testdata/logic_test/show_source
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ inject_retry_errors_enabled off
integer_datetimes on
intervalstyle postgres
is_superuser on
join_reader_ordering_strategy_batch_size 10 KiB
join_reader_ordering_strategy_batch_size 100 KiB
large_full_scan_rows 1000
lc_collate C.UTF-8
lc_ctype C.UTF-8
Expand Down
19 changes: 19 additions & 0 deletions pkg/sql/rowexec/joinreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -1047,9 +1047,28 @@ func (jr *joinReader) performLookup() (joinReaderState, *execinfrapb.ProducerMet
log.VEvent(jr.Ctx, 1, "done joining rows")
jr.strategy.prepareToEmit(jr.Ctx)

// Check if the strategy spilled to disk and reduce the batch size if it
// did.
// TODO(yuzefovich): we should probably also grow the batch size bytes limit
// dynamically if we haven't spilled and are not close to spilling (say not
// exceeding half of the memory limit of the disk-backed container), up to
// some limit. (This would only apply to the joinReaderOrderingStrategy
// since other strategies cannot spill in the first place.) Probably it'd be
// good to look at not just the current batch of input rows, but to keep
// some statistics over the last several batches to make a more informed
// decision.
if jr.strategy.spilled() && jr.batchSizeBytes > joinReaderMinBatchSize {
jr.batchSizeBytes = jr.batchSizeBytes / 2
if jr.batchSizeBytes < joinReaderMinBatchSize {
jr.batchSizeBytes = joinReaderMinBatchSize
}
}

return jrEmittingRows, nil
}

const joinReaderMinBatchSize = 10 << 10 /* 10 KiB */

// emitRow returns the next row from jr.toEmit, if present. Otherwise it
// prepares for another input batch.
func (jr *joinReader) emitRow() (
Expand Down
6 changes: 2 additions & 4 deletions pkg/sql/rowexec/joinreader_strategies.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ var partialJoinSentinel = []int{-1}
//
// Say the joinReader looks up rows in order: (red, x), then (blue, y). Once
// (red, x) is fetched, it is handed to
// joinReaderOderingStrategy.processLookedUpRow(), which will match it against
// joinReaderOrderingStrategy.processLookedUpRow(), which will match it against
// all the corresponding input rows, producing (1, x), (4, x). These two rows
// are not emitted because that would violate the input ordering (well, (1, x)
// could be emitted, but we're not smart enough). So, they are buffered until
Expand Down Expand Up @@ -535,7 +535,7 @@ type joinReaderOrderingStrategy struct {
testingInfoSpilled bool
}

const joinReaderOrderingStrategyBatchSizeDefault = 10 << 10 /* 10 KiB */
const joinReaderOrderingStrategyBatchSizeDefault = 100 << 10 /* 100 KiB */

// JoinReaderOrderingStrategyBatchSize determines the size of input batches used
// to construct a single lookup KV batch by joinReaderOrderingStrategy.
Expand All @@ -548,8 +548,6 @@ var JoinReaderOrderingStrategyBatchSize = settings.RegisterByteSizeSetting(
)

func (s *joinReaderOrderingStrategy) getLookupRowsBatchSizeHint(sd *sessiondata.SessionData) int64 {
// TODO(asubiotto): Eventually we might want to adjust this batch size
// dynamically based on whether the result row container spilled or not.
if sd.JoinReaderOrderingStrategyBatchSize == 0 {
// In some tests the session data might not be set - use the default
// value then.
Expand Down

0 comments on commit e9f1c05

Please sign in to comment.