diff --git a/iconservice/iiss/engine.py b/iconservice/iiss/engine.py index 33625f575..8df969d9d 100644 --- a/iconservice/iiss/engine.py +++ b/iconservice/iiss/engine.py @@ -660,7 +660,10 @@ def handle_query_iscore(self, _context: 'IconScoreContext', params: dict) -> dict: ret_params: dict = TypeConverter.convert(params, ParamType.IISS_QUERY_ISCORE) - address: 'Address' = ret_params[ConstantKeys.ADDRESS] + address: 'Address' = ret_params.get(ConstantKeys.ADDRESS) + + if not isinstance(address, Address): + raise InvalidParamsException(f"Invalid address: {address}") # TODO: error handling iscore, block_height = self._reward_calc_proxy.query_iscore(address) diff --git a/iconservice/iiss/reward_calc/ipc/message.py b/iconservice/iiss/reward_calc/ipc/message.py index b653da868..4aad8c871 100644 --- a/iconservice/iiss/reward_calc/ipc/message.py +++ b/iconservice/iiss/reward_calc/ipc/message.py @@ -275,6 +275,9 @@ def from_list(items: list) -> 'QueryCalculateResultResponse': class QueryRequest(Request): + """queryIScore + """ + def __init__(self, address: 'Address'): super().__init__(MessageType.QUERY) diff --git a/iconservice/iiss/reward_calc/ipc/message_queue.py b/iconservice/iiss/reward_calc/ipc/message_queue.py index 4a4896043..8ada49b39 100644 --- a/iconservice/iiss/reward_calc/ipc/message_queue.py +++ b/iconservice/iiss/reward_calc/ipc/message_queue.py @@ -14,12 +14,12 @@ # limitations under the License. import asyncio -from iconcommons.logger import Logger -from typing import Callable, Any, Optional +from typing import Callable, Any, Optional, Dict +from iconcommons.logger import Logger +from iconservice.base.exception import InvalidParamsException, ServiceNotReadyException from iconservice.icon_constant import RCStatus from .message import Request, Response, MessageType -from iconservice.base.exception import InvalidParamsException, ServiceNotReadyException class MessageQueue(object): @@ -29,7 +29,7 @@ def __init__(self, loop, notify_message: tuple = None, notify_handler: Callable[ "If notify_message is not None, notify_handler is mandatory parameter") self._loop = loop self._requests = asyncio.Queue() - self._msg_id_to_future = {} + self._msg_id_to_future: Dict[int, asyncio.Future] = {} self.notify_message: Optional[tuple] = notify_message self.notify_handler = notify_handler self._rc_status = RCStatus.NOT_READY @@ -47,10 +47,14 @@ def put(self, request, wait_for_response: bool = True) -> Optional[asyncio.Futur self._msg_id_to_future[request.msg_id] = future return future + def task_done(self): + self._requests.task_done() + def message_handler(self, response: 'Response'): + msg_type: MessageType = getattr(response, "MSG_TYPE") if self._rc_status == RCStatus.NOT_READY: - if response.MSG_TYPE == MessageType.READY: + if msg_type == MessageType.READY: self._rc_status = RCStatus.READY else: raise ServiceNotReadyException(f"Ready notification did not arrive: {response}") diff --git a/iconservice/iiss/reward_calc/ipc/reward_calc_proxy.py b/iconservice/iiss/reward_calc/ipc/reward_calc_proxy.py index befe3bac8..0be0f923a 100644 --- a/iconservice/iiss/reward_calc/ipc/reward_calc_proxy.py +++ b/iconservice/iiss/reward_calc/ipc/reward_calc_proxy.py @@ -252,6 +252,8 @@ def query_iscore(self, address: 'Address') -> tuple: :return: [i-score(int), block_height(int)] :exception TimeoutException: The operation has timed-out """ + assert isinstance(address, Address) + Logger.debug(tag=_TAG, msg="query_iscore() start") future: concurrent.futures.Future = asyncio.run_coroutine_threadsafe( diff --git a/iconservice/iiss/reward_calc/ipc/server.py b/iconservice/iiss/reward_calc/ipc/server.py index 8ff80746a..8a943cbb6 100644 --- a/iconservice/iiss/reward_calc/ipc/server.py +++ b/iconservice/iiss/reward_calc/ipc/server.py @@ -24,6 +24,9 @@ from .message_unpacker import MessageUnpacker +_TAG = "RCP" + + class IPCServer(object): def __init__(self): self._loop = None @@ -69,50 +72,62 @@ def close(self): self._unpacker = None def _on_accepted(self, reader: 'StreamReader', writer: 'StreamWriter'): - Logger.debug(f"on_accepted() start: {reader} {writer}") + Logger.debug(tag=_TAG, msg=f"on_accepted() start: {reader} {writer}") self._tasks.append(asyncio.ensure_future(self._on_send(writer))) self._tasks.append(asyncio.ensure_future(self._on_recv(reader))) - Logger.debug("on_accepted() end") + Logger.debug(tag=_TAG, msg="on_accepted() end") async def _on_send(self, writer: 'StreamWriter'): - Logger.debug("_on_send() start") + Logger.debug(tag=_TAG, msg="_on_send() start") while True: - request: 'Request' = await self._queue.get() - if request.msg_type == MessageType.NONE: - self._queue.put_response( - NoneResponse.from_list([request.msg_type, request.msg_id]) - ) - break - - data: bytes = request.to_bytes() - Logger.debug(f"on_send(): data({data.hex()}") - Logger.info(f"Sending Data : {request}") - writer.write(data) - await writer.drain() + try: + request: 'Request' = await self._queue.get() + if request.msg_type == MessageType.NONE: + self._queue.put_response( + NoneResponse.from_list([request.msg_type, request.msg_id]) + ) + + self._queue.task_done() + break + + data: bytes = request.to_bytes() + Logger.debug(tag=_TAG, msg=f"on_send(): data({data.hex()}") + Logger.info(tag=_TAG, msg=f"Sending Data : {request}") + writer.write(data) + await writer.drain() + + self._queue.task_done() + + except BaseException as e: + Logger.error(tag=_TAG, msg=str(e)) writer.close() - Logger.debug("_on_send() end") + Logger.debug(tag=_TAG, msg="_on_send() end") async def _on_recv(self, reader: 'StreamReader'): - Logger.debug("_on_recv() start") + Logger.debug(tag=_TAG, msg="_on_recv() start") while True: - data: bytes = await reader.read(1024) - if not isinstance(data, bytes) or len(data) == 0: - break + try: + data: bytes = await reader.read(1024) + if not isinstance(data, bytes) or len(data) == 0: + break + + Logger.debug(tag=_TAG, msg=f"_on_recv(): data({data.hex()})") - Logger.debug(f"_on_recv(): data({data.hex()})") + self._unpacker.feed(data) - self._unpacker.feed(data) + for response in self._unpacker: + Logger.info(tag=_TAG, msg=f"Received Data : {response}") + self._queue.message_handler(response) - for response in self._unpacker: - Logger.info(f"Received Data : {response}") - self._queue.message_handler(response) + except BaseException as e: + Logger.error(tag=_TAG, msg=str(e)) await self._queue.put(NoneRequest()) - Logger.debug("_on_recv() end") + Logger.debug(tag=_TAG, msg="_on_recv() end") diff --git a/tests/integrate_test/iiss/prevote/test_iiss_claim.py b/tests/integrate_test/iiss/prevote/test_iiss_claim.py index db600ef69..058c4473d 100644 --- a/tests/integrate_test/iiss/prevote/test_iiss_claim.py +++ b/tests/integrate_test/iiss/prevote/test_iiss_claim.py @@ -19,7 +19,10 @@ from typing import TYPE_CHECKING, List from unittest.mock import Mock +import pytest + from iconservice.base.address import ZERO_SCORE_ADDRESS +from iconservice.base.exception import InvalidParamsException from iconservice.icon_constant import IISS_MAX_DELEGATIONS, Revision, ICX_IN_LOOP from iconservice.iiss.reward_calc.ipc.reward_calc_proxy import RewardCalcProxy from tests.integrate_test.iiss.test_iiss_base import TestIISSBase @@ -86,7 +89,10 @@ def test_iiss_claim(self): iscore = icx * 10 ** 3 RewardCalcProxy.query_iscore = Mock(return_value=(iscore, block_height)) - # query iscore + # query iscore with an invalid address + self._query_iscore_with_invalid_params() + + # query iscore with a valid address response: dict = self.query_iscore(self._accounts[0]) expected_response = { "blockHeight": block_height, @@ -115,3 +121,27 @@ def test_iiss_claim(self): self.assertEqual(['IScoreClaimed(int,int)'], tx_results[0].event_logs[0].indexed) self.assertEqual([icx, iscore], tx_results[0].event_logs[0].data) RewardCalcProxy.commit_claim.assert_not_called() + + def _query_iscore_with_invalid_params(self): + params = { + "version": self._version, + "to": ZERO_SCORE_ADDRESS, + "dataType": "call", + "data": { + "method": "queryIScore" + } + } + + # query iscore without an address + with pytest.raises(InvalidParamsException): + self.icon_service_engine.query("icx_call", params) + + # query iscore with an empty string as an address + params["data"]["params"] = {"address": ""} + with pytest.raises(InvalidParamsException): + self.icon_service_engine.query("icx_call", params) + + # query iscore with an invalid address + params["data"]["params"] = {"address": "hx1234"} + with pytest.raises(InvalidParamsException): + self.icon_service_engine.query("icx_call", params) diff --git a/tests/integrate_test/iiss/test_iiss_base.py b/tests/integrate_test/iiss/test_iiss_base.py index 75911c4bf..3e924613d 100644 --- a/tests/integrate_test/iiss/test_iiss_base.py +++ b/tests/integrate_test/iiss/test_iiss_base.py @@ -329,8 +329,8 @@ def get_delegation(self, return self._query(query_request) def query_iscore(self, - from_: Union['EOAAccount', 'Address', str]) -> dict: - address: Optional['Address'] = self._convert_address_from_address_type(from_) + address: Union['EOAAccount', 'Address', str]) -> dict: + address: Optional['Address'] = self._convert_address_from_address_type(address) query_request = { "version": self._version,