Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql,kv: introduce Streamer API and use it for index joins in some cases #68430

Merged
merged 3 commits into from
Jan 12, 2022

Conversation

yuzefovich
Copy link
Member

@yuzefovich yuzefovich commented Aug 4, 2021

execinfrapb: add a helper for index joins based on the JoinReaderSpec

Release note: None

rowexec: refactor the joinReader to not exceed the batch size

The joinReader operates by buffering the input rows until a certain size
limit (which is dependent on the strategy). Previously, the buffering
would stop right after the size limit is reached or exceeded, and this
commit refactors the code to not exceed the limit except in a case of
a single large row. This is what we already do for vectorized index
joins.

Release note: None

sql,kv: introduce Streamer API and use it for index joins in some cases

This commit introduces the Streamer API (see
https://github.com/cockroachdb/cockroach/blob/master/docs/RFCS/20210617_index_lookups_memory_limits.md)
as well as its implementation for the simplest case - when requests are
unique and can be served in any order. It additionally hooks up the
implementation to be used by the index joins in both execution engines.

There are three main pieces that this commit adds:

  1. the Streamer struct itself. It is largely the same as described in
    the RFC. Some notable changes are:
  • Cancel is renamed to Close and is made blocking to ensure that all
    goroutines kicked off by the Streamer exit before Close returns.
  • Shrink has been removed from the budget struct (see below for
    more details).
  • furthermore, the budget interface has been unexported and the
    streamer has been tightly coupled with the budget's implementation.
  • the TODO about collecting DistSQL metadata is removed because we are
    collecting the LeafTxnFinalState already when using the LeafTxn.
  1. the limited Streamer implementation - only OutOfOrder mode is
    supported when the requests are unique. Notably, buffering and caching
    of the results is not implemented yet.
  2. TxnKVStreamer component that sits below the SQL fetchers, uses the
    Streamer, and is an adapter from BatchResponses to key/value pairs
    that fetchers understand. Although not used at the moment,
    TxnKVStreamer is written under the assumption that a single result can
    satisfy multiple requests.

The memory budget of the Streamer is utilized lazily. The RFC was
calling for making a large reservation upfront and then shrinking the
budget if we see that we don't need that large reservation; however,
while working on this I realized that lazy reservations are a better fit
for this. The Streamer can reserve up to the specified limit
(determined by distsql_workmem variable) against the root monitor (in
the degenerate case of a single large row more memory will be reserved).
The reservation then never shrinks under the assumption that if the
reservation has gotten large, it means it was needed for higher
concurrency (or large responses), and it is likely to be needed for the
same reasons in the future.

The layout of the main components of the Streamer implementation:

  • in Enqueue we have a logic similar to what DistSender does in order
    to split each request (that might span multiple ranges) into
    single-range requests. Those sub-requests are batched together to be
    evaluated by a single BatchRequest.
  • workerCoordinator.mainLoop is responsible for taking single-range
    batches, estimating the corresponding response size, and issuing
    requests to be evaluated in parallel while adhering to the provided
    memory budget.
  • workerCoordinator.performRequestAsync is responsible for populating
    the BatchRequest and then processing the results while updating the
    memory budget.

Current known limitations that will be addressed in the follow-up work:

  • at the moment a single row can be split across multiple BatchResponses
    when TargetBytes limit is reached when the table has multiple column
    families; therefore, we use the streamer only for single column family
    cases. We will expand the KV API shortly to not split the rows in
    multiple column family cases.
  • manual refresh of spans when ReadWithinUncertaintyIntervalError is
    encountered by a single streamer in a single flow is not implemented. It
    is an optimization that is considered a must for the final
    implementation in order to not regress on simple cases in terms of
    retriable errors. This will be implemented shortly as a follow-up.
  • I'm thinking that eventually we probably want to disable the batch
    splitting done by the DistSender to eliminate unnecessary blocking when
    the streamer's splitting was incorrect. This would give us some
    performance improvements in face of range boundary changes, but it
    doesn't seem important enough for the initial implementation.

Addresses: #54680.

Release note: None

@yuzefovich yuzefovich added the do-not-merge bors won't merge a PR with this label. label Aug 4, 2021
@cockroach-teamcity
Copy link
Member

This change is Reviewable

Copy link
Collaborator

@rharding6373 rharding6373 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for clarifying! Looks great. It would be nice to have a bit more test coverage of the streamer. I made some suggestions in a comment. Otherwise, the code :lgtm: 🎉

Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (waiting on @ajwerner, @andreimatei, @erikgrinaker, @rharding6373, @rytaft, and @yuzefovich)


pkg/sql/row/kv_batch_streamer.go, line 77 at r9 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

That's a very good catch. Indeed, as the code was written, we would create a new Streamer for each set of spans to scan (so for each batch of spans to lookup) in the vectorized index join. The row-by-row joinReader was already behaving as desired, and now the vectorized index join has been fixed.

(I think I ended up with this broken state due to the fact that initially I thought about using the streamer only in the row-by-row engine cause I mistakenly thought a new processor/operator implementation would be needed. Later though I realized that we can only implement the new TxnKVStreamer as an adapter, and everything else would just work. However, I incorrectly plumbed that into the vectorized engine. Thanks for catching!)

Ok, so my current understanding of the vectorized usage is that we will create a new cFetcher and TxnKVStreamer for every batch of spans, but the Streamer will be passed to the new ones so we'll reuse it for subsequent Enqueues. Thanks for clarifying/fixing, it makes more sense now.


pkg/kv/kvclient/kvstreamer/streamer_test.go, line 98 at r12 (raw file):

// when the keys to lookup are large (i.e. the enqueued requests themselves have
// large memory footprint).
func TestLargeKeys(t *testing.T) {

It would be good to add more tests to streamer_test for better coverage. As a start, some potentially useful tests that come to mind are:

  • Happy path typical use case with multiple calls to Enqueue
  • KV layer returns results out of order
  • KV layer returns results in order
  • KV layer returns results one at a time
  • An input that goes over budget errors as expected
  • Similar to TestLargeKeys, a test that mixes several small keys (where multiple can be requested at once) before and after one big key to make sure they are handled appropriately. Especially if the KV layer returns results one at a time.

Copy link
Member Author

@yuzefovich yuzefovich left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for all the reviews Rachael!

Before merging, I'll give some more time to @andreimatei in case he wants to take another look.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (and 1 stale) (waiting on @ajwerner, @andreimatei, @erikgrinaker, @rharding6373, and @rytaft)


pkg/sql/row/kv_batch_streamer.go, line 77 at r9 (raw file):

Previously, rharding6373 (Rachael Harding) wrote…

Ok, so my current understanding of the vectorized usage is that we will create a new cFetcher and TxnKVStreamer for every batch of spans, but the Streamer will be passed to the new ones so we'll reuse it for subsequent Enqueues. Thanks for clarifying/fixing, it makes more sense now.

That's not quite right. We create only a single cFetcher, but that cFetcher will create a new TxnKVStreamer for every batch of spans, and for latter we'll use the same kvstreamer.Streamer. I.e. we have ColIndexJoin using a single cFetcher that on each StartScanStreaming call creates a new TxnKVStreamer which uses the single kvstreamer.Streamer. TxnKVStreamer is a light-weight object, so we don't care much about optimizing the allocations of that struct.

For reference, in the old path (i.e. not using the Streamer) the sequence is as follows: single cFetcher -> many row.KVFetchers each creating new txnKVFetcher -> global DistSender.


pkg/kv/kvclient/kvstreamer/streamer_test.go, line 98 at r12 (raw file):
Thanks for the suggestions! I incorporated some of them, see my thoughts below.

Happy path typical use case with multiple calls to Enqueue

I think this is well covered by existing logic tests since an index join is such a ubiquitous operation. If it doesn't work in the happy case, then CRDB will likely not be able to start :)

KV layer returns results out of order
KV layer returns results in order

These two are probably well covered already and seem very hard to simulate in a unit test (some ideas that come to mind - introducing some knobs, or mocking out the KV layer).

KV layer returns results one at a time

I don't know how to simulate this.

An input that goes over budget errors as expected

Added another unit test for Enqueue. Also note there already exists TestMemoryLimit which exercises memory limit exceeded scenario in an end-to-end test, and I set it up such that the Streamer API is used.

Similar to TestLargeKeys, a test that mixes several small keys (where multiple can be requested at once) before and after one big key to make sure they are handled appropriately. Especially if the KV layer returns results one at a time.

I, instead, adjusted TestLargeKeys to have two different scenarios - "only large keys" (the old one) and "random mix of large and small keys" (the new one). I think this achieves (sometimes) what you described.


I think what I'm struggling test-wise is that I don't see an easy way to unit test the Streamer in isolation without using end-to-end tests (which we already have plenty) or having to implement some of the logic that SQL fetchers perform in order to construct valid requests that would be interesting.

Copy link
Contributor

@erikgrinaker erikgrinaker left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (and 1 stale) (waiting on @ajwerner, @andreimatei, @erikgrinaker, @rharding6373, @rytaft, and @yuzefovich)


pkg/kv/kvclient/kvcoord/streamer.go, line 73 at r4 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

Callers are expected to measure the size of reqs they are enqueuing. In a case when a single key to lookup is very large, the streamer now makes an exception and will proceed with processing the corresponding request, even if it puts its budget in debt, but only if the request is alone.

I added a corresponding test which revealed that in the row-by-row engine we currently create such requests that can exceed the streamer's budget on their own, so I added the second commit which adjusted that and brought the row-by-row joinReader to be similar to the vectorized ColIndexJoin - i.e. buffer requests up to the limit without exceeding it, except in a degenerate case of a single large request.

Another solution is to allow the budget to go into debt all the time based on the footprint of the enqueued requests. Then, all the requests will be processed sequentially with TargetBytes == 1 until the budget gets out of debt.

I'm not sure what option is better, currently leaning towards allowing the budget to go into debt only if the large request is alone. Thoughts?

Great, thanks. I think that's reasonable, and it matches the default semantics of the KV API.

Copy link
Contributor

@andreimatei andreimatei left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've taken a look and left a couple of comments. There's a bit more here that I can do justice for without taking a lot of time, so feel free to go ahead with me. Unless you particularly want me, in which case I'm happy to review more.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (and 1 stale) (waiting on @ajwerner, @andreimatei, @erikgrinaker, @rharding6373, @rytaft, and @yuzefovich)


pkg/kv/kvclient/kvstreamer/avg_response_estimator.go, line 15 at r13 (raw file):

// avgResponseEstimator is a helper that estimates the average size of responses
// received by the Streamer. It is **not** thread-safe.
type avgResponseEstimator struct {

I haven't fully followed the flow, but what I imagined we'd want to estimate is the size of a row, (or even a single key, if rows are too complicated). For scans, the size of a "response" is influenced by the memory limit that we put on the request, right? So


pkg/kv/kvclient/kvstreamer/avg_response_estimator.go, line 16 at r13 (raw file):

// received by the Streamer. It is **not** thread-safe.
type avgResponseEstimator struct {
	actual struct {

seems weird to me to have a single anonymous struct field. Is there a point to it?


pkg/kv/kvclient/kvstreamer/avg_response_estimator.go, line 17 at r13 (raw file):

type avgResponseEstimator struct {
	actual struct {
		totalSize         int64

consider responseBytes


pkg/kv/kvclient/kvstreamer/avg_response_estimator.go, line 18 at r13 (raw file):

	actual struct {
		totalSize         int64
		totalNumResponses int64

I think the simpler numResponses would be just as good, if not better


pkg/kv/kvclient/kvstreamer/avg_response_estimator.go, line 38 at r13 (raw file):

// update updates the actual information based on numResponses responses that
// took up actualResponsesSize bytes and correspond to a single BatchResponse.
func (e *avgResponseEstimator) update(actualResponsesSize int64, numResponses int64) {

since you've extracted these helpers into this struct, it seems inappropriate to me to name an argument actual here. That suggest a reference to some other quantity, and this struct is unaware of any context. Just call it responseBytes.


pkg/kv/kvclient/kvstreamer/budget.go, line 23 at r13 (raw file):

// client.
//
// This struct is a wrapper on top of mon.BoundAccount because we want to

budget also provides waiting, which mon.BoundAccount doesn't, right?


pkg/kv/kvclient/kvstreamer/budget.go, line 50 at r13 (raw file):

// The budget itself is responsible for staying under the limit, so acc should
// be bound to an unlimited memory monitor. This is needed in order to support
// the case of budget going into debt. Note that although it is an "unlimited

I think this comment is confusing. acc's monitor can be limited (and in fact is limited; the comment says that it's both limited and not limited). The budget has its own limit, which is expected to be lower that the monitor's. This budget's relationship to the monitor is not special (is it?) so I don't think it deserves any particular comment.


pkg/kv/kvclient/kvstreamer/budget.go, line 67 at r13 (raw file):

// available returns how many bytes are currently available in the budget. The
// answer can be negative, in case the Streamer has used un-budgeted memory

make a reference to the "debt" that you talk about in other places


pkg/kv/kvclient/kvstreamer/budget.go, line 82 at r13 (raw file):

// root pool budget is used up such that the budget's limit cannot be fully
// reserved.
// - allowDebt indicates whether the budget is allowed to go into debt on this

please add words here further clarifying that an error can be returned even when allowDebt is set.


pkg/kv/kvclient/kvstreamer/streamer.go, line 338 at r13 (raw file):

	}
	s.hints = hints
	s.coordinator.hasWork = make(chan struct{}, 1)

Personally, I try to avoid buffered channels whenever I can. They complicated, cause you have to keep track of the possible state of the channel in relation to the state of requestsToServe. I think things will likely get simpler if you use a sync.Cond.


pkg/kv/kvclient/kvstreamer/streamer.go, line 476 at r13 (raw file):

		}

		s.mu.requestsToServe = append(s.mu.requestsToServe, r)

I think it's fishy that there's two places that append to requestsToServe - this one, and addRequest(). I'd try to use addRequest() here.


pkg/kv/kvclient/kvstreamer/streamer.go, line 504 at r13 (raw file):

	}

	// This will never block because hasWork is a buffered channel, and new

addRequest() deals with hasWork being full. Perhaps the code paths that call addRequests need that, and this call path doesn't, but I'd rather not have to think about that. So consider using addRequest(). And also consider turning the channel into a sync.Cond.


pkg/kv/kvclient/kvstreamer/streamer.go, line 600 at r13 (raw file):

}

type singleRangeBatch struct {

pls give a comment to this struct, suggesting the context in which its used.


pkg/kv/kvclient/kvstreamer/streamer.go, line 611 at r13 (raw file):

	// reqsAccountedFor tracks the memory reservation against the Budget for the
	// memory usage of reqs.
	reqsAccountedFor int64

consider reservedBytes


pkg/kv/kvclient/kvstreamer/streamer.go, line 667 at r13 (raw file):

				return
			}
			requestsToServe, avgResponseSize, ok = w.getRequests()

replace these duplicated lines with a continue


pkg/kv/kvclient/kvstreamer/streamer.go, line 730 at r13 (raw file):

A boolean that indicates whether the coordinator should exit is returned.

Consider returning an error from this and doing s.setError(err) in the caller. Otherwise you need more words to explain under which circumstances this returns true. Well, a word on behavior under budget trouble


pkg/kv/kvclient/kvstreamer/streamer.go, line 742 at r13 (raw file):

		w.s.mu.Unlock()
	}()
	// TODO(yuzefovich): check whether holding the budget's lock here is

I think TODO does more harm than good staying in the code, so one way or another I'd get rid of it.


pkg/kv/kvclient/kvstreamer/streamer.go, line 773 at r13 (raw file):

			targetBytes = availableBudget
		}
		allowDebt := headOfLine

nit: here and in the similar code in performRequestsAsync I think the allowDebt variable hurts more than it helps, because it tries to decouple two things that really are coupled: the expectation from the behavior of consumeLocked and headOfLine. Consider passing headOfLine /* allowDebt */ below.

But actually, what bothers me is that headOfLine has already been incorporated into the targetBytes computation above. So it's quite confusing to pass it here again. The most extra debt we can incur because of this consumeLocked() call is 1 byte, right? So consider adding a flavor of consumeLocked that no longer takes allowDebt. Maybe consumeChecked - since the budget has already been "checked" by the caller?


pkg/kv/kvclient/kvstreamer/streamer.go, line 776 at r13 (raw file):

		if err := w.s.budget.consumeLocked(ctx, targetBytes, allowDebt); err != nil {
			if !headOfLine {
				// There are some requests in flight, so we'll let them finish.

Please put more words here describing the thinking. I for one am not convinced at the moment that continuing is worth doing.


pkg/kv/kvclient/kvstreamer/streamer.go, line 794 at r13 (raw file):

// addRequest adds a single-range batch to be processed later.
func (w *workerCoordinator) addRequest(req singleRangeBatch) {
	// TODO(yuzefovich): in InOrder mode we cannot just append.

so what gives?


pkg/kv/kvclient/kvstreamer/streamer.go, line 814 at r13 (raw file):

// memory limitBytes), the "resume" single-range batch will be added into
// requestsToServe, and mainLoop will pick that up to process later.
// - targetBytes specifies the memory budget that this single-range batch should

say that targetBytes have already been consumed from the budget by the caller (right?) and talk a bit about how that reservation is adjusted and ultimately who owns it


pkg/kv/kvclient/kvstreamer/streamer.go, line 816 at r13 (raw file):
Please give a hint about how this is used.

In OutOfOrder mode any request can be treated as such.

This seems like a pretty weak statement to me; should a caller always set this or not? I'd either say more, or only focus on describing what the param does and not mention OutOfOrder at all.


pkg/kv/kvclient/kvstreamer/streamer.go, line 877 at r13 (raw file):

			// non-empty responses. This will be equal to the sum of the all
			// resultMemoryTokens created.
			var actualMemoryReservation int64

if you name it actual, I'm left wondering what it's in contrast to. Consider naming it more straight-forward, as memoryFootprintBytes.


pkg/kv/kvclient/kvstreamer/streamer.go, line 966 at r13 (raw file):

				if err := w.s.budget.consume(ctx, toConsume, allowDebt); err != nil {
					w.s.budget.release(ctx, targetBytes)
					if !headOfLine {

I'm bothered that I don't fully understand the principle guiding the situations when we continue after a budget error versus those when we don't. Here, we discard the result and continue when !headOfLine, and error out otherwise. If we're headOfLine, we don't attempt to re-execute with a smaller target (or to chop the response to the available budget). I think I would benefit from a comment explaining in which cases we can get a negative overaccountedTotal, so that I can form an opinion about this logic.

Copy link
Contributor

@andreimatei andreimatei left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (and 1 stale) (waiting on @ajwerner, @andreimatei, @erikgrinaker, @rharding6373, @rytaft, and @yuzefovich)


pkg/kv/kvclient/kvstreamer/avg_response_estimator.go, line 15 at r13 (raw file):

Previously, andreimatei (Andrei Matei) wrote…

I haven't fully followed the flow, but what I imagined we'd want to estimate is the size of a row, (or even a single key, if rows are too complicated). For scans, the size of a "response" is influenced by the memory limit that we put on the request, right? So

Scratch this comment, I was confused. But remind me, do we want to support partial scans? If so, how is the estimator supposed to deal with them?

Copy link
Member Author

@yuzefovich yuzefovich left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (and 1 stale) (waiting on @ajwerner, @andreimatei, @erikgrinaker, @rharding6373, and @rytaft)


pkg/kv/kvclient/kvstreamer/avg_response_estimator.go, line 15 at r13 (raw file):

Previously, andreimatei (Andrei Matei) wrote…

Scratch this comment, I was confused. But remind me, do we want to support partial scans? If so, how is the estimator supposed to deal with them?

That's a good question. Yes, we do support partial scans - whenever TargetBytes is set too low so that the scan response cannot be complete within a single range, then we'll be receiving partial responses, and we'll currently account each part of the whole response separately, thus, skewing the average to smaller values. This is suboptimal, and I left a TODO to follow up on this.


pkg/kv/kvclient/kvstreamer/avg_response_estimator.go, line 16 at r13 (raw file):

Previously, andreimatei (Andrei Matei) wrote…

seems weird to me to have a single anonymous struct field. Is there a point to it?

No point at the moment (this was a remnant of the past when I tried to use an exponential moving average), removed.


pkg/kv/kvclient/kvstreamer/avg_response_estimator.go, line 17 at r13 (raw file):

Previously, andreimatei (Andrei Matei) wrote…

consider responseBytes

Done.


pkg/kv/kvclient/kvstreamer/avg_response_estimator.go, line 18 at r13 (raw file):

Previously, andreimatei (Andrei Matei) wrote…

I think the simpler numResponses would be just as good, if not better

Done.


pkg/kv/kvclient/kvstreamer/avg_response_estimator.go, line 38 at r13 (raw file):

Previously, andreimatei (Andrei Matei) wrote…

since you've extracted these helpers into this struct, it seems inappropriate to me to name an argument actual here. That suggest a reference to some other quantity, and this struct is unaware of any context. Just call it responseBytes.

Done.


pkg/kv/kvclient/kvstreamer/budget.go, line 23 at r13 (raw file):

Previously, andreimatei (Andrei Matei) wrote…

budget also provides waiting, which mon.BoundAccount doesn't, right?

Done.


pkg/kv/kvclient/kvstreamer/budget.go, line 50 at r13 (raw file):

Previously, andreimatei (Andrei Matei) wrote…

I think this comment is confusing. acc's monitor can be limited (and in fact is limited; the comment says that it's both limited and not limited). The budget has its own limit, which is expected to be lower that the monitor's. This budget's relationship to the monitor is not special (is it?) so I don't think it deserves any particular comment.

acc's monitor has to be "unlimited" in a sense that it doesn't have its own local limit, but only inherits the limit from the root SQL monitor (through several "generations" of monitors - something like budget monitor -> flow monitor -> txn monitor -> root SQL monitor with all monitors except for the root being "unlimited"). If acc's monitor were to have a local limit (say distsql_workmem), then the budget wouldn't be able to go into debt above the value of distsql_workmem limit (64MiB by default).

I think the first two sentences of this paragraph should be kept, but the last sentence could be dropped if you think it'll be less confusing overall. My goal with the last sentence was to alleviate the concern of a reader when they see "unlimited memory monitor" phrase. @andreimatei @rharding6373 please let me know what phrasing you prefer here.


pkg/kv/kvclient/kvstreamer/budget.go, line 67 at r13 (raw file):

Previously, andreimatei (Andrei Matei) wrote…

make a reference to the "debt" that you talk about in other places

Done.


pkg/kv/kvclient/kvstreamer/budget.go, line 82 at r13 (raw file):

Previously, andreimatei (Andrei Matei) wrote…

please add words here further clarifying that an error can be returned even when allowDebt is set.

Done.


pkg/kv/kvclient/kvstreamer/streamer.go, line 338 at r13 (raw file):

Previously, andreimatei (Andrei Matei) wrote…

Personally, I try to avoid buffered channels whenever I can. They complicated, cause you have to keep track of the possible state of the channel in relation to the state of requestsToServe. I think things will likely get simpler if you use a sync.Cond.

Done.


pkg/kv/kvclient/kvstreamer/streamer.go, line 476 at r13 (raw file):

Previously, andreimatei (Andrei Matei) wrote…

I think it's fishy that there's two places that append to requestsToServe - this one, and addRequest(). I'd try to use addRequest() here.

To me this seems justified because Enqueue possibly appends many requests, and it holds s.mu until it fully splits all enqueued requests into singleRangeBatches. In other words, unless we refactor the locking of s.mu in Enqueue, there is no reason to signal the worker coordinator about newly appended requests to requestsToServe because the coordinator will just block on s.mu. For now, I'll keep it this way, and I'll be looking at performance later.


pkg/kv/kvclient/kvstreamer/streamer.go, line 504 at r13 (raw file):

Previously, andreimatei (Andrei Matei) wrote…

addRequest() deals with hasWork being full. Perhaps the code paths that call addRequests need that, and this call path doesn't, but I'd rather not have to think about that. So consider using addRequest(). And also consider turning the channel into a sync.Cond.

Indeed, code paths that call addRequests needed that and here we didn't need that (because pipelining is not implemented currently). I did change hasWork from a buffered channel into sync.Cond.


pkg/kv/kvclient/kvstreamer/streamer.go, line 600 at r13 (raw file):

Previously, andreimatei (Andrei Matei) wrote…

pls give a comment to this struct, suggesting the context in which its used.

Done.


pkg/kv/kvclient/kvstreamer/streamer.go, line 611 at r13 (raw file):

Previously, andreimatei (Andrei Matei) wrote…

consider reservedBytes

Renamed to reqsReservedBytes so that it's not confused with the reservation of targetBytes for the responses.


pkg/kv/kvclient/kvstreamer/streamer.go, line 667 at r13 (raw file):

Previously, andreimatei (Andrei Matei) wrote…

replace these duplicated lines with a continue

Nice, thanks.


pkg/kv/kvclient/kvstreamer/streamer.go, line 730 at r13 (raw file):

Previously, andreimatei (Andrei Matei) wrote…

A boolean that indicates whether the coordinator should exit is returned.

Consider returning an error from this and doing s.setError(err) in the caller. Otherwise you need more words to explain under which circumstances this returns true. Well, a word on behavior under budget trouble

Done.


pkg/kv/kvclient/kvstreamer/streamer.go, line 742 at r13 (raw file):

Previously, andreimatei (Andrei Matei) wrote…

I think TODO does more harm than good staying in the code, so one way or another I'd get rid of it.

Done.


pkg/kv/kvclient/kvstreamer/streamer.go, line 773 at r13 (raw file):

The most extra debt we can incur because of this consumeLocked() call is 1 byte, right?

No, the debt can be arbitrarily large here.

Imagine that the budget has a limit of 1MiB, and the streamer is told to retrieve a single KV for which key part takes up 2MiB, and the value part is another 3MiB. Also, let's imagine that we have perfect estimation telling us that the response will be 3MiB in size. In Enqueue we will put the budget into 1MiB of debt because the key's span exceeds the budget's limit by this much. In such a case we'll override availableBudget to 1 byte in issueRequestsForAsyncProcessing, so we'll issue the BatchRequest with TargetBytes = 1 and will prohibit the empty response. When the BatchResponse comes back, we put the budget into 3MiB more of debt, for the total of 4MiB debt.


pkg/kv/kvclient/kvstreamer/streamer.go, line 776 at r13 (raw file):

Previously, andreimatei (Andrei Matei) wrote…

Please put more words here describing the thinking. I for one am not convinced at the moment that continuing is worth doing.

Done.


pkg/kv/kvclient/kvstreamer/streamer.go, line 794 at r13 (raw file):

Previously, andreimatei (Andrei Matei) wrote…

so what gives?

The logic of updating w.s.mu.requestsToServe will need to be refactored. Removed this TODO, since it's a pretty obvious thing to do when maintaining the ordering is being implemented.


pkg/kv/kvclient/kvstreamer/streamer.go, line 814 at r13 (raw file):

Previously, andreimatei (Andrei Matei) wrote…

say that targetBytes have already been consumed from the budget by the caller (right?) and talk a bit about how that reservation is adjusted and ultimately who owns it

Done.


pkg/kv/kvclient/kvstreamer/streamer.go, line 816 at r13 (raw file):

Previously, andreimatei (Andrei Matei) wrote…

Please give a hint about how this is used.

In OutOfOrder mode any request can be treated as such.

This seems like a pretty weak statement to me; should a caller always set this or not? I'd either say more, or only focus on describing what the param does and not mention OutOfOrder at all.

Done.


pkg/kv/kvclient/kvstreamer/streamer.go, line 877 at r13 (raw file):

Previously, andreimatei (Andrei Matei) wrote…

if you name it actual, I'm left wondering what it's in contrast to. Consider naming it more straight-forward, as memoryFootprintBytes.

Done.


pkg/kv/kvclient/kvstreamer/streamer.go, line 966 at r13 (raw file):

Previously, andreimatei (Andrei Matei) wrote…

I'm bothered that I don't fully understand the principle guiding the situations when we continue after a budget error versus those when we don't. Here, we discard the result and continue when !headOfLine, and error out otherwise. If we're headOfLine, we don't attempt to re-execute with a smaller target (or to chop the response to the available budget). I think I would benefit from a comment explaining in which cases we can get a negative overaccountedTotal, so that I can form an opinion about this logic.

Added a comment. I came up with two scenarios when overaccountedTotal can be negative, and both of them seem like edge cases to me.

@yuzefovich yuzefovich force-pushed the streamer branch 2 times, most recently from ab13aa8 to 0df2ff0 Compare January 11, 2022 00:45
Copy link
Member Author

@yuzefovich yuzefovich left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Andrei, thanks for the review! I know that Becca has some more comments, so I'll definitely wait for her to post the feedback. I'd like to merge this by the end of this week, so there is still some time for folks to review this further if interested.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (and 1 stale) (waiting on @ajwerner, @andreimatei, @erikgrinaker, @rharding6373, and @rytaft)

Copy link
Collaborator

@rytaft rytaft left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks great! Just a few additional comments.

Reviewed 27 of 27 files at r6, 1 of 1 files at r7, 19 of 30 files at r8, 6 of 9 files at r9, 1 of 5 files at r10, 1 of 2 files at r11, 1 of 2 files at r12, 2 of 3 files at r13, 2 of 3 files at r14, 1 of 1 files at r15, all commit messages.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (and 1 stale) (waiting on @andreimatei, @erikgrinaker, @rharding6373, @rytaft, and @yuzefovich)


pkg/sql/rowexec/joinreader.go, line 150 at r7 (raw file):

	// pendingRow tracks the row that has already been read from the input but
	// was not included into the lookup batch because it would make the batch
	// exceed curBatchSizeBytes.

curBatchSizeBytes -> batchSizeBytes ?


pkg/sql/rowexec/joinreader.go, line 707 at r7 (raw file):

	var encDatumRow rowenc.EncDatumRow
	var meta *execinfrapb.ProducerMetadata
	var rowSize int64

Why do these variables need to be defined outside of the loop? I think it would be a bit clearer (and less error-prone) if they were scoped inside the loop.


pkg/sql/rowexec/joinreader.go, line 732 at r7 (raw file):

			if jr.curBatchSizeBytes > 0 && jr.curBatchSizeBytes+rowSize > jr.batchSizeBytes {
				// Adding this row to the current batch will make the batch
				// exceed jr.curBatchSizeBytes. Additionally, the batch is not

curBatchSizeBytes -> batchSizeBytes ?

Also, why does it matter if the batch is not empty?


pkg/kv/kvclient/kvcoord/streamer.go, line 147 at r4 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

Used EnqueueKeys since I think it is less ambiguous ("request keys" could also be thought of roachpb.Keys that are used in roachpb.RequestUnions).

I don't love the name EnqueueKeys... any other option that would be a bit more self documenting? Maybe RequestKeysSatisfied?


pkg/kv/kvclient/kvcoord/streamer.go, line 789 at r4 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

I switched to concrete implementations for the Streamer and budget, so it seems a bit difficult to unit test this :/ Any ideas for this without introducing the interfaces back? This workerCoordinator is tightly coupled with the Streamer too.

I think the tests you added are good. Also I think the logic of this function is easier to understand now so I'm less concerned about unit-testing this specifically.


pkg/kv/kvclient/kvstreamer/streamer.go, line 180 at r11 (raw file):

// The parallelism is achieved by splitting the incoming requests into
// single-range batches where each such batch will hit a fast-path in the
// DistSender (unless there have been range boundaries' changes). Since these

nit: range boundaries' changes -> changes to range boundaries


pkg/kv/kvclient/kvstreamer/streamer.go, line 363 at r11 (raw file):

// return an error if that footprint exceeds the Streamer's limitBytes. The
// exception is made only when a single request is enqueued in order to allow
// the caller proceed when the key to lookup is arbitrarily large. As a rule of

nit: allow the caller proceed -> allow the caller to proceed


pkg/kv/kvclient/kvstreamer/streamer.go, line 778 at r15 (raw file):

		targetBytes := int64(len(singleRangeReqs.reqs)) * avgResponseSize
		if targetBytes > availableBudget {
			targetBytes = availableBudget

This seems suspicious -- is this what is getting accounted for by the memory monitor in the budget? Don't we want that memory monitor to track what we actually expect the size of the response to be?

Seems to me like this is more complicated than it needs to be. Why not just do the following:

availableBudget := w.s.budget.limitBytes - w.s.budget.mu.acc.Used()
singleRangeReqs := requestsToServe[numRequestsIssued]
targetBytes := int64(len(singleRangeReqs.reqs)) * avgResponseSize
if availableBudget < targetBytes {
	if !headOfLine {
		// We don't have enough budget available to serve this request,
		// and there are other requests in flight, so we'll wait for
		// some of them to finish.
		break
	}
	budgetIsExhausted = true
}
if err := w.s.budget.consumeLocked(ctx, targetBytes, headOfLine /* allowDebt */); err != nil {
	...
}

pkg/kv/kvclient/kvstreamer/streamer.go, line 792 at r15 (raw file):

				// other requests are fully processed (i.e. the corresponding
				// results are Release()'d), we'll be able to make progress on
				// this request too, without exceeding the root memory pool.

but what if we're actually about to OOM? Is it worth adding a second return value to consumeLocked to differentiate?

Copy link
Member Author

@yuzefovich yuzefovich left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review!

Had to rebase on top of master to resolve merge conflicts and merge skew.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (and 1 stale) (waiting on @andreimatei, @erikgrinaker, @rharding6373, and @rytaft)


pkg/sql/rowexec/joinreader.go, line 150 at r7 (raw file):

Previously, rytaft (Rebecca Taft) wrote…

curBatchSizeBytes -> batchSizeBytes ?

Yep, thanks.


pkg/sql/rowexec/joinreader.go, line 707 at r7 (raw file):

Previously, rytaft (Rebecca Taft) wrote…

Why do these variables need to be defined outside of the loop? I think it would be a bit clearer (and less error-prone) if they were scoped inside the loop.

Good point, done.


pkg/sql/rowexec/joinreader.go, line 732 at r7 (raw file):
Fixed, thanks.

Also, why does it matter if the batch is not empty?

If the batch is empty and adding this (first) row would exceed batchSizeBytes, then we have a degenerate case of a very large key to lookup. If we don't add this row into the batch, then we'd be stalled - we'd generate no spans, so we'd not perform the lookup of anything.


pkg/kv/kvclient/kvcoord/streamer.go, line 147 at r4 (raw file):

Previously, rytaft (Rebecca Taft) wrote…

I don't love the name EnqueueKeys... any other option that would be a bit more self documenting? Maybe RequestKeysSatisfied?

I think currently there is nice symmetry between Enqueue calls and []Result returned by GetResults - the former has enqueueKeys argument and the latter uses those values. I agree that EnqueueKeys is not great, and RequestKeysSatisfied sounds better in isolation, but it is more confusing when looking at the Streamer API as a whole. I don't feel too strongly about this though, so please let me know if my reasoning doesn't make sense.


pkg/kv/kvclient/kvstreamer/streamer.go, line 778 at r15 (raw file):

This seems suspicious -- is this what is getting accounted for by the memory monitor in the budget? Don't we want that memory monitor to track what we actually expect the size of the response to be?

Yes, targetBytes will be registered with the memory account. Then, for headOfLine == false requests the response will not exceed TargetBytes limit, so we should be good on the budget usage here.

What you are proposing could work; however, we would not be utilizing the available budget to the fullest and would only expect to receive "full" responses (based on our estimate). As the code written currently, we can issue a request for which we expect to receive a partial result.

Consider the following scenario: our budget is 3MiB, and we have two requests both of which we expect to take up 2MiB in response. As the code is written, we'll issue the first request with the full 2MiB and the second with 1MiB, so we'll receive the full first response and the half second response; then we'll only need to issue the second half of the second request. (This discussion assumes that we're performing Scans, not Gets, and that we could receive that partial response.) With your proposal we'll be sending out one request at a time, but we'll be receiving full responses every time. I think the strategy with partial responses should be faster. I guess we could also track whether there are only Gets in the request we're about to send out with partial targetBytes, and if there are no Scans, then follow your proposal. Our estimate can also be wrong, especially before receiving any responses, so it's probably better to send out more requests given that we're using strict TargetBytes limits. Thoughts?


pkg/kv/kvclient/kvstreamer/streamer.go, line 792 at r15 (raw file):

but what if we're actually about to OOM?

Here I think we should not be concerned about it - the streamer is within its budget and hasn't pushed the root memory pool past its limit (we're bailing on the request that would have done so). For not head-of-the-line request there are two scenarios how we can get the error here: the node is close to exhausting --max-sql-memory (i.e. close to OOM) without this streamer being responsible (when no requests have been issued) or with this streamer playing its part. If it's the latter, I think it's ok to not exit and let the head-of-the-line request make the determination whether to proceed or not. I added more words into the comments. Thoughts?

Copy link
Collaborator

@rytaft rytaft left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed 30 of 30 files at r16, 1 of 1 files at r17, 29 of 29 files at r18, all commit messages.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (and 1 stale) (waiting on @andreimatei, @erikgrinaker, @rharding6373, @rytaft, and @yuzefovich)


pkg/sql/rowexec/joinreader.go, line 732 at r7 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

Fixed, thanks.

Also, why does it matter if the batch is not empty?

If the batch is empty and adding this (first) row would exceed batchSizeBytes, then we have a degenerate case of a very large key to lookup. If we don't add this row into the batch, then we'd be stalled - we'd generate no spans, so we'd not perform the lookup of anything.

Makes sense -- can you clarify the comment a bit?


pkg/kv/kvclient/kvcoord/streamer.go, line 147 at r4 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

I think currently there is nice symmetry between Enqueue calls and []Result returned by GetResults - the former has enqueueKeys argument and the latter uses those values. I agree that EnqueueKeys is not great, and RequestKeysSatisfied sounds better in isolation, but it is more confusing when looking at the Streamer API as a whole. I don't feel too strongly about this though, so please let me know if my reasoning doesn't make sense.

Then maybe EnqueueKeysSatisfied? I don't feel too strongly though, feel free to leave it as-is if you prefer.


pkg/kv/kvclient/kvstreamer/streamer.go, line 778 at r15 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

This seems suspicious -- is this what is getting accounted for by the memory monitor in the budget? Don't we want that memory monitor to track what we actually expect the size of the response to be?

Yes, targetBytes will be registered with the memory account. Then, for headOfLine == false requests the response will not exceed TargetBytes limit, so we should be good on the budget usage here.

What you are proposing could work; however, we would not be utilizing the available budget to the fullest and would only expect to receive "full" responses (based on our estimate). As the code written currently, we can issue a request for which we expect to receive a partial result.

Consider the following scenario: our budget is 3MiB, and we have two requests both of which we expect to take up 2MiB in response. As the code is written, we'll issue the first request with the full 2MiB and the second with 1MiB, so we'll receive the full first response and the half second response; then we'll only need to issue the second half of the second request. (This discussion assumes that we're performing Scans, not Gets, and that we could receive that partial response.) With your proposal we'll be sending out one request at a time, but we'll be receiving full responses every time. I think the strategy with partial responses should be faster. I guess we could also track whether there are only Gets in the request we're about to send out with partial targetBytes, and if there are no Scans, then follow your proposal. Our estimate can also be wrong, especially before receiving any responses, so it's probably better to send out more requests given that we're using strict TargetBytes limits. Thoughts?

Ok, got it. I re-reviewed the whole file, so now I think I finally have a sense of how this all works. And I see that you re-account for the responses with the actual size after they are received, so this all makes sense.

I think what I was missing is that "targetBytes" is what gets passed to the kv layer to tell it how much data to return. For non-head-of-line requests this is a strict limit, while for head-of-line requests the limit can be exceeded. You sort of hint at that in the comment above where you set availableBudget to 1, but I was definitely missing the bigger picture. You might consider adding a comment somewhere to explain this.

But anyway, no need to change the code, I think what you've done seems reasonable.


pkg/kv/kvclient/kvstreamer/streamer.go, line 792 at r15 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

but what if we're actually about to OOM?

Here I think we should not be concerned about it - the streamer is within its budget and hasn't pushed the root memory pool past its limit (we're bailing on the request that would have done so). For not head-of-the-line request there are two scenarios how we can get the error here: the node is close to exhausting --max-sql-memory (i.e. close to OOM) without this streamer being responsible (when no requests have been issued) or with this streamer playing its part. If it's the latter, I think it's ok to not exit and let the head-of-the-line request make the determination whether to proceed or not. I added more words into the comments. Thoughts?

Makes sense, thanks


pkg/kv/kvclient/kvstreamer/streamer.go, line 91 at r18 (raw file):

	}
	// EnqueueKeys identifies the requests that this Result satisfies. In
	// OutOfOrder mode, a single Result can satisfy multiple identical requests.

nit: what do you mean by "identical" requests? Different keys that target the same range? Also, for InOrder mode, the consecutive requests would also have to target the same range, correct?


pkg/kv/kvclient/kvstreamer/streamer.go, line 105 at r18 (raw file):

	MemoryTok ResultMemoryToken
	// position tracks the ordinal among all originally enqueued requests that
	// this result satisfies. See singleRangeBatch.positions for more details.

how does position relate to EnqueueKeys? Consider expanding the comment


pkg/kv/kvclient/kvstreamer/streamer.go, line 483 at r18 (raw file):

		s.mu.requestsToServe = append(s.mu.requestsToServe, r)

		// Determine next seek key, taking potentially sparse requests into

Do you have any tests for sparse requests? E.g., if the reqs are for [/2 - /5] and [/21 - /25], and we have range boundaries at 10 and 20?


pkg/kv/kvclient/kvstreamer/streamer.go, line 928 at r18 (raw file):

				case *roachpb.GetRequest:
					get := reply.(*roachpb.GetResponse)
					if get.ResumeSpan != nil {

Would you ever have a partial result from a get? Or is it always all or nothing?


pkg/kv/kvclient/kvstreamer/streamer.go, line 1105 at r18 (raw file):

					w.s.mu.numRangesLeftPerScanRequest[r.position]--
					r.ScanResp.Complete = w.s.mu.numRangesLeftPerScanRequest[r.position] == 0
					numCompleteResponses++

shouldn't this only be incremented if r.ScanResp.Complete is true?

Related: do you have any tests for scans that touch multiple ranges?

Copy link
Member Author

@yuzefovich yuzefovich left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (and 1 stale) (waiting on @andreimatei, @erikgrinaker, @rharding6373, and @rytaft)


pkg/kv/kvclient/kvcoord/streamer.go, line 147 at r4 (raw file):

Previously, rytaft (Rebecca Taft) wrote…

Then maybe EnqueueKeysSatisfied? I don't feel too strongly though, feel free to leave it as-is if you prefer.

Done.


pkg/kv/kvclient/kvstreamer/streamer.go, line 778 at r15 (raw file):

Previously, rytaft (Rebecca Taft) wrote…

Ok, got it. I re-reviewed the whole file, so now I think I finally have a sense of how this all works. And I see that you re-account for the responses with the actual size after they are received, so this all makes sense.

I think what I was missing is that "targetBytes" is what gets passed to the kv layer to tell it how much data to return. For non-head-of-line requests this is a strict limit, while for head-of-line requests the limit can be exceeded. You sort of hint at that in the comment above where you set availableBudget to 1, but I was definitely missing the bigger picture. You might consider adding a comment somewhere to explain this.

But anyway, no need to change the code, I think what you've done seems reasonable.

Sounds good, added more comments into the code.

Note that TargetBytes is not a strict limit only when we have head-of-the-line request AND a single large row is returned in the response (because we have to make progress), if there are some full rows that don't exceed TargetBytes, then KV will not include the row that would put the response above TargetBytes limit.


pkg/kv/kvclient/kvstreamer/streamer.go, line 91 at r18 (raw file):

Previously, rytaft (Rebecca Taft) wrote…

nit: what do you mean by "identical" requests? Different keys that target the same range? Also, for InOrder mode, the consecutive requests would also have to target the same range, correct?

"Identical" requests are exactly the same, regardless of the range boundaries, i.e. the caller had something like reqs = [2]{Scan('a', 'b'), Scan('a', 'b')}. At the moment this cannot occur because we only support unique requests, but in the future the streamer will be performing the de-duplication of such requests.

An open question is whether we want to perform this de-duplication within each range separately (i.e. within singleRangeBatch), then the comment will need an adjustment. It seems beneficial to me to do this per-range de-duplication, but we could do it only for OutOfOrder mode (because for InOrder we cannot make the results be consecutive unless the original requests are fully contained within a single range in which case it is the same as "global" de-duplication).


pkg/kv/kvclient/kvstreamer/streamer.go, line 105 at r18 (raw file):

Previously, rytaft (Rebecca Taft) wrote…

how does position relate to EnqueueKeys? Consider expanding the comment

Done.


pkg/kv/kvclient/kvstreamer/streamer.go, line 483 at r18 (raw file):

Previously, rytaft (Rebecca Taft) wrote…

Do you have any tests for sparse requests? E.g., if the reqs are for [/2 - /5] and [/21 - /25], and we have range boundaries at 10 and 20?

I didn't add any explicit tests for this, but I think we're getting sufficient test coverage with fakedist logic test configs. This code (as well as the comment) was also copied from DistSender.divideAndSendBatchToRanges.


pkg/kv/kvclient/kvstreamer/streamer.go, line 928 at r18 (raw file):

Previously, rytaft (Rebecca Taft) wrote…

Would you ever have a partial result from a get? Or is it always all or nothing?

I think by definition GetRequest can either receive a full or an empty response because the response is a single Value (from a single key-value pair), and we never break up values across responses.


pkg/kv/kvclient/kvstreamer/streamer.go, line 1105 at r18 (raw file):

Previously, rytaft (Rebecca Taft) wrote…

shouldn't this only be incremented if r.ScanResp.Complete is true?

Related: do you have any tests for scans that touch multiple ranges?

Nice catch, fixed.

I started thinking about how to add a regression test for this and realized that currently it's not possible because the Streamer at the moment will only process Get requests (because for index joins to issue Scan requests we need to have multiple column families in the primary index).

Once we add the support of multiple column families, I'll try reverting the fix and checking whether CI would have caught this (I'm pretty sure it would).

Copy link
Collaborator

@rytaft rytaft left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:lgtm_strong: Great work!!

Reviewed 29 of 29 files at r19, 29 of 29 files at r20, all commit messages.
Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (and 1 stale) (waiting on @andreimatei, @erikgrinaker, @rharding6373, @rytaft, and @yuzefovich)


pkg/kv/kvclient/kvstreamer/streamer.go, line 91 at r18 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

"Identical" requests are exactly the same, regardless of the range boundaries, i.e. the caller had something like reqs = [2]{Scan('a', 'b'), Scan('a', 'b')}. At the moment this cannot occur because we only support unique requests, but in the future the streamer will be performing the de-duplication of such requests.

An open question is whether we want to perform this de-duplication within each range separately (i.e. within singleRangeBatch), then the comment will need an adjustment. It seems beneficial to me to do this per-range de-duplication, but we could do it only for OutOfOrder mode (because for InOrder we cannot make the results be consecutive unless the original requests are fully contained within a single range in which case it is the same as "global" de-duplication).

You could possibly do it for InOrder too if you buffer results, right? Anyway, up to you if you want to change the comment now or later.


pkg/kv/kvclient/kvstreamer/streamer.go, line 483 at r18 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

I didn't add any explicit tests for this, but I think we're getting sufficient test coverage with fakedist logic test configs. This code (as well as the comment) was also copied from DistSender.divideAndSendBatchToRanges.

👍


pkg/kv/kvclient/kvstreamer/streamer.go, line 1105 at r18 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

Nice catch, fixed.

I started thinking about how to add a regression test for this and realized that currently it's not possible because the Streamer at the moment will only process Get requests (because for index joins to issue Scan requests we need to have multiple column families in the primary index).

Once we add the support of multiple column families, I'll try reverting the fix and checking whether CI would have caught this (I'm pretty sure it would).

sounds good

The joinReader operates by buffering the input rows until a certain size
limit (which is dependent on the strategy). Previously, the buffering
would stop right after the size limit is reached or exceeded, and this
commit refactors the code to not exceed the limit except in a case of
a single large row. This is what we already do for vectorized index
joins.

Release note: None
This commit introduces the Streamer API (see
https://github.com/cockroachdb/cockroach/blob/master/docs/RFCS/20210617_index_lookups_memory_limits.md)
as well as its implementation for the simplest case - when requests are
unique and can be served in any order. It additionally hooks up the
implementation to be used by the index joins in both execution engines.

There are three main pieces that this commit adds:
1. the `Streamer` struct itself. It is largely the same as described in
the RFC. Some notable changes are:
- `Cancel` is renamed to `Close` and is made blocking to ensure that all
goroutines kicked off by the `Streamer` exit before `Close` returns.
- `Shrink` has been removed from the `budget` struct (see below for
more details).
- furthermore, the `budget` interface has been unexported and the
`streamer` has been tightly coupled with the `budget`'s implementation.
- the TODO about collecting DistSQL metadata is removed because we are
collecting the LeafTxnFinalState already when using the LeafTxn.
2. the limited `Streamer` implementation - only `OutOfOrder` mode is
supported when the requests are unique. Notably, buffering and caching
of the results is not implemented yet.
3. `TxnKVStreamer` component that sits below the SQL fetchers, uses the
`Streamer`, and is an adapter from `BatchResponse`s to key/value pairs
that fetchers understand. Although not used at the moment,
`TxnKVStreamer` is written under the assumption that a single result can
satisfy multiple requests.

The memory budget of the `Streamer` is utilized lazily. The RFC was
calling for making a large reservation upfront and then shrinking the
budget if we see that we don't need that large reservation; however,
while working on this I realized that lazy reservations are a better fit
for this. The `Streamer` can reserve up to the specified limit
(determined by `distsql_workmem` variable) against the root monitor (in
the degenerate case of a single large row more memory will be reserved).
The reservation then never shrinks under the assumption that if the
reservation has gotten large, it means it was needed for higher
concurrency (or large responses), and it is likely to be needed for the
same reasons in the future.

The layout of the main components of the `Streamer` implementation:
- in `Enqueue` we have a logic similar to what DistSender does in order
to split each request (that might span multiple ranges) into
single-range requests. Those sub-requests are batched together to be
evaluated by a single `BatchRequest`.
- `workerCoordinator.mainLoop` is responsible for taking single-range
batches, estimating the corresponding response size, and issuing
requests to be evaluated in parallel while adhering to the provided
memory budget.
- `workerCoordinator.performRequestAsync` is responsible for populating
the `BatchRequest` and then processing the results while updating the
memory budget.

Current known limitations that will be addressed in the follow-up work:
- at the moment a single row can be split across multiple BatchResponses
when TargetBytes limit is reached when the table has multiple column
families; therefore, we use the streamer only for single column family
cases. We will expand the KV API shortly to not split the rows in
multiple column family cases.
- manual refresh of spans when `ReadWithinUncertaintyIntervalError` is
encountered by a single streamer in a single flow is not implemented. It
is an optimization that is considered a must for the final
implementation in order to not regress on simple cases in terms of
retriable errors. This will be implemented shortly as a follow-up.
- I'm thinking that eventually we probably want to disable the batch
splitting done by the DistSender to eliminate unnecessary blocking when
the streamer's splitting was incorrect. This would give us some
performance improvements in face of range boundary changes, but it
doesn't seem important enough for the initial implementation.

Release note: None
@yuzefovich yuzefovich dismissed rharding6373’s stale review January 12, 2022 21:15

Already received LGTM from Rachael.

Copy link
Member Author

@yuzefovich yuzefovich left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks everyone for the feedback! I'll go ahead and merge this, but in case any feedback comes up, I'm more than happy to address it in a follow-up.

bors r+

Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (and 1 stale) (waiting on @andreimatei, @erikgrinaker, @rharding6373, and @rytaft)


pkg/kv/kvclient/kvstreamer/streamer.go, line 91 at r18 (raw file):

Previously, rytaft (Rebecca Taft) wrote…

You could possibly do it for InOrder too if you buffer results, right? Anyway, up to you if you want to change the comment now or later.

Hm, EnqueueKeysSatisfied can only have multiple values in InOrder if a single Result satisfies multiple consecutive results, so I think the comment in the code is correct as is. You are right that we can perform buffering (and that we should be doing de-duplication regardless of the mode - my previous comment above was wrong), but the buffered result will be returned later in InOrder, through a separate Result object.

@craig
Copy link
Contributor

craig bot commented Jan 12, 2022

Build succeeded:

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants