Skip to content

Commit

Permalink
Merge pull request #3293 from autonomys/gateway-http-server-types
Browse files Browse the repository at this point in the history
Simplify gateway HTTP server code using typed parameters
  • Loading branch information
teor2345 authored Dec 10, 2024
2 parents d53af9e + 21f997d commit 6a6cfb6
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 40 deletions.
4 changes: 2 additions & 2 deletions crates/subspace-gateway/src/commands/http.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//! Gateway http command.
//! This command start an HTTP server to serve object requests.
//! This command starts an HTTP server to serve object requests.
pub(crate) mod server;

Expand All @@ -22,7 +22,7 @@ pub(crate) struct HttpCommandOptions {
http_listen_on: String,
}

/// Runs an HTTP server
/// Runs an HTTP server which fetches DSN objects based on object hashes.
pub async fn run(run_options: HttpCommandOptions) -> anyhow::Result<()> {
let signal = shutdown_signal();

Expand Down
64 changes: 28 additions & 36 deletions crates/subspace-gateway/src/commands/http/server.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
//! HTTP server which fetches objects from the DSN based on a hash, using a mapping indexer service.
use actix_web::{web, App, HttpResponse, HttpServer, Responder};
use serde::{Deserialize, Deserializer, Serialize};
use std::default::Default;
Expand All @@ -9,6 +11,7 @@ use subspace_data_retrieval::object_fetcher::ObjectFetcher;
use subspace_data_retrieval::piece_getter::PieceGetter;
use tracing::{debug, error, trace};

/// Parameters for the DSN object HTTP server.
pub(crate) struct ServerParameters<PG>
where
PG: PieceGetter + Send + Sync + 'static,
Expand All @@ -18,6 +21,7 @@ where
pub(crate) http_endpoint: String,
}

/// Object mapping format from the indexer service.
#[derive(Serialize, Deserialize, Debug, Default)]
#[serde(rename_all = "camelCase")]
struct ObjectMapping {
Expand All @@ -28,6 +32,7 @@ struct ObjectMapping {
block_number: BlockNumber,
}

/// Utility function to deserialize a JSON string into a u32.
fn string_to_u32<'de, D>(deserializer: D) -> Result<u32, D::Error>
where
D: Deserializer<'de>,
Expand All @@ -36,68 +41,52 @@ where
s.parse::<u32>().map_err(serde::de::Error::custom)
}

async fn request_object_mappings(endpoint: String, key: String) -> anyhow::Result<ObjectMapping> {
/// Requests an object mapping with `hash` from the indexer service.
async fn request_object_mapping(endpoint: &str, hash: Blake3Hash) -> anyhow::Result<ObjectMapping> {
let client = reqwest::Client::new();
let object_mappings_url = format!("http://{}/objects/{}", endpoint, key,);
let object_mappings_url = format!("http://{}/objects/{}", endpoint, hex::encode(hash));

debug!(?key, ?object_mappings_url, "Requesting object mapping...");
debug!(?hash, ?object_mappings_url, "Requesting object mapping...");

let response = client
.get(object_mappings_url.clone())
.get(&object_mappings_url)
.send()
.await?
.json::<ObjectMapping>()
.await;
match &response {
Ok(json) => {
trace!(?key, ?json, "Requested object mapping.");
trace!(?hash, ?json, "Received object mapping");
}
Err(err) => {
error!(?key, ?err, ?object_mappings_url, "Request failed");
error!(?hash, ?err, ?object_mappings_url, "Request failed");
}
}

response.map_err(|err| err.into())
}

/// Fetches a DSN object with `hash`, using the mapping indexer service.
async fn serve_object<PG>(
key: web::Path<String>,
hash: web::Path<Blake3Hash>,
additional_data: web::Data<Arc<ServerParameters<PG>>>,
) -> impl Responder
where
PG: PieceGetter + Send + Sync + 'static,
{
let server_params = additional_data.into_inner();
let key = key.into_inner();

// Validate object hash
let decode_result = hex::decode(key.clone());
let object_hash = match decode_result {
Ok(hash) => {
if hash.len() != Blake3Hash::SIZE {
error!(?key, ?hash, "Invalid hash provided.");
return HttpResponse::BadRequest().finish();
}

Blake3Hash::try_from(hash.as_slice()).expect("Hash size was confirmed.")
}
Err(err) => {
error!(?key, ?err, "Invalid hash provided.");
return HttpResponse::BadRequest().finish();
}
};
let hash = hash.into_inner();

let Ok(object_mapping) =
request_object_mappings(server_params.indexer_endpoint.clone(), key.clone()).await
let Ok(object_mapping) = request_object_mapping(&server_params.indexer_endpoint, hash).await
else {
return HttpResponse::BadRequest().finish();
};

if object_mapping.hash != object_hash {
if object_mapping.hash != hash {
error!(
?key,
object_mapping_hash=?object_mapping.hash,
"Requested hash doesn't match object mapping."
?object_mapping,
?hash,
"Returned object mapping doesn't match requested hash"
);
return HttpResponse::ServiceUnavailable().finish();
}
Expand All @@ -109,22 +98,24 @@ where

let object = match object_fetcher_result {
Ok(object) => {
trace!(?key, size=%object.len(), "Object fetched successfully");
trace!(?hash, size = %object.len(), "Object fetched successfully");

let data_hash = blake3_hash(&object);
if data_hash != object_hash {
if data_hash != hash {
error!(
?data_hash,
?object_hash,
"Retrieved data did not match mapping hash"
data_size = %object.len(),
?hash,
"Retrieved data doesn't match requested mapping hash"
);
trace!(data = %hex::encode(object), "Retrieved data");
return HttpResponse::ServiceUnavailable().finish();
}

object
}
Err(err) => {
error!(?key, ?err, "Failed to fetch object.");
error!(?hash, ?err, "Failed to fetch object");
return HttpResponse::ServiceUnavailable().finish();
}
};
Expand All @@ -134,6 +125,7 @@ where
.body(object)
}

/// Starts the DSN object HTTP server.
pub async fn start_server<PG>(server_params: ServerParameters<PG>) -> std::io::Result<()>
where
PG: PieceGetter + Send + Sync + 'static,
Expand Down
4 changes: 2 additions & 2 deletions crates/subspace-gateway/src/commands/rpc.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//! Gateway rpc command.
//! This command start an RPC server to serve object requests.
//! This command starts an RPC server to serve object requests from the DSN.
pub(crate) mod server;

use crate::commands::rpc::server::{launch_rpc_server, RpcOptions, RPC_DEFAULT_PORT};
Expand All @@ -21,7 +21,7 @@ pub(crate) struct RpcCommandOptions {
rpc_options: RpcOptions<RPC_DEFAULT_PORT>,
}

/// Runs an RPC server
/// Runs an RPC server which fetches DSN objects based on mappings.
pub async fn run(run_options: RpcCommandOptions) -> anyhow::Result<()> {
let signal = shutdown_signal();

Expand Down

0 comments on commit 6a6cfb6

Please sign in to comment.