Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace sequence concatenation by unpacking in Airflow core #33934

Merged
merged 2 commits into from
Sep 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion airflow/api/common/trigger_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ def _trigger_dag(
run_conf = conf if isinstance(conf, dict) else json.loads(conf)

dag_runs = []
dags_to_run = [dag] + dag.subdags
dags_to_run = [dag, *dag.subdags]
for _dag in dags_to_run:
dag_run = _dag.create_dagrun(
run_id=run_id,
Expand Down
4 changes: 2 additions & 2 deletions airflow/api_connexion/schemas/common_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,15 +135,15 @@ class ColorField(fields.String):

def __init__(self, **metadata):
super().__init__(**metadata)
self.validators = [validate.Regexp("^#[a-fA-F0-9]{3,6}$")] + list(self.validators)
self.validators = [validate.Regexp("^#[a-fA-F0-9]{3,6}$"), *self.validators]


class WeightRuleField(fields.String):
"""Schema for WeightRule."""

def __init__(self, **metadata):
super().__init__(**metadata)
self.validators = [validate.OneOf(WeightRule.all_weight_rules())] + list(self.validators)
self.validators = [validate.OneOf(WeightRule.all_weight_rules()), *self.validators]


class TimezoneField(fields.String):
Expand Down
4 changes: 2 additions & 2 deletions airflow/api_connexion/schemas/enum_schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ class DagStateField(fields.String):

def __init__(self, **metadata):
super().__init__(**metadata)
self.validators = [validate.OneOf(State.dag_states)] + list(self.validators)
self.validators = [validate.OneOf(State.dag_states), *self.validators]


class TaskInstanceStateField(fields.String):
"""Schema for TaskInstanceState Enum."""

def __init__(self, **metadata):
super().__init__(**metadata)
self.validators = [validate.OneOf(State.task_states)] + list(self.validators)
self.validators = [validate.OneOf(State.task_states), *self.validators]
2 changes: 1 addition & 1 deletion airflow/cli/cli_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -1619,7 +1619,7 @@ class GroupCommand(NamedTuple):
name="add",
help="Add a connection",
func=lazy_load_command("airflow.cli.commands.connection_command.connections_add"),
args=(ARG_CONN_ID, ARG_CONN_URI, ARG_CONN_JSON, ARG_CONN_EXTRA) + tuple(ALTERNATIVE_CONN_SPECS_ARGS),
args=(ARG_CONN_ID, ARG_CONN_URI, ARG_CONN_JSON, ARG_CONN_EXTRA, *ALTERNATIVE_CONN_SPECS_ARGS),
),
ActionCommand(
name="delete",
Expand Down
2 changes: 1 addition & 1 deletion airflow/cli/commands/standalone_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ def __init__(self, parent, name: str, command: list[str], env: dict[str, str]):
def run(self):
"""Run the actual process and captures it output to a queue."""
self.process = subprocess.Popen(
["airflow"] + self.command,
["airflow", *self.command],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
env=self.env,
Expand Down
2 changes: 1 addition & 1 deletion airflow/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ def inversed_deprecated_sections(self):
("logging", "logging_level"): _available_logging_levels,
("logging", "fab_logging_level"): _available_logging_levels,
# celery_logging_level can be empty, which uses logging_level as fallback
("logging", "celery_logging_level"): _available_logging_levels + [""],
("logging", "celery_logging_level"): [*_available_logging_levels, ""],
("webserver", "analytical_tool"): ["google_analytics", "metarouter", "segment", ""],
}

Expand Down
2 changes: 1 addition & 1 deletion airflow/jobs/backfill_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -788,7 +788,7 @@ def tabulate_ti_keys_set(ti_keys: Iterable[TaskInstanceKey]) -> str:
yield tabulate_ti_keys_set([ti.key for ti in ti_status.deadlocked])

def _get_dag_with_subdags(self) -> list[DAG]:
return [self.dag] + self.dag.subdags
return [self.dag, *self.dag.subdags]

@provide_session
def _execute_dagruns(
Expand Down
7 changes: 4 additions & 3 deletions airflow/kubernetes/pre_7_4_0_compatibility/pod_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -366,9 +366,10 @@ def reconcile_containers(
client_container = extend_object_field(base_container, client_container, "volume_devices")
client_container = merge_objects(base_container, client_container)

return [client_container] + PodGenerator.reconcile_containers(
base_containers[1:], client_containers[1:]
)
return [
client_container,
*PodGenerator.reconcile_containers(base_containers[1:], client_containers[1:]),
]

@classmethod
def construct_pod(
Expand Down
2 changes: 1 addition & 1 deletion airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -1699,7 +1699,7 @@ def _get_task_instances(
if include_subdags:
# Crafting the right filter for dag_id and task_ids combo
conditions = []
for dag in self.subdags + [self]:
for dag in [*self.subdags, self]:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if itertools.chain would be better here

Copy link
Member

@Lee-W Lee-W Aug 31, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did some tests on it. It seems performance-wise [*[1, 2, 3], 4, 5, 6, 7] is the best solution

$ python -m timeit '[1, 2, 3] + [4] + [5] + [6] + [7]'
2000000 loops, best of 5: 179 nsec per loop

$ python -m timeit '[*[1, 2, 3], 4, 5, 6, 7]'
5000000 loops, best of 5: 69.2 nsec per loop

$ python -m timeit 'import itertools; itertools.chain([1, 2, 3], [4, 5, 6, 7])'
2000000 loops, best of 5: 177 nsec per loop

$ python -m timeit 'import itertools; [1, 2, 3] + [4] + [5] + [6] + [7]'
1000000 loops, best of 5: 242 nsec per loop

$ python -m timeit 'import itertools;  [*[1, 2, 3], 4, 5, 6, 7]'
2000000 loops, best of 5: 131 nsec per loop

$ python -m timeit 'import itertools; list(itertools.chain([1, 2, 3], [4, 5, 6, 7]))'
1000000 loops, best of 5: 305 nsec per loop

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A big chunk of this would come from import itertools though, which would not be relevant in Airflow since the module is already imported in a lot of places. I would not be surprised if * is still best for small lists though, since the itertools version still needs to build an additional list (of a single item).

conditions.append(
(TaskInstance.dag_id == dag.dag_id) & TaskInstance.task_id.in_(dag.task_ids)
)
Expand Down
6 changes: 2 additions & 4 deletions airflow/utils/python_virtualenv.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,13 @@ def _generate_virtualenv_cmd(tmp_dir: str, python_bin: str, system_site_packages
def _generate_pip_install_cmd_from_file(
tmp_dir: str, requirements_file_path: str, pip_install_options: list[str]
) -> list[str]:
cmd = [f"{tmp_dir}/bin/pip", "install"] + pip_install_options + ["-r"]
return cmd + [requirements_file_path]
return [f"{tmp_dir}/bin/pip", "install", *pip_install_options, "-r", requirements_file_path]


def _generate_pip_install_cmd_from_list(
tmp_dir: str, requirements: list[str], pip_install_options: list[str]
) -> list[str]:
cmd = [f"{tmp_dir}/bin/pip", "install"] + pip_install_options
return cmd + requirements
return [f"{tmp_dir}/bin/pip", "install", *pip_install_options, *requirements]


def _generate_pip_conf(conf_file: Path, index_urls: list[str]) -> None:
Expand Down
2 changes: 1 addition & 1 deletion airflow/utils/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ class State:
finished_dr_states: frozenset[DagRunState] = frozenset([DagRunState.SUCCESS, DagRunState.FAILED])
unfinished_dr_states: frozenset[DagRunState] = frozenset([DagRunState.QUEUED, DagRunState.RUNNING])

task_states: tuple[TaskInstanceState | None, ...] = (None,) + tuple(TaskInstanceState)
task_states: tuple[TaskInstanceState | None, ...] = (None, *TaskInstanceState)

dag_states: tuple[DagRunState, ...] = (
DagRunState.QUEUED,
Expand Down
6 changes: 4 additions & 2 deletions airflow/www/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -775,7 +775,8 @@ class AirflowFilterConverter(fab_sqlafilters.SQLAFilterConverter):
"is_extendedjson",
[],
),
) + fab_sqlafilters.SQLAFilterConverter.conversion_table
*fab_sqlafilters.SQLAFilterConverter.conversion_table,
)

def __init__(self, datamodel):
super().__init__(datamodel)
Expand Down Expand Up @@ -877,7 +878,8 @@ def delete_all(self, items: list[Model]) -> bool:
# place
FieldConverter.conversion_table = (
("is_utcdatetime", DateTimeWithTimezoneField, AirflowDateTimePickerWidget),
) + FieldConverter.conversion_table
*FieldConverter.conversion_table,
)


class UIAlert:
Expand Down