[SDK-parquet] parquet sized buffer and gcs handler
yuunlimm committed Nov 14, 2024
1 parent 8f43e54 commit f1ca3f9
Showing 8 changed files with 632 additions and 23 deletions.
4 changes: 4 additions & 0 deletions rust/Cargo.lock

Some generated files are not rendered by default.

16 changes: 11 additions & 5 deletions rust/sdk-processor/Cargo.toml
@@ -6,6 +6,7 @@ edition = "2021"

[dependencies]
ahash = { workspace = true }
allocative = { workspace = true }
anyhow = { workspace = true }
aptos-indexer-processor-sdk = { workspace = true }
aptos-indexer-processor-sdk-server-framework = { workspace = true }
@@ -21,11 +22,20 @@ diesel_migrations = { workspace = true }
field_count = { workspace = true }
futures = { workspace = true }
futures-util = { workspace = true }

google-cloud-storage = { workspace = true }
hex = { workspace = true }
jemallocator = { workspace = true }
kanal = { workspace = true }
lazy_static = { workspace = true }

# Postgres SSL support
native-tls = { workspace = true }
num_cpus = { workspace = true }
# Parquet support
parquet = { workspace = true }
parquet_derive = { workspace = true }
postgres-native-tls = { workspace = true }
processor = { workspace = true }
rayon = { workspace = true }
serde = { workspace = true }
@@ -34,14 +44,10 @@ sha2 = { workspace = true }
strum = { workspace = true }
tiny-keccak = { workspace = true }
tokio = { workspace = true }
tokio-postgres = { workspace = true }
tracing = { workspace = true }
url = { workspace = true }

# Postgres SSL support
native-tls = { workspace = true }
postgres-native-tls = { workspace = true }
tokio-postgres = { workspace = true }

[features]
libpq = ["diesel/postgres"]
# When using the default features we enable the diesel/postgres feature. We configure
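
The new `allocative`, `parquet`, and `parquet_derive` dependencies are what the buffer and upload steps below build on. A minimal sketch of how a row type can wire them together, assuming a hypothetical `ExampleRow` that is not part of this commit:

use allocative::Allocative;
use parquet_derive::ParquetRecordWriter;

// Hypothetical row type: `Allocative` lets the size buffer measure in-memory
// bytes via allocative::size_of_unique, while `ParquetRecordWriter` generates
// the Parquet serialization used when a buffer is flushed to GCS.
#[derive(Allocative, ParquetRecordWriter)]
struct ExampleRow {
    txn_version: i64,
    payload: String,
}

fn buffered_bytes(rows: &Vec<ExampleRow>) -> usize {
    allocative::size_of_unique(rows)
}
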
18 changes: 8 additions & 10 deletions rust/sdk-processor/src/config/processor_config.rs
@@ -149,10 +149,8 @@ pub struct ParquetDefaultProcessorConfig {
pub bucket_name: String,
#[serde(default)]
pub bucket_root: String,
#[serde(
default = "ParquetDefaultProcessorConfig::default_parquet_handler_response_channel_size"
)]
pub parquet_handler_response_channel_size: usize,
#[serde(default = "ParquetDefaultProcessorConfig::default_channel_size")]
pub channel_size: usize,
#[serde(default = "ParquetDefaultProcessorConfig::default_max_buffer_size")]
pub max_buffer_size: usize,
#[serde(default = "ParquetDefaultProcessorConfig::default_parquet_upload_interval")]
Expand All @@ -165,7 +163,7 @@ pub struct ParquetDefaultProcessorConfig {
impl ParquetDefaultProcessorConfig {
/// Make the default very large on purpose so that by default it's not chunked
/// This prevents any unexpected changes in behavior
pub const fn default_parquet_handler_response_channel_size() -> usize {
pub const fn default_channel_size() -> usize {
100_000
}

@@ -191,7 +189,7 @@ mod tests {
bucket_name: "bucket_name".to_string(),
bucket_root: "bucket_root".to_string(),
google_application_credentials: None,
parquet_handler_response_channel_size: 10,
channel_size: 10,
max_buffer_size: 100000,
parquet_upload_interval: 1800,
});
@@ -214,7 +212,7 @@ mod tests {
bucket_name: "bucket_name".to_string(),
bucket_root: "bucket_root".to_string(),
google_application_credentials: None,
parquet_handler_response_channel_size: 10,
channel_size: 10,
max_buffer_size: 100000,
parquet_upload_interval: 1800,
});
@@ -234,7 +232,7 @@ mod tests {
bucket_name: "bucket_name".to_string(),
bucket_root: "bucket_root".to_string(),
google_application_credentials: None,
parquet_handler_response_channel_size: 10,
channel_size: 10,
max_buffer_size: 100000,
parquet_upload_interval: 1800,
});
@@ -252,7 +250,7 @@ mod tests {
bucket_name: "bucket_name".to_string(),
bucket_root: "bucket_root".to_string(),
google_application_credentials: None,
parquet_handler_response_channel_size: 10,
channel_size: 10,
max_buffer_size: 100000,
parquet_upload_interval: 1800,
});
@@ -273,7 +271,7 @@ mod tests {
bucket_name: "bucket_name".to_string(),
bucket_root: "bucket_root".to_string(),
google_application_credentials: None,
parquet_handler_response_channel_size: 10,
channel_size: 10,
max_buffer_size: 100000,
parquet_upload_interval: 1800,
});
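
The rename from `parquet_handler_response_channel_size` to `channel_size` keeps the same serde fallback. A standalone sketch of that behavior, assuming an illustrative `MiniParquetConfig` struct and `serde_yaml` for parsing (neither is part of this commit):

use serde::Deserialize;

#[derive(Deserialize)]
struct MiniParquetConfig {
    bucket_name: String,
    // Mirrors ParquetDefaultProcessorConfig: a missing field falls back to the default.
    #[serde(default = "default_channel_size")]
    channel_size: usize,
}

fn default_channel_size() -> usize {
    100_000
}

fn main() {
    // `channel_size` is omitted, so the default of 100_000 is used.
    let yaml = "bucket_name: my-bucket\n";
    let cfg: MiniParquetConfig = serde_yaml::from_str(yaml).unwrap();
    assert_eq!(cfg.channel_size, 100_000);
}
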
45 changes: 44 additions & 1 deletion rust/sdk-processor/src/parquet_processors/mod.rs
@@ -1,3 +1,4 @@
use aptos_indexer_processor_sdk::utils::errors::ProcessorError;
use processor::db::common::models::default_models::{
parquet_move_modules::MoveModule, parquet_move_resources::MoveResource,
parquet_move_tables::TableItem, parquet_transactions::Transaction as ParquetTransaction,
@@ -73,11 +74,53 @@ impl ParquetTypeStructs {
match self {
ParquetTypeStructs::MoveResource(_) => "move_resources",
ParquetTypeStructs::WriteSetChange(_) => "write_set_changes",
ParquetTypeStructs::Transaction(_) => "parquet_transactions",
ParquetTypeStructs::Transaction(_) => "transactions",
ParquetTypeStructs::TableItem(_) => "table_items",
ParquetTypeStructs::MoveModule(_) => "move_modules",
}
}

pub fn calculate_size(&self) -> usize {
match self {
ParquetTypeStructs::MoveResource(data) => allocative::size_of_unique(data),
ParquetTypeStructs::WriteSetChange(data) => allocative::size_of_unique(data),
ParquetTypeStructs::Transaction(data) => allocative::size_of_unique(data),
ParquetTypeStructs::TableItem(data) => allocative::size_of_unique(data),
ParquetTypeStructs::MoveModule(data) => allocative::size_of_unique(data),
}
}

/// Appends data to the current buffer within each ParquetTypeStructs variant.
pub fn append(&mut self, other: ParquetTypeStructs) -> Result<(), ProcessorError> {
match (self, other) {
(ParquetTypeStructs::MoveResource(buf), ParquetTypeStructs::MoveResource(mut data)) => {
buf.append(&mut data);
Ok(())
},
(
ParquetTypeStructs::WriteSetChange(buf),
ParquetTypeStructs::WriteSetChange(mut data),
) => {
buf.append(&mut data);
Ok(())
},
(ParquetTypeStructs::Transaction(buf), ParquetTypeStructs::Transaction(mut data)) => {
buf.append(&mut data);
Ok(())
},
(ParquetTypeStructs::TableItem(buf), ParquetTypeStructs::TableItem(mut data)) => {
buf.append(&mut data);
Ok(())
},
(ParquetTypeStructs::MoveModule(buf), ParquetTypeStructs::MoveModule(mut data)) => {
buf.append(&mut data);
Ok(())
},
_ => Err(ProcessorError::ProcessError {
message: "Mismatched buffer types in append operation".to_string(),
}),
}
}
}

#[cfg(test)]
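
A short usage sketch of the two helpers added above, roughly how the size buffer step is expected to call them; `buffer_and_maybe_flush` is a hypothetical wrapper, not code from this commit:

fn buffer_and_maybe_flush(
    buffer: &mut ParquetTypeStructs,
    incoming: ParquetTypeStructs,
    max_buffer_size: usize,
) -> Result<bool, ProcessorError> {
    // Errors if the variants differ (e.g. appending TableItems into a
    // Transaction buffer); otherwise moves the rows into the existing buffer.
    buffer.append(incoming)?;
    // When this returns true, the caller uploads the buffer to GCS and resets it.
    Ok(buffer.calculate_size() >= max_buffer_size)
}
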
124 changes: 117 additions & 7 deletions rust/sdk-processor/src/parquet_processors/parquet_default_processor.rs
@@ -3,6 +3,12 @@ use crate::{
db_config::DbConfig, indexer_processor_config::IndexerProcessorConfig,
processor_config::ProcessorConfig,
},
parquet_processors::ParquetTypeEnum,
steps::parquet_default_processor::{
gcs_handler::{create_new_writer, GCSUploader},
parquet_default_extractor::ParquetDefaultExtractor,
size_buffer::SizeBufferStep,
},
utils::{
chain_id::check_or_update_chain_id,
database::{new_db_pool, run_migrations, ArcDbPool},
@@ -11,8 +17,26 @@ use crate::{
};
use anyhow::Context;
use aptos_indexer_processor_sdk::{
aptos_indexer_transaction_stream::TransactionStream, traits::processor_trait::ProcessorTrait,
aptos_indexer_transaction_stream::{TransactionStream, TransactionStreamConfig},
builder::ProcessorBuilder,
common_steps::TransactionStreamStep,
traits::{processor_trait::ProcessorTrait, IntoRunnableStep},
};
use google_cloud_storage::client::{Client as GCSClient, ClientConfig as GcsClientConfig};
use parquet::schema::types::Type;
use processor::{
bq_analytics::generic_parquet_processor::HasParquetSchema,
db::common::models::default_models::{
parquet_move_modules::MoveModule, parquet_move_resources::MoveResource,
parquet_move_tables::TableItem, parquet_transactions::Transaction as ParquetTransaction,
parquet_write_set_changes::WriteSetChangeModel,
},
worker::TableFlags,
};
use std::{collections::HashMap, sync::Arc, time::Duration};
use tracing::{debug, info};

const GOOGLE_APPLICATION_CREDENTIALS: &str = "GOOGLE_APPLICATION_CREDENTIALS";

pub struct ParquetDefaultProcessor {
pub config: IndexerProcessorConfig,
@@ -65,9 +89,10 @@ impl ProcessorTrait for ParquetDefaultProcessor {
// Determine the processing mode (backfill or regular)
let is_backfill = self.config.backfill_config.is_some();

// TODO: Revisit when parquet version tracker is available.
// Query the starting version
let _starting_version = if is_backfill {
get_starting_version(&self.config, self.db_pool.clone()).await?;
let starting_version = if is_backfill {
get_starting_version(&self.config, self.db_pool.clone()).await?
} else {
// Regular mode logic: Fetch the minimum last successful version across all relevant tables
let table_names = self
@@ -77,7 +102,7 @@ impl ProcessorTrait for ParquetDefaultProcessor {
.context("Failed to get table names for the processor")?;

get_min_last_success_version_parquet(&self.config, self.db_pool.clone(), table_names)
.await?;
.await?
};

// Check and update the ledger chain id to ensure we're indexing the correct chain
@@ -87,7 +112,7 @@ impl ProcessorTrait for ParquetDefaultProcessor {
.await?;
check_or_update_chain_id(grpc_chain_id as i64, self.db_pool.clone()).await?;

let _parquet_processor_config = match self.config.processor_config.clone() {
let parquet_processor_config = match self.config.processor_config.clone() {
ProcessorConfig::ParquetDefaultProcessor(parquet_processor_config) => {
parquet_processor_config
},
@@ -99,7 +124,92 @@ impl ProcessorTrait for ParquetDefaultProcessor {
},
};

// Define processor steps
Ok(())
// Define processor transaction stream config
let transaction_stream = TransactionStreamStep::new(TransactionStreamConfig {
starting_version: Some(starting_version),
..self.config.transaction_stream_config.clone()
})
.await?;

let parquet_default_extractor = ParquetDefaultExtractor {
opt_in_tables: TableFlags::empty(),
};

let credentials = parquet_processor_config
.google_application_credentials
.clone();

if let Some(credentials) = credentials {
std::env::set_var(GOOGLE_APPLICATION_CREDENTIALS, credentials);
}

let gcs_config = GcsClientConfig::default()
.with_auth()
.await
.expect("Failed to create GCS client config");

let gcs_client = Arc::new(GCSClient::new(gcs_config));

let parquet_type_to_schemas: HashMap<ParquetTypeEnum, Arc<Type>> = [
(ParquetTypeEnum::MoveResource, MoveResource::schema()),
(
ParquetTypeEnum::WriteSetChange,
WriteSetChangeModel::schema(),
),
(ParquetTypeEnum::Transaction, ParquetTransaction::schema()),
(ParquetTypeEnum::TableItem, TableItem::schema()),
(ParquetTypeEnum::MoveModule, MoveModule::schema()),
]
.into_iter()
.collect();

let parquet_type_to_writer = parquet_type_to_schemas
.iter()
.map(|(key, schema)| {
let writer = create_new_writer(schema.clone()).expect("Failed to create writer");
(*key, writer)
})
.collect();

let buffer_uploader = GCSUploader::new(
gcs_client.clone(),
parquet_type_to_schemas,
parquet_type_to_writer,
parquet_processor_config.bucket_name.clone(),
parquet_processor_config.bucket_root.clone(),
self.name().to_string(),
)?;

let channel_size = parquet_processor_config.channel_size;

let default_size_buffer_step = SizeBufferStep::new(
Duration::from_secs(parquet_processor_config.parquet_upload_interval),
buffer_uploader,
parquet_processor_config.max_buffer_size,
);

// Connect processor steps together
let (_, buffer_receiver) = ProcessorBuilder::new_with_inputless_first_step(
transaction_stream.into_runnable_step(),
)
.connect_to(parquet_default_extractor.into_runnable_step(), channel_size)
.connect_to(default_size_buffer_step.into_runnable_step(), channel_size)
.end_and_return_output_receiver(channel_size);

// (Optional) Parse the results
loop {
match buffer_receiver.recv().await {
Ok(txn_context) => {
debug!(
"Finished processing versions [{:?}, {:?}]",
txn_context.metadata.start_version, txn_context.metadata.end_version,
);
},
Err(e) => {
info!("No more transactions in channel: {:?}", e);
break Ok(());
},
}
}
}
}
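
For orientation, a hedged sketch of how a binary might drive this processor end to end. The `load_config` helper and the `ParquetDefaultProcessor::new` constructor are assumptions for illustration; neither appears in this diff:

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Hypothetical loader; in practice the server framework parses the config file.
    let config: IndexerProcessorConfig = load_config("config.yaml")?;
    let processor = ParquetDefaultProcessor::new(config).await?;
    // Streams transactions, extracts parquet rows, buffers by size and upload
    // interval, and writes the results to the configured GCS bucket.
    processor.run_processor().await
}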