Skip to content

Commit

Permalink
bump version that fixes pollable async step
Browse files Browse the repository at this point in the history
  • Loading branch information
rtso committed Aug 6, 2024
1 parent a896516 commit af38d2c
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 54 deletions.
18 changes: 6 additions & 12 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ sdk-processor = { path = "sdk-processor" }

ahash = { version = "0.8.7", features = ["serde"] }
anyhow = "1.0.86"
aptos-indexer-processor-sdk = { git = "https://github.com/aptos-labs/aptos-indexer-processor-sdk.git", rev = "5b1075ccdd48cd4e41aaa776b47441167da3bba1" }
aptos-indexer-processor-sdk-server-framework = { git = "https://github.com/aptos-labs/aptos-indexer-processor-sdk.git", rev = "5b1075ccdd48cd4e41aaa776b47441167da3bba1" }
aptos-indexer-processor-sdk = { git = "https://github.com/aptos-labs/aptos-indexer-processor-sdk.git", rev = "205019b45c500952c4963f16485c44470f682b65" }
aptos-indexer-processor-sdk-server-framework = { git = "https://github.com/aptos-labs/aptos-indexer-processor-sdk.git", rev = "205019b45c500952c4963f16485c44470f682b65" }
aptos-protos = { git = "https://github.com/aptos-labs/aptos-core.git", rev = "5c48aee129b5a141be2792ffa3d9bd0a1a61c9cb" }
aptos-system-utils = { git = "https://github.com/aptos-labs/aptos-core.git", rev = "4541add3fd29826ec57f22658ca286d2d6134b93" }
async-trait = "0.1.53"
Expand Down
2 changes: 1 addition & 1 deletion rust/sdk-processor/src/processors/events_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ use anyhow::Result;
use aptos_indexer_processor_sdk::{
aptos_indexer_transaction_stream::{TransactionStream, TransactionStreamConfig},
builder::ProcessorBuilder,
common_steps::TransactionStreamStep,
instrumented_channel::instrumented_bounded_channel,
steps::TransactionStreamStep,
traits::{IntoRunnableStep, RunnableStepWithInputReceiver},
};
use serde::{Deserialize, Serialize};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ use crate::utils::database::{execute_with_better_error, ArcDbPool};
use ahash::AHashMap;
use anyhow::Result;
use aptos_indexer_processor_sdk::{
steps::{pollable_async_step::PollableAsyncRunType, PollableAsyncStep},
traits::{NamedStep, Processable},
traits::{
pollable_async_step::PollableAsyncRunType, NamedStep, PollableAsyncStep, Processable,
},
types::transaction_context::TransactionContext,
utils::{errors::ProcessorError, time::parse_timestamp},
};
Expand Down Expand Up @@ -54,6 +55,40 @@ where
self.next_version = new_prev_batch.end_version + 1;
self.last_success_batch = Some(new_prev_batch);
}

async fn save_processor_status(&mut self) -> Result<(), ProcessorError> {
// Update the processor status
if let Some(last_success_batch) = self.last_success_batch.as_ref() {
let end_timestamp = last_success_batch
.end_transaction_timestamp
.as_ref()
.map(|t| parse_timestamp(t, last_success_batch.end_version as i64))
.map(|t| t.naive_utc());
let status = ProcessorStatus {
processor: self.tracker_name.clone(),
last_success_version: last_success_batch.end_version as i64,
last_transaction_timestamp: end_timestamp,
};
execute_with_better_error(
self.conn_pool.clone(),
diesel::insert_into(processor_status::table)
.values(&status)
.on_conflict(processor_status::processor)
.do_update()
.set((
processor_status::last_success_version
.eq(excluded(processor_status::last_success_version)),
processor_status::last_updated.eq(excluded(processor_status::last_updated)),
processor_status::last_transaction_timestamp
.eq(excluded(processor_status::last_transaction_timestamp)),
)),
Some(" WHERE processor_status.last_success_version <= EXCLUDED.last_success_version "),
).await.map_err(|e| ProcessorError::DBStoreError {
message: format!("Failed to update processor status: {}", e),
})?;
}
Ok(())
}
}

