Skip to content

Commit

Permalink
rename mutexes
Browse files Browse the repository at this point in the history
  • Loading branch information
anatolie-ssv committed Oct 19, 2024
1 parent cf5bbf5 commit 2be0c5a
Showing 1 changed file with 28 additions and 24 deletions.
52 changes: 28 additions & 24 deletions registry/storage/shares.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,12 @@ type sharesStorage struct {
prefix []byte
shares map[string]*types.SSVShare
validatorStore *validatorStore
dbmu sync.Mutex // parent lock for in-memory mutex
mu sync.RWMutex
// storageMtx serializes access to the database in order to avoid
// re-creation of a deleted share during update metadata
storageMtx sync.Mutex
// memoryMtx allows more granular access to in-memory shares map
// to minimize time spent in 'locked' mode when updating state
memoryMtx sync.RWMutex
}

type storageOperator struct {
Expand Down Expand Up @@ -152,8 +156,8 @@ func (s *sharesStorage) load() error {
}

func (s *sharesStorage) Get(_ basedb.Reader, pubKey []byte) (*types.SSVShare, bool) {
s.mu.RLock()
defer s.mu.RUnlock()
s.memoryMtx.RLock()
defer s.memoryMtx.RUnlock()

return s.unsafeGet(pubKey)
}
Expand All @@ -164,8 +168,8 @@ func (s *sharesStorage) unsafeGet(pubKey []byte) (*types.SSVShare, bool) {
}

func (s *sharesStorage) List(_ basedb.Reader, filters ...SharesFilter) []*types.SSVShare {
s.mu.RLock()
defer s.mu.RUnlock()
s.memoryMtx.RLock()
defer s.memoryMtx.RUnlock()

if len(filters) == 0 {
return maps.Values(s.shares)
Expand All @@ -185,8 +189,8 @@ Shares:
}

func (s *sharesStorage) Range(_ basedb.Reader, fn func(*types.SSVShare) bool) {
s.mu.RLock()
defer s.mu.RUnlock()
s.memoryMtx.RLock()
defer s.memoryMtx.RUnlock()

for _, share := range s.shares {
if !fn(share) {
Expand All @@ -208,13 +212,13 @@ func (s *sharesStorage) Save(rw basedb.ReadWriter, shares ...*types.SSVShare) er

s.logger.Debug("save validators to shares storage", zap.Int("count", len(shares)))

s.dbmu.Lock()
defer s.dbmu.Unlock()
s.storageMtx.Lock()
defer s.storageMtx.Unlock()

// Update in-memory.
err := func() error {
s.mu.Lock()
defer s.mu.Unlock()
s.memoryMtx.Lock()
defer s.memoryMtx.Unlock()

updateShares := make([]*types.SSVShare, 0, len(shares))
addShares := make([]*types.SSVShare, 0, len(shares))
Expand Down Expand Up @@ -323,12 +327,12 @@ func (s *sharesStorage) storageShareToSpecShare(share *storageShare) (*types.SSV
var errShareNotFound = errors.New("share not found")

func (s *sharesStorage) Delete(rw basedb.ReadWriter, pubKey []byte) error {
s.dbmu.Lock()
defer s.dbmu.Unlock()
s.storageMtx.Lock()
defer s.storageMtx.Unlock()

err := func() error {
s.mu.Lock()
defer s.mu.Unlock()
s.memoryMtx.Lock()
defer s.memoryMtx.Unlock()

share, found := s.shares[hex.EncodeToString(pubKey)]
if !found {
Expand Down Expand Up @@ -356,12 +360,12 @@ func (s *sharesStorage) Delete(rw basedb.ReadWriter, pubKey []byte) error {
func (s *sharesStorage) UpdateValidatorsMetadata(data map[spectypes.ValidatorPK]*beaconprotocol.ValidatorMetadata) error {
var shares []*types.SSVShare

s.dbmu.Lock()
defer s.dbmu.Unlock()
s.storageMtx.Lock()
defer s.storageMtx.Unlock()

err := func() error {
s.mu.Lock()
defer s.mu.Unlock()
s.memoryMtx.Lock()
defer s.memoryMtx.Unlock()

for pk, metadata := range data {
if metadata == nil {
Expand Down Expand Up @@ -392,12 +396,12 @@ func (s *sharesStorage) UpdateValidatorsMetadata(data map[spectypes.ValidatorPK]

// Drop deletes all shares.
func (s *sharesStorage) Drop() error {
s.dbmu.Lock()
defer s.dbmu.Unlock()
s.storageMtx.Lock()
defer s.storageMtx.Unlock()

func() {
s.mu.Lock()
defer s.mu.Unlock()
s.memoryMtx.Lock()
defer s.memoryMtx.Unlock()

s.shares = make(map[string]*types.SSVShare)
s.validatorStore.handleDrop()
Expand Down

0 comments on commit 2be0c5a

Please sign in to comment.