Skip to content

Commit

Permalink
fix conflict
Browse files Browse the repository at this point in the history
  • Loading branch information
yuunlimm committed Nov 15, 2024
1 parent 823215e commit 44418f2
Show file tree
Hide file tree
Showing 9 changed files with 46 additions and 204 deletions.
7 changes: 3 additions & 4 deletions rust/processor/src/bq_analytics/generic_parquet_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,10 @@ where
}

pub struct ParquetHandler<ParquetType>
where
where
ParquetType: NamedTable + NamedTable + HasVersion + HasParquetSchema + 'static + Allocative,
for<'a> &'a [ParquetType]: RecordWriter<ParquetType>,
{
{
pub schema: Arc<Type>,
pub writer: SerializedFileWriter<Vec<u8>>,
pub buffer: Vec<ParquetType>,
Expand All @@ -84,8 +84,7 @@ pub fn create_new_writer(schema: Arc<Type>) -> Result<SerializedFileWriter<Vec<u

impl<ParquetType> ParquetHandler<ParquetType>
where
ParquetType: Allocative +
GetTimeStamp + HasVersion + HasParquetSchema + 'static + NamedTable,
ParquetType: Allocative + GetTimeStamp + HasVersion + HasParquetSchema + 'static + NamedTable,
for<'a> &'a [ParquetType]: RecordWriter<ParquetType>,
{
fn create_new_writer(&self) -> Result<SerializedFileWriter<Vec<u8>>> {
Expand Down
54 changes: 35 additions & 19 deletions rust/sdk-processor/src/config/processor_config.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
use crate::{
parquet_processors::ParquetTypeEnum,
processors::{
ans_processor::AnsProcessorConfig, objects_processor::ObjectsProcessorConfig,
stake_processor::StakeProcessorConfig, token_v2_processor::TokenV2ProcessorConfig,
},
use crate::processors::{
ans_processor::AnsProcessorConfig, objects_processor::ObjectsProcessorConfig,
stake_processor::StakeProcessorConfig, token_v2_processor::TokenV2ProcessorConfig,
};
use ahash::AHashMap;
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use strum::IntoEnumIterator;

/// This enum captures the configs for all the different processors that are defined.
///
Expand Down Expand Up @@ -74,13 +70,20 @@ impl ProcessorConfig {
// Get the processor name as a prefix
let prefix = self.name();
// Collect the valid table names into a set for quick lookup
let valid_table_names: HashSet<String> =
ParquetTypeEnum::iter().map(|e| e.to_string()).collect();
let valid_table_names = HashSet::from([
"move_resources".to_string(),
"transactions".to_string(),
"write_set_changes".to_string(),
"table_items".to_string(),
"move_modules".to_string(),
]);

// Validate and map table names with prefix
let mut validated_table_names = Vec::new();
for table_name in &config.tables {
// Ensure the table name is one of the entries in `valid_table_names`
println!("table_name: {}", table_name);
println!("valid_table_names: {:?}", valid_table_names);
if !valid_table_names.contains(table_name) {
return Err(anyhow::anyhow!(
"Invalid table name '{}'. Expected one of: {:?}",
Expand Down Expand Up @@ -184,7 +187,7 @@ mod tests {
#[test]
fn test_valid_table_names() {
let config = ProcessorConfig::ParquetDefaultProcessor(ParquetDefaultProcessorConfig {
tables: HashSet::from(["MoveResource".to_string(), "Transaction".to_string()]),
tables: HashSet::from(["move_resources".to_string(), "transactions".to_string()]),
bucket_name: "bucket_name".to_string(),
bucket_root: "bucket_root".to_string(),
google_application_credentials: None,
Expand All @@ -199,7 +202,7 @@ mod tests {
let table_names = result.unwrap();
let table_names: HashSet<String> = table_names.into_iter().collect();
let expected_names: HashSet<String> =
["Transaction".to_string(), "MoveResource".to_string()]
["transactions".to_string(), "move_resources".to_string()]
.iter()
.map(|e| format!("parquet_default_processor.{}", e))
.collect();
Expand All @@ -209,7 +212,7 @@ mod tests {
#[test]
fn test_invalid_table_name() {
let config = ProcessorConfig::ParquetDefaultProcessor(ParquetDefaultProcessorConfig {
tables: HashSet::from(["InvalidTable".to_string(), "Transaction".to_string()]),
tables: HashSet::from(["InvalidTable".to_string(), "transactions".to_string()]),
bucket_name: "bucket_name".to_string(),
bucket_root: "bucket_root".to_string(),
google_application_credentials: None,
Expand Down Expand Up @@ -247,7 +250,7 @@ mod tests {
#[test]
fn test_duplicate_table_names() {
let config = ProcessorConfig::ParquetDefaultProcessor(ParquetDefaultProcessorConfig {
tables: HashSet::from(["Transaction".to_string(), "Transaction".to_string()]),
tables: HashSet::from(["transactions".to_string(), "transactions".to_string()]),
bucket_name: "bucket_name".to_string(),
bucket_root: "bucket_root".to_string(),
google_application_credentials: None,
Expand All @@ -261,14 +264,20 @@ mod tests {

let table_names = result.unwrap();
assert_eq!(table_names, vec![
"parquet_default_processor.Transaction".to_string(),
"parquet_default_processor.transactions".to_string(),
]);
}

#[test]
fn test_all_enum_table_names() {
fn test_all_table_names() {
let config = ProcessorConfig::ParquetDefaultProcessor(ParquetDefaultProcessorConfig {
tables: ParquetTypeEnum::iter().map(|e| e.to_string()).collect(),
tables: HashSet::from([
"move_resources".to_string(),
"transactions".to_string(),
"write_set_changes".to_string(),
"table_items".to_string(),
"move_modules".to_string(),
]),
bucket_name: "bucket_name".to_string(),
bucket_root: "bucket_root".to_string(),
google_application_credentials: None,
Expand All @@ -281,9 +290,16 @@ mod tests {
assert!(result.is_ok());

let table_names = result.unwrap();
let expected_names: HashSet<String> = ParquetTypeEnum::iter()
.map(|e| format!("parquet_default_processor.{}", e))
.collect();
let expected_names: HashSet<String> = [
"move_resources".to_string(),
"transactions".to_string(),
"write_set_changes".to_string(),
"table_items".to_string(),
"move_modules".to_string(),
]
.iter()
.map(|e| format!("parquet_default_processor.{}", e))
.collect();
let table_names: HashSet<String> = table_names.into_iter().collect();
assert_eq!(table_names, expected_names);
}
Expand Down
6 changes: 3 additions & 3 deletions rust/sdk-processor/src/parquet_processors/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use crate::{
config::{db_config::DbConfig, processor_config::ParquetDefaultProcessorConfig},
steps::{
common::gcs_uploader::{create_new_writer, GCSUploader},
parquet_default_processor::size_buffer::ParquetBufferStep,
steps::common::{
gcs_uploader::{create_new_writer, GCSUploader},
parquet_buffer_step::ParquetBufferStep,
},
utils::database::{new_db_pool, ArcDbPool},
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,11 @@ use crate::{
ParquetTypeEnum,
},
steps::{
common::{processor_status_saver::get_parquet_processor_status_saver,
parquet_buffer_step::ParquetBufferStep,
},
parquet_default_processor::{
parquet_default_extractor::ParquetDefaultExtractor,
common::{
parquet_version_tracker_step::ParquetVersionTrackerStep,
processor_status_saver::get_parquet_processor_status_saver,
},
parquet_default_processor::parquet_default_extractor::ParquetDefaultExtractor,
},
utils::{
chain_id::check_or_update_chain_id,
Expand Down
1 change: 1 addition & 0 deletions rust/sdk-processor/src/steps/common/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod gcs_uploader;
pub mod parquet_buffer_step;
pub mod parquet_version_tracker_step;
pub mod processor_status_saver;

pub use processor_status_saver::get_processor_status_saver;
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::{
backfill_processor_status::{BackfillProcessorStatus, BackfillStatus},
processor_status::ProcessorStatus,
},
steps::parquet_default_processor::parquet_version_tracker_step::ParquetProcessorStatusSaver,
steps::common::parquet_version_tracker_step::ParquetProcessorStatusSaver,
utils::database::{execute_with_better_error, ArcDbPool},
};
use anyhow::Result;
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
pub mod parquet_default_extractor;
pub mod parquet_version_tracker_step;

0 comments on commit 44418f2

Please sign in to comment.