Skip to content

Commit

Permalink
fix: decode filtered logs in ws (backport: evmos#1781) (#251) (#277)
Browse files Browse the repository at this point in the history
* fix: decode filtered logs in ws (backport: evmos#1781) (#251)

* fix decode logs in ws

* add test

* add change doc

* decode in single loop

* add ci timeout

* reset max-tx-gas-wanted

* fix rest filter api

* test batch tx with exist sc

* more filter tests

* fix test
  • Loading branch information
mmsqe authored Jun 14, 2023
1 parent 1a879ae commit afb1039
Show file tree
Hide file tree
Showing 8 changed files with 217 additions and 65 deletions.
1 change: 1 addition & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ jobs:

integration_tests:
runs-on: ubuntu-latest
timeout-minutes: 60
steps:
- uses: actions/checkout@v3
- uses: cachix/install-nix-action@v20
Expand Down
10 changes: 10 additions & 0 deletions rpc/backend/chain_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,9 @@ func (suite *BackendTestSuite) TestFeeHistory() {
func(validator sdk.AccAddress) {
client := suite.backend.clientCtx.Client.(*mocks.Client)
suite.backend.cfg.JSONRPC.FeeHistoryCap = 2
var header metadata.MD
queryClient := suite.backend.queryClient.QueryClient.(*mocks.EVMQueryClient)
RegisterParams(queryClient, &header, 1)
RegisterBlockError(client, ethrpc.BlockNumber(1).Int64())
},
1,
Expand All @@ -374,6 +377,9 @@ func (suite *BackendTestSuite) TestFeeHistory() {
func(validator sdk.AccAddress) {
client := suite.backend.clientCtx.Client.(*mocks.Client)
suite.backend.cfg.JSONRPC.FeeHistoryCap = 2
var header metadata.MD
queryClient := suite.backend.queryClient.QueryClient.(*mocks.EVMQueryClient)
RegisterParams(queryClient, &header, 1)
RegisterBlock(client, ethrpc.BlockNumber(1).Int64(), nil)
RegisterBlockResultsError(client, 1)
},
Expand All @@ -390,6 +396,8 @@ func (suite *BackendTestSuite) TestFeeHistory() {
queryClient := suite.backend.queryClient.QueryClient.(*mocks.EVMQueryClient)
client := suite.backend.clientCtx.Client.(*mocks.Client)
suite.backend.cfg.JSONRPC.FeeHistoryCap = 2
var header metadata.MD
RegisterParams(queryClient, &header, 1)
RegisterBlock(client, ethrpc.BlockNumber(1).Int64(), nil)
RegisterBlockResults(client, 1)
RegisterBaseFeeError(queryClient)
Expand All @@ -408,6 +416,7 @@ func (suite *BackendTestSuite) TestFeeHistory() {
var header metadata.MD
baseFee := sdk.NewInt(1)
queryClient := suite.backend.queryClient.QueryClient.(*mocks.EVMQueryClient)
fQueryClient := suite.backend.queryClient.FeeMarket.(*mocks.FeeMarketQueryClient)
client := suite.backend.clientCtx.Client.(*mocks.Client)
suite.backend.cfg.JSONRPC.FeeHistoryCap = 2
RegisterBlock(client, ethrpc.BlockNumber(1).Int64(), nil)
Expand All @@ -417,6 +426,7 @@ func (suite *BackendTestSuite) TestFeeHistory() {
RegisterConsensusParams(client, 1)
RegisterParams(queryClient, &header, 1)
RegisterParamsWithoutHeader(queryClient, 1)
RegisterFeeMarketParams(fQueryClient, 1)
},
1,
1,
Expand Down
17 changes: 6 additions & 11 deletions rpc/namespaces/ethereum/eth/filters/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,15 +414,12 @@ func (api *PublicFilterAPI) Logs(ctx context.Context, crit filters.FilterCriteri
api.logger.Debug("event data type mismatch", "type", fmt.Sprintf("%T", ev.Data))
continue
}

txResponse, err := evmtypes.DecodeTxResponse(dataTx.TxResult.Result.Data)
txLogs, err := evmtypes.DecodeTxLogsFromEvents(dataTx.TxResult.Result.Data)
if err != nil {
api.logger.Error("fail to decode tx response", "error", err)
api.logger.Error("fail to decode tx response", "error", err.Error())
return
}

logs := FilterLogs(evmtypes.LogsToEthereum(txResponse.Logs), crit.FromBlock, crit.ToBlock, crit.Addresses, crit.Topics)

logs := FilterLogs(txLogs, crit.FromBlock, crit.ToBlock, crit.Addresses, crit.Topics)
for _, log := range logs {
_ = notifier.Notify(rpcSub.ID, log)
}
Expand Down Expand Up @@ -497,14 +494,12 @@ func (api *PublicFilterAPI) NewFilter(criteria filters.FilterCriteria) (rpc.ID,
api.logger.Debug("event data type mismatch", "type", fmt.Sprintf("%T", ev.Data))
continue
}

txResponse, err := evmtypes.DecodeTxResponse(dataTx.TxResult.Result.Data)
txLogs, err := evmtypes.DecodeTxLogsFromEvents(dataTx.TxResult.Result.Data)
if err != nil {
api.logger.Error("fail to decode tx response", "error", err)
api.logger.Error("fail to decode tx response", "error", err.Error())
return
}

logs := FilterLogs(evmtypes.LogsToEthereum(txResponse.Logs), criteria.FromBlock, criteria.ToBlock, criteria.Addresses, criteria.Topics)
logs := FilterLogs(txLogs, criteria.FromBlock, criteria.ToBlock, criteria.Addresses, criteria.Topics)

api.filtersMu.Lock()
if f, found := api.filters[filterID]; found {
Expand Down
6 changes: 2 additions & 4 deletions rpc/websockets.go
Original file line number Diff line number Diff line change
Expand Up @@ -589,14 +589,12 @@ func (api *pubSubAPI) subscribeLogs(wsConn *wsConn, subID rpc.ID, extra interfac
api.logger.Debug("event data type mismatch", "type", fmt.Sprintf("%T", event.Data))
continue
}

txResponse, err := evmtypes.DecodeTxResponse(dataTx.TxResult.Result.Data)
txLogs, err := evmtypes.DecodeTxLogsFromEvents(dataTx.TxResult.Result.Data)
if err != nil {
api.logger.Error("failed to decode tx response", "error", err.Error())
return
}

logs := rpcfilters.FilterLogs(evmtypes.LogsToEthereum(txResponse.Logs), crit.FromBlock, crit.ToBlock, crit.Addresses, crit.Topics)
logs := rpcfilters.FilterLogs(txLogs, crit.FromBlock, crit.ToBlock, crit.Addresses, crit.Topics)
if len(logs) == 0 {
continue
}
Expand Down
5 changes: 4 additions & 1 deletion tests/integration_tests/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from web3.middleware import geth_poa_middleware

from .cosmoscli import CosmosCLI
from .utils import wait_for_port
from .utils import supervisorctl, wait_for_port

DEFAULT_CHAIN_BINARY = "ethermintd"

Expand Down Expand Up @@ -62,6 +62,9 @@ def cosmos_cli(self, i=0):
self.base_dir / f"node{i}", self.node_rpc(i), self.chain_binary
)

def supervisorctl(self, *args):
return supervisorctl(self.base_dir / "../tasks.ini", *args)


class Geth:
def __init__(self, w3):
Expand Down
138 changes: 100 additions & 38 deletions tests/integration_tests/test_websockets.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,83 +3,145 @@
from collections import defaultdict

import websockets
from eth_utils import abi
from hexbytes import HexBytes
from pystarport import ports


def test_single_request_netversion(ethermint):
ethermint.use_websocket()
eth_ws = ethermint.w3.provider

response = eth_ws.make_request("net_version", [])

# net_version should be 9000
assert response["result"] == "9000", "got " + response["result"] + ", expected 9000"


# note:
# batch requests still not implemented in web3.py
# todo: follow https://github.com/ethereum/web3.py/issues/832, add tests when complete

# eth_subscribe and eth_unsubscribe support still not implemented in web3.py
# todo: follow https://github.com/ethereum/web3.py/issues/1402, add tests when complete
from .network import Ethermint
from .utils import (
ADDRS,
CONTRACTS,
build_batch_tx,
deploy_contract,
modify_command_in_supervisor_config,
wait_for_new_blocks,
wait_for_port,
)


class Client:
def __init__(self, ws):
self._ws = ws
self._gen_id = 0
self._subs = defaultdict(asyncio.Queue)
self._rsps = defaultdict(asyncio.Queue)

def gen_id(self):
self._gen_id += 1
return self._gen_id

async def receive_loop(self):
while True:
msg = json.loads(await self._ws.recv())
if "id" in msg:
# responses
await self._rsps[msg["id"]].put(msg)
else:
# subscriptions
assert msg["method"] == "eth_subscription"
sub_id = msg["params"]["subscription"]
await self._subs[sub_id].put(msg["params"]["result"])

async def recv_response(self, rpcid):
rsp = await self._rsps[rpcid].get()
del self._rsps[rpcid]
return rsp

async def recv_subscription(self, sub_id):
return await self._subs[sub_id].get()

async def subscribe(self, *args):
rpcid = self.gen_id()
await self._ws.send(
json.dumps({"id": rpcid, "method": "eth_subscribe", "params": args})
)
rsp = await self.recv_response(rpcid)
assert "error" not in rsp
return rsp["result"]

def sub_qsize(self, sub_id):
return self._subs[sub_id].qsize()

async def send(self, id):
await self._ws.send(
json.dumps({"id": id, "method": "web3_clientVersion", "params": []})
)
rsp = await self.recv_response(id)
assert "error" not in rsp


def test_web3_client_version(ethermint):
ethermint_ws = ethermint.copy()
ethermint_ws.use_websocket()
port = ethermint_ws.base_port(0)
url = f"ws://127.0.0.1:{ports.evmrpc_ws_port(port)}"
async def unsubscribe(self, sub_id):
rpcid = self.gen_id()
await self._ws.send(
json.dumps({"id": rpcid, "method": "eth_unsubscribe", "params": [sub_id]})
)
rsp = await self.recv_response(rpcid)
assert "error" not in rsp
return rsp["result"]


def test_subscribe_basic(ethermint: Ethermint):
"""
test basic subscribe and unsubscribe
"""
modify_command_in_supervisor_config(
ethermint.base_dir / "tasks.ini",
lambda cmd: f"{cmd} --evm.max-tx-gas-wanted {0}",
)
ethermint.supervisorctl("update")
wait_for_port(ports.evmrpc_ws_port(ethermint.base_port(0)))
cli = ethermint.cosmos_cli()
loop = asyncio.get_event_loop()

async def assert_unsubscribe(c: Client, sub_id):
assert await c.unsubscribe(sub_id)
# check no more messages
await loop.run_in_executor(None, wait_for_new_blocks, cli, 2)
assert c.sub_qsize(sub_id) == 0
# unsubscribe again return False
assert not await c.unsubscribe(sub_id)

async def logs_test(c: Client, w3, contract):
method = "Transfer(address,address,uint256)"
topic = f"0x{abi.event_signature_to_log_topic(method).hex()}"
params = {"address": contract.address, "topics": [topic]}
sub_id = await c.subscribe("logs", params)
sender = ADDRS["validator"]
recipient = ADDRS["community"]
nonce = w3.eth.get_transaction_count(sender)
total = 2
txs = [
contract.functions.transfer(recipient, 1000).build_transaction(
{"from": sender, "nonce": nonce + n, "gas": 200000}
)
for n in range(total)
]
cosmos_tx, _ = build_batch_tx(w3, cli, txs)
rsp = cli.broadcast_tx_json(cosmos_tx)
assert rsp["code"] == 0, rsp["raw_log"]
msgs = [await c.recv_subscription(sub_id) for i in range(total)]
assert len(msgs) == total
for msg in msgs:
assert topic in msg["topics"] == [
topic,
HexBytes(b"\x00" * 12 + HexBytes(sender)).hex(),
HexBytes(b"\x00" * 12 + HexBytes(recipient)).hex(),
]
await assert_unsubscribe(c, sub_id)

async def async_test():
async with websockets.connect(url) as ws:
async with websockets.connect(ethermint.w3_ws_endpoint) as ws:
c = Client(ws)
t = asyncio.create_task(c.receive_loop())
# run send concurrently
await asyncio.gather(*[c.send(id) for id in ["0", 1, 2.0]])
contract, _ = deploy_contract(ethermint.w3, CONTRACTS["TestERC20A"])
await asyncio.gather(*[logs_test(c, ethermint.w3, contract)])
t.cancel()
try:
await t
except asyncio.CancelledError:
# allow retry
print("cancel")
pass

loop.run_until_complete(async_test())


def test_batch_request_netversion(ethermint):
return


def test_ws_subscribe_log(ethermint):
return


def test_ws_subscribe_newheads(ethermint):
return
timeout = 50
loop.run_until_complete(asyncio.wait_for(async_test(), timeout))
64 changes: 61 additions & 3 deletions tests/integration_tests/utils.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import json
import os
import re
import socket
import subprocess
import sys
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path

import bech32
Expand Down Expand Up @@ -188,10 +190,9 @@ def decode_bech32(addr):


def supervisorctl(inipath, *args):
subprocess.run(
return subprocess.check_output(
(sys.executable, "-msupervisor.supervisorctl", "-c", inipath, *args),
check=True,
)
).decode()


def parse_events(logs):
Expand All @@ -206,3 +207,60 @@ def derive_new_account(n=1):
account_path = f"m/44'/60'/0'/0/{n}"
mnemonic = os.getenv("COMMUNITY_MNEMONIC")
return Account.from_mnemonic(mnemonic, account_path=account_path)


def send_raw_transactions(w3, raw_transactions):
with ThreadPoolExecutor(len(raw_transactions)) as exec:
tasks = [
exec.submit(w3.eth.send_raw_transaction, raw) for raw in raw_transactions
]
sended_hash_set = {future.result() for future in as_completed(tasks)}
return sended_hash_set


def modify_command_in_supervisor_config(ini: Path, fn, **kwargs):
"replace the first node with the instrumented binary"
ini.write_text(
re.sub(
r"^command = (ethermintd .*$)",
lambda m: f"command = {fn(m.group(1))}",
ini.read_text(),
flags=re.M,
**kwargs,
)
)


def build_batch_tx(w3, cli, txs, key=KEYS["validator"]):
"return cosmos batch tx and eth tx hashes"
signed_txs = [sign_transaction(w3, tx, key) for tx in txs]
tmp_txs = [cli.build_evm_tx(signed.rawTransaction.hex()) for signed in signed_txs]

msgs = [tx["body"]["messages"][0] for tx in tmp_txs]
fee = sum(int(tx["auth_info"]["fee"]["amount"][0]["amount"]) for tx in tmp_txs)
gas_limit = sum(int(tx["auth_info"]["fee"]["gas_limit"]) for tx in tmp_txs)

tx_hashes = [signed.hash for signed in signed_txs]

# build batch cosmos tx
return {
"body": {
"messages": msgs,
"memo": "",
"timeout_height": "0",
"extension_options": [
{"@type": "/ethermint.evm.v1.ExtensionOptionsEthereumTx"}
],
"non_critical_extension_options": [],
},
"auth_info": {
"signer_infos": [],
"fee": {
"amount": [{"denom": "aphoton", "amount": str(fee)}],
"gas_limit": str(gas_limit),
"payer": "",
"granter": "",
},
},
"signatures": [],
}, tx_hashes
Loading

0 comments on commit afb1039

Please sign in to comment.