diff --git a/.github/workflows/provision-replay-verify-archive-disks.yaml b/.github/workflows/provision-replay-verify-archive-disks.yaml
index 067c3dafbbc27..0cf64b7790c28 100644
--- a/.github/workflows/provision-replay-verify-archive-disks.yaml
+++ b/.github/workflows/provision-replay-verify-archive-disks.yaml
@@ -14,7 +14,7 @@ on:
       - ".github/workflows/provision-replay-verify-archive-disks.yaml"
       - ".github/workflows/workflow-run-replay-verify-archive-storage-provision.yaml"
   schedule:
-    - cron: "0 22 * * 1,3,5" # This runs every Mon,Wed,Fri
+    - cron: "0 8 * * 1,3,5" # This runs every Mon,Wed,Fri at 08:00 UTC

 permissions:
   contents: read
@@ -56,8 +56,7 @@ jobs:
   provision-mainnet:
     if: |
       github.event_name == 'schedule' ||
-      github.event_name == 'push' ||
-      github.event_name == 'workflow_dispatch' && (inputs.NETWORK == 'testnet' || inputs.NETWORK == 'all')
+      github.event_name == 'workflow_dispatch' && (inputs.NETWORK == 'mainnet' || inputs.NETWORK == 'all')
     needs: determine-test-metadata
     uses: ./.github/workflows/workflow-run-replay-verify-archive-storage-provision.yaml
     secrets: inherit
diff --git a/.github/workflows/replay-verify-on-archive.yaml b/.github/workflows/replay-verify-on-archive.yaml
index 3effdbc23f0e0..ffa58f9d52c97 100644
--- a/.github/workflows/replay-verify-on-archive.yaml
+++ b/.github/workflows/replay-verify-on-archive.yaml
@@ -31,7 +31,7 @@ on:
       - ".github/workflows/replay-verify-on-archive.yaml"
       - ".github/workflows/workflow-run-replay-verify-on-archive.yaml"
   schedule:
-    - cron: "0 22 * * 0,2,4" # The main branch cadence. This runs every Sun,Tues,Thurs
+    - cron: "0 8 * * 0,2,4" # The main branch cadence. This runs every Sun,Tues,Thurs at 08:00 UTC

 permissions:
   contents: read
diff --git a/.github/workflows/workflow-run-replay-verify-on-archive.yaml b/.github/workflows/workflow-run-replay-verify-on-archive.yaml
index 8fb3954ebe45a..97102c8fda218 100644
--- a/.github/workflows/workflow-run-replay-verify-on-archive.yaml
+++ b/.github/workflows/workflow-run-replay-verify-on-archive.yaml
@@ -55,6 +55,7 @@ jobs:
           GCP_SERVICE_ACCOUNT_EMAIL: ${{ secrets.GCP_SERVICE_ACCOUNT_EMAIL }}
           EXPORT_GCP_PROJECT_VARIABLES: "false"
           GIT_CREDENTIALS: ${{ secrets.GIT_CREDENTIALS }}
+          GCP_AUTH_DURATION: "10800"

       # Authenticate to Google Cloud the project is aptos-ci with credentails files generated
       - name: Authenticate to Google Cloud
@@ -100,11 +101,12 @@
             CMD="$CMD --end ${{ inputs.END_VERSION }}"
           fi
-          if [ -n "${{ inputs.IMAGE_TAG }}" ]; then
-            CMD="$CMD --end ${{ inputs.IMAGE_TAG }}"
+          if [ -n "${{ inputs.IMAGE_TAG }}" ]; then
+            CMD="$CMD --image_tag ${{ inputs.IMAGE_TAG }}"
           fi

           eval $CMD
+        timeout-minutes: 120

       # This is in case user manually cancel the step above, we still want to cleanup the resources
       - name: Post-run cleanup
         env:
@@ -112,8 +114,29 @@
         if: ${{ always() }}
         run: |
           cd testsuite/replay-verify
-          poetry run python main.py --network ${{ inputs.NETWORK }} --cleanup
+          CMD="poetry run python main.py --network ${{ inputs.NETWORK }} --cleanup"
+          if [ -n "${{ inputs.IMAGE_TAG }}" ]; then
+            CMD="$CMD --image_tag ${{ inputs.IMAGE_TAG }}"
+          fi
+          eval $CMD
           echo "Cleanup completed"
+
+      # List all disks in the project that are not in use and have finished creating. There is a rare chance that a disk is still being created and will never be used because of CSI retry errors.
+      # Such a disk will be deleted by the next workflow run, once its status is READY.
+      - name: Delete all unused disks in the project
+        env:
+          GOOGLE_CLOUD_PROJECT: aptos-devinfra-0
+        if: ${{ always() }}
+        run: |
+
+          DISK_URIS=$(gcloud compute disks list --filter="-users:* AND status=READY" --format "value(uri())")
+          echo "Disks to be deleted:"
+          echo "$DISK_URIS"
+
+          if [ -n "$DISK_URIS" ]; then
+            gcloud compute disks delete $DISK_URIS
+          else
+            echo "No unused disks found."
+          fi
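Note on the new cleanup step: the gcloud filter `-users:* AND status=READY` selects disks that have no attached users and have finished provisioning. If the same sweep were ever needed from Python, a rough equivalent with the `google-cloud-compute` client (already a dependency of this test suite) might look like the sketch below; the project id, dry-run flag, and function name are illustrative assumptions, not part of the workflow.

```python
# Sketch only: approximates
#   gcloud compute disks list --filter="-users:* AND status=READY"
# followed by `gcloud compute disks delete`, using google-cloud-compute.
from google.cloud import compute_v1


def delete_unused_ready_disks(project: str, dry_run: bool = True) -> None:
    disks_client = compute_v1.DisksClient()
    # aggregated_list yields (scope, DisksScopedList) pairs across all zones.
    for scope, scoped_list in disks_client.aggregated_list(project=project):
        for disk in scoped_list.disks:
            # "unused" = not attached to any instance; READY = creation finished.
            if not disk.users and disk.status == "READY":
                zone = scope.rsplit("/", 1)[-1]  # "zones/us-central1-a" -> "us-central1-a"
                print(f"Would delete disk {disk.name} in {zone}")
                if not dry_run:
                    disks_client.delete(project=project, zone=zone, disk=disk.name)


if __name__ == "__main__":
    delete_unused_ready_disks("aptos-devinfra-0", dry_run=True)  # project id assumed
```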
diff --git a/testsuite/replay-verify/archive_disk_utils.py b/testsuite/replay-verify/archive_disk_utils.py
index 14d2194c2309f..b6ddf2379d55a 100644
--- a/testsuite/replay-verify/archive_disk_utils.py
+++ b/testsuite/replay-verify/archive_disk_utils.py
@@ -1,12 +1,17 @@
 import argparse
 from google.cloud import compute_v1
 from kubernetes import client, config
-import time
 import logging
 import concurrent.futures
 import time
 import yaml
 from kubernetes.client.rest import ApiException
+from tenacity import (
+    retry,
+    stop_after_attempt,
+    wait_exponential,
+    retry_if_exception_type,
+)


 # Constants
@@ -21,6 +26,12 @@
 TESTNET_SNAPSHOT_NAME = "testnet-archive"
 MAINNET_SNAPSHOT_NAME = "mainnet-archive"

+PROJECT = "aptos-devinfra-0"
+REGION = "us-central1"
+CLUSTER_NAME = "devinfra-usce1-0"
+NAMESPACE = "replay-verify"
+ZONE = "us-central1-a"
+

 def get_region_from_zone(zone):
     return zone.rsplit("-", 1)[0]
@@ -126,7 +137,7 @@ def create_snapshot_with_gcloud(
             delete_operation = snapshot_client.delete(
                 project=target_project, snapshot=snapshot_name
             )
-            del_res = delete_operation.result()
+            del_res = delete_operation.result(timeout=1800)
             logger.info(f"Snapshot {snapshot_name} {del_res}.")
         except Exception as e:
             logger.info(
@@ -178,10 +189,45 @@ def delete_disk(disk_client, project, zone, disk_name):
         logger.info(f"Disk {e} {disk_name} does not exist, no delete needed.")


+def generate_disk_name(run_id, snapshot_name, pvc_id):
+    return f"{snapshot_name}-{run_id}-{pvc_id}"
+
+
+@retry(
+    stop=stop_after_attempt(3),
+    wait=wait_exponential(multiplier=1, min=4, max=10),
+    retry=retry_if_exception_type((ApiException, Exception)),
+    before_sleep=lambda retry_state: logger.warning(
+        f"Retrying initial disk creation after error: {retry_state.outcome.exception()}"
+    ),
+)
+def create_disk_from_snapshot(
+    disk_client, snapshot_client, project, zone, snapshot_name, disk_name
+):
+    start_time = time.time()
+    delete_disk(disk_client, project, zone, disk_name)
+
+    # Create a new disk from the snapshot
+    logger.info(f"Creating disk {disk_name} from snapshot {snapshot_name}.")
+    snapshot = snapshot_client.get(project=project, snapshot=snapshot_name)
+    disk_body = compute_v1.Disk(
+        name=disk_name,
+        source_snapshot=snapshot.self_link,
+        type=f"projects/{project}/zones/{zone}/diskTypes/pd-ssd",
+    )
+
+    operation = disk_client.insert(project=project, zone=zone, disk_resource=disk_body)
+    wait_for_operation(project, zone, operation.name, compute_v1.ZoneOperationsClient())
+    duration = time.time() - start_time
+    logger.info(
+        f"Disk {disk_name} created from snapshot {snapshot_name} in {duration:.1f}s."
+    )
+
+
 # Creating disk from import snapshots
 # require getting a hold of the kubectrl of the cluster
 # eg: gcloud container clusters get-credentials replay-on-archive --region us-central1 --project replay-verify
-def create_disk_pv_pvc_from_snapshot(
+def create_final_snapshot(
     project,
     zone,
     cluster_name,
@@ -194,21 +240,14 @@
 ):
     disk_client = compute_v1.DisksClient()
     snapshot_client = compute_v1.SnapshotsClient()
-    delete_disk(disk_client, project, zone, disk_name)
-
-    # Create a new disk from the snapshot
-    logger.info(f"Creating disk {disk_name} from snapshot {og_snapshot_name}.")
-    snapshot = snapshot_client.get(project=project, snapshot=og_snapshot_name)
-    disk_body = compute_v1.Disk(
-        name=disk_name,
-        source_snapshot=snapshot.self_link,
-        type=f"projects/{project}/zones/{zone}/diskTypes/pd-ssd",
+    create_disk_from_snapshot(
+        disk_client,
+        snapshot_client,
+        project,
+        zone,
+        og_snapshot_name,
+        disk_name,
     )
-
-    operation = disk_client.insert(project=project, zone=zone, disk_resource=disk_body)
-    wait_for_operation(project, zone, operation.name, compute_v1.ZoneOperationsClient())
-    logger.info(f"Disk {disk_name} created from snapshot {og_snapshot_name}.")
-
     region_name = get_region_from_zone(zone)
     get_kubectl_credentials(project, region_name, cluster_name)
     # create_persistent_volume(disk_name, pv_name, pvc_name, namespace, True)
@@ -243,6 +282,8 @@
     logger.info("deleting repair disks")
     # delete the disk used for repair
     delete_disk(disk_client, project, zone, disk_name)
+    # delete the pv and pvc
+    delete_pv_and_pvc(repair_pv, repair_pvc, namespace)


 def is_job_pod_cleanedup(namespace, job_name):
@@ -250,54 +291,85 @@
     v1 = client.BatchV1Api()
     try:
         job = v1.read_namespaced_job(job_name, namespace)
+        # Check if job has timed out (active for too long)
+        if job.status.start_time:
+            job_duration = time.time() - job.status.start_time.timestamp()
+            if job_duration > 240:
+                logger.error(
+                    f"Job {job_name} has been running for over {job_duration:.0f} seconds"
+                )
+                return True
         return False
-    except Exception as e:
+    except client.exceptions.ApiException as e:
+        if e.status != 404:
+            logger.error(f"Error checking job {job_name}: {e}")
         return True


 def wait_for_operation(project, zone, operation_name, zone_operations_client):
+    start_time = time.time()
+    timeout = 3600  # 1 hour timeout
+
     while True:
+        if time.time() - start_time > timeout:
+            raise TimeoutError(
+                f"Operation {operation_name} timed out after {timeout} seconds"
+            )
+
         result = zone_operations_client.get(
             project=project, zone=zone, operation=operation_name
         )
         logger.info(f"Waiting for operation {operation_name} {result}")

         if result.status == compute_v1.Operation.Status.DONE:
-            if "error" in result:
+            if hasattr(result, "error") and result.error:
                 raise Exception(result.error)
             return result

         time.sleep(20)


+def delete_pv_and_pvc(pv_name, pvc_name, namespace):
+    config.load_kube_config()
+    v1 = client.CoreV1Api()
+    try:
+        v1.delete_namespaced_persistent_volume_claim(name=pvc_name, namespace=namespace)
+        logger.info(f"PVC {pvc_name} deleted.")
+    except client.exceptions.ApiException as e:
+        if e.status != 404:
+            logger.error(f"Error deleting PVC {pvc_name}: {e}")
+    try:
+        v1.delete_persistent_volume(name=pv_name)
+        logger.info(f"PV {pv_name} deleted.")
+    except client.exceptions.ApiException as e:
+        if e.status != 404:
+            logger.error(f"Error deleting PV {pv_name}: {e}")
+
+
 def create_persistent_volume(
-    project, zone, disk_name, pv_name, pvc_name, namespace, read_only
+    project, zone, disk_name, pv_name, pvc_name, namespace, read_only, label=""
 ):
     config.load_kube_config()
     v1 = client.CoreV1Api()
+    access_mode = "ReadWriteOnce" if not read_only else "ReadOnlyMany"
+    storage_size = "10Ti" if TESTNET_SNAPSHOT_NAME in disk_name else "8Ti"

     # Delete existing PVC if it exists
     try:
-        existing_pvc = v1.read_namespaced_persistent_volume_claim(
-            name=pvc_name, namespace=namespace
-        )
-        if existing_pvc:
-            logger.info(f"PVC {pvc_name} already exists. Deleting it.")
-            v1.delete_namespaced_persistent_volume_claim(
-                name=pvc_name, namespace=namespace
-            )
-            logger.info(f"PVC {pvc_name} deleted.")
+        v1.read_namespaced_persistent_volume_claim(name=pvc_name, namespace=namespace)
+        logger.info(f"PVC {pvc_name} already exists. Deleting it.")
+        v1.delete_namespaced_persistent_volume_claim(name=pvc_name, namespace=namespace)
+        logger.info(f"PVC {pvc_name} deleted.")
     except client.exceptions.ApiException as e:
         if e.status != 404:
             raise

     # Delete existing PV if it exists
     try:
-        existing_pv = v1.read_persistent_volume(name=pv_name)
-        if existing_pv:
-            logger.info(f"PV {pv_name} already exists. Deleting it.")
-            v1.delete_persistent_volume(name=pv_name)
-            logger.info(f"PV {pv_name} deleted.")
+        v1.read_persistent_volume(name=pv_name)
+        logger.info(f"PV {pv_name} already exists. Deleting it.")
+        v1.delete_persistent_volume(name=pv_name)
+        logger.info(f"PV {pv_name} deleted.")
     except client.exceptions.ApiException as e:
         if e.status != 404:
             raise
@@ -307,17 +379,23 @@
     pv = client.V1PersistentVolume(
         api_version="v1",
         kind="PersistentVolume",
-        metadata=client.V1ObjectMeta(name=pv_name),
+        metadata=client.V1ObjectMeta(
+            name=pv_name,
+            labels={
+                "run": f"{label}",
+                "topology.kubernetes.io/zone": zone,  # Add zone label to PV
+            },
+        ),
         spec=client.V1PersistentVolumeSpec(
-            capacity={"storage": "10000Gi"},
-            access_modes=["ReadWriteOnce"],
+            capacity={"storage": storage_size},
+            access_modes=[access_mode],
             csi=client.V1CSIPersistentVolumeSource(
                 driver="pd.csi.storage.gke.io",
                 volume_handle=volume_handle,
                 fs_type="xfs",
                 read_only=read_only,
             ),
-            persistent_volume_reclaim_policy="Retain",  # this is to delete the PV and disk separately to speed up pv deletion
+            persistent_volume_reclaim_policy="Delete",
             storage_class_name="ssd-data-xfs",
         ),
     )
@@ -326,12 +404,16 @@
     pvc = client.V1PersistentVolumeClaim(
         api_version="v1",
         kind="PersistentVolumeClaim",
-        metadata=client.V1ObjectMeta(name=pvc_name, namespace=namespace),
+        metadata=client.V1ObjectMeta(
+            name=pvc_name,
+            namespace=namespace,
+        ),
         spec=client.V1PersistentVolumeClaimSpec(
-            access_modes=["ReadWriteOnce"],
-            resources=client.V1ResourceRequirements(requests={"storage": "10000Gi"}),
+            access_modes=[access_mode],
+            resources=client.V1ResourceRequirements(requests={"storage": storage_size}),
             storage_class_name="ssd-data-xfs",
             volume_name=pv_name,
+            # Remove the selector since we're using volume_name for direct binding
         ),
     )

@@ -339,6 +421,50 @@
     v1.create_namespaced_persistent_volume_claim(namespace=namespace, body=pvc)


+def create_repair_disk_and_its_snapshot(
+    project, zone, cluster_name, og_snapshot_name, snapshot_name, prefix, namespace
+):
+    tasks = []
+
+    for copy in range(DISK_COPIES):
+        disk_name = f"{prefix}-repair-{copy}"
+        pv_name = f"{prefix}-{copy}"
+        pvc_name = f"{prefix}-claim-{copy}"
+        tasks.append(
+            (
+                project,
+                zone,
+                cluster_name,
+                og_snapshot_name,
+                snapshot_name,
+                disk_name,
+                pv_name,
+                pvc_name,
+                namespace,
+            )
+        )
+
+    # Execute tasks in parallel
+    with concurrent.futures.ThreadPoolExecutor() as executor:
+        futures = [executor.submit(create_final_snapshot, *task) for task in tasks]
+        for future in concurrent.futures.as_completed(futures, timeout=3600):
+            try:
+                result = future.result()
+                logger.info(f"Task result: {result}")
+            except Exception as e:
+                logger.error(f"Task generated an exception: {e}")
+
+
+def parse_args():
+    parser = argparse.ArgumentParser(
+        formatter_class=argparse.RawDescriptionHelpFormatter,
+        description=__doc__,
+    )
+    parser.add_argument("--network", required=True, choices=["testnet", "mainnet"])
+    args = parser.parse_args()
+    return args
+
+
 def create_one_pvc_from_snapshot(pvc_name, snapshot_name, namespace, label):
     config.load_kube_config()
     api_instance = client.CoreV1Api()
@@ -373,9 +499,12 @@
     return pvc_name


-def create_pvcs_from_snapshot(run_id, snapshot_name, namespace, pvc_num, label):
+def create_replay_verify_pvcs_from_snapshot(
+    run_id, snapshot_name, namespace, pvc_num, label
+):
     config.load_kube_config()
     api_instance = client.CustomObjectsApi()
+
     volume_snapshot_content = {
         "apiVersion": "snapshot.storage.k8s.io/v1",
         "kind": "VolumeSnapshotContent",
@@ -426,9 +555,13 @@
             if e.status != 409:
                 logger.error(f"Error creating new volumesnapshots: {e}")

-    # Execute tasks in parallel
     tasks = [
-        (f"{run_id}-{snapshot_name}-{pvc_id}", snapshot_name, namespace, label)
+        (
+            generate_disk_name(run_id, snapshot_name, pvc_id),
+            snapshot_name,
+            namespace,
+            label,
+        )
         for pvc_id in range(pvc_num)
     ]
     res = []
@@ -446,62 +579,16 @@
     return res


-def create_repair_disk_and_its_snapshot(
-    project, zone, cluster_name, og_snapshot_name, snapshot_name, prefix, namespace
-):
-    tasks = []
-
-    for copy in range(DISK_COPIES):
-        disk_name = f"{prefix}-{copy}"
-        pv_name = f"{prefix}-{copy}"
-        pvc_name = f"{prefix}-claim-{copy}"
-        tasks.append(
-            (
-                project,
-                zone,
-                cluster_name,
-                og_snapshot_name,
-                snapshot_name,
-                disk_name,
-                pv_name,
-                pvc_name,
-                namespace,
-            )
-        )
-
-    # Execute tasks in parallel
-    with concurrent.futures.ThreadPoolExecutor() as executor:
-        futures = [
-            executor.submit(create_disk_pv_pvc_from_snapshot, *task) for task in tasks
-        ]
-        for future in concurrent.futures.as_completed(futures):
-            try:
-                result = future.result()
-                logger.info(f"Task result: {result}")
-            except Exception as e:
-                logger.error(f"Task generated an exception: {e}")
-
-
-def parse_args():
-    parser = argparse.ArgumentParser(
-        formatter_class=argparse.RawDescriptionHelpFormatter,
-        description=__doc__,
-    )
-    parser.add_argument("--network", required=True, choices=["testnet", "mainnet"])
-    args = parser.parse_args()
-    return args
-
-
 if __name__ == "__main__":
     # check input arg network
     args = parse_args()
     network = args.network

     source_project_id = "aptos-platform-compute-0"
-    region = "us-central1"
-    project_id = "aptos-devinfra-0"
-    target_namespace = "default"
-    zone = "us-central1-a"
-    cluster_name = "devinfra-usce1-0"
+    region = REGION
+    project_id = PROJECT
+    target_namespace = NAMESPACE
+    zone = ZONE
+    cluster_name = CLUSTER_NAME

     if network == "testnet":
         source_cluster_id = "general-usce1-0"
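The `@retry` decorator added around the initial disk creation above comes from `tenacity`. A minimal, self-contained sketch of the same pattern (with a made-up `flaky_create` function standing in for the GCE call) shows the behavior: up to three attempts, exponential backoff clamped between 4 and 10 seconds, and a warning logged before each sleep. Because `Exception` is included in `retry_if_exception_type` in the real code, effectively any failure there triggers a retry.

```python
# Minimal tenacity sketch mirroring the decorator used on create_disk_from_snapshot.
# flaky_create() is purely illustrative.
import logging
import random

from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@retry(
    stop=stop_after_attempt(3),  # give up after 3 attempts
    wait=wait_exponential(multiplier=1, min=4, max=10),  # backoff clamped to [4s, 10s]
    retry=retry_if_exception_type(RuntimeError),  # only retry this exception type here
    before_sleep=lambda retry_state: logger.warning(
        f"Retrying after error: {retry_state.outcome.exception()}"
    ),
)
def flaky_create() -> str:
    if random.random() < 0.7:
        raise RuntimeError("transient GCE error")
    return "disk created"


if __name__ == "__main__":
    print(flaky_create())
```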
diff --git a/testsuite/replay-verify/main.py b/testsuite/replay-verify/main.py
index 771c9417d7976..2d661dee11366 100644
--- a/testsuite/replay-verify/main.py
+++ b/testsuite/replay-verify/main.py
@@ -17,7 +17,7 @@ from archive_disk_utils import (
     TESTNET_SNAPSHOT_NAME,
     MAINNET_SNAPSHOT_NAME,
-    create_pvcs_from_snapshot,
+    create_replay_verify_pvcs_from_snapshot,
     get_kubectl_credentials,
 )

@@ -362,7 +362,7 @@ def create_pvc_from_snapshot(self):
             if self.network == Network.TESTNET
             else MAINNET_SNAPSHOT_NAME
         )
-        pvcs = create_pvcs_from_snapshot(
+        pvcs = create_replay_verify_pvcs_from_snapshot(
             self.id,
             snapshot_name,
             self.namespace,
@@ -500,7 +500,9 @@ def parse_args():
     parser.add_argument("--end", required=False, type=int)
     parser.add_argument("--worker_cnt", required=False, type=int)
     parser.add_argument("--range_size", required=False, type=int)
-    parser.add_argument("--namespace", required=False, type=str, default="default")
+    parser.add_argument(
+        "--namespace", required=False, type=str, default="replay-verify"
+    )
     parser.add_argument("--image_tag", required=False, type=str)
     parser.add_argument("--cleanup", required=False, action="store_true", default=False)
     args = parser.parse_args()
@@ -548,6 +550,16 @@ def print_logs(failed_workpod_logs, txn_mismatch_logs):
     config = ReplayConfig(network)
     worker_cnt = args.worker_cnt if args.worker_cnt else config.pvc_number * 7
     range_size = args.range_size if args.range_size else config.range_size
+
+    if args.start is not None:
+        assert (
+            args.start >= start
+        ), f"start version {args.start} is out of range {start} - {end}"
+    if args.end is not None:
+        assert (
+            args.end <= end
+        ), f"end version {args.end} is out of range {start} - {end}"
+
     scheduler = ReplayScheduler(
         run_id,
         start if args.start is None else args.start,
@@ -572,9 +584,9 @@ def print_logs(failed_workpod_logs, txn_mismatch_logs):
         (failed_logs, txn_mismatch_logs) = scheduler.collect_all_failed_logs()
         scheduler.print_stats()
         print_logs(failed_logs, txn_mismatch_logs)
-        if txn_mismatch_logs:
-            logger.error("Transaction mismatch logs found.")
-            exit(1)
+        if txn_mismatch_logs:
+            logger.error("Transaction mismatch logs found.")
+            exit(1)
     finally:
         scheduler.cleanup()
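For reference, the renamed helper keeps the same five-argument shape that `main.py` relies on. A hedged usage sketch follows; the run id, namespace, and PVC count are invented for illustration, and an authenticated kubeconfig is assumed.

```python
# Illustrative call of the renamed helper; values are placeholders.
from archive_disk_utils import (
    TESTNET_SNAPSHOT_NAME,
    create_replay_verify_pvcs_from_snapshot,
)

pvcs = create_replay_verify_pvcs_from_snapshot(
    run_id="demo-run-1",              # hypothetical run id
    snapshot_name=TESTNET_SNAPSHOT_NAME,
    namespace="replay-verify",        # matches the new default in main.py
    pvc_num=2,                        # hypothetical
    label="demo-run-1",
)
print(pvcs)
```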
= "^1.20.1" google-cloud-container = "^2.53.0" google-cloud-storage = "^2.18.2" psutil = "^6.1.0" +tenacity = "^9.0.0" [tool.poetry.group.dev.dependencies] diff --git a/testsuite/replay-verify/xfs-disk-repair.yaml b/testsuite/replay-verify/xfs-disk-repair.yaml index e07427cc56bdc..1b571b7443022 100644 --- a/testsuite/replay-verify/xfs-disk-repair.yaml +++ b/testsuite/replay-verify/xfs-disk-repair.yaml @@ -3,7 +3,7 @@ kind: Job metadata: name: self-deleting-job spec: - ttlSecondsAfterFinished: 60 + ttlSecondsAfterFinished: 60 template: metadata: name: self-deleting-job @@ -11,7 +11,7 @@ spec: containers: - name: self-deleting-container image: gcr.io/google.com/cloudsdktool/google-cloud-cli:latest - command: ["sh", "-c", "ls /mnt/* && sleep 10"] + command: ["sh", "-c", "ls /mnt/* && sync && sleep 10"] volumeMounts: - name: my-volume mountPath: /mnt