add parquet txn metadata processor that handles write set size info (#461)

* temp

* remove logs

* add more metrics

* add parquet txn metadata processor that handles write set size info

* rebase

* add block_timestamp to write_set_size_info table
yuunlimm authored Jul 25, 2024
1 parent a3a16af commit fe207f5
Showing 8 changed files with 237 additions and 0 deletions.
3 changes: 3 additions & 0 deletions rust/processor/src/db/common/models/transaction_metadata_model/mod.rs
@@ -4,3 +4,6 @@
pub mod event_size_info;
pub mod transaction_size_info;
pub mod write_set_size_info;

// parquet models
pub mod parquet_write_set_size_info;
58 changes: 58 additions & 0 deletions rust/processor/src/db/common/models/transaction_metadata_model/parquet_write_set_size_info.rs
@@ -0,0 +1,58 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

#![allow(clippy::extra_unused_lifetimes)]

use crate::bq_analytics::generic_parquet_processor::{GetTimeStamp, HasVersion, NamedTable};
use allocative_derive::Allocative;
use aptos_protos::transaction::v1::WriteOpSizeInfo;
use field_count::FieldCount;
use parquet_derive::ParquetRecordWriter;
use serde::{Deserialize, Serialize};

#[derive(
    Allocative, Clone, Debug, Default, Deserialize, FieldCount, ParquetRecordWriter, Serialize,
)]
pub struct WriteSetSize {
    pub txn_version: i64,
    pub change_index: i64,
    pub key_bytes: i64,
    pub value_bytes: i64,
    pub total_bytes: i64,
    #[allocative(skip)]
    pub block_timestamp: chrono::NaiveDateTime,
}

impl NamedTable for WriteSetSize {
    const TABLE_NAME: &'static str = "write_set_size";
}

impl HasVersion for WriteSetSize {
    fn version(&self) -> i64 {
        self.txn_version
    }
}

impl GetTimeStamp for WriteSetSize {
    fn get_timestamp(&self) -> chrono::NaiveDateTime {
        self.block_timestamp
    }
}

impl WriteSetSize {
    pub fn from_transaction_info(
        info: &WriteOpSizeInfo,
        txn_version: i64,
        change_index: i64,
        block_timestamp: chrono::NaiveDateTime,
    ) -> Self {
        WriteSetSize {
            txn_version,
            change_index,
            key_bytes: info.key_bytes as i64,
            value_bytes: info.value_bytes as i64,
            total_bytes: info.key_bytes as i64 + info.value_bytes as i64,
            block_timestamp,
        }
    }
}
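The three traits implemented above come from bq_analytics::generic_parquet_processor and are not part of this diff. As a rough sketch, the shapes assumed by these impls look like this (the real definitions may carry additional bounds):

// Assumed shapes only; the actual trait definitions live in
// bq_analytics::generic_parquet_processor and may differ.
pub trait NamedTable {
    const TABLE_NAME: &'static str;
}

pub trait HasVersion {
    fn version(&self) -> i64;
}

pub trait GetTimeStamp {
    fn get_timestamp(&self) -> chrono::NaiveDateTime;
}

The generic parquet handler presumably uses these to name the destination table ("write_set_size"), track transaction versions for gap detection, and stamp each row with its block timestamp.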
6 changes: 6 additions & 0 deletions rust/processor/src/processors/mod.rs
@@ -40,6 +40,9 @@ use crate::{
        parquet_fungible_asset_processor::{
            ParquetFungibleAssetProcessor, ParquetFungibleAssetProcessorConfig,
        },
        parquet_transaction_metadata_processor::{
            ParquetTransactionMetadataProcessor, ParquetTransactionMetadataProcessorConfig,
        },
    },
    schema::processor_status,
    utils::{
@@ -194,6 +197,7 @@ pub enum ProcessorConfig {
    UserTransactionProcessor,
    ParquetDefaultProcessor(ParquetDefaultProcessorConfig),
    ParquetFungibleAssetProcessor(ParquetFungibleAssetProcessorConfig),
    ParquetTransactionMetadataProcessor(ParquetTransactionMetadataProcessorConfig),
}

impl ProcessorConfig {
@@ -208,6 +212,7 @@ impl ProcessorConfig {
            self,
            ProcessorConfig::ParquetDefaultProcessor(_)
                | ProcessorConfig::ParquetFungibleAssetProcessor(_)
                | ProcessorConfig::ParquetTransactionMetadataProcessor(_)
        )
    }
}
@@ -244,6 +249,7 @@ pub enum Processor {
    UserTransactionProcessor,
    ParquetDefaultProcessor,
    ParquetFungibleAssetProcessor,
    ParquetTransactionMetadataProcessor,
}

#[cfg(test)]
1 change: 1 addition & 0 deletions rust/processor/src/processors/parquet_processors/mod.rs
@@ -3,6 +3,7 @@ use std::time::Duration;
pub mod parquet_default_processor;

pub mod parquet_fungible_asset_processor;
pub mod parquet_transaction_metadata_processor;

pub const GOOGLE_APPLICATION_CREDENTIALS: &str = "GOOGLE_APPLICATION_CREDENTIALS";

1 change: 1 addition & 0 deletions rust/processor/src/processors/parquet_processors/parquet_default_processor.rs
@@ -41,6 +41,7 @@ pub struct ParquetDefaultProcessorConfig {
    pub max_buffer_size: usize,
    pub parquet_upload_interval: u64,
}

impl UploadIntervalConfig for ParquetDefaultProcessorConfig {
    fn parquet_upload_interval_in_secs(&self) -> Duration {
        Duration::from_secs(self.parquet_upload_interval)
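UploadIntervalConfig is the trait implemented here and by the new processor config below; it is defined in processors::parquet_processors, so only its assumed shape is sketched here:

// Assumed shape; the actual definition lives in processors::parquet_processors.
pub trait UploadIntervalConfig {
    fn parquet_upload_interval_in_secs(&self) -> Duration;
}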
150 changes: 150 additions & 0 deletions rust/processor/src/processors/parquet_processors/parquet_transaction_metadata_processor.rs
@@ -0,0 +1,150 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use crate::{
    bq_analytics::{
        create_parquet_handler_loop, generic_parquet_processor::ParquetDataGeneric,
        ParquetProcessingResult,
    },
    db::common::models::transaction_metadata_model::parquet_write_set_size_info::WriteSetSize,
    gap_detectors::ProcessingResult,
    processors::{
        parquet_processors::{UploadIntervalConfig, GOOGLE_APPLICATION_CREDENTIALS},
        ProcessorName, ProcessorTrait,
    },
    utils::{database::ArcDbPool, util::parse_timestamp},
};
use ahash::AHashMap;
use anyhow::Context;
use aptos_protos::transaction::v1::Transaction;
use async_trait::async_trait;
use kanal::AsyncSender;
use serde::{Deserialize, Serialize};
use std::{fmt::Debug, time::Duration};
use tracing::warn;

#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
pub struct ParquetTransactionMetadataProcessorConfig {
    pub google_application_credentials: Option<String>,
    pub bucket_name: String,
    pub bucket_root: String,
    pub parquet_handler_response_channel_size: usize,
    pub max_buffer_size: usize,
    pub parquet_upload_interval: u64,
}

impl UploadIntervalConfig for ParquetTransactionMetadataProcessorConfig {
    fn parquet_upload_interval_in_secs(&self) -> Duration {
        Duration::from_secs(self.parquet_upload_interval)
    }
}

pub struct ParquetTransactionMetadataProcessor {
    connection_pool: ArcDbPool,
    write_set_size_info_sender: AsyncSender<ParquetDataGeneric<WriteSetSize>>,
}

impl ParquetTransactionMetadataProcessor {
    pub fn new(
        connection_pool: ArcDbPool,
        config: ParquetTransactionMetadataProcessorConfig,
        new_gap_detector_sender: AsyncSender<ProcessingResult>,
    ) -> Self {
        if let Some(credentials) = config.google_application_credentials.clone() {
            std::env::set_var(GOOGLE_APPLICATION_CREDENTIALS, credentials);
        }

        let write_set_size_info_sender = create_parquet_handler_loop::<WriteSetSize>(
            new_gap_detector_sender.clone(),
            ProcessorName::ParquetTransactionMetadataProcessor.into(),
            config.bucket_name.clone(),
            config.bucket_root.clone(),
            config.parquet_handler_response_channel_size,
            config.max_buffer_size,
            config.parquet_upload_interval_in_secs(),
        );
        Self {
            connection_pool,
            write_set_size_info_sender,
        }
    }
}

impl Debug for ParquetTransactionMetadataProcessor {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "ParquetTransactionMetadataProcessor {{ capacity of write set size info channel: {:?} }}",
            self.write_set_size_info_sender.capacity(),
        )
    }
}

#[async_trait]
impl ProcessorTrait for ParquetTransactionMetadataProcessor {
    fn name(&self) -> &'static str {
        ProcessorName::ParquetTransactionMetadataProcessor.into()
    }

    async fn process_transactions(
        &self,
        transactions: Vec<Transaction>,
        start_version: u64,
        end_version: u64,
        _: Option<u64>,
    ) -> anyhow::Result<ProcessingResult> {
        let last_transaction_timestamp = transactions.last().unwrap().timestamp.clone();
        let mut transaction_version_to_struct_count: AHashMap<i64, i64> = AHashMap::new();

        let mut write_set_sizes = vec![];

        for txn in &transactions {
            let txn_version = txn.version as i64;
            let block_timestamp = parse_timestamp(txn.timestamp.as_ref().unwrap(), txn_version);
            let size_info = match txn.size_info.as_ref() {
                Some(size_info) => size_info,
                None => {
                    warn!(version = txn.version, "Transaction size info not found");
                    continue;
                },
            };
            for (index, write_set_size_info) in size_info.write_op_size_info.iter().enumerate() {
                write_set_sizes.push(WriteSetSize::from_transaction_info(
                    write_set_size_info,
                    txn_version,
                    index as i64,
                    block_timestamp,
                ));
                transaction_version_to_struct_count
                    .entry(txn_version)
                    .and_modify(|e| *e += 1)
                    .or_insert(1);
            }
        }

        let write_set_size_info_parquet_data = ParquetDataGeneric {
            data: write_set_sizes,
        };

        self.write_set_size_info_sender
            .send(write_set_size_info_parquet_data)
            .await
            .context("Error sending write set size info to parquet handler")?;

        Ok(ProcessingResult::ParquetProcessingResult(
            ParquetProcessingResult {
                start_version: start_version as i64,
                end_version: end_version as i64,
                last_transaction_timestamp: last_transaction_timestamp.clone(),
                txn_version_to_struct_count: Some(transaction_version_to_struct_count),
                parquet_processed_structs: None,
                table_name: "".to_string(),
            },
        ))
    }

    fn connection_pool(&self) -> &ArcDbPool {
        &self.connection_pool
    }
}
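For illustration only, a hypothetical construction of the new config in code — in practice it is deserialized from the indexer's processor configuration, and every value below is made up:

// Hypothetical, illustrative values; normally this struct is deserialized
// from the indexer's processor configuration.
let config = ParquetTransactionMetadataProcessorConfig {
    google_application_credentials: None,
    bucket_name: "my-indexer-parquet-bucket".to_string(),
    bucket_root: "transaction_metadata".to_string(),
    parquet_handler_response_channel_size: 100,
    max_buffer_size: 100 * 1024 * 1024,
    parquet_upload_interval: 1800,
};

Each write op in a transaction's size_info becomes one WriteSetSize row, so the per-version entry in transaction_version_to_struct_count equals the number of write ops in that transaction.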
10 changes: 10 additions & 0 deletions rust/processor/src/utils/counters.rs
@@ -293,3 +293,13 @@ pub static PARQUET_BUFFER_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
    )
    .unwrap()
});

/// Size of parquet buffer after upload
pub static PARQUET_BUFFER_SIZE_AFTER_UPLOAD: Lazy<IntGaugeVec> = Lazy::new(|| {
    register_int_gauge_vec!(
        "indexer_parquet_size_after_upload",
        "Size of Parquet buffer after upload",
        &["parquet_type"]
    )
    .unwrap()
});
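A hypothetical call site for the new gauge — not part of this diff, and remaining_buffer_bytes is an assumed local variable:

// Hypothetical usage: after an upload completes, record how many bytes
// remain buffered, labeled by parquet table type.
PARQUET_BUFFER_SIZE_AFTER_UPLOAD
    .with_label_values(&["write_set_size"])
    .set(remaining_buffer_bytes as i64);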
8 changes: 8 additions & 0 deletions rust/processor/src/worker.rs
@@ -21,6 +21,7 @@ use crate::{
        parquet_processors::{
            parquet_default_processor::ParquetDefaultProcessor,
            parquet_fungible_asset_processor::ParquetFungibleAssetProcessor,
            parquet_transaction_metadata_processor::ParquetTransactionMetadataProcessor,
        },
        stake_processor::StakeProcessor,
        token_v2_processor::TokenV2Processor,
@@ -959,5 +960,12 @@ pub fn build_processor(
                gap_detector_sender.expect("Parquet processor requires a gap detector sender"),
            ))
        },
        ProcessorConfig::ParquetTransactionMetadataProcessor(config) => {
            Processor::from(ParquetTransactionMetadataProcessor::new(
                db_pool,
                config.clone(),
                gap_detector_sender.expect("Parquet processor requires a gap detector sender"),
            ))
        },
    }
}
