Skip to content
This repository has been archived by the owner on Dec 15, 2021. It is now read-only.

Commit

Permalink
Add unit-tests for ContainerDBs
Browse files Browse the repository at this point in the history
  • Loading branch information
goldworm committed Nov 16, 2020
1 parent ee85359 commit 5c21e3d
Showing 1 changed file with 182 additions and 108 deletions.
290 changes: 182 additions & 108 deletions tests/unit_test/iconscore/test_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
ArrayDB,
DictDB,
VarDB,
ContainerUtil,
get_default_value,
)
from iconservice.utils import int_to_bytes
from iconservice.utils.rlp import rlp_encode_bytes
Expand All @@ -44,6 +46,9 @@ class DummyKeyValueDatabase:
def __init__(self):
self._db = {}

def __len__(self):
return len(self._db)

def get(self, key: bytes) -> bytes:
return self._db.get(key)

Expand Down Expand Up @@ -114,34 +119,42 @@ def _init_context(cls, context, address: 'Address'):
def _set_revision(cls, context, revision: int):
context._inv_container.revision_code = revision

def test_array_db(self, context, key_value_db):
@pytest.mark.parametrize(
"values,value_type",
[
((0, 100, 200, 300), int),
(("aa", "b", "ccc", "dddd"), str),
((b"aa", b"b", b"ccc", b"dddd"), bytes),
((True, False, True, False), bool),
([Address(AddressPrefix.EOA, os.urandom(20)) for _ in range(4)], Address),
]
)
def test_array_db(self, context, key_value_db, values, value_type):
context_db = ContextDatabase(key_value_db, is_shared=False)
address = Address(AddressPrefix.CONTRACT, os.urandom(20))
score_db = IconScoreDatabase(address, context_db)

self._init_context(context, score_db.address)
self._set_revision(context, Revision.USE_RLP.value - 1)

name = "balances"
balances = ArrayDB(name, score_db, int)
name = "array_db"
array_db = ArrayDB(name, score_db, value_type)

for i in range(2):
balances.put(i * 100)
assert len(balances) == 2
array_db.put(values[i])
assert len(array_db) == 2

self._set_revision(context, Revision.USE_RLP.value)

balances.put(200) # balances[2] = 200
balances.put(300) # balances[3] = 300
assert len(balances) == 4
array_db.put(values[2])
array_db.put(values[3])
assert len(array_db) == 4

# balances: [0, 100, 200, 300]
for i, value in enumerate(balances):
assert value == i * 100
for i, value in enumerate(array_db):
assert value == values[i]

# len(balances)
final_key: bytes = _get_final_key(address, ContainerTag.ARRAY, name.encode(), use_rlp=True)
assert key_value_db.get(final_key) == int_to_bytes(len(balances))
assert key_value_db.get(final_key) == int_to_bytes(len(array_db))

for i, use_rlp in enumerate((False, False, True, True)):
key = _get_final_key(
Expand All @@ -151,155 +164,216 @@ def test_array_db(self, context, key_value_db):
int_to_bytes(i),
use_rlp=use_rlp,
)
assert key_value_db.get(key) == int_to_bytes(i * 100)

def test_var_db(self, context, key_value_db):
assert key_value_db.get(key) == ContainerUtil.encode_value(values[i])

for v in reversed(values):
assert v == array_db.pop()
assert len(array_db) == 0

# 2 values for array_db size still remain
# even though all items in array_db have been popped.
assert len(key_value_db) == 2

@pytest.mark.parametrize(
"old_value,new_value,value_type",
[
(300, 500, int),
("old string", "new string", str),
(b"old", b"new", bytes),
(False, True, bool),
(
Address(AddressPrefix.EOA, os.urandom(20)),
Address(AddressPrefix.CONTRACT, os.urandom(20)),
Address,
),
]
)
def test_var_db(self, context, key_value_db, old_value, new_value, value_type):
context_db = ContextDatabase(key_value_db, is_shared=False)
address = Address(AddressPrefix.CONTRACT, os.urandom(20))
score_db = IconScoreDatabase(address, context_db)

self._init_context(context, address)
self._set_revision(context, Revision.USE_RLP.value - 1)

name = "owner"
owner_address = Address(AddressPrefix.EOA, os.urandom(20))
name = "var_db"

owner = VarDB(name, score_db, Address)
owner.set(owner_address)
assert owner.get() == owner_address
var_db = VarDB(name, score_db, value_type)
var_db.set(old_value)
assert var_db.get() == old_value

