Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

standardize backend for storage and mq #358

Merged
merged 2 commits into from
Jul 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ async fn main() -> {
.register_with_count(2, {
WorkerBuilder::new(format!("email-worker"))
.data(0usize)
.with_storage(storage)
.backend(storage)
.build_fn(send_email)
})
.run()
Expand Down
2 changes: 1 addition & 1 deletion benches/storages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ macro_rules! define_bench {
let worker =
WorkerBuilder::new(format!("{}-bench", $name))
.data(c)
.source(storage)
.backend(storage)
.build_fn(handle_test_job);
worker
})
Expand Down
2 changes: 1 addition & 1 deletion examples/actix-web/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ async fn main() -> Result<()> {
.register_with_count(2, {
WorkerBuilder::new("tasty-avocado")
.layer(TraceLayer::new())
.with_storage(storage)
.backend(storage)
.build_fn(send_email)
})
.run_with_signal(signal::ctrl_c());
Expand Down
2 changes: 1 addition & 1 deletion examples/axum/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ async fn main() -> Result<()> {
.register_with_count(2, {
WorkerBuilder::new("tasty-pear")
.layer(TraceLayer::new())
.with_storage(storage.clone())
.backend(storage.clone())
.build_fn(send_email)
})
.run()
Expand Down
2 changes: 1 addition & 1 deletion examples/basics/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ async fn main() -> Result<(), std::io::Error> {
// Add shared context to all jobs executed by this worker
.data(EmailService::new())
.data(ValidEmailCache::new())
.with_storage(sqlite)
.backend(sqlite)
.build_fn(send_email)
})
.shutdown_timeout(Duration::from_secs(5))
Expand Down
2 changes: 1 addition & 1 deletion examples/mysql/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ async fn main() -> Result<()> {
.register_with_count(1, {
WorkerBuilder::new("tasty-avocado")
.layer(TraceLayer::new())
.with_storage(mysql)
.backend(mysql)
.build_fn(send_email)
})
.run()
Expand Down
2 changes: 1 addition & 1 deletion examples/postgres/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ async fn main() -> Result<()> {
WorkerBuilder::new("tasty-orange")
.layer(TraceLayer::new())
.layer(RetryLayer::new(RetryPolicy::retries(5)))
.with_storage(pg)
.backend(pg)
.build_fn(send_email)
})
.on_event(|e| debug!("{e:?}"))
Expand Down
2 changes: 1 addition & 1 deletion examples/prometheus/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ async fn main() -> Result<()> {
.register_with_count(2, {
WorkerBuilder::new("tasty-banana")
.layer(PrometheusLayer)
.with_storage(storage.clone())
.backend(storage.clone())
.build_fn(send_email)
})
.run()
Expand Down
2 changes: 1 addition & 1 deletion examples/redis-deadpool/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ async fn main() -> Result<()> {
produce_jobs(&mut storage).await?;

let worker = WorkerBuilder::new("rango-tango")
.with_storage(storage)
.backend(storage)
.data(pool)
.build_fn(send_email);

Expand Down
2 changes: 1 addition & 1 deletion examples/redis-mq-example/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ async fn main() -> Result<()> {

let worker = WorkerBuilder::new("rango-tango")
.layer(TraceLayer::new())
.with_mq(mq)
.backend(mq)
.build_fn(send_email);

Monitor::<TokioExecutor>::new()
Expand Down
2 changes: 1 addition & 1 deletion examples/redis-with-msg-pack/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ async fn main() -> Result<()> {
produce_jobs(storage.clone()).await?;

let worker = WorkerBuilder::new("rango-tango")
.with_storage(storage)
.backend(storage)
.build_fn(send_email);

Monitor::<TokioExecutor>::new()
Expand Down
2 changes: 1 addition & 1 deletion examples/redis/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ async fn main() -> Result<()> {
.layer(RateLimitLayer::new(5, Duration::from_secs(1)))
.layer(TimeoutLayer::new(Duration::from_millis(500)))
.data(Count::default())
.with_storage(storage)
.backend(storage)
.build_fn(send_email);

Monitor::<TokioExecutor>::new()
Expand Down
8 changes: 4 additions & 4 deletions examples/rest-api/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,28 +318,28 @@ async fn main() -> anyhow::Result<()> {
WorkerBuilder::new("tasty-apple")
.layer(SentryJobLayer)
.layer(TraceLayer::new())
.with_storage(worker_storage.clone())
.backend(worker_storage.clone())
.build_fn(send_email)
})
.register_with_count(4, move |c| {
WorkerBuilder::new(format!("tasty-avocado-{c}"))
.layer(SentryJobLayer)
.layer(TraceLayer::new())
.with_storage(sqlite_storage.clone())
.backend(sqlite_storage.clone())
.build_fn(notification_service)
})
.register_with_count(2, move |c| {
WorkerBuilder::new(format!("tasty-banana-{c}"))
.layer(SentryJobLayer)
.layer(TraceLayer::new())
.with_storage(pg_storage.clone())
.backend(pg_storage.clone())
.build_fn(document_service)
})
.register_with_count(2, move |c| {
WorkerBuilder::new(format!("tasty-pear-{c}"))
.layer(SentryJobLayer::new())
.layer(TraceLayer::new())
.with_storage(mysql_storage.clone())
.backend(mysql_storage.clone())
.build_fn(upload_service)
})
.run();
Expand Down
2 changes: 1 addition & 1 deletion examples/sentry/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ async fn main() -> Result<()> {
.layer(NewSentryLayer::new_from_top())
.layer(SentryLayer::new())
.layer(TraceLayer::new())
.with_storage(storage.clone())
.backend(storage.clone())
.build_fn(email_service)
})
.run()
Expand Down
4 changes: 2 additions & 2 deletions examples/sqlite/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,13 @@ async fn main() -> Result<()> {
.register_with_count(2, {
WorkerBuilder::new("tasty-banana")
.layer(TraceLayer::new())
.with_storage(email_storage)
.backend(email_storage)
.build_fn(send_email)
})
.register_with_count(10, {
WorkerBuilder::new("tasty-mango")
.layer(TraceLayer::new())
.with_storage(notification_storage)
.backend(notification_storage)
.build_fn(job::notify)
})
.run()
Expand Down
2 changes: 1 addition & 1 deletion examples/tracing/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ async fn main() -> Result<()> {
.register(
WorkerBuilder::new("tasty-avocado")
.chain(|srv| srv.layer(TraceLayer::new()))
.with_storage(storage)
.backend(storage)
.build_fn(email_service),
)
.run()
Expand Down
40 changes: 3 additions & 37 deletions packages/apalis-core/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,9 @@ use tower::{
use crate::{
error::Error,
layers::extensions::Data,
mq::MessageQueue,
request::Request,
service_fn::service_fn,
service_fn::ServiceFn,
storage::Storage,
worker::{Ready, Worker, WorkerId},
Backend,
};
Expand Down Expand Up @@ -55,7 +53,7 @@ impl<Serv> WorkerBuilder<(), (), Identity, Serv> {
}
}

impl<J, S, M, Serv> WorkerBuilder<J, S, M, Serv> {
impl<J, M, Serv> WorkerBuilder<J, (), M, Serv> {
/// Consume a stream directly
pub fn stream<NS: Stream<Item = Result<Option<Request<NJ>>, Error>> + Send + 'static, NJ>(
self,
Expand All @@ -70,36 +68,8 @@ impl<J, S, M, Serv> WorkerBuilder<J, S, M, Serv> {
}
}

/// Set the source to a [Storage]
pub fn with_storage<NS: Storage<Job = NJ>, NJ>(
self,
storage: NS,
) -> WorkerBuilder<NJ, NS, M, Serv> {
WorkerBuilder {
request: PhantomData,
layer: self.layer,
source: storage,
id: self.id,
service: self.service,
}
}

/// Set the source to a [MessageQueue]
pub fn with_mq<NS: MessageQueue<NJ>, NJ>(
self,
message_queue: NS,
) -> WorkerBuilder<NJ, NS, M, Serv> {
WorkerBuilder {
request: PhantomData,
layer: self.layer,
source: message_queue,
id: self.id,
service: self.service,
}
}

/// Set the source to a generic backend that implements only [Backend]
pub fn source<NS: Backend<Request<NJ>>, NJ>(
/// Set the source to a backend that implements [Backend]
pub fn backend<NS: Backend<Request<NJ>>, NJ>(
self,
backend: NS,
) -> WorkerBuilder<NJ, NS, M, Serv> {
Expand Down Expand Up @@ -168,16 +138,12 @@ where

S::Response: 'static,
M: Layer<S>,
// P::Layer: Layer<S>,
// M: Layer<<P::Layer as Layer<S>>::Service>,
{
type Source = P;

type Service = M::Service;
/// Build a worker, given a tower service
fn build(self, service: S) -> Worker<Ready<Self::Service, P>> {
let worker_id = self.id;
// let common_layer = self.source.common_layer(worker_id.clone());
let poller = self.source;
let middleware = self.layer;
let service = middleware.service(service);
Expand Down
2 changes: 1 addition & 1 deletion packages/apalis-core/src/layers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ pub mod extensions {
///
/// let worker = WorkerBuilder::new("tasty-avocado")
/// .data(state)
/// .source(MemoryStorage::new())
/// .backend(MemoryStorage::new())
/// .build(service_fn(email_service));
/// ```

Expand Down
4 changes: 2 additions & 2 deletions packages/apalis-core/src/monitor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ mod tests {
Ok::<_, io::Error>(request)
});
let worker = WorkerBuilder::new("rango-tango")
.source(backend)
.backend(backend)
.build(service);
let monitor: Monitor<TestExecutor> = Monitor::new();
let monitor = monitor.register(worker);
Expand All @@ -351,7 +351,7 @@ mod tests {
Ok::<_, io::Error>(request)
});
let worker = WorkerBuilder::new("rango-tango")
.source(backend)
.backend(backend)
.build(service);
let monitor: Monitor<TestExecutor> = Monitor::new();
let monitor = monitor.on_event(|e| {
Expand Down
6 changes: 3 additions & 3 deletions packages/apalis-core/src/worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -720,8 +720,8 @@ mod tests {

#[tokio::test]
async fn it_works() {
let backend = MemoryStorage::new();
let mut handle = backend.clone();
let in_memory = MemoryStorage::new();
let mut handle = in_memory.clone();

tokio::spawn(async move {
for i in 0..ITEMS {
Expand Down Expand Up @@ -749,7 +749,7 @@ mod tests {
let worker = WorkerBuilder::new("rango-tango")
// .chain(|svc| svc.timeout(Duration::from_millis(500)))
.data(Count::default())
.source(backend);
.backend(in_memory);
let worker = worker.build_fn(task);
let worker = worker.with_executor(TokioTestExecutor);
let w = worker.clone();
Expand Down
2 changes: 1 addition & 1 deletion packages/apalis-redis/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
//! Monitor::<TokioExecutor>::new()
//! .register(
//! WorkerBuilder::new("tasty-pear")
//! .source(storage.clone())
//! .backend(storage.clone())
//! .build_fn(send_email),
//! )
//! .run()
Expand Down
2 changes: 1 addition & 1 deletion packages/apalis-sql/src/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
//! .register_with_count(4, {
//! WorkerBuilder::new(&format!("tasty-avocado"))
//! .data(0usize)
//! .source(pg)
//! .backend(pg)
//! .build_fn(send_email)
//! })
//! .run()
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
//! .register_with_count(2, {
//! WorkerBuilder::new(&format!("quick-sand"))
//! .data(0usize)
//! .source(storage.clone())
//! .backend(storage.clone())
//! .build_fn(send_email)
//! })
//! .run()
Expand Down
Loading