diff --git a/docs/toolbox.generated/Fine_Tuning.ray_fine_tuning_job.rst b/docs/toolbox.generated/Fine_Tuning.ray_fine_tuning_job.rst
index 8aff7028c9..f6e9797d0e 100644
--- a/docs/toolbox.generated/Fine_Tuning.ray_fine_tuning_job.rst
+++ b/docs/toolbox.generated/Fine_Tuning.ray_fine_tuning_job.rst
@@ -92,8 +92,6 @@ Parameters
 
 * The number of GPUs to request for the fine-tuning job
 
-* default value: ``1``
-
 ``memory``
 
diff --git a/projects/fine_tuning/testing/config.yaml b/projects/fine_tuning/testing/config.yaml
index 086844dcb8..960a31ac2e 100644
--- a/projects/fine_tuning/testing/config.yaml
+++ b/projects/fine_tuning/testing/config.yaml
@@ -312,16 +312,26 @@ ci_presets:
     tests.fine_tuning.fms.enabled: false
     tests.fine_tuning.ray.enabled: true
     tests.capture_prom: false # not needed for the time being
-    tests.visualize: false # not needed for the time being
-    tests.capture_state: false # not needed for the time being
     tests.fine_tuning.test_settings.hyper_parameters: {}
+    matbench.lts.generate: false
+    tests.fine_tuning.test_settings.name: ray
 
   ray_bench:
     extends: [ray, no_model]
+    matbench.config_file: ray_benchmark.yaml
     tests.fine_tuning.ray.workload: ray-benchmark
+
     tests.fine_tuning.test_settings.hyper_parameters:
       num_samples: 10
 
+  ray_bench_scale:
+    extends: [ray_bench]
+    tests.fine_tuning.matbenchmarking.enabled: true
+    tests.fine_tuning.matbenchmarking.stop_on_error: false
+    tests.fine_tuning.test_settings.worker_replicas: [2, 8, 16, 32]
+    tests.fine_tuning.test_settings.hyper_parameters.num_samples: [20, 50, 100, 150]
+    tests.fine_tuning.test_settings.gpu: 0
+
   # ---
 
   cluster_ibm_dgx:
