diff --git a/DESCRIPTION b/DESCRIPTION index 0f210549..9354060e 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -1,6 +1,6 @@ Package: finnts Title: Microsoft Finance Time Series Forecasting Framework -Version: 0.4.0.9004 +Version: 0.4.0.9005 Authors@R: c(person(given = "Mike", family = "Tokic", @@ -24,7 +24,7 @@ License: MIT + file LICENSE Encoding: UTF-8 LazyData: true Roxygen: list(markdown = TRUE) -RoxygenNote: 7.1.1 +RoxygenNote: 7.3.1 Imports: cli, Cubist, diff --git a/NEWS.md b/NEWS.md index 444a0ad6..ee1b746b 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,10 +1,13 @@ -# finnts 0.4.0.9004 (DEVELOPMENT VERSION) +# finnts 0.4.0.9005 (DEVELOPMENT VERSION) ## Improvements - Added support for hierarchical forecasting with external regressors - Allow global models for hierarchical forecasts - Multistep horizon forecasts for R1 recipe, listed as `multistep_horizon` within `prep_data()` +- Always save the most accurate model average, regardless if selected as best model. This allows for improved scaling with large data sets. +- Automatically condense large forecasts (+10k time series) into smaller amount of files to make it easier to read forecast outputs +- Improved weighted MAPE calculation across all time series ## Bug Fixes diff --git a/R/ensemble_models.R b/R/ensemble_models.R index df4eba6b..e39b5270 100644 --- a/R/ensemble_models.R +++ b/R/ensemble_models.R @@ -181,27 +181,29 @@ ensemble_models <- function(run_info, # model forecasts single_model_tbl <- NULL if (run_local_models) { - suppressWarnings(try(single_model_tbl <- read_file(run_info, - path = paste0( - "/forecasts/", hash_data(run_info$experiment_name), "-", hash_data(run_info$run_name), - "-", combo, "-single_models.", run_info$data_output + suppressWarnings(try( + single_model_tbl <- read_file(run_info, + path = paste0( + "/forecasts/", hash_data(run_info$experiment_name), "-", hash_data(run_info$run_name), + "-", combo, "-single_models.", run_info$data_output + ), + return_type = "df" ), - return_type = "df" - ), - silent = TRUE + silent = TRUE )) } global_model_tbl <- NULL if (run_global_models) { - suppressWarnings(try(global_model_tbl <- read_file(run_info, - path = paste0( - "/forecasts/", hash_data(run_info$experiment_name), "-", hash_data(run_info$run_name), - "-", combo, "-global_models.", run_info$data_output + suppressWarnings(try( + global_model_tbl <- read_file(run_info, + path = paste0( + "/forecasts/", hash_data(run_info$experiment_name), "-", hash_data(run_info$run_name), + "-", combo, "-global_models.", run_info$data_output + ), + return_type = "df" ), - return_type = "df" - ), - silent = TRUE + silent = TRUE )) } @@ -336,7 +338,6 @@ ensemble_models <- function(run_info, .multicombine = TRUE, .noexport = NULL ) %do% { - # get initial run info model <- model_run %>% dplyr::pull(Model_Name) diff --git a/R/feature_selection.R b/R/feature_selection.R index d93c790f..67f828f6 100644 --- a/R/feature_selection.R +++ b/R/feature_selection.R @@ -23,10 +23,8 @@ run_feature_selection <- function(input_data, forecast_horizon, external_regressors, multistep_horizon = FALSE) { - # check for more than one unique target value if (input_data %>% tidyr::drop_na(Target) %>% dplyr::pull(Target) %>% unique() %>% length() < 2) { - # just return the date features fs_list <- input_data %>% dplyr::select(tidyselect::contains("Date")) @@ -83,7 +81,6 @@ run_feature_selection <- function(input_data, # run feature selection if (date_type %in% c("day", "week")) { - # number of votes needed for feature to be selected votes_needed <- 3 @@ -410,7 +407,6 @@ lofo_fn <- function(run_info, parallel_processing, pca = FALSE, seed = 123) { - # parallel run info par_info <- par_start( run_info = run_info, diff --git a/R/final_models.R b/R/final_models.R index 68510c0c..c8cac696 100644 --- a/R/final_models.R +++ b/R/final_models.R @@ -3,7 +3,8 @@ #' Select Best Models and Prep Final Outputs #' #' @param run_info run info using the [set_run_info()] function. -#' @param average_models If TRUE, create simple averages of individual models. +#' @param average_models If TRUE, create simple averages of individual models +#' and save the most accurate one. #' @param max_model_average Max number of models to average together. Will #' create model averages for 2 models up until input value or max number of #' models ran. @@ -124,7 +125,8 @@ final_models <- function(run_info, current_combo_list_final <- setdiff( current_combo_list, prev_combo_list - ) + ) %>% + sample() prev_log_df <- read_file(run_info, path = paste0("logs/", hash_data(run_info$experiment_name), "-", hash_data(run_info$run_name), ".csv"), @@ -138,8 +140,7 @@ final_models <- function(run_info, run_local_models <- prev_log_df$run_local_models run_ensemble_models <- prev_log_df$run_ensemble_models - if ((length(current_combo_list_final) == 0 & length(prev_combo_list) > 0) | sum(colnames(prev_log_df) %in% "weighted_mape")) { - + if (sum(colnames(prev_log_df) %in% "weighted_mape")) { # check if input values have changed current_log_df <- tibble::tibble( average_models = average_models, @@ -175,7 +176,7 @@ final_models <- function(run_info, # submit tasks best_model_tbl <- foreach::foreach( - x = current_combo_list, + x = current_combo_list_final, .combine = "rbind", .packages = packages, .errorhandling = "stop", @@ -262,31 +263,7 @@ final_models <- function(run_info, # check if model averaging already happened if ("Best_Model" %in% colnames(local_model_tbl %>% rbind(global_model_tbl))) { - # see if average models file exists and add to model tbl - average_model_tbl <- tryCatch( - { - read_file(run_info, - path = paste0( - "/forecasts/", hash_data(run_info$experiment_name), "-", hash_data(run_info$run_name), - "-", combo, "-average_models.", run_info$data_output - ), - return_type = "df" - ) - }, - warning = function(w) { - # do nothing - }, - error = function(e) { - NULL - } - ) - - local_model_tbl <- local_model_tbl %>% - rbind(average_model_tbl) - - best_model_check <- TRUE - } else { - best_model_check <- FALSE + return(data.frame(Combo_Hash = combo)) } # combine all forecasts @@ -315,8 +292,7 @@ final_models <- function(run_info, final_model_list <- c(local_model_list, global_model_list) # simple model averaging - if (average_models & length(final_model_list) > 1 & !best_model_check) { - + if (average_models & length(final_model_list) > 1) { # create model combinations list model_combinations <- tibble::tibble() @@ -360,7 +336,6 @@ final_models <- function(run_info, .noexport = NULL ) %op% { - # get list of models to average model_list <- strsplit(x, "_")[[1]] @@ -385,7 +360,34 @@ final_models <- function(run_info, averages_tbl <- NULL } - # choose best model + # choose best average model + if (!is.null(averages_tbl)) { + avg_back_test_mape <- averages_tbl %>% + dplyr::mutate( + Train_Test_ID = as.numeric(Train_Test_ID), + Target = ifelse(Target == 0, 0.1, Target) + ) %>% + dplyr::filter(Train_Test_ID != 1) %>% + dplyr::mutate(MAPE = round(abs((Forecast - Target) / Target), digits = 4)) + + avg_best_model_mape <- avg_back_test_mape %>% + dplyr::group_by(Model_ID, Combo) %>% + dplyr::mutate( + Combo_Total = sum(abs(Target), na.rm = TRUE), + weighted_MAPE = (abs(Target) / Combo_Total) * MAPE + ) %>% + dplyr::summarise(Rolling_MAPE = sum(weighted_MAPE, na.rm = TRUE)) %>% + dplyr::arrange(Rolling_MAPE) %>% + dplyr::ungroup() %>% + dplyr::group_by(Combo) %>% + dplyr::slice(1) %>% + dplyr::ungroup() + + avg_best_model_tbl <- avg_best_model_mape %>% + dplyr::select(Combo, Model_ID) + } + + # choose best overall model final_predictions_tbl <- predictions_tbl %>% dplyr::select(Combo, Model_ID, Train_Test_ID, Date, Forecast, Target) %>% rbind(averages_tbl) @@ -513,7 +515,6 @@ final_models <- function(run_info, ) } } else { # choose the most accurate individual model and write outputs - final_model_tbl <- tibble::tibble(Model_ID = final_model_list) %>% dplyr::left_join( best_model_final_tbl %>% @@ -522,6 +523,35 @@ final_models <- function(run_info, ) %>% dplyr::mutate(Best_Model = ifelse(!is.na(Best_Model), "Yes", "No")) + if (!is.null(averages_tbl)) { + avg_model_final_tbl <- averages_tbl %>% + dplyr::right_join(avg_best_model_tbl, + by = c("Combo", "Model_ID") + ) %>% + dplyr::mutate( + Combo_ID = Combo, + Model_Name = "NA", + Model_Type = "local", + Recipe_ID = "simple_average", + Hyperparameter_ID = "NA", + Best_Model = "No" + ) %>% + dplyr::group_by(Combo_ID, Model_ID, Train_Test_ID) %>% + dplyr::mutate(Horizon = dplyr::row_number()) %>% + dplyr::ungroup() %>% + create_prediction_intervals(model_train_test_tbl) %>% + convert_weekly_to_daily(date_type, weekly_to_daily) + + write_data( + x = avg_model_final_tbl, + combo = unique(avg_model_final_tbl$Combo), + run_info = run_info, + output_type = "data", + folder = "forecasts", + suffix = "-average_models" + ) + } + if (!is.null(single_model_tbl)) { single_model_final_tbl <- single_model_tbl %>% remove_best_model() %>% @@ -580,13 +610,24 @@ final_models <- function(run_info, } } - return(best_model_mape) + return(data.frame(Combo_Hash = combo)) } %>% base::suppressPackageStartupMessages() # clean up any parallel run process par_end(cl) + # condense outputs into less files for larger runs + if (length(combo_list) > 10000) { + cli::cli_progress_step("Condensing Forecasts") + + condense_data( + run_info, + parallel_processing, + num_cores + ) + } + # reconcile hierarchical forecasts if (forecast_approach != "bottoms_up") { cli::cli_progress_step("Reconciling Hierarchical Forecasts") @@ -600,6 +641,24 @@ final_models <- function(run_info, ) } + # calculate weighted mape + weighted_mape <- get_forecast_data(run_info = run_info) %>% + dplyr::filter( + Run_Type == "Back_Test", + Best_Model == "Yes" + ) %>% + dplyr::mutate( + Target = ifelse(Target == 0, 0.1, Target) + ) %>% + dplyr::mutate( + MAPE = round(abs((Forecast - Target) / Target), digits = 4), + Total = sum(Target, na.rm = TRUE), + Weight = (MAPE * Target) / Total + ) %>% + dplyr::pull(Weight) %>% + sum() %>% + round(digits = 4) + # update logging file log_df <- read_file(run_info, path = paste0("logs/", hash_data(run_info$experiment_name), "-", hash_data(run_info$run_name), ".csv"), @@ -608,7 +667,7 @@ final_models <- function(run_info, dplyr::mutate( average_models = average_models, max_model_average = max_model_average, - weighted_mape = base::mean(best_model_tbl$Rolling_MAPE, na.rm = TRUE) + weighted_mape = round(weighted_mape, digits = 4) ) write_data( diff --git a/R/forecast_time_series.R b/R/forecast_time_series.R index 3177edd2..fe35feca 100644 --- a/R/forecast_time_series.R +++ b/R/forecast_time_series.R @@ -323,7 +323,6 @@ forecast_backwards_compatibility <- function(run_info, dplyr::select(Combo, Model, Best_Model) %>% dplyr::distinct() } else { - # read in unreconciled results best_model_tbl <- read_file(run_info, path = paste0( diff --git a/R/hierarchy.R b/R/hierarchy.R index da0ab062..544d802d 100644 --- a/R/hierarchy.R +++ b/R/hierarchy.R @@ -82,7 +82,6 @@ prep_hierarchical_data <- function(input_data, hierarchical_tbl <- hierarchical_tbl %>% dplyr::left_join(temp_tbl, by = c("Date")) } else if (value_level != "All") { - # agg by lowest level bottom_tbl <- input_data_adj %>% tidyr::unite("Combo", @@ -400,7 +399,6 @@ reconcile_hierarchical_data <- function(run_info, forecast_approach, negative_forecast = FALSE, num_cores) { - # get run splits model_train_test_tbl <- read_file(run_info, path = paste0( @@ -421,6 +419,20 @@ reconcile_hierarchical_data <- function(run_info, original_combo_list <- hts_list$original_combos hts_combo_list <- hts_list$hts_combos + # check if data has been condensed + cond_path <- paste0( + run_info$path, "/forecasts/*", hash_data(run_info$experiment_name), "-", + hash_data(run_info$run_name), "*condensed", ".", run_info$data_output + ) + + condensed_files <- list_files(run_info$storage_object, fs::path(cond_path)) + + if (length(condensed_files) > 0) { + condensed <- TRUE + } else { + condensed <- FALSE + } + # get unreconciled forecast data if (is.null(parallel_processing)) { return_type <- "df" @@ -430,10 +442,17 @@ reconcile_hierarchical_data <- function(run_info, return_type <- "df" } - fcst_path <- paste0( - "/forecasts/*", hash_data(run_info$experiment_name), "-", - hash_data(run_info$run_name), "*models", ".", run_info$data_output - ) + if (condensed) { + fcst_path <- paste0( + "/forecasts/*", hash_data(run_info$experiment_name), "-", + hash_data(run_info$run_name), "*condensed", ".", run_info$data_output + ) + } else { + fcst_path <- paste0( + "/forecasts/*", hash_data(run_info$experiment_name), "-", + hash_data(run_info$run_name), "*models", ".", run_info$data_output + ) + } unreconciled_tbl <- read_file(run_info, path = fcst_path, @@ -677,11 +696,6 @@ reconcile_hierarchical_data <- function(run_info, ) %>% dplyr::select(Combo, Date, Target) - fcst_path <- paste0( - "/forecasts/*", hash_data(run_info$experiment_name), "-", - hash_data(run_info$run_name), "*models", ".", run_info$data_output - ) - schema <- arrow::schema( arrow::field("Combo_ID", arrow::string()), arrow::field("Model_ID", arrow::string()), @@ -873,7 +887,6 @@ reconcile_hierarchical_data <- function(run_info, external_regressor_mapping <- function(data, combo_variables, external_regressors) { - # create var combinations list var_combinations <- tibble::tibble() @@ -902,7 +915,6 @@ external_regressor_mapping <- function(data, .multicombine = TRUE, .noexport = NULL ) %do% { - # get unique values of regressor per combo variable iteration var_unique_tbl <- foreach::foreach( var = iter_list, @@ -984,7 +996,6 @@ sum_hts_data <- function(bottom_level_tbl, forecast_approach, frequency_number, return_type = "data") { - # create aggregations for target variable Date <- bottom_level_tbl$Date diff --git a/R/input_checks.R b/R/input_checks.R index 0e174c0f..174a300b 100644 --- a/R/input_checks.R +++ b/R/input_checks.R @@ -1,4 +1,3 @@ - #' Check input values #' #' @param input_name input name @@ -13,21 +12,23 @@ check_input_type <- function(input_name, type, expected_value = NULL) { if (!inherits(input_value, type)) { - stop(paste0( - "invalid type for input name '", input_name, "', needs to be of type ", - glue::glue_collapse(type, " or ") - ), - call. = FALSE + stop( + paste0( + "invalid type for input name '", input_name, "', needs to be of type ", + glue::glue_collapse(type, " or ") + ), + call. = FALSE ) } if (!is.null(expected_value) & !is.null(input_value)) { if (!sum(input_value %in% expected_value)) { - stop(paste0( - "invalid value for input name '", input_name, "', value needs to equal ", - glue::glue_collapse(expected_value, " or ") - ), - call. = FALSE + stop( + paste0( + "invalid value for input name '", input_name, "', value needs to equal ", + glue::glue_collapse(expected_value, " or ") + ), + call. = FALSE ) } } @@ -52,7 +53,6 @@ check_input_data <- function(input_data, date_type, fiscal_year_start, parallel_processing) { - # data combo names match the input data if (sum(combo_variables %in% colnames(input_data)) != length(combo_variables)) { stop("combo variables do not match column headers in input data") @@ -103,7 +103,6 @@ check_input_data <- function(input_data, # input_data is correct type for parallel processing if (inherits(input_data, c("data.frame", "tbl")) & is.null(parallel_processing)) { - # do nothing } else if (inherits(input_data, "tbl_spark") & is.null(parallel_processing)) { stop("spark data frames should run with spark parallel processing", @@ -148,7 +147,6 @@ check_input_data <- function(input_data, check_parallel_processing <- function(run_info, parallel_processing, inner_parallel = FALSE) { - # parallel processing formatting if (is.null(parallel_processing)) { return() diff --git a/R/models.R b/R/models.R index 8cc9e420..67015038 100644 --- a/R/models.R +++ b/R/models.R @@ -701,7 +701,6 @@ glmnet <- function(train_data, horizon, external_regressors, frequency) { - # create model recipe and spec if (multistep) { recipe_spec_glmnet <- train_data %>% @@ -1328,7 +1327,6 @@ xgboost <- function(train_data, horizon, external_regressors, frequency) { - # create model recipe and spec if (multistep) { recipe_spec_xgboost <- train_data %>% diff --git a/R/multistep_cubist.R b/R/multistep_cubist.R index 8a005a4c..e03c047f 100644 --- a/R/multistep_cubist.R +++ b/R/multistep_cubist.R @@ -1,4 +1,3 @@ - # CUBIST Multistep ---- #' Initialize custom cubist parsnip model @@ -298,7 +297,6 @@ cubist_multistep_fit_impl <- function(x, y, external_regressors = NULL, forecast_horizon = NULL, selected_features = NULL) { - # X & Y # Expect outcomes = vector # Expect predictor = data.frame @@ -321,7 +319,6 @@ cubist_multistep_fit_impl <- function(x, y, model_predictions <- list() for (lag in get_multi_lags(lag_periods, forecast_horizon)) { - # get final features based on lag xreg_tbl_final <- multi_feature_selection( xreg_tbl, @@ -438,7 +435,6 @@ predict.cubist_multistep_fit_impl <- function(object, new_data, ...) { #' @keywords internal #' @export cubist_multistep_predict_impl <- function(object, new_data, ...) { - # Date Mapping Table date_tbl <- new_data %>% dplyr::select(Date, Date_index.num) %>% diff --git a/R/multistep_glmnet.R b/R/multistep_glmnet.R index f177aaa0..b67e8217 100644 --- a/R/multistep_glmnet.R +++ b/R/multistep_glmnet.R @@ -1,4 +1,3 @@ - # GLMNET Multistep ---- #' Initialize custom glmnet parsnip model @@ -282,7 +281,6 @@ glmnet_multistep_fit_impl <- function(x, y, external_regressors = NULL, forecast_horizon = NULL, selected_features = NULL) { - # X & Y # Expect outcomes = vector # Expect predictor = data.frame @@ -311,7 +309,6 @@ glmnet_multistep_fit_impl <- function(x, y, parsnip::set_engine("glmnet") for (lag in get_multi_lags(lag_periods, forecast_horizon)) { - # get final features based on lag xreg_tbl_final <- multi_feature_selection( xreg_tbl, @@ -426,7 +423,6 @@ predict.glmnet_multistep_fit_impl <- function(object, new_data, ...) { #' @keywords internal #' @export glmnet_multistep_predict_impl <- function(object, new_data, ...) { - # Date Mapping Table date_tbl <- new_data %>% dplyr::select(Date, Date_index.num) %>% diff --git a/R/multistep_helper.R b/R/multistep_helper.R index 9cd1cdc9..b8435cb1 100644 --- a/R/multistep_helper.R +++ b/R/multistep_helper.R @@ -1,4 +1,3 @@ - # Helper Functions ---- #' Return xregs that contain future values for multistep horizon forecast diff --git a/R/multistep_mars.R b/R/multistep_mars.R index 02e84e5b..68899d9e 100644 --- a/R/multistep_mars.R +++ b/R/multistep_mars.R @@ -1,4 +1,3 @@ - # MARS Multistep ---- #' Initialize custom mars parsnip model @@ -303,7 +302,6 @@ mars_multistep_fit_impl <- function(x, y, external_regressors = NULL, forecast_horizon = NULL, selected_features = NULL) { - # X & Y # Expect outcomes = vector # Expect predictor = data.frame @@ -334,7 +332,6 @@ mars_multistep_fit_impl <- function(x, y, parsnip::set_engine("earth") for (lag in get_multi_lags(lag_periods, forecast_horizon)) { - # get final features based on lag xreg_tbl_final <- multi_feature_selection( xreg_tbl, @@ -449,7 +446,6 @@ predict.mars_multistep_fit_impl <- function(object, new_data, ...) { #' @keywords internal #' @export mars_multistep_predict_impl <- function(object, new_data, ...) { - # Date Mapping Table date_tbl <- new_data %>% dplyr::select(Date, Date_index.num) %>% diff --git a/R/multistep_svm_poly.R b/R/multistep_svm_poly.R index 14ad5e21..4b57a8b6 100644 --- a/R/multistep_svm_poly.R +++ b/R/multistep_svm_poly.R @@ -1,4 +1,3 @@ - # SVM-POLY Multistep ---- #' Initialize custom svm-poly parsnip model @@ -325,7 +324,6 @@ svm_poly_multistep_fit_impl <- function(x, y, external_regressors = NULL, forecast_horizon = NULL, selected_features = NULL) { - # X & Y # Expect outcomes = vector # Expect predictor = data.frame @@ -357,7 +355,6 @@ svm_poly_multistep_fit_impl <- function(x, y, parsnip::set_engine("kernlab") for (lag in get_multi_lags(lag_periods, forecast_horizon)) { - # get final features based on lag xreg_tbl_final <- multi_feature_selection( xreg_tbl, @@ -475,7 +472,6 @@ predict.svm_poly_multistep_fit_impl <- function(object, new_data, ...) { #' @keywords internal #' @export svm_poly_multistep_predict_impl <- function(object, new_data, ...) { - # Date Mapping Table date_tbl <- new_data %>% dplyr::select(Date, Date_index.num) %>% diff --git a/R/multistep_svm_rbf.R b/R/multistep_svm_rbf.R index 6a7f779f..d7480912 100644 --- a/R/multistep_svm_rbf.R +++ b/R/multistep_svm_rbf.R @@ -1,4 +1,3 @@ - # SVM-RBF Multistep ---- #' Initialize custom svm-rbf parsnip model @@ -306,7 +305,6 @@ svm_rbf_multistep_fit_impl <- function(x, y, external_regressors = NULL, forecast_horizon = NULL, selected_features = NULL) { - # X & Y # Expect outcomes = vector # Expect predictor = data.frame @@ -337,7 +335,6 @@ svm_rbf_multistep_fit_impl <- function(x, y, parsnip::set_engine("kernlab") for (lag in get_multi_lags(lag_periods, forecast_horizon)) { - # get final features based on lag xreg_tbl_final <- multi_feature_selection( xreg_tbl, @@ -455,7 +452,6 @@ predict.svm_rbf_multistep_fit_impl <- function(object, new_data, ...) { #' @keywords internal #' @export svm_rbf_multistep_predict_impl <- function(object, new_data, ...) { - # Date Mapping Table date_tbl <- new_data %>% dplyr::select(Date, Date_index.num) %>% diff --git a/R/multistep_xgboost.R b/R/multistep_xgboost.R index 03b327f1..9ce5332d 100644 --- a/R/multistep_xgboost.R +++ b/R/multistep_xgboost.R @@ -1,4 +1,3 @@ - # XGBOOST Multistep ---- #' Initialize custom xgboost parsnip model @@ -389,7 +388,6 @@ xgboost_multistep_fit_impl <- function(x, y, forecast_horizon = NULL, selected_features = NULL, ...) { - # X & Y # Expect outcomes = vector # Expect predictor = data.frame @@ -412,7 +410,6 @@ xgboost_multistep_fit_impl <- function(x, y, model_predictions <- list() for (lag in get_multi_lags(lag_periods, forecast_horizon)) { - # get final features based on lag xreg_tbl_final <- multi_feature_selection( xreg_tbl, @@ -437,7 +434,7 @@ xgboost_multistep_fit_impl <- function(x, y, y = outcome, max_depth = max_depth, nrounds = nrounds, - eta = eta, + eta = eta, colsample_bytree = colsample_bytree, colsample_bynode = colsample_bynode, min_child_weight = min_child_weight, @@ -537,7 +534,6 @@ predict.xgboost_multistep_fit_impl <- function(object, new_data, ...) { #' @keywords internal #' @export xgboost_multistep_predict_impl <- function(object, new_data, ...) { - # Date Mapping Table date_tbl <- new_data %>% dplyr::select(Date, Date_index.num) %>% diff --git a/R/parallel_util.R b/R/parallel_util.R index 751ccad1..3746d971 100644 --- a/R/parallel_util.R +++ b/R/parallel_util.R @@ -34,7 +34,7 @@ par_start <- function(run_info, ) parallel_packages <- c( - "gtools", "hts", "magrittr", "methods", "base", "modeltime.resample", + "gtools", "hts", "magrittr", "methods", "base", "plyr", "rsample" ) diff --git a/R/prep_data.R b/R/prep_data.R index 41d28735..92108afd 100644 --- a/R/prep_data.R +++ b/R/prep_data.R @@ -227,7 +227,6 @@ prep_data <- function(run_info, dplyr::filter(Combo %in% current_combo_list_final) if (length(combo_diff) == 0 & length(prev_combo_list) > 0) { - # check if input values have changed current_log_df <- tibble::tibble( combo_variables = paste(combo_variables, collapse = "---"), @@ -466,204 +465,205 @@ prep_data <- function(run_info, } else if (parallel_processing == "spark") { final_data <- filtered_initial_prep_tbl %>% adjust_df(return_type = "sdf") %>% - sparklyr::spark_apply(function(df, context) { - # update objects - fn_env <- .GlobalEnv - - for (name in names(context)) { - assign(name, context[[name]], envir = fn_env) - } - - # get specific time series - combo <- unique(df$Combo) - - return_tbl <- tibble::tibble( - Combo = combo, - Combo_Hash = hash_data(combo) - ) - - # handle external regressors - xregs_future_tbl <- get_xregs_future_values_tbl( - df, - external_regressors, - hist_end_date - ) - - if (length(colnames(xregs_future_tbl)) > 2) { - xregs_future_list <- xregs_future_tbl %>% - dplyr::select(-Date, -Combo) %>% - colnames() - } else { - xregs_future_list <- NULL - } + sparklyr::spark_apply( + function(df, context) { + # update objects + fn_env <- .GlobalEnv - # initial data prep - initial_tbl <- df %>% - dplyr::filter(Combo == combo) %>% - dplyr::select( - Combo, - Date, - Target, - tidyselect::all_of(external_regressors) - ) %>% - dplyr::group_by(Combo) %>% - timetk::pad_by_time(Date, - .by = date_type, - .pad_value = ifelse(clean_missing_values, NA, 0), - .end_date = hist_end_date - ) %>% # fill in missing values in between existing data points - timetk::pad_by_time(Date, - .by = date_type, - .pad_value = 0, - .start_date = hist_start_date, - .end_date = hist_end_date - ) %>% # fill in missing values at beginning of time series with zero - timetk::future_frame(Date, - .length_out = forecast_horizon, - .bind_data = TRUE - ) %>% # add future data - dplyr::ungroup() %>% - dplyr::left_join(xregs_future_tbl, - by = c("Combo", "Date") - ) %>% # join xregs that contain values given by user - clean_outliers_missing_values( - clean_outliers, - clean_missing_values, - get_frequency_number(date_type), - external_regressors - ) %>% # clean outliers and missing values - dplyr::mutate_if(is.numeric, list(~ replace(., is.infinite(.), NA))) %>% # replace infinite values - dplyr::mutate_if(is.numeric, list(~ replace(., is.nan(.), NA))) %>% # replace NaN values - dplyr::mutate_if(is.numeric, list(~ replace(., is.na(.), 0))) %>% # replace NA values - dplyr::mutate(Target = ifelse(Date > hist_end_date, - NA, - Target - )) + for (name in names(context)) { + assign(name, context[[name]], envir = fn_env) + } - # box-cox transformation - if (box_cox) { - box_cox_tbl <- initial_tbl %>% - apply_box_cox() + # get specific time series + combo <- unique(df$Combo) - initial_tbl <- box_cox_tbl$data + return_tbl <- tibble::tibble( + Combo = combo, + Combo_Hash = hash_data(combo) + ) - return_tbl <- return_tbl %>% - dplyr::left_join(box_cox_tbl$diff_info, by = "Combo") - } + # handle external regressors + xregs_future_tbl <- get_xregs_future_values_tbl( + df, + external_regressors, + hist_end_date + ) - # make stationary - if (stationary) { - stationary_tbl <- initial_tbl %>% - make_stationary() + if (length(colnames(xregs_future_tbl)) > 2) { + xregs_future_list <- xregs_future_tbl %>% + dplyr::select(-Date, -Combo) %>% + colnames() + } else { + xregs_future_list <- NULL + } - initial_tbl <- stationary_tbl$data + # initial data prep + initial_tbl <- df %>% + dplyr::filter(Combo == combo) %>% + dplyr::select( + Combo, + Date, + Target, + tidyselect::all_of(external_regressors) + ) %>% + dplyr::group_by(Combo) %>% + timetk::pad_by_time(Date, + .by = date_type, + .pad_value = ifelse(clean_missing_values, NA, 0), + .end_date = hist_end_date + ) %>% # fill in missing values in between existing data points + timetk::pad_by_time(Date, + .by = date_type, + .pad_value = 0, + .start_date = hist_start_date, + .end_date = hist_end_date + ) %>% # fill in missing values at beginning of time series with zero + timetk::future_frame(Date, + .length_out = forecast_horizon, + .bind_data = TRUE + ) %>% # add future data + dplyr::ungroup() %>% + dplyr::left_join(xregs_future_tbl, + by = c("Combo", "Date") + ) %>% # join xregs that contain values given by user + clean_outliers_missing_values( + clean_outliers, + clean_missing_values, + get_frequency_number(date_type), + external_regressors + ) %>% # clean outliers and missing values + dplyr::mutate_if(is.numeric, list(~ replace(., is.infinite(.), NA))) %>% # replace infinite values + dplyr::mutate_if(is.numeric, list(~ replace(., is.nan(.), NA))) %>% # replace NaN values + dplyr::mutate_if(is.numeric, list(~ replace(., is.na(.), 0))) %>% # replace NA values + dplyr::mutate(Target = ifelse(Date > hist_end_date, + NA, + Target + )) + + # box-cox transformation + if (box_cox) { + box_cox_tbl <- initial_tbl %>% + apply_box_cox() + + initial_tbl <- box_cox_tbl$data + + return_tbl <- return_tbl %>% + dplyr::left_join(box_cox_tbl$diff_info, by = "Combo") + } - return_tbl <- return_tbl %>% - dplyr::left_join(stationary_tbl$diff_info, by = "Combo") - } + # make stationary + if (stationary) { + stationary_tbl <- initial_tbl %>% + make_stationary() - # create date features - date_features <- initial_tbl %>% - dplyr::select(Date) %>% - dplyr::mutate( - Date_Adj = Date %m+% months(fiscal_year_start - 1), - Date_day_month_end = ifelse(lubridate::day(Date_Adj) == lubridate::days_in_month(Date_Adj), 1, 0) - ) %>% - timetk::tk_augment_timeseries_signature(Date_Adj) %>% - dplyr::select(!tidyselect::matches(get_date_regex(date_type)), -Date_Adj, -Date) + initial_tbl <- stationary_tbl$data - names(date_features) <- stringr::str_c("Date_", names(date_features)) + return_tbl <- return_tbl %>% + dplyr::left_join(stationary_tbl$diff_info, by = "Combo") + } - initial_tbl <- initial_tbl %>% - cbind(date_features) + # create date features + date_features <- initial_tbl %>% + dplyr::select(Date) %>% + dplyr::mutate( + Date_Adj = Date %m+% months(fiscal_year_start - 1), + Date_day_month_end = ifelse(lubridate::day(Date_Adj) == lubridate::days_in_month(Date_Adj), 1, 0) + ) %>% + timetk::tk_augment_timeseries_signature(Date_Adj) %>% + dplyr::select(!tidyselect::matches(get_date_regex(date_type)), -Date_Adj, -Date) - # Run Recipes - if (is.null(recipes_to_run)) { - run_all_recipes_override <- FALSE - } else if (recipes_to_run == "all") { - run_all_recipes_override <- TRUE - } else { - run_all_recipes_override <- FALSE - } + names(date_features) <- stringr::str_c("Date_", names(date_features)) - if (is.null(recipes_to_run) | "R1" %in% recipes_to_run | run_all_recipes_override) { - R1 <- initial_tbl %>% - multivariate_prep_recipe_1(external_regressors, - xregs_future_values_list = xregs_future_list, - get_fourier_periods(fourier_periods, date_type), - get_lag_periods(lag_periods, date_type, forecast_horizon, multistep_horizon, TRUE), - get_rolling_window_periods(rolling_window_periods, date_type) - ) %>% - dplyr::mutate(Target = base::ifelse(Date > hist_end_date, NA, Target)) + initial_tbl <- initial_tbl %>% + cbind(date_features) - write_data( - x = R1, - combo = combo, - run_info = run_info, - output_type = "data", - folder = "prep_data", - suffix = "-R1" - ) - } + # Run Recipes + if (is.null(recipes_to_run)) { + run_all_recipes_override <- FALSE + } else if (recipes_to_run == "all") { + run_all_recipes_override <- TRUE + } else { + run_all_recipes_override <- FALSE + } - if ((is.null(recipes_to_run) & date_type %in% c("month", "quarter", "year")) | "R2" %in% recipes_to_run | run_all_recipes_override) { - R2 <- initial_tbl %>% - multivariate_prep_recipe_2(external_regressors, - xregs_future_values_list = xregs_future_list, - get_fourier_periods(fourier_periods, date_type), - get_lag_periods(lag_periods, date_type, forecast_horizon), - get_rolling_window_periods(rolling_window_periods, date_type), - date_type, - forecast_horizon - ) %>% - dplyr::mutate(Target = base::ifelse(Date > hist_end_date, NA, Target)) + if (is.null(recipes_to_run) | "R1" %in% recipes_to_run | run_all_recipes_override) { + R1 <- initial_tbl %>% + multivariate_prep_recipe_1(external_regressors, + xregs_future_values_list = xregs_future_list, + get_fourier_periods(fourier_periods, date_type), + get_lag_periods(lag_periods, date_type, forecast_horizon, multistep_horizon, TRUE), + get_rolling_window_periods(rolling_window_periods, date_type) + ) %>% + dplyr::mutate(Target = base::ifelse(Date > hist_end_date, NA, Target)) + + write_data( + x = R1, + combo = combo, + run_info = run_info, + output_type = "data", + folder = "prep_data", + suffix = "-R1" + ) + } - write_data( - x = R2, - combo = combo, - run_info = run_info, - output_type = "data", - folder = "prep_data", - suffix = "-R2" - ) - } + if ((is.null(recipes_to_run) & date_type %in% c("month", "quarter", "year")) | "R2" %in% recipes_to_run | run_all_recipes_override) { + R2 <- initial_tbl %>% + multivariate_prep_recipe_2(external_regressors, + xregs_future_values_list = xregs_future_list, + get_fourier_periods(fourier_periods, date_type), + get_lag_periods(lag_periods, date_type, forecast_horizon), + get_rolling_window_periods(rolling_window_periods, date_type), + date_type, + forecast_horizon + ) %>% + dplyr::mutate(Target = base::ifelse(Date > hist_end_date, NA, Target)) + + write_data( + x = R2, + combo = combo, + run_info = run_info, + output_type = "data", + folder = "prep_data", + suffix = "-R2" + ) + } - return(data.frame(return_tbl)) - }, - group_by = "Combo", - context = list( - get_xregs_future_values_tbl = get_xregs_future_values_tbl, - external_regressors = external_regressors, - clean_missing_values = clean_missing_values, - clean_outliers_missing_values = clean_outliers_missing_values, - hash_data = hash_data, - hist_end_date = hist_end_date, - hist_start_date = hist_start_date, - forecast_approach = forecast_approach, - forecast_horizon = forecast_horizon, - clean_outliers = clean_outliers, - get_frequency_number = get_frequency_number, - date_type = date_type, - fiscal_year_start = fiscal_year_start, - get_date_regex = get_date_regex, - recipes_to_run = recipes_to_run, - multivariate_prep_recipe_1 = multivariate_prep_recipe_1, - multivariate_prep_recipe_2 = multivariate_prep_recipe_2, - run_info = run_info, - get_fourier_periods = get_fourier_periods, - fourier_periods = fourier_periods, - get_lag_periods = get_lag_periods, - lag_periods = lag_periods, - get_rolling_window_periods = get_rolling_window_periods, - rolling_window_periods = rolling_window_periods, - write_data = write_data, - write_data_folder = write_data_folder, - write_data_type = write_data_type, - box_cox = box_cox, - stationary = stationary, - make_stationary = make_stationary, - apply_box_cox = apply_box_cox - ) + return(data.frame(return_tbl)) + }, + group_by = "Combo", + context = list( + get_xregs_future_values_tbl = get_xregs_future_values_tbl, + external_regressors = external_regressors, + clean_missing_values = clean_missing_values, + clean_outliers_missing_values = clean_outliers_missing_values, + hash_data = hash_data, + hist_end_date = hist_end_date, + hist_start_date = hist_start_date, + forecast_approach = forecast_approach, + forecast_horizon = forecast_horizon, + clean_outliers = clean_outliers, + get_frequency_number = get_frequency_number, + date_type = date_type, + fiscal_year_start = fiscal_year_start, + get_date_regex = get_date_regex, + recipes_to_run = recipes_to_run, + multivariate_prep_recipe_1 = multivariate_prep_recipe_1, + multivariate_prep_recipe_2 = multivariate_prep_recipe_2, + run_info = run_info, + get_fourier_periods = get_fourier_periods, + fourier_periods = fourier_periods, + get_lag_periods = get_lag_periods, + lag_periods = lag_periods, + get_rolling_window_periods = get_rolling_window_periods, + rolling_window_periods = rolling_window_periods, + write_data = write_data, + write_data_folder = write_data_folder, + write_data_type = write_data_type, + box_cox = box_cox, + stationary = stationary, + make_stationary = make_stationary, + apply_box_cox = apply_box_cox + ) ) } @@ -693,12 +693,13 @@ prep_data <- function(run_info, length() if (successful_combos != total_combos) { - stop(paste0( - "Not all time series were prepped within 'prep_data', expected ", - total_combos, " time series but only ", successful_combos, - " time series are prepped. ", "Please run 'prep_data' again." - ), - call. = FALSE + stop( + paste0( + "Not all time series were prepped within 'prep_data', expected ", + total_combos, " time series but only ", successful_combos, + " time series are prepped. ", "Please run 'prep_data' again." + ), + call. = FALSE ) } @@ -1067,7 +1068,6 @@ apply_box_cox <- function(df) { ) for (column_name in names(df)) { - # Only check numeric columns with more than 2 unique values if (is.numeric(df[[column_name]]) & length(unique(df[[column_name]])) > 2) { temp_tbl <- df %>% @@ -1119,7 +1119,6 @@ make_stationary <- function(df) { ) for (column_name in names(df)) { - # Only check numeric columns with more than 2 unique values if (is.numeric(df[[column_name]]) & length(unique(df[[column_name]])) > 2) { temp_tbl <- df %>% @@ -1183,7 +1182,6 @@ multivariate_prep_recipe_1 <- function(data, rolling_window_periods, hist_end_date, date_type) { - # apply polynomial transformations numeric_xregs <- c() @@ -1345,7 +1343,6 @@ multivariate_prep_recipe_2 <- function(data, } for (period in 1:forecast_horizon) { - # add horizon and origin components data_lag_window <- df_poly %>% dplyr::mutate( diff --git a/R/prep_models.R b/R/prep_models.R index 7b9dede0..5602820d 100644 --- a/R/prep_models.R +++ b/R/prep_models.R @@ -1,4 +1,3 @@ - #' Prep Models #' #' Preps various aspects of run before training models. Things like train/test @@ -60,7 +59,6 @@ prep_models <- function(run_info, pca = NULL, num_hyperparameters = 10, seed = 123) { - # check input values check_input_type("run_info", run_info, "list") check_input_type("back_test_scenarios", back_test_scenarios, c("NULL", "numeric")) @@ -514,7 +512,6 @@ model_workflows <- function(run_info, ml_models <- list_models() if (is.null(models_to_run) & is.null(models_not_to_run)) { - # do nothing, using existing ml_models list } else if (is.null(models_to_run) & !is.null(models_not_to_run)) { ml_models <- setdiff(ml_models, models_not_to_run) diff --git a/R/read_write_data.R b/R/read_write_data.R index 34e0c2d3..2e43572a 100644 --- a/R/read_write_data.R +++ b/R/read_write_data.R @@ -1,4 +1,3 @@ - #' Get Final Forecast Data #' #' @param run_info run info using the [set_run_info()] function @@ -46,7 +45,6 @@ #' @export get_forecast_data <- function(run_info, return_type = "df") { - # check input values check_input_type("run_info", run_info, "list") check_input_type("return_type", return_type, "character", c("df", "sdf")) @@ -60,7 +58,7 @@ get_forecast_data <- function(run_info, combo_variables <- strsplit(log_df$combo_variables, split = "---")[[1]] forecast_approach <- log_df$forecast_approach - # get forecast data + # get train test split data model_train_test_tbl <- read_file(run_info, path = paste0( "/prep_models/", hash_data(run_info$experiment_name), "-", hash_data(run_info$run_name), @@ -71,15 +69,35 @@ get_forecast_data <- function(run_info, dplyr::select(Run_Type, Train_Test_ID) %>% dplyr::mutate(Train_Test_ID = as.numeric(Train_Test_ID)) - if (forecast_approach == "bottoms_up") { + # check if data has been condensed + cond_path <- paste0( + run_info$path, "/forecasts/*", hash_data(run_info$experiment_name), "-", + hash_data(run_info$run_name), "*condensed", ".", run_info$data_output + ) + + condensed_files <- list_files(run_info$storage_object, fs::path(cond_path)) + + if (length(condensed_files) > 0) { + condensed <- TRUE + } else { + condensed <- FALSE + } + + # get forecast data + if (forecast_approach != "bottoms_up") { fcst_path <- paste0( "/forecasts/*", hash_data(run_info$experiment_name), "-", - hash_data(run_info$run_name), "*models", ".", run_info$data_output + hash_data(run_info$run_name), "*reconciled", ".", run_info$data_output + ) + } else if (condensed) { + fcst_path <- paste0( + "/forecasts/*", hash_data(run_info$experiment_name), "-", + hash_data(run_info$run_name), "*condensed", ".", run_info$data_output ) } else { fcst_path <- paste0( "/forecasts/*", hash_data(run_info$experiment_name), "-", - hash_data(run_info$run_name), "*reconciled", ".", run_info$data_output + hash_data(run_info$run_name), "*models", ".", run_info$data_output ) } @@ -152,7 +170,6 @@ get_forecast_data <- function(run_info, #' } #' @export get_trained_models <- function(run_info) { - # check input values check_input_type("run_info", run_info, "list") @@ -208,7 +225,6 @@ get_trained_models <- function(run_info) { get_prepped_data <- function(run_info, recipe, return_type = "df") { - # check input values check_input_type("run_info", run_info, "list") check_input_type("recipe", recipe, "character", c("R1", "R2")) @@ -279,7 +295,6 @@ get_prepped_data <- function(run_info, #' } #' @export get_prepped_models <- function(run_info) { - # check input values check_input_type("run_info", run_info, "list") @@ -395,7 +410,7 @@ write_data_type <- function(x, switch(type, rds = saveRDS(x, path), parquet = arrow::write_parquet(x, path), - csv = utils::write.csv(x, path, row.names = FALSE), + csv = vroom::vroom_write(x, path, delim = ",", progress = FALSE), qs = qs::qsave(x, path) ) } @@ -499,21 +514,28 @@ download_file <- function(storage_object, #' #' @param run_info run info using the [set_run_info()] function #' @param path file path +#' @param file_list files #' @param return_type type of data output read #' @param schema column schema for arrow::open_dataset() #' #' @return file read into memory #' @noRd read_file <- function(run_info, - path, + path = NULL, + file_list = NULL, return_type = "df", schema = NULL) { - folder <- fs::path_dir(path) storage_object <- run_info$storage_object - initial_path <- run_info$path - file <- fs::path_file(path) - if (inherits(storage_object, c("blob_container", "ms_drive"))) { + if (!is.null(path)) { + folder <- fs::path_dir(path) + initial_path <- run_info$path + file <- fs::path_file(path) + } + + if (!is.null(file_list)) { + files <- file_list + } else if (inherits(storage_object, c("blob_container", "ms_drive"))) { download_file(storage_object, fs::path(initial_path, path), folder) final_path <- fs::path(tempdir(), folder) files <- list_files(NULL, fs::path(final_path, file)) @@ -524,7 +546,10 @@ read_file <- function(run_info, files <- list_files(storage_object, fs::path(initial_path, path)) } - if (fs::path_ext(file) == "*") { + if (!is.null(file_list)) { + file_temp <- files[[1]] + file_ext <- fs::path_ext(file_temp) + } else if (fs::path_ext(file) == "*") { file_temp <- files[[1]] file_ext <- fs::path_ext(file_temp) } else { @@ -659,3 +684,83 @@ get_recipe_data <- function(run_info, return(recipe_tbl) } + +#' Condense forecast output files into less files +#' +#' @param run_info run info using the [set_run_info()] function +#' @param parallel_processing type of parallel processing to run +#' @param num_cores number of cores to use +#' +#' @return nothing +#' @noRd +condense_data <- function(run_info, + parallel_processing = NULL, + num_cores = NULL) { + # get initial list of files to condense + initial_file_list <- list_files( + run_info$storage_object, + paste0( + run_info$path, "/forecasts/*", hash_data(run_info$experiment_name), "-", + hash_data(run_info$run_name), "*_models.", run_info$data_output + ) + ) + + # Initialize an empty list to store the batches + list_of_batches <- list() + + # Define the batch size + batch_size <- 10000 + + # Calculate the number of batches needed + num_batches <- ceiling(length(initial_file_list) / batch_size) + + # Loop through the large list and create batches + for (i in 1:num_batches) { + start_index <- (i - 1) * batch_size + 1 + end_index <- min(i * batch_size, length(initial_file_list)) + batch_name <- paste0("batch_", i) + list_of_batches[[batch_name]] <- initial_file_list[start_index:end_index] + } + + # parallel run info + par_info <- par_start( + run_info = run_info, + parallel_processing = parallel_processing, + num_cores = min(length(names(list_of_batches)), num_cores), + task_length = length(names(list_of_batches)) + ) + + cl <- par_info$cl + packages <- par_info$packages + `%op%` <- par_info$foreach_operator + + # submit tasks + condense_data_tbl <- foreach::foreach( + batch = names(list_of_batches), + .combine = "rbind", + .packages = packages, + .errorhandling = "stop", + .verbose = FALSE, + .inorder = FALSE, + .multicombine = TRUE, + .noexport = NULL + ) %op% { + files <- list_of_batches[[batch]] + + data <- read_file(run_info, + file_list = files, + return_type = "df" + ) + + write_data( + x = data, + combo = batch, + run_info = run_info, + output_type = "data", + folder = "forecasts", + suffix = "-condensed" + ) + + return(batch) + } +} diff --git a/R/run_info.R b/R/run_info.R index bf9acc5b..bffece82 100644 --- a/R/run_info.R +++ b/R/run_info.R @@ -44,7 +44,6 @@ set_run_info <- function(experiment_name = "finn_fcst", data_output = "csv", object_output = "rds", add_unique_id = TRUE) { - # initial input checks if (!inherits(run_name, c("NULL", "character"))) { stop("`run_name` must either be a NULL or a string") @@ -151,7 +150,6 @@ set_run_info <- function(experiment_name = "finn_fcst", base::suppressWarnings() if (nrow(log_df) > 0 & add_unique_id == FALSE) { - # check if input values have changed current_log_df <- tibble::tibble( experiment_name = experiment_name, @@ -266,7 +264,6 @@ get_run_info <- function(experiment_name = NULL, run_name = NULL, storage_object = NULL, path = NULL) { - # input checks if (!inherits(run_name, c("NULL", "character"))) { stop("`run_name` must either be a NULL or a string") diff --git a/R/train_models.R b/R/train_models.R index 511c9383..d8f406e1 100644 --- a/R/train_models.R +++ b/R/train_models.R @@ -1,4 +1,3 @@ - #' Train Individual Models #' #' @param run_info run info using the [set_run_info()] function @@ -220,7 +219,6 @@ train_models <- function(run_info, stringr::str_replace(hash_data("All-Data"), "All-Data") if (length(combo_diff) == 0 & length(prev_combo_list) > 0) { - # check if input values have changed current_log_df <- tibble::tibble( run_global_models = run_global_models, @@ -270,7 +268,6 @@ train_models <- function(run_info, .noexport = NULL ) %op% { - # get time series combo_hash <- x @@ -402,7 +399,6 @@ train_models <- function(run_info, .multicombine = TRUE, .noexport = NULL ) %do% { - # get initial run info model <- model_run %>% dplyr::pull(Model_Name) @@ -719,12 +715,13 @@ train_models <- function(run_info, length() if (successful_combos != total_combos) { - stop(paste0( - "Not all time series were completed within 'train_models', expected ", - total_combos, " time series but only ", successful_combos, - " time series were ran. ", "Please run 'train_models' again." - ), - call. = FALSE + stop( + paste0( + "Not all time series were completed within 'train_models', expected ", + total_combos, " time series but only ", successful_combos, + " time series were ran. ", "Please run 'train_models' again." + ), + call. = FALSE ) } @@ -784,7 +781,6 @@ negative_fcst_adj <- function(data, #' @return tbl with train test splits #' @noRd create_splits <- function(data, train_test_splits) { - # Create the rsplit object analysis_split <- function(data, train_indices, test_indices) { rsplit_object <- rsample::make_splits( @@ -846,7 +842,6 @@ create_splits <- function(data, train_test_splits) { undifference_forecast <- function(forecast_data, recipe_data, diff_tbl) { - # check if data needs to be undifferenced diff1 <- diff_tbl$Diff_Value1 diff2 <- diff_tbl$Diff_Value2 @@ -863,10 +858,8 @@ undifference_forecast <- function(forecast_data, # non seasonal differencing if (!is.na(diff1)) { - # loop through each back test split for (id in train_test_id) { - # get specific train test split fcst_temp_tbl <- forecast_data %>% dplyr::filter(Train_Test_ID == id) @@ -950,7 +943,6 @@ undifference_forecast <- function(forecast_data, undifference_recipe <- function(recipe_data, diff_tbl, hist_end_date) { - # check if data needs to be undifferenced diff1 <- diff_tbl$Diff_Value1 diff2 <- diff_tbl$Diff_Value2 diff --git a/R/utility.R b/R/utility.R index cb5a57d5..50125e32 100644 --- a/R/utility.R +++ b/R/utility.R @@ -15,7 +15,8 @@ utils::globalVariables(c( "Auto_Accept", "Feature", "Imp", "Importance", "LOFO_Var", "Var_RMSE", "Vote", "Votes", "desc", "term", "Column", "Box_Cox_Lambda", "get_recipie_configurable", "Agg", "Unique", "Var", "Var_Combo", "regressor", "regressor_tbl", "value_level_iter", ".actual", ".fitted", - "forecast_horizon", "lag", "new_data", "object", "fit", "Row_Num", "Run_Number" + "forecast_horizon", "lag", "new_data", "object", "fit", "Row_Num", "Run_Number", "weight", + "Total", "Weight", "batch" )) #' @importFrom magrittr %>% @@ -88,7 +89,6 @@ cbind.fill <- function(..., fill = NA) { # been loaded. .onLoad <- function(libname, pkgname) { - # CRAN OMP THREAD LIMIT Sys.setenv("OMP_THREAD_LIMIT" = 1) diff --git a/man/final_models.Rd b/man/final_models.Rd index d6c86402..e6dcaaae 100644 --- a/man/final_models.Rd +++ b/man/final_models.Rd @@ -17,7 +17,8 @@ final_models( \arguments{ \item{run_info}{run info using the \code{\link[=set_run_info]{set_run_info()}} function.} -\item{average_models}{If TRUE, create simple averages of individual models.} +\item{average_models}{If TRUE, create simple averages of individual models +and save the most accurate one.} \item{max_model_average}{Max number of models to average together. Will create model averages for 2 models up until input value or max number of diff --git a/tests/testthat/test-forecast_time_series.R b/tests/testthat/test-forecast_time_series.R index 2b5117d9..3f5bf646 100644 --- a/tests/testthat/test-forecast_time_series.R +++ b/tests/testthat/test-forecast_time_series.R @@ -1,4 +1,3 @@ - # * custom test functions ---- check_exist <- function(to_check, ret) { diff --git a/tests/testthat/test-hierarchical.R b/tests/testthat/test-hierarchical.R index fc7bde9d..61bc0c1b 100644 --- a/tests/testthat/test-hierarchical.R +++ b/tests/testthat/test-hierarchical.R @@ -1,6 +1,4 @@ - test_that("prep_hierarchical_data returns correct grouped hierarchies", { - # Mock data setup data <- tibble::tibble( Segment = as.character(c( @@ -77,7 +75,6 @@ test_that("prep_hierarchical_data returns correct grouped hierarchies", { }) test_that("prep_hierarchical_data returns correct standard hierarchies", { - # Mock data setup data <- tibble::tibble( Area = as.character(c("EMEA", "EMEA", "EMEA", "EMEA", "EMEA", "EMEA", "EMEA", "EMEA", "United States", "United States", "United States", "United States")), diff --git a/tests/testthat/test-multistep_horizon.R b/tests/testthat/test-multistep_horizon.R index de0c6492..7fcb3448 100644 --- a/tests/testthat/test-multistep_horizon.R +++ b/tests/testthat/test-multistep_horizon.R @@ -1,5 +1,4 @@ test_that("multistep_horizon monthly data", { - # Mock data setup data <- timetk::m4_monthly %>% dplyr::mutate(id = as.character(id)) %>% diff --git a/vignettes/back-testing-and-hyperparameter-tuning.Rmd b/vignettes/back-testing-and-hyperparameter-tuning.Rmd index 2c69aeb9..ab9a854d 100644 --- a/vignettes/back-testing-and-hyperparameter-tuning.Rmd +++ b/vignettes/back-testing-and-hyperparameter-tuning.Rmd @@ -22,27 +22,33 @@ When `prep_models()` is ran, hyperparameters and back test splits are calculated library(finnts) hist_data <- timetk::m4_monthly %>% - dplyr::filter(date >= "2012-01-01", - id == "M2") %>% + dplyr::filter( + date >= "2012-01-01", + id == "M2" + ) %>% dplyr::rename(Date = date) %>% dplyr::mutate(id = as.character(id)) run_info <- set_run_info( - experiment_name = "finnts_fcst", + experiment_name = "finnts_fcst", run_name = "get_prepped_models" ) -prep_data(run_info = run_info, - input_data = hist_data, - combo_variables = c("id"), - target_variable = "value", - date_type = "month", - recipes_to_run = "R1", - forecast_horizon = 6) +prep_data( + run_info = run_info, + input_data = hist_data, + combo_variables = c("id"), + target_variable = "value", + date_type = "month", + recipes_to_run = "R1", + forecast_horizon = 6 +) -prep_models(run_info = run_info, - models_to_run = c("arima", "ets", "xgboost"), - num_hyperparameters = 10) +prep_models( + run_info = run_info, + models_to_run = c("arima", "ets", "xgboost"), + num_hyperparameters = 10 +) model_info <- get_prepped_models(run_info = run_info) @@ -67,7 +73,6 @@ model_hyperparameter_info <- model_info %>% tidyr::unnest(Data) print(model_hyperparameter_info) - ``` The above outputs allow a Finn user to understand what hyperparameters are chosen for tuning and how the model refitting process will work. When tuning hyperparameters, Finn uses the "Validation" train/test splits, with the final parameters chosen using RMSE. For some models like ARIMA that don't follow a traditional hyperparameter tuning process, the model is fit from scratch across all train/test splits. After hyperparameters are chosen, the model is refit across the "Back_Test" and "Future_Forecast" splits. The "Back_Test" splits are the true testing data that will be used when selecting the final "Best-Model". "Ensemble" splits are also created as ensemble training data if ensemble models are chosen to run. Ensemble models follow a similar tuning process. diff --git a/vignettes/best-model-selection.Rmd b/vignettes/best-model-selection.Rmd index 6b869aa8..0b791151 100644 --- a/vignettes/best-model-selection.Rmd +++ b/vignettes/best-model-selection.Rmd @@ -23,17 +23,21 @@ suppressMessages(library(dplyr)) message("Simple Back Test Results") back_test_tbl <- tibble( - Combo = c("Country_1", "Country_1", "Country_1", "Country_1", "Country_1", "Country_1", "Country_1", "Country_1", "Country_1", "Country_1"), - Date = c("2020-01-01", "2020-02-01", "2020-03-01", "2020-04-01", "2020-05-01", "2020-01-01", "2020-02-01", "2020-03-01", "2020-04-01", "2020-05-01"), - Model = c("arima", "arima", "arima", "arima", "arima", "ets", "ets", "ets", "ets", "ets"), - FCST = c(9, 23, 35, 41, 48, 7, 22, 29, 42, 53), + Combo = c("Country_1", "Country_1", "Country_1", "Country_1", "Country_1", "Country_1", "Country_1", "Country_1", "Country_1", "Country_1"), + Date = c("2020-01-01", "2020-02-01", "2020-03-01", "2020-04-01", "2020-05-01", "2020-01-01", "2020-02-01", "2020-03-01", "2020-04-01", "2020-05-01"), + Model = c("arima", "arima", "arima", "arima", "arima", "ets", "ets", "ets", "ets", "ets"), + FCST = c(9, 23, 35, 41, 48, 7, 22, 29, 42, 53), Target = c(10, 20, 30, 40, 50, 10, 20, 30, 40, 50) ) %>% - dplyr::mutate(MAPE = abs(Target-FCST)/Target, - Date = as.Date(Date)) %>% + dplyr::mutate( + MAPE = abs(Target - FCST) / Target, + Date = as.Date(Date) + ) %>% dplyr::group_by(Combo, Model) %>% - dplyr::mutate(Target_Total = sum(Target), - Percent_Total = Target/Target_Total) %>% + dplyr::mutate( + Target_Total = sum(Target), + Percent_Total = Target / Target_Total + ) %>% dplyr::ungroup() print(back_test_tbl) @@ -44,12 +48,13 @@ message("Overall Model Accuracy by Combo") suppressMessages(best_model <- back_test_tbl %>% dplyr::group_by(Combo, Model) %>% dplyr::mutate(Weighted_MAPE = MAPE * Percent_Total) %>% - dplyr::summarise(MAPE = mean(MAPE), - Weighted_MAPE = sum(Weighted_MAPE)) %>% + dplyr::summarise( + MAPE = mean(MAPE), + Weighted_MAPE = sum(Weighted_MAPE) + ) %>% dplyr::ungroup()) print(best_model) - ``` During the simple back test process above, arima seems to be the better model from a pure MAPE perspective, but ETS ends up being the winner when using weighted MAPE. The benefits of weighted MAPE allow finnts to find the optimal model that performs the best on the biggest components of a forecast, which comes with the added benefit of putting more weight on more recent observations since those are more likely to have larger target values then ones further into the past. Another way of putting more weight on more recent observations is how Finn overlaps its back testing scenarios. This means the most recent observations are tested for accuracy in different forecast horizons (H=1, H=2, etc). More info on this in the back testing vignette. diff --git a/vignettes/external-regressors.Rmd b/vignettes/external-regressors.Rmd index d89387fa..ebd0f8e4 100644 --- a/vignettes/external-regressors.Rmd +++ b/vignettes/external-regressors.Rmd @@ -42,11 +42,11 @@ Below is an example of how you can set up your input_data to leverage future val library(dplyr) tibble( - Combo = c("Country_1", "Country_1", "Country_1", "Country_1", "Country_1", "Country_1", "Country_1", "Country_1", "Country_1", "Country_1", "Country_1", "Country_1", "Country_1", "Country_1", "Country_1"), - Date = c("2020-01-01", "2020-02-01", "2020-03-01", "2020-04-01", "2020-05-01", "2020-06-01", "2020-07-01", "2020-08-01", "2020-09-01", "2020-10-01", "2020-11-01", "2020-12-01", "2021-01-01", "2021-02-01", "2021-03-01"), - Target = c(10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 110, 120, NA, NA, NA), - Holiday = c("New Years", "Valentines Day", "None", "Easter", "None", "None", "4th of July", "None", "Labor Day", "Halloween", "Thanksgiving", "Christmas", "New Years", "Valentines Day", "None"), - GDP = c(5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 60, NA, NA, NA), + Combo = c("Country_1", "Country_1", "Country_1", "Country_1", "Country_1", "Country_1", "Country_1", "Country_1", "Country_1", "Country_1", "Country_1", "Country_1", "Country_1", "Country_1", "Country_1"), + Date = c("2020-01-01", "2020-02-01", "2020-03-01", "2020-04-01", "2020-05-01", "2020-06-01", "2020-07-01", "2020-08-01", "2020-09-01", "2020-10-01", "2020-11-01", "2020-12-01", "2021-01-01", "2021-02-01", "2021-03-01"), + Target = c(10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 110, 120, NA, NA, NA), + Holiday = c("New Years", "Valentines Day", "None", "Easter", "None", "None", "4th of July", "None", "Labor Day", "Halloween", "Thanksgiving", "Christmas", "New Years", "Valentines Day", "None"), + GDP = c(5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 60, NA, NA, NA), Sales_Pipeline = c(100, 110, 120, 130, 140, 150, 160, 170, 180, 190, 200, 210, 220, 230, 240) ) %>% dplyr::mutate(Date = as.Date(Date)) diff --git a/vignettes/feature-engineering.Rmd b/vignettes/feature-engineering.Rmd index 2cbd38dc..02122060 100644 --- a/vignettes/feature-engineering.Rmd +++ b/vignettes/feature-engineering.Rmd @@ -74,7 +74,6 @@ In addition to the standard approaches above, finnts also does two different way In the first recipe, referred to as "R1" in default finnts models, by default takes a single step horizon approach. Meaning all of the engineered target and external regressor features are used but the lags cannot be less than the forecast horizon. For example, a monthly data set with a forecast horizon of 3, finnts will take engineered features like lags and rolling window features but only use those lags that are for periods equal to or greater than 3. You can also run a multistep horizon approach by setting `multistep_horizon` to TRUE in `prep_models()`. The multistep approach will create features that can be used by specific multivariate models that optimize for each period in a forecast horizon. More on this in the "models used in finnts" vignette. Recursive forecasting is not supported in finnts multivariate machine learning models, since feeding forecast outputs as features to create another forecast adds complex layers of uncertainty that can easily spiral out of control and produce poor forecasts. NA values created by generating lag features are filled "up". This results in the first initial periods of a time series having some data leakage but the effect should be small if the time series is long enough. ```{r, message = FALSE} - library(finnts) hist_data <- timetk::m4_monthly %>% @@ -112,7 +111,6 @@ print(R1_prepped_data_tbl) The second recipe is referred to as "R2" in default finnts models. It takes a very different approach than the "R1" recipe. For a 3 month forecast horizon on a monthly dataset, target and rolling window features are created depending on the horizon period. They are also constrained to be equal or less than the forecast horizon. In the below example, "Origin" and "Horizon" features are created for each time period. This results in duplicating rows in the original data set to create new features that are now specific to each horizon period. This helps the default finnts models find new unique relationships to model, when compared to a more formal approach in "R1". NA values created by generating lag features are filled "up". ```{r, message = FALSE} - library(finnts) hist_data <- timetk::m4_monthly %>% diff --git a/vignettes/finnts.Rmd b/vignettes/finnts.Rmd index 0915bf17..32060018 100644 --- a/vignettes/finnts.Rmd +++ b/vignettes/finnts.Rmd @@ -10,8 +10,8 @@ vignette: > ```{r, include = FALSE} knitr::opts_chunk$set( collapse = TRUE, - comment = "#>", - warning = FALSE, + comment = "#>", + warning = FALSE, message = FALSE ) library(finnts) @@ -62,18 +62,15 @@ The above data set contains 4 individual time series, identified using the "id" Before we call the Finn forecast function. Let's first set up some run information using `set_run_info()`, this helps log all components of our Finn forecast successfully. ```{r, message = FALSE, eval = hist_data, error=FALSE, warning = FALSE, echo=T, eval = TRUE} - run_info <- set_run_info( - experiment_name = "finn_forecast", + experiment_name = "finn_forecast", run_name = "test_run" ) - ``` Calling the "forecast_time_series" function is the easiest part. In this example we will be running just two models. ```{r, message = FALSE, eval = hist_data, error=FALSE, warning = FALSE, echo=T, eval = TRUE} - # no need to assign it to a variable, since all of the outputs are written to disk :) forecast_time_series( run_info = run_info, @@ -82,8 +79,8 @@ forecast_time_series( target_variable = "value", date_type = "month", forecast_horizon = 3, - back_test_scenarios = 6, - models_to_run = c("arima", "ets"), + back_test_scenarios = 6, + models_to_run = c("arima", "ets"), return_data = FALSE ) ``` @@ -93,7 +90,6 @@ forecast_time_series( ### Initial Finn Outputs ```{r, message = FALSE, eval = finn_output, message = FALSE, eval = FALSE, echo=T} - finn_output_tbl <- get_forecast_data(run_info = run_info) print(finn_output_tbl) @@ -102,7 +98,6 @@ print(finn_output_tbl) ### Future Forecast ```{r, message = FALSE, eval = finn_output, message = FALSE, eval = FALSE, echo=T} - future_forecast_tbl <- finn_output_tbl %>% dplyr::filter(Run_Type == "Future_Forecast") @@ -142,13 +137,17 @@ print(trained_model_tbl) ### Initial Prepped Data ```{r, message = FALSE, eval = finn_output, eval = FALSE, echo=T} -R1_prepped_data_tbl <- get_prepped_data(run_info = run_info, - recipe = "R1") +R1_prepped_data_tbl <- get_prepped_data( + run_info = run_info, + recipe = "R1" +) print(R1_prepped_data_tbl) -R2_prepped_data_tbl <- get_prepped_data(run_info = run_info, - recipe = "R2") +R2_prepped_data_tbl <- get_prepped_data( + run_info = run_info, + recipe = "R2" +) print(R2_prepped_data_tbl) ``` diff --git a/vignettes/forecast-components.Rmd b/vignettes/forecast-components.Rmd index b1a29e47..7a23c43c 100644 --- a/vignettes/forecast-components.Rmd +++ b/vignettes/forecast-components.Rmd @@ -10,8 +10,8 @@ vignette: > ```{r, include = FALSE} knitr::opts_chunk$set( collapse = TRUE, - comment = "#>", - warning = FALSE, + comment = "#>", + warning = FALSE, message = FALSE ) library(finnts) @@ -29,13 +29,15 @@ Let's get some example data and then set our Finn run info. library(finnts) hist_data <- timetk::m4_monthly %>% - dplyr::filter(date >= "2013-01-01", - id == "M2") %>% + dplyr::filter( + date >= "2013-01-01", + id == "M2" + ) %>% dplyr::rename(Date = date) %>% dplyr::mutate(id = as.character(id)) run_info <- set_run_info( - experiment_name = "finnts_fcst", + experiment_name = "finnts_fcst", run_name = "finn_sub_component_run" ) ``` @@ -45,20 +47,26 @@ run_info <- set_run_info( Clean and prepare our data before training models. We can even pull out our prepped data to see the features and transformations applied before models are trained. ```{r message=FALSE} -prep_data(run_info = run_info, - input_data = hist_data, - combo_variables = c("id"), - target_variable = "value", - date_type = "month", - forecast_horizon = 6) +prep_data( + run_info = run_info, + input_data = hist_data, + combo_variables = c("id"), + target_variable = "value", + date_type = "month", + forecast_horizon = 6 +) -R1_prepped_data_tbl <- get_prepped_data(run_info = run_info, - recipe = "R1") +R1_prepped_data_tbl <- get_prepped_data( + run_info = run_info, + recipe = "R1" +) print(R1_prepped_data_tbl) -R2_prepped_data_tbl <- get_prepped_data(run_info = run_info, - recipe = "R2") +R2_prepped_data_tbl <- get_prepped_data( + run_info = run_info, + recipe = "R2" +) print(R2_prepped_data_tbl) ``` @@ -70,12 +78,16 @@ Now that our data is prepared for modeling, let's now train some models. First w Then we can kick off training each model on our data. ```{r, message = FALSE} -prep_models(run_info = run_info, - models_to_run = c("arima", "ets", "glmnet"), - num_hyperparameters = 2) +prep_models( + run_info = run_info, + models_to_run = c("arima", "ets", "glmnet"), + num_hyperparameters = 2 +) -train_models(run_info = run_info, - run_global_models = FALSE) +train_models( + run_info = run_info, + run_global_models = FALSE +) ``` ## Train Ensemble Models @@ -99,7 +111,6 @@ final_models(run_info = run_info) Finally we can now retrieve the forecast results from this Finn run. ```{r, message = FALSE} - finn_output_tbl <- get_forecast_data(run_info = run_info) print(finn_output_tbl) diff --git a/vignettes/hierarchical-forecasting.Rmd b/vignettes/hierarchical-forecasting.Rmd index f358456d..7f617130 100644 --- a/vignettes/hierarchical-forecasting.Rmd +++ b/vignettes/hierarchical-forecasting.Rmd @@ -27,15 +27,14 @@ message("Standard Hierarchical Time Series Data") hts <- tibble( Continent = c("North America", "North America", "North America", "North America", "North America", "North America", "North America", "North America", "North America"), - Country = c("United States", "United States", "United States", "United States", "United States", "United States", "Mexico", "Mexico", "Mexico"), - City = c("Kansas City", "Kansas City", "Kansas City", "Seattle", "Seattle", "Seattle", "Mexico City", "Mexico City", "Mexico City"), - Date = c("2020-01-01", "2020-02-01", "2020-03-01", "2020-01-01", "2020-02-01", "2020-03-01", "2020-01-01", "2020-02-01", "2020-03-01"), + Country = c("United States", "United States", "United States", "United States", "United States", "United States", "Mexico", "Mexico", "Mexico"), + City = c("Kansas City", "Kansas City", "Kansas City", "Seattle", "Seattle", "Seattle", "Mexico City", "Mexico City", "Mexico City"), + Date = c("2020-01-01", "2020-02-01", "2020-03-01", "2020-01-01", "2020-02-01", "2020-03-01", "2020-01-01", "2020-02-01", "2020-03-01"), Target = c(100, 250, 320, 80, 200, 270, 50, 80, 120) ) %>% dplyr::mutate(Date = as.Date(Date)) print(hts) - ``` In the above example, "City" was the lowest level of the hierarchy, which feeds into "Country", which then feeds into "Continent". Finn will take this data and will forecast by City, total Country, and total Continent. After each model is ran for every level in the hierarchy, the best model is chosen at each level, then the "Best Model" and every other model is reconciled back down to the lowest level. @@ -50,16 +49,15 @@ suppressMessages(library(dplyr)) message("Grouped Hierarchical Time Series Data") gts <- tibble( - Country = c("United States", "United States", "United States", "United States", "United States", "United States", "Mexico", "Mexico", "Mexico", "Mexico", "Mexico", "Mexico"), - Segment = c("Enterprise", "Enterprise", "Enterprise", "Public Sector", "Public Sector", "Public Sector", "Enterprise", "Enterprise", "Enterprise", "Enterprise", "Enterprise", "Enterprise"), - Product = c("Coffee", "Coffee", "Coffee", "Coffee", "Coffee", "Coffee", "Coffee", "Coffee", "Coffee", "Tea", "Tea", "Tea"), - Date = c("2020-01-01", "2020-02-01", "2020-03-01", "2020-01-01", "2020-02-01", "2020-03-01", "2020-01-01", "2020-02-01", "2020-03-01", "2020-01-01", "2020-02-01", "2020-03-01"), + Country = c("United States", "United States", "United States", "United States", "United States", "United States", "Mexico", "Mexico", "Mexico", "Mexico", "Mexico", "Mexico"), + Segment = c("Enterprise", "Enterprise", "Enterprise", "Public Sector", "Public Sector", "Public Sector", "Enterprise", "Enterprise", "Enterprise", "Enterprise", "Enterprise", "Enterprise"), + Product = c("Coffee", "Coffee", "Coffee", "Coffee", "Coffee", "Coffee", "Coffee", "Coffee", "Coffee", "Tea", "Tea", "Tea"), + Date = c("2020-01-01", "2020-02-01", "2020-03-01", "2020-01-01", "2020-02-01", "2020-03-01", "2020-01-01", "2020-02-01", "2020-03-01", "2020-01-01", "2020-02-01", "2020-03-01"), Target = c(10, 20, 30, 5, 8, 11, 20, 23, 27, 50, 55, 60) ) %>% dplyr::mutate(Date = as.Date(Date)) print(gts) - ``` It would be hard to aggregate the above data in a traditional hierarchy. The same products are found in different segments and countries, also the same segments are found in multiple countries. Finn will follow a similar modeling process as the one described for a traditional hierarchy, but instead will create forecasts at the below levels. diff --git a/vignettes/models-used-in-finnts.Rmd b/vignettes/models-used-in-finnts.Rmd index 0e5eae27..fb2da330 100644 --- a/vignettes/models-used-in-finnts.Rmd +++ b/vignettes/models-used-in-finnts.Rmd @@ -22,7 +22,7 @@ reactable::reactable( data.frame() %>% rbind(data.frame(Model = "arima", Type = "univariate, local", Underlying.Package = "modeltime, forecast", Description = "Regression model that is based on finding relationships between lagged values of the target variable you are trying to forecast.")) %>% rbind(data.frame(Model = "arima-boost", Type = "multivariate, local", Underlying.Package = "modeltime, forecast, xgboost", Description = "Arima model (refer to arima) that models the trend compoent of target variable, then uses xgboost model (refer to xgboost) to train on the remaining residuals.")) %>% - rbind(data.frame(Model = "arimax", Type = "multivariate, local", Underlying.Package = "modeltime, forecast", Description = "ARIMA model that incorporates external regressors and other engineered features.")) %>% + rbind(data.frame(Model = "arimax", Type = "multivariate, local", Underlying.Package = "modeltime, forecast", Description = "ARIMA model that incorporates external regressors and other engineered features.")) %>% rbind(data.frame(Model = "cubist", Type = "multivariate, local, global, ensemble", Underlying.Package = "rules", Description = "Hybrid of tree based and linear regression approach. Many decision trees are built, but regression coefficients are used at each terminal node instead of averging values in other tree based approaches.")) %>% rbind(data.frame(Model = "croston", Type = "univariate, local", Underlying.Package = "modeltime, forecast", Description = "Useful for intermittent demand forecasting, aka when there are a lot of periods of zero values. Involves simple exponential smoothing on non-zero values of target variable and another application of seasonal exponential smoothing on periods between non-zero elements of the target variable. Refer to ets for more details on exponential smoothing.")) %>% rbind(data.frame(Model = "ets", Type = "univariate, local", Underlying.Package = "modeltime, forecast", Description = "Forecasts produced using exponential smoothing methods are weighted averages of past observations, with the weights decaying exponentially as the observations get older. Exponential smoothing models try to forecast the components of a time series which can be broken down in to error, trend, and seasonality. These components can be forecasted separately then either added or multiplied together to get the final forecast output.")) %>% @@ -41,8 +41,7 @@ reactable::reactable( rbind(data.frame(Model = "svm-rbf", Type = "multivariate, local, global, ensemble", Underlying.Package = "parsnip, kernlab", Description = "Uses a nonlinear function, specifically a radial basis function, to create a regression line of the target variable.")) %>% rbind(data.frame(Model = "tbats", Type = "univariate, local", Underlying.Package = "modeltime, forecast", Description = "A spin off of the traditional ets model (refer to ets), with some additional components to capture multiple seasonalities.")) %>% rbind(data.frame(Model = "theta", Type = "univariate, local", Underlying.Package = "modeltime, forecast", Description = "Theta is similar to exponential smoothing (refer to ets) but with another component called drift. Adding drift to exponential smoothing allows the forecast to increase or decrease over time, where the amount of change over time (called the drift) is set to be the average change seen within the historical data.")) %>% - rbind(data.frame(Model = "xgboost", Type = "multivariate, local, global, ensemble", Underlying.Package = "parsnip, xgboost", Description = "Builds many decision trees (similar to random forests), but predictions that are initially inaccurate are applied more weight in subsequent training rounds to increase accuracy across all predictions.")) - , + rbind(data.frame(Model = "xgboost", Type = "multivariate, local, global, ensemble", Underlying.Package = "parsnip, xgboost", Description = "Builds many decision trees (similar to random forests), but predictions that are initially inaccurate are applied more weight in subsequent training rounds to increase accuracy across all predictions.")), defaultColDef = colDef( header = function(value) gsub(".", " ", value, fixed = TRUE), cell = function(value) format(value, nsmall = 1), @@ -51,12 +50,11 @@ reactable::reactable( headerStyle = list(background = "#f7f7f8") ), columns = list( - Description = colDef(minWidth = 140, align = "left") # overrides the default + Description = colDef(minWidth = 140, align = "left") # overrides the default ), bordered = TRUE, highlight = TRUE ) - ``` ### Univariate vs Multivariate Models diff --git a/vignettes/parallel-processing.Rmd b/vignettes/parallel-processing.Rmd index 2a2b8da3..b9ecc616 100644 --- a/vignettes/parallel-processing.Rmd +++ b/vignettes/parallel-processing.Rmd @@ -49,8 +49,8 @@ hist_data <- timetk::m4_monthly %>% data_sdf <- sparklyr::copy_to(sc, hist_data, "data_sdf", overwrite = TRUE) run_info <- set_run_info( - experiment_name = "finn_fcst", - run_name = "spark_run_1", + experiment_name = "finn_fcst", + run_name = "spark_run_1", path = "/dbfs/mnt/example/folder" # important that you mount an ADLS path ) @@ -60,16 +60,18 @@ forecast_time_series( combo_variables = c("id"), target_variable = "value", date_type = "month", - forecast_horizon = 3, - parallel_processing = "spark", + forecast_horizon = 3, + parallel_processing = "spark", return_data = FALSE ) # return the outputs as a spark data frame -finn_output_tbl <- get_forecast_data(run_info = run_info, - return_type = "sdf") +finn_output_tbl <- get_forecast_data( + run_info = run_info, + return_type = "sdf" +) ``` The above example runs each time series on a separate core on a spark cluster. You can also submit multiple time series where each time series runs on a separate spark executor (VM) and then leverage all of the cores on that executor to run things like hyperparameter tuning or model refitting in parallel. This creates two levels of parallelization. One at the time series level, then another when doing things like hyperparameter tuning within a specific time series. To do that set `inner_parallel` to TRUE in `forecast_time_series()`. Also make sure that you adjust the number of spark executor cores to 1, that ensures that only 1 time series runs on an executor at a time. Leverage the "spark.executor.cores" argument when configuring your spark connection. This can be done using [sparklyr](https://spark.rstudio.com/guides/connections#:~:text=In%20sparklyr%2C%20Spark%20properties%20can%20be%20set%20by,customized%20as%20shown%20in%20the%20example%20code%20below.) or within the cluster manager itself within the Azure resource. Use the "num_cores" argument in the "forecast_time_series" function to control how many cores should be used within an executor when running things like hyperparameter tuning. -`forecast_time_series()` will be looking for a variable called "sc" to use when submitting tasks to the spark cluster, so make sure you use that as the variable name when connecting to spark. Also it's important that you mount your spark session to an Azure Data Lake Storage (ADLS) account, and provide the mounted path to where you'd like your Finn results to be written to within `set_run_info()`. \ No newline at end of file +`forecast_time_series()` will be looking for a variable called "sc" to use when submitting tasks to the spark cluster, so make sure you use that as the variable name when connecting to spark. Also it's important that you mount your spark session to an Azure Data Lake Storage (ADLS) account, and provide the mounted path to where you'd like your Finn results to be written to within `set_run_info()`.