Skip to content

Commit

Permalink
Fix/GC big objects parts (#2863)
Browse files Browse the repository at this point in the history
Closes #2858.
  • Loading branch information
roman-khimov authored Jun 13, 2024
2 parents 9c049a2 + 658cbde commit 690c64d
Show file tree
Hide file tree
Showing 11 changed files with 268 additions and 51 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ Changelog for NeoFS Node

### Fixed
- Unenforced IR `morph.consensus.p2p.peers.min` config default (#2856)
- Object parts do not expire (#2858)

### Changed

Expand Down
31 changes: 8 additions & 23 deletions pkg/core/object/fmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"strconv"

"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
Expand Down Expand Up @@ -78,8 +77,6 @@ var errNilID = errors.New("missing identifier")

var errNilCID = errors.New("missing container identifier")

var errNoExpirationEpoch = errors.New("missing expiration epoch attribute")

var errTombstoneExpiration = errors.New("tombstone body and header contain different expiration values")

var errEmptySGMembers = errors.New("storage group with empty members list")
Expand Down Expand Up @@ -170,7 +167,7 @@ func (v *FormatValidator) Validate(obj *object.Object, unprepared bool) error {
return fmt.Errorf("(%T) could not validate signature key: %w", v, err)
}

if err := v.checkExpiration(obj); err != nil {
if err := v.checkExpiration(*obj); err != nil {
return fmt.Errorf("object did not pass expiration check: %w", err)
}

Expand Down Expand Up @@ -288,7 +285,7 @@ func (v *FormatValidator) ValidateContent(o *object.Object) (ContentMeta, error)
}

// check if the tombstone has the same expiration in the body and the header
exp, err := expirationEpochAttribute(o)
exp, err := Expiration(*o)
if err != nil {
return ContentMeta{}, err
}
Expand Down Expand Up @@ -353,7 +350,7 @@ func (v *FormatValidator) ValidateContent(o *object.Object) (ContentMeta, error)
}

// check that LOCK object has correct expiration epoch
lockExp, err := expirationEpochAttribute(o)
lockExp, err := Expiration(*o)
if err != nil {
return ContentMeta{}, fmt.Errorf("lock object expiration epoch: %w", err)
}
Expand Down Expand Up @@ -385,17 +382,17 @@ func (v *FormatValidator) ValidateContent(o *object.Object) (ContentMeta, error)

var errExpired = errors.New("object has expired")

func (v *FormatValidator) checkExpiration(obj *object.Object) error {
exp, err := expirationEpochAttribute(obj)
func (v *FormatValidator) checkExpiration(obj object.Object) error {
exp, err := Expiration(obj)
if err != nil {
if errors.Is(err, errNoExpirationEpoch) {
if errors.Is(err, ErrNoExpiration) {
return nil // objects without expiration attribute are valid
}

return err
}

if exp < v.netState.CurrentEpoch() {
if currEpoch := v.netState.CurrentEpoch(); exp < currEpoch {
// an object could be expired but locked;
// put such an object is a correct operation

Expand All @@ -412,25 +409,13 @@ func (v *FormatValidator) checkExpiration(obj *object.Object) error {
}

if !locked {
return errExpired
return fmt.Errorf("%w: attribute: %d, current: %d", errExpired, exp, currEpoch)
}
}

return nil
}

func expirationEpochAttribute(obj *object.Object) (uint64, error) {
for _, a := range obj.Attributes() {
if a.Key() != object.AttributeExpirationEpoch {
continue
}

return strconv.ParseUint(a.Value(), 10, 64)
}

return 0, errNoExpirationEpoch
}

var (
errDuplAttr = errors.New("duplication of attributes detected")
errEmptyAttrVal = errors.New("empty attribute value")
Expand Down
18 changes: 18 additions & 0 deletions pkg/core/object/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package object

import (
"errors"
"strconv"

"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
Expand All @@ -26,3 +27,20 @@ func AddressOf(obj *object.Object) oid.Address {

return addr
}

// ErrNoExpiration means no expiration was set.
var ErrNoExpiration = errors.New("missing expiration epoch attribute")

// Expiration searches for expiration attribute in the object. Returns
// ErrNoExpiration if not found.
func Expiration(obj object.Object) (uint64, error) {
for _, a := range obj.Attributes() {
if a.Key() != object.AttributeExpirationEpoch {
continue
}

return strconv.ParseUint(a.Value(), 10, 64)
}

return 0, ErrNoExpiration
}
18 changes: 15 additions & 3 deletions pkg/local_object_storage/engine/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/pilorama"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
"github.com/nspcc-dev/neofs-node/pkg/util"
"github.com/nspcc-dev/neofs-sdk-go/checksum"
checksumtest "github.com/nspcc-dev/neofs-sdk-go/checksum/test"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
Expand All @@ -28,10 +29,12 @@ import (
"go.uber.org/zap"
)

type epochState struct{}
type epochState struct {
e uint64
}

func (s epochState) CurrentEpoch() uint64 {
return 0
return s.e
}

func BenchmarkExists(b *testing.B) {
Expand Down Expand Up @@ -128,7 +131,15 @@ func testNewShard(t testing.TB, id int) *shard.Shard {
meta.WithPath(filepath.Join(t.Name(), fmt.Sprintf("%d.metabase", id))),
meta.WithPermissions(0700),
meta.WithEpochState(epochState{}),
))
),
shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool {
pool, err := ants.NewPool(sz)
if err != nil {
panic(err)
}

return pool
}))

require.NoError(t, s.Open())
require.NoError(t, s.Init())
Expand All @@ -152,6 +163,7 @@ func testEngineFromShardOpts(t *testing.T, num int, extraOpts []shard.Option) *S
),
shard.WithPiloramaOptions(
pilorama.WithPath(filepath.Join(t.Name(), fmt.Sprintf("pilorama%d", i)))),
shard.WithExpiredObjectsCallback(engine.processExpiredObjects),
}, extraOpts...)...)
require.NoError(t, err)
}
Expand Down
174 changes: 174 additions & 0 deletions pkg/local_object_storage/engine/gc_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
package engine

