Skip to content

Commit

Permalink
feat: spawn docker exec commands into own thread
Browse files Browse the repository at this point in the history
  • Loading branch information
mrjackwills committed Jan 18, 2023
1 parent 9dcd050 commit 9ec43e1
Showing 1 changed file with 86 additions and 83 deletions.
169 changes: 86 additions & 83 deletions src/docker_data/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use bollard::{
service::ContainerSummary,
Docker,
};
use futures_util::{Future, StreamExt};
use futures_util::StreamExt;
use parking_lot::Mutex;
use std::{
collections::HashMap,
Expand Down Expand Up @@ -33,7 +33,7 @@ enum SpawnId {

/// Cpu & Mem stats take twice as long as the update interval to get a value, so will have two being executed at the same time
/// SpawnId::Stats takes container_id and binate value to enable both cycles of the same container_id to be inserted into the hashmap
/// Binate value is toggled when all join handles have been spawned off
/// Binate value is toggled when all handles have been spawned off
/// Also effectively means that if the docker_update interval minimum will be 1000ms
#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq)]
enum Binate {
Expand All @@ -56,7 +56,6 @@ pub struct DockerData {
binate: Binate,
docker: Arc<Docker>,
gui_state: Arc<Mutex<GuiState>>,
initialised: bool,
is_running: Arc<AtomicBool>,
receiver: Receiver<DockerMessage>,
spawns: Arc<Mutex<HashMap<SpawnId, JoinHandle<()>>>>,
Expand Down Expand Up @@ -94,12 +93,12 @@ impl DockerData {
/// don't take &self, so that can tokio::spawn into it's own thread
/// remove if from spawns hashmap when complete
async fn update_container_stat(
app_data: Arc<Mutex<AppData>>,
docker: Arc<Docker>,
id: ContainerId,
app_data: Arc<Mutex<AppData>>,
is_running: bool,
spawns: Arc<Mutex<HashMap<SpawnId, JoinHandle<()>>>>,
spawn_id: SpawnId,
spawns: Arc<Mutex<HashMap<SpawnId, JoinHandle<()>>>>,
) {
let mut stream = docker
.stats(
Expand Down Expand Up @@ -156,18 +155,18 @@ impl DockerData {
let docker = Arc::clone(&self.docker);
let app_data = Arc::clone(&self.app_data);
let spawns = Arc::clone(&self.spawns);
let spawn_key = SpawnId::Stats((id.clone(), self.binate));
let spawn_id = SpawnId::Stats((id.clone(), self.binate));
self.spawns
.lock()
.entry(spawn_key.clone())
.entry(spawn_id.clone())
.or_insert_with(|| {
tokio::spawn(Self::update_container_stat(
app_data,
docker,
id.clone(),
app_data,
*is_running,
spawn_id,
spawns,
spawn_key,
))
});
}
Expand Down Expand Up @@ -223,36 +222,32 @@ impl DockerData {
}

/// Update single container logs
/// don't take &self, so that can tokio::spawn into it's own thread
/// remove if from spawns hashmap when complete
/// remove it from spawns hashmap when complete
async fn update_log(
app_data: Arc<Mutex<AppData>>,
docker: Arc<Docker>,
id: ContainerId,
timestamps: bool,
since: u64,
app_data: Arc<Mutex<AppData>>,
spawns: Arc<Mutex<HashMap<SpawnId, JoinHandle<()>>>>,
) {
let options = Some(LogsOptions::<String> {
stdout: true,
timestamps,
timestamps: true,
since: i64::try_from(since).unwrap_or_default(),
..Default::default()
});

let mut logs = docker.logs(id.get(), options);
let mut output = vec![];

while let Some(value) = logs.next().await {
if let Ok(data) = value {
let log_string = data.to_string();
if !log_string.trim().is_empty() {
output.push(log_string);
}
while let Some(Ok(value)) = logs.next().await {
let data = value.to_string();
if !data.trim().is_empty() {
output.push(data);
}
}
spawns.lock().remove(&SpawnId::Log(id.clone()));
app_data.lock().update_log_by_id(&output, &id);
app_data.lock().update_log_by_id(output, &id);
}

/// Update all logs, spawn each container into own tokio::spawn thread
Expand All @@ -264,14 +259,7 @@ impl DockerData {
let key = SpawnId::Log(id.clone());
self.spawns.lock().insert(
key,
tokio::spawn(Self::update_log(
docker,
id.clone(),
self.args.timestamp,
0,
app_data,
spawns,
)),
tokio::spawn(Self::update_log(app_data, docker, id.clone(), 0, spawns)),
);
}
}
Expand All @@ -290,11 +278,10 @@ impl DockerData {
let app_data = Arc::clone(&self.app_data);
let spawns = Arc::clone(&self.spawns);
tokio::spawn(Self::update_log(
app_data,
docker,
container.id.clone(),
self.args.timestamp,
container.last_updated,
app_data,
spawns,
))
});
Expand All @@ -305,8 +292,8 @@ impl DockerData {
}

/// Animate the loading icon
async fn loading_spin(&mut self, loading_uuid: Uuid) -> JoinHandle<()> {
let gui_state = Arc::clone(&self.gui_state);
async fn loading_spin(loading_uuid: Uuid, gui_state: &Arc<Mutex<GuiState>>) -> JoinHandle<()> {
let gui_state = Arc::clone(&gui_state);
tokio::spawn(async move {
loop {
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
Expand All @@ -316,89 +303,106 @@ impl DockerData {
}

/// Stop the loading_spin function, and reset gui loading status
fn stop_loading_spin(&mut self, handle: &JoinHandle<()>, loading_uuid: Uuid) {
fn stop_loading_spin(
gui_state: &Arc<Mutex<GuiState>>,
handle: &JoinHandle<()>,
loading_uuid: Uuid,
) {
handle.abort();
self.gui_state.lock().remove_loading(loading_uuid);
gui_state.lock().remove_loading(loading_uuid);
}

/// Initialize docker container data, before any messages are received
async fn initialise_container_data(&mut self) {
self.gui_state.lock().status_push(Status::Init);
let loading_uuid = Uuid::new_v4();
let loading_spin = self.loading_spin(loading_uuid).await;
let loading_spin = Self::loading_spin(loading_uuid, &Arc::clone(&self.gui_state)).await;

let all_ids = self.update_all_containers().await;

self.update_all_container_stats(&all_ids);

// Maybe only do a single one at first?
self.init_all_logs(&all_ids);

if all_ids.is_empty() {
self.initialised = true;
}

// wait until all logs have initialised
while !self.initialised {
while !self.app_data.lock().initialised(&all_ids) {
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
self.initialised = self.app_data.lock().initialised(&all_ids);
}
self.gui_state.lock().status_del(Status::Init);
self.stop_loading_spin(&loading_spin, loading_uuid);
Self::stop_loading_spin(&self.gui_state, &loading_spin, loading_uuid);
}

/// Set the global error as the docker error, and set gui_state to error
fn set_error(&mut self, error: DockerControls) {
self.app_data
.lock()
.set_error(AppError::DockerCommand(error));
self.gui_state.lock().status_push(Status::Error);
}

/// Execute a docker command, will start and stop the loading spinner, and set correct error
async fn exec_docker(
&mut self,
docker_fn: impl Future<Output = Result<(), bollard::errors::Error>> + Send,
control: DockerControls,
fn set_error(
app_data: &Arc<Mutex<AppData>>,
error: DockerControls,
gui_state: &Arc<Mutex<GuiState>>,
) {
let uuid = Uuid::new_v4();
let loading_spin = self.loading_spin(uuid).await;
if docker_fn.await.is_err() {
self.set_error(control);
};
self.stop_loading_spin(&loading_spin, uuid);
app_data.lock().set_error(AppError::DockerCommand(error));
gui_state.lock().status_push(Status::Error);
}

/// Handle incoming messages, container controls & all container information update
/// Spawn dowcker commands off into own thread
async fn message_handler(&mut self) {
while let Some(message) = self.receiver.recv().await {
let docker = Arc::clone(&self.docker);
let gui_state = Arc::clone(&self.gui_state);
let app_data = Arc::clone(&self.app_data);
let uuid = Uuid::new_v4();
match message {
DockerMessage::Pause(id) => {
self.exec_docker(docker.pause_container(id.get()), DockerControls::Pause)
.await;
tokio::spawn(async move {
let loading_spin = Self::loading_spin(uuid, &gui_state).await;
if docker.pause_container(id.get()).await.is_err() {
Self::set_error(&app_data, DockerControls::Pause, &gui_state);
}
Self::stop_loading_spin(&gui_state, &loading_spin, uuid);
});
self.update_everything().await;
}
DockerMessage::Restart(id) => {
self.exec_docker(
docker.restart_container(id.get(), None),
DockerControls::Restart,
)
.await;
tokio::spawn(async move {
let loading_spin = Self::loading_spin(uuid, &gui_state).await;
if docker.restart_container(id.get(), None).await.is_err() {
Self::set_error(&app_data, DockerControls::Restart, &gui_state);
}
Self::stop_loading_spin(&gui_state, &loading_spin, uuid);
});
self.update_everything().await;
}
DockerMessage::Start(id) => {
self.exec_docker(
docker.start_container(id.get(), None::<StartContainerOptions<String>>),
DockerControls::Start,
)
.await;
tokio::spawn(async move {
let loading_spin = Self::loading_spin(uuid, &gui_state).await;
if docker
.start_container(id.get(), None::<StartContainerOptions<String>>)
.await
.is_err()
{
Self::set_error(&app_data, DockerControls::Start, &gui_state);
}
Self::stop_loading_spin(&gui_state, &loading_spin, uuid);
});
self.update_everything().await;
}
DockerMessage::Stop(id) => {
self.exec_docker(docker.stop_container(id.get(), None), DockerControls::Stop)
.await;
tokio::spawn(async move {
let loading_spin = Self::loading_spin(uuid, &gui_state).await;
if docker.stop_container(id.get(), None).await.is_err() {
Self::set_error(&app_data, DockerControls::Stop, &gui_state);
}
Self::stop_loading_spin(&gui_state, &loading_spin, uuid);
});
self.update_everything().await;
}
DockerMessage::Unpause(id) => {
self.exec_docker(docker.unpause_container(id.get()), DockerControls::Unpause)
.await;
tokio::spawn(async move {
let loading_spin = Self::loading_spin(uuid, &gui_state).await;
if docker.unpause_container(id.get()).await.is_err() {
Self::set_error(&app_data, DockerControls::Unpause, &gui_state);
}
Self::stop_loading_spin(&gui_state, &loading_spin, uuid);
});
self.update_everything().await;
}
DockerMessage::Update => self.update_everything().await,
Expand All @@ -418,22 +422,21 @@ impl DockerData {
pub async fn init(
app_data: Arc<Mutex<AppData>>,
docker: Docker,
docker_rx: Receiver<DockerMessage>,
gui_state: Arc<Mutex<GuiState>>,
receiver: Receiver<DockerMessage>,
is_running: Arc<AtomicBool>,
) {
let args = app_data.lock().args;
if app_data.lock().get_error().is_none() {
let mut inner = Self {
app_data,
args,
binate: Binate::One,
docker: Arc::new(docker),
gui_state,
initialised: false,
receiver,
spawns: Arc::new(Mutex::new(HashMap::new())),
is_running,
binate: Binate::One,
receiver: docker_rx,
spawns: Arc::new(Mutex::new(HashMap::new())),
};
inner.initialise_container_data().await;

Expand Down

0 comments on commit 9ec43e1

Please sign in to comment.