sql,kv: introduce Streamer API and use it for index joins in some cases
This commit introduces the Streamer API (see
https://github.com/cockroachdb/cockroach/blob/master/docs/RFCS/20210617_index_lookups_memory_limits.md)
as well as its implementation for the simplest case - when requests are
unique and can be served in any order. It also hooks up the
implementation for use by index joins in both execution engines.

There are three main pieces that this commit adds:
1. the `Streamer` struct itself. It is largely the same as described in
the RFC. Some notable changes are:
- `Cancel` is renamed to `Close` and is made blocking to ensure that all
goroutines kicked off by the `Streamer` exit before `Close` returns.
- `Shrink` has been removed from the `budget` struct (see below for
more details).
- furthermore, the `budget` interface has been unexported, and the
`Streamer` is now tightly coupled with the `budget`'s implementation.
- the TODO about collecting DistSQL metadata is removed because we
already collect the LeafTxnFinalState when using the LeafTxn.
2. the limited `Streamer` implementation - only `OutOfOrder` mode is
supported when the requests are unique. Notably, buffering and caching
of the results is not implemented yet.
3. `TxnKVStreamer` component that sits below the SQL fetchers, uses the
`Streamer`, and is an adapter from `BatchResponse`s to key/value pairs
that fetchers understand. Although this capability is not exercised at
the moment, `TxnKVStreamer` is written under the assumption that a
single result can satisfy multiple requests. A usage sketch of the new
API follows below.
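
To make the intended usage concrete, here is a minimal sketch of how a
client such as `TxnKVStreamer` might drive the API. The method names
(`Enqueue`, `GetResults`, `Close`) follow the RFC and the description
above, but the exact signatures and the result handling are simplified
assumptions rather than the code added by this commit.

```go
package example

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvstreamer"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// runLookups is an illustrative sketch only: it enqueues all requests at
// once and then drains results until everything has been served.
func runLookups(
	ctx context.Context, s *kvstreamer.Streamer, reqs []roachpb.RequestUnion,
) error {
	// Close is blocking: it waits for all goroutines started by the Streamer.
	defer s.Close()
	// Requests are assumed unique, so OutOfOrder mode may serve them in any
	// order. Enqueue splits them into single-range sub-requests internally.
	if err := s.Enqueue(ctx, reqs); err != nil {
		return err
	}
	for {
		// Each call returns however many results are ready; an empty slice
		// is assumed to mean that all enqueued requests have been served.
		results, err := s.GetResults(ctx)
		if err != nil {
			return err
		}
		if len(results) == 0 {
			return nil
		}
		for _, res := range results {
			_ = res // a fetcher would turn this into key/value pairs here
		}
	}
}
```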

The memory budget of the `Streamer` is used lazily. The RFC called for
making a large reservation upfront and then shrinking the budget if we
saw that such a large reservation wasn't needed; however, while working
on this I realized that lazy reservations are a better fit. The
`Streamer` can reserve up to the specified limit (determined by the
`distsql_workmem` variable) against the root monitor (in the degenerate
case of a single large row, more memory will be reserved). The
reservation then never shrinks, under the assumption that if the
reservation has gotten large, it was needed for higher concurrency (or
large responses) and is likely to be needed for the same reasons in the
future.
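
As a rough illustration of the difference (not code from this commit),
an upfront scheme would grow the memory account by the whole limit
before any work is done, whereas the lazy scheme only grows it as
responses are about to arrive. The helper below is hypothetical; the
real logic lives in `budget.consume` further down.

```go
package example

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/util/mon"
	"github.com/cockroachdb/errors"
)

// reserveLazily sketches the lazy approach: grow the account backing the
// budget only by what the next response is estimated to need, capped at
// limitBytes. Illustrative only.
func reserveLazily(
	ctx context.Context, acc *mon.BoundAccount, limitBytes, needed int64,
) error {
	if acc.Used()+needed > limitBytes {
		// The real budget either waits for in-flight results to be released
		// or, for a single large row, is allowed to go into debt instead.
		return errors.New("streamer budget limit reached")
	}
	// Grow the reservation against the root monitor only now that the
	// memory is actually needed.
	return acc.Grow(ctx, needed)
}
```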

The layout of the main components of the `Streamer` implementation:
- in `Enqueue` we have logic similar to what the DistSender does in
order to split each request (which might span multiple ranges) into
single-range requests. Those sub-requests are batched together to be
evaluated via a single `BatchRequest`.
- `workerCoordinator.mainLoop` is responsible for taking single-range
batches, estimating the corresponding response size, and issuing
requests to be evaluated in parallel while adhering to the provided
memory budget (a simplified sketch of this control flow follows the
list).
- `workerCoordinator.performRequestAsync` is responsible for populating
the `BatchRequest` and then processing the results while updating the
memory budget.
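
The following is a heavily simplified sketch of that control flow,
tying together the `budget` and `avgResponseEstimator` added further
down in this commit. The channel-based hand-off and the `issueAsync`
callback are inventions for illustration; the real coordinator in
`streamer.go` is more involved. The sketch assumes it lives in the
`kvstreamer` package.

```go
package kvstreamer

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// mainLoopSketch illustrates the coordinator described above: take a
// single-range batch, reserve the estimated response footprint from the
// budget (waiting if the budget is exhausted), then evaluate the batch
// asynchronously. Illustrative only.
func mainLoopSketch(
	ctx context.Context,
	b *budget,
	est *avgResponseEstimator,
	batches <-chan []roachpb.RequestUnion, // single-range batches from Enqueue
	issueAsync func(ctx context.Context, reqs []roachpb.RequestUnion, reserved int64),
) {
	for reqs := range batches {
		// Estimate how much memory the responses will need.
		estimate := est.getAvgResponseSize() * int64(len(reqs))
		// Reserve that much before issuing the BatchRequest; if the budget
		// is exhausted, block until release() signals on the wait channel.
		for b.consume(ctx, estimate, false /* allowDebt */) != nil {
			select {
			case <-b.waitCh:
			case <-ctx.Done():
				return
			}
		}
		// Evaluate in parallel with other in-flight batches; the async
		// worker reconciles the reservation once the actual response size
		// is known (performRequestAsync in the real code).
		issueAsync(ctx, reqs, estimate)
	}
}
```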

Current known limitations that will be addressed in follow-up work:
- at the moment a single row can be split across multiple BatchResponses
when the TargetBytes limit is reached and the table has multiple column
families; therefore, we use the streamer only in the single column
family case. We will expand the KV API shortly so that rows are not
split in the multiple column family case.
- manual refresh of spans when a `ReadWithinUncertaintyIntervalError` is
encountered by a single streamer in a single flow is not implemented.
This optimization is considered a must for the final implementation in
order not to regress on simple cases in terms of retriable errors; it
will be implemented shortly as a follow-up.
- I'm thinking that eventually we probably want to disable the batch
splitting done by the DistSender in order to eliminate unnecessary
blocking when the streamer's own splitting was incorrect. This would
give us some performance improvements in the face of range boundary
changes, but it doesn't seem important enough for the initial
implementation.

Release note: None
yuzefovich committed Jan 12, 2022
1 parent 0404cd9 commit 938a856
Showing 29 changed files with 2,228 additions and 46 deletions.
1 change: 1 addition & 0 deletions pkg/BUILD.bazel
@@ -118,6 +118,7 @@ ALL_TESTS = [
"//pkg/keys:keys_test",
"//pkg/kv/bulk:bulk_test",
"//pkg/kv/kvclient/kvcoord:kvcoord_test",
"//pkg/kv/kvclient/kvstreamer:kvstreamer_test",
"//pkg/kv/kvclient/rangecache:rangecache_test",
"//pkg/kv/kvclient/rangefeed/rangefeedbuffer:rangefeedbuffer_test",
"//pkg/kv/kvclient/rangefeed:rangefeed_test",
12 changes: 6 additions & 6 deletions pkg/kv/kvclient/kvcoord/batch.go
@@ -18,7 +18,7 @@ import (

var emptySpan = roachpb.Span{}

// truncate restricts all requests to the given key range and returns new,
// Truncate restricts all requests to the given key range and returns new,
// truncated, requests. All returned requests are "truncated" to the given span,
// and requests which are found to not overlap the given span at all are
// removed. A mapping of response index to request index is returned. For
@@ -27,8 +27,8 @@ var emptySpan = roachpb.Span{}
// reqs = Put[a], Put[c], Put[b],
// rs = [a,bb],
//
// then truncate(reqs,rs) returns (Put[a], Put[b]) and positions [0,2].
func truncate(
// then Truncate(reqs,rs) returns (Put[a], Put[b]) and positions [0,2].
func Truncate(
reqs []roachpb.RequestUnion, rs roachpb.RSpan,
) ([]roachpb.RequestUnion, []int, error) {
truncateOne := func(args roachpb.Request) (bool, roachpb.Span, error) {
@@ -191,18 +191,18 @@ func prev(reqs []roachpb.RequestUnion, k roachpb.RKey) (roachpb.RKey, error) {
return candidate, nil
}

// next gives the left boundary of the union of all requests which don't affect
// Next gives the left boundary of the union of all requests which don't affect
// keys less than the given key. Note that the left boundary is inclusive, that
// is, the returned RKey is the inclusive left endpoint of the keys the request
// should operate on next.
//
// Informally, a call `next(reqs, k)` means: we've already executed the parts of
// Informally, a call `Next(reqs, k)` means: we've already executed the parts of
// `reqs` that intersect `[KeyMin, k)`; please tell me how far to the right the
// next relevant request begins.
//
// TODO(tschottdorf): again, better on BatchRequest itself, but can't pull
// 'keys' into 'proto'.
func next(reqs []roachpb.RequestUnion, k roachpb.RKey) (roachpb.RKey, error) {
func Next(reqs []roachpb.RequestUnion, k roachpb.RKey) (roachpb.RKey, error) {
candidate := roachpb.RKeyMax
for _, union := range reqs {
inner := union.GetInner()
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/batch_test.go
@@ -193,7 +193,7 @@ func TestBatchPrevNext(t *testing.T) {
args.Key, args.EndKey = span.Key, span.EndKey
ba.Add(args)
}
if next, err := next(ba.Requests, roachpb.RKey(test.key)); err != nil {
if next, err := Next(ba.Requests, roachpb.RKey(test.key)); err != nil {
t.Error(err)
} else if !bytes.Equal(next, roachpb.Key(test.expFW)) {
t.Errorf("next: expected %q, got %q", test.expFW, next)
4 changes: 2 additions & 2 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
@@ -1318,7 +1318,7 @@ func (ds *DistSender) divideAndSendBatchToRanges(
// one, and unless both descriptors are stale, the next descriptor's
// StartKey would move us to the beginning of the current range,
// resulting in a duplicate scan.
seekKey, err = next(ba.Requests, ri.Desc().EndKey)
seekKey, err = Next(ba.Requests, ri.Desc().EndKey)
nextRS.Key = seekKey
}
if err != nil {
@@ -1509,7 +1509,7 @@ func (ds *DistSender) sendPartialBatch(
if err != nil {
return response{pErr: roachpb.NewError(err)}
}
ba.Requests, positions, err = truncate(ba.Requests, rs)
ba.Requests, positions, err = Truncate(ba.Requests, rs)
if len(positions) == 0 && err == nil {
// This shouldn't happen in the wild, but some tests exercise it.
return response{
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/truncate_test.go
@@ -164,7 +164,7 @@ func TestTruncate(t *testing.T) {
t.Errorf("%d: intersection failure: %v", i, err)
continue
}
reqs, pos, err := truncate(original.Requests, rs)
reqs, pos, err := Truncate(original.Requests, rs)
if err != nil || test.err != "" {
if !testutils.IsError(err, test.err) {
t.Errorf("%d: %v (expected: %q)", i, err, test.err)
56 changes: 56 additions & 0 deletions pkg/kv/kvclient/kvstreamer/BUILD.bazel
@@ -0,0 +1,56 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "kvstreamer",
srcs = [
"avg_response_estimator.go",
"budget.go",
"streamer.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvstreamer",
visibility = ["//visibility:public"],
deps = [
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/kvclient/kvcoord:with-mocks",
"//pkg/kv/kvserver/concurrency/lock",
"//pkg/roachpb:with-mocks",
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/util/admission",
"//pkg/util/mon",
"//pkg/util/quotapool",
"//pkg/util/stop",
"//pkg/util/syncutil",
"@com_github_cockroachdb_errors//:errors",
],
)

go_test(
name = "kvstreamer_test",
srcs = [
"avg_response_estimator_test.go",
"main_test.go",
"streamer_test.go",
],
embed = [":kvstreamer"],
deps = [
"//pkg/base",
"//pkg/kv",
"//pkg/kv/kvclient/kvcoord:with-mocks",
"//pkg/kv/kvserver/concurrency/lock",
"//pkg/roachpb:with-mocks",
"//pkg/security",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/settings/cluster",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/skip",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/mon",
"//pkg/util/randutil",
"@com_github_stretchr_testify//require",
],
)
42 changes: 42 additions & 0 deletions pkg/kv/kvclient/kvstreamer/avg_response_estimator.go
@@ -0,0 +1,42 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package kvstreamer

// avgResponseEstimator is a helper that estimates the average size of responses
// received by the Streamer. It is **not** thread-safe.
type avgResponseEstimator struct {
// responseBytes tracks the total footprint of all responses that the
// Streamer has already received.
responseBytes int64
numResponses int64
}

// TODO(yuzefovich): use the optimizer-driven estimates.
const initialAvgResponseSize = 1 << 10 // 1KiB

func (e *avgResponseEstimator) getAvgResponseSize() int64 {
if e.numResponses == 0 {
return initialAvgResponseSize
}
// TODO(yuzefovich): we currently use a simple average over the received
// responses, but it is likely to be suboptimal because it would be unfair
// to "large" batches that come in late (i.e. it would not be reactive
// enough). Consider using another function here.
return e.responseBytes / e.numResponses
}

// update updates the actual information of the estimator based on numResponses
// responses that took up responseBytes bytes and correspond to a single
// BatchResponse.
func (e *avgResponseEstimator) update(responseBytes int64, numResponses int64) {
e.responseBytes += responseBytes
e.numResponses += numResponses
}
56 changes: 56 additions & 0 deletions pkg/kv/kvclient/kvstreamer/avg_response_estimator_test.go
@@ -0,0 +1,56 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package kvstreamer

import (
"math"
"testing"

"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/stretchr/testify/require"
)

func TestAvgResponseEstimator(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

var e avgResponseEstimator

// Before receiving any responses, we should be using the initial estimate.
require.Equal(t, int64(initialAvgResponseSize), e.getAvgResponseSize())

// Simulate receiving a single response.
firstResponseSize := int64(42)
e.update(firstResponseSize, 1)
// The estimate should now be exactly the size of that single response.
require.Equal(t, firstResponseSize, e.getAvgResponseSize())

// Simulate receiving 100 small BatchResponses.
smallResponseSize := int64(63)
for i := 0; i < 100; i++ {
e.update(smallResponseSize*5, 5)
}
// The estimate should now be pretty close to the size of a single response
// in the small BatchResponse.
diff := smallResponseSize - e.getAvgResponseSize()
require.True(t, math.Abs(float64(diff))/float64(smallResponseSize) < 0.05)

// Now simulate receiving 10 large BatchResponses.
largeResponseSize := int64(17)
for i := 0; i < 10; i++ {
e.update(largeResponseSize*1000, 1000)
}
// The estimate should now be pretty close to the size of a single response
// in the large BatchResponse.
diff = largeResponseSize - e.getAvgResponseSize()
require.True(t, math.Abs(float64(diff))/float64(largeResponseSize) < 0.15)
}
124 changes: 124 additions & 0 deletions pkg/kv/kvclient/kvstreamer/budget.go
@@ -0,0 +1,124 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package kvstreamer

import (
"context"

"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
)

// budget abstracts the memory budget that is provided to the Streamer by its
// client.
//
// This struct is a wrapper on top of mon.BoundAccount because we want to
// support the notion of budget "going in debt". This can occur in a degenerate
// case when a single large row exceeds the provided limit. The Streamer is
// expected to have only a single request in flight in this case. Additionally,
// the budget provides blocking (via waitCh) until it gets out of debt.
type budget struct {
mu struct {
syncutil.Mutex
// acc represents the current reservation of this budget against the
// root memory pool.
acc *mon.BoundAccount
}
// limitBytes is the maximum amount of bytes that this budget should reserve
// against acc, i.e. acc.Used() should not exceed limitBytes. However, in a
// degenerate case of a single large row, the budget can go into debt and
// acc.Used() might exceed limitBytes.
limitBytes int64
// waitCh is used by the main loop of the workerCoordinator to block until
// available() becomes positive (until some release calls occur).
waitCh chan struct{}
}

// newBudget creates a new budget with the specified limit. The limit determines
// the maximum amount of memory this budget is allowed to use (i.e. it'll be
// used lazily, as needed).
//
// The budget itself is responsible for staying under the limit, so acc should
// be bound to an unlimited memory monitor. This is needed in order to support
// the case of budget going into debt. Note that although it is an "unlimited
// memory monitor", the monitor is still limited by --max-sql-memory in size
// eventually because all monitors are descendants of the root SQL monitor.
//
// The budget takes ownership of the memory account, and the caller is allowed
// to interact with the account only after canceling the Streamer (because
// memory accounts are not thread-safe).
func newBudget(acc *mon.BoundAccount, limitBytes int64) *budget {
b := budget{
limitBytes: limitBytes,
waitCh: make(chan struct{}),
}
b.mu.acc = acc
return &b
}

// available returns how many bytes are currently available in the budget. The
// answer can be negative, in case the Streamer has used un-budgeted memory
// (e.g. one result was very large putting the budget in debt).
//
// Note that it's possible that actually available budget is less than the
// number returned - this might occur if --max-sql-memory root pool is almost
// used up.
func (b *budget) available() int64 {
b.mu.Lock()
defer b.mu.Unlock()
return b.limitBytes - b.mu.acc.Used()
}

// consume draws bytes from the available budget. An error is returned if the
// root pool budget is used up such that the budget's limit cannot be fully
// reserved.
// - allowDebt indicates whether the budget is allowed to go into debt on this
// consumption. In other words, if allowDebt is true, then acc's reservation is
// allowed to exceed limitBytes (but the error is still returned if the root
// pool budget is exceeded). Note that allowDebt value applies only to this
// consume() call and is not carried forward.
//
// b's mutex should not be held when calling this method.
func (b *budget) consume(ctx context.Context, bytes int64, allowDebt bool) error {
b.mu.Lock()
defer b.mu.Unlock()
return b.consumeLocked(ctx, bytes, allowDebt)
}

// consumeLocked is the same as consume but assumes that the b's lock is held.
func (b *budget) consumeLocked(ctx context.Context, bytes int64, allowDebt bool) error {
b.mu.AssertHeld()
// If we're asked to not exceed the limit (and the limit is greater than
// five bytes - limits of five bytes or less are treated as a special case
// for "forced disk spilling" scenarios like in logic tests), we have to
// check whether we'll stay within the budget.
if !allowDebt && b.limitBytes > 5 {
if b.mu.acc.Used()+bytes > b.limitBytes {
return mon.MemoryResource.NewBudgetExceededError(bytes, b.mu.acc.Used(), b.limitBytes)
}
}
return b.mu.acc.Grow(ctx, bytes)
}

// release returns bytes to the available budget.
func (b *budget) release(ctx context.Context, bytes int64) {
b.mu.Lock()
defer b.mu.Unlock()
b.mu.acc.Shrink(ctx, bytes)
if b.limitBytes > b.mu.acc.Used() {
// Since we now have some available budget, we non-blockingly send on
// the wait channel to notify the mainCoordinator about it.
select {
case b.waitCh <- struct{}{}:
default:
}
}
}
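
For illustration, a small sketch of how a caller might combine the two
modes of consume described above (a hypothetical helper in the same
package, not part of this commit): debt is permitted only for a single
head-of-the-line request, so a row larger than the entire budget can
still make progress.

```go
// reserveResponse attempts a normal reservation first and lets only the
// head-of-the-line request push the budget into debt. Illustrative only.
func reserveResponse(
	ctx context.Context, b *budget, respBytes int64, headOfLine bool,
) error {
	err := b.consume(ctx, respBytes, false /* allowDebt */)
	if err == nil || !headOfLine {
		return err
	}
	// A single large response at the head of the line may exceed limitBytes;
	// the root --max-sql-memory pool still bounds the reservation.
	return b.consume(ctx, respBytes, true /* allowDebt */)
}
```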
31 changes: 31 additions & 0 deletions pkg/kv/kvclient/kvstreamer/main_test.go
@@ -0,0 +1,31 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package kvstreamer_test

import (
"os"
"testing"

"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/security/securitytest"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
)

//go:generate ../../../util/leaktest/add-leaktest.sh *_test.go

func TestMain(m *testing.M) {
security.SetAssetLoader(securitytest.EmbeddedAssets)
randutil.SeedForTests()
serverutils.InitTestServerFactory(server.TestServerFactory)
os.Exit(m.Run())
}