Skip to content

Commit

Permalink
test: metadata json stream processing
Browse files Browse the repository at this point in the history
  • Loading branch information
kespinola committed Aug 12, 2024
1 parent a151630 commit f398880
Show file tree
Hide file tree
Showing 5 changed files with 13 additions and 26 deletions.
7 changes: 3 additions & 4 deletions grpc-ingest/config-ingester.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,15 @@ redis:
group: ingester
consumer: consumer # every new ingester instance should have a unique name
streams:
- type: account # possible values: `account`, `metadatajson`, `transaction`, required for message decoding
- type: account
stream: ACCOUNTS
data_key: data
- type: metadatajson
stream: METADATA_JSON
- type: transaction
stream: TRANSACTIONS
xack_batch_max_size: 100
xack_batch_max_idle_ms: 10
xack_max_in_process: 100
- type: metadatajson
stream: METADATA_JSON
prefetch_queue_size: 1_000 # max number of messages available in the read queue for processing
xpending_max: 250 # used for reading pending messages
xpending_only: false # exit once all pending messages are consumed (enable this if you want to downscale the number of ingesters)
Expand Down
13 changes: 3 additions & 10 deletions grpc-ingest/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,8 @@ use {
},
anyhow::Context,
futures::{channel::mpsc, stream::StreamExt, SinkExt},
lru::LruCache,
opentelemetry_sdk::trace::Config,
redis::{
aio::MultiplexedConnection, streams::StreamMaxlen, Pipeline, RedisResult,
Value as RedisValue,
},
serde::de,
sqlx::pool,
std::{collections::HashMap, num::NonZeroUsize, sync::Arc, time::Duration},
redis::{streams::StreamMaxlen, RedisResult, Value as RedisValue},
std::{collections::HashMap, sync::Arc, time::Duration},
tokio::{
spawn,
sync::Mutex,
Expand All @@ -27,7 +20,6 @@ use {
prelude::*,
},
tracing::{debug, warn},
tracing_subscriber::field::debug,
yellowstone_grpc_client::GeyserGrpcClient,
yellowstone_grpc_proto::{
geyser::{SubscribeRequest, SubscribeUpdate},
Expand Down Expand Up @@ -176,6 +168,7 @@ pub async fn run_v2(config: ConfigGrpc) -> anyhow::Result<()> {
Ok(())
}

#[allow(dead_code)]
pub async fn run(config: ConfigGrpc) -> anyhow::Result<()> {
let config = Arc::new(config);
let (tx, mut rx) = mpsc::channel::<UpdateOneof>(config.geyser_update_message_buffer_size); // Adjust buffer size as needed
Expand Down
15 changes: 6 additions & 9 deletions grpc-ingest/src/ingester.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use {
crate::{
config::{ConfigIngester, ConfigIngesterDownloadMetadata},
download_metadata::{self, TASK_TYPE},
download_metadata::{TASK_TYPE},
postgres::{create_pool as pg_create_pool, metrics_pgpool},
prom::{
download_metadata_inserted_total_inc, program_transformer_task_status_inc,
Expand All @@ -17,28 +17,24 @@ use {
chrono::Utc,
crypto::{digest::Digest, sha2::Sha256},
das_core::{
perform_metadata_json_task, DownloadMetadata, DownloadMetadataInfo,
DownloadMetadata, DownloadMetadataInfo,
DownloadMetadataNotifier,
},
digital_asset_types::dao::{sea_orm_active_enums::TaskStatus, tasks},
futures::{
future::{pending, BoxFuture, FusedFuture, FutureExt},
stream::StreamExt,
Future,
},
opentelemetry_sdk::trace::Config,
program_transformers::{error::ProgramTransformerError, ProgramTransformer},
redis::{aio::MultiplexedConnection, streams::StreamMaxlen},
sea_orm::{
entity::{ActiveModelTrait, ActiveValue},
error::{DbErr, RuntimeErr},
SqlxPostgresConnector,
},
serde::Serialize,
sqlx::{Error as SqlxError, PgPool},
std::{
borrow::Cow,
pin::Pin,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
Expand Down Expand Up @@ -208,7 +204,7 @@ pub async fn run_v2(config: ConfigIngester) -> anyhow::Result<()> {
debug!("Message acknowledged successfully");
}

()

}
}
}
Expand All @@ -226,6 +222,7 @@ pub async fn run_v2(config: ConfigIngester) -> anyhow::Result<()> {
}
Some(signal) = shutdown.next() => {
warn!("{signal} received, waiting spawned tasks...");
exec.push(IngestJob::FlushRedisPipe(Arc::clone(&pipe), connection.clone()));
break;
}
result = &mut redis_tasks_fut => {
Expand Down Expand Up @@ -350,7 +347,7 @@ pub async fn run(config: ConfigIngester) -> anyhow::Result<()> {
ProgramTransformerInfo::Transaction(transaction) => {
pt_transactions.handle_transaction(transaction).await
}
ProgramTransformerInfo::MetadataJson(download_metadata_info) => {
ProgramTransformerInfo::MetadataJson(_download_metadata_info) => {
todo!()
}
};
Expand All @@ -369,7 +366,7 @@ pub async fn run(config: ConfigIngester) -> anyhow::Result<()> {
$error
)
}
ProgramTransformerInfo::MetadataJson(download_metadata_info) => {
ProgramTransformerInfo::MetadataJson(_download_metadata_info) => {
todo!()
}
}
Expand Down
3 changes: 1 addition & 2 deletions grpc-ingest/src/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,8 @@ use {
StreamClaimReply, StreamId, StreamKey, StreamMaxlen, StreamPendingCountReply,
StreamReadOptions, StreamReadReply,
},
AsyncCommands, Cmd, ErrorKind as RedisErrorKind, RedisResult, Value as RedisValue,
AsyncCommands, ErrorKind as RedisErrorKind, RedisResult, Value as RedisValue,
},
serde::{Deserialize, Serialize},
solana_sdk::{pubkey::Pubkey, signature::Signature},
std::{
collections::HashMap,
Expand Down
1 change: 0 additions & 1 deletion program_transformers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use {
entity::EntityTrait, query::Select, ConnectionTrait, DatabaseConnection, DbErr,
SqlxPostgresConnector, TransactionTrait,
},
serde::{Deserialize, Serialize},
solana_sdk::{instruction::CompiledInstruction, pubkey::Pubkey, signature::Signature},
solana_transaction_status::InnerInstructions,
sqlx::PgPool,
Expand Down

0 comments on commit f398880

Please sign in to comment.