Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: don't lock in-memory shares storage for database ops #1778

Open
wants to merge 4 commits into
base: stage
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
201 changes: 104 additions & 97 deletions registry/storage/shares.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"encoding/gob"
"encoding/hex"
"errors"
"fmt"
"sync"

Expand Down Expand Up @@ -57,7 +58,12 @@ type sharesStorage struct {
prefix []byte
shares map[string]*types.SSVShare
validatorStore *validatorStore
mu sync.RWMutex
// storageMtx serializes access to the database in order to avoid
// re-creation of a deleted share during update metadata
storageMtx sync.Mutex
// memoryMtx allows more granular access to in-memory shares map
// to minimize time spent in 'locked' mode when updating state
memoryMtx sync.RWMutex
}

type storageOperator struct {
Expand Down Expand Up @@ -132,9 +138,7 @@ func NewSharesStorage(logger *zap.Logger, db basedb.Database, prefix []byte) (Sh

// load reads all shares from db.
func (s *sharesStorage) load() error {
s.mu.Lock()
defer s.mu.Unlock()

// not locking since at this point nobody has the reference to this object
return s.db.GetAll(append(s.prefix, sharesPrefix...), func(i int, obj basedb.Obj) error {
val := &storageShare{}
if err := val.Decode(obj.Value); err != nil {
Expand All @@ -152,24 +156,20 @@ func (s *sharesStorage) load() error {
}

func (s *sharesStorage) Get(_ basedb.Reader, pubKey []byte) (*types.SSVShare, bool) {
s.mu.RLock()
defer s.mu.RUnlock()
s.memoryMtx.RLock()
defer s.memoryMtx.RUnlock()

return s.unsafeGet(pubKey)
}

func (s *sharesStorage) unsafeGet(pubKey []byte) (*types.SSVShare, bool) {
share := s.shares[hex.EncodeToString(pubKey)]
if share == nil {
return nil, false
}

return share, true
share, found := s.shares[hex.EncodeToString(pubKey)]
return share, found
}

func (s *sharesStorage) List(_ basedb.Reader, filters ...SharesFilter) []*types.SSVShare {
s.mu.RLock()
defer s.mu.RUnlock()
s.memoryMtx.RLock()
defer s.memoryMtx.RUnlock()

if len(filters) == 0 {
return maps.Values(s.shares)
Expand All @@ -189,8 +189,8 @@ Shares:
}

func (s *sharesStorage) Range(_ basedb.Reader, fn func(*types.SSVShare) bool) {
s.mu.RLock()
defer s.mu.RUnlock()
s.memoryMtx.RLock()
defer s.memoryMtx.RUnlock()

for _, share := range s.shares {
if !fn(share) {
Expand All @@ -210,48 +210,57 @@ func (s *sharesStorage) Save(rw basedb.ReadWriter, shares ...*types.SSVShare) er
}
}

s.mu.Lock()
defer s.mu.Unlock()
s.logger.Debug("save validators to shares storage", zap.Int("count", len(shares)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: save validators to shares storage -> save validator shares to shares storage


return s.unsafeSave(rw, shares...)
}
s.storageMtx.Lock()
defer s.storageMtx.Unlock()

func (s *sharesStorage) unsafeSave(rw basedb.ReadWriter, shares ...*types.SSVShare) error {
err := s.db.Using(rw).SetMany(s.prefix, len(shares), func(i int) (basedb.Obj, error) {
share := specShareToStorageShare(shares[i])
value, err := share.Encode()
if err != nil {
return basedb.Obj{}, fmt.Errorf("failed to serialize share: %w", err)
}
return basedb.Obj{Key: s.storageKey(share.ValidatorPubKey[:]), Value: value}, nil
})
if err != nil {
return err
}
// Update in-memory.
err := func() error {
s.memoryMtx.Lock()
defer s.memoryMtx.Unlock()

updateShares := make([]*types.SSVShare, 0, len(shares))
addShares := make([]*types.SSVShare, 0, len(shares))
updateShares := make([]*types.SSVShare, 0, len(shares))
addShares := make([]*types.SSVShare, 0, len(shares))

for _, share := range shares {
key := hex.EncodeToString(share.ValidatorPubKey[:])
for _, share := range shares {
key := hex.EncodeToString(share.ValidatorPubKey[:])

// Update validatorStore indices.
if _, ok := s.shares[key]; ok {
updateShares = append(updateShares, share)
} else {
addShares = append(addShares, share)
// Update validatorStore indices.
if _, ok := s.shares[key]; ok {
updateShares = append(updateShares, share)
} else {
addShares = append(addShares, share)
}
s.shares[key] = share
}
s.shares[key] = share
}

if err := s.validatorStore.handleSharesUpdated(updateShares...); err != nil {
return err
}
if err := s.validatorStore.handleSharesAdded(addShares...); err != nil {
if err := s.validatorStore.handleSharesUpdated(updateShares...); err != nil {
return err
}

if err := s.validatorStore.handleSharesAdded(addShares...); err != nil {
return err
}
Comment on lines +238 to +244
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be nice to wrap errors here (because they come from different code branches might be hard to debug without it):

		if err := s.validatorStore.handleSharesUpdated(updateShares...); err != nil {
			return fmt.Erorrf("handleSharesUpdated: %w", err)
		}

		if err := s.validatorStore.handleSharesAdded(addShares...); err != nil {
			return fmt.Erorrf("handleSharesAdded: %w", err)
		}


return nil
}()
if err != nil {
return err
}

return nil
return s.saveToDB(rw, shares...)
}

func (s *sharesStorage) saveToDB(rw basedb.ReadWriter, shares ...*types.SSVShare) error {
return s.db.Using(rw).SetMany(s.prefix, len(shares), func(i int) (basedb.Obj, error) {
share := specShareToStorageShare(shares[i])
value, err := share.Encode()
if err != nil {
return basedb.Obj{}, fmt.Errorf("failed to serialize share: %w", err)
}
return basedb.Obj{Key: s.storageKey(share.ValidatorPubKey[:]), Value: value}, nil
})
}

func specShareToStorageShare(share *types.SSVShare) *storageShare {
Expand Down Expand Up @@ -315,95 +324,93 @@ func (s *sharesStorage) storageShareToSpecShare(share *storageShare) (*types.SSV
return specShare, nil
}

var errShareNotFound = errors.New("share not found")

func (s *sharesStorage) Delete(rw basedb.ReadWriter, pubKey []byte) error {
s.mu.Lock()
defer s.mu.Unlock()
s.storageMtx.Lock()
defer s.storageMtx.Unlock()

// Delete the share from the database
if err := s.db.Using(rw).Delete(s.prefix, s.storageKey(pubKey)); err != nil {
return err
}
err := func() error {
s.memoryMtx.Lock()
defer s.memoryMtx.Unlock()

share := s.shares[hex.EncodeToString(pubKey)]
if share == nil {
return nil
}
share, found := s.shares[hex.EncodeToString(pubKey)]
if !found {
return errShareNotFound
}

// Remove the share from local storage map
delete(s.shares, hex.EncodeToString(pubKey))
// Remove the share from local storage map
delete(s.shares, hex.EncodeToString(pubKey))

// Remove the share from the validator store. This method will handle its own locking.
if err := s.validatorStore.handleShareRemoved(share); err != nil {
// Remove the share from the validator store. This method will handle its own locking.
return s.validatorStore.handleShareRemoved(share)
}()
if errors.Is(err, errShareNotFound) {
return nil
}
if err != nil {
return err
}

return nil
// Delete the share from the database
return s.db.Using(rw).Delete(s.prefix, s.storageKey(pubKey))
}

// UpdateValidatorsMetadata updates the metadata of the given validator
func (s *sharesStorage) UpdateValidatorsMetadata(data map[spectypes.ValidatorPK]*beaconprotocol.ValidatorMetadata) error {
var shares []*types.SSVShare

func() {
s.mu.RLock()
defer s.mu.RUnlock()
s.storageMtx.Lock()
defer s.storageMtx.Unlock()

err := func() error {
s.memoryMtx.Lock()
defer s.memoryMtx.Unlock()

for pk, metadata := range data {
if metadata == nil {
continue
}

share, exists := s.unsafeGet(pk[:])
if !exists {
continue
}

share.BeaconMetadata = metadata
share.Share.ValidatorIndex = metadata.Index
shares = append(shares, share)
}
}()

saveShares := func(sshares []*types.SSVShare) error {
s.mu.Lock()
defer s.mu.Unlock()
if err := s.unsafeSave(nil, sshares...); err != nil {
return err
for _, share := range shares {
key := hex.EncodeToString(share.ValidatorPubKey[:])
s.shares[key] = share
}
return nil
}

// split into chunks to avoid holding the lock for too long
chunkSize := 1000
for i := 0; i < len(shares); i += chunkSize {
end := i + chunkSize
if end > len(shares) {
end = len(shares)
}
if err := saveShares(shares[i:end]); err != nil {
return err
}
return s.validatorStore.handleSharesUpdated(shares...)
}()
if err != nil {
return err
}

return nil
return s.saveToDB(nil, shares...)
}

// Drop deletes all shares.
func (s *sharesStorage) Drop() error {
s.mu.Lock()
defer s.mu.Unlock()
s.storageMtx.Lock()
defer s.storageMtx.Unlock()

err := s.db.DropPrefix(bytes.Join(
func() {
s.memoryMtx.Lock()
defer s.memoryMtx.Unlock()

s.shares = make(map[string]*types.SSVShare)
s.validatorStore.handleDrop()
}()

return s.db.DropPrefix(bytes.Join(
[][]byte{s.prefix, sharesPrefix, []byte("/")},
nil,
))
if err != nil {
return err
}

s.shares = make(map[string]*types.SSVShare)
s.validatorStore.handleDrop()
return nil
}

// storageKey builds share key using sharesPrefix & validator public key, e.g. "shares/0x00..01"
Expand Down
Loading