Skip to content

Commit

Permalink
Merge pull request #2675 from bdarnell/use-replica-id
Browse files Browse the repository at this point in the history
Rework ID usage in multiraft
  • Loading branch information
bdarnell committed Sep 29, 2015
2 parents 42b4585 + 570b508 commit fd5b64a
Show file tree
Hide file tree
Showing 63 changed files with 2,078 additions and 1,299 deletions.
8 changes: 4 additions & 4 deletions kv/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,11 +316,11 @@ func (ds *DistSender) sendRPC(trace *tracer.Trace, rangeID proto.RangeID, replic

// Build a slice of replica addresses (if gossiped).
var addrs []net.Addr
replicaMap := map[string]*proto.Replica{}
replicaMap := map[string]*proto.ReplicaDescriptor{}
for i := range replicas {
addr := replicas[i].NodeDesc.Address
addrs = append(addrs, addr)
replicaMap[addr.String()] = &replicas[i].Replica
replicaMap[addr.String()] = &replicas[i].ReplicaDescriptor
}
if len(addrs) == 0 {
return nil, noNodeAddrsAvailError{}
Expand Down Expand Up @@ -627,7 +627,7 @@ func (ds *DistSender) sendChunk(ctx context.Context, ba proto.BatchRequest) (*pr
evictDesc()
}
} else {
newLeader = &proto.Replica{}
newLeader = &proto.ReplicaDescriptor{}
}
ds.updateLeaderCache(proto.RangeID(desc.RangeID), *newLeader)
if log.V(1) {
Expand Down Expand Up @@ -752,7 +752,7 @@ func (ds *DistSender) sendChunk(ctx context.Context, ba proto.BatchRequest) (*pr

// updateLeaderCache updates the cached leader for the given range,
// evicting any previous value in the process.
func (ds *DistSender) updateLeaderCache(rid proto.RangeID, leader proto.Replica) {
func (ds *DistSender) updateLeaderCache(rid proto.RangeID, leader proto.ReplicaDescriptor) {
oldLeader := ds.leaderCache.Lookup(rid)
if leader.StoreID != oldLeader.StoreID {
if log.V(1) {
Expand Down
44 changes: 22 additions & 22 deletions kv/dist_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ var testRangeDescriptor = proto.RangeDescriptor{
RangeID: 1,
StartKey: proto.Key("a"),
EndKey: proto.Key("z"),
Replicas: []proto.Replica{
Replicas: []proto.ReplicaDescriptor{
{
NodeID: 1,
StoreID: 1,
Expand Down Expand Up @@ -86,16 +86,16 @@ func TestMoveLocalReplicaToFront(t *testing.T) {
// No attribute prefix
slice: replicaSlice{
replicaInfo{
Replica: proto.Replica{NodeID: 2, StoreID: 2},
NodeDesc: &proto.NodeDescriptor{NodeID: 2},
ReplicaDescriptor: proto.ReplicaDescriptor{NodeID: 2, StoreID: 2},
NodeDesc: &proto.NodeDescriptor{NodeID: 2},
},
replicaInfo{
Replica: proto.Replica{NodeID: 3, StoreID: 3},
NodeDesc: &proto.NodeDescriptor{NodeID: 3},
ReplicaDescriptor: proto.ReplicaDescriptor{NodeID: 3, StoreID: 3},
NodeDesc: &proto.NodeDescriptor{NodeID: 3},
},
replicaInfo{
Replica: proto.Replica{NodeID: 1, StoreID: 1},
NodeDesc: &proto.NodeDescriptor{NodeID: 1},
ReplicaDescriptor: proto.ReplicaDescriptor{NodeID: 1, StoreID: 1},
NodeDesc: &proto.NodeDescriptor{NodeID: 1},
},
},
localNodeDesc: proto.NodeDescriptor{NodeID: 1},
Expand All @@ -104,16 +104,16 @@ func TestMoveLocalReplicaToFront(t *testing.T) {
// Sort replicas by attribute
slice: replicaSlice{
replicaInfo{
Replica: proto.Replica{NodeID: 2, StoreID: 2},
NodeDesc: &proto.NodeDescriptor{NodeID: 2, Attrs: proto.Attributes{Attrs: []string{"ad"}}},
ReplicaDescriptor: proto.ReplicaDescriptor{NodeID: 2, StoreID: 2},
NodeDesc: &proto.NodeDescriptor{NodeID: 2, Attrs: proto.Attributes{Attrs: []string{"ad"}}},
},
replicaInfo{
Replica: proto.Replica{NodeID: 3, StoreID: 3},
NodeDesc: &proto.NodeDescriptor{NodeID: 3, Attrs: proto.Attributes{Attrs: []string{"ab", "c"}}},
ReplicaDescriptor: proto.ReplicaDescriptor{NodeID: 3, StoreID: 3},
NodeDesc: &proto.NodeDescriptor{NodeID: 3, Attrs: proto.Attributes{Attrs: []string{"ab", "c"}}},
},
replicaInfo{
Replica: proto.Replica{NodeID: 1, StoreID: 1},
NodeDesc: &proto.NodeDescriptor{NodeID: 1, Attrs: proto.Attributes{Attrs: []string{"ab"}}},
ReplicaDescriptor: proto.ReplicaDescriptor{NodeID: 1, StoreID: 1},
NodeDesc: &proto.NodeDescriptor{NodeID: 1, Attrs: proto.Attributes{Attrs: []string{"ab"}}},
},
},
localNodeDesc: proto.NodeDescriptor{NodeID: 1, Attrs: proto.Attributes{Attrs: []string{"ab"}}},
Expand Down Expand Up @@ -299,7 +299,7 @@ func TestSendRPCOrder(t *testing.T) {
if err := g.AddInfoProto(gossip.MakeNodeIDKey(proto.NodeID(i)), nd, time.Hour); err != nil {
t.Fatal(err)
}
descriptor.Replicas = append(descriptor.Replicas, proto.Replica{
descriptor.Replicas = append(descriptor.Replicas, proto.ReplicaDescriptor{
NodeID: proto.NodeID(i),
StoreID: proto.StoreID(i),
})
Expand All @@ -318,7 +318,7 @@ func TestSendRPCOrder(t *testing.T) {
}
}

ds.leaderCache.Update(proto.RangeID(rangeID), proto.Replica{})
ds.leaderCache.Update(proto.RangeID(rangeID), proto.ReplicaDescriptor{})
if tc.leader > 0 {
ds.leaderCache.Update(proto.RangeID(rangeID), descriptor.Replicas[tc.leader-1])
}
Expand Down Expand Up @@ -363,7 +363,7 @@ func TestRetryOnNotLeaderError(t *testing.T) {
defer leaktest.AfterTest(t)
g, s := makeTestGossip(t)
defer s()
leader := proto.Replica{
leader := proto.ReplicaDescriptor{
NodeID: 99,
StoreID: 999,
}
Expand All @@ -373,7 +373,7 @@ func TestRetryOnNotLeaderError(t *testing.T) {
if first {
reply := getReply()
reply.(proto.Response).Header().SetGoError(
&proto.NotLeaderError{Leader: &leader, Replica: &proto.Replica{}})
&proto.NotLeaderError{Leader: &leader, Replica: &proto.ReplicaDescriptor{}})
first = false
return []gogoproto.Message{reply}, nil
}
Expand Down Expand Up @@ -459,7 +459,7 @@ func TestEvictCacheOnError(t *testing.T) {
for i, tc := range testCases {
g, s := makeTestGossip(t)
defer s()
leader := proto.Replica{
leader := proto.ReplicaDescriptor{
NodeID: 99,
StoreID: 999,
}
Expand Down Expand Up @@ -498,7 +498,7 @@ func TestEvictCacheOnError(t *testing.T) {
if _, err := batchutil.SendWrapped(ds, put); err != nil && !testutils.IsError(err, "boom") {
t.Errorf("put encountered unexpected error: %s", err)
}
if cur := ds.leaderCache.Lookup(1); reflect.DeepEqual(cur, &proto.Replica{}) && !tc.shouldClearLeader {
if cur := ds.leaderCache.Lookup(1); reflect.DeepEqual(cur, &proto.ReplicaDescriptor{}) && !tc.shouldClearLeader {
t.Errorf("%d: leader cache eviction: shouldClearLeader=%t, but value is %v", i, tc.shouldClearLeader, cur)
}
_, cachedDesc := ds.rangeCache.getCachedRangeDescriptor(put.Key, false /* !inclusive */)
Expand Down Expand Up @@ -626,7 +626,7 @@ func TestSendRPCRetry(t *testing.T) {
t.Fatal(err)
}

descriptor.Replicas = append(descriptor.Replicas, proto.Replica{
descriptor.Replicas = append(descriptor.Replicas, proto.ReplicaDescriptor{
NodeID: proto.NodeID(i),
StoreID: proto.StoreID(i),
})
Expand Down Expand Up @@ -697,7 +697,7 @@ func TestMultiRangeMergeStaleDescriptor(t *testing.T) {
RangeID: 1,
StartKey: proto.Key("a"),
EndKey: proto.Key("b"),
Replicas: []proto.Replica{
Replicas: []proto.ReplicaDescriptor{
{
NodeID: 1,
StoreID: 1,
Expand All @@ -710,7 +710,7 @@ func TestMultiRangeMergeStaleDescriptor(t *testing.T) {
RangeID: 1,
StartKey: proto.Key("a"),
EndKey: proto.KeyMax,
Replicas: []proto.Replica{
Replicas: []proto.ReplicaDescriptor{
{
NodeID: 1,
StoreID: 1,
Expand Down
8 changes: 4 additions & 4 deletions kv/leader_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,20 +47,20 @@ func newLeaderCache(size int) *leaderCache {

// Lookup consults the cache for the replica cached as the leader of
// the given Raft consensus group.
func (lc *leaderCache) Lookup(group proto.RangeID) proto.Replica {
func (lc *leaderCache) Lookup(group proto.RangeID) proto.ReplicaDescriptor {
lc.mu.Lock()
defer lc.mu.Unlock()
v, ok := lc.cache.Get(group)
if !ok || v == nil {
return proto.Replica{}
return proto.ReplicaDescriptor{}
}
return *(v.(*proto.Replica))
return *(v.(*proto.ReplicaDescriptor))
}

// Update invalidates the cached leader for the given Raft group.
// If a replica is passed in, it is inserted into the cache.
// A StoreID of 0 (empty replica) means evict.
func (lc *leaderCache) Update(group proto.RangeID, r proto.Replica) {
func (lc *leaderCache) Update(group proto.RangeID, r proto.ReplicaDescriptor) {
lc.mu.Lock()
defer lc.mu.Unlock()
lc.cache.Del(group)
Expand Down
6 changes: 3 additions & 3 deletions kv/leader_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,18 @@ func TestLeaderCache(t *testing.T) {
if r := lc.Lookup(12); r.StoreID != 0 {
t.Fatalf("lookup of missing key returned replica: %v", r)
}
replica := proto.Replica{StoreID: 1}
replica := proto.ReplicaDescriptor{StoreID: 1}
lc.Update(5, replica)
if r := lc.Lookup(5); r.StoreID != 1 {
t.Errorf("expected %v, got %v", replica, r)
}
newReplica := proto.Replica{StoreID: 7}
newReplica := proto.ReplicaDescriptor{StoreID: 7}
lc.Update(5, newReplica)
r := lc.Lookup(5)
if r.StoreID != 7 {
t.Errorf("expected %v, got %v", newReplica, r)
}
lc.Update(5, proto.Replica{})
lc.Update(5, proto.ReplicaDescriptor{})
r = lc.Lookup(5)
if r.StoreID != 0 {
t.Fatalf("evicted leader returned: %v", r)
Expand Down
4 changes: 2 additions & 2 deletions kv/local_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (ls *LocalSender) Send(ctx context.Context, ba proto.BatchRequest) (*proto.
// If we aren't given a Replica, then a little bending over
// backwards here. This case applies exclusively to unittests.
if ba.RangeID == 0 || ba.Replica.StoreID == 0 {
var repl *proto.Replica
var repl *proto.ReplicaDescriptor
var rangeID proto.RangeID
rangeID, repl, err = ls.lookupReplica(ba.Key, ba.EndKey)
if err == nil {
Expand Down Expand Up @@ -167,7 +167,7 @@ func (ls *LocalSender) Send(ctx context.Context, ba proto.BatchRequest) (*proto.
// Returns RangeID and replica on success; RangeKeyMismatch error
// if not found.
// This is only for testing usage; performance doesn't matter.
func (ls *LocalSender) lookupReplica(start, end proto.Key) (rangeID proto.RangeID, replica *proto.Replica, err error) {
func (ls *LocalSender) lookupReplica(start, end proto.Key) (rangeID proto.RangeID, replica *proto.ReplicaDescriptor, err error) {
ls.mu.RLock()
defer ls.mu.RUnlock()
var rng *storage.Replica
Expand Down
4 changes: 2 additions & 2 deletions kv/local_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func TestLocalSenderGetStore(t *testing.T) {
defer leaktest.AfterTest(t)
ls := NewLocalSender()
store := storage.Store{}
replica := proto.Replica{StoreID: store.Ident.StoreID}
replica := proto.ReplicaDescriptor{StoreID: store.Ident.StoreID}
s, err := ls.GetStore(replica.StoreID)
if s != nil || err == nil {
t.Errorf("expected no stores in new local sender")
Expand Down Expand Up @@ -154,7 +154,7 @@ func TestLocalSenderLookupReplica(t *testing.T) {
RangeID: proto.RangeID(i),
StartKey: rng.start,
EndKey: rng.end,
Replicas: []proto.Replica{{StoreID: rng.storeID}},
Replicas: []proto.ReplicaDescriptor{{StoreID: rng.storeID}},
}
newRng, err := storage.NewReplica(d[i], s[i])
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions kv/replica_slice.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
// replicaInfo extends the Replica structure with the associated node
// descriptor.
type replicaInfo struct {
proto.Replica
proto.ReplicaDescriptor
NodeDesc *proto.NodeDescriptor
}

Expand Down Expand Up @@ -55,8 +55,8 @@ func newReplicaSlice(gossip *gossip.Gossip, desc *proto.RangeDescriptor) replica
continue
}
replicas = append(replicas, replicaInfo{
Replica: r,
NodeDesc: nd,
ReplicaDescriptor: r,
NodeDesc: nd,
})
}
return replicas
Expand Down
2 changes: 1 addition & 1 deletion kv/replica_slice_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func TestReplicaSetMoveToFront(t *testing.T) {
defer leaktest.AfterTest(t)
rs := replicaSlice(nil)
for i := 0; i < 5; i++ {
rs = append(rs, replicaInfo{Replica: proto.Replica{StoreID: proto.StoreID(i + 1)}})
rs = append(rs, replicaInfo{ReplicaDescriptor: proto.ReplicaDescriptor{StoreID: proto.StoreID(i + 1)}})
}
rs.MoveToFront(0)
exp := []proto.StoreID{1, 2, 3, 4, 5}
Expand Down
2 changes: 1 addition & 1 deletion kv/txn_coord_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,7 @@ func TestTxnCoordSenderTxnUpdatedOnError(t *testing.T) {
Txn: &proto.Transaction{
Name: "test txn",
},
Replica: proto.Replica{
Replica: proto.ReplicaDescriptor{
NodeID: 12345,
},
},
Expand Down
8 changes: 4 additions & 4 deletions multiraft/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ import (
// An EventLeaderElection is broadcast when a group starts or completes
// an election. NodeID is zero when an election is in progress.
type EventLeaderElection struct {
GroupID proto.RangeID
NodeID proto.RaftNodeID
Term uint64
GroupID proto.RangeID
ReplicaID proto.ReplicaID
Term uint64
}

// An EventCommandCommitted is broadcast whenever a command has been committed.
Expand All @@ -52,7 +52,7 @@ type EventMembershipChangeCommitted struct {
GroupID proto.RangeID
CommandID string
Index uint64
NodeID proto.RaftNodeID
Replica proto.ReplicaDescriptor
ChangeType raftpb.ConfChangeType
Payload []byte

Expand Down
Loading

0 comments on commit fd5b64a

Please sign in to comment.