Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose the Progress interface through the HTTP/JSON API #1092

Merged
merged 7 commits into from
Mar 15, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 13 additions & 0 deletions rust/agama-lib/src/progress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,19 @@ impl Progress {
finished: finished?,
})
}

pub fn from_cached_proxy(proxy: &crate::proxies::ProgressProxy<'_>) -> Option<Progress> {
let (current_step, current_title) = proxy.cached_current_step().ok()??;
let max_steps = proxy.cached_total_steps().ok()??;
let finished = proxy.cached_finished().ok()??;

Some(Progress {
current_step,
current_title,
max_steps,
finished,
})
}
}

/// Monitorizes and reports the progress of Agama's current operation.
Expand Down
1 change: 1 addition & 0 deletions rust/agama-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ chrono = { version = "0.4.34", default-features = false, features = [
] }
pam = "0.8.0"
serde_with = "3.6.1"
pin-project = "1.1.5"

[[bin]]
name = "agama-dbus-server"
Expand Down
61 changes: 14 additions & 47 deletions rust/agama-server/src/manager/web.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
//! * `manager_service` which returns the Axum service.
//! * `manager_stream` which offers an stream that emits the manager events coming from D-Bus.

use std::pin::Pin;

use agama_lib::{
error::ServiceError,
manager::{InstallationPhase, ManagerClient},
Expand All @@ -25,7 +27,7 @@ use tokio_stream::{Stream, StreamExt};
use crate::{
error::Error,
web::{
common::{service_status_router, service_status_stream},
common::{progress_router, service_status_router},
Event,
},
};
Expand Down Expand Up @@ -63,47 +65,12 @@ pub struct InstallerStatus {

/// Returns a stream that emits manager related events coming from D-Bus.
///
/// It emits the Event::BusyServicesChanged and Event::InstallationPhaseChanged events.
/// It emits the Event::InstallationPhaseChanged event.
///
/// * `connection`: D-Bus connection to listen for events.
pub async fn manager_stream(dbus: zbus::Connection) -> Result<impl Stream<Item = Event>, Error> {
Ok(StreamExt::merge(
StreamExt::merge(
busy_services_changed_stream(dbus.clone()).await?,
installation_phase_changed_stream(dbus.clone()).await?,
),
service_status_stream(
dbus,
"org.opensuse.Agama.Manager1",
"/org/opensuse/Agama/Manager1",
)
.await?,
))
}

pub async fn busy_services_changed_stream(
dbus: zbus::Connection,
) -> Result<impl Stream<Item = Event>, Error> {
let proxy = Manager1Proxy::new(&dbus).await?;
let stream = proxy
.receive_busy_services_changed()
.await
.then(|change| async move {
if let Ok(busy_services) = change.get().await {
Some(Event::BusyServicesChanged {
services: busy_services,
})
} else {
None
}
})
.filter_map(|e| e);
Ok(stream)
}

pub async fn installation_phase_changed_stream(
pub async fn manager_stream(
dbus: zbus::Connection,
) -> Result<impl Stream<Item = Event>, Error> {
) -> Result<Pin<Box<dyn Stream<Item = Event> + Send>>, Error> {
let proxy = Manager1Proxy::new(&dbus).await?;
let stream = proxy
.receive_current_installation_phase_changed()
Expand All @@ -122,25 +89,25 @@ pub async fn installation_phase_changed_stream(
}
})
.filter_map(|e| e);
Ok(stream)
Ok(Box::pin(stream))
}

/// Sets up and returns the axum service for the manager module
pub async fn manager_service(dbus: zbus::Connection) -> Result<Router, ServiceError> {
let status_route = service_status_router(
&dbus,
"org.opensuse.Agama.Manager1",
"/org/opensuse/Agama/Manager1",
)
.await?;
const DBUS_SERVICE: &'static str = "org.opensuse.Agama.Manager1";
const DBUS_PATH: &'static str = "/org/opensuse/Agama/Manager1";

let status_router = service_status_router(&dbus, DBUS_SERVICE, DBUS_PATH).await?;
let progress_router = progress_router(&dbus, DBUS_SERVICE, DBUS_PATH).await?;
let manager = ManagerClient::new(dbus).await?;
let state = ManagerState { manager };
Ok(Router::new()
.route("/probe", post(probe_action))
.route("/install", post(install_action))
.route("/finish", post(finish_action))
.route("/installer", get(installer_status))
.merge(status_route)
.merge(status_router)
.merge(progress_router)
.with_state(state))
}

Expand Down
27 changes: 22 additions & 5 deletions rust/agama-server/src/software/web.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,13 @@
//! * `software_service` which returns the Axum service.
//! * `software_stream` which offers an stream that emits the software events coming from D-Bus.

use crate::{error::Error, web::Event};
use crate::{
error::Error,
web::{
common::{progress_router, service_status_router},
Event,
},
};
use agama_lib::{
error::ServiceError,
product::{Product, ProductClient},
Expand All @@ -23,7 +29,7 @@ use axum::{
};
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::collections::HashMap;
use std::{collections::HashMap, pin::Pin};
use thiserror::Error;
use tokio_stream::{Stream, StreamExt};

Expand Down Expand Up @@ -59,11 +65,14 @@ impl IntoResponse for SoftwareError {
/// It emits the Event::ProductChanged and Event::PatternsChanged events.
///
/// * `connection`: D-Bus connection to listen for events.
pub async fn software_stream(dbus: zbus::Connection) -> Result<impl Stream<Item = Event>, Error> {
Ok(StreamExt::merge(
pub async fn software_stream(
dbus: zbus::Connection,
) -> Result<Pin<Box<dyn Stream<Item = Event> + Send>>, Error> {
let stream = StreamExt::merge(
product_changed_stream(dbus.clone()).await?,
patterns_changed_stream(dbus.clone()).await?,
))
);
Ok(Box::pin(stream))
}

async fn product_changed_stream(
Expand Down Expand Up @@ -122,6 +131,12 @@ fn reason_to_selected_by(

/// Sets up and returns the axum service for the software module.
pub async fn software_service(dbus: zbus::Connection) -> Result<Router, ServiceError> {
const DBUS_SERVICE: &'static str = "org.opensuse.Agama.Software1";
const DBUS_PATH: &'static str = "/org/opensuse/Agama/Software1";

let status_router = service_status_router(&dbus, DBUS_SERVICE, DBUS_PATH).await?;
let progress_router = progress_router(&dbus, DBUS_SERVICE, DBUS_PATH).await?;

let product = ProductClient::new(dbus.clone()).await?;
let software = SoftwareClient::new(dbus).await?;
let state = SoftwareState { product, software };
Expand All @@ -131,6 +146,8 @@ pub async fn software_service(dbus: zbus::Connection) -> Result<Router, ServiceE
.route("/proposal", get(proposal))
.route("/config", put(set_config).get(get_config))
.route("/probe", post(probe))
.merge(status_router)
.merge(progress_router)
.with_state(state);
Ok(router)
}
Expand Down
62 changes: 46 additions & 16 deletions rust/agama-server/src/web.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
//! * Emit relevant events via websocket.
//! * Serve the code for the web user interface (not implemented yet).

use self::progress::EventsProgressPresenter;
use crate::{
error::Error,
l10n::web::l10n_service,
manager::web::{manager_service, manager_stream},
software::web::{software_service, software_stream},
web::common::{progress_stream, service_status_stream},
};
use axum::Router;

Expand All @@ -19,19 +19,18 @@ mod config;
mod docs;
mod event;
mod http;
mod progress;
mod service;
mod state;
mod ws;

use agama_lib::{connection, error::ServiceError, progress::ProgressMonitor};
use agama_lib::{connection, error::ServiceError};
pub use auth::generate_token;
pub use config::ServiceConfig;
pub use docs::ApiDoc;
pub use event::{Event, EventsReceiver, EventsSender};
pub use service::MainServiceBuilder;
use std::path::Path;
use tokio_stream::StreamExt;
use tokio_stream::{StreamExt, StreamMap};

/// Returns a service that implements the web-based Agama API.
///
Expand Down Expand Up @@ -63,14 +62,7 @@ where
///
/// * `events`: channel to send the events to.
pub async fn run_monitor(events: EventsSender) -> Result<(), ServiceError> {
let presenter = EventsProgressPresenter::new(events.clone());
let connection = connection().await?;
let mut monitor = ProgressMonitor::new(connection.clone()).await?;
tokio::spawn(async move {
if let Err(error) = monitor.run(presenter).await {
eprintln!("Could not monitor the D-Bus server: {}", error);
}
});
tokio::spawn(run_events_monitor(connection, events.clone()));

Ok(())
Expand All @@ -80,14 +72,52 @@ pub async fn run_monitor(events: EventsSender) -> Result<(), ServiceError> {
///
/// * `connection`: D-Bus connection.
/// * `events`: channel to send the events to.
pub async fn run_events_monitor(dbus: zbus::Connection, events: EventsSender) -> Result<(), Error> {
let stream = StreamExt::merge(
manager_stream(dbus.clone()).await?,
software_stream(dbus).await?,
async fn run_events_monitor(dbus: zbus::Connection, events: EventsSender) -> Result<(), Error> {
let mut stream = StreamMap::new();

stream.insert("manager", manager_stream(dbus.clone()).await?);
stream.insert(
"manager-status",
service_status_stream(
dbus.clone(),
"org.opensuse.Agama.Manager1",
"/org/opensuse/Agama/Manager1",
)
.await?,
);
stream.insert(
"manager-progress",
progress_stream(
dbus.clone(),
"org.opensuse.Agama.Manager1",
"/org/opensuse/Agama/Manager1",
)
.await,
);

stream.insert("software", software_stream(dbus.clone()).await?);
stream.insert(
"software-status",
service_status_stream(
dbus.clone(),
"org.opensuse.Agama.Software1",
"/org/opensuse/Agama/Software1",
)
.await?,
);
stream.insert(
"software-progress",
progress_stream(
dbus.clone(),
"org.opensuse.Agama.Software1",
"/org/opensuse/Agama/Software1",
)
.await,
imobachgs marked this conversation as resolved.
Show resolved Hide resolved
);

tokio::pin!(stream);
let e = events.clone();
while let Some(event) = stream.next().await {
while let Some((_, event)) = stream.next().await {
_ = e.send(event);
}
Ok(())
Expand Down
Loading