Skip to content

Commit

Permalink
Replace async_trait usage with RPITIT / AFIT
Browse files Browse the repository at this point in the history
… for our own public traits.
  • Loading branch information
svix-jplatte committed Feb 16, 2024
1 parent 9713ed9 commit 71e7da4
Show file tree
Hide file tree
Showing 10 changed files with 227 additions and 229 deletions.
2 changes: 1 addition & 1 deletion omniqueue/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ description = "An abstraction layer over various queue backends"
authors = ["Svix Inc. <[email protected]>"]
repository = "https://github.com/svix/omniqueue-rs/"
readme = "../README.md"

rust-version = "1.75"
edition = "2021"

[dependencies]
Expand Down
3 changes: 0 additions & 3 deletions omniqueue/src/backends/gcp_pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ impl GcpPubSubProducer {
}
}

#[async_trait]
impl QueueBackend for GcpPubSubBackend {
type Config = GcpPubSubConfig;

Expand Down Expand Up @@ -152,7 +151,6 @@ impl std::fmt::Debug for GcpPubSubProducer {
}
}

#[async_trait]
impl QueueProducer for GcpPubSubProducer {
type Payload = Payload;

Expand Down Expand Up @@ -237,7 +235,6 @@ impl GcpPubSubConsumer {
}
}

