From 133d6f7c780d2aa2b9e9ff94179a48be5375e3e2 Mon Sep 17 00:00:00 2001 From: Anatolie Lupacescu Date: Mon, 7 Oct 2024 11:28:44 +0300 Subject: [PATCH 1/4] fix: move persistence outside of lock --- registry/storage/shares.go | 71 ++++++++++++++++---------------------- 1 file changed, 29 insertions(+), 42 deletions(-) diff --git a/registry/storage/shares.go b/registry/storage/shares.go index 46cb1c852f..d075e1fa27 100644 --- a/registry/storage/shares.go +++ b/registry/storage/shares.go @@ -58,6 +58,7 @@ type sharesStorage struct { shares map[string]*types.SSVShare validatorStore *validatorStore mu sync.RWMutex + dbmu sync.Mutex } type storageOperator struct { @@ -135,6 +136,9 @@ func (s *sharesStorage) load() error { s.mu.Lock() defer s.mu.Unlock() + s.dbmu.Lock() + defer s.dbmu.Unlock() + return s.db.GetAll(append(s.prefix, sharesPrefix...), func(i int, obj basedb.Obj) error { val := &storageShare{} if err := val.Decode(obj.Value); err != nil { @@ -159,12 +163,8 @@ func (s *sharesStorage) Get(_ basedb.Reader, pubKey []byte) (*types.SSVShare, bo } func (s *sharesStorage) unsafeGet(pubKey []byte) (*types.SSVShare, bool) { - share := s.shares[hex.EncodeToString(pubKey)] - if share == nil { - return nil, false - } - - return share, true + share, found := s.shares[hex.EncodeToString(pubKey)] + return share, found } func (s *sharesStorage) List(_ basedb.Reader, filters ...SharesFilter) []*types.SSVShare { @@ -210,25 +210,26 @@ func (s *sharesStorage) Save(rw basedb.ReadWriter, shares ...*types.SSVShare) er } } - s.mu.Lock() - defer s.mu.Unlock() + err := func() error { + s.dbmu.Lock() + defer s.dbmu.Unlock() - return s.unsafeSave(rw, shares...) -} - -func (s *sharesStorage) unsafeSave(rw basedb.ReadWriter, shares ...*types.SSVShare) error { - err := s.db.Using(rw).SetMany(s.prefix, len(shares), func(i int) (basedb.Obj, error) { - share := specShareToStorageShare(shares[i]) - value, err := share.Encode() - if err != nil { - return basedb.Obj{}, fmt.Errorf("failed to serialize share: %w", err) - } - return basedb.Obj{Key: s.storageKey(share.ValidatorPubKey[:]), Value: value}, nil - }) + return s.db.Using(rw).SetMany(s.prefix, len(shares), func(i int) (basedb.Obj, error) { + share := specShareToStorageShare(shares[i]) + value, err := share.Encode() + if err != nil { + return basedb.Obj{}, fmt.Errorf("failed to serialize share: %w", err) + } + return basedb.Obj{Key: s.storageKey(share.ValidatorPubKey[:]), Value: value}, nil + }) + }() if err != nil { return err } + s.mu.Lock() + defer s.mu.Unlock() + updateShares := make([]*types.SSVShare, 0, len(shares)) addShares := make([]*types.SSVShare, 0, len(shares)) @@ -247,6 +248,7 @@ func (s *sharesStorage) unsafeSave(rw basedb.ReadWriter, shares ...*types.SSVSha if err := s.validatorStore.handleSharesUpdated(updateShares...); err != nil { return err } + if err := s.validatorStore.handleSharesAdded(addShares...); err != nil { return err } @@ -319,6 +321,9 @@ func (s *sharesStorage) Delete(rw basedb.ReadWriter, pubKey []byte) error { s.mu.Lock() defer s.mu.Unlock() + s.dbmu.Lock() + defer s.dbmu.Unlock() + // Delete the share from the database if err := s.db.Using(rw).Delete(s.prefix, s.storageKey(pubKey)); err != nil { return err @@ -364,28 +369,7 @@ func (s *sharesStorage) UpdateValidatorsMetadata(data map[spectypes.ValidatorPK] } }() - saveShares := func(sshares []*types.SSVShare) error { - s.mu.Lock() - defer s.mu.Unlock() - if err := s.unsafeSave(nil, sshares...); err != nil { - return err - } - return nil - } - - // split into chunks to avoid holding the lock for too long - chunkSize := 1000 - for i := 0; i < len(shares); i += chunkSize { - end := i + chunkSize - if end > len(shares) { - end = len(shares) - } - if err := saveShares(shares[i:end]); err != nil { - return err - } - } - - return nil + return s.Save(nil, shares...) } // Drop deletes all shares. @@ -393,6 +377,9 @@ func (s *sharesStorage) Drop() error { s.mu.Lock() defer s.mu.Unlock() + s.dbmu.Lock() + defer s.dbmu.Unlock() + err := s.db.DropPrefix(bytes.Join( [][]byte{s.prefix, sharesPrefix, []byte("/")}, nil, From 90e00bdb894154b93df9c760e25dde69e2039889 Mon Sep 17 00:00:00 2001 From: Anatolie Lupacescu Date: Fri, 11 Oct 2024 10:09:03 +0300 Subject: [PATCH 2/4] use db lock as parent lock for shares map mutex --- registry/storage/shares.go | 165 +++++++++++++++++++++---------------- 1 file changed, 95 insertions(+), 70 deletions(-) diff --git a/registry/storage/shares.go b/registry/storage/shares.go index d075e1fa27..7c8c1c6db1 100644 --- a/registry/storage/shares.go +++ b/registry/storage/shares.go @@ -4,6 +4,7 @@ import ( "bytes" "encoding/gob" "encoding/hex" + "errors" "fmt" "sync" @@ -57,8 +58,8 @@ type sharesStorage struct { prefix []byte shares map[string]*types.SSVShare validatorStore *validatorStore + dbmu sync.Mutex // parent lock for in-memory mutex mu sync.RWMutex - dbmu sync.Mutex } type storageOperator struct { @@ -133,12 +134,7 @@ func NewSharesStorage(logger *zap.Logger, db basedb.Database, prefix []byte) (Sh // load reads all shares from db. func (s *sharesStorage) load() error { - s.mu.Lock() - defer s.mu.Unlock() - - s.dbmu.Lock() - defer s.dbmu.Unlock() - + // not locking since at this point nobody has the reference to this object return s.db.GetAll(append(s.prefix, sharesPrefix...), func(i int, obj basedb.Obj) error { val := &storageShare{} if err := val.Decode(obj.Value); err != nil { @@ -210,50 +206,55 @@ func (s *sharesStorage) Save(rw basedb.ReadWriter, shares ...*types.SSVShare) er } } + s.dbmu.Lock() + defer s.dbmu.Unlock() + + // Update in-memory. err := func() error { - s.dbmu.Lock() - defer s.dbmu.Unlock() - - return s.db.Using(rw).SetMany(s.prefix, len(shares), func(i int) (basedb.Obj, error) { - share := specShareToStorageShare(shares[i]) - value, err := share.Encode() - if err != nil { - return basedb.Obj{}, fmt.Errorf("failed to serialize share: %w", err) - } - return basedb.Obj{Key: s.storageKey(share.ValidatorPubKey[:]), Value: value}, nil - }) - }() - if err != nil { - return err - } + s.mu.Lock() + defer s.mu.Unlock() - s.mu.Lock() - defer s.mu.Unlock() + updateShares := make([]*types.SSVShare, 0, len(shares)) + addShares := make([]*types.SSVShare, 0, len(shares)) - updateShares := make([]*types.SSVShare, 0, len(shares)) - addShares := make([]*types.SSVShare, 0, len(shares)) + for _, share := range shares { + key := hex.EncodeToString(share.ValidatorPubKey[:]) - for _, share := range shares { - key := hex.EncodeToString(share.ValidatorPubKey[:]) + // Update validatorStore indices. + if _, ok := s.shares[key]; ok { + updateShares = append(updateShares, share) + } else { + addShares = append(addShares, share) + } + s.shares[key] = share + } - // Update validatorStore indices. - if _, ok := s.shares[key]; ok { - updateShares = append(updateShares, share) - } else { - addShares = append(addShares, share) + if err := s.validatorStore.handleSharesUpdated(updateShares...); err != nil { + return err } - s.shares[key] = share - } - if err := s.validatorStore.handleSharesUpdated(updateShares...); err != nil { - return err - } + if err := s.validatorStore.handleSharesAdded(addShares...); err != nil { + return err + } - if err := s.validatorStore.handleSharesAdded(addShares...); err != nil { + return nil + }() + if err != nil { return err } - return nil + return s.unsafeSave(rw, shares...) +} + +func (s *sharesStorage) unsafeSave(rw basedb.ReadWriter, shares ...*types.SSVShare) error { + return s.db.Using(rw).SetMany(s.prefix, len(shares), func(i int) (basedb.Obj, error) { + share := specShareToStorageShare(shares[i]) + value, err := share.Encode() + if err != nil { + return basedb.Obj{}, fmt.Errorf("failed to serialize share: %w", err) + } + return basedb.Obj{Key: s.storageKey(share.ValidatorPubKey[:]), Value: value}, nil + }) } func specShareToStorageShare(share *types.SSVShare) *storageShare { @@ -317,38 +318,45 @@ func (s *sharesStorage) storageShareToSpecShare(share *storageShare) (*types.SSV return specShare, nil } -func (s *sharesStorage) Delete(rw basedb.ReadWriter, pubKey []byte) error { - s.mu.Lock() - defer s.mu.Unlock() +var errShareNotFound = errors.New("share not found") +func (s *sharesStorage) Delete(rw basedb.ReadWriter, pubKey []byte) error { s.dbmu.Lock() defer s.dbmu.Unlock() - // Delete the share from the database - if err := s.db.Using(rw).Delete(s.prefix, s.storageKey(pubKey)); err != nil { - return err - } + err := func() error { + s.mu.Lock() + defer s.mu.Unlock() - share := s.shares[hex.EncodeToString(pubKey)] - if share == nil { - return nil - } + share, found := s.shares[hex.EncodeToString(pubKey)] + if !found { + return errShareNotFound + } - // Remove the share from local storage map - delete(s.shares, hex.EncodeToString(pubKey)) + // Remove the share from local storage map + delete(s.shares, hex.EncodeToString(pubKey)) - // Remove the share from the validator store. This method will handle its own locking. - if err := s.validatorStore.handleShareRemoved(share); err != nil { + // Remove the share from the validator store. This method will handle its own locking. + return s.validatorStore.handleShareRemoved(share) + }() + if errors.Is(err, errShareNotFound) { + return nil + } + if err != nil { return err } - return nil + // Delete the share from the database + return s.db.Using(rw).Delete(s.prefix, s.storageKey(pubKey)) } // UpdateValidatorsMetadata updates the metadata of the given validator func (s *sharesStorage) UpdateValidatorsMetadata(data map[spectypes.ValidatorPK]*beaconprotocol.ValidatorMetadata) error { var shares []*types.SSVShare + s.dbmu.Lock() + defer s.dbmu.Unlock() + func() { s.mu.RLock() defer s.mu.RUnlock() @@ -357,40 +365,57 @@ func (s *sharesStorage) UpdateValidatorsMetadata(data map[spectypes.ValidatorPK] if metadata == nil { continue } - share, exists := s.unsafeGet(pk[:]) if !exists { continue } - share.BeaconMetadata = metadata share.Share.ValidatorIndex = metadata.Index shares = append(shares, share) } }() - return s.Save(nil, shares...) + saveShares := func(sshares []*types.SSVShare) error { + s.mu.Lock() + defer s.mu.Unlock() + if err := s.unsafeSave(nil, sshares...); err != nil { + return err + } + return s.validatorStore.handleSharesUpdated(shares...) + } + + // split into chunks to avoid holding the lock for too long + chunkSize := 1000 + for i := 0; i < len(shares); i += chunkSize { + end := i + chunkSize + if end > len(shares) { + end = len(shares) + } + if err := saveShares(shares[i:end]); err != nil { + return err + } + } + + return nil } // Drop deletes all shares. func (s *sharesStorage) Drop() error { - s.mu.Lock() - defer s.mu.Unlock() - s.dbmu.Lock() defer s.dbmu.Unlock() - err := s.db.DropPrefix(bytes.Join( + func() { + s.mu.Lock() + defer s.mu.Unlock() + + s.shares = make(map[string]*types.SSVShare) + s.validatorStore.handleDrop() + }() + + return s.db.DropPrefix(bytes.Join( [][]byte{s.prefix, sharesPrefix, []byte("/")}, nil, )) - if err != nil { - return err - } - - s.shares = make(map[string]*types.SSVShare) - s.validatorStore.handleDrop() - return nil } // storageKey builds share key using sharesPrefix & validator public key, e.g. "shares/0x00..01" From cf5bbf5fc66110bb0bd9dbc706110c272487454d Mon Sep 17 00:00:00 2001 From: Anatolie Lupacescu Date: Tue, 15 Oct 2024 11:46:57 +0300 Subject: [PATCH 3/4] add missing save to memory --- registry/storage/shares.go | 39 +++++++++++++++----------------------- 1 file changed, 15 insertions(+), 24 deletions(-) diff --git a/registry/storage/shares.go b/registry/storage/shares.go index 7c8c1c6db1..534cdcf93c 100644 --- a/registry/storage/shares.go +++ b/registry/storage/shares.go @@ -206,6 +206,8 @@ func (s *sharesStorage) Save(rw basedb.ReadWriter, shares ...*types.SSVShare) er } } + s.logger.Debug("save validators to shares storage", zap.Int("count", len(shares))) + s.dbmu.Lock() defer s.dbmu.Unlock() @@ -243,10 +245,10 @@ func (s *sharesStorage) Save(rw basedb.ReadWriter, shares ...*types.SSVShare) er return err } - return s.unsafeSave(rw, shares...) + return s.saveToDB(rw, shares...) } -func (s *sharesStorage) unsafeSave(rw basedb.ReadWriter, shares ...*types.SSVShare) error { +func (s *sharesStorage) saveToDB(rw basedb.ReadWriter, shares ...*types.SSVShare) error { return s.db.Using(rw).SetMany(s.prefix, len(shares), func(i int) (basedb.Obj, error) { share := specShareToStorageShare(shares[i]) value, err := share.Encode() @@ -357,9 +359,9 @@ func (s *sharesStorage) UpdateValidatorsMetadata(data map[spectypes.ValidatorPK] s.dbmu.Lock() defer s.dbmu.Unlock() - func() { - s.mu.RLock() - defer s.mu.RUnlock() + err := func() error { + s.mu.Lock() + defer s.mu.Unlock() for pk, metadata := range data { if metadata == nil { @@ -373,30 +375,19 @@ func (s *sharesStorage) UpdateValidatorsMetadata(data map[spectypes.ValidatorPK] share.Share.ValidatorIndex = metadata.Index shares = append(shares, share) } - }() - saveShares := func(sshares []*types.SSVShare) error { - s.mu.Lock() - defer s.mu.Unlock() - if err := s.unsafeSave(nil, sshares...); err != nil { - return err + for _, share := range shares { + key := hex.EncodeToString(share.ValidatorPubKey[:]) + s.shares[key] = share } - return s.validatorStore.handleSharesUpdated(shares...) - } - // split into chunks to avoid holding the lock for too long - chunkSize := 1000 - for i := 0; i < len(shares); i += chunkSize { - end := i + chunkSize - if end > len(shares) { - end = len(shares) - } - if err := saveShares(shares[i:end]); err != nil { - return err - } + return s.validatorStore.handleSharesUpdated(shares...) + }() + if err != nil { + return err } - return nil + return s.saveToDB(nil, shares...) } // Drop deletes all shares. From 2be0c5a05911f921ccc01e4fbfcbb8c9b659e6a7 Mon Sep 17 00:00:00 2001 From: Anatolie Lupacescu Date: Wed, 16 Oct 2024 11:49:08 +0300 Subject: [PATCH 4/4] rename mutexes --- registry/storage/shares.go | 52 ++++++++++++++++++++------------------ 1 file changed, 28 insertions(+), 24 deletions(-) diff --git a/registry/storage/shares.go b/registry/storage/shares.go index 534cdcf93c..77bdbf7bec 100644 --- a/registry/storage/shares.go +++ b/registry/storage/shares.go @@ -58,8 +58,12 @@ type sharesStorage struct { prefix []byte shares map[string]*types.SSVShare validatorStore *validatorStore - dbmu sync.Mutex // parent lock for in-memory mutex - mu sync.RWMutex + // storageMtx serializes access to the database in order to avoid + // re-creation of a deleted share during update metadata + storageMtx sync.Mutex + // memoryMtx allows more granular access to in-memory shares map + // to minimize time spent in 'locked' mode when updating state + memoryMtx sync.RWMutex } type storageOperator struct { @@ -152,8 +156,8 @@ func (s *sharesStorage) load() error { } func (s *sharesStorage) Get(_ basedb.Reader, pubKey []byte) (*types.SSVShare, bool) { - s.mu.RLock() - defer s.mu.RUnlock() + s.memoryMtx.RLock() + defer s.memoryMtx.RUnlock() return s.unsafeGet(pubKey) } @@ -164,8 +168,8 @@ func (s *sharesStorage) unsafeGet(pubKey []byte) (*types.SSVShare, bool) { } func (s *sharesStorage) List(_ basedb.Reader, filters ...SharesFilter) []*types.SSVShare { - s.mu.RLock() - defer s.mu.RUnlock() + s.memoryMtx.RLock() + defer s.memoryMtx.RUnlock() if len(filters) == 0 { return maps.Values(s.shares) @@ -185,8 +189,8 @@ Shares: } func (s *sharesStorage) Range(_ basedb.Reader, fn func(*types.SSVShare) bool) { - s.mu.RLock() - defer s.mu.RUnlock() + s.memoryMtx.RLock() + defer s.memoryMtx.RUnlock() for _, share := range s.shares { if !fn(share) { @@ -208,13 +212,13 @@ func (s *sharesStorage) Save(rw basedb.ReadWriter, shares ...*types.SSVShare) er s.logger.Debug("save validators to shares storage", zap.Int("count", len(shares))) - s.dbmu.Lock() - defer s.dbmu.Unlock() + s.storageMtx.Lock() + defer s.storageMtx.Unlock() // Update in-memory. err := func() error { - s.mu.Lock() - defer s.mu.Unlock() + s.memoryMtx.Lock() + defer s.memoryMtx.Unlock() updateShares := make([]*types.SSVShare, 0, len(shares)) addShares := make([]*types.SSVShare, 0, len(shares)) @@ -323,12 +327,12 @@ func (s *sharesStorage) storageShareToSpecShare(share *storageShare) (*types.SSV var errShareNotFound = errors.New("share not found") func (s *sharesStorage) Delete(rw basedb.ReadWriter, pubKey []byte) error { - s.dbmu.Lock() - defer s.dbmu.Unlock() + s.storageMtx.Lock() + defer s.storageMtx.Unlock() err := func() error { - s.mu.Lock() - defer s.mu.Unlock() + s.memoryMtx.Lock() + defer s.memoryMtx.Unlock() share, found := s.shares[hex.EncodeToString(pubKey)] if !found { @@ -356,12 +360,12 @@ func (s *sharesStorage) Delete(rw basedb.ReadWriter, pubKey []byte) error { func (s *sharesStorage) UpdateValidatorsMetadata(data map[spectypes.ValidatorPK]*beaconprotocol.ValidatorMetadata) error { var shares []*types.SSVShare - s.dbmu.Lock() - defer s.dbmu.Unlock() + s.storageMtx.Lock() + defer s.storageMtx.Unlock() err := func() error { - s.mu.Lock() - defer s.mu.Unlock() + s.memoryMtx.Lock() + defer s.memoryMtx.Unlock() for pk, metadata := range data { if metadata == nil { @@ -392,12 +396,12 @@ func (s *sharesStorage) UpdateValidatorsMetadata(data map[spectypes.ValidatorPK] // Drop deletes all shares. func (s *sharesStorage) Drop() error { - s.dbmu.Lock() - defer s.dbmu.Unlock() + s.storageMtx.Lock() + defer s.storageMtx.Unlock() func() { - s.mu.Lock() - defer s.mu.Unlock() + s.memoryMtx.Lock() + defer s.memoryMtx.Unlock() s.shares = make(map[string]*types.SSVShare) s.validatorStore.handleDrop()