From d65a236d2fd43d8ec683871e00b4fa2b10d3b4d2 Mon Sep 17 00:00:00 2001
From: Yahor Yuzefovich
Date: Thu, 3 Feb 2022 16:58:37 -0800
Subject: [PATCH 1/2] kvstreamer: fix a bug with incorrect processing of empty scan responses

Previously, the `Streamer` could get stuck indefinitely when a response
for a Scan request came back empty - namely, the response had no bytes
inside and no ResumeSpan set (this is the case when there is no data in
the key span to scan). This would leave the `GetResults` call stuck
thinking that more responses were still to come when none would ever
show up.

This is now fixed by creating a Result with an empty Scan response
inside it. This approach makes it easier to support Scans that span
multiple ranges where the last range has no data in it - we want to be
able to set the Complete field on such an empty Result.

Since this was the only known bug with the `Streamer` implementation,
the streamer is now used by default again.

Release note: None
---
 pkg/kv/kvclient/kvstreamer/streamer.go      | 10 ++-
 pkg/kv/kvclient/kvstreamer/streamer_test.go | 90 ++++++++++++++++++++-
 pkg/sql/mem_limit_test.go                   |  4 -
 pkg/sql/row/kv_batch_fetcher.go             |  5 ++
 pkg/sql/row/kv_batch_streamer.go            |  4 +-
 5 files changed, 106 insertions(+), 7 deletions(-)

diff --git a/pkg/kv/kvclient/kvstreamer/streamer.go b/pkg/kv/kvclient/kvstreamer/streamer.go
index e32244f5118b..bef437264e4c 100644
--- a/pkg/kv/kvclient/kvstreamer/streamer.go
+++ b/pkg/kv/kvclient/kvstreamer/streamer.go
@@ -1281,7 +1281,15 @@ func (w *workerCoordinator) processSingleRangeResults(
 case *roachpb.ScanRequest:
 scan := reply.(*roachpb.ScanResponse)
- if len(scan.Rows) > 0 || len(scan.BatchResponses) > 0 {
+ if len(scan.Rows) > 0 || len(scan.BatchResponses) > 0 || scan.ResumeSpan == nil {
+ // Only the last part of the conditional is true whenever we
+ // received an empty response for the Scan request (i.e. there
+ // was no data in the span to scan). In such a scenario we still
+ // create a Result with no data that the client will skip over
+ // (this approach makes it easier to support Scans that span
+ // multiple ranges and the last range has no data in it - we
+ // want to be able to set Complete field on such an empty
+ // Result).
 result := Result{
 // This currently only works because all requests
 // are unique.
diff --git a/pkg/kv/kvclient/kvstreamer/streamer_test.go b/pkg/kv/kvclient/kvstreamer/streamer_test.go
index bc65005148d0..2fce85b8c22b 100644
--- a/pkg/kv/kvclient/kvstreamer/streamer_test.go
+++ b/pkg/kv/kvclient/kvstreamer/streamer_test.go
@@ -38,10 +38,11 @@ import (
 func getStreamer(
 ctx context.Context, s serverutils.TestServerInterface, limitBytes int64, acc *mon.BoundAccount,
 ) *Streamer {
+ rootTxn := kv.NewTxn(ctx, s.DB(), s.NodeID())
 return NewStreamer(
 s.DistSenderI().(*kvcoord.DistSender),
 s.Stopper(),
- kv.NewTxn(ctx, s.DB(), s.NodeID()),
+ kv.NewLeafTxn(ctx, s.DB(), s.NodeID(), rootTxn.GetLeafTxnInputState(ctx)),
 cluster.MakeTestingClusterSettings(),
 lock.WaitPolicy(0),
 limitBytes,
@@ -422,3 +423,90 @@ func TestStreamerWideRows(t *testing.T) {
 })
 }
 }
+
+// TestStreamerEmptyScans verifies that the Streamer behaves correctly when
+// Scan requests return empty responses.
+func TestStreamerEmptyScans(t *testing.T) {
+ defer leaktest.AfterTest(t)()
+ defer log.Scope(t).Close(t)
+
+ // Start a cluster with large --max-sql-memory parameter so that the
+ // Streamer isn't hitting the root budget exceeded error.
+ const rootPoolSize = 1 << 30 /* 1GiB */ + s, db, _ := serverutils.StartServer(t, base.TestServerArgs{ + SQLMemoryPoolSize: rootPoolSize, + }) + ctx := context.Background() + defer s.Stopper().Stop(ctx) + + // Create a dummy table for which we know the encoding of valid keys. + // Although not strictly necessary, we set up two column families since with + // a single family in production a Get request would have been used. + _, err := db.Exec("CREATE TABLE t (pk INT PRIMARY KEY, k INT, blob STRING, INDEX (k), FAMILY (pk, k), FAMILY (blob))") + require.NoError(t, err) + + // Split the table into 5 ranges and populate the range cache. + for pk := 1; pk < 5; pk++ { + _, err = db.Exec(fmt.Sprintf("ALTER TABLE t SPLIT AT VALUES(%d)", pk)) + require.NoError(t, err) + } + _, err = db.Exec("SELECT count(*) from t") + require.NoError(t, err) + + makeScanRequest := func(start, end int) roachpb.RequestUnion { + var res roachpb.RequestUnion + var scan roachpb.ScanRequest + var union roachpb.RequestUnion_Scan + makeKey := func(pk int) []byte { + // These numbers essentially make a key like '/t/primary/pk'. + return []byte{240, 137, byte(136 + pk)} + } + scan.Key = makeKey(start) + scan.EndKey = makeKey(end) + union.Scan = &scan + res.Value = &union + return res + } + + getStreamer := func() *Streamer { + s := getStreamer(ctx, s, math.MaxInt64, nil /* acc */) + // There are two column families in the table. + s.Init(OutOfOrder, Hints{UniqueRequests: true}, 2 /* maxKeysPerRow */) + return s + } + + t.Run("scan single range", func(t *testing.T) { + streamer := getStreamer() + defer streamer.Close() + + // Scan the row with pk=0. + reqs := make([]roachpb.RequestUnion, 1) + reqs[0] = makeScanRequest(0, 1) + require.NoError(t, streamer.Enqueue(ctx, reqs, nil /* enqueueKeys */)) + results, err := streamer.GetResults(ctx) + require.NoError(t, err) + // We expect a single empty Scan response. + require.Equal(t, 1, len(results)) + }) + + t.Run("scan multiple ranges", func(t *testing.T) { + streamer := getStreamer() + defer streamer.Close() + + // Scan the rows with pk in range [1, 4). + reqs := make([]roachpb.RequestUnion, 1) + reqs[0] = makeScanRequest(1, 4) + require.NoError(t, streamer.Enqueue(ctx, reqs, nil /* enqueueKeys */)) + // We expect an empty response for each range. + var numResults int + for { + results, err := streamer.GetResults(ctx) + require.NoError(t, err) + numResults += len(results) + if len(results) == 0 { + break + } + } + require.Equal(t, 3, numResults) + }) +} diff --git a/pkg/sql/mem_limit_test.go b/pkg/sql/mem_limit_test.go index 46ecc22da974..095113637e8a 100644 --- a/pkg/sql/mem_limit_test.go +++ b/pkg/sql/mem_limit_test.go @@ -175,10 +175,6 @@ func TestStreamerTightBudget(t *testing.T) { _, err = db.Exec(fmt.Sprintf("SET distsql_workmem = '%dB'", blobSize)) require.NoError(t, err) - // TODO(yuzefovich): remove this once the streamer is enabled by default. - _, err = db.Exec("SET CLUSTER SETTING sql.distsql.use_streamer.enabled = true;") - require.NoError(t, err) - // Perform an index join to read the blobs. 
query := "EXPLAIN ANALYZE SELECT sum(length(blob)) FROM t@t_k_idx WHERE k = 1" maximumMemoryUsageRegex := regexp.MustCompile(`maximum memory usage: (\d+\.\d+) MiB`) diff --git a/pkg/sql/row/kv_batch_fetcher.go b/pkg/sql/row/kv_batch_fetcher.go index ea1e3a5be633..9d5675b90346 100644 --- a/pkg/sql/row/kv_batch_fetcher.go +++ b/pkg/sql/row/kv_batch_fetcher.go @@ -467,11 +467,16 @@ func (f *txnKVFetcher) nextBatch( if len(t.BatchResponses) > 0 { batchResp, f.remainingBatches = popBatch(t.BatchResponses) } + // Note that t.Rows and batchResp might be nil when the ScanResponse + // is empty, and the caller (the KVFetcher) will skip over it. return true, t.Rows, batchResp, nil case *roachpb.ReverseScanResponse: if len(t.BatchResponses) > 0 { batchResp, f.remainingBatches = popBatch(t.BatchResponses) } + // Note that t.Rows and batchResp might be nil when the + // ReverseScanResponse is empty, and the caller (the KVFetcher) will + // skip over it. return true, t.Rows, batchResp, nil case *roachpb.GetResponse: if t.IntentValue != nil { diff --git a/pkg/sql/row/kv_batch_streamer.go b/pkg/sql/row/kv_batch_streamer.go index 0aae5582dbd4..286d396580a8 100644 --- a/pkg/sql/row/kv_batch_streamer.go +++ b/pkg/sql/row/kv_batch_streamer.go @@ -38,7 +38,7 @@ var useStreamerEnabled = settings.RegisterBoolSetting( "determines whether the usage of the Streamer API is allowed. "+ "Enabling this will increase the speed of lookup/index joins "+ "while adhering to memory limits.", - false, + true, ) // TxnKVStreamer handles retrieval of key/values. @@ -120,6 +120,8 @@ func (f *TxnKVStreamer) proceedWithLastResult( if len(f.lastResultState.remainingBatches) == 0 { f.processedScanResponse() } + // Note that scan.Rows and batchResp might be nil when the ScanResponse is + // empty, and the caller (the KVFetcher) will skip over it. return false, scan.Rows, batchResp, nil } From 4cc956f1f1b49dfa4ca76270b8b11f733ae12295 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Wed, 2 Feb 2022 22:30:44 -0800 Subject: [PATCH 2/2] kvstreamer: minor cleanup This commit replaces the last buffered channel we had in the `Streamer` code in favor of a condition variable which simplifies the control flow a bit. Additionally, this commit refactors the code so that empty Get responses are not returned which allows us to clean up `TxnKVStreamer` a bit. Care has to be taken so that we still signal the client's goroutine waiting for results when an empty Get response comes back (similar to the bug fixed by the previous commit). Also, the tracking of the "number of outstanding requests" in `TxnKVStreamer` has been removed since it provided little value (and probably was broken anyway). Release note: None --- pkg/kv/kvclient/kvstreamer/streamer.go | 118 +++++++++----------- pkg/kv/kvclient/kvstreamer/streamer_test.go | 8 +- pkg/sql/row/kv_batch_streamer.go | 58 ++-------- 3 files changed, 70 insertions(+), 114 deletions(-) diff --git a/pkg/kv/kvclient/kvstreamer/streamer.go b/pkg/kv/kvclient/kvstreamer/streamer.go index bef437264e4c..0cdca45eca07 100644 --- a/pkg/kv/kvclient/kvstreamer/streamer.go +++ b/pkg/kv/kvclient/kvstreamer/streamer.go @@ -215,10 +215,6 @@ type Streamer struct { enqueueKeys []int - // waitForResults is used to block GetResults() call until some results are - // available. - waitForResults chan struct{} - mu struct { // If the budget's mutex also needs to be locked, the budget's mutex // must be acquired first. 
@@ -261,6 +257,10 @@ type Streamer struct { // by GetResults() to the caller which the caller hasn't processed yet. numUnreleasedResults int + // hasResults is used by the client's goroutine to block until there are + // some results to be picked up. + hasResults *sync.Cond + // results are the results of already completed requests that haven't // been returned by GetResults() yet. results []Result @@ -321,6 +321,7 @@ func NewStreamer( budget: newBudget(acc, limitBytes), } s.mu.hasWork = sync.NewCond(&s.mu.Mutex) + s.mu.hasResults = sync.NewCond(&s.mu.Mutex) s.coordinator = workerCoordinator{ s: s, txn: txn, @@ -362,7 +363,6 @@ func (s *Streamer) Init(mode OperationMode, hints Hints, maxKeysPerRow int) { } s.hints = hints s.maxKeysPerRow = int32(maxKeysPerRow) - s.waitForResults = make(chan struct{}, 1) } // Enqueue dispatches multiple requests for execution. Results are delivered @@ -583,45 +583,23 @@ func (s *Streamer) enqueueMemoryAccountingLocked( // result slice is returned once all enqueued requests have been responded to. func (s *Streamer) GetResults(ctx context.Context) ([]Result, error) { s.mu.Lock() - results := s.mu.results - err := s.mu.err - s.mu.results = nil - allComplete := s.mu.numCompleteRequests == s.mu.numEnqueuedRequests - // Non-blockingly clear the waitForResults channel in case we've just picked - // up some results. We do so while holding the mutex so that new results - // aren't appended. - select { - case <-s.waitForResults: - default: - } - s.mu.Unlock() - - if len(results) > 0 || allComplete || err != nil { - return results, err - } - - select { - case <-ctx.Done(): - return nil, ctx.Err() - case <-s.waitForResults: - s.mu.Lock() - results = s.mu.results - err = s.mu.err + defer s.mu.Unlock() + for { + results := s.mu.results s.mu.results = nil - s.mu.Unlock() - return results, err - } -} - -// notifyGetResultsLocked non-blockingly sends a message on waitForResults -// channel. This method should be called only while holding the lock of s.mu so -// that other results couldn't be appended which would cause us to miss the -// notification about that. -func (s *Streamer) notifyGetResultsLocked() { - s.mu.AssertHeld() - select { - case s.waitForResults <- struct{}{}: - default: + allComplete := s.mu.numCompleteRequests == s.mu.numEnqueuedRequests + if len(results) > 0 || allComplete || s.mu.err != nil { + return results, s.mu.err + } + s.mu.hasResults.Wait() + // Check whether the Streamer has been canceled or closed while we were + // waiting for the results. + if err := ctx.Err(); err != nil { + // No need to use setErrorLocked here because the current goroutine + // is the only one blocking on hasResults condition variable. + s.mu.err = err + return nil, err + } } } @@ -642,7 +620,7 @@ func (s *Streamer) setErrorLocked(err error) { if s.mu.err == nil { s.mu.err = err } - s.notifyGetResultsLocked() + s.mu.hasResults.Signal() } // Close cancels all in-flight operations and releases all of the resources of @@ -655,6 +633,11 @@ func (s *Streamer) Close() { s.mu.done = true // Unblock the coordinator in case it is waiting for more work. s.mu.hasWork.Signal() + // Note that only the client's goroutine can be blocked waiting for the + // results, and Close() is called only by the same goroutine, so + // signaling hasResult condition variable isn't necessary. However, we + // choose to be safe and do it anyway. + s.mu.hasResults.Signal() s.mu.Unlock() // Unblock the coordinator in case it is waiting for the budget. 
s.budget.mu.waitForBudget.Signal() @@ -1265,17 +1248,22 @@ func (w *workerCoordinator) processSingleRangeResults( resumeReqIdx++ } else { // This Get was completed. - result := Result{ - GetResp: get, - // This currently only works because all requests - // are unique. - EnqueueKeysSatisfied: []int{enqueueKey}, - position: req.positions[i], + if get.Value != nil { + // Create a Result only for non-empty Get responses. + result := Result{ + GetResp: get, + // This currently only works because all requests + // are unique. + EnqueueKeysSatisfied: []int{enqueueKey}, + position: req.positions[i], + } + result.memoryTok.streamer = w.s + result.memoryTok.toRelease = getResponseSize(get) + memoryTokensBytes += result.memoryTok.toRelease + results = append(results, result) } - result.memoryTok.streamer = w.s - result.memoryTok.toRelease = getResponseSize(get) - memoryTokensBytes += result.memoryTok.toRelease - results = append(results, result) + // Note that we count this Get response as complete regardless + // of the fact whether it is empty or not. numCompleteGetResponses++ } @@ -1329,13 +1317,9 @@ func (w *workerCoordinator) processSingleRangeResults( } } - // If we have any results, finalize them. - if len(results) > 0 { - w.finalizeSingleRangeResults( - results, memoryFootprintBytes, hasNonEmptyScanResponse, - numCompleteGetResponses, - ) - } + w.finalizeSingleRangeResults( + results, memoryFootprintBytes, hasNonEmptyScanResponse, numCompleteGetResponses, + ) // If we have any incomplete requests, add them back into the work // pool. @@ -1348,8 +1332,6 @@ func (w *workerCoordinator) processSingleRangeResults( // singleRangeBatch. By "finalization" we mean setting Complete field of // ScanResp to correct value for all scan responses, updating the estimate of an // average response size, and telling the Streamer about these results. -// -// This method assumes that results has length greater than zero. func (w *workerCoordinator) finalizeSingleRangeResults( results []Result, actualMemoryReservation int64, @@ -1397,9 +1379,15 @@ func (w *workerCoordinator) finalizeSingleRangeResults( w.s.mu.avgResponseEstimator.update(actualMemoryReservation, int64(len(results))) w.s.mu.numCompleteRequests += numCompleteResponses w.s.mu.numUnreleasedResults += len(results) - // Store the results and non-blockingly notify the Streamer about them. w.s.mu.results = append(w.s.mu.results, results...) - w.s.notifyGetResultsLocked() + if len(results) > 0 || numCompleteResponses > 0 { + // We want to signal the condition variable when either we have some + // results to return to the client or we received some empty responses. + // The latter is needed so that the client doesn't block forever + // thinking there are more requests in flight when, in fact, all + // responses have already come back empty. + w.s.mu.hasResults.Signal() + } } var zeroIntSlice []int diff --git a/pkg/kv/kvclient/kvstreamer/streamer_test.go b/pkg/kv/kvclient/kvstreamer/streamer_test.go index 2fce85b8c22b..7c1f6a74be08 100644 --- a/pkg/kv/kvclient/kvstreamer/streamer_test.go +++ b/pkg/kv/kvclient/kvstreamer/streamer_test.go @@ -92,10 +92,12 @@ func TestStreamerLimitations(t *testing.T) { streamer := getStreamer() defer streamer.Close() streamer.Init(OutOfOrder, Hints{UniqueRequests: true}, 1 /* maxKeysPerRow */) - get := roachpb.NewGet(roachpb.Key("key"), false /* forUpdate */) + // Use a Scan request for this test case because Gets of non-existent + // keys aren't added to the results. 
+ scan := roachpb.NewScan(roachpb.Key("key"), roachpb.Key("key1"), false /* forUpdate */) reqs := []roachpb.RequestUnion{{ - Value: &roachpb.RequestUnion_Get{ - Get: get.(*roachpb.GetRequest), + Value: &roachpb.RequestUnion_Scan{ + Scan: scan.(*roachpb.ScanRequest), }, }} require.NoError(t, streamer.Enqueue(ctx, reqs, nil /* enqueueKeys */)) diff --git a/pkg/sql/row/kv_batch_streamer.go b/pkg/sql/row/kv_batch_streamer.go index 286d396580a8..0aeb1cf5fe32 100644 --- a/pkg/sql/row/kv_batch_streamer.go +++ b/pkg/sql/row/kv_batch_streamer.go @@ -46,10 +46,6 @@ type TxnKVStreamer struct { streamer *kvstreamer.Streamer spans roachpb.Spans - // numOutstandingRequests tracks the number of requests that haven't been - // fully responded to yet. - numOutstandingRequests int - // getResponseScratch is reused to return the result of Get requests. getResponseScratch [1]roachpb.KeyValue @@ -82,9 +78,8 @@ func NewTxnKVStreamer( return nil, err } return &TxnKVStreamer{ - streamer: streamer, - spans: spans, - numOutstandingRequests: len(spans), + streamer: streamer, + spans: spans, }, nil } @@ -93,46 +88,30 @@ func NewTxnKVStreamer( // GetResponses). func (f *TxnKVStreamer) proceedWithLastResult( ctx context.Context, -) (skip bool, kvs []roachpb.KeyValue, batchResp []byte, err error) { +) (kvs []roachpb.KeyValue, batchResp []byte, err error) { result := f.lastResultState.Result if get := result.GetResp; get != nil { if get.IntentValue != nil { - return false, nil, nil, errors.AssertionFailedf( + return nil, nil, errors.AssertionFailedf( "unexpectedly got an IntentValue back from a SQL GetRequest %v", *get.IntentValue, ) } - if get.Value == nil { - // Nothing found in this particular response, so we skip it. - f.releaseLastResult(ctx) - return true, nil, nil, nil - } pos := result.EnqueueKeysSatisfied[f.lastResultState.numEmitted] origSpan := f.spans[pos] f.lastResultState.numEmitted++ - f.numOutstandingRequests-- f.getResponseScratch[0] = roachpb.KeyValue{Key: origSpan.Key, Value: *get.Value} - return false, f.getResponseScratch[:], nil, nil + return f.getResponseScratch[:], nil, nil } scan := result.ScanResp if len(scan.BatchResponses) > 0 { batchResp, f.lastResultState.remainingBatches = scan.BatchResponses[0], scan.BatchResponses[1:] } if len(f.lastResultState.remainingBatches) == 0 { - f.processedScanResponse() + f.lastResultState.numEmitted++ } // Note that scan.Rows and batchResp might be nil when the ScanResponse is // empty, and the caller (the KVFetcher) will skip over it. - return false, scan.Rows, batchResp, nil -} - -// processedScanResponse updates the lastResultState before emitting the last -// part of the ScanResponse. This method should be called for each request that -// the ScanResponse satisfies. -func (f *TxnKVStreamer) processedScanResponse() { - f.lastResultState.numEmitted++ - if f.lastResultState.ScanResp.Complete { - f.numOutstandingRequests-- - } + return scan.Rows, batchResp, nil } func (f *TxnKVStreamer) releaseLastResult(ctx context.Context) { @@ -145,17 +124,11 @@ func (f *TxnKVStreamer) releaseLastResult(ctx context.Context) { func (f *TxnKVStreamer) nextBatch( ctx context.Context, ) (ok bool, kvs []roachpb.KeyValue, batchResp []byte, err error) { - if f.numOutstandingRequests == 0 { - // All requests have already been responded to. - f.releaseLastResult(ctx) - return false, nil, nil, nil - } - // Check whether there are more batches in the current ScanResponse. 
if len(f.lastResultState.remainingBatches) > 0 { batchResp, f.lastResultState.remainingBatches = f.lastResultState.remainingBatches[0], f.lastResultState.remainingBatches[1:] if len(f.lastResultState.remainingBatches) == 0 { - f.processedScanResponse() + f.lastResultState.numEmitted++ } return true, nil, batchResp, nil } @@ -164,7 +137,7 @@ func (f *TxnKVStreamer) nextBatch( if f.lastResultState.numEmitted < len(f.lastResultState.EnqueueKeysSatisfied) { // Note that we should never get an error here since we're processing // the same result again. - _, kvs, batchResp, err = f.proceedWithLastResult(ctx) + kvs, batchResp, err = f.proceedWithLastResult(ctx) return true, kvs, batchResp, err } @@ -174,7 +147,7 @@ func (f *TxnKVStreamer) nextBatch( } // Process the next result we have already received from the streamer. - for len(f.results) > 0 { + if len(f.results) > 0 { // Peel off the next result and set it into lastResultState. f.lastResultState.Result = f.results[0] f.lastResultState.numEmitted = 0 @@ -183,15 +156,8 @@ func (f *TxnKVStreamer) nextBatch( // the next iteration. f.results[0] = kvstreamer.Result{} f.results = f.results[1:] - var skip bool - skip, kvs, batchResp, err = f.proceedWithLastResult(ctx) - if err != nil { - return false, nil, nil, err - } - if skip { - continue - } - return true, kvs, batchResp, nil + kvs, batchResp, err = f.proceedWithLastResult(ctx) + return true, kvs, batchResp, err } // Get more results from the streamer. This call will block until some
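
To make the effect of the first commit concrete, a hedged sketch of the consumer-side drain loop follows. It is modeled on TestStreamerEmptyScans from this patch rather than on production code, and the drain helper itself is invented; only the Enqueue and GetResults calls come from the patch:

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvstreamer"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// drain enqueues the requests and keeps calling GetResults until it returns an
// empty slice, which happens only once every enqueued request has been
// responded to - empty scan responses included, thanks to the first commit.
func drain(
	ctx context.Context, streamer *kvstreamer.Streamer, reqs []roachpb.RequestUnion,
) (numResults int, _ error) {
	if err := streamer.Enqueue(ctx, reqs, nil /* enqueueKeys */); err != nil {
		return 0, err
	}
	for {
		results, err := streamer.GetResults(ctx)
		if err != nil {
			return numResults, err
		}
		if len(results) == 0 {
			// All requests are complete. Without the fix in the first commit,
			// a Scan over an empty span would have left this call blocked
			// forever instead of reaching this branch.
			return numResults, nil
		}
		numResults += len(results)
	}
}
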