diff --git a/CHANGELOG.md b/CHANGELOG.md index d40f0669ef..bf3dc70a05 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -35,6 +35,7 @@ attribute, which is used for container domain name in NNS contracts (#2954) - Panic in event listener related to inability to switch RPC node (#2970) - Non-container nodes never check placement policy on PUT, SEARCH requests (#3014) - If shards are overloaded with PUT requests, operation is not skipped but waits for 30 seconds (#2871) +- Data corruption if PUT is done too concurrently (#2978) ### Changed - `ObjectService`'s `Put` RPC handler caches up to 10K lists of per-object sorted container nodes (#2901) @@ -49,6 +50,7 @@ attribute, which is used for container domain name in NNS contracts (#2954) - Log sampling is disabled by default now (#3011) - EACL is no longer considered for system role (#2972) - Deprecate peapod substorage (#3013) +- Node does not stop trying to PUT an object if there are more PUT tasks than configured (#3027) ### Removed - Support for node.key configuration (#2959) diff --git a/pkg/services/object/put/distibuted_test.go b/pkg/services/object/put/distibuted_test.go index af4b462e62..e03c37f8c3 100644 --- a/pkg/services/object/put/distibuted_test.go +++ b/pkg/services/object/put/distibuted_test.go @@ -7,6 +7,7 @@ import ( "strconv" "sync" "testing" + "time" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" "github.com/nspcc-dev/neofs-sdk-go/netmap" @@ -364,8 +365,7 @@ func TestIterateNodesForObject(t *testing.T) { cnrNodes[1][0].PublicKey(), cnrNodes[1][1].PublicKey(), }) require.EqualError(t, err, "incomplete object PUT by placement: "+ - "submit next job to save an object to the worker pool: any worker pool error "+ - "(last node error: any node error)") + "number of replicas cannot be met for list #1: 1 required, 0 nodes remaining (last node error: any node error)") }) t.Run("not enough nodes a priori", func(t *testing.T) { // nodes: [A B] [C D E] [F] @@ -492,4 +492,42 @@ func TestIterateNodesForObject(t *testing.T) { cnrNodes[1][0].PublicKey(), cnrNodes[1][1].PublicKey(), }) }) + t.Run("return only after worker pool finished", func(t *testing.T) { + objID := oidtest.ID() + cnrNodes := allocNodes([]uint{2, 3, 1}) + iter := placementIterator{ + log: zap.NewNop(), + neoFSNet: new(testNetwork), + remotePool: &testWorkerPool{ + err: errors.New("pool err"), + nFail: 2, + }, + containerNodes: testContainerNodes{ + objID: objID, + cnrNodes: cnrNodes, + primCounts: []uint{2, 3, 1}, + }, + } + blockCh := make(chan struct{}) + returnCh := make(chan struct{}) + go func() { + err := iter.iterateNodesForObject(objID, func(node nodeDesc) error { + <-blockCh + return nil + }) + require.EqualError(t, err, "incomplete object PUT by placement: number of replicas cannot be met for list #0: 1 required, 0 nodes remaining") + close(returnCh) + }() + select { + case <-returnCh: + t.Fatal("`iterateNodesForObject` is not synced with worker pools") + case <-time.After(time.Second / 2): + } + close(blockCh) + select { + case <-returnCh: + case <-time.After(10 * time.Second): + t.Fatal("unexpected test lock") + } + }) } diff --git a/pkg/services/object/put/distributed.go b/pkg/services/object/put/distributed.go index 476bad12e7..a124ae80d8 100644 --- a/pkg/services/object/put/distributed.go +++ b/pkg/services/object/put/distributed.go @@ -214,6 +214,7 @@ func (x placementIterator) iterateNodesForObject(obj oid.ID, f func(nodeDesc) er var err error var nodeLists [][]netmap.NodeInfo var replCounts []uint + var l = x.log.With(zap.Stringer("oid", obj)) if x.localOnly { // TODO: although this particular case fits correctly into the general approach, // much less actions can be done @@ -288,7 +289,7 @@ func (x placementIterator) iterateNodesForObject(obj oid.ID, f func(nodeDesc) er } // critical error that may ultimately block the storage service. Normally it // should not appear because entry into the network map under strict control - x.log.Error("failed to decode network endpoints of the storage node from the network map, skip the node", + l.Error("failed to decode network endpoints of the storage node from the network map, skip the node", zap.String("public key", netmap.StringifyPublicKey(nodeLists[listInd][j])), zap.Error(nr.convertErr)) if listLen-nodesCounters[listInd].processed-1 < replRem { // -1 includes current node failure err = fmt.Errorf("%w (last node error: failed to decode network addresses: %w)", @@ -321,17 +322,13 @@ func (x placementIterator) iterateNodesForObject(obj oid.ID, f func(nodeDesc) er processedNodesMtx.Unlock() if err != nil { lastRespErr.Store(err) - svcutil.LogServiceError(x.log, "PUT", nr.desc.info.AddressGroup(), err) + svcutil.LogServiceError(l, "PUT", nr.desc.info.AddressGroup(), err) return } }); err != nil { wg.Done() - svcutil.LogWorkerPoolError(x.log, "PUT", err) err = fmt.Errorf("submit next job to save an object to the worker pool: %w", err) - if e, _ := lastRespErr.Load().(error); e != nil { - err = fmt.Errorf("%w (last node error: %w)", err, e) - } - return errIncompletePut{singleErr: err} + svcutil.LogWorkerPoolError(l, "PUT", err) } } wg.Wait() @@ -363,7 +360,7 @@ broadcast: if nr.convertErr != nil { // critical error that may ultimately block the storage service. Normally it // should not appear because entry into the network map under strict control - x.log.Error("failed to decode network endpoints of the storage node from the network map, skip the node", + l.Error("failed to decode network endpoints of the storage node from the network map, skip the node", zap.String("public key", netmap.StringifyPublicKey(nodeLists[i][j])), zap.Error(nr.convertErr)) continue // to send as many replicas as possible } @@ -382,12 +379,12 @@ broadcast: nodeResults[pks] = nr processedNodesMtx.Unlock() if err != nil { - svcutil.LogServiceError(x.log, "PUT (extra broadcast)", nr.desc.info.AddressGroup(), err) + svcutil.LogServiceError(l, "PUT (extra broadcast)", nr.desc.info.AddressGroup(), err) return } }); err != nil { wg.Done() - svcutil.LogWorkerPoolError(x.log, "PUT (extra broadcast)", err) + svcutil.LogWorkerPoolError(l, "PUT (extra broadcast)", err) break broadcast } }