diff --git a/storage/gc_queue.go b/storage/gc_queue.go
index 293b2376333d..a1d53940655e 100644
--- a/storage/gc_queue.go
+++ b/storage/gc_queue.go
@@ -22,10 +22,9 @@ import (
 	"sync"
 	"time"
 
-	"golang.org/x/net/context"
-
 	"github.com/gogo/protobuf/proto"
 	"github.com/pkg/errors"
+	"golang.org/x/net/context"
 
 	"github.com/cockroachdb/cockroach/base"
 	"github.com/cockroachdb/cockroach/config"
@@ -286,10 +285,12 @@ func processAbortCache(
 // 6) scan the abort cache table for old entries
 // 7) push these transactions (again, recreating txn entries).
 // 8) send a GCRequest.
-func (gcq *gcQueue) process(now hlc.Timestamp, repl *Replica,
-	sysCfg config.SystemConfig) error {
-	ctx := repl.context(context.TODO())
-
+func (gcq *gcQueue) process(
+	ctx context.Context,
+	now hlc.Timestamp,
+	repl *Replica,
+	sysCfg config.SystemConfig,
+) error {
 	snap := repl.store.Engine().NewSnapshot()
 	desc := repl.Desc()
 	defer snap.Close()
diff --git a/storage/gc_queue_test.go b/storage/gc_queue_test.go
index 60a38c468fd0..d7355dd0b215 100644
--- a/storage/gc_queue_test.go
+++ b/storage/gc_queue_test.go
@@ -23,6 +23,8 @@ import (
 	"testing"
 	"time"
 
+	"github.com/gogo/protobuf/proto"
+	"github.com/pkg/errors"
 	"golang.org/x/net/context"
 
 	"github.com/cockroachdb/cockroach/keys"
@@ -35,8 +37,6 @@ import (
 	"github.com/cockroachdb/cockroach/util/leaktest"
 	"github.com/cockroachdb/cockroach/util/log"
 	"github.com/cockroachdb/cockroach/util/uuid"
-	"github.com/gogo/protobuf/proto"
-	"github.com/pkg/errors"
 )
 
 // makeTS creates a new hybrid logical timestamp.
@@ -273,7 +273,7 @@ func TestGCQueueProcess(t *testing.T) {
 
 	// Process through a scan queue.
 	gcQ := newGCQueue(tc.gossip)
-	if err := gcQ.process(tc.clock.Now(), tc.rng, cfg); err != nil {
+	if err := gcQ.process(context.Background(), tc.clock.Now(), tc.rng, cfg); err != nil {
 		t.Fatal(err)
 	}
 
@@ -494,7 +494,7 @@ func TestGCQueueTransactionTable(t *testing.T) {
 		t.Fatal("config not set")
 	}
 
-	if err := gcQ.process(tc.clock.Now(), tc.rng, cfg); err != nil {
+	if err := gcQ.process(context.Background(), tc.clock.Now(), tc.rng, cfg); err != nil {
 		t.Fatal(err)
 	}
 
@@ -592,7 +592,7 @@ func TestGCQueueIntentResolution(t *testing.T) {
 
 	// Process through a scan queue.
 	gcQ := newGCQueue(tc.gossip)
-	if err := gcQ.process(tc.clock.Now(), tc.rng, cfg); err != nil {
+	if err := gcQ.process(context.Background(), tc.clock.Now(), tc.rng, cfg); err != nil {
 		t.Fatal(err)
 	}
 
diff --git a/storage/queue.go b/storage/queue.go
index ce042104df0c..205a82d65cf1 100644
--- a/storage/queue.go
+++ b/storage/queue.go
@@ -23,6 +23,8 @@ import (
 	"sync/atomic"
 	"time"
 
+	"github.com/opentracing/opentracing-go"
+	"github.com/pkg/errors"
 	"golang.org/x/net/context"
 	"golang.org/x/net/trace"
 
@@ -33,8 +35,6 @@ import (
 	"github.com/cockroachdb/cockroach/util/log"
 	"github.com/cockroachdb/cockroach/util/stop"
 	"github.com/cockroachdb/cockroach/util/timeutil"
-	"github.com/opentracing/opentracing-go"
-	"github.com/pkg/errors"
 )
 
 const (
@@ -112,8 +112,7 @@ type queueImpl interface {
 
 	// process accepts current time, a replica, and the system config
 	// and executes queue-specific work on it.
-	// TODO(nvanbenschoten) this should take a context.Context.
-	process(hlc.Timestamp, *Replica, config.SystemConfig) error
+	process(context.Context, hlc.Timestamp, *Replica, config.SystemConfig) error
 
 	// timer returns a duration to wait between processing the next item
 	// from the queue.
@@ -204,7 +203,12 @@ type baseQueue struct {
 // maxSize doesn't prevent new replicas from being added, it just
 // limits the total size. Higher priority replicas can still be
 // added; their addition simply removes the lowest priority replica.
-func makeBaseQueue(name string, impl queueImpl, gossip *gossip.Gossip, cfg queueConfig) baseQueue {
+func makeBaseQueue(
+	name string,
+	impl queueImpl,
+	gossip *gossip.Gossip,
+	cfg queueConfig,
+) baseQueue {
 	bq := baseQueue{
 		name:   name,
 		impl:   impl,
@@ -429,13 +433,15 @@ func (bq *baseQueue) processReplica(repl *Replica, clock *hlc.Clock) error {
 		return nil
 	}
 
+	sp := repl.store.Tracer().StartSpan(bq.name)
+	ctx := opentracing.ContextWithSpan(repl.context(context.Background()), sp)
+	log.Trace(ctx, fmt.Sprintf("queue start for range %d", repl.RangeID))
+	defer sp.Finish()
+
 	// If the queue requires a replica to have the range leader lease in
 	// order to be processed, check whether this replica has leader lease
 	// and renew or acquire if necessary.
 	if bq.needsLeaderLease {
-		sp := repl.store.Tracer().StartSpan(bq.name)
-		ctx := opentracing.ContextWithSpan(repl.context(context.Background()), sp)
-		defer sp.Finish()
 		// Create a "fake" get request in order to invoke redirectOnOrAcquireLease.
 		if err := repl.redirectOnOrAcquireLeaderLease(ctx); err != nil {
 			if _, harmless := err.GetDetail().(*roachpb.NotLeaderError); harmless {
@@ -444,14 +450,16 @@ func (bq *baseQueue) processReplica(repl *Replica, clock *hlc.Clock) error {
 			}
 			return errors.Wrapf(err.GoError(), "%s: could not obtain lease", repl)
 		}
+		log.Trace(ctx, "got range lease")
 	}
 
 	bq.eventLog.VInfof(log.V(3), "%s: processing", repl)
 	start := timeutil.Now()
-	if err := bq.impl.process(clock.Now(), repl, cfg); err != nil {
+	if err := bq.impl.process(ctx, clock.Now(), repl, cfg); err != nil {
 		return err
 	}
 	bq.eventLog.VInfof(log.V(2), "%s: done: %s", repl, timeutil.Since(start))
+	log.Trace(ctx, "done")
 
 	return nil
 }
diff --git a/storage/queue_test.go b/storage/queue_test.go
index e6568da25f8e..7cf2eda6dc1b 100644
--- a/storage/queue_test.go
+++ b/storage/queue_test.go
@@ -22,6 +22,9 @@ import (
 	"testing"
 	"time"
 
+	"github.com/pkg/errors"
+	"golang.org/x/net/context"
+
 	"github.com/cockroachdb/cockroach/config"
 	"github.com/cockroachdb/cockroach/gossip"
 	"github.com/cockroachdb/cockroach/keys"
@@ -31,7 +34,6 @@ import (
 	"github.com/cockroachdb/cockroach/util/hlc"
 	"github.com/cockroachdb/cockroach/util/leaktest"
 	"github.com/cockroachdb/cockroach/util/stop"
-	"github.com/pkg/errors"
 )
 
 func gossipForTest(t *testing.T) (*gossip.Gossip, *stop.Stopper) {
@@ -75,11 +77,20 @@ func (tq *testQueueImpl) shouldQueue(now hlc.Timestamp, r *Replica, _ config.Sys
 	return tq.shouldQueueFn(now, r)
 }
 
-func (tq *testQueueImpl) process(now hlc.Timestamp, r *Replica, _ config.SystemConfig) error {
+func (tq *testQueueImpl) process(
+	_ context.Context,
+	now hlc.Timestamp,
+	r *Replica,
+	_ config.SystemConfig,
+) error {
 	atomic.AddInt32(&tq.processed, 1)
 	return tq.err
 }
 
+func (tq *testQueueImpl) getProcessed() int {
+	return int(atomic.LoadInt32(&tq.processed))
+}
+
 func (tq *testQueueImpl) timer() time.Duration {
 	if tq.blocker != nil {
 		<-tq.blocker
@@ -140,13 +151,10 @@ func TestBaseQueueAddUpdateAndRemove(t *testing.T) {
 	defer stopper.Stop()
 
 	r1 := &Replica{RangeID: 1}
-	if err := r1.setDesc(&roachpb.RangeDescriptor{RangeID: 1}); err != nil {
-		t.Fatal(err)
-	}
+	r1.setDescWithoutProcessUpdate(&roachpb.RangeDescriptor{RangeID: 1})
 	r2 := &Replica{RangeID: 2}
-	if err := r2.setDesc(&roachpb.RangeDescriptor{RangeID: 2}); err != nil {
-		t.Fatal(err)
-	}
+	r2.setDescWithoutProcessUpdate(&roachpb.RangeDescriptor{RangeID: 2})
+
 	shouldAddMap := map[*Replica]bool{
 		r1: true,
 		r2: true,
@@ -229,9 +237,8 @@ func TestBaseQueueAdd(t *testing.T) {
 	defer stopper.Stop()
 
 	r := &Replica{RangeID: 1}
-	if err := r.setDesc(&roachpb.RangeDescriptor{RangeID: 1}); err != nil {
-		t.Fatal(err)
-	}
+	r.setDescWithoutProcessUpdate(&roachpb.RangeDescriptor{RangeID: 1})
+
 	testQueue := &testQueueImpl{
 		shouldQueueFn: func(now hlc.Timestamp, r *Replica) (shouldQueue bool, priority float64) {
 			return false, 0.0
@@ -254,17 +261,16 @@ func TestBaseQueueAdd(t *testing.T) {
 // processed according to the timer function.
 func TestBaseQueueProcess(t *testing.T) {
 	defer leaktest.AfterTest(t)()
-	g, stopper := gossipForTest(t)
-	defer stopper.Stop()
+	tsc := TestStoreContext()
+	tc := testContext{}
+	tc.StartWithStoreContext(t, tsc)
+	defer tc.Stop()
+
+	r1 := &Replica{RangeID: 1, store: tc.store}
+	r1.setDescWithoutProcessUpdate(&roachpb.RangeDescriptor{RangeID: 1})
+	r2 := &Replica{RangeID: 2, store: tc.store}
+	r2.setDescWithoutProcessUpdate(&roachpb.RangeDescriptor{RangeID: 2})
 
-	r1 := &Replica{RangeID: 1}
-	if err := r1.setDesc(&roachpb.RangeDescriptor{RangeID: 1}); err != nil {
-		t.Fatal(err)
-	}
-	r2 := &Replica{RangeID: 2}
-	if err := r2.setDesc(&roachpb.RangeDescriptor{RangeID: 2}); err != nil {
-		t.Fatal(err)
-	}
 	testQueue := &testQueueImpl{
 		blocker: make(chan struct{}, 1),
 		shouldQueueFn: func(now hlc.Timestamp, r *Replica) (shouldQueue bool, priority float64) {
@@ -273,20 +279,18 @@ func TestBaseQueueProcess(t *testing.T) {
 			return
 		},
 	}
-	bq := makeBaseQueue("test", testQueue, g, queueConfig{maxSize: 2})
-	mc := hlc.NewManualClock(0)
-	clock := hlc.NewClock(mc.UnixNano)
-	bq.Start(clock, stopper)
+	bq := makeBaseQueue("test", testQueue, tc.gossip, queueConfig{maxSize: 2})
+	bq.Start(tc.clock, tc.stopper)
 
 	bq.MaybeAdd(r1, hlc.ZeroTimestamp)
 	bq.MaybeAdd(r2, hlc.ZeroTimestamp)
-	if pc := atomic.LoadInt32(&testQueue.processed); pc != 0 {
+	if pc := testQueue.getProcessed(); pc != 0 {
 		t.Errorf("expected no processed ranges; got %d", pc)
 	}
 
 	testQueue.blocker <- struct{}{}
 	util.SucceedsSoon(t, func() error {
-		if pc := atomic.LoadInt32(&testQueue.processed); pc != int32(1) {
+		if pc := testQueue.getProcessed(); pc != 1 {
 			return errors.Errorf("expected %d processed replicas; got %d", 1, pc)
 		}
 		return nil
@@ -294,7 +298,7 @@ func TestBaseQueueProcess(t *testing.T) {
 
 	testQueue.blocker <- struct{}{}
 	util.SucceedsSoon(t, func() error {
-		if pc := atomic.LoadInt32(&testQueue.processed); pc < int32(2) {
+		if pc := testQueue.getProcessed(); pc < 2 {
 			return errors.Errorf("expected >= %d processed replicas; got %d", 2, pc)
 		}
 		return nil
@@ -313,9 +317,7 @@ func TestBaseQueueAddRemove(t *testing.T) {
 	defer stopper.Stop()
 
 	r := &Replica{RangeID: 1}
-	if err := r.setDesc(&roachpb.RangeDescriptor{RangeID: 1}); err != nil {
-		t.Fatal(err)
-	}
+	r.setDescWithoutProcessUpdate(&roachpb.RangeDescriptor{RangeID: 1})
 	testQueue := &testQueueImpl{
 		blocker: make(chan struct{}, 1),
 		shouldQueueFn: func(now hlc.Timestamp, r *Replica) (shouldQueue bool, priority float64) {
@@ -340,7 +342,7 @@ func TestBaseQueueAddRemove(t *testing.T) {
 		bq.incoming <- struct{}{}
 	}
 
-	if pc := atomic.LoadInt32(&testQueue.processed); pc > 0 {
+	if pc := testQueue.getProcessed(); pc > 0 {
 		t.Errorf("expected processed count of 0; got %d", pc)
 	}
 }
@@ -349,8 +351,10 @@ func TestBaseQueueAddRemove(t *testing.T) {
 // rejected when the queue has 'acceptsUnsplitRanges = false'.
 func TestAcceptsUnsplitRanges(t *testing.T) {
 	defer leaktest.AfterTest(t)()
-	g, stopper := gossipForTest(t)
-	defer stopper.Stop()
+	tsc := TestStoreContext()
+	tc := testContext{}
+	tc.StartWithStoreContext(t, tsc)
+	defer tc.Stop()
 
 	dataMaxAddr, err := keys.Addr(keys.SystemConfigTableDataMax)
 	if err != nil {
@@ -358,24 +362,20 @@ func TestAcceptsUnsplitRanges(t *testing.T) {
 	}
 
 	// This range can never be split due to zone configs boundaries.
-	neverSplits := &Replica{RangeID: 1}
-	if err := neverSplits.setDesc(&roachpb.RangeDescriptor{
+	neverSplits := &Replica{RangeID: 1, store: tc.store}
+	neverSplits.setDescWithoutProcessUpdate(&roachpb.RangeDescriptor{
 		RangeID:  1,
 		StartKey: roachpb.RKeyMin,
 		EndKey:   dataMaxAddr,
-	}); err != nil {
-		t.Fatal(err)
-	}
+	})
 
 	// This range will need to be split after user db/table entries are created.
-	willSplit := &Replica{RangeID: 2}
-	if err := willSplit.setDesc(&roachpb.RangeDescriptor{
+	willSplit := &Replica{RangeID: 2, store: tc.store}
+	willSplit.setDescWithoutProcessUpdate(&roachpb.RangeDescriptor{
 		RangeID:  2,
 		StartKey: dataMaxAddr,
 		EndKey:   roachpb.RKeyMax,
-	}); err != nil {
-		t.Fatal(err)
-	}
+	})
 
 	var queued int32
 	testQueue := &testQueueImpl{
@@ -386,13 +386,11 @@ func TestAcceptsUnsplitRanges(t *testing.T) {
 		},
 	}
 
-	bq := makeBaseQueue("test", testQueue, g, queueConfig{maxSize: 2})
-	mc := hlc.NewManualClock(0)
-	clock := hlc.NewClock(mc.UnixNano)
-	bq.Start(clock, stopper)
+	bq := makeBaseQueue("test", testQueue, tc.gossip, queueConfig{maxSize: 2})
+	bq.Start(tc.clock, tc.stopper)
 
 	// Check our config.
-	sysCfg, ok := g.GetSystemConfig()
+	sysCfg, ok := tc.gossip.GetSystemConfig()
 	if !ok {
 		t.Fatal("config not set")
 	}
@@ -411,7 +409,7 @@ func TestAcceptsUnsplitRanges(t *testing.T) {
 	bq.MaybeAdd(willSplit, hlc.ZeroTimestamp)
 
 	util.SucceedsSoon(t, func() error {
-		if pc := atomic.LoadInt32(&testQueue.processed); pc != int32(2) {
+		if pc := testQueue.getProcessed(); pc != 2 {
 			return errors.Errorf("expected %d processed replicas; got %d", 2, pc)
 		}
 		return nil
@@ -440,7 +438,7 @@ func TestAcceptsUnsplitRanges(t *testing.T) {
 	bq.MaybeAdd(willSplit, hlc.ZeroTimestamp)
 
 	util.SucceedsSoon(t, func() error {
-		if pc := atomic.LoadInt32(&testQueue.processed); pc != int32(3) {
+		if pc := testQueue.getProcessed(); pc != 3 {
 			return errors.Errorf("expected %d processed replicas; got %d", 3, pc)
 		}
 		return nil
@@ -465,8 +463,10 @@ func (*testError) purgatoryErrorMarker() {
 // the purgatory channel causes the replicas to be reprocessed.
 func TestBaseQueuePurgatory(t *testing.T) {
 	defer leaktest.AfterTest(t)()
-	g, stopper := gossipForTest(t)
-	defer stopper.Stop()
+	tsc := TestStoreContext()
+	tc := testContext{}
+	tc.StartWithStoreContext(t, tsc)
+	defer tc.Stop()
 
 	testQueue := &testQueueImpl{
 		duration: time.Nanosecond,
@@ -478,35 +478,30 @@ func TestBaseQueuePurgatory(t *testing.T) {
 		pChan: make(chan struct{}, 1),
 		err:   &testError{},
 	}
+
 	replicaCount := 10
-	bq := makeBaseQueue("test", testQueue, g, queueConfig{maxSize: replicaCount})
-	mc := hlc.NewManualClock(0)
-	clock := hlc.NewClock(mc.UnixNano)
-	bq.Start(clock, stopper)
+	bq := makeBaseQueue("test", testQueue, tc.gossip, queueConfig{maxSize: replicaCount})
+	bq.Start(tc.clock, tc.stopper)
 	for i := 1; i <= replicaCount; i++ {
-		r := &Replica{RangeID: roachpb.RangeID(i)}
-		if err := r.setDesc(&roachpb.RangeDescriptor{RangeID: roachpb.RangeID(i)}); err != nil {
-			t.Fatal(err)
-		}
+		r := &Replica{RangeID: roachpb.RangeID(i), store: tc.store}
+		r.setDescWithoutProcessUpdate(&roachpb.RangeDescriptor{RangeID: roachpb.RangeID(i)})
 		bq.MaybeAdd(r, hlc.ZeroTimestamp)
 	}
 
 	util.SucceedsSoon(t, func() error {
-		if pc := atomic.LoadInt32(&testQueue.processed); pc != int32(replicaCount) {
+		if pc := testQueue.getProcessed(); pc != replicaCount {
 			return errors.Errorf("expected %d processed replicas; got %d", replicaCount, pc)
 		}
 		// We have to loop checking the following conditions because the increment
 		// of testQueue.processed does not happen atomically with the replica being
 		// placed in purgatory.
-		bq.mu.Lock() // Protect access to purgatory and priorityQ.
-		defer bq.mu.Unlock()
 		// Verify that the size of the purgatory map is correct.
-		if l := len(bq.mu.purgatory); l != replicaCount {
+		if l := bq.PurgatoryLength(); l != replicaCount {
 			return errors.Errorf("expected purgatory size of %d; got %d", replicaCount, l)
 		}
 		// ...and priorityQ should be empty.
-		if l := len(bq.mu.priorityQ); l != 0 {
+		if l := bq.Length(); l != 0 {
 			return errors.Errorf("expected empty priorityQ; got %d", l)
 		}
 		return nil
@@ -516,20 +511,18 @@ func TestBaseQueuePurgatory(t *testing.T) {
 	testQueue.pChan <- struct{}{}
 
 	util.SucceedsSoon(t, func() error {
-		if pc := atomic.LoadInt32(&testQueue.processed); pc != int32(replicaCount*2) {
+		if pc := testQueue.getProcessed(); pc != replicaCount*2 {
 			return errors.Errorf("expected %d processed replicas; got %d", replicaCount*2, pc)
 		}
 		// We have to loop checking the following conditions because the increment
 		// of testQueue.processed does not happen atomically with the replica being
 		// placed in purgatory.
-		bq.mu.Lock() // Protect access to purgatory and priorityQ.
-		defer bq.mu.Unlock()
 		// Verify the replicas are still in purgatory.
-		if l := len(bq.mu.purgatory); l != replicaCount {
+		if l := bq.PurgatoryLength(); l != replicaCount {
 			return errors.Errorf("expected purgatory size of %d; got %d", replicaCount, l)
 		}
 		// ...and priorityQ should be empty.
-		if l := len(bq.mu.priorityQ); l != 0 {
+		if l := bq.Length(); l != 0 {
 			return errors.Errorf("expected empty priorityQ; got %d", l)
 		}
 		return nil
@@ -540,20 +533,18 @@ func TestBaseQueuePurgatory(t *testing.T) {
 	testQueue.pChan <- struct{}{}
 
 	util.SucceedsSoon(t, func() error {
-		if pc := atomic.LoadInt32(&testQueue.processed); pc != int32(replicaCount*3) {
+		if pc := testQueue.getProcessed(); pc != replicaCount*3 {
 			return errors.Errorf("expected %d processed replicas; got %d", replicaCount*3, pc)
 		}
 		return nil
 	})
 
-	bq.mu.Lock() // Protect access to purgatory and priorityQ.
 	// Verify the replicas are no longer in purgatory.
-	if l := len(bq.mu.purgatory); l != 0 {
+	if l := bq.PurgatoryLength(); l != 0 {
 		t.Errorf("expected purgatory size of 0; got %d", l)
 	}
 	// ...and priorityQ should be empty.
-	if l := len(bq.mu.priorityQ); l != 0 {
+	if l := bq.Length(); l != 0 {
 		t.Errorf("expected empty priorityQ; got %d", l)
 	}
-	bq.mu.Unlock()
 }
diff --git a/storage/raft_log_queue.go b/storage/raft_log_queue.go
index 66ac04282794..8e53b4ec53ab 100644
--- a/storage/raft_log_queue.go
+++ b/storage/raft_log_queue.go
@@ -20,14 +20,16 @@ import (
 	"math"
 	"time"
 
+	"github.com/coreos/etcd/raft"
+	"github.com/pkg/errors"
+	"golang.org/x/net/context"
+
 	"github.com/cockroachdb/cockroach/config"
 	"github.com/cockroachdb/cockroach/gossip"
 	"github.com/cockroachdb/cockroach/internal/client"
 	"github.com/cockroachdb/cockroach/roachpb"
 	"github.com/cockroachdb/cockroach/util/hlc"
 	"github.com/cockroachdb/cockroach/util/log"
-	"github.com/coreos/etcd/raft"
-	"github.com/pkg/errors"
 )
 
 const (
@@ -123,7 +125,12 @@ func (*raftLogQueue) shouldQueue(
 // process truncates the raft log of the range if the replica is the raft
 // leader and if the total number of the range's raft log's stale entries
 // exceeds RaftLogQueueStaleThreshold.
-func (rlq *raftLogQueue) process(now hlc.Timestamp, r *Replica, _ config.SystemConfig) error {
+func (rlq *raftLogQueue) process(
+	_ context.Context,
+	now hlc.Timestamp,
+	r *Replica,
+	_ config.SystemConfig,
+) error {
 	truncatableIndexes, oldestIndex, err := getTruncatableIndexes(r)
 	if err != nil {
 		return err
diff --git a/storage/replica_consistency_queue.go b/storage/replica_consistency_queue.go
index 74b819676805..a6e8be4fe7a2 100644
--- a/storage/replica_consistency_queue.go
+++ b/storage/replica_consistency_queue.go
@@ -19,6 +19,8 @@ package storage
 import (
 	"time"
 
+	"golang.org/x/net/context"
+
 	"github.com/cockroachdb/cockroach/config"
 	"github.com/cockroachdb/cockroach/gossip"
 	"github.com/cockroachdb/cockroach/roachpb"
@@ -51,7 +53,12 @@ func (*replicaConsistencyQueue) shouldQueue(now hlc.Timestamp, rng *Replica,
 }
 
 // process() is called on every range for which this node is a leader.
-func (q *replicaConsistencyQueue) process(_ hlc.Timestamp, rng *Replica, _ config.SystemConfig) error {
+func (q *replicaConsistencyQueue) process(
+	_ context.Context,
+	_ hlc.Timestamp,
+	rng *Replica,
+	_ config.SystemConfig,
+) error {
 	req := roachpb.CheckConsistencyRequest{}
 	_, pErr := rng.CheckConsistency(req, rng.Desc())
 	if pErr != nil {
diff --git a/storage/replica_gc_queue.go b/storage/replica_gc_queue.go
index 779d04758846..3736ff9acfce 100644
--- a/storage/replica_gc_queue.go
+++ b/storage/replica_gc_queue.go
@@ -19,6 +19,10 @@ package storage
 import (
 	"time"
 
+	"github.com/coreos/etcd/raft"
+	"github.com/pkg/errors"
+	"golang.org/x/net/context"
+
 	"github.com/cockroachdb/cockroach/config"
 	"github.com/cockroachdb/cockroach/gossip"
 	"github.com/cockroachdb/cockroach/internal/client"
@@ -26,8 +30,6 @@ import (
 	"github.com/cockroachdb/cockroach/roachpb"
 	"github.com/cockroachdb/cockroach/util/hlc"
 	"github.com/cockroachdb/cockroach/util/log"
-	"github.com/coreos/etcd/raft"
-	"github.com/pkg/errors"
 )
 
 const (
@@ -129,7 +131,12 @@ func replicaGCShouldQueueImpl(
 
 // process performs a consistent lookup on the range descriptor to see if we are
 // still a member of the range.
-func (q *replicaGCQueue) process(now hlc.Timestamp, rng *Replica, _ config.SystemConfig) error {
+func (q *replicaGCQueue) process(
+	ctx context.Context,
+	now hlc.Timestamp,
+	rng *Replica,
+	_ config.SystemConfig,
+) error {
 	// Note that the Replicas field of desc is probably out of date, so
 	// we should only use `desc` for its static fields like RangeID and
 	// StartKey (and avoid rng.GetReplica() for the same reason).
@@ -162,6 +169,7 @@ func (q *replicaGCQueue) process(now hlc.Timestamp, rng *Replica, _ config.Syste
 		if log.V(1) {
 			log.Infof("destroying local data from range %d", desc.RangeID)
 		}
+		log.Trace(ctx, "destroying local data")
 		if err := rng.store.RemoveReplica(rng, replyDesc, true); err != nil {
 			return err
 		}
@@ -173,6 +181,7 @@ func (q *replicaGCQueue) process(now hlc.Timestamp, rng *Replica, _ config.Syste
 		if log.V(1) {
 			log.Infof("removing merged range %d", desc.RangeID)
 		}
+		log.Trace(ctx, "removing merged range")
 		if err := rng.store.RemoveReplica(rng, replyDesc, false); err != nil {
 			return err
 		}
diff --git a/storage/replicate_queue.go b/storage/replicate_queue.go
index 6437fb76ecfa..8e740de8943d 100644
--- a/storage/replicate_queue.go
+++ b/storage/replicate_queue.go
@@ -17,14 +17,17 @@
 package storage
 
 import (
+	"fmt"
 	"time"
 
+	"github.com/pkg/errors"
+	"golang.org/x/net/context"
+
 	"github.com/cockroachdb/cockroach/config"
 	"github.com/cockroachdb/cockroach/gossip"
 	"github.com/cockroachdb/cockroach/roachpb"
 	"github.com/cockroachdb/cockroach/util/hlc"
 	"github.com/cockroachdb/cockroach/util/log"
-	"github.com/pkg/errors"
 )
 
 const (
@@ -103,6 +106,7 @@ func (rq *replicateQueue) shouldQueue(now hlc.Timestamp, repl *Replica,
 }
 
 func (rq *replicateQueue) process(
+	ctx context.Context,
 	now hlc.Timestamp,
 	repl *Replica,
 	sysCfg config.SystemConfig,
@@ -126,6 +130,7 @@ func (rq *replicateQueue) process(
 
 	switch action {
 	case AllocatorAdd:
+		log.Trace(ctx, "adding a new replica")
 		newStore, err := rq.allocator.AllocateTarget(zone.ReplicaAttrs[0], desc.Replicas, true, nil)
 		if err != nil {
 			return err
@@ -134,20 +139,24 @@ func (rq *replicateQueue) process(
 			NodeID:  newStore.Node.NodeID,
 			StoreID: newStore.StoreID,
 		}
+
 		if log.V(1) {
-			log.Infof("adding replica for under-replicated RangeID:%d to %+v", repl.RangeID, newReplica)
+			log.Infof("range %d: adding replica to %+v due to under-replication", repl.RangeID, newReplica)
 		}
+		log.Trace(ctx, fmt.Sprintf("adding replica to %+v due to under-replication", newReplica))
 		if err = repl.ChangeReplicas(roachpb.ADD_REPLICA, newReplica, desc); err != nil {
 			return err
 		}
 	case AllocatorRemove:
+		log.Trace(ctx, "removing a replica")
 		removeReplica, err := rq.allocator.RemoveTarget(desc.Replicas)
 		if err != nil {
 			return err
 		}
 		if log.V(1) {
-			log.Infof("removing replica for over-replicated RangeID:%d from %+v", repl.RangeID, removeReplica)
+			log.Infof("range %d: removing replica %+v due to over-replication", repl.RangeID, removeReplica)
 		}
+		log.Trace(ctx, fmt.Sprintf("removing replica %+v due to over-replication", removeReplica))
 		if err = repl.ChangeReplicas(roachpb.REMOVE_REPLICA, removeReplica, desc); err != nil {
 			return err
 		}
@@ -156,26 +165,31 @@ func (rq *replicateQueue) process(
 			return nil
 		}
 	case AllocatorRemoveDead:
+		log.Trace(ctx, "removing a dead replica")
 		if len(deadReplicas) == 0 {
 			if log.V(1) {
 				log.Warningf("Range of replica %s was identified as having dead replicas, but no dead replicas were found.", repl)
 			}
 			break
 		}
+		deadReplica := deadReplicas[0]
 		if log.V(1) {
-			log.Infof("removing replica from dead store RangeID:%d from %+v", repl.RangeID, deadReplicas[0])
+			log.Infof("range %d: removing replica %+v from dead store", repl.RangeID, deadReplica)
 		}
-		if err = repl.ChangeReplicas(roachpb.REMOVE_REPLICA, deadReplicas[0], desc); err != nil {
+		log.Trace(ctx, fmt.Sprintf("removing replica %+v from dead store", deadReplica))
+		if err = repl.ChangeReplicas(roachpb.REMOVE_REPLICA, deadReplica, desc); err != nil {
 			return err
 		}
	case AllocatorNoop:
+		log.Trace(ctx, "considering a rebalance")
 		// The Noop case will result if this replica was queued in order to
 		// rebalance. Attempt to find a rebalancing target.
 		rebalanceStore := rq.allocator.RebalanceTarget(repl.store.StoreID(), zone.ReplicaAttrs[0], desc.Replicas)
 		if rebalanceStore == nil {
 			if log.V(1) {
-				log.Infof("no suitable rebalance target for RangeID:%d", repl.RangeID)
+				log.Infof("range %d: no suitable rebalance target", repl.RangeID)
 			}
+			log.Trace(ctx, "no suitable rebalance target")
 			// No action was necessary and no rebalance target was found. Return
 			// without re-queuing this replica.
 			return nil
@@ -185,8 +199,9 @@ func (rq *replicateQueue) process(
 			StoreID: rebalanceStore.StoreID,
 		}
 		if log.V(1) {
-			log.Infof("rebalancing RangeID:%d to %+v", repl.RangeID, rebalanceReplica)
+			log.Infof("range %d: rebalancing to %+v", repl.RangeID, rebalanceReplica)
 		}
+		log.Trace(ctx, fmt.Sprintf("rebalancing to %+v", rebalanceReplica))
 		if err = repl.ChangeReplicas(roachpb.ADD_REPLICA, rebalanceReplica, desc); err != nil {
 			return err
 		}
diff --git a/storage/split_queue.go b/storage/split_queue.go
index 6148bc2e8366..8ab3e77d774f 100644
--- a/storage/split_queue.go
+++ b/storage/split_queue.go
@@ -17,8 +17,10 @@
 package storage
 
 import (
+	"fmt"
 	"time"
 
+	"github.com/pkg/errors"
 	"golang.org/x/net/context"
 
 	"github.com/cockroachdb/cockroach/config"
@@ -27,7 +29,6 @@ import (
 	"github.com/cockroachdb/cockroach/roachpb"
 	"github.com/cockroachdb/cockroach/util/hlc"
 	"github.com/cockroachdb/cockroach/util/log"
-	"github.com/pkg/errors"
 )
 
 const (
@@ -86,15 +87,18 @@ func (*splitQueue) shouldQueue(now hlc.Timestamp, rng *Replica,
 }
 
 // process synchronously invokes admin split for each proposed split key.
-func (sq *splitQueue) process(now hlc.Timestamp, rng *Replica,
-	sysCfg config.SystemConfig) error {
-	ctx := rng.context(context.TODO())
-
+func (sq *splitQueue) process(
+	ctx context.Context,
+	now hlc.Timestamp,
+	rng *Replica,
+	sysCfg config.SystemConfig,
+) error {
 	// First handle case of splitting due to zone config maps.
 	desc := rng.Desc()
 	splitKeys := sysCfg.ComputeSplitKeys(desc.StartKey, desc.EndKey)
 	if len(splitKeys) > 0 {
 		log.Infof("splitting %s at keys %v", rng, splitKeys)
+		log.Trace(ctx, fmt.Sprintf("splitting at keys %v", splitKeys))
 		for _, splitKey := range splitKeys {
 			if err := sq.db.AdminSplit(splitKey.AsRawKey()); err != nil {
 				return errors.Errorf("unable to split %s at key %q: %s", rng, splitKey, err)
@@ -112,6 +116,7 @@ func (sq *splitQueue) process(now hlc.Timestamp, rng *Replica,
 	// FIXME: why is this implementation not the same as the one above?
 	if float64(size)/float64(zone.RangeMaxBytes) > 1 {
 		log.Infof("splitting %s size=%d max=%d", rng, size, zone.RangeMaxBytes)
+		log.Trace(ctx, fmt.Sprintf("splitting size=%d max=%d", size, zone.RangeMaxBytes))
 		if _, pErr := client.SendWrappedWith(rng, ctx, roachpb.Header{
 			Timestamp: now,
 		}, &roachpb.AdminSplitRequest{
diff --git a/storage/verify_queue.go b/storage/verify_queue.go
index ea4818d83947..43eec5789a7b 100644
--- a/storage/verify_queue.go
+++ b/storage/verify_queue.go
@@ -19,6 +19,8 @@ package storage
 import (
 	"time"
 
+	"golang.org/x/net/context"
+
 	"github.com/cockroachdb/cockroach/config"
 	"github.com/cockroachdb/cockroach/gossip"
 	"github.com/cockroachdb/cockroach/util/hlc"
@@ -80,8 +82,12 @@ func (*verifyQueue) shouldQueue(now hlc.Timestamp, rng *Replica,
 // process iterates through all keys and values in a range. The very
 // act of scanning keys verifies on-disk checksums, as each block
 // checksum is checked on load.
-func (*verifyQueue) process(now hlc.Timestamp, rng *Replica,
-	_ config.SystemConfig) error {
+func (*verifyQueue) process(
+	_ context.Context,
+	now hlc.Timestamp,
+	rng *Replica,
+	_ config.SystemConfig,
+) error {
 	snap := rng.store.Engine().NewSnapshot()
 	iter := NewReplicaDataIterator(rng.Desc(), snap, false /* !replicatedOnly */)
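For orientation, this is the shape a queue implementation takes after the patch: process now receives the context assembled in baseQueue.processReplica, which carries the per-queue tracing span, so trace events emitted inside process land in that span. The noopQueue type below is a hypothetical sketch for illustration only and is not part of the patch; it assumes the storage-package types and imports shown in the diff above, and only the process method is shown (a real queueImpl also implements the other interface methods such as shouldQueue and timer).

package storage

import (
	"fmt"

	"golang.org/x/net/context"

	"github.com/cockroachdb/cockroach/config"
	"github.com/cockroachdb/cockroach/util/hlc"
	"github.com/cockroachdb/cockroach/util/log"
)

// noopQueue is a hypothetical queueImpl used only to illustrate the new
// context-first process signature.
type noopQueue struct{}

// process receives the context built by baseQueue.processReplica; log.Trace
// events recorded against it are attached to the queue's tracing span.
func (noopQueue) process(
	ctx context.Context,
	_ hlc.Timestamp,
	repl *Replica,
	_ config.SystemConfig,
) error {
	log.Trace(ctx, fmt.Sprintf("no-op pass over range %d", repl.RangeID))
	return nil
}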