Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[EI-444] Migrate events processor to sdk #470

Merged
merged 17 commits into from
Aug 14, 2024
902 changes: 830 additions & 72 deletions rust/Cargo.lock

Large diffs are not rendered by default.

19 changes: 16 additions & 3 deletions rust/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
[workspace]
resolver = "2"

members = ["indexer-metrics", "moving-average", "processor", "server-framework"]
members = [
"indexer-metrics",
"moving-average",
"processor",
"sdk-processor",
"server-framework",
]

[workspace.package]
authors = ["Aptos Labs <[email protected]>"]
Expand All @@ -16,9 +22,12 @@ rust-version = "1.75"
processor = { path = "processor" }
server-framework = { path = "server-framework" }
aptos-moving-average = { path = "moving-average" }
sdk-processor = { path = "sdk-processor" }

ahash = { version = "0.8.7", features = ["serde"] }
anyhow = "1.0.62"
anyhow = "1.0.86"
aptos-indexer-processor-sdk = { git = "https://github.com/aptos-labs/aptos-indexer-processor-sdk.git", rev = "e1e1bdd9349f0a68c9fc53b7e2cebda9e2ce92b7" }
aptos-indexer-processor-sdk-server-framework = { git = "https://github.com/aptos-labs/aptos-indexer-processor-sdk.git", rev = "e1e1bdd9349f0a68c9fc53b7e2cebda9e2ce92b7" }
aptos-protos = { git = "https://github.com/aptos-labs/aptos-core.git", rev = "5c48aee129b5a141be2792ffa3d9bd0a1a61c9cb" }
aptos-system-utils = { git = "https://github.com/aptos-labs/aptos-core.git", rev = "4541add3fd29826ec57f22658ca286d2d6134b93" }
async-trait = "0.1.53"
Expand Down Expand Up @@ -73,6 +82,7 @@ pbjson = "0.5.1"
prometheus = { version = "0.13.0", default-features = false }
prost = { version = "0.12.3", features = ["no-recursion-limit"] }
prost-types = "0.12.3"
rayon = "1.10.0"
regex = "1.5.5"
reqwest = { version = "0.11.20", features = [
"blocking",
Expand Down Expand Up @@ -111,7 +121,10 @@ postgres-native-tls = "0.5.0"
tokio-postgres = "0.7.10"

# Parquet support
parquet = { version = "52.0.0", default-features = false, features = ["async", "lz4"] }
parquet = { version = "52.0.0", default-features = false, features = [
"async",
"lz4",
] }
num = "0.4.0"
google-cloud-storage = "0.13.0"
hyper = { version = "0.14.18", features = ["full"] }
Expand Down
3 changes: 3 additions & 0 deletions rust/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ RUN cargo build --locked --release -p processor
RUN cp target/release/processor /usr/local/bin
RUN cargo build --locked --release -p indexer-metrics
RUN cp target/release/indexer-metrics /usr/local/bin
RUN cargo build --locked --release -p sdk-processor
RUN cp target/release/sdk-processor /usr/local/bin

# add build info
ARG GIT_TAG
Expand All @@ -29,6 +31,7 @@ FROM debian:bullseye-slim

COPY --from=builder /usr/local/bin/processor /usr/local/bin
COPY --from=builder /usr/local/bin/indexer-metrics /usr/local/bin
COPY --from=builder /usr/local/bin/sdk-processor /usr/local/bin

RUN --mount=type=cache,target=/var/cache/apt,sharing=locked \
--mount=type=cache,target=/var/lib/apt,sharing=locked \
Expand Down
1 change: 1 addition & 0 deletions rust/processor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ ahash = { workspace = true }
allocative = { workspace = true }
allocative_derive = { workspace = true }
anyhow = { workspace = true }
aptos-indexer-processor-sdk = { workspace = true }
aptos-moving-average = { workspace = true }
aptos-protos = { workspace = true }
async-trait = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion rust/processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub use config::IndexerGrpcProcessorConfig;

pub mod bq_analytics;
mod config;
mod db;
pub mod db;
pub mod gap_detectors;
pub mod grpc_stream;
pub mod processors;
Expand Down
49 changes: 49 additions & 0 deletions rust/sdk-processor/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
[package]
name = "sdk-processor"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
ahash = { workspace = true }
anyhow = { workspace = true }
aptos-indexer-processor-sdk = { workspace = true }
aptos-indexer-processor-sdk-server-framework = { workspace = true }
async-trait = { workspace = true }
bcs = { workspace = true }
bigdecimal = { workspace = true }
chrono = { workspace = true }
clap = { workspace = true }
diesel = { workspace = true }
diesel-async = { workspace = true }
diesel_migrations = { workspace = true }
field_count = { workspace = true }
futures = { workspace = true }
futures-util = { workspace = true }
hex = { workspace = true }
jemallocator = { workspace = true }
kanal = { workspace = true }
lazy_static = { workspace = true }
num_cpus = { workspace = true }
processor = { workspace = true }
rayon = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
sha2 = { workspace = true }
strum = { workspace = true }
tiny-keccak = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
url = { workspace = true }

# Postgres SSL support
native-tls = { workspace = true }
postgres-native-tls = { workspace = true }
tokio-postgres = { workspace = true }

[features]
libpq = ["diesel/postgres"]
# When using the default features we enable the diesel/postgres feature. We configure
# it in a feature so the CLI can opt out, since it cannot tolerate the libpq dep.
# Recall that features should always be additive.
default = ["libpq"]
50 changes: 50 additions & 0 deletions rust/sdk-processor/src/config/db_config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
use serde::{Deserialize, Serialize};

/// This enum captures the configs for all the different db storages that are defined.
/// The configs for each db storage should only contain configuration specific to that
/// type.
///
/// Serialized with an internal `type` tag whose value is the snake_case variant name.
#[derive(Clone, Debug, Deserialize, Serialize, strum::IntoStaticStr, strum::EnumDiscriminants)]
#[serde(tag = "type", rename_all = "snake_case")]
// What is all this strum stuff? Let me explain.
//
// Rather than maintaining a separate hand-written name for every db type, strum
// derives the names from the enum variants themselves, so there is a single source
// for them.
//
// That's what this strum_discriminants stuff is, it uses macro magic to generate the
// DbTypeName enum based on DbConfig. The rest of the derives configure this
// generation logic, e.g. to make sure we use snake_case.
#[strum(serialize_all = "snake_case")]
#[strum_discriminants(
derive(
Deserialize,
Serialize,
strum::EnumVariantNames,
strum::IntoStaticStr,
strum::Display,
clap::ValueEnum
),
name(DbTypeName),
clap(rename_all = "snake_case"),
serde(rename_all = "snake_case"),
strum(serialize_all = "snake_case")
)]
pub enum DbConfig {
// Configuration for the Postgres storage.
PostgresConfig(PostgresConfig),
}

