Skip to content

Commit

Permalink
Merge pull request cockroachdb#114145 from yuzefovich/backport22.2-11…
Browse files Browse the repository at this point in the history
…3809
  • Loading branch information
yuzefovich authored Nov 13, 2023
2 parents 0cfb560 + 94d256a commit 579da0f
Show file tree
Hide file tree
Showing 15 changed files with 275 additions and 12 deletions.
2 changes: 2 additions & 0 deletions pkg/kv/kvclient/kvstreamer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ go_library(
"//pkg/roachpb",
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/sql/sessiondata",
"//pkg/util",
"//pkg/util/admission",
"//pkg/util/admission/admissionpb",
Expand Down Expand Up @@ -65,6 +66,7 @@ go_test(
"//pkg/sql/rowcontainer",
"//pkg/storage",
"//pkg/testutils/serverutils",
"//pkg/testutils/skip",
"//pkg/testutils/sqlutils",
"//pkg/util/hlc",
"//pkg/util/leaktest",
Expand Down
5 changes: 3 additions & 2 deletions pkg/kv/kvclient/kvstreamer/avg_response_estimator.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ type avgResponseEstimator struct {
const (
// TODO(yuzefovich): use the optimizer-driven estimates.
initialAvgResponseSize = 1 << 10 // 1KiB
// This value was determined using tpchvec/bench test on all TPC-H queries.
defaultAvgResponseSizeMultiple = 1.5
// This value was determined using tpchvec/bench test on all TPC-H queries
// as well as the query in TestStreamerVaryingResponseSizes.
defaultAvgResponseSizeMultiple = 3.0
)

// streamerAvgResponseSizeMultiple determines the multiple used when calculating
Expand Down
8 changes: 4 additions & 4 deletions pkg/kv/kvclient/kvstreamer/avg_response_estimator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,10 @@ func TestAvgResponseSizeForPartialResponses(t *testing.T) {
}
// We started with the TargetBytes equal to the initial estimate of 1KiB,
// and then with each update the estimate should have grown. In particular,
// we expect 7 BatchRequests total that fetch 1, 1, 3, 7, 18, 45, 25 rows
// respectively (note that the growth is faster than 2x because we use 1.5
// multiple on top of the average).
require.Equal(t, 7, batchRequestsCount)
// we expect 5 BatchRequests total that fetch 1, 3, 12, 48, 36 rows
// respectively (note that the growth is 3-4x because we use 3.0 multiple on
// top of the average).
require.Equal(t, 5, batchRequestsCount)
// From the perspective of the response estimator, we received only one
// response (that happened to be paginated across BatchRequests), so our
// estimate should match exactly the total size.
Expand Down
87 changes: 81 additions & 6 deletions pkg/kv/kvclient/kvstreamer/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/admission"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
Expand Down Expand Up @@ -219,12 +220,23 @@ func (r Result) Release(ctx context.Context) {
type Streamer struct {
distSender *kvcoord.DistSender
stopper *stop.Stopper
// sd can be nil in tests.
sd *sessiondata.SessionData

mode OperationMode
hints Hints
maxKeysPerRow int32
budget *budget
keyLocking lock.Strength
// eagerMemUsageLimitBytes determines the maximum memory used from the
// budget at which point the streamer stops sending non-head-of-the-line
// requests eagerly.
eagerMemUsageLimitBytes int64
// headOfLineOnlyFraction controls the fraction of the streamer's available
// memory budget that will be used to set the TargetBytes limit on the
// head-of-the-line request in case the "eager" memory usage limit has been
// exceeded. In such a case, only the head-of-the-line request will be sent.
headOfLineOnlyFraction float64
budget *budget
keyLocking lock.Strength

streamerStatistics

Expand Down Expand Up @@ -353,13 +365,16 @@ func max(a, b int64) int64 {
// to interact with the account only after canceling the Streamer (because
// memory accounts are not thread-safe).
//
// sd can be nil in tests in which case some reasonable defaults will be used.
//
// batchRequestsIssued should be incremented every time a new BatchRequest is
// sent.
func NewStreamer(
distSender *kvcoord.DistSender,
stopper *stop.Stopper,
txn *kv.Txn,
st *cluster.Settings,
sd *sessiondata.SessionData,
lockWaitPolicy lock.WaitPolicy,
limitBytes int64,
acc *mon.BoundAccount,
Expand All @@ -369,11 +384,18 @@ func NewStreamer(
if txn.Type() != kv.LeafTxn {
panic(errors.AssertionFailedf("RootTxn is given to the Streamer"))
}
// sd can be nil in tests.
headOfLineOnlyFraction := 0.8
if sd != nil {
headOfLineOnlyFraction = sd.StreamerHeadOfLineOnlyFraction
}
s := &Streamer{
distSender: distSender,
stopper: stopper,
budget: newBudget(acc, limitBytes),
keyLocking: keyLocking,
distSender: distSender,
stopper: stopper,
sd: sd,
headOfLineOnlyFraction: headOfLineOnlyFraction,
budget: newBudget(acc, limitBytes),
keyLocking: keyLocking,
}
if batchRequestsIssued == nil {
batchRequestsIssued = new(int64)
Expand Down Expand Up @@ -411,12 +433,29 @@ func (s *Streamer) Init(
mode OperationMode, hints Hints, maxKeysPerRow int, diskBuffer ResultDiskBuffer,
) {
s.mode = mode
// s.sd can be nil in tests, so use almost all the budget eagerly then.
eagerFraction := 0.9
if mode == OutOfOrder {
s.requestsToServe = newOutOfOrderRequestsProvider()
s.results = newOutOfOrderResultsBuffer(s.budget)
if s.sd != nil {
eagerFraction = s.sd.StreamerOutOfOrderEagerMemoryUsageFraction
}
} else {
s.requestsToServe = newInOrderRequestsProvider()
s.results = newInOrderResultsBuffer(s.budget, diskBuffer)
if s.sd != nil {
eagerFraction = s.sd.StreamerInOrderEagerMemoryUsageFraction
}
}
s.eagerMemUsageLimitBytes = int64(math.Ceil(float64(s.budget.limitBytes) * eagerFraction))
// Ensure some reasonable lower bound.
const minEagerMemUsage = 10 << 10 // 10KiB
if s.eagerMemUsageLimitBytes <= 0 {
// Protect from overflow.
s.eagerMemUsageLimitBytes = math.MaxInt64
} else if s.eagerMemUsageLimitBytes < minEagerMemUsage {
s.eagerMemUsageLimitBytes = minEagerMemUsage
}
if !hints.UniqueRequests {
panic(errors.AssertionFailedf("only unique requests are currently supported"))
Expand Down Expand Up @@ -1034,6 +1073,30 @@ func (w *workerCoordinator) issueRequestsForAsyncProcessing(
)
var budgetIsExhausted bool
for !w.s.requestsToServe.emptyLocked() && maxNumRequestsToIssue > 0 && !budgetIsExhausted {
if !headOfLine && w.s.budget.mu.acc.Used() > w.s.eagerMemUsageLimitBytes {
// The next batch is not head-of-the-line, and the budget has used
// up more than eagerMemUsageLimitBytes bytes. At this point, we
// stop issuing "eager" requests, so we just exit.
//
// This exit will not lead to the streamer deadlocking because we
// already have at least one batch to serve, and at some point we'll
// get into this loop with headOfLine=true, so we'll always be
// issuing at least one batch, eventually.
//
// This behavior is helpful to prevent pathological behavior
// observed in #113729. Namely, if we issue too many batches eagerly
// in the InOrder mode, the buffered responses might consume most of
// our memory budget, and at some point we might regress to
// processing requests one batch with a single Get / Scan request at
// a time. We don't want to just drop already received responses
// (we've already discarded the original singleRangeBatches anyway),
// so this mechanism allows us to preserve a fraction of the budget
// to processing head-of-the-line batches.
//
// Similar pattern can occur in the OutOfOrder mode too although the
// degradation is not as severe as in the InOrder mode.
return nil
}
singleRangeReqs := w.s.requestsToServe.nextLocked()
availableBudget := w.s.budget.limitBytes - w.s.budget.mu.acc.Used()
// minTargetBytes is the minimum TargetBytes limit with which it makes
Expand Down Expand Up @@ -1099,6 +1162,18 @@ func (w *workerCoordinator) issueRequestsForAsyncProcessing(
if targetBytes < singleRangeReqs.minTargetBytes {
targetBytes = singleRangeReqs.minTargetBytes
}
if headOfLine && w.s.budget.mu.acc.Used() > w.s.eagerMemUsageLimitBytes {
// Given that the eager memory usage limit has already been
// exceeded, we won't issue any more requests for now, so rather
// than use the estimate on the response size, this head-of-the-line
// batch will use most of the available budget, as controlled by the
// session variable.
if headOfLineOnly := int64(float64(availableBudget) * w.s.headOfLineOnlyFraction); headOfLineOnly > targetBytes {
targetBytes = headOfLineOnly
// Ensure that we won't issue any more requests for now.
budgetIsExhausted = true
}
}
if targetBytes+responsesOverhead > availableBudget {
// We don't have enough budget to account for both the TargetBytes
// limit and the overhead of the responses. We give higher
Expand Down
66 changes: 66 additions & 0 deletions pkg/kv/kvclient/kvstreamer/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ import (
gosql "database/sql"
"fmt"
"math"
"regexp"
"strconv"
"strings"
"sync"
"testing"

Expand All @@ -25,6 +28,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
Expand All @@ -43,6 +48,7 @@ func getStreamer(
s.Stopper(),
kv.NewLeafTxn(ctx, s.DB(), s.NodeID(), rootTxn.GetLeafTxnInputState(ctx)),
cluster.MakeTestingClusterSettings(),
nil, /* sd */
lock.WaitPolicy(0),
limitBytes,
acc,
Expand Down Expand Up @@ -95,6 +101,7 @@ func TestStreamerLimitations(t *testing.T) {
s.Stopper(),
kv.NewTxn(ctx, s.DB(), s.NodeID()),
cluster.MakeTestingClusterSettings(),
nil, /* sd */
lock.WaitPolicy(0),
math.MaxInt64, /* limitBytes */
nil, /* acc */
Expand Down Expand Up @@ -633,3 +640,62 @@ func TestStreamerMemoryAccounting(t *testing.T) {
})
}
}

// TestStreamerVaryingResponseSizes verifies that the Streamer handles
// responses of vastly varying sizes reasonably well. It is a regression test
// for #113729.
//
// The test runs an index join over a table whose rows differ in size by more
// than an order of magnitude, in both the OutOfOrder and InOrder modes, and
// asserts that the number of KV gRPC calls reported by EXPLAIN ANALYZE stays
// below 100.
func TestStreamerVaryingResponseSizes(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	skip.UnderStress(t, "the test is too memory intensive")
	skip.UnderRace(t, "the test is too memory intensive")

	// Start a cluster with large --max-sql-memory parameter since we'll be
	// inserting relatively large amount of data.
	s, db, _ := serverutils.StartServer(t, base.TestServerArgs{
		SQLMemoryPoolSize: 1 << 30, /* 1GiB */
	})
	defer s.Stopper().Stop(context.Background())

	runner := sqlutils.MakeSQLRunner(db)
	// Create a table with 10 ranges, with 3k rows in each. Within each range,
	// first 1k rows are relatively small, then next 1k rows are medium, and
	// the last 1k rows are large.
	runner.Exec(t, `
CREATE TABLE t (
k INT PRIMARY KEY,
v STRING,
blob STRING,
INDEX t_v_idx (v ASC)
);
INSERT INTO t SELECT 3000 * (i // 1000) + i % 1000, '1', repeat('a', 600 + i % 200) FROM generate_series(1, 10000) AS g(i);
INSERT INTO t SELECT 3000 * (i // 1000) + i % 1000 + 1000, '1', repeat('a', 3000 + i % 1000) FROM generate_series(1, 10000) AS g(i);
INSERT INTO t SELECT 3000 * (i // 1000) + i % 1000 + 2000, '1', repeat('a', 20000 + i) FROM generate_series(1, 10000) AS g(i);
ALTER TABLE t SPLIT AT SELECT generate_series(1, 30000, 3000);
`)

	// The meat of the test - run the query that performs an index join to fetch
	// all rows via the streamer, both in the OutOfOrder and InOrder modes. Each
	// time assert that the number of BatchRequests issued is in double digits
	// (if not, then the streamer was extremely suboptimal).
	//
	// The count may be rendered with thousands separators (e.g. "1,234"), so
	// capture digits and commas together and strip the commas before parsing.
	kvGRPCCallsRegex := regexp.MustCompile(`KV gRPC calls: ([\d,]+)`)
	// Iterate over the values (not the indices) so that a bool is bound to
	// the placeholder.
	for _, inOrder := range []bool{false, true} {
		runner.Exec(t, `SET streamer_always_maintain_ordering = $1;`, inOrder)
		for i := 0; i < 2; i++ {
			rows := runner.QueryStr(t, `EXPLAIN ANALYZE SELECT length(blob) FROM t@t_v_idx WHERE v = '1';`)
			var gRPCCalls int
			var found bool
			for _, row := range rows {
				matches := kvGRPCCallsRegex.FindStringSubmatch(row[0])
				if matches == nil {
					continue
				}
				var err error
				gRPCCalls, err = strconv.Atoi(strings.ReplaceAll(matches[1], ",", ""))
				require.NoError(t, err)
				found = true
				break
			}
			// Without this check a missing or reformatted line would leave
			// gRPCCalls at zero and the assertion below would pass vacuously.
			require.True(t, found, "KV gRPC calls not found in EXPLAIN ANALYZE output: %v", rows)
			require.Greater(t, 100, gRPCCalls, rows)
		}
	}
}
1 change: 1 addition & 0 deletions pkg/sql/colfetcher/index_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,7 @@ func NewColIndexJoin(
flowCtx.Stopper(),
txn,
flowCtx.EvalCtx.Settings,
flowCtx.EvalCtx.SessionData(),
spec.LockingWaitPolicy,
spec.LockingStrength,
streamerBudgetLimit,
Expand Down
12 changes: 12 additions & 0 deletions pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -3419,6 +3419,18 @@ func (m *sessionDataMutator) SetStreamerAlwaysMaintainOrdering(val bool) {
m.data.StreamerAlwaysMaintainOrdering = val
}

// SetStreamerInOrderEagerMemoryUsageFraction stores val in the session data's
// StreamerInOrderEagerMemoryUsageFraction field. The kvstreamer multiplies its
// budget limit by this fraction when initialized in the InOrder mode to obtain
// the memory usage above which it stops issuing non-head-of-the-line requests
// eagerly.
func (m *sessionDataMutator) SetStreamerInOrderEagerMemoryUsageFraction(val float64) {
m.data.StreamerInOrderEagerMemoryUsageFraction = val
}

// SetStreamerOutOfOrderEagerMemoryUsageFraction stores val in the session
// data's StreamerOutOfOrderEagerMemoryUsageFraction field. The kvstreamer
// multiplies its budget limit by this fraction when initialized in the
// OutOfOrder mode to obtain the memory usage above which it stops issuing
// non-head-of-the-line requests eagerly.
func (m *sessionDataMutator) SetStreamerOutOfOrderEagerMemoryUsageFraction(val float64) {
m.data.StreamerOutOfOrderEagerMemoryUsageFraction = val
}

// SetStreamerHeadOfLineOnlyFraction stores val in the session data's
// StreamerHeadOfLineOnlyFraction field. The kvstreamer uses this fraction of
// its available budget as the TargetBytes limit of the head-of-the-line
// request once the eager memory usage limit has been exceeded.
func (m *sessionDataMutator) SetStreamerHeadOfLineOnlyFraction(val float64) {
m.data.StreamerHeadOfLineOnlyFraction = val
}

// SetUnboundedParallelScans stores val in the session data's
// UnboundedParallelScans field.
func (m *sessionDataMutator) SetUnboundedParallelScans(val bool) {
m.data.UnboundedParallelScans = val
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/information_schema
Original file line number Diff line number Diff line change
Expand Up @@ -4838,6 +4838,9 @@ standard_conforming_strings on
statement_timeout 0
streamer_always_maintain_ordering off
streamer_enabled on
streamer_head_of_line_only_fraction 0.8
streamer_in_order_eager_memory_usage_fraction 0.5
streamer_out_of_order_eager_memory_usage_fraction 0.8
stub_catalog_tables on
synchronize_seqscans on
synchronous_commit on
Expand Down
9 changes: 9 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/pg_catalog
Original file line number Diff line number Diff line change
Expand Up @@ -2880,6 +2880,9 @@ standard_conforming_strings on N
statement_timeout 0 NULL NULL NULL string
streamer_always_maintain_ordering off NULL NULL NULL string
streamer_enabled on NULL NULL NULL string
streamer_head_of_line_only_fraction 0.8 NULL NULL NULL string
streamer_in_order_eager_memory_usage_fraction 0.5 NULL NULL NULL string
streamer_out_of_order_eager_memory_usage_fraction 0.8 NULL NULL NULL string
stub_catalog_tables on NULL NULL NULL string
synchronize_seqscans on NULL NULL NULL string
synchronous_commit on NULL NULL NULL string
Expand Down Expand Up @@ -3030,6 +3033,9 @@ standard_conforming_strings on N
statement_timeout 0 NULL user NULL 0s 0s
streamer_always_maintain_ordering off NULL user NULL off off
streamer_enabled on NULL user NULL on on
streamer_head_of_line_only_fraction 0.8 NULL user NULL 0.8 0.8
streamer_in_order_eager_memory_usage_fraction 0.5 NULL user NULL 0.5 0.5
streamer_out_of_order_eager_memory_usage_fraction 0.8 NULL user NULL 0.8 0.8
stub_catalog_tables on NULL user NULL on on
synchronize_seqscans on NULL user NULL on on
synchronous_commit on NULL user NULL on on
Expand Down Expand Up @@ -3179,6 +3185,9 @@ standard_conforming_strings NULL NULL NULL
statement_timeout NULL NULL NULL NULL NULL
streamer_always_maintain_ordering NULL NULL NULL NULL NULL
streamer_enabled NULL NULL NULL NULL NULL
streamer_head_of_line_only_fraction NULL NULL NULL NULL NULL
streamer_in_order_eager_memory_usage_fraction NULL NULL NULL NULL NULL
streamer_out_of_order_eager_memory_usage_fraction NULL NULL NULL NULL NULL
stub_catalog_tables NULL NULL NULL NULL NULL
synchronize_seqscans NULL NULL NULL NULL NULL
synchronous_commit NULL NULL NULL NULL NULL
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/show_source
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,9 @@ standard_conforming_strings on
statement_timeout 0
streamer_always_maintain_ordering off
streamer_enabled on
streamer_head_of_line_only_fraction 0.8
streamer_in_order_eager_memory_usage_fraction 0.5
streamer_out_of_order_eager_memory_usage_fraction 0.8
stub_catalog_tables on
synchronize_seqscans on
synchronous_commit on
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/row/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ go_library(
"//pkg/sql/sem/transform",
"//pkg/sql/sem/tree",
"//pkg/sql/sem/volatility",
"//pkg/sql/sessiondata",
"//pkg/sql/sessiondatapb",
"//pkg/sql/span",
"//pkg/sql/sqlerrors",
Expand Down
Loading

0 comments on commit 579da0f

Please sign in to comment.