Skip to content

Commit

Permalink
apply fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
ryankurte committed Aug 20, 2024
1 parent 9aa9026 commit 47a94ec
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 38 deletions.
2 changes: 1 addition & 1 deletion daemon/src/core/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::store::{object::ObjectIdentifier, Backend, Store, StoreError};
/// This serialises sqlite database operations to avoid the need for connection
/// pooling while using a dedicated thread to avoid blocking executor threads.
///
/// (eg. allows store to be shared, and core / RPC operations to keep moving
/// (eg. allows store to be shared, and core / RPC operations to keep moving
/// without waiting on database writes, though this has both advantages and disadvantages...)
#[derive(Clone)]
pub struct AsyncStore {
Expand Down
38 changes: 20 additions & 18 deletions daemon/src/rpc/control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ use tracing::{span, Level};
use dsf_core::error::Error as CoreError;
use dsf_core::net;
use dsf_core::prelude::*;
use dsf_rpc::{self as rpc, ControlWriteOptions, ServiceState, SubscribeOptions, SubscriptionInfo, SubscriptionKind};
use dsf_rpc::{
self as rpc, ControlWriteOptions, ServiceState, SubscribeOptions, SubscriptionInfo,
SubscriptionKind,
};

use crate::rpc::data::{publish_data, push_data};
use crate::{
Expand All @@ -40,15 +43,11 @@ pub enum SubscribeState {
#[allow(async_fn_in_trait)]
pub trait Control {
/// Write a control message to a known service
async fn control(&self, options: ControlWriteOptions)
-> Result<(), DsfError>;
async fn control(&self, options: ControlWriteOptions) -> Result<(), DsfError>;
}

impl<T: Engine> Control for T {
async fn control(
&self,
options: ControlWriteOptions,
) -> Result<(), DsfError> {
async fn control(&self, options: ControlWriteOptions) -> Result<(), DsfError> {
info!("Sending control to service: {:?}", options);

// Lookup local service information
Expand All @@ -70,15 +69,16 @@ impl<T: Engine> Control for T {

// TODO: probably tidier to route via IPC for managed services / clients.
// (handle case where a service holds it's own keys but uses the daemon for comms)

// For locally managed services we can directly apply the update.
if target.private_key.is_some() {
// TODO: this

debug!("Generating updated data block");

// Generate and publish updated data block
let block = publish_data(self, &target, options.data, vec![Options::peer_id(our_id)]).await?;
let block =
publish_data(self, &target, options.data, vec![Options::peer_id(our_id)]).await?;

debug!("Publishing updated data block {}", block.signature());

Expand All @@ -87,19 +87,18 @@ impl<T: Engine> Control for T {

debug!("Data push complete");

return Ok(())
return Ok(());
}

// Otherwise use network communication


// Find local peers or delegated replicas
let target_peer = match self.peer_get(target.id.clone()).await {
Ok(p) => {
debug!("Found local peer matching service ID, attempting direct control");

p
},
}
_ => {
debug!("No local peers, searching for delegated replicas.");

Expand All @@ -113,17 +112,20 @@ impl<T: Engine> Control for T {
debug!("Located authorative replica: {}", r.peer_id);

r.peer_id.clone()
},
}
None if replicas.len() == 1 => {
debug!("No authorative replica, trying with only available peer: {}", replicas[0].page_id);
debug!(
"No authorative replica, trying with only available peer: {}",
replicas[0].page_id
);

replicas[0].peer_id.clone()
},
}
None => {
debug!("No authorative replica found ({} replicas)", replicas.len());

return Err(DsfError::NoReplicasFound)
},
return Err(DsfError::NoReplicasFound);
}
};

// Lookup non-local peer
Expand Down Expand Up @@ -153,7 +155,7 @@ impl<T: Engine> Control for T {
net::ResponseBody::Status(s) if s == net::Status::Ok => {
debug!("Request OK!");
Ok(())
},
}
_ => {
debug!("Request failed (response: {:?})", resp.data);
Err(DsfError::InvalidResponse)
Expand Down
29 changes: 13 additions & 16 deletions daemon/src/rpc/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,26 +94,23 @@ pub(super) async fn publish_data<E: Engine, B: DataBody>(

debug!("Building data object");


// Build and sign data object
let r = e
.svc_update(
info.id.clone(),
Box::new(
move |svc, _state| {
// Setup publishing options
let data_options = DataOptions {
//data_kind: opts.kind.into(),
body: Some(b.clone()),
private_options: &private_options,
..Default::default()
};
match svc.publish_data_buff(data_options) {
Ok((_n, c)) => CoreRes::Pages(vec![c.to_owned()], None),
Err(e) => CoreRes::Error(e),
}
},
),
Box::new(move |svc, _state| {
// Setup publishing options
let data_options = DataOptions {
//data_kind: opts.kind.into(),
body: Some(b.clone()),
private_options: &private_options,
..Default::default()
};
match svc.publish_data_buff(data_options) {
Ok((_n, c)) => CoreRes::Pages(vec![c.to_owned()], None),
Err(e) => CoreRes::Error(e),
}
}),
)
.await;

Expand Down
1 change: 0 additions & 1 deletion daemon/src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ pub mod replicate;
pub mod subscribe;
pub mod sync;


/// Async RPC handler abstraction
#[allow(async_fn_in_trait)]
pub trait Rpc {
Expand Down
1 change: 0 additions & 1 deletion rpc/src/control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,3 @@ pub struct ControlWriteOptions {
/// Data body (base64 encoded for string parsing)
pub data: Data,
}

2 changes: 1 addition & 1 deletion rpc/src/replica.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use dsf_core::prelude::*;
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ReplicaInfo {
//pub service_id: Id,
///
///
pub page_id: Id,
/// ID of replicating peer
pub peer_id: Id,
Expand Down

0 comments on commit 47a94ec

Please sign in to comment.