Skip to content

Commit

Permalink
fix: standardize backend for storage and mq
Browse files Browse the repository at this point in the history
  • Loading branch information
geofmureithi committed Jul 10, 2024
1 parent d5496ff commit a76dc1f
Show file tree
Hide file tree
Showing 18 changed files with 26 additions and 45 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ async fn main() -> {
.register_with_count(2, {
WorkerBuilder::new(format!("email-worker"))
.data(0usize)
.with_storage(storage)
.backend(storage)
.build_fn(send_email)
})
.run()
Expand Down
2 changes: 1 addition & 1 deletion examples/actix-web/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ async fn main() -> Result<()> {
.register_with_count(2, {
WorkerBuilder::new("tasty-avocado")
.layer(TraceLayer::new())
.with_storage(storage)
.backend(storage)
.build_fn(send_email)
})
.run_with_signal(signal::ctrl_c());
Expand Down
2 changes: 1 addition & 1 deletion examples/axum/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ async fn main() -> Result<()> {
.register_with_count(2, {
WorkerBuilder::new("tasty-pear")
.layer(TraceLayer::new())
.with_storage(storage.clone())
.backend(storage.clone())
.build_fn(send_email)
})
.run()
Expand Down
2 changes: 1 addition & 1 deletion examples/basics/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ async fn main() -> Result<(), std::io::Error> {
// Add shared context to all jobs executed by this worker
.data(EmailService::new())
.data(ValidEmailCache::new())
.with_storage(sqlite)
.backend(sqlite)
.build_fn(send_email)
})
.shutdown_timeout(Duration::from_secs(5))
Expand Down
2 changes: 1 addition & 1 deletion examples/mysql/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ async fn main() -> Result<()> {
.register_with_count(1, {
WorkerBuilder::new("tasty-avocado")
.layer(TraceLayer::new())
.with_storage(mysql)
.backend(mysql)
.build_fn(send_email)
})
.run()
Expand Down
2 changes: 1 addition & 1 deletion examples/postgres/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ async fn main() -> Result<()> {
WorkerBuilder::new("tasty-orange")
.layer(TraceLayer::new())
.layer(RetryLayer::new(RetryPolicy::retries(5)))
.with_storage(pg)
.backend(pg)
.build_fn(send_email)
})
.on_event(|e| debug!("{e:?}"))
Expand Down
2 changes: 1 addition & 1 deletion examples/prometheus/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ async fn main() -> Result<()> {
.register_with_count(2, {
WorkerBuilder::new("tasty-banana")
.layer(PrometheusLayer)
.with_storage(storage.clone())
.backend(storage.clone())
.build_fn(send_email)
})
.run()
Expand Down
2 changes: 1 addition & 1 deletion examples/redis-deadpool/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ async fn main() -> Result<()> {
produce_jobs(&mut storage).await?;

let worker = WorkerBuilder::new("rango-tango")
.with_storage(storage)
.backend(storage)
.data(pool)
.build_fn(send_email);

Expand Down
2 changes: 1 addition & 1 deletion examples/redis-mq-example/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ async fn main() -> Result<()> {

let worker = WorkerBuilder::new("rango-tango")
.layer(TraceLayer::new())
.with_mq(mq)
.backend(mq)
.build_fn(send_email);

Monitor::<TokioExecutor>::new()
Expand Down
2 changes: 1 addition & 1 deletion examples/redis-with-msg-pack/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ async fn main() -> Result<()> {
produce_jobs(storage.clone()).await?;

let worker = WorkerBuilder::new("rango-tango")
.with_storage(storage)
.backend(storage)
.build_fn(send_email);

Monitor::<TokioExecutor>::new()
Expand Down
2 changes: 1 addition & 1 deletion examples/redis/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ async fn main() -> Result<()> {
.layer(RateLimitLayer::new(5, Duration::from_secs(1)))
.layer(TimeoutLayer::new(Duration::from_millis(500)))
.data(Count::default())
.with_storage(storage)
.backend(storage)
.build_fn(send_email);

Monitor::<TokioExecutor>::new()
Expand Down
8 changes: 4 additions & 4 deletions examples/rest-api/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,28 +318,28 @@ async fn main() -> anyhow::Result<()> {
WorkerBuilder::new("tasty-apple")
.layer(SentryJobLayer)
.layer(TraceLayer::new())
.with_storage(worker_storage.clone())
.backend(worker_storage.clone())
.build_fn(send_email)
})
.register_with_count(4, move |c| {
WorkerBuilder::new(format!("tasty-avocado-{c}"))
.layer(SentryJobLayer)
.layer(TraceLayer::new())
.with_storage(sqlite_storage.clone())
.backend(sqlite_storage.clone())
.build_fn(notification_service)
})
.register_with_count(2, move |c| {
WorkerBuilder::new(format!("tasty-banana-{c}"))
.layer(SentryJobLayer)
.layer(TraceLayer::new())
.with_storage(pg_storage.clone())
.backend(pg_storage.clone())
.build_fn(document_service)
})
.register_with_count(2, move |c| {
WorkerBuilder::new(format!("tasty-pear-{c}"))
.layer(SentryJobLayer::new())
.layer(TraceLayer::new())
.with_storage(mysql_storage.clone())
.backend(mysql_storage.clone())
.build_fn(upload_service)
})
.run();
Expand Down
2 changes: 1 addition & 1 deletion examples/sentry/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ async fn main() -> Result<()> {
.layer(NewSentryLayer::new_from_top())
.layer(SentryLayer::new())
.layer(TraceLayer::new())
.with_storage(storage.clone())
.backend(storage.clone())
.build_fn(email_service)
})
.run()
Expand Down
4 changes: 2 additions & 2 deletions examples/sqlite/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,13 @@ async fn main() -> Result<()> {
.register_with_count(2, {
WorkerBuilder::new("tasty-banana")
.layer(TraceLayer::new())
.with_storage(email_storage)
.backend(email_storage)
.build_fn(send_email)
})
.register_with_count(10, {
WorkerBuilder::new("tasty-mango")
.layer(TraceLayer::new())
.with_storage(notification_storage)
.backend(notification_storage)
.build_fn(job::notify)
})
.run()
Expand Down
2 changes: 1 addition & 1 deletion examples/tracing/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ async fn main() -> Result<()> {
.register(
WorkerBuilder::new("tasty-avocado")
.chain(|srv| srv.layer(TraceLayer::new()))
.with_storage(storage)
.backend(storage)
.build_fn(email_service),
)
.run()
Expand Down
23 changes: 2 additions & 21 deletions packages/apalis-core/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use tower::{
use crate::{
error::Error,
layers::extensions::Data,
mq::MessageQueue,
request::Request,
service_fn::service_fn,
service_fn::ServiceFn,
Expand Down Expand Up @@ -84,22 +83,8 @@ impl<J, S, M, Serv> WorkerBuilder<J, S, M, Serv> {
}
}

/// Set the source to a [MessageQueue]
pub fn with_mq<NS: MessageQueue<NJ>, NJ>(
self,
message_queue: NS,
) -> WorkerBuilder<NJ, NS, M, Serv> {
WorkerBuilder {
request: PhantomData,
layer: self.layer,
source: message_queue,
id: self.id,
service: self.service,
}
}

/// Set the source to a generic backend that implements only [Backend]
pub fn source<NS: Backend<Request<NJ>>, NJ>(
/// Set the source to a backend that implements only [Backend]
pub fn backend<NS: Backend<Request<NJ>>, NJ>(
self,
backend: NS,
) -> WorkerBuilder<NJ, NS, M, Serv> {
Expand Down Expand Up @@ -168,16 +153,12 @@ where

S::Response: 'static,
M: Layer<S>,
// P::Layer: Layer<S>,
// M: Layer<<P::Layer as Layer<S>>::Service>,
{
type Source = P;

type Service = M::Service;
/// Build a worker, given a tower service
fn build(self, service: S) -> Worker<Ready<Self::Service, P>> {
let worker_id = self.id;
// let common_layer = self.source.common_layer(worker_id.clone());
let poller = self.source;
let middleware = self.layer;
let service = middleware.service(service);
Expand Down
4 changes: 2 additions & 2 deletions packages/apalis-core/src/monitor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ mod tests {
Ok::<_, io::Error>(request)
});
let worker = WorkerBuilder::new("rango-tango")
.source(backend)
.backend(backend)
.build(service);
let monitor: Monitor<TestExecutor> = Monitor::new();
let monitor = monitor.register(worker);
Expand All @@ -351,7 +351,7 @@ mod tests {
Ok::<_, io::Error>(request)
});
let worker = WorkerBuilder::new("rango-tango")
.source(backend)
.backend(backend)
.build(service);
let monitor: Monitor<TestExecutor> = Monitor::new();
let monitor = monitor.on_event(|e| {
Expand Down
6 changes: 3 additions & 3 deletions packages/apalis-core/src/worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -720,8 +720,8 @@ mod tests {

#[tokio::test]
async fn it_works() {
let backend = MemoryStorage::new();
let mut handle = backend.clone();
let in_memory = MemoryStorage::new();
let mut handle = in_memory.clone();

tokio::spawn(async move {
for i in 0..ITEMS {
Expand Down Expand Up @@ -749,7 +749,7 @@ mod tests {
let worker = WorkerBuilder::new("rango-tango")
// .chain(|svc| svc.timeout(Duration::from_millis(500)))
.data(Count::default())
.source(backend);
.backend(in_memory);
let worker = worker.build_fn(task);
let worker = worker.with_executor(TokioTestExecutor);
let w = worker.clone();
Expand Down

0 comments on commit a76dc1f

Please sign in to comment.