From 3af90fd838f923caf0da6c533ef0b693477b8d12 Mon Sep 17 00:00:00 2001 From: olegkachur-e Date: Fri, 20 Dec 2024 23:34:14 +0100 Subject: [PATCH] Introduce gcp translation(V3) glossaries providers (#45085) Operators: - TranslateCreateGlossaryOperator - TranslateUpdateGlossaryOperator - TranslateListGlossariesOperator - TranslateDeleteGlossaryOperator This set of operators allows to work with custom translation dictionaries (glossaries), using Google Cloud Translate V3 API. Co-authored-by: Oleg Kachur --- .../operators/cloud/translate.rst | 86 ++++ docs/spelling_wordlist.txt | 1 + .../providers/google/cloud/hooks/translate.py | 231 ++++++++++ .../providers/google/cloud/links/translate.py | 28 ++ .../google/cloud/operators/translate.py | 411 ++++++++++++++++++ .../airflow/providers/google/provider.yaml | 1 + .../google/cloud/operators/test_translate.py | 226 +++++++++- .../translate/example_translate_glossary.py | 116 +++++ 8 files changed, 1099 insertions(+), 1 deletion(-) create mode 100644 providers/tests/system/google/cloud/translate/example_translate_glossary.py diff --git a/docs/apache-airflow-providers-google/operators/cloud/translate.rst b/docs/apache-airflow-providers-google/operators/cloud/translate.rst index 4b1cee34617b5..2112ce5f35cfc 100644 --- a/docs/apache-airflow-providers-google/operators/cloud/translate.rst +++ b/docs/apache-airflow-providers-google/operators/cloud/translate.rst @@ -289,6 +289,92 @@ Basic usage of the operator: :end-before: [END howto_operator_translate_document_batch] +.. _howto/operator:TranslateCreateGlossaryOperator: + +TranslateCreateGlossaryOperator +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +Create a translation glossary, using Cloud Translate API (Advanced V3). + +For parameter definition, take a look at +:class:`~airflow.providers.google.cloud.operators.translate.TranslateCreateGlossaryOperator` + +Using the operator +"""""""""""""""""" + +Basic usage of the operator: + +.. exampleinclude:: /../../providers/tests/system/google/cloud/translate/example_translate_glossary.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_translate_create_glossary] + :end-before: [END howto_operator_translate_create_glossary] + + +.. _howto/operator:TranslateUpdateGlossaryOperator: + +TranslateUpdateGlossaryOperator +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +Updates translation glossary, using Cloud Translate API (Advanced V3). +Only ``display_name`` and ``input_config`` fields available for update. +By updating input_config - the glossary dictionary updates. + +For parameter definition, take a look at +:class:`~airflow.providers.google.cloud.operators.translate.TranslateUpdateGlossaryOperator` + +Using the operator +"""""""""""""""""" + +Basic usage of the operator: + +.. exampleinclude:: /../../providers/tests/system/google/cloud/translate/example_translate_glossary.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_translate_update_glossary] + :end-before: [END howto_operator_translate_update_glossary] + + +.. _howto/operator:TranslateListGlossariesOperator: + +TranslateListGlossariesOperator +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +List all available translation glossaries on the project. + +For parameter definition, take a look at +:class:`~airflow.providers.google.cloud.operators.translate.TranslateListGlossariesOperator` + +Using the operator +"""""""""""""""""" + +Basic usage of the operator: + +.. exampleinclude:: /../../providers/tests/system/google/cloud/translate/example_translate_glossary.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_translate_list_glossaries] + :end-before: [END howto_operator_translate_list_glossaries] + + +.. _howto/operator:TranslateDeleteGlossaryOperator: + +TranslateDeleteGlossaryOperator +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +Delete the translation glossary resource. + +For parameter definition, take a look at +:class:`~airflow.providers.google.cloud.operators.translate.TranslateDeleteGlossaryOperator` + +Using the operator +"""""""""""""""""" + +Basic usage of the operator: + +.. exampleinclude:: /../../providers/tests/system/google/cloud/translate/example_translate_glossary.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_translate_delete_glossary] + :end-before: [END howto_operator_translate_delete_glossary] + + More information """""""""""""""""" See: diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt index 31d14579932e8..d85becdb1586b 100644 --- a/docs/spelling_wordlist.txt +++ b/docs/spelling_wordlist.txt @@ -125,6 +125,7 @@ autoscale autoscaled autoscaler autoscaling +available avp Avro avro diff --git a/providers/src/airflow/providers/google/cloud/hooks/translate.py b/providers/src/airflow/providers/google/cloud/hooks/translate.py index cf9a748d1a2ae..6bb095d9a25e6 100644 --- a/providers/src/airflow/providers/google/cloud/hooks/translate.py +++ b/providers/src/airflow/providers/google/cloud/hooks/translate.py @@ -30,6 +30,7 @@ from google.api_core.retry import Retry from google.cloud.translate_v2 import Client from google.cloud.translate_v3 import TranslationServiceClient +from google.cloud.translate_v3.types.translation_service import GlossaryInputConfig from airflow.exceptions import AirflowException from airflow.providers.google.common.consts import CLIENT_INFO @@ -51,6 +52,7 @@ TransliterationConfig, automl_translation, ) + from google.cloud.translate_v3.types.translation_service import Glossary from proto import Message @@ -915,3 +917,232 @@ def batch_translate_document( retry=retry, metadata=metadata, ) + + def create_glossary( + self, + project_id: str, + location: str, + glossary_id: str, + input_config: GlossaryInputConfig | dict, + language_pair: Glossary.LanguageCodePair | dict | None = None, + language_codes_set: Glossary.LanguageCodesSet | MutableSequence[str] | None = None, + retry: Retry | _MethodDefault = DEFAULT, + timeout: float | None = None, + metadata: Sequence[tuple[str, str]] = (), + ) -> Operation: + """ + Create the glossary resource from the input source file. + + :param project_id: ID of the Google Cloud project where dataset is located. If not provided + default project_id is used. + :param location: The location of the project. + :param glossary_id: User-specified id to built glossary resource name. + :param input_config: The input configuration of examples to built glossary from. + Total glossary must not exceed 10M Unicode codepoints. + The headers should not be included into the input file table, as languages specified with the + ``language_pair`` or ``language_codes_set`` params. + :param language_pair: Pair of language codes to be used for glossary creation. + Used to built unidirectional glossary. If specified, the ``language_codes_set`` should be empty. + :param language_codes_set: Set of language codes to create the equivalent term sets glossary. + Meant multiple languages mapping. If specified, the ``language_pair`` should be empty. + :param retry: A retry object used to retry requests. If `None` is specified, requests will not be + retried. + :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if + `retry` is specified, the timeout applies to each individual attempt. + :param metadata: Additional metadata that is provided to the method. + + :return: `Operation` object with the glossary creation results. + """ + client = self.get_client() + parent = f"projects/{project_id}/locations/{location}" + name = f"projects/{project_id}/locations/{location}/glossaries/{glossary_id}" + + result = client.create_glossary( + request={ + "parent": parent, + "glossary": { + "name": name, + "input_config": input_config, + "language_pair": language_pair, + "language_codes_set": language_codes_set, + }, + }, + retry=retry, + timeout=timeout, + metadata=metadata, + ) + return result + + def get_glossary( + self, + project_id: str, + location: str, + glossary_id: str, + retry: Retry | _MethodDefault = DEFAULT, + timeout: float | None = None, + metadata: Sequence[tuple[str, str]] = (), + ) -> Glossary: + """ + Fetch glossary item data by the given id. + + The glossary_id is a substring of glossary name, following the format: + ``projects/{project-number-or-id}/locations/{location-id}/glossaries/{glossary-id}`` + + :param project_id: ID of the Google Cloud project where dataset is located. If not provided + default project_id is used. + :param location: The location of the project. + :param glossary_id: User-specified id to built glossary resource name. + :param retry: A retry object used to retry requests. If `None` is specified, requests will not be + retried. + :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if + `retry` is specified, the timeout applies to each individual attempt. + :param metadata: Additional metadata that is provided to the method. + + :return: Fetched glossary item. + """ + client = self.get_client() + name = f"projects/{project_id}/locations/{location}/glossaries/{glossary_id}" + result = client.get_glossary( + name=name, + retry=retry, + timeout=timeout, + metadata=metadata, + ) + if not result: + raise AirflowException(f"Fail to get glossary {name}! Please check if it exists.") + return result + + def update_glossary( + self, + glossary: Glossary, + new_display_name: str | None = None, + new_input_config: GlossaryInputConfig | dict | None = None, + retry: Retry | _MethodDefault = DEFAULT, + timeout: float | None = None, + metadata: Sequence[tuple[str, str]] = (), + ) -> Operation: + """ + Update glossary item with values provided. + + Only ``display_name`` and ``input_config`` fields are allowed for update. + + :param glossary: Glossary item to update. + :param new_display_name: New value of the ``display_name`` to be updated. + :param new_input_config: New value of the ``input_config`` to be updated. + :param retry: A retry object used to retry requests. If `None` is specified, requests will not be + retried. + :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if + `retry` is specified, the timeout applies to each individual attempt. + :param metadata: Additional metadata that is provided to the method. + + :return: `Operation` with glossary update results. + """ + client = self.get_client() + updated_fields = [] + if new_display_name: + glossary.display_name = new_display_name + updated_fields.append("display_name") + if new_input_config is not None: + if isinstance(new_input_config, dict): + new_input_config = GlossaryInputConfig(**new_input_config) + glossary.input_config = new_input_config + updated_fields.append("input_config") + result = client.update_glossary( + request={"glossary": glossary, "update_mask": {"paths": updated_fields}}, + retry=retry, + timeout=timeout, + metadata=metadata, + ) + return result + + def list_glossaries( + self, + project_id: str, + location: str, + page_size: int | None = None, + page_token: str | None = None, + filter_str: str | None = None, + retry: Retry | _MethodDefault = DEFAULT, + timeout: float | None = None, + metadata: Sequence[tuple[str, str]] = (), + ) -> pagers.ListGlossariesPager: + """ + Get the list of glossaries available. + + :param project_id: ID of the Google Cloud project where dataset is located. If not provided + default project_id is used. + :param location: The location of the project. + :param page_size: Page size requested, if not set server use appropriate default. + :param page_token: A token identifying a page of results the server should return. + The first page is returned if ``page_token`` is empty or missing. + :param filter_str: Filter specifying constraints of a list operation. Specify the constraint by the + format of "key=value", where key must be ``src`` or ``tgt``, and the value must be a valid + language code. + For multiple restrictions, concatenate them by "AND" (uppercase only), such as: + ``src=en-US AND tgt=zh-CN``. Notice that the exact match is used here, which means using 'en-US' + and 'en' can lead to different results, which depends on the language code you used when you + create the glossary. + For the unidirectional glossaries, the ``src`` and ``tgt`` add restrictions + on the source and target language code separately. + For the equivalent term set glossaries, the ``src`` and/or ``tgt`` add restrictions on the term set. + For example: ``src=en-US AND tgt=zh-CN`` will only pick the unidirectional glossaries which exactly + match the source language code as ``en-US`` and the target language code ``zh-CN``, but all + equivalent term set glossaries which contain ``en-US`` and ``zh-CN`` in their language set will + be picked. + If missing, no filtering is performed. + :param retry: A retry object used to retry requests. If `None` is specified, requests will not be + retried. + :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if + `retry` is specified, the timeout applies to each individual attempt. + :param metadata: Additional metadata that is provided to the method. + + :return: Glossaries list pager object. + """ + client = self.get_client() + parent = f"projects/{project_id}/locations/{location}" + result = client.list_glossaries( + request={ + "parent": parent, + "page_size": page_size, + "page_token": page_token, + "filter": filter_str, + }, + retry=retry, + timeout=timeout, + metadata=metadata, + ) + return result + + def delete_glossary( + self, + project_id: str, + location: str, + glossary_id: str, + retry: Retry | _MethodDefault = DEFAULT, + timeout: float | None = None, + metadata: Sequence[tuple[str, str]] = (), + ) -> Operation: + """ + Delete the glossary item by the given id. + + :param project_id: ID of the Google Cloud project where dataset is located. If not provided + default project_id is used. + :param location: The location of the project. + :param glossary_id: Glossary id to be deleted. + :param retry: A retry object used to retry requests. If `None` is specified, requests will not be + retried. + :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if + `retry` is specified, the timeout applies to each individual attempt. + :param metadata: Additional metadata that is provided to the method. + + :return: `Operation` with glossary deletion results. + """ + client = self.get_client() + name = f"projects/{project_id}/locations/{location}/glossaries/{glossary_id}" + result = client.delete_glossary( + name=name, + retry=retry, + timeout=timeout, + metadata=metadata, + ) + return result diff --git a/providers/src/airflow/providers/google/cloud/links/translate.py b/providers/src/airflow/providers/google/cloud/links/translate.py index ecf595e9a59c1..4415b058829fb 100644 --- a/providers/src/airflow/providers/google/cloud/links/translate.py +++ b/providers/src/airflow/providers/google/cloud/links/translate.py @@ -56,6 +56,8 @@ ) TRANSLATION_MODELS_LIST_LINK = TRANSLATION_BASE_LINK + "/models/list?project={project_id}" +TRANSLATION_HUB_RESOURCES_LIST_LINK = TRANSLATION_BASE_LINK + "/hub/resources?project={project_id}" + class TranslationLegacyDatasetLink(BaseGoogleLink): """ @@ -368,3 +370,29 @@ def persist( ), }, ) + + +class TranslationGlossariesListLink(BaseGoogleLink): + """ + Helper class for constructing Translation Glossaries List link. + + Link for the list of available glossaries. + """ + + name = "Translation Glossaries List" + key = "translation_glossaries_list" + format_str = TRANSLATION_HUB_RESOURCES_LIST_LINK + + @staticmethod + def persist( + context: Context, + task_instance, + project_id: str, + ): + task_instance.xcom_push( + context, + key=TranslationGlossariesListLink.key, + value={ + "project_id": project_id, + }, + ) diff --git a/providers/src/airflow/providers/google/cloud/operators/translate.py b/providers/src/airflow/providers/google/cloud/operators/translate.py index e57b9e46fccae..2c2424ea5f41e 100644 --- a/providers/src/airflow/providers/google/cloud/operators/translate.py +++ b/providers/src/airflow/providers/google/cloud/operators/translate.py @@ -31,6 +31,7 @@ TranslateResultByOutputConfigLink, TranslateTextBatchLink, TranslationDatasetsListLink, + TranslationGlossariesListLink, TranslationModelLink, TranslationModelsListLink, TranslationNativeDatasetLink, @@ -52,6 +53,7 @@ TransliterationConfig, automl_translation, ) + from google.cloud.translate_v3.types.translation_service import Glossary, GlossaryInputConfig from airflow.utils.context import Context @@ -158,6 +160,7 @@ class TranslateTextOperator(GoogleCloudBaseOperator): :param project_id: Optional. The ID of the Google Cloud project that the service belongs to. + If not provided default project_id is used. :param location: optional. The ID of the Google Cloud location that the service belongs to. if not specified, 'global' is used. Non-global location is required for requests using AutoML models or custom glossaries. @@ -1308,3 +1311,411 @@ def execute(self, context: Context) -> dict: result = hook.wait_for_operation_result(batch_document_translate_operation) self.log.info("Batch document translation job finished") return cast(dict, type(result).to_dict(result)) + + +class TranslateCreateGlossaryOperator(GoogleCloudBaseOperator): + """ + Creates a Google Cloud Translation Glossary. + + Creates a translation glossary, using API V3. + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:TranslateCreateGlossaryOperator`. + + :param glossary_id: User-specified id to built glossary resource name. + :param input_config: The input configuration of examples to built glossary from. + Total glossary must not exceed 10M Unicode codepoints. + The headers should not be included into the input file table, as languages specified with the + ``language_pair`` or ``language_codes_set`` params. + :param language_pair: Pair of language codes to be used for glossary creation. + Used to built unidirectional glossary. If specified, the ``language_codes_set`` should be empty. + :param language_codes_set: Set of language codes to create the equivalent term sets glossary. + Meant multiple languages mapping. If specified, the ``language_pair`` should be empty. + :param project_id: ID of the Google Cloud project where glossary is located. + If not provided default project_id is used. + :param location: The location of the project. + :param retry: Designation of what errors, if any, should be retried. + :param timeout: The timeout for this request. + :param metadata: Strings which should be sent along with the request as metadata. + :param gcp_conn_id: The connection ID to use connecting to Google Cloud. + :param impersonation_chain: Optional service account to impersonate using short-term + credentials, or chained list of accounts required to get the access_token + of the last account in the list, which will be impersonated in the request. + If set as a string, the account must grant the originating account + the Service Account Token Creator IAM role. + If set as a sequence, the identities from the list must grant + Service Account Token Creator IAM role to the directly preceding identity, with first + account from the list granting this role to the originating account (templated). + """ + + template_fields: Sequence[str] = ( + "glossary_id", + "location", + "project_id", + "gcp_conn_id", + "impersonation_chain", + ) + + def __init__( + self, + *, + project_id: str = PROVIDE_PROJECT_ID, + location: str, + glossary_id: str, + input_config: GlossaryInputConfig | dict, + language_pair: Glossary.LanguageCodePair | dict | None = None, + language_codes_set: Glossary.LanguageCodesSet | MutableSequence[str] | None = None, + timeout: float | None = None, + retry: Retry | _MethodDefault = DEFAULT, + gcp_conn_id: str = "google_cloud_default", + metadata: Sequence[tuple[str, str]] = (), + impersonation_chain: str | Sequence[str] | None = None, + **kwargs, + ) -> None: + super().__init__(**kwargs) + self.project_id = project_id + self.location = location + self.glossary_id = glossary_id + self.input_config = input_config + self.language_pair = language_pair + self.language_codes_set = language_codes_set + self.metadata = metadata + self.timeout = timeout + self.retry = retry + self.gcp_conn_id = gcp_conn_id + self.impersonation_chain = impersonation_chain + + def execute(self, context: Context) -> str: + hook = TranslateHook( + gcp_conn_id=self.gcp_conn_id, + impersonation_chain=self.impersonation_chain, + ) + project_id = self.project_id or hook.project_id + try: + result_operation = hook.create_glossary( + glossary_id=self.glossary_id, + input_config=self.input_config, + language_pair=self.language_pair, + language_codes_set=self.language_codes_set, + project_id=project_id, + location=self.location, + retry=self.retry, + timeout=self.timeout, + metadata=self.metadata, + ) + except GoogleAPICallError as e: + self.log.error("Error submitting create_glossary operation ") + raise AirflowException(e) + self.log.info("Glossary creation started, glossary_id %s...", self.glossary_id) + + result = hook.wait_for_operation_result(operation=result_operation) + result = type(result).to_dict(result) + + glossary_id = hook.extract_object_id(result) + self.xcom_push(context, key="glossary_id", value=glossary_id) + self.log.info("Glossary creation complete. The glossary_id: %s.", glossary_id) + return result + + +class TranslateUpdateGlossaryOperator(GoogleCloudBaseOperator): + """ + Update glossary item with values provided. + + Updates the translation glossary, using translation API V3. + Only ``display_name`` and ``input_config`` fields are allowed for update. + + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:TranslateUpdateGlossaryOperator`. + + :param glossary_id: User-specified id to built glossary resource name. + :param input_config: The input configuration of examples to built glossary from. + Total glossary must not exceed 10M Unicode codepoints. + The headers should not be included into the input file table, as languages specified with the + ``language_pair`` or ``language_codes_set`` params. + :param language_pair: Pair of language codes to be used for glossary creation. + Used to built unidirectional glossary. If specified, the ``language_codes_set`` should be empty. + :param language_codes_set: Set of language codes to create the equivalent term sets glossary. + Meant multiple languages mapping. If specified, the ``language_pair`` should be empty. + :param project_id: ID of the Google Cloud project where glossary is located. + If not provided default project_id is used. + :param location: The location of the project. + :param retry: Designation of what errors, if any, should be retried. + :param timeout: The timeout for this request. + :param metadata: Strings which should be sent along with the request as metadata. + :param gcp_conn_id: The connection ID to use connecting to Google Cloud. + :param impersonation_chain: Optional service account to impersonate using short-term + credentials, or chained list of accounts required to get the access_token + of the last account in the list, which will be impersonated in the request. + If set as a string, the account must grant the originating account + the Service Account Token Creator IAM role. + If set as a sequence, the identities from the list must grant + Service Account Token Creator IAM role to the directly preceding identity, with first + account from the list granting this role to the originating account (templated). + """ + + template_fields: Sequence[str] = ( + "glossary_id", + "location", + "project_id", + "gcp_conn_id", + "impersonation_chain", + ) + + def __init__( + self, + *, + project_id: str = PROVIDE_PROJECT_ID, + location: str, + glossary_id: str, + new_display_name: str, + new_input_config: GlossaryInputConfig | dict | None = None, + timeout: float | None = None, + retry: Retry | _MethodDefault = DEFAULT, + gcp_conn_id: str = "google_cloud_default", + metadata: Sequence[tuple[str, str]] = (), + impersonation_chain: str | Sequence[str] | None = None, + **kwargs, + ) -> None: + super().__init__(**kwargs) + self.project_id = project_id + self.location = location + self.glossary_id = glossary_id + self.new_display_name = new_display_name + self.new_input_config = new_input_config + self.metadata = metadata + self.timeout = timeout + self.retry = retry + self.gcp_conn_id = gcp_conn_id + self.impersonation_chain = impersonation_chain + + def execute(self, context: Context) -> str: + hook = TranslateHook( + gcp_conn_id=self.gcp_conn_id, + impersonation_chain=self.impersonation_chain, + ) + project_id = self.project_id or hook.project_id + glossary = hook.get_glossary( + glossary_id=self.glossary_id, + project_id=project_id, + location=self.location, + retry=self.retry, + timeout=self.timeout, + metadata=self.metadata, + ) + try: + result_operation = hook.update_glossary( + glossary=glossary, + new_display_name=self.new_display_name, + new_input_config=self.new_input_config, + retry=self.retry, + timeout=self.timeout, + metadata=self.metadata, + ) + except GoogleAPICallError as e: + self.log.error("Error submitting update_glossary operation ") + raise AirflowException(e) + self.log.info("Glossary update started, glossary_id %s...", self.glossary_id) + + result = hook.wait_for_operation_result(operation=result_operation) + result = type(result).to_dict(result) + self.log.info("Glossary update complete. The glossary_id: %s.", self.glossary_id) + return result + + +class TranslateListGlossariesOperator(GoogleCloudBaseOperator): + """ + Get a list of translation glossaries in a project. + + List the translation glossaries, using translation API V3. + + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:TranslateListGlossariesOperator`. + + :param project_id: ID of the Google Cloud project where glossary is located. + If not provided default project_id is used. + :param page_size: Page size requested, if not set server use appropriate default. + :param page_token: A token identifying a page of results the server should return. + The first page is returned if ``page_token`` is empty or missing. + :param filter_str: Filter specifying constraints of a list operation. Specify the constraint by the + format of "key=value", where key must be ``src`` or ``tgt``, and the value must be a valid + language code. + For multiple restrictions, concatenate them by "AND" (uppercase only), such as: + ``src=en-US AND tgt=zh-CN``. Notice that the exact match is used here, which means using 'en-US' + and 'en' can lead to different results, which depends on the language code you used when you + create the glossary. + For the unidirectional glossaries, the ``src`` and ``tgt`` add restrictions + on the source and target language code separately. + For the equivalent term set glossaries, the ``src`` and/or ``tgt`` + add restrictions on the term set. + For example: ``src=en-US AND tgt=zh-CN`` will only pick the unidirectional glossaries which + exactly match the source language code as ``en-US`` and the target language code ``zh-CN``, + but all equivalent term set glossaries which contain ``en-US`` and ``zh-CN`` in their language + set will be picked. + If missing, no filtering is performed. + :param location: The location of the project. + :param retry: Designation of what errors, if any, should be retried. + :param timeout: The timeout for this request. + :param metadata: Strings which should be sent along with the request as metadata. + :param gcp_conn_id: The connection ID to use connecting to Google Cloud. + :param impersonation_chain: Optional service account to impersonate using short-term + credentials, or chained list of accounts required to get the access_token + of the last account in the list, which will be impersonated in the request. + If set as a string, the account must grant the originating account + the Service Account Token Creator IAM role. + If set as a sequence, the identities from the list must grant + Service Account Token Creator IAM role to the directly preceding identity, with first + account from the list granting this role to the originating account (templated). + """ + + operator_extra_links = (TranslationGlossariesListLink(),) + + template_fields: Sequence[str] = ( + "location", + "project_id", + "gcp_conn_id", + "impersonation_chain", + ) + + def __init__( + self, + *, + project_id: str = PROVIDE_PROJECT_ID, + location: str, + page_size: int | None = None, + page_token: str | None = None, + filter_str: str | None = None, + timeout: float | None = None, + retry: Retry | _MethodDefault = DEFAULT, + gcp_conn_id: str = "google_cloud_default", + metadata: Sequence[tuple[str, str]] = (), + impersonation_chain: str | Sequence[str] | None = None, + **kwargs, + ) -> None: + super().__init__(**kwargs) + self.project_id = project_id + self.location = location + self.page_size = page_size + self.page_token = page_token + self.filter_str = filter_str + self.metadata = metadata + self.timeout = timeout + self.retry = retry + self.gcp_conn_id = gcp_conn_id + self.impersonation_chain = impersonation_chain + + def execute(self, context: Context) -> Sequence[str]: + hook = TranslateHook( + gcp_conn_id=self.gcp_conn_id, + impersonation_chain=self.impersonation_chain, + ) + project_id = self.project_id or hook.project_id + TranslationGlossariesListLink.persist( + context=context, + task_instance=self, + project_id=project_id, + ) + self.log.info("Requesting glossaries list") + try: + results_pager = hook.list_glossaries( + project_id=project_id, + location=self.location, + page_size=self.page_size, + page_token=self.page_token, + filter_str=self.filter_str, + retry=self.retry, + timeout=self.timeout, + metadata=self.metadata, + ) + except GoogleAPICallError as e: + self.log.error("Error submitting list_glossaries request") + raise AirflowException(e) + + result_ids = [] + for glossary_item in results_pager: + glossary_item = type(glossary_item).to_dict(glossary_item) + glossary_id = hook.extract_object_id(glossary_item) + result_ids.append(glossary_id) + self.log.info("Fetching the glossaries list complete. Glossary id-s: %s", result_ids) + return result_ids + + +class TranslateDeleteGlossaryOperator(GoogleCloudBaseOperator): + """ + Delete a Google Cloud Translation Glossary. + + Deletes a translation glossary, using API V3. + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:TranslateDeleteGlossaryOperator`. + + :param glossary_id: User-specified id to delete glossary resource item. + :param project_id: ID of the Google Cloud project where glossary is located. + If not provided default project_id is used. + :param location: The location of the project. + :param retry: Designation of what errors, if any, should be retried. + :param timeout: The timeout for this request. + :param metadata: Strings which should be sent along with the request as metadata. + :param gcp_conn_id: The connection ID to use connecting to Google Cloud. + :param impersonation_chain: Optional service account to impersonate using short-term + credentials, or chained list of accounts required to get the access_token + of the last account in the list, which will be impersonated in the request. + If set as a string, the account must grant the originating account + the Service Account Token Creator IAM role. + If set as a sequence, the identities from the list must grant + Service Account Token Creator IAM role to the directly preceding identity, with first + account from the list granting this role to the originating account (templated). + """ + + template_fields: Sequence[str] = ( + "glossary_id", + "location", + "project_id", + "gcp_conn_id", + "impersonation_chain", + ) + + def __init__( + self, + *, + project_id: str = PROVIDE_PROJECT_ID, + location: str, + glossary_id: str, + timeout: float | None = None, + retry: Retry | _MethodDefault = DEFAULT, + gcp_conn_id: str = "google_cloud_default", + metadata: Sequence[tuple[str, str]] = (), + impersonation_chain: str | Sequence[str] | None = None, + **kwargs, + ) -> None: + super().__init__(**kwargs) + self.glossary_id = glossary_id + self.project_id = project_id + self.location = location + self.metadata = metadata + self.timeout = timeout + self.retry = retry + self.gcp_conn_id = gcp_conn_id + self.impersonation_chain = impersonation_chain + + def execute(self, context: Context) -> str: + hook = TranslateHook( + gcp_conn_id=self.gcp_conn_id, + impersonation_chain=self.impersonation_chain, + ) + project_id = self.project_id or hook.project_id + try: + result_operation = hook.delete_glossary( + glossary_id=self.glossary_id, + project_id=project_id, + location=self.location, + retry=self.retry, + timeout=self.timeout, + metadata=self.metadata, + ) + except GoogleAPICallError as e: + self.log.error("Error submitting delete_glossary operation ") + raise AirflowException(e) + self.log.info("Glossary delete started, glossary_id %s...", self.glossary_id) + + result = hook.wait_for_operation_result(operation=result_operation) + result = type(result).to_dict(result) + self.log.info("Glossary deletion complete. The glossary_id: %s.", self.glossary_id) + return result diff --git a/providers/src/airflow/providers/google/provider.yaml b/providers/src/airflow/providers/google/provider.yaml index 21e53063f0454..4afff189b60ca 100644 --- a/providers/src/airflow/providers/google/provider.yaml +++ b/providers/src/airflow/providers/google/provider.yaml @@ -1306,6 +1306,7 @@ extra-links: - airflow.providers.google.cloud.links.translate.TranslationModelLink - airflow.providers.google.cloud.links.translate.TranslationModelsListLink - airflow.providers.google.cloud.links.translate.TranslateResultByOutputConfigLink + - airflow.providers.google.cloud.links.translate.TranslationGlossariesListLink secrets-backends: diff --git a/providers/tests/google/cloud/operators/test_translate.py b/providers/tests/google/cloud/operators/test_translate.py index 2e4217ee53a8e..1d5ef328ae360 100644 --- a/providers/tests/google/cloud/operators/test_translate.py +++ b/providers/tests/google/cloud/operators/test_translate.py @@ -24,22 +24,27 @@ BatchTranslateDocumentResponse, TranslateDocumentResponse, automl_translation, + translation_service, ) from airflow.providers.google.cloud.hooks.translate import TranslateHook from airflow.providers.google.cloud.operators.translate import ( CloudTranslateTextOperator, TranslateCreateDatasetOperator, + TranslateCreateGlossaryOperator, TranslateCreateModelOperator, TranslateDatasetsListOperator, TranslateDeleteDatasetOperator, + TranslateDeleteGlossaryOperator, TranslateDeleteModelOperator, TranslateDocumentBatchOperator, TranslateDocumentOperator, TranslateImportDataOperator, + TranslateListGlossariesOperator, TranslateModelsListOperator, TranslateTextBatchOperator, TranslateTextOperator, + TranslateUpdateGlossaryOperator, ) GCP_CONN_ID = "google_cloud_default" @@ -47,6 +52,7 @@ PROJECT_ID = "test-project-id" DATASET_ID = "sample_ds_id" MODEL_ID = "sample_model_id" +GLOSSARY_ID = "sample_glossary_id" TIMEOUT_VALUE = 30 LOCATION = "location_id" @@ -155,7 +161,6 @@ def test_minimal_green_path(self, mock_hook, mock_link_persist): } SRC_LANG_CODE = "src_lang_code" TARGET_LANG_CODES = ["target_lang_code1", "target_lang_code2"] - LOCATION = "location-id" TIMEOUT = 30 INPUT_CONFIGS = [input_config_item] OUTPUT_CONFIG = {"gcs_destination": {"output_uri_prefix": "gs://source_bucket_uri/output/"}} @@ -708,3 +713,222 @@ def test_minimal_green_path(self, mock_hook, mock_link_persist): project_id=PROJECT_ID, output_config=OUTPUT_CONFIG, ) + + +class TestTranslateGlossaryCreate: + @mock.patch( + "airflow.providers.google.cloud.operators.translate.TranslateCreateGlossaryOperator.xcom_push" + ) + @mock.patch("airflow.providers.google.cloud.operators.translate.TranslateHook") + def test_minimal_green_path(self, mock_hook, mock_xcom_push): + GLOSSARY_CREATION_RESULT = { + "name": f"projects/{PROJECT_ID}/locations/{LOCATION}/glossaries/{GLOSSARY_ID}", + "display_name": f"{GLOSSARY_ID}", + "entry_count": 42, + "input_config": {"gcs_source": {"input_uri": "gs://input_bucket_path/glossary.csv"}}, + "language_pair": {"source_language_code": "en", "target_language_code": "es"}, + "submit_time": "2024-11-17T14:05:00Z", + "end_time": "2024-11-17T17:09:03Z", + } + sample_operation = mock.MagicMock() + sample_operation.result.return_value = translation_service.Glossary(GLOSSARY_CREATION_RESULT) + + mock_hook.return_value.create_glossary.return_value = sample_operation + mock_hook.return_value.wait_for_operation_result.side_effect = lambda operation: operation.result() + mock_hook.return_value.extract_object_id = TranslateHook.extract_object_id + + GLOSSARY_FILE_INPUT = {"gcs_source": {"input_uri": "gs://RESOURCE_BUCKET/glossary_sample.tsv"}} + op = TranslateCreateGlossaryOperator( + task_id="task_id", + glossary_id=f"{GLOSSARY_ID}", + input_config=GLOSSARY_FILE_INPUT, + language_pair={"source_language_code": "en", "target_language_code": "es"}, + language_codes_set=None, + project_id=PROJECT_ID, + location=LOCATION, + gcp_conn_id=GCP_CONN_ID, + impersonation_chain=IMPERSONATION_CHAIN, + timeout=TIMEOUT_VALUE, + retry=None, + ) + context = mock.MagicMock() + result = op.execute(context=context) + + mock_hook.assert_called_once_with( + gcp_conn_id=GCP_CONN_ID, + impersonation_chain=IMPERSONATION_CHAIN, + ) + mock_hook.return_value.create_glossary.assert_called_once_with( + glossary_id=f"{GLOSSARY_ID}", + input_config=GLOSSARY_FILE_INPUT, + language_pair={"source_language_code": "en", "target_language_code": "es"}, + language_codes_set=None, + project_id=PROJECT_ID, + location=LOCATION, + timeout=TIMEOUT_VALUE, + retry=None, + metadata=(), + ) + mock_xcom_push.assert_called_once_with(context, key="glossary_id", value=GLOSSARY_ID) + assert result == GLOSSARY_CREATION_RESULT + + +class TestTranslateGlossaryUpdate: + @mock.patch("airflow.providers.google.cloud.operators.translate.TranslateHook") + def test_minimal_green_path(self, mock_hook): + UPDATE_GLOSSARY_RESULT = { + "name": f"projects/{PROJECT_ID}/locations/{LOCATION}/glossaries/{GLOSSARY_ID}", + "display_name": "new_glossary_display_name", + "entry_count": 42, + "input_config": {"gcs_source": {"input_uri": "gs://input_bucket_path/glossary_updated.csv"}}, + "language_pair": {"source_language_code": "en", "target_language_code": "es"}, + "submit_time": "2024-11-17T14:05:00Z", + "end_time": "2024-11-17T17:09:03Z", + } + get_glossary_mock = mock.MagicMock() + mock_hook.return_value.get_glossary.return_value = get_glossary_mock + sample_operation = mock.MagicMock() + sample_operation.result.return_value = translation_service.Glossary(UPDATE_GLOSSARY_RESULT) + + mock_hook.return_value.update_glossary.return_value = sample_operation + mock_hook.return_value.wait_for_operation_result.side_effect = lambda operation: operation.result() + + UPDATE_GLOSSARY_FILE_INPUT = {"gcs_source": {"input_uri": "gs://RESOURCE_BUCKET/glossary_sample.tsv"}} + op = TranslateUpdateGlossaryOperator( + task_id="task_id", + new_input_config=UPDATE_GLOSSARY_FILE_INPUT, + new_display_name="new_glossary_display_name", + glossary_id=GLOSSARY_ID, + project_id=PROJECT_ID, + location=LOCATION, + gcp_conn_id=GCP_CONN_ID, + impersonation_chain=IMPERSONATION_CHAIN, + timeout=TIMEOUT_VALUE, + retry=None, + ) + context = mock.MagicMock() + result = op.execute(context=context) + + mock_hook.assert_called_once_with( + gcp_conn_id=GCP_CONN_ID, + impersonation_chain=IMPERSONATION_CHAIN, + ) + mock_hook.return_value.update_glossary.assert_called_once_with( + glossary=get_glossary_mock, + new_input_config=UPDATE_GLOSSARY_FILE_INPUT, + new_display_name="new_glossary_display_name", + timeout=TIMEOUT_VALUE, + retry=None, + metadata=(), + ) + assert result == UPDATE_GLOSSARY_RESULT + + +class TestTranslateListGlossaries: + @mock.patch("airflow.providers.google.cloud.links.translate.TranslationGlossariesListLink.persist") + @mock.patch("airflow.providers.google.cloud.operators.translate.TranslateHook") + def test_minimal_green_path(self, mock_hook, mock_link_persist): + GLOSSARY_ID_1 = "sample_glossary_1" + GLOSSARY_ID_2 = "sample_glossary_2" + glossary_result_1 = translation_service.Glossary( + dict( + name=f"projects/{PROJECT_ID}/locations/{LOCATION}/glossaries/{GLOSSARY_ID_1}", + display_name=f"{GLOSSARY_ID_1}", + entry_count=100, + input_config={"gcs_source": {"input_uri": "gs://input1.csv"}}, + language_pair={"source_language_code": "en", "target_language_code": "es"}, + submit_time="2024-11-17T14:05:00Z", + end_time="2024-11-17T17:09:03Z", + ) + ) + glossary_result_2 = translation_service.Glossary( + dict( + name=f"projects/{PROJECT_ID}/locations/{LOCATION}/glossaries/{GLOSSARY_ID_2}", + display_name=f"{GLOSSARY_ID_2}", + entry_count=200, + input_config={"gcs_source": {"input_uri": "gs://input2.csv"}}, + language_pair={"source_language_code": "es", "target_language_code": "en"}, + submit_time="2024-11-17T14:05:00Z", + end_time="2024-11-17T17:09:03Z", + ) + ) + mock_hook.return_value.list_glossaries.return_value = [glossary_result_1, glossary_result_2] + mock_hook.return_value.extract_object_id = TranslateHook.extract_object_id + + op = TranslateListGlossariesOperator( + task_id="task_id", + project_id=PROJECT_ID, + page_size=100, + location=LOCATION, + gcp_conn_id=GCP_CONN_ID, + impersonation_chain=IMPERSONATION_CHAIN, + timeout=TIMEOUT_VALUE, + retry=DEFAULT, + ) + context = mock.MagicMock() + result = op.execute(context=context) + mock_hook.assert_called_once_with( + gcp_conn_id=GCP_CONN_ID, + impersonation_chain=IMPERSONATION_CHAIN, + ) + mock_hook.return_value.list_glossaries.assert_called_once_with( + project_id=PROJECT_ID, + page_size=100, + filter_str=None, + page_token=None, + location=LOCATION, + timeout=TIMEOUT_VALUE, + retry=DEFAULT, + metadata=(), + ) + assert result == [GLOSSARY_ID_1, GLOSSARY_ID_2] + mock_link_persist.assert_called_once_with( + context=context, + task_instance=op, + project_id=PROJECT_ID, + ) + + +class TestTranslateDeleteGlossary: + @mock.patch("airflow.providers.google.cloud.operators.translate.TranslateHook") + def test_minimal_green_path(self, mock_hook): + DELETION_RESULT_SAMPLE = { + "submit_time": "2024-11-17T14:05:00Z", + "end_time": "2024-11-17T17:09:03Z", + "name": f"projects/{PROJECT_ID}/locations/{LOCATION}/glossaries/{GLOSSARY_ID}", + } + sample_operation = mock.MagicMock() + sample_operation.result.return_value = translation_service.DeleteGlossaryResponse( + DELETION_RESULT_SAMPLE + ) + gl_delete_method = mock_hook.return_value.delete_glossary + gl_delete_method.return_value = sample_operation + + mock_hook.return_value.wait_for_operation_result.side_effect = lambda operation: operation.result() + + op = TranslateDeleteGlossaryOperator( + task_id="task_id", + glossary_id=GLOSSARY_ID, + project_id=PROJECT_ID, + location=LOCATION, + gcp_conn_id=GCP_CONN_ID, + impersonation_chain=IMPERSONATION_CHAIN, + timeout=TIMEOUT_VALUE, + retry=DEFAULT, + ) + context = mock.MagicMock() + result = op.execute(context=context) + mock_hook.assert_called_once_with( + gcp_conn_id=GCP_CONN_ID, + impersonation_chain=IMPERSONATION_CHAIN, + ) + + gl_delete_method.assert_called_once_with( + glossary_id=GLOSSARY_ID, + project_id=PROJECT_ID, + location=LOCATION, + timeout=TIMEOUT_VALUE, + retry=DEFAULT, + metadata=(), + ) + assert result == DELETION_RESULT_SAMPLE diff --git a/providers/tests/system/google/cloud/translate/example_translate_glossary.py b/providers/tests/system/google/cloud/translate/example_translate_glossary.py new file mode 100644 index 0000000000000..16427f6e3d53b --- /dev/null +++ b/providers/tests/system/google/cloud/translate/example_translate_glossary.py @@ -0,0 +1,116 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Example Airflow DAG that translates text in Google Cloud Translate using V3 API version +service in the Google Cloud. +""" + +from __future__ import annotations + +import os +from datetime import datetime + +from airflow.models.dag import DAG +from airflow.providers.google.cloud.operators.translate import ( + TranslateCreateGlossaryOperator, + TranslateDeleteGlossaryOperator, + TranslateListGlossariesOperator, + TranslateUpdateGlossaryOperator, +) + +DAG_ID = "translate_glossary" +PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default") +ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default") +REGION = "us-central1" + +RESOURCE_DATA_BUCKET = "airflow-system-tests-resources" + +GLOSSARY_FILE_INPUT = { + "gcs_source": {"input_uri": f"gs://{RESOURCE_DATA_BUCKET}/V3_translate/glossaries/glossary_sample.tsv"} +} +UPDATE_GLOSSARY_FILE_INPUT = { + "gcs_source": { + "input_uri": f"gs://{RESOURCE_DATA_BUCKET}/V3_translate/glossaries/glossary_update_sample.tsv" + } +} + + +with DAG( + DAG_ID, + schedule="@once", # Override to match your needs + start_date=datetime(2024, 1, 1), + catchup=False, + tags=["example", "glossary_translate", "translate_V3"], +) as dag: + # [START howto_operator_translate_create_glossary] + create_glossary = TranslateCreateGlossaryOperator( + task_id="glossary_create", + project_id=PROJECT_ID, + location=REGION, + input_config=GLOSSARY_FILE_INPUT, + glossary_id=f"glossary_new_{PROJECT_ID}", + language_pair={"source_language_code": "en", "target_language_code": "es"}, + ) + # [END howto_operator_translate_create_glossary] + + # [START howto_operator_translate_update_glossary] + glossary_id = create_glossary.output["glossary_id"] + update_glossary = TranslateUpdateGlossaryOperator( + task_id="glossary_update", + project_id=PROJECT_ID, + location=REGION, + new_input_config=UPDATE_GLOSSARY_FILE_INPUT, + new_display_name=f"gl_{PROJECT_ID}_updated", + glossary_id=glossary_id, + ) + # [END howto_operator_translate_update_glossary] + + # [START howto_operator_translate_list_glossaries] + list_glossaries = TranslateListGlossariesOperator( + task_id="list_glossaries", + page_size=100, + project_id=PROJECT_ID, + location=REGION, + ) + # [END howto_operator_translate_list_glossaries] + + # [START howto_operator_translate_delete_glossary] + delete_glossary = TranslateDeleteGlossaryOperator( + task_id="delete_glossary", + glossary_id=glossary_id, + project_id=PROJECT_ID, + location=REGION, + ) + # [END howto_operator_translate_delete_glossary] + + ( + # TEST BODY + create_glossary >> update_glossary >> list_glossaries >> delete_glossary + ) + + from tests_common.test_utils.watcher import watcher + + # This test needs watcher in order to properly mark success/failure + # when "tearDown" task with trigger rule is part of the DAG + list(dag.tasks) >> watcher() + +from tests_common.test_utils.system_tests import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag)