Skip to content

Commit

Permalink
kv/bulk: track send-wait by store
Browse files Browse the repository at this point in the history
Release note: none.
  • Loading branch information
dt committed Apr 22, 2022
1 parent 7ae6b88 commit f77e393
Show file tree
Hide file tree
Showing 9 changed files with 100 additions and 7 deletions.
7 changes: 7 additions & 0 deletions pkg/kv/bulk/buffering_adder.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ func MakeBulkAdder(
disallowShadowingBelow: opts.DisallowShadowingBelow,
batchTS: opts.BatchTimestamp,
writeAtBatchTS: opts.WriteAtBatchTimestamp,
stats: ingestionPerformanceStats{sendWaitByStore: make(map[roachpb.StoreID]time.Duration)},
},
timestamp: timestamp,
maxBufferLimit: opts.MaxBufferSize,
Expand Down Expand Up @@ -132,6 +133,9 @@ func (b *BufferingAdder) Close(ctx context.Context) {
if log.V(1) {
if b.sink.stats.bufferFlushes > 0 {
b.sink.stats.LogTimings(ctx, b.name, "closing")
if log.V(3) {
b.sink.stats.LogPerStoreTimings(ctx, b.name)
}
b.sink.stats.LogFlushes(ctx, b.name, "closing", sz(b.memAcc.Used()))
} else {
log.Infof(ctx, "%s adder closing; ingested nothing", b.name)
Expand Down Expand Up @@ -294,6 +298,9 @@ func (b *BufferingAdder) doFlush(ctx context.Context, forSize bool) error {

if log.V(2) {
b.sink.stats.LogTimings(ctx, b.name, "flushed")
if log.V(3) {
b.sink.stats.LogPerStoreTimings(ctx, b.name)
}
}

if log.V(3) {
Expand Down
16 changes: 14 additions & 2 deletions pkg/kv/bulk/sst_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ func MakeSSTBatcher(
disallowShadowingBelow: disallowShadowingBelow,
writeAtBatchTS: writeAtBatchTs,
disableSplits: !splitFilledRanges,
stats: ingestionPerformanceStats{sendWaitByStore: make(map[roachpb.StoreID]time.Duration)},
}
err := b.Reset(ctx)
return b, err
Expand All @@ -152,7 +153,7 @@ func MakeSSTBatcher(
func MakeStreamSSTBatcher(
ctx context.Context, db *kv.DB, settings *cluster.Settings,
) (*SSTBatcher, error) {
b := &SSTBatcher{db: db, settings: settings, ingestAll: true}
b := &SSTBatcher{db: db, settings: settings, ingestAll: true, stats: ingestionPerformanceStats{sendWaitByStore: make(map[roachpb.StoreID]time.Duration)}}
err := b.Reset(ctx)
return b, err
}
Expand Down Expand Up @@ -528,7 +529,7 @@ func (b *SSTBatcher) addSSTable(
}

ba := roachpb.BatchRequest{
Header: roachpb.Header{Timestamp: b.batchTS},
Header: roachpb.Header{Timestamp: b.batchTS, ClientRangeInfo: roachpb.ClientRangeInfo{ExplicitlyRequested: true}},
AdmissionHeader: roachpb.AdmissionHeader{
Priority: int32(admission.BulkNormalPri),
CreateTime: timeutil.Now().UnixNano(),
Expand All @@ -542,6 +543,17 @@ func (b *SSTBatcher) addSSTable(
sendTime := timeutil.Since(beforeSend)
b.stats.sendWait += sendTime

if br != nil && len(br.BatchResponse_Header.RangeInfos) > 0 {
// Should only ever really be one iteration but if somehow it isn't,
// e.g. if a request was redirected, go ahead and count it against all
// involved stores; if it is small this edge case is immaterial, and
// if it is large, it's probably one big one but we don't know which
// so just blame them all (averaging it out could hide one big delay).
for i := range br.BatchResponse_Header.RangeInfos {
b.stats.sendWaitByStore[br.BatchResponse_Header.RangeInfos[i].Lease.Replica.StoreID] += sendTime
}
}

if pErr == nil {
resp := br.Responses[0].GetInner().(*roachpb.AddSSTableResponse)
if b.writeAtBatchTS {
Expand Down
29 changes: 29 additions & 0 deletions pkg/kv/bulk/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ package bulk

import (
"context"
"fmt"
"sort"
"strings"
"time"

"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand Down Expand Up @@ -42,6 +45,8 @@ type ingestionPerformanceStats struct {
scatterWait time.Duration // time spent scattering.
commitWait time.Duration // time spent waiting for commit timestamps.

sendWaitByStore map[roachpb.StoreID]time.Duration

// span tracks the total span into which this batcher has flushed. It is
// only maintained if log.V(1), so if vmodule is upped mid-ingest it may be
// incomplete.
Expand Down Expand Up @@ -86,6 +91,30 @@ func (s ingestionPerformanceStats) LogFlushes(
)
}

func (s ingestionPerformanceStats) LogPerStoreTimings(ctx context.Context, name string) {
if len(s.sendWaitByStore) == 0 {
return
}
ids := make(roachpb.StoreIDSlice, 0, len(s.sendWaitByStore))
for i := range s.sendWaitByStore {
ids = append(ids, i)
}
sort.Sort(ids)

var sb strings.Builder
for i, id := range ids {
// Hack: fill the map with placeholder stores if we haven't seen the store
// with ID below K for all but lowest K, so that next time we print a zero.
if i > 0 && ids[i-1] != id-1 {
s.sendWaitByStore[id-1] = 0
fmt.Fprintf(&sb, "%d: %s;", id-1, timing(0))
}
fmt.Fprintf(&sb, "%d: %s;", id, timing(s.sendWaitByStore[id]))

}
log.Infof(ctx, "%s waited on sending to: %s", name, redact.Safe(sb.String()))
}

type sz int64

func (b sz) String() string { return string(humanizeutil.IBytes(int64(b))) }
Expand Down
10 changes: 7 additions & 3 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -2056,6 +2056,8 @@ func (ds *DistSender) sendToReplicas(
// doesn't have info. Like above, this asks the server to return an
// update.
ClosedTimestampPolicy: routing.ClosedTimestampPolicy(),

ExplicitlyRequested: ba.ClientRangeInfo.ExplicitlyRequested,
}
br, err = transport.SendNext(ctx, ba)
ds.maybeIncrementErrCounters(br, err)
Expand Down Expand Up @@ -2152,9 +2154,11 @@ func (ds *DistSender) sendToReplicas(
if len(br.RangeInfos) > 0 {
log.VEventf(ctx, 2, "received updated range info: %s", br.RangeInfos)
routing.EvictAndReplace(ctx, br.RangeInfos...)
// The field is cleared by the DistSender because it refers
// routing information not exposed by the KV API.
br.RangeInfos = nil
if !ba.Header.ClientRangeInfo.ExplicitlyRequested {
// The field is cleared by the DistSender because it refers
// routing information not exposed by the KV API.
br.RangeInfos = nil
}
}
return br, nil
}
Expand Down
33 changes: 33 additions & 0 deletions pkg/kv/kvclient/kvcoord/dist_sender_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3454,6 +3454,39 @@ func TestRefreshNoFalsePositive(t *testing.T) {
require.NoError(t, txn.Commit(ctx))
}

func TestExplicitRangeInfo(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()
s, db := startNoSplitMergeServer(t)
defer s.Stopper().Stop(ctx)

require.NoError(t, setupMultipleRanges(ctx, db, "a", "b", "c", "d", "e", "f", "g", "h"))
for _, key := range []string{"a1", "a2", "a3", "b1", "b2", "c1", "c2", "d1", "f1", "f2", "f3", "g1", "g2", "h1"} {
require.NoError(t, db.Put(ctx, key, "value"))
}

b := &kv.Batch{}
b.Header.ClientRangeInfo.ExplicitlyRequested = true

spans := [][]string{{"a", "c"}, {"c", "e"}, {"g", "h"}}
for _, span := range spans {
b.Scan(span[0], span[1])
}
require.NoError(t, db.Run(ctx, b))
require.Equal(t, 4, len(b.RawResponse().Responses)) // 3 scans + end req
require.Equal(t, 5, len(b.RawResponse().BatchResponse_Header.RangeInfos)) // ranges a, b, c, d, g

*b = kv.Batch{}
for _, span := range spans {
b.Scan(span[0], span[1])
}
require.NoError(t, db.Run(ctx, b))
require.Equal(t, 4, len(b.RawResponse().Responses))
require.Equal(t, 0, len(b.RawResponse().BatchResponse_Header.RangeInfos))

}

func BenchmarkReturnOnRangeBoundary(b *testing.B) {
const (
Ranges = 10 // number of ranges to create
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/replica_send.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,8 @@ func (r *Replica) maybeAddRangeInfoToResponse(
// than the replica in case this is a follower.
cri := &ba.ClientRangeInfo
ri := r.GetRangeInfo(ctx)
needInfo := (cri.DescriptorGeneration < ri.Desc.Generation) ||
needInfo := cri.ExplicitlyRequested ||
(cri.DescriptorGeneration < ri.Desc.Generation) ||
(cri.LeaseSequence < ri.Lease.Sequence) ||
(cri.ClosedTimestampPolicy != ri.ClosedTimestampPolicy)
if !needInfo {
Expand Down
1 change: 1 addition & 0 deletions pkg/roachpb/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,7 @@ func (h *BatchResponse_Header) combine(o BatchResponse_Header) error {
}
}
h.Now.Forward(o.Now)
h.RangeInfos = append(h.RangeInfos, o.RangeInfos...)
h.CollectedSpans = append(h.CollectedSpans, o.CollectedSpans...)
return nil
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/roachpb/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2411,7 +2411,10 @@ message Header {
// client_range_info represents the kvclient's knowledge about the state of
// the range (i.e. of the range descriptor and lease). The kvserver checks
// whether the client's info is up to date and, if it isn't, it will return a
// RangeInfo with up-to-date information.
// RangeInfo with up-to-date information. Typically this entire field is set
// by the client's DistSender, however it will preserve the value of the field
// `ExplicitlyRequested` so that requests passed to DistSender can request
// `RangeInfos` if desired.
ClientRangeInfo client_range_info = 17 [(gogoproto.nullable) = false];
// gateway_node_id is the ID of the gateway node where the request originated.
// For requests from tenants, this is set to the NodeID of the KV node handling
Expand Down
3 changes: 3 additions & 0 deletions pkg/roachpb/data.proto
Original file line number Diff line number Diff line change
Expand Up @@ -751,6 +751,9 @@ message ClientRangeInfo {
int64 descriptor_generation = 1 [(gogoproto.casttype) = "RangeGeneration"];
int64 lease_sequence = 2 [(gogoproto.casttype) = "LeaseSequence"];
RangeClosedTimestampPolicy closed_timestamp_policy = 3;
// ExplicitlyRequested causes range info to be returned even if other fields
// are up-to-date.
bool explicitly_requested = 4;
}

// RangeInfo describes a range which executed a request. It contains the range
Expand Down

0 comments on commit f77e393

Please sign in to comment.