Skip to content

Commit

Permalink
Move the messsages, requests and responses for each subsystem into su…
Browse files Browse the repository at this point in the history
…bmodules
  • Loading branch information
rklaehn committed Jul 3, 2024
1 parent ecaf242 commit 225ea91
Show file tree
Hide file tree
Showing 9 changed files with 1,288 additions and 1,263 deletions.
3 changes: 2 additions & 1 deletion iroh/src/client/blobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use iroh_blobs::{
format::collection::{Collection, SimpleStore},
get::db::DownloadProgress as BytesDownloadProgress,
store::{ConsistencyCheckProgress, ExportFormat, ExportMode, ValidateProgress},
util::SetTagOption,
BlobFormat, Hash, Tag,
};
use iroh_net::NodeAddr;
Expand All @@ -35,7 +36,7 @@ use crate::rpc_protocol::{
BlobAddPathRequest, BlobAddStreamRequest, BlobAddStreamUpdate, BlobConsistencyCheckRequest,
BlobDeleteBlobRequest, BlobDownloadRequest, BlobExportRequest, BlobListIncompleteRequest,
BlobListRequest, BlobReadAtRequest, BlobReadAtResponse, BlobValidateRequest,
CreateCollectionRequest, CreateCollectionResponse, NodeStatusRequest, SetTagOption,
CreateCollectionRequest, CreateCollectionResponse, NodeStatusRequest,
};

