Add proper logging support
mjishnu committed Jun 17, 2024
1 parent 4fb506b commit af7ca11
Showing 4 changed files with 88 additions and 24 deletions.
41 changes: 36 additions & 5 deletions pypdl/factory.py
@@ -1,22 +1,32 @@
import logging
import threading
import time
from concurrent.futures import Future, ThreadPoolExecutor, as_completed
from logging import Logger, getLogger
from typing import Union

from .manager import DownloadManager as Pypdl
from .utls import (
AutoShutdownFuture,
ScreenCleaner,
cursor_up,
default_logger,
seconds_to_hms,
to_mb,
)


class Factory:
def __init__(self, instances: int = 2, allow_reuse: bool = False, **kwargs):
self._instances = [Pypdl(True, **kwargs) for _ in range(instances)]
def __init__(
self,
instances: int = 2,
allow_reuse: bool = False,
logger: Logger = default_logger("PypdlFactory"),
**kwargs,
):
self._instances = [
Pypdl(True, getLogger(f"PypdlFactory.instance-{i}"), **kwargs)
for i in range(instances)
]
self._allow_reuse = allow_reuse
self._pool = ThreadPoolExecutor(max_workers=2)
self._stop = False
@@ -35,6 +45,7 @@ def __init__(self, instances: int = 2, allow_reuse: bool = False, **kwargs):
self.completed = []
self.failed = []
self.remaining = []
self.logger = logger

def start(
self, tasks: list, display: bool = True, block: bool = True
@@ -53,6 +64,7 @@ def start(
list: If `block` is True, a list of tuples where each tuple contains the URL of the download and its result.
"""
self._reset()
self.logger.debug("Downloading %s files", len(tasks))
if self._allow_reuse:
future = self._pool.submit(self._execute, tasks, display)
else:
@@ -70,19 +82,23 @@ def start(
def stop(self) -> None:
"""Stops all active downloads."""
with self._stop_lock:
self.logger.debug("Initiating download stop")
self._lock.set()
self._stop = True
for instance in self._instances:
instance.stop()
while self._lock.is_set():
time.sleep(0.5)
time.sleep(1)
self.logger.debug("Download stopped")

def shutdown(self) -> None:
"""Shutdown the factory."""
self.logger.debug("Shutting down factory")
for instance in self._instances:
instance.shutdown()
self._pool.shutdown()
self.logger.debug("Factory shutdown")

def _reset(self):
self._stop = False
@@ -93,6 +109,7 @@ def _reset(self):
self.completed.clear()
self.failed.clear()
self.remaining.clear()
self.logger.debug("Reseted download factory")

def _execute(self, tasks, display):
start_time = time.time()
@@ -106,8 +123,10 @@ def _reset(self):

self._pool.submit(self._compute, display)

self.logger.debug("Initiated waiting loop")
while len(self.completed) + len(self.failed) != self.total:
if self._stop:
self.logger.debug("Exit waiting loop, download interrupted")
break

for future in as_completed(futures):
@@ -121,11 +140,13 @@ def _execute(self, tasks, display):
self._manage_remaining(instance, futures)

self.time_spent = time.time() - start_time
self.logger.debug("Processed final task, clearing lock")
self._lock.clear()

self.logger.debug("Exit waiting loop, download completed")
return self.completed

def _add_future(self, instance, task, futures):
self.logger.debug("Adding new task")
url, *kwargs = task
instance._status = None
kwargs = kwargs[0] if kwargs else {}
@@ -134,37 +155,46 @@ def _add_future(self, instance, task, futures):
futures[future] = (instance, url)
while instance._status is None:
time.sleep(0.1)
self.logger.debug("Added new task: %s", url)

def _handle_completed(self, instance, curr_url, result):
self.logger.debug("Handling completed download, setting lock")
self._lock.set()
if instance.size:
self._completed_size += instance.size
else:
self._prog = False
self._completed_prog += int((1 / self.total) * 100)
self.completed.append((curr_url, result))
self.logger.debug("Download completed: %s", curr_url)

def _handle_failed(self, curr_url):
self.logger.debug("Handling failed download, setting lock")
self._lock.set()
self.failed.append(curr_url)
logging.error("Download failed: %s", curr_url)
self.logger.error("Download failed: %s", curr_url)

def _manage_remaining(self, instance, futures):
with self._stop_lock:
if self._stop:
self.logger.debug("Stop Initiated, removing instance from running")
self._running.remove(instance)
return

if self.remaining:
self.logger.debug("Remaining tasks: %s", len(self.remaining))
self._add_future(instance, self.remaining.pop(0), futures)
else:
self.logger.debug("No remaining tasks, removing instance from running")
self._running.remove(instance)

if len(self.completed) + len(self.failed) != self.total:
self.logger.debug("Not final task, releasing lock")
self._lock.clear()
time.sleep(0.5)

def _compute(self, display):
self.logger.debug("Starting download computation")
with ScreenCleaner(display):
while True:
if not self._lock.is_set():
@@ -185,6 +215,7 @@ def _compute(self, display):
if display:
self._display()
print(f"Time elapsed: {seconds_to_hms(self.time_spent)}")
self.logger.debug("Computation ended")

def _calc_values(self):
def sum_attribute(instances, attribute):
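
The new `logger` parameter lets callers route factory output through a Logger of their own instead of the package default. A minimal caller-side sketch, not taken from the commit: the import path, the (url, kwargs) task format and the file_path keyword are inferred from _add_future and _execute in this diff, and the handler setup and URLs are illustrative.

import logging

from pypdl.factory import Factory  # import path assumed from this diff

# Illustrative handler setup: send factory logs to stderr at DEBUG level.
logger = logging.getLogger("PypdlFactory")
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler())

factory = Factory(instances=2, logger=logger)
tasks = [
    ("https://example.com/a.zip", {"file_path": "a.zip"}),
    ("https://example.com/b.zip", {"file_path": "b.zip"}),
]
results = factory.start(tasks, display=False, block=True)

Because each instance is built with getLogger(f"PypdlFactory.instance-{i}"), per-instance records propagate up to the parent "PypdlFactory" logger and land on the same handler.
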
10 changes: 0 additions & 10 deletions pypdl/main.py
@@ -1,16 +1,6 @@
import logging

from .factory import Factory
from .manager import DownloadManager

handler = logging.FileHandler("pypdl.log", mode="a", delay=True)
handler.setFormatter(
logging.Formatter(
"%(asctime)s - %(levelname)s: %(message)s", datefmt="%d-%m-%y %H:%M:%S"
)
)
logging.basicConfig(level=logging.INFO, handlers=[handler])


class Pypdl(DownloadManager):
"""
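
With the module-level logging.basicConfig gone from main.py, importing the package no longer attaches a handler that writes pypdl.log; the classes now default to default_logger(...) from utls.py, which is not part of this view. Purely as an assumption about what such a helper could look like (the actual implementation lives in pypdl/utls.py and may differ), here is a sketch that reproduces the handler removed above:

import logging

def default_logger(name: str) -> logging.Logger:
    # Hypothetical sketch only: a named logger writing to pypdl.log,
    # mirroring the FileHandler that main.py used to configure.
    logger = logging.getLogger(name)
    if not logger.handlers:
        handler = logging.FileHandler("pypdl.log", mode="a", delay=True)
        handler.setFormatter(
            logging.Formatter(
                "%(asctime)s - %(levelname)s: %(message)s",
                datefmt="%d-%m-%y %H:%M:%S",
            )
        )
        logger.addHandler(handler)
        logger.setLevel(logging.DEBUG)
    return logger
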
43 changes: 35 additions & 8 deletions pypdl/manager.py
@@ -1,8 +1,8 @@
import asyncio
import logging
import time
from collections import deque
from concurrent.futures import Future, ThreadPoolExecutor
from logging import Logger
from pathlib import Path
from threading import Event
from typing import Callable, Optional, Union
@@ -17,14 +17,20 @@
combine_files,
create_segment_table,
cursor_up,
default_logger,
get_filepath,
seconds_to_hms,
to_mb,
)


class DownloadManager:
def __init__(self, allow_reuse: bool = False, **kwargs):
def __init__(
self,
allow_reuse: bool = False,
logger: Logger = default_logger("Pypdl"),
**kwargs,
):
self._pool = ThreadPoolExecutor(max_workers=2)
self._workers = []
self._status = 0
@@ -46,6 +52,7 @@ def __init__(self, allow_reuse: bool = False, **kwargs):
self.remaining = None
self.failed = False
self.completed = False
self.logger = logger

def start(
self,
@@ -86,10 +93,10 @@ def download():
for i in range(retries + 1):
try:
_url = mirror_func() if i > 0 and callable(mirror_func) else url
if i > 0:
logging.info("Retrying... (%d/%d)", i, retries)

self._reset()

self.logger.debug("Downloading, url: %s attempt: %s", _url, (i + 1))

result = self._execute(
_url,
file_path,
@@ -108,10 +115,11 @@ def download():
time.sleep(3)

except Exception as e:
logging.error("(%s) [%s]", e.__class__.__name__, e)
self.logger.error("(%s) [%s]", e.__class__.__name__, e)

self._status = 1
self.failed = True
self.logger.debug("Download failed, url: %s", _url)
return None

if self._allow_reuse:
@@ -130,10 +138,12 @@ def stop(self) -> None:
self._interrupt.set()
self._stop = True
time.sleep(1)
self.logger.debug("Download stoped")

def shutdown(self) -> None:
"""Shutdown the download manager."""
self._pool.shutdown()
self.logger.debug("Shutdown download manger")

def _reset(self):
self._workers.clear()
@@ -149,6 +159,7 @@ def _reset(self):
self.remaining = None
self.failed = False
self.completed = False
self.logger.debug("Reseted download manager")

def _execute(
self, url, file_path, segments, display, multisegment, etag, overwrite
@@ -162,12 +173,15 @@ def _execute(
if not overwrite and Path(file_path).exists():
self._status = 1
self.completed = True
self.time_spent = time.time() - start_time
self.logger.debug("File already exists, download completed")
return FileValidator(file_path)

if multisegment:
segment_table = create_segment_table(
url, file_path, segments, self.size, etag
)
self.logger.debug("Segment table created: %s", str(segment_table))
segments = segment_table["segments"]

self._pool.submit(
@@ -180,6 +194,7 @@ def _execute(
download_mode = "Multi-Segment" if multisegment else "Single-Segment"
interval = 0.5
self._status = 1
self.logger.debug("Initiated waiting loop")
with ScreenCleaner(display):
while True:
status = sum(worker.completed for worker in self._workers)
@@ -190,13 +205,16 @@ def _execute(

if self._interrupt.is_set():
self.time_spent = time.time() - start_time
self.logger.debug("Exit waiting loop, download interrupted")
return None

if status and status == len(self._workers):
if multisegment:
self.logger.debug("Combining files")
combine_files(file_path, segments)
self.completed = True
self.time_spent = time.time() - start_time
self.logger.debug("Exit waiting loop, download completed")
return FileValidator(file_path)

time.sleep(interval)
@@ -205,14 +223,17 @@ def _get_info(self, url, file_path, multisegment, etag):
header = asyncio.run(self._get_header(url))
file_path = get_filepath(url, header, file_path)
if size := int(header.get("content-length", 0)):
self.logger.debug("Size accquired from header")
self.size = size

etag = header.get("etag", not etag) # since we check truthiness of etag

if isinstance(etag, str):
self.logger.debug("ETag accquired from header")
etag = etag.strip('"')

if not self.size or not header.get("accept-ranges"):
self.logger.debug("Single segment download, accept-ranges header not found")
multisegment = False

return file_path, multisegment, etag
@@ -221,14 +242,17 @@ async def _get_header(self, url):
async with aiohttp.ClientSession() as session:
async with session.head(url, **self._kwargs) as response:
if response.status == 200:
self.logger.debug("Header accquired from head request")
return response.headers

async with session.get(url, **self._kwargs) as response:
if response.status == 200:
self.logger.debug("Header accquired from get request")
return response.headers

async def _multi_segment(self, segments, segment_table):
tasks = []
self.logger.debug("Multi-Segment download started")
async with aiohttp.ClientSession() as session:
for segment in range(segments):
md = Multidown(self._interrupt)
@@ -240,18 +264,21 @@ async def _multi_segment(self, segments, segment_table):
)
try:
await asyncio.gather(*tasks)
self.logger.debug("Downloaded all segments")
except Exception as e:
logging.error("(%s) [%s]", e.__class__.__name__, e)
self.logger.error("(%s) [%s]", e.__class__.__name__, e)
self._interrupt.set()

async def _single_segment(self, url, file_path):
self.logger.debug("Single-Segment download started")
async with aiohttp.ClientSession() as session:
sd = Simpledown(self._interrupt)
self._workers.append(sd)
try:
await sd.worker(url, file_path, session, **self._kwargs)
self.logger.debug("Downloaded single segement")
except Exception as e:
logging.error("(%s) [%s]", e.__class__.__name__, e)
self.logger.error("(%s) [%s]", e.__class__.__name__, e)
self._interrupt.set()

def _calc_values(self, recent_queue, interval):
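
With these changes each DownloadManager reports through self.logger rather than the root logger, so an application can decide per instance where messages go. A hedged, construction-only example (the from pypdl import Pypdl import is assumed from main.py above; the handler choice is illustrative):

import logging

from pypdl import Pypdl  # Pypdl subclasses DownloadManager per main.py

quiet = logging.getLogger("Pypdl")
quiet.addHandler(logging.NullHandler())
quiet.propagate = False  # keep this instance's records out of the root logger

dl = Pypdl(allow_reuse=True, logger=quiet)
# ... run downloads; errors such as failed segments go through dl.logger ...
dl.shutdown()
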
