diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 87d2ca5210..1a7deead07 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -107,6 +107,7 @@ dependencies = [ "macaddr", "once_cell", "pam", + "pin-project", "rand", "regex", "serde", @@ -2276,18 +2277,18 @@ dependencies = [ [[package]] name = "pin-project" -version = "1.1.4" +version = "1.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0302c4a0442c456bd56f841aee5c3bfd17967563f6fadc9ceb9f9c23cf3807e0" +checksum = "b6bf43b791c5b9e34c3d182969b4abb522f9343702850a2e57f460d00d09b4b3" dependencies = [ "pin-project-internal", ] [[package]] name = "pin-project-internal" -version = "1.1.4" +version = "1.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "266c042b60c9c76b8d53061e52b2e0d1116abc57cefc8c5cd671619a56ac3690" +checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965" dependencies = [ "proc-macro2", "quote", diff --git a/rust/agama-lib/src/progress.rs b/rust/agama-lib/src/progress.rs index b79db61195..ab9bd5aa89 100644 --- a/rust/agama-lib/src/progress.rs +++ b/rust/agama-lib/src/progress.rs @@ -78,6 +78,19 @@ impl Progress { finished: finished?, }) } + + pub fn from_cached_proxy(proxy: &crate::proxies::ProgressProxy<'_>) -> Option { + let (current_step, current_title) = proxy.cached_current_step().ok()??; + let max_steps = proxy.cached_total_steps().ok()??; + let finished = proxy.cached_finished().ok()??; + + Some(Progress { + current_step, + current_title, + max_steps, + finished, + }) + } } /// Monitorizes and reports the progress of Agama's current operation. diff --git a/rust/agama-server/Cargo.toml b/rust/agama-server/Cargo.toml index d03ff59cc7..1e6cd6652f 100644 --- a/rust/agama-server/Cargo.toml +++ b/rust/agama-server/Cargo.toml @@ -48,6 +48,7 @@ chrono = { version = "0.4.34", default-features = false, features = [ ] } pam = "0.8.0" serde_with = "3.6.1" +pin-project = "1.1.5" [[bin]] name = "agama-dbus-server" diff --git a/rust/agama-server/src/manager/web.rs b/rust/agama-server/src/manager/web.rs index 86c3739537..8f6bc6b08c 100644 --- a/rust/agama-server/src/manager/web.rs +++ b/rust/agama-server/src/manager/web.rs @@ -5,6 +5,8 @@ //! * `manager_service` which returns the Axum service. //! * `manager_stream` which offers an stream that emits the manager events coming from D-Bus. +use std::pin::Pin; + use agama_lib::{ error::ServiceError, manager::{InstallationPhase, ManagerClient}, @@ -25,7 +27,7 @@ use tokio_stream::{Stream, StreamExt}; use crate::{ error::Error, web::{ - common::{service_status_router, service_status_stream}, + common::{progress_router, service_status_router}, Event, }, }; @@ -63,47 +65,12 @@ pub struct InstallerStatus { /// Returns a stream that emits manager related events coming from D-Bus. /// -/// It emits the Event::BusyServicesChanged and Event::InstallationPhaseChanged events. +/// It emits the Event::InstallationPhaseChanged event. /// /// * `connection`: D-Bus connection to listen for events. -pub async fn manager_stream(dbus: zbus::Connection) -> Result, Error> { - Ok(StreamExt::merge( - StreamExt::merge( - busy_services_changed_stream(dbus.clone()).await?, - installation_phase_changed_stream(dbus.clone()).await?, - ), - service_status_stream( - dbus, - "org.opensuse.Agama.Manager1", - "/org/opensuse/Agama/Manager1", - ) - .await?, - )) -} - -pub async fn busy_services_changed_stream( - dbus: zbus::Connection, -) -> Result, Error> { - let proxy = Manager1Proxy::new(&dbus).await?; - let stream = proxy - .receive_busy_services_changed() - .await - .then(|change| async move { - if let Ok(busy_services) = change.get().await { - Some(Event::BusyServicesChanged { - services: busy_services, - }) - } else { - None - } - }) - .filter_map(|e| e); - Ok(stream) -} - -pub async fn installation_phase_changed_stream( +pub async fn manager_stream( dbus: zbus::Connection, -) -> Result, Error> { +) -> Result + Send>>, Error> { let proxy = Manager1Proxy::new(&dbus).await?; let stream = proxy .receive_current_installation_phase_changed() @@ -122,17 +89,16 @@ pub async fn installation_phase_changed_stream( } }) .filter_map(|e| e); - Ok(stream) + Ok(Box::pin(stream)) } /// Sets up and returns the axum service for the manager module pub async fn manager_service(dbus: zbus::Connection) -> Result { - let status_route = service_status_router( - &dbus, - "org.opensuse.Agama.Manager1", - "/org/opensuse/Agama/Manager1", - ) - .await?; + const DBUS_SERVICE: &'static str = "org.opensuse.Agama.Manager1"; + const DBUS_PATH: &'static str = "/org/opensuse/Agama/Manager1"; + + let status_router = service_status_router(&dbus, DBUS_SERVICE, DBUS_PATH).await?; + let progress_router = progress_router(&dbus, DBUS_SERVICE, DBUS_PATH).await?; let manager = ManagerClient::new(dbus).await?; let state = ManagerState { manager }; Ok(Router::new() @@ -140,7 +106,8 @@ pub async fn manager_service(dbus: zbus::Connection) -> Result Result, Error> { - Ok(StreamExt::merge( +pub async fn software_stream( + dbus: zbus::Connection, +) -> Result + Send>>, Error> { + let stream = StreamExt::merge( product_changed_stream(dbus.clone()).await?, patterns_changed_stream(dbus.clone()).await?, - )) + ); + Ok(Box::pin(stream)) } async fn product_changed_stream( @@ -122,6 +131,12 @@ fn reason_to_selected_by( /// Sets up and returns the axum service for the software module. pub async fn software_service(dbus: zbus::Connection) -> Result { + const DBUS_SERVICE: &'static str = "org.opensuse.Agama.Software1"; + const DBUS_PATH: &'static str = "/org/opensuse/Agama/Software1"; + + let status_router = service_status_router(&dbus, DBUS_SERVICE, DBUS_PATH).await?; + let progress_router = progress_router(&dbus, DBUS_SERVICE, DBUS_PATH).await?; + let product = ProductClient::new(dbus.clone()).await?; let software = SoftwareClient::new(dbus).await?; let state = SoftwareState { product, software }; @@ -131,6 +146,8 @@ pub async fn software_service(dbus: zbus::Connection) -> Result Result<(), ServiceError> { - let presenter = EventsProgressPresenter::new(events.clone()); let connection = connection().await?; - let mut monitor = ProgressMonitor::new(connection.clone()).await?; - tokio::spawn(async move { - if let Err(error) = monitor.run(presenter).await { - eprintln!("Could not monitor the D-Bus server: {}", error); - } - }); tokio::spawn(run_events_monitor(connection, events.clone())); Ok(()) @@ -80,14 +72,52 @@ pub async fn run_monitor(events: EventsSender) -> Result<(), ServiceError> { /// /// * `connection`: D-Bus connection. /// * `events`: channel to send the events to. -pub async fn run_events_monitor(dbus: zbus::Connection, events: EventsSender) -> Result<(), Error> { - let stream = StreamExt::merge( - manager_stream(dbus.clone()).await?, - software_stream(dbus).await?, +async fn run_events_monitor(dbus: zbus::Connection, events: EventsSender) -> Result<(), Error> { + let mut stream = StreamMap::new(); + + stream.insert("manager", manager_stream(dbus.clone()).await?); + stream.insert( + "manager-status", + service_status_stream( + dbus.clone(), + "org.opensuse.Agama.Manager1", + "/org/opensuse/Agama/Manager1", + ) + .await?, + ); + stream.insert( + "manager-progress", + progress_stream( + dbus.clone(), + "org.opensuse.Agama.Manager1", + "/org/opensuse/Agama/Manager1", + ) + .await?, + ); + + stream.insert("software", software_stream(dbus.clone()).await?); + stream.insert( + "software-status", + service_status_stream( + dbus.clone(), + "org.opensuse.Agama.Software1", + "/org/opensuse/Agama/Software1", + ) + .await?, + ); + stream.insert( + "software-progress", + progress_stream( + dbus.clone(), + "org.opensuse.Agama.Software1", + "/org/opensuse/Agama/Software1", + ) + .await?, ); + tokio::pin!(stream); let e = events.clone(); - while let Some(event) = stream.next().await { + while let Some((_, event)) = stream.next().await { _ = e.send(event); } Ok(()) diff --git a/rust/agama-server/src/web/common.rs b/rust/agama-server/src/web/common.rs index f11ffea274..5c927bbe06 100644 --- a/rust/agama-server/src/web/common.rs +++ b/rust/agama-server/src/web/common.rs @@ -1,9 +1,25 @@ //! This module defines functions to be used accross all services. -use agama_lib::{error::ServiceError, proxies::ServiceStatusProxy}; -use axum::{extract::State, routing::get, Json, Router}; +use std::{pin::Pin, task::Poll}; + +use agama_lib::{ + error::ServiceError, + progress::Progress, + proxies::{ProgressProxy, ServiceStatusProxy}, +}; +use axum::{ + extract::State, + http::StatusCode, + response::{IntoResponse, Response}, + routing::get, + Json, Router, +}; +use pin_project::pin_project; use serde::Serialize; +use serde_json::json; +use thiserror::Error; use tokio_stream::{Stream, StreamExt}; +use zbus::PropertyStream; use crate::error::Error; @@ -67,6 +83,21 @@ struct ServiceStatus { current: u32, } +#[derive(Error, Debug)] +pub enum ServiceStatusError { + #[error("Service status error: {0}")] + Error(#[from] ServiceError), +} + +impl IntoResponse for ServiceStatusError { + fn into_response(self) -> Response { + let body = json!({ + "error": self.to_string() + }); + (StatusCode::BAD_REQUEST, Json(body)).into_response() + } +} + /// Builds a stream of the changes in the the `org.opensuse.Agama1.ServiceStatus` /// interface of the given D-Bus object. /// @@ -77,7 +108,7 @@ pub async fn service_status_stream( dbus: zbus::Connection, destination: &'static str, path: &'static str, -) -> Result, Error> { +) -> Result + Send>>, Error> { let proxy = build_service_status_proxy(&dbus, destination, path).await?; let stream = proxy .receive_current_changed() @@ -93,7 +124,7 @@ pub async fn service_status_stream( } }) .filter_map(|e| e); - Ok(stream) + Ok(Box::pin(stream)) } async fn build_service_status_proxy<'a>( @@ -108,3 +139,135 @@ async fn build_service_status_proxy<'a>( .await?; Ok(proxy) } + +/// Builds a router to the `org.opensuse.Agama1.Progress` +/// interface of the given D-Bus object. +/// +/// ```no_run +/// # use axum::{extract::State, routing::get, Json, Router}; +/// # use agama_lib::connection; +/// # use agama_server::web::common::progress_router; +/// # use tokio_test; +/// +/// # tokio_test::block_on(async { +/// async fn hello(state: State) {}; +/// +/// #[derive(Clone)] +/// struct HelloWorldState {}; +/// +/// let dbus = connection().await.unwrap(); +/// let progress_router = progress_router( +/// &dbus, "org.opensuse.HelloWorld", "/org/opensuse/hello" +/// ).await.unwrap(); +/// let router: Router = Router::new() +/// .route("/hello", get(hello)) +/// .merge(progress) +/// .with_state(HelloWorldState {}); +/// }); +/// ``` +/// +/// * `dbus`: D-Bus connection. +/// * `destination`: D-Bus service name. +/// * `path`: D-Bus object path. +pub async fn progress_router( + dbus: &zbus::Connection, + destination: &str, + path: &str, +) -> Result, ServiceError> { + let proxy = build_progress_proxy(dbus, destination, path).await?; + let state = ProgressState { proxy }; + Ok(Router::new() + .route("/progress", get(progress)) + .with_state(state)) +} + +#[derive(Clone)] +struct ProgressState<'a> { + proxy: ProgressProxy<'a>, +} + +#[derive(Error, Debug)] +pub enum ProgressError { + #[error("Progress error: {0}")] + Error(#[from] ServiceError), + #[error("D-Bus error: {0}")] + DBusError(#[from] zbus::Error), +} + +impl IntoResponse for ProgressError { + fn into_response(self) -> Response { + let body = json!({ + "error": self.to_string() + }); + (StatusCode::BAD_REQUEST, Json(body)).into_response() + } +} + +async fn progress(State(state): State>) -> Result, ProgressError> { + let proxy = state.proxy; + let progress = Progress::from_proxy(&proxy).await?; + Ok(Json(progress)) +} + +#[pin_project] +pub struct ProgressStream<'a> { + #[pin] + inner: PropertyStream<'a, (u32, String)>, + proxy: ProgressProxy<'a>, +} + +pub async fn progress_stream<'a>( + dbus: zbus::Connection, + destination: &'static str, + path: &'static str, +) -> Result + Send>>, zbus::Error> { + let proxy = build_progress_proxy(&dbus, destination, path).await?; + Ok(Box::pin(ProgressStream::new(proxy).await)) +} + +impl<'a> ProgressStream<'a> { + pub async fn new(proxy: ProgressProxy<'a>) -> Self { + let stream = proxy.receive_current_step_changed().await; + ProgressStream { + inner: stream, + proxy, + } + } +} + +impl<'a> Stream for ProgressStream<'a> { + type Item = Event; + + fn poll_next( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + let pinned = self.project(); + match pinned.inner.poll_next(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(_change) => match Progress::from_cached_proxy(&pinned.proxy) { + Some(progress) => { + let event = Event::Progress { + progress, + service: pinned.proxy.destination().to_string(), + }; + Poll::Ready(Some(event)) + } + _ => Poll::Pending, + }, + } + } +} + +async fn build_progress_proxy<'a>( + dbus: &zbus::Connection, + destination: &str, + path: &str, +) -> Result, zbus::Error> { + let proxy = ProgressProxy::builder(&dbus) + .destination(destination.to_string())? + .path(path.to_string())? + .build() + .await?; + Ok(proxy) +} diff --git a/rust/agama-server/src/web/event.rs b/rust/agama-server/src/web/event.rs index a67317cdd0..045d4804ba 100644 --- a/rust/agama-server/src/web/event.rs +++ b/rust/agama-server/src/web/event.rs @@ -8,13 +8,28 @@ use tokio::sync::broadcast::{Receiver, Sender}; #[serde(tag = "type")] pub enum Event { L10nConfigChanged(LocaleConfig), - LocaleChanged { locale: String }, - Progress(Progress), - ProductChanged { id: String }, + LocaleChanged { + locale: String, + }, + Progress { + service: String, + #[serde(flatten)] + progress: Progress, + }, + ProductChanged { + id: String, + }, PatternsChanged(HashMap), - InstallationPhaseChanged { phase: InstallationPhase }, - BusyServicesChanged { services: Vec }, - ServiceStatusChanged { service: String, status: u32 }, + InstallationPhaseChanged { + phase: InstallationPhase, + }, + BusyServicesChanged { + services: Vec, + }, + ServiceStatusChanged { + service: String, + status: u32, + }, } pub type EventsSender = Sender; diff --git a/rust/agama-server/src/web/progress.rs b/rust/agama-server/src/web/progress.rs deleted file mode 100644 index c892edd8ed..0000000000 --- a/rust/agama-server/src/web/progress.rs +++ /dev/null @@ -1,40 +0,0 @@ -//! Implements a mechanism to monitor track service progress. - -use super::event::{Event, EventsSender}; -use agama_lib::progress::{Progress, ProgressPresenter}; -use async_trait::async_trait; - -// let presenter = EventsProgressPresenter::new(socket); -// let mut monitor = ProgressMonitor::new(connection).await.unwrap(); -// _ = monitor.run(presenter).await; - -/// Experimental ProgressPresenter to emit progress events over a Events. -pub struct EventsProgressPresenter(EventsSender); - -impl EventsProgressPresenter { - pub fn new(events: EventsSender) -> Self { - Self(events) - } - - pub async fn report_progress(&mut self, progress: &Progress) { - _ = self.0.send(Event::Progress(progress.clone())) - // _ = self.events.send(Message::Text(payload)).await; - } -} - -#[async_trait] -impl ProgressPresenter for EventsProgressPresenter { - async fn start(&mut self, progress: &Progress) { - self.report_progress(progress).await; - } - - async fn update_main(&mut self, progress: &Progress) { - self.report_progress(progress).await; - } - - async fn update_detail(&mut self, progress: &Progress) { - self.report_progress(progress).await; - } - - async fn finish(&mut self) {} -}