Skip to content

Commit

Permalink
Enable Connection creation from Vault parameters
Browse files Browse the repository at this point in the history
Currently using the Vault secrets backends requires that users store
the secrets in connection URI format:
https://airflow.apache.org/docs/apache-airflow/stable/howto/connection.html#connection-uri-format

Unfortunately the connection URI format is not capable of expressing
all values of the Connection class. In particular the Connection
class allows for arbitrary string values for the `extra` parameter,
while the URI format requires that this parameter be flat (non-nested) JSON
so that it can be serialized into query parameters.

```
>>> Connection(conn_id='id', conn_type='http', extra='foobar').get_uri()
[2021-03-25 13:31:07,535] {connection.py:337} ERROR - Expecting value: line 1 column 1 (char 0)
Traceback (most recent call last):
  File "/Users/da.lum/code/python/airflow/airflow/models/connection.py", line 335, in extra_dejson
    obj = json.loads(self.extra)
  File "/nix/store/8kzdflq0v06fq0mh9m2fd73gnyqp57xr-python3-3.7.3/lib/python3.7/json/__init__.py", line 348, in loads
    return _default_decoder.decode(s)
  File "/nix/store/8kzdflq0v06fq0mh9m2fd73gnyqp57xr-python3-3.7.3/lib/python3.7/json/decoder.py", line 337, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "/nix/store/8kzdflq0v06fq0mh9m2fd73gnyqp57xr-python3-3.7.3/lib/python3.7/json/decoder.py", line 355, in raw_decode
    raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
[2021-03-25 13:31:07,535] {connection.py:338} ERROR - Failed parsing the json for conn_id id
'http://'
```

As shown, the `extra` data is missing from the return value `http://`.
Although there is an error logged, this does not help users who were
previously able to store other data.
  • Loading branch information
davlum committed Aug 20, 2021
1 parent 50771e0 commit fc0dcff
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 13 deletions.
41 changes: 36 additions & 5 deletions airflow/providers/hashicorp/secrets/vault.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
"""Objects relating to sourcing connections & variables from Hashicorp Vault"""
from typing import Optional

from airflow.models.connection import Connection
from airflow.providers.hashicorp._internal_client.vault_client import _VaultClient
from airflow.secrets import BaseSecretsBackend
from airflow.utils.log.logging_mixin import LoggingMixin
Expand Down Expand Up @@ -178,6 +179,20 @@ def __init__(
**kwargs,
)

def get_response(self, conn_id: str) -> Optional[dict]:
    """
    Fetch the raw secret data stored in Vault for a connection id.

    :param conn_id: connection id, joined with ``connections_path`` to build the secret path
    :type conn_id: str
    :rtype: dict
    :return: what the Vault client returns for that path, or ``None`` when
        ``connections_path`` is ``None``
    """
    root = self.connections_path
    if root is not None:
        vault_path = self.build_path(root, conn_id)
        return self.vault_client.get_secret(secret_path=vault_path)
    return None

def get_conn_uri(self, conn_id: str) -> Optional[str]:
    """
    Get secret value from Vault. Store the secret in the form of URI

    The URI is read from the ``conn_uri`` key of the secret data.

    :param conn_id: The connection id
    :type conn_id: str
    :rtype: str
    :return: The connection uri retrieved from the secret, or ``None`` when no
        secret data is returned or the data has no ``conn_uri`` key
    """
    # get_response owns the ``connections_path is None`` guard and the path
    # construction, so no separate check is needed here.
    response = self.get_response(conn_id)

    return response.get("conn_uri") if response else None

def get_connection(self, conn_id: str) -> Optional[Connection]:
    """
    Get connection from Vault as secret. Prioritize conn_uri if exists,
    if not fall back to normal Connection creation.

    :param conn_id: The connection id
    :type conn_id: str
    :rtype: Connection
    :return: A Connection object constructed from Vault data, or ``None`` when
        no secret data is returned for the connection
    """
    response = self.get_response(conn_id)
    if response is None:
        return None

    uri = response.get("conn_uri")
    if uri:
        return Connection(conn_id, uri=uri)

    # The remaining keys are forwarded as Connection keyword arguments. Drop the
    # two keys that would make that call raise TypeError: an empty ``conn_uri``
    # (not a Connection parameter) and ``conn_id`` (already passed positionally).
    conn_kwargs = {key: value for key, value in response.items() if key not in ("conn_uri", "conn_id")}
    return Connection(conn_id, **conn_kwargs)

def get_variable(self, key: str) -> Optional[str]:
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,15 @@ key to ``backend_kwargs``:
export VAULT_ADDR="http://127.0.0.1:8200"
Set up a Vault mount point
""""""""""""""""""""""""""

You can make a ``mount_point`` for ``airflow`` as follows:

