Skip to content

Commit

Permalink
condense data function
Browse files Browse the repository at this point in the history
  • Loading branch information
mitokic committed Jul 24, 2024
1 parent c3e5558 commit 5d10d7f
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 8 deletions.
2 changes: 1 addition & 1 deletion R/parallel_util.R
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ par_start <- function(run_info,
)

parallel_packages <- c(
"gtools", "hts", "magrittr", "methods", "base", "modeltime.resample",
"gtools", "hts", "magrittr", "methods", "base",
"plyr", "rsample"
)

Expand Down
103 changes: 97 additions & 6 deletions R/read_write_data.R
Original file line number Diff line number Diff line change
Expand Up @@ -499,21 +499,28 @@ download_file <- function(storage_object,
#'
#' @param run_info run info using the [set_run_info()] function
#' @param path file path
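#' @param file_list optional set of file paths to read directly; when supplied, it is used as-is instead of listing files from `path`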
#' @param return_type type of data output read
#' @param schema column schema for arrow::open_dataset()
#'
#' @return file read into memory
#' @noRd
read_file <- function(run_info,
path,
path = NULL,
file_list = NULL,
return_type = "df",
schema = NULL) {
folder <- fs::path_dir(path)
storage_object <- run_info$storage_object
initial_path <- run_info$path
file <- fs::path_file(path)

if(!is.null(path)) {
folder <- fs::path_dir(path)
initial_path <- run_info$path
file <- fs::path_file(path)
}

if (inherits(storage_object, c("blob_container", "ms_drive"))) {
if (!is.null(file_list)) {
files <- file_list
} else if (inherits(storage_object, c("blob_container", "ms_drive"))) {
download_file(storage_object, fs::path(initial_path, path), folder)
final_path <- fs::path(tempdir(), folder)
files <- list_files(NULL, fs::path(final_path, file))
Expand All @@ -524,7 +531,10 @@ read_file <- function(run_info,
files <- list_files(storage_object, fs::path(initial_path, path))
}

if (fs::path_ext(file) == "*") {
if(!is.null(file_list)) {
file_temp <- files[[1]]
file_ext <- fs::path_ext(file_temp)
} else if (fs::path_ext(file) == "*") {
file_temp <- files[[1]]
file_ext <- fs::path_ext(file_temp)
} else {
Expand Down Expand Up @@ -659,3 +669,84 @@ get_recipe_data <- function(run_info,

return(recipe_tbl)
}

#' Condense forecast output files into fewer files
#'
#' Lists the model forecast files that match the current experiment and run,
#' splits that list into batches of at most 10,000 files, and for each batch
#' reads the files into a single data frame and writes it back to the
#' "forecasts" folder with a "-condensed" suffix. Batches are processed
#' through the foreach operator returned by `par_start()`.
#'
#' @param run_info run info using the [set_run_info()] function
#' @param parallel_processing type of parallel processing to run
#' @param num_cores number of cores to use
#'
#' @return invisibly, the batch names as combined by the foreach loop, or
#'   `NULL` when there are no files to condense
#' @noRd
condense_data <- function(run_info,
                          parallel_processing = NULL,
                          num_cores = NULL) {
  # get initial list of files to condense
  initial_file_list <- list_files(
    run_info$storage_object,
    paste0(
      run_info$path, "/forecasts/*", hash_data(run_info$experiment_name), "-",
      hash_data(run_info$run_name), "*_models.", run_info$data_output
    )
  )

  num_files <- length(initial_file_list)

  # nothing matched: return early instead of building empty/NA batches
  # (1:0 would otherwise iterate over c(1, 0))
  if (num_files == 0) {
    return(invisible(NULL))
  }

  # maximum number of files condensed into one output file
  batch_size <- 10000

  # number of batches needed to cover every file
  num_batches <- ceiling(num_files / batch_size)

  # slice the file list into consecutive, named batches
  list_of_batches <- lapply(
    seq_len(num_batches),
    function(i) {
      start_index <- (i - 1) * batch_size + 1
      end_index <- min(i * batch_size, num_files)
      initial_file_list[start_index:end_index]
    }
  )
  names(list_of_batches) <- paste0("batch_", seq_len(num_batches))

  # parallel run info; never request more cores than there are batches
  # (min() ignores a NULL num_cores, so the batch count is passed instead)
  par_info <- par_start(
    run_info = run_info,
    parallel_processing = parallel_processing,
    num_cores = min(length(list_of_batches), num_cores),
    task_length = length(list_of_batches)
  )

  # NOTE(review): par_info$cl is not shut down in this function -- confirm
  # whether a cluster cleanup helper should be called after the loop.
  packages <- par_info$packages
  `%op%` <- par_info$foreach_operator

  # submit tasks: one task per batch, each reads its files and writes them
  # back out as a single condensed file
  condense_data_tbl <- foreach::foreach(
    batch = names(list_of_batches),
    .combine = "rbind",
    .packages = packages,
    .errorhandling = "stop",
    .verbose = FALSE,
    .inorder = FALSE,
    .multicombine = TRUE,
    .noexport = NULL
  ) %op%
    {
      files <- list_of_batches[[batch]]

      data <- read_file(run_info,
        file_list = files,
        return_type = "df"
      )

      write_data(
        x = data,
        combo = batch,
        run_info = run_info,
        output_type = "data",
        folder = "forecasts",
        suffix = "-condensed"
      )

      return(batch)
    }

  invisible(condense_data_tbl)
}
3 changes: 2 additions & 1 deletion man/final_models.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 5d10d7f

Please sign in to comment.