diff --git a/yellowstone-grpc-kafka/config.json b/yellowstone-grpc-kafka/config.json index 1a2a5394..a3a4e6cc 100644 --- a/yellowstone-grpc-kafka/config.json +++ b/yellowstone-grpc-kafka/config.json @@ -2,15 +2,6 @@ "kafka": { "bootstrap.servers": "localhost:29092" }, - "output": { - "kafka_topic": "grpc", - "kafka": { - "group.id": "output", - "group.instance.id": "output" - }, - "listen": "127.0.0.1:10001", - "channel_capacity": 250000 - }, "input": { "endpoint": "http://127.0.0.1:10000", "x_token": "", @@ -26,5 +17,14 @@ }, "kafka_topic": "grpc", "kafka": {} + }, + "output": { + "kafka_topic": "grpc", + "kafka": { + "group.id": "output", + "group.instance.id": "output" + }, + "listen": "127.0.0.1:10001", + "channel_capacity": 250000 } } diff --git a/yellowstone-grpc-kafka/src/main.rs b/yellowstone-grpc-kafka/src/main.rs index ffabd60a..11bccffe 100644 --- a/yellowstone-grpc-kafka/src/main.rs +++ b/yellowstone-grpc-kafka/src/main.rs @@ -46,69 +46,33 @@ struct Args { enum ArgsAction { // Receive data from Kafka, deduplicate and send them back to Kafka // TODO: Dedup - /// Receive data from Kafka and send them over gRPC - Read, /// Receive data from gRPC and send them to the Kafka - Write, + Input, + /// Receive data from Kafka and send them over gRPC + Output, } impl ArgsAction { async fn run(self, config: Config, kafka_config: ClientConfig) -> anyhow::Result<()> { match self { - ArgsAction::Read => { - let output_config = match config.output { - Some(config) => config, - None => anyhow::bail!("`output` section in config should be defined"), - }; - Self::read(kafka_config, output_config).await - } - ArgsAction::Write => { + ArgsAction::Input => { let input_config = match config.input { Some(config) => config, None => anyhow::bail!("`input` section in config should be defined"), }; - Self::write(kafka_config, input_config).await + Self::input(kafka_config, input_config).await } - } - } - - async fn read( - mut kafka_config: ClientConfig, - output_config: ConfigOutput, - ) -> anyhow::Result<()> { - for (key, value) in output_config.kafka.into_iter() { - kafka_config.set(key, value); - } - - let grpc_tx = GrpcService::run(output_config.listen, output_config.channel_capacity)?; - - let consumer: StreamConsumer = kafka_config.create()?; - consumer.subscribe(&[&output_config.kafka_topic])?; - - loop { - let message = consumer.recv().await?; - debug!( - "received message with key: {:?}", - message - .key() - .and_then(|k| k.try_into().ok()) - .map(u64::from_be_bytes) - ); - - if let Some(payload) = message.payload() { - match SubscribeUpdate::decode(payload) { - Ok(message) => { - let _ = grpc_tx.send(message); - } - Err(error) => { - warn!("failed to decode message: {error}"); - } - } + ArgsAction::Output => { + let output_config = match config.output { + Some(config) => config, + None => anyhow::bail!("`output` section in config should be defined"), + }; + Self::output(kafka_config, output_config).await } } } - async fn write( + async fn input( mut kafka_config: ClientConfig, input_config: ConfigInput, ) -> anyhow::Result<()> { @@ -197,6 +161,42 @@ impl ArgsAction { } } } + + async fn output( + mut kafka_config: ClientConfig, + output_config: ConfigOutput, + ) -> anyhow::Result<()> { + for (key, value) in output_config.kafka.into_iter() { + kafka_config.set(key, value); + } + + let grpc_tx = GrpcService::run(output_config.listen, output_config.channel_capacity)?; + + let consumer: StreamConsumer = kafka_config.create()?; + consumer.subscribe(&[&output_config.kafka_topic])?; + + loop { + let message = consumer.recv().await?; + debug!( + "received message with key: {:?}", + message + .key() + .and_then(|k| k.try_into().ok()) + .map(u64::from_be_bytes) + ); + + if let Some(payload) = message.payload() { + match SubscribeUpdate::decode(payload) { + Ok(message) => { + let _ = grpc_tx.send(message); + } + Err(error) => { + warn!("failed to decode message: {error}"); + } + } + } + } + } } #[tokio::main]