Skip to content

Commit

Permalink
Fix parquet default processor and clean up a bit (#480)
Browse files Browse the repository at this point in the history
* - removed table_metadata from parquet, which was problematic because it was not being sent to the channel [todo: root cause]
- added fields to logs for easier debugging
- moved the parquet periodic upload logic before the loop over structs.
- simplified the parquet gap detector logic.

* fix events

* fix token_v2 version tracking
  • Loading branch information
yuunlimm authored Aug 5, 2024
1 parent 7069b0c commit 89844fb
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 117 deletions.
34 changes: 20 additions & 14 deletions rust/processor/src/bq_analytics/generic_parquet_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,20 @@ where
) -> Result<()> {
let parquet_structs = changes.data;
let processor_name = self.processor_name.clone();

if self.last_upload_time.elapsed() >= self.upload_interval {
info!(
"Time has elapsed more than {} since last upload for {}",
self.upload_interval.as_secs(),
ParquetType::TABLE_NAME
);
if let Err(e) = self.upload_buffer(gcs_client).await {
error!("Failed to upload buffer: {}", e);
return Err(e);
}
self.last_upload_time = Instant::now();
}

for parquet_struct in parquet_structs {
let size_of_struct = allocative::size_of_unique(&parquet_struct);
PARQUET_STRUCT_SIZE
Expand All @@ -154,19 +168,6 @@ where
}
}

if self.last_upload_time.elapsed() >= self.upload_interval {
info!(
"Time has elapsed more than {} since last upload for {}",
self.upload_interval.as_secs(),
ParquetType::TABLE_NAME
);
if let Err(e) = self.upload_buffer(gcs_client).await {
error!("Failed to upload buffer: {}", e);
return Err(e);
}
self.last_upload_time = Instant::now();
}

PARQUET_HANDLER_CURRENT_BUFFER_SIZE
.with_label_values(&[&self.processor_name, ParquetType::TABLE_NAME])
.set(self.buffer_size_bytes as i64);
Expand Down Expand Up @@ -252,7 +253,12 @@ where
parquet_processed_structs: Some(parquet_processed_transactions),
table_name: ParquetType::TABLE_NAME.to_string(),
};

