Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[parquet-sdk-migration] add a logic to determine the starting version for parquet processor #587

Merged
merged 3 commits into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 150 additions & 13 deletions rust/sdk-processor/src/config/processor_config.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
use crate::processors::{
ans_processor::AnsProcessorConfig, objects_processor::ObjectsProcessorConfig,
stake_processor::StakeProcessorConfig, token_v2_processor::TokenV2ProcessorConfig,
use crate::{
parquet_processors::ParquetTypeEnum,
processors::{
ans_processor::AnsProcessorConfig, objects_processor::ObjectsProcessorConfig,
stake_processor::StakeProcessorConfig, token_v2_processor::TokenV2ProcessorConfig,
},
};
use ahash::AHashMap;
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use strum::IntoEnumIterator;

/// This enum captures the configs for all the different processors that are defined.
///
/// The configs for each processor should only contain configuration specific to that
Expand Down Expand Up @@ -63,23 +68,45 @@ impl ProcessorConfig {
///
/// This is a convenience method to map the table names to include the processor name as a prefix, which
/// is useful for querying the status from the processor status table in the database.
///
/// # Errors
///
/// Returns an error if any configured table name is not the `Display` string of a
/// `ParquetTypeEnum` variant, or if `self` is not `ProcessorConfig::ParquetDefaultProcessor`.
pub fn get_table_names(&self) -> Option<Vec<String>> {
pub fn get_table_names(&self) -> anyhow::Result<Vec<String>> {
match self {
ProcessorConfig::ParquetDefaultProcessor(config) => {
// Get the processor name as a prefix
let prefix = self.name();
// Use the tables from the config and map them to include the prefix
Some(
config
.tables
.iter()
.map(|table_name| format!("{}_{}", prefix, table_name))
.collect(),
)

// Collect valid table names from `ParquetTypeEnum` into a set for quick lookup
let valid_table_names: HashSet<String> =
ParquetTypeEnum::iter().map(|e| e.to_string()).collect();

// Validate and map table names with prefix
let mut validated_table_names = Vec::new();
// NOTE(review): the returned names follow the iteration order of `config.tables`; the
// tests build it as a `HashSet`, so callers should not rely on a particular order.
for table_name in &config.tables {
// Ensure the table name is a valid `ParquetTypeEnum` variant
if !valid_table_names.contains(table_name) {
return Err(anyhow::anyhow!(
"Invalid table name '{}'. Expected one of: {:?}",
table_name,
valid_table_names
));
}

// Append the prefix to the validated table name
validated_table_names.push(Self::format_table_name(prefix, table_name));
}

Ok(validated_table_names)
},
_ => None, // For all other processor types, return None
// Every other processor variant is rejected with an error that includes the
// `Debug` rendering of the config.
_ => Err(anyhow::anyhow!(
"Invalid parquet processor config: {:?}",
self
)),
}
}

/// Builds the fully qualified table name by joining the processor name (`prefix`)
/// and `table_name` with a single `.` separator.
fn format_table_name(prefix: &str, table_name: &str) -> String {
    [prefix, table_name].join(".")
}
}

#[derive(Clone, Debug, Deserialize, Serialize)]
Expand Down Expand Up @@ -152,3 +179,113 @@ impl ParquetDefaultProcessorConfig {
1800 // 30 minutes
}
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `ParquetDefaultProcessor` config holding `tables`, with fixed placeholder
    /// values for every other field, so each test only spells out the input it varies.
    fn parquet_config_with_tables(tables: HashSet<String>) -> ProcessorConfig {
        ProcessorConfig::ParquetDefaultProcessor(ParquetDefaultProcessorConfig {
            tables,
            bucket_name: "bucket_name".to_string(),
            bucket_root: "bucket_root".to_string(),
            google_application_credentials: None,
            parquet_handler_response_channel_size: 10,
            max_buffer_size: 100000,
            parquet_upload_interval: 1800,
        })
    }

    /// Known table names are accepted and returned with the processor-name prefix.
    #[test]
    fn test_valid_table_names() {
        let config = parquet_config_with_tables(HashSet::from([
            "MoveResource".to_string(),
            "Transaction".to_string(),
        ]));

        let result = config.get_table_names();
        assert!(result.is_ok());

        let mut table_names = result.unwrap();
        // `tables` is a `HashSet`, so the order of the returned names is unspecified.
        // Sort before comparing so the assertion does not depend on hash iteration order.
        table_names.sort_unstable();

        assert_eq!(table_names, vec![
            "parquet_default_processor.MoveResource".to_string(),
            "parquet_default_processor.Transaction".to_string(),
        ]);
    }

    /// A single unknown table name makes the whole call fail with a descriptive error.
    #[test]
    fn test_invalid_table_name() {
        let config = parquet_config_with_tables(HashSet::from([
            "InvalidTable".to_string(),
            "Transaction".to_string(),
        ]));

        let result = config.get_table_names();
        assert!(result.is_err());

        let error_message = result.unwrap_err().to_string();
        assert!(error_message.contains("Invalid table name 'InvalidTable'"));
        assert!(error_message.contains("Expected one of:"));
    }

    /// An empty table set is valid and yields an empty list.
    #[test]
    fn test_empty_tables() {
        let config = parquet_config_with_tables(HashSet::new());

        let result = config.get_table_names();
        assert!(result.is_ok());

        let table_names = result.unwrap();
        assert_eq!(table_names, Vec::<String>::new());
    }

    /// Duplicate inputs collapse in the `HashSet`, so only one name is returned.
    #[test]
    fn test_duplicate_table_names() {
        let config = parquet_config_with_tables(HashSet::from([
            "Transaction".to_string(),
            "Transaction".to_string(),
        ]));

        let result = config.get_table_names();
        assert!(result.is_ok());

        let table_names = result.unwrap();
        assert_eq!(table_names, vec![
            "parquet_default_processor.Transaction".to_string(),
        ]);
    }

    /// Every `ParquetTypeEnum` variant is accepted as a table name.
    #[test]
    fn test_all_enum_table_names() {
        let config =
            parquet_config_with_tables(ParquetTypeEnum::iter().map(|e| e.to_string()).collect());

        let result = config.get_table_names();
        assert!(result.is_ok());

        // Compare as sets: the order of the returned names is unspecified.
        let table_names: HashSet<String> = result.unwrap().into_iter().collect();
        let expected_names: HashSet<String> = ParquetTypeEnum::iter()
            .map(|e| format!("parquet_default_processor.{}", e))
            .collect();
        assert_eq!(table_names, expected_names);
    }
}
86 changes: 86 additions & 0 deletions rust/sdk-processor/src/parquet_processors/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,87 @@
use processor::db::common::models::default_models::{
parquet_move_modules::MoveModule, parquet_move_resources::MoveResource,
parquet_move_tables::TableItem, parquet_transactions::Transaction as ParquetTransaction,
parquet_write_set_changes::WriteSetChangeModel,
};
use serde::{Deserialize, Serialize};
use strum::{Display, EnumIter};

pub mod parquet_default_processor;

/// Enum representing the different types of Parquet files that can be processed.
///
/// `ProcessorConfig::get_table_names` validates configured table names against the
/// `Display` strings of these variants (for example `"MoveResource"`, `"Transaction"`).
#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq, Display, EnumIter)]
// In test builds only, also generate a `ParquetTypeName` discriminant enum. Its variant
// list is compared against `ParquetTypeStructName` to check that this enum and
// `ParquetTypeStructs` stay in sync.
#[cfg_attr(
test,
derive(strum::EnumDiscriminants),
strum_discriminants(
derive(
strum::EnumVariantNames,
Deserialize,
Serialize,
strum::IntoStaticStr,
strum::Display,
clap::ValueEnum
),
name(ParquetTypeName),
strum(serialize_all = "snake_case")
)
)]
pub enum ParquetTypeEnum {
MoveResource,
WriteSetChange,
Transaction,
TableItem,
MoveModule,
}

