Summary of the hunks below (commentary only; everything before the first
"diff --git" header is not part of any patch):

  * server: the package-private helper waitForInitialSplits is renamed to the
    exported WaitForInitialSplits and both call sites are updated.
  * storage/helpers_test.go: adds Store.ReplicateQueuePurgatoryLength and
    Store.SetReplicaScannerDisabled, which forward to the replicate queue and
    the scanner respectively.
  * storage/queue.go: adds baseQueue.PurgatoryLength, which returns
    len(bq.mu.purgatory) while holding bq.mu.
  * storage/replica_command.go: splitTrigger now calls replicateQueue.MaybeAdd
    for each half of the split whose descriptor lists exactly one replica.

diff --git a/server/node_test.go b/server/node_test.go
index b91ef65a0c05..dc24a76f94cb 100644
--- a/server/node_test.go
+++ b/server/node_test.go
@@ -110,7 +110,7 @@ func createAndStartTestNode(addr net.Addr, engines []engine.Engine, gossipBS net
 	if err := node.start(addr, engines, roachpb.Attributes{}); err != nil {
 		t.Fatal(err)
 	}
-	if err := waitForInitialSplits(node.ctx.DB); err != nil {
+	if err := WaitForInitialSplits(node.ctx.DB); err != nil {
 		t.Fatal(err)
 	}
 	return grpcServer, addr, node, stopper
diff --git a/server/testserver.go b/server/testserver.go
index 40c1678e2e79..e4c540daf0a7 100644
--- a/server/testserver.go
+++ b/server/testserver.go
@@ -276,10 +276,13 @@ func ExpectedInitialRangeCount() int {
 // splits at startup. If the expected range count is not reached within a
 // configured timeout, an error is returned.
 func (ts *TestServer) WaitForInitialSplits() error {
-	return waitForInitialSplits(ts.DB())
+	return WaitForInitialSplits(ts.DB())
 }
 
-func waitForInitialSplits(db *client.DB) error {
+// WaitForInitialSplits waits for the expected number of initial ranges to be
+// populated in the meta2 table. If the expected range count is not reached
+// within a configured timeout, an error is returned.
+func WaitForInitialSplits(db *client.DB) error {
 	expectedRanges := ExpectedInitialRangeCount()
 	return util.RetryForDuration(initialSplitsTimeout, func() error {
 		// Scan all keys in the Meta2Prefix; we only need a count.
diff --git a/storage/helpers_test.go b/storage/helpers_test.go
index ecc08698dfee..d186035400a8 100644
--- a/storage/helpers_test.go
+++ b/storage/helpers_test.go
@@ -110,6 +110,18 @@ func (s *Store) LogReplicaChangeTest(txn *client.Txn, changeType roachpb.Replica
 	return s.logChange(txn, changeType, replica, desc)
 }
 
+// ReplicateQueuePurgatoryLength returns the number of replicas in replicate
+// queue purgatory.
+func (s *Store) ReplicateQueuePurgatoryLength() int {
+	return s.replicateQueue.PurgatoryLength()
+}
+
+// SetReplicaScannerDisabled turns replica scanning off or on as directed. Note
+// that while disabled, removals are still processed.
+func (s *Store) SetReplicaScannerDisabled(disabled bool) {
+	s.scanner.SetDisabled(disabled)
+}
+
 // GetLastIndex is the same function as LastIndex but it does not require
 // that the replica lock is held.
 func (r *Replica) GetLastIndex() (uint64, error) {
diff --git a/storage/queue.go b/storage/queue.go
index efbf6b97bf0f..f380573d7706 100644
--- a/storage/queue.go
+++ b/storage/queue.go
@@ -222,6 +222,13 @@ func (bq *baseQueue) Length() int {
 	return bq.mu.priorityQ.Len()
 }
 
+// PurgatoryLength returns the current size of purgatory.
+func (bq *baseQueue) PurgatoryLength() int {
+	bq.mu.Lock()
+	defer bq.mu.Unlock()
+	return len(bq.mu.purgatory)
+}
+
 // SetDisabled turns queue processing off or on as directed.
 func (bq *baseQueue) SetDisabled(disabled bool) {
 	if disabled {
diff --git a/storage/replica_command.go b/storage/replica_command.go
index d77e4f0f04df..8b7f677658ab 100644
--- a/storage/replica_command.go
+++ b/storage/replica_command.go
@@ -2307,6 +2307,17 @@ func (r *Replica) splitTrigger(
 	// Update store stats with difference in stats before and after split.
 	r.store.metrics.addMVCCStats(deltaMS)
 
+	// If the range was not properly replicated before the split, the replicate
+	// queue may not have picked it up (due to the need for a split). Enqueue
+	// both new halves to speed up a potentially necessary replication. See
+	// #7022.
+	if len(split.UpdatedDesc.Replicas) == 1 {
+		r.store.replicateQueue.MaybeAdd(r, r.store.Clock().Now())
+	}
+	if len(split.NewDesc.Replicas) == 1 {
+		r.store.replicateQueue.MaybeAdd(newRng, r.store.Clock().Now())
+	}
+
 	// To avoid leaving the new range unavailable as it waits to elect
 	// its leader, one (and only one) of the nodes should start an
 	// election as soon as the split is processed.
diff --git a/storage/replicate_test.go b/storage/replicate_test.go
new file mode 100644
index 000000000000..1c9adae4e8bc
--- /dev/null
+++ b/storage/replicate_test.go
@@ -0,0 +1,64 @@
+// Copyright 2016 The Cockroach Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+// implied. See the License for the specific language governing
+// permissions and limitations under the License.
+//
+// Author: Peter Mattis (peter@cockroachlabs.com)
+
+package storage_test
+
+import (
+	"testing"
+	"time"
+
+	"github.com/cockroachdb/cockroach/server"
+	"github.com/cockroachdb/cockroach/util/leaktest"
+)
+
+// TestEagerReplication verifies that, with the replica scanner disabled, all
+// ranges produced by the initial splits still end up in replicate queue
+// purgatory.
+func TestEagerReplication(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	store, stopper, _ := createTestStore(t)
+	defer stopper.Stop()
+
+	// Disable the replica scanner so that we rely on the eager replication code
+	// path that occurs after splits.
+	store.SetReplicaScannerDisabled(true)
+
+	if err := server.WaitForInitialSplits(store.DB()); err != nil {
+		t.Fatal(err)
+	}
+
+	// After the initial splits have been performed, all of the resulting ranges
+	// should be present in replicate queue purgatory (because we only have a
+	// single store in the test and thus replication cannot succeed).
+	//
+	// WaitForInitialSplits only waits for the range descriptors to show up in
+	// meta2; it does not wait for the replicate queue. A replica enqueued by the
+	// split trigger may therefore not have reached purgatory yet, so poll for
+	// the expected count instead of checking it exactly once.
+	expected := server.ExpectedInitialRangeCount()
+	deadline := time.Now().Add(5 * time.Second)
+	for {
+		n := store.ReplicateQueuePurgatoryLength()
+		if n == expected {
+			break
+		}
+		if time.Now().After(deadline) {
+			t.Fatalf("expected %d replicas in purgatory, but found %d",
+				expected, n)
+		}
+		time.Sleep(10 * time.Millisecond)
+	}
+}