key = _get_final_key(address, ContainerTag.VAR, name.encode(), use_rlp=False)
assert key_value_db.get(key) == owner_address.to_bytes()
assert key_value_db.get(key) == ContainerUtil.encode_value(old_value)

self._set_revision(context, Revision.USE_RLP.value)
assert key_value_db.get(key) == owner_address.to_bytes()
assert key_value_db.get(key) == ContainerUtil.encode_value(old_value)

owner.set(address)
assert owner.get() == address
assert owner.get() != owner_address
var_db.set(new_value)
assert var_db.get() == new_value
assert var_db.get() != old_value

key = _get_final_key(address, ContainerTag.VAR, name.encode(), use_rlp=True)
assert key_value_db.get(key) == address.to_bytes()
assert key_value_db.get(key) == ContainerUtil.encode_value(new_value)

def test_1_depth_dict_db(self, context, key_value_db):
var_db.remove()
assert var_db.get() == get_default_value(value_type)
assert len(key_value_db) == 0

@pytest.mark.parametrize(
"keys",
[
(b"hello", 1234, "world", Address(AddressPrefix.EOA, os.urandom(20))),
]
)
@pytest.mark.parametrize(
"old_values, new_values, value_type",
[
((False, True, False, True), (True, False, True, False), bool),
((b"a", b"b", b"c", b"d"), (b"A", b"B", b"C", b"D"), bytes),
((10, 20, 30, 40), (100, 200, 300, 400), int),
(("aa", "bb", "cc", "dd"), ("AA", "BB", "CC", "DD"), str),
(
[Address(AddressPrefix.EOA, os.urandom(20)) for _ in range(4)],
[Address(AddressPrefix.CONTRACT, os.urandom(20)) for _ in range(4)],
Address
),
]
)
def test_1_depth_dict_db(self, context, key_value_db, keys, old_values, new_values, value_type):
context_db = ContextDatabase(key_value_db, is_shared=False)
score_address = Address(AddressPrefix.CONTRACT, os.urandom(20))
score_db = IconScoreDatabase(score_address, context_db)

addresses = [
Address(AddressPrefix.EOA, i.to_bytes(20, "big"))
for i in range(3)
]

self._init_context(context, score_address)
self._set_revision(context, Revision.USE_RLP.value - 1)

name = "balances"
balances = DictDB(name, score_db, depth=1, value_type=int)
name = "dict_db_depth_1"
dict_db = DictDB(name, score_db, depth=1, value_type=value_type)

# [0, 100]
# Put two items to dict_db
for i in range(2):
balance = i * 100
address = addresses[i]
balances[address] = balance
assert balances[address] == balance
k, v = keys[i], old_values[i]

dict_db[k] = v
assert dict_db[k] == v

key = _get_final_key(
score_address, ContainerTag.DICT, name.encode(), address,
score_address, ContainerTag.DICT, name.encode(), ContainerUtil.encode_key(k),
use_rlp=False
)
assert key_value_db.get(key) == int_to_bytes(balance)
assert key_value_db.get(key) == ContainerUtil.encode_value(v)

self._set_revision(context, Revision.USE_RLP.value)

# [0, 1000, 2000]
for i in range(1, 3):
balance = i * 1000
address = addresses[i]
balances[address] = balance
assert balances[address] == balance
# Put 4 items to dict_db
for i, k in enumerate(keys):
old_v = old_values[i]
new_v = new_values[i]
dict_db[k] = new_v
assert dict_db[k] == new_v
assert dict_db[k] != old_v

key = _get_final_key(
score_address, ContainerTag.DICT, name.encode(), address,
score_address, ContainerTag.DICT, name.encode(), ContainerUtil.encode_key(k),
use_rlp=True
)
assert key_value_db.get(key) == int_to_bytes(balance)
assert key_value_db.get(key) == ContainerUtil.encode_value(new_v)

# [0, 1000, 2000]
for i, address in enumerate(addresses):
assert balances[address] == i * 1000
# If there is no value for a given key, default value is returned
for k in keys:
del dict_db[k]
assert dict_db[k] == get_default_value(value_type)

# {0, 2000}
del balances[addresses[1]]
assert balances[addresses[1]] == 0
assert len(key_value_db) == 0

