Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

patch: 0.5.4 #407

Merged
merged 1 commit into from
Sep 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ jobs:
- uses: actions-rs/cargo@v1
with:
command: check
args: --features async-std-comp --no-default-features
args: --workspace --all-features

test:
name: Test Suite
Expand All @@ -39,7 +39,7 @@ jobs:
- uses: actions-rs/cargo@v1
with:
command: test
args: --features tokio-comp
args: --workspace --all-features


fmt:
Expand Down Expand Up @@ -72,4 +72,4 @@ jobs:
- uses: actions-rs/cargo@v1
with:
command: clippy
args: --features tokio-comp -- -D warnings
args: --workspace --all-features
2 changes: 1 addition & 1 deletion examples/async-std-runtime/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ impl From<DateTime<Utc>> for Reminder {

async fn send_in_background(reminder: Reminder) {
apalis_core::sleep(Duration::from_secs(2)).await;
debug!("Called at {reminder:?}");
debug!("Called at {:?}", reminder.0);
}
async fn send_reminder(reminder: Reminder, worker: WorkerCtx) -> bool {
// this will happen in the workers background and wont block the next tasks
Expand Down
2 changes: 1 addition & 1 deletion examples/basics/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ async fn main() -> Result<(), std::io::Error> {

Monitor::<TokioExecutor>::new()
.register_with_count(2, {
WorkerBuilder::new("tasty-banana".to_string())
WorkerBuilder::new("tasty-banana")
.layer(TraceLayer::new())
.layer(LogLayer::new("some-log-example"))
// Add shared context to all jobs executed by this worker
Expand Down
2 changes: 1 addition & 1 deletion examples/mysql/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ async fn main() -> Result<()> {

Monitor::new_with_executor(TokioExecutor)
.register_with_count(1, {
WorkerBuilder::new(format!("tasty-avocado"))
WorkerBuilder::new("tasty-avocado")
.layer(TraceLayer::new())
.with_storage(mysql)
.build_fn(send_email)
Expand Down
3 changes: 1 addition & 2 deletions examples/postgres/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@ use anyhow::Result;
use apalis::layers::retry::RetryPolicy;
use apalis::postgres::PgPool;
use apalis::prelude::*;
use apalis::{layers::tracing::TraceLayer, postgres::PostgresStorage};
use apalis::{layers::retry::RetryLayer, layers::tracing::TraceLayer, postgres::PostgresStorage};
use email_service::{send_email, Email};
use tower::retry::RetryLayer;
use tracing::{debug, info};

async fn produce_jobs(storage: &PostgresStorage<Email>) -> Result<()> {
Expand Down
2 changes: 1 addition & 1 deletion examples/tracing/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ license = "MIT OR Apache-2.0"
anyhow = "1"
apalis = { path = "../../", features = ["redis"] }
serde = "1"
tokio = { version ="1", features = ["macros"]}
tokio = { version ="1", features = ["full"]}
env_logger = "0.10"
tracing-subscriber = { version = "0.3.11", features = ["env-filter", "json"] }
chrono = { version = "0.4", default-features = false, features = ["clock"] }
Expand Down
3 changes: 2 additions & 1 deletion examples/tracing/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,11 @@ impl fmt::Display for InvalidEmailError {

impl Error for InvalidEmailError {}

async fn email_service(_email: Email) {
async fn email_service(email: Email) -> Result<(), InvalidEmailError> {
tracing::info!("Checking if dns configured");
sleep(Duration::from_millis(1008)).await;
tracing::info!("Sent in 1 sec");
Err(InvalidEmailError { email: email.to })
}

async fn produce_jobs(mut storage: RedisStorage<Email>) -> Result<()> {
Expand Down
6 changes: 3 additions & 3 deletions packages/apalis-redis/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,7 @@
async fn len(&self) -> Result<i64, RedisError> {
let mut conn = self.conn.clone();
let all_jobs: i64 = redis::cmd("HLEN")
.arg(&self.queue.job_data_hash.to_string())
.arg(self.queue.job_data_hash.to_string())
.query_async(&mut conn)
.await?;
let done_jobs: i64 = redis::cmd("ZCOUNT")
Expand All @@ -511,7 +511,7 @@
async fn fetch_by_id(&self, job_id: &TaskId) -> Result<Option<Request<Self::Job>>, RedisError> {
let mut conn = self.conn.clone();
let data: Value = redis::cmd("HMGET")
.arg(&self.queue.job_data_hash.to_string())
.arg(self.queue.job_data_hash.to_string())
.arg(job_id.to_string())
.query_async(&mut conn)
.await?;
Expand All @@ -538,7 +538,7 @@
.encode(&job)
.map_err(|e| (ErrorKind::IoError, "Encode error", e.to_string()))?;
let _: i64 = redis::cmd("HSET")
.arg(&self.queue.job_data_hash.to_string())
.arg(self.queue.job_data_hash.to_string())
.arg(job.ctx.id.to_string())
.arg(bytes)
.query_async(&mut conn)
Expand Down Expand Up @@ -787,7 +787,7 @@
.expect("failed to Flushdb");
}

struct DummyService {}

Check warning on line 790 in packages/apalis-redis/src/storage.rs

View workflow job for this annotation

GitHub Actions / Test Suite

struct `DummyService` is never constructed

fn example_email() -> Email {
Email {
Expand Down Expand Up @@ -910,7 +910,7 @@
let worker_id = register_worker_at(&mut storage).await;

let _job = consume_one(&mut storage, &worker_id).await;
let result = storage

Check warning on line 913 in packages/apalis-redis/src/storage.rs

View workflow job for this annotation

GitHub Actions / Test Suite

unused variable: `result`
.reenqueue_orphaned(5, 300)
.await
.expect("failed to reenqueue_orphaned");
Expand Down
5 changes: 2 additions & 3 deletions src/layers/prometheus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,9 @@ where
("name", this.operation.to_string()),
("namespace", this.job_type.to_string()),
("status", status),
("latency", latency.to_string()),
];
metrics::counter!("requests_total", &labels);
metrics::histogram!("request_duration_seconds", &labels);
metrics::counter!("requests_total", &labels).increment(1);
metrics::histogram!("request_duration_seconds", &labels).record(latency);
Poll::Ready(response)
}
}
8 changes: 4 additions & 4 deletions src/layers/retry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,22 +35,22 @@ impl<T, Res> Policy<Req<T>, Res, Err> for RetryPolicy
where
T: Clone,
{
type Future = future::Ready<Self>;
type Future = future::Ready<()>;

fn retry(&self, req: &Req<T>, result: Result<&Res, &Err>) -> Option<Self::Future> {
fn retry(&mut self, req: &mut Req<T>, result: &mut Result<Res, Err>) -> Option<Self::Future> {
let ctx = req.get::<Attempt>().cloned().unwrap_or_default();
match result {
Ok(_) => {
// Treat all `Response`s as success,
// so don't retry...
None
}
Err(_) if (self.retries - ctx.current() > 0) => Some(future::ready(self.clone())),
Err(_) if (self.retries - ctx.current() > 0) => Some(future::ready(())),
Err(_) => None,
}
}

fn clone_request(&self, req: &Req<T>) -> Option<Req<T>> {
fn clone_request(&mut self, req: &Req<T>) -> Option<Req<T>> {
let mut req = req.clone();
let value = req
.get::<Attempt>()
Expand Down
Loading