Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[cluster] Store shards in sorted form #2890

Merged
merged 2 commits into from
Nov 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/cluster/placement/algo/sharded_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1230,8 +1230,8 @@ func verifyAllShardsInAvailableState(t *testing.T, p placement.Placement) {
for _, instance := range p.Instances() {
s := instance.Shards()
require.Equal(t, len(s.All()), len(s.ShardsForState(shard.Available)))
require.Nil(t, s.ShardsForState(shard.Initializing))
require.Nil(t, s.ShardsForState(shard.Leaving))
require.Empty(t, s.ShardsForState(shard.Initializing))
require.Empty(t, s.ShardsForState(shard.Leaving))
}
}

Expand Down
175 changes: 108 additions & 67 deletions src/cluster/shard/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,17 +68,19 @@ func (s State) Proto() (placementpb.ShardState, error) {
func NewShard(id uint32) Shard { return &shard{id: id, state: Unknown} }

// NewShardFromProto create a new shard from proto.
func NewShardFromProto(shard *placementpb.Shard) (Shard, error) {
state, err := NewShardStateFromProto(shard.State)
func NewShardFromProto(spb *placementpb.Shard) (Shard, error) {
state, err := NewShardStateFromProto(spb.State)
if err != nil {
return nil, err
}

return NewShard(shard.Id).
SetState(state).
SetSourceID(shard.SourceId).
SetCutoverNanos(shard.CutoverNanos).
SetCutoffNanos(shard.CutoffNanos), nil
return &shard{
id: spb.Id,
state: state,
sourceID: spb.SourceId,
cutoverNanos: spb.CutoverNanos,
cutoffNanos: spb.CutoffNanos,
}, nil
}

type shard struct {
Expand Down Expand Up @@ -146,26 +148,26 @@ func (s *shard) Equals(other Shard) bool {
}

func (s *shard) Proto() (*placementpb.Shard, error) {
ss, err := s.State().Proto()
ss, err := s.state.Proto()
if err != nil {
return nil, err
}

return &placementpb.Shard{
Id: s.ID(),
Id: s.id,
State: ss,
SourceId: s.SourceID(),
SourceId: s.sourceID,
CutoverNanos: s.cutoverNanos,
CutoffNanos: s.cutoffNanos,
}, nil
}

func (s *shard) Clone() Shard {
return NewShard(s.ID()).
SetState(s.State()).
SetSourceID(s.SourceID()).
SetCutoverNanos(s.CutoverNanos()).
SetCutoffNanos(s.CutoffNanos())
if s == nil {
return nil
}
clone := *s
return &clone
}

// SortableShardsByIDAsc are sortable shards by ID in ascending order
Expand All @@ -188,11 +190,23 @@ func (s SortableIDsAsc) Less(i, j int) bool {

// NewShards creates a new instance of Shards
func NewShards(ss []Shard) Shards {
// deduplicate first, last one wins
shardMap := make(map[uint32]Shard, len(ss))
for _, s := range ss {
shardMap[s.ID()] = s
}
return shards{shardsMap: shardMap}

shrds := make([]Shard, 0, len(shardMap))
for _, s := range shardMap {
shrds = append(shrds, s)
}

sort.Sort(SortableShardsByIDAsc(shrds))

return &shards{
shards: shrds,
shardMap: shardMap,
}
}

// NewShardsFromProto creates a new set of shards from proto.
Expand All @@ -209,77 +223,106 @@ func NewShardsFromProto(shards []*placementpb.Shard) (Shards, error) {
}

type shards struct {
shardsMap map[uint32]Shard
shards []Shard
shardMap map[uint32]Shard
}

func (ss shards) All() []Shard {
shards := make([]Shard, 0, len(ss.shardsMap))
for _, shard := range ss.shardsMap {
shards = append(shards, shard)
}
sort.Sort(SortableShardsByIDAsc(shards))
func (ss *shards) All() []Shard {
shards := make([]Shard, len(ss.shards))
copy(shards, ss.shards)

return shards
}

func (ss shards) AllIDs() []uint32 {
ids := make([]uint32, 0, len(ss.shardsMap))
for _, shard := range ss.shardsMap {
ids = append(ids, shard.ID())
func (ss *shards) AllIDs() []uint32 {
shardIDs := make([]uint32, 0, len(ss.shards))
for _, shrd := range ss.shards {
shardIDs = append(shardIDs, shrd.ID())
}
sort.Sort(SortableIDsAsc(ids))
return ids

return shardIDs
}

func (ss shards) NumShards() int {
return len(ss.shardsMap)
func (ss *shards) NumShards() int {
return len(ss.shards)
}

func (ss shards) Shard(id uint32) (Shard, bool) {
shard, ok := ss.shardsMap[id]
return shard, ok
func (ss *shards) Shard(id uint32) (Shard, bool) {
shard, ok := ss.shardMap[id]
if !ok {
return nil, false
}

return shard, true
}

func (ss shards) Add(shard Shard) {
ss.shardsMap[shard.ID()] = shard
func (ss *shards) Add(shard Shard) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: maybe worth adding a couple of sanity tests on this one if they don't already exist? I expect they may not since the previous implementation was trivial (not that this implementation is all that complex either ofc).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

id := shard.ID()
// we keep a sorted slice of shards, do a binary search to either find the index
// of an existing shard for replacement, or the target index position
i := sort.Search(len(ss.shards), func(i int) bool { return ss.shards[i].ID() >= id })
if i < len(ss.shards) && ss.shards[i].ID() == id {
ss.shards[i] = shard
ss.shardMap[id] = shard
return
}

// extend the sorted shard slice by 1
ss.shards = append(ss.shards, shard)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: maybe worth a comment explaining this? i.e. "insert into the middle by adding to the end and then shifting data".

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that shards are added relatively rarely, might simply add to the end and sort.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the small complexity add is worth the fact that won't surprise you with a slowdown of an order of magnitude.

ss.shardMap[id] = shard

// target position was at the end, so extending with the new shard was enough
if i >= len(ss.shards)-1 {
return
}

// if not, copy over all slice elements shifted by 1 and overwrite data at index
copy(ss.shards[i+1:], ss.shards[i:])
ss.shards[i] = shard
}

func (ss shards) Remove(shard uint32) {
delete(ss.shardsMap, shard)
func (ss *shards) Remove(id uint32) {
// we keep a sorted slice of shards, do a binary search to find the index
i := sort.Search(len(ss.shards), func(i int) bool { return ss.shards[i].ID() >= id })
if i < len(ss.shards) && ss.shards[i].ID() == id {
delete(ss.shardMap, id)
// shift all other elements back after removal
ss.shards = ss.shards[:i+copy(ss.shards[i:], ss.shards[i+1:])]
}
}

func (ss shards) Contains(shard uint32) bool {
_, ok := ss.shardsMap[shard]
func (ss *shards) Contains(shard uint32) bool {
_, ok := ss.shardMap[shard]
return ok
}

func (ss shards) NumShardsForState(state State) int {
func (ss *shards) NumShardsForState(state State) int {
count := 0
for _, s := range ss.shardsMap {
for _, s := range ss.shards {
if s.State() == state {
count++
}
}
return count
}

func (ss shards) ShardsForState(state State) []Shard {
var r []Shard
for _, s := range ss.shardsMap {
func (ss *shards) ShardsForState(state State) []Shard {
r := make([]Shard, 0, len(ss.shards))
for _, s := range ss.shards {
if s.State() == state {
r = append(r, s)
}
}
return r
}

func (ss shards) Equals(other Shards) bool {
shards := ss.All()
otherShards := other.All()
if len(shards) != len(otherShards) {
func (ss *shards) Equals(other Shards) bool {
if len(ss.shards) != other.NumShards() {
return false
}

for i, shard := range shards {
otherShards := other.All()
for i, shard := range ss.shards {
otherShard := otherShards[i]
if !shard.Equals(otherShard) {
return false
Expand All @@ -288,7 +331,7 @@ func (ss shards) Equals(other Shards) bool {
return true
}

func (ss shards) String() string {
func (ss *shards) String() string {
var strs []string
for _, state := range validStates() {
ids := NewShards(ss.ShardsForState(state)).AllIDs()
Expand All @@ -298,10 +341,9 @@ func (ss shards) String() string {
return fmt.Sprintf("[%s]", strings.Join(strs, ", "))
}

func (ss shards) Proto() ([]*placementpb.Shard, error) {
res := make([]*placementpb.Shard, 0, len(ss.shardsMap))
// All() returns the shards in ID ascending order.
for _, shard := range ss.All() {
func (ss *shards) Proto() ([]*placementpb.Shard, error) {
res := make([]*placementpb.Shard, 0, len(ss.shards))
for _, shard := range ss.shards {
sp, err := shard.Proto()
if err != nil {
return nil, err
Expand All @@ -312,22 +354,21 @@ func (ss shards) Proto() ([]*placementpb.Shard, error) {
return res, nil
}

func (ss shards) Clone() Shards {
shards := make([]Shard, ss.NumShards())
for i, shard := range ss.All() {
shards[i] = shard.Clone()
func (ss *shards) Clone() Shards {
shrds := make([]Shard, 0, len(ss.shards))
shardMap := make(map[uint32]Shard, len(ss.shards))

for _, shrd := range ss.shards {
shrds = append(shrds, shrd.Clone())
shardMap[shrd.ID()] = shrd
}

return NewShards(shards)
return &shards{
shards: shrds,
shardMap: shardMap,
}
}

// SortableShardProtosByIDAsc sorts shard protos by their ids in ascending order.
type SortableShardProtosByIDAsc []*placementpb.Shard

func (su SortableShardProtosByIDAsc) Len() int { return len(su) }
func (su SortableShardProtosByIDAsc) Less(i, j int) bool { return su[i].Id < su[j].Id }
func (su SortableShardProtosByIDAsc) Swap(i, j int) { su[i], su[j] = su[j], su[i] }

// validStates returns all the valid states.
func validStates() []State {
return []State{
Expand Down
Loading