#[async_trait]
Expand All @@ -79,7 +114,7 @@ where
// If there's a gap in the next_version and current_version, save the current_version to seen_versions for
// later processing.
if self.next_version != current_batch.start_version {
tracing::info!(
info!(
expected_next_version = self.next_version,
step = self.name(),
batch_version = current_batch.start_version,
Expand All @@ -95,7 +130,7 @@ where
total_size_in_bytes: current_batch.total_size_in_bytes,
});
} else {
tracing::info!("No gap detected");
info!("No gap detected");
// If the current_batch is the next expected version, update the last success batch
self.update_last_success_batch(TransactionContext {
data: vec![], // No data is needed for tracking. This is to avoid clone.
Expand All @@ -109,12 +144,20 @@ where
// Pass through
Ok(Some(current_batch))
}

async fn cleanup(
&mut self,
) -> Result<Option<Vec<TransactionContext<Self::Output>>>, ProcessorError> {
// If processing or polling ends, save the last successful batch to the database.
self.save_processor_status().await?;
Ok(None)
}
}

#[async_trait]
impl<T: Send + 'static> PollableAsyncStep for LatestVersionProcessedTracker<T>
where
Self: Sized + Send + 'static,
Self: Sized + Send + Sync + 'static,
T: Send + 'static,
{
fn poll_interval(&self) -> std::time::Duration {
Expand All @@ -123,36 +166,7 @@ where

async fn poll(&mut self) -> Result<Option<Vec<TransactionContext<T>>>, ProcessorError> {
// TODO: Add metrics for gap count
// Update the processor status
if let Some(last_success_batch) = self.last_success_batch.as_ref() {
let end_timestamp = last_success_batch
.end_transaction_timestamp
.as_ref()
.map(|t| parse_timestamp(t, last_success_batch.end_version as i64))
.map(|t| t.naive_utc());
let status = ProcessorStatus {
processor: self.tracker_name.clone(),
last_success_version: last_success_batch.end_version as i64,
last_transaction_timestamp: end_timestamp,
};
execute_with_better_error(
self.conn_pool.clone(),
diesel::insert_into(processor_status::table)
.values(&status)
.on_conflict(processor_status::processor)
.do_update()
.set((
processor_status::last_success_version
.eq(excluded(processor_status::last_success_version)),
processor_status::last_updated.eq(excluded(processor_status::last_updated)),
processor_status::last_transaction_timestamp
.eq(excluded(processor_status::last_transaction_timestamp)),
)),
Some(" WHERE processor_status.last_success_version <= EXCLUDED.last_success_version "),
).await.map_err(|e| ProcessorError::DBStoreError {
message: format!("Failed to update processor status: {}", e),
})?;
}
self.save_processor_status().await?;
// Nothing should be returned
Ok(None)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use crate::db::common::models::events_models::events::EventModel;
use aptos_indexer_processor_sdk::{
aptos_protos::transaction::v1::{transaction::TxnData, Transaction},
steps::{async_step::AsyncRunType, AsyncStep},
traits::{NamedStep, Processable},
traits::{async_step::AsyncRunType, AsyncStep, NamedStep, Processable},
types::transaction_context::TransactionContext,
utils::errors::ProcessorError,
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ use crate::{
use ahash::AHashMap;
use anyhow::Result;
use aptos_indexer_processor_sdk::{
steps::{async_step::AsyncRunType, AsyncStep},
traits::{NamedStep, Processable},
traits::{async_step::AsyncRunType, AsyncStep, NamedStep, Processable},
types::transaction_context::TransactionContext,
utils::errors::ProcessorError,
};
Expand Down

0 comments on commit af38d2c

Please sign in to comment.