Skip to content

Commit

Permalink
Make workers configurable
Browse files Browse the repository at this point in the history
Make workers fully configurab le and remove reference to the plerkle plugin.
  • Loading branch information
linuskendall committed Nov 27, 2023
1 parent a973f19 commit eb84064
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 111 deletions.
53 changes: 37 additions & 16 deletions nft_ingester/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,25 @@ pub struct IngesterConfig {
pub backfiller_trees: Option<Vec<String>>,
pub role: Option<IngesterRole>,
pub max_postgres_connections: Option<u32>,
pub account_stream_worker_count: Option<u32>,
pub account_backfill_stream_worker_count: Option<u32>,
pub transaction_stream_worker_count: Option<u32>,
pub transaction_backfill_stream_worker_count: Option<u32>,
pub worker_config: Option<Vec<WorkerConfig>>,
pub code_version: Option<&'static str>,
pub background_task_runner_config: Option<BackgroundTaskRunnerConfig>,
pub cl_audits: Option<bool>, // save transaction logs for compressed nfts
}

#[derive(Deserialize, PartialEq, Debug, Clone)]
pub struct WorkerConfig {
pub stream_name: String,
pub worker_type: WorkerType,
pub worker_count: u32,
}

#[derive(Deserialize, PartialEq, Debug, Clone)]
pub enum WorkerType {
Account,
Transaction,
}

