[Consensus Observer] Debug test block. #15170

Closed · wants to merge 3 commits

2 changes: 2 additions & 0 deletions .config/nextest.toml
@@ -5,6 +5,8 @@ status-level = "skip"
 failure-output = "immediate-final"
 # Cancel test run on the first failure. Accounts for retries.
 fail-fast = true
+# Mark each test as requiring 3 threads. This avoids starvation (e.g., due to heavy tests that spawn additional threads).
+threads-required = 3
 
 junit = { path = "junit.xml" }

126 changes: 82 additions & 44 deletions consensus/src/consensus_observer/observer/payload_store.rs
@@ -297,59 +297,97 @@ mod test {
         PeerId,
     };
     use claims::assert_matches;
+    use std::time::Duration;
+    use tokio::time::timeout;
 
-    #[test]
-    fn test_all_payloads_exist() {
-        // Create the consensus observer config
-        let max_num_pending_blocks = 1000;
-        let consensus_observer_config = ConsensusObserverConfig {
-            max_num_pending_blocks,
-            ..ConsensusObserverConfig::default()
-        };
-
-        // Create a new block payload store
-        let mut block_payload_store = BlockPayloadStore::new(consensus_observer_config);
-
-        // Add some unverified blocks to the payload store
-        let num_blocks_in_store = 100;
-        let unverified_blocks =
-            create_and_add_blocks_to_store(&mut block_payload_store, num_blocks_in_store, 1, false);
-
-        // Verify the payloads don't exist in the block payload store
-        assert!(!block_payload_store.all_payloads_exist(&unverified_blocks));
-        assert_eq!(get_num_verified_payloads(&block_payload_store), 0);
-        assert_eq!(
-            get_num_unverified_payloads(&block_payload_store),
-            num_blocks_in_store
-        );
-
-        // Add some verified blocks to the payload store
-        let num_blocks_in_store = 100;
-        let verified_blocks =
-            create_and_add_blocks_to_store(&mut block_payload_store, num_blocks_in_store, 0, true);
-
-        // Check that all the payloads exist in the block payload store
-        assert!(block_payload_store.all_payloads_exist(&verified_blocks));
-
-        // Check that a subset of the payloads exist in the block payload store
-        let subset_verified_blocks = &verified_blocks[0..50];
-        assert!(block_payload_store.all_payloads_exist(subset_verified_blocks));
-
-        // Remove some of the payloads from the block payload store
-        block_payload_store.remove_committed_blocks(subset_verified_blocks);
-
-        // Check that the payloads no longer exist in the block payload store
-        assert!(!block_payload_store.all_payloads_exist(subset_verified_blocks));
-
-        // Check that the remaining payloads still exist in the block payload store
-        let subset_verified_blocks = &verified_blocks[50..100];
-        assert!(block_payload_store.all_payloads_exist(subset_verified_blocks));
-
-        // Remove the remaining payloads from the block payload store
-        block_payload_store.remove_committed_blocks(subset_verified_blocks);
-
-        // Check that the payloads no longer exist in the block payload store
-        assert!(!block_payload_store.all_payloads_exist(subset_verified_blocks));
+    #[tokio::test]
+    async fn test_all_payloads_exist() {
+        // Create the test tasks
+        let mut test_tasks = vec![];
+        for i in 0..100 {
+            let test_task = async move {
+                println!("Starting test_all_payloads_exist run: {:?}", i);
+
+                // Create the consensus observer config
+                let max_num_pending_blocks = 1000;
+                let consensus_observer_config = ConsensusObserverConfig {
+                    max_num_pending_blocks,
+                    ..ConsensusObserverConfig::default()
+                };
+
+                // Create a new block payload store
+                let mut block_payload_store = BlockPayloadStore::new(consensus_observer_config);
+                println!("Created block payload store!");
+
+                // Add some unverified blocks to the payload store
+                let num_blocks_in_store = 100;
+                let unverified_blocks = create_and_add_blocks_to_store(
+                    &mut block_payload_store,
+                    num_blocks_in_store,
+                    1,
+                    false,
+                );
+                println!("Added unverified blocks to the store!");
+
+                // Verify the payloads don't exist in the block payload store
+                assert!(!block_payload_store.all_payloads_exist(&unverified_blocks));
+                assert_eq!(get_num_verified_payloads(&block_payload_store), 0);
+                assert_eq!(
+                    get_num_unverified_payloads(&block_payload_store),
+                    num_blocks_in_store
+                );
+                println!("Verified unverified blocks don't exist in the store!");
+
+                // Add some verified blocks to the payload store
+                let num_blocks_in_store = 100;
+                let verified_blocks = create_and_add_blocks_to_store(
+                    &mut block_payload_store,
+                    num_blocks_in_store,
+                    0,
+                    true,
+                );
+                println!("Added verified blocks to the store!");
+
+                // Check that all the payloads exist in the block payload store
+                assert!(block_payload_store.all_payloads_exist(&verified_blocks));
+                println!("Verified all verified blocks exist in the store!");
+
+                // Check that a subset of the payloads exist in the block payload store
+                let subset_verified_blocks = &verified_blocks[0..50];
+                assert!(block_payload_store.all_payloads_exist(subset_verified_blocks));
+                println!("Verified a subset of verified blocks exist in the store!");
+
+                // Remove some of the payloads from the block payload store
+                block_payload_store.remove_committed_blocks(subset_verified_blocks);
+                println!("Removed a subset of verified blocks from the store!");
+
+                // Check that the payloads no longer exist in the block payload store
+                assert!(!block_payload_store.all_payloads_exist(subset_verified_blocks));
+                println!("Verified a subset of verified blocks no longer exist in the store!");
+
+                // Check that the remaining payloads still exist in the block payload store
+                let subset_verified_blocks = &verified_blocks[50..100];
+                assert!(block_payload_store.all_payloads_exist(subset_verified_blocks));
+                println!("Verified the remaining verified blocks still exist in the store!");
+
+                // Remove the remaining payloads from the block payload store
+                block_payload_store.remove_committed_blocks(subset_verified_blocks);
+                println!("Removed the remaining verified blocks from the store!");
+
+                // Check that the payloads no longer exist in the block payload store
+                assert!(!block_payload_store.all_payloads_exist(subset_verified_blocks));
+                println!("Verified the remaining verified blocks no longer exist in the store!");
+            };
+            test_tasks.push(test_task);
+        }
+
+        // Run the test tasks with a timeout
+        for test_task in test_tasks {
+            match timeout(Duration::from_secs(10), test_task).await {
+                Ok(()) => println!("Test completed successfully!"),
+                error => panic!("Test timed-out with error: {:?}", error),
+            }
+        }
     }
 
     #[test]
2 changes: 1 addition & 1 deletion devtools/aptos-cargo-cli/src/lib.rs
@@ -457,7 +457,7 @@ fn run_targeted_compiler_v2_tests(
     Ok(())
 }
 
-/// Runs the targeted unit tests
+/// Runs the targeted unit tests!
 fn run_targeted_unit_tests(
     packages_to_test: Vec<String>,
     mut direct_args: Vec<String>,
3 changes: 3 additions & 0 deletions storage/db-tool/src/replay_on_archive.rs
@@ -270,6 +270,9 @@ impl Verifier {
         expected_epoch_events: &Vec<Vec<ContractEvent>>,
         expected_epoch_writesets: &Vec<WriteSet>,
     ) -> Result<Vec<Error>> {
+        if cur_txns.is_empty() {
+            return Ok(Vec::new());
+        }
         let executed_outputs = AptosVM::execute_block_no_limit(
             cur_txns
                 .iter()
12 changes: 12 additions & 0 deletions testsuite/replay-verify/README.md
@@ -0,0 +1,12 @@
This script orchestrates the replay and verification of blockchain data using Kubernetes pods. It defines a WorkerPod class to manage individual pods, handling their status, logs, and environment variables. The ReplayScheduler class schedules tasks for these pods, ensuring they run sequentially while managing retries, logging, and error handling. It supports scheduling from specific blockchain versions, skipping defined ranges, and collecting logs from failed or mismatched transactions. The script uses the Kubernetes API for pod management and includes configurable hyperparameters for sharding, retries, concurrency, and delays. The main function initializes the scheduler and starts scheduling from scratch.

## Prerequisite
Install minikube

## Local test
minikube start --mount --mount-string="/mnt/testnet_archive:/mnt/testnet_archive" --memory=81920 --cpus=17
minikb apply -f ./testnet-archive.yaml

poetry shell
poetry install # install kubernetes
poetry run
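
The scheduler code itself is not part of this diff, so the following is only a rough sketch of the flow described above: sequential shard scheduling with skip ranges and retries. The WorkerPod and ReplayScheduler names come from the description; every method, parameter, and default below is an illustrative assumption.

# scheduler_sketch.py -- illustrative only; not the implementation shipped with this PR
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class WorkerPod:
    """Stand-in for the real pod wrapper: replays one shard of versions [start, end)."""

    def __init__(self, start_version: int, end_version: int):
        self.start_version = start_version
        self.end_version = end_version

    def run(self) -> bool:
        # The real class would create a Kubernetes pod, watch its status, and collect
        # logs from failed or mismatched transactions; here every shard just "succeeds".
        logger.info("Replaying versions [%d, %d)", self.start_version, self.end_version)
        return True


class ReplayScheduler:
    def __init__(self, start_version, end_version, shard_size, skip_ranges=(), max_retries=3):
        self.start_version = start_version
        self.end_version = end_version
        self.shard_size = shard_size
        self.skip_ranges = skip_ranges  # [(lo, hi)] version ranges to skip entirely
        self.max_retries = max_retries

    def _skipped(self, start, end):
        # Skip any shard that overlaps a configured skip range
        return any(start < hi and end > lo for lo, hi in self.skip_ranges)

    def run(self):
        for start in range(self.start_version, self.end_version, self.shard_size):
            end = min(start + self.shard_size, self.end_version)
            if self._skipped(start, end):
                logger.info("Skipping versions [%d, %d)", start, end)
                continue
            for attempt in range(1, self.max_retries + 1):
                if WorkerPod(start, end).run():
                    break
                logger.warning("Shard [%d, %d) failed (attempt %d)", start, end, attempt)


if __name__ == "__main__":
    # Start scheduling from scratch, as main() does in the real script
    ReplayScheduler(start_version=0, end_version=1_000_000, shard_size=250_000).run()
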
153 changes: 153 additions & 0 deletions testsuite/replay-verify/archive_disk_utils.py
@@ -0,0 +1,153 @@
import google.auth
from google.cloud import compute_v1
from kubernetes import client, config
import time
import logging
import concurrent.futures

# Constants
DISK_COPIES = 2

# Logging configuration
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Creating snapshot from archive node disk
def create_snapshot_from_disk(project, zone, disk_name, snapshot_name):
# TODO: Implement this function, this requires permission to write to the archive node
# Example command: gcloud compute snapshots create testnet-archive --source-disk https://www.googleapis.com/compute/v1/projects/aptos-bowu-playground/zones/us-central1-a/disks/testnet-archive --project replay-verify
pass
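
# A possible sketch for the TODO above (assumes the caller has the
# compute.disks.createSnapshot permission on the source/archive project;
# the function name and structure are illustrative, not part of the original change):
def create_snapshot_from_disk_sketch(project, zone, disk_name, snapshot_name):
    disk_client = compute_v1.DisksClient()
    # disks.createSnapshot is a zonal operation, so the polling helper below can be reused
    operation = disk_client.create_snapshot(
        project=project,
        zone=zone,
        disk=disk_name,
        snapshot_resource=compute_v1.Snapshot(name=snapshot_name),
    )
    wait_for_operation(project, zone, operation.name, compute_v1.ZoneOperationsClient())
    logger.info(f"Snapshot {snapshot_name} created from disk {disk_name}.")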

# Create a disk (plus a PV and PVC) from an imported snapshot.
# Requires kubectl credentials for the target cluster,
# e.g.: gcloud container clusters get-credentials replay-on-archive --region us-central1 --project replay-verify
def create_disk_pv_pvc_from_snapshot(project, zone, snapshot_name, disk_name, pv_name, pvc_name, namespace):
disk_client = compute_v1.DisksClient()
snapshot_client = compute_v1.SnapshotsClient()

# Check if the disk already exists
try:
disk = disk_client.get(project=project, zone=zone, disk=disk_name)
logger.info(f"Disk {disk_name} already exists. Deleting it.")

# Delete the existing disk
operation = disk_client.delete(project=project, zone=zone, disk=disk_name)
wait_for_operation(project, zone, operation.name, compute_v1.ZoneOperationsClient())
logger.info(f"Disk {disk_name} deleted.")
except Exception as e:
logger.info(f"Disk {disk_name} does not exist. Creating a new one.")

# Create a new disk from the snapshot
snapshot = snapshot_client.get(project=project, snapshot=snapshot_name)
disk_body = compute_v1.Disk(
name=disk_name,
source_snapshot=snapshot.self_link,
type=f"projects/{project}/zones/{zone}/diskTypes/pd-standard"
)

operation = disk_client.insert(project=project, zone=zone, disk_resource=disk_body)
wait_for_operation(project, zone, operation.name, compute_v1.ZoneOperationsClient())
logger.info(f"Disk {disk_name} created from snapshot {snapshot_name}.")

create_persistent_volume(disk_name, pv_name, pvc_name, namespace)

def wait_for_operation(project, zone, operation_name, zone_operations_client):
while True:
result = zone_operations_client.get(project=project, zone=zone, operation=operation_name)
logger.info(f"Waiting for operation {operation_name} {result}")

if result.status == compute_v1.Operation.Status.DONE:
if 'error' in result:
raise Exception(result.error)
return result

time.sleep(20)

def create_persistent_volume(disk_name, pv_name, pvc_name, namespace):
v1 = client.CoreV1Api()

# Delete existing PVC if it exists
try:
existing_pvc = v1.read_namespaced_persistent_volume_claim(name=pvc_name, namespace=namespace)
if existing_pvc:
logger.info(f"PVC {pvc_name} already exists. Deleting it.")
v1.delete_namespaced_persistent_volume_claim(name=pvc_name, namespace=namespace)
logger.info(f"PVC {pvc_name} deleted.")
except client.exceptions.ApiException as e:
if e.status != 404:
raise

# Delete existing PV if it exists
try:
existing_pv = v1.read_persistent_volume(name=pv_name)
if existing_pv:
logger.info(f"PV {pv_name} already exists. Deleting it.")
v1.delete_persistent_volume(name=pv_name)
logger.info(f"PV {pv_name} deleted.")
except client.exceptions.ApiException as e:
if e.status != 404:
raise

# Create PersistentVolume
pv = client.V1PersistentVolume(
api_version="v1",
kind="PersistentVolume",
metadata=client.V1ObjectMeta(name=pv_name),
spec=client.V1PersistentVolumeSpec(
capacity={"storage": "10000Gi"},
access_modes=["ReadOnlyMany"],
gce_persistent_disk=client.V1GCEPersistentDiskVolumeSource(
pd_name=disk_name,
fs_type="xfs",
read_only=True
),
persistent_volume_reclaim_policy="Retain",
storage_class_name="standard"
)
)

# Create PersistentVolumeClaim
pvc = client.V1PersistentVolumeClaim(
api_version="v1",
kind="PersistentVolumeClaim",
metadata=client.V1ObjectMeta(name=pvc_name, namespace=namespace),
spec=client.V1PersistentVolumeClaimSpec(
access_modes=["ReadOnlyMany"],
resources=client.V1ResourceRequirements(
requests={"storage": "10000Gi"}
),
storage_class_name="standard",
volume_name=pv_name
)
)

v1.create_persistent_volume(body=pv)
v1.create_namespaced_persistent_volume_claim(namespace=namespace, body=pvc)

def main():
    project = "replay-verify"
    zone = "us-central1-a"
    snapshot_name = "testnet-archive"
    prefix = "testnet-archive"
    namespace = "default"

    # Load the local kubeconfig so the Kubernetes client can reach the cluster
    config.load_kube_config()

    tasks = []

for copy in range(DISK_COPIES):
disk_name = f"{prefix}-{copy}"
pv_name = f"{prefix}-{copy}"
pvc_name = f"{prefix}-claim-{copy}"
tasks.append((project, zone, snapshot_name, disk_name, pv_name, pvc_name, namespace))

# Execute tasks in parallel
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = [executor.submit(create_disk_pv_pvc_from_snapshot, *task) for task in tasks]
for future in concurrent.futures.as_completed(futures):
try:
result = future.result()
logger.info(f"Task result: {result}")
except Exception as e:
logger.error(f"Task generated an exception: {e}")

if __name__ == "__main__":
main()
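
For context, the claims created above (for example testnet-archive-claim-0) are intended to be mounted read-only by the replay worker pods. The snippet below is a minimal, hypothetical sketch of such a pod spec; the pod name, container image, and mount path are assumptions and are not part of this change.

# mount_sketch.py -- illustrative only
from kubernetes import client, config


def build_worker_pod(pvc_name: str) -> client.V1Pod:
    volume = client.V1Volume(
        name="archive",
        persistent_volume_claim=client.V1PersistentVolumeClaimVolumeSource(
            claim_name=pvc_name,
            read_only=True,  # the PV above is provisioned as ReadOnlyMany
        ),
    )
    container = client.V1Container(
        name="replay",
        image="aptos-debugger:latest",  # placeholder image
        volume_mounts=[
            client.V1VolumeMount(name="archive", mount_path="/mnt/archive", read_only=True)
        ],
    )
    return client.V1Pod(
        metadata=client.V1ObjectMeta(name="replay-worker-0"),
        spec=client.V1PodSpec(restart_policy="Never", containers=[container], volumes=[volume]),
    )


if __name__ == "__main__":
    config.load_kube_config()
    pod = build_worker_pod("testnet-archive-claim-0")
    client.CoreV1Api().create_namespaced_pod(namespace="default", body=pod)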