Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize DL get key by node. #17680

Merged
merged 5 commits into from
Mar 15, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions chia/data_layer/data_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,11 @@ async def managed(
CREATE INDEX IF NOT EXISTS node_hash ON root(node_hash)
"""
)
await writer.execute(
"""
CREATE INDEX IF NOT EXISTS key ON node(key)
emlowe marked this conversation as resolved.
Show resolved Hide resolved
"""
)

yield self

Expand Down Expand Up @@ -1508,12 +1513,50 @@ async def insert_root_with_ancestor_table(
if status == Status.COMMITTED:
await self.build_ancestor_table_for_latest_root(tree_id=tree_id)

async def get_node_by_key_latest_generation(self, key: bytes, tree_id: bytes32) -> TerminalNode:
    """Return the terminal node stored under ``key`` in the current root of ``tree_id``.

    The lookup joins the ``ancestors`` and ``node`` tables to find the hash
    recorded at the highest generation for this key within the store, loads
    that node, and then follows ``_get_one_ancestor`` upward until no further
    ancestor is reported.  The hash reached that way must equal the current
    root's hash; otherwise the key is treated as absent.

    Args:
        key: The key to look up.
        tree_id: Identifier of the store to search.

    Returns:
        The terminal node holding ``key``.

    Raises:
        KeyNotFoundError: If the store's root has no node hash, no row matches
            the key, or the matched node does not lead up to the current root.
    """
    async with self.db_wrapper.reader() as reader:
        root = await self.get_tree_root(tree_id=tree_id)
        if root.node_hash is None:
            raise KeyNotFoundError(key=key)

        # Most recent candidate for this key within the store.
        latest_candidate_query = """
            SELECT a.hash FROM ancestors a
            JOIN node n ON a.hash = n.hash
            WHERE n.key = :key
            AND a.tree_id = :tree_id
            ORDER BY a.generation DESC
            LIMIT 1
            """
        cursor = await reader.execute(latest_candidate_query, {"key": key, "tree_id": tree_id})

        candidate_row = await cursor.fetchone()
        if candidate_row is None:
            raise KeyNotFoundError(key=key)

        node = await self.get_node(candidate_row["hash"])

        # Climb from the candidate until no ancestor is returned; the last hash
        # seen is the top of whatever tree the candidate currently hangs under.
        topmost_hash = node.hash
        ancestor = await self._get_one_ancestor(topmost_hash, tree_id)
        while ancestor is not None:
            topmost_hash = ancestor.hash
            ancestor = await self._get_one_ancestor(topmost_hash, tree_id)

        # A candidate that does not reach the current root is not part of the
        # latest tree (presumably it was deleted or replaced -- confirm).
        if topmost_hash != root.node_hash:
            raise KeyNotFoundError(key=key)
        assert isinstance(node, TerminalNode)
        return node

async def get_node_by_key(
self,
key: bytes,
tree_id: bytes32,
root_hash: Optional[bytes32] = None,
) -> TerminalNode:
if root_hash is None:
return await self.get_node_by_key_latest_generation(key, tree_id)

nodes = await self.get_keys_values(tree_id=tree_id, root_hash=root_hash)

for node in nodes:
Expand Down
39 changes: 39 additions & 0 deletions tests/core/data_layer/test_data_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -1797,3 +1797,42 @@ async def test_delete_store_data_protects_pending_roots(raw_data_store: DataStor
start_index = index * keys_per_pending_root
end_index = (index + 1) * keys_per_pending_root
assert {pair.key for pair in kv} == set(original_keys[start_index:end_index])


@pytest.mark.anyio
async def test_get_node_by_key_with_overlapping_keys(raw_data_store: DataStore) -> None:
    """Check ``get_node_by_key`` when several stores share the same key set.

    Every store uses identical keys but store- and repetition-specific values.
    Each repetition upserts all keys in every store, verifies each key resolves
    to that store's own value, and randomly deletes some keys, after which a
    lookup of the deleted key must raise.
    """
    num_stores = 5
    num_keys = 20
    values_offset = 10000
    repetitions = 25
    rng = Random()
    rng.seed(100, version=2)

    def expected_values(store_index: int, repetition: int) -> list:
        # Values are disjoint between stores and shift by a fixed offset per repetition.
        first = store_index * num_keys
        last = (store_index + 1) * num_keys
        return [
            (raw_value + values_offset * repetition).to_bytes(4, byteorder="big")
            for raw_value in range(first, last)
        ]

    tree_ids = [bytes32(i.to_bytes(32, byteorder="big")) for i in range(num_stores)]
    for tree_id in tree_ids:
        await raw_data_store.create_tree(tree_id=tree_id, status=Status.COMMITTED)
    keys = [key.to_bytes(4, byteorder="big") for key in range(num_keys)]

    for repetition in range(repetitions):
        # Phase 1: (re)write every key in every store.
        for index, tree_id in enumerate(tree_ids):
            upserts = [
                {"action": "upsert", "key": key, "value": value}
                for key, value in zip(keys, expected_values(index, repetition))
            ]
            await raw_data_store.insert_batch(tree_id, upserts, status=Status.COMMITTED)

        # Phase 2: read back, and occasionally delete then confirm the key is gone.
        for index, tree_id in enumerate(tree_ids):
            for key, value in zip(keys, expected_values(index, repetition)):
                node = await raw_data_store.get_node_by_key(tree_id=tree_id, key=key)
                assert node.value == value
                if rng.randint(0, 4) != 0:
                    continue
                deletion = [{"action": "delete", "key": key}]
                await raw_data_store.insert_batch(tree_id, deletion, status=Status.COMMITTED)
                with pytest.raises(Exception):
                    await raw_data_store.get_node_by_key(tree_id=tree_id, key=key)
Loading