Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage module: add go docs and minor code quality refactors #6259

Merged
merged 13 commits into from
May 20, 2021
11 changes: 11 additions & 0 deletions chain/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -853,6 +853,14 @@ func (cs *ChainStore) NearestCommonAncestor(a, b *types.TipSet) (*types.TipSet,
return cs.LoadTipSet(l[len(l)-1].Parents())
}

// ReorgOps takes two tipsets (which can be at different heights), and walks
// their corresponding chains backwards one step at a time until we find
// a common ancestor. It then returns the respective chain segments that fork
// from the identified ancestor, in reverse order, where the first element of
// each slice is the supplied tipset, and the last element is the common
// ancestor.
//
// If an error happens along the way, we return the error with nil slices.
func (cs *ChainStore) ReorgOps(a, b *types.TipSet) ([]*types.TipSet, []*types.TipSet, error) {
return ReorgOps(cs.LoadTipSet, a, b)
}
Expand Down Expand Up @@ -1235,6 +1243,9 @@ func (cs *ChainStore) ReadMsgMetaCids(mmc cid.Cid) ([]cid.Cid, []cid.Cid, error)
return blscids, secpkcids, nil
}

// GetPath returns the sequence of atomic head change operations that
// need to be applied in order to switch the head of the chain from the `from`
// tipset to the `to` tipset.
func (cs *ChainStore) GetPath(ctx context.Context, from types.TipSetKey, to types.TipSetKey) ([]*api.HeadChange, error) {
fts, err := cs.LoadTipSet(from)
if err != nil {
Expand Down
5 changes: 4 additions & 1 deletion extern/storage-sealing/precommit_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ type BasicPreCommitPolicy struct {
duration abi.ChainEpoch
}

// NewBasicPreCommitPolicy produces a BasicPreCommitPolicy
// NewBasicPreCommitPolicy produces a BasicPreCommitPolicy.
//
// The provided duration is used as the default sector expiry when the sector
// contains no deals. The proving boundary is used to adjust/align the sector's expiration.
func NewBasicPreCommitPolicy(api Chain, duration abi.ChainEpoch, provingBoundary abi.ChainEpoch) BasicPreCommitPolicy {
return BasicPreCommitPolicy{
api: api,
Expand Down
4 changes: 2 additions & 2 deletions storage/adapter_storage_miner.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ import (
var _ sealing.SealingAPI = new(SealingAPIAdapter)

type SealingAPIAdapter struct {
delegate storageMinerApi
delegate fullNodeFilteredAPI
}

func NewSealingAPIAdapter(api storageMinerApi) SealingAPIAdapter {
func NewSealingAPIAdapter(api fullNodeFilteredAPI) SealingAPIAdapter {
return SealingAPIAdapter{delegate: api}
}

Expand Down
75 changes: 59 additions & 16 deletions storage/miner.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,6 @@ import (
"errors"
"time"

"github.com/filecoin-project/go-state-types/network"

"github.com/filecoin-project/go-state-types/dline"

"github.com/filecoin-project/go-bitfield"

"github.com/ipfs/go-cid"
Expand All @@ -20,9 +16,13 @@ import (
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/crypto"
"github.com/filecoin-project/go-state-types/dline"
"github.com/filecoin-project/go-state-types/network"

"github.com/filecoin-project/specs-storage/storage"

sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
"github.com/filecoin-project/specs-storage/storage"

"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/api/v1api"
Expand All @@ -41,8 +41,16 @@ import (

var log = logging.Logger("storageminer")

// Miner is the central miner entrypoint object inside Lotus. It is
// instantiated in the node builder, along with the WindowPoStScheduler.
//
// This object is the owner of the sealing pipeline. Most of the actual logic
// lives in the storage-sealing module (sealing.Sealing), and the Miner object
// exposes it to the rest of the system by proxying calls.
Comment on lines +47 to +49
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is mostly tech debt, but it's really tricky to untangle storage-sealing from this object, which is why it's here

//
// Miner#Run starts the sealing FSM.
type Miner struct {
api storageMinerApi
api fullNodeFilteredAPI
feeCfg config.MinerFeeConfig
h host.Host
sealer sectorstorage.SectorManager
Expand Down Expand Up @@ -70,7 +78,9 @@ type SealingStateEvt struct {
Error string
}

type storageMinerApi interface {
// fullNodeFilteredAPI is the subset of the full node API the Miner needs from
// a Lotus full node.
type fullNodeFilteredAPI interface {
// Call a read only method on actors (no interaction with the chain required)
StateCall(context.Context, *types.Message, types.TipSetKey) (*api.InvocResult, error)
StateMinerSectors(context.Context, address.Address, *bitfield.BitField, types.TipSetKey) ([]*miner.SectorOnChainInfo, error)
Expand Down Expand Up @@ -116,7 +126,18 @@ type storageMinerApi interface {
WalletHas(context.Context, address.Address) (bool, error)
}

func NewMiner(api storageMinerApi, maddr address.Address, h host.Host, ds datastore.Batching, sealer sectorstorage.SectorManager, sc sealing.SectorIDCounter, verif ffiwrapper.Verifier, gsd dtypes.GetSealingConfigFunc, feeCfg config.MinerFeeConfig, journal journal.Journal, as *AddressSelector) (*Miner, error) {
// NewMiner creates a new Miner object.
func NewMiner(api fullNodeFilteredAPI,
maddr address.Address,
h host.Host,
ds datastore.Batching,
sealer sectorstorage.SectorManager,
sc sealing.SectorIDCounter,
verif ffiwrapper.Verifier,
gsd dtypes.GetSealingConfigFunc,
feeCfg config.MinerFeeConfig,
journal journal.Journal,
as *AddressSelector) (*Miner, error) {
m := &Miner{
api: api,
feeCfg: feeCfg,
Expand All @@ -136,6 +157,7 @@ func NewMiner(api storageMinerApi, maddr address.Address, h host.Host, ds datast
return m, nil
}

// Run starts the sealing FSM in the background, running preliminary checks first.
func (m *Miner) Run(ctx context.Context) error {
if err := m.runPreflightChecks(ctx); err != nil {
return xerrors.Errorf("miner preflight checks failed: %w", err)
Expand All @@ -152,17 +174,37 @@ func (m *Miner) Run(ctx context.Context) error {
MaxTerminateGasFee: abi.TokenAmount(m.feeCfg.MaxTerminateGasFee),
}

evts := events.NewEvents(ctx, m.api)
adaptedAPI := NewSealingAPIAdapter(m.api)
// TODO: Maybe we update this policy after actor upgrades?
pcp := sealing.NewBasicPreCommitPolicy(adaptedAPI, policy.GetMaxSectorExpirationExtension()-(md.WPoStProvingPeriod*2), md.PeriodStart%md.WPoStProvingPeriod)
var (
// consumer of chain head changes.
evts = events.NewEvents(ctx, m.api)
evtsAdapter = NewEventsAdapter(evts)

as := func(ctx context.Context, mi miner.MinerInfo, use api.AddrUse, goodFunds, minFunds abi.TokenAmount) (address.Address, abi.TokenAmount, error) {
return m.addrSel.AddressFor(ctx, m.api, mi, use, goodFunds, minFunds)
}
// Create a shim to glue the API required by the sealing component
// with the API that Lotus is capable of providing.
// The shim translates between "tipset tokens" and tipset keys, and
// provides extra methods.
adaptedAPI = NewSealingAPIAdapter(m.api)

// Instantiate a precommit policy.
defaultDuration = policy.GetMaxSectorExpirationExtension() - (md.WPoStProvingPeriod * 2)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggest adding a comment to explain this calculation

provingBoundary = md.PeriodStart % md.WPoStProvingPeriod

// TODO: Maybe we update this policy after actor upgrades?
pcp = sealing.NewBasicPreCommitPolicy(adaptedAPI, defaultDuration, provingBoundary)

// address selector.
as = func(ctx context.Context, mi miner.MinerInfo, use api.AddrUse, goodFunds, minFunds abi.TokenAmount) (address.Address, abi.TokenAmount, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggest adding a comment explaining what the address selector does

return m.addrSel.AddressFor(ctx, m.api, mi, use, goodFunds, minFunds)
}

// sealing configuration.
cfg = sealing.GetSealingConfigFunc(m.getSealConfig)
)

m.sealing = sealing.New(adaptedAPI, fc, NewEventsAdapter(evts), m.maddr, m.ds, m.sealer, m.sc, m.verif, &pcp, sealing.GetSealingConfigFunc(m.getSealConfig), m.handleSealingNotifications, as)
// Instantiate the sealing FSM.
m.sealing = sealing.New(adaptedAPI, fc, evtsAdapter, m.maddr, m.ds, m.sealer, m.sc, m.verif, &pcp, cfg, m.handleSealingNotifications, as)

// Run the sealing FSM.
go m.sealing.Run(ctx) //nolint:errcheck // logged intside the function

return nil
Expand All @@ -184,6 +226,7 @@ func (m *Miner) Stop(ctx context.Context) error {
return m.sealing.Stop(ctx)
}

// runPreflightChecks verifies that preconditions to run the miner are satisfied.
func (m *Miner) runPreflightChecks(ctx context.Context) error {
mi, err := m.api.StateMinerInfo(ctx, m.maddr, types.EmptyTSK)
if err != nil {
Expand Down
File renamed without changes.
23 changes: 13 additions & 10 deletions storage/wdpost_changehandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,25 @@ const (
type CompleteGeneratePoSTCb func(posts []miner.SubmitWindowedPoStParams, err error)
type CompleteSubmitPoSTCb func(err error)

type changeHandlerAPI interface {
// wdPoStCommands is the subset of the WindowPoStScheduler + full node APIs used
// by the changeHandler to execute actions and query state.
type wdPoStCommands interface {
StateMinerProvingDeadline(context.Context, address.Address, types.TipSetKey) (*dline.Info, error)

startGeneratePoST(ctx context.Context, ts *types.TipSet, deadline *dline.Info, onComplete CompleteGeneratePoSTCb) context.CancelFunc
startSubmitPoST(ctx context.Context, ts *types.TipSet, deadline *dline.Info, posts []miner.SubmitWindowedPoStParams, onComplete CompleteSubmitPoSTCb) context.CancelFunc
onAbort(ts *types.TipSet, deadline *dline.Info)
failPost(err error, ts *types.TipSet, deadline *dline.Info)
recordPoStFailure(err error, ts *types.TipSet, deadline *dline.Info)
}

type changeHandler struct {
api changeHandlerAPI
api wdPoStCommands
actor address.Address
proveHdlr *proveHandler
submitHdlr *submitHandler
}

func newChangeHandler(api changeHandlerAPI, actor address.Address) *changeHandler {
func newChangeHandler(api wdPoStCommands, actor address.Address) *changeHandler {
posts := newPostsCache()
p := newProver(api, posts)
s := newSubmitter(api, posts)
Expand Down Expand Up @@ -146,7 +149,7 @@ type postResult struct {

// proveHandler generates proofs
type proveHandler struct {
api changeHandlerAPI
api wdPoStCommands
posts *postsCache

postResults chan *postResult
Expand All @@ -163,7 +166,7 @@ type proveHandler struct {
}

func newProver(
api changeHandlerAPI,
api wdPoStCommands,
posts *postsCache,
) *proveHandler {
ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -248,7 +251,7 @@ func (p *proveHandler) processPostResult(res *postResult) {
di := res.currPost.di
if res.err != nil {
// Proving failed so inform the API
p.api.failPost(res.err, res.ts, di)
p.api.recordPoStFailure(res.err, res.ts, di)
log.Warnf("Aborted window post Proving (Deadline: %+v)", di)
p.api.onAbort(res.ts, di)

Expand Down Expand Up @@ -295,7 +298,7 @@ type postInfo struct {

// submitHandler submits proofs on-chain
type submitHandler struct {
api changeHandlerAPI
api wdPoStCommands
posts *postsCache

submitResults chan *submitResult
Expand All @@ -319,7 +322,7 @@ type submitHandler struct {
}

func newSubmitter(
api changeHandlerAPI,
api wdPoStCommands,
posts *postsCache,
) *submitHandler {
ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -488,7 +491,7 @@ func (s *submitHandler) submitIfReady(ctx context.Context, advance *types.TipSet
func (s *submitHandler) processSubmitResult(res *submitResult) {
if res.err != nil {
// Submit failed so inform the API and go back to the start state
s.api.failPost(res.err, res.pw.ts, res.pw.di)
s.api.recordPoStFailure(res.err, res.pw.ts, res.pw.di)
log.Warnf("Aborted window post Submitting (Deadline: %+v)", res.pw.di)
s.api.onAbort(res.pw.ts, res.pw.di)

Expand Down
2 changes: 1 addition & 1 deletion storage/wdpost_changehandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func (m *mockAPI) wasAbortCalled() bool {
return m.abortCalled
}

func (m *mockAPI) failPost(err error, ts *types.TipSet, deadline *dline.Info) {
func (m *mockAPI) recordPoStFailure(err error, ts *types.TipSet, deadline *dline.Info) {
}

func (m *mockAPI) setChangeHandler(ch *changeHandler) {
Expand Down
Loading