Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Serve files over p2p #2523

Merged
merged 3 commits into from
May 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions core/crates/prisma-helpers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ file_path::select!(file_path_to_handle_custom_uri {
instance: select {
identity
remote_identity
node_remote_identity
}
}
});
Expand Down
14 changes: 14 additions & 0 deletions core/crates/sync/src/backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ pub async fn backfill_operations(db: &PrismaClient, sync: &crate::Manager, insta
.find_many(vec![location::id::gt(cursor)])
.order_by(location::id::order(SortOrder::Asc))
.take(1000)
.include(location::include!({
instance: select {
id
pub_id
}
}))
.exec()
},
|location| location.id,
Expand Down Expand Up @@ -108,6 +114,14 @@ pub async fn backfill_operations(db: &PrismaClient, sync: &crate::Manager, insta
),
option_sync_entry!(l.hidden, hidden),
option_sync_entry!(l.date_created, date_created),
option_sync_entry!(
l.instance.map(|i| {
prisma_sync::instance::SyncId {
pub_id: i.pub_id,
}
}),
instance
),
],
),
)
Expand Down
7 changes: 3 additions & 4 deletions core/prisma/schema.prisma
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
datasource db {

Check failure on line 1 in core/prisma/schema.prisma

View workflow job for this annotation

GitHub Actions / Clippy (ubuntu-22.04)

Missing migration

New migration should be generated due to changes in prisma schema

Check failure on line 1 in core/prisma/schema.prisma

View workflow job for this annotation

GitHub Actions / iOS

Missing migration

New migration should be generated due to changes in prisma schema

Check failure on line 1 in core/prisma/schema.prisma

View workflow job for this annotation

GitHub Actions / Clippy (macos-14)

Missing migration

New migration should be generated due to changes in prisma schema

Check failure on line 1 in core/prisma/schema.prisma

View workflow job for this annotation

GitHub Actions / Cypress

Missing migration

New migration should be generated due to changes in prisma schema

Check failure on line 1 in core/prisma/schema.prisma

View workflow job for this annotation

GitHub Actions / Rust Formatting

Missing migration

New migration should be generated due to changes in prisma schema
provider = "sqlite"
url = "file:dev.db"
}
Expand Down Expand Up @@ -68,7 +68,7 @@

