
use option to replace expect with context and avoid creating two gap detectors #428

Merged · 5 commits · Jul 9, 2024
23 changes: 9 additions & 14 deletions rust/processor/src/bq_analytics/gcs_handler.rs
@@ -6,22 +6,22 @@ use google_cloud_storage::{
http::objects::upload::{Media, UploadObjectRequest, UploadType},
};
use hyper::Body;
use std::path::PathBuf;
use std::path::{Path, PathBuf};
use tokio::io::AsyncReadExt; // for read_to_end()
use tokio::{
fs::File as TokioFile,
time::{sleep, timeout, Duration},
};
use tracing::{debug, error, info};
const BUCKET_REGULAR_TRAFFIC: &str = "devnet-airflow-continue";
const MAX_RETRIES: usize = 3;
const INITIAL_DELAY_MS: u64 = 500;
const TIMEOUT_SECONDS: u64 = 300;
pub async fn upload_parquet_to_gcs(
client: &GCSClient,
file_path: &PathBuf,
file_path: &Path,
table_name: &str,
bucket_name: &str,
bucket_root: &Path,
) -> Result<(), ParquetProcessorError> {
let mut file = TokioFile::open(&file_path)
.await
@@ -54,13 +54,8 @@ pub async fn upload_parquet_to_gcs(
let highwater_s = start_of_month.timestamp_millis();
let highwater_ms = now.timestamp_millis();
let counter = 0; // THIS NEEDS TO BE REPLACED OR REIMPLEMENTED WITH ACTUAL LOGIC TO ENSURE FILE UNIQUENESS.
let object_name: PathBuf = generate_parquet_file_path(
BUCKET_REGULAR_TRAFFIC,
table_name,
highwater_s,
highwater_ms,
counter,
);
let object_name: PathBuf =
generate_parquet_file_path(bucket_root, table_name, highwater_s, highwater_ms, counter);

let file_name = object_name.to_str().unwrap().to_owned();
let upload_type: UploadType = UploadType::Simple(Media::new(file_name.clone()));
@@ -108,14 +103,14 @@ }
}

fn generate_parquet_file_path(
gcs_bucket_root: &str,
gcs_bucket_root: &Path,
table: &str,
highwater_s: i64,
highwater_ms: i64,
counter: u32,
) -> PathBuf {
PathBuf::from(format!(
"{}/{}/{}/{}_{}.parquet",
gcs_bucket_root, table, highwater_s, highwater_ms, counter
gcs_bucket_root.join(format!(
"{}/{}/{}_{}.parquet",
table, highwater_s, highwater_ms, counter
))
}
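
The rewritten generate_parquet_file_path takes the bucket root as a &Path and builds the object name with Path::join instead of formatting a hard-coded BUCKET_REGULAR_TRAFFIC prefix into the string. A minimal, self-contained sketch that mirrors the new path construction (the root, table name, and timestamp values below are made up for illustration):

use std::path::{Path, PathBuf};

// Mirrors the new logic: join the bucket root with
// "<table>/<highwater_s>/<highwater_ms>_<counter>.parquet".
fn object_path(bucket_root: &Path, table: &str, highwater_s: i64, highwater_ms: i64, counter: u32) -> PathBuf {
    bucket_root.join(format!("{}/{}/{}_{}.parquet", table, highwater_s, highwater_ms, counter))
}

fn main() {
    let path = object_path(Path::new("my-root"), "move_resources", 1_719_792_000_000, 1_720_482_000_000, 0);
    // Prints: my-root/move_resources/1719792000000/1720482000000_0.parquet
    println!("{}", path.display());
}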
30 changes: 20 additions & 10 deletions rust/processor/src/bq_analytics/generic_parquet_processor.rs
@@ -2,7 +2,10 @@ use super::ParquetProcessingResult
use crate::{
bq_analytics::gcs_handler::upload_parquet_to_gcs,
gap_detectors::ProcessingResult,
utils::counters::{PARQUET_HANDLER_BUFFER_SIZE, PARQUET_STRUCT_SIZE},
utils::{
counters::{PARQUET_HANDLER_BUFFER_SIZE, PARQUET_STRUCT_SIZE},
util::naive_datetime_to_timestamp,
},
};
use ahash::AHashMap;
use allocative::Allocative;
@@ -24,9 +27,6 @@ use uuid::Uuid;
#[derive(Debug, Default, Clone)]
pub struct ParquetDataGeneric<ParquetType> {
pub data: Vec<ParquetType>,
pub first_txn_version: u64,
pub last_txn_version: u64,
pub last_transaction_timestamp: Option<aptos_protos::util::timestamp::Timestamp>,
pub transaction_version_to_struct_count: AHashMap<i64, i64>,
}

@@ -42,6 +42,10 @@ pub trait HasParquetSchema {
fn schema() -> Arc<parquet::schema::types::Type>;
}

pub trait GetTimeStamp {
fn get_timestamp(&self) -> chrono::NaiveDateTime;
}

/// Auto-implement this for all types that implement `Default` and `RecordWriter`
impl<ParquetType> HasParquetSchema for ParquetType
where
@@ -56,7 +60,7 @@ where

pub struct ParquetHandler<ParquetType>
where
ParquetType: NamedTable + HasVersion + HasParquetSchema + 'static + Allocative,
ParquetType: NamedTable + NamedTable + HasVersion + HasParquetSchema + 'static + Allocative,
for<'a> &'a [ParquetType]: RecordWriter<ParquetType>,
{
pub schema: Arc<parquet::schema::types::Type>,
@@ -66,6 +70,7 @@ where

pub transaction_version_to_struct_count: AHashMap<i64, i64>,
pub bucket_name: String,
pub bucket_root: String,
pub gap_detector_sender: kanal::AsyncSender<ProcessingResult>,
pub file_path: String,
}
Expand Down Expand Up @@ -93,7 +98,7 @@ fn create_new_writer(

impl<ParquetType> ParquetHandler<ParquetType>
where
ParquetType: NamedTable + HasVersion + HasParquetSchema + 'static + Allocative,
ParquetType: Allocative + GetTimeStamp + HasVersion + HasParquetSchema + 'static + NamedTable,
for<'a> &'a [ParquetType]: RecordWriter<ParquetType>,
{
fn create_new_writer(&self) -> Result<SerializedFileWriter<File>> {
Expand All @@ -110,8 +115,9 @@ where

pub fn new(
bucket_name: String,
bucket_root: String,
gap_detector_sender: kanal::AsyncSender<ProcessingResult>,
schema: Arc<parquet::schema::types::Type>,
schema: Arc<Type>,
) -> Result<Self> {
// had to append unique id to avoid concurrent write issues
let file_path = format!("{}_{}.parquet", ParquetType::TABLE_NAME, Uuid::new_v4());
@@ -123,6 +129,7 @@ where
buffer_size_bytes: 0,
transaction_version_to_struct_count: AHashMap::new(),
bucket_name,
bucket_root,
gap_detector_sender,
schema,
file_path,
@@ -135,7 +142,6 @@ where
changes: ParquetDataGeneric<ParquetType>,
max_buffer_size: usize,
) -> Result<()> {
let last_transaction_timestamp = changes.last_transaction_timestamp;
let parquet_structs = changes.data;
self.transaction_version_to_struct_count
.extend(changes.transaction_version_to_struct_count);
@@ -152,7 +158,9 @@ where
// for now, it's okay to go a little above the buffer_size, given that we will keep the max size at 200 MB
if self.buffer_size_bytes >= max_buffer_size {
let start_version = self.buffer.first().unwrap().version();
let end_version = self.buffer.last().unwrap().version();
let last = self.buffer.last().unwrap();
let end_version = last.version();
let last_transaction_timestamp = naive_datetime_to_timestamp(last.get_timestamp());

let txn_version_to_struct_count = process_struct_count_map(
&self.buffer,
@@ -182,11 +190,13 @@ where
end_version = end_version,
"Max buffer size reached, uploading to GCS."
);
let bucket_root = PathBuf::from(&self.bucket_root);
let upload_result = upload_parquet_to_gcs(
gcs_client,
&new_file_path,
ParquetType::TABLE_NAME,
&self.bucket_name,
&bucket_root,
)
.await;
self.buffer_size_bytes = 0;
@@ -197,7 +207,7 @@ where
let parquet_processing_result = ParquetProcessingResult {
start_version,
end_version,
last_transaction_timestamp: last_transaction_timestamp.clone(),
last_transaction_timestamp: Some(last_transaction_timestamp),
txn_version_to_struct_count,
};

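With the first/last version fields and the timestamp removed from ParquetDataGeneric, the handler now reads the end version and last transaction timestamp off the last item in its buffer through the new GetTimeStamp trait. A self-contained sketch of that pattern, using an invented Row type in place of a real parquet struct:

use chrono::NaiveDateTime;

// Mirrors the trait added in this PR.
trait GetTimeStamp {
    fn get_timestamp(&self) -> NaiveDateTime;
}

// Invented stand-in for a parquet row such as MoveResource or TableItem.
struct Row {
    block_timestamp: NaiveDateTime,
}

impl GetTimeStamp for Row {
    fn get_timestamp(&self) -> NaiveDateTime {
        self.block_timestamp
    }
}

// The handler derives the timestamp from the buffered data itself rather than
// trusting a value carried alongside the batch.
fn last_timestamp<T: GetTimeStamp>(buffer: &[T]) -> Option<NaiveDateTime> {
    buffer.last().map(|row| row.get_timestamp())
}

fn main() {
    let ts = chrono::NaiveDate::from_ymd_opt(2024, 7, 9)
        .unwrap()
        .and_hms_opt(0, 0, 0)
        .unwrap();
    let rows = vec![Row { block_timestamp: ts }];
    println!("{:?}", last_timestamp(&rows));
}
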
13 changes: 11 additions & 2 deletions rust/processor/src/bq_analytics/parquet_handler.rs
@@ -1,6 +1,6 @@
use crate::{
bq_analytics::generic_parquet_processor::{
HasParquetSchema, HasVersion, NamedTable, ParquetDataGeneric,
GetTimeStamp, HasParquetSchema, HasVersion, NamedTable, ParquetDataGeneric,
ParquetHandler as GenericParquetHandler,
},
gap_detectors::ProcessingResult,
@@ -17,11 +17,19 @@ pub fn create_parquet_handler_loop<ParquetType>(
new_gap_detector_sender: AsyncSender<ProcessingResult>,
processor_name: &str,
bucket_name: String,
bucket_root: String,
parquet_handler_response_channel_size: usize,
max_buffer_size: usize,
) -> AsyncSender<ParquetDataGeneric<ParquetType>>
where
ParquetType: NamedTable + HasVersion + HasParquetSchema + Send + Sync + 'static + Allocative,
ParquetType: NamedTable
+ GetTimeStamp
+ HasVersion
+ HasParquetSchema
+ Send
+ Sync
+ 'static
+ Allocative,
for<'a> &'a [ParquetType]: RecordWriter<ParquetType>,
{
let processor_name = processor_name.to_owned();
@@ -38,6 +46,7 @@ where

let mut parquet_manager = GenericParquetHandler::new(
bucket_name.clone(),
bucket_root.clone(),
new_gap_detector_sender.clone(),
ParquetType::schema(),
)
@@ -4,7 +4,7 @@
#![allow(clippy::extra_unused_lifetimes)]

use crate::{
bq_analytics::generic_parquet_processor::{HasVersion, NamedTable},
bq_analytics::generic_parquet_processor::{GetTimeStamp, HasVersion, NamedTable},
utils::util::standardize_address,
};
use allocative_derive::Allocative;
@@ -45,7 +45,14 @@ impl HasVersion for MoveResource {
}
}

impl GetTimeStamp for MoveResource {
fn get_timestamp(&self) -> chrono::NaiveDateTime {
self.block_timestamp
}
}

pub struct MoveStructTag {
#[allow(dead_code)]
resource_address: String,
pub module: String,
pub fun: String,
@@ -59,14 +66,15 @@ impl MoveResource {
txn_version: i64,
block_height: i64,
block_timestamp: chrono::NaiveDateTime,
) -> Self {
let parsed_data = Self::convert_move_struct_tag(
write_resource
.r#type
.as_ref()
.expect("MoveStructTag Not Exists."),
);
Self {
) -> Result<Option<Self>> {
let move_struct_tag = match write_resource.r#type.as_ref() {
Some(t) => t,
None => return Ok(None),
};

let parsed_data = convert_move_struct_tag(move_struct_tag);

let move_resource = Self {
txn_version,
block_height,
write_set_change_index,
@@ -81,7 +89,8 @@ impl MoveResource {
hex::encode(write_resource.state_key_hash.as_slice()).as_str(),
),
block_timestamp,
}
};
Ok(Some(move_resource))
}

pub fn from_delete_resource(
@@ -90,14 +99,13 @@ impl MoveResource {
txn_version: i64,
block_height: i64,
block_timestamp: chrono::NaiveDateTime,
) -> Self {
let parsed_data = Self::convert_move_struct_tag(
delete_resource
.r#type
.as_ref()
.expect("MoveStructTag Not Exists."),
);
Self {
) -> Result<Option<Self>> {
let move_struct_tag = match delete_resource.r#type.as_ref() {
Some(t) => t,
None => return Ok(None),
};
let parsed_data = convert_move_struct_tag(move_struct_tag);
let move_resource = Self {
txn_version,
block_height,
write_set_change_index,
@@ -112,42 +120,25 @@ impl MoveResource {
hex::encode(delete_resource.state_key_hash.as_slice()).as_str(),
),
block_timestamp,
}
}

pub fn convert_move_struct_tag(struct_tag: &MoveStructTagPB) -> MoveStructTag {
MoveStructTag {
resource_address: standardize_address(struct_tag.address.as_str()),
module: struct_tag.module.to_string(),
fun: struct_tag.name.to_string(),
generic_type_params: struct_tag
.generic_type_params
.iter()
.map(|move_type| -> Result<Option<String>> {
Ok(Some(
serde_json::to_string(move_type).context("Failed to parse move type")?,
))
})
.collect::<Result<Option<String>>>()
.unwrap_or(None),
}
}

pub fn get_outer_type_from_resource(write_resource: &WriteResource) -> String {
let move_struct_tag =
Self::convert_move_struct_tag(write_resource.r#type.as_ref().unwrap());

format!(
"{}::{}::{}",
move_struct_tag.get_address(),
move_struct_tag.module,
move_struct_tag.fun,
)
};
Ok(Some(move_resource))
}
}

impl MoveStructTag {
pub fn get_address(&self) -> String {
standardize_address(self.resource_address.as_str())
pub fn convert_move_struct_tag(struct_tag: &MoveStructTagPB) -> MoveStructTag {
MoveStructTag {
resource_address: standardize_address(struct_tag.address.as_str()),
module: struct_tag.module.to_string(),
fun: struct_tag.name.to_string(),
generic_type_params: struct_tag
.generic_type_params
.iter()
.map(|move_type| -> Result<Option<String>> {
Ok(Some(
serde_json::to_string(move_type).context("Failed to parse move type")?,
))
})
.collect::<Result<Option<String>>>()
.unwrap_or(None),
}
}
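
The MoveResource constructors now return Result<Option<Self>> and yield Ok(None) when the write or delete resource carries no MoveStructTag, rather than panicking through expect. A self-contained sketch of the caller-side pattern this enables, attaching context with anyhow instead of aborting; the types and field names here are invented stand-ins, not the repository's actual call site:

use anyhow::{Context, Result};

// Invented stand-ins for the protobuf WriteResource and the parsed model.
struct WriteResource {
    type_tag: Option<String>,
}

struct MoveResource {
    type_tag: String,
}

impl MoveResource {
    // Returns Ok(None) when the resource has no type tag, instead of panicking.
    fn from_write_resource(wr: &WriteResource) -> Result<Option<Self>> {
        let tag = match wr.type_tag.as_ref() {
            Some(t) => t,
            None => return Ok(None),
        };
        Ok(Some(MoveResource { type_tag: tag.clone() }))
    }
}

fn handle(wr: &WriteResource, version: i64) -> Result<()> {
    // `context` attaches the transaction version to any error instead of
    // bringing the whole processor down with `expect`.
    let maybe_resource = MoveResource::from_write_resource(wr)
        .with_context(|| format!("failed to parse MoveResource at version {version}"))?;
    if let Some(resource) = maybe_resource {
        println!("parsed resource of type {}", resource.type_tag);
    }
    Ok(())
}

fn main() -> Result<()> {
    let wr = WriteResource {
        type_tag: Some("0x1::coin::CoinStore".to_string()),
    };
    handle(&wr, 42)
}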
@@ -4,7 +4,7 @@
#![allow(clippy::extra_unused_lifetimes)]

use crate::{
bq_analytics::generic_parquet_processor::{HasVersion, NamedTable},
bq_analytics::generic_parquet_processor::{GetTimeStamp, HasVersion, NamedTable},
utils::util::{hash_str, standardize_address},
};
use allocative_derive::Allocative;
@@ -38,6 +38,13 @@ impl HasVersion for TableItem {
self.txn_version
}
}

impl GetTimeStamp for TableItem {
fn get_timestamp(&self) -> chrono::NaiveDateTime {
self.block_timestamp
}
}

#[derive(Clone, Debug, Deserialize, FieldCount, Serialize)]
pub struct CurrentTableItem {
pub table_handle: String,