Skip to content

Commit

Permalink
rename to input/output
Browse files Browse the repository at this point in the history
  • Loading branch information
fanatid committed Aug 24, 2023
1 parent 3115e8b commit 4255dee
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 57 deletions.
18 changes: 9 additions & 9 deletions yellowstone-grpc-kafka/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,6 @@
"kafka": {
"bootstrap.servers": "localhost:29092"
},
"output": {
"kafka_topic": "grpc",
"kafka": {
"group.id": "output",
"group.instance.id": "output"
},
"listen": "127.0.0.1:10001",
"channel_capacity": 250000
},
"input": {
"endpoint": "http://127.0.0.1:10000",
"x_token": "",
Expand All @@ -26,5 +17,14 @@
},
"kafka_topic": "grpc",
"kafka": {}
},
"output": {
"kafka_topic": "grpc",
"kafka": {
"group.id": "output",
"group.instance.id": "output"
},
"listen": "127.0.0.1:10001",
"channel_capacity": 250000
}
}
96 changes: 48 additions & 48 deletions yellowstone-grpc-kafka/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,69 +46,33 @@ struct Args {
enum ArgsAction {
// Receive data from Kafka, deduplicate and send them back to Kafka
// TODO: Dedup
/// Receive data from Kafka and send them over gRPC
Read,
/// Receive data from gRPC and send them to the Kafka
Write,
Input,
/// Receive data from Kafka and send them over gRPC
Output,
}

impl ArgsAction {
async fn run(self, config: Config, kafka_config: ClientConfig) -> anyhow::Result<()> {
match self {
ArgsAction::Read => {
let output_config = match config.output {
Some(config) => config,
None => anyhow::bail!("`output` section in config should be defined"),
};
Self::read(kafka_config, output_config).await
}
ArgsAction::Write => {
ArgsAction::Input => {
let input_config = match config.input {
Some(config) => config,
None => anyhow::bail!("`input` section in config should be defined"),
};
Self::write(kafka_config, input_config).await
Self::input(kafka_config, input_config).await
}
}
}

async fn read(
mut kafka_config: ClientConfig,
output_config: ConfigOutput,
) -> anyhow::Result<()> {
for (key, value) in output_config.kafka.into_iter() {
kafka_config.set(key, value);
}

let grpc_tx = GrpcService::run(output_config.listen, output_config.channel_capacity)?;

let consumer: StreamConsumer = kafka_config.create()?;
consumer.subscribe(&[&output_config.kafka_topic])?;

loop {
let message = consumer.recv().await?;
debug!(
"received message with key: {:?}",
message
.key()
.and_then(|k| k.try_into().ok())
.map(u64::from_be_bytes)
);

if let Some(payload) = message.payload() {
match SubscribeUpdate::decode(payload) {
Ok(message) => {
let _ = grpc_tx.send(message);
}
Err(error) => {
warn!("failed to decode message: {error}");
}
}
ArgsAction::Output => {
let output_config = match config.output {
Some(config) => config,
None => anyhow::bail!("`output` section in config should be defined"),
};
Self::output(kafka_config, output_config).await
}
}
}

async fn write(
async fn input(
mut kafka_config: ClientConfig,
input_config: ConfigInput,
) -> anyhow::Result<()> {
Expand Down Expand Up @@ -197,6 +161,42 @@ impl ArgsAction {
}
}
}

async fn output(
mut kafka_config: ClientConfig,
output_config: ConfigOutput,
) -> anyhow::Result<()> {
for (key, value) in output_config.kafka.into_iter() {
kafka_config.set(key, value);
}

let grpc_tx = GrpcService::run(output_config.listen, output_config.channel_capacity)?;

let consumer: StreamConsumer = kafka_config.create()?;
consumer.subscribe(&[&output_config.kafka_topic])?;

loop {
let message = consumer.recv().await?;
debug!(
"received message with key: {:?}",
message
.key()
.and_then(|k| k.try_into().ok())
.map(u64::from_be_bytes)
);

if let Some(payload) = message.payload() {
match SubscribeUpdate::decode(payload) {
Ok(message) => {
let _ = grpc_tx.send(message);
}
Err(error) => {
warn!("failed to decode message: {error}");
}
}
}
}
}
}

#[tokio::main]
Expand Down

0 comments on commit 4255dee

Please sign in to comment.