Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

object/put: fix concurrent PUT data corruption #3027

Merged
merged 3 commits into from
Nov 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ attribute, which is used for container domain name in NNS contracts (#2954)
- Panic in event listener related to inability to switch RPC node (#2970)
- Non-container nodes never check placement policy on PUT, SEARCH requests (#3014)
- If shards are overloaded with PUT requests, operation is not skipped but waits for 30 seconds (#2871)
- Data corruption if PUT is done too concurrently (#2978)

### Changed
- `ObjectService`'s `Put` RPC handler caches up to 10K lists of per-object sorted container nodes (#2901)
Expand All @@ -49,6 +50,7 @@ attribute, which is used for container domain name in NNS contracts (#2954)
- Log sampling is disabled by default now (#3011)
- EACL is no longer considered for system role (#2972)
- Deprecate peapod substorage (#3013)
- Node does not stop trying to PUT an object if there are more PUT tasks than configured (#3027)

### Removed
- Support for node.key configuration (#2959)
Expand Down
42 changes: 40 additions & 2 deletions pkg/services/object/put/distibuted_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"strconv"
"sync"
"testing"
"time"

cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
"github.com/nspcc-dev/neofs-sdk-go/netmap"
Expand Down Expand Up @@ -364,8 +365,7 @@ func TestIterateNodesForObject(t *testing.T) {
cnrNodes[1][0].PublicKey(), cnrNodes[1][1].PublicKey(),
})
require.EqualError(t, err, "incomplete object PUT by placement: "+
"submit next job to save an object to the worker pool: any worker pool error "+
"(last node error: any node error)")
"number of replicas cannot be met for list #1: 1 required, 0 nodes remaining (last node error: any node error)")
})
t.Run("not enough nodes a priori", func(t *testing.T) {
// nodes: [A B] [C D E] [F]
Expand Down Expand Up @@ -492,4 +492,42 @@ func TestIterateNodesForObject(t *testing.T) {
cnrNodes[1][0].PublicKey(), cnrNodes[1][1].PublicKey(),
})
})
t.Run("return only after worker pool finished", func(t *testing.T) {
objID := oidtest.ID()
cnrNodes := allocNodes([]uint{2, 3, 1})
iter := placementIterator{
log: zap.NewNop(),
neoFSNet: new(testNetwork),
remotePool: &testWorkerPool{
err: errors.New("pool err"),
nFail: 2,
},
containerNodes: testContainerNodes{
objID: objID,
cnrNodes: cnrNodes,
primCounts: []uint{2, 3, 1},
},
}
blockCh := make(chan struct{})
returnCh := make(chan struct{})
go func() {
err := iter.iterateNodesForObject(objID, func(node nodeDesc) error {
<-blockCh
return nil
})
require.EqualError(t, err, "incomplete object PUT by placement: number of replicas cannot be met for list #0: 1 required, 0 nodes remaining")
close(returnCh)
}()
select {
case <-returnCh:
t.Fatal("`iterateNodesForObject` is not synced with worker pools")
case <-time.After(time.Second / 2):
}
close(blockCh)
select {
case <-returnCh:
case <-time.After(10 * time.Second):
t.Fatal("unexpected test lock")
}
})
}
17 changes: 7 additions & 10 deletions pkg/services/object/put/distributed.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@
var err error
var nodeLists [][]netmap.NodeInfo
var replCounts []uint
var l = x.log.With(zap.Stringer("oid", obj))
if x.localOnly {
// TODO: although this particular case fits correctly into the general approach,
// much less actions can be done
Expand Down Expand Up @@ -288,7 +289,7 @@
}
// critical error that may ultimately block the storage service. Normally it
// should not appear because entry into the network map under strict control
x.log.Error("failed to decode network endpoints of the storage node from the network map, skip the node",
l.Error("failed to decode network endpoints of the storage node from the network map, skip the node",
zap.String("public key", netmap.StringifyPublicKey(nodeLists[listInd][j])), zap.Error(nr.convertErr))
if listLen-nodesCounters[listInd].processed-1 < replRem { // -1 includes current node failure
err = fmt.Errorf("%w (last node error: failed to decode network addresses: %w)",
Expand Down Expand Up @@ -321,17 +322,13 @@
processedNodesMtx.Unlock()
if err != nil {
lastRespErr.Store(err)
svcutil.LogServiceError(x.log, "PUT", nr.desc.info.AddressGroup(), err)
svcutil.LogServiceError(l, "PUT", nr.desc.info.AddressGroup(), err)
return
}
}); err != nil {
wg.Done()
svcutil.LogWorkerPoolError(x.log, "PUT", err)
err = fmt.Errorf("submit next job to save an object to the worker pool: %w", err)
if e, _ := lastRespErr.Load().(error); e != nil {
err = fmt.Errorf("%w (last node error: %w)", err, e)
}
return errIncompletePut{singleErr: err}
svcutil.LogWorkerPoolError(l, "PUT", err)
}
}
wg.Wait()
Expand Down Expand Up @@ -363,7 +360,7 @@
if nr.convertErr != nil {
// critical error that may ultimately block the storage service. Normally it
// should not appear because entry into the network map under strict control
x.log.Error("failed to decode network endpoints of the storage node from the network map, skip the node",
l.Error("failed to decode network endpoints of the storage node from the network map, skip the node",

Check warning on line 363 in pkg/services/object/put/distributed.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/put/distributed.go#L363

Added line #L363 was not covered by tests
zap.String("public key", netmap.StringifyPublicKey(nodeLists[i][j])), zap.Error(nr.convertErr))
continue // to send as many replicas as possible
}
Expand All @@ -382,12 +379,12 @@
nodeResults[pks] = nr
processedNodesMtx.Unlock()
if err != nil {
svcutil.LogServiceError(x.log, "PUT (extra broadcast)", nr.desc.info.AddressGroup(), err)
svcutil.LogServiceError(l, "PUT (extra broadcast)", nr.desc.info.AddressGroup(), err)
return
}
}); err != nil {
wg.Done()
svcutil.LogWorkerPoolError(x.log, "PUT (extra broadcast)", err)
svcutil.LogWorkerPoolError(l, "PUT (extra broadcast)", err)

Check warning on line 387 in pkg/services/object/put/distributed.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/put/distributed.go#L387

Added line #L387 was not covered by tests
break broadcast
}
}
Expand Down
Loading