/// Settings for the Postgres db storage.
///
/// Deserialization rejects any field not declared here (`deny_unknown_fields`).
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
pub struct PostgresConfig {
/// Connection string for the database.
// NOTE(review): presumably a `postgres://` URL — confirm against the code that consumes it.
pub connection_string: String,
// Size of the pool for writes/reads to the DB. Limits maximum number of queries in flight
// Falls back to `PostgresConfig::default_db_pool_size()` when absent from the config.
#[serde(default = "PostgresConfig::default_db_pool_size")]
pub db_pool_size: u32,
}

impl PostgresConfig {
    /// Pool size applied by serde when `db_pool_size` is missing from the config.
    pub const fn default_db_pool_size() -> u32 {
        const DEFAULT_DB_POOL_SIZE: u32 = 150;
        DEFAULT_DB_POOL_SIZE
    }
}
40 changes: 40 additions & 0 deletions rust/sdk-processor/src/config/indexer_processor_config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use super::{db_config::DbConfig, processor_config::ProcessorConfig};
use crate::processors::events_processor::EventsProcessor;
use anyhow::Result;
use aptos_indexer_processor_sdk::aptos_indexer_transaction_stream::TransactionStreamConfig;
use aptos_indexer_processor_sdk_server_framework::RunnableConfig;
use serde::{Deserialize, Serialize};

/// Top-level configuration for one processor run: the processor to run, the
/// transaction stream settings, and the db storage settings.
///
/// Deserialization rejects any field not declared here (`deny_unknown_fields`).
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
pub struct IndexerProcessorConfig {
/// Selects which processor `run` starts and carries its processor-specific settings.
pub processor_config: ProcessorConfig,
rtso marked this conversation as resolved.
Show resolved Hide resolved
/// Transaction stream settings, as defined by the processor SDK.
pub transaction_stream_config: TransactionStreamConfig,
/// Settings for the db storage.
pub db_config: DbConfig,
rtso marked this conversation as resolved.
Show resolved Hide resolved
}

