Skip to content

Commit

Permalink
Fix retrieval of deprecated non-config values (#23723)
Browse files Browse the repository at this point in the history
It turned out that deprecation of config values did not work as
intended. While deprecation worked fine when the value was specified
in configuration value it did not work when `run_as_user` was used.

In those cases the "as_dict" option was used to generate temporary
configuratin and this temporary configuration contained default value
for the new configuration value - for example it caused that
the generated temporary value contained:

```
[database]
sql_alchemy_conn=sqlite:///{AIRFLOW_HOME}/airflow.db
```

Even if the deprecated `core/sql_alchemy_conn` was set (and no
new `database/sql_alchemy_conn` was set at the same time.

This effectively rendered the old installation that did not convert
to the new "database" configuration not working for run_as_user, because
the tasks run with "run_as_user" used wrong, empty sqlite database
instaead of the one configured for Airflow.

Also during adding tests, it turned out that the mechanism was also
not working as intended before - in case `_CMD` or `_SECRET` were used
as environment variables rather than configuration. In those cases
both _CMD and _SECRET should be evaluated during as_dict() evaluation,
because the "run_as_user" might have not enough permission to run the
command or retrieve secret. The _cmd and _secret variables were only
evaluated during as_dict() when they were in the config file (note
that this only happens when include_cmd, include_env, include_secret
are set to True).

The changes implemented in this PR fix both problems:

* the _CMD and _SECRET env vars are evaluated during as_dict when the
  respective include_* is set
* the defaults are only set for the values that have deprecations
  in case the deprecations have no values set in either of the ways:
    * in config file
    * in env variable
    * in _cmd (via config file or env variable)
    * in _secret (via config file or env variable)

Fixes: #23679
(cherry picked from commit 888bc2e)
  • Loading branch information
potiuk authored and ephraimbuddy committed May 20, 2022
1 parent 743f4b9 commit d61cde6
Show file tree
Hide file tree
Showing 8 changed files with 849 additions and 189 deletions.
2 changes: 1 addition & 1 deletion airflow/config_templates/config.yml.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
},
"sensitive": {
"type": "boolean",
"description": "When true, this option is sensitive and can be specified using AIRFLOW__{section}___{name}__SECRET or AIRFLOW__{section}___{name}__CMD environment variables. See: airflow.configuration.AirflowConfigParser.sensitive_config_values"
"description": "When true, this option is sensitive and can be specified using AIRFLOW__{section}___{name}__SECRET or AIRFLOW__{section}___{name}_CMD environment variables. See: airflow.configuration.AirflowConfigParser.sensitive_config_values"
}
},
"required": [
Expand Down
180 changes: 164 additions & 16 deletions airflow/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@
ConfigSectionSourcesType = Dict[str, Union[str, Tuple[str, str]]]
ConfigSourcesType = Dict[str, ConfigSectionSourcesType]

ENV_VAR_PREFIX = 'AIRFLOW__'


def _parse_sqlite_version(s: str) -> Tuple[int, ...]:
match = _SQLITE3_VERSION_PATTERN.match(s)
Expand Down Expand Up @@ -144,7 +146,7 @@ class AirflowConfigParser(ConfigParser):
"""Custom Airflow Configparser supporting defaults and deprecated options"""

# These configuration elements can be fetched as the stdout of commands
# following the "{section}__{name}__cmd" pattern, the idea behind this
# following the "{section}__{name}_cmd" pattern, the idea behind this
# is to not store password on boxes in text files.
# These configs can also be fetched from Secrets backend
# following the "{section}__{name}__secret" pattern
Expand Down Expand Up @@ -435,10 +437,8 @@ def _create_future_warning(name: str, section: str, current_value: Any, new_valu
FutureWarning,
)

ENV_VAR_PREFIX = 'AIRFLOW__'

def _env_var_name(self, section: str, key: str) -> str:
return f'{self.ENV_VAR_PREFIX}{section.upper()}__{key.upper()}'
return f'{ENV_VAR_PREFIX}{section.upper()}__{key.upper()}'

def _get_env_var_option(self, section: str, key: str):
# must have format AIRFLOW__{SECTION}__{KEY} (note double underscore)
Expand All @@ -461,23 +461,53 @@ def _get_env_var_option(self, section: str, key: str):

def _get_cmd_option(self, section: str, key: str):
fallback_key = key + '_cmd'
# if this is a valid command key...
if (section, key) in self.sensitive_config_values:
if super().has_option(section, fallback_key):
command = super().get(section, fallback_key)
return run_command(command)
return None

def _get_cmd_option_from_config_sources(
self, config_sources: ConfigSourcesType, section: str, key: str
) -> Optional[str]:
fallback_key = key + '_cmd'
if (section, key) in self.sensitive_config_values:
section_dict = config_sources.get(section)
if section_dict is not None:
command_value = section_dict.get(fallback_key)
if command_value is not None:
if isinstance(command_value, str):
command = command_value
else:
command = command_value[0]
return run_command(command)
return None

def _get_secret_option(self, section: str, key: str) -> Optional[str]:
"""Get Config option values from Secret Backend"""
fallback_key = key + '_secret'
# if this is a valid secret key...
if (section, key) in self.sensitive_config_values:
if super().has_option(section, fallback_key):
secrets_path = super().get(section, fallback_key)
return _get_config_value_from_secret_backend(secrets_path)
return None

def _get_secret_option_from_config_sources(
self, config_sources: ConfigSourcesType, section: str, key: str
) -> Optional[str]:
fallback_key = key + '_secret'
if (section, key) in self.sensitive_config_values:
section_dict = config_sources.get(section)
if section_dict is not None:
secrets_path_value = section_dict.get(fallback_key)
if secrets_path_value is not None:
if isinstance(secrets_path_value, str):
secrets_path = secrets_path_value
else:
secrets_path = secrets_path_value[0]
return _get_config_value_from_secret_backend(secrets_path)
return None

def get_mandatory_value(self, section: str, key: str, **kwargs) -> str:
value = self.get(section, key, **kwargs)
if value is None:
Expand Down Expand Up @@ -859,7 +889,16 @@ def as_dict(
('airflow.cfg', self),
]

self._replace_config_with_display_sources(config_sources, configs, display_source, raw)
self._replace_config_with_display_sources(
config_sources,
configs,
display_source,
raw,
self.deprecated_options,
include_cmds=include_cmds,
include_env=include_env,
include_secret=include_secret,
)

# add env vars and overwrite because they have priority
if include_env:
Expand Down Expand Up @@ -889,7 +928,7 @@ def _include_secrets(
raw: bool,
):
for (section, key) in self.sensitive_config_values:
value: Optional[str] = self._get_secret_option(section, key)
value: Optional[str] = self._get_secret_option_from_config_sources(config_sources, section, key)
if value:
if not display_sensitive:
value = '< hidden >'
Expand All @@ -910,17 +949,20 @@ def _include_commands(
raw: bool,
):
for (section, key) in self.sensitive_config_values:
opt = self._get_cmd_option(section, key)
opt = self._get_cmd_option_from_config_sources(config_sources, section, key)
if not opt:
continue
opt_to_set: Union[str, Tuple[str, str], None] = opt
if not display_sensitive:
opt = '< hidden >'
opt_to_set = '< hidden >'
if display_source:
opt = (opt, 'cmd')
opt_to_set = (str(opt_to_set), 'cmd')
elif raw:
opt = opt.replace('%', '%%')
config_sources.setdefault(section, OrderedDict()).update({key: opt})
del config_sources[section][key + '_cmd']
opt_to_set = str(opt_to_set).replace('%', '%%')
if opt_to_set is not None:
dict_to_update: Dict[str, Union[str, Tuple[str, str]]] = {key: opt_to_set}
config_sources.setdefault(section, OrderedDict()).update(dict_to_update)
del config_sources[section][key + '_cmd']

def _include_envs(
self,
Expand All @@ -930,7 +972,7 @@ def _include_envs(
raw: bool,
):
for env_var in [
os_environment for os_environment in os.environ if os_environment.startswith(self.ENV_VAR_PREFIX)
os_environment for os_environment in os.environ if os_environment.startswith(ENV_VAR_PREFIX)
]:
try:
_, section, key = env_var.split('__', 2)
Expand Down Expand Up @@ -1010,13 +1052,82 @@ def _replace_config_with_display_sources(
configs: Iterable[Tuple[str, ConfigParser]],
display_source: bool,
raw: bool,
deprecated_options: Dict[Tuple[str, str], Tuple[str, str, str]],
include_env: bool,
include_cmds: bool,
include_secret: bool,
):
for (source_name, config) in configs:
for section in config.sections():
AirflowConfigParser._replace_section_config_with_display_sources(
config, config_sources, display_source, raw, section, source_name
config,
config_sources,
display_source,
raw,
section,
source_name,
deprecated_options,
configs,
include_env=include_env,
include_cmds=include_cmds,
include_secret=include_secret,
)

@staticmethod
def _deprecated_value_is_set_in_config(
deprecated_section: str,
deprecated_key: str,
configs: Iterable[Tuple[str, ConfigParser]],
) -> bool:
for config_type, config in configs:
if config_type == 'default':
continue
try:
deprecated_section_array = config.items(section=deprecated_section, raw=True)
for (key_candidate, _) in deprecated_section_array:
if key_candidate == deprecated_key:
return True
except NoSectionError:
pass
return False

@staticmethod
def _deprecated_variable_is_set(deprecated_section: str, deprecated_key: str) -> bool:
return (
os.environ.get(f'{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}')
is not None
)

@staticmethod
def _deprecated_command_is_set_in_config(
deprecated_section: str, deprecated_key: str, configs: Iterable[Tuple[str, ConfigParser]]
) -> bool:
return AirflowConfigParser._deprecated_value_is_set_in_config(
deprecated_section=deprecated_section, deprecated_key=deprecated_key + "_cmd", configs=configs
)

@staticmethod
def _deprecated_variable_command_is_set(deprecated_section: str, deprecated_key: str) -> bool:
return (
os.environ.get(f'{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}_CMD')
is not None
)

@staticmethod
def _deprecated_secret_is_set_in_config(
deprecated_section: str, deprecated_key: str, configs: Iterable[Tuple[str, ConfigParser]]
) -> bool:
return AirflowConfigParser._deprecated_value_is_set_in_config(
deprecated_section=deprecated_section, deprecated_key=deprecated_key + "_secret", configs=configs
)

@staticmethod
def _deprecated_variable_secret_is_set(deprecated_section: str, deprecated_key: str) -> bool:
return (
os.environ.get(f'{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}_SECRET')
is not None
)

@staticmethod
def _replace_section_config_with_display_sources(
config: ConfigParser,
Expand All @@ -1025,9 +1136,46 @@ def _replace_section_config_with_display_sources(
raw: bool,
section: str,
source_name: str,
deprecated_options: Dict[Tuple[str, str], Tuple[str, str, str]],
configs: Iterable[Tuple[str, ConfigParser]],
include_env: bool,
include_cmds: bool,
include_secret: bool,
):
sect = config_sources.setdefault(section, OrderedDict())
for (k, val) in config.items(section=section, raw=raw):
deprecated_section, deprecated_key, _ = deprecated_options.get((section, k), (None, None, None))
if deprecated_section and deprecated_key:
if source_name == 'default':
# If deprecated entry has some non-default value set for any of the sources requested,
# We should NOT set default for the new entry (because it will override anything
# coming from the deprecated ones)
if AirflowConfigParser._deprecated_value_is_set_in_config(
deprecated_section, deprecated_key, configs
):
continue
if include_env and AirflowConfigParser._deprecated_variable_is_set(
deprecated_section, deprecated_key
):
continue
if include_cmds and (
AirflowConfigParser._deprecated_variable_command_is_set(
deprecated_section, deprecated_key
)
or AirflowConfigParser._deprecated_command_is_set_in_config(
deprecated_section, deprecated_key, configs
)
):
continue
if include_secret and (
AirflowConfigParser._deprecated_variable_secret_is_set(
deprecated_section, deprecated_key
)
or AirflowConfigParser._deprecated_secret_is_set_in_config(
deprecated_section, deprecated_key, configs
)
):
continue
if display_source:
sect[k] = (val, source_name)
else:
Expand Down
28 changes: 28 additions & 0 deletions tests/config_templates/deprecated.cfg
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.


# This is the template for Airflow's unit test configuration. When Airflow runs
# unit tests, it looks for a configuration file at $AIRFLOW_HOME/unittests.cfg.
# If it doesn't exist, Airflow uses this template to generate it by replacing
# variables in curly braces with their global values from configuration.py.

# Users should not modify this file; they should customize the generated
# unittests.cfg instead.
[core]
sql_alchemy_conn = mysql://
29 changes: 29 additions & 0 deletions tests/config_templates/deprecated_cmd.cfg
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.


# This is the template for Airflow's unit test configuration. When Airflow runs
# unit tests, it looks for a configuration file at $AIRFLOW_HOME/unittests.cfg.
# If it doesn't exist, Airflow uses this template to generate it by replacing
# variables in curly braces with their global values from configuration.py.

# Users should not modify this file; they should customize the generated
# unittests.cfg instead.

[core]
sql_alchemy_conn_cmd = echo -n "postgresql://"
29 changes: 29 additions & 0 deletions tests/config_templates/deprecated_secret.cfg
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.


# This is the template for Airflow's unit test configuration. When Airflow runs
# unit tests, it looks for a configuration file at $AIRFLOW_HOME/unittests.cfg.
# If it doesn't exist, Airflow uses this template to generate it by replacing
# variables in curly braces with their global values from configuration.py.

# Users should not modify this file; they should customize the generated
# unittests.cfg instead.

[core]
sql_alchemy_conn_secret = secret_path
Loading

0 comments on commit d61cde6

Please sign in to comment.