feat: set num threads for topograph. report on postgres and redis in ingester using dedicate exec
kespinola committed Aug 14, 2024
1 parent f398880 commit 70eef04
Showing 7 changed files with 252 additions and 169 deletions.
2 changes: 2 additions & 0 deletions grpc-ingest/config-grpc2redis.yml
@@ -2,6 +2,8 @@ prometheus: 127.0.0.1:8873
geyser_endpoint: http://127.0.0.1:10000
x_token: null
commitment: finalized
topograph:
num_threads: 5
accounts:
stream: ACCOUNTS
stream_maxlen: 100_000_000
4 changes: 3 additions & 1 deletion grpc-ingest/config-ingester.yml
@@ -1,4 +1,6 @@
prometheus: 127.0.0.1:8874
topograph:
num_threads: 1
redis:
url: redis://localhost:6379
group: ingester
@@ -22,6 +24,6 @@ postgres:
min_connections: 10
max_connections: 50 # `max_connection` should be bigger than `program_transformer.max_tasks_in_process` otherwise unresolved lock is possible
program_transformer:
-max_tasks_in_process: 40
+max_tasks_in_process: 100
download_metadata:
max_attempts: 3
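The inline comment on `max_connections` above keeps the pool larger than `program_transformer.max_tasks_in_process`; the updated `ConfigIngester::check` in `grpc-ingest/src/config.rs` below instead compares the pool size against the new `topograph.num_threads`. A minimal sketch of that comparison using the values from this file (the standalone helper is illustrative only, not code from the repository):

```rust
// Illustrative sketch: the sizing rule applied by the updated
// `ConfigIngester::check`, with the values from config-ingester.yml
// (postgres.max_connections: 50, topograph.num_threads: 1).
fn pool_smaller_than_thread_count(max_connections: usize, num_threads: usize) -> bool {
    // The ingester logs a warning when the Postgres pool has fewer
    // connections than the executor has worker threads.
    max_connections < num_threads
}

fn main() {
    // 50 connections against 1 thread: the warning would not fire.
    assert!(!pool_smaller_than_thread_count(50, 1));
    // 2 connections against 5 threads: the warning condition would trigger.
    assert!(pool_smaller_than_thread_count(2, 5));
}
```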
49 changes: 34 additions & 15 deletions grpc-ingest/src/config.rs
@@ -32,13 +32,26 @@ where
}
}

-#[derive(Debug, Default, Deserialize)]
+#[derive(Debug, Clone, Deserialize, Default)]
#[serde(default)]
pub struct ConfigTopograph {
#[serde(default = "ConfigTopograph::default_num_threads")]
pub num_threads: usize,
}

impl ConfigTopograph {
pub const fn default_num_threads() -> usize {
5
}
}

#[derive(Debug, Clone, Default, Deserialize)]
#[serde(default)]
pub struct ConfigPrometheus {
pub prometheus: Option<SocketAddr>,
}

-#[derive(Debug, Deserialize)]
+#[derive(Debug, Clone, Deserialize)]
pub struct ConfigGrpc {
pub x_token: Option<String>,

@@ -61,6 +74,8 @@ pub struct ConfigGrpc {
pub solana_seen_event_cache_max_size: usize,

pub redis: ConfigGrpcRedis,

pub topograph: ConfigTopograph,
}

impl ConfigGrpc {
@@ -73,7 +88,7 @@ impl ConfigGrpc {
}
}

-#[derive(Debug, Deserialize)]
+#[derive(Debug, Clone, Deserialize)]
pub struct ConfigGrpcAccounts {
#[serde(default = "ConfigGrpcAccounts::default_stream")]
pub stream: String,
@@ -102,7 +117,7 @@ impl ConfigGrpcAccounts {
}
}

-#[derive(Debug, Deserialize)]
+#[derive(Debug, Clone, Deserialize)]
pub struct ConfigGrpcTransactions {
pub stream: String,
#[serde(
@@ -120,7 +135,7 @@
}
}

-#[derive(Debug, Deserialize)]
+#[derive(Debug, Clone, Deserialize)]
pub struct ConfigGrpcRedis {
pub url: String,
#[serde(
@@ -163,23 +178,27 @@ where
Ok(Duration::from_millis(ms as u64))
}

-#[derive(Debug, Deserialize)]
+#[derive(Debug, Clone, Deserialize)]
pub struct ConfigIngester {
pub redis: ConfigIngesterRedis,
pub postgres: ConfigIngesterPostgres,
-pub program_transformer: ConfigIngesterProgramTransformer,
pub download_metadata: ConfigIngesterDownloadMetadata,
+pub topograph: ConfigTopograph,
+pub program_transformer: ConfigIngesterProgramTransformer,
}

impl ConfigIngester {
pub fn check(&self) {
-if self.postgres.max_connections < self.program_transformer.max_tasks_in_process {
-    warn!("`postgres.max_connections` should be bigger than `program_transformer.max_tasks_in_process` otherwise unresolved lock is possible");
+if self.postgres.max_connections < self.topograph.num_threads {
+    warn!(
+        "postgres.max_connections ({}) should be more than the number of threads ({})",
+        self.postgres.max_connections, self.topograph.num_threads
+    );
}
}
}

-#[derive(Debug, Deserialize)]
+#[derive(Debug, Clone, Deserialize)]
pub struct ConfigIngesterRedis {
pub url: String,
#[serde(default = "ConfigIngesterRedis::default_group")]
@@ -246,7 +265,7 @@ impl<'de> Deserialize<'de> for ConfigIngesterRedisStream {
where
D: de::Deserializer<'de>,
{
-#[derive(Debug, Deserialize)]
+#[derive(Debug, Clone, Copy, Deserialize)]
struct Raw {
#[serde(rename = "type")]
pub stream_type: ConfigIngesterRedisStreamType,
@@ -296,15 +315,15 @@ impl<'de> Deserialize<'de> for ConfigIngesterRedisStream {
}
}

-#[derive(Debug, Deserialize, Clone, Copy, PartialEq, Eq)]
+#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum ConfigIngesterRedisStreamType {
Account,
Transaction,
MetadataJson,
}

-#[derive(Debug, Deserialize)]
+#[derive(Debug, Clone, Deserialize)]
pub struct ConfigIngesterPostgres {
pub url: String,
#[serde(
@@ -329,7 +348,7 @@ impl ConfigIngesterPostgres {
}
}

-#[derive(Debug, Deserialize)]
+#[derive(Debug, Clone, Deserialize)]
pub struct ConfigIngesterProgramTransformer {
#[serde(
default = "ConfigIngesterProgramTransformer::default_max_tasks_in_process",
@@ -403,7 +422,7 @@ impl ConfigIngesterDownloadMetadata {
}
}

-#[derive(Debug, Deserialize)]
+#[derive(Debug, Clone, Deserialize)]
pub struct ConfigDownloadMetadata {
pub postgres: ConfigIngesterPostgres,
pub download_metadata: ConfigDownloadMetadataOpts,
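For a sense of how the new `topograph` section deserializes, here is a small self-contained sketch that reuses the same serde attributes as `ConfigTopograph` above; the choice of `serde_yaml` as the loader is an assumption made for the example, not something this diff shows:

```rust
// Sketch only: mirrors the serde attributes of `ConfigTopograph` to show the
// defaulting behaviour. `serde_yaml` is used purely for illustration here.
use serde::Deserialize;

const fn default_num_threads() -> usize {
    5
}

#[derive(Debug, Deserialize)]
#[serde(default)]
struct Topograph {
    #[serde(default = "default_num_threads")]
    num_threads: usize,
}

impl Default for Topograph {
    fn default() -> Self {
        Self {
            num_threads: default_num_threads(),
        }
    }
}

fn main() {
    // An explicit value wins, e.g. `num_threads: 1` as in config-ingester.yml.
    let explicit: Topograph = serde_yaml::from_str("num_threads: 1").unwrap();
    assert_eq!(explicit.num_threads, 1);

    // Omitting the field falls back to the default of 5 threads.
    let defaulted: Topograph = serde_yaml::from_str("{}").unwrap();
    assert_eq!(defaulted.num_threads, 5);
}
```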
119 changes: 59 additions & 60 deletions grpc-ingest/src/grpc.rs
@@ -76,74 +76,73 @@ pub async fn run_v2(config: ConfigGrpc) -> anyhow::Result<()> {
    let pool_connection = connection.clone();
    let pool_pipe = Arc::clone(&pipe);

-   let exec =
-       Executor::builder(Nonblock(Tokio))
-           .num_threads(None)
-           .build(move |update, _handle| {
+   let exec = Executor::builder(Nonblock(Tokio))
+       .num_threads(Some(config.topograph.num_threads))
+       .build(move |update, _handle| {
            let config = Arc::clone(&pool_config);
            let connection = pool_connection.clone();
            let pipe = Arc::clone(&pool_pipe);

            async move {
                match update {
                    GrpcJob::FlushRedisPipe => {
                        let mut pipe = pipe.lock().await;
                        let mut connection = connection;

                        let flush = pipe.flush(&mut connection).await;

                        let status = flush.as_ref().map(|_| ()).map_err(|_| ());
                        let counts = flush.as_ref().unwrap_or_else(|counts| counts);

                        for (stream, count) in counts.iter() {
                            redis_xadd_status_inc(stream, status, *count);
                        }

                        debug!(message = "Redis pipe flushed", ?status, ?counts);
                    }
                    GrpcJob::ProcessSubscribeUpdate(update) => {
                        let accounts_stream = config.accounts.stream.clone();
                        let accounts_stream_maxlen = config.accounts.stream_maxlen;
                        let transactions_stream = config.transactions.stream.clone();
                        let transactions_stream_maxlen = config.transactions.stream_maxlen;

                        let SubscribeUpdate { update_oneof, .. } = *update;

                        let mut pipe = pipe.lock().await;

                        if let Some(update) = update_oneof {
                            match update {
                                UpdateOneof::Account(account) => {
                                    pipe.xadd_maxlen(
                                        &accounts_stream,
                                        StreamMaxlen::Approx(accounts_stream_maxlen),
                                        "*",
                                        account.encode_to_vec(),
                                    );

                                    debug!(message = "Account update", ?account,);
                                }
                                UpdateOneof::Transaction(transaction) => {
                                    pipe.xadd_maxlen(
                                        &transactions_stream,
                                        StreamMaxlen::Approx(transactions_stream_maxlen),
                                        "*",
                                        transaction.encode_to_vec(),
                                    );

                                    debug!(message = "Transaction update", ?transaction);
                                }
                                var => warn!(message = "Unknown update variant", ?var),
                            }
                        }

                        if pipe.size() >= config.redis.pipeline_max_size {
                            // handle.push(GrpcJob::FlushRedisPipe);
                        }
                    }
                }
            }
        })?;

let deadline_config = Arc::clone(&config);

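Condensed, the functional change in `grpc.rs` is small: the executor's worker-thread count now comes from the new `topograph` config instead of being left unset. The fragment below is a condensed sketch of the hunk above with the job handling elided, not a compilable unit on its own:

```rust
// Condensed from grpc-ingest/src/grpc.rs above; job handling elided.
let exec = Executor::builder(Nonblock(Tokio))
    .num_threads(Some(config.topograph.num_threads)) // previously `.num_threads(None)`
    .build(move |update, _handle| {
        // clones of the config, Redis connection, and pipeline are captured here
        async move {
            // match on GrpcJob::FlushRedisPipe / GrpcJob::ProcessSubscribeUpdate
        }
    })?;
```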
