Skip to content

Commit

Permalink
fix: applying account and transction filters to grpc subscription req…
Browse files Browse the repository at this point in the history
…uest
  • Loading branch information
kespinola committed Apr 15, 2024
1 parent a00ec75 commit a17087b
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 15 deletions.
22 changes: 13 additions & 9 deletions grpc-ingest/config-grpc2redis.yml
Original file line number Diff line number Diff line change
@@ -1,21 +1,25 @@
prometheus: 127.0.0.1:8873
endpoint: http://127.0.0.1:10000
x_token: null
commitment: processed
endpoint: https://index.rpcpool.com
x_token: 3ac13c5d-be8d-4a17-9be6-93bd08e565ac
commitment: finalized
accounts:
stream: ACCOUNTS
stream_maxlen: 100_000_000
stream_data_key: data
filters:
- owner:
- TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA
- metaqbxxUerdq28cj1RbAWkYQm3ybzjb6a8bt518x1s
filter:
owner:
- "metaqbxxUerdq28cj1RbAWkYQm3ybzjb6a8bt518x1s"
- "TokenzQdBNbLqP5VEhdkAS6EPFLC1PHnBqCXEpPxuEb"
- "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA"
- "ATokenGPvbdGVxr1b2hvZbsiqW5xWH25efTNsLJA8knL"
- "BGUMAp9Gq7iTEuizy4pqaxsTyUCBK68MDfK752saRPUY"
- "CoREENxT6tW1HoK8ypY1SxRMZTcVPm7R94rH4PZNhX7d"
transactions:
stream: TRANSACTIONS
stream_maxlen: 10_000_000
stream_data_key: data
filters:
- account_include:
filter:
account_include:
- BGUMAp9Gq7iTEuizy4pqaxsTyUCBK68MDfK752saRPUY
redis:
url: redis://localhost:6379
Expand Down
4 changes: 2 additions & 2 deletions grpc-ingest/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ pub struct ConfigGrpcAccounts {
#[serde(default = "ConfigGrpcAccounts::default_stream_data_key")]
pub stream_data_key: String,

pub filters: Vec<ConfigGrpcRequestAccounts>,
pub filter: ConfigGrpcRequestAccounts,
}

impl ConfigGrpcAccounts {
Expand Down Expand Up @@ -90,7 +90,7 @@ pub struct ConfigGrpcTransactions {
#[serde(default = "ConfigGrpcTransactions::default_stream_data_key")]
pub stream_data_key: String,

pub filters: Vec<ConfigGrpcRequestTransactions>,
pub filter: ConfigGrpcRequestTransactions,
}

impl ConfigGrpcTransactions {
Expand Down
30 changes: 26 additions & 4 deletions grpc-ingest/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,21 @@ use {
anyhow::Context,
futures::stream::StreamExt,
redis::{streams::StreamMaxlen, RedisResult, Value as RedisValue},
std::collections::HashMap,
std::{sync::Arc, time::Duration},
tokio::{
task::JoinSet,
time::{sleep, Instant},
},
tracing::warn,
tracing::{info, warn},
yellowstone_grpc_client::GeyserGrpcClient,
yellowstone_grpc_proto::{prelude::subscribe_update::UpdateOneof, prost::Message},
yellowstone_grpc_proto::{
geyser::SubscribeRequest,
prelude::subscribe_update::UpdateOneof,
prelude::{SubscribeRequestFilterAccounts, SubscribeRequestFilterTransactions},
prost::Message,
},
yellowstone_grpc_tools::config::GrpcRequestToProto,
};

pub async fn run(config: ConfigGrpc) -> anyhow::Result<()> {
Expand Down Expand Up @@ -41,7 +48,22 @@ pub async fn run(config: ConfigGrpc) -> anyhow::Result<()> {
.connect()
.await
.context("failed to connect go gRPC")?;
let (mut _subscribe_tx, mut stream) = client.subscribe().await?;

let mut accounts = HashMap::with_capacity(1);
let mut transactions = HashMap::with_capacity(1);

accounts.insert("das".to_string(), config.accounts.filter.clone().to_proto());
transactions.insert(
"das".to_string(),
config.transactions.filter.clone().to_proto(),
);

let request = SubscribeRequest {
accounts,
transactions,
..Default::default()
};
let (mut _subscribe_tx, mut stream) = client.subscribe_with_request(Some(request)).await?;

// recv-send loop
let mut shutdown = create_shutdown()?;
Expand Down Expand Up @@ -117,7 +139,7 @@ pub async fn run(config: ConfigGrpc) -> anyhow::Result<()> {
let result: RedisResult<RedisValue> =
pipe.atomic().query_async(&mut connection).await;

let status = if result.is_ok() { Ok(()) } else { Err(()) };
let status = result.map(|_| ()).map_err(|_| ());
redis_xadd_status_inc(&config.accounts.stream, status, pipe_accounts);
redis_xadd_status_inc(&config.transactions.stream, status, pipe_transactions);

Expand Down

0 comments on commit a17087b

Please sign in to comment.