#[async_trait::async_trait]
impl RunnableConfig for IndexerProcessorConfig {
    /// Builds the processor selected by `processor_config` from a clone of this
    /// config, then awaits its `run_processor` call and returns that result.
    async fn run(&self) -> Result<()> {
        match self.processor_config {
            ProcessorConfig::EventsProcessor(_) => {
                let processor = EventsProcessor::new(self.clone()).await?;
                processor.run_processor().await
            },
        }
    }

    /// Returns the server name: the processor name up to (not including) its first
    /// `_`, cut to at most 12 bytes.
    fn get_server_name(&self) -> String {
        let full_name = self.processor_config.name();
        // No underscore at all means the whole name is the prefix.
        let prefix = match full_name.split_once('_') {
            Some((head, _rest)) => head,
            None => full_name,
        };
        let cutoff = prefix.len().min(12);
        prefix[..cutoff].to_string()
    }
}
3 changes: 3 additions & 0 deletions rust/sdk-processor/src/config/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
pub mod db_config;
pub mod indexer_processor_config;
pub mod processor_config;
75 changes: 75 additions & 0 deletions rust/sdk-processor/src/config/processor_config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
use crate::processors::events_processor::EventsProcessorConfig;
use serde::{Deserialize, Serialize};

/// This enum captures the configs for all the different processors that are defined.
/// The configs for each processor should only contain configuration specific to that
/// processor. For configuration that is common to all processors, put it in
/// IndexerProcessorConfig.
#[derive(Clone, Debug, Deserialize, Serialize, strum::IntoStaticStr, strum::EnumDiscriminants)]
#[serde(tag = "type", rename_all = "snake_case")]
// What is all this strum stuff? Let me explain.
//
// Previously we had consts called NAME in each module and a function called `name` on
// the ProcessorTrait. As such it was possible for this name to not match the snake case
// representation of the struct name. By using strum we can have a single source for
// processor names derived from the enum variants themselves.
//
// That's what this strum_discriminants stuff is, it uses macro magic to generate the
// ProcessorName enum based on ProcessorConfig. The rest of the derives configure this
// generation logic, e.g. to make sure we use snake_case.
#[strum(serialize_all = "snake_case")]
#[strum_discriminants(
derive(
Deserialize,
Serialize,
strum::EnumVariantNames,
strum::IntoStaticStr,
strum::Display,
clap::ValueEnum
),
name(ProcessorName),
clap(rename_all = "snake_case"),
serde(rename_all = "snake_case"),
strum(serialize_all = "snake_case")
)]
pub enum ProcessorConfig {
// Configuration for the events processor.
EventsProcessor(EventsProcessorConfig),
}

impl ProcessorConfig {
    /// Returns the name of this processor config as a `&'static str`.
    ///
    /// Thin wrapper over the `From<&ProcessorConfig>` conversion that
    /// `strum::IntoStaticStr` derives for this enum.
    pub fn name(&self) -> &'static str {
        <&'static str>::from(self)
    }
}
/// Payload-free list of processors; each variant is expected to mirror a variant of
/// `ProcessorConfig`.
#[derive(Debug)]
// To ensure that the variants of ProcessorConfig and Processor line up, in the testing
// build path we derive EnumDiscriminants on this enum as well and make sure the two
// sets of variants match up in `test_processor_names_complete`.
#[cfg_attr(
test,
derive(strum::EnumDiscriminants),
strum_discriminants(
derive(strum::EnumVariantNames),
name(ProcessorDiscriminants),
strum(serialize_all = "snake_case")
)
)]
pub enum Processor {
// Counterpart of `ProcessorConfig::EventsProcessor`.
EventsProcessor,
}

#[cfg(test)]
mod test {
    use super::*;
    use strum::VariantNames;

