Skip to content

Commit

Permalink
This PR refactors the basebackups monitoring introduced in PR #615. P…
Browse files Browse the repository at this point in the history
…reviously, we reset the basebackup progress file whenever a new basebackup request was made, which resulted in not catching a few cases where pghoard restarts. Now, the progress file is only reset when a backup is successful, and we also record the total bytes uploaded in the file for the previous basebackup. If there is a retry due to a pghoard restart or a failed backup request, we check if progress has been made; if it has not exceeded the bytes uploaded in the previous state, we emit a stalled metric. Also, added logging for upload progress for each file and snapshot stages in a basebackup operation.

[SRE-7476]
  • Loading branch information
sebinsunny committed May 31, 2024
1 parent cea3d0b commit 75a935c
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 56 deletions.
4 changes: 2 additions & 2 deletions pghoard/basebackup/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -564,8 +564,6 @@ def run_local_tar_basebackup(self, delta: bool = False, with_delta_stats: bool =
db_conn.commit()

self.log.info("Starting to backup %r and %r tablespaces to %r", pgdata, len(tablespaces), compressed_base)
progress_instance = PersistedProgress()
progress_instance.reset_all(metrics=self.metrics)
start_time = time.monotonic()

if delta:
Expand Down Expand Up @@ -686,6 +684,8 @@ def run_local_tar_basebackup(self, delta: bool = False, with_delta_stats: bool =
"%r byte input, %r byte output, took %r seconds, waiting to upload", total_file_count, chunks_count,
total_size_plain, total_size_enc, backup_time
)
progress_instance = PersistedProgress()
progress_instance.reset_all(metrics=self.metrics)

finally:
db_conn.rollback()
Expand Down
4 changes: 4 additions & 0 deletions pghoard/basebackup/delta.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,10 @@ def progress_callback(progress_step: ProgressStep, progress_data: ProgressMetric
persisted_progress.write(self.metrics)
self.last_flush_time = time.monotonic()
self.metrics.gauge("pghoard.seconds_since_backup_progress_stalled", 0, tags=tags)
self.log.info(
"Updated snapshot progress for %s to %d files; elapsed time since last check: %.2f seconds.",
progress_step.value, progress_data["handled"], elapsed
)
else:
stalled_age = progress_info.age
self.metrics.gauge("pghoard.seconds_since_backup_progress_stalled", stalled_age, tags=tags)
Expand Down
56 changes: 21 additions & 35 deletions pghoard/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,7 @@ def untrack_upload_event(self, file_key: str) -> None:
with self._tracked_events_lock:
self._tracked_events.pop(file_key)

def increment(self, file_key: str, total_bytes_uploaded: float) -> None:
metric_data = {}
def increment(self, file_key: str, file_size: int, total_bytes_uploaded: float) -> None:
persisted_progress = PersistedProgress.read(metrics=self.metrics)

with self._tracked_events_lock:
Expand All @@ -160,9 +159,11 @@ def increment(self, file_key: str, total_bytes_uploaded: float) -> None:
FileType.Basebackup_delta,
FileType.Basebackup_delta_chunk,
):
progress_info = persisted_progress.get(file_key)
if total_bytes_uploaded > progress_info.current_progress:
progress_info.update(total_bytes_uploaded)
progress_info = persisted_progress.get("total_bytes_uploaded")
updated_total_bytes_uploaded = progress_info.current_progress + total_bytes_uploaded

if updated_total_bytes_uploaded > progress_info.current_progress:
progress_info.update(updated_total_bytes_uploaded)
persisted_progress.write(metrics=self.metrics)
self.metrics.gauge("pghoard.seconds_since_backup_progress_stalled", 0)
else:
Expand All @@ -174,19 +175,14 @@ def increment(self, file_key: str, total_bytes_uploaded: float) -> None:
file_key,
stalled_age,
)
metric_data = {
"metric": "pghoard.basebackup_bytes_uploaded",
"inc_value": total_bytes_uploaded,
"tags": {
"delta": file_type in (FileType.Basebackup_delta, FileType.Basebackup_delta_chunk)
},
}
elif file_type in (FileType.Wal, FileType.Timeline):
metric_data = {"metric": "pghoard.compressed_file_upload", "inc_value": total_bytes_uploaded}
upload_percentage = (total_bytes_uploaded / file_size) * 100
if upload_percentage > 90 or upload_percentage < 10:
self.log.info(
"Upload progress for file %s is at %s%%, uploaded %s bytes out of %s.", file_key,
round(upload_percentage, 2), total_bytes_uploaded, file_size
)

self._tracked_events[file_key].increments.append(TransferIncrement(total_bytes_uploaded=total_bytes_uploaded))
if metric_data:
self.metrics.increase(**metric_data)

def reset(self) -> None:
with self._tracked_events_lock:
Expand All @@ -204,7 +200,7 @@ def run_safe(self):
time.sleep(self.CHECK_FREQUENCY)
except Exception: # pylint: disable=broad-except
self.log.exception("Failed to update transfer rate %s", "pghoard.compressed_file_upload")
self.metrics.increase("pghoard.transfer_operation.errors")
self.metrics.increase("pghoard.transfer_operation_errors")
self.reset()
self.stop()

