Skip to content

Commit

Permalink
feat: add depth arg
Browse files Browse the repository at this point in the history
  • Loading branch information
vyloy committed May 26, 2024
1 parent 8cc08da commit ff9a214
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 59 deletions.
8 changes: 6 additions & 2 deletions bin/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ use clap::Subcommand;
use env_logger::Env;
use log::{error, info};

use gt::manager::Signal;
use gt::*;
use gt::manager::Signal;

use crate::cs::{ClientArgs, ServerArgs};
use crate::manager::ManagerArgs;
Expand All @@ -33,9 +33,12 @@ struct Cli {
#[command(subcommand)]
command: Option<Commands>,

/// Path to the config file or the directory contains the config files
/// Path to the config file or the directory containing the config files
#[arg(short, long)]
config: Option<PathBuf>,
/// The maximum allowed depth of the subdirectory to be traversed to search config files
#[arg(long)]
depth: Option<u8>,
/// Send signal to the running GT processes
#[arg(short, long, value_enum)]
signal: Option<Signal>,
Expand Down Expand Up @@ -69,6 +72,7 @@ fn main() {
}
let mut manager_args = ManagerArgs {
config: cli.config,
depth: cli.depth,
server_args: None,
client_args: None,
};
Expand Down
110 changes: 53 additions & 57 deletions bin/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* limitations under the License.
*/

use std::{env, fs, future, io, process};
use std::collections::{BTreeMap, HashMap};
use std::ffi::OsStr;
use std::future::Future;
Expand All @@ -24,28 +25,28 @@ use std::os::unix::process::CommandExt;
use std::os::windows::process::CommandExt;
use std::path::PathBuf;
use std::process::Stdio;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::{Duration, Instant};
use std::{env, fs, future, io, process};

use anyhow::{anyhow, Context, Error, Result};
use clap::ValueEnum;
use futures::future::{BoxFuture, FutureExt};
use log::{error, info, warn};
use notify::{ErrorKind, Event, PollWatcher, RecommendedWatcher, RecursiveMode, Watcher};
use serde::{de, ser, Deserialize, Serialize};
use serde::{de, Deserialize, ser, Serialize};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::process::{Child, ChildStdin, ChildStdout, Command};
use tokio::sync::{mpsc, Mutex, oneshot};
use tokio::sync::oneshot::{Receiver, Sender};
use tokio::sync::{mpsc, oneshot, Mutex};
use tokio::time::timeout;

use crate::cs::{ClientArgs, ServerArgs};

#[derive(Debug)]
pub struct ManagerArgs {
pub config: Option<PathBuf>,
pub depth: Option<u8>,
pub server_args: Option<ServerArgs>,
pub client_args: Option<ClientArgs>,
}
Expand Down Expand Up @@ -87,6 +88,46 @@ impl Manager {
}
}

fn collect_files(&self, path: PathBuf, depth: u8) -> io::Result<Vec<ProcessConfigEnum>> {
let mut files = vec![];
if path.is_dir() {
for entry in fs::read_dir(path)? {
let entry = entry?;
let path = entry.path();
if path.is_dir() {
let max = self.args.depth.unwrap_or(3);
if max != 0 && depth + 1 > max {
info!("{} is too deep", path.display());
continue;
}
files.append(self.collect_files(path, depth + 1)?.as_mut());
} else {
match path.extension().and_then(OsStr::to_str) {
Some("yaml") | Some("yml") => {}
None | Some(_) => {
info!(
"ignored file {} is not end with yml or yaml",
path.display()
);
continue;
}
}
let fm = entry.metadata()?.len();
if fm > 10 * 1024 * 1024 {
info!("ignored file {} is too large", path.display());
continue;
}
info!("collected file {}", path.display());
files.push(ProcessConfigEnum::Config(path));
}
}
} else {
info!("collected file {}", path.display());
files.push(ProcessConfigEnum::Config(path));
}
Ok(files)
}

async fn collect_configs(
&self,
) -> Result<(Vec<ProcessConfigEnum>, Option<Vec<ProcessConfigEnum>>)> {
Expand All @@ -100,7 +141,7 @@ impl Manager {
None => env::current_dir()?,
Some(path) => path.into(),
};
configs = collect_files(config.clone())?;
configs = self.collect_files(config.clone(), 1)?;
}
if configs.is_empty() {
return Err(anyhow!("no target found"));
Expand Down Expand Up @@ -324,9 +365,7 @@ impl Manager {
let reconnect = async {
let cmds = cmd_map.clone();
let config = config.clone();
if let Err(e) =
Self::sync_run(cmds, vec![config.clone()], sub_cmd).await
{
if let Err(e) = Self::sync_run(cmds, vec![config.clone()], sub_cmd).await {
error!("{sub_cmd} ({config:?}) reconnect sync_run failed: {:?}", e);
}
};
Expand Down Expand Up @@ -427,23 +466,15 @@ impl Manager {
}
}
if !server_config.is_empty() {
Self::run(
self.cmds.clone(),
server_config,
"sub-server",
)
.await
.context("run_server failed")?;
Self::run(self.cmds.clone(), server_config, "sub-server")
.await
.context("run_server failed")?;
}

if !client_config.is_empty() {
Self::run(
self.cmds.clone(),
client_config,
"sub-client",
)
.await
.context("run_client failed")?;
Self::run(self.cmds.clone(), client_config, "sub-client")
.await
.context("run_client failed")?;
}
Ok(())
}
Expand Down Expand Up @@ -759,41 +790,6 @@ pub fn send_signal(signal: Signal) -> Result<()> {
Ok(())
}

fn collect_files(path: PathBuf) -> io::Result<Vec<ProcessConfigEnum>> {
let mut files = vec![];
if path.is_dir() {
for entry in fs::read_dir(path)? {
let entry = entry?;
let path = entry.path();
if path.is_dir() {
collect_files(path)?;
} else {
let fm = entry.metadata()?.len();
if fm > 10 * 1024 * 1024 {
info!("ignored file {} is too large", path.display());
continue;
}
match path.extension().and_then(OsStr::to_str) {
Some("yaml") | Some("yml") => {}
None | Some(_) => {
info!(
"ignored file {} is not end with yml or yaml",
path.display()
);
continue;
}
}
info!("collected file {}", path.display());
files.push(ProcessConfigEnum::Config(path));
}
}
} else {
info!("collected file {}", path.display());
files.push(ProcessConfigEnum::Config(path));
}
Ok(files)
}

#[derive(Serialize, Deserialize, Debug)]
struct Config {
#[serde(rename = "type")]
Expand Down

0 comments on commit ff9a214

Please sign in to comment.