Skip to content

Commit

Permalink
[WIP]
Browse files Browse the repository at this point in the history
  • Loading branch information
petermattis committed Sep 26, 2016
1 parent 0356ff3 commit 9b0203d
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 28 deletions.
2 changes: 2 additions & 0 deletions kv/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,8 @@ func (gt *grpcTransport) SendNext(done chan<- BatchCall) {
addr := client.remoteAddr
if log.V(2) {
log.Infof(gt.opts.ctx, "sending request to %s: %+v", addr, client.args)
} else {
log.Eventf(gt.opts.ctx, "sending request to %s", addr)
}

if localServer := gt.rpcContext.GetLocalInternalServerForAddr(addr); enableLocalCalls && localServer != nil {
Expand Down
2 changes: 2 additions & 0 deletions storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -2280,6 +2280,7 @@ func (r *Replica) refreshPendingCmdsLocked(reason refreshRaftReason, refreshAtDe
}
delete(r.mu.pendingCmds, idKey)
// The command can be refurbished.
log.VEventf(4, p.ctx, "refurbishing command %x; %s", p.idKey, reason)
if pErr := r.refurbishPendingCmdLocked(p); pErr != nil {
p.done <- roachpb.ResponseWithError{Err: pErr}
}
Expand All @@ -2299,6 +2300,7 @@ func (r *Replica) refreshPendingCmdsLocked(reason refreshRaftReason, refreshAtDe
// the right place. Reproposing in order is definitely required, however.
sort.Sort(reproposals)
for _, p := range reproposals {
log.VEventf(4, p.ctx, "reproposing command %x; %s", p.idKey, reason)
if err := r.proposePendingCmdLocked(p); err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion storage/replica_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -2308,7 +2308,7 @@ func (r *Replica) AdminSplit(
leftDesc := *desc
leftDesc.EndKey = splitKey

log.Infof(ctx, "initiating a split of this range at key %s", splitKey)
log.Infof(ctx, "initiating a split of this range at key %s [r%d]", splitKey, rightDesc.RangeID)

if err := r.store.DB().Txn(ctx, func(txn *client.Txn) error {
log.Event(ctx, "split closure begins")
Expand Down
55 changes: 28 additions & 27 deletions storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -1538,33 +1538,34 @@ func splitTriggerPostCommit(
//
// Note: you must not use the context inside of this task since it may
// contain a finished trace by the time it runs.
if err := r.store.stopper.RunAsyncTask(ctx, func(ctx context.Context) {
time.Sleep(10 * time.Millisecond)
// Make sure that rightRng hasn't been removed.
replica, err := r.store.GetReplica(rightRng.RangeID)
if err != nil {
if _, ok := err.(*roachpb.RangeNotFoundError); ok {
log.Infof(ctx, "%s: RHS replica %d removed before campaigning",
r, r.mu.replicaID)
} else {
log.Infof(ctx, "%s: RHS replica %d unable to campaign: %s",
r, r.mu.replicaID, err)
}
return
}

if err := replica.withRaftGroup(func(raftGroup *raft.RawNode) (bool, error) {
if err := raftGroup.Campaign(); err != nil {
log.Warningf(ctx, "%s: error %v", r, err)
}
return true, nil
}); err != nil {
panic(err)
}
}); err != nil {
log.Warningf(ctx, "%s: error %v", r, err)
return
}
// if err := r.store.stopper.RunAsyncTask(ctx, func(ctx context.Context) {
// time.Sleep(10 * time.Millisecond)
// // Make sure that rightRng hasn't been removed.
// replica, err := r.store.GetReplica(rightRng.RangeID)
// if err != nil {
// if _, ok := err.(*roachpb.RangeNotFoundError); ok {
// log.Infof(ctx, "%s: RHS replica %d removed before campaigning",
// r, r.mu.replicaID)
// } else {
// log.Infof(ctx, "%s: RHS replica %d unable to campaign: %s",
// r, r.mu.replicaID, err)
// }
// return
// }

// if err := replica.withRaftGroup(func(raftGroup *raft.RawNode) (bool, error) {
// log.Infof(replica.ctx, "campaigning after split")
// if err := raftGroup.Campaign(); err != nil {
// log.Warningf(ctx, "%s: error %v", r, err)
// }
// return true, nil
// }); err != nil {
// panic(err)
// }
// }); err != nil {
// log.Warningf(ctx, "%s: error %v", r, err)
// return
// }
} else if len(split.RightDesc.Replicas) == 1 {
// TODO(peter): In single-node clusters, we enqueue the right-hand side of
// the split (the new range) for Raft processing so that the corresponding
Expand Down

0 comments on commit 9b0203d

Please sign in to comment.