diff --git a/components/google-cloud/google_cloud_pipeline_components/_implementation/llm/arbiter_preprocess.py b/components/google-cloud/google_cloud_pipeline_components/_implementation/llm/arbiter_preprocess.py index 344e8fe6dbd..1f226a42a51 100644 --- a/components/google-cloud/google_cloud_pipeline_components/_implementation/llm/arbiter_preprocess.py +++ b/components/google-cloud/google_cloud_pipeline_components/_implementation/llm/arbiter_preprocess.py @@ -56,7 +56,7 @@ def arbiter_preprocess( and responses. id_columns: The columns which distinguish unique evaluation examples. response_column_a: The column containing responses for model a. - response_column_b: The column containing responses for model a. + response_column_b: The column containing responses for model b. task: Task to evaluate. output_path: Path to write the path where preprocessed predictions are stored. diff --git a/components/google-cloud/google_cloud_pipeline_components/_implementation/llm/autosxs_metrics_computer.py b/components/google-cloud/google_cloud_pipeline_components/_implementation/llm/autosxs_metrics_computer.py index ede9a816f9e..f7bd53d9b77 100644 --- a/components/google-cloud/google_cloud_pipeline_components/_implementation/llm/autosxs_metrics_computer.py +++ b/components/google-cloud/google_cloud_pipeline_components/_implementation/llm/autosxs_metrics_computer.py @@ -31,16 +31,16 @@ def _resolve_image() -> str: @dsl.container_component def autosxs_metrics_computer( judgments_dir: str, - has_human_preference: bool, autosxs_metrics: dsl.Output[dsl.Metrics], # pylint: disable=unused-argument # pytype: disable=unsupported-operands gcp_resources: dsl.OutputPath(str), # pytype: disable=invalid-annotation + human_preference_column: str = '', ) -> dsl.ContainerSpec: # pylint: disable=g-doc-args """Compute AutoSXS metrics using judgments outputs from Arbiter. Args: judgments_dir: Path where store the Judgments. - has_human_preference: Boolean value. True if users provided human preference - data, otherwise false. + human_preference_column: The column containing ground truths. The default + value is an empty string if not provided by users. Returns: autosxs_metrics: Autosxs win rate metrics and human alignment metrics. @@ -58,7 +58,7 @@ def autosxs_metrics_computer( '--', # Used to mark the start of component flags. 'autosxs_metrics', f'--judgments_dir={judgments_dir}', - f'--has_human_preference={has_human_preference}', + f'--human_preference_column={human_preference_column}', '--executor_input={{$.json_escape[1]}}', ], ), diff --git a/components/google-cloud/google_cloud_pipeline_components/_implementation/llm/batch_prediction_sxs.py b/components/google-cloud/google_cloud_pipeline_components/_implementation/llm/batch_prediction_sxs.py new file mode 100644 index 00000000000..c5839a6e1fb --- /dev/null +++ b/components/google-cloud/google_cloud_pipeline_components/_implementation/llm/batch_prediction_sxs.py @@ -0,0 +1,149 @@ +# Copyright 2024 The Kubeflow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +"""Component for running LLM Batch Prediction jobs side-by-side.""" + +import os +from typing import Any, Dict, List + +from google_cloud_pipeline_components import _placeholders +from google_cloud_pipeline_components import utils as gcpc_utils +from google_cloud_pipeline_components._implementation.llm import utils +from kfp import dsl + + +def _resolve_image() -> str: + """Determines the image URI to create a container from.""" + return os.environ.get( + 'AUTOSXS_IMAGE_OVERRIDE' + ) or utils.get_default_image_uri('autosxs') + + +# pylint: disable=unused-argument,dangerous-default-value +@dsl.container_component +def batch_prediction_sxs( + display_name: str, + evaluation_dataset: str, + id_columns: List[str], + task: str, + autorater_prompt_parameters: Dict[str, Dict[str, str]], + response_column_a: str, + response_column_b: str, + preprocessed_evaluation_dataset: dsl.Output[dsl.Dataset], # pylint: disable=unused-argument # pytype: disable=unsupported-operands + preprocessed_evaluation_dataset_uri: dsl.OutputPath(str), # pylint: disable=unused-argument # pytype: disable=invalid-annotation + gcp_resources: dsl.OutputPath(str), # pytype: disable=invalid-annotation + metadata: dsl.OutputPath(Dict[str, Any]), # pytype: disable=invalid-annotation + model_a: str = '', + model_b: str = '', + model_a_prompt_parameters: Dict[str, Dict[str, str]] = {}, + model_b_prompt_parameters: Dict[str, Dict[str, str]] = {}, + model_a_parameters: Dict[str, str] = {}, + model_b_parameters: Dict[str, str] = {}, + human_preference_column: str = '', +) -> dsl.ContainerSpec: # pylint: disable=g-doc-args """Runs up to two LLM Batch Prediction jobs side-by-side. + + Args: + display_name: Display name for the batch prediction job. + evaluation_dataset: GCS or BigQuery URIs representing a dataset of prompts + and responses. + id_columns: The columns which distinguish unique evaluation examples. + task: Task to evaluate. + autorater_prompt_parameters: Map of autorater prompt template parameters to + columns or templates. + response_column_a: The column containing responses for model a. + response_column_b: The column containing responses for model b. + model_a: A fully-qualified model resource name + (`projects/{project}/locations/{location}/models/{model}@{version}`) or + publisher model resource name (`publishers/{publisher}/models/{model}`). + This parameter is optional if Model A responses are specified. + model_b: A fully-qualified model resource name + (`projects/{project}/locations/{location}/models/{model}@{version}`) or + publisher model resource name (`publishers/{publisher}/models/{model}`). + This parameter is optional if Model B responses are specified. + model_a_prompt_parameters: Map of model A prompt template parameters to + columns or templates. + model_b_prompt_parameters: Map of model B prompt template parameters to + columns or templates. + model_a_parameters: The parameters that govern the predictions from model A, + such as temperature or maximum output tokens. + model_b_parameters: The parameters that govern the predictions from model B, + such as temperature or maximum output tokens. + human_preference_column: The column containing ground truths. The default + value is an empty string if not provided by users. + + Returns: + preprocessed_evaluation_dataset: Dataset of the table containing the inputs + expected by the Arbiter. 
+ preprocessed_evaluation_dataset_uri: URI of the table containing the inputs + expected by the Arbiter. + gcp_resources: Tracker for GCP resources created by this component. + metadata_path: Path to write the object that stores computed metrics + metadata for the task preprocess component. + """ + return gcpc_utils.build_serverless_customjob_container_spec( + project=_placeholders.PROJECT_ID_PLACEHOLDER, + location=_placeholders.LOCATION_PLACEHOLDER, + custom_job_payload=utils.build_payload( + display_name='batch_prediction_sxs', + machine_type='n1-standard-4', + image_uri=_resolve_image(), + args=[ + '--', # Used to mark the start of component flags. + 'batch_prediction_sxs', + f'--display_name={display_name}', + f'--evaluation_dataset={evaluation_dataset}', + ( + '--id_columns=' + "{{$.inputs.parameters['id_columns'].json_escape[0]}}" + ), + f'--task={task}', + f'--project={_placeholders.PROJECT_ID_PLACEHOLDER}', + f'--location={_placeholders.LOCATION_PLACEHOLDER}', + f'--model_a={model_a}', + f'--model_b={model_b}', + ( + '--model_a_prompt_parameters=' + "{{$.inputs.parameters['model_a_prompt_parameters']" + '.json_escape[0]}}' + ), + ( + '--model_b_prompt_parameters=' + "{{$.inputs.parameters['model_b_prompt_parameters']" + '.json_escape[0]}}' + ), + ( + '--autorater_prompt_parameters=' + "{{$.inputs.parameters['autorater_prompt_parameters']" + '.json_escape[0]}}' + ), + f'--response_column_a={response_column_a}', + f'--response_column_b={response_column_b}', + ( + '--model_a_parameters=' + "{{$.inputs.parameters['model_a_parameters'].json_escape[0]}}" + ), + ( + '--model_b_parameters=' + "{{$.inputs.parameters['model_b_parameters'].json_escape[0]}}" + ), + f'--human_preference_column={human_preference_column}', + f'--staging_dir={dsl.PIPELINE_ROOT_PLACEHOLDER}', + f'--preprocessed_evaluation_dataset_uri={preprocessed_evaluation_dataset_uri}', + f'--metadata_path={metadata}', + f'--gcp_resources_path={gcp_resources}', + '--executor_input={{$.json_escape[1]}}', + ], + ), + gcp_resources=gcp_resources, + ) diff --git a/components/google-cloud/google_cloud_pipeline_components/_implementation/llm/env.py b/components/google-cloud/google_cloud_pipeline_components/_implementation/llm/env.py index b975c6871b7..e20fa2126e5 100644 --- a/components/google-cloud/google_cloud_pipeline_components/_implementation/llm/env.py +++ b/components/google-cloud/google_cloud_pipeline_components/_implementation/llm/env.py @@ -14,7 +14,8 @@ """A collection of constants shared across components and pipelines.""" import os -_DEFAULT_AUTOSXS_IMAGE_TAG = '20240116_0507_RC00' +_DEFAULT_AUTOSXS_IMAGE_TAG = '20240123_0507_RC00' + def get_private_image_tag() -> str: return os.getenv('PRIVATE_IMAGE_TAG') or '20240124_0507_RC00' diff --git a/components/google-cloud/google_cloud_pipeline_components/preview/model_evaluation/model_based_llm_evaluation/autosxs/autosxs_pipeline.py b/components/google-cloud/google_cloud_pipeline_components/preview/model_evaluation/model_based_llm_evaluation/autosxs/autosxs_pipeline.py index 04bc0eab5ed..98a7f49b472 100644 --- a/components/google-cloud/google_cloud_pipeline_components/preview/model_evaluation/model_based_llm_evaluation/autosxs/autosxs_pipeline.py +++ b/components/google-cloud/google_cloud_pipeline_components/preview/model_evaluation/model_based_llm_evaluation/autosxs/autosxs_pipeline.py @@ -16,73 +16,12 @@ from typing import Any, Dict, List from google_cloud_pipeline_components import _placeholders -from google_cloud_pipeline_components._implementation.llm import 
arbiter_preprocess from google_cloud_pipeline_components._implementation.llm import autosxs_arbiter from google_cloud_pipeline_components._implementation.llm import autosxs_metrics_computer -from google_cloud_pipeline_components._implementation.llm import function_based -from google_cloud_pipeline_components._implementation.llm import task_preprocess -from google_cloud_pipeline_components.types import artifact_types -from google_cloud_pipeline_components.v1 import batch_predict_job +from google_cloud_pipeline_components._implementation.llm import batch_prediction_sxs from kfp import dsl -# pylint: disable=no-value-for-parameter -@dsl.pipeline( - name='predictions-pipeline', - description='Runs the prediction pipeline for one of the two SxS models.', -) -def _get_predictions( - name: str, - project: str, - location: str, - model: str, - model_parameters: Dict[str, str], - prediction_inputs: List[str], - is_model_inference: bool, -) -> str: - """Makes predictions for a given model.""" - with dsl.If(is_model_inference == True, name='Inference Required'): # pylint: disable=singleton-comparison - get_vertex_model_task = dsl.importer( - artifact_uri=( - f'https://{location}-aiplatform.googleapis.com/v1/{model}' - ), - artifact_class=artifact_types.VertexModel, - metadata={'resourceName': model}, - ).set_display_name('Import Vertex Model Artifact') - - batch_predict_task = batch_predict_job.ModelBatchPredictOp( - project=project, - location=location, - model=get_vertex_model_task.outputs['artifact'], - job_display_name=( - f'autosxs-{name}-{{$.pipeline_job_uuid}}-{{$.pipeline_task_uuid}}' - ), - gcs_source_uris=prediction_inputs, - instances_format='jsonl', - predictions_format='jsonl', - gcs_destination_output_uri_prefix=( - f'{dsl.PIPELINE_ROOT_PLACEHOLDER}/{dsl.PIPELINE_TASK_ID_PLACEHOLDER}' - f'/{name}_predictions' - ), - model_parameters=model_parameters, - ) - prediction_uris_from_inference = function_based.get_uri( - artifact=batch_predict_task.outputs['gcs_output_directory'], - is_dir=True, - ) - - with dsl.Else(name='Responses Provided'): # pylint: disable=singleton-comparison - prediction_uris_inference_provided = function_based.get_empty_string() - - prediction_uris = dsl.OneOf( - prediction_uris_from_inference.output, - prediction_uris_inference_provided.output, - ) - - # We can't directly output dsl.OneOf, so we need to use identity. - return function_based.identity(x=prediction_uris).output - - # pylint: disable=dangerous-default-value,g-bare-generic,unused-argument @dsl.pipeline( name='autosxs-template', @@ -132,72 +71,24 @@ def autosxs_pipeline( experimental_args: Experimentally released arguments. Subject to change. 
""" # fmt: on - prediction_inputs_a = task_preprocess.task_preprocess( + arbiter_input = batch_prediction_sxs.batch_prediction_sxs( + display_name='autosxs-{{$.pipeline_job_uuid}}-{{$.pipeline_task_uuid}}', evaluation_dataset=evaluation_dataset, - task=task, - model_prompt_parameters=model_a_prompt_parameters, - response_column=response_column_a, - human_preference_column=human_preference_column, id_columns=id_columns, - ).set_display_name('Preprocess Model A Inputs') - - prediction_inputs_b = task_preprocess.task_preprocess( - evaluation_dataset=evaluation_dataset, task=task, - model_prompt_parameters=model_b_prompt_parameters, - response_column=response_column_b, - human_preference_column=human_preference_column, - id_columns=id_columns, - ).set_display_name('Preprocess Model B Inputs') - - is_model_a_inference = function_based.get_usage_metric( - metadata=prediction_inputs_a.outputs['metadata'], - key='is_model_inference', - ).set_display_name('Read is_model_a_inference') - - is_model_b_inference = function_based.get_usage_metric( - metadata=prediction_inputs_b.outputs['metadata'], - key='is_model_inference', - ).set_display_name('Read is_model_b_inference') - - inferrer_a = _get_predictions( - name='A', - project=project, - location=location, - model=model_a, - model_parameters=model_a_parameters, - prediction_inputs=prediction_inputs_a.outputs['prediction_inputs'], - is_model_inference=is_model_a_inference.output, - ).set_display_name('Model A Responses') - - inferrer_b = _get_predictions( - name='B', - project=project, - location=location, - model=model_b, - model_parameters=model_b_parameters, - prediction_inputs=prediction_inputs_b.outputs['prediction_inputs'], - is_model_inference=is_model_b_inference.output, - ).set_display_name('Model B Responses') - - arbiter_input_preprocess = arbiter_preprocess.arbiter_preprocess( autorater_prompt_parameters=autorater_prompt_parameters, - evaluation_dataset=evaluation_dataset, - id_columns=id_columns, - prediction_uris_b=inferrer_b.output, - prediction_uris_a=inferrer_a.output, - model_a_prompt_parameters=model_a_prompt_parameters, - model_b_prompt_parameters=model_b_prompt_parameters, - task=task, response_column_a=response_column_a, response_column_b=response_column_b, + model_a=model_a, + model_b=model_b, + model_a_prompt_parameters=model_a_prompt_parameters, + model_b_prompt_parameters=model_b_prompt_parameters, + model_a_parameters=model_a_parameters, + model_b_parameters=model_b_parameters, human_preference_column=human_preference_column, - is_bp_output_a=is_model_a_inference.output, - is_bp_output_b=is_model_b_inference.output, - ).set_display_name('Preprocess Predictions') - + ).set_display_name('AutoSxS Batch Prediction') autosxs_arbiter_task = autosxs_arbiter.autosxs_arbiter( - inference_output_uri=arbiter_input_preprocess.outputs[ + inference_output_uri=arbiter_input.outputs[ 'preprocessed_evaluation_dataset_uri' ], id_columns=id_columns, @@ -207,13 +98,7 @@ def autosxs_pipeline( bigquery_destination_prefix=bigquery_destination_prefix, experimental_args=experimental_args, ).set_display_name('AutoSxS Arbiter') - - has_human_preference = function_based.get_usage_metric( - metadata=prediction_inputs_a.outputs['metadata'], - key='has_human_preference_column', - ).set_display_name('Read has_human_preference_column') - autosxs_metrics_computer.autosxs_metrics_computer( judgments_dir=autosxs_arbiter_task.outputs['judgments_uri'], - has_human_preference=has_human_preference.output, + human_preference_column=human_preference_column, 
).set_display_name('AutoSxS Metrics')
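
Reviewer note: below is a minimal usage sketch (not part of this diff) of the consolidated pipeline after this change. It assumes `kfp` and `google-cloud-aiplatform` are installed; the project, bucket, dataset URI, column names, and `autorater_prompt_parameters` mapping are hypothetical placeholders, while the pipeline parameter names come from the signature shown above.

# Hypothetical invocation of the reworked AutoSxS pipeline. All resource
# names, URIs, task, and column values are illustrative placeholders.
from google.cloud import aiplatform
from google_cloud_pipeline_components.preview.model_evaluation.model_based_llm_evaluation.autosxs import autosxs_pipeline
from kfp import compiler

# Compile the pipeline definition to a local spec file.
compiler.Compiler().compile(
    pipeline_func=autosxs_pipeline.autosxs_pipeline,
    package_path='autosxs_pipeline.yaml',
)

aiplatform.init(project='example-project', location='us-central1')
job = aiplatform.PipelineJob(
    display_name='autosxs-example',
    template_path='autosxs_pipeline.yaml',
    pipeline_root='gs://example-bucket/pipeline_root',
    parameter_values={
        # Dataset that already contains prompts and both models' responses,
        # so batch_prediction_sxs can skip inference and only preprocess
        # inputs for the Arbiter.
        'evaluation_dataset': 'gs://example-bucket/eval_dataset.jsonl',
        'task': 'summarization',
        'id_columns': ['example_id'],
        'autorater_prompt_parameters': {
            'inference_context': {'column': 'document'},
            'inference_instruction': {'template': 'Summarize: {{document}}'},
        },
        'response_column_a': 'response_a',
        'response_column_b': 'response_b',
        # Ground-truth preferences; forwarded to autosxs_metrics_computer
        # to enable the human-alignment metrics.
        'human_preference_column': 'actual_preference',
    },
)
job.submit()

Per the component docstring, `model_a`/`model_b` may be supplied instead of the corresponding `response_column_*` values, in which case `batch_prediction_sxs` runs a batch prediction job for that arm before preprocessing.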