Skip to content

Commit

Permalink
gc: handle expired children parts
Browse files Browse the repository at this point in the history
Closes #2858.

Signed-off-by: Pavel Karpy <[email protected]>
  • Loading branch information
carpawell committed Jun 10, 2024
1 parent ef33b39 commit 53ed18c
Show file tree
Hide file tree
Showing 13 changed files with 251 additions and 37 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ Changelog for NeoFS Node

### Fixed
- Unenforced IR `morph.consensus.p2p.peers.min` config default (#2856)
- Object parts do not expire (#2858)

### Changed

Expand Down
2 changes: 1 addition & 1 deletion pkg/local_object_storage/engine/container_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestStorageEngine_ContainerCleanUp(t *testing.T) {
shard.WithMetaBaseOptions(
meta.WithPath(filepath.Join(path, fmt.Sprintf("%d.metabase", i))),
meta.WithPermissions(0700),
meta.WithEpochState(epochState{}),
meta.WithEpochState(&epochState{}),
),
)
require.NoError(t, err)
Expand Down
6 changes: 3 additions & 3 deletions pkg/local_object_storage/engine/control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func TestInitializationFailure(t *testing.T) {
}),
meta.WithPath(c.metabase),
meta.WithPermissions(0700),
meta.WithEpochState(epochState{})),
meta.WithEpochState(&epochState{})),
shard.WithWriteCache(true),
shard.WithWriteCacheOptions(writecache.WithPath(c.writecache)),
shard.WithPiloramaOptions(pilorama.WithPath(c.pilorama)),
Expand Down Expand Up @@ -235,7 +235,7 @@ func TestReload(t *testing.T) {
// add new shard
rcfg.AddShard(newMeta, []shard.Option{shard.WithMetaBaseOptions(
meta.WithPath(newMeta),
meta.WithEpochState(epochState{}),
meta.WithEpochState(&epochState{}),
)})
require.NoError(t, e.Reload(rcfg))

Expand Down Expand Up @@ -278,7 +278,7 @@ func engineWithShards(t *testing.T, path string, num int) (*StorageEngine, []str
shard.WithMetaBaseOptions(
meta.WithPath(filepath.Join(addPath, fmt.Sprintf("%d.metabase", i))),
meta.WithPermissions(0700),
meta.WithEpochState(epochState{}),
meta.WithEpochState(&epochState{}),
),
)
require.NoError(t, err)
Expand Down
24 changes: 18 additions & 6 deletions pkg/local_object_storage/engine/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/pilorama"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
"github.com/nspcc-dev/neofs-node/pkg/util"
"github.com/nspcc-dev/neofs-sdk-go/checksum"
checksumtest "github.com/nspcc-dev/neofs-sdk-go/checksum/test"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
Expand All @@ -28,10 +29,12 @@ import (
"go.uber.org/zap"
)

type epochState struct{}
type epochState struct {
e uint64
}

func (s epochState) CurrentEpoch() uint64 {
return 0
func (s *epochState) CurrentEpoch() uint64 {
return s.e
}

func BenchmarkExists(b *testing.B) {
Expand Down Expand Up @@ -127,8 +130,16 @@ func testNewShard(t testing.TB, id int) *shard.Shard {
shard.WithMetaBaseOptions(
meta.WithPath(filepath.Join(t.Name(), fmt.Sprintf("%d.metabase", id))),
meta.WithPermissions(0700),
meta.WithEpochState(epochState{}),
))
meta.WithEpochState(&epochState{}),
),
shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool {
pool, err := ants.NewPool(sz)
if err != nil {
panic(err)
}

return pool
}))

