Skip to content
This repository has been archived by the owner on Dec 15, 2021. It is now read-only.

IS-914: bugfix query_iscore #385

Merged
merged 2 commits into from
Nov 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion iconservice/iiss/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -660,7 +660,10 @@ def handle_query_iscore(self,
_context: 'IconScoreContext',
params: dict) -> dict:
ret_params: dict = TypeConverter.convert(params, ParamType.IISS_QUERY_ISCORE)
address: 'Address' = ret_params[ConstantKeys.ADDRESS]
address: 'Address' = ret_params.get(ConstantKeys.ADDRESS)

if not isinstance(address, Address):
raise InvalidParamsException(f"Invalid address: {address}")

# TODO: error handling
iscore, block_height = self._reward_calc_proxy.query_iscore(address)
Expand Down
3 changes: 3 additions & 0 deletions iconservice/iiss/reward_calc/ipc/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,9 @@ def from_list(items: list) -> 'QueryCalculateResultResponse':


class QueryRequest(Request):
"""queryIScore
"""

def __init__(self, address: 'Address'):
super().__init__(MessageType.QUERY)

Expand Down
14 changes: 9 additions & 5 deletions iconservice/iiss/reward_calc/ipc/message_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@
# limitations under the License.

import asyncio
from iconcommons.logger import Logger
from typing import Callable, Any, Optional
from typing import Callable, Any, Optional, Dict

from iconcommons.logger import Logger
from iconservice.base.exception import InvalidParamsException, ServiceNotReadyException
from iconservice.icon_constant import RCStatus
from .message import Request, Response, MessageType
from iconservice.base.exception import InvalidParamsException, ServiceNotReadyException


