Commit

reduce cost
areshand committed Dec 3, 2024
1 parent fda6770 commit 9455fc1
Showing 2 changed files with 65 additions and 37 deletions.
71 changes: 44 additions & 27 deletions testsuite/replay-verify/archive_disk_utils.py
@@ -22,6 +22,10 @@
MAINNET_SNAPSHOT_NAME = "mainnet-archive"


def get_region_from_zone(zone):
return zone.rsplit("-", 1)[0]


def get_kubectl_credentials(project_id, region, cluster_name):
try:
# Command to get kubectl credentials for the cluster
@@ -141,6 +145,8 @@ def create_snapshot_with_gcloud(
source_disk_link,
"--project",
target_project,
"--storage-location",
get_region_from_zone(source_zone),
]

try:
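
A note on the two changes above, with a minimal sketch that is not part of the commit: the new get_region_from_zone helper derives the region from a GCE zone name, and create_snapshot_with_gcloud now passes that region as --storage-location. Snapshots otherwise default to a multi-regional storage location, which is more expensive, so pinning them to the source disk's own region appears to be the "reduce cost" the commit title refers to.

# Illustration only: the region is everything before the final "-<zone letter>" suffix.
def get_region_from_zone(zone):
    return zone.rsplit("-", 1)[0]

assert get_region_from_zone("us-central1-a") == "us-central1"
assert get_region_from_zone("europe-west4-b") == "europe-west4"
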
@@ -156,6 +162,22 @@
raise Exception(f"Error creating snapshot: {e}")


def delete_disk(disk_client, project, zone, disk_name):
# Check if the disk already exists

try:
disk = disk_client.get(project=project, zone=zone, disk=disk_name)
logger.info(f"Disk {disk_name} already exists. Deleting it.")
# Delete the existing disk
operation = disk_client.delete(project=project, zone=zone, disk=disk_name)
wait_for_operation(
project, zone, operation.name, compute_v1.ZoneOperationsClient()
)
logger.info(f"Disk {disk_name} deleted.")
except Exception as e:
logger.info(f"Disk {e} {disk_name} does not exist, no delete needed.")


# Creating disks from imported snapshots
# requires getting hold of the kubectl credentials of the cluster
# eg: gcloud container clusters get-credentials replay-on-archive --region us-central1 --project replay-verify
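
The comment above spells out the prerequisite: before PVs can be created for the imported disks, kubectl credentials for the target cluster have to be fetched, e.g. with the gcloud command shown. The body of get_kubectl_credentials is collapsed in this diff; the following is only a hedged sketch of such a helper, assuming it simply wraps that gcloud invocation.

import logging
import subprocess

logger = logging.getLogger(__name__)

def get_kubectl_credentials_sketch(project_id, region, cluster_name):
    # Sketch only: the real get_kubectl_credentials body is collapsed above and is
    # assumed here to shell out to the gcloud command from the comment.
    cmd = [
        "gcloud", "container", "clusters", "get-credentials", cluster_name,
        "--region", region,
        "--project", project_id,
    ]
    try:
        subprocess.run(cmd, check=True)
        logger.info(f"Fetched kubectl credentials for {cluster_name} in {region}.")
    except subprocess.CalledProcessError as e:
        raise Exception(f"Error getting kubectl credentials: {e}")
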
@@ -172,19 +194,7 @@ def create_disk_pv_pvc_from_snapshot(
):
disk_client = compute_v1.DisksClient()
snapshot_client = compute_v1.SnapshotsClient()

# Check if the disk already exists
try:
disk = disk_client.get(project=project, zone=zone, disk=disk_name)
logger.info(f"Disk {disk_name} already exists. Deleting it.")
# Delete the existing disk
operation = disk_client.delete(project=project, zone=zone, disk=disk_name)
wait_for_operation(
project, zone, operation.name, compute_v1.ZoneOperationsClient()
)
logger.info(f"Disk {disk_name} deleted.")
except Exception as e:
logger.info(f"Disk {e} {disk_name} does not exist. Creating a new one.")
delete_disk(disk_client, project, zone, disk_name)

# Create a new disk from the snapshot
logger.info(f"Creating disk {disk_name} from snapshot {og_snapshot_name}.")
@@ -199,14 +209,16 @@
wait_for_operation(project, zone, operation.name, compute_v1.ZoneOperationsClient())
logger.info(f"Disk {disk_name} created from snapshot {og_snapshot_name}.")

region_name = zone.rsplit("-", 1)[0]
region_name = get_region_from_zone(zone)
get_kubectl_credentials(project, region_name, cluster_name)
# create_persistent_volume(disk_name, pv_name, pvc_name, namespace, True)
# this is only for xfs replaying logs to repair the disk
repair_pv = f"{pv_name}-repair"
repair_pvc = f"{pvc_name}-repair"
repair_job_name = f"xfs-repair-{pvc_name}"
create_persistent_volume(disk_name, repair_pv, repair_pvc, namespace, False)
create_persistent_volume(
project, zone, disk_name, repair_pv, repair_pvc, namespace, False
)
# start a pod to mount the disk and run simple task
with open("xfs-disk-repair.yaml", "r") as f:
pod_manifest = yaml.safe_load(f)
@@ -228,6 +240,9 @@ def create_disk_pv_pvc_from_snapshot(
time.sleep(10)
logger.info(f"creating final snapshot")
create_snapshot_with_gcloud(snapshot_name, project, disk_name, zone, project)
logger.info("deleting repair pvc and correpsonding pv and disks")
# delete the disk used for repair
delete_disk(disk_client, project, zone, disk_name)


def is_job_pod_cleanedup(namespace, job_name):
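
Taken together, the hunks above restructure the repair flow: delete any stale disk, recreate it from the original archive snapshot, mount it read-write through the repair PV/PVC so the xfs-repair job can run, snapshot the repaired disk, and then delete the repair disk so only the much cheaper snapshot is kept. Below is a hedged sketch of that lifecycle, assuming it sits next to the helpers in archive_disk_utils.py and that the (collapsed) disk-creation step uses the standard compute_v1 DisksClient.insert call.

from google.cloud import compute_v1

def repair_disk_lifecycle_sketch(project, zone, og_snapshot_name, snapshot_name, disk_name):
    # Sketch only: delete_disk, wait_for_operation and create_snapshot_with_gcloud are
    # the helpers defined in this file; the insert() call is an assumption, since the
    # creation code is collapsed in this diff.
    disk_client = compute_v1.DisksClient()

    # 1. Drop any stale disk with the same name (new delete_disk helper above).
    delete_disk(disk_client, project, zone, disk_name)

    # 2. Recreate the disk from the original archive snapshot.
    disk = compute_v1.Disk(
        name=disk_name,
        source_snapshot=f"projects/{project}/global/snapshots/{og_snapshot_name}",
    )
    operation = disk_client.insert(project=project, zone=zone, disk_resource=disk)
    wait_for_operation(project, zone, operation.name, compute_v1.ZoneOperationsClient())

    # 3. Attach it read-write via the repair PV/PVC, wait for the xfs-repair job
    #    (see create_disk_pv_pvc_from_snapshot above), then snapshot the result.
    create_snapshot_with_gcloud(snapshot_name, project, disk_name, zone, project)

    # 4. Delete the repair disk again; keeping only the snapshot is the cost saving.
    delete_disk(disk_client, project, zone, disk_name)
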
@@ -255,7 +270,9 @@ def wait_for_operation(project, zone, operation_name, zone_operations_client):
time.sleep(20)


def create_persistent_volume(disk_name, pv_name, pvc_name, namespace, read_only):
def create_persistent_volume(
project, zone, disk_name, pv_name, pvc_name, namespace, read_only
):
config.load_kube_config()
v1 = client.CoreV1Api()

@@ -286,20 +303,22 @@ def create_persistent_volume(disk_name, pv_name, pvc_name, namespace, read_only)
raise

# Create PersistentVolume
volume_handle = f"projects/{project}/zones/{zone}/disks/{disk_name}"
pv = client.V1PersistentVolume(
api_version="v1",
kind="PersistentVolume",
metadata=client.V1ObjectMeta(name=pv_name),
spec=client.V1PersistentVolumeSpec(
capacity={"storage": "10000Gi"},
access_modes=["ReadOnlyMany"],
gce_persistent_disk=client.V1GCEPersistentDiskVolumeSource(
pd_name=disk_name,
access_modes=["ReadWriteOnce"],
csi=client.V1CSIPersistentVolumeSource(
driver="pd.csi.storage.gke.io",
volume_handle=volume_handle,
fs_type="xfs",
read_only=read_only,
),
persistent_volume_reclaim_policy="Retain",
storage_class_name="standard",
persistent_volume_reclaim_policy="Retain", # this is to delete the PV and disk separately to speed up pv deletion
storage_class_name="ssd-data-xfs",
),
)

@@ -309,9 +328,9 @@ def create_persistent_volume(disk_name, pv_name, pvc_name, namespace, read_only)
kind="PersistentVolumeClaim",
metadata=client.V1ObjectMeta(name=pvc_name, namespace=namespace),
spec=client.V1PersistentVolumeClaimSpec(
access_modes=["ReadOnlyMany"],
access_modes=["ReadWriteOnce"],
resources=client.V1ResourceRequirements(requests={"storage": "10000Gi"}),
storage_class_name="standard",
storage_class_name="ssd-data-xfs",
volume_name=pv_name,
),
)
@@ -427,7 +446,7 @@ def create_pvcs_from_snapshot(run_id, snapshot_name, namespace, pvc_num, label):
return res


def create_disk_pv_pvc(
def create_repair_disk_and_its_snapshot(
project, zone, cluster_name, og_snapshot_name, snapshot_name, prefix, namespace
):
tasks = []
@@ -462,8 +481,6 @@ def create_disk_pv_pvc(
except Exception as e:
logger.error(f"Task generated an exception: {e}")

# start a self deleteing job to mount the xfs disks for repairing


def parse_args():
parser = argparse.ArgumentParser(
@@ -506,7 +523,7 @@ def parse_args():
source_namespace,
project_id,
)
create_disk_pv_pvc(
create_repair_disk_and_its_snapshot(
project_id,
zone,
cluster_name,
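
The last hunk of this file renames create_disk_pv_pvc to create_repair_disk_and_its_snapshot, which matches what the function does now: schedule one repair-disk-plus-final-snapshot task per snapshot and wait for the results. The scheduling code is collapsed here; judging by the "Task generated an exception" handler it uses concurrent.futures, roughly as in this sketch.

import logging
from concurrent.futures import ThreadPoolExecutor, as_completed

logger = logging.getLogger(__name__)

def run_tasks_sketch(task_fns):
    # Sketch only: mirrors the submit/as_completed error handling visible in the
    # collapsed part of create_repair_disk_and_its_snapshot.
    with ThreadPoolExecutor() as executor:
        futures = [executor.submit(fn) for fn in task_fns]
        for future in as_completed(futures):
            try:
                future.result()
            except Exception as e:
                logger.error(f"Task generated an exception: {e}")
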
31 changes: 21 additions & 10 deletions testsuite/replay-verify/main.py
@@ -28,6 +28,7 @@

REPLAY_CONCURRENCY_LEVEL = 1


class Network(Enum):
TESTNET = 1
MAINNET = 2
@@ -241,6 +242,7 @@ def get_pod_status(self):
def get_humio_log_link(self):
return construct_humio_url(self.label, self.name, self.start_time, time.time())


class ReplayConfig:
def __init__(self, network):
if network == Network.TESTNET:
@@ -253,9 +255,10 @@ def __init__(self, network):
self.concurrent_replayer = 18
self.pvc_number = 8
self.min_range_size = 10_000
self.range_size = 2_000_000
self.range_size = 2_000_000
self.timeout_secs = 400


class TaskStats:
def __init__(self, name):
self.name = name
@@ -308,7 +311,7 @@ def __init__(
self.image = image
self.pvcs = []
self.config = replay_config

def __str__(self):
return f"""ReplayScheduler:
id: {self.id}
@@ -360,7 +363,11 @@ def create_pvc_from_snapshot(self):
else MAINNET_SNAPSHOT_NAME
)
pvcs = create_pvcs_from_snapshot(
self.id, snapshot_name, self.namespace, self.config.pvc_number, self.get_label()
self.id,
snapshot_name,
self.namespace,
self.config.pvc_number,
self.get_label(),
)
assert len(pvcs) == self.config.pvc_number, "failed to create all pvcs"
self.pvcs = pvcs
@@ -504,12 +511,16 @@ def get_image(image_tag=None):
shell = forge.LocalShell()
git = forge.Git(shell)
image_name = "tools"
default_latest_image = forge.find_recent_images(
shell,
git,
1,
image_name=image_name,
)[0] if image_tag is None else image_tag
default_latest_image = (
forge.find_recent_images(
shell,
git,
1,
image_name=image_name,
)[0]
if image_tag is None
else image_tag
)
full_image = f"{forge.GAR_REPO_NAME}/{image_name}:{default_latest_image}"
return full_image
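
The get_image hunk is only a reformatting of the fallback expression (line wrapping), not a behavior change: an explicit image_tag bypasses the lookup, otherwise the most recent "tools" image found by forge is used. Hypothetical usage, for illustration only:

# Illustration only; the tag value below is hypothetical.
image = get_image()                 # no tag: resolves the most recent "tools" image via forge
pinned = get_image("my_known_tag")  # explicit tag: used as-is, no lookup
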

@@ -546,7 +557,7 @@ def print_logs(failed_workpod_logs, txn_mismatch_logs):
range_size=range_size,
image=image,
replay_config=config,
network= network,
network=network,
namespace=args.namespace,
)
logger.info(f"scheduler: {scheduler}")
