From afb10391a43f8f418dd5daf3b0f965ea8eb89fbc Mon Sep 17 00:00:00 2001 From: mmsqe Date: Wed, 14 Jun 2023 21:32:17 +0800 Subject: [PATCH] fix: decode filtered logs in ws (backport: #1781) (#251) (#277) * fix: decode filtered logs in ws (backport: #1781) (#251) * fix decode logs in ws * add test * add change doc * decode in single loop * add ci timeout * reset max-tx-gas-wanted * fix rest filter api * test batch tx with exist sc * more filter tests * fix test --- .github/workflows/test.yml | 1 + rpc/backend/chain_info_test.go | 10 ++ rpc/namespaces/ethereum/eth/filters/api.go | 17 +-- rpc/websockets.go | 6 +- tests/integration_tests/network.py | 5 +- tests/integration_tests/test_websockets.py | 138 +++++++++++++++------ tests/integration_tests/utils.py | 64 +++++++++- x/evm/types/utils.go | 41 ++++-- 8 files changed, 217 insertions(+), 65 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 19d6c0e24a..c1647ad1e4 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -85,6 +85,7 @@ jobs: integration_tests: runs-on: ubuntu-latest + timeout-minutes: 60 steps: - uses: actions/checkout@v3 - uses: cachix/install-nix-action@v20 diff --git a/rpc/backend/chain_info_test.go b/rpc/backend/chain_info_test.go index 6d67bfe582..f4e6165f22 100644 --- a/rpc/backend/chain_info_test.go +++ b/rpc/backend/chain_info_test.go @@ -361,6 +361,9 @@ func (suite *BackendTestSuite) TestFeeHistory() { func(validator sdk.AccAddress) { client := suite.backend.clientCtx.Client.(*mocks.Client) suite.backend.cfg.JSONRPC.FeeHistoryCap = 2 + var header metadata.MD + queryClient := suite.backend.queryClient.QueryClient.(*mocks.EVMQueryClient) + RegisterParams(queryClient, &header, 1) RegisterBlockError(client, ethrpc.BlockNumber(1).Int64()) }, 1, @@ -374,6 +377,9 @@ func (suite *BackendTestSuite) TestFeeHistory() { func(validator sdk.AccAddress) { client := suite.backend.clientCtx.Client.(*mocks.Client) suite.backend.cfg.JSONRPC.FeeHistoryCap = 2 + var header metadata.MD + queryClient := suite.backend.queryClient.QueryClient.(*mocks.EVMQueryClient) + RegisterParams(queryClient, &header, 1) RegisterBlock(client, ethrpc.BlockNumber(1).Int64(), nil) RegisterBlockResultsError(client, 1) }, @@ -390,6 +396,8 @@ func (suite *BackendTestSuite) TestFeeHistory() { queryClient := suite.backend.queryClient.QueryClient.(*mocks.EVMQueryClient) client := suite.backend.clientCtx.Client.(*mocks.Client) suite.backend.cfg.JSONRPC.FeeHistoryCap = 2 + var header metadata.MD + RegisterParams(queryClient, &header, 1) RegisterBlock(client, ethrpc.BlockNumber(1).Int64(), nil) RegisterBlockResults(client, 1) RegisterBaseFeeError(queryClient) @@ -408,6 +416,7 @@ func (suite *BackendTestSuite) TestFeeHistory() { var header metadata.MD baseFee := sdk.NewInt(1) queryClient := suite.backend.queryClient.QueryClient.(*mocks.EVMQueryClient) + fQueryClient := suite.backend.queryClient.FeeMarket.(*mocks.FeeMarketQueryClient) client := suite.backend.clientCtx.Client.(*mocks.Client) suite.backend.cfg.JSONRPC.FeeHistoryCap = 2 RegisterBlock(client, ethrpc.BlockNumber(1).Int64(), nil) @@ -417,6 +426,7 @@ func (suite *BackendTestSuite) TestFeeHistory() { RegisterConsensusParams(client, 1) RegisterParams(queryClient, &header, 1) RegisterParamsWithoutHeader(queryClient, 1) + RegisterFeeMarketParams(fQueryClient, 1) }, 1, 1, diff --git a/rpc/namespaces/ethereum/eth/filters/api.go b/rpc/namespaces/ethereum/eth/filters/api.go index 1083711a30..feec40cce2 100644 --- a/rpc/namespaces/ethereum/eth/filters/api.go +++ b/rpc/namespaces/ethereum/eth/filters/api.go @@ -414,15 +414,12 @@ func (api *PublicFilterAPI) Logs(ctx context.Context, crit filters.FilterCriteri api.logger.Debug("event data type mismatch", "type", fmt.Sprintf("%T", ev.Data)) continue } - - txResponse, err := evmtypes.DecodeTxResponse(dataTx.TxResult.Result.Data) + txLogs, err := evmtypes.DecodeTxLogsFromEvents(dataTx.TxResult.Result.Data) if err != nil { - api.logger.Error("fail to decode tx response", "error", err) + api.logger.Error("fail to decode tx response", "error", err.Error()) return } - - logs := FilterLogs(evmtypes.LogsToEthereum(txResponse.Logs), crit.FromBlock, crit.ToBlock, crit.Addresses, crit.Topics) - + logs := FilterLogs(txLogs, crit.FromBlock, crit.ToBlock, crit.Addresses, crit.Topics) for _, log := range logs { _ = notifier.Notify(rpcSub.ID, log) } @@ -497,14 +494,12 @@ func (api *PublicFilterAPI) NewFilter(criteria filters.FilterCriteria) (rpc.ID, api.logger.Debug("event data type mismatch", "type", fmt.Sprintf("%T", ev.Data)) continue } - - txResponse, err := evmtypes.DecodeTxResponse(dataTx.TxResult.Result.Data) + txLogs, err := evmtypes.DecodeTxLogsFromEvents(dataTx.TxResult.Result.Data) if err != nil { - api.logger.Error("fail to decode tx response", "error", err) + api.logger.Error("fail to decode tx response", "error", err.Error()) return } - - logs := FilterLogs(evmtypes.LogsToEthereum(txResponse.Logs), criteria.FromBlock, criteria.ToBlock, criteria.Addresses, criteria.Topics) + logs := FilterLogs(txLogs, criteria.FromBlock, criteria.ToBlock, criteria.Addresses, criteria.Topics) api.filtersMu.Lock() if f, found := api.filters[filterID]; found { diff --git a/rpc/websockets.go b/rpc/websockets.go index 3ed5c7d962..b42a7b70bc 100644 --- a/rpc/websockets.go +++ b/rpc/websockets.go @@ -589,14 +589,12 @@ func (api *pubSubAPI) subscribeLogs(wsConn *wsConn, subID rpc.ID, extra interfac api.logger.Debug("event data type mismatch", "type", fmt.Sprintf("%T", event.Data)) continue } - - txResponse, err := evmtypes.DecodeTxResponse(dataTx.TxResult.Result.Data) + txLogs, err := evmtypes.DecodeTxLogsFromEvents(dataTx.TxResult.Result.Data) if err != nil { api.logger.Error("failed to decode tx response", "error", err.Error()) return } - - logs := rpcfilters.FilterLogs(evmtypes.LogsToEthereum(txResponse.Logs), crit.FromBlock, crit.ToBlock, crit.Addresses, crit.Topics) + logs := rpcfilters.FilterLogs(txLogs, crit.FromBlock, crit.ToBlock, crit.Addresses, crit.Topics) if len(logs) == 0 { continue } diff --git a/tests/integration_tests/network.py b/tests/integration_tests/network.py index c638e2a563..e3d22aa616 100644 --- a/tests/integration_tests/network.py +++ b/tests/integration_tests/network.py @@ -9,7 +9,7 @@ from web3.middleware import geth_poa_middleware from .cosmoscli import CosmosCLI -from .utils import wait_for_port +from .utils import supervisorctl, wait_for_port DEFAULT_CHAIN_BINARY = "ethermintd" @@ -62,6 +62,9 @@ def cosmos_cli(self, i=0): self.base_dir / f"node{i}", self.node_rpc(i), self.chain_binary ) + def supervisorctl(self, *args): + return supervisorctl(self.base_dir / "../tasks.ini", *args) + class Geth: def __init__(self, w3): diff --git a/tests/integration_tests/test_websockets.py b/tests/integration_tests/test_websockets.py index db0c5218a4..247b22ddc1 100644 --- a/tests/integration_tests/test_websockets.py +++ b/tests/integration_tests/test_websockets.py @@ -3,45 +3,65 @@ from collections import defaultdict import websockets +from eth_utils import abi +from hexbytes import HexBytes from pystarport import ports - -def test_single_request_netversion(ethermint): - ethermint.use_websocket() - eth_ws = ethermint.w3.provider - - response = eth_ws.make_request("net_version", []) - - # net_version should be 9000 - assert response["result"] == "9000", "got " + response["result"] + ", expected 9000" - - -# note: -# batch requests still not implemented in web3.py -# todo: follow https://github.com/ethereum/web3.py/issues/832, add tests when complete - -# eth_subscribe and eth_unsubscribe support still not implemented in web3.py -# todo: follow https://github.com/ethereum/web3.py/issues/1402, add tests when complete +from .network import Ethermint +from .utils import ( + ADDRS, + CONTRACTS, + build_batch_tx, + deploy_contract, + modify_command_in_supervisor_config, + wait_for_new_blocks, + wait_for_port, +) class Client: def __init__(self, ws): self._ws = ws + self._gen_id = 0 self._subs = defaultdict(asyncio.Queue) self._rsps = defaultdict(asyncio.Queue) + def gen_id(self): + self._gen_id += 1 + return self._gen_id + async def receive_loop(self): while True: msg = json.loads(await self._ws.recv()) if "id" in msg: # responses await self._rsps[msg["id"]].put(msg) + else: + # subscriptions + assert msg["method"] == "eth_subscription" + sub_id = msg["params"]["subscription"] + await self._subs[sub_id].put(msg["params"]["result"]) async def recv_response(self, rpcid): rsp = await self._rsps[rpcid].get() del self._rsps[rpcid] return rsp + async def recv_subscription(self, sub_id): + return await self._subs[sub_id].get() + + async def subscribe(self, *args): + rpcid = self.gen_id() + await self._ws.send( + json.dumps({"id": rpcid, "method": "eth_subscribe", "params": args}) + ) + rsp = await self.recv_response(rpcid) + assert "error" not in rsp + return rsp["result"] + + def sub_qsize(self, sub_id): + return self._subs[sub_id].qsize() + async def send(self, id): await self._ws.send( json.dumps({"id": id, "method": "web3_clientVersion", "params": []}) @@ -49,37 +69,79 @@ async def send(self, id): rsp = await self.recv_response(id) assert "error" not in rsp - -def test_web3_client_version(ethermint): - ethermint_ws = ethermint.copy() - ethermint_ws.use_websocket() - port = ethermint_ws.base_port(0) - url = f"ws://127.0.0.1:{ports.evmrpc_ws_port(port)}" + async def unsubscribe(self, sub_id): + rpcid = self.gen_id() + await self._ws.send( + json.dumps({"id": rpcid, "method": "eth_unsubscribe", "params": [sub_id]}) + ) + rsp = await self.recv_response(rpcid) + assert "error" not in rsp + return rsp["result"] + + +def test_subscribe_basic(ethermint: Ethermint): + """ + test basic subscribe and unsubscribe + """ + modify_command_in_supervisor_config( + ethermint.base_dir / "tasks.ini", + lambda cmd: f"{cmd} --evm.max-tx-gas-wanted {0}", + ) + ethermint.supervisorctl("update") + wait_for_port(ports.evmrpc_ws_port(ethermint.base_port(0))) + cli = ethermint.cosmos_cli() loop = asyncio.get_event_loop() + async def assert_unsubscribe(c: Client, sub_id): + assert await c.unsubscribe(sub_id) + # check no more messages + await loop.run_in_executor(None, wait_for_new_blocks, cli, 2) + assert c.sub_qsize(sub_id) == 0 + # unsubscribe again return False + assert not await c.unsubscribe(sub_id) + + async def logs_test(c: Client, w3, contract): + method = "Transfer(address,address,uint256)" + topic = f"0x{abi.event_signature_to_log_topic(method).hex()}" + params = {"address": contract.address, "topics": [topic]} + sub_id = await c.subscribe("logs", params) + sender = ADDRS["validator"] + recipient = ADDRS["community"] + nonce = w3.eth.get_transaction_count(sender) + total = 2 + txs = [ + contract.functions.transfer(recipient, 1000).build_transaction( + {"from": sender, "nonce": nonce + n, "gas": 200000} + ) + for n in range(total) + ] + cosmos_tx, _ = build_batch_tx(w3, cli, txs) + rsp = cli.broadcast_tx_json(cosmos_tx) + assert rsp["code"] == 0, rsp["raw_log"] + msgs = [await c.recv_subscription(sub_id) for i in range(total)] + assert len(msgs) == total + for msg in msgs: + assert topic in msg["topics"] == [ + topic, + HexBytes(b"\x00" * 12 + HexBytes(sender)).hex(), + HexBytes(b"\x00" * 12 + HexBytes(recipient)).hex(), + ] + await assert_unsubscribe(c, sub_id) + async def async_test(): - async with websockets.connect(url) as ws: + async with websockets.connect(ethermint.w3_ws_endpoint) as ws: c = Client(ws) t = asyncio.create_task(c.receive_loop()) # run send concurrently await asyncio.gather(*[c.send(id) for id in ["0", 1, 2.0]]) + contract, _ = deploy_contract(ethermint.w3, CONTRACTS["TestERC20A"]) + await asyncio.gather(*[logs_test(c, ethermint.w3, contract)]) t.cancel() try: await t except asyncio.CancelledError: - # allow retry + print("cancel") pass - loop.run_until_complete(async_test()) - - -def test_batch_request_netversion(ethermint): - return - - -def test_ws_subscribe_log(ethermint): - return - - -def test_ws_subscribe_newheads(ethermint): - return + timeout = 50 + loop.run_until_complete(asyncio.wait_for(async_test(), timeout)) diff --git a/tests/integration_tests/utils.py b/tests/integration_tests/utils.py index f4b7c95657..5f9c3c9f2f 100644 --- a/tests/integration_tests/utils.py +++ b/tests/integration_tests/utils.py @@ -1,9 +1,11 @@ import json import os +import re import socket import subprocess import sys import time +from concurrent.futures import ThreadPoolExecutor, as_completed from pathlib import Path import bech32 @@ -188,10 +190,9 @@ def decode_bech32(addr): def supervisorctl(inipath, *args): - subprocess.run( + return subprocess.check_output( (sys.executable, "-msupervisor.supervisorctl", "-c", inipath, *args), - check=True, - ) + ).decode() def parse_events(logs): @@ -206,3 +207,60 @@ def derive_new_account(n=1): account_path = f"m/44'/60'/0'/0/{n}" mnemonic = os.getenv("COMMUNITY_MNEMONIC") return Account.from_mnemonic(mnemonic, account_path=account_path) + + +def send_raw_transactions(w3, raw_transactions): + with ThreadPoolExecutor(len(raw_transactions)) as exec: + tasks = [ + exec.submit(w3.eth.send_raw_transaction, raw) for raw in raw_transactions + ] + sended_hash_set = {future.result() for future in as_completed(tasks)} + return sended_hash_set + + +def modify_command_in_supervisor_config(ini: Path, fn, **kwargs): + "replace the first node with the instrumented binary" + ini.write_text( + re.sub( + r"^command = (ethermintd .*$)", + lambda m: f"command = {fn(m.group(1))}", + ini.read_text(), + flags=re.M, + **kwargs, + ) + ) + + +def build_batch_tx(w3, cli, txs, key=KEYS["validator"]): + "return cosmos batch tx and eth tx hashes" + signed_txs = [sign_transaction(w3, tx, key) for tx in txs] + tmp_txs = [cli.build_evm_tx(signed.rawTransaction.hex()) for signed in signed_txs] + + msgs = [tx["body"]["messages"][0] for tx in tmp_txs] + fee = sum(int(tx["auth_info"]["fee"]["amount"][0]["amount"]) for tx in tmp_txs) + gas_limit = sum(int(tx["auth_info"]["fee"]["gas_limit"]) for tx in tmp_txs) + + tx_hashes = [signed.hash for signed in signed_txs] + + # build batch cosmos tx + return { + "body": { + "messages": msgs, + "memo": "", + "timeout_height": "0", + "extension_options": [ + {"@type": "/ethermint.evm.v1.ExtensionOptionsEthereumTx"} + ], + "non_critical_extension_options": [], + }, + "auth_info": { + "signer_infos": [], + "fee": { + "amount": [{"denom": "aphoton", "amount": str(fee)}], + "gas_limit": str(gas_limit), + "payer": "", + "granter": "", + }, + }, + "signatures": [], + }, tx_hashes diff --git a/x/evm/types/utils.go b/x/evm/types/utils.go index 02c697b841..d7696ba2bc 100644 --- a/x/evm/types/utils.go +++ b/x/evm/types/utils.go @@ -26,6 +26,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/math" + ethtypes "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" ) @@ -36,23 +37,47 @@ var DefaultPriorityReduction = sdk.DefaultPowerReduction var EmptyCodeHash = crypto.Keccak256(nil) -// DecodeTxResponse decodes an protobuf-encoded byte slice into TxResponse +// DecodeTxResponse decodes a protobuf-encoded byte slice into TxResponse func DecodeTxResponse(in []byte) (*MsgEthereumTxResponse, error) { + responses, err := DecodeTxResponses(in) + if err != nil { + return nil, err + } + if len(responses) == 0 { + return &MsgEthereumTxResponse{}, nil + } + return responses[0], nil +} + +// DecodeTxResponses decodes a protobuf-encoded byte slice into TxResponses +func DecodeTxResponses(in []byte) ([]*MsgEthereumTxResponse, error) { var txMsgData sdk.TxMsgData if err := proto.Unmarshal(in, &txMsgData); err != nil { return nil, err } - if len(txMsgData.MsgResponses) == 0 { - return &MsgEthereumTxResponse{}, nil + responses := make([]*MsgEthereumTxResponse, len(txMsgData.MsgResponses)) + for i, res := range txMsgData.MsgResponses { + var response MsgEthereumTxResponse + if err := proto.Unmarshal(res.Value, &response); err != nil { + return nil, errorsmod.Wrap(err, "failed to unmarshal tx response message data") + } + responses[i] = &response } + return responses, nil +} - var res MsgEthereumTxResponse - if err := proto.Unmarshal(txMsgData.MsgResponses[0].Value, &res); err != nil { - return nil, errorsmod.Wrap(err, "failed to unmarshal tx response message data") +// DecodeTxLogsFromEvents decodes a protobuf-encoded byte slice into ethereum logs +func DecodeTxLogsFromEvents(in []byte) ([]*ethtypes.Log, error) { + txResponses, err := DecodeTxResponses(in) + if err != nil { + return nil, err } - - return &res, nil + var txLogs []*Log + for _, response := range txResponses { + txLogs = append(txLogs, response.Logs...) + } + return LogsToEthereum(txLogs), nil } // EncodeTransactionLogs encodes TransactionLogs slice into a protobuf-encoded byte slice.