diff --git a/rust/agama-server/src/network/dbus/service.rs b/rust/agama-server/src/network/dbus/service.rs index b085cf4855..553020b2ae 100644 --- a/rust/agama-server/src/network/dbus/service.rs +++ b/rust/agama-server/src/network/dbus/service.rs @@ -3,12 +3,12 @@ //! This module defines a D-Bus service which exposes Agama's network configuration. use crate::network::{Adapter, NetworkSystem}; use std::error::Error; -use tokio; use zbus::Connection; /// Represents the Agama networking D-Bus service. /// /// It is responsible for starting the [NetworkSystem] on a different thread. +/// TODO: this struct might not be needed anymore. pub struct NetworkService; impl NetworkService { @@ -17,16 +17,8 @@ impl NetworkService { connection: &Connection, adapter: T, ) -> Result<(), Box> { - let mut network = NetworkSystem::new(connection.clone(), adapter); - - tokio::spawn(async move { - network - .setup() - .await - .expect("Could not set up the D-Bus tree"); - - network.listen().await; - }); + let network = NetworkSystem::new(connection.clone(), adapter); + network.start().await?; Ok(()) } } diff --git a/rust/agama-server/src/network/system.rs b/rust/agama-server/src/network/system.rs index 5b53672af8..e5ec6e4780 100644 --- a/rust/agama-server/src/network/system.rs +++ b/rust/agama-server/src/network/system.rs @@ -1,67 +1,233 @@ -use super::{error::NetworkStateError, model::StateConfig, NetworkAdapterError}; -use crate::network::{dbus::Tree, model::Connection, Action, Adapter, NetworkState}; -use agama_lib::network::types::DeviceType; +use super::{ + error::NetworkStateError, + model::{AccessPoint, Device, StateConfig}, + NetworkAdapterError, +}; +use crate::network::{ + dbus::Tree, + model::{Connection, GeneralState}, + Action, Adapter, NetworkState, +}; +use agama_lib::{error::ServiceError, network::types::DeviceType}; use std::{error::Error, sync::Arc}; use tokio::sync::{ - mpsc::{self, UnboundedReceiver, UnboundedSender}, + mpsc::{self, error::SendError, UnboundedReceiver, UnboundedSender}, + oneshot::{self, error::RecvError}, Mutex, }; use uuid::Uuid; use zbus::zvariant::OwnedObjectPath; -/// Represents the network system using holding the state and setting up the D-Bus tree. -pub struct NetworkSystem { - /// Network state - pub state: NetworkState, - /// Side of the channel to send actions. - actions_tx: UnboundedSender, - actions_rx: UnboundedReceiver, - tree: Arc>, - /// Adapter to read/write the network state. +#[derive(thiserror::Error, Debug)] +pub enum NetworkSystemError { + #[error("Network state error: {0}")] + State(#[from] NetworkStateError), + #[error("Could not talk to the network system: {0}")] + InputError(#[from] SendError), + #[error("Could not read an answer from the network system: {0}")] + OutputError(#[from] RecvError), + #[error("D-Bus service error: {0}")] + ServiceError(#[from] ServiceError), + #[error("Network backend error: {0}")] + AdapterError(#[from] NetworkAdapterError), +} + +/// Represents the network configuration service. +/// +/// It offers an API to start the service and interact with it by using message +/// passing like the example below. +/// +/// ```no_run +/// # use agama_server::network::{Action, NetworkManagerAdapter, NetworkSystem}; +/// # use agama_lib::connection; +/// # use tokio::sync::oneshot; +/// +/// # tokio_test::block_on(async { +/// let adapter = NetworkManagerAdapter::from_system() +/// .await +/// .expect("Could not connect to NetworkManager."); +/// let dbus = connection() +/// .await +/// .expect("Could not connect to Agama's D-Bus server."); +/// let network = NetworkSystem::new(dbus, adapter); +/// +/// // Start the networking service and get the client for communication. +/// let client = network.start() +/// .await +/// .expect("Could not start the networking configuration system."); +/// +/// // Perform some action, like getting the list of devices. +/// let devices = client.get_devices().await +/// .expect("Could not get the list of devices."); +/// # }); +/// ``` +pub struct NetworkSystem { + connection: zbus::Connection, adapter: T, } -impl NetworkSystem { - pub fn new(conn: zbus::Connection, adapter: T) -> Self { - let (actions_tx, actions_rx) = mpsc::unbounded_channel(); - let tree = Tree::new(conn, actions_tx.clone()); +impl NetworkSystem { + /// Returns a new instance of the network configuration system. + /// + /// This function does not start the system. To get it running, you must call + /// the [start](Self::start) method. + /// + /// * `connection`: D-Bus connection to publish the network tree. + /// * `adapter`: networking configuration adapter. + pub fn new(connection: zbus::Connection, adapter: T) -> Self { Self { - state: NetworkState::default(), - actions_tx, - actions_rx, - tree: Arc::new(Mutex::new(tree)), + connection, adapter, } } - /// Writes the network configuration. - pub async fn write(&mut self) -> Result<(), NetworkAdapterError> { - self.adapter.write(&self.state).await?; - self.state = self.adapter.read(StateConfig::default()).await?; - Ok(()) + /// Starts the network configuration service and returns a client for communication purposes. + /// + /// This function starts the server (using [NetworkSystemServer]) on a separate + /// task. All the communication is performed through the returned [NetworkSystemClient]. + pub async fn start(self) -> Result { + let mut state = self.adapter.read(StateConfig::default()).await?; + let (actions_tx, actions_rx) = mpsc::unbounded_channel(); + let mut tree = Tree::new(self.connection, actions_tx.clone()); + tree.set_connections(&mut state.connections).await?; + tree.set_devices(&state.devices).await?; + + tokio::spawn(async move { + let mut server = NetworkSystemServer { + state, + input: actions_rx, + adapter: self.adapter, + tree: Arc::new(Mutex::new(tree)), + }; + + server.listen().await; + }); + + Ok(NetworkSystemClient { + actions: actions_tx, + }) } +} + +/// Client to interact with the NetworkSystem once it is running. +/// +/// It hides the details of the message-passing behind a convenient API. +#[derive(Clone)] +pub struct NetworkSystemClient { + actions: UnboundedSender, +} - /// Returns a clone of the - /// [UnboundedSender](https://docs.rs/tokio/latest/tokio/sync/mpsc/struct.UnboundedSender.html) - /// to execute [actions](Action). - pub fn actions_tx(&self) -> UnboundedSender { - self.actions_tx.clone() +// TODO: add a NetworkSystemError type +impl NetworkSystemClient { + /// Returns the general state. + pub async fn get_state(&self) -> Result { + let (tx, rx) = oneshot::channel(); + self.actions.send(Action::GetGeneralState(tx))?; + Ok(rx.await?) } - /// Populates the D-Bus tree with the known devices and connections. - pub async fn setup(&mut self) -> Result<(), Box> { - self.state = self.adapter.read(StateConfig::default()).await?; - let mut tree = self.tree.lock().await; - tree.set_connections(&mut self.state.connections).await?; - tree.set_devices(&self.state.devices).await?; + /// Updates the network general state. + pub fn update_state(&self, state: GeneralState) -> Result<(), NetworkSystemError> { + self.actions.send(Action::UpdateGeneralState(state))?; Ok(()) } + /// Returns the collection of network devices. + pub async fn get_devices(&self) -> Result, NetworkSystemError> { + let (tx, rx) = oneshot::channel(); + self.actions.send(Action::GetDevices(tx))?; + Ok(rx.await?) + } + + /// Returns the collection of network connections. + pub async fn get_connections(&self) -> Result, NetworkSystemError> { + let (tx, rx) = oneshot::channel(); + self.actions.send(Action::GetConnections(tx))?; + Ok(rx.await?) + } + + /// Adds a new connection. + pub async fn add_connection(&self, connection: Connection) -> Result<(), NetworkSystemError> { + let (tx, rx) = oneshot::channel(); + self.actions + .send(Action::NewConnection(Box::new(connection.clone()), tx))?; + let result = rx.await?; + Ok(result?) + } + + /// Returns the connection with the given ID. + /// + /// * `id`: Connection ID. + pub async fn get_connection(&self, id: &str) -> Result, NetworkSystemError> { + let (tx, rx) = oneshot::channel(); + self.actions + .send(Action::GetConnection(id.to_string(), tx))?; + let result = rx.await?; + Ok(result) + } + + /// Updates the connection. + /// + /// * `connection`: Updated connection. + pub async fn update_connection( + &self, + connection: Connection, + ) -> Result<(), NetworkSystemError> { + let (tx, rx) = oneshot::channel(); + self.actions + .send(Action::UpdateConnection(Box::new(connection), tx))?; + let result = rx.await?; + Ok(result?) + } + + /// Removes the connection with the given ID. + /// + /// * `id`: Connection ID. + pub async fn remove_connection(&self, id: &str) -> Result<(), NetworkSystemError> { + let (tx, rx) = oneshot::channel(); + self.actions + .send(Action::RemoveConnection(id.to_string(), tx))?; + let result = rx.await?; + Ok(result?) + } + + /// Applies the network configuration. + pub async fn apply(&self) -> Result<(), NetworkSystemError> { + let (tx, rx) = oneshot::channel(); + self.actions.send(Action::Apply(tx))?; + let result = rx.await?; + Ok(result?) + } + + /// Returns the collection of access points. + pub async fn get_access_points(&self) -> Result, NetworkSystemError> { + let (tx, rx) = oneshot::channel(); + self.actions.send(Action::GetAccessPoints(tx))?; + let access_points = rx.await?; + Ok(access_points) + } + + pub async fn wifi_scan(&self) -> Result<(), NetworkSystemError> { + let (tx, rx) = oneshot::channel(); + self.actions.send(Action::RefreshScan(tx)).unwrap(); + let result = rx.await?; + Ok(result?) + } +} + +struct NetworkSystemServer { + state: NetworkState, + input: UnboundedReceiver, + adapter: T, + tree: Arc>, +} + +impl NetworkSystemServer { /// Process incoming actions. /// /// This function is expected to be executed on a separate thread. pub async fn listen(&mut self) { - while let Some(action) = self.actions_rx.recv().await { + while let Some(action) = self.input.recv().await { if let Err(error) = self.dispatch_action(action).await { eprintln!("Could not process the action: {}", error); } @@ -245,4 +411,11 @@ impl NetworkSystem { let tree = self.tree.lock().await; tree.connection_path(conn.uuid) } + + /// Writes the network configuration. + pub async fn write(&mut self) -> Result<(), NetworkAdapterError> { + self.adapter.write(&self.state).await?; + self.state = self.adapter.read(StateConfig::default()).await?; + Ok(()) + } } diff --git a/rust/agama-server/src/network/web.rs b/rust/agama-server/src/network/web.rs index 052070cac0..cfec6fa602 100644 --- a/rust/agama-server/src/network/web.rs +++ b/rust/agama-server/src/network/web.rs @@ -1,6 +1,7 @@ //! This module implements the web API for the network module. use crate::error::Error; +use anyhow::Context; use axum::{ extract::{Path, State}, http::StatusCode, @@ -12,16 +13,16 @@ use axum::{ use super::{ error::NetworkStateError, model::{AccessPoint, GeneralState}, + system::{NetworkSystemClient, NetworkSystemError}, Action, Adapter, }; use crate::network::{model::Connection, model::Device, NetworkSystem}; -use agama_lib::error::ServiceError; -use agama_lib::network::settings::NetworkConnection; +use agama_lib::{error::ServiceError, network::settings::NetworkConnection}; use serde_json::json; use thiserror::Error; -use tokio::sync::{mpsc::UnboundedSender, oneshot}; +use tokio::sync::oneshot; #[derive(Error, Debug)] pub enum NetworkError { @@ -35,8 +36,11 @@ pub enum NetworkError { CannotUpdate(String), #[error("Cannot apply configuration")] CannotApplyConfig, + // TODO: to be removed after adapting to the NetworkSystemServer API #[error("Network state error: {0}")] Error(#[from] NetworkStateError), + #[error("Network system error: {0}")] + SystemError(#[from] NetworkSystemError), } impl IntoResponse for NetworkError { @@ -50,7 +54,7 @@ impl IntoResponse for NetworkError { #[derive(Clone)] struct NetworkState { - actions: UnboundedSender, + network: NetworkSystemClient, } /// Sets up and returns the axum service for the network module. @@ -60,20 +64,14 @@ pub async fn network_service( dbus: zbus::Connection, adapter: T, ) -> Result { - let mut network = NetworkSystem::new(dbus.clone(), adapter); - - let state = NetworkState { - actions: network.actions_tx(), - }; - - tokio::spawn(async move { - network - .setup() - .await - .expect("Could not set up the D-Bus tree"); - - network.listen().await; - }); + let network = NetworkSystem::new(dbus.clone(), adapter); + // FIXME: we are somehow abusing ServiceError. The HTTP/JSON API should have its own + // error type. + let client = network + .start() + .await + .context("Could not start the network configuration service.")?; + let state = NetworkState { network: client }; Ok(Router::new() .route("/state", get(general_state).put(update_general_state)) @@ -93,13 +91,11 @@ pub async fn network_service( #[utoipa::path(get, path = "/network/state", responses( (status = 200, description = "Get general network config", body = GenereralState) ))] -async fn general_state(State(state): State) -> Json { - let (tx, rx) = oneshot::channel(); - state.actions.send(Action::GetGeneralState(tx)).unwrap(); - - let state = rx.await.unwrap(); - - Json(state) +async fn general_state( + State(state): State, +) -> Result, NetworkError> { + let general_state = state.network.get_state().await?; + Ok(Json(general_state)) } #[utoipa::path(put, path = "/network/state", responses( @@ -109,64 +105,49 @@ async fn update_general_state( State(state): State, Json(value): Json, ) -> Result, NetworkError> { - state - .actions - .send(Action::UpdateGeneralState(value.clone())) - .unwrap(); - - let (tx, rx) = oneshot::channel(); - state.actions.send(Action::GetGeneralState(tx)).unwrap(); - let state = rx.await.unwrap(); - + state.network.update_state(value)?; + let state = state.network.get_state().await?; Ok(Json(state)) } #[utoipa::path(get, path = "/network/wifi", responses( (status = 200, description = "List of wireless networks", body = Vec) ))] -async fn wifi_networks(State(state): State) -> Json> { - let (tx, rx) = oneshot::channel(); - state.actions.send(Action::RefreshScan(tx)).unwrap(); - let _ = rx.await.unwrap(); - let (tx, rx) = oneshot::channel(); - state.actions.send(Action::GetAccessPoints(tx)).unwrap(); - - let access_points = rx.await.unwrap(); +async fn wifi_networks( + State(state): State, +) -> Result>, NetworkError> { + state.network.wifi_scan().await?; + let access_points = state.network.get_access_points().await?; let mut networks = vec![]; - for ap in access_points { if !ap.ssid.to_string().is_empty() { networks.push(ap); } } - Json(networks) + Ok(Json(networks)) } #[utoipa::path(get, path = "/network/devices", responses( (status = 200, description = "List of devices", body = Vec) ))] -async fn devices(State(state): State) -> Json> { - let (tx, rx) = oneshot::channel(); - state.actions.send(Action::GetDevices(tx)).unwrap(); - - Json(rx.await.unwrap()) +async fn devices(State(state): State) -> Result>, NetworkError> { + Ok(Json(state.network.get_devices().await?)) } #[utoipa::path(get, path = "/network/connections", responses( (status = 200, description = "List of known connections", body = Vec) ))] -async fn connections(State(state): State) -> Json> { - let (tx, rx) = oneshot::channel(); - state.actions.send(Action::GetConnections(tx)).unwrap(); - let connections = rx.await.unwrap(); +async fn connections( + State(state): State, +) -> Result>, NetworkError> { + let connections = state.network.get_connections().await?; let connections = connections .iter() .map(|c| NetworkConnection::try_from(c.clone()).unwrap()) .collect(); - - Json(connections) + Ok(Json(connections)) } #[utoipa::path(post, path = "/network/connections", responses( @@ -176,24 +157,11 @@ async fn add_connection( State(state): State, Json(conn): Json, ) -> Result, NetworkError> { - let (tx, rx) = oneshot::channel(); - let conn = Connection::try_from(conn)?; let id = conn.id.clone(); - state - .actions - .send(Action::NewConnection(Box::new(conn.clone()), tx)) - .unwrap(); - let _ = rx.await.unwrap(); - - let (tx, rx) = oneshot::channel(); - state - .actions - .send(Action::GetConnection(id.clone(), tx)) - .unwrap(); - - match rx.await.unwrap() { + state.network.add_connection(conn).await?; + match state.network.get_connection(&id).await? { None => Err(NetworkError::CannotAddConnection(id.clone())), Some(conn) => Ok(Json(conn)), } @@ -206,12 +174,7 @@ async fn delete_connection( State(state): State, Path(id): Path, ) -> impl IntoResponse { - let (tx, rx) = oneshot::channel(); - state - .actions - .send(Action::RemoveConnection(id, tx)) - .unwrap(); - if rx.await.unwrap().is_ok() { + if state.network.remove_connection(&id).await.is_ok() { StatusCode::NO_CONTENT } else { StatusCode::NOT_FOUND @@ -219,109 +182,81 @@ async fn delete_connection( } #[utoipa::path(put, path = "/network/connections/:id", responses( - (status = 200, description = "Update connection", body = Connection) + (status = 204, description = "Update connection", body = Connection) ))] async fn update_connection( State(state): State, Path(id): Path, Json(conn): Json, -) -> Result, NetworkError> { - let (tx, rx) = oneshot::channel(); - state - .actions - .send(Action::GetConnection(id.clone(), tx)) - .unwrap(); - let orig_conn = rx.await.unwrap(); +) -> Result { + let orig_conn = state + .network + .get_connection(&id) + .await? + .ok_or_else(|| NetworkError::UnknownConnection(id.clone()))?; let mut conn = Connection::try_from(conn)?; - let orig_conn = orig_conn.ok_or_else(|| NetworkError::UnknownConnection(id.clone()))?; if orig_conn.id != id { + // FIXME: why? return Err(NetworkError::UnknownConnection(id)); } else { conn.uuid = orig_conn.uuid; } - let (tx, rx) = oneshot::channel(); - state - .actions - .send(Action::UpdateConnection(Box::new(conn), tx)) - .unwrap(); - - Ok(Json(rx.await.unwrap()?)) + state.network.update_connection(conn).await?; + Ok(StatusCode::NO_CONTENT) } #[utoipa::path(get, path = "/network/connections/:id/connect", responses( - (status = 200, description = "Connect to the given connection", body = String) + (status = 204, description = "Connect to the given connection", body = String) ))] async fn connect( State(state): State, Path(id): Path, -) -> Result, NetworkError> { - let (tx, rx) = oneshot::channel(); - state - .actions - .send(Action::GetConnection(id.clone(), tx)) - .unwrap(); - - let Some(mut conn) = rx.await.unwrap() else { +) -> Result { + let Some(mut conn) = state.network.get_connection(&id).await? else { return Err(NetworkError::UnknownConnection(id)); }; conn.set_up(); - let (tx, rx) = oneshot::channel(); state - .actions - .send(Action::UpdateConnection(Box::new(conn), tx)) - .unwrap(); - - rx.await - .unwrap() + .network + .update_connection(conn) + .await .map_err(|_| NetworkError::CannotApplyConfig)?; - Ok(Json(())) + Ok(StatusCode::NO_CONTENT) } #[utoipa::path(get, path = "/network/connections/:id/disconnect", responses( - (status = 200, description = "Connect to the given connection", body = String) + (status = 204, description = "Connect to the given connection", body = String) ))] async fn disconnect( State(state): State, Path(id): Path, -) -> Result, NetworkError> { - let (tx, rx) = oneshot::channel(); - state - .actions - .send(Action::GetConnection(id.clone(), tx)) - .unwrap(); - - let Some(mut current_conn) = rx.await.unwrap() else { +) -> Result { + let Some(mut conn) = state.network.get_connection(&id).await? else { return Err(NetworkError::UnknownConnection(id)); }; + conn.set_down(); - current_conn.set_down(); - - let (tx, rx) = oneshot::channel(); state - .actions - .send(Action::UpdateConnection(Box::new(current_conn), tx)) - .unwrap(); - - rx.await - .unwrap() + .network + .update_connection(conn) + .await .map_err(|_| NetworkError::CannotApplyConfig)?; - Ok(Json(())) + Ok(StatusCode::NO_CONTENT) } #[utoipa::path(put, path = "/network/system/apply", responses( - (status = 200, description = "Apply configuration") + (status = 204, description = "Apply configuration") ))] -async fn apply(State(state): State) -> Result, NetworkError> { - let (tx, rx) = oneshot::channel(); - state.actions.send(Action::Apply(tx)).unwrap(); - - rx.await - .unwrap() +async fn apply(State(state): State) -> Result { + state + .network + .apply() + .await .map_err(|_| NetworkError::CannotApplyConfig)?; - Ok(Json(())) + Ok(StatusCode::NO_CONTENT) }