From 938a85667671d726c634ad5e553ab0e17c4d3511 Mon Sep 17 00:00:00 2001
From: Yahor Yuzefovich
Date: Tue, 7 Dec 2021 14:43:31 -0800
Subject: [PATCH] sql,kv: introduce Streamer API and use it for index joins in some cases

This commit introduces the Streamer API (see
https://github.com/cockroachdb/cockroach/blob/master/docs/RFCS/20210617_index_lookups_memory_limits.md)
as well as its implementation for the simplest case - when the requests are
unique and can be served in any order. It additionally hooks up the
implementation to be used by the index joins in both execution engines.

There are three main pieces that this commit adds:
1. the `Streamer` struct itself. It is largely the same as described in the
   RFC. Some notable changes are:
   - `Cancel` is renamed to `Close` and is made blocking to ensure that all
     goroutines kicked off by the `Streamer` exit before `Close` returns.
   - `Shrink` has been removed from the `budget` struct (see below for more
     details).
   - furthermore, the `budget` interface has been unexported and the
     `streamer` has been tightly coupled with the `budget`'s implementation.
   - the TODO about collecting DistSQL metadata is removed because we already
     collect the LeafTxnFinalState when using the LeafTxn.
2. the limited `Streamer` implementation - only `OutOfOrder` mode is
   supported, and only when the requests are unique. Notably, buffering and
   caching of the results are not implemented yet.
3. the `TxnKVStreamer` component that sits below the SQL fetchers, uses the
   `Streamer`, and is an adapter from `BatchResponse`s to the key/value pairs
   that the fetchers understand. Although not used at the moment,
   `TxnKVStreamer` is written under the assumption that a single result can
   satisfy multiple requests.

(A sketch of the intended client-side usage of the new API is included in an
aside below.)

The memory budget of the `Streamer` is utilized lazily. The RFC called for
making a large reservation upfront and then shrinking the budget if that
large reservation turned out to be unnecessary; however, while working on
this I realized that lazy reservations are a better fit. The `Streamer` can
reserve up to the specified limit (determined by the `distsql_workmem`
variable) against the root monitor (in the degenerate case of a single large
row, more memory will be reserved). The reservation then never shrinks, under
the assumption that if the reservation has gotten large, it was needed for
higher concurrency (or for large responses), and it is likely to be needed
for the same reasons in the future.

The layout of the main components of the `Streamer` implementation:
- in `Enqueue` we have logic similar to what the DistSender does in order to
  split each request (which might span multiple ranges) into single-range
  requests. Those sub-requests are batched together to be evaluated by a
  single `BatchRequest`.
- `workerCoordinator.mainLoop` is responsible for taking single-range
  batches, estimating the corresponding response size, and issuing requests
  to be evaluated in parallel while adhering to the provided memory budget.
- `workerCoordinator.performRequestAsync` is responsible for populating the
  `BatchRequest` and then processing the results while updating the memory
  budget.

Current known limitations that will be addressed in follow-up work:
- at the moment, when the table has multiple column families, a single row
  can be split across multiple BatchResponses once the TargetBytes limit is
  reached; therefore, we use the streamer only for single column family
  cases. We will expand the KV API shortly so that rows are not split in the
  multiple column family case.
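As an aside (not part of the change itself), here is a minimal sketch of how a
client is expected to drive the new API. It is adapted from the example in the
`Streamer` doc comment added by this patch; the `exampleUsage` wrapper and the
`process` callback are hypothetical, error handling is simplified, and the
choice of `lock.WaitPolicy_Block` is illustrative only.

package example

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv"
	"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
	"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvstreamer"
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
	"github.com/cockroachdb/cockroach/pkg/util/mon"
	"github.com/cockroachdb/cockroach/pkg/util/stop"
)

func exampleUsage(
	ctx context.Context,
	ds *kvcoord.DistSender,
	stopper *stop.Stopper,
	txn *kv.Txn,
	st *cluster.Settings,
	acc *mon.BoundAccount,
	limitBytes int64,
	reqs []roachpb.RequestUnion,
	process func(kvstreamer.Result),
) error {
	s := kvstreamer.NewStreamer(ds, stopper, txn, st, lock.WaitPolicy_Block, limitBytes, acc)
	defer s.Close()
	// Only OutOfOrder mode with unique requests is supported by this commit.
	s.Init(kvstreamer.OutOfOrder, kvstreamer.Hints{UniqueRequests: true})
	// With nil enqueueKeys, the results will reference the ordinals of reqs.
	if err := s.Enqueue(ctx, reqs, nil /* enqueueKeys */); err != nil {
		return err
	}
	for {
		// Blocks until at least one result is available; a zero-length slice
		// means that all enqueued requests have been responded to.
		results, err := s.GetResults(ctx)
		if err != nil {
			return err
		}
		if len(results) == 0 {
			return nil
		}
		for _, r := range results {
			// One of r.GetResp / r.ScanResp is populated; r.EnqueueKeys
			// identifies the request(s) that this result satisfies.
			process(r)
			// Return the result's footprint to the Streamer's budget.
			r.MemoryTok.Release(ctx)
		}
	}
}

Returning to the known limitations: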
- manual refresh of spans when a `ReadWithinUncertaintyIntervalError` is
  encountered by a single streamer in a single flow is not implemented. This
  optimization is considered a must for the final implementation so that we
  do not regress on simple cases in terms of retriable errors. It will be
  implemented shortly as a follow-up.
- I'm thinking that eventually we probably want to disable the batch
  splitting done by the DistSender in order to eliminate unnecessary blocking
  when the streamer's own splitting turns out to be incorrect. This would
  give us some performance improvements in the face of range boundary
  changes, but it doesn't seem important enough for the initial
  implementation.

Release note: None
---
 pkg/BUILD.bazel | 1 +
 pkg/kv/kvclient/kvcoord/batch.go | 12 +-
 pkg/kv/kvclient/kvcoord/batch_test.go | 2 +-
 pkg/kv/kvclient/kvcoord/dist_sender.go | 4 +-
 pkg/kv/kvclient/kvcoord/truncate_test.go | 2 +-
 pkg/kv/kvclient/kvstreamer/BUILD.bazel | 56 +
 .../kvstreamer/avg_response_estimator.go | 42 +
 .../kvstreamer/avg_response_estimator_test.go | 56 +
 pkg/kv/kvclient/kvstreamer/budget.go | 124 ++
 pkg/kv/kvclient/kvstreamer/main_test.go | 31 +
 pkg/kv/kvclient/kvstreamer/streamer.go | 1142 +++++++++++++++++
 pkg/kv/kvclient/kvstreamer/streamer_test.go | 271 ++++
 pkg/server/server_sql.go | 1 +
 pkg/sql/colexec/colbuilder/execplan.go | 16 +-
 pkg/sql/colfetcher/BUILD.bazel | 1 +
 pkg/sql/colfetcher/cfetcher.go | 39 +-
 pkg/sql/colfetcher/index_join.go | 96 +-
 pkg/sql/distsql/server.go | 13 +-
 pkg/sql/distsql_running.go | 10 +
 pkg/sql/execinfra/BUILD.bazel | 1 +
 pkg/sql/execinfra/server_config.go | 3 +
 pkg/sql/row/BUILD.bazel | 4 +
 pkg/sql/row/kv_batch_streamer.go | 213 +++
 pkg/sql/row/kv_fetcher.go | 11 +-
 pkg/sql/rowexec/BUILD.bazel | 3 +
 pkg/sql/rowexec/joinreader.go | 109 +-
 pkg/sql/rowexec/joinreader_test.go | 3 +
 pkg/sql/rowexec/project_set_test.go | 2 +
 pkg/sql/rowexec/utils_test.go | 6 +-
 29 files changed, 2228 insertions(+), 46 deletions(-)
 create mode 100644 pkg/kv/kvclient/kvstreamer/BUILD.bazel
 create mode 100644 pkg/kv/kvclient/kvstreamer/avg_response_estimator.go
 create mode 100644 pkg/kv/kvclient/kvstreamer/avg_response_estimator_test.go
 create mode 100644 pkg/kv/kvclient/kvstreamer/budget.go
 create mode 100644 pkg/kv/kvclient/kvstreamer/main_test.go
 create mode 100644 pkg/kv/kvclient/kvstreamer/streamer.go
 create mode 100644 pkg/kv/kvclient/kvstreamer/streamer_test.go
 create mode 100644 pkg/sql/row/kv_batch_streamer.go
diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index 0e606e9fd599..3f92e4f1f473 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -118,6 +118,7 @@ ALL_TESTS = [ "//pkg/keys:keys_test", "//pkg/kv/bulk:bulk_test", "//pkg/kv/kvclient/kvcoord:kvcoord_test", + "//pkg/kv/kvclient/kvstreamer:kvstreamer_test", "//pkg/kv/kvclient/rangecache:rangecache_test", "//pkg/kv/kvclient/rangefeed/rangefeedbuffer:rangefeedbuffer_test", "//pkg/kv/kvclient/rangefeed:rangefeed_test", diff --git a/pkg/kv/kvclient/kvcoord/batch.go b/pkg/kv/kvclient/kvcoord/batch.go index 2f741385c34d..7da549d933c0 100644 --- a/pkg/kv/kvclient/kvcoord/batch.go +++ b/pkg/kv/kvclient/kvcoord/batch.go @@ -18,7 +18,7 @@ import ( var emptySpan = roachpb.Span{} -// truncate restricts all requests to the given key range and returns new, +// Truncate restricts all requests to the given key range and returns new, // truncated, requests. All returned requests are "truncated" to the given span, // and requests which are found to not overlap the given span at all are // removed. A mapping of response index to request index is returned. 
For @@ -27,8 +27,8 @@ var emptySpan = roachpb.Span{} // reqs = Put[a], Put[c], Put[b], // rs = [a,bb], // -// then truncate(reqs,rs) returns (Put[a], Put[b]) and positions [0,2]. -func truncate( +// then Truncate(reqs,rs) returns (Put[a], Put[b]) and positions [0,2]. +func Truncate( reqs []roachpb.RequestUnion, rs roachpb.RSpan, ) ([]roachpb.RequestUnion, []int, error) { truncateOne := func(args roachpb.Request) (bool, roachpb.Span, error) { @@ -191,18 +191,18 @@ func prev(reqs []roachpb.RequestUnion, k roachpb.RKey) (roachpb.RKey, error) { return candidate, nil } -// next gives the left boundary of the union of all requests which don't affect +// Next gives the left boundary of the union of all requests which don't affect // keys less than the given key. Note that the left boundary is inclusive, that // is, the returned RKey is the inclusive left endpoint of the keys the request // should operate on next. // -// Informally, a call `next(reqs, k)` means: we've already executed the parts of +// Informally, a call `Next(reqs, k)` means: we've already executed the parts of // `reqs` that intersect `[KeyMin, k)`; please tell me how far to the right the // next relevant request begins. // // TODO(tschottdorf): again, better on BatchRequest itself, but can't pull // 'keys' into 'proto'. -func next(reqs []roachpb.RequestUnion, k roachpb.RKey) (roachpb.RKey, error) { +func Next(reqs []roachpb.RequestUnion, k roachpb.RKey) (roachpb.RKey, error) { candidate := roachpb.RKeyMax for _, union := range reqs { inner := union.GetInner() diff --git a/pkg/kv/kvclient/kvcoord/batch_test.go b/pkg/kv/kvclient/kvcoord/batch_test.go index b0ccd8f83651..2fae3611dcaf 100644 --- a/pkg/kv/kvclient/kvcoord/batch_test.go +++ b/pkg/kv/kvclient/kvcoord/batch_test.go @@ -193,7 +193,7 @@ func TestBatchPrevNext(t *testing.T) { args.Key, args.EndKey = span.Key, span.EndKey ba.Add(args) } - if next, err := next(ba.Requests, roachpb.RKey(test.key)); err != nil { + if next, err := Next(ba.Requests, roachpb.RKey(test.key)); err != nil { t.Error(err) } else if !bytes.Equal(next, roachpb.Key(test.expFW)) { t.Errorf("next: expected %q, got %q", test.expFW, next) diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go index 184cc6e1f55a..2fa16df88a8d 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender.go @@ -1318,7 +1318,7 @@ func (ds *DistSender) divideAndSendBatchToRanges( // one, and unless both descriptors are stale, the next descriptor's // StartKey would move us to the beginning of the current range, // resulting in a duplicate scan. - seekKey, err = next(ba.Requests, ri.Desc().EndKey) + seekKey, err = Next(ba.Requests, ri.Desc().EndKey) nextRS.Key = seekKey } if err != nil { @@ -1509,7 +1509,7 @@ func (ds *DistSender) sendPartialBatch( if err != nil { return response{pErr: roachpb.NewError(err)} } - ba.Requests, positions, err = truncate(ba.Requests, rs) + ba.Requests, positions, err = Truncate(ba.Requests, rs) if len(positions) == 0 && err == nil { // This shouldn't happen in the wild, but some tests exercise it. 
return response{ diff --git a/pkg/kv/kvclient/kvcoord/truncate_test.go b/pkg/kv/kvclient/kvcoord/truncate_test.go index 4bab7cdd144f..04127f59c25d 100644 --- a/pkg/kv/kvclient/kvcoord/truncate_test.go +++ b/pkg/kv/kvclient/kvcoord/truncate_test.go @@ -164,7 +164,7 @@ func TestTruncate(t *testing.T) { t.Errorf("%d: intersection failure: %v", i, err) continue } - reqs, pos, err := truncate(original.Requests, rs) + reqs, pos, err := Truncate(original.Requests, rs) if err != nil || test.err != "" { if !testutils.IsError(err, test.err) { t.Errorf("%d: %v (expected: %q)", i, err, test.err) diff --git a/pkg/kv/kvclient/kvstreamer/BUILD.bazel b/pkg/kv/kvclient/kvstreamer/BUILD.bazel new file mode 100644 index 000000000000..6718c4893851 --- /dev/null +++ b/pkg/kv/kvclient/kvstreamer/BUILD.bazel @@ -0,0 +1,56 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "kvstreamer", + srcs = [ + "avg_response_estimator.go", + "budget.go", + "streamer.go", + ], + importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvstreamer", + visibility = ["//visibility:public"], + deps = [ + "//pkg/keys", + "//pkg/kv", + "//pkg/kv/kvclient/kvcoord:with-mocks", + "//pkg/kv/kvserver/concurrency/lock", + "//pkg/roachpb:with-mocks", + "//pkg/settings", + "//pkg/settings/cluster", + "//pkg/util/admission", + "//pkg/util/mon", + "//pkg/util/quotapool", + "//pkg/util/stop", + "//pkg/util/syncutil", + "@com_github_cockroachdb_errors//:errors", + ], +) + +go_test( + name = "kvstreamer_test", + srcs = [ + "avg_response_estimator_test.go", + "main_test.go", + "streamer_test.go", + ], + embed = [":kvstreamer"], + deps = [ + "//pkg/base", + "//pkg/kv", + "//pkg/kv/kvclient/kvcoord:with-mocks", + "//pkg/kv/kvserver/concurrency/lock", + "//pkg/roachpb:with-mocks", + "//pkg/security", + "//pkg/security/securitytest", + "//pkg/server", + "//pkg/settings/cluster", + "//pkg/testutils", + "//pkg/testutils/serverutils", + "//pkg/testutils/skip", + "//pkg/util/leaktest", + "//pkg/util/log", + "//pkg/util/mon", + "//pkg/util/randutil", + "@com_github_stretchr_testify//require", + ], +) diff --git a/pkg/kv/kvclient/kvstreamer/avg_response_estimator.go b/pkg/kv/kvclient/kvstreamer/avg_response_estimator.go new file mode 100644 index 000000000000..56c45eff01d4 --- /dev/null +++ b/pkg/kv/kvclient/kvstreamer/avg_response_estimator.go @@ -0,0 +1,42 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvstreamer + +// avgResponseEstimator is a helper that estimates the average size of responses +// received by the Streamer. It is **not** thread-safe. +type avgResponseEstimator struct { + // responseBytes tracks the total footprint of all responses that the + // Streamer has already received. + responseBytes int64 + numResponses int64 +} + +// TODO(yuzefovich): use the optimizer-driven estimates. +const initialAvgResponseSize = 1 << 10 // 1KiB + +func (e *avgResponseEstimator) getAvgResponseSize() int64 { + if e.numResponses == 0 { + return initialAvgResponseSize + } + // TODO(yuzefovich): we currently use a simple average over the received + // responses, but it is likely to be suboptimal because it would be unfair + // to "large" batches that come in late (i.e. 
it would not be reactive + // enough). Consider using another function here. + return e.responseBytes / e.numResponses +} + +// update updates the actual information of the estimator based on numResponses +// responses that took up responseBytes bytes and correspond to a single +// BatchResponse. +func (e *avgResponseEstimator) update(responseBytes int64, numResponses int64) { + e.responseBytes += responseBytes + e.numResponses += numResponses +} diff --git a/pkg/kv/kvclient/kvstreamer/avg_response_estimator_test.go b/pkg/kv/kvclient/kvstreamer/avg_response_estimator_test.go new file mode 100644 index 000000000000..7c3337f59f26 --- /dev/null +++ b/pkg/kv/kvclient/kvstreamer/avg_response_estimator_test.go @@ -0,0 +1,56 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvstreamer + +import ( + "math" + "testing" + + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/stretchr/testify/require" +) + +func TestAvgResponseEstimator(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + var e avgResponseEstimator + + // Before receiving any responses, we should be using the initial estimate. + require.Equal(t, int64(initialAvgResponseSize), e.getAvgResponseSize()) + + // Simulate receiving a single response. + firstResponseSize := int64(42) + e.update(firstResponseSize, 1) + // The estimate should now be exactly the size of that single response. + require.Equal(t, firstResponseSize, e.getAvgResponseSize()) + + // Simulate receiving 100 small BatchResponses. + smallResponseSize := int64(63) + for i := 0; i < 100; i++ { + e.update(smallResponseSize*5, 5) + } + // The estimate should now be pretty close to the size of a single response + // in the small BatchResponse. + diff := smallResponseSize - e.getAvgResponseSize() + require.True(t, math.Abs(float64(diff))/float64(smallResponseSize) < 0.05) + + // Now simulate receiving 10 large BatchResponses. + largeResponseSize := int64(17) + for i := 0; i < 10; i++ { + e.update(largeResponseSize*1000, 1000) + } + // The estimate should now be pretty close to the size of a single response + // in the large BatchResponse. + diff = largeResponseSize - e.getAvgResponseSize() + require.True(t, math.Abs(float64(diff))/float64(smallResponseSize) < 0.15) +} diff --git a/pkg/kv/kvclient/kvstreamer/budget.go b/pkg/kv/kvclient/kvstreamer/budget.go new file mode 100644 index 000000000000..44204c80af4c --- /dev/null +++ b/pkg/kv/kvclient/kvstreamer/budget.go @@ -0,0 +1,124 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvstreamer + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/util/mon" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" +) + +// budget abstracts the memory budget that is provided to the Streamer by its +// client. 
+// +// This struct is a wrapper on top of mon.BoundAccount because we want to +// support the notion of budget "going in debt". This can occur in a degenerate +// case when a single large row exceeds the provided limit. The Streamer is +// expected to have only a single request in flight in this case. Additionally, +// the budget provides blocking (via waitCh) until it gets out of debt. +type budget struct { + mu struct { + syncutil.Mutex + // acc represents the current reservation of this budget against the + // root memory pool. + acc *mon.BoundAccount + } + // limitBytes is the maximum amount of bytes that this budget should reserve + // against acc, i.e. acc.Used() should not exceed limitBytes. However, in a + // degenerate case of a single large row, the budget can go into debt and + // acc.Used() might exceed limitBytes. + limitBytes int64 + // waitCh is used by the main loop of the workerCoordinator to block until + // available() becomes positive (until some release calls occur). + waitCh chan struct{} +} + +// newBudget creates a new budget with the specified limit. The limit determines +// the maximum amount of memory this budget is allowed to use (i.e. it'll be +// used lazily, as needed). +// +// The budget itself is responsible for staying under the limit, so acc should +// be bound to an unlimited memory monitor. This is needed in order to support +// the case of budget going into debt. Note that although it is an "unlimited +// memory monitor", the monitor is still limited by --max-sql-memory in size +// eventually because all monitors are descendants of the root SQL monitor. +// +// The budget takes ownership of the memory account, and the caller is allowed +// to interact with the account only after canceling the Streamer (because +// memory accounts are not thread-safe). +func newBudget(acc *mon.BoundAccount, limitBytes int64) *budget { + b := budget{ + limitBytes: limitBytes, + waitCh: make(chan struct{}), + } + b.mu.acc = acc + return &b +} + +// available returns how many bytes are currently available in the budget. The +// answer can be negative, in case the Streamer has used un-budgeted memory +// (e.g. one result was very large putting the budget in debt). +// +// Note that it's possible that actually available budget is less than the +// number returned - this might occur if --max-sql-memory root pool is almost +// used up. +func (b *budget) available() int64 { + b.mu.Lock() + defer b.mu.Unlock() + return b.limitBytes - b.mu.acc.Used() +} + +// consume draws bytes from the available budget. An error is returned if the +// root pool budget is used up such that the budget's limit cannot be fully +// reserved. +// - allowDebt indicates whether the budget is allowed to go into debt on this +// consumption. In other words, if allowDebt is true, then acc's reservation is +// allowed to exceed limitBytes (but the error is still returned if the root +// pool budget is exceeded). Note that allowDebt value applies only to this +// consume() call and is not carried forward. +// +// b's mutex should not be held when calling this method. +func (b *budget) consume(ctx context.Context, bytes int64, allowDebt bool) error { + b.mu.Lock() + defer b.mu.Unlock() + return b.consumeLocked(ctx, bytes, allowDebt) +} + +// consumeLocked is the same as consume but assumes that the b's lock is held. 
+func (b *budget) consumeLocked(ctx context.Context, bytes int64, allowDebt bool) error { + b.mu.AssertHeld() + // If we're asked to not exceed the limit (and the limit is greater than + // five bytes - limits of five bytes or less are treated as a special case + // for "forced disk spilling" scenarios like in logic tests), we have to + // check whether we'll stay within the budget. + if !allowDebt && b.limitBytes > 5 { + if b.mu.acc.Used()+bytes > b.limitBytes { + return mon.MemoryResource.NewBudgetExceededError(bytes, b.mu.acc.Used(), b.limitBytes) + } + } + return b.mu.acc.Grow(ctx, bytes) +} + +// release returns bytes to the available budget. +func (b *budget) release(ctx context.Context, bytes int64) { + b.mu.Lock() + defer b.mu.Unlock() + b.mu.acc.Shrink(ctx, bytes) + if b.limitBytes > b.mu.acc.Used() { + // Since we now have some available budget, we non-blockingly send on + // the wait channel to notify the mainCoordinator about it. + select { + case b.waitCh <- struct{}{}: + default: + } + } +} diff --git a/pkg/kv/kvclient/kvstreamer/main_test.go b/pkg/kv/kvclient/kvstreamer/main_test.go new file mode 100644 index 000000000000..40dc560be5f8 --- /dev/null +++ b/pkg/kv/kvclient/kvstreamer/main_test.go @@ -0,0 +1,31 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvstreamer_test + +import ( + "os" + "testing" + + "github.com/cockroachdb/cockroach/pkg/security" + "github.com/cockroachdb/cockroach/pkg/security/securitytest" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/util/randutil" +) + +//go:generate ../../../util/leaktest/add-leaktest.sh *_test.go + +func TestMain(m *testing.M) { + security.SetAssetLoader(securitytest.EmbeddedAssets) + randutil.SeedForTests() + serverutils.InitTestServerFactory(server.TestServerFactory) + os.Exit(m.Run()) +} diff --git a/pkg/kv/kvclient/kvstreamer/streamer.go b/pkg/kv/kvclient/kvstreamer/streamer.go new file mode 100644 index 000000000000..7964e9e745f5 --- /dev/null +++ b/pkg/kv/kvclient/kvstreamer/streamer.go @@ -0,0 +1,1142 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package kvstreamer + +import ( + "context" + "runtime" + "sort" + "sync" + "unsafe" + + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/util/admission" + "github.com/cockroachdb/cockroach/pkg/util/mon" + "github.com/cockroachdb/cockroach/pkg/util/quotapool" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/errors" +) + +// OperationMode describes the mode of operation of the Streamer. +type OperationMode int + +const ( + _ OperationMode = iota + // InOrder is the mode of operation in which the results are delivered in + // the order in which the requests were handed off to the Streamer. This + // mode forces the Streamer to buffer the results it produces through its + // internal parallel execution of the requests. Since the results of the + // concurrent requests can come in an arbitrary order, they are buffered and + // might end up being dropped (resulting in wasted/duplicate work) to make + // space for the results at the front of the line. This would occur when the + // budget limitBytes is reached and the size estimates that lead to too much + // concurrency in the execution were wrong. + InOrder + // OutOfOrder is the mode of operation in which the results are delivered in + // the order in which they're produced. The caller will use the keys field + // of each Result to associate it with the corresponding requests. This mode + // of operation lets the Streamer reuse the memory budget as quickly as + // possible. + OutOfOrder +) + +// Remove an unused warning for now. +// TODO(yuzefovich): remove this when supported. +var _ = InOrder + +// Result describes the result of performing a single KV request. +type Result struct { + // GetResp and ScanResp represent the response to a request. Only one of the + // two will be populated. + // + // The responses are to be considered immutable; the Streamer might hold on + // to the respective memory. Calling MemoryTok.Release() tells the Streamer + // that the response is no longer needed. + GetResp *roachpb.GetResponse + // ScanResp can contain a partial response to a ScanRequest (when Complete + // is false). In that case, there will be a further result with the + // continuation; that result will use the same Key. Notably, SQL rows will + // never be split across multiple results. + ScanResp struct { + *roachpb.ScanResponse + // If the Result represents a scan result, Complete indicates whether + // this is the last response for the respective scan, or if there are + // more responses to come. In any case, ScanResp never contains partial + // rows (i.e. a single row is never split into different Results). + // + // When running in InOrder mode, Results for a single scan will be + // delivered in key order (in addition to results for different scans + // being delivered in request order). When running in OutOfOrder mode, + // Results for a single scan can be delivered out of key order (in + // addition to results for different scans being delivered out of + // request order). + Complete bool + } + // EnqueueKeys identifies the requests that this Result satisfies. 
In + // OutOfOrder mode, a single Result can satisfy multiple identical requests. + // In InOrder mode a Result can only satisfy multiple consecutive requests. + EnqueueKeys []int + // MemoryTok.Release() needs to be called by the recipient once it's not + // referencing this Result any more. If this was the last (or only) + // reference to this Result, the memory used by this Result is made + // available in the Streamer's budget. + // + // Internally, Results are refcounted. Multiple Results referencing the same + // GetResp/ScanResp can be returned from separate `GetResults()` calls, and + // the Streamer internally does buffering and caching of Results - which + // also contributes to the refcounts. + MemoryTok ResultMemoryToken + // position tracks the ordinal among all originally enqueued requests that + // this result satisfies. See singleRangeBatch.positions for more details. + // TODO(yuzefovich): this might need to be []int when non-unique requests + // are supported. + position int +} + +// ResultMemoryToken represents a handle to a Result's memory tracking. The +// recipient of a Result is required to call Release() when the Result is not in +// use any more so that its memory is returned to the Streamer's budget. +type ResultMemoryToken interface { + // Release decrements the refcount. + Release(context.Context) +} + +// Hints provides different hints to the Streamer for optimization purposes. +type Hints struct { + // UniqueRequests tells the Streamer that the requests will be unique. As + // such, there's no point in de-duping them or caching results. + UniqueRequests bool +} + +type resultMemoryToken struct { + budget *budget + toRelease int64 +} + +var _ ResultMemoryToken = &resultMemoryToken{} + +func (t *resultMemoryToken) Release(ctx context.Context) { + t.budget.release(ctx, t.toRelease) +} + +// Streamer provides a streaming oriented API for reading from the KV layer. At +// the moment the Streamer only works when SQL rows are comprised of a single KV +// (i.e. a single column family). +// TODO(yuzefovich): lift the restriction on a single column family once KV is +// updated so that rows are never split across different BatchResponses when +// TargetBytes limitBytes is exceeded. +// +// The example usage is roughly as follows: +// +// s := NewStreamer(...) +// s.Init(OperationMode, Hints) +// ... +// for needMoreKVs { +// // Check whether there are results to the previously enqueued requests. +// // This will block if no results are available, but there are some +// // enqueued requests. +// results, err := s.GetResults(ctx) +// // err check +// ... +// if len(results) > 0 { +// processResults(results) +// // return to the client +// } +// // All previously enqueued requests have already been responded to. +// if moreRequestsToEnqueue { +// err := s.Enqueue(ctx, requests, enqueueKeys) +// // err check +// ... +// } else { +// // done +// ... +// } +// } +// ... +// s.Close() +// +// The Streamer builds on top of the BatchRequest API provided by the DistSender +// and aims to allow for executing the requests in parallel (to improve the +// performance) while setting the memory limits on those requests (for stability +// purposes). +// +// The parallelism is achieved by splitting the incoming requests into +// single-range batches where each such batch will hit a fast-path in the +// DistSender (unless there have been changes to range boundaries). Since these +// batches are executed concurrently, the LeafTxns are used. 
+// +// The memory limit handling is achieved by the Streamer guessing the size of +// the response for each request and setting TargetBytes accordingly. The +// concurrency of the Streamer is limited by its memory limit. +// +// The Streamer additionally utilizes different optimizations to improve the +// performance: +// - when possible, sorting requests in key order to take advantage of low-level +// Pebble locality optimizations +// - when necessary, buffering the responses received out of order +// - when necessary, caching the responses to short-circuit repeated lookups. +// TODO(yuzefovich): add an optimization of transparent refreshes when there is +// a single Streamer in the local flow. +// TODO(yuzefovich): support pipelining of Enqueue and GetResults calls. +type Streamer struct { + distSender *kvcoord.DistSender + stopper *stop.Stopper + + mode OperationMode + hints Hints + budget *budget + + coordinator workerCoordinator + coordinatorStarted bool + coordinatorCtxCancel context.CancelFunc + + waitGroup sync.WaitGroup + + enqueueKeys []int + + // waitForResults is used to block GetResults() call until some results are + // available. + waitForResults chan struct{} + + mu struct { + syncutil.Mutex + + avgResponseEstimator avgResponseEstimator + + // requestsToServe contains all single-range sub-requests that have yet + // to be served. + // TODO(yuzefovich): consider using ring.Buffer instead of a slice. + requestsToServe []singleRangeBatch + + // numRangesLeftPerScanRequest tracks how many ranges a particular + // originally enqueued ScanRequest touches, but scanning of those ranges + // isn't complete. It is allocated lazily when the first ScanRequest is + // encountered in Enqueue. + numRangesLeftPerScanRequest []int + + // numEnqueuedRequests tracks the number of the originally enqueued + // requests. + numEnqueuedRequests int + + // numCompleteRequests tracks the number of the originally enqueued + // requests that have already been completed. + numCompleteRequests int + + // numRequestsInFlight tracks the number of single-range batches that + // are currently being served asynchronously (i.e. those that have + // already left requestsToServe queue, but for which we haven't received + // the results yet). + // TODO(yuzefovich): check whether the contention on mu when accessing + // this field is sufficient to justify pulling it out into an atomic. + numRequestsInFlight int + + // results are the results of already completed requests that haven't + // been returned by GetResults() yet. + results []Result + err error + } +} + +// streamerConcurrencyLimit is an upper bound on the number of asynchronous +// requests that a single Streamer can have in flight. The default value for +// this setting is chosen arbitrarily as 1/8th of the default value for the +// senderConcurrencyLimit. +var streamerConcurrencyLimit = settings.RegisterIntSetting( + settings.TenantWritable, + "kv.streamer.concurrency_limit", + "maximum number of asynchronous requests by a single streamer", + max(128, int64(8*runtime.GOMAXPROCS(0))), + settings.NonNegativeInt, +) + +func max(a, b int64) int64 { + if a > b { + return a + } + return b +} + +// NewStreamer creates a new Streamer. +// +// limitBytes determines the maximum amount of memory this Streamer is allowed +// to use (i.e. it'll be used lazily, as needed). The more memory it has, the +// higher its internal concurrency and throughput. 
+// +// acc should be bound to an unlimited memory monitor, and the Streamer itself +// is responsible for staying under the limitBytes. +// +// The Streamer takes ownership of the memory account, and the caller is allowed +// to interact with the account only after canceling the Streamer (because +// memory accounts are not thread-safe). +func NewStreamer( + distSender *kvcoord.DistSender, + stopper *stop.Stopper, + txn *kv.Txn, + st *cluster.Settings, + lockWaitPolicy lock.WaitPolicy, + limitBytes int64, + acc *mon.BoundAccount, +) *Streamer { + s := &Streamer{ + distSender: distSender, + stopper: stopper, + budget: newBudget(acc, limitBytes), + } + s.coordinator = workerCoordinator{ + s: s, + txn: txn, + lockWaitPolicy: lockWaitPolicy, + requestAdmissionHeader: txn.AdmissionHeader(), + responseAdmissionQ: txn.DB().SQLKVResponseAdmissionQ, + } + // TODO(yuzefovich): consider lazily allocating this IntPool only when + // enqueued requests span multiple batches. + s.coordinator.asyncSem = quotapool.NewIntPool( + "single Streamer async concurrency", + uint64(streamerConcurrencyLimit.Get(&st.SV)), + ) + s.coordinator.mu.hasWork = sync.NewCond(&s.coordinator.mu) + streamerConcurrencyLimit.SetOnChange(&st.SV, func(ctx context.Context) { + s.coordinator.asyncSem.UpdateCapacity(uint64(streamerConcurrencyLimit.Get(&st.SV))) + }) + stopper.AddCloser(s.coordinator.asyncSem.Closer("stopper")) + return s +} + +// Init initializes the Streamer. +// +// OperationMode controls the order in which results are delivered to the +// client. When possible, prefer OutOfOrder mode. +// +// Hints can be used to hint the aggressiveness of the caching policy. In +// particular, it can be used to disable caching when the client knows that all +// looked-up keys are unique (e.g. in the case of an index-join). +func (s *Streamer) Init(mode OperationMode, hints Hints) { + if mode != OutOfOrder { + panic(errors.AssertionFailedf("only OutOfOrder mode is supported")) + } + s.mode = mode + if !hints.UniqueRequests { + panic(errors.AssertionFailedf("only unique requests are currently supported")) + } + s.hints = hints + s.waitForResults = make(chan struct{}, 1) +} + +// Enqueue dispatches multiple requests for execution. Results are delivered +// through the GetResults call. If enqueueKeys is not nil, it needs to contain +// one ID for each request; responses will reference that ID so that the client +// can associate them to the requests. If enqueueKeys is nil, then the responses +// will reference the ordinals of the corresponding requests among reqs. +// +// Multiple requests can specify the same key. In this case, their respective +// responses will also reference the same key. This is useful, for example, for +// "range-based lookup joins" where multiple spans are read in the context of +// the same input-side row (see multiSpanGenerator implementation of +// rowexec.joinReaderSpanGenerator interface for more details). +// +// The Streamer takes over the given requests, will perform the memory +// accounting against its budget and might modify the requests in place. +// +// In InOrder operation mode, responses will be delivered in reqs order. +// +// It is the caller's responsibility to ensure that the memory footprint of reqs +// (i.e. roachpb.Spans inside of the requests) is reasonable. Enqueue will +// return an error if that footprint exceeds the Streamer's limitBytes. 
The +// exception is made only when a single request is enqueued in order to allow +// the caller to proceed when the key to lookup is arbitrarily large. As a rule +// of thumb though, the footprint of reqs should be on the order of MBs, and not +// tens of MBs. +// +// Currently, enqueuing new requests while there are still requests in progress +// from the previous invocation is prohibited. +// TODO(yuzefovich): lift this restriction and introduce the pipelining. +func (s *Streamer) Enqueue( + ctx context.Context, reqs []roachpb.RequestUnion, enqueueKeys []int, +) (retErr error) { + if !s.coordinatorStarted { + var coordinatorCtx context.Context + coordinatorCtx, s.coordinatorCtxCancel = context.WithCancel(ctx) + s.waitGroup.Add(1) + if err := s.stopper.RunAsyncTask(coordinatorCtx, "streamer-coordinator", s.coordinator.mainLoop); err != nil { + // The new goroutine wasn't spun up, so mainLoop won't get executed + // and we have to decrement the wait group ourselves. + s.waitGroup.Done() + return err + } + s.coordinatorStarted = true + } + + // TODO(yuzefovich): we might want to have more fine-grained lock + // acquisitions once pipelining is implemented. + s.mu.Lock() + defer func() { + if retErr != nil && s.mu.err == nil { + // Set the error so that mainLoop of the worker coordinator exits + // as soon as possible, without issuing any more requests. + s.mu.err = retErr + } + s.mu.Unlock() + }() + + if enqueueKeys != nil && len(enqueueKeys) != len(reqs) { + return errors.AssertionFailedf("invalid enqueueKeys: len(reqs) = %d, len(enqueueKeys) = %d", len(reqs), len(enqueueKeys)) + } + s.enqueueKeys = enqueueKeys + + if s.mu.numEnqueuedRequests != s.mu.numCompleteRequests { + return errors.AssertionFailedf("Enqueue is called before the previous requests have been completed") + } + if len(s.mu.results) > 0 { + return errors.AssertionFailedf("Enqueue is called before the results of the previous requests have been retrieved") + } + + s.mu.numEnqueuedRequests = len(reqs) + s.mu.numCompleteRequests = 0 + + // The minimal key range encompassing all requests contained within. + // Local addressing has already been resolved. + rs, err := keys.Range(reqs) + if err != nil { + return err + } + + // Divide the given requests into single-range batches that are added to + // requestsToServe, and the worker coordinator will then pick those batches + // up to execute asynchronously. + var totalReqsMemUsage int64 + // TODO(yuzefovich): in InOrder mode we need to treat the head-of-the-line + // request differently. + seekKey := rs.Key + const scanDir = kvcoord.Ascending + ri := kvcoord.MakeRangeIterator(s.distSender) + ri.Seek(ctx, seekKey, scanDir) + if !ri.Valid() { + return ri.Error() + } + firstScanRequest := true + for ; ri.Valid(); ri.Seek(ctx, seekKey, scanDir) { + // Truncate the request span to the current range. + singleRangeSpan, err := rs.Intersect(ri.Token().Desc()) + if err != nil { + return err + } + // Find all requests that touch the current range. + singleRangeReqs, positions, err := kvcoord.Truncate(reqs, singleRangeSpan) + if err != nil { + return err + } + for _, pos := range positions { + if _, isScan := reqs[pos].GetInner().(*roachpb.ScanRequest); isScan { + if firstScanRequest { + // We have some ScanRequests, so we have to set up + // numRangesLeftPerScanRequest. 
+ if cap(s.mu.numRangesLeftPerScanRequest) < len(reqs) { + s.mu.numRangesLeftPerScanRequest = make([]int, len(reqs)) + } else { + // We can reuse numRangesLeftPerScanRequest allocated on + // the previous call to Enqueue after we zero it out. + s.mu.numRangesLeftPerScanRequest = s.mu.numRangesLeftPerScanRequest[:len(reqs)] + for n := 0; n < len(s.mu.numRangesLeftPerScanRequest); { + n += copy(s.mu.numRangesLeftPerScanRequest[n:], zeroIntSlice) + } + } + } + s.mu.numRangesLeftPerScanRequest[pos]++ + firstScanRequest = false + } + } + + // TODO(yuzefovich): perform the de-duplication here. + //if !s.hints.UniqueRequests { + //} + + r := singleRangeBatch{ + reqs: singleRangeReqs, + positions: positions, + reqsReservedBytes: requestsMemUsage(singleRangeReqs), + } + totalReqsMemUsage += r.reqsReservedBytes + + if s.mode == OutOfOrder { + // Sort all single-range requests to be in the key order. + sort.Sort(&r) + } + + s.mu.requestsToServe = append(s.mu.requestsToServe, r) + + // Determine next seek key, taking potentially sparse requests into + // consideration. + // + // In next iteration, query next range. + // It's important that we use the EndKey of the current descriptor + // as opposed to the StartKey of the next one: if the former is stale, + // it's possible that the next range has since merged the subsequent + // one, and unless both descriptors are stale, the next descriptor's + // StartKey would move us to the beginning of the current range, + // resulting in a duplicate scan. + seekKey, err = kvcoord.Next(reqs, ri.Desc().EndKey) + rs.Key = seekKey + if err != nil { + return err + } + } + + // Account for the memory used by all the requests. We allow the budget to + // go into debt iff a single request was enqueued. This is needed to support + // the case of arbitrarily large keys - the caller is expected to produce + // requests with such cases one at a time. + allowDebt := len(reqs) == 1 + if err = s.budget.consume(ctx, totalReqsMemUsage, allowDebt); err != nil { + return err + } + + // TODO(yuzefovich): it might be better to notify the coordinator once + // one singleRangeBatch object has been appended to s.mu.requestsToServe. + s.coordinator.mu.hasWork.Signal() + return nil +} + +// GetResults blocks until at least one result is available. If the operation +// mode is OutOfOrder, any result will do, and the caller is expected to examine +// Result.EnqueueKeys to understand which request the result corresponds to. For +// InOrder, only head-of-line results will do. Zero-length result slice is +// returned once all enqueued requests have been responded to. +func (s *Streamer) GetResults(ctx context.Context) ([]Result, error) { + s.mu.Lock() + results := s.mu.results + err := s.mu.err + s.mu.results = nil + allComplete := s.mu.numCompleteRequests == s.mu.numEnqueuedRequests + // Non-blockingly clear the waitForResults channel in case we've just picked + // up some results. We do so while holding the mutex so that new results + // aren't appended. + select { + case <-s.waitForResults: + default: + } + s.mu.Unlock() + + if len(results) > 0 || allComplete || err != nil { + return results, err + } + + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-s.waitForResults: + s.mu.Lock() + results = s.mu.results + err = s.mu.err + s.mu.results = nil + s.mu.Unlock() + return results, err + } +} + +// notifyGetResultsLocked non-blockingly sends a message on waitForResults +// channel. 
This method should be called only while holding the lock of s.mu so +// that other results couldn't be appended which would cause us to miss the +// notification about that. +func (s *Streamer) notifyGetResultsLocked() { + s.mu.AssertHeld() + select { + case s.waitForResults <- struct{}{}: + default: + } +} + +// setError sets the error on the Streamer if no error has been set previously +// and unblocks GetResults() if needed. +// +// The mutex of s must not be already held. +func (s *Streamer) setError(err error) { + s.mu.Lock() + defer s.mu.Unlock() + if s.mu.err == nil { + s.mu.err = err + } + s.notifyGetResultsLocked() +} + +// Close cancels all in-flight operations and releases all of the resources of +// the Streamer. It blocks until all goroutines created by the Streamer exit. No +// other calls on s are allowed after this. +func (s *Streamer) Close() { + if s.coordinatorStarted { + s.coordinatorCtxCancel() + s.coordinator.mu.Lock() + s.coordinator.mu.done = true + // Unblock the coordinator in case it is waiting for more work. + s.coordinator.mu.hasWork.Signal() + s.coordinator.mu.Unlock() + } + s.waitGroup.Wait() + *s = Streamer{} +} + +// getNumRequestsInFlight returns the number of requests that are currently in +// flight. This method should be called without holding the lock of s. +func (s *Streamer) getNumRequestsInFlight() int { + s.mu.Lock() + defer s.mu.Unlock() + return s.mu.numRequestsInFlight +} + +// adjustNumRequestsInFlight updates the number of requests that are currently +// in flight. This method should be called without holding the lock of s. +func (s *Streamer) adjustNumRequestsInFlight(delta int) { + s.mu.Lock() + defer s.mu.Unlock() + s.mu.numRequestsInFlight += delta +} + +// singleRangeBatch contains parts of the originally enqueued requests that have +// been truncated to be within a single range. All requests within the +// singleRangeBatch will be issued as a single BatchRequest. +type singleRangeBatch struct { + reqs []roachpb.RequestUnion + // positions is a 1-to-1 mapping with reqs to indicate which ordinal among + // the originally enqueued requests a particular reqs[i] corresponds to. In + // other words, if reqs[i] is (or a part of) enqueuedReqs[j], then + // positions[i] = j. + // TODO(yuzefovich): this might need to be [][]int when non-unique requests + // are supported. + positions []int + // reqsReservedBytes tracks the memory reservation against the budget for + // the memory usage of reqs. + reqsReservedBytes int64 +} + +var _ sort.Interface = &singleRangeBatch{} + +func (r *singleRangeBatch) Len() int { + return len(r.reqs) +} + +func (r *singleRangeBatch) Swap(i, j int) { + r.reqs[i], r.reqs[j] = r.reqs[j], r.reqs[i] + r.positions[i], r.positions[j] = r.positions[j], r.positions[i] +} + +// Less returns true if r.reqs[i]'s key comes before r.reqs[j]'s key. +func (r *singleRangeBatch) Less(i, j int) bool { + // TODO(yuzefovich): figure out whether it's worth extracting the keys when + // constructing singleRangeBatch object. + return r.reqs[i].GetInner().Header().Key.Compare(r.reqs[j].GetInner().Header().Key) < 0 +} + +type workerCoordinator struct { + s *Streamer + txn *kv.Txn + lockWaitPolicy lock.WaitPolicy + + asyncSem *quotapool.IntPool + + // For request and response admission control. + requestAdmissionHeader roachpb.AdmissionHeader + responseAdmissionQ *admission.WorkQueue + + mu struct { + syncutil.Mutex + hasWork *sync.Cond + // done is set to true once the Streamer is closed meaning the worker + // coordinator must exit. 
+ done bool + } +} + +// mainLoop runs throughout the lifetime of the Streamer (from the first Enqueue +// call until Cancel) and routes the single-range batches for asynchronous +// execution. This function is dividing up the Streamer's budget for each of +// those batches and won't start executing the batches if the available budget +// is insufficient. The function exits when an error is encountered by one of +// the asynchronous requests. +func (w *workerCoordinator) mainLoop(ctx context.Context) { + defer w.s.waitGroup.Done() + for { + // Get next requests to serve. + requestsToServe, avgResponseSize, shouldExit := w.getRequests() + if shouldExit { + return + } + if len(requestsToServe) == 0 { + // If the Streamer isn't closed yet, block until there are enqueued + // requests. + w.mu.Lock() + if !w.mu.done { + w.mu.hasWork.Wait() + } + w.mu.Unlock() + if ctx.Err() != nil { + w.s.setError(ctx.Err()) + return + } + continue + } + + // Now wait until there is enough budget to at least receive one full + // response (but only if there are requests in flight - if there are + // none, then we might have a degenerate case when a single row is + // expected to exceed the budget). + // TODO(yuzefovich): consider using a multiple of avgResponseSize here. + for w.s.getNumRequestsInFlight() > 0 && w.s.budget.available() < avgResponseSize { + select { + case <-w.s.budget.waitCh: + case <-ctx.Done(): + w.s.setError(ctx.Err()) + return + } + } + + err := w.issueRequestsForAsyncProcessing(ctx, requestsToServe, avgResponseSize) + if err != nil { + w.s.setError(err) + return + } + } +} + +// getRequests returns all currently enqueued requests to be served. +// +// A boolean that indicates whether the coordinator should exit is returned. +func (w *workerCoordinator) getRequests() ( + requestsToServe []singleRangeBatch, + avgResponseSize int64, + shouldExit bool, +) { + w.s.mu.Lock() + defer w.s.mu.Unlock() + requestsToServe = w.s.mu.requestsToServe + avgResponseSize = w.s.mu.avgResponseEstimator.getAvgResponseSize() + shouldExit = w.s.mu.err != nil + return requestsToServe, avgResponseSize, shouldExit +} + +// issueRequestsForAsyncProcessing iterates over the given requests and issues +// them to be served asynchronously while there is enough budget available to +// receive the responses. Once the budget is exhausted, no new requests are +// issued, the only exception is made for the case when there are no other +// requests in flight, and in that scenario, a single request will be issued. +// +// It is assumed that requestsToServe is a prefix of w.s.mu.requestsToServe +// (i.e. it is possible that some other requests have been appended to +// w.s.mu.requestsToServe after requestsToServe have been grabbed). All issued +// requests are removed from w.s.mu.requestToServe. +func (w *workerCoordinator) issueRequestsForAsyncProcessing( + ctx context.Context, requestsToServe []singleRangeBatch, avgResponseSize int64, +) error { + var numRequestsIssued int + defer func() { + w.s.mu.Lock() + // We can just slice here since we only append to requestToServe at + // the moment. 
+ w.s.mu.requestsToServe = w.s.mu.requestsToServe[numRequestsIssued:] + w.s.mu.Unlock() + }() + w.s.budget.mu.Lock() + defer w.s.budget.mu.Unlock() + + headOfLine := w.s.getNumRequestsInFlight() == 0 + var budgetIsExhausted bool + for numRequestsIssued < len(requestsToServe) && !budgetIsExhausted { + availableBudget := w.s.budget.limitBytes - w.s.budget.mu.acc.Used() + if availableBudget < avgResponseSize { + if !headOfLine { + // We don't have enough budget available to serve this request, + // and there are other requests in flight, so we'll wait for + // some of them to finish. + break + } + budgetIsExhausted = true + if availableBudget < 1 { + // The budget is already in debt, and we have no requests in + // flight. This occurs when we have very large roachpb.Span in + // the request. In such a case, we still want to make progress + // by giving the smallest TargetBytes possible while asking the + // KV layer to not return an empty response. + availableBudget = 1 + } + } + singleRangeReqs := requestsToServe[numRequestsIssued] + targetBytes := int64(len(singleRangeReqs.reqs)) * avgResponseSize + if targetBytes > availableBudget { + targetBytes = availableBudget + } + if err := w.s.budget.consumeLocked(ctx, targetBytes, headOfLine /* allowDebt */); err != nil { + // This error cannot be because of the budget going into debt. If + // headOfLine is true, then we're allowing debt; otherwise, we have + // truncated targetBytes above to not exceed availableBudget, and + // we're holding the budget's mutex. Thus, the error indicates that + // the root memory pool has been exhausted. + if !headOfLine { + // There are some requests in flight, so we'll let them finish. + // + // This is opportunistic behavior where we're hoping that once + // other requests are fully processed (i.e. the corresponding + // results are Release()'d), we'll be able to make progress on + // this request too, without exceeding the root memory pool. + // + // We're not really concerned about pushing the node towards the + // OOM situation because we're still staying within the root + // memory pool limit (which should have some safety gap with the + // available RAM). Furthermore, if other queries are consuming + // all of the root memory pool limit, then the head-of-the-line + // request will notice it and will exit accordingly. + break + } + // We don't have any requests in flight, so we'll exit to be safe + // (in order not to OOM the node). Most likely this occurs when + // there are concurrent memory-intensive queries which this Streamer + // has no control over. + // + // We could have issued this head-of-the-line request with lower + // targetBytes value (unless it is already 1), but the fact that the + // root memory pool is exhausted indicates that the node might be + // overloaded already, so it seems better to not ask it to receive + // any more responses at the moment. + return err + } + w.performRequestAsync(ctx, singleRangeReqs, targetBytes, headOfLine) + numRequestsIssued++ + headOfLine = false + } + return nil +} + +// addRequest adds a single-range batch to be processed later. +func (w *workerCoordinator) addRequest(req singleRangeBatch) { + w.s.mu.Lock() + defer w.s.mu.Unlock() + w.s.mu.requestsToServe = append(w.s.mu.requestsToServe, req) + w.mu.hasWork.Signal() +} + +func (w *workerCoordinator) asyncRequestCleanup() { + w.s.adjustNumRequestsInFlight(-1 /* delta */) + w.s.waitGroup.Done() +} + +// performRequestAsync dispatches the given single-range batch for evaluation +// asynchronously. 
If the batch cannot be evaluated fully (due to exhausting its +// memory limitBytes), the "resume" single-range batch will be added into +// requestsToServe, and mainLoop will pick that up to process later. +// +// targetBytes specifies the memory budget that this single-range batch should +// be issued with. targetBytes bytes have already been consumed from the budget, +// and this amount of memory is owned by the goroutine that is spun up to +// perform the request. Once the response is received, performRequestAsync +// reconciles the budget so that the actual footprint of the response is +// consumed. Each Result produced based on that response will track a part of +// the memory reservation (according to the Result's footprint) that will be +// returned back to the budget once Result.MemoryTok.Release is called. +// +// headOfLine indicates whether this request is the current head of the line. +// Head-of-the-line requests are treated specially in a sense that they are +// allowed to put the budget into debt. The caller is responsible for ensuring +// that there is at most one asynchronous request with headOfLine=true at all +// times. +func (w *workerCoordinator) performRequestAsync( + ctx context.Context, req singleRangeBatch, targetBytes int64, headOfLine bool, +) { + w.s.waitGroup.Add(1) + w.s.adjustNumRequestsInFlight(1 /* delta */) + if err := w.s.stopper.RunAsyncTaskEx( + ctx, + stop.TaskOpts{ + TaskName: "streamer-lookup-async", + Sem: w.asyncSem, + WaitForSem: true, + }, + func(ctx context.Context) { + defer w.asyncRequestCleanup() + var ba roachpb.BatchRequest + ba.Header.WaitPolicy = w.lockWaitPolicy + ba.Header.TargetBytes = targetBytes + ba.Header.TargetBytesAllowEmpty = !headOfLine + // TODO(yuzefovich): consider setting MaxSpanRequestKeys whenever + // applicable (#67885). + ba.AdmissionHeader = w.requestAdmissionHeader + // We always have some memory reserved against the memory account, + // regardless of the value of headOfLine. + ba.AdmissionHeader.NoMemoryReservedAtSource = false + ba.Requests = req.reqs + + // TODO(yuzefovich): in Enqueue we split all requests into + // single-range batches, so ideally ba touches a single range in + // which case we hit the fast path in the DistSender. However, if + // the range boundaries have changed after we performed the split + // (or we had stale range cache at the time of the split), the + // DistSender will transparently re-split ba into several + // sub-batches that will be executed sequentially because of the + // presence of limits. We could, instead, ask the DistSender to not + // perform that re-splitting and return an error, then we'll rely on + // the updated range cache to perform re-splitting ourselves. This + // should offer some performance improvements since we'd eliminate + // unnecessary blocking (due to sequential evaluation of sub-batches + // by the DistSender). For the initial implementation it doesn't + // seem important though. + br, err := w.txn.Send(ctx, ba) + if err != nil { + // TODO(yuzefovich): if err is + // ReadWithinUncertaintyIntervalError and there is only a single + // Streamer in a single local flow, attempt to transparently + // refresh. + w.s.setError(err.GoError()) + return + } + + var resumeReq singleRangeBatch + // We will reuse the slices for the resume spans, if any. + resumeReq.reqs = req.reqs[:0] + resumeReq.positions = req.positions[:0] + var results []Result + var numCompleteGetResponses int + // memoryFootprintBytes tracks the total memory footprint of + // non-empty responses. 
This will be equal to the sum of the all + // resultMemoryTokens created. + var memoryFootprintBytes int64 + var hasNonEmptyScanResponse bool + for i, resp := range br.Responses { + enqueueKey := req.positions[i] + if w.s.enqueueKeys != nil { + enqueueKey = w.s.enqueueKeys[req.positions[i]] + } + reply := resp.GetInner() + origReq := req.reqs[i] + // Unset the original request so that we lose the reference to + // the span. + req.reqs[i] = roachpb.RequestUnion{} + switch origRequest := origReq.GetInner().(type) { + case *roachpb.GetRequest: + get := reply.(*roachpb.GetResponse) + if get.ResumeSpan != nil { + // This Get wasn't completed - update the original + // request according to the ResumeSpan and include it + // into the batch again. + origRequest.SetSpan(*get.ResumeSpan) + resumeReq.reqs = append(resumeReq.reqs, origReq) + resumeReq.positions = append(resumeReq.positions, req.positions[i]) + } else { + // This Get was completed. + toRelease := int64(get.Size()) + result := Result{ + GetResp: get, + // This currently only works because all requests + // are unique. + EnqueueKeys: []int{enqueueKey}, + MemoryTok: &resultMemoryToken{ + toRelease: toRelease, + budget: w.s.budget, + }, + position: req.positions[i], + } + memoryFootprintBytes += toRelease + results = append(results, result) + numCompleteGetResponses++ + } + + case *roachpb.ScanRequest: + scan := reply.(*roachpb.ScanResponse) + resumeSpan := scan.ResumeSpan + if len(scan.Rows) > 0 || len(scan.BatchResponses) > 0 { + toRelease := int64(scan.Size()) + result := Result{ + // This currently only works because all requests + // are unique. + EnqueueKeys: []int{enqueueKey}, + MemoryTok: &resultMemoryToken{ + toRelease: toRelease, + budget: w.s.budget, + }, + position: req.positions[i], + } + result.ScanResp.ScanResponse = scan + // Complete field will be set below. + memoryFootprintBytes += toRelease + results = append(results, result) + hasNonEmptyScanResponse = true + } + if resumeSpan != nil { + // This Scan wasn't completed - update the original + // request according to the resumeSpan and include it + // into the batch again. + origRequest.SetSpan(*resumeSpan) + resumeReq.reqs = append(resumeReq.reqs, origReq) + resumeReq.positions = append(resumeReq.positions, req.positions[i]) + } + } + } + + // Now adjust the budget based on the actual memory footprint of + // non-empty responses as well as resume spans, if any. + respOverestimate := targetBytes - memoryFootprintBytes + var reqsMemUsage int64 + if len(resumeReq.reqs) > 0 { + reqsMemUsage = requestsMemUsage(resumeReq.reqs) + } + reqOveraccounted := req.reqsReservedBytes - reqsMemUsage + overaccountedTotal := respOverestimate + reqOveraccounted + if overaccountedTotal >= 0 { + w.s.budget.release(ctx, overaccountedTotal) + } else { + // There is an under-accounting at the moment, so we have to + // increase the memory reservation. + // + // This under-accounting can occur in a couple of edge cases: + // 1) the estimate of the response sizes is pretty good (i.e. + // respOverestimate is around 0), but we received many partial + // responses with ResumeSpans that take up much more space than + // the original requests; + // 2) we have a single large row in the response. In this case + // headOfLine must be true (targetBytes might be 1 or higher, + // but not enough for that large row). 
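+ // For example (hypothetical numbers, for illustration only): if we
+ // issued this batch with targetBytes=1000 but the responses actually
+ // occupy 1600 bytes, then respOverestimate=-600; if the original
+ // requests had reserved 500 bytes while the resume requests need only
+ // 200, then reqOveraccounted=300, so overaccountedTotal=-300 and we
+ // must consume 300 more bytes from the budget (going into debt only
+ // if this is the head-of-the-line request).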
+ toConsume := -overaccountedTotal + if err := w.s.budget.consume(ctx, toConsume, headOfLine /* allowDebt */); err != nil { + w.s.budget.release(ctx, targetBytes) + if !headOfLine { + // Since this is not the head of the line, we'll just + // discard the result and add the request back to be + // served. + // + // This is opportunistic behavior where we're hoping + // that once other requests are fully processed (i.e. + // the corresponding results are Release()'d), we'll be + // able to make progress on this request too. + // TODO(yuzefovich): consider updating the + // avgResponseSize and/or storing the information about + // the returned bytes size in req. + w.addRequest(req) + return + } + // The error indicates that the root memory pool has been + // exhausted, so we'll exit to be safe (in order not to OOM + // the node). + // TODO(yuzefovich): if the response contains multiple rows, + // consider adding the request back to be served with a note + // to issue it with smaller targetBytes. + w.s.setError(err) + return + } + } + // Update the resume request accordingly. + resumeReq.reqsReservedBytes = reqsMemUsage + + // Do admission control after we've finalized the memory accounting. + if br != nil && w.responseAdmissionQ != nil { + responseAdmission := admission.WorkInfo{ + TenantID: roachpb.SystemTenantID, + Priority: admission.WorkPriority(w.requestAdmissionHeader.Priority), + CreateTime: w.requestAdmissionHeader.CreateTime, + } + if _, err := w.responseAdmissionQ.Admit(ctx, responseAdmission); err != nil { + w.s.setError(err) + return + } + } + + // If we have any results, finalize them. + if len(results) > 0 { + w.finalizeSingleRangeResults( + results, memoryFootprintBytes, hasNonEmptyScanResponse, + numCompleteGetResponses, + ) + } + + // If we have any incomplete requests, add them back into the work + // pool. + if len(resumeReq.reqs) > 0 { + w.addRequest(resumeReq) + } + }); err != nil { + // The new goroutine for the request wasn't spun up, so we have to + // perform the cleanup of this request ourselves. + w.asyncRequestCleanup() + w.s.setError(err) + } +} + +// finalizeSingleRangeResults "finalizes" the results of evaluation of a +// singleRangeBatch. By "finalization" we mean setting Complete field of +// ScanResp to correct value for all scan responses, updating the estimate of an +// average response size, and telling the Streamer about these results. +// +// This method assumes that results has length greater than zero. +func (w *workerCoordinator) finalizeSingleRangeResults( + results []Result, + actualMemoryReservation int64, + hasNonEmptyScanResponse bool, + numCompleteGetResponses int, +) { + w.s.mu.Lock() + defer w.s.mu.Unlock() + + numCompleteResponses := numCompleteGetResponses + // If we have non-empty scan response, it might be complete. This will be + // the case when a scan response doesn't have a resume span and there are no + // other scan requests in flight (involving other ranges) that are part of + // the same original ScanRequest. + // + // We need to do this check as well as adding the results to be returned to + // the client as an atomic operation so that Complete is set to true only on + // the last partial scan response. + if hasNonEmptyScanResponse { + for _, r := range results { + if r.ScanResp.ScanResponse != nil { + if r.ScanResp.ResumeSpan == nil { + // The scan within the range is complete. 
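+ // Decrement the number of ranges that are still left to be
+ // scanned for the original ScanRequest at this position; the
+ // response is marked Complete only once this count reaches zero.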
+ w.s.mu.numRangesLeftPerScanRequest[r.position]-- + r.ScanResp.Complete = w.s.mu.numRangesLeftPerScanRequest[r.position] == 0 + numCompleteResponses++ + } else { + // Unset the ResumeSpan on the result in order to + // not confuse the user of the Streamer. Non-nil + // resume span was already included into resumeReq. + r.ScanResp.ResumeSpan = nil + } + } + } + } + + // Update the average response size based on this batch. + // TODO(yuzefovich): some of the responses might be partial, yet the + // estimator doesn't distinguish the footprint of the full response vs the + // partial one. Think more about this. + w.s.mu.avgResponseEstimator.update(actualMemoryReservation, int64(len(results))) + w.s.mu.numCompleteRequests += numCompleteResponses + // Store the results and non-blockingly notify the Streamer about them. + w.s.mu.results = append(w.s.mu.results, results...) + w.s.notifyGetResultsLocked() +} + +var zeroIntSlice []int + +func init() { + zeroIntSlice = make([]int, 1<<10) +} + +const requestUnionSliceOverhead = int64(unsafe.Sizeof([]roachpb.RequestUnion{})) + +func requestsMemUsage(reqs []roachpb.RequestUnion) int64 { + memUsage := requestUnionSliceOverhead + // Slice up to the capacity to account for everything. + for _, r := range reqs[:cap(reqs)] { + memUsage += int64(r.Size()) + } + return memUsage +} diff --git a/pkg/kv/kvclient/kvstreamer/streamer_test.go b/pkg/kv/kvclient/kvstreamer/streamer_test.go new file mode 100644 index 000000000000..2183cbbd0f42 --- /dev/null +++ b/pkg/kv/kvclient/kvstreamer/streamer_test.go @@ -0,0 +1,271 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvstreamer + +import ( + "context" + "math" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/skip" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/mon" + "github.com/cockroachdb/cockroach/pkg/util/randutil" + "github.com/stretchr/testify/require" +) + +func getStreamer( + ctx context.Context, s serverutils.TestServerInterface, limitBytes int64, acc *mon.BoundAccount, +) *Streamer { + return NewStreamer( + s.DistSenderI().(*kvcoord.DistSender), + s.Stopper(), + kv.NewTxn(ctx, s.DB(), s.NodeID()), + cluster.MakeTestingClusterSettings(), + lock.WaitPolicy(0), + limitBytes, + acc, + ) +} + +// TestStreamerLimitations verifies that the streamer panics or encounters +// errors in currently unsupported or invalid scenarios. 
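+// (Only the OutOfOrder mode with unique requests is currently supported,
+// and a previous Enqueue call must be fully responded to before the next
+// one - these are the scenarios exercised below.)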
+func TestStreamerLimitations(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + s, _, _ := serverutils.StartServer(t, base.TestServerArgs{}) + defer s.Stopper().Stop(ctx) + + getStreamer := func() *Streamer { + return getStreamer(ctx, s, math.MaxInt64, nil /* acc */) + } + + t.Run("InOrder mode unsupported", func(t *testing.T) { + require.Panics(t, func() { + streamer := getStreamer() + streamer.Init(InOrder, Hints{UniqueRequests: true}) + }) + }) + + t.Run("non-unique requests unsupported", func(t *testing.T) { + require.Panics(t, func() { + streamer := getStreamer() + streamer.Init(OutOfOrder, Hints{UniqueRequests: false}) + }) + }) + + t.Run("invalid enqueueKeys", func(t *testing.T) { + streamer := getStreamer() + defer streamer.Close() + streamer.Init(OutOfOrder, Hints{UniqueRequests: true}) + // Use a single request but two keys which is invalid. + reqs := []roachpb.RequestUnion{{Value: &roachpb.RequestUnion_Get{}}} + enqueueKeys := []int{0, 1} + require.Error(t, streamer.Enqueue(ctx, reqs, enqueueKeys)) + }) + + t.Run("pipelining unsupported", func(t *testing.T) { + streamer := getStreamer() + defer streamer.Close() + streamer.Init(OutOfOrder, Hints{UniqueRequests: true}) + get := roachpb.NewGet(roachpb.Key("key"), false /* forUpdate */) + reqs := []roachpb.RequestUnion{{ + Value: &roachpb.RequestUnion_Get{ + Get: get.(*roachpb.GetRequest), + }, + }} + require.NoError(t, streamer.Enqueue(ctx, reqs, nil /* enqueueKeys */)) + // It is invalid to enqueue more requests before the previous have been + // responded to. + require.Error(t, streamer.Enqueue(ctx, reqs, nil /* enqueueKeys */)) + }) +} + +// TestLargeKeys verifies that the Streamer successfully completes the queries +// when the keys to lookup are large (i.e. the enqueued requests themselves have +// large memory footprint). +func TestLargeKeys(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + skip.UnderStress(t, "the test inserts large blobs, and the machine can be overloaded when under stress") + + rng, _ := randutil.NewTestRand() + s, db, _ := serverutils.StartServer(t, base.TestServerArgs{}) + ctx := context.Background() + defer s.Stopper().Stop(ctx) + + // Lower the distsql_workmem limit so that we can operate with smaller + // blobs. Note that the joinReader in the row-by-row engine will override + // the limit if it is lower than 8MiB, so we cannot go lower than that here. + _, err := db.Exec("SET distsql_workmem='8MiB'") + require.NoError(t, err) + // In both engines, the index joiner buffers input rows up to 4MiB in size, + // so we have a couple of interesting options for the blob size: + // - 3000000 is interesting because it doesn't exceed the buffer size, yet + // two rows with such blobs do exceed it. The index joiners are expected to + // to process each row on its own. + // - 5000000 is interesting because a single row already exceeds the buffer + // size. + for _, blobSize := range []int{3000000, 5000000} { + // onlyLarge determines whether only large blobs are inserted or a mix + // of large and small blobs. + for _, onlyLarge := range []bool{false, true} { + _, err = db.Exec("DROP TABLE IF EXISTS foo") + require.NoError(t, err) + // We set up such a table that contains two large columns, one of them + // being the primary key. The idea is that the query below will first + // read from the secondary index which would include only the PK blob, + // and that will be used to construct index join lookups (i.e. 
the PK + // blobs will be the enqueued requests for the Streamer) whereas the + // other blob will be part of the response. + _, err = db.Exec("CREATE TABLE foo (pk_blob STRING PRIMARY KEY, attribute INT, blob TEXT, INDEX(attribute))") + require.NoError(t, err) + + // Insert a handful of rows. + numRows := rng.Intn(3) + 3 + for i := 0; i < numRows; i++ { + letter := string(byte('a') + byte(i)) + valueSize := blobSize + if !onlyLarge && rng.Float64() < 0.5 { + // If we're using a mix of large and small values, with 50% + // use a small value now. + valueSize = rng.Intn(10) + 1 + } + _, err = db.Exec("INSERT INTO foo SELECT repeat($1, $2), 1, repeat($1, $2)", letter, valueSize) + require.NoError(t, err) + } + + // Perform an index join so that the Streamer API is used. + query := "SELECT * FROM foo@foo_attribute_idx WHERE attribute=1" + testutils.RunTrueAndFalse(t, "vectorize", func(t *testing.T, vectorize bool) { + vectorizeMode := "off" + if vectorize { + vectorizeMode = "on" + } + _, err = db.Exec("SET vectorize = " + vectorizeMode) + require.NoError(t, err) + _, err = db.Exec(query) + require.NoError(t, err) + }) + } + } +} + +// TestStreamerBudgetErrorInEnqueue verifies the behavior of the Streamer in +// Enqueue when its limit and/or root pool limit are exceeded. Additional tests +// around the memory limit errors (when the responses exceed the limit) can be +// found in TestMemoryLimit in pkg/sql. +func TestStreamerBudgetErrorInEnqueue(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + s, db, _ := serverutils.StartServer(t, base.TestServerArgs{}) + defer s.Stopper().Stop(ctx) + + // Create a dummy table for which we know the encoding of valid keys. + _, err := db.Exec("CREATE TABLE foo (pk_blob STRING PRIMARY KEY, attribute INT, blob TEXT, INDEX(attribute))") + require.NoError(t, err) + + // makeGetRequest returns a valid GetRequest that wants to lookup a key with + // value 'a' repeated keySize number of times in the primary index of table + // foo. + makeGetRequest := func(keySize int) roachpb.RequestUnion { + var res roachpb.RequestUnion + var get roachpb.GetRequest + var union roachpb.RequestUnion_Get + key := make([]byte, keySize+6) + key[0] = 190 + key[1] = 137 + key[2] = 18 + for i := 0; i < keySize; i++ { + key[i+3] = 97 + } + key[keySize+3] = 0 + key[keySize+4] = 1 + key[keySize+5] = 136 + get.Key = key + union.Get = &get + res.Value = &union + return res + } + + // Imitate a root SQL memory monitor with 1MiB size. + const rootPoolSize = 1 << 20 /* 1MiB */ + rootMemMonitor := mon.NewMonitor( + "root", /* name */ + mon.MemoryResource, + nil, /* curCount */ + nil, /* maxHist */ + -1, /* increment */ + math.MaxInt64, /* noteworthy */ + cluster.MakeTestingClusterSettings(), + ) + rootMemMonitor.Start(ctx, nil /* pool */, mon.MakeStandaloneBudget(rootPoolSize)) + defer rootMemMonitor.Stop(ctx) + + acc := rootMemMonitor.MakeBoundAccount() + defer acc.Close(ctx) + + getStreamer := func(limitBytes int64) *Streamer { + acc.Clear(ctx) + s := getStreamer(ctx, s, limitBytes, &acc) + s.Init(OutOfOrder, Hints{UniqueRequests: true}) + return s + } + + t.Run("single key exceeds limit", func(t *testing.T) { + const limitBytes = 10 + streamer := getStreamer(limitBytes) + defer streamer.Close() + + // A single request that exceeds the limit should be allowed. 
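+ // The streamer may reserve more than its limit for a single large
+ // request (the budget goes into debt), as long as the root memory
+ // pool (1MiB in this test) can cover the reservation; the next
+ // subtest exercises the case when it cannot.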
+ reqs := make([]roachpb.RequestUnion, 1) + reqs[0] = makeGetRequest(limitBytes + 1) + require.NoError(t, streamer.Enqueue(ctx, reqs, nil /* enqueueKeys */)) + }) + + t.Run("single key exceeds root pool size", func(t *testing.T) { + const limitBytes = 10 + streamer := getStreamer(limitBytes) + defer streamer.Close() + + // A single request that exceeds the limit as well as the root SQL pool + // should be denied. + reqs := make([]roachpb.RequestUnion, 1) + reqs[0] = makeGetRequest(rootPoolSize + 1) + require.Error(t, streamer.Enqueue(ctx, reqs, nil /* enqueueKeys */)) + }) + + t.Run("multiple keys exceed limit", func(t *testing.T) { + const limitBytes = 10 + streamer := getStreamer(limitBytes) + defer streamer.Close() + + // Create two requests which exceed the limit when combined. + reqs := make([]roachpb.RequestUnion, 2) + reqs[0] = makeGetRequest(limitBytes/2 + 1) + reqs[1] = makeGetRequest(limitBytes/2 + 1) + require.Error(t, streamer.Enqueue(ctx, reqs, nil /* enqueueKeys */)) + }) +} diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index 15771b9f7ffd..474acd4c4b53 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -572,6 +572,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { ExternalStorage: cfg.externalStorage, ExternalStorageFromURI: cfg.externalStorageFromURI, + DistSender: cfg.distSender, RangeCache: cfg.distSender.RangeDescriptorCache(), SQLSQLResponseAdmissionQ: cfg.sqlSQLResponseAdmissionQ, CollectionFactory: collectionFactory, diff --git a/pkg/sql/colexec/colbuilder/execplan.go b/pkg/sql/colexec/colbuilder/execplan.go index 299d1308dcd4..daa7b0402424 100644 --- a/pkg/sql/colexec/colbuilder/execplan.go +++ b/pkg/sql/colexec/colbuilder/execplan.go @@ -753,11 +753,23 @@ func NewColOperator( kvFetcherMemAcc := args.MonitorRegistry.CreateUnlimitedMemAccount( ctx, flowCtx, "kvfetcher" /* opName */, spec.ProcessorID, ) + var streamerBudgetAcc *mon.BoundAccount + // We have an index join, and when the ordering doesn't have to be + // maintained, we might use the Streamer API which requires a + // separate memory account that is bound to an unlimited memory + // monitor. 
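+ // NewColIndexJoin will double-check the remaining conditions (e.g.
+ // that the table has a single column family) and will fall back to
+ // the regular fetcher if the Streamer cannot be used.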
+ if !core.JoinReader.MaintainOrdering { + streamerBudgetAcc = args.MonitorRegistry.CreateUnlimitedMemAccount( + ctx, flowCtx, "streamer" /* opName */, spec.ProcessorID, + ) + } inputTypes := make([]*types.T, len(spec.Input[0].ColumnTypes)) copy(inputTypes, spec.Input[0].ColumnTypes) indexJoinOp, err := colfetcher.NewColIndexJoin( - ctx, getStreamingAllocator(ctx, args), colmem.NewAllocator(ctx, cFetcherMemAcc, factory), kvFetcherMemAcc, - flowCtx, args.ExprHelper, inputs[0].Root, core.JoinReader, post, inputTypes, + ctx, getStreamingAllocator(ctx, args), + colmem.NewAllocator(ctx, cFetcherMemAcc, factory), + kvFetcherMemAcc, streamerBudgetAcc, flowCtx, args.ExprHelper, + inputs[0].Root, core.JoinReader, post, inputTypes, ) if err != nil { return r, err diff --git a/pkg/sql/colfetcher/BUILD.bazel b/pkg/sql/colfetcher/BUILD.bazel index c30642f3f2e5..94ddbc62ab7f 100644 --- a/pkg/sql/colfetcher/BUILD.bazel +++ b/pkg/sql/colfetcher/BUILD.bazel @@ -17,6 +17,7 @@ go_library( "//pkg/col/typeconv", "//pkg/keys", "//pkg/kv", + "//pkg/kv/kvclient/kvstreamer", "//pkg/roachpb:with-mocks", "//pkg/sql/catalog", "//pkg/sql/catalog/colinfo", diff --git a/pkg/sql/colfetcher/cfetcher.go b/pkg/sql/colfetcher/cfetcher.go index 4570ad55a3dc..17d3de0fb219 100644 --- a/pkg/sql/colfetcher/cfetcher.go +++ b/pkg/sql/colfetcher/cfetcher.go @@ -23,6 +23,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/col/coldata" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvstreamer" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" @@ -579,6 +580,15 @@ func getColumnTypesFromCols(cols []catalog.Column, outTypes []*types.T) []*types return outTypes } +//gcassert:inline +func (rf *cFetcher) setFetcher(f *row.KVFetcher, limitHint rowinfra.RowLimit) { + rf.fetcher = f + rf.machine.lastRowPrefix = nil + rf.machine.limitHint = int(limitHint) + rf.machine.state[0] = stateResetBatch + rf.machine.state[1] = stateInitFetch +} + // StartScan initializes and starts the key-value scan. Can be used multiple // times. // @@ -646,11 +656,30 @@ func (rf *cFetcher) StartScan( if err != nil { return err } - rf.fetcher = f - rf.machine.lastRowPrefix = nil - rf.machine.limitHint = int(limitHint) - rf.machine.state[0] = stateResetBatch - rf.machine.state[1] = stateInitFetch + rf.setFetcher(f, limitHint) + return nil +} + +// StartScanStreaming initializes and starts the key-value scan using the +// Streamer API. Can be used multiple times. +// +// The fetcher takes ownership of the spans slice - it can modify the slice and +// will perform the memory accounting accordingly. The caller can only reuse the +// spans slice after the fetcher has been closed (which happens when the fetcher +// emits the first zero batch), and if the caller does, it becomes responsible +// for the memory accounting. 
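+//
+// The provided Streamer is assumed to have already been initialized by the
+// caller (the ColIndexJoin does so in Init, using the OutOfOrder mode).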
+func (rf *cFetcher) StartScanStreaming( + ctx context.Context, + streamer *kvstreamer.Streamer, + spans roachpb.Spans, + limitHint rowinfra.RowLimit, +) error { + kvBatchFetcher, err := row.NewTxnKVStreamer(ctx, streamer, spans, rf.lockStrength) + if err != nil { + return err + } + f := row.NewKVStreamingFetcher(kvBatchFetcher) + rf.setFetcher(f, limitHint) return nil } diff --git a/pkg/sql/colfetcher/index_join.go b/pkg/sql/colfetcher/index_join.go index a17afd63057e..eb6cc42158d1 100644 --- a/pkg/sql/colfetcher/index_join.go +++ b/pkg/sql/colfetcher/index_join.go @@ -12,11 +12,13 @@ package colfetcher import ( "context" + "math" "sort" "time" "github.com/cockroachdb/cockroach/pkg/col/coldata" "github.com/cockroachdb/cockroach/pkg/col/typeconv" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvstreamer" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecargs" "github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecspan" @@ -26,6 +28,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/memsize" + "github.com/cockroachdb/cockroach/pkg/sql/row" "github.com/cockroachdb/cockroach/pkg/sql/rowenc" "github.com/cockroachdb/cockroach/pkg/sql/rowinfra" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -102,6 +105,15 @@ type ColIndexJoin struct { // maintainOrdering is true when the index join is required to maintain its // input ordering, in which case the ordering of the spans cannot be changed. maintainOrdering bool + + // usesStreamer indicates whether the ColIndexJoin is using the Streamer + // API. + usesStreamer bool + streamerInfo struct { + *kvstreamer.Streamer + budgetAcc *mon.BoundAccount + budgetLimit int64 + } } var _ colexecop.KVReader = &ColIndexJoin{} @@ -119,6 +131,21 @@ func (s *ColIndexJoin) Init(ctx context.Context) { // tracing is enabled. s.Ctx, s.tracingSpan = execinfra.ProcessorSpan(s.Ctx, "colindexjoin") s.Input.Init(s.Ctx) + if s.usesStreamer { + s.streamerInfo.Streamer = kvstreamer.NewStreamer( + s.flowCtx.Cfg.DistSender, + s.flowCtx.Stopper(), + s.flowCtx.Txn, + s.flowCtx.EvalCtx.Settings, + row.GetWaitPolicy(s.rf.lockWaitPolicy), + s.streamerInfo.budgetLimit, + s.streamerInfo.budgetAcc, + ) + s.streamerInfo.Streamer.Init( + kvstreamer.OutOfOrder, + kvstreamer.Hints{UniqueRequests: true}, + ) + } } type indexJoinState uint8 @@ -174,16 +201,27 @@ func (s *ColIndexJoin) Next() coldata.Batch { // the memory accounting - we don't double count for any memory of // spans because the spanAssembler released all of the relevant // memory from its account in GetSpans(). 
- if err := s.rf.StartScan( - s.Ctx, - s.flowCtx.Txn, - spans, - nil, /* bsHeader */ - false, /* limitBatches */ - rowinfra.NoBytesLimit, - rowinfra.NoRowLimit, - s.flowCtx.EvalCtx.TestingKnobs.ForceProductionBatchSizes, - ); err != nil { + var err error + if s.usesStreamer { + err = s.rf.StartScanStreaming( + s.Ctx, + s.streamerInfo.Streamer, + spans, + rowinfra.NoRowLimit, + ) + } else { + err = s.rf.StartScan( + s.Ctx, + s.flowCtx.Txn, + spans, + nil, /* bsHeader */ + false, /* limitBatches */ + rowinfra.NoBytesLimit, + rowinfra.NoRowLimit, + s.flowCtx.EvalCtx.TestingKnobs.ForceProductionBatchSizes, + ) + } + if err != nil { colexecerror.InternalError(err) } s.state = indexJoinScanning @@ -385,6 +423,7 @@ func NewColIndexJoin( allocator *colmem.Allocator, fetcherAllocator *colmem.Allocator, kvFetcherMemAcc *mon.BoundAccount, + streamerBudgetAcc *mon.BoundAccount, flowCtx *execinfra.FlowCtx, helper *colexecargs.ExprHelper, input colexecop.Operator, @@ -420,19 +459,44 @@ func NewColIndexJoin( return nil, err } + memoryLimit := execinfra.GetWorkMemLimit(flowCtx) + + useStreamer := row.CanUseStreamer(ctx, flowCtx.EvalCtx.Settings) && !spec.MaintainOrdering + if useStreamer { + // TODO(yuzefovich): remove this conditional once multiple column + // families are supported. + if maxKeysPerRow, err := tableArgs.desc.KeysPerRow(tableArgs.index.GetID()); err != nil { + return nil, err + } else if maxKeysPerRow > 1 { + // Currently, the streamer only supports cases with a single column + // family. + useStreamer = false + } else { + if streamerBudgetAcc == nil { + return nil, errors.AssertionFailedf("streamer budget account is nil when the Streamer API is desired") + } + // Keep the quarter of the memory limit for the output batch of the + // cFetcher, and we'll give the remaining three quarters to the + // streamer budget below. + memoryLimit = int64(math.Ceil(float64(memoryLimit) / 4.0)) + } + } + fetcher := cFetcherPool.Get().(*cFetcher) fetcher.cFetcherArgs = cFetcherArgs{ spec.LockingStrength, spec.LockingWaitPolicy, flowCtx.EvalCtx.SessionData().LockTimeout, - execinfra.GetWorkMemLimit(flowCtx), + memoryLimit, // Note that the correct estimated row count will be set by the index // joiner for each set of spans to read. 0, /* estimatedRowCount */ false, /* reverse */ flowCtx.TraceKV, } - if err = fetcher.Init(flowCtx.Codec(), fetcherAllocator, kvFetcherMemAcc, tableArgs, spec.HasSystemColumns); err != nil { + if err = fetcher.Init( + flowCtx.Codec(), fetcherAllocator, kvFetcherMemAcc, tableArgs, spec.HasSystemColumns, + ); err != nil { fetcher.Release() return nil, err } @@ -448,8 +512,13 @@ func NewColIndexJoin( spanAssembler: spanAssembler, ResultTypes: tableArgs.typs, maintainOrdering: spec.MaintainOrdering, + usesStreamer: useStreamer, } op.prepareMemLimit(inputTypes) + if useStreamer { + op.streamerInfo.budgetLimit = 3 * memoryLimit + op.streamerInfo.budgetAcc = streamerBudgetAcc + } return op, nil } @@ -532,5 +601,8 @@ func (s *ColIndexJoin) closeInternal() { // spanAssembler can be nil if Release() has already been called. s.spanAssembler.Close() } + if s.streamerInfo.Streamer != nil { + s.streamerInfo.Streamer.Close() + } s.batch = nil } diff --git a/pkg/sql/distsql/server.go b/pkg/sql/distsql/server.go index 96e1bcd8b801..8712cb63a2f9 100644 --- a/pkg/sql/distsql/server.go +++ b/pkg/sql/distsql/server.go @@ -408,8 +408,19 @@ func (ds *ServerImpl) setupFlow( // that have no remote flows and also no concurrency, the txn comes from // localState.Txn. 
Otherwise, we create a txn based on the request's // LeafTxnInputState. + useLeaf := false + for _, proc := range req.Flow.Processors { + if jr := proc.Core.JoinReader; jr != nil { + if !jr.MaintainOrdering && jr.IsIndexJoin() { + // Index joins when ordering doesn't have to be maintained are + // executed via the Streamer API that has concurrency. + useLeaf = true + break + } + } + } var txn *kv.Txn - if localState.IsLocal && !f.ConcurrentTxnUse() { + if localState.IsLocal && !f.ConcurrentTxnUse() && !useLeaf { txn = localState.Txn } else { // If I haven't created the leaf already, do it now. diff --git a/pkg/sql/distsql_running.go b/pkg/sql/distsql_running.go index 2e7b16dcd94e..e61ca87a4762 100644 --- a/pkg/sql/distsql_running.go +++ b/pkg/sql/distsql_running.go @@ -447,6 +447,16 @@ func (dsp *DistSQLPlanner) Run( localState.HasConcurrency = localState.HasConcurrency || execinfra.HasParallelProcessors(flow) } } + for _, proc := range plan.Processors { + if js := proc.Spec.Core.JoinReader; js != nil { + if !js.MaintainOrdering && js.IsIndexJoin() { + // Index joins when ordering doesn't have to be maintained + // are executed via the Streamer API that has concurrency. + localState.HasConcurrency = true + break + } + } + } } if localState.MustUseLeafTxn() && txn != nil { // Set up leaf txns using the txnCoordMeta if we need to. diff --git a/pkg/sql/execinfra/BUILD.bazel b/pkg/sql/execinfra/BUILD.bazel index 7dbe25f72fad..fe9946cbd326 100644 --- a/pkg/sql/execinfra/BUILD.bazel +++ b/pkg/sql/execinfra/BUILD.bazel @@ -32,6 +32,7 @@ go_library( "//pkg/jobs", "//pkg/keys", "//pkg/kv", + "//pkg/kv/kvclient/kvcoord:with-mocks", "//pkg/kv/kvclient/rangecache:with-mocks", "//pkg/kv/kvserver/diskmap", "//pkg/kv/kvserver/kvserverbase", diff --git a/pkg/sql/execinfra/server_config.go b/pkg/sql/execinfra/server_config.go index 9eeda739e7d6..1e405372998e 100644 --- a/pkg/sql/execinfra/server_config.go +++ b/pkg/sql/execinfra/server_config.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/diskmap" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" @@ -145,6 +146,8 @@ type ServerConfig struct { // AdminVerifyProtectedTimestampRequest. ProtectedTimestampProvider protectedts.Provider + DistSender *kvcoord.DistSender + // RangeCache is used by processors that were supposed to have been planned on // the leaseholders of the data ranges that they're consuming. 
These // processors query the cache to see if they should communicate updates to the diff --git a/pkg/sql/row/BUILD.bazel b/pkg/sql/row/BUILD.bazel index 77250db03c06..77fe5d008a13 100644 --- a/pkg/sql/row/BUILD.bazel +++ b/pkg/sql/row/BUILD.bazel @@ -11,6 +11,7 @@ go_library( "helper.go", "inserter.go", "kv_batch_fetcher.go", + "kv_batch_streamer.go", "kv_fetcher.go", "locking.go", "metrics.go", @@ -23,14 +24,17 @@ go_library( importpath = "github.com/cockroachdb/cockroach/pkg/sql/row", visibility = ["//visibility:public"], deps = [ + "//pkg/clusterversion", "//pkg/jobs", "//pkg/jobs/jobspb", "//pkg/keys", "//pkg/kv", + "//pkg/kv/kvclient/kvstreamer", "//pkg/kv/kvserver", "//pkg/kv/kvserver/concurrency/lock", "//pkg/roachpb:with-mocks", "//pkg/settings", + "//pkg/settings/cluster", "//pkg/sql/catalog", "//pkg/sql/catalog/catalogkeys", "//pkg/sql/catalog/catalogkv", diff --git a/pkg/sql/row/kv_batch_streamer.go b/pkg/sql/row/kv_batch_streamer.go new file mode 100644 index 000000000000..dbd8946aced3 --- /dev/null +++ b/pkg/sql/row/kv_batch_streamer.go @@ -0,0 +1,213 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package row + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/clusterversion" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvstreamer" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/errors" +) + +// CanUseStreamer returns whether the kvstreamer.Streamer API should be used. +func CanUseStreamer(ctx context.Context, settings *cluster.Settings) bool { + // TODO(yuzefovich): remove the version gate in 22.2 cycle. + return settings.Version.IsActive(ctx, clusterversion.TargetBytesAvoidExcess) && + useStreamerEnabled.Get(&settings.SV) +} + +// useStreamerEnabled determines whether the Streamer API should be used. +// TODO(yuzefovich): remove this in 22.2. +var useStreamerEnabled = settings.RegisterBoolSetting( + settings.TenantWritable, + "sql.distsql.use_streamer.enabled", + "determines whether the usage of the Streamer API is allowed. "+ + "Enabling this will increase the speed of lookup/index joins "+ + "while adhering to memory limits.", + true, +) + +// TxnKVStreamer handles retrieval of key/values. +type TxnKVStreamer struct { + streamer *kvstreamer.Streamer + spans roachpb.Spans + + // numOutstandingRequests tracks the number of requests that haven't been + // fully responded to yet. + numOutstandingRequests int + + results []kvstreamer.Result + lastResultState struct { + kvstreamer.Result + // numEmitted tracks the number of times this result has been fully + // emitted. + numEmitted int + // Used only for ScanResponses. + remainingBatches [][]byte + } +} + +var _ KVBatchFetcher = &TxnKVStreamer{} + +// NewTxnKVStreamer creates a new TxnKVStreamer. 
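+// It converts the given spans into Get/Scan requests and enqueues them with
+// the provided Streamer; the results are then consumed via nextBatch.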
+func NewTxnKVStreamer( + ctx context.Context, + streamer *kvstreamer.Streamer, + spans roachpb.Spans, + lockStrength descpb.ScanLockingStrength, +) (*TxnKVStreamer, error) { + if log.ExpensiveLogEnabled(ctx, 2) { + log.VEventf(ctx, 2, "Scan %s", spans) + } + keyLocking := getKeyLockingStrength(lockStrength) + reqs := spansToRequests(spans, false /* reverse */, keyLocking) + if err := streamer.Enqueue(ctx, reqs, nil /* enqueueKeys */); err != nil { + return nil, err + } + return &TxnKVStreamer{ + streamer: streamer, + spans: spans, + numOutstandingRequests: len(spans), + }, nil +} + +// proceedWithLastResult processes the result which must be already set on the +// lastResultState and emits the first part of the response (the only part for +// GetResponses). +func (f *TxnKVStreamer) proceedWithLastResult( + ctx context.Context, +) (skip bool, kvs []roachpb.KeyValue, batchResp []byte, err error) { + result := f.lastResultState.Result + if get := result.GetResp; get != nil { + if get.IntentValue != nil { + return false, nil, nil, errors.AssertionFailedf( + "unexpectedly got an IntentValue back from a SQL GetRequest %v", *get.IntentValue, + ) + } + if get.Value == nil { + // Nothing found in this particular response, so we skip it. + f.releaseLastResult(ctx) + return true, nil, nil, nil + } + pos := result.EnqueueKeys[f.lastResultState.numEmitted] + origSpan := f.spans[pos] + f.lastResultState.numEmitted++ + f.numOutstandingRequests-- + return false, []roachpb.KeyValue{{Key: origSpan.Key, Value: *get.Value}}, nil, nil + } + scan := result.ScanResp + if len(scan.BatchResponses) > 0 { + batchResp, f.lastResultState.remainingBatches = scan.BatchResponses[0], scan.BatchResponses[1:] + } + if len(f.lastResultState.remainingBatches) == 0 { + f.processedScanResponse() + } + return false, scan.Rows, batchResp, nil +} + +// processedScanResponse updates the lastResultState before emitting the last +// part of the ScanResponse. This method should be called for each request that +// the ScanResponse satisfies. +func (f *TxnKVStreamer) processedScanResponse() { + f.lastResultState.numEmitted++ + if f.lastResultState.ScanResp.Complete { + f.numOutstandingRequests-- + } +} + +func (f *TxnKVStreamer) releaseLastResult(ctx context.Context) { + f.lastResultState.MemoryTok.Release(ctx) + f.lastResultState.Result = kvstreamer.Result{} +} + +// nextBatch returns the next batch of key/value pairs. If there are none +// available, a fetch is initiated. When there are no more keys, ok is false. +func (f *TxnKVStreamer) nextBatch( + ctx context.Context, +) (ok bool, kvs []roachpb.KeyValue, batchResp []byte, err error) { + if f.numOutstandingRequests == 0 { + // All requests have already been responded to. + f.releaseLastResult(ctx) + return false, nil, nil, nil + } + + // Check whether there are more batches in the current ScanResponse. + if len(f.lastResultState.remainingBatches) > 0 { + batchResp, f.lastResultState.remainingBatches = f.lastResultState.remainingBatches[0], f.lastResultState.remainingBatches[1:] + if len(f.lastResultState.remainingBatches) == 0 { + f.processedScanResponse() + } + return true, nil, batchResp, nil + } + + // Check whether the current result satisfies multiple requests. + if f.lastResultState.numEmitted < len(f.lastResultState.EnqueueKeys) { + // Note that we should never get an error here since we're processing + // the same result again. + _, kvs, batchResp, err = f.proceedWithLastResult(ctx) + return true, kvs, batchResp, err + } + + // Release the current result. 
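+ // Only if there was a previous result and all of its enqueue keys
+ // have been emitted (on the very first call there is nothing to
+ // release).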
+ if f.lastResultState.numEmitted == len(f.lastResultState.EnqueueKeys) && f.lastResultState.numEmitted > 0 { + f.releaseLastResult(ctx) + } + + // Process the next result we have already received from the streamer. + for len(f.results) > 0 { + // Peel off the next result and set it into lastResultState. + f.lastResultState.Result = f.results[0] + f.lastResultState.numEmitted = 0 + f.lastResultState.remainingBatches = nil + // Lose the reference to that result and advance the results slice for + // the next iteration. + f.results[0] = kvstreamer.Result{} + f.results = f.results[1:] + var skip bool + skip, kvs, batchResp, err = f.proceedWithLastResult(ctx) + if err != nil { + return false, nil, nil, err + } + if skip { + continue + } + return true, kvs, batchResp, nil + } + + // Get more results from the streamer. This call will block until some + // results are available or we're done. + // + // The memory accounting for the returned results has already been performed + // by the streamer against its own budget, so we don't have to concern + // ourselves with the memory accounting here. + f.results, err = f.streamer.GetResults(ctx) + if len(f.results) == 0 || err != nil { + return false, nil, nil, err + } + return f.nextBatch(ctx) +} + +// close releases the resources of this TxnKVStreamer. +func (f *TxnKVStreamer) close(ctx context.Context) { + if f.lastResultState.MemoryTok != nil { + f.lastResultState.MemoryTok.Release(ctx) + } + for _, r := range f.results { + r.MemoryTok.Release(ctx) + } + *f = TxnKVStreamer{} +} diff --git a/pkg/sql/row/kv_fetcher.go b/pkg/sql/row/kv_fetcher.go index 25eac1ef211a..7c0ed1b164f5 100644 --- a/pkg/sql/row/kv_fetcher.go +++ b/pkg/sql/row/kv_fetcher.go @@ -108,11 +108,18 @@ func NewKVFetcher( return newKVFetcher(&kvBatchFetcher), err } +// NewKVStreamingFetcher returns a new KVFetcher that utilizes the provided +// TxnKVStreamer to perform KV reads. +func NewKVStreamingFetcher(streamer *TxnKVStreamer) *KVFetcher { + return &KVFetcher{ + KVBatchFetcher: streamer, + } +} + func newKVFetcher(batchFetcher KVBatchFetcher) *KVFetcher { - ret := &KVFetcher{ + return &KVFetcher{ KVBatchFetcher: batchFetcher, } - return ret } // GetBytesRead returns the number of bytes read by this fetcher. 
It is safe for diff --git a/pkg/sql/rowexec/BUILD.bazel b/pkg/sql/rowexec/BUILD.bazel index af23fb59fe02..abd6579165cd 100644 --- a/pkg/sql/rowexec/BUILD.bazel +++ b/pkg/sql/rowexec/BUILD.bazel @@ -45,6 +45,8 @@ go_library( "//pkg/jobs/jobspb", "//pkg/keys", "//pkg/kv", + "//pkg/kv/kvclient/kvstreamer", + "//pkg/kv/kvserver/concurrency/lock", "//pkg/kv/kvserver/kvserverbase", "//pkg/roachpb:with-mocks", "//pkg/server/telemetry", @@ -184,6 +186,7 @@ go_test( "//pkg/util/mon", "//pkg/util/protoutil", "//pkg/util/randutil", + "//pkg/util/stop", "//pkg/util/syncutil", "//pkg/util/timeutil", "//pkg/util/tracing", diff --git a/pkg/sql/rowexec/joinreader.go b/pkg/sql/rowexec/joinreader.go index ffbe7aeaaa2e..7ce029dc6d4b 100644 --- a/pkg/sql/rowexec/joinreader.go +++ b/pkg/sql/rowexec/joinreader.go @@ -12,9 +12,12 @@ package rowexec import ( "context" + "math" "sort" "unsafe" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvstreamer" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/sql/catalog" @@ -115,6 +118,19 @@ type joinReader struct { shouldLimitBatches bool readerType joinReaderType + keyLocking descpb.ScanLockingStrength + lockWaitPolicy lock.WaitPolicy + + // usesStreamer indicates whether the joinReader performs the lookups using + // the kvcoord.Streamer API. + usesStreamer bool + streamerInfo struct { + *kvstreamer.Streamer + unlimitedMemMonitor *mon.BytesMonitor + budgetAcc mon.BoundAccount + budgetLimit int64 + } + input execinfra.RowSource // lookupCols and lookupExpr (and optionally remoteLookupExpr) represent the @@ -285,6 +301,8 @@ func newJoinReader( if flowCtx.EvalCtx.SessionData().ParallelizeMultiKeyLookupJoinsEnabled { shouldLimitBatches = false } + tryStreamer := row.CanUseStreamer(flowCtx.EvalCtx.Ctx(), flowCtx.EvalCtx.Settings) && !spec.MaintainOrdering + jr := &joinReader{ desc: tableDesc, maintainOrdering: spec.MaintainOrdering, @@ -293,6 +311,9 @@ func newJoinReader( outputGroupContinuationForLeftRow: spec.OutputGroupContinuationForLeftRow, shouldLimitBatches: shouldLimitBatches, readerType: readerType, + keyLocking: spec.LockingStrength, + lockWaitPolicy: row.GetWaitPolicy(spec.LockingWaitPolicy), + usesStreamer: (readerType == indexJoinReaderType) && tryStreamer, lookupBatchBytesLimit: rowinfra.BytesLimit(spec.LookupBatchBytesLimit), } if readerType != indexJoinReaderType { @@ -443,6 +464,32 @@ func newJoinReader( } jr.batchSizeBytes = jr.strategy.getLookupRowsBatchSizeHint(flowCtx.EvalCtx.SessionData()) + if jr.usesStreamer { + maxKeysPerRow, err := jr.desc.KeysPerRow(jr.index.GetID()) + if err != nil { + return nil, err + } + if maxKeysPerRow > 1 { + // Currently, the streamer only supports cases with a single column + // family. + jr.usesStreamer = false + } else { + // jr.batchSizeBytes will be used up by the input batch, and we'll + // give everything else to the streamer budget. Note that + // budgetLimit will always be positive given that memoryLimit is at + // least 8MiB and batchSizeBytes is at most 4MiB. + jr.streamerInfo.budgetLimit = memoryLimit - jr.batchSizeBytes + // We need to use an unlimited monitor for the streamer's budget + // since the streamer itself is responsible for staying under the + // limit. 
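+ // For example, with a 64MiB distsql_workmem (the assumed default)
+ // and a 4MiB input batch hint, the streamer budget would be 60MiB.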
+ jr.streamerInfo.unlimitedMemMonitor = mon.NewMonitorInheritWithLimit( + "joinreader-streamer-unlimited" /* name */, math.MaxInt64, flowCtx.EvalCtx.Mon, + ) + jr.streamerInfo.unlimitedMemMonitor.Start(flowCtx.EvalCtx.Ctx(), flowCtx.EvalCtx.Mon, mon.BoundAccount{}) + jr.streamerInfo.budgetAcc = jr.streamerInfo.unlimitedMemMonitor.MakeBoundAccount() + } + } + // TODO(radu): verify the input types match the index key types return jr, nil } @@ -834,24 +881,35 @@ func (jr *joinReader) readInput() ( } log.VEventf(jr.Ctx, 1, "scanning %d spans", len(spans)) - var bytesLimit rowinfra.BytesLimit - if !jr.shouldLimitBatches { - bytesLimit = rowinfra.NoBytesLimit - } else { - bytesLimit = jr.lookupBatchBytesLimit - if jr.lookupBatchBytesLimit == 0 { - bytesLimit = rowinfra.DefaultBatchBytesLimit - } - } // Note that the fetcher takes ownership of the spans slice - it will modify // it and perform the memory accounting. We don't care about the // modification here, but we want to be conscious about the memory // accounting - we don't double count for any memory of spans because the // joinReaderStrategy doesn't account for any memory used by the spans. - if err := jr.fetcher.StartScan( - jr.Ctx, jr.FlowCtx.Txn, spans, bytesLimit, rowinfra.NoRowLimit, - jr.FlowCtx.TraceKV, jr.EvalCtx.TestingKnobs.ForceProductionBatchSizes, - ); err != nil { + if jr.usesStreamer { + var kvBatchFetcher *row.TxnKVStreamer + kvBatchFetcher, err = row.NewTxnKVStreamer(jr.Ctx, jr.streamerInfo.Streamer, spans, jr.keyLocking) + if err != nil { + jr.MoveToDraining(err) + return jrStateUnknown, nil, jr.DrainHelper() + } + err = jr.fetcher.StartScanFrom(jr.Ctx, kvBatchFetcher, jr.FlowCtx.TraceKV) + } else { + var bytesLimit rowinfra.BytesLimit + if !jr.shouldLimitBatches { + bytesLimit = rowinfra.NoBytesLimit + } else { + bytesLimit = jr.lookupBatchBytesLimit + if jr.lookupBatchBytesLimit == 0 { + bytesLimit = rowinfra.DefaultBatchBytesLimit + } + } + err = jr.fetcher.StartScan( + jr.Ctx, jr.FlowCtx.Txn, spans, bytesLimit, rowinfra.NoRowLimit, + jr.FlowCtx.TraceKV, jr.EvalCtx.TestingKnobs.ForceProductionBatchSizes, + ) + } + if err != nil { jr.MoveToDraining(err) return jrStateUnknown, nil, jr.DrainHelper() } @@ -966,6 +1024,21 @@ func (jr *joinReader) performMemoryAccounting() error { func (jr *joinReader) Start(ctx context.Context) { ctx = jr.StartInternal(ctx, joinReaderProcName) jr.input.Start(ctx) + if jr.usesStreamer { + jr.streamerInfo.Streamer = kvstreamer.NewStreamer( + jr.FlowCtx.Cfg.DistSender, + jr.FlowCtx.Stopper(), + jr.FlowCtx.Txn, + jr.FlowCtx.EvalCtx.Settings, + jr.lockWaitPolicy, + jr.streamerInfo.budgetLimit, + &jr.streamerInfo.budgetAcc, + ) + jr.streamerInfo.Streamer.Init( + kvstreamer.OutOfOrder, + kvstreamer.Hints{UniqueRequests: true}, + ) + } jr.runningState = jrReadingInput } @@ -980,6 +1053,16 @@ func (jr *joinReader) close() { if jr.fetcher != nil { jr.fetcher.Close(jr.Ctx) } + if jr.usesStreamer { + // We have to cleanup the streamer after closing the fetcher because + // the latter might release some memory tracked by the budget of the + // streamer. 
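+ // Note that Streamer.Close blocks until all of its goroutines
+ // exit, so by the time we close the budget account and stop the
+ // monitor below, nothing can be using them concurrently.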
+ if jr.streamerInfo.Streamer != nil { + jr.streamerInfo.Streamer.Close() + } + jr.streamerInfo.budgetAcc.Close(jr.Ctx) + jr.streamerInfo.unlimitedMemMonitor.Stop(jr.Ctx) + } jr.strategy.close(jr.Ctx) jr.memAcc.Close(jr.Ctx) if jr.limitedMemMonitor != nil { diff --git a/pkg/sql/rowexec/joinreader_test.go b/pkg/sql/rowexec/joinreader_test.go index fdafdb722511..60a386a3f762 100644 --- a/pkg/sql/rowexec/joinreader_test.go +++ b/pkg/sql/rowexec/joinreader_test.go @@ -24,6 +24,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkv" @@ -1314,6 +1315,8 @@ func TestIndexJoiner(t *testing.T) { c.outputTypes, c.expected, txn, + s.Stopper(), + s.DistSenderI().(*kvcoord.DistSender), ) }) } diff --git a/pkg/sql/rowexec/project_set_test.go b/pkg/sql/rowexec/project_set_test.go index ae695d067a11..bb02898a91f2 100644 --- a/pkg/sql/rowexec/project_set_test.go +++ b/pkg/sql/rowexec/project_set_test.go @@ -116,6 +116,8 @@ func TestProjectSet(t *testing.T) { append(c.inputTypes, c.spec.GeneratedColumns...), /* outputTypes */ c.expected, nil, + nil, + nil, ) }) } diff --git a/pkg/sql/rowexec/utils_test.go b/pkg/sql/rowexec/utils_test.go index 45a7a5034dae..fffa1d97d290 100644 --- a/pkg/sql/rowexec/utils_test.go +++ b/pkg/sql/rowexec/utils_test.go @@ -16,6 +16,7 @@ import ( "testing" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" @@ -25,6 +26,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/testutils/distsqlutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/util/stop" ) // runProcessorTest instantiates a processor with the provided spec, runs it @@ -38,6 +40,8 @@ func runProcessorTest( outputTypes []*types.T, expected rowenc.EncDatumRows, txn *kv.Txn, + stopper *stop.Stopper, + distSender *kvcoord.DistSender, ) { in := distsqlutils.NewRowBuffer(inputTypes, inputRows, distsqlutils.RowBufferArgs{}) out := &distsqlutils.RowBuffer{} @@ -46,7 +50,7 @@ func runProcessorTest( evalCtx := tree.MakeTestingEvalContext(st) defer evalCtx.Stop(context.Background()) flowCtx := execinfra.FlowCtx{ - Cfg: &execinfra.ServerConfig{Settings: st}, + Cfg: &execinfra.ServerConfig{Settings: st, Stopper: stopper, DistSender: distSender}, EvalCtx: &evalCtx, Txn: txn, }