Merge pull request #109653 from michae2/backport23.1-109308
release-23.1: kvstreamer: add more tracing
rharding6373 authored Aug 29, 2023
2 parents e57d7e6 + 4ff3dfc commit 935db31
Showing 6 changed files with 129 additions and 16 deletions.
1 change: 1 addition & 0 deletions pkg/kv/kvclient/kvstreamer/BUILD.bazel
@@ -22,6 +22,7 @@ go_library(
"//pkg/roachpb",
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/util",
"//pkg/util/admission",
"//pkg/util/admission/admissionpb",
"//pkg/util/bitmap",
19 changes: 19 additions & 0 deletions pkg/kv/kvclient/kvstreamer/requests_provider.go
@@ -11,6 +11,7 @@
package kvstreamer

import (
"fmt"
"sort"
"sync"

@@ -144,6 +145,16 @@ func (r singleRangeBatch) subPriority() int32 {
return r.subRequestIdx[0]
}

// String implements fmt.Stringer.
func (r singleRangeBatch) String() string {
return fmt.Sprintf(
"{reqs:%s keys:%v pos:%v subIdx:%v start:%v gets:%v reserved:%v overhead:%v minTarget:%v}",
kvpb.TruncatedRequestsString(r.reqs, 1024), r.reqsKeys, r.positions,
r.subRequestIdx, r.isScanStarted, r.numGetsInReqs, r.reqsReservedBytes, r.overheadAccountedFor,
r.minTargetBytes,
)
}

// requestsProvider encapsulates the logic of supplying the requests to serve in
// the Streamer. The implementations are concurrency safe and have its own
// mutex, separate from the Streamer's and the budget's ones, so the ordering of
@@ -192,6 +203,9 @@ type requestsProvider interface {
// emptyLocked returns true if there are no requests to serve at the moment.
// The lock of the provider must be already held.
emptyLocked() bool
// lengthLocked returns the number of requests that have yet to be served at
// the moment. The lock of the provider must be already held.
lengthLocked() int
// nextLocked returns the next request to serve. In OutOfOrder mode, the
// request is arbitrary, in InOrder mode, the request is the current
// head-of-the-line. The lock of the provider must be already held. Panics
@@ -236,6 +250,11 @@ func (b *requestsProviderBase) emptyLocked() bool {
return len(b.requests) == 0
}

func (b *requestsProviderBase) lengthLocked() int {
b.Mutex.AssertHeld()
return len(b.requests)
}

func (b *requestsProviderBase) close() {
b.Lock()
defer b.Unlock()
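The new String method above makes singleRangeBatch implement fmt.Stringer, so a batch renders in this compact one-line form whenever it is interpolated into a trace message with %v or %s (for example in the "adding resume batch %v" event added in streamer.go). A minimal self-contained sketch of the same pattern follows; the type and field names are illustrative stand-ins, not the real kvstreamer definitions.

package main

import "fmt"

// sketchBatch is a stand-in for singleRangeBatch: a few fields that are
// useful to see in a trace event.
type sketchBatch struct {
	numReqs   int
	positions []int
	reserved  int64
}

// String implements fmt.Stringer, so %v and %s pick up this compact
// representation automatically wherever the batch is logged.
func (b sketchBatch) String() string {
	return fmt.Sprintf("{reqs:%d pos:%v reserved:%d}", b.numReqs, b.positions, b.reserved)
}

func main() {
	b := sketchBatch{numReqs: 2, positions: []int{0, 1}, reserved: 128}
	fmt.Printf("adding resume batch %v\n", b) // prints: adding resume batch {reqs:2 pos:[0 1] reserved:128}
}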
20 changes: 13 additions & 7 deletions pkg/kv/kvclient/kvstreamer/results_buffer.go
@@ -96,7 +96,10 @@ type resultsBuffer interface {
// goroutine blocked in wait(), the goroutine is woken up.
//
// It is assumed that the budget's mutex is already being held.
doneAddingLocked(context.Context)
//
// doneAddingLocked returns the number of results that have been added but
// not yet returned to the client, and whether the client goroutine was woken.
doneAddingLocked(context.Context) (int, bool)

///////////////////////////////////////////////////////////////////////////
// //
@@ -219,11 +222,13 @@ func (b *resultsBufferBase) accountForOverheadLocked(ctx context.Context, overhe
b.overheadAccountedFor = overheadMemUsage
}

// signal non-blockingly sends on hasResults channel.
func (b *resultsBufferBase) signal() {
// signal non-blockingly sends on hasResults channel and returns whether sent.
func (b *resultsBufferBase) signal() bool {
select {
case b.hasResults <- struct{}{}:
return true
default:
return false
}
}

@@ -305,9 +310,9 @@ func (b *outOfOrderResultsBuffer) addLocked(r Result) {

const resultSize = int64(unsafe.Sizeof(Result{}))

func (b *outOfOrderResultsBuffer) doneAddingLocked(ctx context.Context) {
func (b *outOfOrderResultsBuffer) doneAddingLocked(ctx context.Context) (int, bool) {
b.accountForOverheadLocked(ctx, int64(cap(b.results))*resultSize)
b.signal()
return len(b.results), b.signal()
}

func (b *outOfOrderResultsBuffer) clearOverhead(ctx context.Context) {
@@ -511,13 +516,14 @@ func (b *inOrderResultsBuffer) addLocked(r Result) {

const inOrderBufferedResultSize = int64(unsafe.Sizeof(inOrderBufferedResult{}))

func (b *inOrderResultsBuffer) doneAddingLocked(ctx context.Context) {
func (b *inOrderResultsBuffer) doneAddingLocked(ctx context.Context) (int, bool) {
overhead := int64(cap(b.buffered))*inOrderBufferedResultSize + // b.buffered
int64(cap(b.resultScratch))*resultSize // b.resultsScratch
b.accountForOverheadLocked(ctx, overhead)
if len(b.buffered) > 0 && b.buffered[0].Position == b.headOfLinePosition && b.buffered[0].subRequestIdx == b.headOfLineSubRequestIdx {
b.signal()
return len(b.buffered), b.signal()
}
return len(b.buffered), false
}

func (b *inOrderResultsBuffer) clearOverhead(ctx context.Context) {
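In the changes above, signal() now reports whether a value was actually sent on hasResults, and doneAddingLocked passes that back together with the number of buffered results so the caller can trace whether the client goroutine was woken. The send itself is the standard non-blocking select-with-default on a buffered channel; a self-contained sketch follows (the channel name and buffer size are illustrative):

package main

import "fmt"

type sketchBuffer struct {
	hasResults chan struct{}
}

// signal sends on hasResults without blocking and reports whether the send
// happened. If the channel is already full, a wakeup is pending anyway and
// there is nothing more to do.
func (b *sketchBuffer) signal() bool {
	select {
	case b.hasResults <- struct{}{}:
		return true
	default:
		return false
	}
}

func main() {
	b := &sketchBuffer{hasResults: make(chan struct{}, 1)}
	fmt.Println(b.signal()) // true: the channel had room
	fmt.Println(b.signal()) // false: a wakeup was already pending
}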
54 changes: 53 additions & 1 deletion pkg/kv/kvclient/kvstreamer/streamer.go
@@ -12,11 +12,13 @@ package kvstreamer

import (
"context"
"fmt"
"math"
"runtime"
"sort"
"sync"
"sync/atomic"
"unicode/utf8"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
@@ -26,6 +28,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/admission"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/bitmap"
@@ -445,6 +448,11 @@ func (s *Streamer) Init(
// from the previous invocation is prohibited.
// TODO(yuzefovich): lift this restriction and introduce the pipelining.
func (s *Streamer) Enqueue(ctx context.Context, reqs []kvpb.RequestUnion) (retErr error) {
if log.ExpensiveLogEnabled(ctx, 2) {
log.VEventf(ctx, 2, "Enqueue %d original requests: %s", len(reqs),
kvpb.TruncatedRequestsString(reqs, 1024),
)
}
if !s.coordinatorStarted {
var coordinatorCtx context.Context
coordinatorCtx, s.coordinatorCtxCancel = s.stopper.WithCancelOnQuiesce(ctx)
@@ -702,6 +710,13 @@ func (s *Streamer) Enqueue(ctx context.Context, reqs []kvpb.RequestUnion) (retEr
}

// Memory reservation was approved, so the requests are good to go.
if log.ExpensiveLogEnabled(ctx, 2) {
reqStr := fmt.Sprintf("%v", requestsToServe)
if utf8.RuneCountInString(reqStr) > 1024 {
reqStr = util.TruncateString(reqStr, 1022) + "…]"
}
log.VEventf(ctx, 2, "enqueuing %d requests to serve: %s", len(requestsToServe), reqStr)
}
s.requestsToServe.enqueue(requestsToServe)
return nil
}
@@ -714,11 +729,14 @@ func (s *Streamer) Enqueue(ctx context.Context, reqs []kvpb.RequestUnion) (retEr
//
// Calling GetResults() invalidates the results returned on the previous call.
func (s *Streamer) GetResults(ctx context.Context) ([]Result, error) {
log.VEvent(ctx, 2, "GetResults")
defer log.VEvent(ctx, 2, "exiting GetResults")
for {
results, allComplete, err := s.results.get(ctx)
if len(results) > 0 || allComplete || err != nil {
return results, err
}
log.VEvent(ctx, 2, "waiting in GetResults")
if err = s.results.wait(ctx); err != nil {
s.results.setError(err)
return nil, err
@@ -802,6 +820,8 @@ type workerCoordinator struct {
// is insufficient. The function exits when an error is encountered by one of
// the asynchronous requests.
func (w *workerCoordinator) mainLoop(ctx context.Context) {
log.VEvent(ctx, 2, "starting coordinator main loop")
defer log.VEvent(ctx, 2, "exiting coordinator main loop")
defer w.s.waitGroup.Done()
for {
if err := w.waitForRequests(ctx); err != nil {
@@ -1009,6 +1029,10 @@ func (w *workerCoordinator) issueRequestsForAsyncProcessing(
defer w.s.budget.mu.Unlock()

headOfLine := w.s.getNumRequestsInProgress() == 0
log.VEventf(
ctx, 2, "serving %d requests, max:%v head:%v",
w.s.requestsToServe.lengthLocked(), maxNumRequestsToIssue, headOfLine,
)
var budgetIsExhausted bool
for !w.s.requestsToServe.emptyLocked() && maxNumRequestsToIssue > 0 && !budgetIsExhausted {
singleRangeReqs := w.s.requestsToServe.nextLocked()
@@ -1044,6 +1068,10 @@ func (w *workerCoordinator) issueRequestsForAsyncProcessing(
// We don't have enough budget available to serve this request,
// and there are other requests in progress, so we'll wait for
// some of them to finish.
log.VEventf(ctx, 2,
"pausing serving requests, remaining:%d, available:%d < min:%d",
w.s.requestsToServe.lengthLocked(), availableBudget, minAcceptableBudget,
)
return nil
}
budgetIsExhausted = true
@@ -1103,6 +1131,10 @@ func (w *workerCoordinator) issueRequestsForAsyncProcessing(
// truncated targetBytes above to not exceed availableBudget, and
// we're holding the budget's mutex. Thus, the error indicates that
// the root memory pool has been exhausted.
log.VEventf(ctx, 2,
"pausing serving requests, remaining:%d, root memory pool exhausted (head:%v): %v",
w.s.requestsToServe.lengthLocked(), headOfLine, err,
)
if !headOfLine {
// There are some requests in progress, so we'll let them
// finish / be released.
@@ -1255,6 +1287,7 @@ func (w *workerCoordinator) performRequestAsync(
// ReadWithinUncertaintyIntervalError and there is only a single
// Streamer in a single local flow, attempt to transparently
// refresh.
log.VEventf(ctx, 2, "dropping response: error from kv: %v", pErr.GoError())
w.s.results.setError(pErr.GoError())
return
}
@@ -1268,6 +1301,7 @@ func (w *workerCoordinator) performRequestAsync(
// doesn't allow mutability.
fp, err := calculateFootprint(req, br)
if err != nil {
log.VEventf(ctx, 2, "dropping response: error calculating footprint: %v", err)
w.s.results.setError(err)
return
}
@@ -1299,6 +1333,9 @@ func (w *workerCoordinator) performRequestAsync(
// but not enough for that large row).
toConsume := -overaccountedTotal
if err = w.s.budget.consume(ctx, toConsume, headOfLine /* allowDebt */); err != nil {
log.VEventf(ctx, 2,
"dropping response: root memory pool exhausted (head:%v): %v", headOfLine, err,
)
// TODO(yuzefovich): rather than dropping the response
// altogether, consider blocking to wait for the budget to
// open up, up to some limit.
@@ -1338,6 +1375,7 @@ func (w *workerCoordinator) performRequestAsync(
CreateTime: w.requestAdmissionHeader.CreateTime,
}
if _, err = w.responseAdmissionQ.Admit(ctx, responseAdmission); err != nil {
log.VEventf(ctx, 2, "dropping response: admission control: %v", err)
w.s.results.setError(err)
return
}
@@ -1472,6 +1510,7 @@ func processSingleRangeResponse(
processSingleRangeResults(ctx, s, req, br, fp)
if fp.hasIncomplete() {
resumeReq := buildResumeSingleRangeBatch(s, req, br, fp)
log.VEventf(ctx, 2, "adding resume batch %v", resumeReq)
s.requestsToServe.add(resumeReq)
}
}
@@ -1487,8 +1526,16 @@ func processSingleRangeResults(
br *kvpb.BatchResponse,
fp singleRangeBatchResponseFootprint,
) {
log.VEventf(ctx, 2,
"responses:%d {footprint:%v overhead:%v resumeMem:%v gets:%v scans:%v incpGets:%v "+
"incpScans:%v startedScans:%v}",
len(br.Responses), fp.memoryFootprintBytes, fp.responsesOverhead, fp.resumeReqsMemUsage,
fp.numGetResults, fp.numScanResults, fp.numIncompleteGets, fp.numIncompleteScans,
fp.numStartedScans,
)
// If there are no results, this function has nothing to do.
if !fp.hasResults() {
log.VEvent(ctx, 2, "no results")
return
}

@@ -1523,7 +1570,10 @@ func processSingleRangeResults(
// the Streamer's one.
s.results.Lock()
defer s.results.Unlock()
defer s.results.doneAddingLocked(ctx)
defer func() {
numResults, signalled := s.results.doneAddingLocked(ctx)
log.VEventf(ctx, 2, "done adding results, buffered:%d signalled:%v", numResults, signalled)
}()

// memoryTokensBytes accumulates all reservations that are made for all
// Results created below. The accounting for these reservations has already
@@ -1542,6 +1592,7 @@ func processSingleRangeResults(
get := response
if get.ResumeSpan != nil {
// This Get wasn't completed.
log.VEvent(ctx, 2, "incomplete Get")
continue
}
// This Get was completed.
@@ -1574,6 +1625,7 @@ func processSingleRangeResults(
// multiple ranges and the last range has no data in it - we
// want to be able to set scanComplete field on such an empty
// Result).
log.VEvent(ctx, 2, "incomplete Scan")
continue
}
result := Result{
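Most of the new events in streamer.go are plain log.VEventf calls at verbosity 2, but the ones that would format an entire request slice are additionally guarded by log.ExpensiveLogEnabled so the string is only built when verbose tracing is actually on. A rough standalone sketch of that guard follows; the two helper functions below are stand-ins for the cockroach log package, not its actual API surface.

package main

import (
	"context"
	"fmt"
)

// expensiveLogEnabled and vEventf are stand-ins for log.ExpensiveLogEnabled
// and log.VEventf from the real code.
func expensiveLogEnabled(ctx context.Context, level int) bool { return level <= 2 }

func vEventf(ctx context.Context, level int, format string, args ...interface{}) {
	fmt.Printf(format+"\n", args...)
}

func enqueue(ctx context.Context, reqs []string) {
	if expensiveLogEnabled(ctx, 2) {
		// Rendering the whole request slice can be costly, so it only
		// happens behind the verbosity check; the real code also truncates
		// the result (see the truncation sketch after the api.go diff).
		vEventf(ctx, 2, "enqueuing %d requests to serve: %v", len(reqs), reqs)
	}
	// ... the actual enqueue work would go here ...
}

func main() {
	enqueue(context.Background(), []string{"Get(a)", "Scan(b-c)"})
}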
42 changes: 42 additions & 0 deletions pkg/kv/kvpb/api.go
@@ -13,6 +13,8 @@ package kvpb
import (
"context"
"fmt"
"strings"
"unicode/utf8"

"github.com/cockroachdb/cockroach/pkg/col/coldata"
_ "github.com/cockroachdb/cockroach/pkg/kv/kvnemesis/kvnemesisutil" // see RequestHeader
@@ -730,6 +732,46 @@ func (sr *ReverseScanResponse) Verify(req Request) error {
return nil
}

// TruncatedRequestsString formats a slice of RequestUnions for printing,
// limited to maxBytes bytes.
func TruncatedRequestsString(reqs []RequestUnion, maxBytes int) string {
if maxBytes < len("<nil>") {
panic(errors.AssertionFailedf("maxBytes too low: %d", maxBytes))
}
if reqs == nil {
return "<nil>"
}
if len(reqs) == 0 {
return "[]"
}
var b strings.Builder
b.WriteRune('[')
b.WriteString(reqs[0].String())
for i := 1; i < len(reqs); i++ {
if b.Len() > maxBytes {
break
}
b.WriteRune(' ')
b.WriteString(reqs[i].String())
}
b.WriteRune(']')
str := b.String()
if len(str) > maxBytes {
str = str[:maxBytes-len("…]")]
// Check whether we truncated in the middle of a rune.
for len(str) > 1 {
if r, _ := utf8.DecodeLastRuneInString(str); r == utf8.RuneError {
// Shave off another byte and check again.
str = str[:len(str)-1]
} else {
break
}
}
return str + "…]"
}
return str
}

// Method implements the Request interface.
func (*GetRequest) Method() Method { return Get }

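TruncatedRequestsString cuts the formatted request list at maxBytes, but it must avoid ending the string in the middle of a multi-byte rune: utf8.DecodeLastRuneInString reports utf8.RuneError when the tail is a partial encoding, so the loop shaves one byte at a time until the string decodes cleanly, then appends "…]". A small standalone sketch of just that truncation step follows; the function name and demo string are illustrative, and only the back-off logic mirrors the diff.

package main

import (
	"fmt"
	"unicode/utf8"
)

// truncateAtRuneBoundary cuts s to at most maxBytes bytes (including the
// trailing "…]") without ever ending in the middle of a UTF-8 sequence.
func truncateAtRuneBoundary(s string, maxBytes int) string {
	if len(s) <= maxBytes {
		return s
	}
	s = s[:maxBytes-len("…]")]
	// If the cut landed inside a multi-byte rune, the last "rune" decodes
	// as utf8.RuneError; keep shaving bytes until the tail is valid.
	for len(s) > 1 {
		if r, _ := utf8.DecodeLastRuneInString(s); r == utf8.RuneError {
			s = s[:len(s)-1]
		} else {
			break
		}
	}
	return s + "…]"
}

func main() {
	// The cut at 15 bytes lands inside the multi-byte closing quote after
	// "a", so two partial bytes are shaved before the ellipsis is appended.
	fmt.Println(truncateAtRuneBoundary("[Get(“a”) Get(“b”) Get(“c”)]", 15)) // [Get(“a…]
}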
9 changes: 1 addition & 8 deletions pkg/kv/kvpb/batch.go
@@ -389,14 +389,7 @@ func (ba *BatchRequest) IsCompleteTransaction() bool {
}
}
// Unreachable.
var sb strings.Builder
for i, args := range ba.Requests {
if i > 0 {
sb.WriteString(",")
}
sb.WriteString(args.String())
}
panic(fmt.Sprintf("unreachable. Batch requests: %s", sb.String()))
panic(fmt.Sprintf("unreachable. Batch requests: %s", TruncatedRequestsString(ba.Requests, 1024)))
}

// hasFlag returns true iff one of the requests within the batch contains the