// represents a single `.db` file (SQLite DB) that is paired to the current library.
// A `LibraryInstance` is always owned by a single `Node` but it's possible for that node to change (or two to be owned by a single node).
/// @local
/// @local(id: pub_id)
model Instance {
id Int @id @default(autoincrement()) // This is is NOT globally unique
pub_id Bytes @unique // This UUID is meaningless and exists soley cause the `uhlc::ID` must be 16-bit. Really this should be derived from the `identity` field.
Expand Down Expand Up @@ -148,8 +148,7 @@

scan_state Int @default(0) // Enum: sd_core::location::ScanState

/// @local
// this is just a client side cache which is annoying but oh well (@brendan)
// this should just be a local-only cache but it's too much effort to broadcast online locations rn (@brendan)
instance_id Int?
instance Instance? @relation(fields: [instance_id], references: [id], onDelete: SetNull)

Expand Down Expand Up @@ -627,7 +626,7 @@
id Int @id @default(autoincrement())
pub_id Bytes @unique

name String? @unique
name String? @unique
default Boolean?
rules_per_kind Bytes?
date_created DateTime?
Expand Down
81 changes: 59 additions & 22 deletions core/src/custom_uri/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,23 @@ use crate::{
api::{utils::InvalidateOperationEvent, CoreEvent},
library::Library,
object::media::old_thumbnail::WEBP_EXTENSION,
p2p::operations,
p2p::operations::{self, request_file},
util::InfallibleResponse,
Node,
};

use async_stream::stream;
use bytes::Bytes;
use mpsc_to_async_write::MpscToAsyncWrite;
use sd_core_file_path_helper::IsolatedFilePathData;
use sd_core_prisma_helpers::file_path_to_handle_custom_uri;

use sd_file_ext::text::is_text;
use sd_p2p::{RemoteIdentity, P2P};
use sd_p2p_block::Range;
use sd_prisma::prisma::{file_path, location};
use sd_utils::db::maybe_missing;
use tokio_util::sync::PollSender;

use std::{
cmp::min,
Expand All @@ -26,7 +31,7 @@ use std::{
};

use axum::{
body::{self, Body, BoxBody, Full},
body::{self, Body, BoxBody, Full, StreamBody},
extract::{self, State},
http::{HeaderMap, HeaderValue, Request, Response, StatusCode},
middleware,
Expand Down Expand Up @@ -68,7 +73,11 @@ pub enum ServeFrom {
/// Serve from the local filesystem
Local,
/// Serve from a specific instance
Remote(RemoteIdentity),
Remote {
library_identity: RemoteIdentity,
node_identity: RemoteIdentity,
library: Arc<Library>,
},
}

#[derive(Clone)]
Expand Down Expand Up @@ -176,17 +185,29 @@ async fn get_or_init_lru_entry(
let path = Path::new(path)
.join(IsolatedFilePathData::try_from((location_id, &file_path)).map_err(not_found)?);

let identity =
let library_identity =
RemoteIdentity::from_bytes(&instance.remote_identity).map_err(internal_server_error)?;

let node_identity = RemoteIdentity::from_bytes(
instance
.node_remote_identity
.as_ref()
.expect("node_remote_identity is required"),
)
.map_err(internal_server_error)?;

let lru_entry = CacheValue {
name: path,
ext: maybe_missing(file_path.extension, "extension").map_err(not_found)?,
file_path_pub_id: Uuid::from_slice(&file_path.pub_id).map_err(internal_server_error)?,
serve_from: if identity == library.identity.to_remote_identity() {
serve_from: if library_identity == library.identity.to_remote_identity() {
ServeFrom::Local
} else {
ServeFrom::Remote(identity)
ServeFrom::Remote {
library_identity,
node_identity,
library: library.clone(),
}
},
};

Expand Down Expand Up @@ -240,19 +261,16 @@ pub fn base_router() -> Router<LocalState> {
.route(
"/file/:lib_id/:loc_id/:path_id",
get(
|State(state): State<LocalState>,
path: ExtractedPath,
mut request: Request<Body>| async move {
let part_parts = path.0.clone();
|State(state): State<LocalState>, path: ExtractedPath, request: Request<Body>| async move {
let (
CacheValue {
name: file_path_full_path,
ext: extension,
file_path_pub_id: _file_path_pub_id,
file_path_pub_id,
serve_from,
..
},
_library,
library,
) = get_or_init_lru_entry(&state, path).await?;

match serve_from {
Expand Down Expand Up @@ -288,18 +306,37 @@ pub fn base_router() -> Router<LocalState> {

serve_file(file, Ok(metadata), request.into_parts().0, resp).await
}
ServeFrom::Remote(identity) => {
*request.uri_mut() =
format!("/file/{}/{}/{}", part_parts.0, part_parts.1, part_parts.2)
.parse()
.expect("url was validated by Axum");

Ok(request_to_remote_node(
ServeFrom::Remote {
library_identity: _,
node_identity,
library,
} => {
// TODO: Support `Range` requests and `ETag` headers

let (tx, mut rx) = tokio::sync::mpsc::channel::<io::Result<Bytes>>(150);
request_file(
state.node.p2p.p2p.clone(),
identity,
request,
node_identity,
&library.id,
&library.identity,
file_path_pub_id,
Range::Full,
MpscToAsyncWrite::new(PollSender::new(tx)),
)
.await)
.await
.map_err(|err| {
error!("Error requesting file {file_path_pub_id:?} from node {:?}: {err:?}", library.identity.to_remote_identity());
internal_server_error(())
})?;

// TODO: Content Type
Ok(InfallibleResponse::builder().status(StatusCode::OK).body(
body::boxed(StreamBody::new(stream! {
while let Some(item) = rx.recv().await {
yield item;
}
})),
))
}
}
},
Expand Down
2 changes: 1 addition & 1 deletion core/src/library/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -732,7 +732,7 @@ async fn sync_rx_actor(
InvalidateOperationEvent::all(),
)),
SyncMessage::Created => {
p2p::sync::originator(library.id, &library.sync, &node.p2p).await
p2p::sync::originator(library.clone(), &library.sync, &node.p2p).await
}
}
}
Expand Down
16 changes: 15 additions & 1 deletion core/src/p2p/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,12 +387,13 @@ async fn start(

error!("Failed to handle Spacedrop request");
}
Header::Sync(library_id) => {
Header::Sync => {
let Ok(mut tunnel) = Tunnel::responder(stream).await.map_err(|err| {
error!("Failed `Tunnel::responder`: {}", err);
}) else {
return;
};
let library_id = tunnel.library_id();

let Ok(msg) = SyncMessage::from_stream(&mut tunnel).await.map_err(|err| {
error!("Failed `SyncMessage::from_stream`: {}", err);
Expand Down Expand Up @@ -432,6 +433,19 @@ async fn start(

error!("Failed to handling rspc request with '{remote}': {err:?}");
}
Header::LibraryFile {
file_path_id,
range,
} => {
let remote = stream.remote_identity();
let Err(err) =
operations::library::receiver(stream, file_path_id, range, &node).await
else {
return;
};

error!("Failed to handling library file request with {remote:?} for {file_path_id}: {err:?}");
}
};
});
}
Expand Down
Loading
Loading