Skip to content

Commit

Permalink
Create a file checker.
Browse files Browse the repository at this point in the history
  • Loading branch information
larry-aptos committed Oct 11, 2024
1 parent 0efb4fc commit 4dd8777
Show file tree
Hide file tree
Showing 9 changed files with 320 additions and 3 deletions.
19 changes: 19 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ members = [
"dkg",
"ecosystem/indexer-grpc/indexer-grpc-cache-worker",
"ecosystem/indexer-grpc/indexer-grpc-data-service",
"ecosystem/indexer-grpc/indexer-grpc-file-checker",
"ecosystem/indexer-grpc/indexer-grpc-file-store",
"ecosystem/indexer-grpc/indexer-grpc-file-store-backfiller",
"ecosystem/indexer-grpc/indexer-grpc-fullnode",
Expand Down Expand Up @@ -360,6 +361,7 @@ aptos-indexer = { path = "crates/indexer" }
aptos-indexer-grpc-cache-worker = { path = "ecosystem/indexer-grpc/indexer-grpc-cache-worker" }
aptos-indexer-grpc-data-service = { path = "ecosystem/indexer-grpc/indexer-grpc-data-service" }
aptos-indexer-grpc-file-store = { path = "ecosystem/indexer-grpc/indexer-grpc-file-store" }
aptos-indexer-grpc-file-checker = { path = "ecosystem/indexer-grpc/indexer-grpc-file-checker" }
aptos-indexer-grpc-file-store-backfiller = { path = "ecosystem/indexer-grpc/indexer-grpc-file-store-backfiller" }
aptos-indexer-grpc-fullnode = { path = "ecosystem/indexer-grpc/indexer-grpc-fullnode" }
aptos-indexer-grpc-in-memory-cache-benchmark = { path = "ecosystem/indexer-grpc/indexer-grpc-in-memory-cache-benchmark" }
Expand Down
4 changes: 2 additions & 2 deletions docker/builder/build-indexer.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ cargo build --locked --profile=$PROFILE \
-p aptos-indexer-grpc-file-store \
-p aptos-indexer-grpc-data-service \
-p aptos-nft-metadata-crawler \
-p aptos-indexer-grpc-file-store-backfiller \
-p aptos-indexer-grpc-file-checker \
"$@"

# After building, copy the binaries we need to `dist` since the `target` directory is used as docker cache mount and only available during the RUN step
Expand All @@ -25,7 +25,7 @@ BINS=(
aptos-indexer-grpc-file-store
aptos-indexer-grpc-data-service
aptos-nft-metadata-crawler
aptos-indexer-grpc-file-store-backfiller
aptos-indexer-grpc-file-checker
)

mkdir dist
Expand Down
2 changes: 1 addition & 1 deletion docker/builder/indexer-grpc.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ RUN --mount=type=cache,target=/var/cache/apt,sharing=locked \
COPY --link --from=indexer-builder /aptos/dist/aptos-indexer-grpc-cache-worker /usr/local/bin/aptos-indexer-grpc-cache-worker
COPY --link --from=indexer-builder /aptos/dist/aptos-indexer-grpc-file-store /usr/local/bin/aptos-indexer-grpc-file-store
COPY --link --from=indexer-builder /aptos/dist/aptos-indexer-grpc-data-service /usr/local/bin/aptos-indexer-grpc-data-service
COPY --link --from=indexer-builder /aptos/dist/aptos-indexer-grpc-file-store-backfiller /usr/local/bin/aptos-indexer-grpc-file-store-backfiller
COPY --link --from=indexer-builder /aptos/dist/aptos-indexer-grpc-file-checker /usr/local/bin/aptos-indexer-grpc-file-checker

# The health check port
EXPOSE 8080
Expand Down
30 changes: 30 additions & 0 deletions ecosystem/indexer-grpc/indexer-grpc-file-checker/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
[package]
name = "aptos-indexer-grpc-file-checker"
description = "Indexer gRPC file checker."
version = "1.0.0"

# Workspace inherited keys
authors = { workspace = true }
edition = { workspace = true }
homepage = { workspace = true }
license = { workspace = true }
publish = { workspace = true }
repository = { workspace = true }
rust-version = { workspace = true }

# All dependency versions are inherited from the workspace root Cargo.toml.
[dependencies]
anyhow = { workspace = true }
aptos-indexer-grpc-server-framework = { workspace = true }
aptos-indexer-grpc-utils = { workspace = true }
aptos-metrics-core = { workspace = true }
async-trait = { workspace = true }
clap = { workspace = true }
cloud-storage = { workspace = true }
once_cell = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }

