Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor the NetworkSystem API #1147

Merged
merged 7 commits into from
Apr 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 3 additions & 11 deletions rust/agama-server/src/network/dbus/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
//! This module defines a D-Bus service which exposes Agama's network configuration.
use crate::network::{Adapter, NetworkSystem};
use std::error::Error;
use tokio;
use zbus::Connection;

/// Represents the Agama networking D-Bus service.
///
/// It is responsible for starting the [NetworkSystem] on a different thread.
/// TODO: this struct might not be needed anymore.
pub struct NetworkService;

impl NetworkService {
Expand All @@ -17,16 +17,8 @@ impl NetworkService {
connection: &Connection,
adapter: T,
) -> Result<(), Box<dyn Error>> {
let mut network = NetworkSystem::new(connection.clone(), adapter);

tokio::spawn(async move {
network
.setup()
.await
.expect("Could not set up the D-Bus tree");

network.listen().await;
});
let network = NetworkSystem::new(connection.clone(), adapter);
network.start().await?;
Ok(())
}
}
249 changes: 211 additions & 38 deletions rust/agama-server/src/network/system.rs
Original file line number Diff line number Diff line change
@@ -1,67 +1,233 @@
use super::{error::NetworkStateError, model::StateConfig, NetworkAdapterError};
use crate::network::{dbus::Tree, model::Connection, Action, Adapter, NetworkState};
use agama_lib::network::types::DeviceType;
use super::{
error::NetworkStateError,
model::{AccessPoint, Device, StateConfig},
NetworkAdapterError,
};
use crate::network::{
dbus::Tree,
model::{Connection, GeneralState},
Action, Adapter, NetworkState,
};
use agama_lib::{error::ServiceError, network::types::DeviceType};
use std::{error::Error, sync::Arc};
use tokio::sync::{
mpsc::{self, UnboundedReceiver, UnboundedSender},
mpsc::{self, error::SendError, UnboundedReceiver, UnboundedSender},
oneshot::{self, error::RecvError},
Mutex,
};
use uuid::Uuid;
use zbus::zvariant::OwnedObjectPath;

