From 239d6f8f4dcc71ae2cc33e25dac7d1f6b3f704f2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Mon, 18 Jan 2021 14:26:03 +0100 Subject: [PATCH 01/15] storagefsm: Rewrite input handling --- cmd/lotus-storage-miner/info.go | 1 + extern/storage-sealing/fsm.go | 81 ++---- extern/storage-sealing/fsm_events.go | 13 +- extern/storage-sealing/input.go | 308 +++++++++++++++++++++ extern/storage-sealing/sealiface/config.go | 2 + extern/storage-sealing/sealing.go | 301 ++------------------ extern/storage-sealing/sector_state.go | 8 +- extern/storage-sealing/stats.go | 13 +- extern/storage-sealing/types.go | 3 +- node/config/def.go | 9 + node/modules/storageminer.go | 2 + 11 files changed, 388 insertions(+), 353 deletions(-) create mode 100644 extern/storage-sealing/input.go diff --git a/cmd/lotus-storage-miner/info.go b/cmd/lotus-storage-miner/info.go index 30c2924f2de..edefacf4db1 100644 --- a/cmd/lotus-storage-miner/info.go +++ b/cmd/lotus-storage-miner/info.go @@ -284,6 +284,7 @@ var stateList = []stateMeta{ {col: color.FgBlue, state: sealing.Empty}, {col: color.FgBlue, state: sealing.WaitDeals}, + {col: color.FgBlue, state: sealing.AddPiece}, {col: color.FgRed, state: sealing.UndefinedSectorState}, {col: color.FgYellow, state: sealing.Packing}, diff --git a/extern/storage-sealing/fsm.go b/extern/storage-sealing/fsm.go index c989d02967b..0e3f6ba2b37 100644 --- a/extern/storage-sealing/fsm.go +++ b/extern/storage-sealing/fsm.go @@ -37,14 +37,20 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto // Sealing UndefinedSectorState: planOne( - on(SectorStart{}, Empty), + on(SectorStart{}, WaitDeals), on(SectorStartCC{}, Packing), ), - Empty: planOne(on(SectorAddPiece{}, WaitDeals)), + Empty: planOne( // deprecated + on(SectorAddPiece{}, AddPiece), + on(SectorStartPacking{}, Packing), + ), WaitDeals: planOne( - on(SectorAddPiece{}, WaitDeals), + on(SectorAddPiece{}, AddPiece), on(SectorStartPacking{}, Packing), ), + AddPiece: planOne( + on(SectorPieceAdded{}, WaitDeals), + ), Packing: planOne(on(SectorPacked{}, GetTicket)), GetTicket: planOne( on(SectorTicket{}, PreCommit1), @@ -238,12 +244,11 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta /* - * Empty <- incoming deals - | | - | v - *<- WaitDeals <- incoming deals - | | - | v + UndefinedSectorState (start) + v | + *<- WaitDeals <-> AddPiece | + | | /--------------------/ + | v v *<- Packing <- incoming committed capacity | | | v @@ -282,10 +287,6 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta v FailedUnrecoverable - UndefinedSectorState <- ¯\_(ツ)_/¯ - | ^ - *---------------------/ - */ m.stats.updateSector(m.minerSectorID(state.SectorNumber), state.State) @@ -295,7 +296,9 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta case Empty: fallthrough case WaitDeals: - log.Infof("Waiting for deals %d", state.SectorNumber) + return m.handleWaitDeals, processed, nil + case AddPiece: + return m.handleAddPiece, processed, nil case Packing: return m.handlePacking, processed, nil case GetTicket: @@ -418,60 +421,10 @@ func (m *Sealing) restartSectors(ctx context.Context) error { log.Errorf("loading sector list: %+v", err) } - cfg, err := m.getConfig() - if err != nil { - return xerrors.Errorf("getting the sealing delay: %w", err) - } - - spt, err := m.currentSealProof(ctx) - if err != nil { - return xerrors.Errorf("getting current seal proof: %w", err) - } - ssize, err := spt.SectorSize() - if err != nil { - 
return err - } - - // m.unsealedInfoMap.lk.Lock() taken early in .New to prevent races - defer m.unsealedInfoMap.lk.Unlock() - for _, sector := range trackedSectors { if err := m.sectors.Send(uint64(sector.SectorNumber), SectorRestart{}); err != nil { log.Errorf("restarting sector %d: %+v", sector.SectorNumber, err) } - - if sector.State == WaitDeals { - - // put the sector in the unsealedInfoMap - if _, ok := m.unsealedInfoMap.infos[sector.SectorNumber]; ok { - // something's funky here, but probably safe to move on - log.Warnf("sector %v was already in the unsealedInfoMap when restarting", sector.SectorNumber) - } else { - ui := UnsealedSectorInfo{ - ssize: ssize, - } - for _, p := range sector.Pieces { - if p.DealInfo != nil { - ui.numDeals++ - } - ui.stored += p.Piece.Size - ui.pieceSizes = append(ui.pieceSizes, p.Piece.Size.Unpadded()) - } - - m.unsealedInfoMap.infos[sector.SectorNumber] = ui - } - - // start a fresh timer for the sector - if cfg.WaitDealsDelay > 0 { - timer := time.NewTimer(cfg.WaitDealsDelay) - go func() { - <-timer.C - if err := m.StartPacking(sector.SectorNumber); err != nil { - log.Errorf("starting sector %d: %+v", sector.SectorNumber, err) - } - }() - } - } } // TODO: Grab on-chain sector set and diff with trackedSectors diff --git a/extern/storage-sealing/fsm_events.go b/extern/storage-sealing/fsm_events.go index e2836672197..969ccf1e2dc 100644 --- a/extern/storage-sealing/fsm_events.go +++ b/extern/storage-sealing/fsm_events.go @@ -77,11 +77,20 @@ func (evt SectorStartCC) apply(state *SectorInfo) { } type SectorAddPiece struct { - NewPiece Piece + NewPiece cid.Cid } func (evt SectorAddPiece) apply(state *SectorInfo) { - state.Pieces = append(state.Pieces, evt.NewPiece) + state.PendingPieces = append(state.PendingPieces, evt.NewPiece) +} + +type SectorPieceAdded struct { + NewPieces []Piece +} + +func (evt SectorPieceAdded) apply(state *SectorInfo) { + state.Pieces = append(state.Pieces, evt.NewPieces...) 
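+	// at this point every pending piece has been written into the sector, so the queue can be cleared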
+ state.PendingPieces = nil } type SectorStartPacking struct{} diff --git a/extern/storage-sealing/input.go b/extern/storage-sealing/input.go new file mode 100644 index 00000000000..b0d06b326e6 --- /dev/null +++ b/extern/storage-sealing/input.go @@ -0,0 +1,308 @@ +package sealing + +import ( + "context" + "sort" + + "golang.org/x/xerrors" + + "github.com/ipfs/go-cid" + + "github.com/filecoin-project/go-padreader" + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/go-statemachine" + "github.com/filecoin-project/specs-storage/storage" + + sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage" + "github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper" +) + +func (m *Sealing) handleWaitDeals(ctx statemachine.Context, sector SectorInfo) error { + // if full / oldish / has oldish deals goto seal + // ^ also per sector deal limit + + // send SectorStartPacking + + m.inputLk.Lock() + var used abi.UnpaddedPieceSize + for _, piece := range sector.Pieces { + used += piece.Piece.Size.Unpadded() + } + + m.openSectors[m.minerSectorID(sector.SectorNumber)] = &openSector{ + used: used, + maybeAccept: func(cid cid.Cid) error { + // todo check space + + // todo check deal expiration + + return ctx.Send(SectorAddPiece{cid}) + }, + } + + go func() { + defer m.inputLk.Unlock() + if err := m.updateInput(ctx.Context(), sector.SectorType); err != nil { + log.Errorf("%+v", err) + } + }() + + return nil +} + +func (m *Sealing) handleAddPiece(ctx statemachine.Context, sector SectorInfo) error { + ssize, err := sector.SectorType.SectorSize() + if err != nil { + return err + } + + m.inputLk.Lock() + delete(m.openSectors, m.minerSectorID(sector.SectorNumber)) // todo: do this when handling the event + m.inputLk.Unlock() + + res := SectorPieceAdded{} + + for _, piece := range sector.PendingPieces { + m.inputLk.Lock() + deal, ok := m.pendingPieces[piece] + m.inputLk.Unlock() + if !ok { + // todo: this probably means that the miner process was restarted in the middle of adding pieces. 
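+			// (m.pendingPieces only lives in memory; the deal data readers it holds cannot be persisted across restarts)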
+			// Truncate whatever was in the process of being added to the sector (keep sector.Pieces as those are cleanly added, then go to WaitDeals)
+			return xerrors.Errorf("piece %s assigned to sector %d not found", piece, sector.SectorNumber)
+		}
+
+		var stored abi.PaddedPieceSize
+		for _, piece := range sector.Pieces {
+			stored += piece.Piece.Size
+		}
+
+		pads, padLength := ffiwrapper.GetRequiredPadding(stored, deal.size.Padded())
+
+		if stored+padLength+deal.size.Padded() > abi.PaddedPieceSize(ssize) {
+			return xerrors.Errorf("piece assigned to a sector with not enough space")
+		}
+
+		offset := padLength
+		pieceSizes := make([]abi.UnpaddedPieceSize, len(sector.Pieces))
+		for i, p := range sector.Pieces {
+			pieceSizes[i] = p.Piece.Size.Unpadded()
+			offset += p.Piece.Size
+		}
+
+		for _, p := range pads {
+			ppi, err := m.sealer.AddPiece(sectorstorage.WithPriority(ctx.Context(), DealSectorPriority),
+				m.minerSector(sector.SectorType, sector.SectorNumber),
+				pieceSizes,
+				p.Unpadded(),
+				NewNullReader(p.Unpadded()))
+			if err != nil {
+				return xerrors.Errorf("writing padding piece: %w", err) // todo failed state
+			}
+
+			pieceSizes = append(pieceSizes, p.Unpadded())
+			res.NewPieces = append(res.NewPieces, Piece{
+				Piece: ppi,
+			})
+		}
+
+		ppi, err := m.sealer.AddPiece(sectorstorage.WithPriority(ctx.Context(), DealSectorPriority),
+			m.minerSector(sector.SectorType, sector.SectorNumber),
+			pieceSizes,
+			deal.size,
+			deal.data)
+		if err != nil {
+			return xerrors.Errorf("writing deal piece: %w", err) // todo failed state
+		}
+
+		pieceSizes = append(pieceSizes, deal.size)
+		res.NewPieces = append(res.NewPieces, Piece{
+			Piece: ppi,
+			DealInfo: &deal.deal,
+		})
+	}
+
+	return ctx.Send(res)
+}
+
+func (m *Sealing) AddPieceToAnySector(ctx context.Context, size abi.UnpaddedPieceSize, data storage.Data, deal DealInfo) (abi.SectorNumber, abi.PaddedPieceSize, error) {
+	log.Infof("Adding piece for deal %d (publish msg: %s)", deal.DealID, deal.PublishCid)
+	if (padreader.PaddedSize(uint64(size))) != size {
+		return 0, 0, xerrors.Errorf("cannot allocate unpadded piece")
+	}
+
+	sp, err := m.currentSealProof(ctx)
+	if err != nil {
+		return 0, 0, xerrors.Errorf("getting current seal proof type: %w", err)
+	}
+
+	ssize, err := sp.SectorSize()
+	if err != nil {
+		return 0, 0, err
+	}
+
+	if size > abi.PaddedPieceSize(ssize).Unpadded() {
+		return 0, 0, xerrors.Errorf("piece cannot fit into a sector")
+	}
+
+	if deal.PublishCid == nil {
+		return 0, 0, xerrors.Errorf("piece must have a PublishCID")
+	}
+
+	m.inputLk.Lock()
+	if _, exist := m.pendingPieces[*deal.PublishCid]; exist {
+		m.inputLk.Unlock()
+		return 0, 0, xerrors.Errorf("piece for deal %s already pending", *deal.PublishCid)
+	}
+
+	resCh := make(chan struct{sn abi.SectorNumber; offset abi.UnpaddedPieceSize; err error}, 1)
+
+	m.pendingPieces[*deal.PublishCid] = &pendingPiece{
+		size:     size,
+		deal:     deal,
+		data:     data,
+		assigned: false,
+		accepted: func(sn abi.SectorNumber, offset abi.UnpaddedPieceSize, err error) {
+			resCh <- struct {
+				sn abi.SectorNumber
+				offset abi.UnpaddedPieceSize
+				err error
+			}{sn: sn, offset: offset, err: err}
+		},
+	}
+
+	go func() {
+		defer m.inputLk.Unlock()
+		if err := m.updateInput(ctx, sp); err != nil {
+			log.Errorf("%+v", err)
+		}
+	}()
+
+	res := <-resCh
+
+	return res.sn, res.offset.Padded(), res.err
+}
+
+// called with m.inputLk
+func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) error {
+	ssize, err := sp.SectorSize()
+	if err != nil {
+		return err
+	}
+
+	type match struct {
+		sector 
abi.SectorID + deal cid.Cid + + size abi.UnpaddedPieceSize + padding abi.UnpaddedPieceSize + } + + var matches []match + toAssign := map[cid.Cid]struct{}{} // used to maybe create new sectors + + // todo: this is distinctly O(n^2), may need to be optimized for tiny deals and large scale miners + // (unlikely to be a problem now) + for id, sector := range m.openSectors { + avail := abi.PaddedPieceSize(ssize).Unpadded() - sector.used + + for pieceCid, piece := range m.pendingPieces { + if piece.assigned { + continue // already assigned to a sector, skip + } + + toAssign[pieceCid] = struct{}{} + + if piece.size <= avail { // (note: if we have enough space for the piece, we also have enough space for inter-piece padding) + matches = append(matches, match{ + sector: id, + deal: pieceCid, + + size: piece.size, + padding: avail % piece.size, + }) + } + + } + } + + sort.Slice(matches, func(i, j int) bool { + if matches[i].padding != matches[j].padding { // less padding is better + return matches[i].padding < matches[j].padding + } + + if matches[i].size != matches[j].size { // larger pieces are better + return matches[i].size < matches[j].size + } + + return matches[i].sector.Number < matches[j].sector.Number // prefer older sectors + }) + + var assigned int + for _, mt := range matches { + if m.pendingPieces[mt.deal].assigned { + assigned++ + continue + } + + if _, found := m.openSectors[mt.sector]; !found { + continue + } + + err := m.openSectors[mt.sector].maybeAccept(mt.deal) + if err != nil { + m.pendingPieces[mt.deal].accepted(mt.sector.Number, 0, err) // non-error case in handleAddPiece + } + + m.pendingPieces[mt.deal].assigned = true + delete(toAssign, mt.deal) + + if err != nil { + log.Errorf("sector %d rejected deal %s: %+v", mt.sector, mt.deal, err) + continue + } + + delete(m.openSectors, mt.sector) + } + + if len(toAssign) > 0 { + m.tryCreateDealSector(ctx, sp) + + } + + return nil +} + +func (m *Sealing) tryCreateDealSector(ctx context.Context, sp abi.RegisteredSealProof) error { + cfg, err := m.getConfig() + if err != nil { + return xerrors.Errorf("getting storage config: %w", err) + } + + if cfg.MaxSealingSectorsForDeals > 0 { + if m.stats.curSealing() > cfg.MaxSealingSectorsForDeals { + return nil + } + if m.stats.curStaging() > cfg.MaxWaitDealsSectors { + return nil + } + } + + // Now actually create a new sector + + sid, err := m.sc.Next() + if err != nil { + return xerrors.Errorf("getting sector number: %w", err) + } + + err = m.sealer.NewSector(ctx, m.minerSector(sp, sid)) + if err != nil { + return xerrors.Errorf("initializing sector: %w", err) + } + + log.Infow("Creating sector", "number", sid, "type", "deal", "proofType", sp) + return m.sectors.Send(uint64(sid), SectorStart{ + ID: sid, + SectorType: sp, + }) +} diff --git a/extern/storage-sealing/sealiface/config.go b/extern/storage-sealing/sealiface/config.go index 945565562bd..4e0f51202de 100644 --- a/extern/storage-sealing/sealiface/config.go +++ b/extern/storage-sealing/sealiface/config.go @@ -15,4 +15,6 @@ type Config struct { MaxSealingSectorsForDeals uint64 WaitDealsDelay time.Duration + + TargetWaitDealsSectors uint64 } diff --git a/extern/storage-sealing/sealing.go b/extern/storage-sealing/sealing.go index 96d63efdce2..ce54b8a9e70 100644 --- a/extern/storage-sealing/sealing.go +++ b/extern/storage-sealing/sealing.go @@ -3,10 +3,7 @@ package sealing import ( "context" "errors" - "io" - "math" "sync" - "time" "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" @@ -15,7 +12,6 @@ import ( "golang.org/x/xerrors" 
"github.com/filecoin-project/go-address" - padreader "github.com/filecoin-project/go-padreader" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/big" "github.com/filecoin-project/go-state-types/crypto" @@ -85,9 +81,11 @@ type Sealing struct { sectors *statemachine.StateGroup sc SectorIDCounter verif ffiwrapper.Verifier + pcp PreCommitPolicy - pcp PreCommitPolicy - unsealedInfoMap UnsealedSectorMap + inputLk sync.Mutex + openSectors map[abi.SectorID]*openSector + pendingPieces map[cid.Cid]*pendingPiece upgradeLk sync.Mutex toUpgrade map[abi.SectorNumber]struct{} @@ -108,17 +106,20 @@ type FeeConfig struct { MaxTerminateGasFee abi.TokenAmount } -type UnsealedSectorMap struct { - infos map[abi.SectorNumber]UnsealedSectorInfo - lk sync.Mutex +type openSector struct { + used abi.UnpaddedPieceSize // change to bitfield/rle when AddPiece gains offset support to better fill sectors + + maybeAccept func(cid.Cid) error } -type UnsealedSectorInfo struct { - numDeals uint64 - // stored should always equal sum of pieceSizes.Padded() - stored abi.PaddedPieceSize - pieceSizes []abi.UnpaddedPieceSize - ssize abi.SectorSize +type pendingPiece struct { + size abi.UnpaddedPieceSize + deal DealInfo + + data storage.Data + + assigned bool // assigned to a sector? + accepted func(abi.SectorNumber, abi.UnpaddedPieceSize, error) } func New(api SealingAPI, fc FeeConfig, events Events, maddr address.Address, ds datastore.Batching, sealer sectorstorage.SectorManager, sc SectorIDCounter, verif ffiwrapper.Verifier, pcp PreCommitPolicy, gc GetSealingConfigFunc, notifee SectorStateNotifee, as AddrSel) *Sealing { @@ -132,12 +133,10 @@ func New(api SealingAPI, fc FeeConfig, events Events, maddr address.Address, ds sc: sc, verif: verif, pcp: pcp, - unsealedInfoMap: UnsealedSectorMap{ - infos: make(map[abi.SectorNumber]UnsealedSectorInfo), - lk: sync.Mutex{}, - }, - toUpgrade: map[abi.SectorNumber]struct{}{}, + openSectors: map[abi.SectorID]*openSector{}, + pendingPieces: map[cid.Cid]*pendingPiece{}, + toUpgrade: map[abi.SectorNumber]struct{}{}, notifee: notifee, addrSel: as, @@ -153,8 +152,6 @@ func New(api SealingAPI, fc FeeConfig, events Events, maddr address.Address, ds s.sectors = statemachine.New(namespace.Wrap(ds, datastore.NewKey(SectorStorePrefix)), s, SectorInfo{}) - s.unsealedInfoMap.lk.Lock() // released after initialized in .Run() - return s } @@ -178,104 +175,6 @@ func (m *Sealing) Stop(ctx context.Context) error { return nil } -func (m *Sealing) AddPieceToAnySector(ctx context.Context, size abi.UnpaddedPieceSize, r io.Reader, d DealInfo) (abi.SectorNumber, abi.PaddedPieceSize, error) { - log.Infof("Adding piece for deal %d (publish msg: %s)", d.DealID, d.PublishCid) - if (padreader.PaddedSize(uint64(size))) != size { - return 0, 0, xerrors.Errorf("cannot allocate unpadded piece") - } - - sp, err := m.currentSealProof(ctx) - if err != nil { - return 0, 0, xerrors.Errorf("getting current seal proof type: %w", err) - } - - ssize, err := sp.SectorSize() - if err != nil { - return 0, 0, err - } - - if size > abi.PaddedPieceSize(ssize).Unpadded() { - return 0, 0, xerrors.Errorf("piece cannot fit into a sector") - } - - m.unsealedInfoMap.lk.Lock() - - sid, pads, err := m.getSectorAndPadding(ctx, size) - if err != nil { - m.unsealedInfoMap.lk.Unlock() - return 0, 0, xerrors.Errorf("getting available sector: %w", err) - } - - for _, p := range pads { - err = m.addPiece(ctx, sid, p.Unpadded(), NewNullReader(p.Unpadded()), nil) - if err != nil { - m.unsealedInfoMap.lk.Unlock() 
- return 0, 0, xerrors.Errorf("writing pads: %w", err) - } - } - - offset := m.unsealedInfoMap.infos[sid].stored - err = m.addPiece(ctx, sid, size, r, &d) - - if err != nil { - m.unsealedInfoMap.lk.Unlock() - return 0, 0, xerrors.Errorf("adding piece to sector: %w", err) - } - - startPacking := m.unsealedInfoMap.infos[sid].numDeals >= getDealPerSectorLimit(ssize) - - m.unsealedInfoMap.lk.Unlock() - - if startPacking { - if err := m.StartPacking(sid); err != nil { - return 0, 0, xerrors.Errorf("start packing: %w", err) - } - } - - return sid, offset, nil -} - -// Caller should hold m.unsealedInfoMap.lk -func (m *Sealing) addPiece(ctx context.Context, sectorID abi.SectorNumber, size abi.UnpaddedPieceSize, r io.Reader, di *DealInfo) error { - log.Infof("Adding piece to sector %d", sectorID) - sp, err := m.currentSealProof(ctx) - if err != nil { - return xerrors.Errorf("getting current seal proof type: %w", err) - } - ssize, err := sp.SectorSize() - if err != nil { - return err - } - - ppi, err := m.sealer.AddPiece(sectorstorage.WithPriority(ctx, DealSectorPriority), m.minerSector(sp, sectorID), m.unsealedInfoMap.infos[sectorID].pieceSizes, size, r) - if err != nil { - return xerrors.Errorf("writing piece: %w", err) - } - piece := Piece{ - Piece: ppi, - DealInfo: di, - } - - err = m.sectors.Send(uint64(sectorID), SectorAddPiece{NewPiece: piece}) - if err != nil { - return err - } - - ui := m.unsealedInfoMap.infos[sectorID] - num := m.unsealedInfoMap.infos[sectorID].numDeals - if di != nil { - num = num + 1 - } - m.unsealedInfoMap.infos[sectorID] = UnsealedSectorInfo{ - numDeals: num, - stored: ui.stored + piece.Piece.Size, - pieceSizes: append(ui.pieceSizes, piece.Piece.Size.Unpadded()), - ssize: ssize, - } - - return nil -} - func (m *Sealing) Remove(ctx context.Context, sid abi.SectorNumber) error { return m.sectors.Send(uint64(sid), SectorRemove{}) } @@ -292,168 +191,6 @@ func (m *Sealing) TerminatePending(ctx context.Context) ([]abi.SectorID, error) return m.terminator.Pending(ctx) } -// Caller should NOT hold m.unsealedInfoMap.lk -func (m *Sealing) StartPacking(sectorID abi.SectorNumber) error { - // locking here ensures that when the SectorStartPacking event is sent, the sector won't be picked up anywhere else - m.unsealedInfoMap.lk.Lock() - defer m.unsealedInfoMap.lk.Unlock() - - // cannot send SectorStartPacking to sectors that have already been packed, otherwise it will cause the state machine to exit - if _, ok := m.unsealedInfoMap.infos[sectorID]; !ok { - log.Warnf("call start packing, but sector %v not in unsealedInfoMap.infos, maybe have called", sectorID) - return nil - } - log.Infof("Starting packing sector %d", sectorID) - err := m.sectors.Send(uint64(sectorID), SectorStartPacking{}) - if err != nil { - return err - } - log.Infof("send Starting packing event success sector %d", sectorID) - - delete(m.unsealedInfoMap.infos, sectorID) - - return nil -} - -// Caller should hold m.unsealedInfoMap.lk -func (m *Sealing) getSectorAndPadding(ctx context.Context, size abi.UnpaddedPieceSize) (abi.SectorNumber, []abi.PaddedPieceSize, error) { - for tries := 0; tries < 100; tries++ { - for k, v := range m.unsealedInfoMap.infos { - pads, padLength := ffiwrapper.GetRequiredPadding(v.stored, size.Padded()) - - if v.stored+size.Padded()+padLength <= abi.PaddedPieceSize(v.ssize) { - return k, pads, nil - } - } - - if len(m.unsealedInfoMap.infos) > 0 { - log.Infow("tried to put a piece into an open sector, found none with enough space", "open", len(m.unsealedInfoMap.infos), "size", size, 
"tries", tries) - } - - ns, ssize, err := m.newDealSector(ctx) - switch err { - case nil: - m.unsealedInfoMap.infos[ns] = UnsealedSectorInfo{ - numDeals: 0, - stored: 0, - pieceSizes: nil, - ssize: ssize, - } - case errTooManySealing: - m.unsealedInfoMap.lk.Unlock() - - select { - case <-time.After(2 * time.Second): - case <-ctx.Done(): - m.unsealedInfoMap.lk.Lock() - return 0, nil, xerrors.Errorf("getting sector for piece: %w", ctx.Err()) - } - - m.unsealedInfoMap.lk.Lock() - continue - default: - return 0, nil, xerrors.Errorf("creating new sector: %w", err) - } - - return ns, nil, nil - } - - return 0, nil, xerrors.Errorf("failed to allocate piece to a sector") -} - -var errTooManySealing = errors.New("too many sectors sealing") - -// newDealSector creates a new sector for deal storage -func (m *Sealing) newDealSector(ctx context.Context) (abi.SectorNumber, abi.SectorSize, error) { - // First make sure we don't have too many 'open' sectors - - cfg, err := m.getConfig() - if err != nil { - return 0, 0, xerrors.Errorf("getting config: %w", err) - } - - if cfg.MaxSealingSectorsForDeals > 0 { - if m.stats.curSealing() > cfg.MaxSealingSectorsForDeals { - return 0, 0, ErrTooManySectorsSealing - } - } - - if cfg.MaxWaitDealsSectors > 0 && uint64(len(m.unsealedInfoMap.infos)) >= cfg.MaxWaitDealsSectors { - // Too many sectors are sealing in parallel. Start sealing one, and retry - // allocating the piece to a sector (we're dropping the lock here, so in - // case other goroutines are also trying to create a sector, we retry in - // getSectorAndPadding instead of here - otherwise if we have lots of - // parallel deals in progress, we can start creating a ton of sectors - // with just a single deal in them) - var mostStored abi.PaddedPieceSize = math.MaxUint64 - var best abi.SectorNumber = math.MaxUint64 - - for sn, info := range m.unsealedInfoMap.infos { - if info.stored+1 > mostStored+1 { // 18446744073709551615 + 1 = 0 - best = sn - } - } - - if best != math.MaxUint64 { - m.unsealedInfoMap.lk.Unlock() - err := m.StartPacking(best) - m.unsealedInfoMap.lk.Lock() - - if err != nil { - log.Errorf("newDealSector StartPacking error: %+v", err) - // let's pretend this is fine - } - } - - return 0, 0, errTooManySealing // will wait a bit and retry - } - - spt, err := m.currentSealProof(ctx) - if err != nil { - return 0, 0, xerrors.Errorf("getting current seal proof type: %w", err) - } - - // Now actually create a new sector - - sid, err := m.sc.Next() - if err != nil { - return 0, 0, xerrors.Errorf("getting sector number: %w", err) - } - - err = m.sealer.NewSector(context.TODO(), m.minerSector(spt, sid)) - if err != nil { - return 0, 0, xerrors.Errorf("initializing sector: %w", err) - } - - log.Infof("Creating sector %d", sid) - err = m.sectors.Send(uint64(sid), SectorStart{ - ID: sid, - SectorType: spt, - }) - - if err != nil { - return 0, 0, xerrors.Errorf("starting the sector fsm: %w", err) - } - - cf, err := m.getConfig() - if err != nil { - return 0, 0, xerrors.Errorf("getting the sealing delay: %w", err) - } - - if cf.WaitDealsDelay > 0 { - timer := time.NewTimer(cf.WaitDealsDelay) - go func() { - <-timer.C - if err := m.StartPacking(sid); err != nil { - log.Errorf("starting sector %d: %+v", sid, err) - } - }() - } - - ssize, err := spt.SectorSize() - return sid, ssize, err -} - // newSectorCC accepts a slice of pieces with no deal (junk data) func (m *Sealing) newSectorCC(ctx context.Context, sid abi.SectorNumber, pieces []Piece) error { spt, err := m.currentSealProof(ctx) diff --git 
a/extern/storage-sealing/sector_state.go b/extern/storage-sealing/sector_state.go
index 49a60795895..da3db401bf4 100644
--- a/extern/storage-sealing/sector_state.go
+++ b/extern/storage-sealing/sector_state.go
@@ -6,6 +6,7 @@ var ExistSectorStateList = map[SectorState]struct{}{
 	Empty:          {},
 	WaitDeals:      {},
 	Packing:        {},
+	AddPiece:       {},
 	GetTicket:      {},
 	PreCommit1:     {},
 	PreCommit2:     {},
@@ -43,8 +44,9 @@ const (
 	UndefinedSectorState SectorState = ""
 
 	// happy path
-	Empty     SectorState = "Empty"
+	Empty     SectorState = "Empty"     // deprecated
 	WaitDeals SectorState = "WaitDeals" // waiting for more pieces (deals) to be added to the sector
+	AddPiece  SectorState = "AddPiece"  // put deal data (and padding if required) into the sector
 	Packing   SectorState = "Packing"   // sector not in sealStore, and not on chain
 	GetTicket SectorState = "GetTicket" // generate ticket
 	PreCommit1 SectorState = "PreCommit1" // do PreCommit1
@@ -85,7 +87,9 @@ const (
 
 func toStatState(st SectorState) statSectorState {
 	switch st {
-	case Empty, WaitDeals, Packing, GetTicket, PreCommit1, PreCommit2, PreCommitting, PreCommitWait, WaitSeed, Committing, SubmitCommit, CommitWait, FinalizeSector:
+	case Empty, WaitDeals, AddPiece:
+		return sstStaging
+	case Packing, GetTicket, PreCommit1, PreCommit2, PreCommitting, PreCommitWait, WaitSeed, Committing, SubmitCommit, CommitWait, FinalizeSector:
 		return sstSealing
 	case Proving, Removed, Removing, Terminating, TerminateWait, TerminateFinality, TerminateFailed:
 		return sstProving
diff --git a/extern/storage-sealing/stats.go b/extern/storage-sealing/stats.go
index 78630c216be..10852937572 100644
--- a/extern/storage-sealing/stats.go
+++ b/extern/storage-sealing/stats.go
@@ -9,7 +9,8 @@ import (
 type statSectorState int
 
 const (
-	sstSealing statSectorState = iota
+	sstStaging statSectorState = iota
+	sstSealing
 	sstFailed
 	sstProving
 	nsst
@@ -41,5 +42,13 @@ func (ss *SectorStats) curSealing() uint64 {
 	ss.lk.Lock()
 	defer ss.lk.Unlock()
 
-	return ss.totals[sstSealing] + ss.totals[sstFailed]
+	return ss.totals[sstStaging] + ss.totals[sstSealing] + ss.totals[sstFailed]
+}
+
+// return the number of sectors waiting to enter the sealing pipeline
+func (ss *SectorStats) curStaging() uint64 {
+	ss.lk.Lock()
+	defer ss.lk.Unlock()
+
+	return ss.totals[sstStaging]
 }
diff --git a/extern/storage-sealing/types.go b/extern/storage-sealing/types.go
index 1d507362207..762fe227a59 100644
--- a/extern/storage-sealing/types.go
+++ b/extern/storage-sealing/types.go
@@ -70,7 +70,8 @@ type SectorInfo struct {
 	SectorType abi.RegisteredSealProof
 
 	// Packing
-	Pieces []Piece
+	Pieces        []Piece
+	PendingPieces []cid.Cid
 
 	// PreCommit1
 	TicketValue   abi.SealRandomness
diff --git a/node/config/def.go b/node/config/def.go
index a20e0ceaaac..716e5060257 100644
--- a/node/config/def.go
+++ b/node/config/def.go
@@ -64,6 +64,14 @@ type SealingConfig struct {
 	MaxSealingSectorsForDeals uint64
 
 	WaitDealsDelay Duration
+
+	// Keep this many sectors in the sealing pipeline; start CC sectors if needed
+	// todo TargetSealingSectors uint64
+
+	// Try to keep this many sectors waiting for deals
+	TargetWaitDealsSectors uint64
+
+	// todo TargetSectors - stop auto-pledging new sectors after this many sectors are sealed, default to CC upgrades for deal sectors if above
 }
 
 type MinerFeeConfig struct {
@@ -183,6 +191,7 @@ func DefaultStorageMiner() *StorageMiner {
 			MaxSealingSectors:         0,
 			MaxSealingSectorsForDeals: 0,
 			WaitDealsDelay:            Duration(time.Hour * 6),
+			TargetWaitDealsSectors:    2,
 		},
 
 		Storage: sectorstorage.SealerConfig{
diff --git a/node/modules/storageminer.go
b/node/modules/storageminer.go index 30f84aeaff8..f459cf2c494 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -805,6 +805,7 @@ func NewSetSealConfigFunc(r repo.LockedRepo) (dtypes.SetSealingConfigFunc, error MaxSealingSectors: cfg.MaxSealingSectors, MaxSealingSectorsForDeals: cfg.MaxSealingSectorsForDeals, WaitDealsDelay: config.Duration(cfg.WaitDealsDelay), + TargetWaitDealsSectors: cfg.TargetWaitDealsSectors, } }) return @@ -819,6 +820,7 @@ func NewGetSealConfigFunc(r repo.LockedRepo) (dtypes.GetSealingConfigFunc, error MaxSealingSectors: cfg.Sealing.MaxSealingSectors, MaxSealingSectorsForDeals: cfg.Sealing.MaxSealingSectorsForDeals, WaitDealsDelay: time.Duration(cfg.Sealing.WaitDealsDelay), + TargetWaitDealsSectors: cfg.Sealing.TargetWaitDealsSectors, } }) return From 542357a1dfa58f433e72985d5abd193367151012 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Mon, 18 Jan 2021 21:59:34 +0100 Subject: [PATCH 02/15] storagefsm: Start packing correctly --- extern/storage-sealing/fsm.go | 73 +++++++++++------- extern/storage-sealing/fsm_events.go | 8 +- extern/storage-sealing/input.go | 110 ++++++++++++++++++--------- extern/storage-sealing/sealing.go | 2 + extern/storage-sealing/types.go | 3 +- node/modules/storageminer.go | 4 +- 6 files changed, 130 insertions(+), 70 deletions(-) diff --git a/extern/storage-sealing/fsm.go b/extern/storage-sealing/fsm.go index 0e3f6ba2b37..a1739a1cca3 100644 --- a/extern/storage-sealing/fsm.go +++ b/extern/storage-sealing/fsm.go @@ -50,6 +50,7 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto ), AddPiece: planOne( on(SectorPieceAdded{}, WaitDeals), + apply(SectorStartPacking{}), ), Packing: planOne(on(SectorPacked{}, GetTicket)), GetTicket: planOne( @@ -447,56 +448,72 @@ func final(events []statemachine.Event, state *SectorInfo) (uint64, error) { return 0, xerrors.Errorf("didn't expect any events in state %s, got %+v", state.State, events) } -func on(mut mutator, next SectorState) func() (mutator, func(*SectorInfo) error) { - return func() (mutator, func(*SectorInfo) error) { - return mut, func(state *SectorInfo) error { +func on(mut mutator, next SectorState) func() (mutator, func(*SectorInfo) (bool, error)) { + return func() (mutator, func(*SectorInfo) (bool, error)) { + return mut, func(state *SectorInfo) (bool, error) { state.State = next - return nil + return false, nil + } + } +} + +// like `on`, but doesn't change state +func apply(mut mutator) func() (mutator, func(*SectorInfo) (bool, error)) { + return func() (mutator, func(*SectorInfo) (bool, error)) { + return mut, func(state *SectorInfo) (bool, error) { + return true, nil } } } -func onReturning(mut mutator) func() (mutator, func(*SectorInfo) error) { - return func() (mutator, func(*SectorInfo) error) { - return mut, func(state *SectorInfo) error { +func onReturning(mut mutator) func() (mutator, func(*SectorInfo) (bool, error)) { + return func() (mutator, func(*SectorInfo) (bool, error)) { + return mut, func(state *SectorInfo) (bool, error) { if state.Return == "" { - return xerrors.Errorf("return state not set") + return false, xerrors.Errorf("return state not set") } state.State = SectorState(state.Return) state.Return = "" - return nil + return false, nil } } } -func planOne(ts ...func() (mut mutator, next func(*SectorInfo) error)) func(events []statemachine.Event, state *SectorInfo) (uint64, error) { +func planOne(ts ...func() (mut mutator, next func(*SectorInfo) (more bool, err error))) func(events 
[]statemachine.Event, state *SectorInfo) (uint64, error) {
 	return func(events []statemachine.Event, state *SectorInfo) (uint64, error) {
-		if gm, ok := events[0].User.(globalMutator); ok {
-			gm.applyGlobal(state)
-			return 1, nil
-		}
+	eloop:
+		for i, event := range events {
+			if gm, ok := event.User.(globalMutator); ok {
+				gm.applyGlobal(state)
+				return uint64(i + 1), nil
+			}
 
-		for _, t := range ts {
-			mut, next := t()
+			for _, t := range ts {
+				mut, next := t()
 
-			if reflect.TypeOf(events[0].User) != reflect.TypeOf(mut) {
-				continue
-			}
+				if reflect.TypeOf(event.User) != reflect.TypeOf(mut) {
+					continue
+				}
+
+				if err, iserr := event.User.(error); iserr {
+					log.Warnf("sector %d got error event %T: %+v", state.SectorNumber, event.User, err)
+				}
 
-			if err, iserr := events[0].User.(error); iserr {
-				log.Warnf("sector %d got error event %T: %+v", state.SectorNumber, events[0].User, err)
+				event.User.(mutator).apply(state)
+				more, err := next(state)
+				if err != nil || !more {
+					return uint64(i + 1), err
+				}
+
+				continue eloop // move on to the next event; don't fall through to the unexpected-event error
 			}
 
-			events[0].User.(mutator).apply(state)
-			return 1, next(state)
-		}
+			_, ok := event.User.(Ignorable)
+			if ok {
+				continue
+			}
 
-		_, ok := events[0].User.(Ignorable)
-		if ok {
-			return 1, nil
+			return uint64(i + 1), xerrors.Errorf("planner for state %s received unexpected event %T (%+v)", state.State, event.User, event)
 		}
 
-		return 0, xerrors.Errorf("planner for state %s received unexpected event %T (%+v)", state.State, events[0].User, events[0])
+		return uint64(len(events)), nil
 	}
 }
diff --git a/extern/storage-sealing/fsm_events.go b/extern/storage-sealing/fsm_events.go
index 969ccf1e2dc..f9c1553e353 100644
--- a/extern/storage-sealing/fsm_events.go
+++ b/extern/storage-sealing/fsm_events.go
@@ -1,13 +1,16 @@
 package sealing
 
 import (
-	"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
+	"time"
+
 	"github.com/ipfs/go-cid"
 	"golang.org/x/xerrors"
 
 	"github.com/filecoin-project/go-state-types/abi"
 	"github.com/filecoin-project/go-state-types/big"
 	"github.com/filecoin-project/specs-storage/storage"
+
+	"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
 )
 
 type mutator interface {
@@ -81,6 +84,9 @@ type SectorAddPiece struct {
 }
 
 func (evt SectorAddPiece) apply(state *SectorInfo) {
+	if state.CreationTime.IsZero() {
+		state.CreationTime = time.Now()
+	}
 	state.PendingPieces = append(state.PendingPieces, evt.NewPiece)
 }
diff --git a/extern/storage-sealing/input.go b/extern/storage-sealing/input.go
index b0d06b326e6..0361f5c0850 100644
--- a/extern/storage-sealing/input.go
+++ b/extern/storage-sealing/input.go
@@ -3,6 +3,7 @@ package sealing
 import (
 	"context"
 	"sort"
+	"time"
 
 	"golang.org/x/xerrors"
 
@@ -21,9 +22,35 @@ func (m *Sealing) handleWaitDeals(ctx statemachine.Context, sector SectorInfo) e
 	// if full / oldish / has oldish deals goto seal
 	// ^ also per sector deal limit
 
-	// send SectorStartPacking
-
 	m.inputLk.Lock()
+
+	now := time.Now()
+	st := m.sectorTimers[m.minerSectorID(sector.SectorNumber)]
+	if st != nil {
+		if !st.Stop() { // timer expired, SectorStartPacking was/is being sent
+			// we send another SectorStartPacking in case one was sent in the handleAddPiece state
+			return ctx.Send(SectorStartPacking{})
+		}
+	}
+
+	if !sector.CreationTime.IsZero() {
+		cfg, err := m.getConfig()
+		if err != nil {
+			return xerrors.Errorf("getting storage config: %w", err)
+		}
+
+		sealTime := sector.CreationTime.Add(cfg.WaitDealsDelay)
+		if now.After(sealTime) {
+			return ctx.Send(SectorStartPacking{})
+		} else {
+			m.sectorTimers[m.minerSectorID(sector.SectorNumber)] = time.AfterFunc(sealTime.Sub(now), func() {
+				if err := ctx.Send(SectorStartPacking{}); err != nil {
+					log.Errorw("sending SectorStartPacking event failed", "sector", sector.SectorNumber, "error", err)
+				}
+			})
+		}
+	}
+
 	var used abi.UnpaddedPieceSize
 	for _, piece := range sector.Pieces {
 		used += piece.Piece.Size.Unpadded()
@@ -32,7 +59,7 @@ func (m *Sealing) handleWaitDeals(ctx statemachine.Context, sector SectorInfo) e
 	m.openSectors[m.minerSectorID(sector.SectorNumber)] = &openSector{
 		used: used,
 		maybeAccept: func(cid cid.Cid) error {
-			// todo check space
+			// todo double check space
 
 			// todo check deal expiration
 
@@ -62,6 +89,13 @@ func (m *Sealing) handleAddPiece(ctx statemachine.Context, sector SectorInfo) er
 
 	res := SectorPieceAdded{}
 
+	var offset abi.UnpaddedPieceSize
+	pieceSizes := make([]abi.UnpaddedPieceSize, len(sector.Pieces))
+	for i, p := range sector.Pieces {
+		pieceSizes[i] = p.Piece.Size.Unpadded()
+		offset += p.Piece.Size.Unpadded()
+	}
+
 	for _, piece := range sector.PendingPieces {
 		m.inputLk.Lock()
 		deal, ok := m.pendingPieces[piece]
@@ -72,23 +106,13 @@ func (m *Sealing) handleAddPiece(ctx statemachine.Context, sector SectorInfo) er
 			return xerrors.Errorf("piece %s assigned to sector %d not found", piece, sector.SectorNumber)
 		}
 
-		var stored abi.PaddedPieceSize
-		for _, piece := range sector.Pieces {
-			stored += piece.Piece.Size
-		}
-
-		pads, padLength := ffiwrapper.GetRequiredPadding(stored, deal.size.Padded())
+		pads, padLength := ffiwrapper.GetRequiredPadding(offset.Padded(), deal.size.Padded())
 
-		if stored+padLength+deal.size.Padded() > abi.PaddedPieceSize(ssize) {
+		if offset.Padded()+padLength+deal.size.Padded() > abi.PaddedPieceSize(ssize) {
 			return xerrors.Errorf("piece assigned to a sector with not enough space")
 		}
 
-		offset := padLength
-		pieceSizes := make([]abi.UnpaddedPieceSize, len(sector.Pieces))
-		for i, p := range sector.Pieces {
-			pieceSizes[i] = p.Piece.Size.Unpadded()
-			offset += p.Piece.Size
-		}
+		offset += padLength.Unpadded()
 
 		for _, p := range pads {
 			ppi, err := m.sealer.AddPiece(sectorstorage.WithPriority(ctx.Context(), DealSectorPriority),
@@ -115,9 +139,13 @@ func (m *Sealing) handleAddPiece(ctx statemachine.Context, sector SectorInfo) er
 			return xerrors.Errorf("writing deal piece: %w", err) // todo failed state
 		}
 
+		deal.accepted(sector.SectorNumber, offset, nil)
+
+		offset += deal.size
 		pieceSizes = append(pieceSizes, deal.size)
+
 		res.NewPieces = append(res.NewPieces, Piece{
-			Piece: ppi,
+			Piece:    ppi,
 			DealInfo: &deal.deal,
 		})
 	}
@@ -155,7 +183,11 @@ func (m *Sealing) AddPieceToAnySector(ctx context.Context, size abi.UnpaddedPiec
 		return 0, 0, xerrors.Errorf("piece for deal %s already pending", *deal.PublishCid)
 	}
 
-	resCh := make(chan struct{sn abi.SectorNumber; offset abi.UnpaddedPieceSize; err error}, 1)
+	resCh := make(chan struct {
+		sn     abi.SectorNumber
+		offset abi.UnpaddedPieceSize
+		err    error
+	}, 1)
 
 	m.pendingPieces[*deal.PublishCid] = &pendingPiece{
 		size:     size,
@@ -164,9 +196,9 @@ func (m *Sealing) AddPieceToAnySector(ctx context.Context, size abi.UnpaddedPiec
 		assigned: false,
 		accepted: func(sn abi.SectorNumber, offset abi.UnpaddedPieceSize, err error) {
 			resCh <- struct {
-				sn abi.SectorNumber
+				sn     abi.SectorNumber
 				offset abi.UnpaddedPieceSize
-				err error
+				err    error
 			}{sn: sn, offset: offset, err: err}
 		},
 	}
@@ -203,15 +235,15 @@ func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) e
 
 	// todo: this is distinctly O(n^2), may need to be optimized for tiny deals and large scale miners
 	// (unlikely to be a problem now)
-	
for id, sector := range m.openSectors { - avail := abi.PaddedPieceSize(ssize).Unpadded() - sector.used + for pieceCid, piece := range m.pendingPieces { + if piece.assigned { + continue // already assigned to a sector, skip + } - for pieceCid, piece := range m.pendingPieces { - if piece.assigned { - continue // already assigned to a sector, skip - } + toAssign[pieceCid] = struct{}{} - toAssign[pieceCid] = struct{}{} + for id, sector := range m.openSectors { + avail := abi.PaddedPieceSize(ssize).Unpadded() - sector.used if piece.size <= avail { // (note: if we have enough space for the piece, we also have enough space for inter-piece padding) matches = append(matches, match{ @@ -222,10 +254,8 @@ func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) e padding: avail % piece.size, }) } - } } - sort.Slice(matches, func(i, j int) bool { if matches[i].padding != matches[j].padding { // less padding is better return matches[i].padding < matches[j].padding @@ -266,8 +296,9 @@ func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) e } if len(toAssign) > 0 { - m.tryCreateDealSector(ctx, sp) - + if err := m.tryCreateDealSector(ctx, sp); err != nil { + log.Errorw("Failed to create a new sector for deals", "error", err) + } } return nil @@ -279,13 +310,12 @@ func (m *Sealing) tryCreateDealSector(ctx context.Context, sp abi.RegisteredSeal return xerrors.Errorf("getting storage config: %w", err) } - if cfg.MaxSealingSectorsForDeals > 0 { - if m.stats.curSealing() > cfg.MaxSealingSectorsForDeals { - return nil - } - if m.stats.curStaging() > cfg.MaxWaitDealsSectors { - return nil - } + if cfg.MaxSealingSectorsForDeals > 0 && m.stats.curSealing() > cfg.MaxSealingSectorsForDeals { + return nil + } + + if cfg.MaxWaitDealsSectors > 0 && m.stats.curStaging() > cfg.MaxWaitDealsSectors { + return nil } // Now actually create a new sector @@ -306,3 +336,7 @@ func (m *Sealing) tryCreateDealSector(ctx context.Context, sp abi.RegisteredSeal SectorType: sp, }) } + +func (m *Sealing) StartPacking(sid abi.SectorNumber) error { + return m.sectors.Send(uint64(sid), SectorStartPacking{}) +} diff --git a/extern/storage-sealing/sealing.go b/extern/storage-sealing/sealing.go index ce54b8a9e70..6f4ed09762a 100644 --- a/extern/storage-sealing/sealing.go +++ b/extern/storage-sealing/sealing.go @@ -4,6 +4,7 @@ import ( "context" "errors" "sync" + "time" "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" @@ -85,6 +86,7 @@ type Sealing struct { inputLk sync.Mutex openSectors map[abi.SectorID]*openSector + sectorTimers map[abi.SectorID]*time.Timer pendingPieces map[cid.Cid]*pendingPiece upgradeLk sync.Mutex diff --git a/extern/storage-sealing/types.go b/extern/storage-sealing/types.go index 762fe227a59..3f9f55b7484 100644 --- a/extern/storage-sealing/types.go +++ b/extern/storage-sealing/types.go @@ -3,8 +3,8 @@ package sealing import ( "bytes" "context" - "github.com/ipfs/go-cid" + "time" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/big" @@ -70,6 +70,7 @@ type SectorInfo struct { SectorType abi.RegisteredSealProof // Packing + CreationTime time.Time Pieces []Piece PendingPieces []cid.Cid diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index f459cf2c494..64f62d6ae10 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -805,7 +805,7 @@ func NewSetSealConfigFunc(r repo.LockedRepo) (dtypes.SetSealingConfigFunc, error MaxSealingSectors: cfg.MaxSealingSectors, MaxSealingSectorsForDeals: 
cfg.MaxSealingSectorsForDeals, WaitDealsDelay: config.Duration(cfg.WaitDealsDelay), - TargetWaitDealsSectors: cfg.TargetWaitDealsSectors, + TargetWaitDealsSectors: cfg.TargetWaitDealsSectors, } }) return @@ -820,7 +820,7 @@ func NewGetSealConfigFunc(r repo.LockedRepo) (dtypes.GetSealingConfigFunc, error MaxSealingSectors: cfg.Sealing.MaxSealingSectors, MaxSealingSectorsForDeals: cfg.Sealing.MaxSealingSectorsForDeals, WaitDealsDelay: time.Duration(cfg.Sealing.WaitDealsDelay), - TargetWaitDealsSectors: cfg.Sealing.TargetWaitDealsSectors, + TargetWaitDealsSectors: cfg.Sealing.TargetWaitDealsSectors, } }) return From e5814dac4fe2a21162b34d13ec74f2e3766229e7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Mon, 18 Jan 2021 22:05:56 +0100 Subject: [PATCH 03/15] cbor-gen --- extern/storage-sealing/cbor_gen.go | 83 +++++++++++++++++++++++++++++- 1 file changed, 82 insertions(+), 1 deletion(-) diff --git a/extern/storage-sealing/cbor_gen.go b/extern/storage-sealing/cbor_gen.go index 70be08ace37..d626e647391 100644 --- a/extern/storage-sealing/cbor_gen.go +++ b/extern/storage-sealing/cbor_gen.go @@ -8,6 +8,7 @@ import ( abi "github.com/filecoin-project/go-state-types/abi" miner "github.com/filecoin-project/specs-actors/actors/builtin/miner" + cid "github.com/ipfs/go-cid" cbg "github.com/whyrusleeping/cbor-gen" xerrors "golang.org/x/xerrors" ) @@ -475,7 +476,7 @@ func (t *SectorInfo) MarshalCBOR(w io.Writer) error { _, err := w.Write(cbg.CborNull) return err } - if _, err := w.Write([]byte{184, 25}); err != nil { + if _, err := w.Write([]byte{184, 27}); err != nil { return err } @@ -542,6 +543,22 @@ func (t *SectorInfo) MarshalCBOR(w io.Writer) error { } } + // t.CreationTime (time.Time) (struct) + if len("CreationTime") > cbg.MaxLength { + return xerrors.Errorf("Value in field \"CreationTime\" was too long") + } + + if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("CreationTime"))); err != nil { + return err + } + if _, err := io.WriteString(w, string("CreationTime")); err != nil { + return err + } + + if err := t.CreationTime.MarshalCBOR(w); err != nil { + return err + } + // t.Pieces ([]sealing.Piece) (slice) if len("Pieces") > cbg.MaxLength { return xerrors.Errorf("Value in field \"Pieces\" was too long") @@ -567,6 +584,31 @@ func (t *SectorInfo) MarshalCBOR(w io.Writer) error { } } + // t.PendingPieces ([]cid.Cid) (slice) + if len("PendingPieces") > cbg.MaxLength { + return xerrors.Errorf("Value in field \"PendingPieces\" was too long") + } + + if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("PendingPieces"))); err != nil { + return err + } + if _, err := io.WriteString(w, string("PendingPieces")); err != nil { + return err + } + + if len(t.PendingPieces) > cbg.MaxLength { + return xerrors.Errorf("Slice value in field t.PendingPieces was too long") + } + + if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajArray, uint64(len(t.PendingPieces))); err != nil { + return err + } + for _, v := range t.PendingPieces { + if err := cbg.WriteCidBuf(scratch, w, v); err != nil { + return xerrors.Errorf("failed writing cid field t.PendingPieces: %w", err) + } + } + // t.TicketValue (abi.SealRandomness) (slice) if len("TicketValue") > cbg.MaxLength { return xerrors.Errorf("Value in field \"TicketValue\" was too long") @@ -1107,6 +1149,16 @@ func (t *SectorInfo) UnmarshalCBOR(r io.Reader) error { t.SectorType = abi.RegisteredSealProof(extraI) } + // t.CreationTime (time.Time) (struct) + case "CreationTime": + + { + + if 
err := t.CreationTime.UnmarshalCBOR(br); err != nil { + return xerrors.Errorf("unmarshaling t.CreationTime: %w", err) + } + + } // t.Pieces ([]sealing.Piece) (slice) case "Pieces": @@ -1137,6 +1189,35 @@ func (t *SectorInfo) UnmarshalCBOR(r io.Reader) error { t.Pieces[i] = v } + // t.PendingPieces ([]cid.Cid) (slice) + case "PendingPieces": + + maj, extra, err = cbg.CborReadHeaderBuf(br, scratch) + if err != nil { + return err + } + + if extra > cbg.MaxLength { + return fmt.Errorf("t.PendingPieces: array too large (%d)", extra) + } + + if maj != cbg.MajArray { + return fmt.Errorf("expected cbor array") + } + + if extra > 0 { + t.PendingPieces = make([]cid.Cid, extra) + } + + for i := 0; i < int(extra); i++ { + + c, err := cbg.ReadCid(br) + if err != nil { + return xerrors.Errorf("reading cid field t.PendingPieces failed: %w", err) + } + t.PendingPieces[i] = c + } + // t.TicketValue (abi.SealRandomness) (slice) case "TicketValue": From 9857ad83785a73f84c2d45e666ce7d1af89f690d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 19 Jan 2021 19:04:05 +0100 Subject: [PATCH 04/15] storagefsm: Fix some deadlock cases --- extern/storage-sealing/input.go | 16 ++++++++++------ extern/storage-sealing/types.go | 3 ++- 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/extern/storage-sealing/input.go b/extern/storage-sealing/input.go index 0361f5c0850..077af05bf72 100644 --- a/extern/storage-sealing/input.go +++ b/extern/storage-sealing/input.go @@ -28,6 +28,8 @@ func (m *Sealing) handleWaitDeals(ctx statemachine.Context, sector SectorInfo) e st := m.sectorTimers[m.minerSectorID(sector.SectorNumber)] if st != nil { if !st.Stop() { // timer expired, SectorStartPacking was/is being sent + m.inputLk.Unlock() + // we send another SectorStartPacking in case one was sent in the handleAddPiece state return ctx.Send(SectorStartPacking{}) } @@ -36,19 +38,21 @@ func (m *Sealing) handleWaitDeals(ctx statemachine.Context, sector SectorInfo) e if !sector.CreationTime.IsZero() { cfg, err := m.getConfig() if err != nil { + m.inputLk.Unlock() return xerrors.Errorf("getting storage config: %w", err) } sealTime := sector.CreationTime.Add(cfg.WaitDealsDelay) if now.After(sealTime) { + m.inputLk.Unlock() return ctx.Send(SectorStartPacking{}) - } else { - m.sectorTimers[m.minerSectorID(sector.SectorNumber)] = time.AfterFunc(sealTime.Sub(now), func() { - if err := ctx.Send(SectorStartPacking{}); err != nil { - log.Errorw("sending SectorStartPacking event failed", "sector", sector.SectorNumber, "error", err) - } - }) } + + m.sectorTimers[m.minerSectorID(sector.SectorNumber)] = time.AfterFunc(sealTime.Sub(now), func() { + if err := ctx.Send(SectorStartPacking{}); err != nil { + log.Errorw("sending SectorStartPacking event failed", "sector", sector.SectorNumber, "error", err) + } + }) } var used abi.UnpaddedPieceSize diff --git a/extern/storage-sealing/types.go b/extern/storage-sealing/types.go index 3f9f55b7484..0949404fb4a 100644 --- a/extern/storage-sealing/types.go +++ b/extern/storage-sealing/types.go @@ -3,9 +3,10 @@ package sealing import ( "bytes" "context" - "github.com/ipfs/go-cid" "time" + "github.com/ipfs/go-cid" + "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/big" "github.com/filecoin-project/go-state-types/exitcode" From fd67a41c75d7e1f9d5b2c7ce68dd0cf207f3a0ec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 20 Jan 2021 14:49:31 +0100 Subject: [PATCH 05/15] storagefsm: Change sector CreationTime to unix 
ts --- extern/storage-sealing/cbor_gen.go | 38 ++++++++++++++++++++++------ extern/storage-sealing/fsm_events.go | 4 +-- extern/storage-sealing/input.go | 4 +-- extern/storage-sealing/sealing.go | 1 + extern/storage-sealing/types.go | 4 +-- 5 files changed, 36 insertions(+), 15 deletions(-) diff --git a/extern/storage-sealing/cbor_gen.go b/extern/storage-sealing/cbor_gen.go index d626e647391..bd06bd1884a 100644 --- a/extern/storage-sealing/cbor_gen.go +++ b/extern/storage-sealing/cbor_gen.go @@ -543,7 +543,7 @@ func (t *SectorInfo) MarshalCBOR(w io.Writer) error { } } - // t.CreationTime (time.Time) (struct) + // t.CreationTime (int64) (int64) if len("CreationTime") > cbg.MaxLength { return xerrors.Errorf("Value in field \"CreationTime\" was too long") } @@ -555,8 +555,14 @@ func (t *SectorInfo) MarshalCBOR(w io.Writer) error { return err } - if err := t.CreationTime.MarshalCBOR(w); err != nil { - return err + if t.CreationTime >= 0 { + if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajUnsignedInt, uint64(t.CreationTime)); err != nil { + return err + } + } else { + if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajNegativeInt, uint64(-t.CreationTime-1)); err != nil { + return err + } } // t.Pieces ([]sealing.Piece) (slice) @@ -1149,15 +1155,31 @@ func (t *SectorInfo) UnmarshalCBOR(r io.Reader) error { t.SectorType = abi.RegisteredSealProof(extraI) } - // t.CreationTime (time.Time) (struct) + // t.CreationTime (int64) (int64) case "CreationTime": - { - - if err := t.CreationTime.UnmarshalCBOR(br); err != nil { - return xerrors.Errorf("unmarshaling t.CreationTime: %w", err) + maj, extra, err := cbg.CborReadHeaderBuf(br, scratch) + var extraI int64 + if err != nil { + return err + } + switch maj { + case cbg.MajUnsignedInt: + extraI = int64(extra) + if extraI < 0 { + return fmt.Errorf("int64 positive overflow") + } + case cbg.MajNegativeInt: + extraI = int64(extra) + if extraI < 0 { + return fmt.Errorf("int64 negative oveflow") + } + extraI = -1 - extraI + default: + return fmt.Errorf("wrong type for int64 field: %d", maj) } + t.CreationTime = int64(extraI) } // t.Pieces ([]sealing.Piece) (slice) case "Pieces": diff --git a/extern/storage-sealing/fsm_events.go b/extern/storage-sealing/fsm_events.go index f9c1553e353..b74154f07e6 100644 --- a/extern/storage-sealing/fsm_events.go +++ b/extern/storage-sealing/fsm_events.go @@ -84,8 +84,8 @@ type SectorAddPiece struct { } func (evt SectorAddPiece) apply(state *SectorInfo) { - if state.CreationTime.IsZero() { - state.CreationTime = time.Now() + if state.CreationTime == 0 { + state.CreationTime = time.Now().Unix() } state.PendingPieces = append(state.PendingPieces, evt.NewPiece) } diff --git a/extern/storage-sealing/input.go b/extern/storage-sealing/input.go index 077af05bf72..241df8ebe08 100644 --- a/extern/storage-sealing/input.go +++ b/extern/storage-sealing/input.go @@ -35,14 +35,14 @@ func (m *Sealing) handleWaitDeals(ctx statemachine.Context, sector SectorInfo) e } } - if !sector.CreationTime.IsZero() { + if sector.CreationTime != 0 { cfg, err := m.getConfig() if err != nil { m.inputLk.Unlock() return xerrors.Errorf("getting storage config: %w", err) } - sealTime := sector.CreationTime.Add(cfg.WaitDealsDelay) + sealTime := time.Unix(sector.CreationTime, 0).Add(cfg.WaitDealsDelay) if now.After(sealTime) { m.inputLk.Unlock() return ctx.Send(SectorStartPacking{}) diff --git a/extern/storage-sealing/sealing.go b/extern/storage-sealing/sealing.go index 6f4ed09762a..141521fcf8b 100644 --- a/extern/storage-sealing/sealing.go +++ 
b/extern/storage-sealing/sealing.go @@ -137,6 +137,7 @@ func New(api SealingAPI, fc FeeConfig, events Events, maddr address.Address, ds pcp: pcp, openSectors: map[abi.SectorID]*openSector{}, + sectorTimers: map[abi.SectorID]*time.Timer{}, pendingPieces: map[cid.Cid]*pendingPiece{}, toUpgrade: map[abi.SectorNumber]struct{}{}, diff --git a/extern/storage-sealing/types.go b/extern/storage-sealing/types.go index 0949404fb4a..71b2e1ad7a3 100644 --- a/extern/storage-sealing/types.go +++ b/extern/storage-sealing/types.go @@ -3,8 +3,6 @@ package sealing import ( "bytes" "context" - "time" - "github.com/ipfs/go-cid" "github.com/filecoin-project/go-state-types/abi" @@ -71,7 +69,7 @@ type SectorInfo struct { SectorType abi.RegisteredSealProof // Packing - CreationTime time.Time + CreationTime int64 // unix seconds Pieces []Piece PendingPieces []cid.Cid From 270f2935a930679aa6de93c80a276307a9238d49 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 20 Jan 2021 15:20:44 +0100 Subject: [PATCH 06/15] storagefsm: Check per-sector deal limits --- extern/storage-sealing/input.go | 22 +++++++++++++++++++++- extern/storage-sealing/sealing.go | 11 ++++++++--- extern/storage-sealing/types.go | 1 + 3 files changed, 30 insertions(+), 4 deletions(-) diff --git a/extern/storage-sealing/input.go b/extern/storage-sealing/input.go index 241df8ebe08..f9325c688bf 100644 --- a/extern/storage-sealing/input.go +++ b/extern/storage-sealing/input.go @@ -35,6 +35,15 @@ func (m *Sealing) handleWaitDeals(ctx statemachine.Context, sector SectorInfo) e } } + maxDeals, err := getDealPerSectorLimit(sector.SectorType) + if err != nil { + return xerrors.Errorf("getting per-sector deal limit: %w", err) + } + + if len(sector.dealIDs()) >= maxDeals { + return ctx.Send(SectorStartPacking{}) + } + if sector.CreationTime != 0 { cfg, err := m.getConfig() if err != nil { @@ -100,7 +109,12 @@ func (m *Sealing) handleAddPiece(ctx statemachine.Context, sector SectorInfo) er offset += p.Piece.Size.Unpadded() } - for _, piece := range sector.PendingPieces { + maxDeals, err := getDealPerSectorLimit(sector.SectorType) + if err != nil { + return xerrors.Errorf("getting per-sector deal limit: %w", err) + } + + for i, piece := range sector.PendingPieces { m.inputLk.Lock() deal, ok := m.pendingPieces[piece] m.inputLk.Unlock() @@ -110,6 +124,12 @@ func (m *Sealing) handleAddPiece(ctx statemachine.Context, sector SectorInfo) er return xerrors.Errorf("piece %s assigned to sector %d not found", piece, sector.SectorNumber) } + if len(sector.dealIDs())+(i+1) > maxDeals { + // shouldn't happen, but just in case + deal.accepted(sector.SectorNumber, offset, xerrors.Errorf("too many deals assigned to sector %d, dropping deal", sector.SectorNumber)) + continue + } + pads, padLength := ffiwrapper.GetRequiredPadding(offset.Padded(), deal.size.Padded()) if offset.Padded()+padLength+deal.size.Padded() > abi.PaddedPieceSize(ssize) { diff --git a/extern/storage-sealing/sealing.go b/extern/storage-sealing/sealing.go index 141521fcf8b..995c6ceec32 100644 --- a/extern/storage-sealing/sealing.go +++ b/extern/storage-sealing/sealing.go @@ -241,9 +241,14 @@ func (m *Sealing) Address() address.Address { return m.maddr } -func getDealPerSectorLimit(size abi.SectorSize) uint64 { +func getDealPerSectorLimit(spt abi.RegisteredSealProof) (int, error) { + size, err := spt.SectorSize() + if err != nil { + return 0, err + } + if size < 64<<30 { - return 256 + return 256, nil } - return 512 + return 512, nil } diff --git a/extern/storage-sealing/types.go 
b/extern/storage-sealing/types.go index 71b2e1ad7a3..d9c54479e84 100644 --- a/extern/storage-sealing/types.go +++ b/extern/storage-sealing/types.go @@ -3,6 +3,7 @@ package sealing import ( "bytes" "context" + "github.com/ipfs/go-cid" "github.com/filecoin-project/go-state-types/abi" From 069766ecc41a66e3f7f746771ea21da6da7ee5f5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 20 Jan 2021 16:00:00 +0100 Subject: [PATCH 07/15] storagefsm: Don't persist piece assignment queue --- extern/storage-sealing/cbor_gen.go | 57 +--------------------------- extern/storage-sealing/fsm_events.go | 6 +-- extern/storage-sealing/input.go | 31 ++++++++++----- extern/storage-sealing/sealing.go | 20 +++++----- extern/storage-sealing/types.go | 5 +-- 5 files changed, 37 insertions(+), 82 deletions(-) diff --git a/extern/storage-sealing/cbor_gen.go b/extern/storage-sealing/cbor_gen.go index bd06bd1884a..235a8e040ac 100644 --- a/extern/storage-sealing/cbor_gen.go +++ b/extern/storage-sealing/cbor_gen.go @@ -8,7 +8,6 @@ import ( abi "github.com/filecoin-project/go-state-types/abi" miner "github.com/filecoin-project/specs-actors/actors/builtin/miner" - cid "github.com/ipfs/go-cid" cbg "github.com/whyrusleeping/cbor-gen" xerrors "golang.org/x/xerrors" ) @@ -476,7 +475,7 @@ func (t *SectorInfo) MarshalCBOR(w io.Writer) error { _, err := w.Write(cbg.CborNull) return err } - if _, err := w.Write([]byte{184, 27}); err != nil { + if _, err := w.Write([]byte{184, 26}); err != nil { return err } @@ -590,31 +589,6 @@ func (t *SectorInfo) MarshalCBOR(w io.Writer) error { } } - // t.PendingPieces ([]cid.Cid) (slice) - if len("PendingPieces") > cbg.MaxLength { - return xerrors.Errorf("Value in field \"PendingPieces\" was too long") - } - - if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("PendingPieces"))); err != nil { - return err - } - if _, err := io.WriteString(w, string("PendingPieces")); err != nil { - return err - } - - if len(t.PendingPieces) > cbg.MaxLength { - return xerrors.Errorf("Slice value in field t.PendingPieces was too long") - } - - if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajArray, uint64(len(t.PendingPieces))); err != nil { - return err - } - for _, v := range t.PendingPieces { - if err := cbg.WriteCidBuf(scratch, w, v); err != nil { - return xerrors.Errorf("failed writing cid field t.PendingPieces: %w", err) - } - } - // t.TicketValue (abi.SealRandomness) (slice) if len("TicketValue") > cbg.MaxLength { return xerrors.Errorf("Value in field \"TicketValue\" was too long") @@ -1211,35 +1185,6 @@ func (t *SectorInfo) UnmarshalCBOR(r io.Reader) error { t.Pieces[i] = v } - // t.PendingPieces ([]cid.Cid) (slice) - case "PendingPieces": - - maj, extra, err = cbg.CborReadHeaderBuf(br, scratch) - if err != nil { - return err - } - - if extra > cbg.MaxLength { - return fmt.Errorf("t.PendingPieces: array too large (%d)", extra) - } - - if maj != cbg.MajArray { - return fmt.Errorf("expected cbor array") - } - - if extra > 0 { - t.PendingPieces = make([]cid.Cid, extra) - } - - for i := 0; i < int(extra); i++ { - - c, err := cbg.ReadCid(br) - if err != nil { - return xerrors.Errorf("reading cid field t.PendingPieces failed: %w", err) - } - t.PendingPieces[i] = c - } - // t.TicketValue (abi.SealRandomness) (slice) case "TicketValue": diff --git a/extern/storage-sealing/fsm_events.go b/extern/storage-sealing/fsm_events.go index b74154f07e6..98cf1830857 100644 --- a/extern/storage-sealing/fsm_events.go +++ b/extern/storage-sealing/fsm_events.go @@ 
-79,15 +79,12 @@ func (evt SectorStartCC) apply(state *SectorInfo) { state.SectorType = evt.SectorType } -type SectorAddPiece struct { - NewPiece cid.Cid -} +type SectorAddPiece struct{} func (evt SectorAddPiece) apply(state *SectorInfo) { if state.CreationTime == 0 { state.CreationTime = time.Now().Unix() } - state.PendingPieces = append(state.PendingPieces, evt.NewPiece) } type SectorPieceAdded struct { @@ -96,7 +93,6 @@ type SectorPieceAdded struct { func (evt SectorPieceAdded) apply(state *SectorInfo) { state.Pieces = append(state.Pieces, evt.NewPieces...) - state.PendingPieces = nil } type SectorStartPacking struct{} diff --git a/extern/storage-sealing/input.go b/extern/storage-sealing/input.go index f9325c688bf..5fa2043b1fa 100644 --- a/extern/storage-sealing/input.go +++ b/extern/storage-sealing/input.go @@ -76,7 +76,10 @@ func (m *Sealing) handleWaitDeals(ctx statemachine.Context, sector SectorInfo) e // todo check deal expiration - return ctx.Send(SectorAddPiece{cid}) + sid := m.minerSectorID(sector.SectorNumber) + m.assignedPieces[sid] = append(m.assignedPieces[sid], cid) + + return ctx.Send(SectorAddPiece{}) }, } @@ -96,11 +99,19 @@ func (m *Sealing) handleAddPiece(ctx statemachine.Context, sector SectorInfo) er return err } + res := SectorPieceAdded{} + m.inputLk.Lock() - delete(m.openSectors, m.minerSectorID(sector.SectorNumber)) // todo: do this when handling the event - m.inputLk.Unlock() - res := SectorPieceAdded{} + pending, ok := m.assignedPieces[m.minerSectorID(sector.SectorNumber)] + if ok { + delete(m.assignedPieces, m.minerSectorID(sector.SectorNumber)) + } + m.inputLk.Unlock() + if !ok { + // nothing to do here + return ctx.Send(res) + } var offset abi.UnpaddedPieceSize pieceSizes := make([]abi.UnpaddedPieceSize, len(sector.Pieces)) @@ -114,13 +125,11 @@ func (m *Sealing) handleAddPiece(ctx statemachine.Context, sector SectorInfo) er return xerrors.Errorf("getting per-sector deal limit: %w", err) } - for i, piece := range sector.PendingPieces { + for i, piece := range pending { m.inputLk.Lock() deal, ok := m.pendingPieces[piece] m.inputLk.Unlock() if !ok { - // todo: this probably means that the miner process was restarted in the middle of adding pieces. 
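
Patch 07's core idea is that the piece-assignment queue no longer needs to survive restarts: PendingPieces leaves the persisted SectorInfo, and an inputLk-guarded assignedPieces map carries assignments until handleAddPiece drains them. A rough sketch of that pattern under simplified, hypothetical types (string keys standing in for piece CIDs):

    package main

    import (
        "fmt"
        "sync"
    )

    type assignQueue struct {
        lk       sync.Mutex
        assigned map[uint64][]string // sector number -> queued piece keys
    }

    // assign queues a piece for a sector, as maybeAccept does above.
    func (q *assignQueue) assign(sector uint64, piece string) {
        q.lk.Lock()
        defer q.lk.Unlock()
        q.assigned[sector] = append(q.assigned[sector], piece)
    }

    // drain removes and returns everything queued for a sector; a false
    // result mirrors the "nothing to do here" case after a restart.
    func (q *assignQueue) drain(sector uint64) ([]string, bool) {
        q.lk.Lock()
        defer q.lk.Unlock()
        pieces, ok := q.assigned[sector]
        delete(q.assigned, sector)
        return pieces, ok
    }

    func main() {
        q := &assignQueue{assigned: map[uint64][]string{}}
        q.assign(4, "piece-a")
        q.assign(4, "piece-b")
        fmt.Println(q.drain(4)) // [piece-a piece-b] true
        fmt.Println(q.drain(4)) // [] false
    }
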
- // Truncate whatever was in process of being added to the sector (keep sector.Pieces as those are cleanly added, then go to WaitDeals) return xerrors.Errorf("piece %s assigned to sector %d not found", piece, sector.SectorNumber) } @@ -145,7 +154,9 @@ func (m *Sealing) handleAddPiece(ctx statemachine.Context, sector SectorInfo) er p.Unpadded(), NewNullReader(p.Unpadded())) if err != nil { - return xerrors.Errorf("writing padding piece: %w", err) // todo failed state + err = xerrors.Errorf("writing padding piece: %w", err) + deal.accepted(sector.SectorNumber, offset, err) + return err // todo failed state } pieceSizes = append(pieceSizes, p.Unpadded()) @@ -160,7 +171,9 @@ func (m *Sealing) handleAddPiece(ctx statemachine.Context, sector SectorInfo) er deal.size, deal.data) if err != nil { - return xerrors.Errorf("writing padding piece: %w", err) // todo failed state + err = xerrors.Errorf("writing piece: %w", err) + deal.accepted(sector.SectorNumber, offset, err) + return err // todo failed state } deal.accepted(sector.SectorNumber, offset, nil) diff --git a/extern/storage-sealing/sealing.go b/extern/storage-sealing/sealing.go index 995c6ceec32..aa1a73e6c87 100644 --- a/extern/storage-sealing/sealing.go +++ b/extern/storage-sealing/sealing.go @@ -84,10 +84,11 @@ type Sealing struct { verif ffiwrapper.Verifier pcp PreCommitPolicy - inputLk sync.Mutex - openSectors map[abi.SectorID]*openSector - sectorTimers map[abi.SectorID]*time.Timer - pendingPieces map[cid.Cid]*pendingPiece + inputLk sync.Mutex + openSectors map[abi.SectorID]*openSector + sectorTimers map[abi.SectorID]*time.Timer + pendingPieces map[cid.Cid]*pendingPiece + assignedPieces map[abi.SectorID][]cid.Cid upgradeLk sync.Mutex toUpgrade map[abi.SectorNumber]struct{} @@ -111,7 +112,7 @@ type FeeConfig struct { type openSector struct { used abi.UnpaddedPieceSize // change to bitfield/rle when AddPiece gains offset support to better fill sectors - maybeAccept func(cid.Cid) error + maybeAccept func(cid.Cid) error // called with inputLk } type pendingPiece struct { @@ -136,10 +137,11 @@ func New(api SealingAPI, fc FeeConfig, events Events, maddr address.Address, ds verif: verif, pcp: pcp, - openSectors: map[abi.SectorID]*openSector{}, - sectorTimers: map[abi.SectorID]*time.Timer{}, - pendingPieces: map[cid.Cid]*pendingPiece{}, - toUpgrade: map[abi.SectorNumber]struct{}{}, + openSectors: map[abi.SectorID]*openSector{}, + sectorTimers: map[abi.SectorID]*time.Timer{}, + pendingPieces: map[cid.Cid]*pendingPiece{}, + assignedPieces: map[abi.SectorID][]cid.Cid{}, + toUpgrade: map[abi.SectorNumber]struct{}{}, notifee: notifee, addrSel: as, diff --git a/extern/storage-sealing/types.go b/extern/storage-sealing/types.go index d9c54479e84..b6cc2afdb19 100644 --- a/extern/storage-sealing/types.go +++ b/extern/storage-sealing/types.go @@ -70,9 +70,8 @@ type SectorInfo struct { SectorType abi.RegisteredSealProof // Packing - CreationTime int64 // unix seconds - Pieces []Piece - PendingPieces []cid.Cid + CreationTime int64 // unix seconds + Pieces []Piece // PreCommit1 TicketValue abi.SealRandomness From f96f12c8366690dee71cabe9e8b3d66bad0daec9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 20 Jan 2021 18:18:12 +0100 Subject: [PATCH 08/15] storagefsm: Add rest of checks in WaitDeals --- extern/storage-sealing/input.go | 40 +++++++++++++++++++------------ extern/storage-sealing/sealing.go | 7 +----- 2 files changed, 26 insertions(+), 21 deletions(-) diff --git a/extern/storage-sealing/input.go 
b/extern/storage-sealing/input.go
index 5fa2043b1fa..89cd27176f7 100644
--- a/extern/storage-sealing/input.go
+++ b/extern/storage-sealing/input.go
@@ -19,9 +19,6 @@ import (
 )

 func (m *Sealing) handleWaitDeals(ctx statemachine.Context, sector SectorInfo) error {
-	// if full / oldish / has oldish deals goto seal
-	// ^ also per sector deal limit
-
 	m.inputLk.Lock()

 	now := time.Now()
@@ -35,12 +32,28 @@ func (m *Sealing) handleWaitDeals(ctx statemachine.Context, sector SectorInfo) e
 		}
 	}

-	maxDeals, err := getDealPerSectorLimit(sector.SectorType)
+	ssize, err := sector.SectorType.SectorSize()
+	if err != nil {
+		return xerrors.Errorf("getting sector size: %w", err)
+	}
+
+	maxDeals, err := getDealPerSectorLimit(ssize)
 	if err != nil {
 		return xerrors.Errorf("getting per-sector deal limit: %w", err)
 	}

 	if len(sector.dealIDs()) >= maxDeals {
+		// can't accept more deals
+		return ctx.Send(SectorStartPacking{})
+	}
+
+	var used abi.UnpaddedPieceSize
+	for _, piece := range sector.Pieces {
+		used += piece.Piece.Size.Unpadded()
+	}
+
+	if used.Padded() == abi.PaddedPieceSize(ssize) {
+		// sector full
 		return ctx.Send(SectorStartPacking{})
 	}
@@ -51,7 +64,9 @@ func (m *Sealing) handleWaitDeals(ctx statemachine.Context, sector SectorInfo) e
 			return xerrors.Errorf("getting storage config: %w", err)
 		}

+		// todo check deal age, start sealing if any deal has less than X (configurable) to start deadline
 		sealTime := time.Unix(sector.CreationTime, 0).Add(cfg.WaitDealsDelay)
+
 		if now.After(sealTime) {
 			m.inputLk.Unlock()
 			return ctx.Send(SectorStartPacking{})
@@ -64,17 +79,10 @@ func (m *Sealing) handleWaitDeals(ctx statemachine.Context, sector SectorInfo) e
 		})
 	}

-	var used abi.UnpaddedPieceSize
-	for _, piece := range sector.Pieces {
-		used += piece.Piece.Size.Unpadded()
-	}
-
 	m.openSectors[m.minerSectorID(sector.SectorNumber)] = &openSector{
 		used: used,
 		maybeAccept: func(cid cid.Cid) error {
-			// todo double check space
-
-			// todo check deal expiration
+			// todo check deal start deadline (configurable)

 			sid := m.minerSectorID(sector.SectorNumber)
 			m.assignedPieces[sid] = append(m.assignedPieces[sid], cid)
@@ -120,7 +128,7 @@ func (m *Sealing) handleAddPiece(ctx statemachine.Context, sector SectorInfo) er
 		offset += p.Piece.Size.Unpadded()
 	}

-	maxDeals, err := getDealPerSectorLimit(sector.SectorType)
+	maxDeals, err := getDealPerSectorLimit(ssize)
 	if err != nil {
 		return xerrors.Errorf("getting per-sector deal limit: %w", err)
 	}
@@ -134,7 +142,7 @@ func (m *Sealing) handleAddPiece(ctx statemachine.Context, sector SectorInfo) er
 		}

 		if len(sector.dealIDs())+(i+1) > maxDeals {
-			// shouldn't happen, but just in case
+			// todo: this is rather unlikely to happen, but in case it does, return the deal to waiting queue instead of failing it
 			deal.accepted(sector.SectorNumber, offset, xerrors.Errorf("too many deals assigned to sector %d, dropping deal", sector.SectorNumber))
 			continue
 		}
@@ -142,7 +150,9 @@
 		pads, padLength := ffiwrapper.GetRequiredPadding(offset.Padded(), deal.size.Padded())

 		if offset.Padded()+padLength+deal.size.Padded() > abi.PaddedPieceSize(ssize) {
-			return xerrors.Errorf("piece assigned to a sector with not enough space")
+			// todo: this is rather unlikely to happen, but in case it does, return the deal to waiting queue instead of failing it
+			deal.accepted(sector.SectorNumber, offset, xerrors.Errorf("piece %s assigned to sector %d with not enough space", piece, sector.SectorNumber))
+			continue
 		}

 		offset += padLength.Unpadded()
diff --git
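
With patch 08, WaitDeals has three distinct reasons to stop waiting and start packing: the per-sector deal cap is reached, the sector is completely filled, or WaitDealsDelay has elapsed since CreationTime. A simplified sketch of that decision (the real handler works with padded/unpadded sizes from the sealing config and sends SectorStartPacking; names here are stand-ins):

    package main

    import (
        "fmt"
        "time"
    )

    func shouldStartPacking(dealCount, maxDeals int, used, sectorSize uint64,
        created time.Time, waitDelay time.Duration) (bool, string) {
        if dealCount >= maxDeals {
            return true, "maxdeals" // can't accept more deals
        }
        if used == sectorSize {
            return true, "filled" // sector full
        }
        if !created.IsZero() && time.Now().After(created.Add(waitDelay)) {
            return true, "wait-timeout" // waited long enough for more deals
        }
        return false, "" // keep waiting; a timer fires at created+waitDelay
    }

    func main() {
        created := time.Now().Add(-7 * time.Hour)
        start, trigger := shouldStartPacking(3, 256, 16<<30, 32<<30, created, 6*time.Hour)
        fmt.Println(start, trigger) // true wait-timeout
    }
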
a/extern/storage-sealing/sealing.go b/extern/storage-sealing/sealing.go index aa1a73e6c87..6b8bb04d7a6 100644 --- a/extern/storage-sealing/sealing.go +++ b/extern/storage-sealing/sealing.go @@ -243,12 +243,7 @@ func (m *Sealing) Address() address.Address { return m.maddr } -func getDealPerSectorLimit(spt abi.RegisteredSealProof) (int, error) { - size, err := spt.SectorSize() - if err != nil { - return 0, err - } - +func getDealPerSectorLimit(size abi.SectorSize) (int, error) { if size < 64<<30 { return 256, nil } From b9a9f23204fc25b9ae033ee55a7d8e3815249a56 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 20 Jan 2021 18:42:22 +0100 Subject: [PATCH 09/15] storagefsm: Add stub AddPieceFailed state --- cmd/lotus-storage-miner/info.go | 1 + extern/storage-sealing/fsm.go | 2 ++ extern/storage-sealing/fsm_events.go | 5 +++++ extern/storage-sealing/input.go | 12 +++++++++--- extern/storage-sealing/sector_state.go | 2 ++ 5 files changed, 19 insertions(+), 3 deletions(-) diff --git a/cmd/lotus-storage-miner/info.go b/cmd/lotus-storage-miner/info.go index edefacf4db1..f14d307ab09 100644 --- a/cmd/lotus-storage-miner/info.go +++ b/cmd/lotus-storage-miner/info.go @@ -307,6 +307,7 @@ var stateList = []stateMeta{ {col: color.FgCyan, state: sealing.Removed}, {col: color.FgRed, state: sealing.FailedUnrecoverable}, + {col: color.FgRed, state: sealing.AddPieceFailed}, {col: color.FgRed, state: sealing.SealPreCommit1Failed}, {col: color.FgRed, state: sealing.SealPreCommit2Failed}, {col: color.FgRed, state: sealing.PreCommitFailed}, diff --git a/extern/storage-sealing/fsm.go b/extern/storage-sealing/fsm.go index a1739a1cca3..c38101e6cf3 100644 --- a/extern/storage-sealing/fsm.go +++ b/extern/storage-sealing/fsm.go @@ -51,6 +51,7 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto AddPiece: planOne( on(SectorPieceAdded{}, WaitDeals), apply(SectorStartPacking{}), + on(SectorAddPieceFailed{}, AddPieceFailed), ), Packing: planOne(on(SectorPacked{}, GetTicket)), GetTicket: planOne( @@ -104,6 +105,7 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto // Sealing errors + AddPieceFailed: planOne(), SealPreCommit1Failed: planOne( on(SectorRetrySealPreCommit1{}, PreCommit1), ), diff --git a/extern/storage-sealing/fsm_events.go b/extern/storage-sealing/fsm_events.go index 98cf1830857..14015c2d83a 100644 --- a/extern/storage-sealing/fsm_events.go +++ b/extern/storage-sealing/fsm_events.go @@ -95,6 +95,11 @@ func (evt SectorPieceAdded) apply(state *SectorInfo) { state.Pieces = append(state.Pieces, evt.NewPieces...) 
} +type SectorAddPieceFailed struct{ error } + +func (evt SectorAddPieceFailed) FormatError(xerrors.Printer) (next error) { return evt.error } +func (evt SectorAddPieceFailed) apply(si *SectorInfo) {} + type SectorStartPacking struct{} func (evt SectorStartPacking) apply(*SectorInfo) {} diff --git a/extern/storage-sealing/input.go b/extern/storage-sealing/input.go index 89cd27176f7..51e4356fa0e 100644 --- a/extern/storage-sealing/input.go +++ b/extern/storage-sealing/input.go @@ -117,7 +117,7 @@ func (m *Sealing) handleAddPiece(ctx statemachine.Context, sector SectorInfo) er } m.inputLk.Unlock() if !ok { - // nothing to do here + // nothing to do here (might happen after a restart in AddPiece) return ctx.Send(res) } @@ -166,7 +166,7 @@ func (m *Sealing) handleAddPiece(ctx statemachine.Context, sector SectorInfo) er if err != nil { err = xerrors.Errorf("writing padding piece: %w", err) deal.accepted(sector.SectorNumber, offset, err) - return err // todo failed state + return ctx.Send(SectorAddPieceFailed{err}) } pieceSizes = append(pieceSizes, p.Unpadded()) @@ -183,7 +183,7 @@ func (m *Sealing) handleAddPiece(ctx statemachine.Context, sector SectorInfo) er if err != nil { err = xerrors.Errorf("writing piece: %w", err) deal.accepted(sector.SectorNumber, offset, err) - return err // todo failed state + return ctx.Send(SectorAddPieceFailed{err}) } deal.accepted(sector.SectorNumber, offset, nil) @@ -200,6 +200,12 @@ func (m *Sealing) handleAddPiece(ctx statemachine.Context, sector SectorInfo) er return ctx.Send(res) } +func (m *Sealing) handleAddPieceFailed(ctx statemachine.Context, sector SectorInfo) error { + log.Errorf("No recovery plan for AddPiece failing") + // todo: cleanup sector / just go retry (requires adding offset param to AddPiece in sector-storage for this to be safe) + return nil +} + func (m *Sealing) AddPieceToAnySector(ctx context.Context, size abi.UnpaddedPieceSize, data storage.Data, deal DealInfo) (abi.SectorNumber, abi.PaddedPieceSize, error) { log.Infof("Adding piece for deal %d (publish msg: %s)", deal.DealID, deal.PublishCid) if (padreader.PaddedSize(uint64(size))) != size { diff --git a/extern/storage-sealing/sector_state.go b/extern/storage-sealing/sector_state.go index da3db401bf4..ae5b4fab248 100644 --- a/extern/storage-sealing/sector_state.go +++ b/extern/storage-sealing/sector_state.go @@ -7,6 +7,7 @@ var ExistSectorStateList = map[SectorState]struct{}{ WaitDeals: {}, Packing: {}, AddPiece: {}, + AddPieceFailed: {}, GetTicket: {}, PreCommit1: {}, PreCommit2: {}, @@ -61,6 +62,7 @@ const ( Proving SectorState = "Proving" // error modes FailedUnrecoverable SectorState = "FailedUnrecoverable" + AddPieceFailed SectorState = "AddPieceFailed" SealPreCommit1Failed SectorState = "SealPreCommit1Failed" SealPreCommit2Failed SectorState = "SealPreCommit2Failed" PreCommitFailed SectorState = "PreCommitFailed" From df14f156e1ef69ce50a1e12b51817a6d3ea1d1b7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 20 Jan 2021 22:44:18 +0100 Subject: [PATCH 10/15] storagefsm: More logging for deal test debugging --- extern/storage-sealing/input.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/extern/storage-sealing/input.go b/extern/storage-sealing/input.go index 51e4356fa0e..b1a8f1b74b4 100644 --- a/extern/storage-sealing/input.go +++ b/extern/storage-sealing/input.go @@ -28,6 +28,7 @@ func (m *Sealing) handleWaitDeals(ctx statemachine.Context, sector SectorInfo) e m.inputLk.Unlock() // we send another SectorStartPacking in case one was sent in 
the handleAddPiece state + log.Infow("starting to seal deal sector", "sector", sector.SectorNumber, "trigger", "wait-timeout") return ctx.Send(SectorStartPacking{}) } } @@ -44,6 +45,7 @@ func (m *Sealing) handleWaitDeals(ctx statemachine.Context, sector SectorInfo) e if len(sector.dealIDs()) >= maxDeals { // can't accept more deals + log.Infow("starting to seal deal sector", "sector", sector.SectorNumber, "trigger", "maxdeals") return ctx.Send(SectorStartPacking{}) } @@ -54,6 +56,7 @@ func (m *Sealing) handleWaitDeals(ctx statemachine.Context, sector SectorInfo) e if used.Padded() == abi.PaddedPieceSize(ssize) { // sector full + log.Infow("starting to seal deal sector", "sector", sector.SectorNumber, "trigger", "filled") return ctx.Send(SectorStartPacking{}) } @@ -69,10 +72,13 @@ func (m *Sealing) handleWaitDeals(ctx statemachine.Context, sector SectorInfo) e if now.After(sealTime) { m.inputLk.Unlock() + log.Infow("starting to seal deal sector", "sector", sector.SectorNumber, "trigger", "wait-timeout") return ctx.Send(SectorStartPacking{}) } m.sectorTimers[m.minerSectorID(sector.SectorNumber)] = time.AfterFunc(sealTime.Sub(now), func() { + log.Infow("starting to seal deal sector", "sector", sector.SectorNumber, "trigger", "wait-timer") + if err := ctx.Send(SectorStartPacking{}); err != nil { log.Errorw("sending SectorStartPacking event failed", "sector", sector.SectorNumber, "error", err) } @@ -175,6 +181,8 @@ func (m *Sealing) handleAddPiece(ctx statemachine.Context, sector SectorInfo) er }) } + time.Sleep(1 * time.Second) // TODO: deal tests are unhappy without this + ppi, err := m.sealer.AddPiece(sectorstorage.WithPriority(ctx.Context(), DealSectorPriority), m.minerSector(sector.SectorType, sector.SectorNumber), pieceSizes, @@ -186,6 +194,8 @@ func (m *Sealing) handleAddPiece(ctx statemachine.Context, sector SectorInfo) er return ctx.Send(SectorAddPieceFailed{err}) } + log.Infow("deal added to a sector", "deal", deal.deal.DealID, "sector", sector.SectorNumber, "piece", ppi.PieceCID) + deal.accepted(sector.SectorNumber, offset, nil) offset += deal.size From 1336d8855db86f142ac8e6eaa1b8c8c38218e4d6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 21 Jan 2021 14:13:36 +0100 Subject: [PATCH 11/15] storagefsm: Drop addpiece wait after fixing storageadapter --- extern/storage-sealing/input.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/extern/storage-sealing/input.go b/extern/storage-sealing/input.go index b1a8f1b74b4..280a8698e4b 100644 --- a/extern/storage-sealing/input.go +++ b/extern/storage-sealing/input.go @@ -181,8 +181,6 @@ func (m *Sealing) handleAddPiece(ctx statemachine.Context, sector SectorInfo) er }) } - time.Sleep(1 * time.Second) // TODO: deal tests are unhappy without this - ppi, err := m.sealer.AddPiece(sectorstorage.WithPriority(ctx.Context(), DealSectorPriority), m.minerSector(sector.SectorType, sector.SectorNumber), pieceSizes, From ec4deb7e28c2f92111a55222e4ce3279700e774b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 21 Jan 2021 19:59:18 +0100 Subject: [PATCH 12/15] storagefsm: Fix unlocking in handleWaitDeals --- extern/storage-sealing/input.go | 77 ++++++++++++++++++--------------- 1 file changed, 43 insertions(+), 34 deletions(-) diff --git a/extern/storage-sealing/input.go b/extern/storage-sealing/input.go index 280a8698e4b..e005c326393 100644 --- a/extern/storage-sealing/input.go +++ b/extern/storage-sealing/input.go @@ -19,52 +19,80 @@ import ( ) func (m *Sealing) handleWaitDeals(ctx 
statemachine.Context, sector SectorInfo) error {
+	var used abi.UnpaddedPieceSize
+	for _, piece := range sector.Pieces {
+		used += piece.Piece.Size.Unpadded()
+	}
+
 	m.inputLk.Lock()

+	started, err := m.maybeStartSealing(ctx, sector, used)
+	if err != nil || started {
+		m.inputLk.Unlock()
+
+		return err
+	}
+
+	m.openSectors[m.minerSectorID(sector.SectorNumber)] = &openSector{
+		used: used,
+		maybeAccept: func(cid cid.Cid) error {
+			// todo check deal start deadline (configurable)
+
+			sid := m.minerSectorID(sector.SectorNumber)
+			m.assignedPieces[sid] = append(m.assignedPieces[sid], cid)
+
+			return ctx.Send(SectorAddPiece{})
+		},
+	}
+
+	go func() {
+		defer m.inputLk.Unlock()
+		if err := m.updateInput(ctx.Context(), sector.SectorType); err != nil {
+			log.Errorf("%+v", err)
+		}
+	}()
+
+	return nil
+}
+
+func (m *Sealing) maybeStartSealing(ctx statemachine.Context, sector SectorInfo, used abi.UnpaddedPieceSize) (bool, error) {
 	now := time.Now()

 	st := m.sectorTimers[m.minerSectorID(sector.SectorNumber)]
 	if st != nil {
 		if !st.Stop() { // timer expired, SectorStartPacking was/is being sent
-			m.inputLk.Unlock()
-
 			// we send another SectorStartPacking in case one was sent in the handleAddPiece state
 			log.Infow("starting to seal deal sector", "sector", sector.SectorNumber, "trigger", "wait-timeout")
-			return ctx.Send(SectorStartPacking{})
+			return true, ctx.Send(SectorStartPacking{})
 		}
 	}

 	ssize, err := sector.SectorType.SectorSize()
 	if err != nil {
-		return xerrors.Errorf("getting sector size: %w", err)
+		return false, xerrors.Errorf("getting sector size: %w", err)
 	}

 	maxDeals, err := getDealPerSectorLimit(ssize)
 	if err != nil {
-		return xerrors.Errorf("getting per-sector deal limit: %w", err)
+		return false, xerrors.Errorf("getting per-sector deal limit: %w", err)
 	}

 	if len(sector.dealIDs()) >= maxDeals {
 		// can't accept more deals
 		log.Infow("starting to seal deal sector", "sector", sector.SectorNumber, "trigger", "maxdeals")
-		return ctx.Send(SectorStartPacking{})
-	}
-
-	var used abi.UnpaddedPieceSize
-	for _, piece := range sector.Pieces {
-		used += piece.Piece.Size.Unpadded()
+		return true, ctx.Send(SectorStartPacking{})
 	}

 	if used.Padded() == abi.PaddedPieceSize(ssize) {
 		// sector full
 		log.Infow("starting to seal deal sector", "sector", sector.SectorNumber, "trigger", "filled")
-		return ctx.Send(SectorStartPacking{})
+		return true, ctx.Send(SectorStartPacking{})
 	}

 	if sector.CreationTime != 0 {
 		cfg, err := m.getConfig()
 		if err != nil {
 			m.inputLk.Unlock()
-			return xerrors.Errorf("getting storage config: %w", err)
+			return false, xerrors.Errorf("getting storage config: %w", err)
 		}

 		// todo check deal age, start sealing if any deal has less than X (configurable) to start deadline
@@ -73,7 +101,7 @@
 		sealTime := time.Unix(sector.CreationTime, 0).Add(cfg.WaitDealsDelay)

 		if now.After(sealTime) {
 			m.inputLk.Unlock()
 			log.Infow("starting to seal deal sector", "sector", sector.SectorNumber, "trigger", "wait-timeout")
-			return ctx.Send(SectorStartPacking{})
+			return true, ctx.Send(SectorStartPacking{})
 		}

 		m.sectorTimers[m.minerSectorID(sector.SectorNumber)] = time.AfterFunc(sealTime.Sub(now), func() {
@@ -85,26 +113,7 @@
 		})
 	}

-	m.openSectors[m.minerSectorID(sector.SectorNumber)] = &openSector{
-		used: used,
-		maybeAccept: func(cid cid.Cid) error {
-			// todo check deal start deadline (configurable)
-
-			sid := m.minerSectorID(sector.SectorNumber)
-			m.assignedPieces[sid] = append(m.assignedPieces[sid], cid)
-
-			return
ctx.Send(SectorAddPiece{}) - }, - } - - go func() { - defer m.inputLk.Unlock() - if err := m.updateInput(ctx.Context(), sector.SectorType); err != nil { - log.Errorf("%+v", err) - } - }() - - return nil + return false, nil } func (m *Sealing) handleAddPiece(ctx statemachine.Context, sector SectorInfo) error { From 1070ad2289fc82a8c885ee90e5f536d5672b6b58 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 21 Jan 2021 22:20:16 +0100 Subject: [PATCH 13/15] storagefsm: Drop unused TargetWaitDealsSectors --- extern/storage-sealing/sealiface/config.go | 2 -- node/config/def.go | 4 ---- node/modules/storageminer.go | 2 -- 3 files changed, 8 deletions(-) diff --git a/extern/storage-sealing/sealiface/config.go b/extern/storage-sealing/sealiface/config.go index 4e0f51202de..945565562bd 100644 --- a/extern/storage-sealing/sealiface/config.go +++ b/extern/storage-sealing/sealiface/config.go @@ -15,6 +15,4 @@ type Config struct { MaxSealingSectorsForDeals uint64 WaitDealsDelay time.Duration - - TargetWaitDealsSectors uint64 } diff --git a/node/config/def.go b/node/config/def.go index 716e5060257..6a494fd962f 100644 --- a/node/config/def.go +++ b/node/config/def.go @@ -68,9 +68,6 @@ type SealingConfig struct { // Keep this many sectors in sealing pipeline, start CC if needed // todo TargetSealingSectors uint64 - // Try to keep this many sectors waiting for deals - TargetWaitDealsSectors uint64 - // todo TargetSectors - stop auto-pleding new sectors after this many sectors are sealed, default CC upgrade for deals sectors if above } @@ -191,7 +188,6 @@ func DefaultStorageMiner() *StorageMiner { MaxSealingSectors: 0, MaxSealingSectorsForDeals: 0, WaitDealsDelay: Duration(time.Hour * 6), - TargetWaitDealsSectors: 2, }, Storage: sectorstorage.SealerConfig{ diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index 64f62d6ae10..30f84aeaff8 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -805,7 +805,6 @@ func NewSetSealConfigFunc(r repo.LockedRepo) (dtypes.SetSealingConfigFunc, error MaxSealingSectors: cfg.MaxSealingSectors, MaxSealingSectorsForDeals: cfg.MaxSealingSectorsForDeals, WaitDealsDelay: config.Duration(cfg.WaitDealsDelay), - TargetWaitDealsSectors: cfg.TargetWaitDealsSectors, } }) return @@ -820,7 +819,6 @@ func NewGetSealConfigFunc(r repo.LockedRepo) (dtypes.GetSealingConfigFunc, error MaxSealingSectors: cfg.Sealing.MaxSealingSectors, MaxSealingSectorsForDeals: cfg.Sealing.MaxSealingSectorsForDeals, WaitDealsDelay: time.Duration(cfg.Sealing.WaitDealsDelay), - TargetWaitDealsSectors: cfg.Sealing.TargetWaitDealsSectors, } }) return From e27a530cbc52d9d470dae50d46177efb81657055 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 9 Feb 2021 18:44:41 +0100 Subject: [PATCH 14/15] storagefsm: cleanup openSectors better; pendingPieces by pieceCid --- extern/storage-sealing/input.go | 28 +++++++++++++++++------- extern/storage-sealing/states_sealing.go | 18 +++++++++++++++ 2 files changed, 38 insertions(+), 8 deletions(-) diff --git a/extern/storage-sealing/input.go b/extern/storage-sealing/input.go index e005c326393..77c36f15cab 100644 --- a/extern/storage-sealing/input.go +++ b/extern/storage-sealing/input.go @@ -28,6 +28,8 @@ func (m *Sealing) handleWaitDeals(ctx statemachine.Context, sector SectorInfo) e started, err := m.maybeStartSealing(ctx, sector, used) if err != nil || started { + delete(m.openSectors, m.minerSectorID(sector.SectorNumber)) + m.inputLk.Unlock() return err @@ -243,14 +245,14 @@ func 
(m *Sealing) AddPieceToAnySector(ctx context.Context, size abi.UnpaddedPiec
 		return 0, 0, xerrors.Errorf("piece cannot fit into a sector")
 	}

-	if deal.PublishCid == nil {
-		return 0, 0, xerrors.Errorf("piece must have a PublishCID")
+	if _, err := deal.DealProposal.Cid(); err != nil {
+		return 0, 0, xerrors.Errorf("getting proposal CID: %w", err)
 	}

 	m.inputLk.Lock()
-	if _, exist := m.pendingPieces[*deal.PublishCid]; exist {
+	if _, exist := m.pendingPieces[proposalCID(deal)]; exist {
 		m.inputLk.Unlock()
-		return 0, 0, xerrors.Errorf("piece for deal %s already pending", *deal.PublishCid)
+		return 0, 0, xerrors.Errorf("piece for deal %s already pending", proposalCID(deal))
 	}

 	resCh := make(chan struct {
 		sn     abi.SectorNumber
 		offset abi.UnpaddedPieceSize
 		err    error
 	}, 1)

-	m.pendingPieces[*deal.PublishCid] = &pendingPiece{
+	m.pendingPieces[proposalCID(deal)] = &pendingPiece{
 		size: size,
 		deal: deal,
 		data: data,
@@ -305,12 +307,12 @@ func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) e
 	// todo: this is distinctly O(n^2), may need to be optimized for tiny deals and large scale miners
 	// (unlikely to be a problem now)

-	for pieceCid, piece := range m.pendingPieces {
+	for proposalCid, piece := range m.pendingPieces {
 		if piece.assigned {
 			continue // already assigned to a sector, skip
 		}

-		toAssign[pieceCid] = struct{}{}
+		toAssign[proposalCid] = struct{}{}

 		for id, sector := range m.openSectors {
 			avail := abi.PaddedPieceSize(ssize).Unpadded() - sector.used

 			if piece.size <= avail { // (note: if we have enough space for the piece, we also have enough space for inter-piece padding)
 				matches = append(matches, match{
 					sector: id,
-					deal:   pieceCid,
+					deal:   proposalCid,

 					size:    piece.size,
 					padding: avail % piece.size,
@@ -410,3 +412,13 @@ func (m *Sealing) tryCreateDealSector(ctx context.Context, sp abi.RegisteredSeal
 func (m *Sealing) StartPacking(sid abi.SectorNumber) error {
 	return m.sectors.Send(uint64(sid), SectorStartPacking{})
 }
+
+func proposalCID(deal DealInfo) cid.Cid {
+	pc, err := deal.DealProposal.Cid()
+	if err != nil {
+		log.Errorf("DealProposal.Cid error: %+v", err)
+		return cid.Undef
+	}
+
+	return pc
+}
diff --git a/extern/storage-sealing/states_sealing.go b/extern/storage-sealing/states_sealing.go
index 1aedcdfb469..8c0918e4d3e 100644
--- a/extern/storage-sealing/states_sealing.go
+++ b/extern/storage-sealing/states_sealing.go
@@ -25,6 +25,24 @@ var DealSectorPriority = 1024
 var MaxTicketAge = abi.ChainEpoch(builtin0.EpochsInDay * 2)

 func (m *Sealing) handlePacking(ctx statemachine.Context, sector SectorInfo) error {
+	m.inputLk.Lock()
+	// make sure we're not accepting deals into this sector
+	for _, c := range m.assignedPieces[m.minerSectorID(sector.SectorNumber)] {
+		pp := m.pendingPieces[c]
+		delete(m.pendingPieces, c)
+		if pp == nil {
+			log.Errorf("nil assigned pending piece %s", c)
+			continue
+		}
+
+		// todo: return to the sealing queue (this is extremely unlikely to happen)
+		pp.accepted(sector.SectorNumber, 0, xerrors.Errorf("sector entered packing state early"))
+	}
+
+	delete(m.openSectors, m.minerSectorID(sector.SectorNumber))
+	delete(m.assignedPieces, m.minerSectorID(sector.SectorNumber))
+	m.inputLk.Unlock()
+
 	log.Infow("performing filling up rest of the sector...", "sector", sector.SectorNumber)

 	var allocated abi.UnpaddedPieceSize
From 6907e5879d0573ab1e9aa550610be038410bc610 Mon Sep 17 00:00:00 2001
From:
=?UTF-8?q?=C5=81ukasz=20Magiera?=
Date: Thu, 11 Feb 2021 13:52:00 +0100
Subject: [PATCH 15/15] Fix WaitDeals sector accounting

---
 extern/storage-sealing/input.go        | 4 ++--
 extern/storage-sealing/sector_state.go | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/extern/storage-sealing/input.go b/extern/storage-sealing/input.go
index 77c36f15cab..ae1d6f0ddc4 100644
--- a/extern/storage-sealing/input.go
+++ b/extern/storage-sealing/input.go
@@ -382,11 +382,11 @@ func (m *Sealing) tryCreateDealSector(ctx context.Context, sp abi.RegisteredSeal
 		return xerrors.Errorf("getting storage config: %w", err)
 	}

-	if cfg.MaxSealingSectorsForDeals > 0 && m.stats.curSealing() > cfg.MaxSealingSectorsForDeals {
+	if cfg.MaxSealingSectorsForDeals > 0 && m.stats.curSealing() >= cfg.MaxSealingSectorsForDeals {
 		return nil
 	}

-	if cfg.MaxWaitDealsSectors > 0 && m.stats.curStaging() > cfg.MaxWaitDealsSectors {
+	if cfg.MaxWaitDealsSectors > 0 && m.stats.curStaging() >= cfg.MaxWaitDealsSectors {
 		return nil
 	}

diff --git a/extern/storage-sealing/sector_state.go b/extern/storage-sealing/sector_state.go
index ae5b4fab248..b636614d1e8 100644
--- a/extern/storage-sealing/sector_state.go
+++ b/extern/storage-sealing/sector_state.go
@@ -89,7 +89,7 @@ const (
 func toStatState(st SectorState) statSectorState {
 	switch st {
-	case Empty, WaitDeals, AddPiece:
+	case UndefinedSectorState, Empty, WaitDeals, AddPiece:
 		return sstStaging
 	case Packing, GetTicket, PreCommit1, PreCommit2, PreCommitting, PreCommitWait, WaitSeed, Committing, SubmitCommit, CommitWait, FinalizeSector:
 		return sstSealing
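
Patch 15's guard change is an off-by-one fix: with a cap of N configured, sector creation has to stop once N sectors are already in flight, so the comparison must be >= rather than >. A toy version of the tryCreateDealSector guard (stand-in names for the stats counters):

    package main

    import "fmt"

    func canCreateDealSector(curStaging, maxWaitDeals uint64) bool {
        if maxWaitDeals > 0 && curStaging >= maxWaitDeals {
            return false // at the cap; with > this would still allow one extra sector
        }
        return true
    }

    func main() {
        fmt.Println(canCreateDealSector(1, 2)) // true: below the cap
        fmt.Println(canCreateDealSector(2, 2)) // false: was true with the old > check
    }
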