# jemalloc is only used as the global allocator on unix targets (see main.rs).
[target.'cfg(unix)'.dependencies]
jemallocator = { workspace = true }
14 changes: 14 additions & 0 deletions ecosystem/indexer-grpc/indexer-grpc-file-checker/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Indexer GRPC file checker
A program that compares files in two buckets to make sure their contents are the same.

## How to run it.

Example of config:

```
health_check_port: 8081
server_config:
existing_bucket_name: bucket_being_used
new_bucket_name: bucket_with_new_sharding
starting_version: 123123
```
44 changes: 44 additions & 0 deletions ecosystem/indexer-grpc/indexer-grpc-file-checker/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

pub mod processor;

use anyhow::Result;
use aptos_indexer_grpc_server_framework::RunnableConfig;
use processor::Processor;
use serde::{Deserialize, Serialize};

/// Runtime configuration for the file checker, deserialized from the
/// `server_config` section of the server framework's config file.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
pub struct IndexerGrpcFileCheckerConfig {
    // Name of the existing bucket (the one already in use).
    pub existing_bucket_name: String,
    // Name of the new bucket whose data is being verified against the existing one.
    pub new_bucket_name: String,
    // Transaction version to start checking from on the very first run.
    pub starting_version: u64,
}

impl From<IndexerGrpcFileCheckerConfig> for Processor {
fn from(val: IndexerGrpcFileCheckerConfig) -> Self {
Processor {
existing_bucket_name: val.existing_bucket_name,
new_bucket_name: val.new_bucket_name,
starting_version: val.starting_version,
}
}
}

#[async_trait::async_trait]
impl RunnableConfig for IndexerGrpcFileCheckerConfig {
    /// Builds a [`Processor`] from this config and runs its check loop until it exits.
    async fn run(&self) -> Result<()> {
        let processor: Processor = self.clone().into();

        // Propagate the failure instead of panicking (`expect`) so the server
        // framework can surface the error and shut down cleanly. The function
        // already returns `Result`, so swallowing the error with a panic was
        // defeating that contract.
        processor
            .run()
            .await
            .map_err(|err| err.context("File checker exited unexpectedly"))
    }

    /// Short service name used by the server framework (e.g. for logging).
    fn get_server_name(&self) -> String {
        "idxfilechk".to_string()
    }
}
20 changes: 20 additions & 0 deletions ecosystem/indexer-grpc/indexer-grpc-file-checker/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use anyhow::Result;
use aptos_indexer_grpc_file_checker::IndexerGrpcFileCheckerConfig;
use aptos_indexer_grpc_server_framework::ServerArgs;
use clap::Parser;

// Use jemalloc as the global allocator on unix targets, matching the other
// indexer-grpc binaries (dependency declared under [target.'cfg(unix)'] in Cargo.toml).
#[cfg(unix)]
#[global_allocator]
static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;

/// Entry point: parse CLI arguments and run the file checker under the
/// standard indexer-grpc server framework.
#[tokio::main]
async fn main() -> Result<()> {
    ServerArgs::parse()
        .run::<IndexerGrpcFileCheckerConfig>()
        .await
        .expect("Failed to run server");
    Ok(())
}
188 changes: 188 additions & 0 deletions ecosystem/indexer-grpc/indexer-grpc-file-checker/src/processor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use anyhow::{ensure, Context, Result};
use aptos_indexer_grpc_utils::compression_util::{FileEntry, StorageFormat};
use aptos_metrics_core::{register_int_counter, IntCounter};
use cloud_storage::{Client, Error as CloudStorageError};
use once_cell::sync::Lazy;
use serde::{de::DeserializeOwned, Deserialize, Serialize};

