Skip to content

Commit

Permalink
Merge pull request filecoin-project#5612 from filecoin-project/feat/c…
Browse files Browse the repository at this point in the history
…c-fsm-cleanup

storagefsm: Cleanup CC sector creation
  • Loading branch information
magik6k authored and bibibong committed Mar 21, 2021
1 parent 3bf7c1e commit e4ab300
Show file tree
Hide file tree
Showing 11 changed files with 90 additions and 115 deletions.
2 changes: 1 addition & 1 deletion api/api_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type StorageMiner interface {
MiningBase(context.Context) (*types.TipSet, error)

// Temp api for testing
PledgeSector(context.Context) error
PledgeSector(context.Context) (abi.SectorID, error)

// Get the status of a given sector by ID
SectorsStatus(ctx context.Context, sid abi.SectorNumber, showOnChainInfo bool) (SectorInfo, error)
Expand Down
4 changes: 2 additions & 2 deletions api/apistruct/struct.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ type StorageMinerStruct struct {
MarketPendingDeals func(ctx context.Context) (api.PendingDealInfo, error) `perm:"write"`
MarketPublishPendingDeals func(ctx context.Context) error `perm:"admin"`

PledgeSector func(context.Context) error `perm:"write"`
PledgeSector func(context.Context) (abi.SectorID, error) `perm:"write"`

SectorsStatus func(ctx context.Context, sid abi.SectorNumber, showOnChainInfo bool) (api.SectorInfo, error) `perm:"read"`
SectorsList func(context.Context) ([]abi.SectorNumber, error) `perm:"read"`
Expand Down Expand Up @@ -1419,7 +1419,7 @@ func (c *StorageMinerStruct) ActorAddressConfig(ctx context.Context) (api.Addres
return c.Internal.ActorAddressConfig(ctx)
}

func (c *StorageMinerStruct) PledgeSector(ctx context.Context) error {
func (c *StorageMinerStruct) PledgeSector(ctx context.Context) (abi.SectorID, error) {
return c.Internal.PledgeSector(ctx)
}

Expand Down
18 changes: 2 additions & 16 deletions api/test/tape.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,23 +76,9 @@ func testTapeFix(t *testing.T, b APIBuilder, blocktime time.Duration, after bool
<-done
}()
err = miner.PledgeSector(ctx)
sid, err := miner.PledgeSector(ctx)
require.NoError(t, err)
// Wait till done.
var sectorNo abi.SectorNumber
for {
s, err := miner.SectorsList(ctx) // Note - the test builder doesn't import genesis sectors into FSM
require.NoError(t, err)
fmt.Printf("Sectors: %d\n", len(s))
if len(s) == 1 {
sectorNo = s[0]
break
}
build.Clock.Sleep(100 * time.Millisecond)
}
fmt.Printf("All sectors is fsm\n")
// If before, we expect the precommit to fail
Expand All @@ -104,7 +90,7 @@ func testTapeFix(t *testing.T, b APIBuilder, blocktime time.Duration, after bool
}
for {
st, err := miner.SectorsStatus(ctx, sectorNo, false)
st, err := miner.SectorsStatus(ctx, sid.Number, false)
require.NoError(t, err)
if st.State == successState {
break
Expand Down
2 changes: 1 addition & 1 deletion api/test/window_post.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func pledgeSectors(t *testing.T, ctx context.Context, miner TestStorageNode, n,
log.Errorf("WAIT")
}
log.Errorf("PLEDGING %d", i)
err := miner.PledgeSector(ctx)
_, err := miner.PledgeSector(ctx)
require.NoError(t, err)
}

Expand Down
8 changes: 7 additions & 1 deletion documentation/en/api-methods-miner.md
Original file line number Diff line number Diff line change
Expand Up @@ -1142,7 +1142,13 @@ Perms: write

Inputs: `null`

Response: `{}`
Response:
```json
{
"Miner": 1000,
"Number": 9
}
```

## Return

Expand Down
2 changes: 0 additions & 2 deletions extern/storage-sealing/fsm_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,10 @@ func (evt SectorStart) apply(state *SectorInfo) {
type SectorStartCC struct {
ID abi.SectorNumber
SectorType abi.RegisteredSealProof
Pieces []Piece
}

func (evt SectorStartCC) apply(state *SectorInfo) {
state.SectorNumber = evt.ID
state.Pieces = evt.Pieces
state.SectorType = evt.SectorType
}

Expand Down
95 changes: 23 additions & 72 deletions extern/storage-sealing/garbage.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,91 +5,42 @@ import (

"golang.org/x/xerrors"

"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/specs-storage/storage"
)

func (m *Sealing) pledgeSector(ctx context.Context, sectorID storage.SectorRef, existingPieceSizes []abi.UnpaddedPieceSize, sizes ...abi.UnpaddedPieceSize) ([]abi.PieceInfo, error) {
if len(sizes) == 0 {
return nil, nil
}

log.Infof("Pledge %d, contains %+v", sectorID, existingPieceSizes)

out := make([]abi.PieceInfo, len(sizes))
for i, size := range sizes {
ppi, err := m.sealer.AddPiece(ctx, sectorID, existingPieceSizes, size, NewNullReader(size))
if err != nil {
return nil, xerrors.Errorf("add piece: %w", err)
}

existingPieceSizes = append(existingPieceSizes, size)

out[i] = ppi
}
func (m *Sealing) PledgeSector(ctx context.Context) (storage.SectorRef, error) {
m.inputLk.Lock()
defer m.inputLk.Unlock()

return out, nil
}

func (m *Sealing) PledgeSector() error {
cfg, err := m.getConfig()
if err != nil {
return xerrors.Errorf("getting config: %w", err)
return storage.SectorRef{}, xerrors.Errorf("getting config: %w", err)
}

if cfg.MaxSealingSectors > 0 {
if m.stats.curSealing() >= cfg.MaxSealingSectors {
return xerrors.Errorf("too many sectors sealing (curSealing: %d, max: %d)", m.stats.curSealing(), cfg.MaxSealingSectors)
return storage.SectorRef{}, xerrors.Errorf("too many sectors sealing (curSealing: %d, max: %d)", m.stats.curSealing(), cfg.MaxSealingSectors)
}
}

go func() {
ctx := context.TODO() // we can't use the context from command which invokes
// this, as we run everything here async, and it's cancelled when the
// command exits

spt, err := m.currentSealProof(ctx)
if err != nil {
log.Errorf("%+v", err)
return
}

size, err := spt.SectorSize()
if err != nil {
log.Errorf("%+v", err)
return
}

sid, err := m.sc.Next()
if err != nil {
log.Errorf("%+v", err)
return
}
sectorID := m.minerSector(spt, sid)
err = m.sealer.NewSector(ctx, sectorID)
if err != nil {
log.Errorf("%+v", err)
return
}

pieces, err := m.pledgeSector(ctx, sectorID, []abi.UnpaddedPieceSize{}, abi.PaddedPieceSize(size).Unpadded())
if err != nil {
log.Errorf("%+v", err)
return
}
spt, err := m.currentSealProof(ctx)
if err != nil {
return storage.SectorRef{}, xerrors.Errorf("getting seal proof type: %w", err)
}

ps := make([]Piece, len(pieces))
for idx := range ps {
ps[idx] = Piece{
Piece: pieces[idx],
DealInfo: nil,
}
}
sid, err := m.sc.Next()
if err != nil {
return storage.SectorRef{}, xerrors.Errorf("generating sector number: %w", err)
}
sectorID := m.minerSector(spt, sid)
err = m.sealer.NewSector(ctx, sectorID)
if err != nil {
return storage.SectorRef{}, xerrors.Errorf("notifying sealer of the new sector: %w", err)
}

if err := m.newSectorCC(ctx, sid, ps); err != nil {
log.Errorf("%+v", err)
return
}
}()
return nil
log.Infof("Creating CC sector %d", sid)
return sectorID, m.sectors.Send(uint64(sid), SectorStartCC{
ID: sid,
SectorType: spt,
})
}
15 changes: 0 additions & 15 deletions extern/storage-sealing/sealing.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,21 +199,6 @@ func (m *Sealing) TerminatePending(ctx context.Context) ([]abi.SectorID, error)
return m.terminator.Pending(ctx)
}

// newSectorCC accepts a slice of pieces with no deal (junk data)
func (m *Sealing) newSectorCC(ctx context.Context, sid abi.SectorNumber, pieces []Piece) error {
spt, err := m.currentSealProof(ctx)
if err != nil {
return xerrors.Errorf("getting current seal proof type: %w", err)
}

log.Infof("Creating CC sector %d", sid)
return m.sectors.Send(uint64(sid), SectorStartCC{
ID: sid,
Pieces: pieces,
SectorType: spt,
})
}

func (m *Sealing) currentSealProof(ctx context.Context) (abi.RegisteredSealProof, error) {
mi, err := m.api.StateMinerInfo(ctx, m.maddr, nil)
if err != nil {
Expand Down
24 changes: 23 additions & 1 deletion extern/storage-sealing/states_sealing.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,36 @@ func (m *Sealing) handlePacking(ctx statemachine.Context, sector SectorInfo) err
log.Warnf("Creating %d filler pieces for sector %d", len(fillerSizes), sector.SectorNumber)
}

fillerPieces, err := m.pledgeSector(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber), sector.existingPieceSizes(), fillerSizes...)
fillerPieces, err := m.padSector(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber), sector.existingPieceSizes(), fillerSizes...)
if err != nil {
return xerrors.Errorf("filling up the sector (%v): %w", fillerSizes, err)
}

return ctx.Send(SectorPacked{FillerPieces: fillerPieces})
}

func (m *Sealing) padSector(ctx context.Context, sectorID storage.SectorRef, existingPieceSizes []abi.UnpaddedPieceSize, sizes ...abi.UnpaddedPieceSize) ([]abi.PieceInfo, error) {
if len(sizes) == 0 {
return nil, nil
}

log.Infof("Pledge %d, contains %+v", sectorID, existingPieceSizes)

out := make([]abi.PieceInfo, len(sizes))
for i, size := range sizes {
ppi, err := m.sealer.AddPiece(ctx, sectorID, existingPieceSizes, size, NewNullReader(size))
if err != nil {
return nil, xerrors.Errorf("add piece: %w", err)
}

existingPieceSizes = append(existingPieceSizes, size)

out[i] = ppi
}

return out, nil
}

func checkTicketExpired(sector SectorInfo, epoch abi.ChainEpoch) bool {
return epoch-sector.TicketEpoch > MaxTicketAge // TODO: allow configuring expected seal durations
}
Expand Down
30 changes: 28 additions & 2 deletions node/impl/storminer.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,30 @@ func (sm *StorageMinerAPI) ActorSectorSize(ctx context.Context, addr address.Add
return mi.SectorSize, nil
}

func (sm *StorageMinerAPI) PledgeSector(ctx context.Context) error {
return sm.Miner.PledgeSector()
func (sm *StorageMinerAPI) PledgeSector(ctx context.Context) (abi.SectorID, error) {
sr, err := sm.Miner.PledgeSector(ctx)
if err != nil {
return abi.SectorID{}, err
}

// wait for the sector to enter the Packing state
// TODO: instead of polling implement some pubsub-type thing in storagefsm
for {
info, err := sm.Miner.GetSectorInfo(sr.ID.Number)
if err != nil {
return abi.SectorID{}, xerrors.Errorf("getting pledged sector info: %w", err)
}

if info.State != sealing.UndefinedSectorState {
return sr.ID, nil
}

select {
case <-time.After(10 * time.Millisecond):
case <-ctx.Done():
return abi.SectorID{}, ctx.Err()
}
}
}

func (sm *StorageMinerAPI) SectorsStatus(ctx context.Context, sid abi.SectorNumber, showOnChainInfo bool) (api.SectorInfo, error) {
Expand Down Expand Up @@ -214,6 +236,10 @@ func (sm *StorageMinerAPI) SectorsList(context.Context) ([]abi.SectorNumber, err

out := make([]abi.SectorNumber, len(sectors))
for i, sector := range sectors {
if sector.State == sealing.UndefinedSectorState {
continue // sector ID not set yet
}

out[i] = sector.SectorNumber
}
return out, nil
Expand Down
5 changes: 3 additions & 2 deletions storage/sealing.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/specs-storage/storage"

sealing "github.com/EpiK-Protocol/go-epik/extern/storage-sealing"
)
Expand All @@ -34,8 +35,8 @@ func (m *Miner) GetSectorInfo(sid abi.SectorNumber) (sealing.SectorInfo, error)
return m.sealing.GetSectorInfo(sid)
}

func (m *Miner) PledgeSector() error {
return m.sealing.PledgeSector()
func (m *Miner) PledgeSector(ctx context.Context) (storage.SectorRef, error) {
return m.sealing.PledgeSector(ctx)
}

func (m *Miner) ForceSectorState(ctx context.Context, id abi.SectorNumber, state sealing.SectorState) error {
Expand Down

0 comments on commit e4ab300

Please sign in to comment.