use option to replace expect with context and avoid creating two gap detectors (#428)

* use option to replace expect with context and avoid creating two gap detectors

* add trait param for gap detector loop

* lint

* - add a new version of standardize_address that takes a slice, to avoid having to encode beforehand just to call this function
- reduce the duplicated lines of code
- use context instead of unwrapping with unsafe assumptions (a minimal sketch of this pattern follows the commit description)
- make the gap detector trait fn return a Result type without requiring an unnecessary error type

* lint
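For reference, here is a minimal sketch of the error-handling pattern this commit applies, using simplified stand-in types rather than the processor's real protobuf structs: a missing `MoveStructTag` becomes an ordinary `Ok(None)` instead of an `expect` panic, and fallible steps attach `.context(...)`.

```rust
use anyhow::{Context, Result};

// Simplified, hypothetical stand-ins for the protobuf types the processor consumes.
struct MoveStructTag { address: String }
struct WriteResource { r#type: Option<MoveStructTag>, version: String }

#[derive(Debug)]
struct MoveResource { resource_address: String, txn_version: u64 }

impl MoveResource {
    // Instead of `write_resource.r#type.as_ref().expect("MoveStructTag Not Exists.")`,
    // a missing tag is reported as `Ok(None)` and parse failures carry context.
    fn from_write_resource(write_resource: &WriteResource) -> Result<Option<Self>> {
        let tag = match write_resource.r#type.as_ref() {
            Some(t) => t,
            None => return Ok(None), // not an error: the caller simply skips this change
        };
        let txn_version = write_resource
            .version
            .parse::<u64>()
            .context("Failed to parse transaction version")?;
        Ok(Some(Self {
            resource_address: tag.address.clone(),
            txn_version,
        }))
    }
}

fn main() -> Result<()> {
    let wr = WriteResource { r#type: None, version: "42".to_string() };
    // `None` means "nothing to index" rather than a panic.
    assert!(MoveResource::from_write_resource(&wr)?.is_none());
    Ok(())
}
```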
yuunlimm authored Jul 9, 2024
1 parent 8a6034a commit c6c7305
Showing 15 changed files with 323 additions and 250 deletions.
23 changes: 9 additions & 14 deletions rust/processor/src/bq_analytics/gcs_handler.rs
@@ -6,22 +6,22 @@ use google_cloud_storage::{
http::objects::upload::{Media, UploadObjectRequest, UploadType},
};
use hyper::Body;
use std::path::PathBuf;
use std::path::{Path, PathBuf};
use tokio::io::AsyncReadExt; // for read_to_end()
use tokio::{
fs::File as TokioFile,
time::{sleep, timeout, Duration},
};
use tracing::{debug, error, info};
const BUCKET_REGULAR_TRAFFIC: &str = "devnet-airflow-continue";
const MAX_RETRIES: usize = 3;
const INITIAL_DELAY_MS: u64 = 500;
const TIMEOUT_SECONDS: u64 = 300;
pub async fn upload_parquet_to_gcs(
client: &GCSClient,
file_path: &PathBuf,
file_path: &Path,
table_name: &str,
bucket_name: &str,
bucket_root: &Path,
) -> Result<(), ParquetProcessorError> {
let mut file = TokioFile::open(&file_path)
.await
@@ -54,13 +54,8 @@ pub async fn upload_parquet_to_gcs(
let highwater_s = start_of_month.timestamp_millis();
let highwater_ms = now.timestamp_millis();
let counter = 0; // THIS NEED TO BE REPLACED OR REIMPLEMENTED WITH AN ACTUAL LOGIC TO ENSURE FILE UNIQUENESS.
let object_name: PathBuf = generate_parquet_file_path(
BUCKET_REGULAR_TRAFFIC,
table_name,
highwater_s,
highwater_ms,
counter,
);
let object_name: PathBuf =
generate_parquet_file_path(bucket_root, table_name, highwater_s, highwater_ms, counter);

let file_name = object_name.to_str().unwrap().to_owned();
let upload_type: UploadType = UploadType::Simple(Media::new(file_name.clone()));
@@ -108,14 +103,14 @@ pub async fn upload_parquet_to_gcs(
}

fn generate_parquet_file_path(
gcs_bucket_root: &str,
gcs_bucket_root: &Path,
table: &str,
highwater_s: i64,
highwater_ms: i64,
counter: u32,
) -> PathBuf {
PathBuf::from(format!(
"{}/{}/{}/{}_{}.parquet",
gcs_bucket_root, table, highwater_s, highwater_ms, counter
gcs_bucket_root.join(format!(
"{}/{}/{}_{}.parquet",
table, highwater_s, highwater_ms, counter
))
}
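The helper now joins onto a caller-supplied bucket root instead of the hard-coded `BUCKET_REGULAR_TRAFFIC` constant. Below is a standalone copy of the new function with an illustrative call; the sample values are made up.

```rust
use std::path::{Path, PathBuf};

// Same body as the new version in this diff: the root is a `Path`, not a hard-coded &str.
fn generate_parquet_file_path(
    gcs_bucket_root: &Path,
    table: &str,
    highwater_s: i64,
    highwater_ms: i64,
    counter: u32,
) -> PathBuf {
    gcs_bucket_root.join(format!(
        "{}/{}/{}_{}.parquet",
        table, highwater_s, highwater_ms, counter
    ))
}

fn main() {
    let path = generate_parquet_file_path(
        Path::new("my-root"),
        "move_resources",
        1_719_792_000_000,
        1_720_500_000_000,
        0,
    );
    // Prints: my-root/move_resources/1719792000000/1720500000000_0.parquet
    println!("{}", path.display());
}
```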
30 changes: 20 additions & 10 deletions rust/processor/src/bq_analytics/generic_parquet_processor.rs
@@ -2,7 +2,10 @@ use super::ParquetProcessingResult
use crate::{
bq_analytics::gcs_handler::upload_parquet_to_gcs,
gap_detectors::ProcessingResult,
utils::counters::{PARQUET_HANDLER_BUFFER_SIZE, PARQUET_STRUCT_SIZE},
utils::{
counters::{PARQUET_HANDLER_BUFFER_SIZE, PARQUET_STRUCT_SIZE},
util::naive_datetime_to_timestamp,
},
};
use ahash::AHashMap;
use allocative::Allocative;
@@ -24,9 +27,6 @@ use uuid::Uuid;
#[derive(Debug, Default, Clone)]
pub struct ParquetDataGeneric<ParquetType> {
pub data: Vec<ParquetType>,
pub first_txn_version: u64,
pub last_txn_version: u64,
pub last_transaction_timestamp: Option<aptos_protos::util::timestamp::Timestamp>,
pub transaction_version_to_struct_count: AHashMap<i64, i64>,
}

@@ -42,6 +42,10 @@ pub trait HasParquetSchema {
fn schema() -> Arc<parquet::schema::types::Type>;
}

pub trait GetTimeStamp {
fn get_timestamp(&self) -> chrono::NaiveDateTime;
}

/// Auto-implement this for all types that implement `Default` and `RecordWriter`
impl<ParquetType> HasParquetSchema for ParquetType
where
@@ -56,7 +60,7 @@ where

pub struct ParquetHandler<ParquetType>
where
ParquetType: NamedTable + HasVersion + HasParquetSchema + 'static + Allocative,
ParquetType: NamedTable + NamedTable + HasVersion + HasParquetSchema + 'static + Allocative,
for<'a> &'a [ParquetType]: RecordWriter<ParquetType>,
{
pub schema: Arc<parquet::schema::types::Type>,
@@ -66,6 +70,7 @@ where

pub transaction_version_to_struct_count: AHashMap<i64, i64>,
pub bucket_name: String,
pub bucket_root: String,
pub gap_detector_sender: kanal::AsyncSender<ProcessingResult>,
pub file_path: String,
}
@@ -93,7 +98,7 @@ fn create_new_writer(

impl<ParquetType> ParquetHandler<ParquetType>
where
ParquetType: NamedTable + HasVersion + HasParquetSchema + 'static + Allocative,
ParquetType: Allocative + GetTimeStamp + HasVersion + HasParquetSchema + 'static + NamedTable,
for<'a> &'a [ParquetType]: RecordWriter<ParquetType>,
{
fn create_new_writer(&self) -> Result<SerializedFileWriter<File>> {
@@ -110,8 +115,9 @@ where

pub fn new(
bucket_name: String,
bucket_root: String,
gap_detector_sender: kanal::AsyncSender<ProcessingResult>,
schema: Arc<parquet::schema::types::Type>,
schema: Arc<Type>,
) -> Result<Self> {
// had to append unique id to avoid concurrent write issues
let file_path = format!("{}_{}.parquet", ParquetType::TABLE_NAME, Uuid::new_v4());
@@ -123,6 +129,7 @@ where
buffer_size_bytes: 0,
transaction_version_to_struct_count: AHashMap::new(),
bucket_name,
bucket_root,
gap_detector_sender,
schema,
file_path,
@@ -135,7 +142,6 @@ where
changes: ParquetDataGeneric<ParquetType>,
max_buffer_size: usize,
) -> Result<()> {
let last_transaction_timestamp = changes.last_transaction_timestamp;
let parquet_structs = changes.data;
self.transaction_version_to_struct_count
.extend(changes.transaction_version_to_struct_count);
@@ -152,7 +158,9 @@ where
// for now, it's okay to go little above the buffer_size, given that we will keep max size as 200 MB
if self.buffer_size_bytes >= max_buffer_size {
let start_version = self.buffer.first().unwrap().version();
let end_version = self.buffer.last().unwrap().version();
let last = self.buffer.last().unwrap();
let end_version = last.version();
let last_transaction_timestamp = naive_datetime_to_timestamp(last.get_timestamp());

let txn_version_to_struct_count = process_struct_count_map(
&self.buffer,
@@ -182,11 +190,13 @@ where
end_version = end_version,
"Max buffer size reached, uploading to GCS."
);
let bucket_root = PathBuf::from(&self.bucket_root);
let upload_result = upload_parquet_to_gcs(
gcs_client,
&new_file_path,
ParquetType::TABLE_NAME,
&self.bucket_name,
&bucket_root,
)
.await;
self.buffer_size_bytes = 0;
@@ -197,7 +207,7 @@ where
let parquet_processing_result = ParquetProcessingResult {
start_version,
end_version,
last_transaction_timestamp: last_transaction_timestamp.clone(),
last_transaction_timestamp: Some(last_transaction_timestamp),
txn_version_to_struct_count,
};

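A compact sketch of the new `GetTimeStamp` trait and how the handler can now derive the batch's last timestamp from the final buffered row. The `Row` type here is a hypothetical stand-in; the real code then converts the `NaiveDateTime` with `naive_datetime_to_timestamp` before sending the result to the gap detector.

```rust
use chrono::NaiveDateTime;

// The trait added in this commit: every parquet row type reports its own timestamp.
pub trait GetTimeStamp {
    fn get_timestamp(&self) -> NaiveDateTime;
}

// Hypothetical row type, standing in for MoveResource, TableItem, etc.
struct Row {
    version: i64,
    block_timestamp: NaiveDateTime,
}

impl GetTimeStamp for Row {
    fn get_timestamp(&self) -> NaiveDateTime {
        self.block_timestamp
    }
}

// The handler no longer needs `last_transaction_timestamp` on `ParquetDataGeneric`;
// it reads the last buffered row instead.
fn last_version_and_timestamp(buffer: &[Row]) -> Option<(i64, NaiveDateTime)> {
    buffer.last().map(|last| (last.version, last.get_timestamp()))
}
```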
13 changes: 11 additions & 2 deletions rust/processor/src/bq_analytics/parquet_handler.rs
@@ -1,6 +1,6 @@
use crate::{
bq_analytics::generic_parquet_processor::{
HasParquetSchema, HasVersion, NamedTable, ParquetDataGeneric,
GetTimeStamp, HasParquetSchema, HasVersion, NamedTable, ParquetDataGeneric,
ParquetHandler as GenericParquetHandler,
},
gap_detectors::ProcessingResult,
@@ -17,11 +17,19 @@ pub fn create_parquet_handler_loop(
new_gap_detector_sender: AsyncSender<ProcessingResult>,
processor_name: &str,
bucket_name: String,
bucket_root: String,
parquet_handler_response_channel_size: usize,
max_buffer_size: usize,
) -> AsyncSender<ParquetDataGeneric<ParquetType>>
where
ParquetType: NamedTable + HasVersion + HasParquetSchema + Send + Sync + 'static + Allocative,
ParquetType: NamedTable
+ GetTimeStamp
+ HasVersion
+ HasParquetSchema
+ Send
+ Sync
+ 'static
+ Allocative,
for<'a> &'a [ParquetType]: RecordWriter<ParquetType>,
{
let processor_name = processor_name.to_owned();
@@ -38,6 +46,7 @@ where

let mut parquet_manager = GenericParquetHandler::new(
bucket_name.clone(),
bucket_root.clone(),
new_gap_detector_sender.clone(),
ParquetType::schema(),
)
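Very roughly, `create_parquet_handler_loop` wires a bounded kanal channel to a spawned task that drains batches into the handler. The sketch below is a simplified, hypothetical shape of that wiring (`Batch` and `Handler` are stand-ins, not the crate's types), mainly to show where the new `bucket_root` parameter ends up:

```rust
use kanal::AsyncSender;
use tokio::task::JoinHandle;

// Stand-in types for illustration only.
struct Batch { rows: Vec<u64> }

struct Handler {
    bucket_name: String,
    bucket_root: String,
}

impl Handler {
    async fn handle(&mut self, batch: Batch) {
        // The real handler buffers rows and uploads parquet files under
        // gs://<bucket_name>/<bucket_root>/... once the buffer is large enough.
        println!("{} rows -> gs://{}/{}", batch.rows.len(), self.bucket_name, self.bucket_root);
    }
}

fn spawn_handler_loop(
    bucket_name: String,
    bucket_root: String,
    channel_size: usize,
) -> (AsyncSender<Batch>, JoinHandle<()>) {
    let (sender, receiver) = kanal::bounded_async::<Batch>(channel_size);
    let mut handler = Handler { bucket_name, bucket_root };
    let join_handle = tokio::spawn(async move {
        while let Ok(batch) = receiver.recv().await {
            handler.handle(batch).await;
        }
    });
    (sender, join_handle)
}
```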
@@ -4,7 +4,7 @@
#![allow(clippy::extra_unused_lifetimes)]

use crate::{
bq_analytics::generic_parquet_processor::{HasVersion, NamedTable},
bq_analytics::generic_parquet_processor::{GetTimeStamp, HasVersion, NamedTable},
utils::util::standardize_address,
};
use allocative_derive::Allocative;
@@ -45,7 +45,14 @@ impl HasVersion for MoveResource {
}
}

impl GetTimeStamp for MoveResource {
fn get_timestamp(&self) -> chrono::NaiveDateTime {
self.block_timestamp
}
}

pub struct MoveStructTag {
#[allow(dead_code)]
resource_address: String,
pub module: String,
pub fun: String,
@@ -59,14 +66,15 @@ impl MoveResource {
txn_version: i64,
block_height: i64,
block_timestamp: chrono::NaiveDateTime,
) -> Self {
let parsed_data = Self::convert_move_struct_tag(
write_resource
.r#type
.as_ref()
.expect("MoveStructTag Not Exists."),
);
Self {
) -> Result<Option<Self>> {
let move_struct_tag = match write_resource.r#type.as_ref() {
Some(t) => t,
None => return Ok(None),
};

let parsed_data = convert_move_struct_tag(move_struct_tag);

let move_resource = Self {
txn_version,
block_height,
write_set_change_index,
@@ -81,7 +89,8 @@ impl MoveResource {
hex::encode(write_resource.state_key_hash.as_slice()).as_str(),
),
block_timestamp,
}
};
Ok(Some(move_resource))
}

pub fn from_delete_resource(
@@ -90,14 +99,13 @@ impl MoveResource {
txn_version: i64,
block_height: i64,
block_timestamp: chrono::NaiveDateTime,
) -> Self {
let parsed_data = Self::convert_move_struct_tag(
delete_resource
.r#type
.as_ref()
.expect("MoveStructTag Not Exists."),
);
Self {
) -> Result<Option<Self>> {
let move_struct_tag = match delete_resource.r#type.as_ref() {
Some(t) => t,
None => return Ok(None),
};
let parsed_data = convert_move_struct_tag(move_struct_tag);
let move_resource = Self {
txn_version,
block_height,
write_set_change_index,
@@ -112,42 +120,25 @@ impl MoveResource {
hex::encode(delete_resource.state_key_hash.as_slice()).as_str(),
),
block_timestamp,
}
}

pub fn convert_move_struct_tag(struct_tag: &MoveStructTagPB) -> MoveStructTag {
MoveStructTag {
resource_address: standardize_address(struct_tag.address.as_str()),
module: struct_tag.module.to_string(),
fun: struct_tag.name.to_string(),
generic_type_params: struct_tag
.generic_type_params
.iter()
.map(|move_type| -> Result<Option<String>> {
Ok(Some(
serde_json::to_string(move_type).context("Failed to parse move type")?,
))
})
.collect::<Result<Option<String>>>()
.unwrap_or(None),
}
}

pub fn get_outer_type_from_resource(write_resource: &WriteResource) -> String {
let move_struct_tag =
Self::convert_move_struct_tag(write_resource.r#type.as_ref().unwrap());

format!(
"{}::{}::{}",
move_struct_tag.get_address(),
move_struct_tag.module,
move_struct_tag.fun,
)
};
Ok(Some(move_resource))
}
}

impl MoveStructTag {
pub fn get_address(&self) -> String {
standardize_address(self.resource_address.as_str())
pub fn convert_move_struct_tag(struct_tag: &MoveStructTagPB) -> MoveStructTag {
MoveStructTag {
resource_address: standardize_address(struct_tag.address.as_str()),
module: struct_tag.module.to_string(),
fun: struct_tag.name.to_string(),
generic_type_params: struct_tag
.generic_type_params
.iter()
.map(|move_type| -> Result<Option<String>> {
Ok(Some(
serde_json::to_string(move_type).context("Failed to parse move type")?,
))
})
.collect::<Result<Option<String>>>()
.unwrap_or(None),
}
}
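Because `from_write_resource` and `from_delete_resource` now return `Result<Option<Self>>`, callers have to handle both a genuine error and the benign "no struct tag" case. A generic, hypothetical caller shape (not the processor's exact code):

```rust
use anyhow::Result;

// Collect only the changes that actually produced a row, while still propagating real errors.
fn collect_rows<T, R>(
    changes: &[T],
    parse: impl Fn(&T) -> Result<Option<R>>,
) -> Result<Vec<R>> {
    let mut rows = Vec::new();
    for change in changes {
        if let Some(row) = parse(change)? {
            rows.push(row); // `Ok(None)` entries are silently skipped
        }
    }
    Ok(rows)
}
```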
@@ -4,7 +4,7 @@
#![allow(clippy::extra_unused_lifetimes)]

use crate::{
bq_analytics::generic_parquet_processor::{HasVersion, NamedTable},
bq_analytics::generic_parquet_processor::{GetTimeStamp, HasVersion, NamedTable},
utils::util::{hash_str, standardize_address},
};
use allocative_derive::Allocative;
@@ -38,6 +38,13 @@ impl HasVersion for TableItem {
self.txn_version
}
}

impl GetTimeStamp for TableItem {
fn get_timestamp(&self) -> chrono::NaiveDateTime {
self.block_timestamp
}
}

#[derive(Clone, Debug, Deserialize, FieldCount, Serialize)]
pub struct CurrentTableItem {
pub table_handle: String,
