Skip to content

Commit

Permalink
Squashed commit of the following:
Browse files Browse the repository at this point in the history
    storage: lazily load storage.Replica objects

    More than several tens of thousands of `Replica` objects can take
    substantial time at server startup to initialize. This change introduces
    a `ReplicaShim` object which contains a pointer to the `Replica` which is
    non-nil if resident. Otherwise, a set of other values including a ref to
    the appropriate zone config, `MVCCStats` and a handful of other properties
    necessary to accurately compute stats even when a replica is not-resident.

    Further improve performance on initializing `ReplicaShim` objects at store
    startup by parallelizing reading the MVCC stats and the range descriptors.

    Improve replica shim load performance by loading all MVCCStats values
    during a simple scan over all local range ID keys and adding stats
    values to a map. The stats map is consulted when loading the range
    descriptors to initialize each `ReplicaShim`.

    Release note (performance improvement): faster startup whena node contains
    many replicas.

Release note: None
  • Loading branch information
spencerkimball committed Dec 12, 2018
1 parent b64daa8 commit b60f881
Show file tree
Hide file tree
Showing 17 changed files with 863 additions and 323 deletions.
2 changes: 1 addition & 1 deletion pkg/storage/allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4965,7 +4965,7 @@ func makeDescriptor(storeList []roachpb.StoreID) roachpb.RangeDescriptor {
func TestAllocatorComputeActionNoStorePool(t *testing.T) {
defer leaktest.AfterTest(t)()

a := MakeAllocator(nil /* storePool */, nil /* rpcContext */)
a := MakeAllocator(nil /* storePool */, nil /* nodeLatencyFn */)
action, priority := a.ComputeAction(context.Background(), &config.ZoneConfig{NumReplicas: proto.Int32(0)}, RangeInfo{})
if action != AllocatorNoop {
t.Errorf("expected AllocatorNoop, but got %v", action)
Expand Down
8 changes: 4 additions & 4 deletions pkg/storage/client_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,17 +274,17 @@ func TestStoreMetrics(t *testing.T) {
replica := mtc.stores[0].LookupReplica(roachpb.RKey("z"))
mtc.replicateRange(replica.RangeID, 1, 2)

// Verify stats on store1 after replication.
verifyStats(t, mtc, 1)
// Verify stats after replication.
verifyStats(t, mtc, 0, 1, 2)

// Add some data to the "right" range.
dataKey := []byte("z")
if _, err := mtc.dbs[0].Inc(context.TODO(), dataKey, 5); err != nil {
t.Fatal(err)
}
mtc.waitForValues(roachpb.Key("z"), []int64{5, 5, 5})
mtc.waitForValues(roachpb.Key(dataKey), []int64{5, 5, 5})

// Verify all stats on stores after addition.
// Verify all stats after addition.
verifyStats(t, mtc, 0, 1, 2)

// Create a transaction statement that fails. Regression test for #4969.
Expand Down
7 changes: 4 additions & 3 deletions pkg/storage/client_raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2809,11 +2809,12 @@ func TestStoreRangeMoveDecommissioning(t *testing.T) {
sc.TestingKnobs.DisableReplicaRebalancing = true
mtc := &multiTestContext{storeConfig: &sc}
defer mtc.Stop()
mtc.Start(t, 6)
zone := config.DefaultSystemZoneConfig()
mtc.Start(t, int(*zone.NumReplicas+1))
mtc.initGossipNetwork()

// Replicate the range to 2 more stores. Note that there are 4 stores in the
// cluster leaving an extra store available as a replication target once the
// Replicate the range to more stores. Note that there is an extra
// store in the cluster available as a replication target once the
// replica on the dead node is removed.
replica := mtc.stores[0].LookupReplica(roachpb.RKeyMin)
mtc.replicateRange(replica.RangeID, 1, 2)
Expand Down
12 changes: 11 additions & 1 deletion pkg/storage/client_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,20 @@ func TestComputeStatsForKeySpan(t *testing.T) {
expectedRanges int
expectedKeys int64
}{
// All ranges.
{"a", "i", 4, 6},
// Subset of ranges including first.
{"a", "c", 1, 3},
{"b", "e", 2, 5},
// Middle subset of ranges.
{"c", "g", 2, 2},
// Subset of ranges including last.
{"e", "i", 2, 1},
// Offset into starting range (a-c), which should be included.
{"b", "e", 2, 5},
// Offset into ending range (g-i), which should be included.
{"e", "h", 2, 1},
// Offset into starting and ending ranges.
{"b", "h", 4, 6},
} {
start, end := tcase.startKey, tcase.endKey
result, err := mtc.stores[0].ComputeStatsForKeySpan(
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/gc_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,10 +162,10 @@ func makeGCQueueScore(
repl.mu.Lock()
ms := *repl.mu.state.Stats
gcThreshold := *repl.mu.state.GCThreshold
desc := repl.mu.state.Desc
zone := repl.mu.zone
repl.mu.Unlock()

desc, zone := repl.DescAndZone()

// Use desc.RangeID for fuzzing the final score, so that different ranges
// have slightly different priorities and even symmetrical workloads don't
// trigger GC at the same time.
Expand Down
20 changes: 13 additions & 7 deletions pkg/storage/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ import (
func (s *Store) AddReplica(repl *Replica) error {
s.mu.Lock()
defer s.mu.Unlock()
if err := s.addReplicaInternalLocked(repl); err != nil {
if err := s.addReplicaShimInternalLocked(&ReplicaShim{replica: repl}); err != nil {
return err
}
s.metrics.ReplicaCount.Inc(1)
Expand Down Expand Up @@ -248,23 +248,29 @@ func (s *Store) ReservationCount() int {
return len(s.snapshotApplySem)
}

// AssertInvariants verifies that the store's bookkeping is self-consistent. It
// AssertInvariants verifies that the store's bookkeeping is self-consistent. It
// is only valid to call this method when there is no in-flight traffic to the
// store (e.g., after the store is shut down).
func (s *Store) AssertInvariants() {
s.mu.RLock()
defer s.mu.RUnlock()
s.mu.replicas.Range(func(_ int64, p unsafe.Pointer) bool {
s.mu.replicaShims.Range(func(_ int64, p unsafe.Pointer) bool {
ctx := s.cfg.AmbientCtx.AnnotateCtx(context.Background())
repl := (*Replica)(p)
shim := (*ReplicaShim)(p)
shim.Lock()
repl := shim.replica
shim.Unlock()
if repl == nil {
return true // skip and keep iterating
}
// We would normally need to hold repl.raftMu. Otherwise we can observe an
// initialized replica that is not in s.replicasByKey, e.g., if we race with
// a goroutine that is currently initializing repl. The lock ordering makes
// acquiring repl.raftMu challenging; instead we require that this method is
// called only when there is no in-flight traffic to the store, at which
// point acquiring repl.raftMu is unnecessary.
if repl.IsInitialized() {
if ex := s.mu.replicasByKey.Get(repl); ex != repl {
if ex := s.mu.replicasByKey.Get(repl); ex.(*ReplicaShim).replica != repl {
log.Fatalf(ctx, "%v misplaced in replicasByKey; found %v instead", repl, ex)
}
} else if _, ok := s.mu.uninitReplicas[repl.RangeID]; !ok {
Expand Down Expand Up @@ -605,13 +611,13 @@ func WatchForDisappearingReplicas(t testing.TB, store *Store) {
default:
}

store.mu.replicas.Range(func(k int64, v unsafe.Pointer) bool {
store.mu.replicaShims.Range(func(k int64, v unsafe.Pointer) bool {
m[k] = struct{}{}
return true
})

for k := range m {
if _, ok := store.mu.replicas.Load(k); !ok {
if _, ok := store.mu.replicaShims.Load(k); !ok {
t.Fatalf("r%d disappeared from Store.mu.replicas map", k)
}
}
Expand Down
8 changes: 8 additions & 0 deletions pkg/storage/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@ var (
Measurement: "Replicas",
Unit: metric.Unit_COUNT,
}
metaResidentCount = metric.Metadata{
Name: "replicas.resident",
Help: "Number of resident replicas",
Measurement: "Replicas",
Unit: metric.Unit_COUNT,
}

// Range metrics.
metaRangeCount = metric.Metadata{
Expand Down Expand Up @@ -931,6 +937,7 @@ type StoreMetrics struct {
RaftLeaderNotLeaseHolderCount *metric.Gauge
LeaseHolderCount *metric.Gauge
QuiescentCount *metric.Gauge
ResidentCount *metric.Gauge

// Range metrics.
RangeCount *metric.Gauge
Expand Down Expand Up @@ -1131,6 +1138,7 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics {
RaftLeaderNotLeaseHolderCount: metric.NewGauge(metaRaftLeaderNotLeaseHolderCount),
LeaseHolderCount: metric.NewGauge(metaLeaseHolderCount),
QuiescentCount: metric.NewGauge(metaQuiescentCount),
ResidentCount: metric.NewGauge(metaResidentCount),

// Range metrics.
RangeCount: metric.NewGauge(metaRangeCount),
Expand Down
28 changes: 28 additions & 0 deletions pkg/storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,9 @@ type Replica struct {
// Is the range quiescent? Quiescent ranges are not Tick()'d and unquiesce
// whenever a Raft operation is performed.
quiescent bool
// Behind indicates the range is quiescent but still has
// replica(s) not fully caught up.
quiescentButBehind bool
// mergeComplete is non-nil if a merge is in-progress, in which case any
// requests should be held until the completion of the merge is signaled by
// the closing of the channel.
Expand Down Expand Up @@ -3929,6 +3932,7 @@ func (r *Replica) unquiesceWithOptionsLocked(campaignOnWake bool) {
log.Infof(ctx, "unquiescing %d", r.RangeID)
}
r.mu.quiescent = false
r.mu.quiescentButBehind = false // may not be true, but behind is only valid when quiescent
r.store.unquiescedReplicas.Lock()
r.store.unquiescedReplicas.m[r.RangeID] = struct{}{}
r.store.unquiescedReplicas.Unlock()
Expand Down Expand Up @@ -4755,6 +4759,7 @@ func (r *Replica) quiesceAndNotifyLocked(ctx context.Context, status *raft.Statu
commit := status.Commit
quiesce := true
if prog.Match < status.Commit {
r.mu.quiescentButBehind = true
commit = prog.Match
quiesce = false
}
Expand Down Expand Up @@ -6976,6 +6981,29 @@ func (r *Replica) Less(i btree.Item) bool {
return r.startKey().Less(i.(rangeKeyItem).startKey())
}

// ReplicaCapacity contains details on the replica storage.
type ReplicaCapacity struct {
Leaseholder bool
Stats enginepb.MVCCStats
QPS, WPS float64
}

// Capacity returns the current capacity info for the replica.
func (r *Replica) Capacity(now hlc.Timestamp) ReplicaCapacity {
var cap ReplicaCapacity
if r.OwnsValidLease(now) {
cap.Leaseholder = true
}
cap.Stats = r.GetMVCCStats()
if qps, dur := r.leaseholderStats.avgQPS(); dur >= MinStatsDuration {
cap.QPS = qps
}
if wps, dur := r.writeStats.avgQPS(); dur >= MinStatsDuration {
cap.WPS = wps
}
return cap
}

// ReplicaMetrics contains details on the current status of the replica.
type ReplicaMetrics struct {
Leader bool
Expand Down
5 changes: 3 additions & 2 deletions pkg/storage/replica_rankings.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package storage
import (
"container/heap"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
)

Expand All @@ -26,8 +27,8 @@ const (
)

type replicaWithStats struct {
repl *Replica
qps float64
rangeID roachpb.RangeID
qps float64
// TODO(a-robinson): Include writes-per-second and logicalBytes of storage?
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/replica_rankings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ func TestReplicaRankings(t *testing.T) {

for i, replQPS := range tc.replicasByQPS {
acc.addReplica(replicaWithStats{
repl: &Replica{RangeID: roachpb.RangeID(i)},
qps: replQPS,
rangeID: roachpb.RangeID(i),
qps: replQPS,
})
}
rr.update(acc)
Expand Down
Loading

0 comments on commit b60f881

Please sign in to comment.