Skip to content

Commit

Permalink
chore(blockstore-server): use spawn macro
Browse files Browse the repository at this point in the history
  • Loading branch information
matthias-wright committed May 27, 2024
1 parent 4a202fe commit 04e9492
Showing 1 changed file with 23 additions and 17 deletions.
40 changes: 23 additions & 17 deletions core/blockstore-server/src/blockstore_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ impl<C: Collection> BlockstoreServer<C> {
.take()
.expect("start should never be called twice");
drop(this);
waiter.run_until_shutdown(inner.start()).await;
let panic_waiter = waiter.clone();
waiter.run_until_shutdown(inner.start(panic_waiter)).await;
}
}

Expand Down Expand Up @@ -125,7 +126,7 @@ impl<C: Collection> BlockstoreServerInner<C> {
}
}

pub async fn start(mut self) {
pub async fn start(mut self, waiter: ShutdownWaiter) {
let mut pending_requests: HashMap<
PeerRequest,
broadcast::Sender<Result<(), PeerRequestError>>,
Expand All @@ -137,6 +138,7 @@ impl<C: Collection> BlockstoreServerInner<C> {
tasks.spawn(futures::future::pending());

loop {
let panic_waiter = waiter.clone();
tokio::select! {
req = self.pool_responder.get_next_request() => {
match req {
Expand All @@ -149,21 +151,25 @@ impl<C: Collection> BlockstoreServerInner<C> {
let blockstore = self.blockstore.clone();
let num_responses = self.num_responses.clone();
let rep_reporter = self.rep_reporter.clone();
tokio::spawn(async move {
handle_request::<C>(
req_header.peer,
request,
blockstore,
responder,
num_responses,
rep_reporter,
).await;

increment_counter!(
"blockstore_server_handle_request",
Some("Counter for number of blockstore requests handled by this node")
);
});
spawn!(
async move {
handle_request::<C>(
req_header.peer,
request,
blockstore,
responder,
num_responses,
rep_reporter,
).await;

increment_counter!(
"blockstore_server_handle_request",
Some("Counter for number of blockstore requests handled by this node")
);
},
"BLOCKSTORE-SERVER: HANDLE-REQUEST",
crucial(panic_waiter)
);
} else {
self.num_responses.fetch_sub(1, Ordering::Release);
responder.reject(RejectReason::TooManyRequests);
Expand Down

0 comments on commit 04e9492

Please sign in to comment.