Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework ID usage in multiraft #2675

Merged
merged 6 commits into from
Sep 29, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions kv/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,11 +316,11 @@ func (ds *DistSender) sendRPC(trace *tracer.Trace, rangeID proto.RangeID, replic

// Build a slice of replica addresses (if gossiped).
var addrs []net.Addr
replicaMap := map[string]*proto.Replica{}
replicaMap := map[string]*proto.ReplicaDescriptor{}
for i := range replicas {
addr := replicas[i].NodeDesc.Address
addrs = append(addrs, addr)
replicaMap[addr.String()] = &replicas[i].Replica
replicaMap[addr.String()] = &replicas[i].ReplicaDescriptor
}
if len(addrs) == 0 {
return nil, noNodeAddrsAvailError{}
Expand Down Expand Up @@ -627,7 +627,7 @@ func (ds *DistSender) sendChunk(ctx context.Context, ba proto.BatchRequest) (*pr
evictDesc()
}
} else {
newLeader = &proto.Replica{}
newLeader = &proto.ReplicaDescriptor{}
}
ds.updateLeaderCache(proto.RangeID(desc.RangeID), *newLeader)
if log.V(1) {
Expand Down Expand Up @@ -752,7 +752,7 @@ func (ds *DistSender) sendChunk(ctx context.Context, ba proto.BatchRequest) (*pr

// updateLeaderCache updates the cached leader for the given range,
// evicting any previous value in the process.
func (ds *DistSender) updateLeaderCache(rid proto.RangeID, leader proto.Replica) {
func (ds *DistSender) updateLeaderCache(rid proto.RangeID, leader proto.ReplicaDescriptor) {
oldLeader := ds.leaderCache.Lookup(rid)
if leader.StoreID != oldLeader.StoreID {
if log.V(1) {
Expand Down
44 changes: 22 additions & 22 deletions kv/dist_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ var testRangeDescriptor = proto.RangeDescriptor{
RangeID: 1,
StartKey: proto.Key("a"),
EndKey: proto.Key("z"),
Replicas: []proto.Replica{
Replicas: []proto.ReplicaDescriptor{
{
NodeID: 1,
StoreID: 1,
Expand Down Expand Up @@ -86,16 +86,16 @@ func TestMoveLocalReplicaToFront(t *testing.T) {
// No attribute prefix
slice: replicaSlice{
replicaInfo{
Replica: proto.Replica{NodeID: 2, StoreID: 2},
NodeDesc: &proto.NodeDescriptor{NodeID: 2},
ReplicaDescriptor: proto.ReplicaDescriptor{NodeID: 2, StoreID: 2},
NodeDesc: &proto.NodeDescriptor{NodeID: 2},
},
replicaInfo{
Replica: proto.Replica{NodeID: 3, StoreID: 3},
NodeDesc: &proto.NodeDescriptor{NodeID: 3},
ReplicaDescriptor: proto.ReplicaDescriptor{NodeID: 3, StoreID: 3},
NodeDesc: &proto.NodeDescriptor{NodeID: 3},
},
replicaInfo{
Replica: proto.Replica{NodeID: 1, StoreID: 1},
NodeDesc: &proto.NodeDescriptor{NodeID: 1},
ReplicaDescriptor: proto.ReplicaDescriptor{NodeID: 1, StoreID: 1},
NodeDesc: &proto.NodeDescriptor{NodeID: 1},
},
},
localNodeDesc: proto.NodeDescriptor{NodeID: 1},
Expand All @@ -104,16 +104,16 @@ func TestMoveLocalReplicaToFront(t *testing.T) {
// Sort replicas by attribute
slice: replicaSlice{
replicaInfo{
Replica: proto.Replica{NodeID: 2, StoreID: 2},
NodeDesc: &proto.NodeDescriptor{NodeID: 2, Attrs: proto.Attributes{Attrs: []string{"ad"}}},
ReplicaDescriptor: proto.ReplicaDescriptor{NodeID: 2, StoreID: 2},
NodeDesc: &proto.NodeDescriptor{NodeID: 2, Attrs: proto.Attributes{Attrs: []string{"ad"}}},
},
replicaInfo{
Replica: proto.Replica{NodeID: 3, StoreID: 3},
NodeDesc: &proto.NodeDescriptor{NodeID: 3, Attrs: proto.Attributes{Attrs: []string{"ab", "c"}}},
ReplicaDescriptor: proto.ReplicaDescriptor{NodeID: 3, StoreID: 3},
NodeDesc: &proto.NodeDescriptor{NodeID: 3, Attrs: proto.Attributes{Attrs: []string{"ab", "c"}}},
},
replicaInfo{
Replica: proto.Replica{NodeID: 1, StoreID: 1},
NodeDesc: &proto.NodeDescriptor{NodeID: 1, Attrs: proto.Attributes{Attrs: []string{"ab"}}},
ReplicaDescriptor: proto.ReplicaDescriptor{NodeID: 1, StoreID: 1},
NodeDesc: &proto.NodeDescriptor{NodeID: 1, Attrs: proto.Attributes{Attrs: []string{"ab"}}},
},
},
localNodeDesc: proto.NodeDescriptor{NodeID: 1, Attrs: proto.Attributes{Attrs: []string{"ab"}}},
Expand Down Expand Up @@ -299,7 +299,7 @@ func TestSendRPCOrder(t *testing.T) {
if err := g.AddInfoProto(gossip.MakeNodeIDKey(proto.NodeID(i)), nd, time.Hour); err != nil {
t.Fatal(err)
}
descriptor.Replicas = append(descriptor.Replicas, proto.Replica{
descriptor.Replicas = append(descriptor.Replicas, proto.ReplicaDescriptor{
NodeID: proto.NodeID(i),
StoreID: proto.StoreID(i),
})
Expand All @@ -318,7 +318,7 @@ func TestSendRPCOrder(t *testing.T) {
}
}

ds.leaderCache.Update(proto.RangeID(rangeID), proto.Replica{})
ds.leaderCache.Update(proto.RangeID(rangeID), proto.ReplicaDescriptor{})
if tc.leader > 0 {
ds.leaderCache.Update(proto.RangeID(rangeID), descriptor.Replicas[tc.leader-1])
}
Expand Down Expand Up @@ -363,7 +363,7 @@ func TestRetryOnNotLeaderError(t *testing.T) {
defer leaktest.AfterTest(t)
g, s := makeTestGossip(t)
defer s()
leader := proto.Replica{
leader := proto.ReplicaDescriptor{
NodeID: 99,
StoreID: 999,
}
Expand All @@ -373,7 +373,7 @@ func TestRetryOnNotLeaderError(t *testing.T) {
if first {
reply := getReply()
reply.(proto.Response).Header().SetGoError(
&proto.NotLeaderError{Leader: &leader, Replica: &proto.Replica{}})
&proto.NotLeaderError{Leader: &leader, Replica: &proto.ReplicaDescriptor{}})
first = false
return []gogoproto.Message{reply}, nil
}
Expand Down Expand Up @@ -459,7 +459,7 @@ func TestEvictCacheOnError(t *testing.T) {
for i, tc := range testCases {
g, s := makeTestGossip(t)
defer s()
leader := proto.Replica{
leader := proto.ReplicaDescriptor{
NodeID: 99,
StoreID: 999,
}
Expand Down Expand Up @@ -498,7 +498,7 @@ func TestEvictCacheOnError(t *testing.T) {
if _, err := batchutil.SendWrapped(ds, put); err != nil && !testutils.IsError(err, "boom") {
t.Errorf("put encountered unexpected error: %s", err)
}
if cur := ds.leaderCache.Lookup(1); reflect.DeepEqual(cur, &proto.Replica{}) && !tc.shouldClearLeader {
if cur := ds.leaderCache.Lookup(1); reflect.DeepEqual(cur, &proto.ReplicaDescriptor{}) && !tc.shouldClearLeader {
t.Errorf("%d: leader cache eviction: shouldClearLeader=%t, but value is %v", i, tc.shouldClearLeader, cur)
}
_, cachedDesc := ds.rangeCache.getCachedRangeDescriptor(put.Key, false /* !inclusive */)
Expand Down Expand Up @@ -626,7 +626,7 @@ func TestSendRPCRetry(t *testing.T) {
t.Fatal(err)
}

descriptor.Replicas = append(descriptor.Replicas, proto.Replica{
descriptor.Replicas = append(descriptor.Replicas, proto.ReplicaDescriptor{
NodeID: proto.NodeID(i),
StoreID: proto.StoreID(i),
})
Expand Down Expand Up @@ -697,7 +697,7 @@ func TestMultiRangeMergeStaleDescriptor(t *testing.T) {
RangeID: 1,
StartKey: proto.Key("a"),
EndKey: proto.Key("b"),
Replicas: []proto.Replica{
Replicas: []proto.ReplicaDescriptor{
{
NodeID: 1,
StoreID: 1,
Expand All @@ -710,7 +710,7 @@ func TestMultiRangeMergeStaleDescriptor(t *testing.T) {
RangeID: 1,
StartKey: proto.Key("a"),
EndKey: proto.KeyMax,
Replicas: []proto.Replica{
Replicas: []proto.ReplicaDescriptor{
{
NodeID: 1,
StoreID: 1,
Expand Down
8 changes: 4 additions & 4 deletions kv/leader_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,20 +47,20 @@ func newLeaderCache(size int) *leaderCache {

// Lookup consults the cache for the replica cached as the leader of
// the given Raft consensus group.
func (lc *leaderCache) Lookup(group proto.RangeID) proto.Replica {
func (lc *leaderCache) Lookup(group proto.RangeID) proto.ReplicaDescriptor {
lc.mu.Lock()
defer lc.mu.Unlock()
v, ok := lc.cache.Get(group)
if !ok || v == nil {
return proto.Replica{}
return proto.ReplicaDescriptor{}
}
return *(v.(*proto.Replica))
return *(v.(*proto.ReplicaDescriptor))
}

// Update invalidates the cached leader for the given Raft group.
// If a replica is passed in, it is inserted into the cache.
// A StoreID of 0 (empty replica) means evict.
func (lc *leaderCache) Update(group proto.RangeID, r proto.Replica) {
func (lc *leaderCache) Update(group proto.RangeID, r proto.ReplicaDescriptor) {
lc.mu.Lock()
defer lc.mu.Unlock()
lc.cache.Del(group)
Expand Down
6 changes: 3 additions & 3 deletions kv/leader_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,18 @@ func TestLeaderCache(t *testing.T) {
if r := lc.Lookup(12); r.StoreID != 0 {
t.Fatalf("lookup of missing key returned replica: %v", r)
}
replica := proto.Replica{StoreID: 1}
replica := proto.ReplicaDescriptor{StoreID: 1}
lc.Update(5, replica)
if r := lc.Lookup(5); r.StoreID != 1 {
t.Errorf("expected %v, got %v", replica, r)
}
newReplica := proto.Replica{StoreID: 7}
newReplica := proto.ReplicaDescriptor{StoreID: 7}
lc.Update(5, newReplica)
r := lc.Lookup(5)
if r.StoreID != 7 {
t.Errorf("expected %v, got %v", newReplica, r)
}
lc.Update(5, proto.Replica{})
lc.Update(5, proto.ReplicaDescriptor{})
r = lc.Lookup(5)
if r.StoreID != 0 {
t.Fatalf("evicted leader returned: %v", r)
Expand Down
4 changes: 2 additions & 2 deletions kv/local_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (ls *LocalSender) Send(ctx context.Context, ba proto.BatchRequest) (*proto.
// If we aren't given a Replica, then a little bending over
// backwards here. This case applies exclusively to unittests.
if ba.RangeID == 0 || ba.Replica.StoreID == 0 {
var repl *proto.Replica
var repl *proto.ReplicaDescriptor
var rangeID proto.RangeID
rangeID, repl, err = ls.lookupReplica(ba.Key, ba.EndKey)
if err == nil {
Expand Down Expand Up @@ -167,7 +167,7 @@ func (ls *LocalSender) Send(ctx context.Context, ba proto.BatchRequest) (*proto.
// Returns RangeID and replica on success; RangeKeyMismatch error
// if not found.
// This is only for testing usage; performance doesn't matter.
func (ls *LocalSender) lookupReplica(start, end proto.Key) (rangeID proto.RangeID, replica *proto.Replica, err error) {
func (ls *LocalSender) lookupReplica(start, end proto.Key) (rangeID proto.RangeID, replica *proto.ReplicaDescriptor, err error) {
ls.mu.RLock()
defer ls.mu.RUnlock()
var rng *storage.Replica
Expand Down
4 changes: 2 additions & 2 deletions kv/local_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func TestLocalSenderGetStore(t *testing.T) {
defer leaktest.AfterTest(t)
ls := NewLocalSender()
store := storage.Store{}
replica := proto.Replica{StoreID: store.Ident.StoreID}
replica := proto.ReplicaDescriptor{StoreID: store.Ident.StoreID}
s, err := ls.GetStore(replica.StoreID)
if s != nil || err == nil {
t.Errorf("expected no stores in new local sender")
Expand Down Expand Up @@ -154,7 +154,7 @@ func TestLocalSenderLookupReplica(t *testing.T) {
RangeID: proto.RangeID(i),
StartKey: rng.start,
EndKey: rng.end,
Replicas: []proto.Replica{{StoreID: rng.storeID}},
Replicas: []proto.ReplicaDescriptor{{StoreID: rng.storeID}},
}
newRng, err := storage.NewReplica(d[i], s[i])
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions kv/replica_slice.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
// replicaInfo extends the Replica structure with the associated node
// descriptor.
type replicaInfo struct {
proto.Replica
proto.ReplicaDescriptor
NodeDesc *proto.NodeDescriptor
}

Expand Down Expand Up @@ -55,8 +55,8 @@ func newReplicaSlice(gossip *gossip.Gossip, desc *proto.RangeDescriptor) replica
continue
}
replicas = append(replicas, replicaInfo{
Replica: r,
NodeDesc: nd,
ReplicaDescriptor: r,
NodeDesc: nd,
})
}
return replicas
Expand Down
2 changes: 1 addition & 1 deletion kv/replica_slice_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func TestReplicaSetMoveToFront(t *testing.T) {
defer leaktest.AfterTest(t)
rs := replicaSlice(nil)
for i := 0; i < 5; i++ {
rs = append(rs, replicaInfo{Replica: proto.Replica{StoreID: proto.StoreID(i + 1)}})
rs = append(rs, replicaInfo{ReplicaDescriptor: proto.ReplicaDescriptor{StoreID: proto.StoreID(i + 1)}})
}
rs.MoveToFront(0)
exp := []proto.StoreID{1, 2, 3, 4, 5}
Expand Down
2 changes: 1 addition & 1 deletion kv/txn_coord_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,7 @@ func TestTxnCoordSenderTxnUpdatedOnError(t *testing.T) {
Txn: &proto.Transaction{
Name: "test txn",
},
Replica: proto.Replica{
Replica: proto.ReplicaDescriptor{
NodeID: 12345,
},
},
Expand Down
8 changes: 4 additions & 4 deletions multiraft/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ import (
// An EventLeaderElection is broadcast when a group starts or completes
// an election. NodeID is zero when an election is in progress.
type EventLeaderElection struct {
GroupID proto.RangeID
NodeID proto.RaftNodeID
Term uint64
GroupID proto.RangeID
ReplicaID proto.ReplicaID
Term uint64
}

// An EventCommandCommitted is broadcast whenever a command has been committed.
Expand All @@ -52,7 +52,7 @@ type EventMembershipChangeCommitted struct {
GroupID proto.RangeID
CommandID string
Index uint64
NodeID proto.RaftNodeID
Replica proto.ReplicaDescriptor
ChangeType raftpb.ConfChangeType
Payload []byte

Expand Down
Loading