Skip to content

Commit

Permalink
storage: extract post-commit portion of split
Browse files Browse the repository at this point in the history
This is the first step towards formalizing the split trigger as a side effect
in the sense of cockroachdb#6286. With proposed-evaluated KV, we will have to attach
a small amount of metadata to the WriteBatch containing the SplitTrigger,
from which we have to deduce the necessary actions.

I have more WIP which contains plumbing-type work, but have decided to take
a step back and land these changes in small increments, since a plethora of
roadblocks is to be expected and cockroachdb#6144 may influence (and/or be
influenced by) this work.

Code movement only (and changing a few words around the boundary of the move)
in this commit.
  • Loading branch information
tbg committed Jul 19, 2016
1 parent c23d19d commit 50b4368
Showing 1 changed file with 105 additions and 105 deletions.
210 changes: 105 additions & 105 deletions storage/replica_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -2267,6 +2267,110 @@ func (r *Replica) AdminSplit(
return reply, nil
}

// splitTriggerPostCommit is the part of the split trigger which coordinates
// the actual split with the Store. As such, it tries to avoid using any of the
// intermediate results computed by splitTrigger, the goal being moving it
// closer to Store's Raft processing goroutine eventually.
//
// Parameters:
//   - ctx is only used for trace logging.
//   - deltaMS is added to the store-level MVCC stats.
//   - split carries the descriptor of the new range (split.NewDesc) and the
//     store that should campaign for it (split.InitialLeaderStoreID).
//   - r is the existing replica that is being split; its store is the one
//     that gets updated.
//
// There is no error return: failing to construct the new Replica panics, and
// failing to register the split with the Store is fatal, because at that
// point the in-memory state has diverged from the on-disk state.
func splitTriggerPostCommit(
	ctx context.Context,
	deltaMS enginepb.MVCCStats,
	split *roachpb.SplitTrigger,
	r *Replica,
) {
	// Create the new Replica representing the right side of the split.
	// Our error handling options at this point are very limited, but
	// we need to do this after our batch has committed.
	newRng, err := NewReplica(&split.NewDesc, r.store, 0)
	if err != nil {
		panic(err)
	}

	// Copy the timestamp cache into the new range. Both replicas' mutexes
	// are held (r first, then newRng) for the duration of the merge.
	r.mu.Lock()
	newRng.mu.Lock()
	r.mu.tsCache.MergeInto(newRng.mu.tsCache, true /* clear */)
	newRng.mu.Unlock()
	r.mu.Unlock()
	log.Trace(ctx, "copied timestamp cache")

	// Add the new split replica to the store. This step atomically
	// updates the EndKey of the updated replica and also adds the
	// new replica to the store's replica map.
	if err := r.store.SplitRange(r, newRng); err != nil {
		// Our in-memory state has diverged from the on-disk state.
		log.Fatalf("%s: failed to update Store after split: %s", r, err)
	}

	// Update store stats with difference in stats before and after split.
	// TODO(tschottdorf): the original note here ("at this point") was left
	// unfinished; complete or remove it.
	r.store.metrics.addMVCCStats(deltaMS)

	// If the range was not properly replicated before the split, the replicate
	// queue may not have picked it up (due to the need for a split). Enqueue
	// both new halves to speed up a potentially necessary replication. See
	// #7022 and #7800.
	now := r.store.Clock().Now()
	r.store.replicateQueue.MaybeAdd(r, now)
	r.store.replicateQueue.MaybeAdd(newRng, now)

	// To avoid leaving the new range unavailable as it waits to elect
	// its leader, one (and only one) of the nodes should start an
	// election as soon as the split is processed.
	//
	// If there is only one replica, raft instantly makes it the leader.
	// Otherwise, we must trigger a campaign here.
	//
	// TODO(bdarnell): The asynchronous campaign can cause a problem
	// with merges, by recreating a replica that should have been
	// destroyed. This will probably be addressed as a part of the
	// general work to be done for merges
	// (https://github.com/cockroachdb/cockroach/issues/2433), but for
	// now we're safe because we only perform the asynchronous
	// campaign when there are multiple replicas, and we only perform
	// merges in testing scenarios with a single replica.
	// Note that in multi-node scenarios the async campaign is safe
	// because it has exactly the same behavior as an incoming message
	// from another node; the problem here is only with merges.
	if len(split.NewDesc.Replicas) > 1 && r.store.StoreID() == split.InitialLeaderStoreID {
		// Schedule the campaign a short time in the future. As
		// followers process the split, they destroy and recreate their
		// raft groups, which can cause messages to be dropped. In
		// general a shorter delay (perhaps all the way down to zero) is
		// better in production, because the race is rare and the worst
		// case scenario is that we simply wait for an election timeout.
		// However, the test for this feature disables election timeouts
		// and relies solely on this campaign trigger, so it is unacceptably
		// flaky without a bit of a delay.
		if err := r.store.stopper.RunAsyncTask(func() {
			time.Sleep(10 * time.Millisecond)
			// Make sure that newRng hasn't been removed.
			replica, err := r.store.GetReplica(newRng.RangeID)
			if err != nil {
				// This closure runs on its own goroutine, so copy the
				// replica ID out under r.mu instead of reading it
				// unsynchronized. The lock is released before logging;
				// presumably formatting r may itself need r.mu (TODO
				// confirm).
				//
				// NOTE(review): the messages say "new replica" but the ID
				// printed belongs to r, not newRng. Confirm whether that
				// is intended.
				r.mu.Lock()
				replicaID := r.mu.replicaID
				r.mu.Unlock()

				if _, ok := err.(*roachpb.RangeNotFoundError); ok {
					log.Infof("%s: new replica %d removed before campaigning",
						r, replicaID)
				} else {
					log.Infof("%s: new replica %d unable to campaign: %s",
						r, replicaID, err)
				}
				return
			}

			if err := replica.withRaftGroup(func(raftGroup *raft.RawNode) error {
				// A failed campaign is only logged; the closure always
				// reports success to withRaftGroup.
				if err := raftGroup.Campaign(); err != nil {
					log.Warningf("%s: error %v", r, err)
				}
				return nil
			}); err != nil {
				panic(err)
			}
		}); err != nil {
			// The async task could not be started; log and give up on the
			// early campaign.
			log.Warningf("%s: error %v", r, err)
			return
		}
	}
}

