Skip to content

Commit

Permalink
rangecache: improve tracing of range descriptor lookups
Browse files Browse the repository at this point in the history
Release note: None
  • Loading branch information
erikgrinaker committed Sep 28, 2021
1 parent d62a975 commit 50a491d
Show file tree
Hide file tree
Showing 7 changed files with 42 additions and 31 deletions.
3 changes: 2 additions & 1 deletion pkg/kv/bulk/sst_batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,8 @@ func runTestImport(t *testing.T, batchSizeValue int64) {
// splits to exercise that codepath, but we also want to make sure we
// still handle an unexpected split, so we make our own range cache and
// only populate it with one of our two splits.
mockCache := rangecache.NewRangeCache(s.ClusterSettings(), nil, func() int64 { return 2 << 10 }, s.Stopper())
mockCache := rangecache.NewRangeCache(s.ClusterSettings(), nil,
func() int64 { return 2 << 10 }, s.Stopper(), s.Tracer().(*tracing.Tracer))
addr, err := keys.Addr(key(0))
if err != nil {
t.Fatal(err)
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,8 @@ func NewDistSender(cfg DistSenderConfig) *DistSender {
getRangeDescCacheSize := func() int64 {
return rangeDescriptorCacheSize.Get(&ds.st.SV)
}
ds.rangeCache = rangecache.NewRangeCache(ds.st, rdb, getRangeDescCacheSize, cfg.RPCContext.Stopper)
ds.rangeCache = rangecache.NewRangeCache(ds.st, rdb, getRangeDescCacheSize,
cfg.RPCContext.Stopper, cfg.AmbientCtx.Tracer)
if tf := cfg.TestingKnobs.TransportFactory; tf != nil {
ds.transportFactory = tf
} else {
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvclient/kvcoord/dist_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4391,7 +4391,7 @@ func TestSendToReplicasSkipsStaleReplicas(t *testing.T) {
getRangeDescCacheSize := func() int64 {
return 1 << 20
}
rc := rangecache.NewRangeCache(st, nil /* db */, getRangeDescCacheSize, stopper)
rc := rangecache.NewRangeCache(st, nil /* db */, getRangeDescCacheSize, stopper, st.Tracer)
rc.Insert(ctx, roachpb.RangeInfo{
Desc: desc,
Lease: roachpb.Lease{
Expand Down Expand Up @@ -4432,7 +4432,7 @@ func TestSendToReplicasSkipsStaleReplicas(t *testing.T) {
}

cfg := DistSenderConfig{
AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()},
AmbientCtx: log.AmbientContext{Tracer: st.Tracer},
Clock: clock,
NodeDescs: ns,
RPCContext: rpcContext,
Expand Down
19 changes: 11 additions & 8 deletions pkg/kv/kvclient/rangecache/range_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ type RangeDescriptorDB interface {
type RangeCache struct {
st *cluster.Settings
stopper *stop.Stopper
tracer *tracing.Tracer
// RangeDescriptorDB is used to retrieve range descriptors from the
// database, which will be cached by this structure.
db RangeDescriptorDB
Expand Down Expand Up @@ -175,9 +176,13 @@ func makeLookupRequestKey(
// NewRangeCache returns a new RangeCache which uses the given RangeDescriptorDB
// as the underlying source of range descriptors.
func NewRangeCache(
st *cluster.Settings, db RangeDescriptorDB, size func() int64, stopper *stop.Stopper,
st *cluster.Settings,
db RangeDescriptorDB,
size func() int64,
stopper *stop.Stopper,
tracer *tracing.Tracer,
) *RangeCache {
rdc := &RangeCache{st: st, db: db, stopper: stopper}
rdc := &RangeCache{st: st, db: db, stopper: stopper, tracer: tracer}
rdc.rangeCache.cache = cache.NewOrderedCache(cache.Config{
Policy: cache.CacheLRU,
ShouldEvict: func(n int, _, _ interface{}) bool {
Expand Down Expand Up @@ -634,9 +639,7 @@ func (rc *RangeCache) tryLookup(
return returnToken, nil
}

if log.V(2) {
log.Infof(ctx, "lookup range descriptor: key=%s (reverse: %t)", key, useReverseScan)
}
log.VEventf(ctx, 2, "looking up range descriptor: key=%s", key)

var prevDesc *roachpb.RangeDescriptor
if evictToken.Valid() {
Expand All @@ -646,7 +649,7 @@ func (rc *RangeCache) tryLookup(
resC, leader := rc.lookupRequests.DoChan(requestKey, func() (interface{}, error) {
var lookupRes EvictionToken
if err := rc.stopper.RunTaskWithErr(ctx, "rangecache: range lookup", func(ctx context.Context) error {
ctx, reqSpan := tracing.ForkSpan(ctx, "range lookup")
ctx, reqSpan := tracing.EnsureChildSpan(ctx, rc.tracer, "range lookup")
defer reqSpan.Finish()
// Clear the context's cancelation. This request services potentially many
// callers waiting for its result, and using the flight's leader's
Expand Down Expand Up @@ -761,9 +764,9 @@ func (rc *RangeCache) tryLookup(
s = res.Val.(EvictionToken).String()
}
if res.Shared {
log.Eventf(ctx, "looked up range descriptor with shared request: %s", s)
log.VEventf(ctx, 2, "looked up range descriptor with shared request: %s", s)
} else {
log.Eventf(ctx, "looked up range descriptor: %s", s)
log.VEventf(ctx, 2, "looked up range descriptor: %s", s)
}
if res.Err != nil {
return EvictionToken{}, res.Err
Expand Down
18 changes: 9 additions & 9 deletions pkg/kv/kvclient/rangecache/range_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ func initTestDescriptorDB(t *testing.T) *testDescriptorDB {
}
// TODO(andrei): don't leak this Stopper. Someone needs to Stop() it.
db.stopper = stop.NewStopper()
db.cache = NewRangeCache(st, db, staticSize(2<<10), db.stopper)
db.cache = NewRangeCache(st, db, staticSize(2<<10), db.stopper, st.Tracer)
return db
}

Expand Down Expand Up @@ -483,7 +483,7 @@ func TestLookupByKeyMin(t *testing.T) {
st := cluster.MakeTestingClusterSettings()
stopper := stop.NewStopper()
defer stopper.Stop(context.Background())
cache := NewRangeCache(st, nil, staticSize(2<<10), stopper)
cache := NewRangeCache(st, nil, staticSize(2<<10), stopper, st.Tracer)
startToMeta2Desc := roachpb.RangeDescriptor{
StartKey: roachpb.RKeyMin,
EndKey: keys.RangeMetaKey(roachpb.RKey("a")),
Expand Down Expand Up @@ -1011,7 +1011,7 @@ func TestRangeCacheClearOverlapping(t *testing.T) {
st := cluster.MakeTestingClusterSettings()
stopper := stop.NewStopper()
defer stopper.Stop(ctx)
cache := NewRangeCache(st, nil, staticSize(2<<10), stopper)
cache := NewRangeCache(st, nil, staticSize(2<<10), stopper, st.Tracer)
cache.rangeCache.cache.Add(rangeCacheKey(keys.RangeMetaKey(roachpb.RKeyMax)), &CacheEntry{desc: *defDesc})

// Now, add a new, overlapping set of descriptors.
Expand Down Expand Up @@ -1187,7 +1187,7 @@ func TestRangeCacheClearOlderOverlapping(t *testing.T) {
st := cluster.MakeTestingClusterSettings()
stopper := stop.NewStopper()
defer stopper.Stop(ctx)
cache := NewRangeCache(st, nil /* db */, staticSize(2<<10), stopper)
cache := NewRangeCache(st, nil /* db */, staticSize(2<<10), stopper, st.Tracer)
for _, d := range tc.cachedDescs {
cache.Insert(ctx, roachpb.RangeInfo{Desc: d})
}
Expand Down Expand Up @@ -1239,7 +1239,7 @@ func TestRangeCacheClearOverlappingMeta(t *testing.T) {
st := cluster.MakeTestingClusterSettings()
stopper := stop.NewStopper()
defer stopper.Stop(ctx)
cache := NewRangeCache(st, nil, staticSize(2<<10), stopper)
cache := NewRangeCache(st, nil, staticSize(2<<10), stopper, st.Tracer)
cache.Insert(ctx,
roachpb.RangeInfo{Desc: firstDesc},
roachpb.RangeInfo{Desc: restDesc})
Expand Down Expand Up @@ -1277,7 +1277,7 @@ func TestGetCachedRangeDescriptorInverted(t *testing.T) {
st := cluster.MakeTestingClusterSettings()
stopper := stop.NewStopper()
defer stopper.Stop(ctx)
cache := NewRangeCache(st, nil, staticSize(2<<10), stopper)
cache := NewRangeCache(st, nil, staticSize(2<<10), stopper, st.Tracer)
for _, rd := range testData {
cache.Insert(ctx, roachpb.RangeInfo{
Desc: rd,
Expand Down Expand Up @@ -1415,7 +1415,7 @@ func TestRangeCacheGeneration(t *testing.T) {
st := cluster.MakeTestingClusterSettings()
stopper := stop.NewStopper()
defer stopper.Stop(ctx)
cache := NewRangeCache(st, nil, staticSize(2<<10), stopper)
cache := NewRangeCache(st, nil, staticSize(2<<10), stopper, st.Tracer)
cache.Insert(ctx, roachpb.RangeInfo{Desc: *descAM2}, roachpb.RangeInfo{Desc: *descMZ4})
cache.Insert(ctx, roachpb.RangeInfo{Desc: *tc.insertDesc})

Expand Down Expand Up @@ -1481,7 +1481,7 @@ func TestRangeCacheEvictAndReplace(t *testing.T) {
st := cluster.MakeTestingClusterSettings()
stopper := stop.NewStopper()
defer stopper.Stop(ctx)
cache := NewRangeCache(st, nil, staticSize(2<<10), stopper)
cache := NewRangeCache(st, nil, staticSize(2<<10), stopper, st.Tracer)

ri := roachpb.RangeInfo{Desc: desc1}
cache.Insert(ctx, ri)
Expand Down Expand Up @@ -1592,7 +1592,7 @@ func TestRangeCacheUpdateLease(t *testing.T) {
st := cluster.MakeTestingClusterSettings()
stopper := stop.NewStopper()
defer stopper.Stop(ctx)
cache := NewRangeCache(st, nil, staticSize(2<<10), stopper)
cache := NewRangeCache(st, nil, staticSize(2<<10), stopper, st.Tracer)

cache.Insert(ctx, roachpb.RangeInfo{
Desc: desc1,
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/distsql_physical_planner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ func TestDistSQLReceiverUpdatesCaches(t *testing.T) {
st := cluster.MakeTestingClusterSettings()
stopper := stop.NewStopper()
defer stopper.Stop(ctx)
rangeCache := rangecache.NewRangeCache(st, nil /* db */, size, stopper)
rangeCache := rangecache.NewRangeCache(st, nil /* db */, size, stopper, st.Tracer)
r := MakeDistSQLReceiver(
ctx,
&errOnlyResultWriter{}, /* resultWriter */
Expand Down
24 changes: 15 additions & 9 deletions pkg/sql/rowexec/tablereader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,13 +127,15 @@ func TestTableReader(t *testing.T) {
ts := c.spec
ts.Table = *td.TableDesc()

evalCtx := tree.MakeTestingEvalContext(s.ClusterSettings())
st := s.ClusterSettings()
evalCtx := tree.MakeTestingEvalContext(st)
defer evalCtx.Stop(ctx)
flowCtx := execinfra.FlowCtx{
EvalCtx: &evalCtx,
Cfg: &execinfra.ServerConfig{
Settings: s.ClusterSettings(),
RangeCache: rangecache.NewRangeCache(s.ClusterSettings(), nil, func() int64 { return 2 << 10 }, s.Stopper()),
Settings: st,
RangeCache: rangecache.NewRangeCache(s.ClusterSettings(), nil,
func() int64 { return 2 << 10 }, s.Stopper(), s.Tracer().(*tracing.Tracer)),
},
Txn: kv.NewTxn(ctx, s.DB(), s.NodeID()),
NodeID: evalCtx.NodeID,
Expand Down Expand Up @@ -372,13 +374,15 @@ func TestLimitScans(t *testing.T) {

tableDesc := catalogkv.TestingGetTableDescriptor(kvDB, keys.SystemSQLCodec, "test", "t")

evalCtx := tree.MakeTestingEvalContext(s.ClusterSettings())
st := s.ClusterSettings()
evalCtx := tree.MakeTestingEvalContext(st)
defer evalCtx.Stop(ctx)
flowCtx := execinfra.FlowCtx{
EvalCtx: &evalCtx,
Cfg: &execinfra.ServerConfig{
Settings: s.ClusterSettings(),
RangeCache: rangecache.NewRangeCache(s.ClusterSettings(), nil, func() int64 { return 2 << 10 }, s.Stopper()),
Settings: st,
RangeCache: rangecache.NewRangeCache(s.ClusterSettings(), nil,
func() int64 { return 2 << 10 }, s.Stopper(), s.Tracer().(*tracing.Tracer)),
},
Txn: kv.NewTxn(ctx, kvDB, s.NodeID()),
NodeID: evalCtx.NodeID,
Expand Down Expand Up @@ -476,7 +480,8 @@ func BenchmarkTableReader(b *testing.B) {
s, sqlDB, kvDB := serverutils.StartServer(b, base.TestServerArgs{})
defer s.Stopper().Stop(ctx)

evalCtx := tree.MakeTestingEvalContext(s.ClusterSettings())
st := s.ClusterSettings()
evalCtx := tree.MakeTestingEvalContext(st)
defer evalCtx.Stop(ctx)

const numCols = 2
Expand All @@ -492,8 +497,9 @@ func BenchmarkTableReader(b *testing.B) {
flowCtx := execinfra.FlowCtx{
EvalCtx: &evalCtx,
Cfg: &execinfra.ServerConfig{
Settings: s.ClusterSettings(),
RangeCache: rangecache.NewRangeCache(s.ClusterSettings(), nil, func() int64 { return 2 << 10 }, s.Stopper()),
Settings: st,
RangeCache: rangecache.NewRangeCache(s.ClusterSettings(), nil,
func() int64 { return 2 << 10 }, s.Stopper(), s.Tracer().(*tracing.Tracer)),
},
Txn: kv.NewTxn(ctx, s.DB(), s.NodeID()),
NodeID: evalCtx.NodeID,
Expand Down

0 comments on commit 50a491d

Please sign in to comment.