diff --git a/pkg/generators/generator.go b/pkg/generators/generator.go index 6c004254..0277b9ab 100644 --- a/pkg/generators/generator.go +++ b/pkg/generators/generator.go @@ -153,7 +153,7 @@ func (g *Generator) fillAllPartitions(stopFlag *stop.Flag) { } return true } - for { + for !stopFlag.IsHardOrSoft() { values := CreatePartitionKeyValues(g.table, g.r, &g.partitionsConfig) token, err := g.routingKeyCreator.GetHash(g.table, values) if err != nil { @@ -162,13 +162,13 @@ func (g *Generator) fillAllPartitions(stopFlag *stop.Flag) { g.cntCreated++ idx := token % g.partitionCount partition := g.partitions[idx] + if partition.inFlight.Has(token) { + continue + } select { case partition.values <- &typedef.ValueWithToken{Token: token, Value: values}: g.cntEmitted++ default: - if stopFlag.IsHardOrSoft() { - return - } if !pFilled[idx] { pFilled[idx] = true if allFilled() { diff --git a/pkg/inflight/inflight.go b/pkg/inflight/inflight.go index 88ce1116..5ffb6f83 100644 --- a/pkg/inflight/inflight.go +++ b/pkg/inflight/inflight.go @@ -26,6 +26,7 @@ const shrinkInflightsLimit = 1000 type InFlight interface { AddIfNotPresent(uint64) bool Delete(uint64) + Has(uint64) bool } // New creates a instance of a simple InFlight set. @@ -74,6 +75,11 @@ func (s *shardedSyncU64set) AddIfNotPresent(v uint64) bool { return ss.AddIfNotPresent(v) } +func (s *shardedSyncU64set) Has(v uint64) bool { + ss := s.shards[v%256] + return ss.Has(v) +} + type syncU64set struct { values map[uint64]struct{} deleted uint64 @@ -90,31 +96,33 @@ func (s *syncU64set) AddIfNotPresent(u uint64) bool { } s.lock.RUnlock() s.lock.Lock() - defer s.lock.Unlock() _, ok = s.values[u] if ok { + s.lock.Unlock() return false } s.values[u] = struct{}{} + s.lock.Unlock() return true } func (s *syncU64set) Has(u uint64) bool { s.lock.RLock() - defer s.lock.RUnlock() _, ok := s.values[u] + s.lock.RUnlock() return ok } func (s *syncU64set) Delete(u uint64) { s.lock.Lock() - defer s.lock.Unlock() _, ok := s.values[u] if !ok { + s.lock.Unlock() return } delete(s.values, u) s.addDeleted(1) + s.lock.Unlock() } func (s *syncU64set) addDeleted(n uint64) { @@ -126,7 +134,6 @@ func (s *syncU64set) addDeleted(n uint64) { func (s *syncU64set) shrink() { s.lock.Lock() - defer s.lock.Unlock() mapLen := uint64(0) if uint64(len(s.values)) >= s.deleted { mapLen = uint64(len(s.values)) - s.deleted @@ -138,4 +145,5 @@ func (s *syncU64set) shrink() { } s.values = newValues s.deleted = 0 + s.lock.Unlock() }