Skip to content

Commit

Permalink
Use axum_server
Browse files Browse the repository at this point in the history
  • Loading branch information
dcadenas committed Oct 15, 2024
1 parent 5a5d3c0 commit 470bddf
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 86 deletions.
20 changes: 20 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ path = "src/main.rs"
anyhow = "1.0.87"
async-trait = "0.1.82"
axum = "0.7.5"
axum-server = "0.7.1"
cached = { version = "0.53.1", features = ["async"] }
chrono = { version = "0.4.38", features = ["serde"] }
config_rs = { version = "0.14", package = "config", features = ["yaml"] }
Expand Down
19 changes: 19 additions & 0 deletions compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ services:
depends_on:
db:
condition: service_healthy
relay:
condition: service_healthy
ports:
- "3000:3000"
- "3001:3001"
Expand Down Expand Up @@ -59,8 +61,25 @@ services:

relay:
image: ghcr.io/planetary-social/nosrelay:latest
environment:
- REDIS_URL=redis://redis:6379
- RELAY_URL=ws://localhost:7777
ports:
- "7777:7777"
depends_on:
redis:
condition: service_healthy

redis:
image: redis:latest
ports:
- "6379:6379"
healthcheck:
test: redis-cli ping
interval: 10s
timeout: 10s
retries: 10
start_period: 60s

db:
image: neo4j:latest
Expand Down
42 changes: 10 additions & 32 deletions src/http_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@ use crate::{
};
use anyhow::{Context, Result};
use axum::Router;
use axum_server::Handle;
use moka::future::Cache;
use router::create_router;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::timeout;
use tokio_util::{sync::CancellationToken, task::TaskTracker};
use tracing::{error, info};
use tracing::info;

pub struct AppState<T, U>
where
Expand Down Expand Up @@ -84,40 +84,18 @@ fn start_http_server(
) {
task_tracker.spawn(async move {
let addr = SocketAddr::from(([0, 0, 0, 0], http_port));
let Ok(listener) = tokio::net::TcpListener::bind(addr).await else {
error!("Failed to bind to address: {}", addr);
cancellation_token.cancel();
return;
};

let token_clone = cancellation_token.clone();
let server_future = tokio::spawn(async {
axum::serve(
listener,
router.into_make_service_with_connect_info::<SocketAddr>(),
)
.with_graceful_shutdown(shutdown_hook(token_clone))
let handle = Handle::new();
tokio::spawn(await_shutdown(cancellation_token, handle.clone()));
axum_server::bind(addr)
.handle(handle)
.serve(router.into_make_service_with_connect_info::<SocketAddr>())
.await
.context("Failed to start HTTP server")
});

await_shutdown(cancellation_token, server_future).await;
});
}

async fn await_shutdown(
cancellation_token: CancellationToken,
server_future: tokio::task::JoinHandle<Result<()>>,
) {
cancellation_token.cancelled().await;
info!("Shutdown signal received.");
match timeout(Duration::from_secs(5), server_future).await {
Ok(_) => info!("HTTP service exited successfully."),
Err(e) => info!("HTTP service exited after timeout: {}", e),
}
}

