From 53ed18c46b04a084c0b86bbc9333d02e7e025a08 Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Wed, 5 Jun 2024 11:23:00 +0300 Subject: [PATCH] gc: handle expired children parts Closes #2858. Signed-off-by: Pavel Karpy --- CHANGELOG.md | 1 + .../engine/container_test.go | 2 +- .../engine/control_test.go | 6 +- .../engine/engine_test.go | 24 ++- pkg/local_object_storage/engine/error_test.go | 2 +- .../engine/evacuate_test.go | 2 +- pkg/local_object_storage/engine/gc_test.go | 174 ++++++++++++++++++ pkg/local_object_storage/engine/inhume.go | 11 ++ pkg/local_object_storage/engine/shards.go | 1 + pkg/local_object_storage/metabase/exists.go | 13 +- pkg/local_object_storage/shard/exists.go | 11 +- pkg/local_object_storage/shard/gc.go | 31 +--- pkg/local_object_storage/shard/shard.go | 10 + 13 files changed, 251 insertions(+), 37 deletions(-) create mode 100644 pkg/local_object_storage/engine/gc_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index ed5f5368d9..5ea6d738ae 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ Changelog for NeoFS Node ### Fixed - Unenforced IR `morph.consensus.p2p.peers.min` config default (#2856) +- Object parts do not expire (#2858) ### Changed diff --git a/pkg/local_object_storage/engine/container_test.go b/pkg/local_object_storage/engine/container_test.go index cb3cb90e50..ee27f19e83 100644 --- a/pkg/local_object_storage/engine/container_test.go +++ b/pkg/local_object_storage/engine/container_test.go @@ -40,7 +40,7 @@ func TestStorageEngine_ContainerCleanUp(t *testing.T) { shard.WithMetaBaseOptions( meta.WithPath(filepath.Join(path, fmt.Sprintf("%d.metabase", i))), meta.WithPermissions(0700), - meta.WithEpochState(epochState{}), + meta.WithEpochState(&epochState{}), ), ) require.NoError(t, err) diff --git a/pkg/local_object_storage/engine/control_test.go b/pkg/local_object_storage/engine/control_test.go index 6cf9a19f3e..d06b02f94b 100644 --- a/pkg/local_object_storage/engine/control_test.go +++ 
b/pkg/local_object_storage/engine/control_test.go @@ -51,7 +51,7 @@ func TestInitializationFailure(t *testing.T) { }), meta.WithPath(c.metabase), meta.WithPermissions(0700), - meta.WithEpochState(epochState{})), + meta.WithEpochState(&epochState{})), shard.WithWriteCache(true), shard.WithWriteCacheOptions(writecache.WithPath(c.writecache)), shard.WithPiloramaOptions(pilorama.WithPath(c.pilorama)), @@ -235,7 +235,7 @@ func TestReload(t *testing.T) { // add new shard rcfg.AddShard(newMeta, []shard.Option{shard.WithMetaBaseOptions( meta.WithPath(newMeta), - meta.WithEpochState(epochState{}), + meta.WithEpochState(&epochState{}), )}) require.NoError(t, e.Reload(rcfg)) @@ -278,7 +278,7 @@ func engineWithShards(t *testing.T, path string, num int) (*StorageEngine, []str shard.WithMetaBaseOptions( meta.WithPath(filepath.Join(addPath, fmt.Sprintf("%d.metabase", i))), meta.WithPermissions(0700), - meta.WithEpochState(epochState{}), + meta.WithEpochState(&epochState{}), ), ) require.NoError(t, err) diff --git a/pkg/local_object_storage/engine/engine_test.go b/pkg/local_object_storage/engine/engine_test.go index 7f4d7e2f29..617355a044 100644 --- a/pkg/local_object_storage/engine/engine_test.go +++ b/pkg/local_object_storage/engine/engine_test.go @@ -14,6 +14,7 @@ import ( meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/pilorama" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" + "github.com/nspcc-dev/neofs-node/pkg/util" "github.com/nspcc-dev/neofs-sdk-go/checksum" checksumtest "github.com/nspcc-dev/neofs-sdk-go/checksum/test" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" @@ -28,10 +29,12 @@ import ( "go.uber.org/zap" ) -type epochState struct{} +type epochState struct { + e uint64 +} -func (s epochState) CurrentEpoch() uint64 { - return 0 +func (s *epochState) CurrentEpoch() uint64 { + return s.e } func BenchmarkExists(b *testing.B) { @@ -127,8 +130,16 @@ func 
testNewShard(t testing.TB, id int) *shard.Shard { shard.WithMetaBaseOptions( meta.WithPath(filepath.Join(t.Name(), fmt.Sprintf("%d.metabase", id))), meta.WithPermissions(0700), - meta.WithEpochState(epochState{}), - )) + meta.WithEpochState(&epochState{}), + ), + shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool { + pool, err := ants.NewPool(sz) + if err != nil { + panic(err) + } + + return pool + })) require.NoError(t, s.Open()) require.NoError(t, s.Init()) @@ -148,10 +159,11 @@ func testEngineFromShardOpts(t *testing.T, num int, extraOpts []shard.Option) *S shard.WithMetaBaseOptions( meta.WithPath(filepath.Join(t.Name(), fmt.Sprintf("metabase%d", i))), meta.WithPermissions(0700), - meta.WithEpochState(epochState{}), + meta.WithEpochState(&epochState{}), ), shard.WithPiloramaOptions( pilorama.WithPath(filepath.Join(t.Name(), fmt.Sprintf("pilorama%d", i)))), + shard.WithExpiredObjectsCallback(engine.processExpiredObjects), }, extraOpts...)...) require.NoError(t, err) } diff --git a/pkg/local_object_storage/engine/error_test.go b/pkg/local_object_storage/engine/error_test.go index 3e052d3b57..0743bca8e1 100644 --- a/pkg/local_object_storage/engine/error_test.go +++ b/pkg/local_object_storage/engine/error_test.go @@ -44,7 +44,7 @@ func newEngine(t testing.TB, dir string, opts ...Option) (*StorageEngine, string shard.WithMetaBaseOptions( meta.WithPath(filepath.Join(dir, fmt.Sprintf("%d.metabase", i))), meta.WithPermissions(0700), - meta.WithEpochState(epochState{}), + meta.WithEpochState(&epochState{}), ), shard.WithPiloramaOptions( pilorama.WithPath(filepath.Join(dir, fmt.Sprintf("%d.pilorama", i))), diff --git a/pkg/local_object_storage/engine/evacuate_test.go b/pkg/local_object_storage/engine/evacuate_test.go index 29563217f5..5edb611d3c 100644 --- a/pkg/local_object_storage/engine/evacuate_test.go +++ b/pkg/local_object_storage/engine/evacuate_test.go @@ -44,7 +44,7 @@ func newEngineEvacuate(t *testing.T, shardNum int, objPerShard int) (*StorageEng 
shard.WithMetaBaseOptions( meta.WithPath(filepath.Join(dir, fmt.Sprintf("%d.metabase", i))), meta.WithPermissions(0700), - meta.WithEpochState(epochState{}), + meta.WithEpochState(&epochState{}), )) require.NoError(t, err) } diff --git a/pkg/local_object_storage/engine/gc_test.go b/pkg/local_object_storage/engine/gc_test.go new file mode 100644 index 0000000000..eb833ef78c --- /dev/null +++ b/pkg/local_object_storage/engine/gc_test.go @@ -0,0 +1,174 @@ +package engine + +import ( + "errors" + "fmt" + "path/filepath" + "testing" + "time" + + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor" + meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/pilorama" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" + "github.com/nspcc-dev/neofs-node/pkg/util" + statusSDK "github.com/nspcc-dev/neofs-sdk-go/client/status" + cid "github.com/nspcc-dev/neofs-sdk-go/container/id" + cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test" + objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + usertest "github.com/nspcc-dev/neofs-sdk-go/user/test" + "github.com/panjf2000/ants/v2" + "github.com/stretchr/testify/require" +) + +func TestChildrenExpiration(t *testing.T) { + const numOfShards = 5 + const currEpoch = 10 + es := &epochState{e: currEpoch} + owner := usertest.ID(t) + + e := New() + for i := 0; i < numOfShards; i++ { + _, err := e.AddShard( + shard.WithBlobStorOptions( + blobstor.WithStorages( + newStorages(filepath.Join(t.TempDir(), fmt.Sprintf("blobstor%d", i)), + 1<<20)), + ), + shard.WithMetaBaseOptions( + meta.WithPath(filepath.Join(t.TempDir(), fmt.Sprintf("metabase%d", i))), + meta.WithPermissions(0700), + meta.WithEpochState(es), + ), + shard.WithPiloramaOptions( + pilorama.WithPath(filepath.Join(t.TempDir(), fmt.Sprintf("pilorama%d", i)))), + 
shard.WithExpiredObjectsCallback(e.processExpiredObjects), + shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool { + pool, err := ants.NewPool(sz) + if err != nil { + panic(err) + } + + return pool + }), + ) + require.NoError(t, err) + } + require.NoError(t, e.Open()) + require.NoError(t, e.Init()) + t.Cleanup(func() { + _ = e.Close() + }) + + expAttr := *objectSDK.NewAttribute(objectSDK.AttributeExpirationEpoch, fmt.Sprint(currEpoch)) + + t.Run("V1", func(t *testing.T) { + cnr := cidtest.ID() + splitID := objectSDK.NewSplitID() + + parent := generateObjectWithCID(t, cnr) + parentID, _ := parent.ID() + parent.SetAttributes(expAttr) + + child1 := generateObjectWithCID(t, cnr) + child1ID, _ := child1.ID() + child1.SetSplitID(splitID) + + child2 := generateObjectWithCID(t, cnr) + child2ID, _ := child2.ID() + child2.SetSplitID(splitID) + child2.SetPreviousID(child1ID) + + child3 := generateObjectWithCID(t, cnr) + child3ID, _ := child3.ID() + child3.SetSplitID(splitID) + child3.SetPreviousID(child2ID) + child3.SetParent(parent) + child3.SetParentID(parentID) + + link := generateObjectWithCID(t, cnr) + link.SetParent(parent) + link.SetParentID(parentID) + link.SetChildren(child1ID, child2ID, child3ID) + link.SetSplitID(splitID) + + require.NoError(t, Put(e, child1)) + require.NoError(t, Put(e, child2)) + require.NoError(t, Put(e, child3)) + require.NoError(t, Put(e, link)) + + e.HandleNewEpoch(currEpoch + 1) + + checkObjectsAsyncRemoval(t, e, cnr, child1ID, child2ID, child3ID) + }) + + t.Run("V2", func(t *testing.T) { + cnr := cidtest.ID() + + parent := generateObjectWithCID(t, cnr) + parentID, _ := parent.ID() + parent.SetAttributes(expAttr) + + child1 := generateObjectWithCID(t, cnr) + child1ID, _ := child1.ID() + child1.SetParent(parent) + + child2 := generateObjectWithCID(t, cnr) + child2ID, _ := child2.ID() + child2.SetFirstID(child1ID) + child2.SetPreviousID(child1ID) + + child3 := generateObjectWithCID(t, cnr) + child3ID, _ := child3.ID() + 
child3.SetFirstID(child1ID) + child3.SetPreviousID(child2ID) + child3.SetParent(parent) + child3.SetParentID(parentID) + + children := make([]objectSDK.MeasuredObject, 3) + children[0].SetObjectID(child1ID) + children[1].SetObjectID(child2ID) + children[2].SetObjectID(child3ID) + + var link objectSDK.Link + link.SetObjects(children) + + var linkObj objectSDK.Object + linkObj.WriteLink(link) + linkObj.SetContainerID(cnr) + linkObj.SetParent(parent) + linkObj.SetParentID(parentID) + linkObj.SetFirstID(child1ID) + linkObj.SetOwnerID(&owner) + linkObj.CalculateAndSetPayloadChecksum() + require.NoError(t, linkObj.CalculateAndSetID()) + + require.NoError(t, Put(e, child1)) + require.NoError(t, Put(e, child2)) + require.NoError(t, Put(e, child3)) + require.NoError(t, Put(e, &linkObj)) + + e.HandleNewEpoch(currEpoch + 1) + + checkObjectsAsyncRemoval(t, e, cnr, child1ID, child2ID, child3ID) + }) +} + +func checkObjectsAsyncRemoval(t *testing.T, e *StorageEngine, cnr cid.ID, objs ...oid.ID) { + require.Eventually(t, func() bool { + var addr oid.Address + addr.SetContainer(cnr) + + for _, obj := range objs { + addr.SetObject(obj) + + _, err := Get(e, addr) + if !errors.As(err, new(statusSDK.ObjectNotFound)) { + return false + } + } + + return true + }, 1*time.Second, 100*time.Millisecond) +} diff --git a/pkg/local_object_storage/engine/inhume.go b/pkg/local_object_storage/engine/inhume.go index 062974cb1d..54576af569 100644 --- a/pkg/local_object_storage/engine/inhume.go +++ b/pkg/local_object_storage/engine/inhume.go @@ -149,6 +149,7 @@ func (e *StorageEngine) inhumeAddr(addr oid.Address, prm shard.InhumePrm) (bool, // see if the object is root e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) { existPrm.SetAddress(addr) + existPrm.IgnoreExpiration() res, err := sh.Exists(existPrm) if err != nil { @@ -321,6 +322,16 @@ func (e *StorageEngine) isLocked(addr oid.Address) (bool, error) { return locked, outErr } +func (e *StorageEngine) processExpiredObjects(_ 
context.Context, addrs []oid.Address) { + var prm InhumePrm + prm.MarkAsGarbage(addrs...) + + _, err := e.Inhume(prm) + if err != nil { + e.log.Warn("handling expired objects", zap.Error(err)) + } +} + func (e *StorageEngine) processExpiredTombstones(ctx context.Context, addrs []meta.TombstonedObject) { e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) { sh.HandleExpiredTombstones(addrs) diff --git a/pkg/local_object_storage/engine/shards.go b/pkg/local_object_storage/engine/shards.go index 99004b1902..8d07157c17 100644 --- a/pkg/local_object_storage/engine/shards.go +++ b/pkg/local_object_storage/engine/shards.go @@ -100,6 +100,7 @@ func (e *StorageEngine) createShard(opts []shard.Option) (*shard.Shard, error) { sh := shard.New(append(opts, shard.WithID(id), + shard.WithExpiredObjectsCallback(e.processExpiredObjects), shard.WithExpiredTombstonesCallback(e.processExpiredTombstones), shard.WithExpiredLocksCallback(e.processExpiredLocks), shard.WithDeletedLockCallback(e.processDeletedLocks), diff --git a/pkg/local_object_storage/metabase/exists.go b/pkg/local_object_storage/metabase/exists.go index cf9af7269e..adf8957084 100644 --- a/pkg/local_object_storage/metabase/exists.go +++ b/pkg/local_object_storage/metabase/exists.go @@ -14,7 +14,8 @@ import ( // ExistsPrm groups the parameters of Exists operation. type ExistsPrm struct { - addr oid.Address + addr oid.Address + ignoreExpiration bool } // ExistsRes groups the resulting values of Exists operation. @@ -29,6 +30,11 @@ func (p *ExistsPrm) SetAddress(addr oid.Address) { p.addr = addr } +// IgnoreExpiration tells Exists to report existence regardless of the object's expiration status. +func (p *ExistsPrm) IgnoreExpiration() { + p.ignoreExpiration = true +} + // Exists returns the fact that the object is in the metabase. 
func (p ExistsRes) Exists() bool { return p.exists @@ -47,7 +53,10 @@ func (db *DB) Exists(prm ExistsPrm) (res ExistsRes, err error) { return res, ErrDegradedMode } - currEpoch := db.epochState.CurrentEpoch() + var currEpoch uint64 + if !prm.ignoreExpiration { + currEpoch = db.epochState.CurrentEpoch() + } err = db.boltDB.View(func(tx *bbolt.Tx) error { res.exists, err = db.exists(tx, prm.addr, currEpoch) diff --git a/pkg/local_object_storage/shard/exists.go b/pkg/local_object_storage/shard/exists.go index 53bd4f2174..fce28bd675 100644 --- a/pkg/local_object_storage/shard/exists.go +++ b/pkg/local_object_storage/shard/exists.go @@ -8,7 +8,8 @@ import ( // ExistsPrm groups the parameters of Exists operation. type ExistsPrm struct { - addr oid.Address + addr oid.Address + ignoreExpiration bool } // ExistsRes groups the resulting values of Exists operation. @@ -21,6 +22,11 @@ func (p *ExistsPrm) SetAddress(addr oid.Address) { p.addr = addr } +// IgnoreExpiration tells Exists to report existence regardless of the object's expiration status. +func (p *ExistsPrm) IgnoreExpiration() { + p.ignoreExpiration = true +} + // Exists returns the fact that the object is in the shard. 
func (p ExistsRes) Exists() bool { return p.ex @@ -50,6 +56,9 @@ func (s *Shard) Exists(prm ExistsPrm) (ExistsRes, error) { } else { var existsPrm meta.ExistsPrm existsPrm.SetAddress(prm.addr) + if prm.ignoreExpiration { + existsPrm.IgnoreExpiration() + } var res meta.ExistsRes res, err = s.metaBase.Exists(existsPrm) diff --git a/pkg/local_object_storage/shard/gc.go b/pkg/local_object_storage/shard/gc.go index 9005ec8f35..81b0daae43 100644 --- a/pkg/local_object_storage/shard/gc.go +++ b/pkg/local_object_storage/shard/gc.go @@ -230,39 +230,26 @@ func (s *Shard) removeGarbage() { } func (s *Shard) collectExpiredObjects(ctx context.Context, e Event) { + epoch := e.(newEpoch).epoch + log := s.log.With(zap.Uint64("epoch", epoch)) + + log.Debug("started expired objects handling") + expired, err := s.getExpiredObjects(ctx, e.(newEpoch).epoch, func(typ object.Type) bool { return typ != object.TypeTombstone && typ != object.TypeLock }) if err != nil || len(expired) == 0 { if err != nil { - s.log.Warn("iterator over expired objects failed", zap.String("error", err.Error())) + log.Warn("iterator over expired objects failed", zap.String("error", err.Error())) } return } - s.m.RLock() - defer s.m.RUnlock() - - if s.info.Mode.NoMetabase() { - return - } + log.Debug("collected expired objects", zap.Int("num", len(expired))) - var inhumePrm meta.InhumePrm + s.expiredObjectsCallback(ctx, expired) - inhumePrm.SetAddresses(expired...) 
- inhumePrm.SetGCMark() - - // inhume the collected objects - res, err := s.metaBase.Inhume(inhumePrm) - if err != nil { - s.log.Warn("could not inhume the objects", - zap.String("error", err.Error()), - ) - - return - } - - s.decObjectCounterBy(logical, res.AvailableInhumed()) + log.Debug("finished expired objects handling") } func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) { diff --git a/pkg/local_object_storage/shard/shard.go b/pkg/local_object_storage/shard/shard.go index 862bc4b910..9d5e75dccf 100644 --- a/pkg/local_object_storage/shard/shard.go +++ b/pkg/local_object_storage/shard/shard.go @@ -94,6 +94,8 @@ type cfg struct { gcCfg gcCfg + expiredObjectsCallback ExpiredObjectsCallback + expiredTombstonesCallback ExpiredTombstonesCallback expiredLocksCallback ExpiredObjectsCallback @@ -241,6 +243,14 @@ func WithGCRemoverSleepInterval(dur time.Duration) Option { } } +// WithExpiredObjectsCallback returns option to specify callback +// of the expired objects handler. +func WithExpiredObjectsCallback(cb ExpiredObjectsCallback) Option { + return func(c *cfg) { + c.expiredObjectsCallback = cb + } +} + // WithExpiredTombstonesCallback returns option to specify callback // of the expired tombstones handler. func WithExpiredTombstonesCallback(cb ExpiredTombstonesCallback) Option {