    /// Guards against adding a processor to only one of `Processor` and
    /// `ProcessorConfig`. Both enums must list the same variant names in the same
    /// (lexicographical) order for this to pass.
    #[test]
    fn test_processor_names_complete() {
        let config_variant_names = ProcessorName::VARIANTS;
        let processor_variant_names = ProcessorDiscriminants::VARIANTS;
        assert_eq!(config_variant_names, processor_variant_names);
    }
}
1 change: 1 addition & 0 deletions rust/sdk-processor/src/db/common/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod models;
79 changes: 79 additions & 0 deletions rust/sdk-processor/src/db/common/models/events_models/events.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

#![allow(clippy::extra_unused_lifetimes)]

// Copied from processor crate. The only difference is the protos are imported from the SDK
// instead of the aptos-protos crate.
use aptos_indexer_processor_sdk::aptos_protos::transaction::v1::Event as EventPB;
use diesel::{Identifiable, Insertable};
use field_count::FieldCount;
use processor::{
schema::events,
utils::util::{standardize_address, truncate_str},
};
use serde::{Deserialize, Serialize};

// p99 currently is 303 so using 300 as a safe max length
// NOTE(review): a p99 of 303 is above this 300 limit, so either the figure or the
// wording above looks off — confirm against current data.
// Used as the truncation length for `Event::indexed_type`.
const EVENT_TYPE_MAX_LENGTH: usize = 300;

/// Row model for the `events` table, keyed by `(transaction_version, event_index)`.
#[derive(Clone, Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize)]
#[diesel(primary_key(transaction_version, event_index))]
#[diesel(table_name = events)]
pub struct Event {
/// Sequence number copied from the protobuf event.
pub sequence_number: i64,
/// Creation number taken from the protobuf event's key.
pub creation_number: i64,
/// Account address from the event's key, passed through `standardize_address`.
pub account_address: String,
/// Version of the transaction that emitted the event (supplied by the caller).
pub transaction_version: i64,
/// Block height of that transaction (supplied by the caller).
pub transaction_block_height: i64,
/// Full, untruncated event type string.
pub type_: String,
/// Event payload parsed from the protobuf event's JSON `data` string.
pub data: serde_json::Value,
/// Index of the event; `from_events` uses the event's position in the input slice.
pub event_index: i64,
/// Event type string truncated via `truncate_str` to `EVENT_TYPE_MAX_LENGTH`.
pub indexed_type: String,
}

impl Event {
    /// Builds the database row for a single protobuf event.
    ///
    /// `transaction_version`, `transaction_block_height` and `event_index` are stored
    /// as given; everything else is read from `event`.
    ///
    /// # Panics
    /// Panics if `event.key` is `None` or if `event.data` is not valid JSON.
    pub fn from_event(
        event: &EventPB,
        transaction_version: i64,
        transaction_block_height: i64,
        event_index: i64,
    ) -> Self {
        let key = event.key.as_ref().unwrap();
        let account_address = standardize_address(key.account_address.as_str());
        let creation_number = key.creation_number as i64;
        let sequence_number = event.sequence_number as i64;

        let event_type: &str = event.type_str.as_ref();
        let data = serde_json::from_str(event.data.as_str()).unwrap();

        Event {
            sequence_number,
            creation_number,
            account_address,
            transaction_version,
            transaction_block_height,
            type_: event_type.to_string(),
            data,
            event_index,
            // Keep a bounded-length copy of the type alongside the full string.
            indexed_type: truncate_str(event_type, EVENT_TYPE_MAX_LENGTH),
        }
    }

    /// Builds one row per protobuf event, using each event's position in `events`
    /// as its `event_index`.
    ///
    /// # Panics
    /// Panics under the same conditions as [`Event::from_event`].
    pub fn from_events(
        events: &[EventPB],
        transaction_version: i64,
        transaction_block_height: i64,
    ) -> Vec<Self> {
        let mut models: Vec<EventModel> = Vec::with_capacity(events.len());
        for (position, event_pb) in events.iter().enumerate() {
            models.push(Self::from_event(
                event_pb,
                transaction_version,
                transaction_block_height,
                position as i64,
            ));
        }
        models
    }
}

// Prevent conflicts with other things named `Event`
// (for example the protobuf event type, imported in this file as `EventPB`).
pub type EventModel = Event;
4 changes: 4 additions & 0 deletions rust/sdk-processor/src/db/common/models/events_models/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

pub mod events;
2 changes: 2 additions & 0 deletions rust/sdk-processor/src/db/common/models/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub mod events_models;
pub mod processor_status;
Loading
Loading