Skip to content

Commit

Permalink
Serve files over p2p (#2523)
Browse files Browse the repository at this point in the history
* serve files over p2p

* include location instance id in sync

* Fix P2P addressing?

---------

Co-authored-by: Brendan Allan <[email protected]>
  • Loading branch information
oscartbeaumont and Brendonovich authored May 30, 2024
1 parent b015763 commit 0392c78
Show file tree
Hide file tree
Showing 16 changed files with 385 additions and 61 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions core/crates/prisma-helpers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ file_path::select!(file_path_to_handle_custom_uri {
instance: select {
identity
remote_identity
node_remote_identity
}
}
});
Expand Down
14 changes: 14 additions & 0 deletions core/crates/sync/src/backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ pub async fn backfill_operations(db: &PrismaClient, sync: &crate::Manager, insta
.find_many(vec![location::id::gt(cursor)])
.order_by(location::id::order(SortOrder::Asc))
.take(1000)
.include(location::include!({
instance: select {
id
pub_id
}
}))
.exec()
},
|location| location.id,
Expand Down Expand Up @@ -108,6 +114,14 @@ pub async fn backfill_operations(db: &PrismaClient, sync: &crate::Manager, insta
),
option_sync_entry!(l.hidden, hidden),
option_sync_entry!(l.date_created, date_created),
option_sync_entry!(
l.instance.map(|i| {
prisma_sync::instance::SyncId {
pub_id: i.pub_id,
}
}),
instance
),
],
),
)
Expand Down
7 changes: 3 additions & 4 deletions core/prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ model Node {

// represents a single `.db` file (SQLite DB) that is paired to the current library.
// A `LibraryInstance` is always owned by a single `Node` but it's possible for that node to change (or two to be owned by a single node).
/// @local
/// @local(id: pub_id)
model Instance {
id Int @id @default(autoincrement()) // This is is NOT globally unique
pub_id Bytes @unique // This UUID is meaningless and exists soley cause the `uhlc::ID` must be 16-bit. Really this should be derived from the `identity` field.
Expand Down Expand Up @@ -148,8 +148,7 @@ model Location {
scan_state Int @default(0) // Enum: sd_core::location::ScanState
/// @local
// this is just a client side cache which is annoying but oh well (@brendan)
// this should just be a local-only cache but it's too much effort to broadcast online locations rn (@brendan)
instance_id Int?
instance Instance? @relation(fields: [instance_id], references: [id], onDelete: SetNull)
Expand Down Expand Up @@ -627,7 +626,7 @@ model IndexerRule {
id Int @id @default(autoincrement())
pub_id Bytes @unique
name String? @unique
name String? @unique
default Boolean?
rules_per_kind Bytes?
date_created DateTime?
Expand Down
81 changes: 59 additions & 22 deletions core/src/custom_uri/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,23 @@ use crate::{
api::{utils::InvalidateOperationEvent, CoreEvent},
library::Library,
object::media::old_thumbnail::WEBP_EXTENSION,
p2p::operations,
p2p::operations::{self, request_file},
util::InfallibleResponse,
Node,
};

use async_stream::stream;
use bytes::Bytes;
use mpsc_to_async_write::MpscToAsyncWrite;
use sd_core_file_path_helper::IsolatedFilePathData;
use sd_core_prisma_helpers::file_path_to_handle_custom_uri;

use sd_file_ext::text::is_text;
use sd_p2p::{RemoteIdentity, P2P};
use sd_p2p_block::Range;
use sd_prisma::prisma::{file_path, location};
use sd_utils::db::maybe_missing;
use tokio_util::sync::PollSender;

use std::{
cmp::min,
Expand All @@ -26,7 +31,7 @@ use std::{
};

use axum::{
body::{self, Body, BoxBody, Full},
body::{self, Body, BoxBody, Full, StreamBody},
extract::{self, State},
http::{HeaderMap, HeaderValue, Request, Response, StatusCode},
middleware,
Expand Down Expand Up @@ -68,7 +73,11 @@ pub enum ServeFrom {
/// Serve from the local filesystem
Local,
/// Serve from a specific instance
Remote(RemoteIdentity),
Remote {
library_identity: RemoteIdentity,
node_identity: RemoteIdentity,
library: Arc<Library>,
},
}

#[derive(Clone)]
Expand Down Expand Up @@ -176,17 +185,29 @@ async fn get_or_init_lru_entry(
let path = Path::new(path)
.join(IsolatedFilePathData::try_from((location_id, &file_path)).map_err(not_found)?);

let identity =
let library_identity =
RemoteIdentity::from_bytes(&instance.remote_identity).map_err(internal_server_error)?;

let node_identity = RemoteIdentity::from_bytes(
instance
.node_remote_identity
.as_ref()
.expect("node_remote_identity is required"),
)
.map_err(internal_server_error)?;

let lru_entry = CacheValue {
name: path,
ext: maybe_missing(file_path.extension, "extension").map_err(not_found)?,
file_path_pub_id: Uuid::from_slice(&file_path.pub_id).map_err(internal_server_error)?,
serve_from: if identity == library.identity.to_remote_identity() {
serve_from: if library_identity == library.identity.to_remote_identity() {
ServeFrom::Local
} else {
ServeFrom::Remote(identity)
ServeFrom::Remote {
library_identity,
node_identity,
library: library.clone(),
}
},
};

Expand Down Expand Up @@ -240,19 +261,16 @@ pub fn base_router() -> Router<LocalState> {
.route(
"/file/:lib_id/:loc_id/:path_id",
get(
|State(state): State<LocalState>,
path: ExtractedPath,
mut request: Request<Body>| async move {
let part_parts = path.0.clone();
|State(state): State<LocalState>, path: ExtractedPath, request: Request<Body>| async move {
let (
CacheValue {
name: file_path_full_path,
ext: extension,
file_path_pub_id: _file_path_pub_id,
file_path_pub_id,
serve_from,
..
},
_library,
library,
) = get_or_init_lru_entry(&state, path).await?;

match serve_from {
Expand Down Expand Up @@ -288,18 +306,37 @@ pub fn base_router() -> Router<LocalState> {

serve_file(file, Ok(metadata), request.into_parts().0, resp).await
}
ServeFrom::Remote(identity) => {
*request.uri_mut() =
format!("/file/{}/{}/{}", part_parts.0, part_parts.1, part_parts.2)
.parse()
.expect("url was validated by Axum");

Ok(request_to_remote_node(
ServeFrom::Remote {
library_identity: _,
node_identity,
library,
} => {
// TODO: Support `Range` requests and `ETag` headers

let (tx, mut rx) = tokio::sync::mpsc::channel::<io::Result<Bytes>>(150);
request_file(
state.node.p2p.p2p.clone(),
identity,
request,
node_identity,
&library.id,
&library.identity,
file_path_pub_id,
Range::Full,
MpscToAsyncWrite::new(PollSender::new(tx)),
)
.await)
.await
.map_err(|err| {
error!("Error requesting file {file_path_pub_id:?} from node {:?}: {err:?}", library.identity.to_remote_identity());
internal_server_error(())
})?;

// TODO: Content Type
Ok(InfallibleResponse::builder().status(StatusCode::OK).body(
body::boxed(StreamBody::new(stream! {
while let Some(item) = rx.recv().await {
yield item;
}
})),
))
}
}
},
Expand Down
2 changes: 1 addition & 1 deletion core/src/library/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -732,7 +732,7 @@ async fn sync_rx_actor(
InvalidateOperationEvent::all(),
)),
SyncMessage::Created => {
p2p::sync::originator(library.id, &library.sync, &node.p2p).await
p2p::sync::originator(library.clone(), &library.sync, &node.p2p).await
}
}
}
Expand Down
16 changes: 15 additions & 1 deletion core/src/p2p/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,12 +387,13 @@ async fn start(

error!("Failed to handle Spacedrop request");
}
Header::Sync(library_id) => {
Header::Sync => {
let Ok(mut tunnel) = Tunnel::responder(stream).await.map_err(|err| {
error!("Failed `Tunnel::responder`: {}", err);
}) else {
return;
};
let library_id = tunnel.library_id();

let Ok(msg) = SyncMessage::from_stream(&mut tunnel).await.map_err(|err| {
error!("Failed `SyncMessage::from_stream`: {}", err);
Expand Down Expand Up @@ -432,6 +433,19 @@ async fn start(

error!("Failed to handling rspc request with '{remote}': {err:?}");
}
Header::LibraryFile {
file_path_id,
range,
} => {
let remote = stream.remote_identity();
let Err(err) =
operations::library::receiver(stream, file_path_id, range, &node).await
else {
return;
};

error!("Failed to handling library file request with {remote:?} for {file_path_id}: {err:?}");
}
};
});
}
Expand Down
Loading

0 comments on commit 0392c78

Please sign in to comment.