[SDK-parquet] add parquet version tracker
yuunlimm committed Nov 15, 2024
1 parent d0a5380 commit 5376225
Showing 11 changed files with 604 additions and 111 deletions.
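Before the per-file diffs, a note on the shape of the change: the processor now resumes from the smallest last-success version across the parquet tables it writes (one table per job when backfilling); see `get_min_last_success_version_parquet` below. A minimal sketch of that idea, purely illustrative and not code from this commit (the helper name and signature are assumptions):

```rust
use std::collections::HashMap;

/// Illustrative only: choose the version to restart from, given the last
/// version each parquet table successfully uploaded.
fn min_last_success_version(
    last_success_by_table: &HashMap<String, u64>,
    table_names: &[String],
    default_starting_version: u64,
) -> u64 {
    table_names
        .iter()
        // A table with no recorded progress counts as the configured default,
        // so it cannot be skipped over on restart.
        .map(|t| {
            last_success_by_table
                .get(t)
                .copied()
                .unwrap_or(default_starting_version)
        })
        .min()
        .unwrap_or(default_starting_version)
}
```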
10 changes: 6 additions & 4 deletions rust/processor/src/bq_analytics/generic_parquet_processor.rs
@@ -54,10 +54,10 @@ where
}

pub struct ParquetHandler<ParquetType>
where
where
ParquetType: NamedTable + NamedTable + HasVersion + HasParquetSchema + 'static + Allocative,
for<'a> &'a [ParquetType]: RecordWriter<ParquetType>,
{
{
pub schema: Arc<Type>,
pub writer: SerializedFileWriter<Vec<u8>>,
pub buffer: Vec<ParquetType>,
@@ -72,7 +72,8 @@ where
pub last_upload_time: Instant,
pub processor_name: String,
}
fn create_new_writer(schema: Arc<Type>) -> Result<SerializedFileWriter<Vec<u8>>> {

pub fn create_new_writer(schema: Arc<Type>) -> Result<SerializedFileWriter<Vec<u8>>> {
let props = WriterProperties::builder()
.set_compression(parquet::basic::Compression::LZ4)
.build();
@@ -83,7 +84,8 @@ fn create_new_writer(schema: Arc<Type>) -> Result<SerializedFileWriter<Vec<u8>>>

impl<ParquetType> ParquetHandler<ParquetType>
where
ParquetType: Allocative + GetTimeStamp + HasVersion + HasParquetSchema + 'static + NamedTable,
ParquetType: Allocative +
GetTimeStamp + HasVersion + HasParquetSchema + 'static + NamedTable,
for<'a> &'a [ParquetType]: RecordWriter<ParquetType>,
{
fn create_new_writer(&self) -> Result<SerializedFileWriter<Vec<u8>>> {
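With `create_new_writer` now public, downstream code can build one in-memory, LZ4-compressed writer per table schema, which is how the new sdk-processor helpers further down use it. A hedged sketch of that pattern against the `parquet` crate API (the `writers_per_table` helper and its string keys are illustrative, not part of this commit):

```rust
use std::{collections::HashMap, sync::Arc};

use parquet::{
    basic::Compression,
    file::{properties::WriterProperties, writer::SerializedFileWriter},
    schema::types::Type,
};

// Build one in-memory parquet writer per table schema, mirroring what the
// newly public `create_new_writer` enables for callers outside this module.
fn writers_per_table(
    schemas: &HashMap<String, Arc<Type>>,
) -> parquet::errors::Result<HashMap<String, SerializedFileWriter<Vec<u8>>>> {
    schemas
        .iter()
        .map(|(name, schema)| {
            let props = Arc::new(
                WriterProperties::builder()
                    .set_compression(Compression::LZ4)
                    .build(),
            );
            SerializedFileWriter::new(Vec::new(), schema.clone(), props)
                .map(|writer| (name.clone(), writer))
        })
        .collect()
}
```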
2 changes: 1 addition & 1 deletion rust/processor/src/processors/default_processor.rs
@@ -288,7 +288,7 @@ impl ProcessorTrait for DefaultProcessor {
}
}

// TODO: we can further optimize this by passing in a falg to selectively parse only the required data (e.g. table_items for parquet)
// TODO: we can further optimize this by passing in a flag to selectively parse only the required data (e.g. table_items for parquet)
/// Processes a list of transactions and extracts relevant data into different models.
///
/// This function iterates over a list of transactions, extracting block metadata transactions,
1 change: 0 additions & 1 deletion rust/sdk-processor/src/config/processor_config.rs
@@ -73,7 +73,6 @@ impl ProcessorConfig {
ProcessorConfig::ParquetDefaultProcessor(config) => {
// Get the processor name as a prefix
let prefix = self.name();

// Collect valid table names from `ParquetTypeEnum` into a set for quick lookup
let valid_table_names: HashSet<String> =
ParquetTypeEnum::iter().map(|e| e.to_string()).collect();
78 changes: 78 additions & 0 deletions rust/sdk-processor/src/parquet_processors/mod.rs
@@ -1,16 +1,30 @@
use crate::{
config::{db_config::DbConfig, processor_config::ParquetDefaultProcessorConfig},
steps::{
common::gcs_uploader::{create_new_writer, GCSUploader},
parquet_default_processor::size_buffer::ParquetBufferStep,
},
utils::database::{new_db_pool, ArcDbPool},
};
use aptos_indexer_processor_sdk::utils::errors::ProcessorError;
use google_cloud_storage::client::{Client as GCSClient, ClientConfig as GcsClientConfig};
use parquet::schema::types::Type;
use processor::db::parquet::models::default_models::{
parquet_move_modules::MoveModule, parquet_move_resources::MoveResource,
parquet_move_tables::TableItem, parquet_transactions::Transaction as ParquetTransaction,
parquet_write_set_changes::WriteSetChangeModel,
};
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, sync::Arc, time::Duration};
use strum::{Display, EnumIter};

pub mod parquet_default_processor;

const GOOGLE_APPLICATION_CREDENTIALS: &str = "GOOGLE_APPLICATION_CREDENTIALS";

/// Enum representing the different types of Parquet files that can be processed.
#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq, Display, EnumIter)]
#[strum(serialize_all = "snake_case")]
#[cfg_attr(
test,
derive(strum::EnumDiscriminants),
@@ -123,6 +137,70 @@ impl ParquetTypeStructs {
}
}

async fn initialize_gcs_client(credentials: Option<String>) -> Arc<GCSClient> {
if let Some(credentials) = credentials {
std::env::set_var(GOOGLE_APPLICATION_CREDENTIALS, credentials);
}

let gcs_config = GcsClientConfig::default()
.with_auth()
.await
.expect("Failed to create GCS client config");

Arc::new(GCSClient::new(gcs_config))
}

async fn initialize_database_pool(config: &DbConfig) -> anyhow::Result<ArcDbPool> {
match config {
DbConfig::PostgresConfig(ref postgres_config) => {
let conn_pool = new_db_pool(
&postgres_config.connection_string,
Some(postgres_config.db_pool_size),
)
.await
.map_err(|e| {
anyhow::anyhow!(
"Failed to create connection pool for PostgresConfig: {:?}",
e
)
})?;
Ok(conn_pool)
},
}
}

async fn initialize_parquet_buffer_step(
gcs_client: Arc<GCSClient>,
parquet_processor_config: ParquetDefaultProcessorConfig,
parquet_type_to_schemas: HashMap<ParquetTypeEnum, Arc<Type>>,
processor_name: String,
) -> anyhow::Result<ParquetBufferStep<GCSUploader>> {
let parquet_type_to_writer = parquet_type_to_schemas
.iter()
.map(|(key, schema)| {
let writer = create_new_writer(schema.clone()).expect("Failed to create writer");
(*key, writer)
})
.collect();

let buffer_uploader = GCSUploader::new(
gcs_client,
parquet_type_to_schemas,
parquet_type_to_writer,
parquet_processor_config.bucket_name.clone(),
parquet_processor_config.bucket_root.clone(),
processor_name,
)?;

let default_size_buffer_step = ParquetBufferStep::new(
Duration::from_secs(parquet_processor_config.parquet_upload_interval),
buffer_uploader,
parquet_processor_config.max_buffer_size,
);

Ok(default_size_buffer_step)
}

#[cfg(test)]
mod test {
use super::*;
137 changes: 53 additions & 84 deletions rust/sdk-processor/src/parquet_processors/parquet_default_processor.rs
@@ -3,27 +3,30 @@ use crate::{
db_config::DbConfig, indexer_processor_config::IndexerProcessorConfig,
processor_config::ProcessorConfig,
},
parquet_processors::ParquetTypeEnum,
parquet_processors::{
initialize_database_pool, initialize_gcs_client, initialize_parquet_buffer_step,
ParquetTypeEnum,
},
steps::{
common::gcs_uploader::{create_new_writer, GCSUploader},
common::processor_status_saver::get_parquet_processor_status_saver,
parquet_default_processor::{
parquet_default_extractor::ParquetDefaultExtractor, size_buffer::ParquetBufferStep,
parquet_default_extractor::ParquetDefaultExtractor,
parquet_version_tracker_step::ParquetVersionTrackerStep,
},
},
utils::{
chain_id::check_or_update_chain_id,
database::{new_db_pool, run_migrations, ArcDbPool},
starting_version::{get_min_last_success_version_parquet, get_starting_version},
database::{run_migrations, ArcDbPool},
starting_version::get_min_last_success_version_parquet,
},
};
use anyhow::Context;
use aptos_indexer_processor_sdk::{
aptos_indexer_transaction_stream::{TransactionStream, TransactionStreamConfig},
builder::ProcessorBuilder,
common_steps::TransactionStreamStep,
common_steps::{TransactionStreamStep, DEFAULT_UPDATE_PROCESSOR_STATUS_SECS},
traits::{processor_trait::ProcessorTrait, IntoRunnableStep},
};
use google_cloud_storage::client::{Client as GCSClient, ClientConfig as GcsClientConfig};
use parquet::schema::types::Type;
use processor::{
bq_analytics::generic_parquet_processor::HasParquetSchema,
@@ -34,38 +37,18 @@ use processor::{
},
worker::TableFlags,
};
use std::{collections::HashMap, sync::Arc, time::Duration};
use std::{collections::HashMap, sync::Arc};
use tracing::{debug, info};

const GOOGLE_APPLICATION_CREDENTIALS: &str = "GOOGLE_APPLICATION_CREDENTIALS";

pub struct ParquetDefaultProcessor {
pub config: IndexerProcessorConfig,
pub db_pool: ArcDbPool, // for processor status
pub db_pool: ArcDbPool,
}

impl ParquetDefaultProcessor {
pub async fn new(config: IndexerProcessorConfig) -> anyhow::Result<Self> {
match config.db_config {
DbConfig::PostgresConfig(ref postgres_config) => {
let conn_pool = new_db_pool(
&postgres_config.connection_string,
Some(postgres_config.db_pool_size),
)
.await
.map_err(|e| {
anyhow::anyhow!(
"Failed to create connection pool for PostgresConfig: {:?}",
e
)
})?;

Ok(Self {
config,
db_pool: conn_pool,
})
},
}
let db_pool = initialize_database_pool(&config.db_config).await?;
Ok(Self { config, db_pool })
}
}

@@ -87,25 +70,6 @@ impl ProcessorTrait for ParquetDefaultProcessor {
},
}

// Determine the processing mode (backfill or regular)
let is_backfill = self.config.backfill_config.is_some();

// TODO: Revisit when parquet version tracker is available.
// Query the starting version
let starting_version = if is_backfill {
get_starting_version(&self.config, self.db_pool.clone()).await?
} else {
// Regular mode logic: Fetch the minimum last successful version across all relevant tables
let table_names = self
.config
.processor_config
.get_table_names()
.context("Failed to get table names for the processor")?;

get_min_last_success_version_parquet(&self.config, self.db_pool.clone(), table_names)
.await?
};

// Check and update the ledger chain id to ensure we're indexing the correct chain
let grpc_chain_id = TransactionStream::new(self.config.transaction_stream_config.clone())
.await?
@@ -125,6 +89,22 @@ impl ProcessorTrait for ParquetDefaultProcessor {
},
};

// TODO: Revisit when parquet version tracker is available.
// Query the starting version
let table_names = if let Some(backfill_config) = &self.config.backfill_config {
// for backfill we will only backfill one table per job
vec![backfill_config.backfill_alias.clone()]
} else {
self.config
.processor_config
.get_table_names()
.context("Failed to get table names for the processor")?
};

let starting_version =
get_min_last_success_version_parquet(&self.config, self.db_pool.clone(), table_names)
.await?;

// Define processor transaction stream config
let transaction_stream = TransactionStreamStep::new(TransactionStreamConfig {
starting_version: Some(starting_version),
@@ -136,20 +116,12 @@ impl ProcessorTrait for ParquetDefaultProcessor {
opt_in_tables: TableFlags::empty(),
};

let credentials = parquet_processor_config
.google_application_credentials
.clone();

if let Some(credentials) = credentials {
std::env::set_var(GOOGLE_APPLICATION_CREDENTIALS, credentials);
}

let gcs_config = GcsClientConfig::default()
.with_auth()
.await
.expect("Failed to create GCS client config");

let gcs_client = Arc::new(GCSClient::new(gcs_config));
let gcs_client = initialize_gcs_client(
parquet_processor_config
.google_application_credentials
.clone(),
)
.await;

let parquet_type_to_schemas: HashMap<ParquetTypeEnum, Arc<Type>> = [
(ParquetTypeEnum::MoveResource, MoveResource::schema()),
@@ -164,37 +136,34 @@ impl ProcessorTrait for ParquetDefaultProcessor {
.into_iter()
.collect();

let parquet_type_to_writer = parquet_type_to_schemas
.iter()
.map(|(key, schema)| {
let writer = create_new_writer(schema.clone()).expect("Failed to create writer");
(*key, writer)
})
.collect();

let buffer_uploader = GCSUploader::new(
let default_size_buffer_step = initialize_parquet_buffer_step(
gcs_client.clone(),
parquet_processor_config.clone(),
parquet_type_to_schemas,
parquet_type_to_writer,
parquet_processor_config.bucket_name.clone(),
parquet_processor_config.bucket_root.clone(),
self.name().to_string(),
)?;

let channel_size = parquet_processor_config.channel_size;

let default_size_buffer_step = ParquetBufferStep::new(
Duration::from_secs(parquet_processor_config.parquet_upload_interval),
buffer_uploader,
parquet_processor_config.max_buffer_size,
)
.await
.unwrap_or_else(|e| {
panic!("Failed to initialize parquet buffer step: {:?}", e);
});

let parquet_version_tracker_step = ParquetVersionTrackerStep::new(
get_parquet_processor_status_saver(self.db_pool.clone(), self.config.clone()),
DEFAULT_UPDATE_PROCESSOR_STATUS_SECS,
);

let channel_size = parquet_processor_config.channel_size;

// Connect processor steps together
let (_, buffer_receiver) = ProcessorBuilder::new_with_inputless_first_step(
transaction_stream.into_runnable_step(),
)
.connect_to(parquet_default_extractor.into_runnable_step(), channel_size)
.connect_to(default_size_buffer_step.into_runnable_step(), channel_size)
.connect_to(
parquet_version_tracker_step.into_runnable_step(),
channel_size,
)
.end_and_return_output_receiver(channel_size);

loop {
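For orientation, here is a rough sketch of the role the new version-tracker step plays between the parquet uploads and the processor status table. The names and structure below are hypothetical, not the SDK's actual `ParquetVersionTrackerStep`; they only illustrate the throttled, per-table bookkeeping the step is responsible for.

```rust
use std::{
    collections::HashMap,
    time::{Duration, Instant},
};

/// Hypothetical tracker: remembers the highest version each parquet table has
/// successfully uploaded and periodically hands the numbers to a status saver.
struct VersionTracker<S: Fn(&str, u64)> {
    last_success_by_table: HashMap<String, u64>,
    save: S,                   // e.g. writes a processor-status row per table
    update_interval: Duration, // mirrors DEFAULT_UPDATE_PROCESSOR_STATUS_SECS
    last_saved_at: Instant,
}

impl<S: Fn(&str, u64)> VersionTracker<S> {
    fn record_batch(&mut self, table: &str, end_version: u64) {
        let entry = self
            .last_success_by_table
            .entry(table.to_string())
            .or_insert(0);
        *entry = (*entry).max(end_version);

        // Throttle persistence so we don't hit the database on every batch.
        if self.last_saved_at.elapsed() >= self.update_interval {
            for (t, v) in &self.last_success_by_table {
                (self.save)(t, *v);
            }
            self.last_saved_at = Instant::now();
        }
    }
}
```

Throttling the status writes (here via `update_interval`) is the same design choice the pipeline above makes by passing `DEFAULT_UPDATE_PROCESSOR_STATUS_SECS` to `ParquetVersionTrackerStep::new`.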