diff --git a/pkg/kv/kvclient/kvstreamer/BUILD.bazel b/pkg/kv/kvclient/kvstreamer/BUILD.bazel index 95d22fef48db..15171854663a 100644 --- a/pkg/kv/kvclient/kvstreamer/BUILD.bazel +++ b/pkg/kv/kvclient/kvstreamer/BUILD.bazel @@ -22,6 +22,7 @@ go_library( "//pkg/roachpb", "//pkg/settings", "//pkg/settings/cluster", + "//pkg/sql/sessiondata", "//pkg/util", "//pkg/util/admission", "//pkg/util/admission/admissionpb", @@ -65,6 +66,7 @@ go_test( "//pkg/sql/rowcontainer", "//pkg/storage", "//pkg/testutils/serverutils", + "//pkg/testutils/skip", "//pkg/testutils/sqlutils", "//pkg/util/hlc", "//pkg/util/leaktest", diff --git a/pkg/kv/kvclient/kvstreamer/avg_response_estimator.go b/pkg/kv/kvclient/kvstreamer/avg_response_estimator.go index eea8bfe8578b..2868afe16ac2 100644 --- a/pkg/kv/kvclient/kvstreamer/avg_response_estimator.go +++ b/pkg/kv/kvclient/kvstreamer/avg_response_estimator.go @@ -32,8 +32,9 @@ type avgResponseEstimator struct { const ( // TODO(yuzefovich): use the optimizer-driven estimates. initialAvgResponseSize = 1 << 10 // 1KiB - // This value was determined using tpchvec/bench test on all TPC-H queries. - defaultAvgResponseSizeMultiple = 1.5 + // This value was determined using tpchvec/bench test on all TPC-H queries + // as well as the query in TestStreamerVaryingResponseSizes. + defaultAvgResponseSizeMultiple = 3.0 ) // streamerAvgResponseSizeMultiple determines the multiple used when calculating diff --git a/pkg/kv/kvclient/kvstreamer/avg_response_estimator_test.go b/pkg/kv/kvclient/kvstreamer/avg_response_estimator_test.go index 61403860097d..3e36efefbc94 100644 --- a/pkg/kv/kvclient/kvstreamer/avg_response_estimator_test.go +++ b/pkg/kv/kvclient/kvstreamer/avg_response_estimator_test.go @@ -85,10 +85,10 @@ func TestAvgResponseSizeForPartialResponses(t *testing.T) { } // We started with the TargetBytes equal to the initial estimate of 1KiB, // and then with each update the estimate should have grown. In particular, - // we expect 7 BatchRequests total that fetch 1, 1, 3, 7, 18, 45, 25 rows - // respectively (note that the growth is faster than 2x because we use 1.5 - // multiple on top of the average). - require.Equal(t, 7, batchRequestsCount) + // we expect 5 BatchRequests total that fetch 1, 3, 12, 48, 36 rows + // respectively (note that the growth is 3-4x because we use 3.0 multiple on + // top of the average). + require.Equal(t, 5, batchRequestsCount) // From the perspective of the response estimator, we received only one // response (that happened to be paginated across BatchRequests), so our // estimate should match exactly the total size. diff --git a/pkg/kv/kvclient/kvstreamer/streamer.go b/pkg/kv/kvclient/kvstreamer/streamer.go index e69a14cf3ce3..fc3e4b4e054f 100644 --- a/pkg/kv/kvclient/kvstreamer/streamer.go +++ b/pkg/kv/kvclient/kvstreamer/streamer.go @@ -27,6 +27,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/admission" "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" @@ -219,12 +220,23 @@ func (r Result) Release(ctx context.Context) { type Streamer struct { distSender *kvcoord.DistSender stopper *stop.Stopper + // sd can be nil in tests. 
+ sd *sessiondata.SessionData mode OperationMode hints Hints maxKeysPerRow int32 - budget *budget - keyLocking lock.Strength + // eagerMemUsageLimitBytes determines the maximum memory used from the + // budget at which point the streamer stops sending non-head-of-the-line + // requests eagerly. + eagerMemUsageLimitBytes int64 + // headOfLineOnlyFraction controls the fraction of the available streamer's + // memory budget that will be used to set the TargetBytes limit on + // head-of-the-line request in case the "eager" memory usage limit has been + // exceeded. In such case, only head-of-the-line request will be sent. + headOfLineOnlyFraction float64 + budget *budget + keyLocking lock.Strength streamerStatistics @@ -353,6 +365,8 @@ func max(a, b int64) int64 { // to interact with the account only after canceling the Streamer (because // memory accounts are not thread-safe). // +// sd can be nil in tests in which case some reasonable defaults will be used. +// // batchRequestsIssued should be incremented every time a new BatchRequest is // sent. func NewStreamer( @@ -360,6 +374,7 @@ func NewStreamer( stopper *stop.Stopper, txn *kv.Txn, st *cluster.Settings, + sd *sessiondata.SessionData, lockWaitPolicy lock.WaitPolicy, limitBytes int64, acc *mon.BoundAccount, @@ -369,11 +384,18 @@ func NewStreamer( if txn.Type() != kv.LeafTxn { panic(errors.AssertionFailedf("RootTxn is given to the Streamer")) } + // sd can be nil in tests. + headOfLineOnlyFraction := 0.8 + if sd != nil { + headOfLineOnlyFraction = sd.StreamerHeadOfLineOnlyFraction + } s := &Streamer{ - distSender: distSender, - stopper: stopper, - budget: newBudget(acc, limitBytes), - keyLocking: keyLocking, + distSender: distSender, + stopper: stopper, + sd: sd, + headOfLineOnlyFraction: headOfLineOnlyFraction, + budget: newBudget(acc, limitBytes), + keyLocking: keyLocking, } if batchRequestsIssued == nil { batchRequestsIssued = new(int64) @@ -411,12 +433,29 @@ func (s *Streamer) Init( mode OperationMode, hints Hints, maxKeysPerRow int, diskBuffer ResultDiskBuffer, ) { s.mode = mode + // s.sd can be nil in tests, so use almost all the budget eagerly then. + eagerFraction := 0.9 if mode == OutOfOrder { s.requestsToServe = newOutOfOrderRequestsProvider() s.results = newOutOfOrderResultsBuffer(s.budget) + if s.sd != nil { + eagerFraction = s.sd.StreamerOutOfOrderEagerMemoryUsageFraction + } } else { s.requestsToServe = newInOrderRequestsProvider() s.results = newInOrderResultsBuffer(s.budget, diskBuffer) + if s.sd != nil { + eagerFraction = s.sd.StreamerInOrderEagerMemoryUsageFraction + } + } + s.eagerMemUsageLimitBytes = int64(math.Ceil(float64(s.budget.limitBytes) * eagerFraction)) + // Ensure some reasonable lower bound. + const minEagerMemUsage = 10 << 10 // 10KiB + if s.eagerMemUsageLimitBytes <= 0 { + // Protect from overflow. + s.eagerMemUsageLimitBytes = math.MaxInt64 + } else if s.eagerMemUsageLimitBytes < minEagerMemUsage { + s.eagerMemUsageLimitBytes = minEagerMemUsage } if !hints.UniqueRequests { panic(errors.AssertionFailedf("only unique requests are currently supported")) @@ -1034,6 +1073,30 @@ func (w *workerCoordinator) issueRequestsForAsyncProcessing( ) var budgetIsExhausted bool for !w.s.requestsToServe.emptyLocked() && maxNumRequestsToIssue > 0 && !budgetIsExhausted { + if !headOfLine && w.s.budget.mu.acc.Used() > w.s.eagerMemUsageLimitBytes { + // The next batch is not head-of-the-line, and the budget has used + // up more than eagerMemUsageLimitBytes bytes. 
At this point, we + // stop issuing "eager" requests, so we just exit. + // + // This exit will not lead to the streamer deadlocking because we + // already have at least one batch to serve, and at some point we'll + // get into this loop with headOfLine=true, so we'll always be + // issuing at least one batch, eventually. + // + // This behavior is helpful to prevent the pathological behavior + // observed in #113729. Namely, if we issue too many batches eagerly + // in the InOrder mode, the buffered responses might consume most of + // our memory budget, and at some point we might regress to + // processing requests one batch with a single Get / Scan request at + // a time. We don't want to just drop already received responses + // (we've already discarded the original singleRangeBatches anyway), + // so this mechanism allows us to preserve a fraction of the budget + // for processing head-of-the-line batches. + // + // A similar pattern can occur in the OutOfOrder mode too, although + // the degradation is not as severe as in the InOrder mode. + return nil + } singleRangeReqs := w.s.requestsToServe.nextLocked() availableBudget := w.s.budget.limitBytes - w.s.budget.mu.acc.Used() // minTargetBytes is the minimum TargetBytes limit with which it makes @@ -1099,6 +1162,18 @@ func (w *workerCoordinator) issueRequestsForAsyncProcessing( if targetBytes < singleRangeReqs.minTargetBytes { targetBytes = singleRangeReqs.minTargetBytes } + if headOfLine && w.s.budget.mu.acc.Used() > w.s.eagerMemUsageLimitBytes { + // Given that the eager memory usage limit has already been + // exceeded, we won't issue any more requests for now, so rather + // than using the response size estimate, this head-of-the-line + // batch will use most of the available budget, as controlled by the + // session variable. + if headOfLineOnly := int64(float64(availableBudget) * w.s.headOfLineOnlyFraction); headOfLineOnly > targetBytes { + targetBytes = headOfLineOnly + // Ensure that we won't issue any more requests for now. + budgetIsExhausted = true + } + } if targetBytes+responsesOverhead > availableBudget { // We don't have enough budget to account for both the TargetBytes // limit and the overhead of the responses.
We give higher diff --git a/pkg/kv/kvclient/kvstreamer/streamer_test.go b/pkg/kv/kvclient/kvstreamer/streamer_test.go index b5576703e8e9..f5c7fb6a54f0 100644 --- a/pkg/kv/kvclient/kvstreamer/streamer_test.go +++ b/pkg/kv/kvclient/kvstreamer/streamer_test.go @@ -15,6 +15,9 @@ import ( gosql "database/sql" "fmt" "math" + "regexp" + "strconv" + "strings" "sync" "testing" @@ -25,6 +28,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/skip" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/mon" @@ -43,6 +48,7 @@ func getStreamer( s.Stopper(), kv.NewLeafTxn(ctx, s.DB(), s.NodeID(), rootTxn.GetLeafTxnInputState(ctx)), cluster.MakeTestingClusterSettings(), + nil, /* sd */ lock.WaitPolicy(0), limitBytes, acc, @@ -95,6 +101,7 @@ func TestStreamerLimitations(t *testing.T) { s.Stopper(), kv.NewTxn(ctx, s.DB(), s.NodeID()), cluster.MakeTestingClusterSettings(), + nil, /* sd */ lock.WaitPolicy(0), math.MaxInt64, /* limitBytes */ nil, /* acc */ @@ -633,3 +640,62 @@ func TestStreamerMemoryAccounting(t *testing.T) { }) } } + +// TestStreamerVaryingResponseSizes verifies that the Streamer handles +// responses of vastly varying sizes reasonably well. It is a regression test +// for #113729. +func TestStreamerVaryingResponseSizes(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + skip.UnderStress(t, "the test is too memory intensive") + skip.UnderRace(t, "the test is too memory intensive") + + // Start a cluster with a large --max-sql-memory parameter since we'll be + // inserting a relatively large amount of data. + s, db, _ := serverutils.StartServer(t, base.TestServerArgs{ + SQLMemoryPoolSize: 1 << 30, /* 1GiB */ + }) + defer s.Stopper().Stop(context.Background()) + + runner := sqlutils.MakeSQLRunner(db) + // Create a table with 10 ranges, with 3k rows in each. Within each range, + // the first 1k rows are relatively small, the next 1k rows are medium, and + // the last 1k rows are large. + runner.Exec(t, ` CREATE TABLE t ( k INT PRIMARY KEY, v STRING, blob STRING, INDEX t_v_idx (v ASC) ); INSERT INTO t SELECT 3000 * (i // 1000) + i % 1000, '1', repeat('a', 600 + i % 200) FROM generate_series(1, 10000) AS g(i); INSERT INTO t SELECT 3000 * (i // 1000) + i % 1000 + 1000, '1', repeat('a', 3000 + i % 1000) FROM generate_series(1, 10000) AS g(i); INSERT INTO t SELECT 3000 * (i // 1000) + i % 1000 + 2000, '1', repeat('a', 20000 + i) FROM generate_series(1, 10000) AS g(i); ALTER TABLE t SPLIT AT SELECT generate_series(1, 30000, 3000); `) + + // The meat of the test - run the query that performs an index join to fetch + // all rows via the streamer, both in the OutOfOrder and InOrder modes. Each + // time assert that the number of BatchRequests issued is in double digits + // (if not, then the streamer was extremely suboptimal).
+ kvGRPCCallsRegex := regexp.MustCompile(`KV gRPC calls: ([\d,]+)`) + for _, inOrder := range []bool{false, true} { + runner.Exec(t, `SET streamer_always_maintain_ordering = $1;`, inOrder) + for i := 0; i < 2; i++ { + var gRPCCalls int + var err error + rows := runner.QueryStr(t, `EXPLAIN ANALYZE SELECT length(blob) FROM t@t_v_idx WHERE v = '1';`) + for _, row := range rows { + if matches := kvGRPCCallsRegex.FindStringSubmatch(row[0]); len(matches) > 0 { + gRPCCalls, err = strconv.Atoi(strings.ReplaceAll(matches[1], ",", "")) + require.NoError(t, err) + break + } + } + require.Greater(t, 100, gRPCCalls, rows) + } + } +} diff --git a/pkg/sql/colfetcher/index_join.go b/pkg/sql/colfetcher/index_join.go index 1e7564d4ab27..bcc02d6e0f34 100644 --- a/pkg/sql/colfetcher/index_join.go +++ b/pkg/sql/colfetcher/index_join.go @@ -538,6 +538,7 @@ func NewColIndexJoin( flowCtx.Stopper(), txn, flowCtx.EvalCtx.Settings, + flowCtx.EvalCtx.SessionData(), spec.LockingWaitPolicy, spec.LockingStrength, streamerBudgetLimit, diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index d16cd73c8601..18b503b62307 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -3419,6 +3419,18 @@ func (m *sessionDataMutator) SetStreamerAlwaysMaintainOrdering(val bool) { m.data.StreamerAlwaysMaintainOrdering = val } +func (m *sessionDataMutator) SetStreamerInOrderEagerMemoryUsageFraction(val float64) { + m.data.StreamerInOrderEagerMemoryUsageFraction = val +} + +func (m *sessionDataMutator) SetStreamerOutOfOrderEagerMemoryUsageFraction(val float64) { + m.data.StreamerOutOfOrderEagerMemoryUsageFraction = val +} + +func (m *sessionDataMutator) SetStreamerHeadOfLineOnlyFraction(val float64) { + m.data.StreamerHeadOfLineOnlyFraction = val +} + func (m *sessionDataMutator) SetUnboundedParallelScans(val bool) { m.data.UnboundedParallelScans = val } diff --git a/pkg/sql/logictest/testdata/logic_test/information_schema b/pkg/sql/logictest/testdata/logic_test/information_schema index 94366c085fe2..7bfba3e2d17f 100644 --- a/pkg/sql/logictest/testdata/logic_test/information_schema +++ b/pkg/sql/logictest/testdata/logic_test/information_schema @@ -4838,6 +4838,9 @@ standard_conforming_strings on statement_timeout 0 streamer_always_maintain_ordering off streamer_enabled on +streamer_head_of_line_only_fraction 0.8 +streamer_in_order_eager_memory_usage_fraction 0.5 +streamer_out_of_order_eager_memory_usage_fraction 0.8 stub_catalog_tables on synchronize_seqscans on synchronous_commit on diff --git a/pkg/sql/logictest/testdata/logic_test/pg_catalog b/pkg/sql/logictest/testdata/logic_test/pg_catalog index 29c201e7d265..90b543993258 100644 --- a/pkg/sql/logictest/testdata/logic_test/pg_catalog +++ b/pkg/sql/logictest/testdata/logic_test/pg_catalog @@ -2880,6 +2880,9 @@ standard_conforming_strings on N statement_timeout 0 NULL NULL NULL string streamer_always_maintain_ordering off NULL NULL NULL string streamer_enabled on NULL NULL NULL string +streamer_head_of_line_only_fraction 0.8 NULL NULL NULL string +streamer_in_order_eager_memory_usage_fraction 0.5 NULL NULL NULL string +streamer_out_of_order_eager_memory_usage_fraction 0.8 NULL NULL NULL string stub_catalog_tables on NULL NULL NULL string synchronize_seqscans on NULL NULL NULL string synchronous_commit on NULL NULL NULL string @@ -3030,6 +3033,9 @@ standard_conforming_strings on N statement_timeout 0 NULL user NULL 0s 0s streamer_always_maintain_ordering off NULL user NULL off off streamer_enabled on NULL user NULL on on +streamer_head_of_line_only_fraction 0.8 NULL user NULL
0.8 0.8 +streamer_in_order_eager_memory_usage_fraction 0.5 NULL user NULL 0.5 0.5 +streamer_out_of_order_eager_memory_usage_fraction 0.8 NULL user NULL 0.8 0.8 stub_catalog_tables on NULL user NULL on on synchronize_seqscans on NULL user NULL on on synchronous_commit on NULL user NULL on on @@ -3179,6 +3185,9 @@ standard_conforming_strings NULL NULL NULL statement_timeout NULL NULL NULL NULL NULL streamer_always_maintain_ordering NULL NULL NULL NULL NULL streamer_enabled NULL NULL NULL NULL NULL +streamer_head_of_line_only_fraction NULL NULL NULL NULL NULL +streamer_in_order_eager_memory_usage_fraction NULL NULL NULL NULL NULL +streamer_out_of_order_eager_memory_usage_fraction NULL NULL NULL NULL NULL stub_catalog_tables NULL NULL NULL NULL NULL synchronize_seqscans NULL NULL NULL NULL NULL synchronous_commit NULL NULL NULL NULL NULL diff --git a/pkg/sql/logictest/testdata/logic_test/show_source b/pkg/sql/logictest/testdata/logic_test/show_source index d2b09a3febb9..585aaaff27db 100644 --- a/pkg/sql/logictest/testdata/logic_test/show_source +++ b/pkg/sql/logictest/testdata/logic_test/show_source @@ -141,6 +141,9 @@ standard_conforming_strings on statement_timeout 0 streamer_always_maintain_ordering off streamer_enabled on +streamer_head_of_line_only_fraction 0.8 +streamer_in_order_eager_memory_usage_fraction 0.5 +streamer_out_of_order_eager_memory_usage_fraction 0.8 stub_catalog_tables on synchronize_seqscans on synchronous_commit on diff --git a/pkg/sql/row/BUILD.bazel b/pkg/sql/row/BUILD.bazel index 77a21423d4e8..139cb5c637ec 100644 --- a/pkg/sql/row/BUILD.bazel +++ b/pkg/sql/row/BUILD.bazel @@ -57,6 +57,7 @@ go_library( "//pkg/sql/sem/transform", "//pkg/sql/sem/tree", "//pkg/sql/sem/volatility", + "//pkg/sql/sessiondata", "//pkg/sql/sessiondatapb", "//pkg/sql/span", "//pkg/sql/sqlerrors", diff --git a/pkg/sql/row/kv_fetcher.go b/pkg/sql/row/kv_fetcher.go index a1fd62679252..edb268fd0dfd 100644 --- a/pkg/sql/row/kv_fetcher.go +++ b/pkg/sql/row/kv_fetcher.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/rowinfra" + "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -131,6 +132,7 @@ func NewStreamingKVFetcher( stopper *stop.Stopper, txn *kv.Txn, st *cluster.Settings, + sd *sessiondata.SessionData, lockWaitPolicy descpb.ScanLockingWaitPolicy, lockStrength descpb.ScanLockingStrength, streamerBudgetLimit int64, @@ -147,6 +149,7 @@ func NewStreamingKVFetcher( stopper, txn, st, + sd, getWaitPolicy(lockWaitPolicy), streamerBudgetLimit, streamerBudgetAcc, diff --git a/pkg/sql/rowexec/joinreader.go b/pkg/sql/rowexec/joinreader.go index 6fbadf171eb1..5d3aaa03e291 100644 --- a/pkg/sql/rowexec/joinreader.go +++ b/pkg/sql/rowexec/joinreader.go @@ -536,6 +536,7 @@ func newJoinReader( flowCtx.Stopper(), jr.txn, flowCtx.EvalCtx.Settings, + flowCtx.EvalCtx.SessionData(), spec.LockingWaitPolicy, spec.LockingStrength, streamerBudgetLimit, diff --git a/pkg/sql/sessiondatapb/session_data.proto b/pkg/sql/sessiondatapb/session_data.proto index ed2d488ef763..89c64800daed 100644 --- a/pkg/sql/sessiondatapb/session_data.proto +++ b/pkg/sql/sessiondatapb/session_data.proto @@ -111,6 +111,19 @@ message SessionData { // This session variable is introduced as a possible workaround in case we // have more bugs like #113013. 
bool streamer_always_maintain_ordering = 27; + // StreamerInOrderEagerMemoryUsageFraction controls the fraction of the + // streamer's memory budget that might be used for issuing requests eagerly, + // in the InOrder mode. + double streamer_in_order_eager_memory_usage_fraction = 28; + // StreamerOutOfOrderEagerMemoryUsageFraction controls the fraction of the + // streamer's memory budget that might be used for issuing requests eagerly, + // in the OutOfOrder mode. + double streamer_out_of_order_eager_memory_usage_fraction = 29; + // StreamerHeadOfLineOnlyFraction controls the fraction of the available + // streamer's memory budget that will be used to set the TargetBytes limit on + // head-of-the-line request in case the "eager" memory usage limit has been + // exceeded. + double streamer_head_of_line_only_fraction = 30; } // DataConversionConfig contains the parameters that influence the output diff --git a/pkg/sql/vars.go b/pkg/sql/vars.go index a31303f27370..faf1fbda6ce7 100644 --- a/pkg/sql/vars.go +++ b/pkg/sql/vars.go @@ -2510,6 +2510,79 @@ var varGen = map[string]sessionVar{ GlobalDefault: globalFalse, }, + // CockroachDB extension. + `streamer_in_order_eager_memory_usage_fraction`: { + GetStringVal: makeFloatGetStringValFn(`streamer_in_order_eager_memory_usage_fraction`), + Get: func(evalCtx *extendedEvalContext, _ *kv.Txn) (string, error) { + return formatFloatAsPostgresSetting(evalCtx.SessionData().StreamerInOrderEagerMemoryUsageFraction), nil + }, + GlobalDefault: func(sv *settings.Values) string { + return "0.5" + }, + Set: func(_ context.Context, m sessionDataMutator, s string) error { + f, err := strconv.ParseFloat(s, 64) + if err != nil { + return err + } + // Note that we permit fractions above 1.0 to allow for disabling + // the "eager memory usage" limit. + if f < 0 { + return pgerror.New(pgcode.InvalidParameterValue, "streamer_in_order_eager_memory_usage_fraction must be non-negative") + } + m.SetStreamerInOrderEagerMemoryUsageFraction(f) + return nil + }, + }, + + // CockroachDB extension. + `streamer_out_of_order_eager_memory_usage_fraction`: { + GetStringVal: makeFloatGetStringValFn(`streamer_out_of_order_eager_memory_usage_fraction`), + Get: func(evalCtx *extendedEvalContext, _ *kv.Txn) (string, error) { + return formatFloatAsPostgresSetting(evalCtx.SessionData().StreamerOutOfOrderEagerMemoryUsageFraction), nil + }, + GlobalDefault: func(sv *settings.Values) string { + return "0.8" + }, + Set: func(_ context.Context, m sessionDataMutator, s string) error { + f, err := strconv.ParseFloat(s, 64) + if err != nil { + return err + } + // Note that we permit fractions above 1.0 to allow for disabling + // the "eager memory usage" limit. + if f < 0 { + return pgerror.New(pgcode.InvalidParameterValue, "streamer_out_of_order_eager_memory_usage_fraction must be non-negative") + } + m.SetStreamerOutOfOrderEagerMemoryUsageFraction(f) + return nil + }, + }, + + // CockroachDB extension. 
+ `streamer_head_of_line_only_fraction`: { + GetStringVal: makeFloatGetStringValFn(`streamer_head_of_line_only_fraction`), + Get: func(evalCtx *extendedEvalContext, _ *kv.Txn) (string, error) { + return formatFloatAsPostgresSetting(evalCtx.SessionData().StreamerHeadOfLineOnlyFraction), nil + }, + GlobalDefault: func(sv *settings.Values) string { + return "0.8" + }, + Set: func(_ context.Context, m sessionDataMutator, s string) error { + f, err := strconv.ParseFloat(s, 64) + if err != nil { + return err + } + // Note that we permit fractions above 1.0 to allow for giving the + // head-of-the-line batch more memory than is available - this will + // put the budget in debt. + if f < 0 { + return pgerror.New(pgcode.InvalidParameterValue, "streamer_head_of_line_only_fraction must be non-negative") + } + m.SetStreamerHeadOfLineOnlyFraction(f) + return nil + }, + }, + // CockroachDB extension. `unbounded_parallel_scans`: { GetStringVal: makePostgresBoolGetStringValFn(`unbounded_parallel_scans`),
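
Note for reviewers: below is a minimal, self-contained Go sketch of the budgeting arithmetic this patch introduces, shown in isolation. It is not part of the diff; the helper names eagerLimit and headOfLineTargetBytes and the example budget sizes are illustrative assumptions, while the real logic lives in Streamer.Init and workerCoordinator.issueRequestsForAsyncProcessing above.

package main

import (
	"fmt"
	"math"
)

// eagerLimit mirrors the arithmetic in Streamer.Init: the eager memory usage
// limit is the configured fraction of the budget's limit, rounded up, clamped
// to a 10KiB lower bound, and set to MaxInt64 if the multiplication overflows.
func eagerLimit(budgetLimitBytes int64, eagerFraction float64) int64 {
	limit := int64(math.Ceil(float64(budgetLimitBytes) * eagerFraction))
	const minEagerMemUsage = 10 << 10 // 10KiB
	if limit <= 0 {
		// Protect from overflow.
		return math.MaxInt64
	} else if limit < minEagerMemUsage {
		return minEagerMemUsage
	}
	return limit
}

// headOfLineTargetBytes mirrors the new branch in
// issueRequestsForAsyncProcessing: once the eager limit has been exceeded, the
// head-of-the-line batch may claim a fraction of the remaining budget rather
// than relying on the average response size estimate.
func headOfLineTargetBytes(availableBudget, estimate int64, headOfLineOnlyFraction float64) int64 {
	if headOfLineOnly := int64(float64(availableBudget) * headOfLineOnlyFraction); headOfLineOnly > estimate {
		return headOfLineOnly
	}
	return estimate
}

func main() {
	// With a hypothetical 8MiB budget and the InOrder default of 0.5, eager
	// (non-head-of-the-line) requests stop once ~4MiB of the budget is in use.
	fmt.Println(eagerLimit(8<<20, 0.5)) // 4194304
	// With 2MiB of budget still available and the default 0.8 fraction, the
	// head-of-the-line batch gets ~1.6MiB instead of the 64KiB estimate.
	fmt.Println(headOfLineTargetBytes(2<<20, 64<<10, 0.8)) // 1677721
}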