From fc5e243c921f87270c657fe4334bbc095062017d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 16 Feb 2021 17:14:59 +0100 Subject: [PATCH 1/5] storagefsm: Cleanup CC sector creation --- extern/storage-sealing/fsm_events.go | 2 - extern/storage-sealing/garbage.go | 93 ++++++------------------ extern/storage-sealing/sealing.go | 15 ---- extern/storage-sealing/states_sealing.go | 24 +++++- node/impl/storminer.go | 2 +- storage/sealing.go | 4 +- 6 files changed, 47 insertions(+), 93 deletions(-) diff --git a/extern/storage-sealing/fsm_events.go b/extern/storage-sealing/fsm_events.go index 14015c2d83a..8d11b248b35 100644 --- a/extern/storage-sealing/fsm_events.go +++ b/extern/storage-sealing/fsm_events.go @@ -70,12 +70,10 @@ func (evt SectorStart) apply(state *SectorInfo) { type SectorStartCC struct { ID abi.SectorNumber SectorType abi.RegisteredSealProof - Pieces []Piece } func (evt SectorStartCC) apply(state *SectorInfo) { state.SectorNumber = evt.ID - state.Pieces = evt.Pieces state.SectorType = evt.SectorType } diff --git a/extern/storage-sealing/garbage.go b/extern/storage-sealing/garbage.go index 185bebe3560..722e3e9bc51 100644 --- a/extern/storage-sealing/garbage.go +++ b/extern/storage-sealing/garbage.go @@ -4,34 +4,12 @@ import ( "context" "golang.org/x/xerrors" - - "github.com/filecoin-project/go-state-types/abi" - "github.com/filecoin-project/specs-storage/storage" ) -func (m *Sealing) pledgeSector(ctx context.Context, sectorID storage.SectorRef, existingPieceSizes []abi.UnpaddedPieceSize, sizes ...abi.UnpaddedPieceSize) ([]abi.PieceInfo, error) { - if len(sizes) == 0 { - return nil, nil - } - - log.Infof("Pledge %d, contains %+v", sectorID, existingPieceSizes) - - out := make([]abi.PieceInfo, len(sizes)) - for i, size := range sizes { - ppi, err := m.sealer.AddPiece(ctx, sectorID, existingPieceSizes, size, NewNullReader(size)) - if err != nil { - return nil, xerrors.Errorf("add piece: %w", err) - } - - existingPieceSizes = append(existingPieceSizes, size) - - out[i] = ppi - } - - return out, nil -} +func (m *Sealing) PledgeSector(ctx context.Context) error { + m.inputLk.Lock() + defer m.inputLk.Unlock() -func (m *Sealing) PledgeSector() error { cfg, err := m.getConfig() if err != nil { return xerrors.Errorf("getting config: %w", err) @@ -43,53 +21,24 @@ func (m *Sealing) PledgeSector() error { } } - go func() { - ctx := context.TODO() // we can't use the context from command which invokes - // this, as we run everything here async, and it's cancelled when the - // command exits - - spt, err := m.currentSealProof(ctx) - if err != nil { - log.Errorf("%+v", err) - return - } - - size, err := spt.SectorSize() - if err != nil { - log.Errorf("%+v", err) - return - } - - sid, err := m.sc.Next() - if err != nil { - log.Errorf("%+v", err) - return - } - sectorID := m.minerSector(spt, sid) - err = m.sealer.NewSector(ctx, sectorID) - if err != nil { - log.Errorf("%+v", err) - return - } - - pieces, err := m.pledgeSector(ctx, sectorID, []abi.UnpaddedPieceSize{}, abi.PaddedPieceSize(size).Unpadded()) - if err != nil { - log.Errorf("%+v", err) - return - } + spt, err := m.currentSealProof(ctx) + if err != nil { + return xerrors.Errorf("getting seal proof type: %w", err) + } - ps := make([]Piece, len(pieces)) - for idx := range ps { - ps[idx] = Piece{ - Piece: pieces[idx], - DealInfo: nil, - } - } + sid, err := m.sc.Next() + if err != nil { + return xerrors.Errorf("generating sector number: %w", err) + } + sectorID := m.minerSector(spt, sid) + err = m.sealer.NewSector(ctx, sectorID) + if err != nil { + return xerrors.Errorf("notifying sealer of the new sector: %w", err) + } - if err := m.newSectorCC(ctx, sid, ps); err != nil { - log.Errorf("%+v", err) - return - } - }() - return nil + log.Infof("Creating CC sector %d", sid) + return m.sectors.Send(uint64(sid), SectorStartCC{ + ID: sid, + SectorType: spt, + }) } diff --git a/extern/storage-sealing/sealing.go b/extern/storage-sealing/sealing.go index 12afa4d85c6..8feca3b7b11 100644 --- a/extern/storage-sealing/sealing.go +++ b/extern/storage-sealing/sealing.go @@ -202,21 +202,6 @@ func (m *Sealing) TerminatePending(ctx context.Context) ([]abi.SectorID, error) return m.terminator.Pending(ctx) } -// newSectorCC accepts a slice of pieces with no deal (junk data) -func (m *Sealing) newSectorCC(ctx context.Context, sid abi.SectorNumber, pieces []Piece) error { - spt, err := m.currentSealProof(ctx) - if err != nil { - return xerrors.Errorf("getting current seal proof type: %w", err) - } - - log.Infof("Creating CC sector %d", sid) - return m.sectors.Send(uint64(sid), SectorStartCC{ - ID: sid, - Pieces: pieces, - SectorType: spt, - }) -} - func (m *Sealing) currentSealProof(ctx context.Context) (abi.RegisteredSealProof, error) { mi, err := m.api.StateMinerInfo(ctx, m.maddr, nil) if err != nil { diff --git a/extern/storage-sealing/states_sealing.go b/extern/storage-sealing/states_sealing.go index 8c0918e4d3e..f5bb4cae6e9 100644 --- a/extern/storage-sealing/states_sealing.go +++ b/extern/storage-sealing/states_sealing.go @@ -70,7 +70,7 @@ func (m *Sealing) handlePacking(ctx statemachine.Context, sector SectorInfo) err log.Warnf("Creating %d filler pieces for sector %d", len(fillerSizes), sector.SectorNumber) } - fillerPieces, err := m.pledgeSector(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber), sector.existingPieceSizes(), fillerSizes...) + fillerPieces, err := m.padSector(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber), sector.existingPieceSizes(), fillerSizes...) if err != nil { return xerrors.Errorf("filling up the sector (%v): %w", fillerSizes, err) } @@ -78,6 +78,28 @@ func (m *Sealing) handlePacking(ctx statemachine.Context, sector SectorInfo) err return ctx.Send(SectorPacked{FillerPieces: fillerPieces}) } +func (m *Sealing) padSector(ctx context.Context, sectorID storage.SectorRef, existingPieceSizes []abi.UnpaddedPieceSize, sizes ...abi.UnpaddedPieceSize) ([]abi.PieceInfo, error) { + if len(sizes) == 0 { + return nil, nil + } + + log.Infof("Pledge %d, contains %+v", sectorID, existingPieceSizes) + + out := make([]abi.PieceInfo, len(sizes)) + for i, size := range sizes { + ppi, err := m.sealer.AddPiece(ctx, sectorID, existingPieceSizes, size, NewNullReader(size)) + if err != nil { + return nil, xerrors.Errorf("add piece: %w", err) + } + + existingPieceSizes = append(existingPieceSizes, size) + + out[i] = ppi + } + + return out, nil +} + func checkTicketExpired(sector SectorInfo, epoch abi.ChainEpoch) bool { return epoch-sector.TicketEpoch > MaxTicketAge // TODO: allow configuring expected seal durations } diff --git a/node/impl/storminer.go b/node/impl/storminer.go index aedf93530cb..267d8e27bad 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -122,7 +122,7 @@ func (sm *StorageMinerAPI) ActorSectorSize(ctx context.Context, addr address.Add } func (sm *StorageMinerAPI) PledgeSector(ctx context.Context) error { - return sm.Miner.PledgeSector() + return sm.Miner.PledgeSector(ctx) } func (sm *StorageMinerAPI) SectorsStatus(ctx context.Context, sid abi.SectorNumber, showOnChainInfo bool) (api.SectorInfo, error) { diff --git a/storage/sealing.go b/storage/sealing.go index d07a14810db..6ec5a412aeb 100644 --- a/storage/sealing.go +++ b/storage/sealing.go @@ -34,8 +34,8 @@ func (m *Miner) GetSectorInfo(sid abi.SectorNumber) (sealing.SectorInfo, error) return m.sealing.GetSectorInfo(sid) } -func (m *Miner) PledgeSector() error { - return m.sealing.PledgeSector() +func (m *Miner) PledgeSector(ctx context.Context) error { + return m.sealing.PledgeSector(ctx) } func (m *Miner) ForceSectorState(ctx context.Context, id abi.SectorNumber, state sealing.SectorState) error { From 46b880a6b98bd4b093529f24c4e0f0ae634704ab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 16 Feb 2021 17:34:06 +0100 Subject: [PATCH 2/5] storageminer: Skip uninitialized sectors in SectorsList --- node/impl/storminer.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/node/impl/storminer.go b/node/impl/storminer.go index 267d8e27bad..9ddcfb9b7bd 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -219,6 +219,10 @@ func (sm *StorageMinerAPI) SectorsList(context.Context) ([]abi.SectorNumber, err out := make([]abi.SectorNumber, len(sectors)) for i, sector := range sectors { + if sector.State == sealing.UndefinedSectorState { + continue // sector ID not set yet + } + out[i] = sector.SectorNumber } return out, nil From f71976506967fd09bf67e56961fd72a93a1aa027 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 16 Feb 2021 17:41:29 +0100 Subject: [PATCH 3/5] storageminer: exit PledgeSector after sectors enter sealing pipeline --- extern/storage-sealing/garbage.go | 16 +++++++++------- node/impl/storminer.go | 24 +++++++++++++++++++++++- storage/sealing.go | 3 ++- 3 files changed, 34 insertions(+), 9 deletions(-) diff --git a/extern/storage-sealing/garbage.go b/extern/storage-sealing/garbage.go index 722e3e9bc51..398040e6ed0 100644 --- a/extern/storage-sealing/garbage.go +++ b/extern/storage-sealing/garbage.go @@ -4,40 +4,42 @@ import ( "context" "golang.org/x/xerrors" + + "github.com/filecoin-project/specs-storage/storage" ) -func (m *Sealing) PledgeSector(ctx context.Context) error { +func (m *Sealing) PledgeSector(ctx context.Context) (storage.SectorRef, error) { m.inputLk.Lock() defer m.inputLk.Unlock() cfg, err := m.getConfig() if err != nil { - return xerrors.Errorf("getting config: %w", err) + return storage.SectorRef{}, xerrors.Errorf("getting config: %w", err) } if cfg.MaxSealingSectors > 0 { if m.stats.curSealing() >= cfg.MaxSealingSectors { - return xerrors.Errorf("too many sectors sealing (curSealing: %d, max: %d)", m.stats.curSealing(), cfg.MaxSealingSectors) + return storage.SectorRef{}, xerrors.Errorf("too many sectors sealing (curSealing: %d, max: %d)", m.stats.curSealing(), cfg.MaxSealingSectors) } } spt, err := m.currentSealProof(ctx) if err != nil { - return xerrors.Errorf("getting seal proof type: %w", err) + return storage.SectorRef{}, xerrors.Errorf("getting seal proof type: %w", err) } sid, err := m.sc.Next() if err != nil { - return xerrors.Errorf("generating sector number: %w", err) + return storage.SectorRef{}, xerrors.Errorf("generating sector number: %w", err) } sectorID := m.minerSector(spt, sid) err = m.sealer.NewSector(ctx, sectorID) if err != nil { - return xerrors.Errorf("notifying sealer of the new sector: %w", err) + return storage.SectorRef{}, xerrors.Errorf("notifying sealer of the new sector: %w", err) } log.Infof("Creating CC sector %d", sid) - return m.sectors.Send(uint64(sid), SectorStartCC{ + return sectorID, m.sectors.Send(uint64(sid), SectorStartCC{ ID: sid, SectorType: spt, }) diff --git a/node/impl/storminer.go b/node/impl/storminer.go index 9ddcfb9b7bd..b9ecfb574b9 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -122,7 +122,29 @@ func (sm *StorageMinerAPI) ActorSectorSize(ctx context.Context, addr address.Add } func (sm *StorageMinerAPI) PledgeSector(ctx context.Context) error { - return sm.Miner.PledgeSector(ctx) + sr, err := sm.Miner.PledgeSector(ctx) + if err != nil { + return err + } + + // wait for the sector to enter the Packing state + // TODO: instead of polling implement some pubsub-type thing in storagefsm + for { + info, err := sm.Miner.GetSectorInfo(sr.ID.Number) + if err != nil { + return xerrors.Errorf("getting pledged sector info: %w", err) + } + + if info.State != sealing.UndefinedSectorState { + return nil + } + + select { + case <-time.After(10 * time.Millisecond): + case <-ctx.Done(): + return ctx.Err() + } + } } func (sm *StorageMinerAPI) SectorsStatus(ctx context.Context, sid abi.SectorNumber, showOnChainInfo bool) (api.SectorInfo, error) { diff --git a/storage/sealing.go b/storage/sealing.go index 6ec5a412aeb..8981c373866 100644 --- a/storage/sealing.go +++ b/storage/sealing.go @@ -8,6 +8,7 @@ import ( "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/specs-storage/storage" sealing "github.com/filecoin-project/lotus/extern/storage-sealing" ) @@ -34,7 +35,7 @@ func (m *Miner) GetSectorInfo(sid abi.SectorNumber) (sealing.SectorInfo, error) return m.sealing.GetSectorInfo(sid) } -func (m *Miner) PledgeSector(ctx context.Context) error { +func (m *Miner) PledgeSector(ctx context.Context) (storage.SectorRef, error) { return m.sealing.PledgeSector(ctx) } From fd90c030183d63172597cbd57fd6ff8502897795 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 16 Feb 2021 19:16:35 +0100 Subject: [PATCH 4/5] Roturn SectorID from PledgeSector --- api/api_storage.go | 2 +- api/apistruct/struct.go | 4 ++-- cmd/lotus-storage-miner/sectors.go | 9 ++++++++- documentation/en/api-methods-miner.md | 8 +++++++- node/impl/storminer.go | 10 +++++----- 5 files changed, 23 insertions(+), 10 deletions(-) diff --git a/api/api_storage.go b/api/api_storage.go index eb4584e103a..3f814343384 100644 --- a/api/api_storage.go +++ b/api/api_storage.go @@ -36,7 +36,7 @@ type StorageMiner interface { MiningBase(context.Context) (*types.TipSet, error) // Temp api for testing - PledgeSector(context.Context) error + PledgeSector(context.Context) (abi.SectorID, error) // Get the status of a given sector by ID SectorsStatus(ctx context.Context, sid abi.SectorNumber, showOnChainInfo bool) (SectorInfo, error) diff --git a/api/apistruct/struct.go b/api/apistruct/struct.go index ded34ac5ad8..53d7dc24ace 100644 --- a/api/apistruct/struct.go +++ b/api/apistruct/struct.go @@ -304,7 +304,7 @@ type StorageMinerStruct struct { MarketPendingDeals func(ctx context.Context) (api.PendingDealInfo, error) `perm:"write"` MarketPublishPendingDeals func(ctx context.Context) error `perm:"admin"` - PledgeSector func(context.Context) error `perm:"write"` + PledgeSector func(context.Context) (abi.SectorID, error) `perm:"write"` SectorsStatus func(ctx context.Context, sid abi.SectorNumber, showOnChainInfo bool) (api.SectorInfo, error) `perm:"read"` SectorsList func(context.Context) ([]abi.SectorNumber, error) `perm:"read"` @@ -1274,7 +1274,7 @@ func (c *StorageMinerStruct) ActorAddressConfig(ctx context.Context) (api.Addres return c.Internal.ActorAddressConfig(ctx) } -func (c *StorageMinerStruct) PledgeSector(ctx context.Context) error { +func (c *StorageMinerStruct) PledgeSector(ctx context.Context) (abi.SectorID, error) { return c.Internal.PledgeSector(ctx) } diff --git a/cmd/lotus-storage-miner/sectors.go b/cmd/lotus-storage-miner/sectors.go index fb9358382fe..ad7d6106853 100644 --- a/cmd/lotus-storage-miner/sectors.go +++ b/cmd/lotus-storage-miner/sectors.go @@ -55,7 +55,14 @@ var sectorsPledgeCmd = &cli.Command{ defer closer() ctx := lcli.ReqContext(cctx) - return nodeApi.PledgeSector(ctx) + id, err := nodeApi.PledgeSector(ctx) + if err != nil { + return err + } + + fmt.Println("Created CC sector: ", id.Number) + + return nil }, } diff --git a/documentation/en/api-methods-miner.md b/documentation/en/api-methods-miner.md index aa84ebdf255..e72a53f6213 100644 --- a/documentation/en/api-methods-miner.md +++ b/documentation/en/api-methods-miner.md @@ -1142,7 +1142,13 @@ Perms: write Inputs: `null` -Response: `{}` +Response: +```json +{ + "Miner": 1000, + "Number": 9 +} +``` ## Return diff --git a/node/impl/storminer.go b/node/impl/storminer.go index b9ecfb574b9..cde168bea4f 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -121,10 +121,10 @@ func (sm *StorageMinerAPI) ActorSectorSize(ctx context.Context, addr address.Add return mi.SectorSize, nil } -func (sm *StorageMinerAPI) PledgeSector(ctx context.Context) error { +func (sm *StorageMinerAPI) PledgeSector(ctx context.Context) (abi.SectorID, error) { sr, err := sm.Miner.PledgeSector(ctx) if err != nil { - return err + return abi.SectorID{}, err } // wait for the sector to enter the Packing state @@ -132,17 +132,17 @@ func (sm *StorageMinerAPI) PledgeSector(ctx context.Context) error { for { info, err := sm.Miner.GetSectorInfo(sr.ID.Number) if err != nil { - return xerrors.Errorf("getting pledged sector info: %w", err) + return abi.SectorID{}, xerrors.Errorf("getting pledged sector info: %w", err) } if info.State != sealing.UndefinedSectorState { - return nil + return sr.ID, nil } select { case <-time.After(10 * time.Millisecond): case <-ctx.Done(): - return ctx.Err() + return abi.SectorID{}, ctx.Err() } } } From a8464a345ba66c90a0fab31aab48b92134308d60 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 16 Feb 2021 19:19:16 +0100 Subject: [PATCH 5/5] Fix tests --- api/test/tape.go | 19 ++----------------- api/test/window_post.go | 2 +- 2 files changed, 3 insertions(+), 18 deletions(-) diff --git a/api/test/tape.go b/api/test/tape.go index 466bdd829a5..b52162dd65b 100644 --- a/api/test/tape.go +++ b/api/test/tape.go @@ -6,7 +6,6 @@ import ( "testing" "time" - "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/network" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/build" @@ -73,23 +72,9 @@ func testTapeFix(t *testing.T, b APIBuilder, blocktime time.Duration, after bool <-done }() - err = miner.PledgeSector(ctx) + sid, err := miner.PledgeSector(ctx) require.NoError(t, err) - // Wait till done. - var sectorNo abi.SectorNumber - for { - s, err := miner.SectorsList(ctx) // Note - the test builder doesn't import genesis sectors into FSM - require.NoError(t, err) - fmt.Printf("Sectors: %d\n", len(s)) - if len(s) == 1 { - sectorNo = s[0] - break - } - - build.Clock.Sleep(100 * time.Millisecond) - } - fmt.Printf("All sectors is fsm\n") // If before, we expect the precommit to fail @@ -101,7 +86,7 @@ func testTapeFix(t *testing.T, b APIBuilder, blocktime time.Duration, after bool } for { - st, err := miner.SectorsStatus(ctx, sectorNo, false) + st, err := miner.SectorsStatus(ctx, sid.Number, false) require.NoError(t, err) if st.State == successState { break diff --git a/api/test/window_post.go b/api/test/window_post.go index 99d48083688..ce42318b2b2 100644 --- a/api/test/window_post.go +++ b/api/test/window_post.go @@ -162,7 +162,7 @@ func pledgeSectors(t *testing.T, ctx context.Context, miner TestStorageNode, n, log.Errorf("WAIT") } log.Errorf("PLEDGING %d", i) - err := miner.PledgeSector(ctx) + _, err := miner.PledgeSector(ctx) require.NoError(t, err) }