diff --git a/tests/unit_test/iconscore/test_db.py b/tests/unit_test/iconscore/test_db.py index 3b28c7605..5a57bf8a5 100644 --- a/tests/unit_test/iconscore/test_db.py +++ b/tests/unit_test/iconscore/test_db.py @@ -35,6 +35,8 @@ ArrayDB, DictDB, VarDB, + ContainerUtil, + get_default_value, ) from iconservice.utils import int_to_bytes from iconservice.utils.rlp import rlp_encode_bytes @@ -44,6 +46,9 @@ class DummyKeyValueDatabase: def __init__(self): self._db = {} + def __len__(self): + return len(self._db) + def get(self, key: bytes) -> bytes: return self._db.get(key) @@ -114,7 +119,17 @@ def _init_context(cls, context, address: 'Address'): def _set_revision(cls, context, revision: int): context._inv_container.revision_code = revision - def test_array_db(self, context, key_value_db): + @pytest.mark.parametrize( + "values,value_type", + [ + ((0, 100, 200, 300), int), + (("aa", "b", "ccc", "dddd"), str), + ((b"aa", b"b", b"ccc", b"dddd"), bytes), + ((True, False, True, False), bool), + ([Address(AddressPrefix.EOA, os.urandom(20)) for _ in range(4)], Address), + ] + ) + def test_array_db(self, context, key_value_db, values, value_type): context_db = ContextDatabase(key_value_db, is_shared=False) address = Address(AddressPrefix.CONTRACT, os.urandom(20)) score_db = IconScoreDatabase(address, context_db) @@ -122,26 +137,24 @@ def test_array_db(self, context, key_value_db): self._init_context(context, score_db.address) self._set_revision(context, Revision.USE_RLP.value - 1) - name = "balances" - balances = ArrayDB(name, score_db, int) + name = "array_db" + array_db = ArrayDB(name, score_db, value_type) for i in range(2): - balances.put(i * 100) - assert len(balances) == 2 + array_db.put(values[i]) + assert len(array_db) == 2 self._set_revision(context, Revision.USE_RLP.value) - balances.put(200) # balances[2] = 200 - balances.put(300) # balances[3] = 300 - assert len(balances) == 4 + array_db.put(values[2]) + array_db.put(values[3]) + assert len(array_db) == 4 - # balances: [0, 100, 200, 300] - for i, value in enumerate(balances): - assert value == i * 100 + for i, value in enumerate(array_db): + assert value == values[i] - # len(balances) final_key: bytes = _get_final_key(address, ContainerTag.ARRAY, name.encode(), use_rlp=True) - assert key_value_db.get(final_key) == int_to_bytes(len(balances)) + assert key_value_db.get(final_key) == int_to_bytes(len(array_db)) for i, use_rlp in enumerate((False, False, True, True)): key = _get_final_key( @@ -151,9 +164,31 @@ def test_array_db(self, context, key_value_db): int_to_bytes(i), use_rlp=use_rlp, ) - assert key_value_db.get(key) == int_to_bytes(i * 100) - - def test_var_db(self, context, key_value_db): + assert key_value_db.get(key) == ContainerUtil.encode_value(values[i]) + + for v in reversed(values): + assert v == array_db.pop() + assert len(array_db) == 0 + + # 2 values for array_db size still remain + # even though all items in array_db have been popped. + assert len(key_value_db) == 2 + + @pytest.mark.parametrize( + "old_value,new_value,value_type", + [ + (300, 500, int), + ("old string", "new string", str), + (b"old", b"new", bytes), + (False, True, bool), + ( + Address(AddressPrefix.EOA, os.urandom(20)), + Address(AddressPrefix.CONTRACT, os.urandom(20)), + Address, + ), + ] + ) + def test_var_db(self, context, key_value_db, old_value, new_value, value_type): context_db = ContextDatabase(key_value_db, is_shared=False) address = Address(AddressPrefix.CONTRACT, os.urandom(20)) score_db = IconScoreDatabase(address, context_db) @@ -161,79 +196,123 @@ def test_var_db(self, context, key_value_db): self._init_context(context, address) self._set_revision(context, Revision.USE_RLP.value - 1) - name = "owner" - owner_address = Address(AddressPrefix.EOA, os.urandom(20)) + name = "var_db" - owner = VarDB(name, score_db, Address) - owner.set(owner_address) - assert owner.get() == owner_address + var_db = VarDB(name, score_db, value_type) + var_db.set(old_value) + assert var_db.get() == old_value key = _get_final_key(address, ContainerTag.VAR, name.encode(), use_rlp=False) - assert key_value_db.get(key) == owner_address.to_bytes() + assert key_value_db.get(key) == ContainerUtil.encode_value(old_value) self._set_revision(context, Revision.USE_RLP.value) - assert key_value_db.get(key) == owner_address.to_bytes() + assert key_value_db.get(key) == ContainerUtil.encode_value(old_value) - owner.set(address) - assert owner.get() == address - assert owner.get() != owner_address + var_db.set(new_value) + assert var_db.get() == new_value + assert var_db.get() != old_value key = _get_final_key(address, ContainerTag.VAR, name.encode(), use_rlp=True) - assert key_value_db.get(key) == address.to_bytes() + assert key_value_db.get(key) == ContainerUtil.encode_value(new_value) - def test_1_depth_dict_db(self, context, key_value_db): + var_db.remove() + assert var_db.get() == get_default_value(value_type) + assert len(key_value_db) == 0 + + @pytest.mark.parametrize( + "keys", + [ + (b"hello", 1234, "world", Address(AddressPrefix.EOA, os.urandom(20))), + ] + ) + @pytest.mark.parametrize( + "old_values, new_values, value_type", + [ + ((False, True, False, True), (True, False, True, False), bool), + ((b"a", b"b", b"c", b"d"), (b"A", b"B", b"C", b"D"), bytes), + ((10, 20, 30, 40), (100, 200, 300, 400), int), + (("aa", "bb", "cc", "dd"), ("AA", "BB", "CC", "DD"), str), + ( + [Address(AddressPrefix.EOA, os.urandom(20)) for _ in range(4)], + [Address(AddressPrefix.CONTRACT, os.urandom(20)) for _ in range(4)], + Address + ), + ] + ) + def test_1_depth_dict_db(self, context, key_value_db, keys, old_values, new_values, value_type): context_db = ContextDatabase(key_value_db, is_shared=False) score_address = Address(AddressPrefix.CONTRACT, os.urandom(20)) score_db = IconScoreDatabase(score_address, context_db) - addresses = [ - Address(AddressPrefix.EOA, i.to_bytes(20, "big")) - for i in range(3) - ] - self._init_context(context, score_address) self._set_revision(context, Revision.USE_RLP.value - 1) - name = "balances" - balances = DictDB(name, score_db, depth=1, value_type=int) + name = "dict_db_depth_1" + dict_db = DictDB(name, score_db, depth=1, value_type=value_type) - # [0, 100] + # Put two items to dict_db for i in range(2): - balance = i * 100 - address = addresses[i] - balances[address] = balance - assert balances[address] == balance + k, v = keys[i], old_values[i] + + dict_db[k] = v + assert dict_db[k] == v key = _get_final_key( - score_address, ContainerTag.DICT, name.encode(), address, + score_address, ContainerTag.DICT, name.encode(), ContainerUtil.encode_key(k), use_rlp=False ) - assert key_value_db.get(key) == int_to_bytes(balance) + assert key_value_db.get(key) == ContainerUtil.encode_value(v) self._set_revision(context, Revision.USE_RLP.value) - # [0, 1000, 2000] - for i in range(1, 3): - balance = i * 1000 - address = addresses[i] - balances[address] = balance - assert balances[address] == balance + # Put 4 items to dict_db + for i, k in enumerate(keys): + old_v = old_values[i] + new_v = new_values[i] + dict_db[k] = new_v + assert dict_db[k] == new_v + assert dict_db[k] != old_v key = _get_final_key( - score_address, ContainerTag.DICT, name.encode(), address, + score_address, ContainerTag.DICT, name.encode(), ContainerUtil.encode_key(k), use_rlp=True ) - assert key_value_db.get(key) == int_to_bytes(balance) + assert key_value_db.get(key) == ContainerUtil.encode_value(new_v) - # [0, 1000, 2000] - for i, address in enumerate(addresses): - assert balances[address] == i * 1000 + # If there is no value for a given key, default value is returned + for k in keys: + del dict_db[k] + assert dict_db[k] == get_default_value(value_type) - # {0, 2000} - del balances[addresses[1]] - assert balances[addresses[1]] == 0 + assert len(key_value_db) == 0 - def test_2_depth_dict_db(self, context, key_value_db): + @pytest.mark.parametrize( + "keys1", + [ + (b"hello", 1234, "world", Address(AddressPrefix.EOA, os.urandom(20))), + ] + ) + @pytest.mark.parametrize( + "keys2", + [ + (b"hello2", 12345, "world2", Address(AddressPrefix.CONTRACT, os.urandom(20))), + ] + ) + @pytest.mark.parametrize( + "old_values, new_values, value_type", + [ + ((False, True, False, True), (True, False, True, False), bool), + ((b"a", b"b", b"c", b"d"), (b"A", b"B", b"C", b"D"), bytes), + ((10, 20, 30, 40), (100, 200, 300, 400), int), + (("aa", "bb", "cc", "dd"), ("AA", "BB", "CC", "DD"), str), + ( + [Address(AddressPrefix.EOA, os.urandom(20)) for _ in range(4)], + [Address(AddressPrefix.CONTRACT, os.urandom(20)) for _ in range(4)], + Address + ), + ] + ) + def test_2_depth_dict_db(self, context, key_value_db, keys1, keys2, old_values, new_values, value_type): context_db = ContextDatabase(key_value_db, is_shared=False) score_address = Address(AddressPrefix.CONTRACT, os.urandom(20)) score_db = IconScoreDatabase(score_address, context_db) @@ -241,65 +320,60 @@ def test_2_depth_dict_db(self, context, key_value_db): self._init_context(context, score_address) self._set_revision(context, Revision.USE_RLP.value - 1) - # allowances[Address][Address] = int - name = "allowances" - allowances = DictDB(name, score_db, depth=2, value_type=int) + name = "dict_db_depth_2" + dict_db = DictDB(name, score_db, depth=2, value_type=value_type) - parents = [ - Address.from_prefix_and_int(AddressPrefix.EOA, 0), - Address.from_prefix_and_int(AddressPrefix.EOA, 1) - ] + # To assign a value to middle-layer dict_db is forbidden + for k1, v in zip(keys1, old_values): + with pytest.raises(InvalidContainerAccessException): + dict_db[k1] = v - children = [ - Address.from_prefix_and_int(AddressPrefix.EOA, 100), - Address.from_prefix_and_int(AddressPrefix.EOA, 101) - ] + # Assign values to dict_db on Revision.USE_RLP - 1 + for k1 in keys1: + for k2, v in zip(keys2, old_values): + dict_db[k1][k2] = v + + assert len(key_value_db) == len(keys1) * len(keys2) - expected_allowances = { - parents[0]: { - children[0]: 100, - children[1]: 200, - }, - parents[1]: { - children[0]: 1000, - children[1]: 2000, - }, - } - - for parent in parents: - for child in children: - allowances[parent][child] = expected_allowances[parent][child] - - for parent in parents: - for child in children: - expected_allowance = expected_allowances[parent][child] - assert allowances[parent][child] == expected_allowance - - key = _get_final_key( + for k1 in keys1: + for k2, v in zip(keys2, old_values): + assert dict_db[k1][k2] == v + + key: bytes = _get_final_key( score_address, ContainerTag.DICT, name.encode(), - ContainerTag.DICT, parent.to_bytes(), - child.to_bytes(), + ContainerTag.DICT, ContainerUtil.encode_key(k1), + ContainerUtil.encode_key(k2), use_rlp=False ) - assert key_value_db.get(key) == int_to_bytes(expected_allowance) + assert key_value_db.get(key) == ContainerUtil.encode_value(v) - with pytest.raises(InvalidContainerAccessException): - del allowances[parents[0]] + self._set_revision(context, Revision.USE_RLP.value) - del allowances[parents[0]][children[0]] - assert allowances[parents[0]][children[0]] == 0 + # Replace all old_values with new_values on Revision.USE_RLP + for k1 in keys1: + for k2, v in zip(keys2, new_values): + dict_db[k1][k2] = v - self._set_revision(context, Revision.USE_RLP.value) + # old_values + new_values + assert len(key_value_db) == len(keys1) * len(keys2) * 2 + + for k1 in keys1: + for k2, v in zip(keys2, new_values): + assert dict_db[k1][k2] == v + + key: bytes = _get_final_key( + score_address, + ContainerTag.DICT, name.encode(), + ContainerUtil.encode_key(k1), + ContainerUtil.encode_key(k2), + use_rlp=True + ) + assert key_value_db.get(key) == ContainerUtil.encode_value(v) + + for k1 in keys1: + for k2 in keys2: + del dict_db[k1][k2] + assert dict_db[k1][k2] == get_default_value(value_type) - value = 9999 - allowances[parents[0]][children[0]] = value - assert allowances[parents[0]][children[0]] == value - - key = _get_final_key( - score_address, - ContainerTag.DICT, name.encode(), - parents[0].to_bytes(), children[0].to_bytes(), - use_rlp=True - ) - assert key_value_db.get(key) == int_to_bytes(value) + assert len(key_value_db) == 0