async fn shutdown_hook(cancellation_token: CancellationToken) {
async fn await_shutdown(cancellation_token: CancellationToken, handle: Handle) {
cancellation_token.cancelled().await;
info!("Exiting the process");
    info!("Shutting down.");
handle.graceful_shutdown(Some(Duration::from_secs(30)));
}
70 changes: 35 additions & 35 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,47 +40,14 @@ async fn main() -> Result<()> {
let config = Config::new("config")?;
let settings = config.get::<Settings>()?;

if let Err(e) = start(settings).await {
if let Err(e) = start_server(settings).await {
error!("Failed to start the server: {}", e);
}

Ok(())
}

// Listen to ctrl-c, terminate and cancellation token
async fn cancel_on_stop_signals(cancellation_token: CancellationToken) -> Result<()> {
#[cfg(unix)]
let terminate = async {
signal(SignalKind::terminate())
.expect("Failed to install signal handler")
.recv()
.await;
};

#[cfg(not(unix))]
let terminate = std::future::pending::<()>();

tokio::select! {
_ = cancellation_token.cancelled() => {
info!("Starting graceful termination, from cancellation token");
},
_ = signal::ctrl_c() => {
info!("Starting graceful termination, from ctrl-c");
},
_ = terminate => {
info!("Starting graceful termination, from terminate signal");
},
}

cancellation_token.cancel();

info!("Waiting 3 seconds before exiting");
tokio::time::sleep(Duration::from_secs(3)).await;

Ok(())
}

async fn start(settings: Settings) -> Result<()> {
async fn start_server(settings: Settings) -> Result<()> {
info!("Initializing repository at {}", settings.neo4j_uri);
let graph = Graph::new(
&settings.neo4j_uri,
Expand Down Expand Up @@ -199,3 +166,36 @@ async fn start(settings: Settings) -> Result<()> {
info!("Follower server stopped");
Ok(())
}

// Listen to ctrl-c, terminate and cancellation token
async fn cancel_on_stop_signals(cancellation_token: CancellationToken) -> Result<()> {
    // On unix, resolve when SIGTERM arrives; on other platforms this branch
    // is a future that never completes, so only ctrl-c / the token apply.
    #[cfg(unix)]
    let sigterm = async {
        let mut stream = signal(SignalKind::terminate()).expect("Failed to install signal handler");
        stream.recv().await;
    };

    #[cfg(not(unix))]
    let sigterm = std::future::pending::<()>();

    // Whichever stop condition fires first wins.
    tokio::select! {
        _ = cancellation_token.cancelled() => info!("Starting graceful termination, from cancellation token"),
        _ = signal::ctrl_c() => info!("Starting graceful termination, from ctrl-c"),
        _ = sigterm => info!("Starting graceful termination, from terminate signal"),
    }

    // Propagate the shutdown to every task watching this token.
    cancellation_token.cancel();

    info!("Waiting 3 seconds before exiting");
    tokio::time::sleep(Duration::from_secs(3)).await;

    Ok(())
}
44 changes: 25 additions & 19 deletions src/worker_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ fn create_dispatcher_task<Item>(
}
}

cancellation_token.cancel();
info!("{}: Worker pool finished", pool_name);
});
}
Expand All @@ -136,28 +135,35 @@ fn create_worker_task<Item, Worker>(
break;
}

Some(item) = worker_rx.recv() => {
trace!("{}: Worker task processing item {:?}", worker_name, item);
let result = timeout(Duration::from_secs(worker_timeout_secs.get() as u64), worker.call(item)).await;

match result {
Ok(Ok(())) => {
trace!("{}: Worker task finished successfully processing item", worker_name);
},
Ok(Err(e)) => {
metrics::worker_failures(pool_name.to_string(), worker_index).increment(1);
error!("{}: Worker failed: {}", worker_name, e);
},
Err(_) => {
metrics::worker_timeouts(pool_name.to_string(), worker_index).increment(1);
error!("{}: Worker task timed out after {} seconds", worker_name, worker_timeout_secs);
}
}
result = worker_rx.recv() => {
match result {
None => {
info!("{}: Worker task finished", worker_name);
break;
}
Some(item) => {
trace!("{}: Worker task processing item {:?}", worker_name, item);
let result = timeout(Duration::from_secs(worker_timeout_secs.get() as u64), worker.call(item)).await;

match result {
Ok(Ok(())) => {
trace!("{}: Worker task finished successfully processing item", worker_name);
},
Ok(Err(e)) => {
metrics::worker_failures(pool_name.to_string(), worker_index).increment(1);
error!("{}: Worker failed: {}", worker_name, e);
},
Err(_) => {
metrics::worker_timeouts(pool_name.to_string(), worker_index).increment(1);
error!("{}: Worker task timed out after {} seconds", worker_name, worker_timeout_secs);
}
}
}
}
}
}
}

cancellation_token.cancel();
info!("{}: Worker task finished", worker_name);
});
}
Expand Down

0 comments on commit 470bddf

Please sign in to comment.