Skip to content

Commit

Permalink
More optimized lazy-loading of provider information (#17304)
Browse files Browse the repository at this point in the history
With this change we truly lazy-load hooks and external_links only
when we need them. Previously they were loaded when any of the
properties of ProvidersManager was used, but with this change
in some scenarios where only extra links are used or when we
only need list of providers, but we do not need details on
which custom hooks are needed, there will be much
faster initialization. This is mainly for some CLI commands
(for example `airlfow providers list` is much faster now), but
also in some scenarios where for example .get_conn() is never
used in Tasks, tasks might also never need to import/load the hooks
and they might perform faster, with smaller memory footprint.
  • Loading branch information
potiuk authored Jul 29, 2021
1 parent 53e9349 commit 2dc7aa8
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 16 deletions.
6 changes: 4 additions & 2 deletions airflow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,13 @@ def __getattr__(name):
if not settings.LAZY_LOAD_PROVIDERS:
from airflow import providers_manager

providers_manager.ProvidersManager().initialize_providers_manager()
manager = providers_manager.ProvidersManager()
manager.initialize_providers_list()
manager.initialize_providers_hooks()
manager.initialize_providers_extra_links()


# This is never executed, but tricks static analyzers (PyDev, PyCharm,)
# into knowing the types of these symbols, and what
# they contain.
STATICA_HACK = True
globals()['kcah_acitats'[::-1].upper()] = False
Expand Down
62 changes: 48 additions & 14 deletions airflow/providers_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@
import logging
import os
from collections import OrderedDict
from time import perf_counter
from typing import Any, Dict, NamedTuple, Set

import jsonschema
from wtforms import BooleanField, Field, IntegerField, PasswordField, StringField

from airflow.utils import yaml
from airflow.utils.entry_points import entry_points_with_dist
from airflow.utils.log.logging_mixin import LoggingMixin

try:
import importlib.resources as importlib_resources
Expand Down Expand Up @@ -83,7 +85,7 @@ class ConnectionFormWidgetInfo(NamedTuple):
field: Field


class ProvidersManager:
class ProvidersManager(LoggingMixin):
"""
Manages all provider packages. This is a Singleton class. The first time it is
instantiated, it discovers all available providers in installed packages and
Expand All @@ -99,6 +101,7 @@ def __new__(cls):
return cls._instance

def __init__(self):
"""Initializes the manager."""
# Keeps dict of providers keyed by module name
self._provider_dict: Dict[str, ProviderInfo] = {}
# Keeps dict of hooks keyed by connection type
Expand All @@ -112,30 +115,61 @@ def __init__(self):
self._customized_form_fields_schema_validator = (
_create_customized_form_field_behaviours_schema_validator()
)
self._initialized = False
self._providers_list_initialized = False
self._providers_hooks_initialized = False
self._providers_extra_links_initialized = False

def initialize_providers_manager(self):
"""Lazy initialization of provider data."""
def initialize_providers_list(self):
"""Lazy initialization of providers list."""
# We cannot use @cache here because it does not work during pytest, apparently each test
# runs it it's own namespace and ProvidersManager is a different object in each namespace
# even if it is singleton but @cache on the initialize_providers_manager message still works in the
# even if it is singleton but @cache on the initialize_providers_* still works in the
# way that it is called only once for one of the objects (at least this is how it looks like
# from running tests)
if self._initialized:
if self._providers_list_initialized:
return
start_time = perf_counter()
self.log.debug("Initializing Providers Manager list")
# Local source folders are loaded first. They should take precedence over the package ones for
# Development purpose. In production provider.yaml files are not present in the 'airflow" directory
# So there is no risk we are going to override package provider accidentally. This can only happen
# in case of local development
self._discover_all_airflow_builtin_providers_from_local_sources()
self._discover_all_providers_from_packages()
self._discover_hooks()
self._provider_dict = OrderedDict(sorted(self._provider_dict.items()))
self.log.debug(
"Initialization of Providers Manager list took %.2f seconds", perf_counter() - start_time
)
self._providers_list_initialized = True

def initialize_providers_hooks(self):
"""Lazy initialization of providers hooks."""
if self._providers_hooks_initialized:
return
self.initialize_providers_list()
start_time = perf_counter()
self.log.debug("Initializing Providers Hooks")
self._discover_hooks()
self._hooks_dict = OrderedDict(sorted(self._hooks_dict.items()))
self._connection_form_widgets = OrderedDict(sorted(self._connection_form_widgets.items()))
self._field_behaviours = OrderedDict(sorted(self._field_behaviours.items()))
self.log.debug(
"Initialization of Providers Manager hooks took %.2f seconds", perf_counter() - start_time
)
self._providers_hooks_initialized = True

def initialize_providers_extra_links(self):
"""Lazy initialization of providers extra links."""
if self._providers_extra_links_initialized:
return
self.initialize_providers_list()
start_time = perf_counter()
self.log.debug("Initializing Providers Extra Links")
self._discover_extra_links()
self._initialized = True
self.log.debug(
"Initialization of Providers Manager extra links took %.2f seconds", perf_counter() - start_time
)
self._providers_extra_links_initialized = True

def _discover_all_providers_from_packages(self) -> None:
"""
Expand Down Expand Up @@ -397,29 +431,29 @@ def _add_extra_link(self, extra_link_class_name, provider_package) -> None:
@property
def providers(self) -> Dict[str, ProviderInfo]:
"""Returns information about available providers."""
self.initialize_providers_manager()
self.initialize_providers_list()
return self._provider_dict

@property
def hooks(self) -> Dict[str, HookInfo]:
"""Returns dictionary of connection_type-to-hook mapping"""
self.initialize_providers_manager()
self.initialize_providers_hooks()
return self._hooks_dict

@property
def extra_links_class_names(self):
def extra_links_class_names(self) -> Set[str]:
"""Returns set of extra link class names."""
self.initialize_providers_manager()
self.initialize_providers_extra_links()
return sorted(self._extra_link_class_name_set)

@property
def connection_form_widgets(self) -> Dict[str, ConnectionFormWidgetInfo]:
"""Returns widgets for connection forms."""
self.initialize_providers_manager()
self.initialize_providers_hooks()
return self._connection_form_widgets

@property
def field_behaviours(self) -> Dict[str, Dict]:
"""Returns dictionary with field behaviours for connection types."""
self.initialize_providers_manager()
self.initialize_providers_hooks()
return self._field_behaviours

0 comments on commit 2dc7aa8

Please sign in to comment.