def test_2_depth_dict_db(self, context, key_value_db):
@pytest.mark.parametrize(
"keys1",
[
(b"hello", 1234, "world", Address(AddressPrefix.EOA, os.urandom(20))),
]
)
@pytest.mark.parametrize(
"keys2",
[
(b"hello2", 12345, "world2", Address(AddressPrefix.CONTRACT, os.urandom(20))),
]
)
@pytest.mark.parametrize(
"old_values, new_values, value_type",
[
((False, True, False, True), (True, False, True, False), bool),
((b"a", b"b", b"c", b"d"), (b"A", b"B", b"C", b"D"), bytes),
((10, 20, 30, 40), (100, 200, 300, 400), int),
(("aa", "bb", "cc", "dd"), ("AA", "BB", "CC", "DD"), str),
(
[Address(AddressPrefix.EOA, os.urandom(20)) for _ in range(4)],
[Address(AddressPrefix.CONTRACT, os.urandom(20)) for _ in range(4)],
Address
),
]
)
def test_2_depth_dict_db(self, context, key_value_db, keys1, keys2, old_values, new_values, value_type):
context_db = ContextDatabase(key_value_db, is_shared=False)
score_address = Address(AddressPrefix.CONTRACT, os.urandom(20))
score_db = IconScoreDatabase(score_address, context_db)

self._init_context(context, score_address)
self._set_revision(context, Revision.USE_RLP.value - 1)

# allowances[Address][Address] = int
name = "allowances"
allowances = DictDB(name, score_db, depth=2, value_type=int)
name = "dict_db_depth_2"
dict_db = DictDB(name, score_db, depth=2, value_type=value_type)

parents = [
Address.from_prefix_and_int(AddressPrefix.EOA, 0),
Address.from_prefix_and_int(AddressPrefix.EOA, 1)
]
# To assign a value to middle-layer dict_db is forbidden
for k1, v in zip(keys1, old_values):
with pytest.raises(InvalidContainerAccessException):
dict_db[k1] = v

children = [
Address.from_prefix_and_int(AddressPrefix.EOA, 100),
Address.from_prefix_and_int(AddressPrefix.EOA, 101)
]
# Assign values to dict_db on Revision.USE_RLP - 1
for k1 in keys1:
for k2, v in zip(keys2, old_values):
dict_db[k1][k2] = v

assert len(key_value_db) == len(keys1) * len(keys2)

expected_allowances = {
parents[0]: {
children[0]: 100,
children[1]: 200,
},
parents[1]: {
children[0]: 1000,
children[1]: 2000,
},
}

for parent in parents:
for child in children:
allowances[parent][child] = expected_allowances[parent][child]

for parent in parents:
for child in children:
expected_allowance = expected_allowances[parent][child]
assert allowances[parent][child] == expected_allowance

key = _get_final_key(
for k1 in keys1:
for k2, v in zip(keys2, old_values):
assert dict_db[k1][k2] == v

key: bytes = _get_final_key(
score_address,
ContainerTag.DICT, name.encode(),
ContainerTag.DICT, parent.to_bytes(),
child.to_bytes(),
ContainerTag.DICT, ContainerUtil.encode_key(k1),
ContainerUtil.encode_key(k2),
use_rlp=False
)
assert key_value_db.get(key) == int_to_bytes(expected_allowance)
assert key_value_db.get(key) == ContainerUtil.encode_value(v)

with pytest.raises(InvalidContainerAccessException):
del allowances[parents[0]]
self._set_revision(context, Revision.USE_RLP.value)

del allowances[parents[0]][children[0]]
assert allowances[parents[0]][children[0]] == 0
# Replace all old_values with new_values on Revision.USE_RLP
for k1 in keys1:
for k2, v in zip(keys2, new_values):
dict_db[k1][k2] = v

self._set_revision(context, Revision.USE_RLP.value)
# old_values + new_values
assert len(key_value_db) == len(keys1) * len(keys2) * 2

for k1 in keys1:
for k2, v in zip(keys2, new_values):
assert dict_db[k1][k2] == v

key: bytes = _get_final_key(
score_address,
ContainerTag.DICT, name.encode(),
ContainerUtil.encode_key(k1),
ContainerUtil.encode_key(k2),
use_rlp=True
)
assert key_value_db.get(key) == ContainerUtil.encode_value(v)

for k1 in keys1:
for k2 in keys2:
del dict_db[k1][k2]
assert dict_db[k1][k2] == get_default_value(value_type)

value = 9999
allowances[parents[0]][children[0]] = value
assert allowances[parents[0]][children[0]] == value

key = _get_final_key(
score_address,
ContainerTag.DICT, name.encode(),
parents[0].to_bytes(), children[0].to_bytes(),
use_rlp=True
)
assert key_value_db.get(key) == int_to_bytes(value)
assert len(key_value_db) == 0

0 comments on commit 5c21e3d

Please sign in to comment.