Skip to content

Commit

Permalink
Add unit tests, fix minor bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
mjishnu committed Dec 30, 2024
1 parent 4d02b0d commit 258e31e
Show file tree
Hide file tree
Showing 6 changed files with 307 additions and 46 deletions.
4 changes: 2 additions & 2 deletions pypdl/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

from aiofiles import os

from downloader import Multidown, Singledown
from utils import FileValidator, combine_files, create_segment_table
from .downloader import Multidown, Singledown
from .utils import FileValidator, combine_files, create_segment_table


class Consumer:
Expand Down
20 changes: 10 additions & 10 deletions pypdl/producer.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import asyncio

from utils import get_filepath
from .utils import get_filepath


class Producer:
Expand Down Expand Up @@ -90,7 +89,7 @@ async def enqueue_tasks(self, in_queue: asyncio.queues, out_queue):
async def _fetch_task_info(self, url, file_path, multisegment, **kwargs):
if callable(url):
url = url()
kwargs.update({"raise_for_status": False})

header = await self._fetch_header(url, **kwargs)
file_path = await get_filepath(url, header, file_path)
if size := int(header.get("content-length", 0)):
Expand All @@ -108,14 +107,15 @@ async def _fetch_task_info(self, url, file_path, multisegment, **kwargs):
return url, file_path, multisegment, etag, size

async def _fetch_header(self, url, **kwargs):
async with self.session.head(url, **kwargs) as response:
if response.status < 400:
self.logger.debug("Header acquired from HEAD request")
return response.headers
try:
async with self.session.head(url, **kwargs) as response:
if response.status < 400:
self.logger.debug("Header acquired from HEAD request")
return response.headers
except Exception:
pass

async with self.session.get(url, **kwargs) as response:
if response.status < 400:
self.logger.debug("Header acquired from GET request")
return response.headers
raise Exception(
f"Failed to get header (Status: {response.status}, Reason: {response.reason})"
)
52 changes: 21 additions & 31 deletions pypdl/pypdl.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,32 +7,21 @@

import aiohttp

from consumer import Consumer
from producer import Producer
from utils import (
AutoShutdownFuture,
EFuture,
LoggingExecutor,
ScreenCleaner,
Task,
TEventLoop,
cursor_up,
default_logger,
seconds_to_hms,
to_mb,
)
from . import utils
from .consumer import Consumer
from .producer import Producer


class Pypdl:
def __init__(
self,
max_concurrent: int = 1,
allow_reuse: bool = False,
logger: Logger = default_logger("Pypdl"),
logger: Logger = utils.default_logger("Pypdl"),
):
self._interrupt = Event()
self._pool = LoggingExecutor(logger, max_workers=1)
self._loop = TEventLoop()
self._pool = utils.LoggingExecutor(logger, max_workers=1)
self._loop = utils.TEventLoop()
self._producer = None
self._consumers = []
self._producer_queue = None
Expand Down Expand Up @@ -103,9 +92,10 @@ def start(
display: bool = True,
clear_terminal: bool = True,
**kwargs,
) -> Union[EFuture, AutoShutdownFuture]:
) -> Union[utils.EFuture, utils.AutoShutdownFuture]:
if not self.is_idle:
raise RuntimeError("Pypdl already running")
tasks = asyncio.all_tasks(self._loop.loop)
raise RuntimeError(f"Pypdl already running {tasks}")

