diff --git a/R/final_models.R b/R/final_models.R index cc97ab98..6823599f 100644 --- a/R/final_models.R +++ b/R/final_models.R @@ -619,6 +619,15 @@ final_models <- function(run_info, # clean up any parallel run process par_end(cl) + + # condense outputs into less files for larger runs + if(length(combo_list) > 100) { + cli::cli_progress_step("Condensing Forecasts") + + condense_data(run_info, + parallel_processing, + num_cores) + } # reconcile hierarchical forecasts if (forecast_approach != "bottoms_up") { @@ -633,13 +642,6 @@ final_models <- function(run_info, ) } - # condense outputs into less files for larger runs - if(length(combo_list) > 1000) { - condense_data(run_info, - parallel_processing, - num_cores) - } - # calculate weighted mape weighted_mape <- get_forecast_data(run_info = run_info) %>% dplyr::filter(Run_Type == "Back_Test", diff --git a/R/hierarchy.R b/R/hierarchy.R index da0ab062..72d87255 100644 --- a/R/hierarchy.R +++ b/R/hierarchy.R @@ -420,6 +420,20 @@ reconcile_hierarchical_data <- function(run_info, hts_nodes <- hts_list$nodes original_combo_list <- hts_list$original_combos hts_combo_list <- hts_list$hts_combos + + # check if data has been condensed + cond_path <- paste0( + run_info$path, "/forecasts/*", hash_data(run_info$experiment_name), "-", + hash_data(run_info$run_name), "*condensed", ".", run_info$data_output + ) + + condensed_files <- list_files(run_info$storage_object, fs::path(cond_path)) + + if (length(condensed_files) > 0) { + condensed <- TRUE + } else { + condensed <- FALSE + } # get unreconciled forecast data if (is.null(parallel_processing)) { @@ -430,10 +444,17 @@ reconcile_hierarchical_data <- function(run_info, return_type <- "df" } - fcst_path <- paste0( - "/forecasts/*", hash_data(run_info$experiment_name), "-", - hash_data(run_info$run_name), "*models", ".", run_info$data_output - ) + if(condensed) { + fcst_path <- paste0( + "/forecasts/*", hash_data(run_info$experiment_name), "-", + hash_data(run_info$run_name), "*condensed", ".", run_info$data_output + ) + } else { + fcst_path <- paste0( + "/forecasts/*", hash_data(run_info$experiment_name), "-", + hash_data(run_info$run_name), "*models", ".", run_info$data_output + ) + } unreconciled_tbl <- read_file(run_info, path = fcst_path, @@ -677,11 +698,6 @@ reconcile_hierarchical_data <- function(run_info, ) %>% dplyr::select(Combo, Date, Target) - fcst_path <- paste0( - "/forecasts/*", hash_data(run_info$experiment_name), "-", - hash_data(run_info$run_name), "*models", ".", run_info$data_output - ) - schema <- arrow::schema( arrow::field("Combo_ID", arrow::string()), arrow::field("Model_ID", arrow::string()), diff --git a/R/read_write_data.R b/R/read_write_data.R index a2999023..5ee304b0 100644 --- a/R/read_write_data.R +++ b/R/read_write_data.R @@ -86,21 +86,20 @@ get_forecast_data <- function(run_info, } # get forecast data - if(condensed) { - print("reading condensed files") + if(forecast_approach != "bottoms_up") { fcst_path <- paste0( "/forecasts/*", hash_data(run_info$experiment_name), "-", - hash_data(run_info$run_name), "*condensed", ".", run_info$data_output + hash_data(run_info$run_name), "*reconciled", ".", run_info$data_output ) - } else if (forecast_approach == "bottoms_up") { + } else if (condensed) { fcst_path <- paste0( "/forecasts/*", hash_data(run_info$experiment_name), "-", - hash_data(run_info$run_name), "*models", ".", run_info$data_output + hash_data(run_info$run_name), "*condensed", ".", run_info$data_output ) } else { fcst_path <- paste0( "/forecasts/*", hash_data(run_info$experiment_name), "-", - hash_data(run_info$run_name), "*reconciled", ".", run_info$data_output + hash_data(run_info$run_name), "*models", ".", run_info$data_output ) } @@ -716,7 +715,7 @@ condense_data <- function(run_info, list_of_batches <- list() # Define the batch size - batch_size <- 10000 + batch_size <- 100 # Calculate the number of batches needed num_batches <- ceiling(length(initial_file_list) / batch_size)