Expand Down Expand Up @@ -423,18 +419,11 @@ def run_safe(self):
if file_to_transfer.callback_queue:
file_to_transfer.callback_queue.put(result)

self.log.info(
"%r %stransfer of key: %r, size: %r, took %.3fs", oper, "FAILED " if not result.success else "", key,
oper_size,
time.monotonic() - start_time
)

if file_to_transfer.operation in {TransferOperation.Upload} and filetype in (
FileType.Basebackup, FileType.Basebackup_chunk, FileType.Basebackup_delta, FileType.Basebackup_delta_chunk
):
if result.success:
persisted_progress = PersistedProgress.read(metrics=self.metrics)
persisted_progress.reset(key, metrics=self.metrics)
operation_type = file_to_transfer.operation
status = "FAILED" if not result.success else "successfully"
log_msg = f"{operation_type.capitalize()} of key: {key}, " \
f"size: {oper_size}, {status} in {time.monotonic() - start_time:.3f}s"
self.log.info(log_msg)

self.fetch_manager.stop()
self.log.debug("Quitting TransferAgent")
Expand Down Expand Up @@ -496,7 +485,9 @@ def handle_upload(self, site, key, file_to_transfer: UploadEvent):
metadata = file_to_transfer.metadata.copy()
if file_to_transfer.file_size is not None:
metadata["Content-Length"] = str(file_to_transfer.file_size)
upload_progress_fn = partial(self.upload_tracker.increment, file_key=key)
upload_progress_fn = partial(
self.upload_tracker.increment, file_key=key, file_size=file_to_transfer.file_size
)
storage.store_file_object(
key,
f,
Expand Down Expand Up @@ -539,10 +530,5 @@ def handle_upload(self, site, key, file_to_transfer: UploadEvent):

# Sleep for a bit to avoid busy looping. Increase sleep time if the op fails multiple times
self.sleep(min(0.5 * 2 ** (file_to_transfer.retry_number - 1), 20))
if file_to_transfer.file_type in (
FileType.Basebackup, FileType.Basebackup_chunk, FileType.Basebackup_delta, FileType.Basebackup_delta_chunk
) and file_to_transfer.retry_number < 2:
persisted_progress = PersistedProgress.read(metrics=self.metrics)
persisted_progress.reset(key, metrics=self.metrics)
self.transfer_queue.put(file_to_transfer)
return None
25 changes: 7 additions & 18 deletions test/test_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,36 +92,25 @@ def test_json_serialization(self, tmpdir):
def test_persisted_progress(self, mocker, tmp_path):
test_progress_file = tmp_path / "test_progress.json"
original_time = 1625072042.123456
test_data = {
"progress": {
"0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b": {
"current_progress": 100,
"last_updated_time": original_time
}
}
}
test_data = {"progress": {"total_bytes_uploaded": {"current_progress": 100, "last_updated_time": original_time}}}

with open(test_progress_file, "w") as file:
json.dump(test_data, file)

mocker.patch("pghoard.common.PROGRESS_FILE", test_progress_file)
persisted_progress = PersistedProgress.read(metrics=metrics.Metrics(statsd={}))
assert "0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b" in persisted_progress.progress
assert persisted_progress.progress["0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b"
].current_progress == 100
assert persisted_progress.progress["0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b"
].last_updated_time == 1625072042.123456
assert "total_bytes_uploaded" in persisted_progress.progress
assert persisted_progress.progress["total_bytes_uploaded"].current_progress == 100
assert persisted_progress.progress["total_bytes_uploaded"].last_updated_time == 1625072042.123456

new_progress = 200
progress_info = persisted_progress.get("0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b")
progress_info = persisted_progress.get("total_bytes_uploaded")
progress_info.update(new_progress)
persisted_progress.write(metrics=metrics.Metrics(statsd={}))

updated_progress = PersistedProgress.read(metrics=metrics.Metrics(statsd={}))
assert updated_progress.progress["0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b"
].current_progress == new_progress
assert updated_progress.progress["0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b"
].last_updated_time > original_time
assert updated_progress.progress["total_bytes_uploaded"].current_progress == new_progress
assert updated_progress.progress["total_bytes_uploaded"].last_updated_time > original_time

def test_default_persisted_progress_creation(self, mocker, tmp_path):
tmp_file = tmp_path / "non_existent_progress.json"
Expand Down
2 changes: 1 addition & 1 deletion test/test_transferagent.py
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,6 @@ def test_handle_upload_with_persisted_progress(self, mocker, tmp_path):
self.transfer_agent.handle_upload("test_site", self.foo_basebackup_path, upload_event)
updated_progress = PersistedProgress.read(metrics=metrics.Metrics(statsd={}))
assert temp_progress_file.exists()
assert updated_progress.progress[self.foo_basebackup_path].current_progress == 3
assert updated_progress.progress["total_bytes_uploaded"].current_progress == 3
if temp_progress_file.exists():
temp_progress_file.unlink()

0 comments on commit 75a935c

Please sign in to comment.