require.NoError(t, s.Open())
require.NoError(t, s.Init())
Expand All @@ -148,10 +159,11 @@ func testEngineFromShardOpts(t *testing.T, num int, extraOpts []shard.Option) *S
shard.WithMetaBaseOptions(
meta.WithPath(filepath.Join(t.Name(), fmt.Sprintf("metabase%d", i))),
meta.WithPermissions(0700),
meta.WithEpochState(epochState{}),
meta.WithEpochState(&epochState{}),
),
shard.WithPiloramaOptions(
pilorama.WithPath(filepath.Join(t.Name(), fmt.Sprintf("pilorama%d", i)))),
shard.WithExpiredObjectsCallback(engine.processExpiredObjects),
}, extraOpts...)...)
require.NoError(t, err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/local_object_storage/engine/error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func newEngine(t testing.TB, dir string, opts ...Option) (*StorageEngine, string
shard.WithMetaBaseOptions(
meta.WithPath(filepath.Join(dir, fmt.Sprintf("%d.metabase", i))),
meta.WithPermissions(0700),
meta.WithEpochState(epochState{}),
meta.WithEpochState(&epochState{}),
),
shard.WithPiloramaOptions(
pilorama.WithPath(filepath.Join(dir, fmt.Sprintf("%d.pilorama", i))),
Expand Down
2 changes: 1 addition & 1 deletion pkg/local_object_storage/engine/evacuate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func newEngineEvacuate(t *testing.T, shardNum int, objPerShard int) (*StorageEng
shard.WithMetaBaseOptions(
meta.WithPath(filepath.Join(dir, fmt.Sprintf("%d.metabase", i))),
meta.WithPermissions(0700),
meta.WithEpochState(epochState{}),
meta.WithEpochState(&epochState{}),
))
require.NoError(t, err)
}
Expand Down
174 changes: 174 additions & 0 deletions pkg/local_object_storage/engine/gc_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
package engine

import (
"errors"
"fmt"
"path/filepath"
"testing"
"time"

"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor"
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/pilorama"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
"github.com/nspcc-dev/neofs-node/pkg/util"
statusSDK "github.com/nspcc-dev/neofs-sdk-go/client/status"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test"
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
usertest "github.com/nspcc-dev/neofs-sdk-go/user/test"
"github.com/panjf2000/ants/v2"
"github.com/stretchr/testify/require"
)

func TestChildrenExpiration(t *testing.T) {
const numOfShards = 5
const currEpoch = 10
es := &epochState{e: currEpoch}
owner := usertest.ID(t)

e := New()
for i := 0; i < numOfShards; i++ {
_, err := e.AddShard(
shard.WithBlobStorOptions(
blobstor.WithStorages(
newStorages(filepath.Join(t.TempDir(), fmt.Sprintf("blobstor%d", i)),
1<<20)),
),
shard.WithMetaBaseOptions(
meta.WithPath(filepath.Join(t.TempDir(), fmt.Sprintf("metabase%d", i))),
meta.WithPermissions(0700),
meta.WithEpochState(es),
),
shard.WithPiloramaOptions(
pilorama.WithPath(filepath.Join(t.TempDir(), fmt.Sprintf("pilorama%d", i)))),
shard.WithExpiredObjectsCallback(e.processExpiredObjects),
shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool {
pool, err := ants.NewPool(sz)
if err != nil {
panic(err)
}

return pool
}),
)
require.NoError(t, err)
}
require.NoError(t, e.Open())
require.NoError(t, e.Init())
t.Cleanup(func() {
_ = e.Close()
})

expAttr := *objectSDK.NewAttribute(objectSDK.AttributeExpirationEpoch, fmt.Sprint(currEpoch))

t.Run("V1", func(t *testing.T) {
cnr := cidtest.ID()
splitID := objectSDK.NewSplitID()

parent := generateObjectWithCID(t, cnr)
parentID, _ := parent.ID()
parent.SetAttributes(expAttr)

child1 := generateObjectWithCID(t, cnr)
child1ID, _ := child1.ID()
child1.SetSplitID(splitID)

child2 := generateObjectWithCID(t, cnr)
child2ID, _ := child2.ID()
child2.SetSplitID(splitID)
child2.SetPreviousID(child1ID)

child3 := generateObjectWithCID(t, cnr)
child3ID, _ := child3.ID()
child3.SetSplitID(splitID)
child3.SetPreviousID(child2ID)
child3.SetParent(parent)
child3.SetParentID(parentID)

link := generateObjectWithCID(t, cnr)
link.SetParent(parent)
link.SetParentID(parentID)
link.SetChildren(child1ID, child2ID, child3ID)
link.SetSplitID(splitID)

require.NoError(t, Put(e, child1))
require.NoError(t, Put(e, child2))
require.NoError(t, Put(e, child3))
require.NoError(t, Put(e, link))

e.HandleNewEpoch(currEpoch + 1)

checkObjectsAsyncRemoval(t, e, cnr, child1ID, child2ID, child3ID)
})

t.Run("V2", func(t *testing.T) {
cnr := cidtest.ID()

parent := generateObjectWithCID(t, cnr)
parentID, _ := parent.ID()
parent.SetAttributes(expAttr)

child1 := generateObjectWithCID(t, cnr)
child1ID, _ := child1.ID()
child1.SetParent(parent)

child2 := generateObjectWithCID(t, cnr)
child2ID, _ := child2.ID()
child2.SetFirstID(child1ID)
child2.SetPreviousID(child1ID)

child3 := generateObjectWithCID(t, cnr)
child3ID, _ := child3.ID()
child3.SetFirstID(child1ID)
child3.SetPreviousID(child2ID)
child3.SetParent(parent)
child3.SetParentID(parentID)

children := make([]objectSDK.MeasuredObject, 3)
children[0].SetObjectID(child1ID)
children[1].SetObjectID(child2ID)
children[2].SetObjectID(child3ID)

var link objectSDK.Link
link.SetObjects(children)

var linkObj objectSDK.Object
linkObj.WriteLink(link)
linkObj.SetContainerID(cnr)
linkObj.SetParent(parent)
linkObj.SetParentID(parentID)
linkObj.SetFirstID(child1ID)
linkObj.SetOwnerID(&owner)
linkObj.CalculateAndSetPayloadChecksum()
require.NoError(t, linkObj.CalculateAndSetID())

require.NoError(t, Put(e, child1))
require.NoError(t, Put(e, child2))
require.NoError(t, Put(e, child3))
require.NoError(t, Put(e, &linkObj))

e.HandleNewEpoch(currEpoch + 1)

checkObjectsAsyncRemoval(t, e, cnr, child1ID, child2ID, child3ID)
})
}

func checkObjectsAsyncRemoval(t *testing.T, e *StorageEngine, cnr cid.ID, objs ...oid.ID) {
require.Eventually(t, func() bool {
var addr oid.Address
addr.SetContainer(cnr)

for _, obj := range objs {
addr.SetObject(obj)

_, err := Get(e, addr)
if !errors.As(err, new(statusSDK.ObjectNotFound)) {
return false
}
}

return true
}, 1*time.Second, 100*time.Millisecond)
}
11 changes: 11 additions & 0 deletions pkg/local_object_storage/engine/inhume.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ func (e *StorageEngine) inhumeAddr(addr oid.Address, prm shard.InhumePrm) (bool,
// see if the object is root
e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) {
existPrm.SetAddress(addr)
existPrm.IgnoreExpiration()

res, err := sh.Exists(existPrm)
if err != nil {
Expand Down Expand Up @@ -321,6 +322,16 @@ func (e *StorageEngine) isLocked(addr oid.Address) (bool, error) {
return locked, outErr
}

func (e *StorageEngine) processExpiredObjects(_ context.Context, addrs []oid.Address) {
var prm InhumePrm
prm.MarkAsGarbage(addrs...)

_, err := e.Inhume(prm)
if err != nil {
e.log.Warn("handling expired objects", zap.Error(err))
}
}

func (e *StorageEngine) processExpiredTombstones(ctx context.Context, addrs []meta.TombstonedObject) {
e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) {
sh.HandleExpiredTombstones(addrs)
Expand Down
1 change: 1 addition & 0 deletions pkg/local_object_storage/engine/shards.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ func (e *StorageEngine) createShard(opts []shard.Option) (*shard.Shard, error) {

sh := shard.New(append(opts,
shard.WithID(id),
shard.WithExpiredObjectsCallback(e.processExpiredObjects),
shard.WithExpiredTombstonesCallback(e.processExpiredTombstones),
shard.WithExpiredLocksCallback(e.processExpiredLocks),
shard.WithDeletedLockCallback(e.processDeletedLocks),
Expand Down
13 changes: 11 additions & 2 deletions pkg/local_object_storage/metabase/exists.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ import (

// ExistsPrm groups the parameters of Exists operation.
type ExistsPrm struct {
addr oid.Address
addr oid.Address
ignoreExpiration bool
}

// ExistsRes groups the resulting values of Exists operation.
Expand All @@ -29,6 +30,11 @@ func (p *ExistsPrm) SetAddress(addr oid.Address) {
p.addr = addr
}

// IgnoreExpiration returns existence status despite the expiration status.
func (p *ExistsPrm) IgnoreExpiration() {
p.ignoreExpiration = true
}

// Exists returns the fact that the object is in the metabase.
func (p ExistsRes) Exists() bool {
return p.exists
Expand All @@ -47,7 +53,10 @@ func (db *DB) Exists(prm ExistsPrm) (res ExistsRes, err error) {
return res, ErrDegradedMode
}

currEpoch := db.epochState.CurrentEpoch()
var currEpoch uint64
if !prm.ignoreExpiration {
currEpoch = db.epochState.CurrentEpoch()
}

err = db.boltDB.View(func(tx *bbolt.Tx) error {
res.exists, err = db.exists(tx, prm.addr, currEpoch)
Expand Down
11 changes: 10 additions & 1 deletion pkg/local_object_storage/shard/exists.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ import (

// ExistsPrm groups the parameters of Exists operation.
type ExistsPrm struct {
addr oid.Address
addr oid.Address
ignoreExpiration bool
}

// ExistsRes groups the resulting values of Exists operation.
Expand All @@ -21,6 +22,11 @@ func (p *ExistsPrm) SetAddress(addr oid.Address) {
p.addr = addr
}

// IgnoreExpiration returns existence status despite the expiration status.
func (p *ExistsPrm) IgnoreExpiration() {
p.ignoreExpiration = true
}

// Exists returns the fact that the object is in the shard.
func (p ExistsRes) Exists() bool {
return p.ex
Expand Down Expand Up @@ -50,6 +56,9 @@ func (s *Shard) Exists(prm ExistsPrm) (ExistsRes, error) {
} else {
var existsPrm meta.ExistsPrm
existsPrm.SetAddress(prm.addr)
if prm.ignoreExpiration {
existsPrm.IgnoreExpiration()
}

var res meta.ExistsRes
res, err = s.metaBase.Exists(existsPrm)
Expand Down
Loading

0 comments on commit 53ed18c

Please sign in to comment.