import (
"errors"
"fmt"
"path/filepath"
"testing"
"time"

"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor"
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/pilorama"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
"github.com/nspcc-dev/neofs-node/pkg/util"
statusSDK "github.com/nspcc-dev/neofs-sdk-go/client/status"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test"
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
usertest "github.com/nspcc-dev/neofs-sdk-go/user/test"
"github.com/panjf2000/ants/v2"
"github.com/stretchr/testify/require"
)

func TestChildrenExpiration(t *testing.T) {
const numOfShards = 5
const currEpoch = 10
es := &epochState{e: currEpoch}
owner := usertest.ID(t)

e := New()
for i := 0; i < numOfShards; i++ {
_, err := e.AddShard(
shard.WithBlobStorOptions(
blobstor.WithStorages(
newStorages(filepath.Join(t.TempDir(), fmt.Sprintf("blobstor%d", i)),
1<<20)),
),
shard.WithMetaBaseOptions(
meta.WithPath(filepath.Join(t.TempDir(), fmt.Sprintf("metabase%d", i))),
meta.WithPermissions(0700),
meta.WithEpochState(es),
),
shard.WithPiloramaOptions(
pilorama.WithPath(filepath.Join(t.TempDir(), fmt.Sprintf("pilorama%d", i)))),
shard.WithExpiredObjectsCallback(e.processExpiredObjects),
shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool {
pool, err := ants.NewPool(sz)
if err != nil {
panic(err)
}

return pool
}),
)
require.NoError(t, err)
}
require.NoError(t, e.Open())
require.NoError(t, e.Init())
t.Cleanup(func() {
_ = e.Close()
})

expAttr := *objectSDK.NewAttribute(objectSDK.AttributeExpirationEpoch, fmt.Sprint(currEpoch))

t.Run("V1", func(t *testing.T) {
cnr := cidtest.ID()
splitID := objectSDK.NewSplitID()

parent := generateObjectWithCID(t, cnr)
parentID, _ := parent.ID()
parent.SetAttributes(expAttr)

child1 := generateObjectWithCID(t, cnr)
child1ID, _ := child1.ID()
child1.SetSplitID(splitID)

child2 := generateObjectWithCID(t, cnr)
child2ID, _ := child2.ID()
child2.SetSplitID(splitID)
child2.SetPreviousID(child1ID)

child3 := generateObjectWithCID(t, cnr)
child3ID, _ := child3.ID()
child3.SetSplitID(splitID)
child3.SetPreviousID(child2ID)
child3.SetParent(parent)
child3.SetParentID(parentID)

link := generateObjectWithCID(t, cnr)
link.SetParent(parent)
link.SetParentID(parentID)
link.SetChildren(child1ID, child2ID, child3ID)
link.SetSplitID(splitID)

require.NoError(t, Put(e, child1))
require.NoError(t, Put(e, child2))
require.NoError(t, Put(e, child3))
require.NoError(t, Put(e, link))

e.HandleNewEpoch(currEpoch + 1)

checkObjectsAsyncRemoval(t, e, cnr, child1ID, child2ID, child3ID)
})

