Skip to content

Commit

Permalink
upload event tracker
Browse files Browse the repository at this point in the history
  • Loading branch information
kathia-barahona committed Oct 4, 2023
1 parent c6d3db4 commit 68f3061
Show file tree
Hide file tree
Showing 6 changed files with 260 additions and 55 deletions.
6 changes: 0 additions & 6 deletions pghoard/basebackup/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,17 +316,13 @@ def run_piped_basebackup(self):
})
metadata.update(self.metadata)

def callback(n_bytes: int) -> None:
self.metrics.increase("pghoard.basebackup_bytes_uploaded", inc_value=n_bytes, tags={"delta": False})

self.transfer_queue.put(
UploadEvent(
file_type=FileType.Basebackup,
backup_site_name=self.site,
file_path=basebackup_path,
callback_queue=self.callback_queue,
file_size=compressed_file_size,
incremental_progress_callback=callback,
source_data=stream_target,
remove_after_upload=True,
metadata=metadata
Expand Down Expand Up @@ -615,7 +611,6 @@ def run_local_tar_basebackup(self, delta: bool = False, with_delta_stats: bool =
data_file_format,
compressed_base,
delta_stats=delta_stats,
delta=delta,
file_type=FileType.Basebackup_chunk
)

Expand Down Expand Up @@ -725,7 +720,6 @@ def run_local_tar_basebackup(self, delta: bool = False, with_delta_stats: bool =
callback_queue=self.callback_queue,
chunk_path=Path(data_file_format(0)), # pylint: disable=too-many-format-args
temp_dir=compressed_base,
delta=delta,
files_to_backup=control_files,
file_type=FileType.Basebackup,
extra_metadata={
Expand Down
9 changes: 0 additions & 9 deletions pghoard/basebackup/chunks.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ def tar_one_file(
chunk_path,
files_to_backup,
callback_queue: CallbackQueue,
delta: bool,
file_type: FileType = FileType.Basebackup_chunk,
extra_metadata: Optional[Dict[str, Any]] = None,
delta_stats: Optional[DeltaStats] = None
Expand Down Expand Up @@ -193,9 +192,6 @@ def tar_one_file(

middle_path, chunk_name = ChunkUploader.chunk_path_to_middle_path_name(Path(chunk_path), file_type)

def callback(n_bytes: int) -> None:
self.metrics.increase("pghoard.basebackup_bytes_uploaded", inc_value=n_bytes, tags={"delta": delta})

self.transfer_queue.put(
UploadEvent(
callback_queue=callback_queue,
Expand All @@ -204,7 +200,6 @@ def callback(n_bytes: int) -> None:
file_path=middle_path / chunk_name,
source_data=chunk_path,
metadata=metadata,
incremental_progress_callback=callback,
backup_site_name=self.site,
)
)
Expand All @@ -220,15 +215,13 @@ def handle_single_chunk(
chunks,
index: int,
temp_dir: Path,
delta: bool,
delta_stats: Optional[DeltaStats] = None,
file_type: FileType = FileType.Basebackup_chunk
) -> Dict[str, Any]:
one_chunk_files = chunks[index]
chunk_name, input_size, result_size = self.tar_one_file(
callback_queue=chunk_callback_queue,
chunk_path=chunk_path,
delta=delta,
temp_dir=temp_dir,
files_to_backup=one_chunk_files,
delta_stats=delta_stats,
Expand Down Expand Up @@ -269,7 +262,6 @@ def create_and_upload_chunks(
chunks,
data_file_format: Callable[[int], str],
temp_base_dir: Path,
delta: bool,
delta_stats: Optional[DeltaStats] = None,
file_type: FileType = FileType.Basebackup_chunk,
chunks_max_progress: float = 100.0
Expand Down Expand Up @@ -306,7 +298,6 @@ def create_and_upload_chunks(
chunks=chunks,
index=i,
temp_dir=temp_base_dir,
delta=delta,
delta_stats=delta_stats,
file_type=file_type
)
Expand Down
5 changes: 0 additions & 5 deletions pghoard/basebackup/delta.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,9 +190,6 @@ def progress_callback(n_bytes: int = 1) -> None:

dest_path = Path("basebackup_delta") / result_digest

def callback(n_bytes: int) -> None:
self.metrics.increase("pghoard.basebackup_bytes_uploaded", inc_value=n_bytes, tags={"delta": True})

self.transfer_queue.put(
UploadEvent(
callback_queue=callback_queue,
Expand All @@ -201,7 +198,6 @@ def callback(n_bytes: int) -> None:
backup_site_name=self.site,
metadata=metadata,
file_path=dest_path,
incremental_progress_callback=callback,
source_data=chunk_path
)
)
Expand Down Expand Up @@ -363,7 +359,6 @@ def _upload_chunks(self, delta_chunks, chunks_max_progress: float) -> Tuple[Uplo
chunk_files = self.chunk_uploader.create_and_upload_chunks(
chunks=delta_chunks,
data_file_format=self.data_file_format,
delta=True,
temp_base_dir=self.compressed_base,
file_type=FileType.Basebackup_delta_chunk,
chunks_max_progress=chunks_max_progress,
Expand Down
13 changes: 11 additions & 2 deletions pghoard/pghoard.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
is_basebackup_preserved, parse_preservation_requests, patch_basebackup_metadata_with_preservation
)
from pghoard.receivexlog import PGReceiveXLog
from pghoard.transfer import TransferAgent, TransferQueue, UploadEvent
from pghoard.transfer import (TransferAgent, TransferQueue, UploadEvent, UploadEventProgressTracker)
from pghoard.walreceiver import WALReceiver
from pghoard.webserver import WebServer

Expand Down Expand Up @@ -143,6 +143,9 @@ def __init__(self, config_path):
self.requested_basebackup_sites = set()
self.inotify_adapter = InotifyAdapter(self.compression_queue)
self.inotify = InotifyWatcher(self.inotify_adapter)

self.upload_tracker = UploadEventProgressTracker(metrics=self.metrics)

self.webserver = WebServer(
self.config, self.requested_basebackup_sites, self.compression_queue, self.transfer_queue, self.metrics
)
Expand All @@ -167,6 +170,7 @@ def __init__(self, config_path):
config=self.config,
mp_manager=self.mp_manager,
transfer_queue=self.transfer_queue,
upload_tracker=self.upload_tracker,
metrics=self.metrics,
shared_state_dict=self.transfer_agent_state
)
Expand Down Expand Up @@ -695,6 +699,7 @@ def startup_walk_for_missed_files(self):
def start_threads_on_startup(self):
# Startup threads
self.inotify.start()
self.upload_tracker.start()
self.webserver.start()
self.wal_file_deleter.start()
for compressor in self.compressors:
Expand Down Expand Up @@ -971,7 +976,11 @@ def load_config(self, _signal=None, _frame=None): # pylint: disable=unused-argu

def _get_all_threads(self):
all_threads = []
# on first config load webserver isn't initialized yet

# on first config load upload_tracker and webserver aren't initialized yet
if hasattr(self, "upload_tracker"):
all_threads.append(self.upload_tracker)

if hasattr(self, "webserver"):
all_threads.append(self.webserver)
all_threads.extend(self.basebackups.values())
Expand Down
164 changes: 159 additions & 5 deletions pghoard/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,27 @@
import enum
import logging
import os
import threading
import time
from contextlib import suppress
from dataclasses import dataclass
from functools import partial
from io import BytesIO
from pathlib import Path
from queue import Empty
from threading import Lock
from typing import Any, BinaryIO, Dict, Optional, Union
from typing import Any, BinaryIO, Dict, List, Optional, Union

from rohmu import get_transfer
from rohmu.errors import FileNotFoundFromStorageError
from rohmu.object_storage.base import (BaseTransfer, IncrementalProgressCallbackType)
from rohmu.object_storage.base import BaseTransfer

from pghoard.common import (
CallbackEvent, CallbackQueue, FileType, PGHoardThread, Queue, QuitEvent, StrEnum, create_alert_file,
get_object_storage_config
)
from pghoard.fetcher import FileFetchManager
from pghoard.metrics import Metrics

_STATS_LOCK = Lock()
_last_stats_transmit_time = 0
Expand Down Expand Up @@ -54,7 +57,6 @@ class UploadEvent(BaseTransferEvent):
file_size: Optional[int]
remove_after_upload: bool = True
retry_number: int = 0
incremental_progress_callback: Optional[IncrementalProgressCallbackType] = None

@property
def operation(self):
Expand Down Expand Up @@ -99,15 +101,156 @@ def operation(self):
TransferQueue = Queue


@dataclass
class TransferIncrement:
    """A single progress report for an upload: how many bytes, and when it was recorded."""

    # Number of bytes reported by this progress callback invocation.
    n_bytes: float
    # Monotonic clock reading taken when the increment object is created;
    # monotonic time is immune to wall-clock adjustments.
    tracked_at: float = dataclasses.field(default_factory=time.monotonic)


@dataclass
class UploadEventProgress:
    """Progress bookkeeping for one upload that is currently being tracked."""

    # Identifier of the upload being tracked.
    key: str
    # Expected total size in bytes; may be missing, since upload events without a size are allowed.
    file_size: Optional[int]
    # Kind of file being uploaded.
    file_type: FileType
    # Every progress report received so far, in arrival order.
    increments: List[TransferIncrement] = dataclasses.field(default_factory=list)
    # Monotonic clock reading taken when this progress record is created.
    started_at: float = dataclasses.field(default_factory=time.monotonic)

    def is_completed(self) -> bool:
        """Return True once the reported bytes add up to at least ``file_size``.

        When ``file_size`` is missing (or zero) completion cannot be determined,
        so the upload is always reported as not completed.
        """
        if self.file_size:
            uploaded_so_far = sum(step.n_bytes for step in self.increments)
            return uploaded_so_far >= self.file_size

        # UploadEvents without file_size are allowed, therefore we cannot determine if the event is completed
        return False


class UploadEventProgressTracker(PGHoardThread):
    """Track the progress of in-flight uploads and warn when one stops progressing.

    Uploads are registered with ``track_upload_event`` and fed with ``increment``
    every time more bytes are reported as uploaded. While the thread is running,
    it periodically inspects all tracked uploads and logs a warning for each one
    that has not reported progress for longer than ``WARNING_TIMEOUT`` (plus its
    own average interval between progress reports).

    All access to the tracked-events mapping is serialized through
    ``_tracked_events_lock``.
    """

    CHECK_FREQUENCY = 5.  # check every 5 seconds for progress
    WARNING_TIMEOUT = 5. * 60  # log a warning in case there is no progress during last 5 minutes

    def __init__(self, metrics: Metrics) -> None:
        """Create a tracker that reports uploaded bytes through ``metrics``."""
        self.metrics = metrics
        self.log = logging.getLogger("UploadEventProgressTracker")

        self.running: bool = False

        self._tracked_events: Dict[str, UploadEventProgress] = {}
        self._tracked_events_lock = threading.Lock()
        self.log.debug("UploadEventProgressTracker initialized")

        super().__init__()

    def track_upload_event(self, file_key: str, file_type: FileType, file_size: Optional[int]) -> None:
        """Start tracking an upload; an existing entry for ``file_key`` is replaced."""
        with self._tracked_events_lock:
            self.log.info("Tracking upload event for file %s", file_key)
            self._tracked_events[file_key] = UploadEventProgress(key=file_key, file_type=file_type, file_size=file_size)

    def untrack_upload_event(self, file_key: str) -> None:
        """Stop tracking ``file_key``; a no-op when the key is not tracked.

        The lookup and the removal happen atomically under the lock: ``increment``
        may drop the same key concurrently once the upload completes, so a
        check-then-pop sequence outside the lock could raise ``KeyError``.
        """
        with self._tracked_events_lock:
            self._tracked_events.pop(file_key, None)

    def increment(self, file_key: str, n_bytes: float) -> None:
        """Record that ``n_bytes`` more bytes were uploaded for ``file_key``.

        Emits the matching byte-count metric for basebackup and WAL/timeline
        files, stores the increment, and stops tracking the upload as soon as
        it is completed.

        Raises:
            Exception: if ``file_key`` is not currently tracked.
        """
        with self._tracked_events_lock:
            if file_key not in self._tracked_events:
                raise Exception(f"UploadEvent for {file_key} is not being tracked.")

            progress = self._tracked_events[file_key]
            file_type = progress.file_type
            if file_type in (
                FileType.Basebackup, FileType.Basebackup_chunk, FileType.Basebackup_delta, FileType.Basebackup_delta_chunk
            ):
                delta = file_type in (FileType.Basebackup_delta, FileType.Basebackup_delta_chunk)
                self.metrics.increase(
                    "pghoard.basebackup_bytes_uploaded",
                    inc_value=n_bytes,
                    tags={"delta": delta},
                )
            elif file_type in (FileType.Wal, FileType.Timeline):
                self.metrics.increase("pghoard.compressed_file_upload", inc_value=n_bytes)

            progress.increments.append(TransferIncrement(n_bytes=n_bytes))

            # if the file is fully uploaded, then stop tracking it
            if progress.is_completed():
                self._tracked_events.pop(file_key)

    def reset(self) -> None:
        """Forget every tracked upload and mark the tracker as not running."""
        with self._tracked_events_lock:
            self._tracked_events = {}
            self.running = False

    def run_safe(self):
        """Thread body: check all tracked uploads every ``CHECK_FREQUENCY`` seconds.

        Any unexpected error is logged and counted, the tracker state is
        cleared, and the loop ends.
        """
        try:
            self.running = True

            while self.running:
                with self._tracked_events_lock:
                    self._check_increment_rate()

                time.sleep(self.CHECK_FREQUENCY)
        except Exception:  # pylint: disable=broad-except
            self.log.exception("Failed to update transfer rate %s", "pghoard.compressed_file_upload")
            self.metrics.increase("pghoard.transfer_operation.errors")
            self.reset()
            self.stop()

        self.log.debug("Quitting UploadEventProgressTracker")

    def stop(self) -> None:
        """Ask the checking loop to exit; it notices after its current sleep."""
        self.running = False

    def _check_increment_rate(self) -> None:
        """
        Check if the transfer operation is progressing by comparing the time elapsed since
        last increment with the average time it took for previous increments. If the operation has been inactive,
        a warning will be logged.

        The caller is expected to hold ``_tracked_events_lock``.
        """
        now = time.monotonic()
        for ue_progress in self._tracked_events.values():
            if ue_progress.is_completed():
                continue

            increments = ue_progress.increments
            # without any increment yet, measure inactivity from the moment tracking started
            last_increment_at = ue_progress.started_at
            avg_interval = 0.

            if increments:
                last_increment_at = increments[-1].tracked_at
                if len(increments) > 1:
                    # The sum of the gaps between consecutive increments telescopes to
                    # (last - first), so the average gap needs no pass over the whole list.
                    avg_interval = (last_increment_at - increments[0].tracked_at) / (len(increments) - 1)

            # log warning in case we have not tracked any progress for the operation since
            # the last check
            if last_increment_at and (now - last_increment_at) >= avg_interval + self.WARNING_TIMEOUT:
                self.log.warning(
                    "Upload for file %s has been inactive since %s seconds.", ue_progress.key, now - last_increment_at
                )


class TransferAgent(PGHoardThread):
def __init__(self, config, mp_manager, transfer_queue: TransferQueue, metrics, shared_state_dict):
def __init__(
self,
config,
mp_manager,
transfer_queue: TransferQueue,
upload_tracker: UploadEventProgressTracker,
metrics: Metrics,
shared_state_dict,
):
super().__init__()
self.log = logging.getLogger("TransferAgent")
self.config = config
self.metrics = metrics
self.mp_manager = mp_manager
self.fetch_manager = FileFetchManager(self.config, self.mp_manager, self.get_object_storage)
self.transfer_queue = transfer_queue
self.upload_tracker = upload_tracker
self.running = True
self.sleep = time.sleep
self.state = shared_state_dict
Expand Down Expand Up @@ -324,9 +467,20 @@ def handle_upload(self, site, key, file_to_transfer: UploadEvent):
metadata = file_to_transfer.metadata.copy()
if file_to_transfer.file_size:
metadata["Content-Length"] = str(file_to_transfer.file_size)
self.upload_tracker.track_upload_event(
file_key=key,
file_type=file_to_transfer.file_type,
file_size=file_to_transfer.file_size,
)
upload_progress_fn = partial(self.upload_tracker.increment, file_key=key)
storage.store_file_object(
key, f, metadata=metadata, upload_progress_fn=file_to_transfer.incremental_progress_callback
key,
f,
metadata=metadata,
upload_progress_fn=lambda n_bytes: upload_progress_fn(n_bytes=n_bytes),
)
# make sure we untrack it manually
self.upload_tracker.untrack_upload_event(key)
if unlink_local:
if isinstance(file_to_transfer.source_data, Path):
try:
Expand Down
Loading

0 comments on commit 68f3061

Please sign in to comment.