object/put: fix concurrent PUT data corruption
If the ants pool is busy and cannot take a task, an early `return` without
`wg.Wait()` makes `iterateNodesForObject` return as well, and from that moment
all the buffers for binary replication may be reused while they are still in
use by other routines inside the pool. Wait for the WaitGroup before any
`return` is called. Closes #2978, closes #2988, closes #2975, closes #2971.

Signed-off-by: Pavel Karpy <[email protected]>
carpawell committed Nov 25, 2024
1 parent 339b4cb commit 459406e
Showing 3 changed files with 42 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -35,6 +35,7 @@ attribute, which is used for container domain name in NNS contracts (#2954)
- Panic in event listener related to inability to switch RPC node (#2970)
- Non-container nodes never check placement policy on PUT, SEARCH requests (#3014)
- If shards are overloaded with PUT requests, operation is not skipped but waits for 30 seconds (#2871)
- Data corruption if PUT is done too concurrently (#2978)

### Changed
- `ObjectService`'s `Put` RPC handler caches up to 10K lists of per-object sorted container nodes (#2901)
40 changes: 40 additions & 0 deletions pkg/services/object/put/distibuted_test.go
@@ -7,6 +7,7 @@ import (
	"strconv"
	"sync"
	"testing"
	"time"

	cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
	"github.com/nspcc-dev/neofs-sdk-go/netmap"
@@ -492,4 +493,43 @@ func TestIterateNodesForObject(t *testing.T) {
			cnrNodes[1][0].PublicKey(), cnrNodes[1][1].PublicKey(),
		})
	})
	t.Run("return only after worker pool finished", func(t *testing.T) {
		objID := oidtest.ID()
		cnrNodes := allocNodes([]uint{2, 3, 1})
		poolErr := errors.New("pool err")
		iter := placementIterator{
			log:      zap.NewNop(),
			neoFSNet: new(testNetwork),
			remotePool: &testWorkerPool{
				err:   poolErr,
				nFail: 2,
			},
			containerNodes: testContainerNodes{
				objID:      objID,
				cnrNodes:   cnrNodes,
				primCounts: []uint{2, 3, 1},
			},
		}
		blockCh := make(chan struct{})
		returnCh := make(chan struct{})
		go func() {
			err := iter.iterateNodesForObject(objID, func(node nodeDesc) error {
				<-blockCh
				return nil
			})
			require.ErrorContains(t, err, poolErr.Error())
			close(returnCh)
		}()
		select {
		case <-returnCh:
			t.Fatal("`iterateNodesForObject` is not synced with worker pools")
		case <-time.After(time.Second / 2):
		}
		close(blockCh)
		select {
		case <-returnCh:
		case <-time.After(10 * time.Second):
			t.Fatal("unexpected test lock")
		}
	})
}
1 change: 1 addition & 0 deletions pkg/services/object/put/distributed.go
@@ -331,6 +331,7 @@ func (x placementIterator) iterateNodesForObject(obj oid.ID, f func(nodeDesc) er
		if e, _ := lastRespErr.Load().(error); e != nil {
			err = fmt.Errorf("%w (last node error: %w)", err, e)
		}
		wg.Wait()
		return errIncompletePut{singleErr: err}
	}
}
