Skip to content

Commit

Permalink
Merge pull request #8291 from filecoin-project/fix/fsm-input-locking
Browse files Browse the repository at this point in the history
fix: sealing fsm: Handle inputLk correctly
  • Loading branch information
arajasek authored Mar 11, 2022
2 parents 9fc6242 + ad66ad4 commit 362c73b
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 51 deletions.
16 changes: 12 additions & 4 deletions extern/sector-storage/mock/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,11 +426,19 @@ func generateFakePoSt(sectorInfo []proof.SectorInfo, rpt func(abi.RegisteredSeal
}

func (mgr *SectorMgr) ReadPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (mount.Reader, bool, error) {
if uint64(offset) != 0 {
panic("implme")
off := storiface.UnpaddedByteIndex(0)
var piece cid.Cid
for _, c := range mgr.sectors[sector.ID].pieces {
piece = c
if off >= offset {
break
}
off += storiface.UnpaddedByteIndex(len(mgr.pieces[piece]))
}

br := bytes.NewReader(mgr.pieces[mgr.sectors[sector.ID].pieces[0]][:size])
if off > offset {
panic("non-aligned offset todo")
}
br := bytes.NewReader(mgr.pieces[piece][:size])

return struct {
io.ReadCloser
Expand Down
24 changes: 11 additions & 13 deletions extern/storage-sealing/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,32 +315,29 @@ func (m *Sealing) SectorAddPieceToAny(ctx context.Context, size abi.UnpaddedPiec
m.inputLk.Unlock()

// we already have a pre-existing add piece call for this deal, let's wait for it to finish and see if it's successful
for {
res, err := waitAddPieceResp(ctx, pp)
if err != nil {
return api.SectorOffset{}, err
}
// there was an error waiting for a pre-existing add piece call, let's retry
if res.err != nil {
m.inputLk.Lock()
pp = m.addPendingPiece(ctx, size, data, deal, sp)
m.inputLk.Unlock()
continue
}
res, err := waitAddPieceResp(ctx, pp)
if err != nil {
return api.SectorOffset{}, err
}
if res.err == nil {
// all good, return the response
return api.SectorOffset{Sector: res.sn, Offset: res.offset.Padded()}, res.err
}
// if there was an error waiting for a pre-existing add piece call, let's retry
m.inputLk.Lock()
}

// addPendingPiece takes over m.inputLk
pp := m.addPendingPiece(ctx, size, data, deal, sp)
m.inputLk.Unlock()

res, err := waitAddPieceResp(ctx, pp)
if err != nil {
return api.SectorOffset{}, err
}
return api.SectorOffset{Sector: res.sn, Offset: res.offset.Padded()}, res.err
}

// called with m.inputLk; transfers the lock to another goroutine!
func (m *Sealing) addPendingPiece(ctx context.Context, size abi.UnpaddedPieceSize, data storage.Data, deal api.PieceDealInfo, sp abi.RegisteredSealProof) *pendingPiece {
doneCh := make(chan struct{})
pp := &pendingPiece{
Expand All @@ -357,6 +354,7 @@ func (m *Sealing) addPendingPiece(ctx context.Context, size abi.UnpaddedPieceSiz

m.pendingPieces[proposalCID(deal)] = pp
go func() {
defer m.inputLk.Unlock()
if err := m.updateInput(ctx, sp); err != nil {
log.Errorf("%+v", err)
}
Expand Down
27 changes: 16 additions & 11 deletions itests/batch_deal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,18 @@ import (

"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"
"github.com/stretchr/testify/require"

"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
"github.com/filecoin-project/lotus/itests/kit"
"github.com/filecoin-project/lotus/markets/storageadapter"
"github.com/filecoin-project/lotus/node"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/modules"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/stretchr/testify/require"
)

func TestBatchDealInput(t *testing.T) {
t.Skip("this test is disabled as it's flaky: #4611")
kit.QuietMiningLogs()

var (
Expand Down Expand Up @@ -47,17 +49,20 @@ func TestBatchDealInput(t *testing.T) {
})),
node.Override(new(dtypes.GetSealingConfigFunc), func() (dtypes.GetSealingConfigFunc, error) {
return func() (sealiface.Config, error) {
return sealiface.Config{
MaxWaitDealsSectors: 2,
MaxSealingSectors: 1,
MaxSealingSectorsForDeals: 3,
AlwaysKeepUnsealedCopy: true,
WaitDealsDelay: time.Hour,
}, nil
sc := modules.ToSealingConfig(config.DefaultStorageMiner())
sc.MaxWaitDealsSectors = 2
sc.MaxSealingSectors = 1
sc.MaxSealingSectorsForDeals = 3
sc.AlwaysKeepUnsealedCopy = true
sc.WaitDealsDelay = time.Hour
sc.BatchPreCommits = false
sc.AggregateCommits = false

return sc, nil
}, nil
}),
))
client, miner, ens := kit.EnsembleMinimal(t, kit.MockProofs(), opts)
client, miner, ens := kit.EnsembleMinimal(t, kit.MockProofs(), opts, kit.ThroughRPC())
ens.InterconnectAll().BeginMining(blockTime)
dh := kit.NewDealHarness(t, client, miner, miner)

Expand Down Expand Up @@ -126,9 +131,9 @@ func TestBatchDealInput(t *testing.T) {
t.Run("4-p513B", run(513, 4, 2))
if !testing.Short() {
t.Run("32-p257B", run(257, 32, 8))
t.Run("32-p10B", run(10, 32, 2))

// fixme: this appears to break data-transfer / markets in some really creative ways
//t.Run("32-p10B", run(10, 32, 2))
// t.Run("128-p10B", run(10, 128, 8))
}
}
50 changes: 27 additions & 23 deletions itests/sector_miner_collateral_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
"github.com/filecoin-project/lotus/itests/kit"
"github.com/filecoin-project/lotus/node"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/modules"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/repo"
)
Expand All @@ -40,29 +42,31 @@ func TestMinerBalanceCollateral(t *testing.T) {
opts := kit.ConstructorOpts(
node.ApplyIf(node.IsType(repo.StorageMiner), node.Override(new(dtypes.GetSealingConfigFunc), func() (dtypes.GetSealingConfigFunc, error) {
return func() (sealiface.Config, error) {
return sealiface.Config{
MaxWaitDealsSectors: 4,
MaxSealingSectors: 4,
MaxSealingSectorsForDeals: 4,
AlwaysKeepUnsealedCopy: true,
WaitDealsDelay: time.Hour,

BatchPreCommits: batching,
AggregateCommits: batching,

PreCommitBatchWait: time.Hour,
CommitBatchWait: time.Hour,

MinCommitBatch: nSectors,
MaxPreCommitBatch: nSectors,
MaxCommitBatch: nSectors,

CollateralFromMinerBalance: enabled,
AvailableBalanceBuffer: big.Zero(),
DisableCollateralFallback: false,
AggregateAboveBaseFee: big.Zero(),
BatchPreCommitAboveBaseFee: big.Zero(),
}, nil
sc := modules.ToSealingConfig(config.DefaultStorageMiner())

sc.MaxWaitDealsSectors = 4
sc.MaxSealingSectors = 4
sc.MaxSealingSectorsForDeals = 4
sc.AlwaysKeepUnsealedCopy = true
sc.WaitDealsDelay = time.Hour

sc.BatchPreCommits = batching
sc.AggregateCommits = batching

sc.PreCommitBatchWait = time.Hour
sc.CommitBatchWait = time.Hour

sc.MinCommitBatch = nSectors
sc.MaxPreCommitBatch = nSectors
sc.MaxCommitBatch = nSectors

sc.CollateralFromMinerBalance = enabled
sc.AvailableBalanceBuffer = big.Zero()
sc.DisableCollateralFallback = false
sc.AggregateAboveBaseFee = big.Zero()
sc.BatchPreCommitAboveBaseFee = big.Zero()

return sc, nil
}, nil
})),
)
Expand Down

0 comments on commit 362c73b

Please sign in to comment.