.. code-block:: bash
vault secrets enable -path=airflow -version=2 kv
Optional lookup
"""""""""""""""

Expand All @@ -60,8 +69,8 @@ For example, if you want to set parameter ``connections_path`` to ``"airflow-con
backend = airflow.providers.hashicorp.secrets.vault.VaultBackend
backend_kwargs = {"connections_path": "airflow-connections", "variables_path": null, "mount_point": "airflow", "url": "http://127.0.0.1:8200"}
Storing and Retrieving Connections
""""""""""""""""""""""""""""""""""
Storing and Retrieving Connections using connection URI representation
""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""

If you have set ``connections_path`` as ``connections`` and ``mount_point`` as ``airflow``, then for a connection id of
``smtp_default``, you would want to store your secret as:
Expand All @@ -73,12 +82,6 @@ If you have set ``connections_path`` as ``connections`` and ``mount_point`` as `
Note that the ``Key`` is ``conn_uri``, ``Value`` is ``postgresql://airflow:airflow@host:5432/airflow`` and
``mount_point`` is ``airflow``.

You can make a ``mount_point`` for ``airflow`` as follows:

.. code-block:: bash
vault secrets enable -path=airflow -version=2 kv
Verify that you can get the secret from ``vault``:

.. code-block:: console
Expand All @@ -100,6 +103,40 @@ Verify that you can get the secret from ``vault``:
The value of the Vault key must be the :ref:`connection URI representation <generating_connection_uri>`
of the connection object for the connection to be retrieved.

Storing and Retrieving Connections using Connection class representation
""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""

If you have set ``connections_path`` as ``connections`` and ``mount_point`` as ``airflow``, then for a connection id of
``smtp_default``, you would want to store your secret as:

.. code-block:: bash
vault kv put airflow/connections/smtp_default conn_type=smtps login=user password=host host=relay.example.com port=465
Note that the ``Keys`` are parameters of the ``Connection`` class and the ``Values`` are their arguments.

Verify that you can get the secret from ``vault``:

.. code-block:: console
❯ vault kv get airflow/connections/smtp_default
====== Metadata ======
Key Value
--- -----
created_time 2020-03-19T19:17:51.281721Z
deletion_time n/a
destroyed false
version 1
====== Data ======
Key Value
--- -----
conn_type smtps
login user
password host
host relay.example.com
port 465
Storing and Retrieving Variables
""""""""""""""""""""""""""""""""

Expand Down
43 changes: 43 additions & 0 deletions tests/providers/hashicorp/secrets/test_vault.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,49 @@ def test_get_conn_uri(self, mock_hvac):
returned_uri = test_client.get_conn_uri(conn_id="test_postgres")
assert 'postgresql://airflow:airflow@host:5432/airflow' == returned_uri

@mock.patch("airflow.providers.hashicorp._internal_client.vault_client.hvac")
def test_get_connection(self, mock_hvac):
    """A secret made of Connection fields (no ``conn_uri``) yields a Connection."""
    # Secret payload: keys are named after the Connection constructor parameters.
    secret_data = {
        'conn_type': 'postgresql',
        'login': 'airflow',
        'password': 'airflow',
        'host': 'host',
        'port': '5432',
        'schema': 'airflow',
        'extra': '{"foo":"bar","baz":"taz"}',
    }
    secret_metadata = {
        'created_time': '2020-03-16T21:01:43.331126Z',
        'deletion_time': '',
        'destroyed': False,
        'version': 1,
    }

    # Stub the hvac client so the KV v2 read returns the payload above.
    hvac_client = mock.MagicMock()
    hvac_client.secrets.kv.v2.read_secret_version.return_value = {
        'request_id': '94011e25-f8dc-ec29-221b-1f9c1d9ad2ae',
        'lease_id': '',
        'renewable': False,
        'lease_duration': 0,
        'data': {'data': secret_data, 'metadata': secret_metadata},
        'wrap_info': None,
        'warnings': None,
        'auth': None,
    }
    mock_hvac.Client.return_value = hvac_client

    backend = VaultBackend(
        connections_path="connections",
        mount_point="airflow",
        auth_type="token",
        url="http://127.0.0.1:8200",
        token="s.7AU0I51yv1Q1lxOIg1F3ZRAS",
    )
    connection = backend.get_connection(conn_id="test_postgres")

    # The ``extra`` JSON is expected to come back as URI query parameters.
    assert connection.get_uri() == 'postgresql://airflow:airflow@host:5432/airflow?foo=bar&baz=taz'

@mock.patch("airflow.providers.hashicorp._internal_client.vault_client.hvac")
def test_get_conn_uri_engine_version_1(self, mock_hvac):
mock_client = mock.MagicMock()
Expand Down

0 comments on commit fc0dcff

Please sign in to comment.