diff --git a/CHANGELOG.md b/CHANGELOG.md index 25517653e4..0028cf279e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ Changelog for NeoFS Node ### Fixed - Unenforced IR `morph.consensus.p2p.peers.min` config default (#2856) +- Object parts do not expire (#2858) ### Changed diff --git a/pkg/core/object/fmt.go b/pkg/core/object/fmt.go index 47772c01d9..756579f085 100644 --- a/pkg/core/object/fmt.go +++ b/pkg/core/object/fmt.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "strconv" "github.com/nspcc-dev/neofs-node/pkg/core/netmap" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" @@ -78,8 +77,6 @@ var errNilID = errors.New("missing identifier") var errNilCID = errors.New("missing container identifier") -var errNoExpirationEpoch = errors.New("missing expiration epoch attribute") - var errTombstoneExpiration = errors.New("tombstone body and header contain different expiration values") var errEmptySGMembers = errors.New("storage group with empty members list") @@ -170,7 +167,7 @@ func (v *FormatValidator) Validate(obj *object.Object, unprepared bool) error { return fmt.Errorf("(%T) could not validate signature key: %w", v, err) } - if err := v.checkExpiration(obj); err != nil { + if err := v.checkExpiration(*obj); err != nil { return fmt.Errorf("object did not pass expiration check: %w", err) } @@ -288,7 +285,7 @@ func (v *FormatValidator) ValidateContent(o *object.Object) (ContentMeta, error) } // check if the tombstone has the same expiration in the body and the header - exp, err := expirationEpochAttribute(o) + exp, err := Expiration(*o) if err != nil { return ContentMeta{}, err } @@ -353,7 +350,7 @@ func (v *FormatValidator) ValidateContent(o *object.Object) (ContentMeta, error) } // check that LOCK object has correct expiration epoch - lockExp, err := expirationEpochAttribute(o) + lockExp, err := Expiration(*o) if err != nil { return ContentMeta{}, fmt.Errorf("lock object expiration epoch: %w", err) } @@ -385,17 +382,17 @@ func (v *FormatValidator) ValidateContent(o *object.Object) (ContentMeta, error) var errExpired = errors.New("object has expired") -func (v *FormatValidator) checkExpiration(obj *object.Object) error { - exp, err := expirationEpochAttribute(obj) +func (v *FormatValidator) checkExpiration(obj object.Object) error { + exp, err := Expiration(obj) if err != nil { - if errors.Is(err, errNoExpirationEpoch) { + if errors.Is(err, ErrNoExpiration) { return nil // objects without expiration attribute are valid } return err } - if exp < v.netState.CurrentEpoch() { + if currEpoch := v.netState.CurrentEpoch(); exp < currEpoch { // an object could be expired but locked; // put such an object is a correct operation @@ -412,25 +409,13 @@ func (v *FormatValidator) checkExpiration(obj *object.Object) error { } if !locked { - return errExpired + return fmt.Errorf("%w: attribute: %d, current: %d", errExpired, exp, currEpoch) } } return nil } -func expirationEpochAttribute(obj *object.Object) (uint64, error) { - for _, a := range obj.Attributes() { - if a.Key() != object.AttributeExpirationEpoch { - continue - } - - return strconv.ParseUint(a.Value(), 10, 64) - } - - return 0, errNoExpirationEpoch -} - var ( errDuplAttr = errors.New("duplication of attributes detected") errEmptyAttrVal = errors.New("empty attribute value") diff --git a/pkg/core/object/object.go b/pkg/core/object/object.go index ea6ec07120..24c9f2451d 100644 --- a/pkg/core/object/object.go +++ b/pkg/core/object/object.go @@ -2,6 +2,7 @@ package object import ( "errors" + "strconv" "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" @@ -26,3 +27,20 @@ func AddressOf(obj *object.Object) oid.Address { return addr } + +// ErrNoExpiration means no expiration was set. +var ErrNoExpiration = errors.New("missing expiration epoch attribute") + +// Expiration searches for expiration attribute in the object. Returns +// ErrNoExpiration if not found. +func Expiration(obj object.Object) (uint64, error) { + for _, a := range obj.Attributes() { + if a.Key() != object.AttributeExpirationEpoch { + continue + } + + return strconv.ParseUint(a.Value(), 10, 64) + } + + return 0, ErrNoExpiration +} diff --git a/pkg/local_object_storage/engine/engine_test.go b/pkg/local_object_storage/engine/engine_test.go index 7f4d7e2f29..b0b7029442 100644 --- a/pkg/local_object_storage/engine/engine_test.go +++ b/pkg/local_object_storage/engine/engine_test.go @@ -14,6 +14,7 @@ import ( meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/pilorama" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" + "github.com/nspcc-dev/neofs-node/pkg/util" "github.com/nspcc-dev/neofs-sdk-go/checksum" checksumtest "github.com/nspcc-dev/neofs-sdk-go/checksum/test" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" @@ -28,10 +29,12 @@ import ( "go.uber.org/zap" ) -type epochState struct{} +type epochState struct { + e uint64 +} func (s epochState) CurrentEpoch() uint64 { - return 0 + return s.e } func BenchmarkExists(b *testing.B) { @@ -128,7 +131,15 @@ func testNewShard(t testing.TB, id int) *shard.Shard { meta.WithPath(filepath.Join(t.Name(), fmt.Sprintf("%d.metabase", id))), meta.WithPermissions(0700), meta.WithEpochState(epochState{}), - )) + ), + shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool { + pool, err := ants.NewPool(sz) + if err != nil { + panic(err) + } + + return pool + })) require.NoError(t, s.Open()) require.NoError(t, s.Init()) @@ -152,6 +163,7 @@ func testEngineFromShardOpts(t *testing.T, num int, extraOpts []shard.Option) *S ), shard.WithPiloramaOptions( pilorama.WithPath(filepath.Join(t.Name(), fmt.Sprintf("pilorama%d", i)))), + shard.WithExpiredObjectsCallback(engine.processExpiredObjects), }, extraOpts...)...) require.NoError(t, err) } diff --git a/pkg/local_object_storage/engine/gc_test.go b/pkg/local_object_storage/engine/gc_test.go new file mode 100644 index 0000000000..eb833ef78c --- /dev/null +++ b/pkg/local_object_storage/engine/gc_test.go @@ -0,0 +1,174 @@ +package engine + +import ( + "errors" + "fmt" + "path/filepath" + "testing" + "time" + + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor" + meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/pilorama" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" + "github.com/nspcc-dev/neofs-node/pkg/util" + statusSDK "github.com/nspcc-dev/neofs-sdk-go/client/status" + cid "github.com/nspcc-dev/neofs-sdk-go/container/id" + cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test" + objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + usertest "github.com/nspcc-dev/neofs-sdk-go/user/test" + "github.com/panjf2000/ants/v2" + "github.com/stretchr/testify/require" +) + +func TestChildrenExpiration(t *testing.T) { + const numOfShards = 5 + const currEpoch = 10 + es := &epochState{e: currEpoch} + owner := usertest.ID(t) + + e := New() + for i := 0; i < numOfShards; i++ { + _, err := e.AddShard( + shard.WithBlobStorOptions( + blobstor.WithStorages( + newStorages(filepath.Join(t.TempDir(), fmt.Sprintf("blobstor%d", i)), + 1<<20)), + ), + shard.WithMetaBaseOptions( + meta.WithPath(filepath.Join(t.TempDir(), fmt.Sprintf("metabase%d", i))), + meta.WithPermissions(0700), + meta.WithEpochState(es), + ), + shard.WithPiloramaOptions( + pilorama.WithPath(filepath.Join(t.TempDir(), fmt.Sprintf("pilorama%d", i)))), + shard.WithExpiredObjectsCallback(e.processExpiredObjects), + shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool { + pool, err := ants.NewPool(sz) + if err != nil { + panic(err) + } + + return pool + }), + ) + require.NoError(t, err) + } + require.NoError(t, e.Open()) + require.NoError(t, e.Init()) + t.Cleanup(func() { + _ = e.Close() + }) + + expAttr := *objectSDK.NewAttribute(objectSDK.AttributeExpirationEpoch, fmt.Sprint(currEpoch)) + + t.Run("V1", func(t *testing.T) { + cnr := cidtest.ID() + splitID := objectSDK.NewSplitID() + + parent := generateObjectWithCID(t, cnr) + parentID, _ := parent.ID() + parent.SetAttributes(expAttr) + + child1 := generateObjectWithCID(t, cnr) + child1ID, _ := child1.ID() + child1.SetSplitID(splitID) + + child2 := generateObjectWithCID(t, cnr) + child2ID, _ := child2.ID() + child2.SetSplitID(splitID) + child2.SetPreviousID(child1ID) + + child3 := generateObjectWithCID(t, cnr) + child3ID, _ := child3.ID() + child3.SetSplitID(splitID) + child3.SetPreviousID(child2ID) + child3.SetParent(parent) + child3.SetParentID(parentID) + + link := generateObjectWithCID(t, cnr) + link.SetParent(parent) + link.SetParentID(parentID) + link.SetChildren(child1ID, child2ID, child3ID) + link.SetSplitID(splitID) + + require.NoError(t, Put(e, child1)) + require.NoError(t, Put(e, child2)) + require.NoError(t, Put(e, child3)) + require.NoError(t, Put(e, link)) + + e.HandleNewEpoch(currEpoch + 1) + + checkObjectsAsyncRemoval(t, e, cnr, child1ID, child2ID, child3ID) + }) + + t.Run("V2", func(t *testing.T) { + cnr := cidtest.ID() + + parent := generateObjectWithCID(t, cnr) + parentID, _ := parent.ID() + parent.SetAttributes(expAttr) + + child1 := generateObjectWithCID(t, cnr) + child1ID, _ := child1.ID() + child1.SetParent(parent) + + child2 := generateObjectWithCID(t, cnr) + child2ID, _ := child2.ID() + child2.SetFirstID(child1ID) + child2.SetPreviousID(child1ID) + + child3 := generateObjectWithCID(t, cnr) + child3ID, _ := child3.ID() + child3.SetFirstID(child1ID) + child3.SetPreviousID(child2ID) + child3.SetParent(parent) + child3.SetParentID(parentID) + + children := make([]objectSDK.MeasuredObject, 3) + children[0].SetObjectID(child1ID) + children[1].SetObjectID(child2ID) + children[2].SetObjectID(child3ID) + + var link objectSDK.Link + link.SetObjects(children) + + var linkObj objectSDK.Object + linkObj.WriteLink(link) + linkObj.SetContainerID(cnr) + linkObj.SetParent(parent) + linkObj.SetParentID(parentID) + linkObj.SetFirstID(child1ID) + linkObj.SetOwnerID(&owner) + linkObj.CalculateAndSetPayloadChecksum() + require.NoError(t, linkObj.CalculateAndSetID()) + + require.NoError(t, Put(e, child1)) + require.NoError(t, Put(e, child2)) + require.NoError(t, Put(e, child3)) + require.NoError(t, Put(e, &linkObj)) + + e.HandleNewEpoch(currEpoch + 1) + + checkObjectsAsyncRemoval(t, e, cnr, child1ID, child2ID, child3ID) + }) +} + +func checkObjectsAsyncRemoval(t *testing.T, e *StorageEngine, cnr cid.ID, objs ...oid.ID) { + require.Eventually(t, func() bool { + var addr oid.Address + addr.SetContainer(cnr) + + for _, obj := range objs { + addr.SetObject(obj) + + _, err := Get(e, addr) + if !errors.As(err, new(statusSDK.ObjectNotFound)) { + return false + } + } + + return true + }, 1*time.Second, 100*time.Millisecond) +} diff --git a/pkg/local_object_storage/engine/inhume.go b/pkg/local_object_storage/engine/inhume.go index 062974cb1d..54576af569 100644 --- a/pkg/local_object_storage/engine/inhume.go +++ b/pkg/local_object_storage/engine/inhume.go @@ -149,6 +149,7 @@ func (e *StorageEngine) inhumeAddr(addr oid.Address, prm shard.InhumePrm) (bool, // see if the object is root e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) { existPrm.SetAddress(addr) + existPrm.IgnoreExpiration() res, err := sh.Exists(existPrm) if err != nil { @@ -321,6 +322,16 @@ func (e *StorageEngine) isLocked(addr oid.Address) (bool, error) { return locked, outErr } +func (e *StorageEngine) processExpiredObjects(_ context.Context, addrs []oid.Address) { + var prm InhumePrm + prm.MarkAsGarbage(addrs...) + + _, err := e.Inhume(prm) + if err != nil { + e.log.Warn("handling expired objects", zap.Error(err)) + } +} + func (e *StorageEngine) processExpiredTombstones(ctx context.Context, addrs []meta.TombstonedObject) { e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) { sh.HandleExpiredTombstones(addrs) diff --git a/pkg/local_object_storage/engine/shards.go b/pkg/local_object_storage/engine/shards.go index 99004b1902..8d07157c17 100644 --- a/pkg/local_object_storage/engine/shards.go +++ b/pkg/local_object_storage/engine/shards.go @@ -100,6 +100,7 @@ func (e *StorageEngine) createShard(opts []shard.Option) (*shard.Shard, error) { sh := shard.New(append(opts, shard.WithID(id), + shard.WithExpiredObjectsCallback(e.processExpiredObjects), shard.WithExpiredTombstonesCallback(e.processExpiredTombstones), shard.WithExpiredLocksCallback(e.processExpiredLocks), shard.WithDeletedLockCallback(e.processDeletedLocks), diff --git a/pkg/local_object_storage/metabase/exists.go b/pkg/local_object_storage/metabase/exists.go index cf9af7269e..adf8957084 100644 --- a/pkg/local_object_storage/metabase/exists.go +++ b/pkg/local_object_storage/metabase/exists.go @@ -14,7 +14,8 @@ import ( // ExistsPrm groups the parameters of Exists operation. type ExistsPrm struct { - addr oid.Address + addr oid.Address + ignoreExpiration bool } // ExistsRes groups the resulting values of Exists operation. @@ -29,6 +30,11 @@ func (p *ExistsPrm) SetAddress(addr oid.Address) { p.addr = addr } +// IgnoreExpiration returns existence status despite the expiration status. +func (p *ExistsPrm) IgnoreExpiration() { + p.ignoreExpiration = true +} + // Exists returns the fact that the object is in the metabase. func (p ExistsRes) Exists() bool { return p.exists @@ -47,7 +53,10 @@ func (db *DB) Exists(prm ExistsPrm) (res ExistsRes, err error) { return res, ErrDegradedMode } - currEpoch := db.epochState.CurrentEpoch() + var currEpoch uint64 + if !prm.ignoreExpiration { + currEpoch = db.epochState.CurrentEpoch() + } err = db.boltDB.View(func(tx *bbolt.Tx) error { res.exists, err = db.exists(tx, prm.addr, currEpoch) diff --git a/pkg/local_object_storage/shard/exists.go b/pkg/local_object_storage/shard/exists.go index 53bd4f2174..fce28bd675 100644 --- a/pkg/local_object_storage/shard/exists.go +++ b/pkg/local_object_storage/shard/exists.go @@ -8,7 +8,8 @@ import ( // ExistsPrm groups the parameters of Exists operation. type ExistsPrm struct { - addr oid.Address + addr oid.Address + ignoreExpiration bool } // ExistsRes groups the resulting values of Exists operation. @@ -21,6 +22,11 @@ func (p *ExistsPrm) SetAddress(addr oid.Address) { p.addr = addr } +// IgnoreExpiration returns existence status despite the expiration status. +func (p *ExistsPrm) IgnoreExpiration() { + p.ignoreExpiration = true +} + // Exists returns the fact that the object is in the shard. func (p ExistsRes) Exists() bool { return p.ex @@ -50,6 +56,9 @@ func (s *Shard) Exists(prm ExistsPrm) (ExistsRes, error) { } else { var existsPrm meta.ExistsPrm existsPrm.SetAddress(prm.addr) + if prm.ignoreExpiration { + existsPrm.IgnoreExpiration() + } var res meta.ExistsRes res, err = s.metaBase.Exists(existsPrm) diff --git a/pkg/local_object_storage/shard/gc.go b/pkg/local_object_storage/shard/gc.go index 9005ec8f35..81b0daae43 100644 --- a/pkg/local_object_storage/shard/gc.go +++ b/pkg/local_object_storage/shard/gc.go @@ -230,39 +230,26 @@ func (s *Shard) removeGarbage() { } func (s *Shard) collectExpiredObjects(ctx context.Context, e Event) { + epoch := e.(newEpoch).epoch + log := s.log.With(zap.Uint64("epoch", epoch)) + + log.Debug("started expired objects handling") + expired, err := s.getExpiredObjects(ctx, e.(newEpoch).epoch, func(typ object.Type) bool { return typ != object.TypeTombstone && typ != object.TypeLock }) if err != nil || len(expired) == 0 { if err != nil { - s.log.Warn("iterator over expired objects failed", zap.String("error", err.Error())) + log.Warn("iterator over expired objects failed", zap.String("error", err.Error())) } return } - s.m.RLock() - defer s.m.RUnlock() - - if s.info.Mode.NoMetabase() { - return - } + log.Debug("collected expired objects", zap.Int("num", len(expired))) - var inhumePrm meta.InhumePrm + s.expiredObjectsCallback(ctx, expired) - inhumePrm.SetAddresses(expired...) - inhumePrm.SetGCMark() - - // inhume the collected objects - res, err := s.metaBase.Inhume(inhumePrm) - if err != nil { - s.log.Warn("could not inhume the objects", - zap.String("error", err.Error()), - ) - - return - } - - s.decObjectCounterBy(logical, res.AvailableInhumed()) + log.Debug("finished expired objects handling") } func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) { diff --git a/pkg/local_object_storage/shard/shard.go b/pkg/local_object_storage/shard/shard.go index 862bc4b910..9d5e75dccf 100644 --- a/pkg/local_object_storage/shard/shard.go +++ b/pkg/local_object_storage/shard/shard.go @@ -94,6 +94,8 @@ type cfg struct { gcCfg gcCfg + expiredObjectsCallback ExpiredObjectsCallback + expiredTombstonesCallback ExpiredTombstonesCallback expiredLocksCallback ExpiredObjectsCallback @@ -241,6 +243,14 @@ func WithGCRemoverSleepInterval(dur time.Duration) Option { } } +// WithExpiredObjectsCallback returns option to specify callback +// of the expired objects handler. +func WithExpiredObjectsCallback(cb ExpiredObjectsCallback) Option { + return func(c *cfg) { + c.expiredObjectsCallback = cb + } +} + // WithExpiredTombstonesCallback returns option to specify callback // of the expired tombstones handler. func WithExpiredTombstonesCallback(cb ExpiredTombstonesCallback) Option {