if tasks and (url or file_path):
raise TypeError(
Expand All @@ -129,7 +119,7 @@ def start(
}
_kwargs.update(kwargs)
for i, task_kwargs in enumerate(tasks):
task = Task(
task = utils.Task(
multisegment,
segments,
retries,
Expand All @@ -144,7 +134,7 @@ def start(

coro = self._download_tasks(task_dict, display, clear_terminal)

self._future = EFuture(
self._future = utils.EFuture(
asyncio.run_coroutine_threadsafe(coro, self._loop.get()), self._loop
)

Expand All @@ -153,7 +143,7 @@ def start(
time.sleep(0.1)

if not self._allow_reuse:
future = AutoShutdownFuture(self._future, self._loop, self._pool)
future = utils.AutoShutdownFuture(self._future, self._loop, self._pool)
else:
future = self._future

Expand Down Expand Up @@ -200,7 +190,7 @@ async def _download_tasks(self, tasks_dict, display, clear_terminal):

self.time_spent = time.time() - start_time
if display:
print(f"Time elapsed: {seconds_to_hms(self.time_spent)}")
print(f"Time elapsed: {utils.seconds_to_hms(self.time_spent)}")

return self.success

Expand Down Expand Up @@ -242,7 +232,7 @@ def _progress_monitor(self, display, clear_terminal):
self._logger.debug("Starting progress monitor")
interval = 0.5
recent_queue = deque(maxlen=12)
with ScreenCleaner(display, clear_terminal):
with utils.ScreenCleaner(display, clear_terminal):
while not self.completed and not self._interrupt.is_set():
self._calc_values(recent_queue, interval)
if display:
Expand All @@ -269,7 +259,7 @@ def _calc_values(self, recent_queue, interval):

# Speed calculation
recent_queue.append(self.current_size)
non_zero_list = [to_mb(value) for value in recent_queue if value]
non_zero_list = [utils.to_mb(value) for value in recent_queue if value]
if len(non_zero_list) < 1:
self.speed = 0
elif len(non_zero_list) == 1:
Expand All @@ -280,12 +270,12 @@ def _calc_values(self, recent_queue, interval):

if self.size:
self.progress = int((self.current_size / self.size) * 100)
self.remaining_size = to_mb(self.size - self.current_size)
self.remaining_size = utils.to_mb(self.size - self.current_size)

if self.speed:
self.eta = seconds_to_hms(self.remaining_size / self.speed)
self.eta = self.remaining_size / self.speed
else:
self.eta = "99:59:59"
self.eta = -1

if self.completed_task is self.total_task:
future = asyncio.run_coroutine_threadsafe(
Expand All @@ -294,7 +284,7 @@ def _calc_values(self, recent_queue, interval):
future.result()

def _display(self):
cursor_up()
utils.cursor_up()
whitespace = " "
if self.size:
progress_bar = f"[{'█' * self.progress}{'·' * (100 - self.progress)}] {self.progress}% \n"
Expand All @@ -304,14 +294,14 @@ def _display(self):
else:
info1 = ""

info2 = f"Size: {to_mb(self.size):.2f} MB, Speed: {self.speed:.2f} MB/s, ETA: {self.eta}"
info2 = f"Size: {utils.to_mb(self.size):.2f} MB, Speed: {self.speed:.2f} MB/s, ETA: { utils.seconds_to_hms(self.eta)}"
print(progress_bar + info1 + info2 + whitespace * 35)
else:
if self.total_task > 1:
download_stats = f"[{'█' * self.task_progress}{'·' * (100 - self.task_progress)}] {self.task_progress}% \n"
else:
download_stats = f"Downloading... {whitespace * 95}\n"

info = f"Total Downloads: {self.completed_task}/{self.total_task}, Downloaded Size: {to_mb(self.current_size):.2f} MB, Speed: {self.speed:.2f} MB/s"
info = f"Total Downloads: {self.completed_task}/{self.total_task}, Downloaded Size: {utils.to_mb(self.current_size):.2f} MB, Speed: {self.speed:.2f} MB/s"

print(download_stats + info + whitespace * 35)
12 changes: 9 additions & 3 deletions pypdl/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ def to_mb(size_in_bytes: int) -> float:


def seconds_to_hms(sec: float) -> str:
if sec == -1:
return "99:59:59"
time_struct = time.gmtime(sec)
return time.strftime("%H:%M:%S", time_struct)

Expand Down Expand Up @@ -177,7 +179,12 @@ class TEventLoop:

def __init__(self):
self.loop = asyncio.new_event_loop()
Thread(target=self.loop.run_forever, daemon=True).start()
self._thread = Thread(target=self._run, daemon=False)
self._thread.start()

def _run(self):
self.loop.run_forever()
self.loop.close()

def get(self) -> asyncio.AbstractEventLoop:
return self.loop
Expand All @@ -196,8 +203,7 @@ def clear_wait(self):
def stop(self, *args):
self.clear_wait()
self.call_soon_threadsafe(self.loop.stop)
while self.loop.is_running():
time.sleep(0.1)
self._thread.join()


class LoggingExecutor:
Expand Down
1 change: 1 addition & 0 deletions test/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from . import test
Loading

0 comments on commit 258e31e

Please sign in to comment.