Skip to content

Commit

Permalink
Merge pull request #4623 from filecoin-project/feat/cache-deal-state-…
Browse files Browse the repository at this point in the history
…matcher

Cache deal states for most recent old/new tipset
  • Loading branch information
magik6k authored Oct 31, 2020
2 parents 0f8dada + f79652c commit c6a8eff
Show file tree
Hide file tree
Showing 8 changed files with 402 additions and 107 deletions.
69 changes: 69 additions & 0 deletions chain/events/state/mock/api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package test

import (
"context"
"sync"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/lib/blockstore"
"github.com/ipfs/go-cid"
"golang.org/x/xerrors"
)

type MockAPI struct {
bs blockstore.Blockstore

lk sync.Mutex
ts map[types.TipSetKey]*types.Actor
stateGetActorCalled int
}

func NewMockAPI(bs blockstore.Blockstore) *MockAPI {
return &MockAPI{
bs: bs,
ts: make(map[types.TipSetKey]*types.Actor),
}
}

func (m *MockAPI) ChainHasObj(ctx context.Context, c cid.Cid) (bool, error) {
return m.bs.Has(c)
}

func (m *MockAPI) ChainReadObj(ctx context.Context, c cid.Cid) ([]byte, error) {
blk, err := m.bs.Get(c)
if err != nil {
return nil, xerrors.Errorf("blockstore get: %w", err)
}

return blk.RawData(), nil
}

func (m *MockAPI) StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error) {
m.lk.Lock()
defer m.lk.Unlock()

m.stateGetActorCalled++
return m.ts[tsk], nil
}

func (m *MockAPI) StateGetActorCallCount() int {
m.lk.Lock()
defer m.lk.Unlock()

return m.stateGetActorCalled
}

func (m *MockAPI) ResetCallCounts() {
m.lk.Lock()
defer m.lk.Unlock()

m.stateGetActorCalled = 0
}

func (m *MockAPI) SetActor(tsk types.TipSetKey, act *types.Actor) {
m.lk.Lock()
defer m.lk.Unlock()

m.ts[tsk] = act
}
32 changes: 32 additions & 0 deletions chain/events/state/mock/state.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package test

import (
"context"
"testing"

"github.com/filecoin-project/go-state-types/abi"
"github.com/ipfs/go-cid"

"github.com/filecoin-project/specs-actors/v2/actors/builtin/market"
"github.com/filecoin-project/specs-actors/v2/actors/util/adt"
"github.com/stretchr/testify/require"
)

func CreateEmptyMarketState(t *testing.T, store adt.Store) *market.State {
emptyArrayCid, err := adt.MakeEmptyArray(store).Root()
require.NoError(t, err)
emptyMap, err := adt.MakeEmptyMap(store).Root()
require.NoError(t, err)
return market.ConstructState(emptyArrayCid, emptyMap, emptyMap)
}

func CreateDealAMT(ctx context.Context, t *testing.T, store adt.Store, deals map[abi.DealID]*market.DealState) cid.Cid {
root := adt.MakeEmptyArray(store)
for dealID, dealState := range deals {
err := root.Set(uint64(dealID), dealState)
require.NoError(t, err)
}
rootCid, err := root.Root()
require.NoError(t, err)
return rootCid
}
27 changes: 27 additions & 0 deletions chain/events/state/mock/tipset.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package test

import (
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/crypto"
"github.com/filecoin-project/lotus/chain/types"
"github.com/ipfs/go-cid"
)

var dummyCid cid.Cid

func init() {
dummyCid, _ = cid.Parse("bafkqaaa")
}

func MockTipset(minerAddr address.Address, timestamp uint64) (*types.TipSet, error) {
return types.NewTipSet([]*types.BlockHeader{{
Miner: minerAddr,
Height: 5,
ParentStateRoot: dummyCid,
Messages: dummyCid,
ParentMessageReceipts: dummyCid,
BlockSig: &crypto.Signature{Type: crypto.SigTypeBLS},
BLSAggregate: &crypto.Signature{Type: crypto.SigTypeBLS},
Timestamp: timestamp,
}})
}
101 changes: 17 additions & 84 deletions chain/events/state/predicates_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,19 @@ import (
"context"
"testing"

test "github.com/filecoin-project/lotus/chain/events/state/mock"

"github.com/filecoin-project/lotus/chain/actors/builtin/miner"

"github.com/filecoin-project/go-bitfield"

"github.com/stretchr/testify/require"
"golang.org/x/xerrors"

"github.com/ipfs/go-cid"
cbornode "github.com/ipfs/go-ipld-cbor"
"github.com/stretchr/testify/require"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/go-state-types/crypto"

builtin2 "github.com/filecoin-project/specs-actors/v2/actors/builtin"
market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market"
miner2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/miner"
Expand All @@ -36,39 +34,6 @@ func init() {
dummyCid, _ = cid.Parse("bafkqaaa")
}

type mockAPI struct {
ts map[types.TipSetKey]*types.Actor
bs bstore.Blockstore
}

func newMockAPI(bs bstore.Blockstore) *mockAPI {
return &mockAPI{
bs: bs,
ts: make(map[types.TipSetKey]*types.Actor),
}
}

func (m mockAPI) ChainHasObj(ctx context.Context, c cid.Cid) (bool, error) {
return m.bs.Has(c)
}

func (m mockAPI) ChainReadObj(ctx context.Context, c cid.Cid) ([]byte, error) {
blk, err := m.bs.Get(c)
if err != nil {
return nil, xerrors.Errorf("blockstore get: %w", err)
}

return blk.RawData(), nil
}

func (m mockAPI) StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error) {
return m.ts[tsk], nil
}

