[SDK-parquet] add parquet version tracker #609

Open · wants to merge 3 commits into base: 11-12-_sdk-parquet_parquet_sized_buffer_and_gcs_handler

Changes from 2 commits
3 changes: 2 additions & 1 deletion rust/processor/src/bq_analytics/generic_parquet_processor.rs
@@ -72,7 +72,8 @@ where
pub last_upload_time: Instant,
pub processor_name: String,
}
fn create_new_writer(schema: Arc<Type>) -> Result<SerializedFileWriter<Vec<u8>>> {

pub fn create_new_writer(schema: Arc<Type>) -> Result<SerializedFileWriter<Vec<u8>>> {
let props = WriterProperties::builder()
.set_compression(parquet::basic::Compression::LZ4)
.build();
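
Making `create_new_writer` public lets downstream code pre-create one writer per parquet table type. Below is a minimal sketch of that pattern; the import paths, `ParquetTypeEnum`, and the schema map mirror the sdk-processor changes later in this diff and are assumptions here, not part of this file.

```rust
use std::{collections::HashMap, sync::Arc};

use anyhow::Result;
use parquet::{file::writer::SerializedFileWriter, schema::types::Type};
// Assumed re-export; in sdk-processor this helper is pulled in via
// steps::common::gcs_uploader (see rust/sdk-processor/src/parquet_processors/mod.rs below).
use crate::{parquet_processors::ParquetTypeEnum, steps::common::gcs_uploader::create_new_writer};

/// Builds one in-memory parquet writer per table type from a schema map.
fn build_writers(
    schemas: &HashMap<ParquetTypeEnum, Arc<Type>>,
) -> Result<HashMap<ParquetTypeEnum, SerializedFileWriter<Vec<u8>>>> {
    schemas
        .iter()
        .map(|(parquet_type, schema)| Ok((*parquet_type, create_new_writer(schema.clone())?)))
        .collect()
}
```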
2 changes: 1 addition & 1 deletion rust/processor/src/processors/default_processor.rs
@@ -288,7 +288,7 @@ impl ProcessorTrait for DefaultProcessor
}
}

// TODO: we can further optimize this by passing in a falg to selectively parse only the required data (e.g. table_items for parquet)
// TODO: we can further optimize this by passing in a flag to selectively parse only the required data (e.g. table_items for parquet)
/// Processes a list of transactions and extracts relevant data into different models.
///
/// This function iterates over a list of transactions, extracting block metadata transactions,
96 changes: 60 additions & 36 deletions rust/sdk-processor/src/config/processor_config.rs
@@ -1,14 +1,13 @@
use crate::{
parquet_processors::ParquetTypeEnum,
processors::{
ans_processor::AnsProcessorConfig, objects_processor::ObjectsProcessorConfig,
stake_processor::StakeProcessorConfig, token_v2_processor::TokenV2ProcessorConfig,
},
utils::parquet_processor_table_mapping::VALID_TABLE_NAMES,
};
use ahash::AHashMap;
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use strum::IntoEnumIterator;

/// This enum captures the configs for all the different processors that are defined.
///
@@ -72,29 +71,18 @@ impl ProcessorConfig
match self {
ProcessorConfig::ParquetDefaultProcessor(config) => {
// Get the processor name as a prefix
let prefix = self.name();
let processor_name = self.name();

// Collect valid table names from `ParquetTypeEnum` into a set for quick lookup
let valid_table_names: HashSet<String> =
ParquetTypeEnum::iter().map(|e| e.to_string()).collect();
let valid_table_names = VALID_TABLE_NAMES
.get(processor_name)
.ok_or_else(|| anyhow::anyhow!("Processor type not recognized"))?;

// Validate and map table names with prefix
let mut validated_table_names = Vec::new();
for table_name in &config.tables {
// Ensure the table name is a valid `ParquetTypeEnum` variant
if !valid_table_names.contains(table_name) {
return Err(anyhow::anyhow!(
"Invalid table name '{}'. Expected one of: {:?}",
table_name,
valid_table_names
));
}

// Append the prefix to the validated table name
validated_table_names.push(Self::format_table_name(prefix, table_name));
}

Ok(validated_table_names)
// Use the helper function for validation and mapping
Self::validate_and_prefix_table_names(
&config.backfill_table,
valid_table_names,
processor_name,
)
},
_ => Err(anyhow::anyhow!(
"Invalid parquet processor config: {:?}",
@@ -107,6 +95,26 @@ impl ProcessorConfig {
fn format_table_name(prefix: &str, table_name: &str) -> String {
format!("{}.{}", prefix, table_name)
}

fn validate_and_prefix_table_names(
table_names: &HashSet<String>,
valid_table_names: &HashSet<String>,
prefix: &str,
) -> anyhow::Result<Vec<String>> {
table_names
.iter()
.map(|table_name| {
if !valid_table_names.contains(table_name) {
return Err(anyhow::anyhow!(
"Invalid table name '{}'. Expected one of: {:?}",
table_name,
valid_table_names
));
}
Ok(Self::format_table_name(prefix, table_name))
})
.collect()
}
}
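
The `valid_table_names` lookup above now comes from `VALID_TABLE_NAMES` in `utils::parquet_processor_table_mapping`, keyed by processor name; that module is not shown in this diff. A hypothetical sketch of its shape, using the table names the tests below expect (the `once_cell` wrapper and the exact construction are assumptions):

```rust
use ahash::AHashMap;
use once_cell::sync::Lazy; // assumption: some lazy-init mechanism is used
use std::collections::HashSet;

// Hypothetical definition; the real one lives in utils::parquet_processor_table_mapping.
pub static VALID_TABLE_NAMES: Lazy<AHashMap<String, HashSet<String>>> = Lazy::new(|| {
    let mut map: AHashMap<String, HashSet<String>> = AHashMap::new();
    map.insert(
        "parquet_default_processor".to_string(),
        [
            "move_resources",
            "transactions",
            "write_set_changes",
            "table_items",
            "move_modules",
        ]
        .iter()
        .map(|s| s.to_string())
        .collect(),
    );
    map
});
```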

#[derive(Clone, Debug, Deserialize, Serialize)]
@@ -155,9 +163,9 @@ pub struct ParquetDefaultProcessorConfig
pub max_buffer_size: usize,
#[serde(default = "ParquetDefaultProcessorConfig::default_parquet_upload_interval")]
pub parquet_upload_interval: u64,
// list of table names to backfill. Using HashSet for fast lookups, and for future extensibility.
// Set of table names to backfill. Using HashSet for fast lookups, and for future extensibility.
#[serde(default)]
pub tables: HashSet<String>,
pub backfill_table: HashSet<String>,
}

impl ParquetDefaultProcessorConfig {
@@ -185,7 +193,10 @@ mod tests {
#[test]
fn test_valid_table_names() {
let config = ProcessorConfig::ParquetDefaultProcessor(ParquetDefaultProcessorConfig {
tables: HashSet::from(["MoveResource".to_string(), "Transaction".to_string()]),
backfill_table: HashSet::from([
"move_resources".to_string(),
"transactions".to_string(),
]),
bucket_name: "bucket_name".to_string(),
bucket_root: "bucket_root".to_string(),
google_application_credentials: None,
@@ -200,7 +211,7 @@ mod tests {
let table_names = result.unwrap();
let table_names: HashSet<String> = table_names.into_iter().collect();
let expected_names: HashSet<String> =
["Transaction".to_string(), "MoveResource".to_string()]
["transactions".to_string(), "move_resources".to_string()]
.iter()
.map(|e| format!("parquet_default_processor.{}", e))
.collect();
@@ -210,7 +221,7 @@ mod tests {
#[test]
fn test_invalid_table_name() {
let config = ProcessorConfig::ParquetDefaultProcessor(ParquetDefaultProcessorConfig {
tables: HashSet::from(["InvalidTable".to_string(), "Transaction".to_string()]),
backfill_table: HashSet::from(["InvalidTable".to_string(), "transactions".to_string()]),
bucket_name: "bucket_name".to_string(),
bucket_root: "bucket_root".to_string(),
google_application_credentials: None,
@@ -230,7 +241,7 @@ mod tests {
#[test]
fn test_empty_tables() {
let config = ProcessorConfig::ParquetDefaultProcessor(ParquetDefaultProcessorConfig {
tables: HashSet::new(),
backfill_table: HashSet::new(),
bucket_name: "bucket_name".to_string(),
bucket_root: "bucket_root".to_string(),
google_application_credentials: None,
@@ -248,7 +259,7 @@ mod tests {
#[test]
fn test_duplicate_table_names() {
let config = ProcessorConfig::ParquetDefaultProcessor(ParquetDefaultProcessorConfig {
tables: HashSet::from(["Transaction".to_string(), "Transaction".to_string()]),
backfill_table: HashSet::from(["transactions".to_string(), "transactions".to_string()]),
bucket_name: "bucket_name".to_string(),
bucket_root: "bucket_root".to_string(),
google_application_credentials: None,
@@ -262,14 +273,20 @@ mod tests {

let table_names = result.unwrap();
assert_eq!(table_names, vec![
"parquet_default_processor.Transaction".to_string(),
"parquet_default_processor.transactions".to_string(),
]);
}

#[test]
fn test_all_enum_table_names() {
fn test_all_table_names() {
let config = ProcessorConfig::ParquetDefaultProcessor(ParquetDefaultProcessorConfig {
tables: ParquetTypeEnum::iter().map(|e| e.to_string()).collect(),
backfill_table: HashSet::from([
"move_resources".to_string(),
"transactions".to_string(),
"write_set_changes".to_string(),
"table_items".to_string(),
"move_modules".to_string(),
]),
bucket_name: "bucket_name".to_string(),
bucket_root: "bucket_root".to_string(),
google_application_credentials: None,
@@ -282,9 +299,16 @@ mod tests {
assert!(result.is_ok());

let table_names = result.unwrap();
let expected_names: HashSet<String> = ParquetTypeEnum::iter()
.map(|e| format!("parquet_default_processor.{}", e))
.collect();
let expected_names: HashSet<String> = [
"move_resources".to_string(),
"transactions".to_string(),
"write_set_changes".to_string(),
"table_items".to_string(),
"move_modules".to_string(),
]
.iter()
.map(|e| format!("parquet_default_processor.{}", e))
.collect();
let table_names: HashSet<String> = table_names.into_iter().collect();
assert_eq!(table_names, expected_names);
}
100 changes: 96 additions & 4 deletions rust/sdk-processor/src/parquet_processors/mod.rs
@@ -1,16 +1,33 @@
use crate::{
config::{db_config::DbConfig, processor_config::ParquetDefaultProcessorConfig},
steps::common::{
gcs_uploader::{create_new_writer, GCSUploader},
parquet_buffer_step::ParquetBufferStep,
},
utils::database::{new_db_pool, ArcDbPool},
};
use aptos_indexer_processor_sdk::utils::errors::ProcessorError;
use processor::db::parquet::models::default_models::{
parquet_move_modules::MoveModule, parquet_move_resources::MoveResource,
parquet_move_tables::TableItem, parquet_transactions::Transaction as ParquetTransaction,
parquet_write_set_changes::WriteSetChangeModel,
use google_cloud_storage::client::{Client as GCSClient, ClientConfig as GcsClientConfig};
use parquet::schema::types::Type;
use processor::{
db::parquet::models::default_models::{
parquet_move_modules::MoveModule, parquet_move_resources::MoveResource,
parquet_move_tables::TableItem, parquet_transactions::Transaction as ParquetTransaction,
parquet_write_set_changes::WriteSetChangeModel,
},
worker::TableFlags,
};
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, sync::Arc, time::Duration};
use strum::{Display, EnumIter};

pub mod parquet_default_processor;

const GOOGLE_APPLICATION_CREDENTIALS: &str = "GOOGLE_APPLICATION_CREDENTIALS";

/// Enum representing the different types of Parquet files that can be processed.
#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq, Display, EnumIter)]
#[strum(serialize_all = "snake_case")]
#[cfg_attr(
test,
derive(strum::EnumDiscriminants),
@@ -123,6 +140,81 @@ impl ParquetTypeStructs {
}
}

async fn initialize_gcs_client(credentials: Option<String>) -> Arc<GCSClient> {
if let Some(credentials) = credentials {
std::env::set_var(GOOGLE_APPLICATION_CREDENTIALS, credentials);
}

let gcs_config = GcsClientConfig::default()
.with_auth()
.await
.expect("Failed to create GCS client config");

Arc::new(GCSClient::new(gcs_config))
}

async fn initialize_database_pool(config: &DbConfig) -> anyhow::Result<ArcDbPool> {
match config {
DbConfig::PostgresConfig(ref postgres_config) => {
let conn_pool = new_db_pool(
&postgres_config.connection_string,
Some(postgres_config.db_pool_size),
)
.await
.map_err(|e| {
anyhow::anyhow!(
"Failed to create connection pool for PostgresConfig: {:?}",
e
)
})?;
Ok(conn_pool)
},
}
}

async fn initialize_parquet_buffer_step(
gcs_client: Arc<GCSClient>,
parquet_processor_config: ParquetDefaultProcessorConfig,
parquet_type_to_schemas: HashMap<ParquetTypeEnum, Arc<Type>>,
processor_name: String,
) -> anyhow::Result<ParquetBufferStep<GCSUploader>> {
let parquet_type_to_writer = parquet_type_to_schemas
.iter()
.map(|(key, schema)| {
let writer = create_new_writer(schema.clone()).expect("Failed to create writer");
(*key, writer)
})
.collect();

let buffer_uploader = GCSUploader::new(
gcs_client,
parquet_type_to_schemas,
parquet_type_to_writer,
parquet_processor_config.bucket_name.clone(),
parquet_processor_config.bucket_root.clone(),
processor_name,
)?;

let default_size_buffer_step = ParquetBufferStep::new(
Duration::from_secs(parquet_processor_config.parquet_upload_interval),
buffer_uploader,
parquet_processor_config.max_buffer_size,
);

Ok(default_size_buffer_step)
}
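
For orientation, the helper above would typically be fed a schema map built from the parquet model types. The sketch below is illustrative only: the `ParquetTypeEnum` variant names and the per-model `schema()` helper are assumptions, not APIs confirmed by this diff.

```rust
// Illustrative wiring; the variant names and schema() calls are assumed.
async fn example_wiring(
    gcs_client: Arc<GCSClient>,
    config: ParquetDefaultProcessorConfig,
) -> anyhow::Result<ParquetBufferStep<GCSUploader>> {
    let schemas: HashMap<ParquetTypeEnum, Arc<Type>> = HashMap::from([
        (ParquetTypeEnum::MoveResources, MoveResource::schema()),
        (ParquetTypeEnum::Transactions, ParquetTransaction::schema()),
    ]);
    initialize_parquet_buffer_step(
        gcs_client,
        config,
        schemas,
        "parquet_default_processor".to_string(),
    )
    .await
}
```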

fn set_backfill_table_flag(table_names: Vec<String>) -> TableFlags {
let mut backfill_table = TableFlags::empty();

for table_name in table_names {
if let Some(flag) = TableFlags::from_name(&table_name) {
backfill_table |= flag;
}
}
backfill_table
}
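
A small usage note for the helper above: names that `TableFlags::from_name` does not recognize are skipped rather than rejected, so a misspelled table name in the config simply yields fewer flags. A brief illustrative call:

```rust
fn example_backfill_flags() -> TableFlags {
    // "not_a_table" has no matching flag and is silently ignored.
    set_backfill_table_flag(vec![
        "transactions".to_string(),
        "not_a_table".to_string(),
    ])
}
```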

#[cfg(test)]
mod test {
use super::*;