Skip to content

Commit

Permalink
feat: cache deal states for most recent old/new tipset
Browse files Browse the repository at this point in the history
  • Loading branch information
dirkmc committed Oct 28, 2020
1 parent 3d02dba commit c23cdab
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 23 deletions.
23 changes: 10 additions & 13 deletions markets/storageadapter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,23 +38,26 @@ type ClientNodeAdapter struct {
full.ChainAPI
full.MpoolAPI

fm *market.FundMgr
ev *events.Events
fm *market.FundMgr
ev *events.Events
dsMatcher *dealStateMatcher
}

type clientApi struct {
full.ChainAPI
full.StateAPI
}

func NewClientNodeAdapter(state full.StateAPI, chain full.ChainAPI, mpool full.MpoolAPI, fm *market.FundMgr) storagemarket.StorageClientNode {
func NewClientNodeAdapter(stateapi full.StateAPI, chain full.ChainAPI, mpool full.MpoolAPI, fm *market.FundMgr) storagemarket.StorageClientNode {
capi := &clientApi{chain, stateapi}
return &ClientNodeAdapter{
StateAPI: state,
StateAPI: stateapi,
ChainAPI: chain,
MpoolAPI: mpool,

fm: fm,
ev: events.NewEvents(context.TODO(), &clientApi{chain, state}),
fm: fm,
ev: events.NewEvents(context.TODO(), capi),
dsMatcher: newDealStateMatcher(state.NewStatePredicates(capi)),
}
}

Expand Down Expand Up @@ -389,13 +392,7 @@ func (c *ClientNodeAdapter) OnDealExpiredOrSlashed(ctx context.Context, dealID a
}

// Watch for state changes to the deal
preds := state.NewStatePredicates(c)
dealDiff := preds.OnStorageMarketActorChanged(
preds.OnDealStateChanged(
preds.DealStateChangedForIDs([]abi.DealID{dealID})))
match := func(oldTs, newTs *types.TipSet) (bool, events.StateChange, error) {
return dealDiff(ctx, oldTs.Key(), newTs.Key())
}
match := c.dsMatcher.matcher(ctx, dealID)

// Wait until after the end epoch for the deal and then timeout
timeout := (sd.Proposal.EndEpoch - head.Height()) + 1
Expand Down
84 changes: 84 additions & 0 deletions markets/storageadapter/dealstatematcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package storageadapter

import (
"context"
"sync"

"github.com/filecoin-project/go-state-types/abi"
actorsmarket "github.com/filecoin-project/lotus/chain/actors/builtin/market"
"github.com/filecoin-project/lotus/chain/events"
"github.com/filecoin-project/lotus/chain/events/state"
"github.com/filecoin-project/lotus/chain/types"
)

// dealStateMatcher caches the DealStates for the most recent
// old/new tipset combination
type dealStateMatcher struct {
preds *state.StatePredicates

lk sync.Mutex
oldTsk types.TipSetKey
newTsk types.TipSetKey
oldDealStateRoot actorsmarket.DealStates
newDealStateRoot actorsmarket.DealStates
}

func newDealStateMatcher(preds *state.StatePredicates) *dealStateMatcher {
return &dealStateMatcher{preds: preds}
}

// matcher returns a function that checks if the state of the given dealID
// has changed.
// It caches the DealStates for the most recent old/new tipset combination.
func (mc *dealStateMatcher) matcher(ctx context.Context, dealID abi.DealID) events.StateMatchFunc {
// The function that is called to check if the deal state has changed for
// the target deal ID
dealStateChangedForID := mc.preds.DealStateChangedForIDs([]abi.DealID{dealID})

// The match function is called by the events API to check if there's
// been a state change for the deal with the target deal ID
match := func(oldTs, newTs *types.TipSet) (bool, events.StateChange, error) {
mc.lk.Lock()
defer mc.lk.Unlock()

// Check if we've already fetched the DealStates for the given tipsets
if mc.oldTsk == oldTs.Key() && mc.newTsk == newTs.Key() {
// If we fetch the DealStates and there is no difference between
// them, they are stored as nil. So we can just bail out.
if mc.oldDealStateRoot == nil || mc.newDealStateRoot == nil {
return false, nil, nil
}

// Check if the deal state has changed for the target ID
return dealStateChangedForID(ctx, mc.oldDealStateRoot, mc.newDealStateRoot)
}

// We haven't already fetched the DealStates for the given tipsets, so
// do so now

// Replace dealStateChangedForID with a function that records the
// DealStates so that we can cache them
var oldDealStateRootSaved, newDealStateRootSaved actorsmarket.DealStates
recorder := func(ctx context.Context, oldDealStateRoot, newDealStateRoot actorsmarket.DealStates) (changed bool, user state.UserData, err error) {
// Record DealStates
oldDealStateRootSaved = oldDealStateRoot
newDealStateRootSaved = newDealStateRoot

return dealStateChangedForID(ctx, oldDealStateRoot, newDealStateRoot)
}

// Call the match function
dealDiff := mc.preds.OnStorageMarketActorChanged(
mc.preds.OnDealStateChanged(recorder))
matched, data, err := dealDiff(ctx, oldTs.Key(), newTs.Key())

// Save the recorded DealStates for the tipsets
mc.oldTsk = oldTs.Key()
mc.newTsk = newTs.Key()
mc.oldDealStateRoot = oldDealStateRootSaved
mc.newDealStateRoot = newDealStateRootSaved

return matched, data, err
}
return match
}
16 changes: 6 additions & 10 deletions markets/storageadapter/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,18 @@ type ProviderNodeAdapter struct {
ev *events.Events

publishSpec, addBalanceSpec *api.MessageSendSpec
dsMatcher *dealStateMatcher
}

func NewProviderNodeAdapter(fc *config.MinerFeeConfig) func(dag dtypes.StagingDAG, secb *sectorblocks.SectorBlocks, full api.FullNode) storagemarket.StorageProviderNode {
return func(dag dtypes.StagingDAG, secb *sectorblocks.SectorBlocks, full api.FullNode) storagemarket.StorageProviderNode {
na := &ProviderNodeAdapter{
FullNode: full,

dag: dag,
secb: secb,
ev: events.NewEvents(context.TODO(), full),
dag: dag,
secb: secb,
ev: events.NewEvents(context.TODO(), full),
dsMatcher: newDealStateMatcher(state.NewStatePredicates(full)),
}
if fc != nil {
na.publishSpec = &api.MessageSendSpec{MaxFee: abi.TokenAmount(fc.MaxPublishDealsFee)}
Expand Down Expand Up @@ -461,13 +463,7 @@ func (n *ProviderNodeAdapter) OnDealExpiredOrSlashed(ctx context.Context, dealID
}

// Watch for state changes to the deal
preds := state.NewStatePredicates(n)
dealDiff := preds.OnStorageMarketActorChanged(
preds.OnDealStateChanged(
preds.DealStateChangedForIDs([]abi.DealID{dealID})))
match := func(oldTs, newTs *types.TipSet) (bool, events.StateChange, error) {
return dealDiff(ctx, oldTs.Key(), newTs.Key())
}
match := n.dsMatcher.matcher(ctx, dealID)

// Wait until after the end epoch for the deal and then timeout
timeout := (sd.Proposal.EndEpoch - head.Height()) + 1
Expand Down

0 comments on commit c23cdab

Please sign in to comment.