t.Run("V2", func(t *testing.T) {
cnr := cidtest.ID()

parent := generateObjectWithCID(t, cnr)
parentID, _ := parent.ID()
parent.SetAttributes(expAttr)

child1 := generateObjectWithCID(t, cnr)
child1ID, _ := child1.ID()
child1.SetParent(parent)

child2 := generateObjectWithCID(t, cnr)
child2ID, _ := child2.ID()
child2.SetFirstID(child1ID)
child2.SetPreviousID(child1ID)

child3 := generateObjectWithCID(t, cnr)
child3ID, _ := child3.ID()
child3.SetFirstID(child1ID)
child3.SetPreviousID(child2ID)
child3.SetParent(parent)
child3.SetParentID(parentID)

children := make([]objectSDK.MeasuredObject, 3)
children[0].SetObjectID(child1ID)
children[1].SetObjectID(child2ID)
children[2].SetObjectID(child3ID)

var link objectSDK.Link
link.SetObjects(children)

var linkObj objectSDK.Object
linkObj.WriteLink(link)
linkObj.SetContainerID(cnr)
linkObj.SetParent(parent)
linkObj.SetParentID(parentID)
linkObj.SetFirstID(child1ID)
linkObj.SetOwnerID(&owner)
linkObj.CalculateAndSetPayloadChecksum()
require.NoError(t, linkObj.CalculateAndSetID())

require.NoError(t, Put(e, child1))
require.NoError(t, Put(e, child2))
require.NoError(t, Put(e, child3))
require.NoError(t, Put(e, &linkObj))

e.HandleNewEpoch(currEpoch + 1)

checkObjectsAsyncRemoval(t, e, cnr, child1ID, child2ID, child3ID)
})
}

func checkObjectsAsyncRemoval(t *testing.T, e *StorageEngine, cnr cid.ID, objs ...oid.ID) {
require.Eventually(t, func() bool {
var addr oid.Address
addr.SetContainer(cnr)

for _, obj := range objs {
addr.SetObject(obj)

_, err := Get(e, addr)
if !errors.As(err, new(statusSDK.ObjectNotFound)) {
return false
}
}

return true
}, 1*time.Second, 100*time.Millisecond)
}
11 changes: 11 additions & 0 deletions pkg/local_object_storage/engine/inhume.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ func (e *StorageEngine) inhumeAddr(addr oid.Address, prm shard.InhumePrm) (bool,
// see if the object is root
e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) {
existPrm.SetAddress(addr)
existPrm.IgnoreExpiration()

res, err := sh.Exists(existPrm)
if err != nil {
Expand Down Expand Up @@ -321,6 +322,16 @@ func (e *StorageEngine) isLocked(addr oid.Address) (bool, error) {
return locked, outErr
}

func (e *StorageEngine) processExpiredObjects(_ context.Context, addrs []oid.Address) {
var prm InhumePrm
prm.MarkAsGarbage(addrs...)

_, err := e.Inhume(prm)
if err != nil {
e.log.Warn("handling expired objects", zap.Error(err))
}
}

func (e *StorageEngine) processExpiredTombstones(ctx context.Context, addrs []meta.TombstonedObject) {
e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) {
sh.HandleExpiredTombstones(addrs)
Expand Down
1 change: 1 addition & 0 deletions pkg/local_object_storage/engine/shards.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ func (e *StorageEngine) createShard(opts []shard.Option) (*shard.Shard, error) {

sh := shard.New(append(opts,
shard.WithID(id),
shard.WithExpiredObjectsCallback(e.processExpiredObjects),
shard.WithExpiredTombstonesCallback(e.processExpiredTombstones),
shard.WithExpiredLocksCallback(e.processExpiredLocks),
shard.WithDeletedLockCallback(e.processDeletedLocks),
Expand Down
Loading

0 comments on commit 690c64d

Please sign in to comment.