fanout builder failing with Send trait
yuunlimm committed Nov 4, 2024
1 parent 461caa2 commit e98b828
Showing 11 changed files with 683 additions and 16 deletions.
4 changes: 4 additions & 0 deletions rust/Cargo.lock

Some generated files are not rendered by default.

6 changes: 3 additions & 3 deletions rust/processor/src/bq_analytics/generic_parquet_processor.rs
@@ -54,10 +54,10 @@ where
}

pub struct ParquetHandler<ParquetType>
where
ParquetType: NamedTable + NamedTable + HasVersion + HasParquetSchema + 'static + Allocative,
for<'a> &'a [ParquetType]: RecordWriter<ParquetType>,
{
pub schema: Arc<Type>,
pub writer: SerializedFileWriter<Vec<u8>>,
pub buffer: Vec<ParquetType>,
@@ -72,7 +72,7 @@ where
pub last_upload_time: Instant,
pub processor_name: String,
}
-fn create_new_writer(schema: Arc<Type>) -> Result<SerializedFileWriter<Vec<u8>>> {
+pub fn create_new_writer(schema: Arc<Type>) -> Result<SerializedFileWriter<Vec<u8>>> {
let props = WriterProperties::builder()
.set_compression(parquet::basic::Compression::LZ4)
.build();
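The hunk above makes `create_new_writer` public so callers outside this module can build writers. A minimal sketch of the same pattern, assuming only the `parquet` and `anyhow` crates; the one-column schema string is illustrative, not the processor's real schema:

```rust
use std::sync::Arc;

use anyhow::Result;
use parquet::basic::Compression;
use parquet::file::properties::WriterProperties;
use parquet::file::writer::SerializedFileWriter;
use parquet::schema::parser::parse_message_type;
use parquet::schema::types::Type;

// Mirrors the now-public helper: an in-memory LZ4 parquet writer for a schema.
fn create_new_writer(schema: Arc<Type>) -> Result<SerializedFileWriter<Vec<u8>>> {
    let props = WriterProperties::builder()
        .set_compression(Compression::LZ4)
        .build();
    Ok(SerializedFileWriter::new(Vec::new(), schema, Arc::new(props))?)
}

fn main() -> Result<()> {
    // Illustrative schema, not the processor's real one.
    let schema = Arc::new(parse_message_type(
        "message demo { required int64 txn_version; }",
    )?);
    let _writer = create_new_writer(schema)?;
    Ok(())
}
```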
rust/processor/src/db/common/models/default_models/parquet_move_resources.rs
@@ -18,7 +18,7 @@ use serde::{Deserialize, Serialize};

#[derive(
Allocative, Clone, Debug, Default, Deserialize, FieldCount, Serialize, ParquetRecordWriter,
)]
pub struct MoveResource {
pub txn_version: i64,
pub write_set_change_index: i64,
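For context on the `ParquetRecordWriter` derive used above: it generates a `RecordWriter` impl for `&[T]`, which is what lets a slice of rows hand back its parquet schema and write itself into a row group. A hedged, self-contained sketch with a made-up two-column row type (the real `MoveResource` has many more fields):

```rust
use std::sync::Arc;

use parquet::file::properties::WriterProperties;
use parquet::file::writer::SerializedFileWriter;
use parquet::record::RecordWriter;
use parquet_derive::ParquetRecordWriter;

// Hypothetical row type standing in for MoveResource.
#[derive(ParquetRecordWriter)]
struct DemoRow {
    txn_version: i64,
    write_set_change_index: i64,
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let rows = vec![DemoRow { txn_version: 1, write_set_change_index: 0 }];
    let slice: &[DemoRow] = &rows;
    // The derive supplies both schema() and write_to_row_group() for &[DemoRow].
    let schema = slice.schema()?;
    let props = Arc::new(WriterProperties::builder().build());
    let mut writer = SerializedFileWriter::new(Vec::new(), schema, props)?;
    let mut row_group = writer.next_row_group()?;
    slice.write_to_row_group(&mut row_group)?;
    row_group.close()?;
    writer.close()?;
    Ok(())
}
```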
6 changes: 6 additions & 0 deletions rust/sdk-processor/Cargo.toml
@@ -42,6 +42,12 @@ native-tls = { workspace = true }
postgres-native-tls = { workspace = true }
tokio-postgres = { workspace = true }

google-cloud-storage = { workspace = true }
# Parquet support
parquet = { workspace = true }
parquet_derive = { workspace = true }
allocative = { workspace = true }

[features]
libpq = ["diesel/postgres"]
# When using the default features we enable the diesel/postgres feature. We configure
8 changes: 3 additions & 5 deletions rust/sdk-processor/src/config/processor_config.rs
@@ -119,10 +119,8 @@ pub struct ParquetDefaultProcessorConfig {
pub bucket_name: String,
#[serde(default)]
pub bucket_root: String,
-#[serde(
-    default = "ParquetDefaultProcessorConfig::default_parquet_handler_response_channel_size"
-)]
-pub parquet_handler_response_channel_size: usize,
+#[serde(default = "ParquetDefaultProcessorConfig::default_channel_size")]
+pub channel_size: usize,
#[serde(default = "ParquetDefaultProcessorConfig::default_max_buffer_size")]
pub max_buffer_size: usize,
#[serde(default = "ParquetDefaultProcessorConfig::default_parquet_upload_interval")]
@@ -135,7 +133,7 @@ pub struct ParquetDefaultProcessorConfig {
impl ParquetDefaultProcessorConfig {
/// Make the default very large on purpose so that by default it's not chunked
/// This prevents any unexpected changes in behavior
-pub const fn default_parquet_handler_response_channel_size() -> usize {
+pub const fn default_channel_size() -> usize {
100_000
}

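The rename above also swaps in `default_channel_size` as the serde fallback. A minimal sketch of that defaulting behavior, assuming `serde` and `serde_json`; the struct here is a stand-in for `ParquetDefaultProcessorConfig`, reduced to the renamed field:

```rust
use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct DemoConfig {
    // serde calls this function when the field is absent from the input.
    #[serde(default = "DemoConfig::default_channel_size")]
    channel_size: usize,
}

impl DemoConfig {
    const fn default_channel_size() -> usize {
        100_000
    }
}

fn main() {
    // No channel_size in the input, so the default kicks in.
    let cfg: DemoConfig = serde_json::from_str("{}").unwrap();
    assert_eq!(cfg.channel_size, 100_000);
}
```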
238 changes: 232 additions & 6 deletions rust/sdk-processor/src/parquet_processors/parquet_default_processor.rs
@@ -1,3 +1,5 @@
use std::sync::Arc;
use std::time::Duration;
use crate::{
config::{
db_config::DbConfig, indexer_processor_config::IndexerProcessorConfig,
@@ -13,6 +15,38 @@ use anyhow::Context;
use aptos_indexer_processor_sdk::{
aptos_indexer_transaction_stream::TransactionStream, traits::processor_trait::ProcessorTrait,
};
use aptos_indexer_processor_sdk::aptos_indexer_transaction_stream::TransactionStreamConfig;
use aptos_indexer_processor_sdk::builder::ProcessorBuilder;
use aptos_indexer_processor_sdk::common_steps::{DEFAULT_UPDATE_PROCESSOR_STATUS_SECS, TransactionStreamStep, VersionTrackerStep};
use aptos_indexer_processor_sdk::traits::IntoRunnableStep;
use crate::steps::common::get_processor_status_saver;
use google_cloud_storage::{
client::{Client as GCSClient, ClientConfig as GcsClientConfig},
http::Error as StorageError,
};
use tracing::{debug, info};
use processor::db::common::models::default_models::parquet_move_resources::MoveResource;
use crate::steps::parquet_default_processor::timed_size_buffer::TableConfig;
use crate::steps::parquet_default_processor::timed_size_buffer::TimedSizeBufferStep;
use crate::steps::parquet_default_processor::parquet_default_extractor::ParquetDefaultExtractor;
use processor::db::common::models::default_models::parquet_write_set_changes::WriteSetChangeModel;
use processor::db::common::models::default_models::parquet_move_tables::TableItem;
use processor::db::common::models::default_models::parquet_transactions::Transaction as ParquetTransaction;
use processor::db::common::models::default_models::parquet_move_modules::MoveModule;
use aptos_indexer_processor_sdk::test::steps::pass_through_step::PassThroughStep;
use aptos_indexer_processor_sdk::traits::RunnableAsyncStep;
use aptos_indexer_processor_sdk::types::transaction_context::TransactionContext;
use aptos_indexer_processor_sdk::utils::errors::ProcessorError;
use aptos_indexer_processor_sdk::traits::async_step::AsyncStep;
use aptos_indexer_processor_sdk::traits::processable::Processable;
use aptos_indexer_processor_sdk::traits::NamedStep;
use async_trait::async_trait;
use aptos_indexer_processor_sdk::aptos_protos::transaction::v1::Transaction;
use aptos_indexer_processor_sdk::traits::RunnableStepWithInputReceiver;
use aptos_indexer_processor_sdk::instrumented_channel::instrumented_bounded_channel;


const GOOGLE_APPLICATION_CREDENTIALS: &str = "GOOGLE_APPLICATION_CREDENTIALS";

pub struct ParquetDefaultProcessor {
pub config: IndexerProcessorConfig,
@@ -44,6 +78,14 @@ impl ParquetDefaultProcessor {
}
}

type Input = (
Vec<ParquetTransaction>,
Vec<MoveResource>,
Vec<WriteSetChangeModel>,
Vec<TableItem>,
Vec<MoveModule>,
);

#[async_trait::async_trait]
impl ProcessorTrait for ParquetDefaultProcessor {
fn name(&self) -> &'static str {
@@ -66,8 +108,8 @@ impl ProcessorTrait for ParquetDefaultProcessor {
let is_backfill = self.config.backfill_config.is_some();

// Query the starting version
-let _starting_version = if is_backfill {
-    get_starting_version(&self.config, self.db_pool.clone()).await?;
+let starting_version = if is_backfill {
+    get_starting_version(&self.config, self.db_pool.clone()).await?
} else {
// Regular mode logic: Fetch the minimum last successful version across all relevant tables
let table_names = self
@@ -79,7 +121,7 @@ impl ProcessorTrait for ParquetDefaultProcessor {
self.config.processor_config.name()
))?;
get_min_last_success_version_parquet(&self.config, self.db_pool.clone(), table_names)
-    .await?;
+    .await?
};

// Check and update the ledger chain id to ensure we're indexing the correct chain
@@ -89,7 +131,7 @@ impl ProcessorTrait for ParquetDefaultProcessor {
.await?;
check_or_update_chain_id(grpc_chain_id as i64, self.db_pool.clone()).await?;

-let _parquet_processor_config = match self.config.processor_config.clone() {
+let parquet_processor_config = match self.config.processor_config.clone() {
ProcessorConfig::ParquetDefaultProcessor(parquet_processor_config) => {
parquet_processor_config
},
@@ -101,7 +143,191 @@ impl ProcessorTrait for ParquetDefaultProcessor {
},
};

-// Define processor steps
-Ok(())
+println!("===============Starting version: {}===============", starting_version);

// Define processor transaction stream config
let transaction_stream = TransactionStreamStep::new(TransactionStreamConfig {
starting_version: Some(starting_version),
..self.config.transaction_stream_config.clone()
})
.await?;
//
// // TODO: look at the config to dynamically set the opt_in_tables, tables
let parquet_default_extractor = ParquetDefaultExtractor {
opt_in_tables: None
// : parquet_processor_config.tables.iter().map(|s| s.to_string()).collect(),
};

let credentials = parquet_processor_config.google_application_credentials.clone();

if let Some(credentials) = credentials {
std::env::set_var(GOOGLE_APPLICATION_CREDENTIALS, credentials);
}

let gcs_config = GcsClientConfig::default()
.with_auth()
.await
.expect("Failed to create GCS client config");

let gcs_client = Arc::new(GCSClient::new(gcs_config));
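Once the `GCSClient` above exists, a buffer step can push finished parquet buffers to the bucket. This sketch follows the `google-cloud-storage` crate's documented `upload_object` API rather than this commit's `TimedSizeBufferStep`; the bucket and object names are made up:

```rust
use google_cloud_storage::client::{Client, ClientConfig};
use google_cloud_storage::http::objects::upload::{Media, UploadObjectRequest, UploadType};

// Push one serialized parquet buffer to GCS; names are illustrative.
async fn upload_parquet(client: &Client, bytes: Vec<u8>) -> anyhow::Result<()> {
    client
        .upload_object(
            &UploadObjectRequest {
                bucket: "demo-bucket".to_string(),
                ..Default::default()
            },
            bytes,
            &UploadType::Simple(Media::new("move_resources/demo.parquet")),
        )
        .await?;
    Ok(())
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Same construction as the diff: credential-based auth via with_auth().
    let config = ClientConfig::default().with_auth().await?;
    let client = Client::new(config);
    upload_parquet(&client, Vec::new()).await
}
```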


let move_resource_step = TimedSizeBufferStep::<Input, MoveResource>::new(
Duration::from_secs(parquet_processor_config.parquet_upload_interval),
TableConfig {
table_name: "move_resources".to_string(),
bucket_name: parquet_processor_config.bucket_name.clone(),
bucket_root: parquet_processor_config.bucket_root.clone(),
max_size: parquet_processor_config.max_buffer_size,
},
gcs_client.clone(),
self.name(),
);
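`TimedSizeBufferStep` is configured with both an upload interval and a `max_buffer_size`, which suggests a flush-on-size-or-timeout policy. A simplified sketch of that policy (assumed semantics, not the step's actual implementation):

```rust
use std::mem;
use std::time::{Duration, Instant};

// Buffers items and flushes on whichever comes first: size cap or interval.
struct TimedSizeBuffer<T> {
    buffer: Vec<T>,
    max_size: usize,
    interval: Duration,
    last_flush: Instant,
}

impl<T> TimedSizeBuffer<T> {
    fn new(max_size: usize, interval: Duration) -> Self {
        Self { buffer: Vec::new(), max_size, interval, last_flush: Instant::now() }
    }

    // Returns a batch to upload when a flush condition is met.
    fn push(&mut self, item: T) -> Option<Vec<T>> {
        self.buffer.push(item);
        if self.buffer.len() >= self.max_size || self.last_flush.elapsed() >= self.interval {
            self.last_flush = Instant::now();
            Some(mem::take(&mut self.buffer))
        } else {
            None
        }
    }
}

fn main() {
    let mut buf = TimedSizeBuffer::new(2, Duration::from_secs(30));
    assert!(buf.push("row-1").is_none());
    assert_eq!(buf.push("row-2").unwrap().len(), 2); // size cap hit
}
```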

// TODO: add other steps later
// let move_module_step = TimedSizeBufferStep::<Input, MoveModule>::new(
// Duration::from_secs(parquet_processor_config.parquet_upload_interval),
// TableConfig {
// table_name: "move_modules".to_string(),
// bucket_name: parquet_processor_config.bucket_name.clone(),
// bucket_root: parquet_processor_config.bucket_root.clone(),
// max_size: parquet_processor_config.max_buffer_size,
// },
// gcs_client.clone(),
// );

let channel_size = parquet_processor_config.channel_size;
// let version_tracker = VersionTrackerStep::new(
// get_processor_status_saver(self.db_pool.clone(), self.config.clone()),
// DEFAULT_UPDATE_PROCESSOR_STATUS_SECS,
// );

/// TODO: Figure out how to make this work
///
let (input_sender, input_receiver) = instrumented_bounded_channel("input", 1);

let input_step = RunnableStepWithInputReceiver::new(
input_receiver,
RunnableAsyncStep::new(PassThroughStep::default()),
);
let mut fanout_builder = ProcessorBuilder::new_with_inputless_first_step(
input_step
// transaction_stream.into_runnable_step(),
).fanout_broadcast(1);

// let builder = builder.connect_to(parquet_default_extractor.into_runnable_step(), channel_size);
// let mut fanout_builder = builder.
// fanout_broadcast(1);

let (first_builder, first_output_receiver) = fanout_builder
.get_processor_builder()
.unwrap()
.connect_to(
RunnableAsyncStep::new(PassThroughStep::new_named("FanoutStep1".to_string())),
channel_size
)
.end_and_return_output_receiver(channel_size);

let test_step = TestStep;
let test_step = RunnableAsyncStep::new(test_step);

let (_, buffer_receiver) = ProcessorBuilder::new_with_fanin_step_with_receivers(
vec![
(first_output_receiver, first_builder.graph),
],
RunnableAsyncStep::new(PassThroughStep::new_named("FaninStep".to_string())),
channel_size,
)
.connect_to(test_step, channel_size)
// .connect_to(version_tracker.into_runnable_step(), channel_size)
.end_and_return_output_receiver(channel_size);
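The commit title points at the likely underlying problem here: broadcasting one input to parallel branches forces `Send + Clone + 'static` bounds on everything that crosses the fan-out, because each branch consumes it on its own task. A minimal reproduction of that kind of bound with plain `tokio::sync::broadcast`, independent of the SDK's builder:

```rust
use std::time::Duration;

use tokio::sync::broadcast;

// Stand-in for the processor's Input tuple; broadcast::channel requires Clone,
// and tokio::spawn requires the receiving future to be Send + 'static.
#[derive(Clone, Debug)]
struct Batch(Vec<u64>);

#[tokio::main]
async fn main() {
    let (tx, _) = broadcast::channel::<Batch>(16);
    for branch in 0..2 {
        let mut rx = tx.subscribe();
        // A non-Send payload type would fail to compile right here.
        tokio::spawn(async move {
            while let Ok(batch) = rx.recv().await {
                println!("branch {branch} got {} rows", batch.0.len());
            }
        });
    }
    tx.send(Batch(vec![1, 2, 3])).unwrap();
    // Give the branches a moment to drain before the runtime shuts down.
    tokio::time::sleep(Duration::from_millis(50)).await;
}
```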

// let (_, buffer_receiver) = ProcessorBuilder::new_with_inputless_first_step(
// transaction_stream.into_runnable_step(),
// )
// .connect_to(parquet_default_extractor.into_runnable_step(), channel_size)
// .connect_to(move_resource_step.into_runnable_step(), channel_size)
// .connect_to(version_tracker.into_runnable_step(), channel_size)
// .end_and_return_output_receiver(channel_size);

// (Optional) Parse the results
loop {
match buffer_receiver.recv().await {
Ok(txn_context) => {
debug!(
"Finished processing versions [{:?}, {:?}]",
txn_context.metadata.start_version, txn_context.metadata.end_version,
);
},
Err(e) => {
info!("No more transactions in channel: {:?}", e);
break Ok(());
},
}
}
}
}

pub struct TestStep;

impl AsyncStep for TestStep {}

impl NamedStep for TestStep {
fn name(&self) -> String {
"TestStep".to_string()
}
}

#[async_trait]
impl Processable for TestStep {
type Input = Vec<Transaction>;
type Output = ();
type RunType = ();

async fn process(
&mut self,
item: TransactionContext<Vec<Transaction>>,
) -> Result<Option<TransactionContext<Self::Output>>, ProcessorError> {
println!("processtedprocesstedprocesstedprocesstedprocessted");
Ok(None)
// let processed = item.data.into_iter().map(|i| TestStruct { i }).collect();
// Ok(Some(TransactionContext {
// data: processed,
// metadata: item.metadata,
// }))
}
}

pub trait ExtractResources<ParquetType> {
fn extract(&self) -> Vec<ParquetType>;
}

impl ExtractResources<ParquetTransaction> for Input {
fn extract(&self) -> Vec<ParquetTransaction> {
self.0.clone()
}
}

impl ExtractResources<MoveResource> for Input {
fn extract(&self) -> Vec<MoveResource> {
self.1.clone()
}
}

impl ExtractResources<WriteSetChangeModel> for Input {
fn extract(&self) -> Vec<WriteSetChangeModel> {
self.2.clone()
}
}

impl ExtractResources<TableItem> for Input {
fn extract(&self) -> Vec<TableItem> {
self.3.clone()
}
}

impl ExtractResources<MoveModule> for Input {
fn extract(&self) -> Vec<MoveModule> {
self.4.clone()
}
}
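A self-contained sketch of how a downstream buffer step could stay generic over the `ExtractResources` trait above, pulling only its own table's rows out of the shared tuple; types are simplified to a single made-up table here:

```rust
// Trait copied in shape from the diff, with one simplified row type.
pub trait ExtractResources<T> {
    fn extract(&self) -> Vec<T>;
}

#[derive(Clone, Debug)]
struct MoveResourceRow {
    txn_version: i64,
}

// The real Input carries five table types; one is enough to show the pattern.
type Input = (Vec<MoveResourceRow>,);

impl ExtractResources<MoveResourceRow> for Input {
    fn extract(&self) -> Vec<MoveResourceRow> {
        self.0.clone()
    }
}

// A buffer step generic over the table type it owns.
fn take_rows<T, I: ExtractResources<T>>(input: &I) -> Vec<T> {
    input.extract()
}

fn main() {
    let input: Input = (vec![MoveResourceRow { txn_version: 1 }],);
    let rows: Vec<MoveResourceRow> = take_rows(&input);
    println!("buffered {} move_resources rows", rows.len());
}
```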
@@ -13,7 +13,6 @@ use processor::{
processors::default_processor::process_transactions,
worker::TableFlags,
};
-pub const MIN_TRANSACTIONS_PER_RAYON_JOB: usize = 64;

pub struct DefaultExtractor
where
1 change: 1 addition & 0 deletions rust/sdk-processor/src/steps/mod.rs
@@ -4,5 +4,6 @@ pub mod common;
pub mod default_processor;
pub mod events_processor;
pub mod fungible_asset_processor;
pub mod parquet_default_processor;
pub mod stake_processor;
pub mod token_v2_processor;
2 changes: 2 additions & 0 deletions rust/sdk-processor/src/steps/parquet_default_processor/mod.rs
@@ -0,0 +1,2 @@
pub mod parquet_default_extractor;
pub mod timed_size_buffer;