Skip to content

Commit

Permalink
add skip db write flags for tables in parquet migration tranche 2 and…
Browse files Browse the repository at this point in the history
… 3 (#429)

* add skip db write flags for tables in parquet migration tranche 2 and 3

* remove flags for transaction_metadata_processor

* include coin_infos to deprecated table flags

* remove flags from coin and token processor since we are deprecating those two
  • Loading branch information
yuunlimm authored Jun 28, 2024
1 parent 3077460 commit e351895
Show file tree
Hide file tree
Showing 8 changed files with 167 additions and 25 deletions.
4 changes: 2 additions & 2 deletions rust/processor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ Indexer GRPC parser is to indexer data processor that leverages the indexer grpc
- `starting_version`: start processor at starting_version.
- `ending_version`: stop processor after ending_version.
- `number_concurrent_processing_tasks`: number of tasks to parse and insert; 1 means sequential processing, otherwise,
- `deprecated_tables`: a list of tables to skip writing to AlloyDB.
transactions are split into tasks and inserted in random order.
- `deprecated_tables`: a list of tables to skip writing to AlloyDB. You can find a full list of deprecated tables [here](https://aptoslabs.notion.site/Deprecated-Tables-33518cfcff0543378289b2bf06001576?pvs=4).
transactions are split into tasks and inserted in random order.

### Use docker image for existing parsers(Only for **Unix/Linux**)

Expand Down
42 changes: 37 additions & 5 deletions rust/processor/src/processors/ans_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use crate::{
database::{execute_in_chunks, get_config_table_chunk_size, ArcDbPool},
util::standardize_address,
},
worker::TableFlags,
};
use ahash::AHashMap;
use anyhow::bail;
Expand Down Expand Up @@ -45,13 +46,15 @@ pub struct AnsProcessor {
connection_pool: ArcDbPool,
config: AnsProcessorConfig,
per_table_chunk_sizes: AHashMap<String, usize>,
deprecated_tables: TableFlags,
}

impl AnsProcessor {
pub fn new(
connection_pool: ArcDbPool,
config: AnsProcessorConfig,
per_table_chunk_sizes: AHashMap<String, usize>,
deprecated_tables: TableFlags,
) -> Self {
tracing::info!(
ans_v1_primary_names_table_handle = config.ans_v1_primary_names_table_handle,
Expand All @@ -63,6 +66,7 @@ impl AnsProcessor {
connection_pool,
config,
per_table_chunk_sizes,
deprecated_tables,
}
}
}
Expand Down Expand Up @@ -372,14 +376,14 @@ impl ProcessorTrait for AnsProcessor {
let last_transaction_timestamp = transactions.last().unwrap().timestamp.clone();

let (
all_current_ans_lookups,
all_ans_lookups,
all_current_ans_primary_names,
all_ans_primary_names,
mut all_current_ans_lookups,
mut all_ans_lookups,
mut all_current_ans_primary_names,
mut all_ans_primary_names,
all_current_ans_lookups_v2,
all_ans_lookups_v2,
all_current_ans_primary_names_v2,
all_ans_primary_names_v2,
mut all_ans_primary_names_v2,
) = parse_ans(
&transactions,
self.config.ans_v1_primary_names_table_handle.clone(),
Expand All @@ -390,6 +394,34 @@ impl ProcessorTrait for AnsProcessor {
let processing_duration_in_secs = processing_start.elapsed().as_secs_f64();
let db_insertion_start = std::time::Instant::now();

if self
.deprecated_tables
.contains(TableFlags::ANS_PRIMARY_NAME)
{
all_ans_primary_names.clear();
}
if self
.deprecated_tables
.contains(TableFlags::ANS_PRIMARY_NAME_V2)
{
all_ans_primary_names_v2.clear();
}
if self.deprecated_tables.contains(TableFlags::ANS_LOOKUP) {
all_ans_lookups.clear();
}
if self
.deprecated_tables
.contains(TableFlags::CURRENT_ANS_LOOKUP)
{
all_current_ans_lookups.clear();
}
if self
.deprecated_tables
.contains(TableFlags::CURRENT_ANS_PRIMARY_NAME)
{
all_current_ans_primary_names.clear();
}

// Insert values to db
let tx_result = insert_to_db(
self.get_pool(),
Expand Down
6 changes: 6 additions & 0 deletions rust/processor/src/processors/default_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,12 @@ fn process_transactions(
if flags.contains(TableFlags::TABLE_ITEMS) {
table_items.clear();
}
if flags.contains(TableFlags::TABLE_METADATAS) {
table_metadata.clear();
}
if flags.contains(TableFlags::MOVE_MODULES) {
move_modules.clear();
}

(
txns,
Expand Down
34 changes: 30 additions & 4 deletions rust/processor/src/processors/fungible_asset_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use crate::{
database::{execute_in_chunks, get_config_table_chunk_size, ArcDbPool},
util::{get_entry_function_from_user_request, standardize_address},
},
worker::TableFlags,
};
use ahash::AHashMap;
use anyhow::bail;
Expand All @@ -45,13 +46,19 @@ use tracing::error;
/// State held by the fungible-asset processor between batches.
///
/// All three fields are supplied by the caller through `FungibleAssetProcessor::new`.
pub struct FungibleAssetProcessor {
/// Database connection pool handle handed in at construction.
connection_pool: ArcDbPool,
/// Chunk sizes keyed by a `String` — presumably the table name used when
/// batching inserts; confirm against `get_config_table_chunk_size`.
per_table_chunk_sizes: AHashMap<String, usize>,
/// Flags for tables whose parsed rows are cleared before the database insert
/// (checked via `contains` for `FUNGIBLE_ASSET_BALANCES`,
/// `CURRENT_FUNGIBLE_ASSET_BALANCES` and `COIN_SUPPLY`).
deprecated_tables: TableFlags,
}

impl FungibleAssetProcessor {
pub fn new(connection_pool: ArcDbPool, per_table_chunk_sizes: AHashMap<String, usize>) -> Self {
/// Creates a processor, storing each argument in the field of the same name.
///
/// `deprecated_tables` carries the `TableFlags` that are later tested with
/// `contains` to clear the matching row vectors before the database insert.
pub fn new(
connection_pool: ArcDbPool,
per_table_chunk_sizes: AHashMap<String, usize>,
deprecated_tables: TableFlags,
) -> Self {
Self {
connection_pool,
per_table_chunk_sizes,
deprecated_tables,
}
}
}
Expand Down Expand Up @@ -355,10 +362,10 @@ impl ProcessorTrait for FungibleAssetProcessor {
let (
fungible_asset_activities,
fungible_asset_metadata,
fungible_asset_balances,
current_fungible_asset_balances,
mut fungible_asset_balances,
mut current_fungible_asset_balances,
current_unified_fungible_asset_balances,
coin_supply,
mut coin_supply,
) = parse_v2_coin(&transactions).await;

let processing_duration_in_secs = processing_start.elapsed().as_secs_f64();
Expand All @@ -367,6 +374,25 @@ impl ProcessorTrait for FungibleAssetProcessor {
let (coin_balance, fa_balance): (Vec<_>, Vec<_>) = current_unified_fungible_asset_balances
.into_iter()
.partition(|x| x.is_primary.is_none());

if self
.deprecated_tables
.contains(TableFlags::FUNGIBLE_ASSET_BALANCES)
{
fungible_asset_balances.clear();
}

if self
.deprecated_tables
.contains(TableFlags::CURRENT_FUNGIBLE_ASSET_BALANCES)
{
current_fungible_asset_balances.clear();
}

if self.deprecated_tables.contains(TableFlags::COIN_SUPPLY) {
coin_supply.clear();
}

let tx_result = insert_to_db(
self.get_pool(),
self.name(),
Expand Down
8 changes: 8 additions & 0 deletions rust/processor/src/processors/objects_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::{
database::{execute_in_chunks, get_config_table_chunk_size, ArcDbPool},
util::standardize_address,
},
worker::TableFlags,
IndexerGrpcProcessorConfig,
};
use ahash::AHashMap;
Expand Down Expand Up @@ -40,18 +41,21 @@ pub struct ObjectsProcessor {
connection_pool: ArcDbPool,
config: ObjectsProcessorConfig,
per_table_chunk_sizes: AHashMap<String, usize>,
deprecated_tables: TableFlags,
}

impl ObjectsProcessor {
pub fn new(
connection_pool: ArcDbPool,
config: ObjectsProcessorConfig,
per_table_chunk_sizes: AHashMap<String, usize>,
deprecated_tables: TableFlags,
) -> Self {
Self {
connection_pool,
config,
per_table_chunk_sizes,
deprecated_tables,
}
}
}
Expand Down Expand Up @@ -265,6 +269,10 @@ impl ProcessorTrait for ObjectsProcessor {
.collect::<Vec<CurrentObject>>();
all_current_objects.sort_by(|a, b| a.object_address.cmp(&b.object_address));

if self.deprecated_tables.contains(TableFlags::OBJECTS) {
all_objects.clear();
}

let processing_duration_in_secs = processing_start.elapsed().as_secs_f64();
let db_insertion_start = std::time::Instant::now();

Expand Down
31 changes: 27 additions & 4 deletions rust/processor/src/processors/token_v2_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use crate::{
database::{execute_in_chunks, get_config_table_chunk_size, ArcDbPool, DbPoolConnection},
util::{get_entry_function_from_user_request, parse_timestamp, standardize_address},
},
worker::TableFlags,
IndexerGrpcProcessorConfig,
};
use ahash::{AHashMap, AHashSet};
Expand Down Expand Up @@ -64,18 +65,21 @@ pub struct TokenV2Processor {
connection_pool: ArcDbPool,
config: TokenV2ProcessorConfig,
per_table_chunk_sizes: AHashMap<String, usize>,
deprecated_tables: TableFlags,
}

impl TokenV2Processor {
pub fn new(
connection_pool: ArcDbPool,
config: TokenV2ProcessorConfig,
per_table_chunk_sizes: AHashMap<String, usize>,
deprecated_tables: TableFlags,
) -> Self {
Self {
connection_pool,
config,
per_table_chunk_sizes,
deprecated_tables,
}
}
}
Expand Down Expand Up @@ -594,16 +598,16 @@ impl ProcessorTrait for TokenV2Processor {
let query_retry_delay_ms = self.config.query_retry_delay_ms;
// Token V2 processing which includes token v1
let (
collections_v2,
token_datas_v2,
token_ownerships_v2,
mut collections_v2,
mut token_datas_v2,
mut token_ownerships_v2,
current_collections_v2,
current_token_datas_v2,
current_deleted_token_datas_v2,
current_token_ownerships_v2,
current_deleted_token_ownerships_v2,
token_activities_v2,
current_token_v2_metadata,
mut current_token_v2_metadata,
current_token_royalties_v1,
current_token_claims,
) = parse_v2_token(
Expand All @@ -618,6 +622,25 @@ impl ProcessorTrait for TokenV2Processor {
let processing_duration_in_secs = processing_start.elapsed().as_secs_f64();
let db_insertion_start = std::time::Instant::now();

if self
.deprecated_tables
.contains(TableFlags::TOKEN_OWNERSHIPS_V2)
{
token_ownerships_v2.clear();
}
if self.deprecated_tables.contains(TableFlags::TOKEN_DATAS_V2) {
token_datas_v2.clear();
}
if self.deprecated_tables.contains(TableFlags::COLLECTIONS_V2) {
collections_v2.clear();
}
if self
.deprecated_tables
.contains(TableFlags::CURRENT_TOKEN_V2_METADATA)
{
current_token_v2_metadata.clear();
}

let tx_result = insert_to_db(
self.get_pool(),
self.name(),
Expand Down
13 changes: 12 additions & 1 deletion rust/processor/src/processors/user_transaction_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::{
counters::PROCESSOR_UNKNOWN_TYPE_COUNT,
database::{execute_in_chunks, get_config_table_chunk_size, ArcDbPool},
},
worker::TableFlags,
};
use ahash::AHashMap;
use anyhow::bail;
Expand All @@ -28,13 +29,19 @@ use tracing::error;
/// State held by the user-transaction processor between batches.
///
/// All three fields are supplied by the caller through `UserTransactionProcessor::new`.
pub struct UserTransactionProcessor {
/// Database connection pool handle handed in at construction.
connection_pool: ArcDbPool,
/// Chunk sizes keyed by a `String` — presumably the table name used when
/// batching inserts; confirm against `get_config_table_chunk_size`.
per_table_chunk_sizes: AHashMap<String, usize>,
/// Flags for tables whose parsed rows are cleared before the database insert;
/// `TableFlags::SIGNATURES` is the flag checked in this processor.
deprecated_tables: TableFlags,
}

impl UserTransactionProcessor {
pub fn new(connection_pool: ArcDbPool, per_table_chunk_sizes: AHashMap<String, usize>) -> Self {
/// Creates a processor, storing each argument in the field of the same name.
///
/// `deprecated_tables` carries the `TableFlags` that are later tested with
/// `contains(TableFlags::SIGNATURES)` to clear the parsed signatures before
/// the database insert.
pub fn new(
connection_pool: ArcDbPool,
per_table_chunk_sizes: AHashMap<String, usize>,
deprecated_tables: TableFlags,
) -> Self {
Self {
connection_pool,
per_table_chunk_sizes,
deprecated_tables,
}
}
}
Expand Down Expand Up @@ -177,6 +184,10 @@ impl ProcessorTrait for UserTransactionProcessor {
}
}

if self.deprecated_tables.contains(TableFlags::SIGNATURES) {
signatures.clear();
}

let processing_duration_in_secs = processing_start.elapsed().as_secs_f64();
let db_insertion_start = std::time::Instant::now();

Expand Down
Loading

0 comments on commit e351895

Please sign in to comment.