func (m mockAPI) setActor(tsk types.TipSetKey, act *types.Actor) {
m.ts[tsk] = act
}

func TestMarketPredicates(t *testing.T) {
ctx := context.Background()
bs := bstore.NewTemporarySync()
Expand Down Expand Up @@ -177,14 +142,14 @@ func TestMarketPredicates(t *testing.T) {

minerAddr, err := address.NewFromString("t00")
require.NoError(t, err)
oldState, err := mockTipset(minerAddr, 1)
oldState, err := test.MockTipset(minerAddr, 1)
require.NoError(t, err)
newState, err := mockTipset(minerAddr, 2)
newState, err := test.MockTipset(minerAddr, 2)
require.NoError(t, err)

api := newMockAPI(bs)
api.setActor(oldState.Key(), &types.Actor{Code: builtin2.StorageMarketActorCodeID, Head: oldStateC})
api.setActor(newState.Key(), &types.Actor{Code: builtin2.StorageMarketActorCodeID, Head: newStateC})
api := test.NewMockAPI(bs)
api.SetActor(oldState.Key(), &types.Actor{Code: builtin2.StorageMarketActorCodeID, Head: oldStateC})
api.SetActor(newState.Key(), &types.Actor{Code: builtin2.StorageMarketActorCodeID, Head: newStateC})

t.Run("deal ID predicate", func(t *testing.T) {
preds := NewStatePredicates(api)
Expand Down Expand Up @@ -239,7 +204,7 @@ func TestMarketPredicates(t *testing.T) {
t.Fatal("No state change so this should not be called")
return false, nil, nil
})
marketState0 := createEmptyMarketState(t, store)
marketState0 := test.CreateEmptyMarketState(t, store)
marketCid, err := store.Put(ctx, marketState0)
require.NoError(t, err)
marketState, err := market.Load(store, &types.Actor{
Expand Down Expand Up @@ -352,7 +317,7 @@ func TestMarketPredicates(t *testing.T) {
t.Fatal("No state change so this should not be called")
return false, nil, nil
})
marketState0 := createEmptyMarketState(t, store)
marketState0 := test.CreateEmptyMarketState(t, store)
marketCid, err := store.Put(ctx, marketState0)
require.NoError(t, err)
marketState, err := market.Load(store, &types.Actor{
Expand Down Expand Up @@ -394,14 +359,14 @@ func TestMinerSectorChange(t *testing.T) {
newMinerC := createMinerState(ctx, t, store, owner, worker, []miner.SectorOnChainInfo{si1Ext, si2, si3})

minerAddr := nextIDAddrF()
oldState, err := mockTipset(minerAddr, 1)
oldState, err := test.MockTipset(minerAddr, 1)
require.NoError(t, err)
newState, err := mockTipset(minerAddr, 2)
newState, err := test.MockTipset(minerAddr, 2)
require.NoError(t, err)

api := newMockAPI(bs)
api.setActor(oldState.Key(), &types.Actor{Head: oldMinerC, Code: builtin2.StorageMinerActorCodeID})
api.setActor(newState.Key(), &types.Actor{Head: newMinerC, Code: builtin2.StorageMinerActorCodeID})
api := test.NewMockAPI(bs)
api.SetActor(oldState.Key(), &types.Actor{Head: oldMinerC, Code: builtin2.StorageMinerActorCodeID})
api.SetActor(newState.Key(), &types.Actor{Head: newMinerC, Code: builtin2.StorageMinerActorCodeID})

preds := NewStatePredicates(api)

Expand Down Expand Up @@ -449,29 +414,16 @@ func TestMinerSectorChange(t *testing.T) {
require.Equal(t, si1Ext, sectorChanges.Extended[0].From)
}

func mockTipset(minerAddr address.Address, timestamp uint64) (*types.TipSet, error) {
return types.NewTipSet([]*types.BlockHeader{{
Miner: minerAddr,
Height: 5,
ParentStateRoot: dummyCid,
Messages: dummyCid,
ParentMessageReceipts: dummyCid,
BlockSig: &crypto.Signature{Type: crypto.SigTypeBLS},
BLSAggregate: &crypto.Signature{Type: crypto.SigTypeBLS},
Timestamp: timestamp,
}})
}

type balance struct {
available abi.TokenAmount
locked abi.TokenAmount
}

func createMarketState(ctx context.Context, t *testing.T, store adt2.Store, deals map[abi.DealID]*market2.DealState, props map[abi.DealID]*market2.DealProposal, balances map[address.Address]balance) cid.Cid {
dealRootCid := createDealAMT(ctx, t, store, deals)
dealRootCid := test.CreateDealAMT(ctx, t, store, deals)
propRootCid := createProposalAMT(ctx, t, store, props)
balancesCids := createBalanceTable(ctx, t, store, balances)
state := createEmptyMarketState(t, store)
state := test.CreateEmptyMarketState(t, store)
state.States = dealRootCid
state.Proposals = propRootCid
state.EscrowTable = balancesCids[0]
Expand All @@ -482,25 +434,6 @@ func createMarketState(ctx context.Context, t *testing.T, store adt2.Store, deal
return stateC
}

func createEmptyMarketState(t *testing.T, store adt2.Store) *market2.State {
emptyArrayCid, err := adt2.MakeEmptyArray(store).Root()
require.NoError(t, err)
emptyMap, err := adt2.MakeEmptyMap(store).Root()
require.NoError(t, err)
return market2.ConstructState(emptyArrayCid, emptyMap, emptyMap)
}

func createDealAMT(ctx context.Context, t *testing.T, store adt2.Store, deals map[abi.DealID]*market2.DealState) cid.Cid {
root := adt2.MakeEmptyArray(store)
for dealID, dealState := range deals {
err := root.Set(uint64(dealID), dealState)
require.NoError(t, err)
}
rootCid, err := root.Root()
require.NoError(t, err)
return rootCid
}

func createProposalAMT(ctx context.Context, t *testing.T, store adt2.Store, props map[abi.DealID]*market2.DealProposal) cid.Cid {
root := adt2.MakeEmptyArray(store)
for dealID, prop := range props {
Expand Down
23 changes: 10 additions & 13 deletions markets/storageadapter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,23 +38,26 @@ type ClientNodeAdapter struct {
full.ChainAPI
full.MpoolAPI

fm *market.FundMgr
ev *events.Events
fm *market.FundMgr
ev *events.Events
dsMatcher *dealStateMatcher
}

type clientApi struct {
full.ChainAPI
full.StateAPI
}

func NewClientNodeAdapter(state full.StateAPI, chain full.ChainAPI, mpool full.MpoolAPI, fm *market.FundMgr) storagemarket.StorageClientNode {
func NewClientNodeAdapter(stateapi full.StateAPI, chain full.ChainAPI, mpool full.MpoolAPI, fm *market.FundMgr) storagemarket.StorageClientNode {
capi := &clientApi{chain, stateapi}
return &ClientNodeAdapter{
StateAPI: state,
StateAPI: stateapi,
ChainAPI: chain,
MpoolAPI: mpool,

fm: fm,
ev: events.NewEvents(context.TODO(), &clientApi{chain, state}),
fm: fm,
ev: events.NewEvents(context.TODO(), capi),
dsMatcher: newDealStateMatcher(state.NewStatePredicates(capi)),
}
}

Expand Down Expand Up @@ -389,13 +392,7 @@ func (c *ClientNodeAdapter) OnDealExpiredOrSlashed(ctx context.Context, dealID a
}

// Watch for state changes to the deal
preds := state.NewStatePredicates(c)
dealDiff := preds.OnStorageMarketActorChanged(
preds.OnDealStateChanged(
preds.DealStateChangedForIDs([]abi.DealID{dealID})))
match := func(oldTs, newTs *types.TipSet) (bool, events.StateChange, error) {
return dealDiff(ctx, oldTs.Key(), newTs.Key())
}
match := c.dsMatcher.matcher(ctx, dealID)

// Wait until after the end epoch for the deal and then timeout
timeout := (sd.Proposal.EndEpoch - head.Height()) + 1
Expand Down
Loading

0 comments on commit c6a8eff

Please sign in to comment.