Skip to content

Commit

Permalink
Add Parquet Events Processor (#451)
Browse files Browse the repository at this point in the history
* temp

* remove logs

* add more metrics

* Add Parquet Events Processor

* add validator txn handling logic

* use context

* change variable name

* rebase

* add event_version for events v2 future proofing

* add block_timestamp to events

* fix conflicts

* handle validator txn event to have default size of 0

* rebase

* lint

* add logs when event size and event size info size don't match
  • Loading branch information
yuunlimm authored Jul 25, 2024
1 parent b11e05a commit 7c65e51
Show file tree
Hide file tree
Showing 10 changed files with 319 additions and 27 deletions.
3 changes: 3 additions & 0 deletions rust/processor/src/db/common/models/events_models/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,6 @@
// SPDX-License-Identifier: Apache-2.0

pub mod events;

// parquet model
pub mod parquet_events;
118 changes: 118 additions & 0 deletions rust/processor/src/db/common/models/events_models/parquet_events.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

#![allow(clippy::extra_unused_lifetimes)]

use crate::{
bq_analytics::generic_parquet_processor::{GetTimeStamp, HasVersion, NamedTable},
utils::util::{standardize_address, truncate_str},
};
use allocative_derive::Allocative;
use aptos_protos::transaction::v1::{Event as EventPB, EventSizeInfo};
use itertools::Itertools;
use parquet_derive::ParquetRecordWriter;
use serde::{Deserialize, Serialize};

// Truncation bound for `indexed_type`: the p99 event-type string length is
// currently 303, so 300 is a safe max length for the indexed column.
const EVENT_TYPE_MAX_LENGTH: usize = 300;

/// A single on-chain event, flattened into one row for Parquet/BigQuery ingestion.
///
/// NOTE(review): `ParquetRecordWriter` is derived, so the field layout here
/// presumably defines the Parquet column schema — confirm before reordering fields.
#[derive(Allocative, Clone, Debug, Default, Deserialize, ParquetRecordWriter, Serialize)]
pub struct Event {
    // Transaction version the event was emitted in.
    pub txn_version: i64,
    // Standardized address of the account that owns the event stream.
    pub account_address: String,
    pub sequence_number: i64,
    pub creation_number: i64,
    pub block_height: i64,
    // Full (untruncated) event type string.
    pub event_type: String,
    // Raw event payload, kept as a string.
    pub data: String,
    // Position of the event within its transaction.
    pub event_index: i64,
    // `event_type` truncated to EVENT_TYPE_MAX_LENGTH for indexing.
    pub indexed_type: String,
    // Sizes reported by the chain's EventSizeInfo.
    pub type_tag_bytes: i64,
    pub total_bytes: i64,
    // Currently always 1; reserved for events v2 (see `from_event`).
    pub event_version: i8,
    // Skipped for Allocative accounting (chrono type).
    #[allocative(skip)]
    pub block_timestamp: chrono::NaiveDateTime,
}

// Destination table name used by the generic parquet handler.
impl NamedTable for Event {
    const TABLE_NAME: &'static str = "events";
}

// Exposes the transaction version so the generic parquet processor can
// order/track rows by version.
impl HasVersion for Event {
    fn version(&self) -> i64 {
        self.txn_version
    }
}

// Exposes the block timestamp for the generic parquet processor
// (e.g. for time-based partitioning/upload bookkeeping — confirm against caller).
impl GetTimeStamp for Event {
    fn get_timestamp(&self) -> chrono::NaiveDateTime {
        self.block_timestamp
    }
}

impl Event {
    /// Builds one parquet `Event` row from a protobuf event plus its size metadata.
    ///
    /// # Panics
    /// Panics if `event.key` is absent, which indicates malformed input from the
    /// transaction stream.
    pub fn from_event(
        event: &EventPB,
        txn_version: i64,
        block_height: i64,
        event_index: i64,
        size_info: &EventSizeInfo,
        block_timestamp: chrono::NaiveDateTime,
    ) -> Self {
        let event_type: &str = event.type_str.as_ref();
        // Bind the key once instead of unwrapping it twice; a missing key is a
        // malformed event, so fail loudly with context.
        let key = event
            .key
            .as_ref()
            .expect("Event should always have a key");
        Event {
            account_address: standardize_address(key.account_address.as_str()),
            creation_number: key.creation_number as i64,
            sequence_number: event.sequence_number as i64,
            txn_version,
            block_height,
            event_type: event_type.to_string(),
            data: event.data.clone(),
            event_index,
            // Full type strings can exceed the indexed-column bound; truncate.
            indexed_type: truncate_str(event_type, EVENT_TYPE_MAX_LENGTH),
            type_tag_bytes: size_info.type_tag_bytes as i64,
            total_bytes: size_info.total_bytes as i64,
            event_version: 1i8, // this is for future proofing. TODO: change when events v2 comes
            block_timestamp,
        }
    }

    /// Converts all events of one transaction, pairing each event positionally
    /// with its `EventSizeInfo` entry.
    ///
    /// # Panics
    /// Logs and panics when `events` and `event_size_info` differ in length so
    /// the mismatch can be investigated (`zip_eq` below would also panic, but
    /// the explicit check emits a structured error first).
    pub fn from_events(
        events: &[EventPB],
        txn_version: i64,
        block_height: i64,
        event_size_info: &[EventSizeInfo],
        block_timestamp: chrono::NaiveDateTime,
    ) -> Vec<Self> {
        // Ensure that lengths match, otherwise log and panic to investigate.
        if events.len() != event_size_info.len() {
            tracing::error!(
                events_len = events.len(),
                event_size_info_len = event_size_info.len(),
                txn_version,
                "Length mismatch: events size does not match event_size_info size.",
            );
            panic!("Length mismatch: events len does not match event_size_info len");
        }

        events
            .iter()
            .zip_eq(event_size_info.iter())
            .enumerate()
            .map(|(index, (event, size_info))| {
                Self::from_event(
                    event,
                    txn_version,
                    block_height,
                    index as i64,
                    size_info,
                    block_timestamp,
                )
            })
            // Return type already fixes the collection; no turbofish needed.
            .collect()
    }
}

// Alias distinguishing this parquet-bound model from other `Event` models.
pub type ParquetEventModel = Event;
5 changes: 5 additions & 0 deletions rust/processor/src/processors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use crate::{
processors::parquet_processors::{
parquet_ans_processor::{ParquetAnsProcessor, ParquetAnsProcessorConfig},
parquet_default_processor::{ParquetDefaultProcessor, ParquetDefaultProcessorConfig},
parquet_events_processor::{ParquetEventsProcessor, ParquetEventsProcessorConfig},
parquet_fungible_asset_processor::{
ParquetFungibleAssetProcessor, ParquetFungibleAssetProcessorConfig,
},
Expand Down Expand Up @@ -200,6 +201,7 @@ pub enum ProcessorConfig {
ParquetFungibleAssetProcessor(ParquetFungibleAssetProcessorConfig),
ParquetTransactionMetadataProcessor(ParquetTransactionMetadataProcessorConfig),
ParquetAnsProcessor(ParquetAnsProcessorConfig),
ParquetEventsProcessor(ParquetEventsProcessorConfig),
}

impl ProcessorConfig {
Expand All @@ -216,6 +218,7 @@ impl ProcessorConfig {
| ProcessorConfig::ParquetFungibleAssetProcessor(_)
| ProcessorConfig::ParquetTransactionMetadataProcessor(_)
| ProcessorConfig::ParquetAnsProcessor(_)
| ProcessorConfig::ParquetEventsProcessor(_)
)
}
}
Expand Down Expand Up @@ -250,10 +253,12 @@ pub enum Processor {
TokenV2Processor,
TransactionMetadataProcessor,
UserTransactionProcessor,
// Parquet processors
ParquetDefaultProcessor,
ParquetFungibleAssetProcessor,
ParquetTransactionMetadataProcessor,
ParquetAnsProcessor,
ParquetEventsProcessor,
}

#[cfg(test)]
Expand Down
9 changes: 8 additions & 1 deletion rust/processor/src/processors/parquet_processors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,18 @@ use std::time::Duration;

pub mod parquet_ans_processor;
pub mod parquet_default_processor;
pub mod parquet_events_processor;
pub mod parquet_fungible_asset_processor;
pub mod parquet_transaction_metadata_processor;

pub const GOOGLE_APPLICATION_CREDENTIALS: &str = "GOOGLE_APPLICATION_CREDENTIALS";

pub trait UploadIntervalConfig {
/// Shared behavior for parquet processor configurations.
pub trait ParquetProcessorTrait {
    /// Interval at which buffered parquet data is uploaded.
    fn parquet_upload_interval_in_secs(&self) -> Duration;

    /// Exports the given credentials path via the GOOGLE_APPLICATION_CREDENTIALS
    /// environment variable; no-op when `credentials` is `None`.
    fn set_google_credentials(&self, credentials: Option<String>) {
        if let Some(credentials) = credentials {
            std::env::set_var(GOOGLE_APPLICATION_CREDENTIALS, credentials);
        }
    }
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use super::{UploadIntervalConfig, GOOGLE_APPLICATION_CREDENTIALS};
use super::ParquetProcessorTrait;
use crate::{
bq_analytics::{
create_parquet_handler_loop, generic_parquet_processor::ParquetDataGeneric,
Expand Down Expand Up @@ -40,7 +40,7 @@ pub struct ParquetAnsProcessorConfig {
pub parquet_upload_interval: u64,
}

impl UploadIntervalConfig for ParquetAnsProcessorConfig {
impl ParquetProcessorTrait for ParquetAnsProcessorConfig {
fn parquet_upload_interval_in_secs(&self) -> Duration {
Duration::from_secs(self.parquet_upload_interval)
}
Expand All @@ -58,9 +58,7 @@ impl ParquetAnsProcessor {
config: ParquetAnsProcessorConfig,
new_gap_detector_sender: AsyncSender<ProcessingResult>,
) -> Self {
if let Some(credentials) = config.google_application_credentials.clone() {
std::env::set_var(GOOGLE_APPLICATION_CREDENTIALS, credentials);
}
config.set_google_credentials(config.google_application_credentials.clone());

let ans_primary_name_v2_sender = create_parquet_handler_loop::<AnsPrimaryNameV2>(
new_gap_detector_sender.clone(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,7 @@ use crate::{
parquet_write_set_changes::{WriteSetChangeDetail, WriteSetChangeModel},
},
gap_detectors::ProcessingResult,
processors::{
parquet_processors::{UploadIntervalConfig, GOOGLE_APPLICATION_CREDENTIALS},
ProcessorName, ProcessorTrait,
},
processors::{parquet_processors::ParquetProcessorTrait, ProcessorName, ProcessorTrait},
utils::database::ArcDbPool,
};
use ahash::AHashMap;
Expand All @@ -42,7 +39,7 @@ pub struct ParquetDefaultProcessorConfig {
pub parquet_upload_interval: u64,
}

impl UploadIntervalConfig for ParquetDefaultProcessorConfig {
impl ParquetProcessorTrait for ParquetDefaultProcessorConfig {
fn parquet_upload_interval_in_secs(&self) -> Duration {
Duration::from_secs(self.parquet_upload_interval)
}
Expand All @@ -66,9 +63,7 @@ impl ParquetDefaultProcessor {
config: ParquetDefaultProcessorConfig,
new_gap_detector_sender: AsyncSender<ProcessingResult>,
) -> Self {
if let Some(credentials) = config.google_application_credentials.clone() {
std::env::set_var(GOOGLE_APPLICATION_CREDENTIALS, credentials);
}
config.set_google_credentials(config.google_application_credentials.clone());

let transaction_sender = create_parquet_handler_loop::<ParquetTransaction>(
new_gap_detector_sender.clone(),
Expand Down
Loading

0 comments on commit 7c65e51

Please sign in to comment.