/// Typed container with one variant per `ParquetTypeEnum` variant; each variant wraps a
/// `Vec` of the corresponding parquet model struct.
///
/// `strum::EnumDiscriminants` additionally generates a payload-free companion enum named
/// `ParquetTypeStructName`, configured with snake_case names for strum, serde and clap.
#[derive(Clone, Debug, strum::EnumDiscriminants)]
#[strum(serialize_all = "snake_case")]
#[strum_discriminants(
derive(
Deserialize,
Serialize,
strum::EnumVariantNames,
strum::IntoStaticStr,
strum::Display,
clap::ValueEnum
),
name(ParquetTypeStructName),
clap(rename_all = "snake_case"),
serde(rename_all = "snake_case"),
strum(serialize_all = "snake_case")
)]
pub enum ParquetTypeStructs {
MoveResource(Vec<MoveResource>),
WriteSetChange(Vec<WriteSetChangeModel>),
Transaction(Vec<ParquetTransaction>),
TableItem(Vec<TableItem>),
MoveModule(Vec<MoveModule>),
}

impl ParquetTypeStructs {
    /// Returns the `ParquetTypeStructs` variant that corresponds to `parquet_type`,
    /// holding an empty vector.
    pub fn default_for_type(parquet_type: &ParquetTypeEnum) -> Self {
        match *parquet_type {
            ParquetTypeEnum::MoveResource => Self::MoveResource(vec![]),
            ParquetTypeEnum::WriteSetChange => Self::WriteSetChange(vec![]),
            ParquetTypeEnum::Transaction => Self::Transaction(vec![]),
            ParquetTypeEnum::TableItem => Self::TableItem(vec![]),
            ParquetTypeEnum::MoveModule => Self::MoveModule(vec![]),
        }
    }
}

#[cfg(test)]
mod test {
use super::*;
use strum::VariantNames;

/// This test exists to make sure that when a new parquet type is added, it is added
/// to both `ParquetTypeEnum` and `ParquetTypeStructs`.
///
/// It compares the variant-name lists of the two generated discriminant enums
/// (`ParquetTypeName` and `ParquetTypeStructName`). To make sure this passes, make sure
/// the variants are declared in the same order in both enums and the names match.
#[test]
fn test_parquet_type_names_complete() {
assert_eq!(ParquetTypeStructName::VARIANTS, ParquetTypeName::VARIANTS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,8 @@ impl ProcessorTrait for ParquetDefaultProcessor {
.config
.processor_config
.get_table_names()
.context(format!(
"Failed to get table names for the processor {}",
self.config.processor_config.name()
))?;
.context("Failed to get table names for the processor")?;

get_min_last_success_version_parquet(&self.config, self.db_pool.clone(), table_names)
.await?;
};
Expand Down
Loading