info!(
table_name = ParquetType::TABLE_NAME,
start_version = start_version,
end_version = end_version,
"Uploaded parquet to GCS and sending result to gap detector."
);
self.gap_detector_sender
.send(ProcessingResult::ParquetProcessingResult(
parquet_processing_result,
Expand Down
5 changes: 0 additions & 5 deletions rust/processor/src/gap_detectors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,11 +125,6 @@ pub async fn create_gap_detector_status_tracker_loop(
}
},
Ok(ProcessingResult::ParquetProcessingResult(result)) => {
tracing::info!(
processor_name,
service_type = PROCESSOR_SERVICE_TYPE,
"[ParquetGapDetector] received parquet gap detector task",
);
match gap_detector
.process_versions(ProcessingResult::ParquetProcessingResult(result))
{
Expand Down
85 changes: 39 additions & 46 deletions rust/processor/src/gap_detectors/parquet_gap_detector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::gap_detectors::{GapDetectorResult, GapDetectorTrait, ProcessingResult
use ahash::{AHashMap, AHashSet};
use anyhow::Result;
use std::{
cmp::{max, min},
cmp::max,
sync::{Arc, Mutex},
};
use tracing::{debug, info};
Expand Down Expand Up @@ -64,62 +64,55 @@ impl ParquetFileGapDetectorInner {
}
}
self.max_version = max(self.max_version, end_version);

// b/c of the case where file gets uploaded first, we should check if we have to update last_success_version for this processor
self.update_next_version_to_process(
min(self.next_version_to_process, end_version),
"all_table",
);
}

/// This function updates the next version to process based on the current version counters.
/// It will increment the next version to process if the current version is fully processed.
/// It will also remove the version from the version counters if it is fully processed.
/// what it means to be fully processed is that all the structs for that version processed, i.e. count = 0.
/// Note that for tables other than transactions, it won't be always the latest txn version since we update this value with
/// Thus we will keep the latest version_to_process in the db with the min(max version of latest table files per processor)
/// that has been uploaded to GCS. so whenever we restart the processor, it may generate some duplicates rows, and we are okay with that.
/// This function updates the `next_version_to_process` based on the current version counters.
/// It increments the `next_version_to_process` if the current version is fully processed, which means
/// that all the structs for that version have been processed, i.e., `count = 0`.
/// If a version is fully processed, it removes the version from the version counters and adds it to the `seen_versions`.
/// For tables other than transactions, the latest version to process may not always be the most recent transaction version
/// since this value is updated based on the minimum of the maximum versions of the latest table files per processor
/// that have been uploaded to GCS. Therefore, when the processor restarts, some duplicate rows may be generated, which is acceptable.
/// The function also ensures that the current version starts checking from the `next_version_to_process`
/// value stored in the database. While there might be potential performance improvements,
/// the current implementation prioritizes data integrity.
/// The function also handles cases where a version is already processed or where no struct count
/// is found for a version, providing appropriate logging for these scenarios.
pub fn update_next_version_to_process(&mut self, end_version: i64, table_name: &str) {
// this has to start checking with this value all the time, since this is the value that will be stored in the db as well.
// maybe there could be an improvement to be more performant. but priortizing the data integrity as of now.
let mut current_version = self.next_version_to_process;

while current_version <= end_version {
#[allow(clippy::collapsible_else_if)]
if self.version_counters.contains_key(&current_version) {
while let Some(&count) = self.version_counters.get(&current_version) {
if current_version > end_version {
// we shouldn't update further b/c we haven't uploaded the files containing versions after end_version.
break;
}
if count == 0 {
self.version_counters.remove(&current_version);
self.seen_versions.insert(current_version); // seen_version holds the txns version that we have processed already
current_version += 1;
self.next_version_to_process += 1;
} else {
break;
}
}
} else {
if self.seen_versions.contains(&current_version) {
debug!(
"Version {} already processed, skipping and current next_version {} ",
current_version, self.next_version_to_process
);
self.next_version_to_process =
max(self.next_version_to_process, current_version + 1);
// If the current version has a struct count entry
if let Some(&count) = self.version_counters.get(&current_version) {
if count == 0 {
self.version_counters.remove(&current_version);
self.seen_versions.insert(current_version);
self.next_version_to_process += 1;
} else {
// this is the case where we haven't updated the map yet, while the file gets uploaded first. the bigger file size we will have,
// the less chance we will see this as upload takes longer time. And map population is done before the upload.
debug!(
current_version = current_version,
"No struct count found for version. This shouldn't happen b/c we already added default count for this version."
);
// Stop processing if the version is not yet complete
break;
}
} else if self.seen_versions.contains(&current_version) {
// If the version is already seen and processed
debug!(
"Version {} already processed, skipping and current next_version {} ",
current_version, self.next_version_to_process
);
self.next_version_to_process =
max(self.next_version_to_process, current_version + 1);
} else {
// If the version is neither in seen_versions nor version_counters
debug!(
current_version = current_version,
"No struct count found for version. This shouldn't happen b/c we already added default count for this version."
);
}

current_version += 1;
}

debug!(
next_version_to_process = self.next_version_to_process,
table_name = table_name,
Expand Down Expand Up @@ -155,7 +148,8 @@ impl GapDetectorTrait for ParquetFileGapDetectorInner {
info!(
start_version = result.start_version,
end_version = result.end_version,
"Parquet file has been uploaded."
table_name = &result.table_name,
"[Parquet Gap Detector] Processing versions after parquet file upload."
);

for (version, count) in parquet_processed_structs.iter() {
Expand All @@ -166,7 +160,6 @@ impl GapDetectorTrait for ParquetFileGapDetectorInner {
self.version_counters.insert(*version, -count);
}
}

self.update_next_version_to_process(result.end_version, &result.table_name);

Ok(GapDetectorResult::ParquetFileGapDetectorResult(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::{
db::common::models::default_models::{
parquet_move_modules::MoveModule,
parquet_move_resources::MoveResource,
parquet_move_tables::{TableItem, TableMetadata},
parquet_move_tables::TableItem,
parquet_transactions::{Transaction as ParquetTransaction, TransactionModel},
parquet_write_set_changes::{WriteSetChangeDetail, WriteSetChangeModel},
},
Expand Down Expand Up @@ -51,7 +51,6 @@ pub struct ParquetDefaultProcessor {
wsc_sender: AsyncSender<ParquetDataGeneric<WriteSetChangeModel>>,
table_item_sender: AsyncSender<ParquetDataGeneric<TableItem>>,
move_module_sender: AsyncSender<ParquetDataGeneric<MoveModule>>,
table_metadata_sender: AsyncSender<ParquetDataGeneric<TableMetadata>>,
}

// TODO: Since each table item has different size allocated, the pace of being backfilled to PQ varies a lot.
Expand Down Expand Up @@ -113,24 +112,13 @@ impl ParquetDefaultProcessor {
config.parquet_upload_interval_in_secs(),
);

let table_metadata_sender = create_parquet_handler_loop::<TableMetadata>(
new_gap_detector_sender.clone(),
ProcessorName::ParquetDefaultProcessor.into(),
config.bucket_name.clone(),
config.bucket_root.clone(),
config.parquet_handler_response_channel_size,
config.max_buffer_size,
config.parquet_upload_interval_in_secs(),
);

Self {
connection_pool,
transaction_sender,
move_resource_sender,
wsc_sender,
table_item_sender,
move_module_sender,
table_metadata_sender,
}
}
}
Expand All @@ -139,13 +127,12 @@ impl Debug for ParquetDefaultProcessor {
fn fmt(&self, f: &mut Formatter<'_>) -> Result {
write!(
f,
"ParquetProcessor {{ capacity of trnasactions channel: {:?}, capacity of move resource channel: {:?}, capacity of wsc channel: {:?}, capacity of table items channel: {:?}, capacity of move_module channel: {:?}, capacity of table_metadata channel: {:?} }}",
"ParquetProcessor {{ capacity of trnasactions channel: {:?}, capacity of move resource channel: {:?}, capacity of wsc channel: {:?}, capacity of table items channel: {:?}, capacity of move_module channel: {:?}}}",
&self.transaction_sender.capacity(),
&self.move_resource_sender.capacity(),
&self.wsc_sender.capacity(),
&self.table_item_sender.capacity(),
&self.move_module_sender.capacity(),
&self.table_metadata_sender.capacity(),
)
}
}
Expand All @@ -166,14 +153,7 @@ impl ProcessorTrait for ParquetDefaultProcessor {
let last_transaction_timestamp = transactions.last().unwrap().timestamp.clone();

let (
(
move_resources,
write_set_changes,
transactions,
table_items,
move_modules,
table_metadata,
),
(move_resources, write_set_changes, transactions, table_items, move_modules),
transaction_version_to_struct_count,
) = tokio::task::spawn_blocking(move || process_transactions(transactions))
.await
Expand Down Expand Up @@ -216,15 +196,6 @@ impl ProcessorTrait for ParquetDefaultProcessor {
.await
.map_err(|e| anyhow!("Failed to send to parquet manager: {}", e))?;

let tm_parquet_data = ParquetDataGeneric {
data: table_metadata,
};

self.table_metadata_sender
.send(tm_parquet_data)
.await
.map_err(|e| anyhow!("Failed to send to parquet manager: {}", e))?;

Ok(ProcessingResult::ParquetProcessingResult(
ParquetProcessingResult {
start_version: start_version as i64,
Expand All @@ -251,7 +222,6 @@ pub fn process_transactions(
Vec<TransactionModel>,
Vec<TableItem>,
Vec<MoveModule>,
Vec<TableMetadata>,
),
AHashMap<i64, i64>,
) {
Expand All @@ -265,7 +235,6 @@ pub fn process_transactions(
let mut move_modules = vec![];
let mut move_resources = vec![];
let mut table_items = vec![];
let mut table_metadata: AHashMap<String, TableMetadata> = AHashMap::new();

for detail in wsc_details {
match detail {
Expand All @@ -280,38 +249,28 @@ pub fn process_transactions(
WriteSetChangeDetail::Resource(resource) => {
transaction_version_to_struct_count
.entry(resource.txn_version)
.and_modify(|e| *e += 1);
.and_modify(|e| *e += 1)
.or_insert(1);
move_resources.push(resource);
},
WriteSetChangeDetail::Table(item, _current_item, metadata) => {
WriteSetChangeDetail::Table(item, _current_item, _) => {
let txn_version = item.txn_version;
transaction_version_to_struct_count
.entry(txn_version)
.and_modify(|e| *e += 1);
.and_modify(|e| *e += 1)
.or_insert(1);
table_items.push(item);

if let Some(meta) = metadata {
table_metadata.insert(meta.handle.clone(), meta);
transaction_version_to_struct_count
.entry(txn_version)
.and_modify(|e| *e += 1);
}
},
}
}

let mut table_metadata = table_metadata.into_values().collect::<Vec<TableMetadata>>();
// Sort by PK
table_metadata.sort_by(|a, b| a.handle.cmp(&b.handle));

(
(
move_resources,
write_set_changes,
txns,
table_items,
move_modules,
table_metadata,
),
transaction_version_to_struct_count,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,9 @@ impl ProcessorTrait for ParquetEventsProcessor {
);
transaction_version_to_struct_count
.entry(txn_version)
.and_modify(|e| *e += txn_events.len() as i64);
.and_modify(|e| *e += txn_events.len() as i64)
.or_insert(txn_events.len() as i64);

events.extend(txn_events);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ impl Debug for ParquetFungibleAssetProcessor {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"ParquetFungibleAssetProcessor {{ capacity of tsi channel: {:?}, capacity of es channel: {:?}}}",
"ParquetFungibleAssetProcessor {{ capacity of coin_supply channel: {:?}, capacity of fungible_asset_balances channel: {:?}}}",
&self.coin_supply_sender.capacity(),
&self.fungible_asset_balances_sender.capacity(),
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ async fn parse_v2_token(
transaction_version_to_struct_count
.entry(txn_version)
.and_modify(|e| *e += ownerships.len() as i64 + 1)
.or_insert(1);
.or_insert(ownerships.len() as i64 + 1);
token_ownerships_v2.append(&mut ownerships);
token_datas_v2.push(token_data);
}
Expand Down

0 comments on commit 89844fb

Please sign in to comment.