class MessageQueue(object):
Expand All @@ -29,7 +29,7 @@ def __init__(self, loop, notify_message: tuple = None, notify_handler: Callable[
"If notify_message is not None, notify_handler is mandatory parameter")
self._loop = loop
self._requests = asyncio.Queue()
self._msg_id_to_future = {}
self._msg_id_to_future: Dict[int, asyncio.Future] = {}
self.notify_message: Optional[tuple] = notify_message
self.notify_handler = notify_handler
self._rc_status = RCStatus.NOT_READY
Expand All @@ -47,10 +47,14 @@ def put(self, request, wait_for_response: bool = True) -> Optional[asyncio.Futur
self._msg_id_to_future[request.msg_id] = future
return future

def task_done(self):
self._requests.task_done()

def message_handler(self, response: 'Response'):
msg_type: MessageType = getattr(response, "MSG_TYPE")

if self._rc_status == RCStatus.NOT_READY:
if response.MSG_TYPE == MessageType.READY:
if msg_type == MessageType.READY:
self._rc_status = RCStatus.READY
else:
raise ServiceNotReadyException(f"Ready notification did not arrive: {response}")
Expand Down
2 changes: 2 additions & 0 deletions iconservice/iiss/reward_calc/ipc/reward_calc_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,8 @@ def query_iscore(self, address: 'Address') -> tuple:
:return: [i-score(int), block_height(int)]
:exception TimeoutException: The operation has timed-out
"""
assert isinstance(address, Address)

Logger.debug(tag=_TAG, msg="query_iscore() start")

future: concurrent.futures.Future = asyncio.run_coroutine_threadsafe(
Expand Down
67 changes: 41 additions & 26 deletions iconservice/iiss/reward_calc/ipc/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
from .message_unpacker import MessageUnpacker


_TAG = "RCP"


class IPCServer(object):
def __init__(self):
self._loop = None
Expand Down Expand Up @@ -69,50 +72,62 @@ def close(self):
self._unpacker = None

def _on_accepted(self, reader: 'StreamReader', writer: 'StreamWriter'):
Logger.debug(f"on_accepted() start: {reader} {writer}")
Logger.debug(tag=_TAG, msg=f"on_accepted() start: {reader} {writer}")

self._tasks.append(asyncio.ensure_future(self._on_send(writer)))
self._tasks.append(asyncio.ensure_future(self._on_recv(reader)))

Logger.debug("on_accepted() end")
Logger.debug(tag=_TAG, msg="on_accepted() end")

async def _on_send(self, writer: 'StreamWriter'):
Logger.debug("_on_send() start")
Logger.debug(tag=_TAG, msg="_on_send() start")

while True:
request: 'Request' = await self._queue.get()
if request.msg_type == MessageType.NONE:
self._queue.put_response(
NoneResponse.from_list([request.msg_type, request.msg_id])
)
break

data: bytes = request.to_bytes()
Logger.debug(f"on_send(): data({data.hex()}")
Logger.info(f"Sending Data : {request}")
writer.write(data)
await writer.drain()
try:
request: 'Request' = await self._queue.get()
if request.msg_type == MessageType.NONE:
self._queue.put_response(
NoneResponse.from_list([request.msg_type, request.msg_id])
)

self._queue.task_done()
break

data: bytes = request.to_bytes()
Logger.debug(tag=_TAG, msg=f"on_send(): data({data.hex()}")
Logger.info(tag=_TAG, msg=f"Sending Data : {request}")
writer.write(data)
await writer.drain()

self._queue.task_done()

except BaseException as e:
Logger.error(tag=_TAG, msg=str(e))

writer.close()

Logger.debug("_on_send() end")
Logger.debug(tag=_TAG, msg="_on_send() end")

async def _on_recv(self, reader: 'StreamReader'):
Logger.debug("_on_recv() start")
Logger.debug(tag=_TAG, msg="_on_recv() start")

while True:
data: bytes = await reader.read(1024)
if not isinstance(data, bytes) or len(data) == 0:
break
try:
data: bytes = await reader.read(1024)
if not isinstance(data, bytes) or len(data) == 0:
break

Logger.debug(tag=_TAG, msg=f"_on_recv(): data({data.hex()})")

Logger.debug(f"_on_recv(): data({data.hex()})")
self._unpacker.feed(data)

self._unpacker.feed(data)
for response in self._unpacker:
Logger.info(tag=_TAG, msg=f"Received Data : {response}")
self._queue.message_handler(response)

for response in self._unpacker:
Logger.info(f"Received Data : {response}")
self._queue.message_handler(response)
except BaseException as e:
Logger.error(tag=_TAG, msg=str(e))

await self._queue.put(NoneRequest())

Logger.debug("_on_recv() end")
Logger.debug(tag=_TAG, msg="_on_recv() end")
32 changes: 31 additions & 1 deletion tests/integrate_test/iiss/prevote/test_iiss_claim.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@
from typing import TYPE_CHECKING, List
from unittest.mock import Mock

import pytest

from iconservice.base.address import ZERO_SCORE_ADDRESS
from iconservice.base.exception import InvalidParamsException
from iconservice.icon_constant import IISS_MAX_DELEGATIONS, Revision, ICX_IN_LOOP
from iconservice.iiss.reward_calc.ipc.reward_calc_proxy import RewardCalcProxy
from tests.integrate_test.iiss.test_iiss_base import TestIISSBase
Expand Down Expand Up @@ -86,7 +89,10 @@ def test_iiss_claim(self):
iscore = icx * 10 ** 3
RewardCalcProxy.query_iscore = Mock(return_value=(iscore, block_height))

# query iscore
# query iscore with an invalid address
self._query_iscore_with_invalid_params()

# query iscore with a valid address
response: dict = self.query_iscore(self._accounts[0])
expected_response = {
"blockHeight": block_height,
Expand Down Expand Up @@ -115,3 +121,27 @@ def test_iiss_claim(self):
self.assertEqual(['IScoreClaimed(int,int)'], tx_results[0].event_logs[0].indexed)
self.assertEqual([icx, iscore], tx_results[0].event_logs[0].data)
RewardCalcProxy.commit_claim.assert_not_called()

def _query_iscore_with_invalid_params(self):
params = {
"version": self._version,
"to": ZERO_SCORE_ADDRESS,
"dataType": "call",
"data": {
"method": "queryIScore"
}
}

# query iscore without an address
with pytest.raises(InvalidParamsException):
self.icon_service_engine.query("icx_call", params)

# query iscore with an empty string as an address
params["data"]["params"] = {"address": ""}
with pytest.raises(InvalidParamsException):
self.icon_service_engine.query("icx_call", params)

# query iscore with an invalid address
params["data"]["params"] = {"address": "hx1234"}
with pytest.raises(InvalidParamsException):
self.icon_service_engine.query("icx_call", params)
4 changes: 2 additions & 2 deletions tests/integrate_test/iiss/test_iiss_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -329,8 +329,8 @@ def get_delegation(self,
return self._query(query_request)

def query_iscore(self,
from_: Union['EOAAccount', 'Address', str]) -> dict:
address: Optional['Address'] = self._convert_address_from_address_type(from_)
address: Union['EOAAccount', 'Address', str]) -> dict:
address: Optional['Address'] = self._convert_address_from_address_type(address)

query_request = {
"version": self._version,
Expand Down