Skip to content

Commit

Permalink
Fix issue scylladb#267
Browse files Browse the repository at this point in the history
  • Loading branch information
dkropachev committed Feb 1, 2023
1 parent 8bcf95d commit 9d991fe
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 36 deletions.
99 changes: 70 additions & 29 deletions inflight/inflight.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@ package inflight

import (
"sync"

"github.com/scylladb/go-set/u64set"
)

// We track inflights in the map, maps in golang are not shrinking
// Therefore we track how many inflights were deleted and when it reaches the limit
// we forcefully recreate the map to shrink it
const shrinkInflightsLimit = 1000000

type InFlight interface {
AddIfNotPresent(uint64) bool
Delete(uint64)
Expand All @@ -28,13 +31,15 @@ type InFlight interface {
// New creates a instance of a simple InFlight set.
// It's internal data is protected by a simple sync.RWMutex.
func New() InFlight {
return newSyncU64set()
return newSyncU64set(shrinkInflightsLimit)
}

func newSyncU64set() *syncU64set {
func newSyncU64set(limit uint64) *syncU64set {
return &syncU64set{
pks: u64set.New(),
mu: &sync.RWMutex{},
values: make(map[uint64]bool),
limit: limit,
deleted: 0,
lock: sync.RWMutex{},
}
}

Expand All @@ -48,7 +53,7 @@ func NewConcurrent() InFlight {
func newShardedSyncU64set() *shardedSyncU64set {
s := &shardedSyncU64set{}
for i := range s.shards {
s.shards[i] = newSyncU64set()
s.shards[i] = newSyncU64set(shrinkInflightsLimit)
}
return s
}
Expand All @@ -69,35 +74,71 @@ func (s *shardedSyncU64set) AddIfNotPresent(v uint64) bool {
return ss.AddIfNotPresent(v)
}

// syncU64set is an InFlight implementation protected by a sync.RWLock
type syncU64set struct {
pks *u64set.Set
mu *sync.RWMutex
values map[uint64]bool
deleted uint64
limit uint64
lock sync.RWMutex
}

func (s *syncU64set) Delete(v uint64) {
s.mu.Lock()
defer s.mu.Unlock()
s.pks.Remove(v)
func (s *syncU64set) AddIfNotPresent(u uint64) bool {
s.lock.RLock()
_, ok := s.values[u]
if ok {
s.lock.RUnlock()
return false
}
s.lock.RUnlock()
s.lock.Lock()
defer s.lock.Unlock()
_, ok = s.values[u]
if ok {
return false
}
s.values[u] = true
return true
}

func (s *syncU64set) AddIfNotPresent(v uint64) bool {
s.mu.RLock()
if s.pks.Has(v) {
s.mu.RUnlock()
return false
func (s *syncU64set) Has(u uint64) bool {
s.lock.RLock()
defer s.lock.RUnlock()
_, ok := s.values[u]
return ok
}

func (s *syncU64set) Delete(u uint64) {
s.lock.Lock()
defer s.lock.Unlock()
_, ok := s.values[u]
if !ok {
return
}
s.mu.RUnlock()
return s.addIfNotPresent(v)
delete(s.values, u)
s.addDeleted(1)
}

func (s *syncU64set) addIfNotPresent(v uint64) bool {
s.mu.Lock()
defer s.mu.Unlock()
if s.pks.Has(v) {
// double check
return false
func (s *syncU64set) addDeleted(n uint64) {
s.deleted += n
if s.limit != 0 && s.deleted > s.limit {
go s.shrink()
}
s.pks.Add(v)
return true
}

func (s *syncU64set) shrink() {
s.lock.Lock()
defer s.lock.Unlock()
var newValues map[uint64]bool
if uint64(len(s.values)) >= s.deleted {
newValues = make(map[uint64]bool, uint64(len(s.values))-s.deleted)
} else {
newValues = make(map[uint64]bool, 0)
}

for key, val := range s.values {
if val == true {
newValues[key] = val
}
}
s.values = newValues
s.deleted = 0
}
14 changes: 7 additions & 7 deletions inflight/inflight_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (

func TestAddIfNotPresent(t *testing.T) {
t.Parallel()
flight := newSyncU64set()
flight := newSyncU64set(shrinkInflightsLimit)
if !flight.AddIfNotPresent(10) {
t.Error("could not add the first value")
}
Expand All @@ -34,11 +34,11 @@ func TestAddIfNotPresent(t *testing.T) {

func TestDelete(t *testing.T) {
t.Parallel()
flight := newSyncU64set()
flight := newSyncU64set(shrinkInflightsLimit)
flight.AddIfNotPresent(10)

flight.Delete(10)
if flight.pks.Has(10) {
if flight.Has(10) {
t.Error("did not delete the value")
}
}
Expand All @@ -60,20 +60,20 @@ func TestDeleteSharded(t *testing.T) {
flight.AddIfNotPresent(10)

flight.Delete(10)
if flight.shards[10%256].pks.Has(10) {
if flight.shards[10%256].Has(10) {
t.Error("did not delete the value")
}
}

func TestInflight(t *testing.T) {
t.Parallel()
flight := newSyncU64set()
flight := newSyncU64set(shrinkInflightsLimit)
f := func(v uint64) interface{} {
return flight.AddIfNotPresent(v)
}
g := func(v uint64) interface{} {
flight.Delete(v)
return !flight.pks.Has(v)
return !flight.Has(v)
}

cfg := createQuickConfig()
Expand All @@ -90,7 +90,7 @@ func TestInflightSharded(t *testing.T) {
}
g := func(v uint64) interface{} {
flight.Delete(v)
return !flight.shards[v%256].pks.Has(v)
return !flight.shards[v%256].Has(v)
}

cfg := createQuickConfig()
Expand Down

0 comments on commit 9d991fe

Please sign in to comment.