Skip to content

Commit

Permalink
Use tokio-amqp, fix changes for latest master
Browse files Browse the repository at this point in the history
  • Loading branch information
Danny Browning committed Apr 14, 2021
1 parent 92dc8a1 commit 5cdbd5a
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 40 deletions.
29 changes: 14 additions & 15 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 3 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,7 @@ smpl_jwt = { version = "0.6.1", default-features = false, optional = true }

# Amqp
lapin = { version = "1.6", optional = true }
async-lapin = { version = "1.2", optional = true }
smol = { version = "1.2", optional = true }
tokio-amqp = { version = "1.0", optional = true }

# API
async-graphql = { version = "=2.6.4", default-features = false, optional = true, features = ["chrono"] }
Expand Down Expand Up @@ -389,7 +388,7 @@ sources-metrics = [
"sources-vector",
]

sources-amqp = ["lapin", "async-lapin", "smol"]
sources-amqp = ["lapin", "tokio-amqp"]
sources-apache_metrics = []
sources-aws_ecs_metrics = []
sources-aws_kinesis_firehose = ["base64", "sources-utils-tls", "warp"]
Expand Down Expand Up @@ -544,7 +543,7 @@ sinks-metrics = [
"sinks-vector"
]

sinks-amqp = ["lapin", "async-lapin", "smol"]
sinks-amqp = ["lapin", "tokio-amqp"]
sinks-aws_cloudwatch_logs = ["rusoto", "rusoto_logs"]
sinks-aws_cloudwatch_metrics = ["rusoto", "rusoto_cloudwatch"]
sinks-aws_kinesis_firehose = ["rusoto", "rusoto_firehose"]
Expand Down
14 changes: 2 additions & 12 deletions src/amqp.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use serde::{Deserialize, Serialize};
use std::time::Duration;
use async_lapin::*;
use tokio_amqp::*;

#[derive(Clone, Debug, Deserialize, Serialize)]
pub(crate) struct AmqpConfig {
Expand All @@ -25,16 +25,6 @@ impl Default for AmqpConfig {
}
}

#[derive(Debug)]
struct SmolExecutor;

impl lapin::executor::Executor for SmolExecutor {
fn spawn(&self, f: std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>>) -> lapin::Result<()> {
smol::spawn(f).detach();
Ok(())
}
}

impl AmqpConfig {
fn connection_string(&self) -> String {
let mut user = String::default();
Expand Down Expand Up @@ -66,7 +56,7 @@ impl AmqpConfig {
pub async fn connect(&self) -> Result<(lapin::Connection, lapin::Channel), lapin::Error> {
let addr = self.connection_string();
info!("Connecting to {}", addr);
let conn = lapin::Connection::connect(&addr, lapin::ConnectionProperties::default().with_async_io(SmolExecutor)).await?;
let conn = lapin::Connection::connect(&addr, lapin::ConnectionProperties::default().with_tokio()).await?;
let channel = conn.create_channel().await?;
Ok((conn, channel))
}
Expand Down
2 changes: 1 addition & 1 deletion src/sinks/amqp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,7 @@ mod integration_test {
out.push(s);
} else {
failures += 1;
tokio::time::delay_for(Duration::from_millis(50)).await;
tokio::time::sleep(Duration::from_millis(50)).await;
}
}

Expand Down
14 changes: 6 additions & 8 deletions src/sources/amqp.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::{
config::{log_schema, DataType, GlobalOptions, SourceConfig, SourceDescription},
config::{log_schema, DataType, SourceConfig, SourceDescription},
event::{Event, Value},
internal_events::{AmqpEventFailed, AmqpEventReceived, AmqpCommitFailed, AmqpConsumerFailed},
amqp::AmqpConfig,
Expand All @@ -12,6 +12,7 @@ use futures::{FutureExt, SinkExt, StreamExt};
use serde::{Deserialize, Serialize};
use snafu::Snafu;
use lapin::{Connection, Channel};
use crate::config::SourceContext;

#[derive(Debug, Snafu)]
enum BuildError {
Expand Down Expand Up @@ -56,12 +57,9 @@ impl_generate_config_from_default!(AmqpSourceConfig);
impl SourceConfig for AmqpSourceConfig {
async fn build(
&self,
_name: &str,
_globals: &GlobalOptions,
shutdown: ShutdownSignal,
out: Pipeline,
cx: SourceContext,
) -> crate::Result<super::Source> {
amqp_source(self, shutdown, out).await
amqp_source(self, cx.shutdown, cx.out).await
}

fn output_type(&self) -> DataType {
Expand Down Expand Up @@ -252,8 +250,8 @@ mod integration_test {
let mut config = make_config();
config.consumer = consumer;
config.queue = queue;
config.key_field = Some("message_key".to_string());
config.topic_key = Some("exchange".to_string());
config.routing_key = Some("message_key".to_string());
config.exchange_key = Some("exchange".to_string());
let (_conn, channel) = config.connection.connect().await.unwrap();

let mut exchange_opts = lapin::options::ExchangeDeclareOptions::default();
Expand Down

0 comments on commit 5cdbd5a

Please sign in to comment.