impl IngesterConfig {
/// Get the db url out of the dict, this is built a a dict so that future extra db parameters can be easily shoved in.
/// this panics if the key is not present
Expand Down Expand Up @@ -66,20 +76,31 @@ impl IngesterConfig {
mc
}

pub fn get_account_stream_worker_count(&self) -> u32 {
self.account_stream_worker_count.unwrap_or(2)
pub fn get_worker_config(&self) -> Vec<WorkerConfig> {
return if let Some(wc) = &self.worker_config {
wc.to_vec()
} else {
vec![
WorkerConfig {
stream_name: "ACC".to_string(),
worker_count: 2,
worker_type: WorkerType::Account,
},
WorkerConfig {
stream_name: "TXN".to_string(),
worker_count: 2,
worker_type: WorkerType::Transaction,
},
]
};
}

pub fn get_account_backfill_stream_worker_count(&self) -> u32 {
self.account_backfill_stream_worker_count.unwrap_or(0)
}

pub fn get_transaction_stream_worker_count(&self) -> u32 {
self.transaction_stream_worker_count.unwrap_or(2)
}

pub fn get_transaction_backfill_stream_worker_count(&self) -> u32 {
self.transaction_backfill_stream_worker_count.unwrap_or(0)
pub fn get_worker_count(&self) -> u32 {
let mut count = 0;
for wc in self.get_worker_config() {
count += wc.worker_count;
}
count
}
}

Expand Down
144 changes: 49 additions & 95 deletions nft_ingester/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::{
account_updates::account_worker,
ack::ack_worker,
backfiller::setup_backfiller,
config::{init_logger, rand_string, setup_config, IngesterRole},
config::{init_logger, rand_string, setup_config, IngesterRole, WorkerType},
database::setup_database,
error::IngesterError,
metrics::setup_metrics,
Expand All @@ -26,9 +26,7 @@ use cadence_macros::{is_global_default_set, statsd_count};
use chrono::Duration;
use clap::{arg, command, value_parser};
use log::{error, info};
use plerkle_messenger::{
redis_messenger::RedisMessenger, ConsumptionType, ACCOUNT_STREAM, ACCOUNT_BACKFILL_STREAM, TRANSACTION_STREAM, TRANSACTION_BACKFILL_STREAM
};
use plerkle_messenger::{redis_messenger::RedisMessenger, ConsumptionType};
use std::{path::PathBuf, time};
use tokio::{signal, task::JoinSet};

Expand Down Expand Up @@ -97,102 +95,58 @@ pub async fn main() -> Result<(), IngesterError> {
if role != IngesterRole::BackgroundTaskRunner {
tasks.spawn(bg_task_listener);
}
let mut timer_acc = StreamSizeTimer::new(
stream_metrics_timer,
config.messenger_config.clone(),
ACCOUNT_STREAM,
)?;
let mut timer_backfiller_acc = StreamSizeTimer::new(
stream_metrics_timer,
config.messenger_config.clone(),
ACCOUNT_BACKFILL_STREAM,
)?;
let mut timer_txn = StreamSizeTimer::new(
stream_metrics_timer,
config.messenger_config.clone(),
TRANSACTION_STREAM,
)?;
let mut timer_backfiller_txn = StreamSizeTimer::new(
stream_metrics_timer,
config.messenger_config.clone(),
TRANSACTION_BACKFILL_STREAM,
)?;


if let Some(t) = timer_acc.start::<RedisMessenger>().await {
tasks.spawn(t);
}
if let Some(t) = timer_backfiller_acc.start::<RedisMessenger>().await {
tasks.spawn(t);
}
if let Some(t) = timer_txn.start::<RedisMessenger>().await {
tasks.spawn(t);
}
if let Some(t) = timer_backfiller_txn.start::<RedisMessenger>().await {
tasks.spawn(t);
}

// Stream Consumers Setup -------------------------------------
if role == IngesterRole::Ingester || role == IngesterRole::All {
let workers = config.get_worker_config().clone();

let (_ack_task, ack_sender) =
ack_worker::<RedisMessenger>(config.get_messneger_client_config());
for i in 0..config.get_account_stream_worker_count() {
let _account = account_worker::<RedisMessenger>(
database_pool.clone(),
config.get_messneger_client_config(),
bg_task_sender.clone(),
ack_sender.clone(),
if i == 0 {
ConsumptionType::Redeliver
} else {
ConsumptionType::New
},
ACCOUNT_STREAM,
);
}
for i in 0..config.get_account_backfill_stream_worker_count() {
let _account_backfill = account_worker::<RedisMessenger>(
database_pool.clone(),
config.get_messneger_client_config(),
bg_task_sender.clone(),
ack_sender.clone(),
if i == 0 {
ConsumptionType::Redeliver
} else {
ConsumptionType::New
},
ACCOUNT_BACKFILL_STREAM,
);
}
for i in 0..config.get_transaction_stream_worker_count() {
let _txn = transaction_worker::<RedisMessenger>(
database_pool.clone(),
config.get_messneger_client_config(),
bg_task_sender.clone(),
ack_sender.clone(),
if i == 0 {
ConsumptionType::Redeliver
} else {
ConsumptionType::New
},
config.cl_audits.unwrap_or(false),
TRANSACTION_STREAM,
);
}
for i in 0..config.get_transaction_backfill_stream_worker_count() {
let _txn_backfill = transaction_worker::<RedisMessenger>(
database_pool.clone(),
config.get_messneger_client_config(),
bg_task_sender.clone(),
ack_sender.clone(),
if i == 0 {
ConsumptionType::Redeliver
} else {
ConsumptionType::New
},
config.cl_audits.unwrap_or(false),
TRANSACTION_BACKFILL_STREAM,
);

// iterate all the workers
for worker in workers {
let stream_name = worker.stream_name.to_owned().as_str();

let mut timer_worker = StreamSizeTimer::new(
stream_metrics_timer,
config.messenger_config.clone(),
stream_name.clone(),
)?;

if let Some(t) = timer_worker.start::<RedisMessenger>().await {
tasks.spawn(t);
}

for i in 0..worker.worker_count {
if worker.worker_type == WorkerType::Account {
let _account = account_worker::<RedisMessenger>(
database_pool.clone(),
config.get_messneger_client_config(),
bg_task_sender.clone(),
ack_sender.clone(),
if i == 0 {
ConsumptionType::Redeliver
} else {
ConsumptionType::New
},
stream_name,
);
} else if worker.worker_type == WorkerType::Transaction {
let _txn = transaction_worker::<RedisMessenger>(
database_pool.clone(),
config.get_messneger_client_config(),
bg_task_sender.clone(),
ack_sender.clone(),
if i == 0 {
ConsumptionType::Redeliver
} else {
ConsumptionType::New
},
config.cl_audits.unwrap_or(false),
stream_name,
);
}
}
}
}
// Stream Size Timers ----------------------------------------
Expand Down

0 comments on commit eb84064

Please sign in to comment.