Skip to content

Commit

Permalink
object/put: fix concurrent PUT data corruption (#3027)
Browse files Browse the repository at this point in the history
  • Loading branch information
roman-khimov authored Nov 25, 2024
2 parents 3f6d545 + 872baaf commit a6effcf
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 12 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ attribute, which is used for container domain name in NNS contracts (#2954)
- Panic in event listener related to inability to switch RPC node (#2970)
- Non-container nodes never check placement policy on PUT, SEARCH requests (#3014)
- If shards are overloaded with PUT requests, operation is not skipped but waits for 30 seconds (#2871)
- Data corruption if PUT is done too concurrently (#2978)

### Changed
- `ObjectService`'s `Put` RPC handler caches up to 10K lists of per-object sorted container nodes (#2901)
Expand All @@ -49,6 +50,7 @@ attribute, which is used for container domain name in NNS contracts (#2954)
- Log sampling is disabled by default now (#3011)
- EACL is no longer considered for system role (#2972)
- Deprecate peapod substorage (#3013)
- Node does not stop trying to PUT an object if there are more PUT tasks than configured (#3027)

### Removed
- Support for node.key configuration (#2959)
Expand Down
42 changes: 40 additions & 2 deletions pkg/services/object/put/distibuted_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"strconv"
"sync"
"testing"
"time"

cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
"github.com/nspcc-dev/neofs-sdk-go/netmap"
Expand Down Expand Up @@ -364,8 +365,7 @@ func TestIterateNodesForObject(t *testing.T) {
cnrNodes[1][0].PublicKey(), cnrNodes[1][1].PublicKey(),
})
require.EqualError(t, err, "incomplete object PUT by placement: "+
"submit next job to save an object to the worker pool: any worker pool error "+
"(last node error: any node error)")
"number of replicas cannot be met for list #1: 1 required, 0 nodes remaining (last node error: any node error)")
})
t.Run("not enough nodes a priori", func(t *testing.T) {
// nodes: [A B] [C D E] [F]
Expand Down Expand Up @@ -492,4 +492,42 @@ func TestIterateNodesForObject(t *testing.T) {
cnrNodes[1][0].PublicKey(), cnrNodes[1][1].PublicKey(),
})
})
t.Run("return only after worker pool finished", func(t *testing.T) {
objID := oidtest.ID()
cnrNodes := allocNodes([]uint{2, 3, 1})
iter := placementIterator{
log: zap.NewNop(),
neoFSNet: new(testNetwork),
remotePool: &testWorkerPool{
err: errors.New("pool err"),
nFail: 2,
},
containerNodes: testContainerNodes{
objID: objID,
cnrNodes: cnrNodes,
primCounts: []uint{2, 3, 1},
},
}
blockCh := make(chan struct{})
returnCh := make(chan struct{})
go func() {
err := iter.iterateNodesForObject(objID, func(node nodeDesc) error {
<-blockCh
return nil
})
require.EqualError(t, err, "incomplete object PUT by placement: number of replicas cannot be met for list #0: 1 required, 0 nodes remaining")
close(returnCh)
}()
select {
case <-returnCh:
t.Fatal("`iterateNodesForObject` is not synced with worker pools")
case <-time.After(time.Second / 2):
}
close(blockCh)
select {
case <-returnCh:
case <-time.After(10 * time.Second):
t.Fatal("unexpected test lock")
}
})
}
17 changes: 7 additions & 10 deletions pkg/services/object/put/distributed.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ func (x placementIterator) iterateNodesForObject(obj oid.ID, f func(nodeDesc) er
var err error
var nodeLists [][]netmap.NodeInfo
var replCounts []uint
var l = x.log.With(zap.Stringer("oid", obj))
if x.localOnly {
// TODO: although this particular case fits correctly into the general approach,
// far fewer actions can be done
Expand Down Expand Up @@ -288,7 +289,7 @@ func (x placementIterator) iterateNodesForObject(obj oid.ID, f func(nodeDesc) er
}
// critical error that may ultimately block the storage service. Normally it
// should not appear because entry into the network map is under strict control
x.log.Error("failed to decode network endpoints of the storage node from the network map, skip the node",
l.Error("failed to decode network endpoints of the storage node from the network map, skip the node",
zap.String("public key", netmap.StringifyPublicKey(nodeLists[listInd][j])), zap.Error(nr.convertErr))
if listLen-nodesCounters[listInd].processed-1 < replRem { // -1 includes current node failure
err = fmt.Errorf("%w (last node error: failed to decode network addresses: %w)",
Expand Down Expand Up @@ -321,17 +322,13 @@ func (x placementIterator) iterateNodesForObject(obj oid.ID, f func(nodeDesc) er
processedNodesMtx.Unlock()
if err != nil {
lastRespErr.Store(err)
svcutil.LogServiceError(x.log, "PUT", nr.desc.info.AddressGroup(), err)
svcutil.LogServiceError(l, "PUT", nr.desc.info.AddressGroup(), err)
return
}
}); err != nil {
wg.Done()
svcutil.LogWorkerPoolError(x.log, "PUT", err)
err = fmt.Errorf("submit next job to save an object to the worker pool: %w", err)
if e, _ := lastRespErr.Load().(error); e != nil {
err = fmt.Errorf("%w (last node error: %w)", err, e)
}
return errIncompletePut{singleErr: err}
svcutil.LogWorkerPoolError(l, "PUT", err)
}
}
wg.Wait()
Expand Down Expand Up @@ -363,7 +360,7 @@ broadcast:
if nr.convertErr != nil {
// critical error that may ultimately block the storage service. Normally it
// should not appear because entry into the network map is under strict control
x.log.Error("failed to decode network endpoints of the storage node from the network map, skip the node",
l.Error("failed to decode network endpoints of the storage node from the network map, skip the node",
zap.String("public key", netmap.StringifyPublicKey(nodeLists[i][j])), zap.Error(nr.convertErr))
continue // to send as many replicas as possible
}
Expand All @@ -382,12 +379,12 @@ broadcast:
nodeResults[pks] = nr
processedNodesMtx.Unlock()
if err != nil {
svcutil.LogServiceError(x.log, "PUT (extra broadcast)", nr.desc.info.AddressGroup(), err)
svcutil.LogServiceError(l, "PUT (extra broadcast)", nr.desc.info.AddressGroup(), err)
return
}
}); err != nil {
wg.Done()
svcutil.LogWorkerPoolError(x.log, "PUT (extra broadcast)", err)
svcutil.LogWorkerPoolError(l, "PUT (extra broadcast)", err)
break broadcast
}
}
Expand Down

0 comments on commit a6effcf

Please sign in to comment.