Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Really fix Data layer download banning #17664

Merged
merged 2 commits into from
Mar 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions chia/_tests/core/data_layer/test_data_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from random import Random
from typing import Any, Awaitable, Callable, Dict, List, Optional, Set, Tuple, cast

import aiohttp
import aiosqlite
import pytest

Expand Down Expand Up @@ -1391,8 +1392,8 @@ async def mock_http_download(
server_info: ServerInfo,
timeout: int,
log: logging.Logger,
) -> bool:
return False
) -> None:
raise aiohttp.ClientConnectionError()

start_timestamp = int(time.time())
with monkeypatch.context() as m:
Expand Down
3 changes: 2 additions & 1 deletion chia/data_layer/data_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -1858,7 +1858,7 @@ async def received_correct_file(self, tree_id: bytes32, server_info: ServerInfo)
)
await self.update_server_info(tree_id, new_server_info)

async def server_misses_file(self, tree_id: bytes32, server_info: ServerInfo, timestamp: int) -> None:
async def server_misses_file(self, tree_id: bytes32, server_info: ServerInfo, timestamp: int) -> ServerInfo:
# Max banned time is 1 hour.
BAN_TIME_BY_MISSING_COUNT = [5 * 60] * 3 + [15 * 60] * 3 + [30 * 60] * 2 + [60 * 60]
index = min(server_info.num_consecutive_failures, len(BAN_TIME_BY_MISSING_COUNT) - 1)
Expand All @@ -1868,6 +1868,7 @@ async def server_misses_file(self, tree_id: bytes32, server_info: ServerInfo, ti
ignore_till=max(server_info.ignore_till, timestamp + BAN_TIME_BY_MISSING_COUNT[index]),
)
await self.update_server_info(tree_id, new_server_info)
return new_server_info

async def get_available_servers_for_store(self, tree_id: bytes32, timestamp: int) -> List[ServerInfo]:
subscriptions = await self.get_subscriptions()
Expand Down
22 changes: 16 additions & 6 deletions chia/data_layer/download_data.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import asyncio
import logging
import os
import time
Expand Down Expand Up @@ -156,9 +157,16 @@ async def insert_from_delta_file(
filename = get_delta_filename(tree_id, root_hash, existing_generation)
request_json = {"url": server_info.url, "client_folder": str(client_foldername), "filename": filename}
if downloader is None:
# use http downloader
if not await http_download(client_foldername, filename, proxy_url, server_info, timeout, log):
await data_store.server_misses_file(tree_id, server_info, timestamp)
# use http downloader - this raises on any error
try:
await http_download(client_foldername, filename, proxy_url, server_info, timeout, log)
except (asyncio.TimeoutError, aiohttp.ClientError):
new_server_info = await data_store.server_misses_file(tree_id, server_info, timestamp)
log.info(
f"Failed to download {filename} from {new_server_info.url}."
f"Miss {new_server_info.num_consecutive_failures}."
)
log.info(f"Next attempt from {new_server_info.url} in {new_server_info.ignore_till - timestamp}s.")
return False
else:
log.info(f"Using downloader {downloader} for store {tree_id.hex()}.")
Expand Down Expand Up @@ -230,7 +238,11 @@ async def http_download(
server_info: ServerInfo,
timeout: int,
log: logging.Logger,
) -> bool:
) -> None:
"""
Download a file from a server using aiohttp.
Raises exceptions on errors
"""
async with aiohttp.ClientSession() as session:
headers = {"accept-encoding": "gzip"}
async with session.get(
Expand All @@ -250,5 +262,3 @@ async def http_download(
if new_percentage != progress_percentage:
progress_percentage = new_percentage
log.info(f"Downloading delta file {filename}. {progress_percentage} of {size} bytes.")

return True
Loading