Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose the Progress interface through the HTTP/JSON API #1092

Merged
merged 7 commits into from
Mar 15, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 7 additions & 48 deletions rust/agama-server/src/manager/web.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
//! * `manager_service` which returns the Axum service.
//! * `manager_stream` which offers an stream that emits the manager events coming from D-Bus.

use std::pin::Pin;

use agama_lib::{
error::ServiceError,
manager::{InstallationPhase, ManagerClient},
Expand All @@ -25,7 +27,7 @@ use tokio_stream::{Stream, StreamExt};
use crate::{
error::Error,
web::{
common::{progress_router, progress_stream, service_status_router, service_status_stream},
common::{progress_router, service_status_router},
Event,
},
};
Expand Down Expand Up @@ -63,55 +65,12 @@ pub struct InstallerStatus {

/// Returns a stream that emits manager related events coming from D-Bus.
///
/// It emits the Event::BusyServicesChanged and Event::InstallationPhaseChanged events.
/// It emits the Event::InstallationPhaseChanged event.
///
/// * `connection`: D-Bus connection to listen for events.
pub async fn manager_stream(dbus: zbus::Connection) -> Result<impl Stream<Item = Event>, Error> {
Ok(StreamExt::merge(
StreamExt::merge(
StreamExt::merge(
busy_services_changed_stream(dbus.clone()).await?,
installation_phase_changed_stream(dbus.clone()).await?,
),
service_status_stream(
dbus.clone(),
"org.opensuse.Agama.Manager1",
"/org/opensuse/Agama/Manager1",
)
.await?,
),
progress_stream(
dbus,
"org.opensuse.Agama.Manager1",
"/org/opensuse/Agama/Manager1",
)
.await,
))
}

pub async fn busy_services_changed_stream(
dbus: zbus::Connection,
) -> Result<impl Stream<Item = Event>, Error> {
let proxy = Manager1Proxy::new(&dbus).await?;
let stream = proxy
.receive_busy_services_changed()
.await
.then(|change| async move {
if let Ok(busy_services) = change.get().await {
Some(Event::BusyServicesChanged {
services: busy_services,
})
} else {
None
}
})
.filter_map(|e| e);
Ok(stream)
}

pub async fn installation_phase_changed_stream(
pub async fn manager_stream(
dbus: zbus::Connection,
) -> Result<impl Stream<Item = Event>, Error> {
) -> Result<Pin<Box<dyn Stream<Item = Event> + Send>>, Error> {
let proxy = Manager1Proxy::new(&dbus).await?;
let stream = proxy
.receive_current_installation_phase_changed()
Expand All @@ -130,7 +89,7 @@ pub async fn installation_phase_changed_stream(
}
})
.filter_map(|e| e);
Ok(stream)
Ok(Box::pin(stream))
}

/// Sets up and returns the axum service for the manager module
Expand Down
33 changes: 10 additions & 23 deletions rust/agama-server/src/software/web.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
use crate::{
error::Error,
web::{
common::{progress_router, progress_stream, service_status_router, service_status_stream},
common::{progress_router, service_status_router},
Event,
},
};
Expand All @@ -29,7 +29,7 @@ use axum::{
};
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::collections::HashMap;
use std::{collections::HashMap, pin::Pin};
use thiserror::Error;
use tokio_stream::{Stream, StreamExt};

Expand Down Expand Up @@ -65,27 +65,14 @@ impl IntoResponse for SoftwareError {
/// It emits the Event::ProductChanged and Event::PatternsChanged events.
///
/// * `connection`: D-Bus connection to listen for events.
pub async fn software_stream(dbus: zbus::Connection) -> Result<impl Stream<Item = Event>, Error> {
Ok(StreamExt::merge(
StreamExt::merge(
StreamExt::merge(
product_changed_stream(dbus.clone()).await?,
patterns_changed_stream(dbus.clone()).await?,
),
service_status_stream(
dbus.clone(),
"org.opensuse.Agama.Software1",
"/org/opensuse/Agama/Software1",
)
.await?,
),
progress_stream(
dbus,
"org.opensuse.Agama.Software1",
"/org/opensuse/Agama/Software1",
)
.await,
))
pub async fn software_stream(
dbus: zbus::Connection,
) -> Result<Pin<Box<dyn Stream<Item = Event> + Send>>, Error> {
let stream = StreamExt::merge(
product_changed_stream(dbus.clone()).await?,
patterns_changed_stream(dbus.clone()).await?,
);
Ok(Box::pin(stream))
}

async fn product_changed_stream(
Expand Down
49 changes: 44 additions & 5 deletions rust/agama-server/src/web.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::{
l10n::web::l10n_service,
manager::web::{manager_service, manager_stream},
software::web::{software_service, software_stream},
web::common::{progress_stream, service_status_stream},
};
use axum::Router;

Expand All @@ -29,7 +30,7 @@ pub use docs::ApiDoc;
pub use event::{Event, EventsReceiver, EventsSender};
pub use service::MainServiceBuilder;
use std::path::Path;
use tokio_stream::StreamExt;
use tokio_stream::{StreamExt, StreamMap};

/// Returns a service that implements the web-based Agama API.
///
Expand Down Expand Up @@ -72,13 +73,51 @@ pub async fn run_monitor(events: EventsSender) -> Result<(), ServiceError> {
/// * `connection`: D-Bus connection.
/// * `events`: channel to send the events to.
async fn run_events_monitor(dbus: zbus::Connection, events: EventsSender) -> Result<(), Error> {
let stream = StreamExt::merge(
manager_stream(dbus.clone()).await?,
software_stream(dbus).await?,
let mut stream = StreamMap::new();

stream.insert("manager", manager_stream(dbus.clone()).await?);
stream.insert(
"manager-status",
service_status_stream(
dbus.clone(),
"org.opensuse.Agama.Manager1",
"/org/opensuse/Agama/Manager1",
)
.await?,
);
stream.insert(
"manager-progress",
progress_stream(
dbus.clone(),
"org.opensuse.Agama.Manager1",
"/org/opensuse/Agama/Manager1",
)
.await,
);

stream.insert("software", software_stream(dbus.clone()).await?);
stream.insert(
"software-status",
service_status_stream(
dbus.clone(),
"org.opensuse.Agama.Software1",
"/org/opensuse/Agama/Software1",
)
.await?,
);
stream.insert(
"software-progress",
progress_stream(
dbus.clone(),
"org.opensuse.Agama.Software1",
"/org/opensuse/Agama/Software1",
)
.await,
imobachgs marked this conversation as resolved.
Show resolved Hide resolved
);

tokio::pin!(stream);
let e = events.clone();
while let Some(event) = stream.next().await {
while let Some((_, event)) = stream.next().await {
_ = e.send(event);
}
Ok(())
Expand Down
10 changes: 5 additions & 5 deletions rust/agama-server/src/web/common.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! This module defines functions to be used accross all services.

use std::task::Poll;
use std::{pin::Pin, task::Poll};

use agama_lib::{
error::ServiceError,
Expand Down Expand Up @@ -85,7 +85,7 @@ pub async fn service_status_stream(
dbus: zbus::Connection,
destination: &'static str,
path: &'static str,
) -> Result<impl Stream<Item = Event>, Error> {
) -> Result<Pin<Box<dyn Stream<Item = Event> + Send>>, Error> {
let proxy = build_service_status_proxy(&dbus, destination, path).await?;
let stream = proxy
.receive_current_changed()
Expand All @@ -101,7 +101,7 @@ pub async fn service_status_stream(
}
})
.filter_map(|e| e);
Ok(stream)
Ok(Box::pin(stream))
}

async fn build_service_status_proxy<'a>(
Expand Down Expand Up @@ -180,11 +180,11 @@ pub async fn progress_stream<'a>(
dbus: zbus::Connection,
destination: &'static str,
path: &'static str,
) -> ProgressStream<'a> {
) -> Pin<Box<impl Stream<Item = Event> + Send>> {
let proxy = build_progress_proxy(&dbus, destination, path)
.await
.unwrap();
ProgressStream::new(proxy).await
Box::pin(ProgressStream::new(proxy).await)
}

impl<'a> ProgressStream<'a> {
Expand Down