From c8d603557b3664bcbb7f490a191194daace173d8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 14 Apr 2021 20:26:07 +0200 Subject: [PATCH 1/5] storagefsm: Fix batch deal packing behavior --- api/test/deals.go | 135 +++++++++++++++++--------------- cli/test/client.go | 4 +- extern/storage-sealing/fsm.go | 4 + extern/storage-sealing/input.go | 36 ++++++--- node/impl/storminer.go | 6 +- 5 files changed, 107 insertions(+), 78 deletions(-) diff --git a/api/test/deals.go b/api/test/deals.go index 7a9454bae38..0d25d90af4c 100644 --- a/api/test/deals.go +++ b/api/test/deals.go @@ -51,7 +51,7 @@ func TestDoubleDealFlow(t *testing.T, b APIBuilder, blocktime time.Duration, sta } func MakeDeal(t *testing.T, ctx context.Context, rseed int, client api.FullNode, miner TestStorageNode, carExport, fastRet bool, startEpoch abi.ChainEpoch) { - res, data, err := CreateClientFile(ctx, client, rseed) + res, data, err := CreateClientFile(ctx, client, rseed, 0) if err != nil { t.Fatal(err) } @@ -72,8 +72,11 @@ func MakeDeal(t *testing.T, ctx context.Context, rseed int, client api.FullNode, testRetrieval(t, ctx, client, fcid, &info.PieceCID, carExport, data) } -func CreateClientFile(ctx context.Context, client api.FullNode, rseed int) (*api.ImportRes, []byte, error) { - data := make([]byte, 1600) +func CreateClientFile(ctx context.Context, client api.FullNode, rseed, size int) (*api.ImportRes, []byte, error) { + if size == 0 { + size = 1600 + } + data := make([]byte, size) rand.New(rand.NewSource(int64(rseed))).Read(data) dir, err := ioutil.TempDir(os.TempDir(), "test-make-deal-") @@ -119,7 +122,7 @@ func TestPublishDealsBatching(t *testing.T, b APIBuilder, blocktime time.Duratio // Starts a deal and waits until it's published runDealTillPublish := func(rseed int) { - res, _, err := CreateClientFile(s.ctx, s.client, rseed) + res, _, err := CreateClientFile(s.ctx, s.client, rseed, 0) require.NoError(t, err) upds, err := client.ClientGetDealUpdates(s.ctx) @@ -186,68 +189,76 @@ func TestPublishDealsBatching(t *testing.T, b APIBuilder, blocktime time.Duratio } func TestBatchDealInput(t *testing.T, b APIBuilder, blocktime time.Duration, startEpoch abi.ChainEpoch) { - publishPeriod := 10 * time.Second - maxDealsPerMsg := uint64(4) - - // Set max deals per publish deals message to maxDealsPerMsg - minerDef := []StorageMiner{{ - Full: 0, - Opts: node.Options( - node.Override( - new(*storageadapter.DealPublisher), - storageadapter.NewDealPublisher(nil, storageadapter.PublishMsgConfig{ - Period: publishPeriod, - MaxDealsPerMsg: maxDealsPerMsg, - })), - node.Override(new(dtypes.GetSealingConfigFunc), func() (dtypes.GetSealingConfigFunc, error) { - return func() (sealiface.Config, error) { - return sealiface.Config{ - MaxWaitDealsSectors: 1, - MaxSealingSectors: 1, - MaxSealingSectorsForDeals: 2, - AlwaysKeepUnsealedCopy: true, - }, nil - }, nil - }), - ), - Preseal: PresealGenesis, - }} - - // Create a connect client and miner node - n, sn := b(t, OneFull, minerDef) - client := n[0].FullNode.(*impl.FullNodeAPI) - miner := sn[0] - s := connectAndStartMining(t, b, blocktime, client, miner) - defer s.blockMiner.Stop() - - // Starts a deal and waits until it's published - runDealTillSeal := func(rseed int) { - res, _, err := CreateClientFile(s.ctx, s.client, rseed) - require.NoError(t, err) + run := func(piece, deals, expectSectors int) func(t *testing.T) { + return func(t *testing.T) { + publishPeriod := 10 * time.Second + maxDealsPerMsg := uint64(deals) + + // Set max deals per publish deals message to maxDealsPerMsg + minerDef := []StorageMiner{{ + Full: 0, + Opts: node.Options( + node.Override( + new(*storageadapter.DealPublisher), + storageadapter.NewDealPublisher(nil, storageadapter.PublishMsgConfig{ + Period: publishPeriod, + MaxDealsPerMsg: maxDealsPerMsg, + })), + node.Override(new(dtypes.GetSealingConfigFunc), func() (dtypes.GetSealingConfigFunc, error) { + return func() (sealiface.Config, error) { + return sealiface.Config{ + MaxWaitDealsSectors: 1, + MaxSealingSectors: 1, + MaxSealingSectorsForDeals: 2, + AlwaysKeepUnsealedCopy: true, + }, nil + }, nil + }), + ), + Preseal: PresealGenesis, + }} + + // Create a connect client and miner node + n, sn := b(t, OneFull, minerDef) + client := n[0].FullNode.(*impl.FullNodeAPI) + miner := sn[0] + s := connectAndStartMining(t, b, blocktime, client, miner) + defer s.blockMiner.Stop() + + // Starts a deal and waits until it's published + runDealTillSeal := func(rseed int) { + res, _, err := CreateClientFile(s.ctx, s.client, rseed, piece) + require.NoError(t, err) + + dc := startDeal(t, s.ctx, s.miner, s.client, res.Root, false, startEpoch) + waitDealSealed(t, s.ctx, s.miner, s.client, dc, false) + } - dc := startDeal(t, s.ctx, s.miner, s.client, res.Root, false, startEpoch) - waitDealSealed(t, s.ctx, s.miner, s.client, dc, false) - } + // Run maxDealsPerMsg+1 deals in parallel + done := make(chan struct{}, maxDealsPerMsg+1) + for rseed := 1; rseed <= int(maxDealsPerMsg+1); rseed++ { + rseed := rseed + go func() { + runDealTillSeal(rseed) + done <- struct{}{} + }() + } - // Run maxDealsPerMsg+1 deals in parallel - done := make(chan struct{}, maxDealsPerMsg+1) - for rseed := 1; rseed <= int(maxDealsPerMsg+1); rseed++ { - rseed := rseed - go func() { - runDealTillSeal(rseed) - done <- struct{}{} - }() - } + // Wait for maxDealsPerMsg of the deals to be published + for i := 0; i < int(maxDealsPerMsg); i++ { + <-done + } - // Wait for maxDealsPerMsg of the deals to be published - for i := 0; i < int(maxDealsPerMsg); i++ { - <-done + sl, err := sn[0].SectorsList(s.ctx) + require.NoError(t, err) + require.GreaterOrEqual(t, len(sl), expectSectors) + require.LessOrEqual(t, len(sl), expectSectors+1) + } } - sl, err := sn[0].SectorsList(s.ctx) - require.NoError(t, err) - require.GreaterOrEqual(t, len(sl), 4) - require.LessOrEqual(t, len(sl), 5) + t.Run("4-p1600B", run(1600, 4, 4)) + t.Run("4-p513B", run(513, 4, 2)) + t.Run("32-p257B", run(257, 32, 8)) } func TestFastRetrievalDealFlow(t *testing.T, b APIBuilder, blocktime time.Duration, startEpoch abi.ChainEpoch) { @@ -430,7 +441,7 @@ func startSealingWaiting(t *testing.T, ctx context.Context, miner TestStorageNod si, err := miner.SectorsStatus(ctx, snum, false) require.NoError(t, err) - t.Logf("Sector state: %s", si.State) + t.Logf("Sector %d state: %s", snum, si.State) if si.State == api.SectorState(sealing.WaitDeals) { require.NoError(t, miner.SectorStartSealing(ctx, snum)) } diff --git a/cli/test/client.go b/cli/test/client.go index 4a49f732a45..2a48b7b6443 100644 --- a/cli/test/client.go +++ b/cli/test/client.go @@ -44,7 +44,7 @@ func RunClientTest(t *testing.T, cmds []*lcli.Command, clientNode test.TestNode) // Create a deal (non-interactive) // client deal --start-epoch= 1000000attofil - res, _, err := test.CreateClientFile(ctx, clientNode, 1) + res, _, err := test.CreateClientFile(ctx, clientNode, 1, 0) require.NoError(t, err) startEpoch := fmt.Sprintf("--start-epoch=%d", 2<<12) dataCid := res.Root @@ -60,7 +60,7 @@ func RunClientTest(t *testing.T, cmds []*lcli.Command, clientNode test.TestNode) // // "no" (verified client) // "yes" (confirm deal) - res, _, err = test.CreateClientFile(ctx, clientNode, 2) + res, _, err = test.CreateClientFile(ctx, clientNode, 2, 0) require.NoError(t, err) dataCid2 := res.Root duration = fmt.Sprintf("%d", build.MinDealDuration/builtin.EpochsInDay) diff --git a/extern/storage-sealing/fsm.go b/extern/storage-sealing/fsm.go index 8726fe2f885..a67caf58567 100644 --- a/extern/storage-sealing/fsm.go +++ b/extern/storage-sealing/fsm.go @@ -51,6 +51,7 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto AddPiece: planOne( on(SectorPieceAdded{}, WaitDeals), apply(SectorStartPacking{}), + apply(SectorAddPiece{}), on(SectorAddPieceFailed{}, AddPieceFailed), ), Packing: planOne(on(SectorPacked{}, GetTicket)), @@ -534,6 +535,7 @@ func onReturning(mut mutator) func() (mutator, func(*SectorInfo) (bool, error)) func planOne(ts ...func() (mut mutator, next func(*SectorInfo) (more bool, err error))) func(events []statemachine.Event, state *SectorInfo) (uint64, error) { return func(events []statemachine.Event, state *SectorInfo) (uint64, error) { + eloop: for i, event := range events { if gm, ok := event.User.(globalMutator); ok { gm.applyGlobal(state) @@ -556,6 +558,8 @@ func planOne(ts ...func() (mut mutator, next func(*SectorInfo) (more bool, err e if err != nil || !more { return uint64(i + 1), err } + + continue eloop } _, ok := event.User.(Ignorable) diff --git a/extern/storage-sealing/input.go b/extern/storage-sealing/input.go index 44d2e8275b4..3ac362e7879 100644 --- a/extern/storage-sealing/input.go +++ b/extern/storage-sealing/input.go @@ -27,6 +27,14 @@ func (m *Sealing) handleWaitDeals(ctx statemachine.Context, sector SectorInfo) e m.inputLk.Lock() + sid := m.minerSectorID(sector.SectorNumber) + + if len(m.assignedPieces[sid]) > 0 { + m.inputLk.Unlock() + // got assigned more pieces in the AddPiece state + return ctx.Send(SectorAddPiece{}) + } + started, err := m.maybeStartSealing(ctx, sector, used) if err != nil || started { delete(m.openSectors, m.minerSectorID(sector.SectorNumber)) @@ -36,16 +44,16 @@ func (m *Sealing) handleWaitDeals(ctx statemachine.Context, sector SectorInfo) e return err } - m.openSectors[m.minerSectorID(sector.SectorNumber)] = &openSector{ - used: used, - maybeAccept: func(cid cid.Cid) error { - // todo check deal start deadline (configurable) + if _, has := m.openSectors[sid]; !has { + m.openSectors[sid] = &openSector{ + used: used, + maybeAccept: func(cid cid.Cid) error { + // todo check deal start deadline (configurable) + m.assignedPieces[sid] = append(m.assignedPieces[sid], cid) - sid := m.minerSectorID(sector.SectorNumber) - m.assignedPieces[sid] = append(m.assignedPieces[sid], cid) - - return ctx.Send(SectorAddPiece{}) - }, + return ctx.Send(SectorAddPiece{}) + }, + } } go func() { @@ -350,11 +358,19 @@ func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) e continue } + avail := abi.PaddedPieceSize(ssize).Unpadded() - m.openSectors[mt.sector].used + + if mt.size > avail { + continue + } + err := m.openSectors[mt.sector].maybeAccept(mt.deal) if err != nil { m.pendingPieces[mt.deal].accepted(mt.sector.Number, 0, err) // non-error case in handleAddPiece } + m.openSectors[mt.sector].used += mt.padding + mt.size + m.pendingPieces[mt.deal].assigned = true delete(toAssign, mt.deal) @@ -362,8 +378,6 @@ func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) e log.Errorf("sector %d rejected deal %s: %+v", mt.sector, mt.deal, err) continue } - - delete(m.openSectors, mt.sector) } if len(toAssign) > 0 { diff --git a/node/impl/storminer.go b/node/impl/storminer.go index 27ab1af5f8a..553f5e45933 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -244,13 +244,13 @@ func (sm *StorageMinerAPI) SectorsList(context.Context) ([]abi.SectorNumber, err return nil, err } - out := make([]abi.SectorNumber, len(sectors)) - for i, sector := range sectors { + out := make([]abi.SectorNumber, 0, len(sectors)) + for _, sector := range sectors { if sector.State == sealing.UndefinedSectorState { continue // sector ID not set yet } - out[i] = sector.SectorNumber + out = append(out, sector.SectorNumber) } return out, nil } From 9475079b9761a7381fa57885a4a5a7ce72cf488f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Sun, 30 May 2021 15:13:38 +0200 Subject: [PATCH 2/5] Make batch deal input test less flaky --- api/test/deals.go | 73 +++++++++++++++---- api/test/mining.go | 2 +- extern/storage-sealing/fsm.go | 2 + extern/storage-sealing/input.go | 1 + extern/storage-sealing/states_sealing.go | 2 +- .../storageadapter/ondealsectorcommitted.go | 3 + node/node_test.go | 1 + 7 files changed, 68 insertions(+), 16 deletions(-) diff --git a/api/test/deals.go b/api/test/deals.go index 0d25d90af4c..8cd639efda5 100644 --- a/api/test/deals.go +++ b/api/test/deals.go @@ -8,6 +8,7 @@ import ( "math/rand" "os" "path/filepath" + "sort" "testing" "time" @@ -63,7 +64,7 @@ func MakeDeal(t *testing.T, ctx context.Context, rseed int, client api.FullNode, // TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this time.Sleep(time.Second) - waitDealSealed(t, ctx, miner, client, deal, false) + waitDealSealed(t, ctx, miner, client, deal, false, false, nil) // Retrieval info, err := client.ClientGetDealInfo(ctx, *deal) @@ -207,10 +208,11 @@ func TestBatchDealInput(t *testing.T, b APIBuilder, blocktime time.Duration, sta node.Override(new(dtypes.GetSealingConfigFunc), func() (dtypes.GetSealingConfigFunc, error) { return func() (sealiface.Config, error) { return sealiface.Config{ - MaxWaitDealsSectors: 1, + MaxWaitDealsSectors: 2, MaxSealingSectors: 1, - MaxSealingSectorsForDeals: 2, + MaxSealingSectorsForDeals: 3, AlwaysKeepUnsealedCopy: true, + WaitDealsDelay: time.Hour, }, nil }, nil }), @@ -225,18 +227,40 @@ func TestBatchDealInput(t *testing.T, b APIBuilder, blocktime time.Duration, sta s := connectAndStartMining(t, b, blocktime, client, miner) defer s.blockMiner.Stop() + checkNoPadding := func() { + sl, err := sn[0].SectorsList(s.ctx) + require.NoError(t, err) + + sort.Slice(sl, func(i, j int) bool { + return sl[i] < sl[j] + }) + + for _, snum := range sl { + si, err := sn[0].SectorsStatus(s.ctx, snum, false) + require.NoError(t, err) + + fmt.Printf("S %d: %+v %s\n", snum, si.Deals, si.State) + + for _, deal := range si.Deals { + if deal == 0 { + fmt.Printf("sector %d had a padding piece!\n", snum) + } + } + } + } + // Starts a deal and waits until it's published runDealTillSeal := func(rseed int) { res, _, err := CreateClientFile(s.ctx, s.client, rseed, piece) require.NoError(t, err) dc := startDeal(t, s.ctx, s.miner, s.client, res.Root, false, startEpoch) - waitDealSealed(t, s.ctx, s.miner, s.client, dc, false) + waitDealSealed(t, s.ctx, s.miner, s.client, dc, false, true, checkNoPadding) } - // Run maxDealsPerMsg+1 deals in parallel - done := make(chan struct{}, maxDealsPerMsg+1) - for rseed := 1; rseed <= int(maxDealsPerMsg+1); rseed++ { + // Run maxDealsPerMsg deals in parallel + done := make(chan struct{}, maxDealsPerMsg) + for rseed := 0; rseed < int(maxDealsPerMsg); rseed++ { rseed := rseed go func() { runDealTillSeal(rseed) @@ -249,10 +273,12 @@ func TestBatchDealInput(t *testing.T, b APIBuilder, blocktime time.Duration, sta <-done } + checkNoPadding() + sl, err := sn[0].SectorsList(s.ctx) require.NoError(t, err) - require.GreaterOrEqual(t, len(sl), expectSectors) - require.LessOrEqual(t, len(sl), expectSectors+1) + require.Equal(t, len(sl), expectSectors) + //require.LessOrEqual(t, len(sl), expectSectors+1) } } @@ -314,12 +340,12 @@ func TestSecondDealRetrieval(t *testing.T, b APIBuilder, blocktime time.Duration // TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this time.Sleep(time.Second) - waitDealSealed(t, s.ctx, s.miner, s.client, deal1, true) + waitDealSealed(t, s.ctx, s.miner, s.client, deal1, true, false, nil) deal2 := startDeal(t, s.ctx, s.miner, s.client, fcid2, true, 0) time.Sleep(time.Second) - waitDealSealed(t, s.ctx, s.miner, s.client, deal2, false) + waitDealSealed(t, s.ctx, s.miner, s.client, deal2, false, false, nil) // Retrieval info, err := s.client.ClientGetDealInfo(s.ctx, *deal2) @@ -375,7 +401,7 @@ func startDeal(t *testing.T, ctx context.Context, miner TestStorageNode, client return deal } -func waitDealSealed(t *testing.T, ctx context.Context, miner TestStorageNode, client api.FullNode, deal *cid.Cid, noseal bool) { +func waitDealSealed(t *testing.T, ctx context.Context, miner TestStorageNode, client api.FullNode, deal *cid.Cid, noseal, noSealStart bool, cb func()) { loop: for { di, err := client.ClientGetDealInfo(ctx, *deal) @@ -387,7 +413,9 @@ loop: if noseal { return } - startSealingWaiting(t, ctx, miner) + if !noSealStart { + startSealingWaiting(t, ctx, miner) + } case storagemarket.StorageDealProposalRejected: t.Fatal("deal rejected") case storagemarket.StorageDealFailing: @@ -398,8 +426,25 @@ loop: fmt.Println("COMPLETE", di) break loop } - fmt.Println("Deal state: ", storagemarket.DealStates[di.State]) + + mds, err := miner.MarketListIncompleteDeals(ctx) + if err != nil { + t.Fatal(err) + } + + var minerState storagemarket.StorageDealStatus + for _, md := range mds { + if md.DealID == di.DealID { + minerState = md.State + break + } + } + + fmt.Printf("Deal %d state: client:%s provider:%s\n", di.DealID, storagemarket.DealStates[di.State], storagemarket.DealStates[minerState]) time.Sleep(time.Second / 2) + if cb != nil { + cb() + } } } diff --git a/api/test/mining.go b/api/test/mining.go index 4a4f1e1a492..d6dea9abfd5 100644 --- a/api/test/mining.go +++ b/api/test/mining.go @@ -194,7 +194,7 @@ func TestDealMining(t *testing.T, b APIBuilder, blocktime time.Duration, carExpo // TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this time.Sleep(time.Second) - waitDealSealed(t, ctx, provider, client, deal, false) + waitDealSealed(t, ctx, provider, client, deal, false, false, nil) <-minedTwo diff --git a/extern/storage-sealing/fsm.go b/extern/storage-sealing/fsm.go index a67caf58567..b5b6562dc43 100644 --- a/extern/storage-sealing/fsm.go +++ b/extern/storage-sealing/fsm.go @@ -194,6 +194,8 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto func (m *Sealing) logEvents(events []statemachine.Event, state *SectorInfo) { for _, event := range events { + log.Debugw("sector event", "sector", state.SectorNumber, "type", fmt.Sprintf("%T", event.User), "event", event.User) + e, err := json.Marshal(event) if err != nil { log.Errorf("marshaling event for logging: %+v", err) diff --git a/extern/storage-sealing/input.go b/extern/storage-sealing/input.go index 3ac362e7879..8ddfd44ccdb 100644 --- a/extern/storage-sealing/input.go +++ b/extern/storage-sealing/input.go @@ -436,6 +436,7 @@ func (m *Sealing) createSector(ctx context.Context, cfg sealiface.Config, sp abi } func (m *Sealing) StartPacking(sid abi.SectorNumber) error { + log.Infow("starting to seal deal sector", "sector", sid, "trigger", "user") return m.sectors.Send(uint64(sid), SectorStartPacking{}) } diff --git a/extern/storage-sealing/states_sealing.go b/extern/storage-sealing/states_sealing.go index c55dd200517..e5449b42221 100644 --- a/extern/storage-sealing/states_sealing.go +++ b/extern/storage-sealing/states_sealing.go @@ -35,7 +35,7 @@ func (m *Sealing) handlePacking(ctx statemachine.Context, sector SectorInfo) err } // todo: return to the sealing queue (this is extremely unlikely to happen) - pp.accepted(sector.SectorNumber, 0, xerrors.Errorf("sector entered packing state early")) + pp.accepted(sector.SectorNumber, 0, xerrors.Errorf("sector %d entered packing state early", sector.SectorNumber)) } delete(m.openSectors, m.minerSectorID(sector.SectorNumber)) diff --git a/markets/storageadapter/ondealsectorcommitted.go b/markets/storageadapter/ondealsectorcommitted.go index b5f9c7510c3..00697e7e4c8 100644 --- a/markets/storageadapter/ondealsectorcommitted.go +++ b/markets/storageadapter/ondealsectorcommitted.go @@ -103,6 +103,8 @@ func (mgr *SectorCommittedManager) OnDealSectorPreCommitted(ctx context.Context, } } + log.Infow("sub precommit", "deal", dealInfo.DealID) + // Not yet active, start matching against incoming messages return false, true, nil } @@ -154,6 +156,7 @@ func (mgr *SectorCommittedManager) OnDealSectorPreCommitted(ctx context.Context, for _, did := range params.DealIDs { if did == res.DealID { // Found the deal ID in this message. Callback with the sector ID. + log.Infow("deal precommit found", "deal", res.DealID, "sector", params.SectorNumber) cb(params.SectorNumber, false, nil) return false, nil } diff --git a/node/node_test.go b/node/node_test.go index 45a5b7f57ca..6a58b02f03b 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -66,6 +66,7 @@ func TestBatchDealInput(t *testing.T) { logging.SetLogLevel("chain", "ERROR") logging.SetLogLevel("sub", "ERROR") logging.SetLogLevel("storageminer", "ERROR") + logging.SetLogLevel("sectors", "DEBUG") blockTime := 10 * time.Millisecond From 6e1919c67fe7218b0fa71861411b473a0af01b1b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Sun, 30 May 2021 18:30:38 +0200 Subject: [PATCH 3/5] storagefsm: Fix race spawning more than one new sector at once --- extern/storage-sealing/input.go | 10 ++++++++++ extern/storage-sealing/sealing.go | 1 + 2 files changed, 11 insertions(+) diff --git a/extern/storage-sealing/input.go b/extern/storage-sealing/input.go index 8ddfd44ccdb..bf66382d370 100644 --- a/extern/storage-sealing/input.go +++ b/extern/storage-sealing/input.go @@ -27,6 +27,10 @@ func (m *Sealing) handleWaitDeals(ctx statemachine.Context, sector SectorInfo) e m.inputLk.Lock() + if m.creating != nil && *m.creating == sector.SectorNumber { + m.creating = nil + } + sid := m.minerSectorID(sector.SectorNumber) if len(m.assignedPieces[sid]) > 0 { @@ -390,6 +394,10 @@ func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) e } func (m *Sealing) tryCreateDealSector(ctx context.Context, sp abi.RegisteredSealProof) error { + if m.creating != nil { + return nil // new sector is being created right now + } + cfg, err := m.getConfig() if err != nil { return xerrors.Errorf("getting storage config: %w", err) @@ -408,6 +416,8 @@ func (m *Sealing) tryCreateDealSector(ctx context.Context, sp abi.RegisteredSeal return err } + m.creating = &sid + log.Infow("Creating sector", "number", sid, "type", "deal", "proofType", sp) return m.sectors.Send(uint64(sid), SectorStart{ ID: sid, diff --git a/extern/storage-sealing/sealing.go b/extern/storage-sealing/sealing.go index 8feca3b7b11..7c118901b68 100644 --- a/extern/storage-sealing/sealing.go +++ b/extern/storage-sealing/sealing.go @@ -93,6 +93,7 @@ type Sealing struct { sectorTimers map[abi.SectorID]*time.Timer pendingPieces map[cid.Cid]*pendingPiece assignedPieces map[abi.SectorID][]cid.Cid + creating *abi.SectorNumber // used to prevent a race where we could create a new sector more than once upgradeLk sync.Mutex toUpgrade map[abi.SectorNumber]struct{} From f3bf7731525fb0c078c1eb3d2230dd706911d0e3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Sun, 30 May 2021 19:24:42 +0200 Subject: [PATCH 4/5] storagefsm: Fix too-long log handling --- extern/storage-sealing/fsm.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/extern/storage-sealing/fsm.go b/extern/storage-sealing/fsm.go index b5b6562dc43..a3b0db1c4e3 100644 --- a/extern/storage-sealing/fsm.go +++ b/extern/storage-sealing/fsm.go @@ -206,6 +206,10 @@ func (m *Sealing) logEvents(events []statemachine.Event, state *SectorInfo) { continue // don't log on every fsm restart } + if len(e) > 8000 { + e = []byte(string(e[:8000]) + "... truncated") + } + l := Log{ Timestamp: uint64(time.Now().Unix()), Message: string(e), From 670913261c896c3a175eb0b3552944653bb9e199 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Sun, 30 May 2021 19:26:53 +0200 Subject: [PATCH 5/5] Self-review cleanup --- api/test/deals.go | 15 ++++++++++++--- markets/storageadapter/ondealsectorcommitted.go | 3 --- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/api/test/deals.go b/api/test/deals.go index 8cd639efda5..d930016922f 100644 --- a/api/test/deals.go +++ b/api/test/deals.go @@ -19,6 +19,7 @@ import ( "github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/go-state-types/big" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/actors/builtin/market" @@ -227,6 +228,9 @@ func TestBatchDealInput(t *testing.T, b APIBuilder, blocktime time.Duration, sta s := connectAndStartMining(t, b, blocktime, client, miner) defer s.blockMiner.Stop() + err := miner.MarketSetAsk(s.ctx, big.Zero(), big.Zero(), 200, 128, 32<<30) + require.NoError(t, err) + checkNoPadding := func() { sl, err := sn[0].SectorsList(s.ctx) require.NoError(t, err) @@ -239,7 +243,7 @@ func TestBatchDealInput(t *testing.T, b APIBuilder, blocktime time.Duration, sta si, err := sn[0].SectorsStatus(s.ctx, snum, false) require.NoError(t, err) - fmt.Printf("S %d: %+v %s\n", snum, si.Deals, si.State) + // fmt.Printf("S %d: %+v %s\n", snum, si.Deals, si.State) for _, deal := range si.Deals { if deal == 0 { @@ -278,13 +282,18 @@ func TestBatchDealInput(t *testing.T, b APIBuilder, blocktime time.Duration, sta sl, err := sn[0].SectorsList(s.ctx) require.NoError(t, err) require.Equal(t, len(sl), expectSectors) - //require.LessOrEqual(t, len(sl), expectSectors+1) } } t.Run("4-p1600B", run(1600, 4, 4)) t.Run("4-p513B", run(513, 4, 2)) - t.Run("32-p257B", run(257, 32, 8)) + if !testing.Short() { + t.Run("32-p257B", run(257, 32, 8)) + t.Run("32-p10B", run(10, 32, 2)) + + // fixme: this appears to break data-transfer / markets in some really creative ways + //t.Run("128-p10B", run(10, 128, 8)) + } } func TestFastRetrievalDealFlow(t *testing.T, b APIBuilder, blocktime time.Duration, startEpoch abi.ChainEpoch) { diff --git a/markets/storageadapter/ondealsectorcommitted.go b/markets/storageadapter/ondealsectorcommitted.go index 00697e7e4c8..b5f9c7510c3 100644 --- a/markets/storageadapter/ondealsectorcommitted.go +++ b/markets/storageadapter/ondealsectorcommitted.go @@ -103,8 +103,6 @@ func (mgr *SectorCommittedManager) OnDealSectorPreCommitted(ctx context.Context, } } - log.Infow("sub precommit", "deal", dealInfo.DealID) - // Not yet active, start matching against incoming messages return false, true, nil } @@ -156,7 +154,6 @@ func (mgr *SectorCommittedManager) OnDealSectorPreCommitted(ctx context.Context, for _, did := range params.DealIDs { if did == res.DealID { // Found the deal ID in this message. Callback with the sector ID. - log.Infow("deal precommit found", "deal", res.DealID, "sector", params.SectorNumber) cb(params.SectorNumber, false, nil) return false, nil }