Skip to content

Commit

Permalink
Merge 48fbe5c into 2f5e83b
Browse files Browse the repository at this point in the history
  • Loading branch information
carpawell authored Sep 7, 2023
2 parents 2f5e83b + 48fbe5c commit 0d46fac
Show file tree
Hide file tree
Showing 13 changed files with 450 additions and 58 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ minor release, the component will be purged, so be prepared (see `Updating` sect
- `neofs-lens write-cache list` command duplication (#2505)
- `neofs-adm` works with contract wallet in `init` and `update-contracts` commands only (#2134)
- Missing removed but locked objects in `SEARCH`'s results (#2526)
- LOCK objects and regular objects expiration conflicts (#2392)

### Removed
- Deprecated `morph.rpc_endpoint` SN and `morph.endpoint.client` IR config sections (#2400)
Expand Down
8 changes: 8 additions & 0 deletions cmd/neofs-node/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,10 @@ type engineWithNotifications struct {
defaultTopic string
}

func (e engineWithNotifications) IsLocked(address oid.Address) (bool, error) {
return e.base.IsLocked(address)
}

func (e engineWithNotifications) Delete(tombstone oid.Address, toDelete []oid.ID) error {
return e.base.Delete(tombstone, toDelete)
}
Expand Down Expand Up @@ -562,6 +566,10 @@ type engineWithoutNotifications struct {
engine *engine.StorageEngine
}

func (e engineWithoutNotifications) IsLocked(address oid.Address) (bool, error) {
return e.engine.IsLocked(address)
}

func (e engineWithoutNotifications) Delete(tombstone oid.Address, toDelete []oid.ID) error {
var prm engine.InhumePrm

Expand Down
33 changes: 32 additions & 1 deletion pkg/core/object/fmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type FormatValidatorOption func(*cfg)

type cfg struct {
netState netmap.State
e LockSource
}

// DeleteHandler is an interface of delete queue processor.
Expand All @@ -34,6 +35,12 @@ type DeleteHandler interface {
DeleteObjects(oid.Address, ...oid.Address) error
}

// LockSource is a source of lock relations between the objects.
type LockSource interface {
// IsLocked must clarify object's lock status.
IsLocked(address oid.Address) (bool, error)
}

// Locker is an object lock storage interface.
type Locker interface {
// Lock list of objects as locked by locker in the specified container.
Expand Down Expand Up @@ -286,7 +293,24 @@ func (v *FormatValidator) checkExpiration(obj *object.Object) error {
}

if exp < v.netState.CurrentEpoch() {
return errExpired
// an object could be expired but locked;
// put such an object is a correct operation

cID, _ := obj.ContainerID()
oID, _ := obj.ID()

var addr oid.Address
addr.SetContainer(cID)
addr.SetObject(oID)

locked, err := v.e.IsLocked(addr)
if err != nil {
return fmt.Errorf("locking status check for an expired object: %w", err)
}

if !locked {
return errExpired
}
}

return nil
Expand Down Expand Up @@ -347,3 +371,10 @@ func WithNetState(netState netmap.State) FormatValidatorOption {
c.netState = netState
}
}

// WithLockSource return option to set a Locked objects source.
func WithLockSource(e LockSource) FormatValidatorOption {
return func(c *cfg) {
c.e = e
}
}
34 changes: 32 additions & 2 deletions pkg/core/object/fmt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,26 @@ func (s testNetState) CurrentEpoch() uint64 {
return s.epoch
}

type testLockSource struct {
m map[oid.Address]bool
}

func (t testLockSource) IsLocked(address oid.Address) (bool, error) {
return t.m[address], nil
}

func TestFormatValidator_Validate(t *testing.T) {
const curEpoch = 13

ls := testLockSource{
m: make(map[oid.Address]bool),
}

v := NewFormatValidator(
WithNetState(testNetState{
epoch: curEpoch,
}),
WithLockSource(ls),
)

ownerKey, err := keys.NewPrivateKey()
Expand Down Expand Up @@ -225,8 +238,25 @@ func TestFormatValidator_Validate(t *testing.T) {

t.Run("expired object", func(t *testing.T) {
val := strconv.FormatUint(curEpoch-1, 10)
err := v.Validate(fn(val), false)
require.ErrorIs(t, err, errExpired)
obj := fn(val)

t.Run("non-locked", func(t *testing.T) {
err := v.Validate(obj, false)
require.ErrorIs(t, err, errExpired)
})

t.Run("locked", func(t *testing.T) {
var addr oid.Address
oID, _ := obj.ID()
cID, _ := obj.ContainerID()

addr.SetContainer(cID)
addr.SetObject(oID)
ls.m[addr] = true

err := v.Validate(obj, false)
require.NoError(t, err)
})
})

t.Run("alive object", func(t *testing.T) {
Expand Down
15 changes: 14 additions & 1 deletion pkg/local_object_storage/engine/inhume.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (e *StorageEngine) inhume(prm InhumePrm) (InhumeRes, error) {

for i := range prm.addrs {
if !prm.forceRemoval {
locked, err := e.isLocked(prm.addrs[i])
locked, err := e.IsLocked(prm.addrs[i])
if err != nil {
e.log.Warn("removing an object without full locking check",
zap.Error(err),
Expand Down Expand Up @@ -179,6 +179,19 @@ func (e *StorageEngine) inhumeAddr(addr oid.Address, prm shard.InhumePrm, checkE
return ok, retErr
}

// IsLocked checks whether an object is locked according to StorageEngine's state.
func (e *StorageEngine) IsLocked(addr oid.Address) (bool, error) {
var res bool
var err error

err = e.execIfNotBlocked(func() error {
res, err = e.isLocked(addr)
return err
})

return res, err
}

func (e *StorageEngine) isLocked(addr oid.Address) (bool, error) {
var locked bool
var err error
Expand Down
127 changes: 82 additions & 45 deletions pkg/local_object_storage/metabase/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,25 +96,28 @@ func (db *DB) Lock(cnr cid.ID, locker oid.ID, locked []oid.ID) error {
}

// FreeLockedBy unlocks all objects in DB which are locked by lockers.
func (db *DB) FreeLockedBy(lockers []oid.Address) error {
// Returns unlocked objects if any.
func (db *DB) FreeLockedBy(lockers []oid.Address) ([]oid.Address, error) {
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()

if db.mode.NoMetabase() {
return ErrDegradedMode
return nil, ErrDegradedMode
}

return db.boltDB.Update(func(tx *bbolt.Tx) error {
var err error
var unlocked []oid.Address

return unlocked, db.boltDB.Update(func(tx *bbolt.Tx) error {
for i := range lockers {
err = freePotentialLocks(tx, lockers[i].Container(), lockers[i].Object())
uu, err := freePotentialLocks(tx, lockers[i].Container(), lockers[i].Object())
if err != nil {
return err
}

unlocked = append(unlocked, uu...)
}

return err
return nil
})
}

Expand All @@ -133,60 +136,94 @@ func objectLocked(tx *bbolt.Tx, idCnr cid.ID, idObj oid.ID) bool {
return false
}

type kv struct {
k []byte
v []byte
}

// releases all records about the objects locked by the locker.
// Returns unlocked objects (if any).
//
// Operation is very resource-intensive, which is caused by the admissibility
// of multiple locks. Also, if we knew what objects are locked, it would be
// possible to speed up the execution.
func freePotentialLocks(tx *bbolt.Tx, idCnr cid.ID, locker oid.ID) error {
func freePotentialLocks(tx *bbolt.Tx, idCnr cid.ID, locker oid.ID) ([]oid.Address, error) {
bucketLocked := tx.Bucket(bucketNameLocked)
if bucketLocked != nil {
key := make([]byte, cidSize)
idCnr.Encode(key)
if bucketLocked == nil {
return nil, nil
}

bucketLockedContainer := bucketLocked.Bucket(key)
if bucketLockedContainer != nil {
keyLocker := objectKey(locker, key)
return bucketLockedContainer.ForEach(func(k, v []byte) error {
keyLockers, err := decodeList(v)
if err != nil {
return fmt.Errorf("decode list of lockers in locked bucket: %w", err)
}
key := make([]byte, cidSize)
idCnr.Encode(key)

for i := range keyLockers {
if bytes.Equal(keyLockers[i], keyLocker) {
if len(keyLockers) == 1 {
// locker was all alone
err = bucketLockedContainer.Delete(k)
if err != nil {
return fmt.Errorf("delete locked object record from locked bucket: %w", err)
}
} else {
// exclude locker
keyLockers = append(keyLockers[:i], keyLockers[i+1:]...)

v, err = encodeList(keyLockers)
if err != nil {
return fmt.Errorf("encode updated list of lockers: %w", err)
}

// update the record
err = bucketLockedContainer.Put(k, v)
if err != nil {
return fmt.Errorf("update list of lockers: %w", err)
}
}

return nil
bucketLockedContainer := bucketLocked.Bucket(key)
if bucketLockedContainer == nil {
return nil, nil
}

var unlocked []oid.Address
var bktChanges []kv
keyLocker := objectKey(locker, key)

err := bucketLockedContainer.ForEach(func(k, v []byte) error {
keyLockers, err := decodeList(v)
if err != nil {
return fmt.Errorf("decode list of lockers in locked bucket: %w", err)
}

for i := range keyLockers {
if bytes.Equal(keyLockers[i], keyLocker) {
if len(keyLockers) == 1 {
bktChanges = append(bktChanges, kv{k: k, v: nil})

var oID oid.ID
err = oID.Decode(k)
if err != nil {
return fmt.Errorf("decode unlocked object id error: %w", err)
}

var addr oid.Address
addr.SetContainer(idCnr)
addr.SetObject(oID)

unlocked = append(unlocked, addr)
} else {
// exclude locker
keyLockers = append(keyLockers[:i], keyLockers[i+1:]...)

v, err = encodeList(keyLockers)
if err != nil {
return fmt.Errorf("encode updated list of lockers: %w", err)
}

bktChanges = append(bktChanges, kv{k: k, v: v})
}

return nil
})
}
}

return nil
})
if err != nil {
return nil, fmt.Errorf("iterating lockers: %w", err)
}

for _, kv := range bktChanges {
if kv.v == nil {
err = bucketLockedContainer.Delete(kv.k)
if err != nil {
return nil, fmt.Errorf("delete locked object record from locked bucket: %w", err)
}
} else {
err = bucketLockedContainer.Put(kv.k, kv.v)
if err != nil {
return nil, fmt.Errorf("update list of lockers: %w", err)
}
}
}

return nil
return unlocked, nil
}

// IsLockedPrm groups the parameters of IsLocked operation.
Expand Down
15 changes: 13 additions & 2 deletions pkg/local_object_storage/metabase/lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,9 @@ func TestDB_Lock(t *testing.T) {
require.Len(t, res.DeletedLockObjects(), 1)
require.Equal(t, objectcore.AddressOf(lockObj), res.DeletedLockObjects()[0])

err = db.FreeLockedBy([]oid.Address{lockAddr})
unlocked, err := db.FreeLockedBy([]oid.Address{lockAddr})
require.NoError(t, err)
require.ElementsMatch(t, objsToAddrs(objs), unlocked)

inhumePrm.SetAddresses(objAddr)
inhumePrm.SetGCMark()
Expand Down Expand Up @@ -140,8 +141,9 @@ func TestDB_Lock(t *testing.T) {

// unlock just objects that were locked by
// just removed locker
err = db.FreeLockedBy([]oid.Address{res.DeletedLockObjects()[0]})
unlocked, err := db.FreeLockedBy([]oid.Address{res.DeletedLockObjects()[0]})
require.NoError(t, err)
require.ElementsMatch(t, objsToAddrs(objs), unlocked)

// removing objects after unlock

Expand Down Expand Up @@ -264,3 +266,12 @@ func putAndLockObj(t *testing.T, db *meta.DB, numOfLockedObjs int) ([]*object.Ob

return lockedObjs, lockObj
}

func objsToAddrs(oo []*object.Object) []oid.Address {
res := make([]oid.Address, 0, len(oo))
for _, o := range oo {
res = append(res, objectcore.AddressOf(o))
}

return res
}
Loading

0 comments on commit 0d46fac

Please sign in to comment.