Skip to content

Commit

Permalink
Configurable account streams (#148)
Browse files Browse the repository at this point in the history
* Make workers configurable

Make workers fully configurable and remove the reference to the plerkle plugin.

* fix lifetime

---------

Co-authored-by: Kirill Fomichev <[email protected]>
  • Loading branch information
linuskendall and fanatid authored Jan 9, 2024
1 parent 2d5a0dc commit 41ead54
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 113 deletions.
53 changes: 37 additions & 16 deletions nft_ingester/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,25 @@ pub struct IngesterConfig {
pub backfiller_trees: Option<Vec<String>>,
pub role: Option<IngesterRole>,
pub max_postgres_connections: Option<u32>,
pub account_stream_worker_count: Option<u32>,
pub account_backfill_stream_worker_count: Option<u32>,
pub transaction_stream_worker_count: Option<u32>,
pub transaction_backfill_stream_worker_count: Option<u32>,
pub worker_config: Option<Vec<WorkerConfig>>,
pub code_version: Option<&'static str>,
pub background_task_runner_config: Option<BackgroundTaskRunnerConfig>,
pub cl_audits: Option<bool>, // save transaction logs for compressed nfts
}

#[derive(Deserialize, PartialEq, Debug, Clone)]
pub struct WorkerConfig {
    /// Name of the messenger stream this worker consumes (e.g. "ACC", "TXN").
    pub stream_name: String,
    /// Kind of payload carried by the stream; selects the worker implementation.
    pub worker_type: WorkerType,
    /// Number of worker processes to spawn for this stream.
    pub worker_count: u32,
}

/// Kind of stream payload a worker consumes; selects between the account
/// and transaction worker implementations.
///
/// Derives `Eq`, `Hash`, and `Copy` in addition to the original set: the
/// enum is fieldless and is compared with `==` when dispatching workers,
/// so the extra derives are free and strictly backward compatible.
#[derive(Deserialize, PartialEq, Eq, Hash, Debug, Clone, Copy)]
pub enum WorkerType {
    /// Consumes account-update messages.
    Account,
    /// Consumes transaction messages.
    Transaction,
}

impl IngesterConfig {
/// Get the db url out of the dict; this is built as a dict so that future extra db parameters can be easily added.
/// this panics if the key is not present
Expand Down Expand Up @@ -66,20 +76,31 @@ impl IngesterConfig {
mc
}

pub fn get_account_stream_worker_count(&self) -> u32 {
self.account_stream_worker_count.unwrap_or(2)
/// Returns the configured stream workers.
///
/// Falls back to a default pair — one "ACC" account stream and one "TXN"
/// transaction stream, two worker processes each — when `worker_config`
/// is absent from the config file.
///
/// Uses `Option::clone().unwrap_or_else(..)` instead of the original
/// `return if let .. { wc.to_vec() } else { .. };`: the needless `return`
/// and the `to_vec()` clone-through-a-borrow are non-idiomatic, and the
/// closure form builds the default vec only when it is actually needed.
pub fn get_worker_config(&self) -> Vec<WorkerConfig> {
    self.worker_config.clone().unwrap_or_else(|| {
        vec![
            WorkerConfig {
                stream_name: "ACC".to_string(),
                worker_count: 2,
                worker_type: WorkerType::Account,
            },
            WorkerConfig {
                stream_name: "TXN".to_string(),
                worker_count: 2,
                worker_type: WorkerType::Transaction,
            },
        ]
    })
}

pub fn get_account_backfill_stream_worker_count(&self) -> u32 {
self.account_backfill_stream_worker_count.unwrap_or(0)
}

pub fn get_transaction_stream_worker_count(&self) -> u32 {
self.transaction_stream_worker_count.unwrap_or(2)
}

pub fn get_transaction_backfill_stream_worker_count(&self) -> u32 {
self.transaction_backfill_stream_worker_count.unwrap_or(0)
/// Total number of worker processes across all configured streams.
///
/// Replaces the manual `let mut count = 0; for .. { count += .. }`
/// accumulator with an iterator `sum()`, matching the summation idiom
/// already used for the same quantity in `database.rs`.
pub fn get_worker_count(&self) -> u32 {
    self.get_worker_config()
        .iter()
        .map(|wc| wc.worker_count)
        .sum()
}
}

Expand Down
4 changes: 2 additions & 2 deletions nft_ingester/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ const DEFAULT_MAX: u32 = 125;
pub async fn setup_database(config: IngesterConfig) -> PgPool {
let max = config.max_postgres_connections.unwrap_or(DEFAULT_MAX);
if config.role == Some(IngesterRole::All) || config.role == Some(IngesterRole::Ingester) {
let relative_max =
config.get_account_stream_worker_count() + config.get_transaction_stream_worker_count();
let relative_max: u32 =
config.get_worker_config().iter().map(|c| c.worker_count).sum();
let should_be_at_least = relative_max * 5;
if should_be_at_least > max {
panic!("Please increase max_postgres_connections to at least {}, at least 5 connections per worker process should be given", should_be_at_least);
Expand Down
144 changes: 49 additions & 95 deletions nft_ingester/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::{
account_updates::account_worker,
ack::ack_worker,
backfiller::setup_backfiller,
config::{init_logger, rand_string, setup_config, IngesterRole},
config::{init_logger, rand_string, setup_config, IngesterRole, WorkerType},
database::setup_database,
error::IngesterError,
metrics::setup_metrics,
Expand All @@ -26,9 +26,7 @@ use cadence_macros::{is_global_default_set, statsd_count};
use chrono::Duration;
use clap::{arg, command, value_parser};
use log::{error, info};
use plerkle_messenger::{
redis_messenger::RedisMessenger, ConsumptionType, ACCOUNT_STREAM, ACCOUNT_BACKFILL_STREAM, TRANSACTION_STREAM, TRANSACTION_BACKFILL_STREAM
};
use plerkle_messenger::{redis_messenger::RedisMessenger, ConsumptionType};
use std::{path::PathBuf, time};
use tokio::{signal, task::JoinSet};

Expand Down Expand Up @@ -97,102 +95,58 @@ pub async fn main() -> Result<(), IngesterError> {
if role != IngesterRole::BackgroundTaskRunner {
tasks.spawn(bg_task_listener);
}
let mut timer_acc = StreamSizeTimer::new(
stream_metrics_timer,
config.messenger_config.clone(),
ACCOUNT_STREAM,
)?;
let mut timer_backfiller_acc = StreamSizeTimer::new(
stream_metrics_timer,
config.messenger_config.clone(),
ACCOUNT_BACKFILL_STREAM,
)?;
let mut timer_txn = StreamSizeTimer::new(
stream_metrics_timer,
config.messenger_config.clone(),
TRANSACTION_STREAM,
)?;
let mut timer_backfiller_txn = StreamSizeTimer::new(
stream_metrics_timer,
config.messenger_config.clone(),
TRANSACTION_BACKFILL_STREAM,
)?;


if let Some(t) = timer_acc.start::<RedisMessenger>().await {
tasks.spawn(t);
}
if let Some(t) = timer_backfiller_acc.start::<RedisMessenger>().await {
tasks.spawn(t);
}
if let Some(t) = timer_txn.start::<RedisMessenger>().await {
tasks.spawn(t);
}
if let Some(t) = timer_backfiller_txn.start::<RedisMessenger>().await {
tasks.spawn(t);
}

// Stream Consumers Setup -------------------------------------
if role == IngesterRole::Ingester || role == IngesterRole::All {
let workers = config.get_worker_config().clone();

let (_ack_task, ack_sender) =
ack_worker::<RedisMessenger>(config.get_messneger_client_config());
for i in 0..config.get_account_stream_worker_count() {
let _account = account_worker::<RedisMessenger>(
database_pool.clone(),
config.get_messneger_client_config(),
bg_task_sender.clone(),
ack_sender.clone(),
if i == 0 {
ConsumptionType::Redeliver
} else {
ConsumptionType::New
},
ACCOUNT_STREAM,
);
}
for i in 0..config.get_account_backfill_stream_worker_count() {
let _account_backfill = account_worker::<RedisMessenger>(
database_pool.clone(),
config.get_messneger_client_config(),
bg_task_sender.clone(),
ack_sender.clone(),
if i == 0 {
ConsumptionType::Redeliver
} else {
ConsumptionType::New
},
ACCOUNT_BACKFILL_STREAM,
);
}
for i in 0..config.get_transaction_stream_worker_count() {
let _txn = transaction_worker::<RedisMessenger>(
database_pool.clone(),
config.get_messneger_client_config(),
bg_task_sender.clone(),
ack_sender.clone(),
if i == 0 {
ConsumptionType::Redeliver
} else {
ConsumptionType::New
},
config.cl_audits.unwrap_or(false),
TRANSACTION_STREAM,
);
}
for i in 0..config.get_transaction_backfill_stream_worker_count() {
let _txn_backfill = transaction_worker::<RedisMessenger>(
database_pool.clone(),
config.get_messneger_client_config(),
bg_task_sender.clone(),
ack_sender.clone(),
if i == 0 {
ConsumptionType::Redeliver
} else {
ConsumptionType::New
},
config.cl_audits.unwrap_or(false),
TRANSACTION_BACKFILL_STREAM,
);

// iterate all the workers
for worker in workers {
let stream_name = Box::leak(Box::new(worker.stream_name.to_owned()));

let mut timer_worker = StreamSizeTimer::new(
stream_metrics_timer,
config.messenger_config.clone(),
stream_name.as_str(),
)?;

if let Some(t) = timer_worker.start::<RedisMessenger>().await {
tasks.spawn(t);
}

for i in 0..worker.worker_count {
if worker.worker_type == WorkerType::Account {
let _account = account_worker::<RedisMessenger>(
database_pool.clone(),
config.get_messneger_client_config(),
bg_task_sender.clone(),
ack_sender.clone(),
if i == 0 {
ConsumptionType::Redeliver
} else {
ConsumptionType::New
},
stream_name,
);
} else if worker.worker_type == WorkerType::Transaction {
let _txn = transaction_worker::<RedisMessenger>(
database_pool.clone(),
config.get_messneger_client_config(),
bg_task_sender.clone(),
ack_sender.clone(),
if i == 0 {
ConsumptionType::Redeliver
} else {
ConsumptionType::New
},
config.cl_audits.unwrap_or(false),
stream_name,
);
}
}
}
}
// Stream Size Timers ----------------------------------------
Expand Down

0 comments on commit 41ead54

Please sign in to comment.