/// Counter incremented once per file pair whose contents differ between the
/// two buckets; any nonzero value indicates data divergence.
pub static FILE_DIFF_COUNTER: Lazy<IntCounter> = Lazy::new(|| {
    register_int_counter!(
        "indexer_grpc_file_checker_file_diff",
        "Count of the files that are different.",
    )
    .unwrap()
});

// Object name (within the new bucket) where the checker persists its progress.
const PROGRESS_FILE_NAME: &str = "file_checker_progress.json";
// Object name of the per-bucket metadata file (holds the chain ID).
const METADATA_FILE_NAME: &str = "metadata.json";

/// Checker compares the data in the existing bucket with the data in the new bucket.
/// The progress is saved in a file under the new bucket.
pub struct Processor {
    /// Existing bucket name.
    pub existing_bucket_name: String,
    /// New bucket name; this job is to make sure the data in the new bucket is correct.
    pub new_bucket_name: String,
    /// The version to start from. This is for **bootstrapping** the file checker only.
    /// Ignored once a progress file already exists in the new bucket (see `init`).
    pub starting_version: u64,
}

/// Progress checkpoint persisted as JSON in the new bucket under
/// `PROGRESS_FILE_NAME`, so the checker can resume after a restart.
#[derive(Debug, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct ProgressFile {
    // Next transaction version to verify (advanced by 1000 per verified file in `run`).
    file_checker_version: u64,
    // Chain ID captured at bootstrap; validated against bucket metadata on every init.
    file_checker_chain_id: u64,
}

/// The subset of the bucket's `metadata.json` that the checker reads.
#[derive(Debug, Serialize, Deserialize)]
pub struct MetadataFile {
    // Chain ID of the network whose transaction files live in the bucket.
    chain_id: u64,
}

impl Processor {
    /// Number of transaction versions covered by each transaction file; files
    /// are keyed on 1000-version boundaries.
    const VERSIONS_PER_FILE: u64 = 1000;
    /// Seconds to wait before retrying when a file is missing from either bucket.
    const MISSING_FILE_RETRY_SECS: u64 = 30;
    /// Seconds to sleep before panicking on a diff, so the incremented metric
    /// has a chance to be scraped.
    const DIFF_METRIC_FLUSH_SECS: u64 = 120;

    /// Main loop: verifies one transaction file per iteration and checkpoints
    /// progress to the new bucket. Runs until a diff is found (panics after
    /// flushing the metric) or an unrecoverable storage error occurs (`Err`).
    pub async fn run(&self) -> Result<()> {
        let (client, mut progress_file) = self.init().await?;

        loop {
            let current_version = progress_file.file_checker_version;

            let file_name =
                FileEntry::build_key(current_version, StorageFormat::Lz4CompressedProto);
            let existing_file =
                download_raw_file(&client, &self.existing_bucket_name, &file_name).await?;
            let new_file = download_raw_file(&client, &self.new_bucket_name, &file_name).await?;

            // Both files must be present before we can compare; otherwise wait
            // for the next file to be uploaded and retry the same version.
            let (existing_file, new_file) = match (existing_file, new_file) {
                (Some(existing), Some(new)) => (existing, new),
                (existing, _) => {
                    let bucket_name = if existing.is_none() {
                        &self.existing_bucket_name
                    } else {
                        &self.new_bucket_name
                    };
                    tracing::info!(
                        bucket_name = bucket_name,
                        file_name = file_name.as_str(),
                        "Transaction file is not found in one of the buckets.");
                    tokio::time::sleep(tokio::time::Duration::from_secs(
                        Self::MISSING_FILE_RETRY_SECS,
                    ))
                    .await;
                    continue;
                },
            };

            // Compare the files byte-for-byte.
            if existing_file != new_file {
                tracing::error!("Files are different: {}", file_name);
                FILE_DIFF_COUNTER.inc();

                // Sleep for a while to allow metrics to be updated.
                tokio::time::sleep(tokio::time::Duration::from_secs(
                    Self::DIFF_METRIC_FLUSH_SECS,
                ))
                .await;
                panic!("Files are different: {}", file_name);
            }
            tracing::info!(
                file_name = file_name.as_str(),
                transaction_version = progress_file.file_checker_version,
                "File is verified.");

            progress_file.file_checker_version += Self::VERSIONS_PER_FILE;
            // Upload the progress file so a restart resumes from here.
            let progress_file_bytes =
                serde_json::to_vec(&progress_file).context("Failed to serialize progress file.")?;
            client
                .object()
                .create(
                    &self.new_bucket_name,
                    progress_file_bytes,
                    PROGRESS_FILE_NAME,
                    "application/json",
                )
                .await
                .context("Update progress file failure")?;
            tracing::info!("Progress file is updated.");
        }
    }

