From 193aeb5c8f499b518269745d2293b03f5f3a0984 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Imobach=20Gonz=C3=A1lez=20Sosa?= Date: Sat, 13 Apr 2024 08:46:22 +0100 Subject: [PATCH 1/7] Split NetworkSystem in two structs (System and Server) * Hide behind the API the fact that it uses a different tokio::task. --- rust/agama-server/src/network/dbus/service.rs | 14 +- rust/agama-server/src/network/system.rs | 120 ++++++++++++------ rust/agama-server/src/network/web.rs | 17 +-- 3 files changed, 88 insertions(+), 63 deletions(-) diff --git a/rust/agama-server/src/network/dbus/service.rs b/rust/agama-server/src/network/dbus/service.rs index b085cf4855..553020b2ae 100644 --- a/rust/agama-server/src/network/dbus/service.rs +++ b/rust/agama-server/src/network/dbus/service.rs @@ -3,12 +3,12 @@ //! This module defines a D-Bus service which exposes Agama's network configuration. use crate::network::{Adapter, NetworkSystem}; use std::error::Error; -use tokio; use zbus::Connection; /// Represents the Agama networking D-Bus service. /// /// It is responsible for starting the [NetworkSystem] on a different thread. +/// TODO: this struct might not be needed anymore. pub struct NetworkService; impl NetworkService { @@ -17,16 +17,8 @@ impl NetworkService { connection: &Connection, adapter: T, ) -> Result<(), Box> { - let mut network = NetworkSystem::new(connection.clone(), adapter); - - tokio::spawn(async move { - network - .setup() - .await - .expect("Could not set up the D-Bus tree"); - - network.listen().await; - }); + let network = NetworkSystem::new(connection.clone(), adapter); + network.start().await?; Ok(()) } } diff --git a/rust/agama-server/src/network/system.rs b/rust/agama-server/src/network/system.rs index 5b53672af8..6d2371c3eb 100644 --- a/rust/agama-server/src/network/system.rs +++ b/rust/agama-server/src/network/system.rs @@ -1,6 +1,6 @@ use super::{error::NetworkStateError, model::StateConfig, NetworkAdapterError}; use crate::network::{dbus::Tree, model::Connection, Action, Adapter, NetworkState}; -use agama_lib::network::types::DeviceType; +use agama_lib::{error::ServiceError, network::types::DeviceType}; use std::{error::Error, sync::Arc}; use tokio::sync::{ mpsc::{self, UnboundedReceiver, UnboundedSender}, @@ -9,59 +9,96 @@ use tokio::sync::{ use uuid::Uuid; use zbus::zvariant::OwnedObjectPath; -/// Represents the network system using holding the state and setting up the D-Bus tree. -pub struct NetworkSystem { - /// Network state - pub state: NetworkState, - /// Side of the channel to send actions. - actions_tx: UnboundedSender, - actions_rx: UnboundedReceiver, - tree: Arc>, - /// Adapter to read/write the network state. +/// Represents the network configuration service. +/// +/// It offers an API to start the service and interact with it by using message +/// passing like the example below. +/// +/// ```no_run +/// # use agama_server::network::{Action, NetworkManagerAdapter, NetworkSystem}; +/// # use agama_lib::connection; +/// # use tokio::sync::oneshot; +/// +/// # tokio_test::block_on(async { +/// let adapter = NetworkManagerAdapter::from_system() +/// .await +/// .expect("Could not connect to NetworkManager."); +/// let dbus = connection() +/// .await +/// .expect("Could not connect to Agama's D-Bus server."); +/// let network = NetworkSystem::new(dbus, adapter); +/// +/// // Start the networking service and get the channel for communication. +/// let actions = network.start() +/// .await +/// .expect("Could not start the networking configuration system."); +/// +/// // Perform some action, like getting the list of devices. +/// let (tx, rx) = oneshot::channel(); +/// actions.send(Action::GetDevices(tx)).unwrap(); +/// let devices = rx.await.unwrap(); +/// # }); +/// ``` +pub struct NetworkSystem { + connection: zbus::Connection, adapter: T, } -impl NetworkSystem { - pub fn new(conn: zbus::Connection, adapter: T) -> Self { - let (actions_tx, actions_rx) = mpsc::unbounded_channel(); - let tree = Tree::new(conn, actions_tx.clone()); +impl NetworkSystem { + /// Returns a new instance of the network configuration system. + /// + /// This function does not start the system. To get it running, you must call + /// the [start](Self::start) method. + /// + /// * `connection`: D-Bus connection to publish the network tree. + /// * `adapter`: networking configuration adapter. + pub fn new(connection: zbus::Connection, adapter: T) -> Self { Self { - state: NetworkState::default(), - actions_tx, - actions_rx, - tree: Arc::new(Mutex::new(tree)), + connection, adapter, } } - /// Writes the network configuration. - pub async fn write(&mut self) -> Result<(), NetworkAdapterError> { - self.adapter.write(&self.state).await?; - self.state = self.adapter.read(StateConfig::default()).await?; - Ok(()) - } + /// Starts the network configuration service and returns a channel for communication. + /// + /// This function starts the server (using [NetworkSystemServer]) on a separate + /// task. All the communication is performed through the returned channel by + /// issuing [actions](crate::network::Action). + pub async fn start(self) -> Result, ServiceError> { + let mut state = self.adapter.read(StateConfig::default()).await.unwrap(); + let (actions_tx, actions_rx) = mpsc::unbounded_channel(); + let mut tree = Tree::new(self.connection, actions_tx.clone()); + tree.set_connections(&mut state.connections).await?; + tree.set_devices(&state.devices).await?; - /// Returns a clone of the - /// [UnboundedSender](https://docs.rs/tokio/latest/tokio/sync/mpsc/struct.UnboundedSender.html) - /// to execute [actions](Action). - pub fn actions_tx(&self) -> UnboundedSender { - self.actions_tx.clone() - } + tokio::spawn(async move { + let mut server = NetworkSystemServer { + state, + input: actions_rx, + adapter: self.adapter, + tree: Arc::new(Mutex::new(tree)), + }; - /// Populates the D-Bus tree with the known devices and connections. - pub async fn setup(&mut self) -> Result<(), Box> { - self.state = self.adapter.read(StateConfig::default()).await?; - let mut tree = self.tree.lock().await; - tree.set_connections(&mut self.state.connections).await?; - tree.set_devices(&self.state.devices).await?; - Ok(()) + server.listen().await; + }); + + Ok(actions_tx) } +} + +struct NetworkSystemServer { + state: NetworkState, + input: UnboundedReceiver, + adapter: T, + tree: Arc>, +} +impl NetworkSystemServer { /// Process incoming actions. /// /// This function is expected to be executed on a separate thread. pub async fn listen(&mut self) { - while let Some(action) = self.actions_rx.recv().await { + while let Some(action) = self.input.recv().await { if let Err(error) = self.dispatch_action(action).await { eprintln!("Could not process the action: {}", error); } @@ -245,4 +282,11 @@ impl NetworkSystem { let tree = self.tree.lock().await; tree.connection_path(conn.uuid) } + + /// Writes the network configuration. + pub async fn write(&mut self) -> Result<(), NetworkAdapterError> { + self.adapter.write(&self.state).await?; + self.state = self.adapter.read(StateConfig::default()).await?; + Ok(()) + } } diff --git a/rust/agama-server/src/network/web.rs b/rust/agama-server/src/network/web.rs index 052070cac0..c27c0d0681 100644 --- a/rust/agama-server/src/network/web.rs +++ b/rust/agama-server/src/network/web.rs @@ -60,20 +60,9 @@ pub async fn network_service( dbus: zbus::Connection, adapter: T, ) -> Result { - let mut network = NetworkSystem::new(dbus.clone(), adapter); - - let state = NetworkState { - actions: network.actions_tx(), - }; - - tokio::spawn(async move { - network - .setup() - .await - .expect("Could not set up the D-Bus tree"); - - network.listen().await; - }); + let network = NetworkSystem::new(dbus.clone(), adapter); + let actions = network.start().await?; + let state = NetworkState { actions }; Ok(Router::new() .route("/state", get(general_state).put(update_general_state)) From f1610110c3ba71673651412ab8120fcacd4d0610 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Imobach=20Gonz=C3=A1lez=20Sosa?= Date: Sat, 13 Apr 2024 10:09:42 +0100 Subject: [PATCH 2/7] Add a NetworkSystemClient for communication --- rust/agama-server/src/network/system.rs | 75 +++++++++++++++++++++---- 1 file changed, 63 insertions(+), 12 deletions(-) diff --git a/rust/agama-server/src/network/system.rs b/rust/agama-server/src/network/system.rs index 6d2371c3eb..b687a44a0e 100644 --- a/rust/agama-server/src/network/system.rs +++ b/rust/agama-server/src/network/system.rs @@ -1,9 +1,18 @@ -use super::{error::NetworkStateError, model::StateConfig, NetworkAdapterError}; -use crate::network::{dbus::Tree, model::Connection, Action, Adapter, NetworkState}; +use super::{ + error::NetworkStateError, + model::{Device, StateConfig}, + NetworkAdapterError, +}; +use crate::network::{ + dbus::Tree, + model::{Connection, GeneralState}, + Action, Adapter, NetworkState, +}; use agama_lib::{error::ServiceError, network::types::DeviceType}; use std::{error::Error, sync::Arc}; use tokio::sync::{ - mpsc::{self, UnboundedReceiver, UnboundedSender}, + mpsc::{self, error::SendError, UnboundedReceiver, UnboundedSender}, + oneshot::{self, error::RecvError}, Mutex, }; use uuid::Uuid; @@ -29,14 +38,12 @@ use zbus::zvariant::OwnedObjectPath; /// let network = NetworkSystem::new(dbus, adapter); /// /// // Start the networking service and get the channel for communication. -/// let actions = network.start() +/// let client = network.start() /// .await /// .expect("Could not start the networking configuration system."); /// /// // Perform some action, like getting the list of devices. -/// let (tx, rx) = oneshot::channel(); -/// actions.send(Action::GetDevices(tx)).unwrap(); -/// let devices = rx.await.unwrap(); +/// let devices = client.get_devices().await.unwrap(); /// # }); /// ``` pub struct NetworkSystem { @@ -59,12 +66,11 @@ impl NetworkSystem { } } - /// Starts the network configuration service and returns a channel for communication. + /// Starts the network configuration service and returns a client for communication purposes. /// /// This function starts the server (using [NetworkSystemServer]) on a separate - /// task. All the communication is performed through the returned channel by - /// issuing [actions](crate::network::Action). - pub async fn start(self) -> Result, ServiceError> { + /// task. All the communication is performed through the returned [NetworkSystemClient]. + pub async fn start(self) -> Result { let mut state = self.adapter.read(StateConfig::default()).await.unwrap(); let (actions_tx, actions_rx) = mpsc::unbounded_channel(); let mut tree = Tree::new(self.connection, actions_tx.clone()); @@ -82,7 +88,52 @@ impl NetworkSystem { server.listen().await; }); - Ok(actions_tx) + Ok(NetworkSystemClient { + actions: actions_tx, + }) + } +} + +/// Client to interact with the NetworkSystem once it is running. +/// +/// It hides the details of the message-passing behind a convenient API. +#[derive(Clone)] +pub struct NetworkSystemClient { + /// Channel to talk to the server. This field is temporarily public, but it will + /// be set to private once web interface is adapted. + pub actions: UnboundedSender, +} + +#[derive(thiserror::Error, Debug)] +pub enum NetworkSystemError { + #[error("Network state error: {0}")] + StateError(#[from] NetworkStateError), + #[error("Could not talk to the network system: {0}")] + InputError(#[from] SendError), + #[error("Could not read an answer from the network system: {0}")] + OutputError(#[from] RecvError), +} + +// TODO: add a NetworkSystemError type +impl NetworkSystemClient { + /// Returns the general state. + pub async fn get_state(&self) -> Result { + let (tx, rx) = oneshot::channel(); + self.actions.send(Action::GetGeneralState(tx))?; + Ok(rx.await?) + } + + /// Updates the network general state. + pub fn update_state(&self, state: GeneralState) -> Result<(), NetworkSystemError> { + self.actions.send(Action::UpdateGeneralState(state))?; + Ok(()) + } + + /// Returns the collection of network devices. + pub async fn get_devices(&self) -> Result, NetworkSystemError> { + let (tx, rx) = oneshot::channel(); + self.actions.send(Action::GetDevices(tx))?; + Ok(rx.await?) } } From f0dddd19e01b8a6cd77f7c8b2d51a25731fa5c10 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Imobach=20Gonz=C3=A1lez=20Sosa?= Date: Sat, 13 Apr 2024 10:10:39 +0100 Subject: [PATCH 3/7] Partially adapt the network web API to the new NetworkSystem API --- rust/agama-server/src/network/web.rs | 61 +++++++++++++++++----------- 1 file changed, 38 insertions(+), 23 deletions(-) diff --git a/rust/agama-server/src/network/web.rs b/rust/agama-server/src/network/web.rs index c27c0d0681..7b78df747c 100644 --- a/rust/agama-server/src/network/web.rs +++ b/rust/agama-server/src/network/web.rs @@ -12,6 +12,7 @@ use axum::{ use super::{ error::NetworkStateError, model::{AccessPoint, GeneralState}, + system::{NetworkSystemClient, NetworkSystemError}, Action, Adapter, }; @@ -21,7 +22,7 @@ use agama_lib::network::settings::NetworkConnection; use serde_json::json; use thiserror::Error; -use tokio::sync::{mpsc::UnboundedSender, oneshot}; +use tokio::sync::oneshot; #[derive(Error, Debug)] pub enum NetworkError { @@ -35,8 +36,11 @@ pub enum NetworkError { CannotUpdate(String), #[error("Cannot apply configuration")] CannotApplyConfig, + // TODO: to be removed after adapting to the NetworkSystemServer API #[error("Network state error: {0}")] Error(#[from] NetworkStateError), + #[error("Network system error: {0}")] + SystemError(#[from] NetworkSystemError), } impl IntoResponse for NetworkError { @@ -50,7 +54,7 @@ impl IntoResponse for NetworkError { #[derive(Clone)] struct NetworkState { - actions: UnboundedSender, + network: NetworkSystemClient, } /// Sets up and returns the axum service for the network module. @@ -61,8 +65,8 @@ pub async fn network_service( adapter: T, ) -> Result { let network = NetworkSystem::new(dbus.clone(), adapter); - let actions = network.start().await?; - let state = NetworkState { actions }; + let client = network.start().await?; + let state = NetworkState { network: client }; Ok(Router::new() .route("/state", get(general_state).put(update_general_state)) @@ -84,7 +88,11 @@ pub async fn network_service( ))] async fn general_state(State(state): State) -> Json { let (tx, rx) = oneshot::channel(); - state.actions.send(Action::GetGeneralState(tx)).unwrap(); + state + .network + .actions + .send(Action::GetGeneralState(tx)) + .unwrap(); let state = rx.await.unwrap(); @@ -98,15 +106,8 @@ async fn update_general_state( State(state): State, Json(value): Json, ) -> Result, NetworkError> { - state - .actions - .send(Action::UpdateGeneralState(value.clone())) - .unwrap(); - - let (tx, rx) = oneshot::channel(); - state.actions.send(Action::GetGeneralState(tx)).unwrap(); - let state = rx.await.unwrap(); - + state.network.update_state(value)?; + let state = state.network.get_state().await?; Ok(Json(state)) } @@ -115,10 +116,14 @@ async fn update_general_state( ))] async fn wifi_networks(State(state): State) -> Json> { let (tx, rx) = oneshot::channel(); - state.actions.send(Action::RefreshScan(tx)).unwrap(); + state.network.actions.send(Action::RefreshScan(tx)).unwrap(); let _ = rx.await.unwrap(); let (tx, rx) = oneshot::channel(); - state.actions.send(Action::GetAccessPoints(tx)).unwrap(); + state + .network + .actions + .send(Action::GetAccessPoints(tx)) + .unwrap(); let access_points = rx.await.unwrap(); @@ -136,11 +141,8 @@ async fn wifi_networks(State(state): State) -> Json) ))] -async fn devices(State(state): State) -> Json> { - let (tx, rx) = oneshot::channel(); - state.actions.send(Action::GetDevices(tx)).unwrap(); - - Json(rx.await.unwrap()) +async fn devices(State(state): State) -> Result>, NetworkError> { + Ok(Json(state.network.get_devices().await?)) } #[utoipa::path(get, path = "/network/connections", responses( @@ -148,7 +150,11 @@ async fn devices(State(state): State) -> Json> { ))] async fn connections(State(state): State) -> Json> { let (tx, rx) = oneshot::channel(); - state.actions.send(Action::GetConnections(tx)).unwrap(); + state + .network + .actions + .send(Action::GetConnections(tx)) + .unwrap(); let connections = rx.await.unwrap(); let connections = connections .iter() @@ -171,6 +177,7 @@ async fn add_connection( let id = conn.id.clone(); state + .network .actions .send(Action::NewConnection(Box::new(conn.clone()), tx)) .unwrap(); @@ -178,6 +185,7 @@ async fn add_connection( let (tx, rx) = oneshot::channel(); state + .network .actions .send(Action::GetConnection(id.clone(), tx)) .unwrap(); @@ -197,6 +205,7 @@ async fn delete_connection( ) -> impl IntoResponse { let (tx, rx) = oneshot::channel(); state + .network .actions .send(Action::RemoveConnection(id, tx)) .unwrap(); @@ -217,6 +226,7 @@ async fn update_connection( ) -> Result, NetworkError> { let (tx, rx) = oneshot::channel(); state + .network .actions .send(Action::GetConnection(id.clone(), tx)) .unwrap(); @@ -231,6 +241,7 @@ async fn update_connection( let (tx, rx) = oneshot::channel(); state + .network .actions .send(Action::UpdateConnection(Box::new(conn), tx)) .unwrap(); @@ -247,6 +258,7 @@ async fn connect( ) -> Result, NetworkError> { let (tx, rx) = oneshot::channel(); state + .network .actions .send(Action::GetConnection(id.clone(), tx)) .unwrap(); @@ -258,6 +270,7 @@ async fn connect( let (tx, rx) = oneshot::channel(); state + .network .actions .send(Action::UpdateConnection(Box::new(conn), tx)) .unwrap(); @@ -278,6 +291,7 @@ async fn disconnect( ) -> Result, NetworkError> { let (tx, rx) = oneshot::channel(); state + .network .actions .send(Action::GetConnection(id.clone(), tx)) .unwrap(); @@ -290,6 +304,7 @@ async fn disconnect( let (tx, rx) = oneshot::channel(); state + .network .actions .send(Action::UpdateConnection(Box::new(current_conn), tx)) .unwrap(); @@ -306,7 +321,7 @@ async fn disconnect( ))] async fn apply(State(state): State) -> Result, NetworkError> { let (tx, rx) = oneshot::channel(); - state.actions.send(Action::Apply(tx)).unwrap(); + state.network.actions.send(Action::Apply(tx)).unwrap(); rx.await .unwrap() From 8dcf81695a6dbad9cde187d2b404da1a0bf76cde Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Imobach=20Gonz=C3=A1lez=20Sosa?= Date: Sat, 13 Apr 2024 10:49:17 +0100 Subject: [PATCH 4/7] Handle errors reading the network configuration --- rust/agama-server/src/network/system.rs | 31 ++++++++++++++----------- rust/agama-server/src/network/web.rs | 11 ++++++--- 2 files changed, 26 insertions(+), 16 deletions(-) diff --git a/rust/agama-server/src/network/system.rs b/rust/agama-server/src/network/system.rs index b687a44a0e..d679e59eae 100644 --- a/rust/agama-server/src/network/system.rs +++ b/rust/agama-server/src/network/system.rs @@ -18,6 +18,20 @@ use tokio::sync::{ use uuid::Uuid; use zbus::zvariant::OwnedObjectPath; +#[derive(thiserror::Error, Debug)] +pub enum NetworkSystemError { + #[error("Network state error: {0}")] + Stater(#[from] NetworkStateError), + #[error("Could not talk to the network system: {0}")] + InputError(#[from] SendError), + #[error("Could not read an answer from the network system: {0}")] + OutputError(#[from] RecvError), + #[error("D-Bus service error: {0}")] + ServiceError(#[from] ServiceError), + #[error("Network backend error: {0}")] + AdapterError(#[from] NetworkAdapterError), +} + /// Represents the network configuration service. /// /// It offers an API to start the service and interact with it by using message @@ -43,7 +57,8 @@ use zbus::zvariant::OwnedObjectPath; /// .expect("Could not start the networking configuration system."); /// /// // Perform some action, like getting the list of devices. -/// let devices = client.get_devices().await.unwrap(); +/// let devices = client.get_devices().await +/// .expect("Could not get the list of devices."); /// # }); /// ``` pub struct NetworkSystem { @@ -70,8 +85,8 @@ impl NetworkSystem { /// /// This function starts the server (using [NetworkSystemServer]) on a separate /// task. All the communication is performed through the returned [NetworkSystemClient]. - pub async fn start(self) -> Result { - let mut state = self.adapter.read(StateConfig::default()).await.unwrap(); + pub async fn start(self) -> Result { + let mut state = self.adapter.read(StateConfig::default()).await?; let (actions_tx, actions_rx) = mpsc::unbounded_channel(); let mut tree = Tree::new(self.connection, actions_tx.clone()); tree.set_connections(&mut state.connections).await?; @@ -104,16 +119,6 @@ pub struct NetworkSystemClient { pub actions: UnboundedSender, } -#[derive(thiserror::Error, Debug)] -pub enum NetworkSystemError { - #[error("Network state error: {0}")] - StateError(#[from] NetworkStateError), - #[error("Could not talk to the network system: {0}")] - InputError(#[from] SendError), - #[error("Could not read an answer from the network system: {0}")] - OutputError(#[from] RecvError), -} - // TODO: add a NetworkSystemError type impl NetworkSystemClient { /// Returns the general state. diff --git a/rust/agama-server/src/network/web.rs b/rust/agama-server/src/network/web.rs index 7b78df747c..78e034ea6b 100644 --- a/rust/agama-server/src/network/web.rs +++ b/rust/agama-server/src/network/web.rs @@ -1,6 +1,7 @@ //! This module implements the web API for the network module. use crate::error::Error; +use anyhow::Context; use axum::{ extract::{Path, State}, http::StatusCode, @@ -17,8 +18,7 @@ use super::{ }; use crate::network::{model::Connection, model::Device, NetworkSystem}; -use agama_lib::error::ServiceError; -use agama_lib::network::settings::NetworkConnection; +use agama_lib::{error::ServiceError, network::settings::NetworkConnection}; use serde_json::json; use thiserror::Error; @@ -65,7 +65,12 @@ pub async fn network_service( adapter: T, ) -> Result { let network = NetworkSystem::new(dbus.clone(), adapter); - let client = network.start().await?; + // FIXME: we are somehow abusing ServiceError. The HTTP/JSON API should have its own + // error type. + let client = network + .start() + .await + .context("Could not start the network configuration service.")?; let state = NetworkState { network: client }; Ok(Router::new() From d54eb8ea10a0a04f87465371c0ed57ffc7cccd04 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Imobach=20Gonz=C3=A1lez=20Sosa?= Date: Mon, 15 Apr 2024 07:21:41 +0100 Subject: [PATCH 5/7] Adapt the remaining web API to the new NetworkSystem --- rust/agama-server/src/network/system.rs | 67 +++++++++++- rust/agama-server/src/network/web.rs | 134 +++++++----------------- 2 files changed, 102 insertions(+), 99 deletions(-) diff --git a/rust/agama-server/src/network/system.rs b/rust/agama-server/src/network/system.rs index d679e59eae..6e93285fd2 100644 --- a/rust/agama-server/src/network/system.rs +++ b/rust/agama-server/src/network/system.rs @@ -8,7 +8,10 @@ use crate::network::{ model::{Connection, GeneralState}, Action, Adapter, NetworkState, }; -use agama_lib::{error::ServiceError, network::types::DeviceType}; +use agama_lib::{ + error::ServiceError, + network::{settings::NetworkConnection, types::DeviceType}, +}; use std::{error::Error, sync::Arc}; use tokio::sync::{ mpsc::{self, error::SendError, UnboundedReceiver, UnboundedSender}, @@ -21,7 +24,7 @@ use zbus::zvariant::OwnedObjectPath; #[derive(thiserror::Error, Debug)] pub enum NetworkSystemError { #[error("Network state error: {0}")] - Stater(#[from] NetworkStateError), + State(#[from] NetworkStateError), #[error("Could not talk to the network system: {0}")] InputError(#[from] SendError), #[error("Could not read an answer from the network system: {0}")] @@ -140,6 +143,66 @@ impl NetworkSystemClient { self.actions.send(Action::GetDevices(tx))?; Ok(rx.await?) } + + /// Returns the collection of network connections. + pub async fn get_connections(&self) -> Result, NetworkSystemError> { + let (tx, rx) = oneshot::channel(); + self.actions.send(Action::GetConnections(tx))?; + Ok(rx.await?) + } + + /// Adds a new connection. + pub async fn add_connection(&self, connection: Connection) -> Result<(), NetworkSystemError> { + let (tx, rx) = oneshot::channel(); + self.actions + .send(Action::NewConnection(Box::new(connection.clone()), tx))?; + let result = rx.await?; + Ok(result?) + } + + /// Returns the connection with the given ID. + /// + /// * `id`: Connection ID. + pub async fn get_connection(&self, id: &str) -> Result, NetworkSystemError> { + let (tx, rx) = oneshot::channel(); + self.actions + .send(Action::GetConnection(id.to_string(), tx))?; + let result = rx.await?; + Ok(result) + } + + /// Updates the connection. + /// + /// * `connection`: Updated connection. + pub async fn update_connection( + &self, + connection: Connection, + ) -> Result<(), NetworkSystemError> { + let (tx, rx) = oneshot::channel(); + self.actions + .send(Action::UpdateConnection(Box::new(connection), tx))?; + let result = rx.await?; + Ok(result?) + } + + /// Removes the connection with the given ID. + /// + /// * `id`: Connection ID. + pub async fn remove_connection(&self, id: &str) -> Result<(), NetworkSystemError> { + let (tx, rx) = oneshot::channel(); + self.actions + .send(Action::RemoveConnection(id.to_string(), tx))?; + let result = rx.await?; + Ok(result?) + } + + /// Applies the network configuration. + pub async fn apply(&self) -> Result<(), NetworkSystemError> { + let (tx, rx) = oneshot::channel(); + self.actions.send(Action::Apply(tx))?; + let result = rx.await?; + Ok(result?) + } } struct NetworkSystemServer { diff --git a/rust/agama-server/src/network/web.rs b/rust/agama-server/src/network/web.rs index 78e034ea6b..ee2589cf53 100644 --- a/rust/agama-server/src/network/web.rs +++ b/rust/agama-server/src/network/web.rs @@ -153,20 +153,15 @@ async fn devices(State(state): State) -> Result>, #[utoipa::path(get, path = "/network/connections", responses( (status = 200, description = "List of known connections", body = Vec) ))] -async fn connections(State(state): State) -> Json> { - let (tx, rx) = oneshot::channel(); - state - .network - .actions - .send(Action::GetConnections(tx)) - .unwrap(); - let connections = rx.await.unwrap(); +async fn connections( + State(state): State, +) -> Result>, NetworkError> { + let connections = state.network.get_connections().await?; let connections = connections .iter() .map(|c| NetworkConnection::try_from(c.clone()).unwrap()) .collect(); - - Json(connections) + Ok(Json(connections)) } #[utoipa::path(post, path = "/network/connections", responses( @@ -176,26 +171,11 @@ async fn add_connection( State(state): State, Json(conn): Json, ) -> Result, NetworkError> { - let (tx, rx) = oneshot::channel(); - let conn = Connection::try_from(conn)?; let id = conn.id.clone(); - state - .network - .actions - .send(Action::NewConnection(Box::new(conn.clone()), tx)) - .unwrap(); - let _ = rx.await.unwrap(); - - let (tx, rx) = oneshot::channel(); - state - .network - .actions - .send(Action::GetConnection(id.clone(), tx)) - .unwrap(); - - match rx.await.unwrap() { + state.network.add_connection(conn).await?; + match state.network.get_connection(&id).await? { None => Err(NetworkError::CannotAddConnection(id.clone())), Some(conn) => Ok(Json(conn)), } @@ -208,13 +188,7 @@ async fn delete_connection( State(state): State, Path(id): Path, ) -> impl IntoResponse { - let (tx, rx) = oneshot::channel(); - state - .network - .actions - .send(Action::RemoveConnection(id, tx)) - .unwrap(); - if rx.await.unwrap().is_ok() { + if state.network.remove_connection(&id).await.is_ok() { StatusCode::NO_CONTENT } else { StatusCode::NOT_FOUND @@ -222,115 +196,81 @@ async fn delete_connection( } #[utoipa::path(put, path = "/network/connections/:id", responses( - (status = 200, description = "Update connection", body = Connection) + (status = 204, description = "Update connection", body = Connection) ))] async fn update_connection( State(state): State, Path(id): Path, Json(conn): Json, -) -> Result, NetworkError> { - let (tx, rx) = oneshot::channel(); - state +) -> Result { + let orig_conn = state .network - .actions - .send(Action::GetConnection(id.clone(), tx)) - .unwrap(); - let orig_conn = rx.await.unwrap(); + .get_connection(&id) + .await? + .ok_or_else(|| NetworkError::UnknownConnection(id.clone()))?; let mut conn = Connection::try_from(conn)?; - let orig_conn = orig_conn.ok_or_else(|| NetworkError::UnknownConnection(id.clone()))?; if orig_conn.id != id { + // FIXME: why? return Err(NetworkError::UnknownConnection(id)); } else { conn.uuid = orig_conn.uuid; } - let (tx, rx) = oneshot::channel(); - state - .network - .actions - .send(Action::UpdateConnection(Box::new(conn), tx)) - .unwrap(); - - Ok(Json(rx.await.unwrap()?)) + state.network.update_connection(conn).await?; + Ok(StatusCode::NO_CONTENT) } #[utoipa::path(get, path = "/network/connections/:id/connect", responses( - (status = 200, description = "Connect to the given connection", body = String) + (status = 204, description = "Connect to the given connection", body = String) ))] async fn connect( State(state): State, Path(id): Path, -) -> Result, NetworkError> { - let (tx, rx) = oneshot::channel(); - state - .network - .actions - .send(Action::GetConnection(id.clone(), tx)) - .unwrap(); - - let Some(mut conn) = rx.await.unwrap() else { +) -> Result { + let Some(mut conn) = state.network.get_connection(&id).await? else { return Err(NetworkError::UnknownConnection(id)); }; conn.set_up(); - let (tx, rx) = oneshot::channel(); state .network - .actions - .send(Action::UpdateConnection(Box::new(conn), tx)) - .unwrap(); - - rx.await - .unwrap() + .update_connection(conn) + .await .map_err(|_| NetworkError::CannotApplyConfig)?; - Ok(Json(())) + Ok(StatusCode::NO_CONTENT) } #[utoipa::path(get, path = "/network/connections/:id/disconnect", responses( - (status = 200, description = "Connect to the given connection", body = String) + (status = 204, description = "Connect to the given connection", body = String) ))] async fn disconnect( State(state): State, Path(id): Path, -) -> Result, NetworkError> { - let (tx, rx) = oneshot::channel(); - state - .network - .actions - .send(Action::GetConnection(id.clone(), tx)) - .unwrap(); - - let Some(mut current_conn) = rx.await.unwrap() else { +) -> Result { + let Some(mut conn) = state.network.get_connection(&id).await? else { return Err(NetworkError::UnknownConnection(id)); }; + conn.set_down(); - current_conn.set_down(); - - let (tx, rx) = oneshot::channel(); state .network - .actions - .send(Action::UpdateConnection(Box::new(current_conn), tx)) - .unwrap(); - - rx.await - .unwrap() + .update_connection(conn) + .await .map_err(|_| NetworkError::CannotApplyConfig)?; - Ok(Json(())) + Ok(StatusCode::NO_CONTENT) } #[utoipa::path(put, path = "/network/system/apply", responses( - (status = 200, description = "Apply configuration") + (status = 204, description = "Apply configuration") ))] -async fn apply(State(state): State) -> Result, NetworkError> { - let (tx, rx) = oneshot::channel(); - state.network.actions.send(Action::Apply(tx)).unwrap(); - - rx.await - .unwrap() +async fn apply(State(state): State) -> Result { + state + .network + .apply() + .await .map_err(|_| NetworkError::CannotApplyConfig)?; - Ok(Json(())) + Ok(StatusCode::NO_CONTENT) } From fe95106d0d635b4487b691211e0636c185f81a9d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Imobach=20Gonz=C3=A1lez=20Sosa?= Date: Mon, 15 Apr 2024 09:49:53 +0100 Subject: [PATCH 6/7] Adapt some missing web API methods to the new NetworkSystem --- rust/agama-server/src/network/system.rs | 26 ++++++++++++------ rust/agama-server/src/network/web.rs | 36 ++++++++----------------- 2 files changed, 29 insertions(+), 33 deletions(-) diff --git a/rust/agama-server/src/network/system.rs b/rust/agama-server/src/network/system.rs index 6e93285fd2..28de1eed73 100644 --- a/rust/agama-server/src/network/system.rs +++ b/rust/agama-server/src/network/system.rs @@ -1,6 +1,6 @@ use super::{ error::NetworkStateError, - model::{Device, StateConfig}, + model::{AccessPoint, Device, StateConfig}, NetworkAdapterError, }; use crate::network::{ @@ -8,10 +8,7 @@ use crate::network::{ model::{Connection, GeneralState}, Action, Adapter, NetworkState, }; -use agama_lib::{ - error::ServiceError, - network::{settings::NetworkConnection, types::DeviceType}, -}; +use agama_lib::{error::ServiceError, network::types::DeviceType}; use std::{error::Error, sync::Arc}; use tokio::sync::{ mpsc::{self, error::SendError, UnboundedReceiver, UnboundedSender}, @@ -117,9 +114,7 @@ impl NetworkSystem { /// It hides the details of the message-passing behind a convenient API. #[derive(Clone)] pub struct NetworkSystemClient { - /// Channel to talk to the server. This field is temporarily public, but it will - /// be set to private once web interface is adapted. - pub actions: UnboundedSender, + actions: UnboundedSender, } // TODO: add a NetworkSystemError type @@ -203,6 +198,21 @@ impl NetworkSystemClient { let result = rx.await?; Ok(result?) } + + /// Returns the collection of access points. + pub async fn get_access_points(&self) -> Result, NetworkSystemError> { + let (tx, rx) = oneshot::channel(); + self.actions.send(Action::GetAccessPoints(tx))?; + let access_points = rx.await?; + Ok(access_points) + } + + pub async fn wifi_scan(&self) -> Result<(), NetworkSystemError> { + let (tx, rx) = oneshot::channel(); + self.actions.send(Action::RefreshScan(tx)).unwrap(); + let result = rx.await?; + Ok(result?) + } } struct NetworkSystemServer { diff --git a/rust/agama-server/src/network/web.rs b/rust/agama-server/src/network/web.rs index ee2589cf53..cfec6fa602 100644 --- a/rust/agama-server/src/network/web.rs +++ b/rust/agama-server/src/network/web.rs @@ -91,17 +91,11 @@ pub async fn network_service( #[utoipa::path(get, path = "/network/state", responses( (status = 200, description = "Get general network config", body = GenereralState) ))] -async fn general_state(State(state): State) -> Json { - let (tx, rx) = oneshot::channel(); - state - .network - .actions - .send(Action::GetGeneralState(tx)) - .unwrap(); - - let state = rx.await.unwrap(); - - Json(state) +async fn general_state( + State(state): State, +) -> Result, NetworkError> { + let general_state = state.network.get_state().await?; + Ok(Json(general_state)) } #[utoipa::path(put, path = "/network/state", responses( @@ -119,28 +113,20 @@ async fn update_general_state( #[utoipa::path(get, path = "/network/wifi", responses( (status = 200, description = "List of wireless networks", body = Vec) ))] -async fn wifi_networks(State(state): State) -> Json> { - let (tx, rx) = oneshot::channel(); - state.network.actions.send(Action::RefreshScan(tx)).unwrap(); - let _ = rx.await.unwrap(); - let (tx, rx) = oneshot::channel(); - state - .network - .actions - .send(Action::GetAccessPoints(tx)) - .unwrap(); - - let access_points = rx.await.unwrap(); +async fn wifi_networks( + State(state): State, +) -> Result>, NetworkError> { + state.network.wifi_scan().await?; + let access_points = state.network.get_access_points().await?; let mut networks = vec![]; - for ap in access_points { if !ap.ssid.to_string().is_empty() { networks.push(ap); } } - Json(networks) + Ok(Json(networks)) } #[utoipa::path(get, path = "/network/devices", responses( From 0edf191658cff2ac0b4e4886738d58e3024a849e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Imobach=20Gonz=C3=A1lez=20Sosa?= Date: Mon, 15 Apr 2024 09:50:33 +0100 Subject: [PATCH 7/7] Update from code review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Knut Alejandro Anderssen González --- rust/agama-server/src/network/system.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/agama-server/src/network/system.rs b/rust/agama-server/src/network/system.rs index 28de1eed73..e5ec6e4780 100644 --- a/rust/agama-server/src/network/system.rs +++ b/rust/agama-server/src/network/system.rs @@ -51,7 +51,7 @@ pub enum NetworkSystemError { /// .expect("Could not connect to Agama's D-Bus server."); /// let network = NetworkSystem::new(dbus, adapter); /// -/// // Start the networking service and get the channel for communication. +/// // Start the networking service and get the client for communication. /// let client = network.start() /// .await /// .expect("Could not start the networking configuration system.");