diff --git a/airflow/providers/google/cloud/utils/credentials_provider.py b/airflow/providers/google/cloud/utils/credentials_provider.py index 1cf33ea70b056..b809703d426c9 100644 --- a/airflow/providers/google/cloud/utils/credentials_provider.py +++ b/airflow/providers/google/cloud/utils/credentials_provider.py @@ -173,6 +173,9 @@ class _CredentialProvider(LoggingMixin): :param key_path: Path to Google Cloud Service Account key file (JSON). :param keyfile_dict: A dict representing Cloud Service Account as in the Credential JSON file + :param key_secret_name: Keyfile Secret Name in GCP Secret Manager. + :param key_secret_project_id: Project ID to read the secrets from. If not passed, the project ID from + default credentials will be used. :param scopes: OAuth scopes for the connection :param delegate_to: The account to impersonate using domain-wide delegation of authority, if any. For this to work, the service account making the request must have @@ -194,6 +197,7 @@ def __init__( key_path: Optional[str] = None, keyfile_dict: Optional[Dict[str, str]] = None, key_secret_name: Optional[str] = None, + key_secret_project_id: Optional[str] = None, scopes: Optional[Collection[str]] = None, delegate_to: Optional[str] = None, disable_logging: bool = False, @@ -210,6 +214,7 @@ def __init__( self.key_path = key_path self.keyfile_dict = keyfile_dict self.key_secret_name = key_secret_name + self.key_secret_project_id = key_secret_project_id self.scopes = scopes self.delegate_to = delegate_to self.disable_logging = disable_logging @@ -288,7 +293,10 @@ def _get_credentials_using_key_secret_name(self): if not secret_manager_client.is_valid_secret_name(self.key_secret_name): raise AirflowException('Invalid secret name specified for fetching JSON key data.') - secret_value = secret_manager_client.get_secret(self.key_secret_name, adc_project_id) + secret_value = secret_manager_client.get_secret( + secret_id=self.key_secret_name, + project_id=self.key_secret_project_id if self.key_secret_project_id else adc_project_id, + ) if secret_value is None: raise AirflowException(f"Failed getting value of secret {self.key_secret_name}.") diff --git a/airflow/providers/google/common/hooks/base_google.py b/airflow/providers/google/common/hooks/base_google.py index 6169e707bc071..997c72c4e6ac7 100644 --- a/airflow/providers/google/common/hooks/base_google.py +++ b/airflow/providers/google/common/hooks/base_google.py @@ -192,6 +192,9 @@ def get_connection_form_widgets() -> Dict[str, Any]: "extra__google_cloud_platform__key_secret_name": StringField( lazy_gettext('Keyfile Secret Name (in GCP Secret Manager)'), widget=BS3TextFieldWidget() ), + "extra__google_cloud_platform__key_secret_project_id": StringField( + lazy_gettext('Keyfile Secret Project Id (in GCP Secret Manager)'), widget=BS3TextFieldWidget() + ), "extra__google_cloud_platform__num_retries": IntegerField( lazy_gettext('Number of Retries'), validators=[NumberRange(min=0)], @@ -236,6 +239,7 @@ def _get_credentials_and_project_id(self) -> Tuple[google.auth.credentials.Crede except json.decoder.JSONDecodeError: raise AirflowException('Invalid key JSON.') key_secret_name: Optional[str] = self._get_field('key_secret_name', None) + key_secret_project_id: Optional[str] = self._get_field('key_secret_project_id', None) target_principal, delegates = _get_target_principal_and_delegates(self.impersonation_chain) @@ -243,6 +247,7 @@ def _get_credentials_and_project_id(self) -> Tuple[google.auth.credentials.Crede key_path=key_path, keyfile_dict=keyfile_dict_json, key_secret_name=key_secret_name, + key_secret_project_id=key_secret_project_id, scopes=self.scopes, delegate_to=self.delegate_to, target_principal=target_principal, diff --git a/docs/apache-airflow-providers-google/connections/gcp.rst b/docs/apache-airflow-providers-google/connections/gcp.rst index 6f9f144bbc0be..ba87995e9eee8 100644 --- a/docs/apache-airflow-providers-google/connections/gcp.rst +++ b/docs/apache-airflow-providers-google/connections/gcp.rst @@ -128,6 +128,7 @@ Number of Retries * ``extra__google_cloud_platform__key_path`` - Keyfile Path * ``extra__google_cloud_platform__keyfile_dict`` - Keyfile JSON * ``extra__google_cloud_platform__key_secret_name`` - Secret name which holds Keyfile JSON + * ``extra__google_cloud_platform__key_secret_project_id`` - Project Id which holds Keyfile JSON * ``extra__google_cloud_platform__scope`` - Scopes * ``extra__google_cloud_platform__num_retries`` - Number of Retries diff --git a/tests/providers/google/common/hooks/test_base_google.py b/tests/providers/google/common/hooks/test_base_google.py index f833469412bee..a60d3a4e22840 100644 --- a/tests/providers/google/common/hooks/test_base_google.py +++ b/tests/providers/google/common/hooks/test_base_google.py @@ -333,6 +333,7 @@ def test_get_credentials_and_project_id_with_default_auth(self, mock_get_creds_a key_path=None, keyfile_dict=None, key_secret_name=None, + key_secret_project_id=None, scopes=self.instance.scopes, delegate_to=None, target_principal=None, @@ -350,6 +351,7 @@ def test_get_credentials_and_project_id_with_service_account_file(self, mock_get key_path='KEY_PATH.json', keyfile_dict=None, key_secret_name=None, + key_secret_project_id=None, scopes=self.instance.scopes, delegate_to=None, target_principal=None, @@ -378,6 +380,7 @@ def test_get_credentials_and_project_id_with_service_account_info(self, mock_get key_path=None, keyfile_dict=service_account, key_secret_name=None, + key_secret_project_id=None, scopes=self.instance.scopes, delegate_to=None, target_principal=None, @@ -396,6 +399,7 @@ def test_get_credentials_and_project_id_with_default_auth_and_delegate(self, moc key_path=None, keyfile_dict=None, key_secret_name=None, + key_secret_project_id=None, scopes=self.instance.scopes, delegate_to="USER", target_principal=None, @@ -430,6 +434,7 @@ def test_get_credentials_and_project_id_with_default_auth_and_overridden_project key_path=None, keyfile_dict=None, key_secret_name=None, + key_secret_project_id=None, scopes=self.instance.scopes, delegate_to=None, target_principal=None, @@ -634,6 +639,7 @@ def test_get_credentials_and_project_id_with_impersonation_chain( key_path=None, keyfile_dict=None, key_secret_name=None, + key_secret_project_id=None, scopes=self.instance.scopes, delegate_to=None, target_principal=target_principal,