use super::{flatten, tags, Iroh, RpcClient};
Expand Down
7 changes: 2 additions & 5 deletions iroh/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -477,13 +477,10 @@ mod tests {
use anyhow::{bail, Context};
use bytes::Bytes;
use iroh_base::node_addr::AddrInfoOptions;
use iroh_blobs::{provider::AddProgress, BlobFormat};
use iroh_blobs::{provider::AddProgress, util::SetTagOption, BlobFormat};
use iroh_net::{relay::RelayMode, test_utils::DnsPkarrServer, NodeAddr};

use crate::{
client::blobs::{AddOutcome, WrapOption},
rpc_protocol::SetTagOption,
};
use crate::client::blobs::{AddOutcome, WrapOption};

use super::*;

Expand Down
77 changes: 39 additions & 38 deletions iroh/src/node/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use iroh_blobs::get::db::DownloadProgress;
use iroh_blobs::get::Stats;
use iroh_blobs::store::{ConsistencyCheckProgress, ExportFormat, ImportProgress, MapEntry};
use iroh_blobs::util::progress::ProgressSender;
use iroh_blobs::util::SetTagOption;
use iroh_blobs::BlobFormat;
use iroh_blobs::{
provider::AddProgress,
Expand All @@ -40,17 +41,17 @@ use crate::client::{
};
use crate::node::{docs::DocsEngine, NodeInner};
use crate::rpc_protocol::{
AuthorsRequest, BlobAddPathRequest, BlobAddPathResponse, BlobAddStreamRequest,
BlobAddStreamResponse, BlobAddStreamUpdate, BlobConsistencyCheckRequest, BlobDeleteBlobRequest,
BlobDownloadRequest, BlobDownloadResponse, BlobExportRequest, BlobExportResponse,
BlobListIncompleteRequest, BlobListRequest, BlobReadAtRequest, BlobReadAtResponse,
BlobValidateRequest, BlobsRequest, CreateCollectionRequest, CreateCollectionResponse,
authors, blobs, docs::Request as DocsRequest, node, BlobAddPathRequest, BlobAddPathResponse,
BlobAddStreamRequest, BlobAddStreamResponse, BlobAddStreamUpdate, BlobConsistencyCheckRequest,
BlobDeleteBlobRequest, BlobDownloadRequest, BlobDownloadResponse, BlobExportRequest,
BlobExportResponse, BlobListIncompleteRequest, BlobListRequest, BlobReadAtRequest,
BlobReadAtResponse, BlobValidateRequest, CreateCollectionRequest, CreateCollectionResponse,
DeleteTagRequest, DocExportFileRequest, DocExportFileResponse, DocImportFileRequest,
DocImportFileResponse, DocSetHashRequest, DocsRequest, ListTagsRequest, NodeAddAddrRequest,
NodeAddrRequest, NodeConnectionInfoRequest, NodeConnectionInfoResponse, NodeConnectionsRequest,
NodeConnectionsResponse, NodeIdRequest, NodeRelayRequest, NodeRequest, NodeShutdownRequest,
DocImportFileResponse, DocSetHashRequest, ListTagsRequest, NodeAddAddrRequest, NodeAddrRequest,
NodeConnectionInfoRequest, NodeConnectionInfoResponse, NodeConnectionsRequest,
NodeConnectionsResponse, NodeIdRequest, NodeRelayRequest, NodeShutdownRequest,
NodeStatsRequest, NodeStatsResponse, NodeStatusRequest, NodeWatchRequest, NodeWatchResponse,
Request, RpcService, SetTagOption, TagsRequest,
Request, RpcService, TagsRequest,
};

mod docs;
Expand Down Expand Up @@ -130,60 +131,60 @@ impl<D: BaoStore> Handler<D> {
use Request::*;
debug!("handling rpc request: {msg}");
match msg {
Node(NodeRequest::Watch(msg)) => {
Node(node::Request::Watch(msg)) => {
chan.server_streaming(msg, self, Self::node_watch).await
}
Node(NodeRequest::Status(msg)) => chan.rpc(msg, self, Self::node_status).await,
Node(NodeRequest::Id(msg)) => chan.rpc(msg, self, Self::node_id).await,
Node(NodeRequest::Addr(msg)) => chan.rpc(msg, self, Self::node_addr).await,
Node(NodeRequest::Relay(msg)) => chan.rpc(msg, self, Self::node_relay).await,
Node(NodeRequest::Shutdown(msg)) => chan.rpc(msg, self, Self::node_shutdown).await,
Node(NodeRequest::Stats(msg)) => chan.rpc(msg, self, Self::node_stats).await,
Node(NodeRequest::Connections(msg)) => {
Node(node::Request::Status(msg)) => chan.rpc(msg, self, Self::node_status).await,
Node(node::Request::Id(msg)) => chan.rpc(msg, self, Self::node_id).await,
Node(node::Request::Addr(msg)) => chan.rpc(msg, self, Self::node_addr).await,
Node(node::Request::Relay(msg)) => chan.rpc(msg, self, Self::node_relay).await,
Node(node::Request::Shutdown(msg)) => chan.rpc(msg, self, Self::node_shutdown).await,
Node(node::Request::Stats(msg)) => chan.rpc(msg, self, Self::node_stats).await,
Node(node::Request::Connections(msg)) => {
chan.server_streaming(msg, self, Self::node_connections)
.await
}
Node(NodeRequest::ConnectionInfo(msg)) => {
Node(node::Request::ConnectionInfo(msg)) => {
chan.rpc(msg, self, Self::node_connection_info).await
}
Node(NodeRequest::AddAddr(msg)) => chan.rpc(msg, self, Self::node_add_addr).await,
Blobs(BlobsRequest::List(msg)) => {
Node(node::Request::AddAddr(msg)) => chan.rpc(msg, self, Self::node_add_addr).await,
Blobs(blobs::Request::List(msg)) => {
chan.server_streaming(msg, self, Self::blob_list).await
}
Blobs(BlobsRequest::ListIncomplete(msg)) => {
Blobs(blobs::Request::ListIncomplete(msg)) => {
chan.server_streaming(msg, self, Self::blob_list_incomplete)
.await
}
Blobs(BlobsRequest::CreateCollection(msg)) => {
Blobs(blobs::Request::CreateCollection(msg)) => {
chan.rpc(msg, self, Self::create_collection).await
}
Blobs(BlobsRequest::DeleteBlob(msg)) => {
Blobs(blobs::Request::DeleteBlob(msg)) => {
chan.rpc(msg, self, Self::blob_delete_blob).await
}
Blobs(BlobsRequest::AddPath(msg)) => {
Blobs(blobs::Request::AddPath(msg)) => {
chan.server_streaming(msg, self, Self::blob_add_from_path)
.await
}
Blobs(BlobsRequest::Download(msg)) => {
Blobs(blobs::Request::Download(msg)) => {
chan.server_streaming(msg, self, Self::blob_download).await
}
Blobs(BlobsRequest::Export(msg)) => {
Blobs(blobs::Request::Export(msg)) => {
chan.server_streaming(msg, self, Self::blob_export).await
}
Blobs(BlobsRequest::Validate(msg)) => {
Blobs(blobs::Request::Validate(msg)) => {
chan.server_streaming(msg, self, Self::blob_validate).await
}
Blobs(BlobsRequest::Fsck(msg)) => {
Blobs(blobs::Request::Fsck(msg)) => {
chan.server_streaming(msg, self, Self::blob_consistency_check)
.await
}
Blobs(BlobsRequest::ReadAt(msg)) => {
Blobs(blobs::Request::ReadAt(msg)) => {
chan.server_streaming(msg, self, Self::blob_read_at).await
}
Blobs(BlobsRequest::AddStream(msg)) => {
Blobs(blobs::Request::AddStream(msg)) => {
chan.bidi_streaming(msg, self, Self::blob_add_stream).await
}
Blobs(BlobsRequest::AddStreamUpdate(_msg)) => {
Blobs(blobs::Request::AddStreamUpdate(_msg)) => {
Err(RpcServerError::UnexpectedUpdateMessage)
}

Expand All @@ -192,43 +193,43 @@ impl<D: BaoStore> Handler<D> {
}
Tags(TagsRequest::DeleteTag(msg)) => chan.rpc(msg, self, Self::blob_delete_tag).await,

Authors(AuthorsRequest::List(msg)) => {
Authors(authors::Request::List(msg)) => {
chan.server_streaming(msg, self, |handler, req| {
handler.with_docs_stream(|docs| docs.author_list(req))
})
.await
}
Authors(AuthorsRequest::Create(msg)) => {
Authors(authors::Request::Create(msg)) => {
chan.rpc(msg, self, |handler, req| {
handler.with_docs(|docs| async move { docs.author_create(req).await })
})
.await
}
Authors(AuthorsRequest::Import(msg)) => {
Authors(authors::Request::Import(msg)) => {
chan.rpc(msg, self, |handler, req| {
handler.with_docs(|docs| async move { docs.author_import(req).await })
})
.await
}
Authors(AuthorsRequest::Export(msg)) => {
Authors(authors::Request::Export(msg)) => {
chan.rpc(msg, self, |handler, req| {
handler.with_docs(|docs| async move { docs.author_export(req).await })
})
.await
}
Authors(AuthorsRequest::Delete(msg)) => {
Authors(authors::Request::Delete(msg)) => {
chan.rpc(msg, self, |handler, req| {
handler.with_docs(|docs| async move { docs.author_delete(req).await })
})
.await
}
Authors(AuthorsRequest::GetDefault(msg)) => {
Authors(authors::Request::GetDefault(msg)) => {
chan.rpc(msg, self, |handler, req| {
handler.with_docs(|docs| async move { Ok(docs.author_default(req)) })
})
.await
}
Authors(AuthorsRequest::SetDefault(msg)) => {
Authors(authors::Request::SetDefault(msg)) => {
chan.rpc(msg, self, |handler, req| {
handler.with_docs(|docs| async move { docs.author_set_default(req).await })
})
Expand Down
Loading

0 comments on commit 225ea91

Please sign in to comment.