diff --git a/R/parallel_util.R b/R/parallel_util.R index 751ccad1..3746d971 100644 --- a/R/parallel_util.R +++ b/R/parallel_util.R @@ -34,7 +34,7 @@ par_start <- function(run_info, ) parallel_packages <- c( - "gtools", "hts", "magrittr", "methods", "base", "modeltime.resample", + "gtools", "hts", "magrittr", "methods", "base", "plyr", "rsample" ) diff --git a/R/read_write_data.R b/R/read_write_data.R index 34e0c2d3..36e68e2b 100644 --- a/R/read_write_data.R +++ b/R/read_write_data.R @@ -499,21 +499,28 @@ download_file <- function(storage_object, #' #' @param run_info run info using the [set_run_info()] function #' @param path file path +#' @param file_list files to read directly; overrides path when supplied #' @param return_type type of data output read #' @param schema column schema for arrow::open_dataset() #' #' @return file read into memory #' @noRd read_file <- function(run_info, - path, + path = NULL, + file_list = NULL, return_type = "df", schema = NULL) { - folder <- fs::path_dir(path) storage_object <- run_info$storage_object - initial_path <- run_info$path - file <- fs::path_file(path) + + if(!is.null(path)) { + folder <- fs::path_dir(path) + initial_path <- run_info$path + file <- fs::path_file(path) + } - if (inherits(storage_object, c("blob_container", "ms_drive"))) { + if (!is.null(file_list)) { + files <- file_list + } else if (inherits(storage_object, c("blob_container", "ms_drive"))) { download_file(storage_object, fs::path(initial_path, path), folder) final_path <- fs::path(tempdir(), folder) files <- list_files(NULL, fs::path(final_path, file)) @@ -524,7 +531,10 @@ read_file <- function(run_info, files <- list_files(storage_object, fs::path(initial_path, path)) } - if (fs::path_ext(file) == "*") { + if(!is.null(file_list)) { + file_temp <- files[[1]] + file_ext <- fs::path_ext(file_temp) + } else if (fs::path_ext(file) == "*") { file_temp <- files[[1]] file_ext <- fs::path_ext(file_temp) } else { @@ -659,3 +669,84 @@ get_recipe_data <- function(run_info, return(recipe_tbl) } + +#' Condense 
forecast output files into fewer files +#' +#' @param run_info run info using the [set_run_info()] function +#' @param parallel_processing type of parallel processing to run +#' @param num_cores number of cores to use +#' +#' @return nothing +#' @noRd +condense_data <- function(run_info, + parallel_processing = NULL, + num_cores = NULL) { + + # get initial list of files to condense + initial_file_list <- list_files( + run_info$storage_object, + paste0( + run_info$path, "/forecasts/*", hash_data(run_info$experiment_name), "-", + hash_data(run_info$run_name), "*_models.", run_info$data_output + ) + ) + + # Initialize an empty list to store the batches + list_of_batches <- list() + + # Define the batch size + batch_size <- 10000 + + # Calculate the number of batches needed + num_batches <- ceiling(length(initial_file_list) / batch_size) + + # Loop through the large list and create batches + for (i in 1:num_batches) { + start_index <- (i - 1) * batch_size + 1 + end_index <- min(i * batch_size, length(initial_file_list)) + batch_name <- paste0("batch_", i) + list_of_batches[[batch_name]] <- initial_file_list[start_index:end_index] + } + + # parallel run info + par_info <- par_start( + run_info = run_info, + parallel_processing = parallel_processing, + num_cores = min(length(names(list_of_batches)), num_cores), + task_length = length(names(list_of_batches)) + ) + + cl <- par_info$cl + packages <- par_info$packages + `%op%` <- par_info$foreach_operator + + # submit tasks + condense_data_tbl <- foreach::foreach( + batch = names(list_of_batches), + .combine = "rbind", + .packages = packages, + .errorhandling = "stop", + .verbose = FALSE, + .inorder = FALSE, + .multicombine = TRUE, + .noexport = NULL + ) %op% + { + files <- list_of_batches[[batch]] + + data <- read_file(run_info, + file_list = files, + return_type = "df") + + write_data( + x = data, + combo = batch, + run_info = run_info, + output_type = "data", + folder = "forecasts", + suffix = "-condensed" + ) + + 
return(batch) + } +} diff --git a/man/final_models.Rd b/man/final_models.Rd index d6c86402..e6dcaaae 100644 --- a/man/final_models.Rd +++ b/man/final_models.Rd @@ -17,7 +17,8 @@ final_models( \arguments{ \item{run_info}{run info using the \code{\link[=set_run_info]{set_run_info()}} function.} -\item{average_models}{If TRUE, create simple averages of individual models.} +\item{average_models}{If TRUE, create simple averages of individual models +and save the most accurate one.} \item{max_model_average}{Max number of models to average together. Will create model averages for 2 models up until input value or max number of