70741: rangecache: improve tracing of range descriptor lookups r=andreimatei a=erikgrinaker

Release note: None

---

It would be nifty if we could attach the actual lookup trace (i.e. the span forked for the RPC) to the waiters' spans once the lookup completes, so that it is visible to all waiters, but I'm not sure our tracing supports that.
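For context on why that is tricky: range descriptor lookups are coalesced through a singleflight-style group (`rc.lookupRequests.DoChan` in the diff below), so one "leader" goroutine runs the actual lookup RPC in a span of its own (now created via `tracing.EnsureChildSpan` from a `Tracer` plumbed into the `RangeCache`), while concurrent waiters only receive the result and record a summary event in their own spans. Here is a minimal standalone sketch of that pattern, not the CockroachDB implementation; `lookupDescriptor` and the string result type are hypothetical stand-ins:

```go
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/singleflight"
)

var lookups singleflight.Group

// lookupDescriptor stands in for the range descriptor lookup RPC.
func lookupDescriptor(ctx context.Context, key string) (string, error) {
	return "descriptor for " + key, nil
}

// lookup coalesces concurrent lookups of the same key into one flight.
func lookup(ctx context.Context, key string) (string, error) {
	resC := lookups.DoChan(key, func() (interface{}, error) {
		// The flight leader runs detached from any single caller, so its
		// trace events land in its own span; waiters never see them.
		return lookupDescriptor(context.Background(), key)
	})
	res := <-resC
	if res.Err != nil {
		return "", res.Err
	}
	if res.Shared {
		// Waiters get only this summary event; the leader's span (holding
		// the RPC trace) is not re-attached to their spans, which is the
		// limitation noted above.
		fmt.Println("used shared range lookup")
	}
	return res.Val.(string), nil
}

func main() {
	v, err := lookup(context.Background(), "table/52")
	if err != nil {
		panic(err)
	}
	fmt.Println(v)
}
```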

Co-authored-by: Erik Grinaker <[email protected]>
craig[bot] and erikgrinaker committed Sep 28, 2021
2 parents 55e2bcd + 50a491d, commit 049aff1
Showing 7 changed files with 42 additions and 31 deletions.
3 changes: 2 additions & 1 deletion pkg/kv/bulk/sst_batcher_test.go
@@ -163,7 +163,8 @@ func runTestImport(t *testing.T, batchSizeValue int64) {
 	// splits to exercise that codepath, but we also want to make sure we
 	// still handle an unexpected split, so we make our own range cache and
 	// only populate it with one of our two splits.
-	mockCache := rangecache.NewRangeCache(s.ClusterSettings(), nil, func() int64 { return 2 << 10 }, s.Stopper())
+	mockCache := rangecache.NewRangeCache(s.ClusterSettings(), nil,
+		func() int64 { return 2 << 10 }, s.Stopper(), s.Tracer().(*tracing.Tracer))
 	addr, err := keys.Addr(key(0))
 	if err != nil {
 		t.Fatal(err)
3 changes: 2 additions & 1 deletion pkg/kv/kvclient/kvcoord/dist_sender.go
@@ -395,7 +395,8 @@ func NewDistSender(cfg DistSenderConfig) *DistSender {
 	getRangeDescCacheSize := func() int64 {
 		return rangeDescriptorCacheSize.Get(&ds.st.SV)
 	}
-	ds.rangeCache = rangecache.NewRangeCache(ds.st, rdb, getRangeDescCacheSize, cfg.RPCContext.Stopper)
+	ds.rangeCache = rangecache.NewRangeCache(ds.st, rdb, getRangeDescCacheSize,
+		cfg.RPCContext.Stopper, cfg.AmbientCtx.Tracer)
 	if tf := cfg.TestingKnobs.TransportFactory; tf != nil {
 		ds.transportFactory = tf
 	} else {
4 changes: 2 additions & 2 deletions pkg/kv/kvclient/kvcoord/dist_sender_test.go
@@ -4391,7 +4391,7 @@ func TestSendToReplicasSkipsStaleReplicas(t *testing.T) {
 			getRangeDescCacheSize := func() int64 {
 				return 1 << 20
 			}
-			rc := rangecache.NewRangeCache(st, nil /* db */, getRangeDescCacheSize, stopper)
+			rc := rangecache.NewRangeCache(st, nil /* db */, getRangeDescCacheSize, stopper, st.Tracer)
 			rc.Insert(ctx, roachpb.RangeInfo{
 				Desc: desc,
 				Lease: roachpb.Lease{
@@ -4432,7 +4432,7 @@ func TestSendToReplicasSkipsStaleReplicas(t *testing.T) {
 			}

 			cfg := DistSenderConfig{
-				AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()},
+				AmbientCtx: log.AmbientContext{Tracer: st.Tracer},
 				Clock:      clock,
 				NodeDescs:  ns,
 				RPCContext: rpcContext,
19 changes: 11 additions & 8 deletions pkg/kv/kvclient/rangecache/range_cache.go
@@ -77,6 +77,7 @@ type RangeDescriptorDB interface {
 type RangeCache struct {
 	st      *cluster.Settings
 	stopper *stop.Stopper
+	tracer  *tracing.Tracer
 	// RangeDescriptorDB is used to retrieve range descriptors from the
 	// database, which will be cached by this structure.
 	db RangeDescriptorDB
@@ -175,9 +176,13 @@ func makeLookupRequestKey(
 // NewRangeCache returns a new RangeCache which uses the given RangeDescriptorDB
 // as the underlying source of range descriptors.
 func NewRangeCache(
-	st *cluster.Settings, db RangeDescriptorDB, size func() int64, stopper *stop.Stopper,
+	st *cluster.Settings,
+	db RangeDescriptorDB,
+	size func() int64,
+	stopper *stop.Stopper,
+	tracer *tracing.Tracer,
 ) *RangeCache {
-	rdc := &RangeCache{st: st, db: db, stopper: stopper}
+	rdc := &RangeCache{st: st, db: db, stopper: stopper, tracer: tracer}
 	rdc.rangeCache.cache = cache.NewOrderedCache(cache.Config{
 		Policy: cache.CacheLRU,
 		ShouldEvict: func(n int, _, _ interface{}) bool {
@@ -634,9 +639,7 @@ func (rc *RangeCache) tryLookup(
 		return returnToken, nil
 	}

-	if log.V(2) {
-		log.Infof(ctx, "lookup range descriptor: key=%s (reverse: %t)", key, useReverseScan)
-	}
+	log.VEventf(ctx, 2, "looking up range descriptor: key=%s", key)

 	var prevDesc *roachpb.RangeDescriptor
 	if evictToken.Valid() {
@@ -646,7 +649,7 @@ func (rc *RangeCache) tryLookup(
 	resC, leader := rc.lookupRequests.DoChan(requestKey, func() (interface{}, error) {
 		var lookupRes EvictionToken
 		if err := rc.stopper.RunTaskWithErr(ctx, "rangecache: range lookup", func(ctx context.Context) error {
-			ctx, reqSpan := tracing.ForkSpan(ctx, "range lookup")
+			ctx, reqSpan := tracing.EnsureChildSpan(ctx, rc.tracer, "range lookup")
 			defer reqSpan.Finish()
 			// Clear the context's cancelation. This request services potentially many
 			// callers waiting for its result, and using the flight's leader's
@@ -761,9 +764,9 @@ func (rc *RangeCache) tryLookup(
 		s = res.Val.(EvictionToken).String()
 	}
 	if res.Shared {
-		log.Eventf(ctx, "looked up range descriptor with shared request: %s", s)
+		log.VEventf(ctx, 2, "looked up range descriptor with shared request: %s", s)
 	} else {
-		log.Eventf(ctx, "looked up range descriptor: %s", s)
+		log.VEventf(ctx, 2, "looked up range descriptor: %s", s)
 	}
 	if res.Err != nil {
 		return EvictionToken{}, res.Err
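Two behavioral notes on this file, hedged as my reading of the diff rather than the PR's own description: `tracing.ForkSpan` derives its span from the one already in `ctx`, so when the flight leader's caller had no active span the lookup went untraced, whereas `tracing.EnsureChildSpan` with the cache's own `tracer` always yields a span; and `log.VEventf(ctx, 2, ...)` records to the current trace span as well as logging at verbosity 2, so it subsumes both the old `log.V(2)`/`log.Infof` pair and the unconditional `log.Eventf` calls. A toy sketch of the ensure-a-span pattern, with stand-in `Tracer`/`Span` types rather than CockroachDB's tracing package:

```go
package tracingsketch

import "context"

// Span and Tracer are toy stand-ins for CockroachDB's tracing types.
type Span struct{ name string }

// Finish marks the span as complete (a no-op in this sketch).
func (s *Span) Finish() {}

type Tracer struct{}

func (t *Tracer) StartSpan(name string) *Span { return &Span{name: name} }

type spanKey struct{}

// EnsureChildSpan mirrors the behavior the diff relies on, as I understand
// it: if ctx already carries a span, start a child of it; otherwise start a
// new root span from the supplied tracer, so the operation is traced even
// when the caller's context has no span.
func EnsureChildSpan(ctx context.Context, tr *Tracer, name string) (context.Context, *Span) {
	if parent, ok := ctx.Value(spanKey{}).(*Span); ok {
		child := tr.StartSpan(parent.name + "/" + name)
		return context.WithValue(ctx, spanKey{}, child), child
	}
	sp := tr.StartSpan(name)
	return context.WithValue(ctx, spanKey{}, sp), sp
}
```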
18 changes: 9 additions & 9 deletions pkg/kv/kvclient/rangecache/range_cache_test.go
@@ -258,7 +258,7 @@ func initTestDescriptorDB(t *testing.T) *testDescriptorDB {
 	}
 	// TODO(andrei): don't leak this Stopper. Someone needs to Stop() it.
 	db.stopper = stop.NewStopper()
-	db.cache = NewRangeCache(st, db, staticSize(2<<10), db.stopper)
+	db.cache = NewRangeCache(st, db, staticSize(2<<10), db.stopper, st.Tracer)
 	return db
 }

@@ -483,7 +483,7 @@ func TestLookupByKeyMin(t *testing.T) {
 	st := cluster.MakeTestingClusterSettings()
 	stopper := stop.NewStopper()
 	defer stopper.Stop(context.Background())
-	cache := NewRangeCache(st, nil, staticSize(2<<10), stopper)
+	cache := NewRangeCache(st, nil, staticSize(2<<10), stopper, st.Tracer)
 	startToMeta2Desc := roachpb.RangeDescriptor{
 		StartKey: roachpb.RKeyMin,
 		EndKey:   keys.RangeMetaKey(roachpb.RKey("a")),
@@ -1011,7 +1011,7 @@ func TestRangeCacheClearOverlapping(t *testing.T) {
 	st := cluster.MakeTestingClusterSettings()
 	stopper := stop.NewStopper()
 	defer stopper.Stop(ctx)
-	cache := NewRangeCache(st, nil, staticSize(2<<10), stopper)
+	cache := NewRangeCache(st, nil, staticSize(2<<10), stopper, st.Tracer)
 	cache.rangeCache.cache.Add(rangeCacheKey(keys.RangeMetaKey(roachpb.RKeyMax)), &CacheEntry{desc: *defDesc})

 	// Now, add a new, overlapping set of descriptors.
@@ -1187,7 +1187,7 @@ func TestRangeCacheClearOlderOverlapping(t *testing.T) {
 			st := cluster.MakeTestingClusterSettings()
 			stopper := stop.NewStopper()
 			defer stopper.Stop(ctx)
-			cache := NewRangeCache(st, nil /* db */, staticSize(2<<10), stopper)
+			cache := NewRangeCache(st, nil /* db */, staticSize(2<<10), stopper, st.Tracer)
 			for _, d := range tc.cachedDescs {
 				cache.Insert(ctx, roachpb.RangeInfo{Desc: d})
 			}
@@ -1239,7 +1239,7 @@ func TestRangeCacheClearOverlappingMeta(t *testing.T) {
 	st := cluster.MakeTestingClusterSettings()
 	stopper := stop.NewStopper()
 	defer stopper.Stop(ctx)
-	cache := NewRangeCache(st, nil, staticSize(2<<10), stopper)
+	cache := NewRangeCache(st, nil, staticSize(2<<10), stopper, st.Tracer)
 	cache.Insert(ctx,
 		roachpb.RangeInfo{Desc: firstDesc},
 		roachpb.RangeInfo{Desc: restDesc})
@@ -1277,7 +1277,7 @@ func TestGetCachedRangeDescriptorInverted(t *testing.T) {
 	st := cluster.MakeTestingClusterSettings()
 	stopper := stop.NewStopper()
 	defer stopper.Stop(ctx)
-	cache := NewRangeCache(st, nil, staticSize(2<<10), stopper)
+	cache := NewRangeCache(st, nil, staticSize(2<<10), stopper, st.Tracer)
 	for _, rd := range testData {
 		cache.Insert(ctx, roachpb.RangeInfo{
 			Desc: rd,
@@ -1415,7 +1415,7 @@ func TestRangeCacheGeneration(t *testing.T) {
 			st := cluster.MakeTestingClusterSettings()
 			stopper := stop.NewStopper()
 			defer stopper.Stop(ctx)
-			cache := NewRangeCache(st, nil, staticSize(2<<10), stopper)
+			cache := NewRangeCache(st, nil, staticSize(2<<10), stopper, st.Tracer)
 			cache.Insert(ctx, roachpb.RangeInfo{Desc: *descAM2}, roachpb.RangeInfo{Desc: *descMZ4})
 			cache.Insert(ctx, roachpb.RangeInfo{Desc: *tc.insertDesc})

@@ -1481,7 +1481,7 @@ func TestRangeCacheEvictAndReplace(t *testing.T) {
 	st := cluster.MakeTestingClusterSettings()
 	stopper := stop.NewStopper()
 	defer stopper.Stop(ctx)
-	cache := NewRangeCache(st, nil, staticSize(2<<10), stopper)
+	cache := NewRangeCache(st, nil, staticSize(2<<10), stopper, st.Tracer)

 	ri := roachpb.RangeInfo{Desc: desc1}
 	cache.Insert(ctx, ri)
@@ -1592,7 +1592,7 @@ func TestRangeCacheUpdateLease(t *testing.T) {
 	st := cluster.MakeTestingClusterSettings()
 	stopper := stop.NewStopper()
 	defer stopper.Stop(ctx)
-	cache := NewRangeCache(st, nil, staticSize(2<<10), stopper)
+	cache := NewRangeCache(st, nil, staticSize(2<<10), stopper, st.Tracer)

 	cache.Insert(ctx, roachpb.RangeInfo{
 		Desc: desc1,
2 changes: 1 addition & 1 deletion pkg/sql/distsql_physical_planner_test.go
@@ -266,7 +266,7 @@ func TestDistSQLReceiverUpdatesCaches(t *testing.T) {
 	st := cluster.MakeTestingClusterSettings()
 	stopper := stop.NewStopper()
 	defer stopper.Stop(ctx)
-	rangeCache := rangecache.NewRangeCache(st, nil /* db */, size, stopper)
+	rangeCache := rangecache.NewRangeCache(st, nil /* db */, size, stopper, st.Tracer)
 	r := MakeDistSQLReceiver(
 		ctx,
 		&errOnlyResultWriter{}, /* resultWriter */
24 changes: 15 additions & 9 deletions pkg/sql/rowexec/tablereader_test.go
@@ -127,13 +127,15 @@ func TestTableReader(t *testing.T) {
 			ts := c.spec
 			ts.Table = *td.TableDesc()

-			evalCtx := tree.MakeTestingEvalContext(s.ClusterSettings())
+			st := s.ClusterSettings()
+			evalCtx := tree.MakeTestingEvalContext(st)
 			defer evalCtx.Stop(ctx)
 			flowCtx := execinfra.FlowCtx{
 				EvalCtx: &evalCtx,
 				Cfg: &execinfra.ServerConfig{
-					Settings:   s.ClusterSettings(),
-					RangeCache: rangecache.NewRangeCache(s.ClusterSettings(), nil, func() int64 { return 2 << 10 }, s.Stopper()),
+					Settings: st,
+					RangeCache: rangecache.NewRangeCache(s.ClusterSettings(), nil,
+						func() int64 { return 2 << 10 }, s.Stopper(), s.Tracer().(*tracing.Tracer)),
 				},
 				Txn:    kv.NewTxn(ctx, s.DB(), s.NodeID()),
 				NodeID: evalCtx.NodeID,
@@ -372,13 +374,15 @@ func TestLimitScans(t *testing.T) {

 	tableDesc := catalogkv.TestingGetTableDescriptor(kvDB, keys.SystemSQLCodec, "test", "t")

-	evalCtx := tree.MakeTestingEvalContext(s.ClusterSettings())
+	st := s.ClusterSettings()
+	evalCtx := tree.MakeTestingEvalContext(st)
 	defer evalCtx.Stop(ctx)
 	flowCtx := execinfra.FlowCtx{
 		EvalCtx: &evalCtx,
 		Cfg: &execinfra.ServerConfig{
-			Settings:   s.ClusterSettings(),
-			RangeCache: rangecache.NewRangeCache(s.ClusterSettings(), nil, func() int64 { return 2 << 10 }, s.Stopper()),
+			Settings: st,
+			RangeCache: rangecache.NewRangeCache(s.ClusterSettings(), nil,
+				func() int64 { return 2 << 10 }, s.Stopper(), s.Tracer().(*tracing.Tracer)),
 		},
 		Txn:    kv.NewTxn(ctx, kvDB, s.NodeID()),
 		NodeID: evalCtx.NodeID,
@@ -476,7 +480,8 @@ func BenchmarkTableReader(b *testing.B) {
 	s, sqlDB, kvDB := serverutils.StartServer(b, base.TestServerArgs{})
 	defer s.Stopper().Stop(ctx)

-	evalCtx := tree.MakeTestingEvalContext(s.ClusterSettings())
+	st := s.ClusterSettings()
+	evalCtx := tree.MakeTestingEvalContext(st)
 	defer evalCtx.Stop(ctx)

 	const numCols = 2
@@ -492,8 +497,9 @@ func BenchmarkTableReader(b *testing.B) {
 	flowCtx := execinfra.FlowCtx{
 		EvalCtx: &evalCtx,
 		Cfg: &execinfra.ServerConfig{
-			Settings:   s.ClusterSettings(),
-			RangeCache: rangecache.NewRangeCache(s.ClusterSettings(), nil, func() int64 { return 2 << 10 }, s.Stopper()),
+			Settings: st,
+			RangeCache: rangecache.NewRangeCache(s.ClusterSettings(), nil,
+				func() int64 { return 2 << 10 }, s.Stopper(), s.Tracer().(*tracing.Tracer)),
 		},
 		Txn:    kv.NewTxn(ctx, s.DB(), s.NodeID()),
 		NodeID: evalCtx.NodeID,
