From 9455fc1cd94562c73e59c6d6c60df4780b45b4bd Mon Sep 17 00:00:00 2001
From: Bo Wu
Date: Tue, 3 Dec 2024 09:51:07 -0800
Subject: [PATCH] reduce cost

---
 testsuite/replay-verify/archive_disk_utils.py | 71 ++++++++++++-------
 testsuite/replay-verify/main.py               | 31 +++++---
 2 files changed, 65 insertions(+), 37 deletions(-)

diff --git a/testsuite/replay-verify/archive_disk_utils.py b/testsuite/replay-verify/archive_disk_utils.py
index bb1e67a1d6060f..7607d044e927c5 100644
--- a/testsuite/replay-verify/archive_disk_utils.py
+++ b/testsuite/replay-verify/archive_disk_utils.py
@@ -22,6 +22,10 @@
 MAINNET_SNAPSHOT_NAME = "mainnet-archive"
 
 
+def get_region_from_zone(zone):
+    return zone.rsplit("-", 1)[0]
+
+
 def get_kubectl_credentials(project_id, region, cluster_name):
     try:
         # Command to get kubectl credentials for the cluster
@@ -141,6 +145,8 @@ def create_snapshot_with_gcloud(
         source_disk_link,
         "--project",
         target_project,
+        "--storage-location",
+        get_region_from_zone(source_zone),
     ]
 
     try:
@@ -156,6 +162,22 @@ def create_snapshot_with_gcloud(
         raise Exception(f"Error creating snapshot: {e}")
 
 
+def delete_disk(disk_client, project, zone, disk_name):
+    # Check if the disk already exists
+
+    try:
+        disk = disk_client.get(project=project, zone=zone, disk=disk_name)
+        logger.info(f"Disk {disk_name} already exists. Deleting it.")
+        # Delete the existing disk
+        operation = disk_client.delete(project=project, zone=zone, disk=disk_name)
+        wait_for_operation(
+            project, zone, operation.name, compute_v1.ZoneOperationsClient()
+        )
+        logger.info(f"Disk {disk_name} deleted.")
+    except Exception as e:
+        logger.info(f"Disk {e} {disk_name} does not exist, no delete needed.")
+
+
 # Creating disk from import snapshots
 # require getting a hold of the kubectrl of the cluster
 # eg: gcloud container clusters get-credentials replay-on-archive --region us-central1 --project replay-verify
@@ -172,19 +194,7 @@ def create_disk_pv_pvc_from_snapshot(
 ):
     disk_client = compute_v1.DisksClient()
     snapshot_client = compute_v1.SnapshotsClient()
-
-    # Check if the disk already exists
-    try:
-        disk = disk_client.get(project=project, zone=zone, disk=disk_name)
-        logger.info(f"Disk {disk_name} already exists. Deleting it.")
-        # Delete the existing disk
-        operation = disk_client.delete(project=project, zone=zone, disk=disk_name)
-        wait_for_operation(
-            project, zone, operation.name, compute_v1.ZoneOperationsClient()
-        )
-        logger.info(f"Disk {disk_name} deleted.")
-    except Exception as e:
-        logger.info(f"Disk {e} {disk_name} does not exist. Creating a new one.")
+    delete_disk(disk_client, project, zone, disk_name)
 
     # Create a new disk from the snapshot
     logger.info(f"Creating disk {disk_name} from snapshot {og_snapshot_name}.")
@@ -199,14 +209,16 @@ def create_disk_pv_pvc_from_snapshot(
     wait_for_operation(project, zone, operation.name, compute_v1.ZoneOperationsClient())
     logger.info(f"Disk {disk_name} created from snapshot {og_snapshot_name}.")
 
-    region_name = zone.rsplit("-", 1)[0]
+    region_name = get_region_from_zone(zone)
     get_kubectl_credentials(project, region_name, cluster_name)
     # create_persistent_volume(disk_name, pv_name, pvc_name, namespace, True)
     # this is only for xfs replaying logs to repair the disk
     repair_pv = f"{pv_name}-repair"
     repair_pvc = f"{pvc_name}-repair"
     repair_job_name = f"xfs-repair-{pvc_name}"
-    create_persistent_volume(disk_name, repair_pv, repair_pvc, namespace, False)
+    create_persistent_volume(
+        project, zone, disk_name, repair_pv, repair_pvc, namespace, False
+    )
     # start a pod to mount the disk and run simple task
     with open("xfs-disk-repair.yaml", "r") as f:
         pod_manifest = yaml.safe_load(f)
@@ -228,6 +240,9 @@ def create_disk_pv_pvc_from_snapshot(
         time.sleep(10)
     logger.info(f"creating final snapshot")
     create_snapshot_with_gcloud(snapshot_name, project, disk_name, zone, project)
+    logger.info("deleting repair pvc and corresponding pv and disks")
+    # delete the disk used for repair
+    delete_disk(disk_client, project, zone, disk_name)
 
 
 def is_job_pod_cleanedup(namespace, job_name):
@@ -255,7 +270,9 @@ def wait_for_operation(project, zone, operation_name, zone_operations_client):
         time.sleep(20)
 
 
-def create_persistent_volume(disk_name, pv_name, pvc_name, namespace, read_only):
+def create_persistent_volume(
+    project, zone, disk_name, pv_name, pvc_name, namespace, read_only
+):
     config.load_kube_config()
     v1 = client.CoreV1Api()
 
@@ -286,20 +303,22 @@ def create_persistent_volume(disk_name, pv_name, pvc_name, namespace, read_only)
             raise
 
     # Create PersistentVolume
+    volume_handle = f"projects/{project}/zones/{zone}/disks/{disk_name}"
     pv = client.V1PersistentVolume(
         api_version="v1",
         kind="PersistentVolume",
         metadata=client.V1ObjectMeta(name=pv_name),
         spec=client.V1PersistentVolumeSpec(
             capacity={"storage": "10000Gi"},
-            access_modes=["ReadOnlyMany"],
-            gce_persistent_disk=client.V1GCEPersistentDiskVolumeSource(
-                pd_name=disk_name,
+            access_modes=["ReadWriteOnce"],
+            csi=client.V1CSIPersistentVolumeSource(
+                driver="pd.csi.storage.gke.io",
+                volume_handle=volume_handle,
                 fs_type="xfs",
                 read_only=read_only,
             ),
-            persistent_volume_reclaim_policy="Retain",
-            storage_class_name="standard",
+            persistent_volume_reclaim_policy="Retain",  # this is to delete the PV and disk separately to speed up pv deletion
+            storage_class_name="ssd-data-xfs",
         ),
     )
 
@@ -309,9 +328,9 @@ def create_persistent_volume(disk_name, pv_name, pvc_name, namespace, read_only)
         kind="PersistentVolumeClaim",
         metadata=client.V1ObjectMeta(name=pvc_name, namespace=namespace),
         spec=client.V1PersistentVolumeClaimSpec(
-            access_modes=["ReadOnlyMany"],
+            access_modes=["ReadWriteOnce"],
             resources=client.V1ResourceRequirements(requests={"storage": "10000Gi"}),
-            storage_class_name="standard",
+            storage_class_name="ssd-data-xfs",
             volume_name=pv_name,
         ),
     )
@@ -427,7 +446,7 @@ def create_pvcs_from_snapshot(run_id, snapshot_name, namespace, pvc_num, label):
     return res
 
 
-def create_disk_pv_pvc(
+def create_repair_disk_and_its_snapshot(
     project, zone, cluster_name, og_snapshot_name, snapshot_name, prefix, namespace
 ):
     tasks = []
@@ -462,8 +481,6 @@ def create_disk_pv_pvc(
             except Exception as e:
                 logger.error(f"Task generated an exception: {e}")
 
-    # start a self deleteing job to mount the xfs disks for repairing
-
 
 def parse_args():
     parser = argparse.ArgumentParser(
@@ -506,7 +523,7 @@ def parse_args():
         source_namespace,
         project_id,
     )
-    create_disk_pv_pvc(
+    create_repair_disk_and_its_snapshot(
         project_id,
         zone,
         cluster_name,
diff --git a/testsuite/replay-verify/main.py b/testsuite/replay-verify/main.py
index e8b29e363b4ee5..e3cb4acc0b26d1 100644
--- a/testsuite/replay-verify/main.py
+++ b/testsuite/replay-verify/main.py
@@ -28,6 +28,7 @@
 
 REPLAY_CONCURRENCY_LEVEL = 1
 
+
 class Network(Enum):
     TESTNET = 1
     MAINNET = 2
@@ -241,6 +242,7 @@ def get_pod_status(self):
     def get_humio_log_link(self):
         return construct_humio_url(self.label, self.name, self.start_time, time.time())
 
+
 class ReplayConfig:
     def __init__(self, network):
         if network == Network.TESTNET:
@@ -253,9 +255,10 @@ def __init__(self, network):
            self.concurrent_replayer = 18
            self.pvc_number = 8
            self.min_range_size = 10_000
-            self.range_size = 2_000_000 
+            self.range_size = 2_000_000
            self.timeout_secs = 400
 
+
 class TaskStats:
     def __init__(self, name):
         self.name = name
@@ -308,7 +311,7 @@ def __init__(
         self.image = image
         self.pvcs = []
         self.config = replay_config
-        
+
     def __str__(self):
         return f"""ReplayScheduler:
             id: {self.id}
@@ -360,7 +363,11 @@ def create_pvc_from_snapshot(self):
             else MAINNET_SNAPSHOT_NAME
         )
         pvcs = create_pvcs_from_snapshot(
-            self.id, snapshot_name, self.namespace, self.config.pvc_number, self.get_label()
+            self.id,
+            snapshot_name,
+            self.namespace,
+            self.config.pvc_number,
+            self.get_label(),
         )
         assert len(pvcs) == self.config.pvc_number, "failed to create all pvcs"
         self.pvcs = pvcs
@@ -504,12 +511,16 @@ def get_image(image_tag=None):
     shell = forge.LocalShell()
     git = forge.Git(shell)
     image_name = "tools"
-    default_latest_image = forge.find_recent_images(
-        shell,
-        git,
-        1,
-        image_name=image_name,
-    )[0] if image_tag is None else image_tag
+    default_latest_image = (
+        forge.find_recent_images(
+            shell,
+            git,
+            1,
+            image_name=image_name,
+        )[0]
+        if image_tag is None
+        else image_tag
+    )
     full_image = f"{forge.GAR_REPO_NAME}/{image_name}:{default_latest_image}"
     return full_image
 
@@ -546,7 +557,7 @@ def print_logs(failed_workpod_logs, txn_mismatch_logs):
         range_size=range_size,
         image=image,
         replay_config=config,
-        network= network,
+        network=network,
         namespace=args.namespace,
     )
     logger.info(f"scheduler: {scheduler}")
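
For context, a minimal self-contained sketch of the zone-to-region mapping behind the new --storage-location argument; the zone strings below are illustrative examples, not values taken from the patch.

# Sketch only: mirrors get_region_from_zone() from the patch. A GCP zone such
# as "us-central1-a" drops its trailing "-a" suffix, yielding the region passed
# to `gcloud compute snapshots create --storage-location`, so the snapshot is
# stored regionally next to the source disk instead of multi-regionally.
def get_region_from_zone(zone: str) -> str:
    return zone.rsplit("-", 1)[0]


# Example zones, for illustration only.
assert get_region_from_zone("us-central1-a") == "us-central1"
assert get_region_from_zone("europe-west4-b") == "europe-west4"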