/// Represents the network system using holding the state and setting up the D-Bus tree.
pub struct NetworkSystem<T: Adapter> {
/// Network state
pub state: NetworkState,
/// Side of the channel to send actions.
actions_tx: UnboundedSender<Action>,
actions_rx: UnboundedReceiver<Action>,
tree: Arc<Mutex<Tree>>,
/// Adapter to read/write the network state.
#[derive(thiserror::Error, Debug)]
pub enum NetworkSystemError {
#[error("Network state error: {0}")]
State(#[from] NetworkStateError),
#[error("Could not talk to the network system: {0}")]
InputError(#[from] SendError<Action>),
#[error("Could not read an answer from the network system: {0}")]
OutputError(#[from] RecvError),
#[error("D-Bus service error: {0}")]
ServiceError(#[from] ServiceError),
#[error("Network backend error: {0}")]
AdapterError(#[from] NetworkAdapterError),
}

/// Represents the network configuration service.
///
/// It offers an API to start the service and interact with it by using message
/// passing like the example below.
///
/// ```no_run
/// # use agama_server::network::{Action, NetworkManagerAdapter, NetworkSystem};
/// # use agama_lib::connection;
/// # use tokio::sync::oneshot;
///
/// # tokio_test::block_on(async {
/// let adapter = NetworkManagerAdapter::from_system()
/// .await
/// .expect("Could not connect to NetworkManager.");
/// let dbus = connection()
/// .await
/// .expect("Could not connect to Agama's D-Bus server.");
/// let network = NetworkSystem::new(dbus, adapter);
///
/// // Start the networking service and get the client for communication.
/// let client = network.start()
/// .await
/// .expect("Could not start the networking configuration system.");
///
/// // Perform some action, like getting the list of devices.
/// let devices = client.get_devices().await
/// .expect("Could not get the list of devices.");
/// # });
/// ```
pub struct NetworkSystem<T: Adapter + Send> {
connection: zbus::Connection,
adapter: T,
}

impl<T: Adapter> NetworkSystem<T> {
pub fn new(conn: zbus::Connection, adapter: T) -> Self {
let (actions_tx, actions_rx) = mpsc::unbounded_channel();
let tree = Tree::new(conn, actions_tx.clone());
impl<T: Adapter + Send + 'static> NetworkSystem<T> {
/// Returns a new instance of the network configuration system.
///
/// This function does not start the system. To get it running, you must call
/// the [start](Self::start) method.
///
/// * `connection`: D-Bus connection to publish the network tree.
/// * `adapter`: networking configuration adapter.
pub fn new(connection: zbus::Connection, adapter: T) -> Self {
Self {
state: NetworkState::default(),
actions_tx,
actions_rx,
tree: Arc::new(Mutex::new(tree)),
connection,
adapter,
}
}

/// Writes the network configuration.
pub async fn write(&mut self) -> Result<(), NetworkAdapterError> {
self.adapter.write(&self.state).await?;
self.state = self.adapter.read(StateConfig::default()).await?;
Ok(())
/// Starts the network configuration service and returns a client for communication purposes.
///
/// This function starts the server (using [NetworkSystemServer]) on a separate
/// task. All the communication is performed through the returned [NetworkSystemClient].
pub async fn start(self) -> Result<NetworkSystemClient, NetworkSystemError> {
let mut state = self.adapter.read(StateConfig::default()).await?;
let (actions_tx, actions_rx) = mpsc::unbounded_channel();
let mut tree = Tree::new(self.connection, actions_tx.clone());
tree.set_connections(&mut state.connections).await?;
tree.set_devices(&state.devices).await?;

tokio::spawn(async move {
let mut server = NetworkSystemServer {
state,
input: actions_rx,
adapter: self.adapter,
tree: Arc::new(Mutex::new(tree)),
};

server.listen().await;
});

Ok(NetworkSystemClient {
actions: actions_tx,
})
}
}

/// Client to interact with the NetworkSystem once it is running.
///
/// It hides the details of the message-passing behind a convenient API.
#[derive(Clone)]
pub struct NetworkSystemClient {
actions: UnboundedSender<Action>,
}

/// Returns a clone of the
/// [UnboundedSender](https://docs.rs/tokio/latest/tokio/sync/mpsc/struct.UnboundedSender.html)
/// to execute [actions](Action).
pub fn actions_tx(&self) -> UnboundedSender<Action> {
self.actions_tx.clone()
// TODO: add a NetworkSystemError type
impl NetworkSystemClient {
/// Returns the general state.
pub async fn get_state(&self) -> Result<GeneralState, NetworkSystemError> {
let (tx, rx) = oneshot::channel();
self.actions.send(Action::GetGeneralState(tx))?;
Ok(rx.await?)
}

/// Populates the D-Bus tree with the known devices and connections.
pub async fn setup(&mut self) -> Result<(), Box<dyn Error>> {
self.state = self.adapter.read(StateConfig::default()).await?;
let mut tree = self.tree.lock().await;
tree.set_connections(&mut self.state.connections).await?;
tree.set_devices(&self.state.devices).await?;
/// Updates the network general state.
pub fn update_state(&self, state: GeneralState) -> Result<(), NetworkSystemError> {
self.actions.send(Action::UpdateGeneralState(state))?;
Ok(())
}

/// Returns the collection of network devices.
pub async fn get_devices(&self) -> Result<Vec<Device>, NetworkSystemError> {
let (tx, rx) = oneshot::channel();
self.actions.send(Action::GetDevices(tx))?;
Ok(rx.await?)
}

/// Returns the collection of network connections.
pub async fn get_connections(&self) -> Result<Vec<Connection>, NetworkSystemError> {
let (tx, rx) = oneshot::channel();
self.actions.send(Action::GetConnections(tx))?;
Ok(rx.await?)
}

/// Adds a new connection.
pub async fn add_connection(&self, connection: Connection) -> Result<(), NetworkSystemError> {
let (tx, rx) = oneshot::channel();
self.actions
.send(Action::NewConnection(Box::new(connection.clone()), tx))?;
let result = rx.await?;
Ok(result?)
}

/// Returns the connection with the given ID.
///
/// * `id`: Connection ID.
pub async fn get_connection(&self, id: &str) -> Result<Option<Connection>, NetworkSystemError> {
let (tx, rx) = oneshot::channel();
self.actions
.send(Action::GetConnection(id.to_string(), tx))?;
let result = rx.await?;
Ok(result)
}

/// Updates the connection.
///
/// * `connection`: Updated connection.
pub async fn update_connection(
&self,
connection: Connection,
) -> Result<(), NetworkSystemError> {
let (tx, rx) = oneshot::channel();
self.actions
.send(Action::UpdateConnection(Box::new(connection), tx))?;
let result = rx.await?;
Ok(result?)
}

/// Removes the connection with the given ID.
///
/// * `id`: Connection ID.
pub async fn remove_connection(&self, id: &str) -> Result<(), NetworkSystemError> {
let (tx, rx) = oneshot::channel();
self.actions
.send(Action::RemoveConnection(id.to_string(), tx))?;
let result = rx.await?;
Ok(result?)
}

/// Applies the network configuration.
pub async fn apply(&self) -> Result<(), NetworkSystemError> {
let (tx, rx) = oneshot::channel();
self.actions.send(Action::Apply(tx))?;
let result = rx.await?;
Ok(result?)
}

/// Returns the collection of access points.
pub async fn get_access_points(&self) -> Result<Vec<AccessPoint>, NetworkSystemError> {
let (tx, rx) = oneshot::channel();
self.actions.send(Action::GetAccessPoints(tx))?;
let access_points = rx.await?;
Ok(access_points)
}

pub async fn wifi_scan(&self) -> Result<(), NetworkSystemError> {
let (tx, rx) = oneshot::channel();
self.actions.send(Action::RefreshScan(tx)).unwrap();
let result = rx.await?;
Ok(result?)
}
}

struct NetworkSystemServer<T: Adapter> {
state: NetworkState,
input: UnboundedReceiver<Action>,
adapter: T,
tree: Arc<Mutex<Tree>>,
}

impl<T: Adapter> NetworkSystemServer<T> {
/// Process incoming actions.
///
/// This function is expected to be executed on a separate thread.
pub async fn listen(&mut self) {
while let Some(action) = self.actions_rx.recv().await {
while let Some(action) = self.input.recv().await {
if let Err(error) = self.dispatch_action(action).await {
eprintln!("Could not process the action: {}", error);
}
Expand Down Expand Up @@ -245,4 +411,11 @@ impl<T: Adapter> NetworkSystem<T> {
let tree = self.tree.lock().await;
tree.connection_path(conn.uuid)
}

/// Writes the network configuration.
pub async fn write(&mut self) -> Result<(), NetworkAdapterError> {
self.adapter.write(&self.state).await?;
self.state = self.adapter.read(StateConfig::default()).await?;
Ok(())
}
}
Loading
Loading