Skip to content

Commit

Permalink
Refactor the NetworkSystem API (#1147)
Browse files Browse the repository at this point in the history
## The problem

While working in introducing an API to get the runtime configuration of
the network, we decided that we were not happy with the current
`NetworkSystem` API. There was quite some manual to do when using the
API, like dealing with channels and having to explictly move the server
to a separate Tokio task.

## The solution

This change splits the API into smaller pieces:

* `NetworkSystem` is still the main entry point.
* `NetworkSystemClient` wraps the `actions` channel and offers a
higher-level API by hiding the message passing handling.
* `NetworkSystemServer` keeps the network state and dispatches the
actions. Beware that this struct is private and the
`NetworkSystemClient` should be the only way to communicate with it.
* 
You can get a better idea of how it looks like through the following
example (included in the documentation):

```rust
let adapter = NetworkManagerAdapter::from_system()
    .await
    .expect("Could not connect to NetworkManager.");
let dbus = connection()
    .await
    .expect("Could not connect to Agama's D-Bus server.");
let network = NetworkSystem::new(dbus, adapter);

// Start the networking service and get the client for communication.
let actions = network.start()
    .await
    .expect("Could not start the networking configuration system.");

// Perform some action, like getting the list of devices.
let devices = client.get_devices().await.unwrap();
```

## To do

* [ ] Get rid of `NetworkService` as it is not needed anymore. Should we
rename `NetworkSystem*`?.
* [x] Adapt the HTTP/JSON interface to the new API (partially done).
  • Loading branch information
imobachgs authored Apr 15, 2024
2 parents 748cfbb + 0edf191 commit 12587e1
Show file tree
Hide file tree
Showing 3 changed files with 285 additions and 185 deletions.
14 changes: 3 additions & 11 deletions rust/agama-server/src/network/dbus/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
//! This module defines a D-Bus service which exposes Agama's network configuration.
use crate::network::{Adapter, NetworkSystem};
use std::error::Error;
use tokio;
use zbus::Connection;

/// Represents the Agama networking D-Bus service.
///
/// It is responsible for starting the [NetworkSystem] on a different thread.
/// TODO: this struct might not be needed anymore.
pub struct NetworkService;

impl NetworkService {
Expand All @@ -17,16 +17,8 @@ impl NetworkService {
connection: &Connection,
adapter: T,
) -> Result<(), Box<dyn Error>> {
let mut network = NetworkSystem::new(connection.clone(), adapter);

tokio::spawn(async move {
network
.setup()
.await
.expect("Could not set up the D-Bus tree");

network.listen().await;
});
let network = NetworkSystem::new(connection.clone(), adapter);
network.start().await?;
Ok(())
}
}
249 changes: 211 additions & 38 deletions rust/agama-server/src/network/system.rs
Original file line number Diff line number Diff line change
@@ -1,67 +1,233 @@
use super::{error::NetworkStateError, model::StateConfig, NetworkAdapterError};
use crate::network::{dbus::Tree, model::Connection, Action, Adapter, NetworkState};
use agama_lib::network::types::DeviceType;
use super::{
error::NetworkStateError,
model::{AccessPoint, Device, StateConfig},
NetworkAdapterError,
};
use crate::network::{
dbus::Tree,
model::{Connection, GeneralState},
Action, Adapter, NetworkState,
};
use agama_lib::{error::ServiceError, network::types::DeviceType};
use std::{error::Error, sync::Arc};
use tokio::sync::{
mpsc::{self, UnboundedReceiver, UnboundedSender},
mpsc::{self, error::SendError, UnboundedReceiver, UnboundedSender},
oneshot::{self, error::RecvError},
Mutex,
};
use uuid::Uuid;
use zbus::zvariant::OwnedObjectPath;

/// Represents the network system using holding the state and setting up the D-Bus tree.
pub struct NetworkSystem<T: Adapter> {
/// Network state
pub state: NetworkState,
/// Side of the channel to send actions.
actions_tx: UnboundedSender<Action>,
actions_rx: UnboundedReceiver<Action>,
tree: Arc<Mutex<Tree>>,
/// Adapter to read/write the network state.
#[derive(thiserror::Error, Debug)]
pub enum NetworkSystemError {
#[error("Network state error: {0}")]
State(#[from] NetworkStateError),
#[error("Could not talk to the network system: {0}")]
InputError(#[from] SendError<Action>),
#[error("Could not read an answer from the network system: {0}")]
OutputError(#[from] RecvError),
#[error("D-Bus service error: {0}")]
ServiceError(#[from] ServiceError),
#[error("Network backend error: {0}")]
AdapterError(#[from] NetworkAdapterError),
}

/// Represents the network configuration service.
///
/// It offers an API to start the service and interact with it by using message
/// passing like the example below.
///
/// ```no_run
/// # use agama_server::network::{Action, NetworkManagerAdapter, NetworkSystem};
/// # use agama_lib::connection;
/// # use tokio::sync::oneshot;
///
/// # tokio_test::block_on(async {
/// let adapter = NetworkManagerAdapter::from_system()
/// .await
/// .expect("Could not connect to NetworkManager.");
/// let dbus = connection()
/// .await
/// .expect("Could not connect to Agama's D-Bus server.");
/// let network = NetworkSystem::new(dbus, adapter);
///
/// // Start the networking service and get the client for communication.
/// let client = network.start()
/// .await
/// .expect("Could not start the networking configuration system.");
///
/// // Perform some action, like getting the list of devices.
/// let devices = client.get_devices().await
/// .expect("Could not get the list of devices.");
/// # });
/// ```
pub struct NetworkSystem<T: Adapter + Send> {
connection: zbus::Connection,
adapter: T,
}

impl<T: Adapter> NetworkSystem<T> {
pub fn new(conn: zbus::Connection, adapter: T) -> Self {
let (actions_tx, actions_rx) = mpsc::unbounded_channel();
let tree = Tree::new(conn, actions_tx.clone());
impl<T: Adapter + Send + 'static> NetworkSystem<T> {
/// Returns a new instance of the network configuration system.
///
/// This function does not start the system. To get it running, you must call
/// the [start](Self::start) method.
///
/// * `connection`: D-Bus connection to publish the network tree.
/// * `adapter`: networking configuration adapter.
pub fn new(connection: zbus::Connection, adapter: T) -> Self {
Self {
state: NetworkState::default(),
actions_tx,
actions_rx,
tree: Arc::new(Mutex::new(tree)),
connection,
adapter,
}
}

/// Writes the network configuration.
pub async fn write(&mut self) -> Result<(), NetworkAdapterError> {
self.adapter.write(&self.state).await?;
self.state = self.adapter.read(StateConfig::default()).await?;
Ok(())
/// Starts the network configuration service and returns a client for communication purposes.
///
/// This function starts the server (using [NetworkSystemServer]) on a separate
/// task. All the communication is performed through the returned [NetworkSystemClient].
pub async fn start(self) -> Result<NetworkSystemClient, NetworkSystemError> {
let mut state = self.adapter.read(StateConfig::default()).await?;
let (actions_tx, actions_rx) = mpsc::unbounded_channel();
let mut tree = Tree::new(self.connection, actions_tx.clone());
tree.set_connections(&mut state.connections).await?;
tree.set_devices(&state.devices).await?;

tokio::spawn(async move {
let mut server = NetworkSystemServer {
state,
input: actions_rx,
adapter: self.adapter,
tree: Arc::new(Mutex::new(tree)),
};

server.listen().await;
});

Ok(NetworkSystemClient {
actions: actions_tx,
})
}
}

/// Client to interact with the NetworkSystem once it is running.
///
/// It hides the details of the message-passing behind a convenient API.
#[derive(Clone)]
pub struct NetworkSystemClient {
actions: UnboundedSender<Action>,
}

/// Returns a clone of the
/// [UnboundedSender](https://docs.rs/tokio/latest/tokio/sync/mpsc/struct.UnboundedSender.html)
/// to execute [actions](Action).
pub fn actions_tx(&self) -> UnboundedSender<Action> {
self.actions_tx.clone()
// TODO: add a NetworkSystemError type
impl NetworkSystemClient {
/// Returns the general state.
pub async fn get_state(&self) -> Result<GeneralState, NetworkSystemError> {
let (tx, rx) = oneshot::channel();
self.actions.send(Action::GetGeneralState(tx))?;
Ok(rx.await?)
}

/// Populates the D-Bus tree with the known devices and connections.
pub async fn setup(&mut self) -> Result<(), Box<dyn Error>> {
self.state = self.adapter.read(StateConfig::default()).await?;
let mut tree = self.tree.lock().await;
tree.set_connections(&mut self.state.connections).await?;
tree.set_devices(&self.state.devices).await?;
/// Updates the network general state.
pub fn update_state(&self, state: GeneralState) -> Result<(), NetworkSystemError> {
self.actions.send(Action::UpdateGeneralState(state))?;
Ok(())
}

/// Returns the collection of network devices.
pub async fn get_devices(&self) -> Result<Vec<Device>, NetworkSystemError> {
let (tx, rx) = oneshot::channel();
self.actions.send(Action::GetDevices(tx))?;
Ok(rx.await?)
}

/// Returns the collection of network connections.
pub async fn get_connections(&self) -> Result<Vec<Connection>, NetworkSystemError> {
let (tx, rx) = oneshot::channel();
self.actions.send(Action::GetConnections(tx))?;
Ok(rx.await?)
}

/// Adds a new connection.
pub async fn add_connection(&self, connection: Connection) -> Result<(), NetworkSystemError> {
let (tx, rx) = oneshot::channel();
self.actions
.send(Action::NewConnection(Box::new(connection.clone()), tx))?;
let result = rx.await?;
Ok(result?)
}

/// Returns the connection with the given ID.
///
/// * `id`: Connection ID.
pub async fn get_connection(&self, id: &str) -> Result<Option<Connection>, NetworkSystemError> {
let (tx, rx) = oneshot::channel();
self.actions
.send(Action::GetConnection(id.to_string(), tx))?;
let result = rx.await?;
Ok(result)
}

/// Updates the connection.
///
/// * `connection`: Updated connection.
pub async fn update_connection(
&self,
connection: Connection,
) -> Result<(), NetworkSystemError> {
let (tx, rx) = oneshot::channel();
self.actions
.send(Action::UpdateConnection(Box::new(connection), tx))?;
let result = rx.await?;
Ok(result?)
}

/// Removes the connection with the given ID.
///
/// * `id`: Connection ID.
pub async fn remove_connection(&self, id: &str) -> Result<(), NetworkSystemError> {
let (tx, rx) = oneshot::channel();
self.actions
.send(Action::RemoveConnection(id.to_string(), tx))?;
let result = rx.await?;
Ok(result?)
}

/// Applies the network configuration.
pub async fn apply(&self) -> Result<(), NetworkSystemError> {
let (tx, rx) = oneshot::channel();
self.actions.send(Action::Apply(tx))?;
let result = rx.await?;
Ok(result?)
}

/// Returns the collection of access points.
pub async fn get_access_points(&self) -> Result<Vec<AccessPoint>, NetworkSystemError> {
let (tx, rx) = oneshot::channel();
self.actions.send(Action::GetAccessPoints(tx))?;
let access_points = rx.await?;
Ok(access_points)
}

pub async fn wifi_scan(&self) -> Result<(), NetworkSystemError> {
let (tx, rx) = oneshot::channel();
self.actions.send(Action::RefreshScan(tx)).unwrap();
let result = rx.await?;
Ok(result?)
}
}

struct NetworkSystemServer<T: Adapter> {
state: NetworkState,
input: UnboundedReceiver<Action>,
adapter: T,
tree: Arc<Mutex<Tree>>,
}

impl<T: Adapter> NetworkSystemServer<T> {
/// Process incoming actions.
///
/// This function is expected to be executed on a separate thread.
pub async fn listen(&mut self) {
while let Some(action) = self.actions_rx.recv().await {
while let Some(action) = self.input.recv().await {
if let Err(error) = self.dispatch_action(action).await {
eprintln!("Could not process the action: {}", error);
}
Expand Down Expand Up @@ -245,4 +411,11 @@ impl<T: Adapter> NetworkSystem<T> {
let tree = self.tree.lock().await;
tree.connection_path(conn.uuid)
}

/// Writes the network configuration.
pub async fn write(&mut self) -> Result<(), NetworkAdapterError> {
self.adapter.write(&self.state).await?;
self.state = self.adapter.read(StateConfig::default()).await?;
Ok(())
}
}
Loading

0 comments on commit 12587e1

Please sign in to comment.