diff --git a/api/api_full.go b/api/api_full.go index 601b1466047..777e996cb24 100644 --- a/api/api_full.go +++ b/api/api_full.go @@ -367,6 +367,10 @@ type FullNode interface { // StateWaitMsg looks back in the chain for a message. If not found, it blocks until the // message arrives on chain, and gets to the indicated confidence depth. StateWaitMsg(ctx context.Context, cid cid.Cid, confidence uint64) (*MsgLookup, error) + // StateWaitMsgLimited looks back up to limit epochs in the chain for a message. + // If not found, it blocks until the message arrives on chain, and gets to the + // indicated confidence depth. + StateWaitMsgLimited(ctx context.Context, cid cid.Cid, confidence uint64, limit abi.ChainEpoch) (*MsgLookup, error) // StateListMiners returns the addresses of every miner that has claimed power in the Power Actor StateListMiners(context.Context, types.TipSetKey) ([]address.Address, error) // StateListActors returns the addresses of every actor in the state diff --git a/api/api_gateway.go b/api/api_gateway.go new file mode 100644 index 00000000000..95d28887d85 --- /dev/null +++ b/api/api_gateway.go @@ -0,0 +1,24 @@ +package api + +import ( + "context" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/lotus/chain/types" + "github.com/ipfs/go-cid" +) + +type GatewayAPI interface { + ChainHead(ctx context.Context) (*types.TipSet, error) + ChainGetTipSet(ctx context.Context, tsk types.TipSetKey) (*types.TipSet, error) + ChainGetTipSetByHeight(ctx context.Context, h abi.ChainEpoch, tsk types.TipSetKey) (*types.TipSet, error) + GasEstimateMessageGas(ctx context.Context, msg *types.Message, spec *MessageSendSpec, tsk types.TipSetKey) (*types.Message, error) + MpoolPush(ctx context.Context, sm *types.SignedMessage) (cid.Cid, error) + MsigGetAvailableBalance(ctx context.Context, addr address.Address, tsk types.TipSetKey) (types.BigInt, error) + MsigGetVested(ctx context.Context, addr address.Address, start types.TipSetKey, end types.TipSetKey) (types.BigInt, error) + StateAccountKey(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error) + StateGetActor(ctx context.Context, actor address.Address, ts types.TipSetKey) (*types.Actor, error) + StateLookupID(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error) + StateWaitMsg(ctx context.Context, msg cid.Cid, confidence uint64) (*MsgLookup, error) +} diff --git a/api/apistruct/struct.go b/api/apistruct/struct.go index 1a3ddda5819..df84eac80f7 100644 --- a/api/apistruct/struct.go +++ b/api/apistruct/struct.go @@ -190,6 +190,7 @@ type FullNodeStruct struct { StateReadState func(context.Context, address.Address, types.TipSetKey) (*api.ActorState, error) `perm:"read"` StateMsgGasCost func(context.Context, cid.Cid, types.TipSetKey) (*api.MsgGasCost, error) `perm:"read"` StateWaitMsg func(ctx context.Context, cid cid.Cid, confidence uint64) (*api.MsgLookup, error) `perm:"read"` + StateWaitMsgLimited func(context.Context, cid.Cid, uint64, abi.ChainEpoch) (*api.MsgLookup, error) `perm:"read"` StateSearchMsg func(context.Context, cid.Cid) (*api.MsgLookup, error) `perm:"read"` StateListMiners func(context.Context, types.TipSetKey) ([]address.Address, error) `perm:"read"` StateListActors func(context.Context, types.TipSetKey) ([]address.Address, error) `perm:"read"` @@ -361,6 +362,23 @@ type WorkerStruct struct { } } +type GatewayStruct struct { + Internal struct { + // TODO: does the gateway need perms? + ChainGetTipSet func(ctx context.Context, tsk types.TipSetKey) (*types.TipSet, error) + ChainGetTipSetByHeight func(ctx context.Context, h abi.ChainEpoch, tsk types.TipSetKey) (*types.TipSet, error) + ChainHead func(ctx context.Context) (*types.TipSet, error) + GasEstimateMessageGas func(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec, tsk types.TipSetKey) (*types.Message, error) + MpoolPush func(ctx context.Context, sm *types.SignedMessage) (cid.Cid, error) + MsigGetAvailableBalance func(ctx context.Context, addr address.Address, tsk types.TipSetKey) (types.BigInt, error) + MsigGetVested func(ctx context.Context, addr address.Address, start types.TipSetKey, end types.TipSetKey) (types.BigInt, error) + StateAccountKey func(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error) + StateGetActor func(ctx context.Context, actor address.Address, ts types.TipSetKey) (*types.Actor, error) + StateLookupID func(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error) + StateWaitMsg func(ctx context.Context, msg cid.Cid, confidence uint64) (*api.MsgLookup, error) + } +} + // CommonStruct func (c *CommonStruct) AuthVerify(ctx context.Context, token string) ([]auth.Permission, error) { @@ -854,6 +872,10 @@ func (c *FullNodeStruct) StateWaitMsg(ctx context.Context, msgc cid.Cid, confide return c.Internal.StateWaitMsg(ctx, msgc, confidence) } +func (c *FullNodeStruct) StateWaitMsgLimited(ctx context.Context, msgc cid.Cid, confidence uint64, limit abi.ChainEpoch) (*api.MsgLookup, error) { + return c.Internal.StateWaitMsgLimited(ctx, msgc, confidence, limit) +} + func (c *FullNodeStruct) StateSearchMsg(ctx context.Context, msgc cid.Cid) (*api.MsgLookup, error) { return c.Internal.StateSearchMsg(ctx, msgc) } @@ -1372,7 +1394,52 @@ func (w *WorkerStruct) Closing(ctx context.Context) (<-chan struct{}, error) { return w.Internal.Closing(ctx) } +func (g GatewayStruct) ChainHead(ctx context.Context) (*types.TipSet, error) { + return g.Internal.ChainHead(ctx) +} + +func (g GatewayStruct) ChainGetTipSet(ctx context.Context, tsk types.TipSetKey) (*types.TipSet, error) { + return g.Internal.ChainGetTipSet(ctx, tsk) +} + +func (g GatewayStruct) ChainGetTipSetByHeight(ctx context.Context, h abi.ChainEpoch, tsk types.TipSetKey) (*types.TipSet, error) { + return g.Internal.ChainGetTipSetByHeight(ctx, h, tsk) +} + +func (g GatewayStruct) GasEstimateMessageGas(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec, tsk types.TipSetKey) (*types.Message, error) { + return g.Internal.GasEstimateMessageGas(ctx, msg, spec, tsk) +} + +func (g GatewayStruct) MpoolPush(ctx context.Context, sm *types.SignedMessage) (cid.Cid, error) { + return g.Internal.MpoolPush(ctx, sm) +} + +func (g GatewayStruct) MsigGetAvailableBalance(ctx context.Context, addr address.Address, tsk types.TipSetKey) (types.BigInt, error) { + return g.Internal.MsigGetAvailableBalance(ctx, addr, tsk) +} + +func (g GatewayStruct) MsigGetVested(ctx context.Context, addr address.Address, start types.TipSetKey, end types.TipSetKey) (types.BigInt, error) { + return g.Internal.MsigGetVested(ctx, addr, start, end) +} + +func (g GatewayStruct) StateAccountKey(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error) { + return g.Internal.StateAccountKey(ctx, addr, tsk) +} + +func (g GatewayStruct) StateGetActor(ctx context.Context, actor address.Address, ts types.TipSetKey) (*types.Actor, error) { + return g.Internal.StateGetActor(ctx, actor, ts) +} + +func (g GatewayStruct) StateLookupID(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error) { + return g.Internal.StateLookupID(ctx, addr, tsk) +} + +func (g GatewayStruct) StateWaitMsg(ctx context.Context, msg cid.Cid, confidence uint64) (*api.MsgLookup, error) { + return g.Internal.StateWaitMsg(ctx, msg, confidence) +} + var _ api.Common = &CommonStruct{} var _ api.FullNode = &FullNodeStruct{} var _ api.StorageMiner = &StorageMinerStruct{} var _ api.WorkerAPI = &WorkerStruct{} +var _ api.GatewayAPI = &GatewayStruct{} diff --git a/api/client/client.go b/api/client/client.go index cd915acf049..390ce93d76b 100644 --- a/api/client/client.go +++ b/api/client/client.go @@ -82,3 +82,17 @@ func NewWorkerRPC(ctx context.Context, addr string, requestHeader http.Header) ( return &res, closer, err } + +// NewGatewayRPC creates a new http jsonrpc client for a gateway node. +func NewGatewayRPC(ctx context.Context, addr string, requestHeader http.Header, opts ...jsonrpc.Option) (api.GatewayAPI, jsonrpc.ClientCloser, error) { + var res apistruct.GatewayStruct + closer, err := jsonrpc.NewMergeClient(ctx, addr, "Filecoin", + []interface{}{ + &res.Internal, + }, + requestHeader, + opts..., + ) + + return &res, closer, err +} diff --git a/api/test/ccupgrade.go b/api/test/ccupgrade.go index 97fb665ed7b..4a860c661d7 100644 --- a/api/test/ccupgrade.go +++ b/api/test/ccupgrade.go @@ -12,10 +12,7 @@ import ( "github.com/filecoin-project/go-state-types/abi" - "github.com/filecoin-project/lotus/build" - "github.com/filecoin-project/lotus/chain/stmgr" "github.com/filecoin-project/lotus/chain/types" - "github.com/filecoin-project/lotus/node" "github.com/filecoin-project/lotus/node/impl" ) @@ -37,11 +34,7 @@ func TestCCUpgrade(t *testing.T, b APIBuilder, blocktime time.Duration) { func testCCUpgrade(t *testing.T, b APIBuilder, blocktime time.Duration, upgradeHeight abi.ChainEpoch) { ctx := context.Background() - n, sn := b(t, 1, OneMiner, node.Override(new(stmgr.UpgradeSchedule), stmgr.UpgradeSchedule{{ - Network: build.ActorUpgradeNetworkVersion, - Height: upgradeHeight, - Migration: stmgr.UpgradeActorsV2, - }})) + n, sn := b(t, []FullNodeOpts{FullNodeWithUpgradeAt(upgradeHeight)}, OneMiner) client := n[0].FullNode.(*impl.FullNodeAPI) miner := sn[0] diff --git a/api/test/deals.go b/api/test/deals.go index aa5bfa716bc..8b4a7fe8b19 100644 --- a/api/test/deals.go +++ b/api/test/deals.go @@ -48,7 +48,7 @@ func TestDealFlow(t *testing.T, b APIBuilder, blocktime time.Duration, carExport _ = os.Setenv("BELLMAN_NO_GPU", "1") ctx := context.Background() - n, sn := b(t, 1, OneMiner) + n, sn := b(t, OneFull, OneMiner) client := n[0].FullNode.(*impl.FullNodeAPI) miner := sn[0] @@ -85,7 +85,7 @@ func TestDoubleDealFlow(t *testing.T, b APIBuilder, blocktime time.Duration) { _ = os.Setenv("BELLMAN_NO_GPU", "1") ctx := context.Background() - n, sn := b(t, 1, OneMiner) + n, sn := b(t, OneFull, OneMiner) client := n[0].FullNode.(*impl.FullNodeAPI) miner := sn[0] @@ -149,7 +149,7 @@ func TestFastRetrievalDealFlow(t *testing.T, b APIBuilder, blocktime time.Durati _ = os.Setenv("BELLMAN_NO_GPU", "1") ctx := context.Background() - n, sn := b(t, 1, OneMiner) + n, sn := b(t, OneFull, OneMiner) client := n[0].FullNode.(*impl.FullNodeAPI) miner := sn[0] @@ -204,7 +204,7 @@ func TestSenondDealRetrieval(t *testing.T, b APIBuilder, blocktime time.Duration _ = os.Setenv("BELLMAN_NO_GPU", "1") ctx := context.Background() - n, sn := b(t, 1, OneMiner) + n, sn := b(t, OneFull, OneMiner) client := n[0].FullNode.(*impl.FullNodeAPI) miner := sn[0] diff --git a/api/test/mining.go b/api/test/mining.go index e19774a767a..8147c224b4d 100644 --- a/api/test/mining.go +++ b/api/test/mining.go @@ -25,7 +25,7 @@ var log = logging.Logger("apitest") func (ts *testSuite) testMining(t *testing.T) { ctx := context.Background() - apis, sn := ts.makeNodes(t, 1, OneMiner) + apis, sn := ts.makeNodes(t, OneFull, OneMiner) api := apis[0] newHeads, err := api.ChainNotify(ctx) @@ -54,7 +54,7 @@ func (ts *testSuite) testMiningReal(t *testing.T) { }() ctx := context.Background() - apis, sn := ts.makeNodes(t, 1, OneMiner) + apis, sn := ts.makeNodes(t, OneFull, OneMiner) api := apis[0] newHeads, err := api.ChainNotify(ctx) @@ -93,7 +93,7 @@ func TestDealMining(t *testing.T, b APIBuilder, blocktime time.Duration, carExpo // test making a deal with a fresh miner, and see if it starts to mine ctx := context.Background() - n, sn := b(t, 1, []StorageMiner{ + n, sn := b(t, OneFull, []StorageMiner{ {Full: 0, Preseal: PresealGenesis}, {Full: 0, Preseal: 0}, // TODO: Add support for miners on non-first full node }) diff --git a/api/test/paych.go b/api/test/paych.go index e95773b6a9a..a8ccebdde4b 100644 --- a/api/test/paych.go +++ b/api/test/paych.go @@ -33,7 +33,7 @@ func TestPaymentChannels(t *testing.T, b APIBuilder, blocktime time.Duration) { _ = os.Setenv("BELLMAN_NO_GPU", "1") ctx := context.Background() - n, sn := b(t, 2, OneMiner) + n, sn := b(t, TwoFull, OneMiner) paymentCreator := n[0] paymentReceiver := n[1] diff --git a/api/test/test.go b/api/test/test.go index 853267eff9b..35b39774081 100644 --- a/api/test/test.go +++ b/api/test/test.go @@ -4,11 +4,14 @@ import ( "context" "testing" + "github.com/filecoin-project/lotus/chain/stmgr" + "github.com/multiformats/go-multiaddr" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/miner" @@ -35,17 +38,27 @@ var PresealGenesis = -1 const GenesisPreseals = 2 +// Options for setting up a mock storage miner type StorageMiner struct { Full int Preseal int } +type OptionGenerator func([]TestNode) node.Option + +// Options for setting up a mock full node +type FullNodeOpts struct { + Lite bool // run node in "lite" mode + Opts OptionGenerator // generate dependency injection options +} + // APIBuilder is a function which is invoked in test suite to provide // test nodes and networks // +// fullOpts array defines options for each full node // storage array defines storage nodes, numbers in the array specify full node // index the storage node 'belongs' to -type APIBuilder func(t *testing.T, nFull int, storage []StorageMiner, opts ...node.Option) ([]TestNode, []TestStorageNode) +type APIBuilder func(t *testing.T, full []FullNodeOpts, storage []StorageMiner) ([]TestNode, []TestStorageNode) type testSuite struct { makeNodes APIBuilder } @@ -63,13 +76,39 @@ func TestApis(t *testing.T, b APIBuilder) { t.Run("testMiningReal", ts.testMiningReal) } +func DefaultFullOpts(nFull int) []FullNodeOpts { + full := make([]FullNodeOpts, nFull) + for i := range full { + full[i] = FullNodeOpts{ + Opts: func(nodes []TestNode) node.Option { + return node.Options() + }, + } + } + return full +} + var OneMiner = []StorageMiner{{Full: 0, Preseal: PresealGenesis}} +var OneFull = DefaultFullOpts(1) +var TwoFull = DefaultFullOpts(2) + +var FullNodeWithUpgradeAt = func(upgradeHeight abi.ChainEpoch) FullNodeOpts { + return FullNodeOpts{ + Opts: func(nodes []TestNode) node.Option { + return node.Override(new(stmgr.UpgradeSchedule), stmgr.UpgradeSchedule{{ + Network: build.ActorUpgradeNetworkVersion, + Height: upgradeHeight, + Migration: stmgr.UpgradeActorsV2, + }}) + }, + } +} func (ts *testSuite) testVersion(t *testing.T) { build.RunningNodeType = build.NodeFull ctx := context.Background() - apis, _ := ts.makeNodes(t, 1, OneMiner) + apis, _ := ts.makeNodes(t, OneFull, OneMiner) api := apis[0] v, err := api.Version(ctx) @@ -81,7 +120,7 @@ func (ts *testSuite) testVersion(t *testing.T) { func (ts *testSuite) testID(t *testing.T) { ctx := context.Background() - apis, _ := ts.makeNodes(t, 1, OneMiner) + apis, _ := ts.makeNodes(t, OneFull, OneMiner) api := apis[0] id, err := api.ID(ctx) @@ -93,7 +132,7 @@ func (ts *testSuite) testID(t *testing.T) { func (ts *testSuite) testConnectTwo(t *testing.T) { ctx := context.Background() - apis, _ := ts.makeNodes(t, 2, OneMiner) + apis, _ := ts.makeNodes(t, TwoFull, OneMiner) p, err := apis[0].NetPeers(ctx) if err != nil { diff --git a/api/test/window_post.go b/api/test/window_post.go index eadcdbb05a7..7bc56a562d4 100644 --- a/api/test/window_post.go +++ b/api/test/window_post.go @@ -15,11 +15,9 @@ import ( "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/lotus/extern/sector-storage/mock" sealing "github.com/filecoin-project/lotus/extern/storage-sealing" - "github.com/filecoin-project/lotus/node" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/build" - "github.com/filecoin-project/lotus/chain/stmgr" "github.com/filecoin-project/lotus/chain/types" bminer "github.com/filecoin-project/lotus/miner" "github.com/filecoin-project/lotus/node/impl" @@ -34,7 +32,7 @@ func init() { func TestPledgeSector(t *testing.T, b APIBuilder, blocktime time.Duration, nSectors int) { ctx := context.Background() - n, sn := b(t, 1, OneMiner) + n, sn := b(t, OneFull, OneMiner) client := n[0].FullNode.(*impl.FullNodeAPI) miner := sn[0] @@ -133,11 +131,7 @@ func testWindowPostUpgrade(t *testing.T, b APIBuilder, blocktime time.Duration, ctx, cancel := context.WithCancel(context.Background()) defer cancel() - n, sn := b(t, 1, OneMiner, node.Override(new(stmgr.UpgradeSchedule), stmgr.UpgradeSchedule{{ - Network: build.ActorUpgradeNetworkVersion, - Height: upgradeHeight, - Migration: stmgr.UpgradeActorsV2, - }})) + n, sn := b(t, []FullNodeOpts{FullNodeWithUpgradeAt(upgradeHeight)}, OneMiner) client := n[0].FullNode.(*impl.FullNodeAPI) miner := sn[0] diff --git a/chain/messagesigner/messagesigner.go b/chain/messagesigner/messagesigner.go index ac94d6a3e1f..1fe8f9565a0 100644 --- a/chain/messagesigner/messagesigner.go +++ b/chain/messagesigner/messagesigner.go @@ -6,7 +6,6 @@ import ( "sync" "github.com/filecoin-project/go-address" - "github.com/filecoin-project/lotus/chain/messagepool" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/wallet" "github.com/filecoin-project/lotus/node/modules/dtypes" @@ -21,7 +20,7 @@ const dsKeyActorNonce = "ActorNextNonce" var log = logging.Logger("messagesigner") -type mpoolAPI interface { +type MpoolNonceAPI interface { GetNonce(address.Address) (uint64, error) } @@ -30,15 +29,11 @@ type mpoolAPI interface { type MessageSigner struct { wallet *wallet.Wallet lk sync.Mutex - mpool mpoolAPI + mpool MpoolNonceAPI ds datastore.Batching } -func NewMessageSigner(wallet *wallet.Wallet, mpool *messagepool.MessagePool, ds dtypes.MetadataDS) *MessageSigner { - return newMessageSigner(wallet, mpool, ds) -} - -func newMessageSigner(wallet *wallet.Wallet, mpool mpoolAPI, ds dtypes.MetadataDS) *MessageSigner { +func NewMessageSigner(wallet *wallet.Wallet, mpool MpoolNonceAPI, ds dtypes.MetadataDS) *MessageSigner { ds = namespace.Wrap(ds, datastore.NewKey("/message-signer/")) return &MessageSigner{ wallet: wallet, diff --git a/chain/messagesigner/messagesigner_test.go b/chain/messagesigner/messagesigner_test.go index 04869ff6dde..de74b8a482f 100644 --- a/chain/messagesigner/messagesigner_test.go +++ b/chain/messagesigner/messagesigner_test.go @@ -177,7 +177,7 @@ func TestMessageSignerSignMessage(t *testing.T) { t.Run(tt.name, func(t *testing.T) { mpool := newMockMpool() ds := ds_sync.MutexWrap(datastore.NewMapDatastore()) - ms := newMessageSigner(w, mpool, ds) + ms := NewMessageSigner(w, mpool, ds) for _, m := range tt.msgs { if len(m.mpoolNonce) == 1 { diff --git a/chain/stmgr/stmgr.go b/chain/stmgr/stmgr.go index ba3dcd1d848..d6b6f436043 100644 --- a/chain/stmgr/stmgr.go +++ b/chain/stmgr/stmgr.go @@ -38,8 +38,16 @@ import ( "github.com/filecoin-project/lotus/chain/vm" ) +const LookbackNoLimit = abi.ChainEpoch(-1) + var log = logging.Logger("statemgr") +type StateManagerAPI interface { + LoadActorTsk(ctx context.Context, addr address.Address, tsk types.TipSetKey) (*types.Actor, error) + LookupID(ctx context.Context, addr address.Address, ts *types.TipSet) (address.Address, error) + ResolveToKeyAddress(ctx context.Context, addr address.Address, ts *types.TipSet) (address.Address, error) +} + type versionSpec struct { networkVersion network.Version atOrBelow abi.ChainEpoch @@ -508,7 +516,7 @@ func (sm *StateManager) GetReceipt(ctx context.Context, msg cid.Cid, ts *types.T return nil, fmt.Errorf("failed to load message: %w", err) } - _, r, _, err := sm.searchBackForMsg(ctx, ts, m) + _, r, _, err := sm.searchBackForMsg(ctx, ts, m, LookbackNoLimit) if err != nil { return nil, fmt.Errorf("failed to look back through chain for message: %w", err) } @@ -517,9 +525,9 @@ func (sm *StateManager) GetReceipt(ctx context.Context, msg cid.Cid, ts *types.T } // WaitForMessage blocks until a message appears on chain. It looks backwards in the chain to see if this has already -// happened. It guarantees that the message has been on chain for at least confidence epochs without being reverted -// before returning. -func (sm *StateManager) WaitForMessage(ctx context.Context, mcid cid.Cid, confidence uint64) (*types.TipSet, *types.MessageReceipt, cid.Cid, error) { +// happened, with an optional limit to how many epochs it will search. It guarantees that the message has been on +// chain for at least confidence epochs without being reverted before returning. +func (sm *StateManager) WaitForMessage(ctx context.Context, mcid cid.Cid, confidence uint64, lookbackLimit abi.ChainEpoch) (*types.TipSet, *types.MessageReceipt, cid.Cid, error) { ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -557,7 +565,7 @@ func (sm *StateManager) WaitForMessage(ctx context.Context, mcid cid.Cid, confid var backFm cid.Cid backSearchWait := make(chan struct{}) go func() { - fts, r, foundMsg, err := sm.searchBackForMsg(ctx, head[0].Val, msg) + fts, r, foundMsg, err := sm.searchBackForMsg(ctx, head[0].Val, msg, lookbackLimit) if err != nil { log.Warnf("failed to look back through chain for message: %w", err) return @@ -649,7 +657,7 @@ func (sm *StateManager) SearchForMessage(ctx context.Context, mcid cid.Cid) (*ty return head, r, foundMsg, nil } - fts, r, foundMsg, err := sm.searchBackForMsg(ctx, head, msg) + fts, r, foundMsg, err := sm.searchBackForMsg(ctx, head, msg, LookbackNoLimit) if err != nil { log.Warnf("failed to look back through chain for message %s", mcid) @@ -663,7 +671,15 @@ func (sm *StateManager) SearchForMessage(ctx context.Context, mcid cid.Cid) (*ty return fts, r, foundMsg, nil } -func (sm *StateManager) searchBackForMsg(ctx context.Context, from *types.TipSet, m types.ChainMsg) (*types.TipSet, *types.MessageReceipt, cid.Cid, error) { +// searchBackForMsg searches up to limit tipsets backwards from the given +// tipset for a message receipt. +// If limit is +// - 0 then no tipsets are searched +// - 5 then five tipset are searched +// - LookbackNoLimit then there is no limit +func (sm *StateManager) searchBackForMsg(ctx context.Context, from *types.TipSet, m types.ChainMsg, limit abi.ChainEpoch) (*types.TipSet, *types.MessageReceipt, cid.Cid, error) { + limitHeight := from.Height() - limit + noLimit := limit == LookbackNoLimit cur := from curActor, err := sm.LoadActor(ctx, m.VMMessage().From, cur) @@ -679,7 +695,9 @@ func (sm *StateManager) searchBackForMsg(ctx context.Context, from *types.TipSet mNonce := m.VMMessage().Nonce for { - if cur.Height() == 0 { + // If we've reached the genesis block, or we've reached the limit of + // how far back to look + if cur.Height() == 0 || !noLimit && cur.Height() <= limitHeight { // it ain't here! return nil, nil, cid.Undef, nil } @@ -1393,3 +1411,5 @@ func (sm *StateManager) GetMarketState(ctx context.Context, ts *types.TipSet) (m } return actState, nil } + +var _ StateManagerAPI = (*StateManager)(nil) diff --git a/chain/types/mock/chain.go b/chain/types/mock/chain.go index 559630619a3..85437079c1f 100644 --- a/chain/types/mock/chain.go +++ b/chain/types/mock/chain.go @@ -59,9 +59,11 @@ func MkBlock(parents *types.TipSet, weightInc uint64, ticketNonce uint64) *types var pcids []cid.Cid var height abi.ChainEpoch weight := types.NewInt(weightInc) + var timestamp uint64 if parents != nil { pcids = parents.Cids() height = parents.Height() + 1 + timestamp = parents.MinTimestamp() + build.BlockDelaySecs weight = types.BigAdd(parents.Blocks()[0].ParentWeight, weight) } @@ -79,6 +81,7 @@ func MkBlock(parents *types.TipSet, weightInc uint64, ticketNonce uint64) *types ParentWeight: weight, Messages: c, Height: height, + Timestamp: timestamp, ParentStateRoot: pstateRoot, BlockSig: &crypto.Signature{Type: crypto.SigTypeBLS, Data: []byte("boo! im a signature")}, ParentBaseFee: types.NewInt(uint64(build.MinimumBaseFee)), diff --git a/cli/cmd.go b/cli/cmd.go index edcb69adc11..e6475934ba3 100644 --- a/cli/cmd.go +++ b/cli/cmd.go @@ -289,6 +289,15 @@ func GetWorkerAPI(ctx *cli.Context) (api.WorkerAPI, jsonrpc.ClientCloser, error) return client.NewWorkerRPC(ctx.Context, addr, headers) } +func GetGatewayAPI(ctx *cli.Context) (api.GatewayAPI, jsonrpc.ClientCloser, error) { + addr, headers, err := GetRawAPI(ctx, repo.FullNode) + if err != nil { + return nil, nil, err + } + + return client.NewGatewayRPC(ctx.Context, addr, headers) +} + func DaemonContext(cctx *cli.Context) context.Context { if mtCtx, ok := cctx.App.Metadata[metadataTraceContext]; ok { return mtCtx.(context.Context) diff --git a/cli/paych_test.go b/cli/paych_test.go index 18782b4e869..2aa5c600908 100644 --- a/cli/paych_test.go +++ b/cli/paych_test.go @@ -390,7 +390,7 @@ func checkVoucherOutput(t *testing.T, list string, vouchers []voucherSpec) { } func startTwoNodesOneMiner(ctx context.Context, t *testing.T, blocktime time.Duration) ([]test.TestNode, []address.Address) { - n, sn := builder.RPCMockSbBuilder(t, 2, test.OneMiner) + n, sn := builder.RPCMockSbBuilder(t, test.TwoFull, test.OneMiner) paymentCreator := n[0] paymentReceiver := n[1] diff --git a/cmd/lotus-gateway/api.go b/cmd/lotus-gateway/api.go index 0a6365dbd08..d5fac0a0610 100644 --- a/cmd/lotus-gateway/api.go +++ b/cmd/lotus-gateway/api.go @@ -6,85 +6,174 @@ import ( "time" "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/node/impl/full" "github.com/ipfs/go-cid" - - "go.opencensus.io/trace" ) -const LookbackCap = time.Hour +const ( + LookbackCap = time.Hour + stateWaitLookbackLimit = abi.ChainEpoch(20) +) var ( ErrLookbackTooLong = fmt.Errorf("lookbacks of more than %s are disallowed", LookbackCap) ) +// gatewayDepsAPI defines the API methods that the GatewayAPI depends on +// (to make it easy to mock for tests) +type gatewayDepsAPI interface { + ChainHead(ctx context.Context) (*types.TipSet, error) + ChainGetTipSet(ctx context.Context, tsk types.TipSetKey) (*types.TipSet, error) + ChainGetTipSetByHeight(ctx context.Context, h abi.ChainEpoch, tsk types.TipSetKey) (*types.TipSet, error) + GasEstimateMessageGas(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec, tsk types.TipSetKey) (*types.Message, error) + MpoolPushUntrusted(ctx context.Context, sm *types.SignedMessage) (cid.Cid, error) + MsigGetAvailableBalance(ctx context.Context, addr address.Address, tsk types.TipSetKey) (types.BigInt, error) + MsigGetVested(ctx context.Context, addr address.Address, start types.TipSetKey, end types.TipSetKey) (types.BigInt, error) + StateAccountKey(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error) + StateGetActor(ctx context.Context, actor address.Address, ts types.TipSetKey) (*types.Actor, error) + StateLookupID(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error) + StateWaitMsgLimited(ctx context.Context, msg cid.Cid, confidence uint64, h abi.ChainEpoch) (*api.MsgLookup, error) +} + type GatewayAPI struct { - api api.FullNode + api gatewayDepsAPI } -func (a *GatewayAPI) getTipsetTimestamp(ctx context.Context, tsk types.TipSetKey) (time.Time, error) { +func (a *GatewayAPI) checkTipsetKey(ctx context.Context, tsk types.TipSetKey) error { if tsk.IsEmpty() { - return time.Now(), nil + return nil } ts, err := a.api.ChainGetTipSet(ctx, tsk) if err != nil { - return time.Time{}, err + return err } - return time.Unix(int64(ts.Blocks()[0].Timestamp), 0), nil + return a.checkTipset(ts) } -func (a *GatewayAPI) checkTipset(ctx context.Context, ts types.TipSetKey) error { - when, err := a.getTipsetTimestamp(ctx, ts) - if err != nil { - return err +func (a *GatewayAPI) checkTipset(ts *types.TipSet) error { + at := time.Unix(int64(ts.Blocks()[0].Timestamp), 0) + if err := a.checkTimestamp(at); err != nil { + return fmt.Errorf("bad tipset: %w", err) } + return nil +} - if time.Since(when) > time.Hour { - return ErrLookbackTooLong - } +func (a *GatewayAPI) checkTipsetHeight(ts *types.TipSet, h abi.ChainEpoch) error { + tsBlock := ts.Blocks()[0] + heightDelta := time.Duration(uint64(tsBlock.Height-h)*build.BlockDelaySecs) * time.Second + timeAtHeight := time.Unix(int64(tsBlock.Timestamp), 0).Add(-heightDelta) + if err := a.checkTimestamp(timeAtHeight); err != nil { + return fmt.Errorf("bad tipset height: %w", err) + } return nil } -func (a *GatewayAPI) StateGetActor(ctx context.Context, actor address.Address, ts types.TipSetKey) (*types.Actor, error) { - ctx, span := trace.StartSpan(ctx, "StateGetActor") - defer span.End() - - if err := a.checkTipset(ctx, ts); err != nil { - return nil, fmt.Errorf("bad tipset: %w", err) +func (a *GatewayAPI) checkTimestamp(at time.Time) error { + if time.Since(at) > LookbackCap { + return ErrLookbackTooLong } - return a.api.StateGetActor(ctx, actor, ts) + return nil } func (a *GatewayAPI) ChainHead(ctx context.Context) (*types.TipSet, error) { - ctx, span := trace.StartSpan(ctx, "ChainHead") - defer span.End() // TODO: cache and invalidate cache when timestamp is up (or have internal ChainNotify) return a.api.ChainHead(ctx) } func (a *GatewayAPI) ChainGetTipSet(ctx context.Context, tsk types.TipSetKey) (*types.TipSet, error) { - ctx, span := trace.StartSpan(ctx, "ChainGetTipSet") - defer span.End() + return a.api.ChainGetTipSet(ctx, tsk) +} - if err := a.checkTipset(ctx, tsk); err != nil { - return nil, fmt.Errorf("bad tipset: %w", err) +func (a *GatewayAPI) ChainGetTipSetByHeight(ctx context.Context, h abi.ChainEpoch, tsk types.TipSetKey) (*types.TipSet, error) { + ts, err := a.api.ChainGetTipSet(ctx, tsk) + if err != nil { + return nil, err } - // TODO: since we're limiting lookbacks, should just cache this (could really even cache the json response bytes) - return a.api.ChainGetTipSet(ctx, tsk) + // Check if the tipset key refers to a tipset that's too far in the past + if err := a.checkTipset(ts); err != nil { + return nil, err + } + + // Check if the height is too far in the past + if err := a.checkTipsetHeight(ts, h); err != nil { + return nil, err + } + + return a.api.ChainGetTipSetByHeight(ctx, h, tsk) } -func (a *GatewayAPI) MpoolPush(ctx context.Context, sm *types.SignedMessage) (cid.Cid, error) { - ctx, span := trace.StartSpan(ctx, "MpoolPush") - defer span.End() +func (a *GatewayAPI) GasEstimateMessageGas(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec, tsk types.TipSetKey) (*types.Message, error) { + if err := a.checkTipsetKey(ctx, tsk); err != nil { + return nil, err + } - // TODO: additional anti-spam checks + return a.api.GasEstimateMessageGas(ctx, msg, spec, tsk) +} +func (a *GatewayAPI) MpoolPush(ctx context.Context, sm *types.SignedMessage) (cid.Cid, error) { + // TODO: additional anti-spam checks return a.api.MpoolPushUntrusted(ctx, sm) } + +func (a *GatewayAPI) MsigGetAvailableBalance(ctx context.Context, addr address.Address, tsk types.TipSetKey) (types.BigInt, error) { + if err := a.checkTipsetKey(ctx, tsk); err != nil { + return types.NewInt(0), err + } + + return a.api.MsigGetAvailableBalance(ctx, addr, tsk) +} + +func (a *GatewayAPI) MsigGetVested(ctx context.Context, addr address.Address, start types.TipSetKey, end types.TipSetKey) (types.BigInt, error) { + if err := a.checkTipsetKey(ctx, start); err != nil { + return types.NewInt(0), err + } + if err := a.checkTipsetKey(ctx, end); err != nil { + return types.NewInt(0), err + } + + return a.api.MsigGetVested(ctx, addr, start, end) +} + +func (a *GatewayAPI) StateAccountKey(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error) { + if err := a.checkTipsetKey(ctx, tsk); err != nil { + return address.Undef, err + } + + return a.api.StateAccountKey(ctx, addr, tsk) +} + +func (a *GatewayAPI) StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error) { + if err := a.checkTipsetKey(ctx, tsk); err != nil { + return nil, err + } + + return a.api.StateGetActor(ctx, actor, tsk) +} + +func (a *GatewayAPI) StateLookupID(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error) { + if err := a.checkTipsetKey(ctx, tsk); err != nil { + return address.Undef, err + } + + return a.api.StateLookupID(ctx, addr, tsk) +} + +func (a *GatewayAPI) StateWaitMsg(ctx context.Context, msg cid.Cid, confidence uint64) (*api.MsgLookup, error) { + return a.api.StateWaitMsgLimited(ctx, msg, confidence, stateWaitLookbackLimit) +} + +var _ api.GatewayAPI = (*GatewayAPI)(nil) +var _ full.ChainModuleAPI = (*GatewayAPI)(nil) +var _ full.GasModuleAPI = (*GatewayAPI)(nil) +var _ full.MpoolModuleAPI = (*GatewayAPI)(nil) +var _ full.StateModuleAPI = (*GatewayAPI)(nil) diff --git a/cmd/lotus-gateway/api_test.go b/cmd/lotus-gateway/api_test.go new file mode 100644 index 00000000000..f34f887f5b6 --- /dev/null +++ b/cmd/lotus-gateway/api_test.go @@ -0,0 +1,191 @@ +package main + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/filecoin-project/lotus/build" + + "github.com/stretchr/testify/require" + + "github.com/filecoin-project/lotus/chain/types/mock" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/chain/types" + "github.com/ipfs/go-cid" +) + +func TestGatewayAPIChainGetTipSetByHeight(t *testing.T) { + ctx := context.Background() + + lookbackTimestamp := uint64(time.Now().Unix()) - uint64(LookbackCap.Seconds()) + type args struct { + h abi.ChainEpoch + tskh abi.ChainEpoch + genesisTS uint64 + } + tests := []struct { + name string + args args + expErr bool + }{{ + name: "basic", + args: args{ + h: abi.ChainEpoch(1), + tskh: abi.ChainEpoch(5), + }, + }, { + name: "genesis", + args: args{ + h: abi.ChainEpoch(0), + tskh: abi.ChainEpoch(5), + }, + }, { + name: "same epoch as tipset", + args: args{ + h: abi.ChainEpoch(5), + tskh: abi.ChainEpoch(5), + }, + }, { + name: "tipset too old", + args: args{ + // Tipset height is 5, genesis is at LookbackCap - 10 epochs. + // So resulting tipset height will be 5 epochs earlier than LookbackCap. + h: abi.ChainEpoch(1), + tskh: abi.ChainEpoch(5), + genesisTS: lookbackTimestamp - build.BlockDelaySecs*10, + }, + expErr: true, + }, { + name: "lookup height too old", + args: args{ + // Tipset height is 5, lookup height is 1, genesis is at LookbackCap - 3 epochs. + // So + // - lookup height will be 2 epochs earlier than LookbackCap. + // - tipset height will be 2 epochs later than LookbackCap. + h: abi.ChainEpoch(1), + tskh: abi.ChainEpoch(5), + genesisTS: lookbackTimestamp - build.BlockDelaySecs*3, + }, + expErr: true, + }, { + name: "tipset and lookup height within acceptable range", + args: args{ + // Tipset height is 5, lookup height is 1, genesis is at LookbackCap. + // So + // - lookup height will be 1 epoch later than LookbackCap. + // - tipset height will be 5 epochs later than LookbackCap. + h: abi.ChainEpoch(1), + tskh: abi.ChainEpoch(5), + genesisTS: lookbackTimestamp, + }, + }} + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + mock := &mockGatewayDepsAPI{} + a := &GatewayAPI{api: mock} + + // Create tipsets from genesis up to tskh and return the highest + ts := mock.createTipSets(tt.args.tskh, tt.args.genesisTS) + + got, err := a.ChainGetTipSetByHeight(ctx, tt.args.h, ts.Key()) + if tt.expErr { + require.Error(t, err) + } else { + require.NoError(t, err) + require.Equal(t, tt.args.h, got.Height()) + } + }) + } +} + +type mockGatewayDepsAPI struct { + lk sync.RWMutex + tipsets []*types.TipSet +} + +func (m *mockGatewayDepsAPI) ChainHead(ctx context.Context) (*types.TipSet, error) { + m.lk.RLock() + defer m.lk.RUnlock() + + return m.tipsets[len(m.tipsets)-1], nil +} + +func (m *mockGatewayDepsAPI) ChainGetTipSet(ctx context.Context, tsk types.TipSetKey) (*types.TipSet, error) { + m.lk.RLock() + defer m.lk.RUnlock() + + for _, ts := range m.tipsets { + if ts.Key() == tsk { + return ts, nil + } + } + + return nil, nil +} + +// createTipSets creates tipsets from genesis up to tskh and returns the highest +func (m *mockGatewayDepsAPI) createTipSets(h abi.ChainEpoch, genesisTimestamp uint64) *types.TipSet { + m.lk.Lock() + defer m.lk.Unlock() + + targeth := h + 1 // add one for genesis block + if genesisTimestamp == 0 { + genesisTimestamp = uint64(time.Now().Unix()) - build.BlockDelaySecs*uint64(targeth) + } + var currts *types.TipSet + for currh := abi.ChainEpoch(0); currh < targeth; currh++ { + blks := mock.MkBlock(currts, 1, 1) + if currh == 0 { + blks.Timestamp = genesisTimestamp + } + currts = mock.TipSet(blks) + m.tipsets = append(m.tipsets, currts) + } + + return m.tipsets[len(m.tipsets)-1] +} + +func (m *mockGatewayDepsAPI) ChainGetTipSetByHeight(ctx context.Context, h abi.ChainEpoch, tsk types.TipSetKey) (*types.TipSet, error) { + m.lk.Lock() + defer m.lk.Unlock() + + return m.tipsets[h], nil +} + +func (m *mockGatewayDepsAPI) GasEstimateMessageGas(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec, tsk types.TipSetKey) (*types.Message, error) { + panic("implement me") +} + +func (m *mockGatewayDepsAPI) MpoolPushUntrusted(ctx context.Context, sm *types.SignedMessage) (cid.Cid, error) { + panic("implement me") +} + +func (m *mockGatewayDepsAPI) MsigGetAvailableBalance(ctx context.Context, addr address.Address, tsk types.TipSetKey) (types.BigInt, error) { + panic("implement me") +} + +func (m *mockGatewayDepsAPI) MsigGetVested(ctx context.Context, addr address.Address, start types.TipSetKey, end types.TipSetKey) (types.BigInt, error) { + panic("implement me") +} + +func (m *mockGatewayDepsAPI) StateAccountKey(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error) { + panic("implement me") +} + +func (m *mockGatewayDepsAPI) StateGetActor(ctx context.Context, actor address.Address, ts types.TipSetKey) (*types.Actor, error) { + panic("implement me") +} + +func (m *mockGatewayDepsAPI) StateLookupID(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error) { + panic("implement me") +} + +func (m *mockGatewayDepsAPI) StateWaitMsgLimited(ctx context.Context, msg cid.Cid, confidence uint64, h abi.ChainEpoch) (*api.MsgLookup, error) { + panic("implement me") +} diff --git a/cmd/lotus-gateway/endtoend_test.go b/cmd/lotus-gateway/endtoend_test.go new file mode 100644 index 00000000000..206034968d1 --- /dev/null +++ b/cmd/lotus-gateway/endtoend_test.go @@ -0,0 +1,211 @@ +package main + +import ( + "bytes" + "context" + "fmt" + "os" + "testing" + "time" + + init0 "github.com/filecoin-project/specs-actors/actors/builtin/init" + "github.com/filecoin-project/specs-actors/actors/builtin/multisig" + + "github.com/stretchr/testify/require" + "golang.org/x/xerrors" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-jsonrpc" + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/api/client" + "github.com/filecoin-project/lotus/api/test" + "github.com/filecoin-project/lotus/chain/actors/policy" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/chain/wallet" + "github.com/filecoin-project/lotus/node" + builder "github.com/filecoin-project/lotus/node/test" +) + +func init() { + policy.SetSupportedProofTypes(abi.RegisteredSealProof_StackedDrg2KiBV1) + policy.SetConsensusMinerMinPower(abi.NewStoragePower(2048)) + policy.SetMinVerifiedDealSize(abi.NewStoragePower(256)) +} + +// TestEndToEnd tests that API calls can be made on a lite node that is +// connected through a gateway to a full API node +func TestEndToEnd(t *testing.T) { + _ = os.Setenv("BELLMAN_NO_GPU", "1") + + blocktime := 5 * time.Millisecond + ctx := context.Background() + full, lite, closer := startNodes(ctx, t, blocktime) + defer closer() + + // The full node starts with a wallet + fullWalletAddr, err := full.WalletDefaultAddress(ctx) + require.NoError(t, err) + + // Check the full node's wallet balance from the lite node + balance, err := lite.WalletBalance(ctx, fullWalletAddr) + require.NoError(t, err) + fmt.Println(balance) + + // Create a wallet on the lite node + liteWalletAddr, err := lite.WalletNew(ctx, wallet.ActSigType("secp256k1")) + require.NoError(t, err) + + // Send some funds from the full node to the lite node + err = sendFunds(ctx, t, full, fullWalletAddr, liteWalletAddr, types.NewInt(1e18)) + require.NoError(t, err) + + // Send some funds from the lite node back to the full node + err = sendFunds(ctx, t, lite, liteWalletAddr, fullWalletAddr, types.NewInt(100)) + require.NoError(t, err) + + // Sign some data with the lite node wallet address + data := []byte("hello") + sig, err := lite.WalletSign(ctx, liteWalletAddr, data) + require.NoError(t, err) + + // Verify the signature + ok, err := lite.WalletVerify(ctx, liteWalletAddr, data, sig) + require.NoError(t, err) + require.True(t, ok) + + // Create some wallets on the lite node to use for testing multisig + var walletAddrs []address.Address + for i := 0; i < 4; i++ { + addr, err := lite.WalletNew(ctx, wallet.ActSigType("secp256k1")) + require.NoError(t, err) + + walletAddrs = append(walletAddrs, addr) + + err = sendFunds(ctx, t, lite, liteWalletAddr, addr, types.NewInt(1e15)) + require.NoError(t, err) + } + + // Create an msig with three of the addresses and threshold of two sigs + msigAddrs := walletAddrs[:3] + amt := types.NewInt(1000) + addProposal, err := lite.MsigCreate(ctx, 2, msigAddrs, abi.ChainEpoch(50), amt, liteWalletAddr, types.NewInt(0)) + require.NoError(t, err) + + res, err := lite.StateWaitMsg(ctx, addProposal, 1) + require.NoError(t, err) + require.EqualValues(t, 0, res.Receipt.ExitCode) + + var execReturn init0.ExecReturn + err = execReturn.UnmarshalCBOR(bytes.NewReader(res.Receipt.Return)) + require.NoError(t, err) + + // Get available balance of msig: should be greater than zero and less + // than initial amount + msig := execReturn.IDAddress + msigBalance, err := lite.MsigGetAvailableBalance(ctx, msig, types.EmptyTSK) + require.NoError(t, err) + require.Greater(t, msigBalance.Int64(), int64(0)) + require.Less(t, msigBalance.Int64(), amt.Int64()) + + // Propose to add a new address to the msig + addProposal, err = lite.MsigAddPropose(ctx, msig, walletAddrs[0], walletAddrs[3], false) + require.NoError(t, err) + + res, err = lite.StateWaitMsg(ctx, addProposal, 1) + require.NoError(t, err) + require.EqualValues(t, 0, res.Receipt.ExitCode) + + var proposeReturn multisig.ProposeReturn + err = proposeReturn.UnmarshalCBOR(bytes.NewReader(res.Receipt.Return)) + require.NoError(t, err) + + // Approve proposal (proposer is first (implicit) signer, approver is + // second signer + txnID := uint64(proposeReturn.TxnID) + approval1, err := lite.MsigAddApprove(ctx, msig, walletAddrs[1], txnID, walletAddrs[0], walletAddrs[3], false) + require.NoError(t, err) + + res, err = lite.StateWaitMsg(ctx, approval1, 1) + require.NoError(t, err) + require.EqualValues(t, 0, res.Receipt.ExitCode) + + var approveReturn multisig.ApproveReturn + err = approveReturn.UnmarshalCBOR(bytes.NewReader(res.Receipt.Return)) + require.NoError(t, err) + require.True(t, approveReturn.Applied) +} + +func sendFunds(ctx context.Context, t *testing.T, fromNode test.TestNode, fromAddr address.Address, toAddr address.Address, amt types.BigInt) error { + msg := &types.Message{ + From: fromAddr, + To: toAddr, + Value: amt, + } + + sm, err := fromNode.MpoolPushMessage(ctx, msg, nil) + if err != nil { + return err + } + + res, err := fromNode.StateWaitMsg(ctx, sm.Cid(), 1) + if err != nil { + return err + } + if res.Receipt.ExitCode != 0 { + return xerrors.Errorf("send funds failed with exit code %d", res.Receipt.ExitCode) + } + + return nil +} + +func startNodes(ctx context.Context, t *testing.T, blocktime time.Duration) (test.TestNode, test.TestNode, jsonrpc.ClientCloser) { + var closer jsonrpc.ClientCloser + + // Create one miner and two full nodes. + // - Put a gateway server in front of full node 1 + // - Start full node 2 in lite mode + // - Connect lite node -> gateway server -> full node + opts := append( + // Full node + test.OneFull, + // Lite node + test.FullNodeOpts{ + Lite: true, + Opts: func(nodes []test.TestNode) node.Option { + fullNode := nodes[0] + + // Create a gateway server in front of the full node + _, addr, err := builder.CreateRPCServer(&GatewayAPI{api: fullNode}) + require.NoError(t, err) + + // Create a gateway client API that connects to the gateway server + var gapi api.GatewayAPI + gapi, closer, err = client.NewGatewayRPC(ctx, addr, nil) + require.NoError(t, err) + + // Provide the gateway API to dependency injection + return node.Override(new(api.GatewayAPI), gapi) + }, + }, + ) + n, sn := builder.RPCMockSbBuilder(t, opts, test.OneMiner) + + full := n[0] + lite := n[1] + miner := sn[0] + + // Get the listener address for the full node + fullAddr, err := full.NetAddrsListen(ctx) + require.NoError(t, err) + + // Connect the miner and the full node + err = miner.NetConnect(ctx, fullAddr) + require.NoError(t, err) + + // Start mining blocks + bm := test.NewBlockMiner(ctx, t, miner, blocktime) + bm.MineBlocks() + + return full, lite, closer +} diff --git a/cmd/lotus-storage-miner/allinfo_test.go b/cmd/lotus-storage-miner/allinfo_test.go index 8f744c4b3ad..a458c024b55 100644 --- a/cmd/lotus-storage-miner/allinfo_test.go +++ b/cmd/lotus-storage-miner/allinfo_test.go @@ -5,8 +5,6 @@ import ( "testing" "time" - "github.com/filecoin-project/lotus/node" - logging "github.com/ipfs/go-log/v2" "github.com/stretchr/testify/require" "github.com/urfave/cli/v2" @@ -64,8 +62,8 @@ func TestMinerAllInfo(t *testing.T) { require.NoError(t, infoAllCmd.Action(cctx)) } - bp := func(t *testing.T, nFull int, storage []test.StorageMiner, opts ...node.Option) ([]test.TestNode, []test.TestStorageNode) { - n, sn = builder.Builder(t, nFull, storage, opts...) + bp := func(t *testing.T, fullOpts []test.FullNodeOpts, storage []test.StorageMiner) ([]test.TestNode, []test.TestStorageNode) { + n, sn = builder.Builder(t, fullOpts, storage) t.Run("pre-info-all", run) diff --git a/cmd/lotus/daemon.go b/cmd/lotus/daemon.go index a0f754a6038..5964a2458fd 100644 --- a/cmd/lotus/daemon.go +++ b/cmd/lotus/daemon.go @@ -15,8 +15,6 @@ import ( "runtime/pprof" "strings" - "github.com/filecoin-project/lotus/chain/types" - paramfetch "github.com/filecoin-project/go-paramfetch" "github.com/mitchellh/go-homedir" "github.com/multiformats/go-multiaddr" @@ -32,6 +30,7 @@ import ( "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/stmgr" "github.com/filecoin-project/lotus/chain/store" + "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/vm" lcli "github.com/filecoin-project/lotus/cli" "github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper" @@ -114,6 +113,11 @@ var DaemonCmd = &cli.Command{ Name: "halt-after-import", Usage: "halt the process after importing chain from file", }, + &cli.BoolFlag{ + Name: "lite", + Usage: "start lotus in lite mode", + Hidden: true, + }, &cli.StringFlag{ Name: "pprof", Usage: "specify name of file for writing cpu profile to", @@ -133,6 +137,8 @@ var DaemonCmd = &cli.Command{ }, }, Action: func(cctx *cli.Context) error { + isLite := cctx.Bool("lite") + err := runmetrics.Enable(runmetrics.RunMetricOptions{ EnableCPU: true, EnableMemory: true, @@ -192,8 +198,10 @@ var DaemonCmd = &cli.Command{ return xerrors.Errorf("repo init error: %w", err) } - if err := paramfetch.GetParams(lcli.ReqContext(cctx), build.ParametersJSON(), 0); err != nil { - return xerrors.Errorf("fetching proof parameters: %w", err) + if !isLite { + if err := paramfetch.GetParams(lcli.ReqContext(cctx), build.ParametersJSON(), 0); err != nil { + return xerrors.Errorf("fetching proof parameters: %w", err) + } } var genBytes []byte @@ -240,10 +248,23 @@ var DaemonCmd = &cli.Command{ shutdownChan := make(chan struct{}) + // If the daemon is started in "lite mode", provide a GatewayAPI + // for RPC calls + liteModeDeps := node.Options() + if isLite { + gapi, closer, err := lcli.GetGatewayAPI(cctx) + if err != nil { + return err + } + + defer closer() + liteModeDeps = node.Override(new(api.GatewayAPI), gapi) + } + var api api.FullNode stop, err := node.New(ctx, - node.FullAPI(&api), + node.FullAPI(&api, node.Lite(isLite)), node.Override(new(dtypes.Bootstrapper), isBootstrapper), node.Override(new(dtypes.ShutdownChan), shutdownChan), @@ -251,6 +272,7 @@ var DaemonCmd = &cli.Command{ node.Repo(r), genesis, + liteModeDeps, node.ApplyIf(func(s *node.Settings) bool { return cctx.IsSet("api") }, node.Override(node.SetApiEndpointKey, func(lr repo.LockedRepo) error { diff --git a/documentation/en/api-methods.md b/documentation/en/api-methods.md index 8a288a4bf53..cc6d92e4f0d 100644 --- a/documentation/en/api-methods.md +++ b/documentation/en/api-methods.md @@ -167,6 +167,7 @@ * [StateVerifiedRegistryRootKey](#StateVerifiedRegistryRootKey) * [StateVerifierStatus](#StateVerifierStatus) * [StateWaitMsg](#StateWaitMsg) + * [StateWaitMsgLimited](#StateWaitMsgLimited) * [Sync](#Sync) * [SyncCheckBad](#SyncCheckBad) * [SyncCheckpoint](#SyncCheckpoint) @@ -4322,6 +4323,49 @@ Response: } ``` +### StateWaitMsgLimited +StateWaitMsgLimited looks back up to limit epochs in the chain for a message. +If not found, it blocks until the message arrives on chain, and gets to the +indicated confidence depth. + + +Perms: read + +Inputs: +```json +[ + { + "/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4" + }, + 42, + 10101 +] +``` + +Response: +```json +{ + "Message": { + "/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4" + }, + "Receipt": { + "ExitCode": 0, + "Return": "Ynl0ZSBhcnJheQ==", + "GasUsed": 9 + }, + "ReturnDec": {}, + "TipSet": [ + { + "/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4" + }, + { + "/": "bafy2bzacebp3shtrn43k7g3unredz7fxn4gj533d3o43tqn2p2ipxxhrvchve" + } + ], + "Height": 10101 +} +``` + ## Sync The Sync method group contains methods for interacting with and observing the lotus sync service. diff --git a/markets/storageadapter/client.go b/markets/storageadapter/client.go index 411c86ec92b..647a1675d3e 100644 --- a/markets/storageadapter/client.go +++ b/markets/storageadapter/client.go @@ -187,7 +187,7 @@ func (c *ClientNodeAdapter) ValidatePublishedDeal(ctx context.Context, deal stor } // TODO: timeout - _, ret, _, err := c.sm.WaitForMessage(ctx, *deal.PublishMessage, build.MessageConfidence) + _, ret, _, err := c.sm.WaitForMessage(ctx, *deal.PublishMessage, build.MessageConfidence, stmgr.LookbackNoLimit) if err != nil { return 0, xerrors.Errorf("waiting for deal publish message: %w", err) } diff --git a/node/builder.go b/node/builder.go index 0d48ec1301e..71295dbc815 100644 --- a/node/builder.go +++ b/node/builder.go @@ -6,6 +6,13 @@ import ( "os" "time" + "github.com/filecoin-project/lotus/chain" + "github.com/filecoin-project/lotus/chain/exchange" + "github.com/filecoin-project/lotus/chain/store" + "github.com/filecoin-project/lotus/chain/vm" + "github.com/filecoin-project/lotus/chain/wallet" + "github.com/filecoin-project/lotus/node/hello" + logging "github.com/ipfs/go-log" ci "github.com/libp2p/go-libp2p-core/crypto" "github.com/libp2p/go-libp2p-core/host" @@ -29,9 +36,7 @@ import ( storage2 "github.com/filecoin-project/specs-storage/storage" "github.com/filecoin-project/lotus/api" - "github.com/filecoin-project/lotus/chain" "github.com/filecoin-project/lotus/chain/beacon" - "github.com/filecoin-project/lotus/chain/exchange" "github.com/filecoin-project/lotus/chain/gen" "github.com/filecoin-project/lotus/chain/gen/slashfilter" "github.com/filecoin-project/lotus/chain/market" @@ -39,10 +44,7 @@ import ( "github.com/filecoin-project/lotus/chain/messagesigner" "github.com/filecoin-project/lotus/chain/metrics" "github.com/filecoin-project/lotus/chain/stmgr" - "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types" - "github.com/filecoin-project/lotus/chain/vm" - "github.com/filecoin-project/lotus/chain/wallet" sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage" "github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper" "github.com/filecoin-project/lotus/extern/sector-storage/stores" @@ -56,9 +58,9 @@ import ( "github.com/filecoin-project/lotus/markets/storageadapter" "github.com/filecoin-project/lotus/miner" "github.com/filecoin-project/lotus/node/config" - "github.com/filecoin-project/lotus/node/hello" "github.com/filecoin-project/lotus/node/impl" "github.com/filecoin-project/lotus/node/impl/common" + "github.com/filecoin-project/lotus/node/impl/full" "github.com/filecoin-project/lotus/node/modules" "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/modules/helpers" @@ -158,7 +160,7 @@ type Settings struct { Online bool // Online option applied Config bool // Config option applied - + Lite bool // Start node in "lite" mode } func defaults() []Option { @@ -232,6 +234,10 @@ func isType(t repo.RepoType) func(s *Settings) bool { // Online sets up basic libp2p node func Online() Option { + isFullOrLiteNode := func(s *Settings) bool { return s.nodeType == repo.FullNode } + isFullNode := func(s *Settings) bool { return s.nodeType == repo.FullNode && !s.Lite } + isLiteNode := func(s *Settings) bool { return s.nodeType == repo.FullNode && s.Lite } + return Options( // make sure that online is applied before Config. // This is important because Config overrides some of Online units @@ -245,22 +251,20 @@ func Online() Option { // common Override(new(*slashfilter.SlashFilter), modules.NewSlashFilter), - // Full node - - ApplyIf(isType(repo.FullNode), + // Full node or lite node + ApplyIf(isFullOrLiteNode, // TODO: Fix offline mode Override(new(dtypes.BootstrapPeers), modules.BuiltinBootstrap), Override(new(dtypes.DrandBootstrap), modules.DrandBootstrap), Override(new(dtypes.DrandSchedule), modules.BuiltinDrandConfig), - Override(HandleIncomingMessagesKey, modules.HandleIncomingMessages), - Override(new(ffiwrapper.Verifier), ffiwrapper.ProofVerifier), Override(new(vm.SyscallBuilder), vm.Syscalls), Override(new(*store.ChainStore), modules.ChainStore), Override(new(stmgr.UpgradeSchedule), stmgr.DefaultUpgradeSchedule()), Override(new(*stmgr.StateManager), stmgr.NewStateManagerWithUpgradeSchedule), + Override(new(stmgr.StateManagerAPI), From(new(*stmgr.StateManager))), Override(new(*wallet.Wallet), wallet.NewWallet), Override(new(*messagesigner.MessageSigner), messagesigner.NewMessageSigner), @@ -288,12 +292,6 @@ func Online() Option { Override(new(dtypes.Graphsync), modules.Graphsync), Override(new(*dtypes.MpoolLocker), new(dtypes.MpoolLocker)), - - Override(RunHelloKey, modules.RunHello), - Override(RunChainExchangeKey, modules.RunChainExchange), - Override(RunPeerMgrKey, modules.RunPeerMgr), - Override(HandleIncomingBlocksKey, modules.HandleIncomingBlocks), - Override(new(*discoveryimpl.Local), modules.NewLocalDiscovery), Override(new(discovery.PeerResolver), modules.RetrievalResolver), @@ -312,8 +310,34 @@ func Online() Option { Override(SettlePaymentChannelsKey, settler.SettlePaymentChannels), ), + // Lite node + ApplyIf(isLiteNode, + Override(new(messagesigner.MpoolNonceAPI), From(new(modules.MpoolNonceAPI))), + Override(new(full.ChainModuleAPI), From(new(api.GatewayAPI))), + Override(new(full.GasModuleAPI), From(new(api.GatewayAPI))), + Override(new(full.MpoolModuleAPI), From(new(api.GatewayAPI))), + Override(new(full.StateModuleAPI), From(new(api.GatewayAPI))), + Override(new(stmgr.StateManagerAPI), modules.NewRPCStateManager), + ), + + // Full node + ApplyIf(isFullNode, + Override(new(messagesigner.MpoolNonceAPI), From(new(*messagepool.MessagePool))), + Override(new(full.ChainModuleAPI), From(new(full.ChainModule))), + Override(new(full.GasModuleAPI), From(new(full.GasModule))), + Override(new(full.MpoolModuleAPI), From(new(full.MpoolModule))), + Override(new(full.StateModuleAPI), From(new(full.StateModule))), + Override(new(stmgr.StateManagerAPI), From(new(*stmgr.StateManager))), + + Override(RunHelloKey, modules.RunHello), + Override(RunChainExchangeKey, modules.RunChainExchange), + Override(RunPeerMgrKey, modules.RunPeerMgr), + Override(HandleIncomingMessagesKey, modules.HandleIncomingMessages), + Override(HandleIncomingBlocksKey, modules.HandleIncomingBlocks), + ), + // miner - ApplyIf(func(s *Settings) bool { return s.nodeType == repo.StorageMiner }, + ApplyIf(isType(repo.StorageMiner), Override(new(api.Common), From(new(common.CommonAPI))), Override(new(sectorstorage.StorageAuth), modules.StorageAuth), @@ -507,12 +531,22 @@ func Repo(r repo.Repo) Option { } } -func FullAPI(out *api.FullNode) Option { +type FullOption = Option + +func Lite(enable bool) FullOption { + return func(s *Settings) error { + s.Lite = enable + return nil + } +} + +func FullAPI(out *api.FullNode, fopts ...FullOption) Option { return Options( func(s *Settings) error { s.nodeType = repo.FullNode return nil }, + Options(fopts...), func(s *Settings) error { resAPI := &impl.FullNodeAPI{} s.invokes[ExtractApiKey] = fx.Populate(resAPI) diff --git a/node/impl/full/chain.go b/node/impl/full/chain.go index aa2ae4df191..5b4f411140b 100644 --- a/node/impl/full/chain.go +++ b/node/impl/full/chain.go @@ -39,10 +39,28 @@ import ( var log = logging.Logger("fullnode") +type ChainModuleAPI interface { + ChainHead(context.Context) (*types.TipSet, error) + ChainGetTipSet(ctx context.Context, tsk types.TipSetKey) (*types.TipSet, error) + ChainGetTipSetByHeight(ctx context.Context, h abi.ChainEpoch, tsk types.TipSetKey) (*types.TipSet, error) +} + +// ChainModule provides a default implementation of ChainModuleAPI. +// It can be swapped out with another implementation through Dependency +// Injection (for example with a thin RPC client). +type ChainModule struct { + fx.In + + Chain *store.ChainStore +} + +var _ ChainModuleAPI = (*ChainModule)(nil) + type ChainAPI struct { fx.In WalletAPI + ChainModuleAPI Chain *store.ChainStore } @@ -51,8 +69,8 @@ func (a *ChainAPI) ChainNotify(ctx context.Context) (<-chan []*api.HeadChange, e return a.Chain.SubHeadChanges(ctx), nil } -func (a *ChainAPI) ChainHead(context.Context) (*types.TipSet, error) { - return a.Chain.GetHeaviestTipSet(), nil +func (m *ChainModule) ChainHead(context.Context) (*types.TipSet, error) { + return m.Chain.GetHeaviestTipSet(), nil } func (a *ChainAPI) ChainGetRandomnessFromTickets(ctx context.Context, tsk types.TipSetKey, personalization crypto.DomainSeparationTag, randEpoch abi.ChainEpoch, entropy []byte) (abi.Randomness, error) { @@ -77,8 +95,8 @@ func (a *ChainAPI) ChainGetBlock(ctx context.Context, msg cid.Cid) (*types.Block return a.Chain.GetBlock(msg) } -func (a *ChainAPI) ChainGetTipSet(ctx context.Context, key types.TipSetKey) (*types.TipSet, error) { - return a.Chain.LoadTipSet(key) +func (m *ChainModule) ChainGetTipSet(ctx context.Context, key types.TipSetKey) (*types.TipSet, error) { + return m.Chain.LoadTipSet(key) } func (a *ChainAPI) ChainGetBlockMessages(ctx context.Context, msg cid.Cid) (*api.BlockMessages, error) { @@ -180,12 +198,12 @@ func (a *ChainAPI) ChainGetParentReceipts(ctx context.Context, bcid cid.Cid) ([] return out, nil } -func (a *ChainAPI) ChainGetTipSetByHeight(ctx context.Context, h abi.ChainEpoch, tsk types.TipSetKey) (*types.TipSet, error) { - ts, err := a.Chain.GetTipSetFromKey(tsk) +func (m *ChainModule) ChainGetTipSetByHeight(ctx context.Context, h abi.ChainEpoch, tsk types.TipSetKey) (*types.TipSet, error) { + ts, err := m.Chain.GetTipSetFromKey(tsk) if err != nil { return nil, xerrors.Errorf("loading tipset %s: %w", tsk, err) } - return a.Chain.GetTipsetByHeight(ctx, h, ts, true) + return m.Chain.GetTipsetByHeight(ctx, h, ts, true) } func (a *ChainAPI) ChainReadObj(ctx context.Context, obj cid.Cid) ([]byte, error) { diff --git a/node/impl/full/gas.go b/node/impl/full/gas.go index 3580ca26dbe..0cb1eb08419 100644 --- a/node/impl/full/gas.go +++ b/node/impl/full/gas.go @@ -26,8 +26,27 @@ import ( "github.com/filecoin-project/lotus/chain/types" ) +type GasModuleAPI interface { + GasEstimateMessageGas(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec, tsk types.TipSetKey) (*types.Message, error) +} + +// GasModule provides a default implementation of GasModuleAPI. +// It can be swapped out with another implementation through Dependency +// Injection (for example with a thin RPC client). +type GasModule struct { + fx.In + Stmgr *stmgr.StateManager + Chain *store.ChainStore + Mpool *messagepool.MessagePool +} + +var _ GasModuleAPI = (*GasModule)(nil) + type GasAPI struct { fx.In + + GasModuleAPI + Stmgr *stmgr.StateManager Chain *store.ChainStore Mpool *messagepool.MessagePool @@ -36,9 +55,24 @@ type GasAPI struct { const MinGasPremium = 100e3 const MaxSpendOnFeeDenom = 100 -func (a *GasAPI) GasEstimateFeeCap(ctx context.Context, msg *types.Message, maxqueueblks int64, - tsk types.TipSetKey) (types.BigInt, error) { - ts := a.Chain.GetHeaviestTipSet() +func (a *GasAPI) GasEstimateFeeCap( + ctx context.Context, + msg *types.Message, + maxqueueblks int64, + tsk types.TipSetKey, +) (types.BigInt, error) { + return gasEstimateFeeCap(a.Chain, msg, maxqueueblks) +} +func (m *GasModule) GasEstimateFeeCap( + ctx context.Context, + msg *types.Message, + maxqueueblks int64, + tsk types.TipSetKey, +) (types.BigInt, error) { + return gasEstimateFeeCap(m.Chain, msg, maxqueueblks) +} +func gasEstimateFeeCap(cstore *store.ChainStore, msg *types.Message, maxqueueblks int64) (types.BigInt, error) { + ts := cstore.GetHeaviestTipSet() parentBaseFee := ts.Blocks()[0].ParentBaseFee increaseFactor := math.Pow(1.+1./float64(build.BaseFeeMaxChangeDenom), float64(maxqueueblks)) @@ -82,9 +116,25 @@ func medianGasPremium(prices []gasMeta, blocks int) abi.TokenAmount { return premium } -func (a *GasAPI) GasEstimateGasPremium(ctx context.Context, nblocksincl uint64, - sender address.Address, gaslimit int64, _ types.TipSetKey) (types.BigInt, error) { - +func (a *GasAPI) GasEstimateGasPremium( + ctx context.Context, + nblocksincl uint64, + sender address.Address, + gaslimit int64, + _ types.TipSetKey, +) (types.BigInt, error) { + return gasEstimateGasPremium(a.Chain, nblocksincl) +} +func (m *GasModule) GasEstimateGasPremium( + ctx context.Context, + nblocksincl uint64, + sender address.Address, + gaslimit int64, + _ types.TipSetKey, +) (types.BigInt, error) { + return gasEstimateGasPremium(m.Chain, nblocksincl) +} +func gasEstimateGasPremium(cstore *store.ChainStore, nblocksincl uint64) (types.BigInt, error) { if nblocksincl == 0 { nblocksincl = 1 } @@ -92,20 +142,20 @@ func (a *GasAPI) GasEstimateGasPremium(ctx context.Context, nblocksincl uint64, var prices []gasMeta var blocks int - ts := a.Chain.GetHeaviestTipSet() + ts := cstore.GetHeaviestTipSet() for i := uint64(0); i < nblocksincl*2; i++ { if ts.Height() == 0 { break // genesis } - pts, err := a.Chain.LoadTipSet(ts.Parents()) + pts, err := cstore.LoadTipSet(ts.Parents()) if err != nil { return types.BigInt{}, err } blocks += len(pts.Blocks()) - msgs, err := a.Chain.MessagesForTipset(pts) + msgs, err := cstore.MessagesForTipset(pts) if err != nil { return types.BigInt{}, xerrors.Errorf("loading messages: %w", err) } @@ -142,19 +192,30 @@ func (a *GasAPI) GasEstimateGasPremium(ctx context.Context, nblocksincl uint64, } func (a *GasAPI) GasEstimateGasLimit(ctx context.Context, msgIn *types.Message, _ types.TipSetKey) (int64, error) { - + return gasEstimateGasLimit(ctx, a.Chain, a.Stmgr, a.Mpool, msgIn) +} +func (m *GasModule) GasEstimateGasLimit(ctx context.Context, msgIn *types.Message, _ types.TipSetKey) (int64, error) { + return gasEstimateGasLimit(ctx, m.Chain, m.Stmgr, m.Mpool, msgIn) +} +func gasEstimateGasLimit( + ctx context.Context, + cstore *store.ChainStore, + smgr *stmgr.StateManager, + mpool *messagepool.MessagePool, + msgIn *types.Message, +) (int64, error) { msg := *msgIn msg.GasLimit = build.BlockGasLimit msg.GasFeeCap = types.NewInt(uint64(build.MinimumBaseFee) + 1) msg.GasPremium = types.NewInt(1) - currTs := a.Chain.GetHeaviestTipSet() - fromA, err := a.Stmgr.ResolveToKeyAddress(ctx, msgIn.From, currTs) + currTs := cstore.GetHeaviestTipSet() + fromA, err := smgr.ResolveToKeyAddress(ctx, msgIn.From, currTs) if err != nil { return -1, xerrors.Errorf("getting key address: %w", err) } - pending, ts := a.Mpool.PendingFor(fromA) + pending, ts := mpool.PendingFor(fromA) priorMsgs := make([]types.ChainMsg, 0, len(pending)) for _, m := range pending { priorMsgs = append(priorMsgs, m) @@ -163,11 +224,11 @@ func (a *GasAPI) GasEstimateGasLimit(ctx context.Context, msgIn *types.Message, // Try calling until we find a height with no migration. var res *api.InvocResult for { - res, err = a.Stmgr.CallWithGas(ctx, &msg, priorMsgs, ts) + res, err = smgr.CallWithGas(ctx, &msg, priorMsgs, ts) if err != stmgr.ErrExpensiveFork { break } - ts, err = a.Chain.GetTipSetFromKey(ts.Parents()) + ts, err = cstore.GetTipSetFromKey(ts.Parents()) if err != nil { return -1, xerrors.Errorf("getting parent tipset: %w", err) } @@ -180,7 +241,7 @@ func (a *GasAPI) GasEstimateGasLimit(ctx context.Context, msgIn *types.Message, } // Special case for PaymentChannel collect, which is deleting actor - st, err := a.Stmgr.ParentState(ts) + st, err := smgr.ParentState(ts) if err != nil { _ = err // somewhat ignore it as it can happen and we just want to detect @@ -206,17 +267,17 @@ func (a *GasAPI) GasEstimateGasLimit(ctx context.Context, msgIn *types.Message, return res.MsgRct.GasUsed + 76e3, nil } -func (a *GasAPI) GasEstimateMessageGas(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec, _ types.TipSetKey) (*types.Message, error) { +func (m *GasModule) GasEstimateMessageGas(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec, _ types.TipSetKey) (*types.Message, error) { if msg.GasLimit == 0 { - gasLimit, err := a.GasEstimateGasLimit(ctx, msg, types.TipSetKey{}) + gasLimit, err := m.GasEstimateGasLimit(ctx, msg, types.TipSetKey{}) if err != nil { return nil, xerrors.Errorf("estimating gas used: %w", err) } - msg.GasLimit = int64(float64(gasLimit) * a.Mpool.GetConfig().GasLimitOverestimation) + msg.GasLimit = int64(float64(gasLimit) * m.Mpool.GetConfig().GasLimitOverestimation) } if msg.GasPremium == types.EmptyInt || types.BigCmp(msg.GasPremium, types.NewInt(0)) == 0 { - gasPremium, err := a.GasEstimateGasPremium(ctx, 2, msg.From, msg.GasLimit, types.TipSetKey{}) + gasPremium, err := m.GasEstimateGasPremium(ctx, 2, msg.From, msg.GasLimit, types.TipSetKey{}) if err != nil { return nil, xerrors.Errorf("estimating gas price: %w", err) } @@ -224,7 +285,7 @@ func (a *GasAPI) GasEstimateMessageGas(ctx context.Context, msg *types.Message, } if msg.GasFeeCap == types.EmptyInt || types.BigCmp(msg.GasFeeCap, types.NewInt(0)) == 0 { - feeCap, err := a.GasEstimateFeeCap(ctx, msg, 20, types.EmptyTSK) + feeCap, err := m.GasEstimateFeeCap(ctx, msg, 20, types.EmptyTSK) if err != nil { return nil, xerrors.Errorf("estimating fee cap: %w", err) } diff --git a/node/impl/full/mpool.go b/node/impl/full/mpool.go index 1f093606c38..8ad209f3fdc 100644 --- a/node/impl/full/mpool.go +++ b/node/impl/full/mpool.go @@ -10,14 +10,32 @@ import ( "golang.org/x/xerrors" "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/chain/messagepool" "github.com/filecoin-project/lotus/chain/messagesigner" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/node/modules/dtypes" ) +type MpoolModuleAPI interface { + MpoolPush(ctx context.Context, smsg *types.SignedMessage) (cid.Cid, error) +} + +// MpoolModule provides a default implementation of MpoolModuleAPI. +// It can be swapped out with another implementation through Dependency +// Injection (for example with a thin RPC client). +type MpoolModule struct { + fx.In + + Mpool *messagepool.MessagePool +} + +var _ MpoolModuleAPI = (*MpoolModule)(nil) + type MpoolAPI struct { fx.In + MpoolModuleAPI + WalletAPI GasAPI @@ -106,8 +124,8 @@ func (a *MpoolAPI) MpoolClear(ctx context.Context, local bool) error { return nil } -func (a *MpoolAPI) MpoolPush(ctx context.Context, smsg *types.SignedMessage) (cid.Cid, error) { - return a.Mpool.Push(smsg) +func (m *MpoolModule) MpoolPush(ctx context.Context, smsg *types.SignedMessage) (cid.Cid, error) { + return m.Mpool.Push(smsg) } func (a *MpoolAPI) MpoolPushUntrusted(ctx context.Context, smsg *types.SignedMessage) (cid.Cid, error) { @@ -162,7 +180,7 @@ func (a *MpoolAPI) MpoolPushMessage(ctx context.Context, msg *types.Message, spe // Sign and push the message return a.MessageSigner.SignMessage(ctx, msg, func(smsg *types.SignedMessage) error { - if _, err := a.Mpool.Push(smsg); err != nil { + if _, err := a.MpoolModuleAPI.MpoolPush(ctx, smsg); err != nil { return xerrors.Errorf("mpool push: failed to push message: %w", err) } return nil diff --git a/node/impl/full/multisig.go b/node/impl/full/multisig.go index 715689edc30..3b05e44935b 100644 --- a/node/impl/full/multisig.go +++ b/node/impl/full/multisig.go @@ -23,9 +23,8 @@ import ( type MsigAPI struct { fx.In - WalletAPI WalletAPI - StateAPI StateAPI - MpoolAPI MpoolAPI + StateAPI StateAPI + MpoolAPI MpoolAPI } func (a *MsigAPI) messageBuilder(ctx context.Context, from address.Address) (multisig.MessageBuilder, error) { diff --git a/node/impl/full/state.go b/node/impl/full/state.go index 56dfc0a149b..34724e2051e 100644 --- a/node/impl/full/state.go +++ b/node/impl/full/state.go @@ -42,6 +42,27 @@ import ( "github.com/filecoin-project/lotus/node/modules/dtypes" ) +type StateModuleAPI interface { + StateAccountKey(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error) + StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error) + StateLookupID(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error) + StateWaitMsg(ctx context.Context, msg cid.Cid, confidence uint64) (*api.MsgLookup, error) + MsigGetAvailableBalance(ctx context.Context, addr address.Address, tsk types.TipSetKey) (types.BigInt, error) + MsigGetVested(ctx context.Context, addr address.Address, start types.TipSetKey, end types.TipSetKey) (types.BigInt, error) +} + +// StateModule provides a default implementation of StateModuleAPI. +// It can be swapped out with another implementation through Dependency +// Injection (for example with a thin RPC client). +type StateModule struct { + fx.In + + StateManager *stmgr.StateManager + Chain *store.ChainStore +} + +var _ StateModuleAPI = (*StateModule)(nil) + type StateAPI struct { fx.In @@ -49,6 +70,8 @@ type StateAPI struct { // API attached to the state API. It probably should live somewhere better Wallet *wallet.Wallet + StateModuleAPI + ProofVerifier ffiwrapper.Verifier StateManager *stmgr.StateManager Chain *store.ChainStore @@ -349,27 +372,33 @@ func (a *StateAPI) StateReplay(ctx context.Context, tsk types.TipSetKey, mc cid. }, nil } -func (a *StateAPI) stateForTs(ctx context.Context, ts *types.TipSet) (*state.StateTree, error) { +func stateForTs(ctx context.Context, ts *types.TipSet, cstore *store.ChainStore, smgr *stmgr.StateManager) (*state.StateTree, error) { if ts == nil { - ts = a.Chain.GetHeaviestTipSet() + ts = cstore.GetHeaviestTipSet() } - st, _, err := a.StateManager.TipSetState(ctx, ts) + st, _, err := smgr.TipSetState(ctx, ts) if err != nil { return nil, err } - buf := bufbstore.NewBufferedBstore(a.Chain.Blockstore()) + buf := bufbstore.NewBufferedBstore(cstore.Blockstore()) cst := cbor.NewCborStore(buf) return state.LoadStateTree(cst, st) } +func (a *StateAPI) stateForTs(ctx context.Context, ts *types.TipSet) (*state.StateTree, error) { + return stateForTs(ctx, ts, a.Chain, a.StateManager) +} +func (m *StateModule) stateForTs(ctx context.Context, ts *types.TipSet) (*state.StateTree, error) { + return stateForTs(ctx, ts, m.Chain, m.StateManager) +} -func (a *StateAPI) StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error) { - ts, err := a.Chain.GetTipSetFromKey(tsk) +func (m *StateModule) StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error) { + ts, err := m.Chain.GetTipSetFromKey(tsk) if err != nil { return nil, xerrors.Errorf("loading tipset %s: %w", tsk, err) } - state, err := a.stateForTs(ctx, ts) + state, err := m.stateForTs(ctx, ts) if err != nil { return nil, xerrors.Errorf("computing tipset state failed: %w", err) } @@ -377,26 +406,22 @@ func (a *StateAPI) StateGetActor(ctx context.Context, actor address.Address, tsk return state.GetActor(actor) } -func (a *StateAPI) StateLookupID(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error) { - ts, err := a.Chain.GetTipSetFromKey(tsk) +func (m *StateModule) StateLookupID(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error) { + ts, err := m.Chain.GetTipSetFromKey(tsk) if err != nil { return address.Undef, xerrors.Errorf("loading tipset %s: %w", tsk, err) } - state, err := a.stateForTs(ctx, ts) - if err != nil { - return address.Undef, err - } - return state.LookupID(addr) + return m.StateManager.LookupID(ctx, addr, ts) } -func (a *StateAPI) StateAccountKey(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error) { - ts, err := a.Chain.GetTipSetFromKey(tsk) +func (m *StateModule) StateAccountKey(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error) { + ts, err := m.Chain.GetTipSetFromKey(tsk) if err != nil { return address.Undef, xerrors.Errorf("loading tipset %s: %w", tsk, err) } - return a.StateManager.ResolveToKeyAddress(ctx, addr, ts) + return m.StateManager.ResolveToKeyAddress(ctx, addr, ts) } func (a *StateAPI) StateReadState(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*api.ActorState, error) { @@ -453,22 +478,28 @@ func (a *StateAPI) MinerCreateBlock(ctx context.Context, bt *api.BlockTemplate) return &out, nil } -func (a *StateAPI) StateWaitMsg(ctx context.Context, msg cid.Cid, confidence uint64) (*api.MsgLookup, error) { - ts, recpt, found, err := a.StateManager.WaitForMessage(ctx, msg, confidence) +func (m *StateModule) StateWaitMsg(ctx context.Context, msg cid.Cid, confidence uint64) (*api.MsgLookup, error) { + return stateWaitMsgLimited(ctx, m.StateManager, m.Chain, msg, confidence, stmgr.LookbackNoLimit) +} +func (a *StateAPI) StateWaitMsgLimited(ctx context.Context, msg cid.Cid, confidence uint64, lookbackLimit abi.ChainEpoch) (*api.MsgLookup, error) { + return stateWaitMsgLimited(ctx, a.StateManager, a.Chain, msg, confidence, lookbackLimit) +} +func stateWaitMsgLimited(ctx context.Context, smgr *stmgr.StateManager, cstore *store.ChainStore, msg cid.Cid, confidence uint64, lookbackLimit abi.ChainEpoch) (*api.MsgLookup, error) { + ts, recpt, found, err := smgr.WaitForMessage(ctx, msg, confidence, lookbackLimit) if err != nil { return nil, err } var returndec interface{} if recpt.ExitCode == 0 && len(recpt.Return) > 0 { - cmsg, err := a.Chain.GetCMessage(msg) + cmsg, err := cstore.GetCMessage(msg) if err != nil { return nil, xerrors.Errorf("failed to load message after successful receipt search: %w", err) } vmsg := cmsg.VMMessage() - t, err := stmgr.GetReturnType(ctx, a.StateManager, vmsg.To, vmsg.Method, ts) + t, err := stmgr.GetReturnType(ctx, smgr, vmsg.To, vmsg.Method, ts) if err != nil { return nil, xerrors.Errorf("failed to get return type: %w", err) } @@ -799,17 +830,17 @@ func (a *StateAPI) StateCompute(ctx context.Context, height abi.ChainEpoch, msgs }, nil } -func (a *StateAPI) MsigGetAvailableBalance(ctx context.Context, addr address.Address, tsk types.TipSetKey) (types.BigInt, error) { - ts, err := a.Chain.GetTipSetFromKey(tsk) +func (m *StateModule) MsigGetAvailableBalance(ctx context.Context, addr address.Address, tsk types.TipSetKey) (types.BigInt, error) { + ts, err := m.Chain.GetTipSetFromKey(tsk) if err != nil { return types.EmptyInt, xerrors.Errorf("loading tipset %s: %w", tsk, err) } - act, err := a.StateManager.LoadActor(ctx, addr, ts) + act, err := m.StateManager.LoadActor(ctx, addr, ts) if err != nil { return types.EmptyInt, xerrors.Errorf("failed to load multisig actor: %w", err) } - msas, err := multisig.Load(a.Chain.Store(ctx), act) + msas, err := multisig.Load(m.Chain.Store(ctx), act) if err != nil { return types.EmptyInt, xerrors.Errorf("failed to load multisig actor state: %w", err) } @@ -858,13 +889,13 @@ func (a *StateAPI) MsigGetVestingSchedule(ctx context.Context, addr address.Addr }, nil } -func (a *StateAPI) MsigGetVested(ctx context.Context, addr address.Address, start types.TipSetKey, end types.TipSetKey) (types.BigInt, error) { - startTs, err := a.Chain.GetTipSetFromKey(start) +func (m *StateModule) MsigGetVested(ctx context.Context, addr address.Address, start types.TipSetKey, end types.TipSetKey) (types.BigInt, error) { + startTs, err := m.Chain.GetTipSetFromKey(start) if err != nil { return types.EmptyInt, xerrors.Errorf("loading start tipset %s: %w", start, err) } - endTs, err := a.Chain.GetTipSetFromKey(end) + endTs, err := m.Chain.GetTipSetFromKey(end) if err != nil { return types.EmptyInt, xerrors.Errorf("loading end tipset %s: %w", end, err) } @@ -875,12 +906,12 @@ func (a *StateAPI) MsigGetVested(ctx context.Context, addr address.Address, star return big.Zero(), nil } - act, err := a.StateManager.LoadActor(ctx, addr, endTs) + act, err := m.StateManager.LoadActor(ctx, addr, endTs) if err != nil { return types.EmptyInt, xerrors.Errorf("failed to load multisig actor at end epoch: %w", err) } - msas, err := multisig.Load(a.Chain.Store(ctx), act) + msas, err := multisig.Load(m.Chain.Store(ctx), act) if err != nil { return types.EmptyInt, xerrors.Errorf("failed to load multisig actor state: %w", err) } diff --git a/node/impl/full/wallet.go b/node/impl/full/wallet.go index b2ecdebbd22..616c7b46edb 100644 --- a/node/impl/full/wallet.go +++ b/node/impl/full/wallet.go @@ -19,8 +19,8 @@ import ( type WalletAPI struct { fx.In - StateManager *stmgr.StateManager - Wallet *wallet.Wallet + StateManagerAPI stmgr.StateManagerAPI + Wallet *wallet.Wallet } func (a *WalletAPI) WalletNew(ctx context.Context, typ crypto.SigType) (address.Address, error) { @@ -36,7 +36,7 @@ func (a *WalletAPI) WalletList(ctx context.Context) ([]address.Address, error) { } func (a *WalletAPI) WalletBalance(ctx context.Context, addr address.Address) (types.BigInt, error) { - act, err := a.StateManager.LoadActorTsk(ctx, addr, types.EmptyTSK) + act, err := a.StateManagerAPI.LoadActorTsk(ctx, addr, types.EmptyTSK) if xerrors.Is(err, types.ErrActorNotFound) { return big.Zero(), nil } else if err != nil { @@ -46,7 +46,7 @@ func (a *WalletAPI) WalletBalance(ctx context.Context, addr address.Address) (ty } func (a *WalletAPI) WalletSign(ctx context.Context, k address.Address, msg []byte) (*crypto.Signature, error) { - keyAddr, err := a.StateManager.ResolveToKeyAddress(ctx, k, nil) + keyAddr, err := a.StateManagerAPI.ResolveToKeyAddress(ctx, k, nil) if err != nil { return nil, xerrors.Errorf("failed to resolve ID address: %w", keyAddr) } diff --git a/node/modules/mpoolnonceapi.go b/node/modules/mpoolnonceapi.go new file mode 100644 index 00000000000..294f4d95478 --- /dev/null +++ b/node/modules/mpoolnonceapi.go @@ -0,0 +1,33 @@ +package modules + +import ( + "context" + + "go.uber.org/fx" + + "github.com/filecoin-project/lotus/node/impl/full" + + "github.com/filecoin-project/lotus/chain/messagesigner" + "github.com/filecoin-project/lotus/chain/types" + + "github.com/filecoin-project/go-address" +) + +// MpoolNonceAPI substitutes the mpool nonce with an implementation that +// doesn't rely on the mpool - it just gets the nonce from actor state +type MpoolNonceAPI struct { + fx.In + + StateAPI full.StateAPI +} + +// GetNonce gets the nonce from actor state +func (a *MpoolNonceAPI) GetNonce(addr address.Address) (uint64, error) { + act, err := a.StateAPI.StateGetActor(context.Background(), addr, types.EmptyTSK) + if err != nil { + return 0, err + } + return act.Nonce, nil +} + +var _ messagesigner.MpoolNonceAPI = (*MpoolNonceAPI)(nil) diff --git a/node/modules/rpcstatemanager.go b/node/modules/rpcstatemanager.go new file mode 100644 index 00000000000..0ed054d4504 --- /dev/null +++ b/node/modules/rpcstatemanager.go @@ -0,0 +1,33 @@ +package modules + +import ( + "context" + + "github.com/filecoin-project/lotus/api" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/lotus/chain/stmgr" + "github.com/filecoin-project/lotus/chain/types" +) + +type RPCStateManager struct { + gapi api.GatewayAPI +} + +func NewRPCStateManager(api api.GatewayAPI) *RPCStateManager { + return &RPCStateManager{gapi: api} +} + +func (s *RPCStateManager) LoadActorTsk(ctx context.Context, addr address.Address, tsk types.TipSetKey) (*types.Actor, error) { + return s.gapi.StateGetActor(ctx, addr, tsk) +} + +func (s *RPCStateManager) LookupID(ctx context.Context, addr address.Address, ts *types.TipSet) (address.Address, error) { + return s.gapi.StateLookupID(ctx, addr, ts.Key()) +} + +func (s *RPCStateManager) ResolveToKeyAddress(ctx context.Context, addr address.Address, ts *types.TipSet) (address.Address, error) { + return s.gapi.StateAccountKey(ctx, addr, ts.Key()) +} + +var _ stmgr.StateManagerAPI = (*RPCStateManager)(nil) diff --git a/node/test/builder.go b/node/test/builder.go index d9ec04460c1..4aa8a55eaf2 100644 --- a/node/test/builder.go +++ b/node/test/builder.go @@ -7,10 +7,13 @@ import ( "io/ioutil" "net" "net/http/httptest" + "strings" "sync" "testing" "time" + "golang.org/x/xerrors" + "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-jsonrpc" "github.com/filecoin-project/go-state-types/abi" @@ -117,6 +120,7 @@ func CreateTestStorageNode(ctx context.Context, t *testing.T, waddr address.Addr if err != nil { t.Fatalf("failed to construct node: %v", err) } + t.Cleanup(func() { _ = stop(context.Background()) }) /*// Bootstrap with full node @@ -137,13 +141,29 @@ func CreateTestStorageNode(ctx context.Context, t *testing.T, waddr address.Addr return test.TestStorageNode{StorageMiner: minerapi, MineOne: mineOne} } -func Builder(t *testing.T, nFull int, storage []test.StorageMiner, opts ...node.Option) ([]test.TestNode, []test.TestStorageNode) { +func Builder(t *testing.T, fullOpts []test.FullNodeOpts, storage []test.StorageMiner) ([]test.TestNode, []test.TestStorageNode) { + return mockBuilderOpts(t, fullOpts, storage, false) +} + +func MockSbBuilder(t *testing.T, fullOpts []test.FullNodeOpts, storage []test.StorageMiner) ([]test.TestNode, []test.TestStorageNode) { + return mockSbBuilderOpts(t, fullOpts, storage, false) +} + +func RPCBuilder(t *testing.T, fullOpts []test.FullNodeOpts, storage []test.StorageMiner) ([]test.TestNode, []test.TestStorageNode) { + return mockBuilderOpts(t, fullOpts, storage, true) +} + +func RPCMockSbBuilder(t *testing.T, fullOpts []test.FullNodeOpts, storage []test.StorageMiner) ([]test.TestNode, []test.TestStorageNode) { + return mockSbBuilderOpts(t, fullOpts, storage, true) +} + +func mockBuilderOpts(t *testing.T, fullOpts []test.FullNodeOpts, storage []test.StorageMiner, rpc bool) ([]test.TestNode, []test.TestStorageNode) { ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) mn := mocknet.New(ctx) - fulls := make([]test.TestNode, nFull) + fulls := make([]test.TestNode, len(fullOpts)) storers := make([]test.TestStorageNode, len(storage)) pk, _, err := crypto.GenerateEd25519Key(rand.Reader) @@ -208,7 +228,7 @@ func Builder(t *testing.T, nFull int, storage []test.StorageMiner, opts ...node. // END PRESEAL SECTION - for i := 0; i < nFull; i++ { + for i := 0; i < len(fullOpts); i++ { var genesis node.Option if i == 0 { genesis = node.Override(new(modules.Genesis), testing2.MakeGenesisMem(&genbuf, *templ)) @@ -217,19 +237,25 @@ func Builder(t *testing.T, nFull int, storage []test.StorageMiner, opts ...node. } stop, err := node.New(ctx, - node.FullAPI(&fulls[i].FullNode), + node.FullAPI(&fulls[i].FullNode, node.Lite(fullOpts[i].Lite)), node.Online(), node.Repo(repo.NewMemory(nil)), node.MockHost(mn), node.Test(), genesis, - node.Options(opts...), + + fullOpts[i].Opts(fulls), ) if err != nil { t.Fatal(err) } + t.Cleanup(func() { _ = stop(context.Background()) }) + + if rpc { + fulls[i] = fullRpc(t, fulls[i]) + } } for i, def := range storage { @@ -261,6 +287,9 @@ func Builder(t *testing.T, nFull int, storage []test.StorageMiner, opts ...node. psd := presealDirs[i] */ + if rpc { + storers[i] = storerRpc(t, storers[i]) + } } if err := mn.LinkAll(); err != nil { @@ -286,11 +315,13 @@ func Builder(t *testing.T, nFull int, storage []test.StorageMiner, opts ...node. return fulls, storers } -func MockSbBuilder(t *testing.T, nFull int, storage []test.StorageMiner, options ...node.Option) ([]test.TestNode, []test.TestStorageNode) { - ctx := context.Background() +func mockSbBuilderOpts(t *testing.T, fullOpts []test.FullNodeOpts, storage []test.StorageMiner, rpc bool) ([]test.TestNode, []test.TestStorageNode) { + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + mn := mocknet.New(ctx) - fulls := make([]test.TestNode, nFull) + fulls := make([]test.TestNode, len(fullOpts)) storers := make([]test.TestStorageNode, len(storage)) var genbuf bytes.Buffer @@ -354,7 +385,7 @@ func MockSbBuilder(t *testing.T, nFull int, storage []test.StorageMiner, options // END PRESEAL SECTION - for i := 0; i < nFull; i++ { + for i := 0; i < len(fullOpts); i++ { var genesis node.Option if i == 0 { genesis = node.Override(new(modules.Genesis), testing2.MakeGenesisMem(&genbuf, *templ)) @@ -362,10 +393,8 @@ func MockSbBuilder(t *testing.T, nFull int, storage []test.StorageMiner, options genesis = node.Override(new(modules.Genesis), modules.LoadGenesis(genbuf.Bytes())) } - var err error - // TODO: Don't ignore stop stop, err := node.New(ctx, - node.FullAPI(&fulls[i].FullNode), + node.FullAPI(&fulls[i].FullNode, node.Lite(fullOpts[i].Lite)), node.Online(), node.Repo(repo.NewMemory(nil)), node.MockHost(mn), @@ -374,12 +403,18 @@ func MockSbBuilder(t *testing.T, nFull int, storage []test.StorageMiner, options node.Override(new(ffiwrapper.Verifier), mock.MockVerifier), genesis, - node.Options(options...), + + fullOpts[i].Opts(fulls), ) if err != nil { t.Fatalf("%+v", err) } + t.Cleanup(func() { _ = stop(context.Background()) }) + + if rpc { + fulls[i] = fullRpc(t, fulls[i]) + } } for i, def := range storage { @@ -414,6 +449,10 @@ func MockSbBuilder(t *testing.T, nFull int, storage []test.StorageMiner, options node.Override(new(ffiwrapper.Verifier), mock.MockVerifier), node.Unset(new(*sectorstorage.Manager)), )) + + if rpc { + storers[i] = storerRpc(t, storers[i]) + } } if err := mn.LinkAll(); err != nil { @@ -438,69 +477,66 @@ func MockSbBuilder(t *testing.T, nFull int, storage []test.StorageMiner, options return fulls, storers } -func RPCBuilder(t *testing.T, nFull int, storage []test.StorageMiner, opts ...node.Option) ([]test.TestNode, []test.TestStorageNode) { - return rpcWithBuilder(t, Builder, nFull, storage, opts...) -} +func fullRpc(t *testing.T, nd test.TestNode) test.TestNode { + ma, listenAddr, err := CreateRPCServer(nd) + require.NoError(t, err) -func RPCMockSbBuilder(t *testing.T, nFull int, storage []test.StorageMiner, opts ...node.Option) ([]test.TestNode, []test.TestStorageNode) { - return rpcWithBuilder(t, MockSbBuilder, nFull, storage, opts...) + var full test.TestNode + full.FullNode, _, err = client.NewFullNodeRPC(context.Background(), listenAddr, nil) + require.NoError(t, err) + + full.ListenAddr = ma + return full } -func rpcWithBuilder(t *testing.T, b test.APIBuilder, nFull int, storage []test.StorageMiner, opts ...node.Option) ([]test.TestNode, []test.TestStorageNode) { - fullApis, storaApis := b(t, nFull, storage, opts...) - fulls := make([]test.TestNode, nFull) - storers := make([]test.TestStorageNode, len(storage)) +func storerRpc(t *testing.T, nd test.TestStorageNode) test.TestStorageNode { + ma, listenAddr, err := CreateRPCServer(nd) + require.NoError(t, err) - for i, a := range fullApis { - rpcServer := jsonrpc.NewServer() - rpcServer.Register("Filecoin", a) - testServ := httptest.NewServer(rpcServer) // todo: close + var storer test.TestStorageNode + storer.StorageMiner, _, err = client.NewStorageMinerRPC(context.Background(), listenAddr, nil) + require.NoError(t, err) - addr := testServ.Listener.Addr() - listenAddr := "ws://" + addr.String() - var err error - fulls[i].FullNode, _, err = client.NewFullNodeRPC(context.Background(), listenAddr, nil) - if err != nil { - t.Fatal(err) - } - ma, err := parseWSSMultiAddr(addr) - if err != nil { - t.Fatal(err) - } - fulls[i].ListenAddr = ma - } + storer.ListenAddr = ma + storer.MineOne = nd.MineOne + return storer +} - for i, a := range storaApis { - rpcServer := jsonrpc.NewServer() - rpcServer.Register("Filecoin", a) - testServ := httptest.NewServer(rpcServer) // todo: close +func CreateRPCServer(handler interface{}) (multiaddr.Multiaddr, string, error) { + rpcServer := jsonrpc.NewServer() + rpcServer.Register("Filecoin", handler) + testServ := httptest.NewServer(rpcServer) // todo: close - addr := testServ.Listener.Addr() - listenAddr := "ws://" + addr.String() - var err error - storers[i].StorageMiner, _, err = client.NewStorageMinerRPC(context.Background(), listenAddr, nil) - if err != nil { - t.Fatal(err) - } - ma, err := parseWSSMultiAddr(addr) - if err != nil { - t.Fatal(err) - } - storers[i].ListenAddr = ma - storers[i].MineOne = a.MineOne + addr := testServ.Listener.Addr() + listenAddr := "ws://" + addr.String() + ma, err := parseWSMultiAddr(addr) + if err != nil { + return nil, "", err } - - return fulls, storers + return ma, listenAddr, err } -func parseWSSMultiAddr(addr net.Addr) (multiaddr.Multiaddr, error) { +func parseWSMultiAddr(addr net.Addr) (multiaddr.Multiaddr, error) { host, port, err := net.SplitHostPort(addr.String()) if err != nil { return nil, err } - ma, err := multiaddr.NewMultiaddr("/ip4/" + host + "/" + addr.Network() + "/" + port + "/wss") + ma, err := multiaddr.NewMultiaddr("/ip4/" + host + "/" + addr.Network() + "/" + port + "/ws") if err != nil { return nil, err } return ma, nil } + +func WSMultiAddrToString(addr multiaddr.Multiaddr) (string, error) { + parts := strings.Split(addr.String(), "/") + if len(parts) != 6 || parts[0] != "" { + return "", xerrors.Errorf("Malformed ws multiaddr %s", addr) + } + + host := parts[2] + port := parts[4] + proto := parts[5] + + return proto + "://" + host + ":" + port + "/rpc/v0", nil +}