    /// Initialize the processor: download both buckets' metadata, verify the
    /// chain IDs match, and load the progress file from the new bucket
    /// (bootstrapping a fresh one from `starting_version` if absent).
    pub async fn init(&self) -> Result<(Client, ProgressFile)> {
        let client = Client::new();

        // All errors are considered fatal: metadata files must exist for the
        // processor to work. Return `Err` (not panic) so callers get context.
        let existing_metadata =
            download_file::<MetadataFile>(&client, &self.existing_bucket_name, METADATA_FILE_NAME)
                .await
                .context("Failed to get metadata.")?
                .context("Metadata file does not exist in the existing bucket.")?;
        let new_metadata =
            download_file::<MetadataFile>(&client, &self.new_bucket_name, METADATA_FILE_NAME)
                .await
                .context("Failed to get metadata.")?
                .context("Metadata file does not exist in the new bucket.")?;

        // Ensure the chain IDs match.
        ensure!(
            existing_metadata.chain_id == new_metadata.chain_id,
            "Chain IDs do not match: {} != {}",
            existing_metadata.chain_id,
            new_metadata.chain_id
        );

        // A missing progress file means this is the first run: bootstrap from
        // the configured starting version.
        let progress_file =
            download_file::<ProgressFile>(&client, &self.new_bucket_name, PROGRESS_FILE_NAME)
                .await
                .context("Failed to get progress file.")?
                .unwrap_or(ProgressFile {
                    file_checker_version: self.starting_version,
                    file_checker_chain_id: existing_metadata.chain_id,
                });
        // Ensure the chain IDs match.
        ensure!(
            existing_metadata.chain_id == progress_file.file_checker_chain_id,
            "Chain IDs do not match: {} != {}",
            existing_metadata.chain_id,
            progress_file.file_checker_chain_id
        );
        tracing::info!(
            starting_version = self.starting_version,
            "Processor initialized.",
        );

        Ok((client, progress_file))
    }
}

async fn download_raw_file(
client: &Client,
bucket_name: &str,
file_name: &str,
) -> Result<Option<Vec<u8>>> {
let file = client.object().download(bucket_name, file_name).await;
match file {
Ok(file) => Ok(Some(file)),
Err(cloud_storage::Error::Other(err)) => {
if err.contains("No such object: ") {
Ok(None)
} else {
anyhow::bail!(
"[Indexer File] Error happens when downloading transaction file. {}",
err
);
}
},
Err(e) => Err(e.into()),
}
}

/// Downloads an object from the bucket and deserializes it from JSON.
///
/// Returns `Ok(None)` when the object does not exist.
async fn download_file<T>(client: &Client, bucket_name: &str, file_name: &str) -> Result<Option<T>>
where
    T: DeserializeOwned,
{
    let raw = download_raw_file(client, bucket_name, file_name).await?;
    // `Option<Result<T>>` -> `Result<Option<T>>`: parse only when present.
    raw.map(|bytes| serde_json::from_slice(&bytes).context("Failed to parse file."))
        .transpose()
}

0 comments on commit 4dd8777

Please sign in to comment.