// splitTrigger is called on a successful commit of a transaction containing an
// AdminSplit operation. It copies the abort cache for the new range and
// recomputes stats for both the existing, updated range and the new range. For
Expand Down Expand Up @@ -2531,112 +2635,8 @@ func (r *Replica) splitTrigger(
}
log.Trace(ctx, "computed stats for new range")

// This is the part of the split trigger which coordinates the actual split
// with the Store. As such, it tries to avoid using any of the intermediate
// results of the code above, the goal being moving it closer to Store's
// Raft processing goroutine.

parametrizedTrigger := func(
ctx context.Context,
deltaMS enginepb.MVCCStats,
split *roachpb.SplitTrigger,
r *Replica,
) {
// Create the new Replica representing the right side of the split.
// Our error handling options at this point are very limited, but
// we need to do this after our batch has committed.
newRng, err := NewReplica(&split.NewDesc, r.store, 0)
if err != nil {
panic(err)
}

// Copy the timestamp cache into the new range.
r.mu.Lock()
newRng.mu.Lock()
r.mu.tsCache.MergeInto(newRng.mu.tsCache, true /* clear */)
newRng.mu.Unlock()
r.mu.Unlock()
log.Trace(ctx, "copied timestamp cache")

// Add the new split replica to the store. This step atomically
// updates the EndKey of the updated replica and also adds the
// new replica to the store's replica map.
if err := r.store.SplitRange(r, newRng); err != nil {
// Our in-memory state has diverged from the on-disk state.
log.Fatalf("%s: failed to update Store after split: %s", r, err)
}

// Update store stats with difference in stats before and after split.
r.store.metrics.addMVCCStats(deltaMS)

// If the range was not properly replicated before the split, the replicate
// queue may not have picked it up (due to the need for a split). Enqueue
// both new halves to speed up a potentially necessary replication. See
// #7022 and #7800.
now := r.store.Clock().Now()
r.store.replicateQueue.MaybeAdd(r, now)
r.store.replicateQueue.MaybeAdd(newRng, now)

// To avoid leaving the new range unavailable as it waits to elect
// its leader, one (and only one) of the nodes should start an
// election as soon as the split is processed.
//
// If there is only one replica, raft instantly makes it the leader.
// Otherwise, we must trigger a campaign here.
//
// TODO(bdarnell): The asynchronous campaign can cause a problem
// with merges, by recreating a replica that should have been
// destroyed. This will probably be addressed as a part of the
// general work to be done for merges
// (https://github.com/cockroachdb/cockroach/issues/2433), but for
// now we're safe because we only perform the asynchronous
// campaign when there are multiple replicas, and we only perform
// merges in testing scenarios with a single replica.
// Note that in multi-node scenarios the async campaign is safe
// because it has exactly the same behavior as an incoming message
// from another node; the problem here is only with merges.
if len(split.NewDesc.Replicas) > 1 && r.store.StoreID() == split.InitialLeaderStoreID {
// Schedule the campaign a short time in the future. As
// followers process the split, they destroy and recreate their
// raft groups, which can cause messages to be dropped. In
// general a shorter delay (perhaps all the way down to zero) is
// better in production, because the race is rare and the worst
// case scenario is that we simply wait for an election timeout.
// However, the test for this feature disables election timeouts
// and relies solely on this campaign trigger, so it is unacceptably
// flaky without a bit of a delay.
if err := r.store.stopper.RunAsyncTask(func() {
time.Sleep(10 * time.Millisecond)
// Make sure that newRng hasn't been removed.
replica, err := r.store.GetReplica(newRng.RangeID)
if err != nil {
if _, ok := err.(*roachpb.RangeNotFoundError); ok {
log.Infof("%s: new replica %d removed before campaigning",
r, r.mu.replicaID)
} else {
log.Infof("%s: new replica %d unable to campaign: %s",
r, r.mu.replicaID, err)
}
return
}

if err := replica.withRaftGroup(func(raftGroup *raft.RawNode) error {
if err := raftGroup.Campaign(); err != nil {
log.Warningf("%s: error %v", r, err)
}
return nil
}); err != nil {
panic(err)
}
}); err != nil {
log.Warningf("%s: error %v", r, err)
return
}
}
}

theTrigger := func() {
parametrizedTrigger(ctx, deltaMS, split, r)
splitTriggerPostCommit(ctx, deltaMS, split, r)
}

// Note: you must not use the trace inside of this defer since it may
Expand Down

0 comments on commit 50b4368

Please sign in to comment.