diff --git a/src/docker_data/mod.rs b/src/docker_data/mod.rs index 5daa6c7..c700654 100644 --- a/src/docker_data/mod.rs +++ b/src/docker_data/mod.rs @@ -3,7 +3,7 @@ use bollard::{ service::ContainerSummary, Docker, }; -use futures_util::{Future, StreamExt}; +use futures_util::StreamExt; use parking_lot::Mutex; use std::{ collections::HashMap, @@ -33,7 +33,7 @@ enum SpawnId { /// Cpu & Mem stats take twice as long as the update interval to get a value, so will have two being executed at the same time /// SpawnId::Stats takes container_id and binate value to enable both cycles of the same container_id to be inserted into the hashmap -/// Binate value is toggled when all join handles have been spawned off +/// Binate value is toggled when all handles have been spawned off /// Also effectively means that if the docker_update interval minimum will be 1000ms #[derive(Debug, Clone, Copy, Eq, Hash, PartialEq)] enum Binate { @@ -56,7 +56,6 @@ pub struct DockerData { binate: Binate, docker: Arc, gui_state: Arc>, - initialised: bool, is_running: Arc, receiver: Receiver, spawns: Arc>>>, @@ -94,12 +93,12 @@ impl DockerData { /// don't take &self, so that can tokio::spawn into it's own thread /// remove if from spawns hashmap when complete async fn update_container_stat( + app_data: Arc>, docker: Arc, id: ContainerId, - app_data: Arc>, is_running: bool, - spawns: Arc>>>, spawn_id: SpawnId, + spawns: Arc>>>, ) { let mut stream = docker .stats( @@ -156,18 +155,18 @@ impl DockerData { let docker = Arc::clone(&self.docker); let app_data = Arc::clone(&self.app_data); let spawns = Arc::clone(&self.spawns); - let spawn_key = SpawnId::Stats((id.clone(), self.binate)); + let spawn_id = SpawnId::Stats((id.clone(), self.binate)); self.spawns .lock() - .entry(spawn_key.clone()) + .entry(spawn_id.clone()) .or_insert_with(|| { tokio::spawn(Self::update_container_stat( + app_data, docker, id.clone(), - app_data, *is_running, + spawn_id, spawns, - spawn_key, )) }); } @@ -223,19 +222,17 @@ impl DockerData { } /// Update single container logs - /// don't take &self, so that can tokio::spawn into it's own thread - /// remove if from spawns hashmap when complete + /// remove it from spawns hashmap when complete async fn update_log( + app_data: Arc>, docker: Arc, id: ContainerId, - timestamps: bool, since: u64, - app_data: Arc>, spawns: Arc>>>, ) { let options = Some(LogsOptions:: { stdout: true, - timestamps, + timestamps: true, since: i64::try_from(since).unwrap_or_default(), ..Default::default() }); @@ -243,16 +240,14 @@ impl DockerData { let mut logs = docker.logs(id.get(), options); let mut output = vec![]; - while let Some(value) = logs.next().await { - if let Ok(data) = value { - let log_string = data.to_string(); - if !log_string.trim().is_empty() { - output.push(log_string); - } + while let Some(Ok(value)) = logs.next().await { + let data = value.to_string(); + if !data.trim().is_empty() { + output.push(data); } } spawns.lock().remove(&SpawnId::Log(id.clone())); - app_data.lock().update_log_by_id(&output, &id); + app_data.lock().update_log_by_id(output, &id); } /// Update all logs, spawn each container into own tokio::spawn thread @@ -264,14 +259,7 @@ impl DockerData { let key = SpawnId::Log(id.clone()); self.spawns.lock().insert( key, - tokio::spawn(Self::update_log( - docker, - id.clone(), - self.args.timestamp, - 0, - app_data, - spawns, - )), + tokio::spawn(Self::update_log(app_data, docker, id.clone(), 0, spawns)), ); } } @@ -290,11 +278,10 @@ impl DockerData { let app_data = Arc::clone(&self.app_data); let spawns = Arc::clone(&self.spawns); tokio::spawn(Self::update_log( + app_data, docker, container.id.clone(), - self.args.timestamp, container.last_updated, - app_data, spawns, )) }); @@ -305,8 +292,8 @@ impl DockerData { } /// Animate the loading icon - async fn loading_spin(&mut self, loading_uuid: Uuid) -> JoinHandle<()> { - let gui_state = Arc::clone(&self.gui_state); + async fn loading_spin(loading_uuid: Uuid, gui_state: &Arc>) -> JoinHandle<()> { + let gui_state = Arc::clone(&gui_state); tokio::spawn(async move { loop { tokio::time::sleep(std::time::Duration::from_millis(100)).await; @@ -316,89 +303,106 @@ impl DockerData { } /// Stop the loading_spin function, and reset gui loading status - fn stop_loading_spin(&mut self, handle: &JoinHandle<()>, loading_uuid: Uuid) { + fn stop_loading_spin( + gui_state: &Arc>, + handle: &JoinHandle<()>, + loading_uuid: Uuid, + ) { handle.abort(); - self.gui_state.lock().remove_loading(loading_uuid); + gui_state.lock().remove_loading(loading_uuid); } /// Initialize docker container data, before any messages are received async fn initialise_container_data(&mut self) { self.gui_state.lock().status_push(Status::Init); let loading_uuid = Uuid::new_v4(); - let loading_spin = self.loading_spin(loading_uuid).await; + let loading_spin = Self::loading_spin(loading_uuid, &Arc::clone(&self.gui_state)).await; let all_ids = self.update_all_containers().await; self.update_all_container_stats(&all_ids); - // Maybe only do a single one at first? self.init_all_logs(&all_ids); - if all_ids.is_empty() { - self.initialised = true; - } - // wait until all logs have initialised - while !self.initialised { + while !self.app_data.lock().initialised(&all_ids) { tokio::time::sleep(std::time::Duration::from_millis(100)).await; - self.initialised = self.app_data.lock().initialised(&all_ids); } self.gui_state.lock().status_del(Status::Init); - self.stop_loading_spin(&loading_spin, loading_uuid); + Self::stop_loading_spin(&self.gui_state, &loading_spin, loading_uuid); } /// Set the global error as the docker error, and set gui_state to error - fn set_error(&mut self, error: DockerControls) { - self.app_data - .lock() - .set_error(AppError::DockerCommand(error)); - self.gui_state.lock().status_push(Status::Error); - } - - /// Execute a docker command, will start and stop the loading spinner, and set correct error - async fn exec_docker( - &mut self, - docker_fn: impl Future> + Send, - control: DockerControls, + fn set_error( + app_data: &Arc>, + error: DockerControls, + gui_state: &Arc>, ) { - let uuid = Uuid::new_v4(); - let loading_spin = self.loading_spin(uuid).await; - if docker_fn.await.is_err() { - self.set_error(control); - }; - self.stop_loading_spin(&loading_spin, uuid); + app_data.lock().set_error(AppError::DockerCommand(error)); + gui_state.lock().status_push(Status::Error); } /// Handle incoming messages, container controls & all container information update + /// Spawn dowcker commands off into own thread async fn message_handler(&mut self) { while let Some(message) = self.receiver.recv().await { let docker = Arc::clone(&self.docker); + let gui_state = Arc::clone(&self.gui_state); + let app_data = Arc::clone(&self.app_data); + let uuid = Uuid::new_v4(); match message { DockerMessage::Pause(id) => { - self.exec_docker(docker.pause_container(id.get()), DockerControls::Pause) - .await; + tokio::spawn(async move { + let loading_spin = Self::loading_spin(uuid, &gui_state).await; + if docker.pause_container(id.get()).await.is_err() { + Self::set_error(&app_data, DockerControls::Pause, &gui_state); + } + Self::stop_loading_spin(&gui_state, &loading_spin, uuid); + }); + self.update_everything().await; } DockerMessage::Restart(id) => { - self.exec_docker( - docker.restart_container(id.get(), None), - DockerControls::Restart, - ) - .await; + tokio::spawn(async move { + let loading_spin = Self::loading_spin(uuid, &gui_state).await; + if docker.restart_container(id.get(), None).await.is_err() { + Self::set_error(&app_data, DockerControls::Restart, &gui_state); + } + Self::stop_loading_spin(&gui_state, &loading_spin, uuid); + }); + self.update_everything().await; } DockerMessage::Start(id) => { - self.exec_docker( - docker.start_container(id.get(), None::>), - DockerControls::Start, - ) - .await; + tokio::spawn(async move { + let loading_spin = Self::loading_spin(uuid, &gui_state).await; + if docker + .start_container(id.get(), None::>) + .await + .is_err() + { + Self::set_error(&app_data, DockerControls::Start, &gui_state); + } + Self::stop_loading_spin(&gui_state, &loading_spin, uuid); + }); + self.update_everything().await; } DockerMessage::Stop(id) => { - self.exec_docker(docker.stop_container(id.get(), None), DockerControls::Stop) - .await; + tokio::spawn(async move { + let loading_spin = Self::loading_spin(uuid, &gui_state).await; + if docker.stop_container(id.get(), None).await.is_err() { + Self::set_error(&app_data, DockerControls::Stop, &gui_state); + } + Self::stop_loading_spin(&gui_state, &loading_spin, uuid); + }); + self.update_everything().await; } DockerMessage::Unpause(id) => { - self.exec_docker(docker.unpause_container(id.get()), DockerControls::Unpause) - .await; + tokio::spawn(async move { + let loading_spin = Self::loading_spin(uuid, &gui_state).await; + if docker.unpause_container(id.get()).await.is_err() { + Self::set_error(&app_data, DockerControls::Unpause, &gui_state); + } + Self::stop_loading_spin(&gui_state, &loading_spin, uuid); + }); self.update_everything().await; } DockerMessage::Update => self.update_everything().await, @@ -418,8 +422,8 @@ impl DockerData { pub async fn init( app_data: Arc>, docker: Docker, + docker_rx: Receiver, gui_state: Arc>, - receiver: Receiver, is_running: Arc, ) { let args = app_data.lock().args; @@ -427,13 +431,12 @@ impl DockerData { let mut inner = Self { app_data, args, + binate: Binate::One, docker: Arc::new(docker), gui_state, - initialised: false, - receiver, - spawns: Arc::new(Mutex::new(HashMap::new())), is_running, - binate: Binate::One, + receiver: docker_rx, + spawns: Arc::new(Mutex::new(HashMap::new())), }; inner.initialise_container_data().await;