From fc0dcffff28bf769daab53b56c82a33a88921c7c Mon Sep 17 00:00:00 2001 From: "David Lum/./Affiliates/Samsung Electronics" Date: Thu, 25 Mar 2021 13:22:21 -0400 Subject: [PATCH] Enable Connection creation from Vault parameters Currently using the Vault secrets backends requires that users store the secrets in connection URI format: https://airflow.apache.org/docs/apache-airflow/stable/howto/connection.html#connection-uri-format Unfortunately the connection URI format is not capable of expressing all values of the Connection class. In particular the Connection class allows for arbitrary string values for the `extra` parameter, while the URI format requires that this parameter be unnested JSON so that it can serialize into query parameters. ``` >>> Connection(conn_id='id', conn_type='http', extra='foobar').get_uri() [2021-03-25 13:31:07,535] {connection.py:337} ERROR - Expecting value: line 1 column 1 (char 0) Traceback (most recent call last): File "/Users/da.lum/code/python/airflow/airflow/models/connection.py", line 335, in extra_dejson obj = json.loads(self.extra) File "/nix/store/8kzdflq0v06fq0mh9m2fd73gnyqp57xr-python3-3.7.3/lib/python3.7/json/__init__.py", line 348, in loads return _default_decoder.decode(s) File "/nix/store/8kzdflq0v06fq0mh9m2fd73gnyqp57xr-python3-3.7.3/lib/python3.7/json/decoder.py", line 337, in decode obj, end = self.raw_decode(s, idx=_w(s, 0).end()) File "/nix/store/8kzdflq0v06fq0mh9m2fd73gnyqp57xr-python3-3.7.3/lib/python3.7/json/decoder.py", line 355, in raw_decode raise JSONDecodeError("Expecting value", s, err.value) from None json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0) [2021-03-25 13:31:07,535] {connection.py:338} ERROR - Failed parsing the json for conn_id id 'http://' ``` As shown, the `extra` data is missing from the return value `http://`. Although there is an error logged, this does not help users who were previously able to store other data. 
--- airflow/providers/hashicorp/secrets/vault.py | 41 ++++++++++++-- .../secrets-backends/hashicorp-vault.rst | 53 ++++++++++++++++--- .../providers/hashicorp/secrets/test_vault.py | 43 +++++++++++++++ 3 files changed, 124 insertions(+), 13 deletions(-) diff --git a/airflow/providers/hashicorp/secrets/vault.py b/airflow/providers/hashicorp/secrets/vault.py index b5e850277a530..084fdc1b03015 100644 --- a/airflow/providers/hashicorp/secrets/vault.py +++ b/airflow/providers/hashicorp/secrets/vault.py @@ -18,6 +18,7 @@ """Objects relating to sourcing connections & variables from Hashicorp Vault""" from typing import Optional +from airflow.models.connection import Connection from airflow.providers.hashicorp._internal_client.vault_client import _VaultClient from airflow.secrets import BaseSecretsBackend from airflow.utils.log.logging_mixin import LoggingMixin @@ -178,6 +179,20 @@ def __init__( **kwargs, ) + def get_response(self, conn_id: str) -> Optional[dict]: + """ + Get data from Vault + + :type conn_id: str + :rtype: dict + :return: The data from the Vault path if exists + """ + if self.connections_path is None: + return None + + secret_path = self.build_path(self.connections_path, conn_id) + return self.vault_client.get_secret(secret_path=secret_path) + def get_conn_uri(self, conn_id: str) -> Optional[str]: """ Get secret value from Vault. Store the secret in the form of URI @@ -187,12 +202,28 @@ def get_conn_uri(self, conn_id: str) -> Optional[str]: :rtype: str :return: The connection uri retrieved from the secret """ - if self.connections_path is None: + response = self.get_response(conn_id) + + return response.get("conn_uri") if response else None + + def get_connection(self, conn_id: str) -> Optional[Connection]: + """ + Get connection from Vault as secret. Prioritize conn_uri if exists, + if not fall back to normal Connection creation. 
+ + :type conn_id: str + :rtype: Connection + :return: A Connection object constructed from Vault data + """ + response = self.get_response(conn_id) + if response is None: return None - else: - secret_path = self.build_path(self.connections_path, conn_id) - response = self.vault_client.get_secret(secret_path=secret_path) - return response.get("conn_uri") if response else None + + uri = response.get("conn_uri") + if uri: + return Connection(conn_id, uri=uri) + + return Connection(conn_id, **response) def get_variable(self, key: str) -> Optional[str]: """ diff --git a/docs/apache-airflow-providers-hashicorp/secrets-backends/hashicorp-vault.rst b/docs/apache-airflow-providers-hashicorp/secrets-backends/hashicorp-vault.rst index b71cd0d41e352..ffd40fee9be9f 100644 --- a/docs/apache-airflow-providers-hashicorp/secrets-backends/hashicorp-vault.rst +++ b/docs/apache-airflow-providers-hashicorp/secrets-backends/hashicorp-vault.rst @@ -44,6 +44,15 @@ key to ``backend_kwargs``: export VAULT_ADDR="http://127.0.0.1:8200" +Set up a Vault mount point +"""""""""""""""""""""""""" + +You can make a ``mount_point`` for ``airflow`` as follows: + +.. 
code-block:: bash + + vault secrets enable -path=airflow -version=2 kv + Optional lookup """"""""""""""" @@ -60,8 +69,8 @@ For example, if you want to set parameter ``connections_path`` to ``"airflow-con backend = airflow.providers.hashicorp.secrets.vault.VaultBackend backend_kwargs = {"connections_path": "airflow-connections", "variables_path": null, "mount_point": "airflow", "url": "http://127.0.0.1:8200"} -Storing and Retrieving Connections -"""""""""""""""""""""""""""""""""" +Storing and Retrieving Connections using connection URI representation +"""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""" If you have set ``connections_path`` as ``connections`` and ``mount_point`` as ``airflow``, then for a connection id of ``smtp_default``, you would want to store your secret as: @@ -73,12 +82,6 @@ If you have set ``connections_path`` as ``connections`` and ``mount_point`` as ` Note that the ``Key`` is ``conn_uri``, ``Value`` is ``postgresql://airflow:airflow@host:5432/airflow`` and ``mount_point`` is ``airflow``. -You can make a ``mount_point`` for ``airflow`` as follows: - -.. code-block:: bash - - vault secrets enable -path=airflow -version=2 kv - Verify that you can get the secret from ``vault``: .. code-block:: console @@ -100,6 +103,40 @@ Verify that you can get the secret from ``vault``: The value of the Vault key must be the :ref:`connection URI representation <generate_connection_uri>` of the connection object to get connection. +Storing and Retrieving Connections using Connection class representation +"""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""" + +If you have set ``connections_path`` as ``connections`` and ``mount_point`` as ``airflow``, then for a connection id of +``smtp_default``, you would want to store your secret as: + +.. 
code-block:: bash + + vault kv put airflow/connections/smtp_default conn_type=smtps login=user password=host host=relay.example.com port=465 + +Note that the ``Keys`` are parameters of the ``Connection`` class and the ``Value`` their argument. + +Verify that you can get the secret from ``vault``: + +.. code-block:: console + + ❯ vault kv get airflow/connections/smtp_default + ====== Metadata ====== + Key Value + --- ----- + created_time 2020-03-19T19:17:51.281721Z + deletion_time n/a + destroyed false + version 1 + + ====== Data ====== + Key Value + --- ----- + conn_type smtps + login user + password host + host relay.example.com + port 465 + Storing and Retrieving Variables """""""""""""""""""""""""""""""" diff --git a/tests/providers/hashicorp/secrets/test_vault.py b/tests/providers/hashicorp/secrets/test_vault.py index da293d3204fba..dcd585e72dfb5 100644 --- a/tests/providers/hashicorp/secrets/test_vault.py +++ b/tests/providers/hashicorp/secrets/test_vault.py @@ -59,6 +59,49 @@ def test_get_conn_uri(self, mock_hvac): returned_uri = test_client.get_conn_uri(conn_id="test_postgres") assert 'postgresql://airflow:airflow@host:5432/airflow' == returned_uri + @mock.patch("airflow.providers.hashicorp._internal_client.vault_client.hvac") + def test_get_connection(self, mock_hvac): + mock_client = mock.MagicMock() + mock_hvac.Client.return_value = mock_client + mock_client.secrets.kv.v2.read_secret_version.return_value = { + 'request_id': '94011e25-f8dc-ec29-221b-1f9c1d9ad2ae', + 'lease_id': '', + 'renewable': False, + 'lease_duration': 0, + 'data': { + 'data': { + 'conn_type': 'postgresql', + 'login': 'airflow', + 'password': 'airflow', + 'host': 'host', + 'port': '5432', + 'schema': 'airflow', + 'extra': '{"foo":"bar","baz":"taz"}', + }, + 'metadata': { + 'created_time': '2020-03-16T21:01:43.331126Z', + 'deletion_time': '', + 'destroyed': False, + 'version': 1, + }, + }, + 'wrap_info': None, + 'warnings': None, + 'auth': None, + } + + kwargs = { + "connections_path": 
"connections", + "mount_point": "airflow", + "auth_type": "token", + "url": "http://127.0.0.1:8200", + "token": "s.7AU0I51yv1Q1lxOIg1F3ZRAS", + } + + test_client = VaultBackend(**kwargs) + connection = test_client.get_connection(conn_id="test_postgres") + assert 'postgresql://airflow:airflow@host:5432/airflow?foo=bar&baz=taz' == connection.get_uri() + @mock.patch("airflow.providers.hashicorp._internal_client.vault_client.hvac") def test_get_conn_uri_engine_version_1(self, mock_hvac): mock_client = mock.MagicMock()