[CHIA-299] DL separate DAT files in folders by store id (#17688)
* DL separate files by folder.

* Checkpoint.

* Lint.

* Fix http download args.

* Refactor.

* Fix bug.

* Syntax error.

* Fix bug.

* Lint.

* Add test.

* Add S3 change too.

* Lint.

* Fix bug.

* Fix merge conflict properly.

* Merge conflict.

* Merge conflict.

* Lint.

* Fix bug.

* Fix test post merge.

* Lint.

* Try to fix windows test.

* Coverage.

* Lint.

* Lint.

* Lint.

* Checkpoint have enable config option.

* Lint.

* Lint.

* Fix some tests.

* Fix test.

* Try to fix windows.

* Apply suggestions from code review

Co-authored-by: Kyle Altendorf <[email protected]>

* Update test_data_store.py

* Address some comments.

* Try with new syntax.

* Refactor.

* Lint.

* Lint.

* Try with overload.

* Lint.

* use ClientTimeout for timeout

* change timeout to ClientTimeout

---------

Co-authored-by: Kyle Altendorf <[email protected]>
Co-authored-by: Earle Lowe <[email protected]>
3 people authored Aug 1, 2024
1 parent c4026f9 commit 74b14ce
Showing 7 changed files with 513 additions and 134 deletions.
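
The change moves the data layer's generated DAT files into one folder per store id when a new group_files_by_store option is enabled. The updated tests below resolve file locations through get_delta_filename_path and get_full_tree_filename_path instead of the old get_delta_filename / get_full_tree_filename helpers. A minimal sketch of the folder selection those calls appear to perform, based only on how the tests use them; the helper name and filename pattern here are assumptions, not the library's exact API:

```python
from pathlib import Path


def delta_file_path_sketch(
    server_files_location: Path,
    store_id_hex: str,
    root_hash_hex: str,
    generation: int,
    group_files_by_store: bool,
) -> Path:
    """Illustration only: pick a per-store folder when grouping is enabled."""
    if group_files_by_store:
        # One sub-folder per store id, matching the tests' expectation of
        # server_files_location.joinpath(f"{store_id}").
        folder = server_files_location / store_id_hex
    else:
        # Legacy flat layout: every store's files share one folder.
        folder = server_files_location
    # Assumed filename pattern; the real format lives in chia.data_layer.download_data.
    return folder / f"{store_id_hex}-{root_hash_hex}-delta-{generation}-v1.0.dat"
```

The option defaults to False in both the service config and the test fixtures below, so the flat layout remains the default behavior.
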
chia/_tests/core/data_layer/test_data_rpc.py (87 changes: 69 additions & 18 deletions)
@@ -49,7 +49,7 @@
)
from chia.data_layer.data_layer_wallet import DataLayerWallet, verify_offer
from chia.data_layer.data_store import DataStore
-from chia.data_layer.download_data import get_delta_filename, get_full_tree_filename
+from chia.data_layer.download_data import get_delta_filename_path, get_full_tree_filename_path
from chia.rpc.data_layer_rpc_api import DataLayerRpcApi
from chia.rpc.data_layer_rpc_client import DataLayerRpcClient
from chia.rpc.wallet_rpc_api import WalletRpcApi
@@ -94,6 +94,7 @@ async def init_data_layer_service(
manage_data_interval: int = 5,
maximum_full_file_count: Optional[int] = None,
enable_batch_autoinsert: bool = True,
+group_files_by_store: bool = False,
) -> AsyncIterator[DataLayerService]:
config = bt.config
config["data_layer"]["wallet_peer"]["port"] = int(wallet_rpc_port)
@@ -103,6 +104,7 @@
config["data_layer"]["rpc_port"] = 0
config["data_layer"]["manage_data_interval"] = 5
config["data_layer"]["enable_batch_autoinsert"] = enable_batch_autoinsert
config["data_layer"]["group_files_by_store"] = group_files_by_store
if maximum_full_file_count is not None:
config["data_layer"]["maximum_full_file_count"] = maximum_full_file_count
if db_path is not None:
@@ -124,9 +126,17 @@ async def init_data_layer(
wallet_service: Optional[WalletService] = None,
manage_data_interval: int = 5,
maximum_full_file_count: Optional[int] = None,
+group_files_by_store: bool = False,
) -> AsyncIterator[DataLayer]:
async with init_data_layer_service(
-wallet_rpc_port, bt, db_path, wallet_service, manage_data_interval, maximum_full_file_count
+wallet_rpc_port,
+bt,
+db_path,
+wallet_service,
+manage_data_interval,
+maximum_full_file_count,
+True,
+group_files_by_store,
) as data_layer_service:
yield data_layer_service._api.data_layer

@@ -2188,12 +2198,15 @@ async def test_issue_15955_deadlock(


@pytest.mark.parametrize(argnames="maximum_full_file_count", argvalues=[1, 5, 100])
@boolean_datacases(name="group_files_by_store", false="group by singleton", true="don't group by singleton")
@pytest.mark.limit_consensus_modes(reason="does not depend on consensus rules")
@pytest.mark.anyio
async def test_maximum_full_file_count(
self_hostname: str,
one_wallet_and_one_simulator_services: SimulatorsAndWalletsServices,
tmp_path: Path,
maximum_full_file_count: int,
+group_files_by_store: bool,
) -> None:
wallet_rpc_api, full_node_api, wallet_rpc_port, ph, bt = await init_wallet_and_node(
self_hostname, one_wallet_and_one_simulator_services
@@ -2205,6 +2218,7 @@ async def test_maximum_full_file_count(
db_path=tmp_path,
manage_data_interval=manage_data_interval,
maximum_full_file_count=maximum_full_file_count,
+group_files_by_store=group_files_by_store,
) as data_layer:
data_rpc_api = DataLayerRpcApi(data_layer)
res = await data_rpc_api.create_data_store({})
@@ -2223,30 +2237,47 @@
await asyncio.sleep(manage_data_interval * 2)
root_hash = await data_rpc_api.get_root({"id": store_id.hex()})
root_hashes.append(root_hash["hash"])
-with os.scandir(data_layer.server_files_location) as entries:
-expected_files_count = min(batch_count, maximum_full_file_count) + batch_count
+server_files_location = (
+data_layer.server_files_location.joinpath(f"{store_id}")
+if group_files_by_store
+else data_layer.server_files_location
+)
+with os.scandir(server_files_location) as entries:
filenames = {entry.name for entry in entries}
+expected_files_count = min(batch_count, maximum_full_file_count) + batch_count

assert len(filenames) == expected_files_count

-for generation, hash in enumerate(root_hashes):
-filename = get_delta_filename(store_id, hash, generation + 1)
-assert filename in filenames
-filename = get_full_tree_filename(store_id, hash, generation + 1)
-if generation + 1 > batch_count - maximum_full_file_count:
-assert filename in filenames
-else:
-assert filename not in filenames
+for generation, hash in enumerate(root_hashes):
+delta_path = get_delta_filename_path(
+data_layer.server_files_location,
+store_id,
+hash,
+generation + 1,
+group_files_by_store,
+)
+assert delta_path.exists()
+full_file_path = get_full_tree_filename_path(
+data_layer.server_files_location,
+store_id,
+hash,
+generation + 1,
+group_files_by_store,
+)
+if generation + 1 > batch_count - maximum_full_file_count:
+assert full_file_path.exists()
+else:
+assert not full_file_path.exists()


@pytest.mark.parametrize("retain", [True, False])
@boolean_datacases(name="group_files_by_store", false="group by singleton", true="don't group by singleton")
@pytest.mark.limit_consensus_modes(reason="does not depend on consensus rules")
@pytest.mark.anyio
async def test_unsubscribe_removes_files(
self_hostname: str,
one_wallet_and_one_simulator_services: SimulatorsAndWalletsServices,
tmp_path: Path,
retain: bool,
+group_files_by_store: bool,
) -> None:
wallet_rpc_api, full_node_api, wallet_rpc_port, ph, bt = await init_wallet_and_node(
self_hostname, one_wallet_and_one_simulator_services
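
The assertion block in the hunk above encodes the retention policy: every generation keeps its delta file, but only the newest maximum_full_file_count generations keep a full-tree file. A small worked example of that arithmetic, using hypothetical numbers (batch_count itself is defined in a collapsed part of the test, and the test parametrizes maximum_full_file_count over 1, 5 and 100):

```python
# Hypothetical values, just to illustrate the bookkeeping the assertions check.
batch_count = 10
maximum_full_file_count = 5

# One delta file per generation, plus at most maximum_full_file_count full-tree files.
expected_files_count = min(batch_count, maximum_full_file_count) + batch_count
assert expected_files_count == 15

for generation in range(1, batch_count + 1):
    keeps_full_file = generation > batch_count - maximum_full_file_count
    # Generations 6..10 keep their full-tree file; generations 1..5 have had theirs pruned.
    assert keeps_full_file == (generation >= 6)
```
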
@@ -2258,6 +2289,7 @@ async def test_unsubscribe_removes_files(
db_path=tmp_path,
manage_data_interval=manage_data_interval,
maximum_full_file_count=100,
+group_files_by_store=group_files_by_store,
) as data_layer:
data_rpc_api = DataLayerRpcApi(data_layer)
res = await data_rpc_api.create_data_store({})
@@ -2278,18 +2310,37 @@ async def test_unsubscribe_removes_files(
root_hash = await data_rpc_api.get_root({"id": store_id.hex()})
root_hashes.append(root_hash["hash"])

-filenames = {path.name for path in data_layer.server_files_location.iterdir()}
+store_path = (
+data_layer.server_files_location.joinpath(f"{store_id}")
+if group_files_by_store
+else data_layer.server_files_location
+)
+filenames = {path.name for path in store_path.iterdir()}
assert len(filenames) == 2 * update_count
for generation, hash in enumerate(root_hashes):
-assert get_delta_filename(store_id, hash, generation + 1) in filenames
-assert get_full_tree_filename(store_id, hash, generation + 1) in filenames
+path = get_delta_filename_path(
+data_layer.server_files_location,
+store_id,
+hash,
+generation + 1,
+group_files_by_store,
+)
+assert path.exists()
+path = get_full_tree_filename_path(
+data_layer.server_files_location,
+store_id,
+hash,
+generation + 1,
+group_files_by_store,
+)
+assert path.exists()

res = await data_rpc_api.unsubscribe(request={"id": store_id.hex(), "retain": retain})

# wait for unsubscribe to be processed
await asyncio.sleep(manage_data_interval * 3)

-filenames = {path.name for path in data_layer.server_files_location.iterdir()}
+filenames = {path.name for path in store_path.iterdir()}
assert len(filenames) == (2 * update_count if retain else 0)


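For test_unsubscribe_removes_files, each published generation contributes two files (one delta, one full tree), hence the 2 * update_count expectation, and unsubscribing with retain=False is expected to leave the store's files gone while retain=True keeps them. A rough sketch of that outcome under the grouped layout; this is only an illustration of what the assertions check, not the data layer's actual implementation:

```python
from pathlib import Path


def remove_store_files_sketch(server_files_location: Path, store_id_hex: str, retain: bool) -> None:
    """Illustration only: what an unsubscribe should leave behind when files
    are grouped by store (assumed layout: one folder per store id)."""
    if retain:
        return  # keep the generated DAT files on disk
    store_path = server_files_location / store_id_hex
    if store_path.is_dir():
        for path in store_path.iterdir():
            # Assumes the folder contains only this store's generated files.
            path.unlink()
```
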
Diffs for the remaining 6 changed files are not shown here.
