Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storagefsm: Cleanup CC sector creation #5612

Merged
merged 5 commits into from
Feb 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion api/api_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type StorageMiner interface {
MiningBase(context.Context) (*types.TipSet, error)

// Temp api for testing
PledgeSector(context.Context) error
PledgeSector(context.Context) (abi.SectorID, error)

// Get the status of a given sector by ID
SectorsStatus(ctx context.Context, sid abi.SectorNumber, showOnChainInfo bool) (SectorInfo, error)
Expand Down
4 changes: 2 additions & 2 deletions api/apistruct/struct.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ type StorageMinerStruct struct {
MarketPendingDeals func(ctx context.Context) (api.PendingDealInfo, error) `perm:"write"`
MarketPublishPendingDeals func(ctx context.Context) error `perm:"admin"`

PledgeSector func(context.Context) error `perm:"write"`
PledgeSector func(context.Context) (abi.SectorID, error) `perm:"write"`

SectorsStatus func(ctx context.Context, sid abi.SectorNumber, showOnChainInfo bool) (api.SectorInfo, error) `perm:"read"`
SectorsList func(context.Context) ([]abi.SectorNumber, error) `perm:"read"`
Expand Down Expand Up @@ -1274,7 +1274,7 @@ func (c *StorageMinerStruct) ActorAddressConfig(ctx context.Context) (api.Addres
return c.Internal.ActorAddressConfig(ctx)
}

func (c *StorageMinerStruct) PledgeSector(ctx context.Context) error {
func (c *StorageMinerStruct) PledgeSector(ctx context.Context) (abi.SectorID, error) {
return c.Internal.PledgeSector(ctx)
}

Expand Down
19 changes: 2 additions & 17 deletions api/test/tape.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"testing"
"time"

"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/network"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
Expand Down Expand Up @@ -73,23 +72,9 @@ func testTapeFix(t *testing.T, b APIBuilder, blocktime time.Duration, after bool
<-done
}()

err = miner.PledgeSector(ctx)
sid, err := miner.PledgeSector(ctx)
require.NoError(t, err)

// Wait till done.
var sectorNo abi.SectorNumber
for {
s, err := miner.SectorsList(ctx) // Note - the test builder doesn't import genesis sectors into FSM
require.NoError(t, err)
fmt.Printf("Sectors: %d\n", len(s))
if len(s) == 1 {
sectorNo = s[0]
break
}

build.Clock.Sleep(100 * time.Millisecond)
}

fmt.Printf("All sectors is fsm\n")

// If before, we expect the precommit to fail
Expand All @@ -101,7 +86,7 @@ func testTapeFix(t *testing.T, b APIBuilder, blocktime time.Duration, after bool
}

for {
st, err := miner.SectorsStatus(ctx, sectorNo, false)
st, err := miner.SectorsStatus(ctx, sid.Number, false)
require.NoError(t, err)
if st.State == successState {
break
Expand Down
2 changes: 1 addition & 1 deletion api/test/window_post.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func pledgeSectors(t *testing.T, ctx context.Context, miner TestStorageNode, n,
log.Errorf("WAIT")
}
log.Errorf("PLEDGING %d", i)
err := miner.PledgeSector(ctx)
_, err := miner.PledgeSector(ctx)
require.NoError(t, err)
}

Expand Down
9 changes: 8 additions & 1 deletion cmd/lotus-storage-miner/sectors.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,14 @@ var sectorsPledgeCmd = &cli.Command{
defer closer()
ctx := lcli.ReqContext(cctx)

return nodeApi.PledgeSector(ctx)
id, err := nodeApi.PledgeSector(ctx)
if err != nil {
return err
}

fmt.Println("Created CC sector: ", id.Number)

return nil
},
}

Expand Down
8 changes: 7 additions & 1 deletion documentation/en/api-methods-miner.md
Original file line number Diff line number Diff line change
Expand Up @@ -1142,7 +1142,13 @@ Perms: write

Inputs: `null`

Response: `{}`
Response:
```json
{
"Miner": 1000,
"Number": 9
}
```

## Return

Expand Down
2 changes: 0 additions & 2 deletions extern/storage-sealing/fsm_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,10 @@ func (evt SectorStart) apply(state *SectorInfo) {
type SectorStartCC struct {
ID abi.SectorNumber
SectorType abi.RegisteredSealProof
Pieces []Piece
}

func (evt SectorStartCC) apply(state *SectorInfo) {
state.SectorNumber = evt.ID
state.Pieces = evt.Pieces
state.SectorType = evt.SectorType
}

Expand Down
95 changes: 23 additions & 72 deletions extern/storage-sealing/garbage.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,91 +5,42 @@ import (

"golang.org/x/xerrors"

"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/specs-storage/storage"
)

func (m *Sealing) pledgeSector(ctx context.Context, sectorID storage.SectorRef, existingPieceSizes []abi.UnpaddedPieceSize, sizes ...abi.UnpaddedPieceSize) ([]abi.PieceInfo, error) {
if len(sizes) == 0 {
return nil, nil
}

log.Infof("Pledge %d, contains %+v", sectorID, existingPieceSizes)

out := make([]abi.PieceInfo, len(sizes))
for i, size := range sizes {
ppi, err := m.sealer.AddPiece(ctx, sectorID, existingPieceSizes, size, NewNullReader(size))
if err != nil {
return nil, xerrors.Errorf("add piece: %w", err)
}

existingPieceSizes = append(existingPieceSizes, size)

out[i] = ppi
}
func (m *Sealing) PledgeSector(ctx context.Context) (storage.SectorRef, error) {
m.inputLk.Lock()
defer m.inputLk.Unlock()

return out, nil
}

func (m *Sealing) PledgeSector() error {
cfg, err := m.getConfig()
if err != nil {
return xerrors.Errorf("getting config: %w", err)
return storage.SectorRef{}, xerrors.Errorf("getting config: %w", err)
}

if cfg.MaxSealingSectors > 0 {
if m.stats.curSealing() >= cfg.MaxSealingSectors {
return xerrors.Errorf("too many sectors sealing (curSealing: %d, max: %d)", m.stats.curSealing(), cfg.MaxSealingSectors)
return storage.SectorRef{}, xerrors.Errorf("too many sectors sealing (curSealing: %d, max: %d)", m.stats.curSealing(), cfg.MaxSealingSectors)
}
}

go func() {
ctx := context.TODO() // we can't use the context from command which invokes
// this, as we run everything here async, and it's cancelled when the
// command exits

spt, err := m.currentSealProof(ctx)
if err != nil {
log.Errorf("%+v", err)
return
}

size, err := spt.SectorSize()
if err != nil {
log.Errorf("%+v", err)
return
}

sid, err := m.sc.Next()
if err != nil {
log.Errorf("%+v", err)
return
}
sectorID := m.minerSector(spt, sid)
err = m.sealer.NewSector(ctx, sectorID)
if err != nil {
log.Errorf("%+v", err)
return
}

pieces, err := m.pledgeSector(ctx, sectorID, []abi.UnpaddedPieceSize{}, abi.PaddedPieceSize(size).Unpadded())
if err != nil {
log.Errorf("%+v", err)
return
}
spt, err := m.currentSealProof(ctx)
if err != nil {
return storage.SectorRef{}, xerrors.Errorf("getting seal proof type: %w", err)
}

ps := make([]Piece, len(pieces))
for idx := range ps {
ps[idx] = Piece{
Piece: pieces[idx],
DealInfo: nil,
}
}
sid, err := m.sc.Next()
if err != nil {
return storage.SectorRef{}, xerrors.Errorf("generating sector number: %w", err)
}
sectorID := m.minerSector(spt, sid)
err = m.sealer.NewSector(ctx, sectorID)
if err != nil {
return storage.SectorRef{}, xerrors.Errorf("notifying sealer of the new sector: %w", err)
}

if err := m.newSectorCC(ctx, sid, ps); err != nil {
log.Errorf("%+v", err)
return
}
}()
return nil
log.Infof("Creating CC sector %d", sid)
return sectorID, m.sectors.Send(uint64(sid), SectorStartCC{
ID: sid,
SectorType: spt,
})
}
15 changes: 0 additions & 15 deletions extern/storage-sealing/sealing.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,21 +202,6 @@ func (m *Sealing) TerminatePending(ctx context.Context) ([]abi.SectorID, error)
return m.terminator.Pending(ctx)
}

// newSectorCC accepts a slice of pieces with no deal (junk data)
func (m *Sealing) newSectorCC(ctx context.Context, sid abi.SectorNumber, pieces []Piece) error {
spt, err := m.currentSealProof(ctx)
if err != nil {
return xerrors.Errorf("getting current seal proof type: %w", err)
}

log.Infof("Creating CC sector %d", sid)
return m.sectors.Send(uint64(sid), SectorStartCC{
ID: sid,
Pieces: pieces,
SectorType: spt,
})
}

func (m *Sealing) currentSealProof(ctx context.Context) (abi.RegisteredSealProof, error) {
mi, err := m.api.StateMinerInfo(ctx, m.maddr, nil)
if err != nil {
Expand Down
24 changes: 23 additions & 1 deletion extern/storage-sealing/states_sealing.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,36 @@ func (m *Sealing) handlePacking(ctx statemachine.Context, sector SectorInfo) err
log.Warnf("Creating %d filler pieces for sector %d", len(fillerSizes), sector.SectorNumber)
}

fillerPieces, err := m.pledgeSector(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber), sector.existingPieceSizes(), fillerSizes...)
fillerPieces, err := m.padSector(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber), sector.existingPieceSizes(), fillerSizes...)
if err != nil {
return xerrors.Errorf("filling up the sector (%v): %w", fillerSizes, err)
}

return ctx.Send(SectorPacked{FillerPieces: fillerPieces})
}

func (m *Sealing) padSector(ctx context.Context, sectorID storage.SectorRef, existingPieceSizes []abi.UnpaddedPieceSize, sizes ...abi.UnpaddedPieceSize) ([]abi.PieceInfo, error) {
if len(sizes) == 0 {
return nil, nil
}

log.Infof("Pledge %d, contains %+v", sectorID, existingPieceSizes)

out := make([]abi.PieceInfo, len(sizes))
for i, size := range sizes {
ppi, err := m.sealer.AddPiece(ctx, sectorID, existingPieceSizes, size, NewNullReader(size))
if err != nil {
return nil, xerrors.Errorf("add piece: %w", err)
}

existingPieceSizes = append(existingPieceSizes, size)

out[i] = ppi
}

return out, nil
}

func checkTicketExpired(sector SectorInfo, epoch abi.ChainEpoch) bool {
return epoch-sector.TicketEpoch > MaxTicketAge // TODO: allow configuring expected seal durations
}
Expand Down
30 changes: 28 additions & 2 deletions node/impl/storminer.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,30 @@ func (sm *StorageMinerAPI) ActorSectorSize(ctx context.Context, addr address.Add
return mi.SectorSize, nil
}

func (sm *StorageMinerAPI) PledgeSector(ctx context.Context) error {
return sm.Miner.PledgeSector()
func (sm *StorageMinerAPI) PledgeSector(ctx context.Context) (abi.SectorID, error) {
sr, err := sm.Miner.PledgeSector(ctx)
if err != nil {
return abi.SectorID{}, err
}

// wait for the sector to enter the Packing state
// TODO: instead of polling implement some pubsub-type thing in storagefsm
for {
info, err := sm.Miner.GetSectorInfo(sr.ID.Number)
if err != nil {
return abi.SectorID{}, xerrors.Errorf("getting pledged sector info: %w", err)
}

if info.State != sealing.UndefinedSectorState {
return sr.ID, nil
}

select {
case <-time.After(10 * time.Millisecond):
case <-ctx.Done():
return abi.SectorID{}, ctx.Err()
}
}
}

func (sm *StorageMinerAPI) SectorsStatus(ctx context.Context, sid abi.SectorNumber, showOnChainInfo bool) (api.SectorInfo, error) {
Expand Down Expand Up @@ -219,6 +241,10 @@ func (sm *StorageMinerAPI) SectorsList(context.Context) ([]abi.SectorNumber, err

out := make([]abi.SectorNumber, len(sectors))
for i, sector := range sectors {
if sector.State == sealing.UndefinedSectorState {
continue // sector ID not set yet
}

out[i] = sector.SectorNumber
}
return out, nil
Expand Down
5 changes: 3 additions & 2 deletions storage/sealing.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/specs-storage/storage"

sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
)
Expand All @@ -34,8 +35,8 @@ func (m *Miner) GetSectorInfo(sid abi.SectorNumber) (sealing.SectorInfo, error)
return m.sealing.GetSectorInfo(sid)
}

func (m *Miner) PledgeSector() error {
return m.sealing.PledgeSector()
func (m *Miner) PledgeSector(ctx context.Context) (storage.SectorRef, error) {
return m.sealing.PledgeSector(ctx)
}

func (m *Miner) ForceSectorState(ctx context.Context, id abi.SectorNumber, state sealing.SectorState) error {
Expand Down