diff --git a/projects/fine_tuning/toolbox/fine_tuning.py b/projects/fine_tuning/toolbox/fine_tuning.py
index 6bda0233b8..d478b102b7 100644
--- a/projects/fine_tuning/toolbox/fine_tuning.py
+++ b/projects/fine_tuning/toolbox/fine_tuning.py
@@ -143,7 +143,7 @@ def ray_fine_tuning_job(
         dataset_prepare_cache_only=False,
         container_image="quay.io/rhoai/ray:2.35.0-py39-cu121-torch24-fa26",
         ray_version="2.35.0",
-        gpu=1,
+        gpu=0,
         memory=10,
         cpu=1,
         request_equals_limits=False,
diff --git a/projects/fine_tuning/toolbox/fine_tuning_ray_fine_tuning_job/defaults/main/config.yml b/projects/fine_tuning/toolbox/fine_tuning_ray_fine_tuning_job/defaults/main/config.yml
index 4d0ee0c0a3..ac72a038bf 100644
--- a/projects/fine_tuning/toolbox/fine_tuning_ray_fine_tuning_job/defaults/main/config.yml
+++ b/projects/fine_tuning/toolbox/fine_tuning_ray_fine_tuning_job/defaults/main/config.yml
@@ -42,7 +42,7 @@ fine_tuning_ray_fine_tuning_job_container_image: quay.io/rhoai/ray:2.35.0-py39-c
 fine_tuning_ray_fine_tuning_job_ray_version: 2.35.0
 
 # the number of GPUs to request for the fine-tuning job
-fine_tuning_ray_fine_tuning_job_gpu: 1
+fine_tuning_ray_fine_tuning_job_gpu: 0
 
 # the number of RAM gigs to request for to the fine-tuning job (in Gigs)
 fine_tuning_ray_fine_tuning_job_memory: 10
diff --git a/projects/fine_tuning/visualizations/fine_tuning/data/ray_benchmark.yaml b/projects/fine_tuning/visualizations/fine_tuning/data/ray_benchmark.yaml
new file mode 100644
index 0000000000..f7ad08d7d3
--- /dev/null
+++ b/projects/fine_tuning/visualizations/fine_tuning/data/ray_benchmark.yaml
@@ -0,0 +1,8 @@
+visualize:
+- id: scale_test
+  generate:
+  - "report: Error report"
+  #- "report: Ray Benchmark Progress"
+  #- "report: Ray Benchmark Summary"
+  - "report: LTS Documentation"
+  - "report: KPI Table Report"
diff --git a/projects/fine_tuning/visualizations/fine_tuning/plotting/__init__.py b/projects/fine_tuning/visualizations/fine_tuning/plotting/__init__.py
index e66d959bf3..ed53bbaba8 100644
--- a/projects/fine_tuning/visualizations/fine_tuning/plotting/__init__.py
+++ b/projects/fine_tuning/visualizations/fine_tuning/plotting/__init__.py
@@ -1,5 +1,7 @@
 from . import error_report
 from . import sfttrainer
+#from . import ray_benchmark
+
 import projects.matrix_benchmarking.visualizations.helpers.plotting.lts_documentation as lts_documentation
 import projects.matrix_benchmarking.visualizations.helpers.plotting.kpi_table as kpi_table
 
@@ -9,3 +11,4 @@ def register():
     lts_documentation.register()
     sfttrainer.register()
     kpi_table.register()
+    #ray_benchmark.register()
diff --git a/projects/fine_tuning/visualizations/fine_tuning/plotting/error_report.py b/projects/fine_tuning/visualizations/fine_tuning/plotting/error_report.py
index 62dbf9f5aa..137f9df46a 100644
--- a/projects/fine_tuning/visualizations/fine_tuning/plotting/error_report.py
+++ b/projects/fine_tuning/visualizations/fine_tuning/plotting/error_report.py
@@ -70,8 +70,8 @@ def _get_test_setup(entry):
     setup_info += [html.Li([f"Test UUID:", html.Code(entry.results.test_uuid, style={"white-space": "pre-wrap"})])]
 
-    setup_info += [html.Li([f"Job configuration:",
-                            html.A(html.Code("config_final.json"), href=artifacts_basedir / entry.results.locations.tuning_config_file, target="_blank"),
+    setup_info += [html.Li([f"Workload configuration:",
+                            html.A(html.Code("config_final.json"), href=artifacts_basedir / entry.results.locations.workload_config_file, target="_blank"),
                             html.Code(yaml.dump(entry.results.job_config), style={"white-space": "pre-wrap"})])]
 
     setup_info += [html.Li([f"Job execution"])]
 
@@ -82,9 +82,15 @@
     if entry.results.finish_reason.message:
         exec_info += [html.Li([f"Exit message:", html.Code(entry.results.finish_reason.message, style={"white-space": "pre-wrap"})])]
 
-    metrics = yaml.safe_load(json.dumps(entry.results.sfttrainer_metrics, default=functools.partial(json_dumper, strict=False)))
-    if metrics.get("progress") or metrics.get("summary"):
-        exec_info += [html.Li([f"Fine-tuning metrics:", html.Code(yaml.dump(metrics), style={"white-space": "pre-wrap"})])]
+    if entry.results.locations.has_fms:
+        metrics = yaml.safe_load(json.dumps(entry.results.sfttrainer_metrics, default=functools.partial(json_dumper, strict=False)))
+        if metrics.get("progress") or metrics.get("summary"):
+            exec_info += [html.Li([f"Fine-tuning metrics:", html.Code(yaml.dump(metrics), style={"white-space": "pre-wrap"})])]
+
+    elif entry.results.locations.has_ray:
+        metrics = yaml.safe_load(json.dumps(entry.results.ray_metrics, default=functools.partial(json_dumper, strict=False)))
+        if metrics.get("progress") or metrics.get("summary"):
+            exec_info += [html.Li([f"Fine-tuning metrics:", html.Code(yaml.dump(metrics), style={"white-space": "pre-wrap"})])]
 
     if entry.results.locations.job_logs:
         exec_info += [html.Li(html.A("Job logs", href=artifacts_basedir / entry.results.locations.job_logs, target="_blank"))]
diff --git a/projects/fine_tuning/visualizations/fine_tuning/plotting/ray_benchmark.py b/projects/fine_tuning/visualizations/fine_tuning/plotting/ray_benchmark.py
new file mode 100644
index 0000000000..235c4438c4
--- /dev/null
+++ b/projects/fine_tuning/visualizations/fine_tuning/plotting/ray_benchmark.py
@@ -0,0 +1,300 @@
+from collections import defaultdict
+import re
+import logging
+import datetime
+import math
+import copy
+import numbers
+import numpy
+import statistics as stats
+
+import plotly.subplots
+import plotly.graph_objs as go
+import pandas as pd
+import plotly.express as px
+from dash import html
+
+import matrix_benchmarking.plotting.table_stats as table_stats
+import matrix_benchmarking.common as common
+
+from . import error_report, report
+
+def register():
+    RayBenchmarkSummary()
+    RayBenchmarkProgress()
+
+
+def generateRaySummaryData(entries, x_key, _variables, summary_key, compute_speedup=False, filter_key=None, filter_value=None, y_lower_better=True):
+    data = []
+
+    variables = [v for v in _variables if v != x_key]
+    if not variables and x_key != "gpu" and x_key is not None:
+        variables += [x_key]
+
+
+    for entry in entries:
+        if filter_key is not None and entry.get_settings()[filter_key] != filter_value:
+            continue
+
+        datum = dict()
+        if x_key == "gpu":
+            datum[x_key] = entry.results.allocated_resources.gpu
+        elif x_key is None:
+            datum[x_key] = "Single entry"
+        else:
+            datum[x_key] = entry.settings.__dict__[x_key]
+
+        datum[summary_key] = getattr(entry.results.sfttrainer_metrics.summary, summary_key, None)
+
+        datum["name"] = entry.get_name(variables).replace("hyper_parameters.", "")
+        datum["text"] = "{:.2f}".format(datum[summary_key]) if datum[summary_key] is not None else "None"
+        datum["is_computed"] = False
+
+        data.append(datum)
+
+    if not compute_speedup:
+        return data, None
+
+    ref = None
+    for datum in data:
+        if datum[x_key] == 1:
+            ref = datum[summary_key]
+
+    if not ref:
+        return data, None
+
+    for src_datum in data[:]:
+
+        perfect_datum = src_datum.copy()
+        perfect_datum["is_computed"] = True
+        perfect_datum["name"] = (perfect_datum["name"] + " perfect scaling").strip()
+        perfect_datum[summary_key] = value = ref / src_datum[x_key] \
+            if y_lower_better else ref * src_datum[x_key]
+
+        if src_datum[x_key] != 1:
+            speedup = ref / src_datum[summary_key]
+            efficiency = speedup / src_datum[x_key]
+            perfect_datum["text"] = f"{value:.2f}<br>speedup: {speedup:.1f}<br>efficiency: {efficiency:.2f}"
+
+        data.append(perfect_datum)
+
+        if not src_datum["name"]:
+            src_datum["name"] = summary_key
+
+    return data, ref
+
+
+class RayBenchmarkSummary():
+    def __init__(self):
+        self.name = "Ray Benchmark Summary"
+        self.id_name = self.name
+
+        table_stats.TableStats._register_stat(self)
+        common.Matrix.settings["stats"].add(self.name)
+
+    def do_hover(self, meta_value, variables, figure, data, click_info):
+        return "nothing"
+
+    def do_plot(self, ordered_vars, settings, setting_lists, variables, cfg):
+        cfg__summary_key = cfg.get("summary_key", False)
+
+        cfg__filter_key = cfg.get("filter_key", None)
+        cfg__filter_value = cfg.get("filter_value", False)
+        cfg__x_key = cfg.get("x_key", None)
+
+        from ..store import parsers
+        summary_key_properties = parsers.SFT_TRAINER_SUMMARY_KEYS[cfg__summary_key]
+        y_lower_better = summary_key_properties.lower_better
+
+        if not cfg__summary_key:
+            raise ValueError("'summary_key' is a mandatory parameter ...")
+
+        entries = common.Matrix.all_records(settings, setting_lists)
+
+        has_gpu = "gpu" in ordered_vars and cfg__filter_key != "gpu"
+
+        x_key = cfg__x_key
+        if x_key is None:
+            if has_gpu:
+                x_key = "gpu"
+            elif "model_name" in ordered_vars:
+                x_key = "model_name"
+            elif ordered_vars:
+                x_key = ordered_vars[0]
+            else:
+                x_key = None
+
+        compute_speedup = has_gpu
+
+        data, has_speedup = generateRaySummaryData(entries, x_key, variables, cfg__summary_key, compute_speedup, cfg__filter_key, cfg__filter_value, y_lower_better)
+        df = pd.DataFrame(data)
+
+        if df.empty:
+            return None, "No data available ..."
+
+        if x_key is not None:
+            df = df.sort_values(by=[x_key], ascending=False)
+
+        y_key = cfg__summary_key
+
+        if has_gpu or has_speedup:
+            do_line_plot = True
+
+        elif len(variables) == 1:
+            do_line_plot = all(isinstance(v, numbers.Number) for v in list(variables.values())[0])
+        elif x_key is None:
+            do_line_plot = False
+        elif x_key.startswith("hyper_parameters."):
+            do_line_plot = True
+        else:
+            do_line_plot = False
+
+        text = None if len(variables) > 3 else "text"
+        if do_line_plot:
+            color = None if (len(variables) == 1 and not has_speedup) else "name"
+            fig = px.line(df, hover_data=df.columns, x=x_key, y=y_key, color=color, text=text)
+
+            for i in range(len(fig.data)):
+                fig.data[i].update(mode='lines+markers+text')
+            fig.update_yaxes(rangemode='tozero')
+
+            fig.update_traces(textposition='top center')
+
+        else:
+            if x_key is not None:
+                df = df.sort_values(by=["name", x_key], ascending=True)
+            color = None if (len(variables) == 1) else "name"
+            fig = px.bar(df, hover_data=df.columns, x=x_key, y=y_key, color=color, barmode='group', text=text)
+
+        if has_gpu:
+            fig.update_xaxes(title="Number of GPUs used for the fine-tuning")
+        else:
+            fig.update_xaxes(title=x_key)
+
+        y_title = getattr(summary_key_properties, "title", "speed")
+        y_units = summary_key_properties.units
+        x_name = (x_key or "single expe").replace("hyper_parameters.", "")
+
+        y_lower_better = summary_key_properties.lower_better
+        what = f", in {y_units}"
+
+        y_title = f"Fine-tuning {y_title}{what}. "
+        title = y_title + "<br>"+("Lower is better" if y_lower_better else "Higher is better")
+
+        if cfg__filter_key == "gpu":
+            gpu_count = cfg__filter_value
+            title += f". {gpu_count} GPU{'s' if gpu_count > 1 else ''}."
+
+        fig.update_yaxes(title=("❮ " if y_lower_better else "") + y_title + (" ❯" if not y_lower_better else ""))
+        fig.update_layout(title=title, title_x=0.5,)
+        fig.update_layout(legend_title_text="Configuration")
+
+        fig.update_xaxes(title=x_name)
+        # ❯ or ❮
+
+        msg = []
+
+        values_df = df[y_key][df["is_computed"] != True]
+
+        min_row_idx = values_df.idxmin()
+        max_row_idx = values_df.idxmax()
+
+        if any(map(numpy.isnan, [min_row_idx, max_row_idx])):
+            return fig, ["Max or Min is NaN"]
+
+        min_count = values_df[min_row_idx]
+        max_count = values_df[max_row_idx]
+
+        if has_gpu:
+            min_name = f"{min_count} GPU" + ("s" if min_count > 1 else "")
+            max_name = f"{max_count} GPU" + ("s" if max_count > 1 else "")
+        else:
+            min_name = min_count
+            max_name = max_count
+
+        if len(data) > 1:
+            if y_lower_better:
+                fastest = df[y_key][min_row_idx]
+                slowest = df[y_key][max_row_idx]
+            else:
+                fastest = df[y_key][max_row_idx]
+                slowest = df[y_key][min_row_idx]
+
+            slower = (fastest-slowest)/fastest
+            faster = (fastest-slowest)/slowest
+            msg.append(f"Fastest: {fastest:.2f} {y_units} ({abs(faster)*100:.0f}% faster, best)")
+            msg.append(html.Br())
+            msg.append(f"Slowest: {slowest:.2f} {y_units} ({abs(slower)*100:.0f}% slower)")
+
+        return fig, msg
+
+
+def generateRayProgressData(entries, x_key, variables, progress_key):
+    data = []
+
+    for entry in entries:
+        progress_entries = entry.results.sfttrainer_metrics.progress
+        entry_name = entry.get_name(variables)
+
+        for progress in progress_entries:
+            datum = dict()
+            datum[x_key] = getattr(progress, x_key)
+            datum[progress_key] = getattr(progress, progress_key, None)
+            datum["name"] = entry_name
+            data.append(datum)
+
+    return data
+
+
+class RayBenchmarkProgress():
+    def __init__(self):
+        self.name = "Ray Benchmark Progress"
+        self.id_name = self.name
+
+        table_stats.TableStats._register_stat(self)
+        common.Matrix.settings["stats"].add(self.name)
+
+    def do_hover(self, meta_value, variables, figure, data, click_info):
+        return "nothing"
+
+    def do_plot(self, ordered_vars, settings, setting_lists, variables, cfg):
+        cfg__progress_key = cfg.get("progress_key", False)
+
+        if not cfg__progress_key:
+            raise ValueError("'progress_key' is a mandatory parameter ...")
+
+        from ..store import parsers
+        progress_key_properties = parsers.SFT_TRAINER_PROGRESS_KEYS[cfg__progress_key]
+
+        entries = common.Matrix.all_records(settings, setting_lists)
+
+        x_key = "epoch"
+
+        data = generateRayProgressData(entries, x_key, variables, cfg__progress_key)
+        df = pd.DataFrame(data)
+
+        if df.empty:
+            return None, "No data available ..."
+
+        df = df.sort_values(by=[x_key], ascending=False)
+
+        y_key = cfg__progress_key
+        y_lower_better = progress_key_properties.lower_better
+
+        fig = px.line(df, hover_data=df.columns, x=x_key, y=y_key, color="name")
+
+        for i in range(len(fig.data)):
+            fig.data[i].update(mode='lines+markers+text')
+        fig.update_yaxes(rangemode='tozero')
+
+        fig.update_xaxes(title="epochs")
+
+        y_title = f"Training {y_key}. "
+        title = f"Fine-tuning '{y_key}' progress over the training {x_key}s"
+        title += "<br>"+("Lower is better" if y_lower_better else "Higher is better")
+        y_title += ("Lower is better" if y_lower_better else "Higher is better")
+        fig.update_yaxes(title=("❮ " if y_lower_better else "") + y_title + (" ❯" if not y_lower_better else ""))
+        fig.update_layout(title=title, title_x=0.5)
+        fig.update_layout(legend_title_text="Configuration")
+
+        return fig, ""
diff --git a/projects/fine_tuning/visualizations/fine_tuning/store/lts_parser.py b/projects/fine_tuning/visualizations/fine_tuning/store/lts_parser.py
index 89f91022df..a6a354e7d9 100644
--- a/projects/fine_tuning/visualizations/fine_tuning/store/lts_parser.py
+++ b/projects/fine_tuning/visualizations/fine_tuning/store/lts_parser.py
@@ -36,6 +36,9 @@ def generate_lts_metadata(results, import_settings):
 def generate_lts_results(results):
     results_lts = types.SimpleNamespace()
 
+    if not results.locations.has_fms:
+        return results_lts
+
     if not results.sfttrainer_metrics.summary or not results.sfttrainer_metrics.summary.__dict__:
         return results_lts
 
@@ -78,11 +81,12 @@ def generate_lts_settings(lts_metadata, results, import_settings):
     lts_settings.ocp_version = results.ocp_version
     lts_settings.rhoai_version = results.rhods_info.full_version
+    lts_settings.container_image = results.job_config["container_image"].split("/")[-1]
 
     lts_settings.instance_type = results.test_config.get("clusters.sutest.compute.machineset.type")
 
     lts_settings.model_name = results.job_config["model_name"]
-    lts_settings.tuning_method = results.tuning_config.get("peft_method", "none")
+    lts_settings.tuning_method = results.workload_config.get("peft_method", "none")
     if lts_settings.tuning_method in ("none" , None):
         lts_settings.tuning_method = "full"
 
@@ -94,17 +98,18 @@
     lts_settings.replicas = replicas
     lts_settings.accelerators_per_replica = accelerators_per_replica
     lts_settings.accelerator_count = replicas * accelerators_per_replica
-    lts_settings.per_device_train_batch_size = results.tuning_config["per_device_train_batch_size"]
-    lts_settings.batch_size = results.tuning_config["per_device_train_batch_size"] * lts_settings.accelerator_count
-    lts_settings.max_seq_length = results.tuning_config["max_seq_length"]
-
-    lts_settings.lora_rank = results.tuning_config.get("r")
-    lts_settings.lora_alpha = results.tuning_config.get("lora_alpha")
-    lts_settings.lora_dropout = results.tuning_config.get("lora_dropout")
-    lts_settings.lora_modules = ", ".join(sorted(results.tuning_config.get("target_modules", []))) or None
-
-    lts_settings.dataset_name = results.job_config["dataset_name"]
-    lts_settings.dataset_replication = results.job_config["dataset_replication"]
+    if results.locations.has_fms:
+        lts_settings.per_device_train_batch_size = results.workload_config["per_device_train_batch_size"]
+        lts_settings.batch_size = results.workload_config["per_device_train_batch_size"] * lts_settings.accelerator_count
+        lts_settings.max_seq_length = results.workload_config["max_seq_length"]
+
+        lts_settings.lora_rank = results.workload_config.get("r")
+        lts_settings.lora_alpha = results.workload_config.get("lora_alpha")
+        lts_settings.lora_dropout = results.workload_config.get("lora_dropout")
+        lts_settings.lora_modules = ", ".join(sorted(results.workload_config.get("target_modules", []))) or None
+
+        lts_settings.dataset_name = results.job_config["dataset_name"]
+        lts_settings.dataset_replication = results.job_config["dataset_replication"]
 
     lts_settings.ci_engine = results.from_env.test.ci_engine
     lts_settings.run_id = results.from_env.test.run_id
diff --git a/projects/fine_tuning/visualizations/fine_tuning/store/parsers.py b/projects/fine_tuning/visualizations/fine_tuning/store/parsers.py
index 62227a0b6d..100c93fbd8 100644
--- a/projects/fine_tuning/visualizations/fine_tuning/store/parsers.py
+++ b/projects/fine_tuning/visualizations/fine_tuning/store/parsers.py
@@ -24,6 +24,7 @@
 artifact_dirnames = types.SimpleNamespace()
 artifact_dirnames.CLUSTER_CAPTURE_ENV_DIR = "*__cluster__capture_environment"
 artifact_dirnames.FINE_TUNING_RUN_FINE_TUNING_DIR = "*__fine_tuning__run_fine_tuning_job"
+artifact_dirnames.FINE_TUNING_RAY_FINE_TUNING_DIR = "*__fine_tuning__ray_fine_tuning_job"
 artifact_dirnames.RHODS_CAPTURE_STATE = "*__rhods__capture_state"
 
 artifact_paths = types.SimpleNamespace() # will be dynamically populated
@@ -33,10 +34,15 @@
     f"{artifact_dirnames.CLUSTER_CAPTURE_ENV_DIR}/_ansible.log",
     f"{artifact_dirnames.CLUSTER_CAPTURE_ENV_DIR}/nodes.json",
     f"{artifact_dirnames.CLUSTER_CAPTURE_ENV_DIR}/ocp_version.yml",
+    f"{artifact_dirnames.FINE_TUNING_RUN_FINE_TUNING_DIR}/src/config_final.json",
     f"{artifact_dirnames.FINE_TUNING_RUN_FINE_TUNING_DIR}/artifacts/pod.log",
     f"{artifact_dirnames.FINE_TUNING_RUN_FINE_TUNING_DIR}/artifacts/pod.json",
     f"{artifact_dirnames.FINE_TUNING_RUN_FINE_TUNING_DIR}/_ansible.play.yaml",
+
+    f"{artifact_dirnames.FINE_TUNING_RAY_FINE_TUNING_DIR}/src/config_final.json",
+    f"{artifact_dirnames.FINE_TUNING_RAY_FINE_TUNING_DIR}/artifacts/pod.log",
+
     f"{artifact_dirnames.RHODS_CAPTURE_STATE}/rhods.createdAt",
     f"{artifact_dirnames.RHODS_CAPTURE_STATE}/rhods.version",
 ]
@@ -62,12 +68,21 @@ def parse_once(results, dirname):
 
     results.test_start_end_time = _parse_start_end_time(dirname)
 
-    results.sfttrainer_metrics = _parse_sfttrainer_logs(dirname)
-    results.allocated_resources = _parse_allocated_resources(dirname)
-    results.finish_reason = _parse_finish_reason(dirname)
     results.locations = _prepare_file_locations(dirname)
-    results.job_config = _parse_job_config(dirname)
-    results.tuning_config = _parse_tuning_config(dirname, results.locations.tuning_config_file)
+
+    results.job_config = _parse_job_config(dirname, results.locations)
+
+    results.workload_config = _parse_workload_config(dirname, results.locations)
+
+    if results.locations.has_fms:
+        results.sfttrainer_metrics = _parse_fms_logs(dirname)
+        results.allocated_resources = _parse_fms_allocated_resources(dirname)
+        results.finish_reason = _parse_fms_finish_reason(dirname)
+
+    if results.locations.has_ray:
+        results.ray_metrics = _parse_ray_logs(dirname)
+        results.allocated_resources = _parse_ray_allocated_resources(dirname)
+        results.finish_reason = _parse_ray_finish_reason(dirname)
 
 
 @helpers_store_parsers.ignore_file_not_found
@@ -120,7 +135,7 @@ def _parse_start_end_time(dirname):
 
 
 @helpers_store_parsers.ignore_file_not_found
-def _parse_sfttrainer_logs(dirname):
+def _parse_fms_logs(dirname):
     sfttrainer_metrics = types.SimpleNamespace()
     sfttrainer_metrics.summary = types.SimpleNamespace()
     sfttrainer_metrics.progress = []
@@ -177,8 +192,9 @@ def parse_dataset_stats(data):
 
     return sfttrainer_metrics
 
+
 @helpers_store_parsers.ignore_file_not_found
-def _parse_allocated_resources(dirname):
+def _parse_fms_allocated_resources(dirname):
     allocated_resources = types.SimpleNamespace()
     with open(register_important_file(dirname, artifact_paths.FINE_TUNING_RUN_FINE_TUNING_DIR / "artifacts/pod.json")) as f:
         pod_def = json.load(f)
@@ -190,8 +206,14 @@
 
     return allocated_resources
 
+
+@helpers_store_parsers.ignore_file_not_found
+def _parse_ray_allocated_resources(dirname):
+    pass
+
+
 @helpers_store_parsers.ignore_file_not_found
-def _parse_finish_reason(dirname):
+def _parse_fms_finish_reason(dirname):
     finish_reason = types.SimpleNamespace()
     finish_reason.exit_code = None
     finish_reason.message = "Parsing did not complete"
@@ -216,41 +238,85 @@
     return finish_reason
 
 
+@helpers_store_parsers.ignore_file_not_found
+def _parse_ray_finish_reason(dirname):
+    finish_reason = types.SimpleNamespace()
+    finish_reason.exit_code = None
+    finish_reason.message = "_parse_ray_finish_reason: not implemented"
+
+    return finish_reason
+
+
 def _prepare_file_locations(dirname):
     locations = types.SimpleNamespace()
 
-    locations.job_logs = artifact_paths.FINE_TUNING_RUN_FINE_TUNING_DIR / "artifacts/pod.log"
+    locations.has_fms = artifact_paths.FINE_TUNING_RUN_FINE_TUNING_DIR is not None
+    locations.has_ray = artifact_paths.FINE_TUNING_RAY_FINE_TUNING_DIR is not None
+
+    if locations.has_fms:
+        locations.job_dir = artifact_paths.FINE_TUNING_RUN_FINE_TUNING_DIR
+        locations.job_logs = artifact_paths.FINE_TUNING_RUN_FINE_TUNING_DIR / "artifacts/pod.log"
+
+    elif locations.has_ray:
+        locations.job_dir = artifact_paths.FINE_TUNING_RAY_FINE_TUNING_DIR
+        locations.job_logs = artifact_paths.FINE_TUNING_RAY_FINE_TUNING_DIR / "artifacts/pod.log"
+    else:
+        logging.error("Couldn't find the FMS or Ray job directory ...")
+        locations.job_dir = None
+        locations.job_logs = None
+
     job_logs_file = register_important_file(dirname, locations.job_logs)
     if not job_logs_file.exists():
         locations.job_logs = None
         logging.info(f"Job log file {job_logs_file} does not exist ...")
 
-    locations.tuning_config_file = (job_logs_file.parent.parent / "src" / "config_final.json").relative_to(dirname)
+    locations.workload_config_file = locations.job_dir / "src" / "config_final.json"
 
     return locations
 
 
 @helpers_store_parsers.ignore_file_not_found
-def _parse_job_config(dirname):
+def _parse_job_config(dirname, locations):
     job_config = {}
 
-    PREFIX = "fine_tuning_run_fine_tuning_job_"
+    if locations.has_fms:
+        prefix = "fine_tuning_run_fine_tuning_job_"
+    elif locations.has_ray:
+        prefix = "fine_tuning_ray_fine_tuning_job_"
 
-    with open(register_important_file(dirname, artifact_paths.FINE_TUNING_RUN_FINE_TUNING_DIR / "_ansible.play.yaml")) as f:
+    with open(register_important_file(dirname, locations.job_dir / "_ansible.play.yaml")) as f:
         ansible_play = yaml.safe_load(f)
 
     for k, v in ansible_play[0]["vars"].items():
-        if not k.startswith(PREFIX): continue
+        if not k.startswith(prefix): continue
 
-        job_config[k.replace(PREFIX, "")] = v
+        job_config[k.replace(prefix, "")] = v
 
     return job_config
 
 
 @helpers_store_parsers.ignore_file_not_found
-def _parse_tuning_config(dirname, tuning_config_file_location):
-    with open(register_important_file(dirname, tuning_config_file_location)) as f:
-        tuning_config = json.load(f)
+def _parse_workload_config(dirname, locations):
+    with open(register_important_file(dirname, locations.workload_config_file)) as f:
+        workload_config = json.load(f)
+
+    return workload_config
+
+
+@helpers_store_parsers.ignore_file_not_found
+def _parse_ray_logs(dirname):
+    ray_metrics = types.SimpleNamespace()
+    ray_metrics.summary = types.SimpleNamespace()
+    ray_metrics.summary.time = None
+
+    ray_metrics.progress = []
+
+    with open(register_important_file(dirname, artifact_paths.FINE_TUNING_RAY_FINE_TUNING_DIR / "artifacts/job_pod.log")) as f:
+        for line in f.readlines():
+            if not line.startswith("---"):
+                continue
+            ray_metrics.summary.time = float(line.strip().split("::: ")[-1].split()[0])
+            break
 
-    return tuning_config
+    return ray_metrics
diff --git a/projects/matrix_benchmarking/visualizations/helpers/store/parsers.py b/projects/matrix_benchmarking/visualizations/helpers/store/parsers.py
index ca166dd7d1..8b5f45366c 100644
--- a/projects/matrix_benchmarking/visualizations/helpers/store/parsers.py
+++ b/projects/matrix_benchmarking/visualizations/helpers/store/parsers.py
@@ -255,6 +255,13 @@ def extract_cluster_info(nodes_info):
 def parse_rhods_info(dirname, capture_state_dir, version_name=None):
     rhods_info = types.SimpleNamespace()
 
+    if capture_state_dir is None:
+        logging.error("parse_rhods_info: `capture_state_dir` not available, returning dummy values :/")
+        rhods_info.version = "not available"
+        rhods_info.createdAt_raw = "not available"
+        rhods_info.full_version = "0.0.0"
+        return rhods_info
+
     with open(register_important_file(dirname, capture_state_dir / "rhods.version")) as f:
         rhods_info.version = f.read().strip()