Skip to content

Commit

Permalink
fix(generator): check of inflight conflict early
Browse files Browse the repository at this point in the history
  • Loading branch information
Dmitry Kropachev committed Jul 16, 2023
1 parent c300396 commit 88fee5e
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 8 deletions.
8 changes: 4 additions & 4 deletions pkg/generators/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func (g *Generator) fillAllPartitions(stopFlag *stop.Flag) {
}
return true
}
for {
for !stopFlag.IsHardOrSoft() {
values := CreatePartitionKeyValues(g.table, g.r, &g.partitionsConfig)
token, err := g.routingKeyCreator.GetHash(g.table, values)
if err != nil {
Expand All @@ -162,13 +162,13 @@ func (g *Generator) fillAllPartitions(stopFlag *stop.Flag) {
g.cntCreated++
idx := token % g.partitionCount
partition := g.partitions[idx]
if partition.inFlight.Has(token) {
continue
}
select {
case partition.values <- &typedef.ValueWithToken{Token: token, Value: values}:
g.cntEmitted++
default:
if stopFlag.IsHardOrSoft() {
return
}
if !pFilled[idx] {
pFilled[idx] = true
if allFilled() {
Expand Down
16 changes: 12 additions & 4 deletions pkg/inflight/inflight.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ const shrinkInflightsLimit = 1000
type InFlight interface {
AddIfNotPresent(uint64) bool
Delete(uint64)
Has(uint64) bool
}

// New creates a instance of a simple InFlight set.
Expand Down Expand Up @@ -74,6 +75,11 @@ func (s *shardedSyncU64set) AddIfNotPresent(v uint64) bool {
return ss.AddIfNotPresent(v)
}

func (s *shardedSyncU64set) Has(v uint64) bool {
ss := s.shards[v%256]
return ss.Has(v)
}

type syncU64set struct {
values map[uint64]struct{}
deleted uint64
Expand All @@ -90,31 +96,33 @@ func (s *syncU64set) AddIfNotPresent(u uint64) bool {
}
s.lock.RUnlock()
s.lock.Lock()
defer s.lock.Unlock()
_, ok = s.values[u]
if ok {
s.lock.Unlock()
return false
}
s.values[u] = struct{}{}
s.lock.Unlock()
return true
}

func (s *syncU64set) Has(u uint64) bool {
s.lock.RLock()
defer s.lock.RUnlock()
_, ok := s.values[u]
s.lock.RUnlock()
return ok
}

func (s *syncU64set) Delete(u uint64) {
s.lock.Lock()
defer s.lock.Unlock()
_, ok := s.values[u]
if !ok {
s.lock.Unlock()
return
}
delete(s.values, u)
s.addDeleted(1)
s.lock.Unlock()
}

func (s *syncU64set) addDeleted(n uint64) {
Expand All @@ -126,7 +134,6 @@ func (s *syncU64set) addDeleted(n uint64) {

func (s *syncU64set) shrink() {
s.lock.Lock()
defer s.lock.Unlock()
mapLen := uint64(0)
if uint64(len(s.values)) >= s.deleted {
mapLen = uint64(len(s.values)) - s.deleted
Expand All @@ -138,4 +145,5 @@ func (s *syncU64set) shrink() {
}
s.values = newValues
s.deleted = 0
s.lock.Unlock()
}

0 comments on commit 88fee5e

Please sign in to comment.