Skip to content

Commit

Permalink
Merge pull request #7396 from petermattis/pmattis/debug-eager-replica…
Browse files Browse the repository at this point in the history
…tion

storage: eagerly replicate after splits only if not replicated
  • Loading branch information
petermattis authored Jun 23, 2016
2 parents 646ce6c + fa6a986 commit 4839efe
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 3 deletions.
2 changes: 1 addition & 1 deletion server/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func createAndStartTestNode(addr net.Addr, engines []engine.Engine, gossipBS net
if err := node.start(addr, engines, roachpb.Attributes{}); err != nil {
t.Fatal(err)
}
if err := waitForInitialSplits(node.ctx.DB); err != nil {
if err := WaitForInitialSplits(node.ctx.DB); err != nil {
t.Fatal(err)
}
return grpcServer, addr, node, stopper
Expand Down
7 changes: 5 additions & 2 deletions server/testserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,10 +276,13 @@ func ExpectedInitialRangeCount() int {
// splits at startup. If the expected range count is not reached within a
// configured timeout, an error is returned.
func (ts *TestServer) WaitForInitialSplits() error {
return waitForInitialSplits(ts.DB())
return WaitForInitialSplits(ts.DB())
}

func waitForInitialSplits(db *client.DB) error {
// WaitForInitialSplits waits for the expected number of initial ranges to be
// populated in the meta2 table. If the expected range count is not reached
// within a configured timeout, an error is returned.
func WaitForInitialSplits(db *client.DB) error {
expectedRanges := ExpectedInitialRangeCount()
return util.RetryForDuration(initialSplitsTimeout, func() error {
// Scan all keys in the Meta2Prefix; we only need a count.
Expand Down
12 changes: 12 additions & 0 deletions storage/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,18 @@ func (s *Store) LogReplicaChangeTest(txn *client.Txn, changeType roachpb.Replica
return s.logChange(txn, changeType, replica, desc)
}

// ReplicateQueuePurgatoryLength reports how many replicas are currently held
// in the purgatory of this store's replicate queue. It simply forwards to the
// queue's PurgatoryLength method.
func (s *Store) ReplicateQueuePurgatoryLength() int {
	purgatoryLen := s.replicateQueue.PurgatoryLength()
	return purgatoryLen
}

// SetReplicaScannerDisabled turns replica scanning off or on as directed by
// forwarding the flag to the store's scanner. Pass true to disable scanning
// and false to re-enable it. Note that while disabled, removals are still
// processed.
func (s *Store) SetReplicaScannerDisabled(disabled bool) {
s.scanner.SetDisabled(disabled)
}

// GetLastIndex is the same function as LastIndex but it does not require
// that the replica lock is held.
func (r *Replica) GetLastIndex() (uint64, error) {
Expand Down
7 changes: 7 additions & 0 deletions storage/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,13 @@ func (bq *baseQueue) Length() int {
return bq.mu.priorityQ.Len()
}

// PurgatoryLength reports the number of entries currently sitting in the
// queue's purgatory. The count is read while holding bq.mu so that it is
// consistent with concurrent updates made under the same lock.
func (bq *baseQueue) PurgatoryLength() int {
	bq.mu.Lock()
	n := len(bq.mu.purgatory)
	bq.mu.Unlock()
	return n
}

// SetDisabled turns queue processing off or on as directed.
func (bq *baseQueue) SetDisabled(disabled bool) {
if disabled {
Expand Down
11 changes: 11 additions & 0 deletions storage/replica_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -2307,6 +2307,17 @@ func (r *Replica) splitTrigger(
// Update store stats with difference in stats before and after split.
r.store.metrics.addMVCCStats(deltaMS)

// If the range was not properly replicated before the split, the replicate
// queue may not have picked it up (due to the need for a split). Enqueue
// each half that still has only a single replica to speed up a potentially
// necessary replication. See #7022.
if len(split.UpdatedDesc.Replicas) == 1 {
r.store.replicateQueue.MaybeAdd(r, r.store.Clock().Now())
}
if len(split.NewDesc.Replicas) == 1 {
r.store.replicateQueue.MaybeAdd(newRng, r.store.Clock().Now())
}

// To avoid leaving the new range unavailable as it waits to elect
// its leader, one (and only one) of the nodes should start an
// election as soon as the split is processed.
Expand Down
47 changes: 47 additions & 0 deletions storage/replicate_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright 2016 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
//
// Author: Peter Mattis ([email protected])

package storage_test

import (
"testing"

"github.com/cockroachdb/cockroach/server"
"github.com/cockroachdb/cockroach/util/leaktest"
)

// TestEagerReplication verifies that, with the replica scanner disabled, the
// ranges produced by the initial splits still end up in the replicate queue's
// purgatory, i.e. that they were enqueued by the post-split code path rather
// than by the scanner.
func TestEagerReplication(t *testing.T) {
	defer leaktest.AfterTest(t)()
	store, stopper, _ := createTestStore(t)
	defer stopper.Stop()

	// With the scanner off, the only remaining way for replicas to reach the
	// replicate queue is the eager enqueue performed after a split.
	store.SetReplicaScannerDisabled(true)

	err := server.WaitForInitialSplits(store.DB())
	if err != nil {
		t.Fatal(err)
	}

	// The test runs against a single store, so replication cannot succeed and
	// every range created by the initial splits is expected to be parked in
	// replicate queue purgatory.
	// NOTE(review): the purgatory length is sampled exactly once, right after
	// the splits become visible; if the queue processes replicas
	// asynchronously this could be racy — confirm, and consider retrying.
	want := server.ExpectedInitialRangeCount()
	got := store.ReplicateQueuePurgatoryLength()
	if want != got {
		t.Fatalf("expected %d replicas in purgatory, but found %d",
			want, got)
	}
}

0 comments on commit 4839efe

Please sign in to comment.