Skip to content

Commit

Permalink
update the parquet gap detector result to use better name and fix off…
Browse files Browse the repository at this point in the history
…set issue to start with correction version (#489)
  • Loading branch information
yuunlimm authored Aug 23, 2024
1 parent 8a095c6 commit 42ef2d9
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 6 deletions.
6 changes: 3 additions & 3 deletions rust/processor/src/gap_detectors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ pub async fn create_gap_detector_status_tracker_loop(
if res.num_gaps >= gap_detection_batch_size {
tracing::warn!(
processor_name,
gap_start_version = res.next_version_to_process,
gap_start_version = res.last_success_version,
num_gaps = res.num_gaps,
"[Parser] Processed batches with a gap",
);
Expand All @@ -149,13 +149,13 @@ pub async fn create_gap_detector_status_tracker_loop(
>= UPDATE_PROCESSOR_STATUS_SECS
{
tracing::info!(
last_processed_version = res.next_version_to_process,
last_processed_version = res.last_success_version,
processor_name,
"Updating last processed version"
);
processor
.update_last_processed_version(
res.next_version_to_process,
res.last_success_version,
res.last_transaction_timestamp,
)
.await
Expand Down
6 changes: 3 additions & 3 deletions rust/processor/src/gap_detectors/parquet_gap_detector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub struct ParquetFileGapDetectorInner {
}

pub struct ParquetFileGapDetectorResult {
pub next_version_to_process: u64,
pub last_success_version: u64,
pub num_gaps: u64,
pub last_transaction_timestamp: Option<aptos_protos::util::timestamp::Timestamp>,
}
Expand Down Expand Up @@ -138,7 +138,7 @@ impl GapDetectorTrait for ParquetFileGapDetectorInner {
self.update_next_version_to_process(self.max_version, &result.table_name);
return Ok(GapDetectorResult::ParquetFileGapDetectorResult(
ParquetFileGapDetectorResult {
next_version_to_process: self.next_version_to_process as u64,
last_success_version: self.next_version_to_process as u64 - 1,
num_gaps: (self.max_version - self.next_version_to_process) as u64,
last_transaction_timestamp: result.last_transaction_timestamp,
},
Expand All @@ -164,7 +164,7 @@ impl GapDetectorTrait for ParquetFileGapDetectorInner {

Ok(GapDetectorResult::ParquetFileGapDetectorResult(
ParquetFileGapDetectorResult {
next_version_to_process: self.next_version_to_process as u64,
last_success_version: self.next_version_to_process as u64 - 1,
num_gaps: (self.max_version - self.next_version_to_process) as u64,
last_transaction_timestamp: result.last_transaction_timestamp,
},
Expand Down

0 comments on commit 42ef2d9

Please sign in to comment.