#[async_trait]
impl QueueConsumer for GcpPubSubConsumer {
type Payload = Payload;

Expand Down
4 changes: 0 additions & 4 deletions omniqueue/src/backends/memory_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use crate::{

pub struct MemoryQueueBackend;

#[async_trait]
impl QueueBackend for MemoryQueueBackend {
type PayloadIn = Vec<u8>;

Expand Down Expand Up @@ -65,7 +64,6 @@ pub struct MemoryQueueProducer {
tx: mpsc::UnboundedSender<Vec<u8>>,
}

#[async_trait]
impl QueueProducer for MemoryQueueProducer {
type Payload = Vec<u8>;

Expand All @@ -83,7 +81,6 @@ impl QueueProducer for MemoryQueueProducer {
}
}

#[async_trait]
impl ScheduledProducer for MemoryQueueProducer {
async fn send_raw_scheduled(
&self,
Expand Down Expand Up @@ -123,7 +120,6 @@ impl MemoryQueueConsumer {
}
}

#[async_trait]
impl QueueConsumer for MemoryQueueConsumer {
type Payload = Vec<u8>;

Expand Down
4 changes: 0 additions & 4 deletions omniqueue/src/backends/rabbitmq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ async fn producer(
})
}

#[async_trait]
impl QueueBackend for RabbitMqBackend {
type PayloadIn = Vec<u8>;

Expand Down Expand Up @@ -145,7 +144,6 @@ pub struct RabbitMqProducer {
properties: BasicProperties,
}

#[async_trait]
impl QueueProducer for RabbitMqProducer {
type Payload = Vec<u8>;

Expand All @@ -169,7 +167,6 @@ impl QueueProducer for RabbitMqProducer {
}
}

#[async_trait]
impl ScheduledProducer for RabbitMqProducer {
async fn send_raw_scheduled(
&self,
Expand Down Expand Up @@ -218,7 +215,6 @@ impl RabbitMqConsumer {
}
}

#[async_trait]
impl QueueConsumer for RabbitMqConsumer {
type Payload = Vec<u8>;

Expand Down
4 changes: 0 additions & 4 deletions omniqueue/src/backends/redis/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ pub type RedisClusterQueueBackend = RedisQueueBackend<RedisClusterConnectionMana

type RawPayload = Vec<u8>;

#[async_trait]
impl<R> QueueBackend for RedisQueueBackend<R>
where
R: RedisConnection,
Expand Down Expand Up @@ -565,7 +564,6 @@ pub struct RedisStreamProducer<M: ManageConnection> {
_background_tasks: Arc<JoinSet<Result<(), QueueError>>>,
}

#[async_trait]
impl<M> QueueProducer for RedisStreamProducer<M>
where
M: ManageConnection,
Expand Down Expand Up @@ -623,7 +621,6 @@ fn from_delayed_queue_key(key: &str) -> Result<RawPayload, QueueError> {
.map_err(QueueError::generic)
}

#[async_trait]
impl<M> ScheduledProducer for RedisStreamProducer<M>
where
M: ManageConnection,
Expand Down Expand Up @@ -691,7 +688,6 @@ where
}
}

#[async_trait]
impl<M> QueueConsumer for RedisStreamConsumer<M>
where
M: ManageConnection,
Expand Down
4 changes: 0 additions & 4 deletions omniqueue/src/backends/sqs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ pub struct SqsConfig {

pub struct SqsQueueBackend;

#[async_trait]
impl QueueBackend for SqsQueueBackend {
type PayloadIn = String;

Expand Down Expand Up @@ -195,7 +194,6 @@ pub struct SqsQueueProducer {
queue_dsn: String,
}

#[async_trait]
impl QueueProducer for SqsQueueProducer {
type Payload = String;

Expand All @@ -216,7 +214,6 @@ impl QueueProducer for SqsQueueProducer {
}
}

#[async_trait]
impl ScheduledProducer for SqsQueueProducer {
async fn send_raw_scheduled(
&self,
Expand Down Expand Up @@ -257,7 +254,6 @@ impl SqsQueueConsumer {
}
}

#[async_trait]
impl QueueConsumer for SqsQueueConsumer {
type Payload = String;

Expand Down
107 changes: 57 additions & 50 deletions omniqueue/src/queue/consumer.rs
Original file line number Diff line number Diff line change
@@ -1,94 +1,101 @@
use async_trait::async_trait;
use std::time::Duration;
use std::{future::Future, pin::Pin, time::Duration};

use crate::{decoding::DecoderRegistry, QueueError, QueuePayload};

use super::Delivery;

#[async_trait]
pub trait QueueConsumer: Send + Sync {
type Payload: QueuePayload;

async fn receive(&mut self) -> Result<Delivery, QueueError>;
fn receive(&mut self) -> impl Future<Output = Result<Delivery, QueueError>> + Send;

async fn receive_all(
fn receive_all(
&mut self,
max_messages: usize,
deadline: Duration,
) -> Result<Vec<Delivery>, QueueError>;
) -> impl Future<Output = Result<Vec<Delivery>, QueueError>> + Send;

fn into_dyn(self, custom_decoders: DecoderRegistry<Vec<u8>>) -> DynConsumer
where
Self: 'static + Sized,
{
DynConsumer::new(self, custom_decoders)
}
}

pub struct DynConsumer(Box<dyn ErasedQueueConsumer>);

impl DynConsumer {
fn new(inner: impl QueueConsumer + 'static, custom_decoders: DecoderRegistry<Vec<u8>>) -> Self {
let c = DynConsumerInner {
inner: self,
inner,
custom_decoders,
};
DynConsumer(Box::new(c))
Self(Box::new(c))
}
}

struct DynConsumerInner<T: QueuePayload, C: 'static + QueueConsumer<Payload = T>> {
trait ErasedQueueConsumer: Send + Sync {
fn receive(
&mut self,
) -> Pin<Box<dyn Future<Output = Result<Delivery, QueueError>> + Send + '_>>;
fn receive_all(
&mut self,
max_messages: usize,
deadline: Duration,
) -> Pin<Box<dyn Future<Output = Result<Vec<Delivery>, QueueError>> + Send + '_>>;
}

struct DynConsumerInner<C> {
inner: C,
custom_decoders: DecoderRegistry<Vec<u8>>,
}

#[async_trait]
impl<T: QueuePayload, C: 'static + QueueConsumer<Payload = T>> QueueConsumer
for DynConsumerInner<T, C>
{
type Payload = Vec<u8>;

async fn receive(&mut self) -> Result<Delivery, QueueError> {
let mut t_payload = self.inner.receive().await?;
let bytes_payload: Option<Vec<u8>> = match t_payload.payload_custom() {
Ok(b) => b,
Err(QueueError::NoDecoderForThisType) => t_payload.take_payload(),
Err(e) => return Err(e),
};

Ok(Delivery {
payload: bytes_payload,
decoders: self.custom_decoders.clone(),
acker: t_payload.acker,
})
}

async fn receive_all(
impl<C: QueueConsumer> ErasedQueueConsumer for DynConsumerInner<C> {
fn receive(
&mut self,
max_messages: usize,
deadline: Duration,
) -> Result<Vec<Delivery>, QueueError> {
let xs = self.inner.receive_all(max_messages, deadline).await?;
let mut out = Vec::with_capacity(xs.len());
for mut t_payload in xs {
) -> Pin<Box<dyn Future<Output = Result<Delivery, QueueError>> + Send + '_>> {
Box::pin(async move {
let mut t_payload = self.inner.receive().await?;
let bytes_payload: Option<Vec<u8>> = match t_payload.payload_custom() {
Ok(b) => b,
Err(QueueError::NoDecoderForThisType) => t_payload.take_payload(),
Err(e) => return Err(e),
};
out.push(Delivery {

Ok(Delivery {
payload: bytes_payload,
decoders: self.custom_decoders.clone(),
acker: t_payload.acker,
});
}
Ok(out)
})
})
}

fn into_dyn(mut self, custom_decoders: DecoderRegistry<Vec<u8>>) -> DynConsumer
where
Self: Sized,
{
self.custom_decoders = custom_decoders;
DynConsumer(Box::new(self))
fn receive_all(
&mut self,
max_messages: usize,
deadline: Duration,
) -> Pin<Box<dyn Future<Output = Result<Vec<Delivery>, QueueError>> + Send + '_>> {
Box::pin(async move {
let xs = self.inner.receive_all(max_messages, deadline).await?;
let mut out = Vec::with_capacity(xs.len());
for mut t_payload in xs {
let bytes_payload: Option<Vec<u8>> = match t_payload.payload_custom() {
Ok(b) => b,
Err(QueueError::NoDecoderForThisType) => t_payload.take_payload(),
Err(e) => return Err(e),
};
out.push(Delivery {
payload: bytes_payload,
decoders: self.custom_decoders.clone(),
acker: t_payload.acker,
});
}
Ok(out)
})
}
}

pub struct DynConsumer(Box<dyn QueueConsumer<Payload = Vec<u8>>>);

#[async_trait]
impl QueueConsumer for DynConsumer {
type Payload = Vec<u8>;

Expand Down
16 changes: 7 additions & 9 deletions omniqueue/src/queue/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::fmt;
use std::{any::TypeId, collections::HashMap, marker::PhantomData, sync::Arc};
use std::{any::TypeId, collections::HashMap, fmt, future::Future, marker::PhantomData, sync::Arc};

use async_trait::async_trait;
use serde::de::DeserializeOwned;
Expand All @@ -21,7 +20,6 @@ pub mod producer;
/// A marker trait with utility functions meant for the creation of new producers and/or consumers.
///
/// This trait is meant to be implemented on an empty struct representing the backend as a whole.
#[async_trait]
pub trait QueueBackend {
type PayloadIn: QueuePayload;
type PayloadOut: QueuePayload;
Expand All @@ -31,21 +29,21 @@ pub trait QueueBackend {

type Config;

async fn new_pair(
fn new_pair(
config: Self::Config,
custom_encoders: EncoderRegistry<Self::PayloadIn>,
custom_decoders: DecoderRegistry<Self::PayloadOut>,
) -> Result<(Self::Producer, Self::Consumer), QueueError>;
) -> impl Future<Output = Result<(Self::Producer, Self::Consumer), QueueError>> + Send;

async fn producing_half(
fn producing_half(
config: Self::Config,
custom_encoders: EncoderRegistry<Self::PayloadIn>,
) -> Result<Self::Producer, QueueError>;
) -> impl Future<Output = Result<Self::Producer, QueueError>> + Send;

async fn consuming_half(
fn consuming_half(
config: Self::Config,
custom_decoders: DecoderRegistry<Self::PayloadOut>,
) -> Result<Self::Consumer, QueueError>;
) -> impl Future<Output = Result<Self::Consumer, QueueError>> + Send;

fn builder(config: Self::Config) -> QueueBuilder<Self, Static>
where
Expand Down
Loading

0 comments on commit 71e7da4

Please sign in to comment.