kvstreamer: fix a bug with incorrect processing of empty scan responses #75888

Merged: 2 commits, Feb 9, 2022
128 changes: 62 additions & 66 deletions pkg/kv/kvclient/kvstreamer/streamer.go
@@ -215,10 +215,6 @@ type Streamer struct {

enqueueKeys []int

// waitForResults is used to block GetResults() call until some results are
// available.
waitForResults chan struct{}

mu struct {
// If the budget's mutex also needs to be locked, the budget's mutex
// must be acquired first.
@@ -261,6 +257,10 @@ type Streamer struct {
// by GetResults() to the caller which the caller hasn't processed yet.
numUnreleasedResults int

// hasResults is used by the client's goroutine to block until there are
// some results to be picked up.
hasResults *sync.Cond

// results are the results of already completed requests that haven't
// been returned by GetResults() yet.
results []Result
@@ -321,6 +321,7 @@ func NewStreamer(
budget: newBudget(acc, limitBytes),
}
s.mu.hasWork = sync.NewCond(&s.mu.Mutex)
s.mu.hasResults = sync.NewCond(&s.mu.Mutex)
s.coordinator = workerCoordinator{
s: s,
txn: txn,
@@ -362,7 +363,6 @@ func (s *Streamer) Init(mode OperationMode, hints Hints, maxKeysPerRow int) {
}
s.hints = hints
s.maxKeysPerRow = int32(maxKeysPerRow)
s.waitForResults = make(chan struct{}, 1)
}

// Enqueue dispatches multiple requests for execution. Results are delivered
@@ -583,45 +583,23 @@ func (s *Streamer) enqueueMemoryAccountingLocked(
// result slice is returned once all enqueued requests have been responded to.
func (s *Streamer) GetResults(ctx context.Context) ([]Result, error) {
s.mu.Lock()
results := s.mu.results
err := s.mu.err
s.mu.results = nil
allComplete := s.mu.numCompleteRequests == s.mu.numEnqueuedRequests
// Non-blockingly clear the waitForResults channel in case we've just picked
// up some results. We do so while holding the mutex so that new results
// aren't appended.
select {
case <-s.waitForResults:
default:
}
s.mu.Unlock()

if len(results) > 0 || allComplete || err != nil {
return results, err
}

select {
case <-ctx.Done():
return nil, ctx.Err()
case <-s.waitForResults:
s.mu.Lock()
results = s.mu.results
err = s.mu.err
defer s.mu.Unlock()
for {
results := s.mu.results
s.mu.results = nil
s.mu.Unlock()
return results, err
}
}

// notifyGetResultsLocked non-blockingly sends a message on waitForResults
// channel. This method should be called only while holding the lock of s.mu so
// that other results couldn't be appended which would cause us to miss the
// notification about that.
func (s *Streamer) notifyGetResultsLocked() {
s.mu.AssertHeld()
select {
case s.waitForResults <- struct{}{}:
default:
allComplete := s.mu.numCompleteRequests == s.mu.numEnqueuedRequests
if len(results) > 0 || allComplete || s.mu.err != nil {
return results, s.mu.err
}
s.mu.hasResults.Wait()
// Check whether the Streamer has been canceled or closed while we were
// waiting for the results.
if err := ctx.Err(); err != nil {
// No need to use setErrorLocked here because the current goroutine
// is the only one blocking on the hasResults condition variable.
s.mu.err = err
return nil, err
}
}
}
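
For reference, the hunk above replaces the one-shot waitForResults channel with a sync.Cond that GetResults waits on in a loop. Below is a minimal, self-contained sketch of that pattern; resultBuffer, its fields, and the string results are hypothetical stand-ins, not the Streamer's real API. The consumer hands back whatever is buffered, returns early once all requests are complete or an error is set, and otherwise blocks on the condition variable until the producer signals it (including when responses complete empty, which is the core of this fix).

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// resultBuffer is a hypothetical, simplified stand-in for the Streamer's
// mutex-protected state: buffered results plus completion counters.
type resultBuffer struct {
	mu          sync.Mutex
	hasResults  *sync.Cond
	results     []string
	numComplete int
	numEnqueued int
	err         error
}

func newResultBuffer(numEnqueued int) *resultBuffer {
	b := &resultBuffer{numEnqueued: numEnqueued}
	b.hasResults = sync.NewCond(&b.mu)
	return b
}

// get mirrors the shape of the new GetResults: in a loop, return anything
// already buffered, return early once all requests are complete or an error
// is set, and otherwise wait on the condition variable.
func (b *resultBuffer) get(ctx context.Context) ([]string, error) {
	b.mu.Lock()
	defer b.mu.Unlock()
	for {
		res := b.results
		b.results = nil
		allComplete := b.numComplete == b.numEnqueued
		if len(res) > 0 || allComplete || b.err != nil {
			return res, b.err
		}
		b.hasResults.Wait()
		// Wait atomically releases the mutex while blocked and reacquires it
		// before returning. The context is rechecked here because cancellation
		// is only observed after someone calls Signal.
		if err := ctx.Err(); err != nil {
			b.err = err
			return nil, err
		}
	}
}

// deliver is the producer side: it buffers results, bumps the completion
// counter, and signals when there is either data to hand over or some
// requests completed with empty responses, so the consumer can notice that
// everything is done instead of blocking forever.
func (b *resultBuffer) deliver(results []string, numComplete int) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.results = append(b.results, results...)
	b.numComplete += numComplete
	if len(results) > 0 || numComplete > 0 {
		b.hasResults.Signal()
	}
}

func main() {
	b := newResultBuffer(2)
	go func() {
		// One request returns a row, the other comes back empty; both are
		// reported as complete.
		b.deliver([]string{"row1"}, 1)
		b.deliver(nil, 1)
	}()
	for {
		res, err := b.get(context.Background())
		if err != nil || len(res) == 0 {
			break // all requests complete
		}
		fmt.Println(res)
	}
}
```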

@@ -642,7 +620,7 @@ func (s *Streamer) setErrorLocked(err error) {
if s.mu.err == nil {
s.mu.err = err
}
s.notifyGetResultsLocked()
s.mu.hasResults.Signal()
}

// Close cancels all in-flight operations and releases all of the resources of
@@ -655,6 +633,11 @@ func (s *Streamer) Close() {
s.mu.done = true
// Unblock the coordinator in case it is waiting for more work.
s.mu.hasWork.Signal()
// Note that only the client's goroutine can be blocked waiting for the
// results, and Close() is called only by that same goroutine, so
// signaling the hasResults condition variable isn't strictly necessary.
// However, we choose to be safe and do it anyway.
s.mu.hasResults.Signal()
s.mu.Unlock()
// Unblock the coordinator in case it is waiting for the budget.
s.budget.mu.waitForBudget.Signal()
@@ -1265,23 +1248,36 @@ func (w *workerCoordinator) processSingleRangeResults(
resumeReqIdx++
} else {
// This Get was completed.
result := Result{
GetResp: get,
// This currently only works because all requests
// are unique.
EnqueueKeysSatisfied: []int{enqueueKey},
position: req.positions[i],
if get.Value != nil {
// Create a Result only for non-empty Get responses.
result := Result{
GetResp: get,
// This currently only works because all requests
// are unique.
EnqueueKeysSatisfied: []int{enqueueKey},
position: req.positions[i],
}
result.memoryTok.streamer = w.s
result.memoryTok.toRelease = getResponseSize(get)
memoryTokensBytes += result.memoryTok.toRelease
results = append(results, result)
}
result.memoryTok.streamer = w.s
result.memoryTok.toRelease = getResponseSize(get)
memoryTokensBytes += result.memoryTok.toRelease
results = append(results, result)
// Note that we count this Get response as complete regardless
// of whether it is empty or not.
numCompleteGetResponses++
}

case *roachpb.ScanRequest:
scan := reply.(*roachpb.ScanResponse)
if len(scan.Rows) > 0 || len(scan.BatchResponses) > 0 {
if len(scan.Rows) > 0 || len(scan.BatchResponses) > 0 || scan.ResumeSpan == nil {
// Only the last part of the conditional can be true when we
// received an empty response for the Scan request (i.e. there
// was no data in the span to scan). In such a scenario we still
// create a Result with no data that the client will skip over
// (this approach makes it easier to support Scans that span
// multiple ranges where the last range has no data in it - we
// want to be able to set the Complete field on such an empty
// Result).
result := Result{
// This currently only works because all requests
// are unique.
@@ -1321,13 +1317,9 @@
}
}

// If we have any results, finalize them.
if len(results) > 0 {
w.finalizeSingleRangeResults(
results, memoryFootprintBytes, hasNonEmptyScanResponse,
numCompleteGetResponses,
)
}
w.finalizeSingleRangeResults(
results, memoryFootprintBytes, hasNonEmptyScanResponse, numCompleteGetResponses,
)

// If we have any incomplete requests, add them back into the work
// pool.
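
To summarize the response handling above: a Get produces a Result only when its Value is non-nil, while a Scan produces a Result whenever it carries rows or batch responses, or when its ResumeSpan is nil (an empty span that is nevertheless complete); in either case a completed response still counts toward the completion totals. The sketch below illustrates that classification using hypothetical, trimmed-down response types rather than the real roachpb ones, and it only covers responses that were not resumed (resumed requests are re-enqueued, as the surrounding code shows).

```go
package main

import "fmt"

// getResponse, span, and scanResponse are hypothetical stand-ins for
// roachpb.GetResponse, roachpb.Span, and roachpb.ScanResponse.
type getResponse struct {
	value []byte // nil when the key does not exist
}

type span struct{ key, endKey string }

type scanResponse struct {
	rows           [][]byte
	batchResponses [][]byte
	resumeSpan     *span // non-nil when the scan stopped early and must be resumed
}

// classifyGet applies to a Get that came back without a resume span: an empty
// Get creates no Result but still counts as complete.
func classifyGet(g getResponse) (createResult, complete bool) {
	return g.value != nil, true
}

// classifyScan reports whether a Result should be created and whether the
// request is complete. An empty Scan with a nil resume span still creates a
// data-less Result so that its Complete field can be surfaced to the client,
// which will simply skip over it.
func classifyScan(s scanResponse) (createResult, complete bool) {
	hasData := len(s.rows) > 0 || len(s.batchResponses) > 0
	return hasData || s.resumeSpan == nil, s.resumeSpan == nil
}

func main() {
	fmt.Println(classifyGet(getResponse{}))                               // false true
	fmt.Println(classifyScan(scanResponse{}))                             // true true
	fmt.Println(classifyScan(scanResponse{resumeSpan: &span{"b", "c"}}))  // false false
}
```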
@@ -1340,8 +1332,6 @@
// singleRangeBatch. By "finalization" we mean setting Complete field of
// ScanResp to correct value for all scan responses, updating the estimate of an
// average response size, and telling the Streamer about these results.
//
// This method assumes that results has length greater than zero.
func (w *workerCoordinator) finalizeSingleRangeResults(
results []Result,
actualMemoryReservation int64,
@@ -1389,9 +1379,15 @@ func (w *workerCoordinator) finalizeSingleRangeResults(
w.s.mu.avgResponseEstimator.update(actualMemoryReservation, int64(len(results)))
w.s.mu.numCompleteRequests += numCompleteResponses
w.s.mu.numUnreleasedResults += len(results)
// Store the results and non-blockingly notify the Streamer about them.
w.s.mu.results = append(w.s.mu.results, results...)
w.s.notifyGetResultsLocked()
if len(results) > 0 || numCompleteResponses > 0 {
// We want to signal the condition variable when either we have some
// results to return to the client or we received some empty responses.
// The latter is needed so that the client doesn't block forever
// thinking there are more requests in flight when, in fact, all
// responses have already come back empty.
w.s.mu.hasResults.Signal()
}
}

var zeroIntSlice []int
98 changes: 94 additions & 4 deletions pkg/kv/kvclient/kvstreamer/streamer_test.go
@@ -38,10 +38,11 @@ import (
func getStreamer(
ctx context.Context, s serverutils.TestServerInterface, limitBytes int64, acc *mon.BoundAccount,
) *Streamer {
rootTxn := kv.NewTxn(ctx, s.DB(), s.NodeID())
return NewStreamer(
s.DistSenderI().(*kvcoord.DistSender),
s.Stopper(),
kv.NewTxn(ctx, s.DB(), s.NodeID()),
kv.NewLeafTxn(ctx, s.DB(), s.NodeID(), rootTxn.GetLeafTxnInputState(ctx)),
cluster.MakeTestingClusterSettings(),
lock.WaitPolicy(0),
limitBytes,
@@ -91,10 +92,12 @@ func TestStreamerLimitations(t *testing.T) {
streamer := getStreamer()
defer streamer.Close()
streamer.Init(OutOfOrder, Hints{UniqueRequests: true}, 1 /* maxKeysPerRow */)
get := roachpb.NewGet(roachpb.Key("key"), false /* forUpdate */)
// Use a Scan request for this test case because Gets of non-existent
// keys aren't added to the results.
scan := roachpb.NewScan(roachpb.Key("key"), roachpb.Key("key1"), false /* forUpdate */)
reqs := []roachpb.RequestUnion{{
Value: &roachpb.RequestUnion_Get{
Get: get.(*roachpb.GetRequest),
Value: &roachpb.RequestUnion_Scan{
Scan: scan.(*roachpb.ScanRequest),
},
}}
require.NoError(t, streamer.Enqueue(ctx, reqs, nil /* enqueueKeys */))
@@ -422,3 +425,90 @@ func TestStreamerWideRows(t *testing.T) {
})
}
}

// TestStreamerEmptyScans verifies that the Streamer behaves correctly when
// Scan requests return empty responses.
func TestStreamerEmptyScans(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// Start a cluster with a large --max-sql-memory parameter so that the
// Streamer doesn't hit the root budget exceeded error.
const rootPoolSize = 1 << 30 /* 1GiB */
s, db, _ := serverutils.StartServer(t, base.TestServerArgs{
SQLMemoryPoolSize: rootPoolSize,
})
ctx := context.Background()
defer s.Stopper().Stop(ctx)

// Create a dummy table for which we know the encoding of valid keys.
// Although not strictly necessary, we set up two column families since with
// a single column family a Get request would have been used in production.
_, err := db.Exec("CREATE TABLE t (pk INT PRIMARY KEY, k INT, blob STRING, INDEX (k), FAMILY (pk, k), FAMILY (blob))")
require.NoError(t, err)

// Split the table into 5 ranges and populate the range cache.
for pk := 1; pk < 5; pk++ {
_, err = db.Exec(fmt.Sprintf("ALTER TABLE t SPLIT AT VALUES(%d)", pk))
require.NoError(t, err)
}
_, err = db.Exec("SELECT count(*) from t")
require.NoError(t, err)

makeScanRequest := func(start, end int) roachpb.RequestUnion {
var res roachpb.RequestUnion
var scan roachpb.ScanRequest
var union roachpb.RequestUnion_Scan
makeKey := func(pk int) []byte {
// These numbers essentially make a key like '/t/primary/pk'.
return []byte{240, 137, byte(136 + pk)}
}
scan.Key = makeKey(start)
scan.EndKey = makeKey(end)
union.Scan = &scan
res.Value = &union
return res
}

getStreamer := func() *Streamer {
s := getStreamer(ctx, s, math.MaxInt64, nil /* acc */)
// There are two column families in the table.
s.Init(OutOfOrder, Hints{UniqueRequests: true}, 2 /* maxKeysPerRow */)
return s
}

t.Run("scan single range", func(t *testing.T) {
streamer := getStreamer()
defer streamer.Close()

// Scan the row with pk=0.
reqs := make([]roachpb.RequestUnion, 1)
reqs[0] = makeScanRequest(0, 1)
require.NoError(t, streamer.Enqueue(ctx, reqs, nil /* enqueueKeys */))
results, err := streamer.GetResults(ctx)
require.NoError(t, err)
// We expect a single empty Scan response.
require.Equal(t, 1, len(results))
})

t.Run("scan multiple ranges", func(t *testing.T) {
streamer := getStreamer()
defer streamer.Close()

// Scan the rows with pk in range [1, 4).
reqs := make([]roachpb.RequestUnion, 1)
reqs[0] = makeScanRequest(1, 4)
require.NoError(t, streamer.Enqueue(ctx, reqs, nil /* enqueueKeys */))
// We expect an empty response for each range.
var numResults int
for {
results, err := streamer.GetResults(ctx)
require.NoError(t, err)
numResults += len(results)
if len(results) == 0 {
break
}
}
require.Equal(t, 3, numResults)
})
}
4 changes: 0 additions & 4 deletions pkg/sql/mem_limit_test.go
@@ -175,10 +175,6 @@ func TestStreamerTightBudget(t *testing.T) {
_, err = db.Exec(fmt.Sprintf("SET distsql_workmem = '%dB'", blobSize))
require.NoError(t, err)

// TODO(yuzefovich): remove this once the streamer is enabled by default.
_, err = db.Exec("SET CLUSTER SETTING sql.distsql.use_streamer.enabled = true;")
require.NoError(t, err)

// Perform an index join to read the blobs.
query := "EXPLAIN ANALYZE SELECT sum(length(blob)) FROM t@t_k_idx WHERE k = 1"
maximumMemoryUsageRegex := regexp.MustCompile(`maximum memory usage: (\d+\.\d+) MiB`)
5 changes: 5 additions & 0 deletions pkg/sql/row/kv_batch_fetcher.go
@@ -467,11 +467,16 @@ func (f *txnKVFetcher) nextBatch(
if len(t.BatchResponses) > 0 {
batchResp, f.remainingBatches = popBatch(t.BatchResponses)
}
// Note that t.Rows and batchResp might be nil when the ScanResponse
// is empty, and the caller (the KVFetcher) will skip over it.
return true, t.Rows, batchResp, nil
case *roachpb.ReverseScanResponse:
if len(t.BatchResponses) > 0 {
batchResp, f.remainingBatches = popBatch(t.BatchResponses)
}
// Note that t.Rows and batchResp might be nil when the
// ReverseScanResponse is empty, and the caller (the KVFetcher) will
// skip over it.
return true, t.Rows, batchResp, nil
case *roachpb.GetResponse:
if t.IntentValue != nil {
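
As the comments in the kv_batch_fetcher.go hunk above note, an empty ScanResponse (or ReverseScanResponse) now reaches the caller as a batch with nil rows and a nil batch response, and the KVFetcher is expected to skip over it. Below is a minimal sketch of that caller-side pattern, using a hypothetical slice of pre-fetched batches rather than the real txnKVFetcher API.

```go
package main

import "fmt"

// batch is a hypothetical, simplified version of what a nextBatch-style call
// hands back: a "more responses may follow" flag plus the rows of one response.
type batch struct {
	ok   bool
	rows [][]byte
}

// nonEmptyRowSets drains a sequence of batches and keeps only non-empty row
// sets, skipping empty responses the way the KVFetcher skips nil rows and nil
// batch responses.
func nonEmptyRowSets(batches []batch) [][][]byte {
	var out [][][]byte
	for _, b := range batches {
		if !b.ok {
			break // no more responses
		}
		if len(b.rows) == 0 {
			// An empty Scan/ReverseScan response: nothing to decode, move on.
			continue
		}
		out = append(out, b.rows)
	}
	return out
}

func main() {
	batches := []batch{
		{ok: true, rows: [][]byte{[]byte("row1")}},
		{ok: true}, // empty response from a range with no data
		{ok: true, rows: [][]byte{[]byte("row2")}},
		{ok: false}, // exhausted
	}
	fmt.Println(len(nonEmptyRowSets(batches))) // 2
}
```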