From 9d7b88d79713768d80c7b2441ffa70d4fae4730a Mon Sep 17 00:00:00 2001 From: Giacomo Alzetta Date: Mon, 4 Mar 2024 10:33:53 +0100 Subject: [PATCH] Use transfer pool to make code thread safe --- myhoard/restore_coordinator.py | 179 ++++++++++++++++----------------- 1 file changed, 87 insertions(+), 92 deletions(-) diff --git a/myhoard/restore_coordinator.py b/myhoard/restore_coordinator.py index a3241da..a86df71 100644 --- a/myhoard/restore_coordinator.py +++ b/myhoard/restore_coordinator.py @@ -30,8 +30,8 @@ ) from contextlib import suppress from pymysql import OperationalError -from rohmu import errors as rohmu_errors, get_transfer -from rohmu.object_storage.base import BaseTransfer +from rohmu import errors as rohmu_errors +from rohmu.transfer_pool import TransferPool from typing import Any, Dict, Iterable, List, Optional, Tuple, TypedDict import contextlib @@ -185,7 +185,7 @@ def __init__( # can be successfully restored. self.binlog_streams = binlog_streams self.current_file = None - self.file_storage: Optional[BaseTransfer] = None + self.file_storage_pool = TransferPool() self.file_storage_config = file_storage_config self.free_memory_percentage = free_memory_percentage self.is_running = True @@ -325,10 +325,6 @@ def run(self) -> None: while self.is_running: try: - if not self.file_storage: - self.log.info("Creating file storage accessor") - self.file_storage = get_transfer(self.file_storage_config) - if self.phase == self.Phase.getting_backup_info: self.get_backup_info() if self.phase == self.Phase.initiating_binlog_downloads: @@ -839,9 +835,9 @@ def _build_full_name(self, name: str) -> str: return f"{self.site}/{self.stream_id}/{name}" def _load_file_data(self, name, missing_ok=False): - assert self.file_storage try: - info_str, _ = self.file_storage.get_contents_to_string(self._build_full_name(name)) + with self.file_storage_pool.with_transfer(self.file_storage_config) as file_storage: + info_str, _ = file_storage.get_contents_to_string(self._build_full_name(name)) return json.loads(info_str) except rohmu_errors.FileNotFoundFromStorageError as ex: if not missing_ok: @@ -859,29 +855,28 @@ def _load_file_data(self, name, missing_ok=False): def _basebackup_data_provider(self, target_stream) -> None: name = self._build_full_name("basebackup.xbstream") compressed_size = self.state["basebackup_info"].get("compressed_size") - file_storage = get_transfer(self.file_storage_config) - - last_time = [time.monotonic()] - last_value = [0] - self.basebackup_bytes_downloaded = 0 - - def download_progress(progress, max_progress): - if progress and max_progress and compressed_size: - # progress may be the actual number of bytes or it may be percentages - self.basebackup_bytes_downloaded = int(compressed_size * progress / max_progress) - # Track both absolute number and explicitly calculated rate. The rate can be useful as - # a separate measurement because downloads are not ongoing all the time and calculating - # rate based on raw byte counter requires knowing when the operation started and ended - self.stats.gauge_int("myhoard.restore.basebackup_bytes_downloaded", self.basebackup_bytes_downloaded) - last_value[0], last_time[0] = track_rate( - current=self.basebackup_bytes_downloaded, - last_recorded=last_value[0], - last_recorded_time=last_time[0], - metric_name="myhoard.restore.basebackup_download_rate", - stats=self.stats, - ) + with self.file_storage_pool.with_transfer(self.file_storage_config) as file_storage: + last_time = [time.monotonic()] + last_value = [0] + self.basebackup_bytes_downloaded = 0 + + def download_progress(progress, max_progress): + if progress and max_progress and compressed_size: + # progress may be the actual number of bytes or it may be percentages + self.basebackup_bytes_downloaded = int(compressed_size * progress / max_progress) + # Track both absolute number and explicitly calculated rate. The rate can be useful as + # a separate measurement because downloads are not ongoing all the time and calculating + # rate based on raw byte counter requires knowing when the operation started and ended + self.stats.gauge_int("myhoard.restore.basebackup_bytes_downloaded", self.basebackup_bytes_downloaded) + last_value[0], last_time[0] = track_rate( + current=self.basebackup_bytes_downloaded, + last_recorded=last_value[0], + last_recorded_time=last_time[0], + metric_name="myhoard.restore.basebackup_download_rate", + stats=self.stats, + ) - file_storage.get_contents_to_fileobj(name, target_stream, progress_callback=download_progress) + file_storage.get_contents_to_fileobj(name, target_stream, progress_callback=download_progress) def _get_iteration_sleep(self) -> float: if self.phase in self.POLL_PHASES: @@ -909,53 +904,53 @@ def _list_binlogs_in_bucket( start_time = time.monotonic() target_time_reached_by_server = set() - assert self.file_storage self.log.debug("Listing binlogs in bucket %s", bucket) try: - list_iter = self.file_storage.list_iter(self._build_binlog_full_name(f"binlogs/{bucket}")) - for info in self._get_sorted_file_infos(list_iter): - binlog = parse_fs_metadata(info["metadata"]) - # We may be handling binlogs from multiple streams. To make the other logic work, calculate - # monotonically increasing index across all streams. (Individual streams have their indexes - # always start from 1.) - binlog["adjusted_remote_index"] = self.state["binlog_stream_offset"] + binlog["remote_index"] - binlog["remote_key"] = info["name"] - binlog["remote_size"] = info["size"] - highest_index = max(highest_index, binlog["remote_index"]) - if last_processed_index is not None and binlog["adjusted_remote_index"] <= last_processed_index: - continue - # We're handing binlogs in order. If we've reached target time for any earlier binlog then this - # binlog must be out of range as well. This check is needed because we might have binlogs without - # GTIDs that cannot be excluded based on start/end checks - if binlog["server_id"] in target_time_reached_by_server: - continue - if self.target_time and binlog["gtid_ranges"]: - if binlog["gtid_ranges"][0]["start_ts"] >= self.target_time: - # We exclude entries whose time matches recovery target time so any file whose start_ts - # is equal or higher than target time is certain not to contain data we're going to apply - self.log.info( - "Start time %s of binlog %s from server %s is after our target time %s, skipping", - binlog["gtid_ranges"][0]["start_ts"], - binlog["remote_index"], - binlog["server_id"], - self.target_time, - ) - target_time_reached_by_server.add(binlog["server_id"]) + with self.file_storage_pool.with_transfer(self.file_storage_config) as file_storage: + list_iter = file_storage.list_iter(self._build_binlog_full_name(f"binlogs/{bucket}")) + for info in self._get_sorted_file_infos(list_iter): + binlog = parse_fs_metadata(info["metadata"]) + # We may be handling binlogs from multiple streams. To make the other logic work, calculate + # monotonically increasing index across all streams. (Individual streams have their indexes + # always start from 1.) + binlog["adjusted_remote_index"] = self.state["binlog_stream_offset"] + binlog["remote_index"] + binlog["remote_key"] = info["name"] + binlog["remote_size"] = info["size"] + highest_index = max(highest_index, binlog["remote_index"]) + if last_processed_index is not None and binlog["adjusted_remote_index"] <= last_processed_index: continue - if binlog["gtid_ranges"][0]["end_ts"] >= self.target_time: - # Log and mark target time reached but include binlog and continue processing results. We may - # get binlogs from multiple servers in some race conditions and we don't yet know if this binlog - # was from a server that was actually valid at that point in time and some other server may have - # binlogs that are still relevant. - self.log.info( - "End time %s of binlog %s from server %s is at or after our target time %s, target time reached", - binlog["gtid_ranges"][0]["end_ts"], - binlog["remote_index"], - binlog["server_id"], - self.target_time, - ) - target_time_reached_by_server.add(binlog["server_id"]) - new_binlogs.append(binlog) + # We're handing binlogs in order. If we've reached target time for any earlier binlog then this + # binlog must be out of range as well. This check is needed because we might have binlogs without + # GTIDs that cannot be excluded based on start/end checks + if binlog["server_id"] in target_time_reached_by_server: + continue + if self.target_time and binlog["gtid_ranges"]: + if binlog["gtid_ranges"][0]["start_ts"] >= self.target_time: + # We exclude entries whose time matches recovery target time so any file whose start_ts + # is equal or higher than target time is certain not to contain data we're going to apply + self.log.info( + "Start time %s of binlog %s from server %s is after our target time %s, skipping", + binlog["gtid_ranges"][0]["start_ts"], + binlog["remote_index"], + binlog["server_id"], + self.target_time, + ) + target_time_reached_by_server.add(binlog["server_id"]) + continue + if binlog["gtid_ranges"][0]["end_ts"] >= self.target_time: + # Log and mark target time reached but include binlog and continue processing results. We may + # get binlogs from multiple servers in some race conditions and we don't yet know if this binlog + # was from a server that was actually valid at that point in time and some other server may have + # binlogs that are still relevant. + self.log.info( + "End time %s of binlog %s from server %s is at or after our target time %s, target time reached", + binlog["gtid_ranges"][0]["end_ts"], + binlog["remote_index"], + binlog["server_id"], + self.target_time, + ) + target_time_reached_by_server.add(binlog["server_id"]) + new_binlogs.append(binlog) except rohmu_errors.FileNotFoundFromStorageError: pass except Exception as ex: # pylint: disable=broad-except @@ -1012,25 +1007,25 @@ def _fetch_more_binlogs_infos_for_current_stream(self) -> None: # Also refresh promotions list so that we know which of the remote # binlogs are actually valid - assert self.file_storage promotions: Dict[int, Any] = {} try: - for info in self.file_storage.list_iter(self._build_binlog_full_name("promotions")): - # There could theoretically be multiple promotions with the same - # index value if new master got promoted but then failed before - # managing to upload any binlogs. To cope with that only keep one - # promotion info per server id (the one with most recent timestamp) - info = parse_fs_metadata(info["metadata"]) - existing = promotions.get(info["start_index"]) - if existing and info["promoted_at"] < existing["promoted_at"]: - continue - promotions[info["start_index"]] = info - self.log.info( - "server_id %s valid starting from %s (at %s)", - info["server_id"], - info["start_index"], - info["promoted_at"], - ) + with self.file_storage_pool.with_transfer(self.file_storage_config) as file_storage: + for info in file_storage.list_iter(self._build_binlog_full_name("promotions")): + # There could theoretically be multiple promotions with the same + # index value if new master got promoted but then failed before + # managing to upload any binlogs. To cope with that only keep one + # promotion info per server id (the one with most recent timestamp) + info = parse_fs_metadata(info["metadata"]) + existing = promotions.get(info["start_index"]) + if existing and info["promoted_at"] < existing["promoted_at"]: + continue + promotions[info["start_index"]] = info + self.log.info( + "server_id %s valid starting from %s (at %s)", + info["server_id"], + info["start_index"], + info["promoted_at"], + ) except Exception as ex: # pylint: disable=broad-except # There should always be one promotion file so file not found is real error too self.log.error("Failed to list promotions: %r", ex)