Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix/Locked but expired objects #2535

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ minor release, the component will be purged, so be prepared (see `Updating` sect
- `neofs-lens write-cache list` command duplication (#2505)
- `neofs-adm` works with contract wallet in `init` and `update-contracts` commands only (#2134)
- Missing removed but locked objects in `SEARCH`'s results (#2526)
- LOCK objects and regular objects expiration conflicts (#2392)

### Removed
- Deprecated `morph.rpc_endpoint` SN and `morph.endpoint.client` IR config sections (#2400)
Expand Down
8 changes: 8 additions & 0 deletions cmd/neofs-node/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,10 @@ type engineWithNotifications struct {
defaultTopic string
}

func (e engineWithNotifications) IsLocked(address oid.Address) (bool, error) {
return e.base.IsLocked(address)
}

func (e engineWithNotifications) Delete(tombstone oid.Address, toDelete []oid.ID) error {
return e.base.Delete(tombstone, toDelete)
}
Expand Down Expand Up @@ -562,6 +566,10 @@ type engineWithoutNotifications struct {
engine *engine.StorageEngine
}

func (e engineWithoutNotifications) IsLocked(address oid.Address) (bool, error) {
return e.engine.IsLocked(address)
}

func (e engineWithoutNotifications) Delete(tombstone oid.Address, toDelete []oid.ID) error {
var prm engine.InhumePrm

Expand Down
33 changes: 32 additions & 1 deletion pkg/core/object/fmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type FormatValidatorOption func(*cfg)

type cfg struct {
netState netmap.State
e LockSource
}

// DeleteHandler is an interface of delete queue processor.
Expand All @@ -34,6 +35,12 @@ type DeleteHandler interface {
DeleteObjects(oid.Address, ...oid.Address) error
}

// LockSource is a source of lock relations between the objects.
type LockSource interface {
// IsLocked must clarify object's lock status.
IsLocked(address oid.Address) (bool, error)
}

// Locker is an object lock storage interface.
type Locker interface {
// Lock list of objects as locked by locker in the specified container.
Expand Down Expand Up @@ -286,7 +293,24 @@ func (v *FormatValidator) checkExpiration(obj *object.Object) error {
}

if exp < v.netState.CurrentEpoch() {
return errExpired
// an object could be expired but locked;
// put such an object is a correct operation

cID, _ := obj.ContainerID()
oID, _ := obj.ID()

var addr oid.Address
addr.SetContainer(cID)
addr.SetObject(oID)

locked, err := v.e.IsLocked(addr)
if err != nil {
return fmt.Errorf("locking status check for an expired object: %w", err)
}

if !locked {
return errExpired
}
}

return nil
Expand Down Expand Up @@ -347,3 +371,10 @@ func WithNetState(netState netmap.State) FormatValidatorOption {
c.netState = netState
}
}

// WithLockSource return option to set a Locked objects source.
func WithLockSource(e LockSource) FormatValidatorOption {
return func(c *cfg) {
c.e = e
}
}
34 changes: 32 additions & 2 deletions pkg/core/object/fmt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,26 @@ func (s testNetState) CurrentEpoch() uint64 {
return s.epoch
}

type testLockSource struct {
m map[oid.Address]bool
}

func (t testLockSource) IsLocked(address oid.Address) (bool, error) {
return t.m[address], nil
}

func TestFormatValidator_Validate(t *testing.T) {
const curEpoch = 13

ls := testLockSource{
m: make(map[oid.Address]bool),
}

v := NewFormatValidator(
WithNetState(testNetState{
epoch: curEpoch,
}),
WithLockSource(ls),
)

ownerKey, err := keys.NewPrivateKey()
Expand Down Expand Up @@ -225,8 +238,25 @@ func TestFormatValidator_Validate(t *testing.T) {

t.Run("expired object", func(t *testing.T) {
val := strconv.FormatUint(curEpoch-1, 10)
err := v.Validate(fn(val), false)
require.ErrorIs(t, err, errExpired)
obj := fn(val)

t.Run("non-locked", func(t *testing.T) {
err := v.Validate(obj, false)
require.ErrorIs(t, err, errExpired)
})

t.Run("locked", func(t *testing.T) {
var addr oid.Address
oID, _ := obj.ID()
cID, _ := obj.ContainerID()

addr.SetContainer(cID)
addr.SetObject(oID)
ls.m[addr] = true

err := v.Validate(obj, false)
require.NoError(t, err)
})
})

t.Run("alive object", func(t *testing.T) {
Expand Down
15 changes: 14 additions & 1 deletion pkg/local_object_storage/engine/inhume.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (e *StorageEngine) inhume(prm InhumePrm) (InhumeRes, error) {

for i := range prm.addrs {
if !prm.forceRemoval {
locked, err := e.isLocked(prm.addrs[i])
locked, err := e.IsLocked(prm.addrs[i])
if err != nil {
e.log.Warn("removing an object without full locking check",
zap.Error(err),
Expand Down Expand Up @@ -179,6 +179,19 @@ func (e *StorageEngine) inhumeAddr(addr oid.Address, prm shard.InhumePrm, checkE
return ok, retErr
}

// IsLocked checks whether an object is locked according to StorageEngine's state.
func (e *StorageEngine) IsLocked(addr oid.Address) (bool, error) {
cthulhu-rider marked this conversation as resolved.
Show resolved Hide resolved
var res bool
var err error

err = e.execIfNotBlocked(func() error {
res, err = e.isLocked(addr)
return err
})

return res, err
}

func (e *StorageEngine) isLocked(addr oid.Address) (bool, error) {
var locked bool
var err error
Expand Down
127 changes: 82 additions & 45 deletions pkg/local_object_storage/metabase/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,25 +96,28 @@ func (db *DB) Lock(cnr cid.ID, locker oid.ID, locked []oid.ID) error {
}

// FreeLockedBy unlocks all objects in DB which are locked by lockers.
func (db *DB) FreeLockedBy(lockers []oid.Address) error {
// Returns unlocked objects if any.
func (db *DB) FreeLockedBy(lockers []oid.Address) ([]oid.Address, error) {
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()

if db.mode.NoMetabase() {
return ErrDegradedMode
return nil, ErrDegradedMode
}

return db.boltDB.Update(func(tx *bbolt.Tx) error {
var err error
var unlocked []oid.Address

return unlocked, db.boltDB.Update(func(tx *bbolt.Tx) error {
for i := range lockers {
err = freePotentialLocks(tx, lockers[i].Container(), lockers[i].Object())
uu, err := freePotentialLocks(tx, lockers[i].Container(), lockers[i].Object())
if err != nil {
return err
}

unlocked = append(unlocked, uu...)
}

return err
return nil
})
}

Expand All @@ -133,60 +136,94 @@ func objectLocked(tx *bbolt.Tx, idCnr cid.ID, idObj oid.ID) bool {
return false
}

type kv struct {
k []byte
v []byte
}

// releases all records about the objects locked by the locker.
// Returns unlocked objects (if any).
//
// Operation is very resource-intensive, which is caused by the admissibility
// of multiple locks. Also, if we knew what objects are locked, it would be
// possible to speed up the execution.
func freePotentialLocks(tx *bbolt.Tx, idCnr cid.ID, locker oid.ID) error {
func freePotentialLocks(tx *bbolt.Tx, idCnr cid.ID, locker oid.ID) ([]oid.Address, error) {
bucketLocked := tx.Bucket(bucketNameLocked)
if bucketLocked != nil {
key := make([]byte, cidSize)
idCnr.Encode(key)
if bucketLocked == nil {
return nil, nil
}

bucketLockedContainer := bucketLocked.Bucket(key)
if bucketLockedContainer != nil {
keyLocker := objectKey(locker, key)
return bucketLockedContainer.ForEach(func(k, v []byte) error {
keyLockers, err := decodeList(v)
if err != nil {
return fmt.Errorf("decode list of lockers in locked bucket: %w", err)
}
key := make([]byte, cidSize)
idCnr.Encode(key)

for i := range keyLockers {
if bytes.Equal(keyLockers[i], keyLocker) {
if len(keyLockers) == 1 {
// locker was all alone
err = bucketLockedContainer.Delete(k)
if err != nil {
return fmt.Errorf("delete locked object record from locked bucket: %w", err)
}
} else {
// exclude locker
keyLockers = append(keyLockers[:i], keyLockers[i+1:]...)

v, err = encodeList(keyLockers)
if err != nil {
return fmt.Errorf("encode updated list of lockers: %w", err)
}

// update the record
err = bucketLockedContainer.Put(k, v)
if err != nil {
return fmt.Errorf("update list of lockers: %w", err)
}
}

return nil
bucketLockedContainer := bucketLocked.Bucket(key)
if bucketLockedContainer == nil {
return nil, nil
}

var unlocked []oid.Address
var bktChanges []kv
keyLocker := objectKey(locker, key)

err := bucketLockedContainer.ForEach(func(k, v []byte) error {
keyLockers, err := decodeList(v)
if err != nil {
return fmt.Errorf("decode list of lockers in locked bucket: %w", err)
}

for i := range keyLockers {
if bytes.Equal(keyLockers[i], keyLocker) {
if len(keyLockers) == 1 {
bktChanges = append(bktChanges, kv{k: k, v: nil})

var oID oid.ID
err = oID.Decode(k)
if err != nil {
return fmt.Errorf("decode unlocked object id error: %w", err)
}

var addr oid.Address
addr.SetContainer(idCnr)
addr.SetObject(oID)

unlocked = append(unlocked, addr)
} else {
// exclude locker
keyLockers = append(keyLockers[:i], keyLockers[i+1:]...)

v, err = encodeList(keyLockers)
if err != nil {
return fmt.Errorf("encode updated list of lockers: %w", err)
}

bktChanges = append(bktChanges, kv{k: k, v: v})
}

return nil
})
}
}

return nil
})
if err != nil {
return nil, fmt.Errorf("iterating lockers: %w", err)
}

for _, kv := range bktChanges {
if kv.v == nil {
err = bucketLockedContainer.Delete(kv.k)
if err != nil {
return nil, fmt.Errorf("delete locked object record from locked bucket: %w", err)
}
} else {
err = bucketLockedContainer.Put(kv.k, kv.v)
if err != nil {
return nil, fmt.Errorf("update list of lockers: %w", err)
}
}
}

return nil
return unlocked, nil
}

// IsLockedPrm groups the parameters of IsLocked operation.
Expand Down
15 changes: 13 additions & 2 deletions pkg/local_object_storage/metabase/lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,9 @@ func TestDB_Lock(t *testing.T) {
require.Len(t, res.DeletedLockObjects(), 1)
require.Equal(t, objectcore.AddressOf(lockObj), res.DeletedLockObjects()[0])

err = db.FreeLockedBy([]oid.Address{lockAddr})
unlocked, err := db.FreeLockedBy([]oid.Address{lockAddr})
require.NoError(t, err)
require.ElementsMatch(t, objsToAddrs(objs), unlocked)

inhumePrm.SetAddresses(objAddr)
inhumePrm.SetGCMark()
Expand Down Expand Up @@ -140,8 +141,9 @@ func TestDB_Lock(t *testing.T) {

// unlock just objects that were locked by
// just removed locker
err = db.FreeLockedBy([]oid.Address{res.DeletedLockObjects()[0]})
unlocked, err := db.FreeLockedBy([]oid.Address{res.DeletedLockObjects()[0]})
require.NoError(t, err)
require.ElementsMatch(t, objsToAddrs(objs), unlocked)

// removing objects after unlock

Expand Down Expand Up @@ -264,3 +266,12 @@ func putAndLockObj(t *testing.T, db *meta.DB, numOfLockedObjs int) ([]*object.Ob

return lockedObjs, lockObj
}

func objsToAddrs(oo []*object.Object) []oid.Address {
res := make([]oid.Address, 0, len(oo))
for _, o := range oo {
res = append(res, objectcore.AddressOf(o))
}

return res
}
Loading