Skip to content

Commit

Permalink
refactor: add structs with builder for SubscriptionTask and add bette…
Browse files Browse the repository at this point in the history
…r shutdown handling
  • Loading branch information
Nagaprasadvr committed Oct 17, 2024
1 parent c5726ca commit 10ba722
Show file tree
Hide file tree
Showing 4 changed files with 259 additions and 168 deletions.
64 changes: 36 additions & 28 deletions grpc-ingest/config-grpc2redis.yml
Original file line number Diff line number Diff line change
@@ -1,51 +1,59 @@
prometheus: 0.0.0.0:8873
geyser_endpoint: http://127.0.0.1:10000
x_token: null
commitment: finalized
max_concurrency: 5
geyser:
endpoint: http://127.0.0.1:10000
x_token: null
commitment: finalized
connection_timeout: 10
timeout: 10
subscriptions:
token-metadata:
stream:
name: !ACCOUNTS
max_len: 100_000_000
filter: !ACCOUNTS
owner:
- metaqbxxUerdq28cj1RbAWkYQm3ybzjb6a8bt518x1s
max_concurrency: 2
filter:
accounts:
owner:
- metaqbxxUerdq28cj1RbAWkYQm3ybzjb6a8bt518x1s
token-extension:
stream:
name: !ACCOUNTS
max_len: 100_000_000
filter: !ACCOUNTS
owner:
- TokenzQdBNbLqP5VEhdkAS6EPFLC1PHnBqCXEpPxuEb
max_concurrency: 2
filter:
accounts:
owner:
- TokenzQdBNbLqP5VEhdkAS6EPFLC1PHnBqCXEpPxuEb
token:
stream:
name: !ACCOUNTS
max_len: 100_000_000
filter: !ACCOUNTS
owner:
- TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA
max_concurrency: 5
filter:
accounts:
owner:
- TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA
tcomp:
stream:
name: !ACCOUNTS
max_len: 100_000_000
filter: !ACCOUNTS
owner:
- CoREENxT6tW1HoK8ypY1SxRMZTcVPm7R94rH4PZNhX7d
bubblegum-accounts:
max_concurrency: 2
filter:
accounts:
owner:
- CoREENxT6tW1HoK8ypY1SxRMZTcVPm7R94rH4PZNhX7d
bubblegum:
stream:
name: !ACCOUNTS
name: !ACCOUNTS_WITH_TRANSACTIONS
max_len: 100_000_000
filter: !ACCOUNTS
owner:
- BGUMAp9Gq7iTEuizy4pqaxsTyUCBK68MDfK752saRPUY
bubblegum-transactions:
stream:
name: !TRANSACTIONS
max_len: 10_000_000
filter: !TRANSACTIONS
account_include:
- BGUMAp9Gq7iTEuizy4pqaxsTyUCBK68MDfK752saRPUY
max_concurrency: 2
filter:
accounts:
owner:
- BGUMAp9Gq7iTEuizy4pqaxsTyUCBK68MDfK752saRPUY
transactions:
account_include:
- BGUMAp9Gq7iTEuizy4pqaxsTyUCBK68MDfK752saRPUY

redis:
url: redis://localhost:6379
Expand Down
93 changes: 61 additions & 32 deletions grpc-ingest/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,78 +120,107 @@ pub struct ConfigPrometheus {
pub prometheus: Option<SocketAddr>,
}

#[derive(Debug, Clone, Deserialize)]
#[derive(Debug, Clone, Deserialize, Default)]
pub struct ConfigGeyser {
pub endpoint: String,
pub x_token: Option<String>,
#[serde(default = "ConfigGeyser::default_commitment")]
pub commitment: ConfigGrpcRequestCommitment,
#[serde(
default = "ConfigGeyser::default_connection_timeout",
deserialize_with = "deserialize_usize_str"
)]
pub connect_timeout: usize,
#[serde(
default = "ConfigGeyser::default_timeout",
deserialize_with = "deserialize_usize_str"
)]
pub timeout: usize,
}

impl ConfigGeyser {
pub const fn default_commitment() -> ConfigGrpcRequestCommitment {
ConfigGrpcRequestCommitment::Finalized
}

pub const fn default_connection_timeout() -> usize {
10
}

pub const fn default_timeout() -> usize {
10
}
}

#[derive(Debug, Clone, Deserialize, Default)]
#[serde(rename_all = "UPPERCASE")]
pub enum StreamName {
#[default]
Accounts,
Transactions,
#[serde(rename = "ACCOUNTS_WITH_TRANSACTIONS")]
AccountsWithTransactions,
}

#[derive(Debug, Clone, Deserialize)]
pub struct StreamConfig {
#[derive(Debug, Clone, Deserialize, Default)]
pub struct ConfigStream {
pub name: StreamName,
#[serde(
default = "StreamConfig::default_stream_maxlen",
default = "ConfigStream::default_stream_maxlen",
deserialize_with = "deserialize_usize_str"
)]
pub max_len: usize,
#[serde(
default = "ConfigStream::default_max_concurrency",
deserialize_with = "deserialize_usize_str"
)]
pub max_concurrency: usize,
}

impl StreamConfig {
impl ConfigStream {
pub const fn default_stream_maxlen() -> usize {
10_000_000
}

pub const fn default_max_concurrency() -> usize {
10
}
}

impl ToString for StreamName {
fn to_string(&self) -> String {
match self {
StreamName::Accounts => "ACCOUNTS".to_string(),
StreamName::Transactions => "TRANSACTIONS".to_string(),
StreamName::AccountsWithTransactions => "ACCOUNTS_WITH_TRANSACTIONS".to_string(),
}
}
}

#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "UPPERCASE")]
pub enum ConfigGrpcRequestFilter {
Transactions(ConfigGrpcRequestTransactions),
Accounts(ConfigGrpcRequestAccounts),
#[derive(Debug, Clone, Deserialize, Default)]
pub struct ConfigGrpcRequestFilter {
pub accounts: Option<ConfigGrpcRequestAccounts>,
pub transactions: Option<ConfigGrpcRequestTransactions>,
}

#[derive(Debug, Clone, Deserialize)]
pub struct SubscriptionConfig {
pub stream: StreamConfig,
#[derive(Debug, Clone, Deserialize, Default)]
pub struct ConfigSubscription {
pub stream: ConfigStream,
pub filter: ConfigGrpcRequestFilter,
}

pub type ConfigGrpcSubscriptions = HashMap<String, SubscriptionConfig>;
pub type ConfigGrpcSubscriptions = HashMap<String, ConfigSubscription>;

#[derive(Debug, Clone, Deserialize)]
#[derive(Debug, Clone, Deserialize, Default)]
pub struct ConfigGrpc {
pub x_token: Option<String>,
pub geyser: ConfigGeyser,

pub commitment: ConfigGrpcRequestCommitment,
pub subscriptions: ConfigGrpcSubscriptions,

pub geyser_endpoint: String,

pub redis: ConfigGrpcRedis,

#[serde(
default = "ConfigGrpc::default_max_concurrency",
deserialize_with = "deserialize_usize_str"
)]
pub max_concurrency: usize,
}

impl ConfigGrpc {
pub const fn default_max_concurrency() -> usize {
10
}
}

#[derive(Debug, Clone, Deserialize)]
#[derive(Debug, Clone, Deserialize, Default)]
pub struct ConfigGrpcRedis {
pub url: String,
#[serde(
Expand Down
Loading

0 comments on commit 10ba722

Please sign in to comment.