-
Notifications
You must be signed in to change notification settings - Fork 455
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[cluster] Store shards in sorted form #2890
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -68,17 +68,19 @@ func (s State) Proto() (placementpb.ShardState, error) { | |
func NewShard(id uint32) Shard { return &shard{id: id, state: Unknown} } | ||
|
||
// NewShardFromProto create a new shard from proto. | ||
func NewShardFromProto(shard *placementpb.Shard) (Shard, error) { | ||
state, err := NewShardStateFromProto(shard.State) | ||
func NewShardFromProto(spb *placementpb.Shard) (Shard, error) { | ||
state, err := NewShardStateFromProto(spb.State) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
return NewShard(shard.Id). | ||
SetState(state). | ||
SetSourceID(shard.SourceId). | ||
SetCutoverNanos(shard.CutoverNanos). | ||
SetCutoffNanos(shard.CutoffNanos), nil | ||
return &shard{ | ||
id: spb.Id, | ||
state: state, | ||
sourceID: spb.SourceId, | ||
cutoverNanos: spb.CutoverNanos, | ||
cutoffNanos: spb.CutoffNanos, | ||
}, nil | ||
} | ||
|
||
type shard struct { | ||
|
@@ -146,26 +148,26 @@ func (s *shard) Equals(other Shard) bool { | |
} | ||
|
||
func (s *shard) Proto() (*placementpb.Shard, error) { | ||
ss, err := s.State().Proto() | ||
ss, err := s.state.Proto() | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
return &placementpb.Shard{ | ||
Id: s.ID(), | ||
Id: s.id, | ||
State: ss, | ||
SourceId: s.SourceID(), | ||
SourceId: s.sourceID, | ||
CutoverNanos: s.cutoverNanos, | ||
CutoffNanos: s.cutoffNanos, | ||
}, nil | ||
} | ||
|
||
func (s *shard) Clone() Shard { | ||
return NewShard(s.ID()). | ||
SetState(s.State()). | ||
SetSourceID(s.SourceID()). | ||
SetCutoverNanos(s.CutoverNanos()). | ||
SetCutoffNanos(s.CutoffNanos()) | ||
if s == nil { | ||
return nil | ||
} | ||
clone := *s | ||
return &clone | ||
} | ||
|
||
// SortableShardsByIDAsc are sortable shards by ID in ascending order | ||
|
@@ -188,11 +190,20 @@ func (s SortableIDsAsc) Less(i, j int) bool { | |
|
||
// NewShards creates a new instance of Shards | ||
func NewShards(ss []Shard) Shards { | ||
shrd := make([]Shard, len(ss)) | ||
copy(shrd, ss) | ||
|
||
sort.Sort(SortableShardsByIDAsc(shrd)) | ||
|
||
shardMap := make(map[uint32]Shard, len(ss)) | ||
for _, s := range ss { | ||
for _, s := range shrd { | ||
shardMap[s.ID()] = s | ||
} | ||
return shards{shardsMap: shardMap} | ||
|
||
return &shards{ | ||
shards: shrd, | ||
shardMap: shardMap, | ||
} | ||
} | ||
|
||
// NewShardsFromProto creates a new set of shards from proto. | ||
|
@@ -209,77 +220,99 @@ func NewShardsFromProto(shards []*placementpb.Shard) (Shards, error) { | |
} | ||
|
||
type shards struct { | ||
shardsMap map[uint32]Shard | ||
shards []Shard | ||
shardMap map[uint32]Shard | ||
} | ||
|
||
func (ss shards) All() []Shard { | ||
shards := make([]Shard, 0, len(ss.shardsMap)) | ||
for _, shard := range ss.shardsMap { | ||
shards = append(shards, shard) | ||
} | ||
sort.Sort(SortableShardsByIDAsc(shards)) | ||
func (ss *shards) All() []Shard { | ||
shards := make([]Shard, len(ss.shards)) | ||
copy(shards, ss.shards) | ||
|
||
return shards | ||
} | ||
|
||
func (ss shards) AllIDs() []uint32 { | ||
ids := make([]uint32, 0, len(ss.shardsMap)) | ||
for _, shard := range ss.shardsMap { | ||
ids = append(ids, shard.ID()) | ||
func (ss *shards) AllIDs() []uint32 { | ||
shardIDs := make([]uint32, 0, len(ss.shards)) | ||
for _, shrd := range ss.shards { | ||
shardIDs = append(shardIDs, shrd.ID()) | ||
} | ||
sort.Sort(SortableIDsAsc(ids)) | ||
return ids | ||
|
||
return shardIDs | ||
} | ||
|
||
func (ss shards) NumShards() int { | ||
return len(ss.shardsMap) | ||
func (ss *shards) NumShards() int { | ||
return len(ss.shards) | ||
} | ||
|
||
func (ss shards) Shard(id uint32) (Shard, bool) { | ||
shard, ok := ss.shardsMap[id] | ||
return shard, ok | ||
func (ss *shards) Shard(id uint32) (Shard, bool) { | ||
shard, ok := ss.shardMap[id] | ||
if !ok { | ||
return nil, false | ||
} | ||
|
||
return shard, true | ||
} | ||
|
||
func (ss shards) Add(shard Shard) { | ||
ss.shardsMap[shard.ID()] = shard | ||
func (ss *shards) Add(shard Shard) { | ||
id := shard.ID() | ||
i := sort.Search(len(ss.shards), func(i int) bool { return ss.shards[i].ID() >= id }) | ||
if i < len(ss.shards) && ss.shards[i].ID() == id { | ||
ss.shards[i] = shard | ||
ss.shardMap[id] = shard | ||
return | ||
} | ||
|
||
ss.shards = append(ss.shards, shard) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: maybe worth a comment explaining this? i.e. "insert into the middle by adding to the end and then shifting data". There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Given that shards are added relatively rarely, might simply add to the end and sort. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think the small complexity add is worth the fact that won't surprise you with a slowdown of an order of magnitude. |
||
ss.shardMap[id] = shard | ||
|
||
if i >= len(ss.shards)-1 { | ||
return | ||
} | ||
|
||
copy(ss.shards[i+1:], ss.shards[i:]) | ||
ss.shards[i] = shard | ||
} | ||
|
||
func (ss shards) Remove(shard uint32) { | ||
delete(ss.shardsMap, shard) | ||
func (ss *shards) Remove(id uint32) { | ||
i := sort.Search(len(ss.shards), func(i int) bool { return ss.shards[i].ID() >= id }) | ||
if i < len(ss.shards) && ss.shards[i].ID() == id { | ||
delete(ss.shardMap, id) | ||
ss.shards = ss.shards[:i+copy(ss.shards[i:], ss.shards[i+1:])] | ||
} | ||
} | ||
|
||
func (ss shards) Contains(shard uint32) bool { | ||
_, ok := ss.shardsMap[shard] | ||
func (ss *shards) Contains(shard uint32) bool { | ||
_, ok := ss.shardMap[shard] | ||
return ok | ||
} | ||
|
||
func (ss shards) NumShardsForState(state State) int { | ||
func (ss *shards) NumShardsForState(state State) int { | ||
count := 0 | ||
for _, s := range ss.shardsMap { | ||
for _, s := range ss.shards { | ||
if s.State() == state { | ||
count++ | ||
} | ||
} | ||
return count | ||
} | ||
|
||
func (ss shards) ShardsForState(state State) []Shard { | ||
var r []Shard | ||
for _, s := range ss.shardsMap { | ||
func (ss *shards) ShardsForState(state State) []Shard { | ||
r := make([]Shard, 0, len(ss.shards)) | ||
for _, s := range ss.shards { | ||
if s.State() == state { | ||
r = append(r, s) | ||
} | ||
} | ||
return r | ||
} | ||
|
||
func (ss shards) Equals(other Shards) bool { | ||
shards := ss.All() | ||
otherShards := other.All() | ||
if len(shards) != len(otherShards) { | ||
func (ss *shards) Equals(other Shards) bool { | ||
if len(ss.shards) != other.NumShards() { | ||
return false | ||
} | ||
|
||
for i, shard := range shards { | ||
otherShards := other.All() | ||
for i, shard := range ss.shards { | ||
otherShard := otherShards[i] | ||
if !shard.Equals(otherShard) { | ||
return false | ||
|
@@ -288,7 +321,7 @@ func (ss shards) Equals(other Shards) bool { | |
return true | ||
} | ||
|
||
func (ss shards) String() string { | ||
func (ss *shards) String() string { | ||
var strs []string | ||
for _, state := range validStates() { | ||
ids := NewShards(ss.ShardsForState(state)).AllIDs() | ||
|
@@ -298,10 +331,9 @@ func (ss shards) String() string { | |
return fmt.Sprintf("[%s]", strings.Join(strs, ", ")) | ||
} | ||
|
||
func (ss shards) Proto() ([]*placementpb.Shard, error) { | ||
res := make([]*placementpb.Shard, 0, len(ss.shardsMap)) | ||
// All() returns the shards in ID ascending order. | ||
for _, shard := range ss.All() { | ||
func (ss *shards) Proto() ([]*placementpb.Shard, error) { | ||
res := make([]*placementpb.Shard, 0, len(ss.shards)) | ||
for _, shard := range ss.shards { | ||
sp, err := shard.Proto() | ||
if err != nil { | ||
return nil, err | ||
|
@@ -312,22 +344,21 @@ func (ss shards) Proto() ([]*placementpb.Shard, error) { | |
return res, nil | ||
} | ||
|
||
func (ss shards) Clone() Shards { | ||
shards := make([]Shard, ss.NumShards()) | ||
for i, shard := range ss.All() { | ||
shards[i] = shard.Clone() | ||
func (ss *shards) Clone() Shards { | ||
shrds := make([]Shard, 0, len(ss.shards)) | ||
shardMap := make(map[uint32]Shard, len(ss.shards)) | ||
|
||
for _, shrd := range ss.shards { | ||
shrds = append(shrds, shrd.Clone()) | ||
shardMap[shrd.ID()] = shrd | ||
} | ||
|
||
return NewShards(shards) | ||
return &shards{ | ||
shards: shrds, | ||
shardMap: shardMap, | ||
} | ||
} | ||
|
||
// SortableShardProtosByIDAsc sorts shard protos by their ids in ascending order. | ||
type SortableShardProtosByIDAsc []*placementpb.Shard | ||
|
||
func (su SortableShardProtosByIDAsc) Len() int { return len(su) } | ||
func (su SortableShardProtosByIDAsc) Less(i, j int) bool { return su[i].Id < su[j].Id } | ||
func (su SortableShardProtosByIDAsc) Swap(i, j int) { su[i], su[j] = su[j], su[i] } | ||
|
||
// validStates returns all the valid states. | ||
func validStates() []State { | ||
return []State{ | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: maybe worth adding a couple of sanity tests on this one if they don't already exist? I expect they may not since the previous implementation was trivial (not that this implementation is all that complex either ofc).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done