Skip to content

Commit

Permalink
feat: Add supported hooks to OpenLineage docs (apache#41958)
Browse files Browse the repository at this point in the history
Signed-off-by: Kacper Muda <[email protected]>
  • Loading branch information
kacpermuda authored and harjeevanmaan committed Oct 23, 2024
1 parent 0b55c02 commit 6b44750
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 21 deletions.
2 changes: 1 addition & 1 deletion docs/apache-airflow-providers-openlineage/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
Intro <guides/structure>
User <guides/user>
Developer <guides/developer>
Supported operators <supported_classes>
Supported classes <supported_classes>
Macros <macros>

.. toctree::
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,16 @@
.. _supported_classes:openlineage:

Supported operators
Supported classes
===================

Below is a list of Operators that support OpenLineage extraction, along with specific DB types that are compatible with the SQLExecuteQueryOperator.
Below is a list of Operators and Hooks that support OpenLineage extraction, along with specific DB types that are compatible with the SQLExecuteQueryOperator.

.. important::

While we strive to keep the list of supported operators current,
While we strive to keep the list of supported classes current,
please be aware that our updating process is automated and may not always capture everything accurately.
Detecting hook level lineage is challenging so make sure to double check the information provided below.

.. tip::

Expand Down
151 changes: 139 additions & 12 deletions docs/exts/providers_extensions.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@

import ast
import os
from functools import partial
from pathlib import Path
from typing import Any, Iterable
from typing import Any, Callable, Iterable

# No stub exists for docutils.parsers.rst.directives. See https://github.com/python/typeshed/issues/5755.
from provider_yaml_utils import load_package_data
Expand All @@ -33,7 +34,90 @@
)


def get_import_mappings(tree):
def find_class_methods_with_specific_calls(
class_node: ast.ClassDef, target_calls: set[str], import_mappings: dict[str, str]
) -> set[str]:
"""
Identifies class methods that make specific calls.
This function only tracks target calls within the class scope. Method calling some function defined
will not be taken into consideration even if this function performs a target call.
Method calling other method that performs a target call will also be included.
This function performs a two-pass analysis of the AST:
1. It first identifies methods containing direct calls to the specified functions
and records method calls on `self`.
2. It then identifies methods that indirectly make such calls by invoking the
methods identified in the first pass.
:param class_node: The root node of the AST representing the class to analyze.
:param target_calls: A set of full paths to the method names to track when called.
:param import_mappings: A mapping of import names to fully qualified module names.
:return: Method names within the class that either directly or indirectly make the specified calls.
Examples:
> source_code = '''
... class Example:
... def method1(self):
... my_method().ok()
... def method2(self):
... self.method1()
... def method3(self):
... my_method().not_ok()
... def method4(self):
... self.some_other_method()
... '''
> find_methods_with_specific_calls(
ast.parse(source_code),
{"airflow.my_method.not_ok", "airflow.my_method.ok"},
{"my_method": "airflow.my_method"}
)
{'method1', 'method2', 'method3'}
"""
method_call_map: dict[str, set[str]] = {}
methods_with_calls: set[str] = set()

# First pass: Collect all calls and identify methods with specific calls we are looking for
for node in ast.walk(class_node):
if not isinstance(node, ast.FunctionDef):
continue
method_call_map[node.name] = set()
for sub_node in ast.walk(node):
if not isinstance(sub_node, ast.Call):
continue
called_function = sub_node.func
if not isinstance(called_function, ast.Attribute):
continue
if isinstance(called_function.value, ast.Call) and isinstance(
called_function.value.func, ast.Name
):
full_method_call = (
f"{import_mappings.get(called_function.value.func.id)}.{called_function.attr}"
)
if full_method_call in target_calls:
methods_with_calls.add(node.name)
elif isinstance(called_function.value, ast.Name) and called_function.value.id == "self":
method_call_map[node.name].add(called_function.attr)

# Second pass: Identify all methods that call the ones in `methods_with_calls`
def find_calling_methods(method_name):
for caller, callees in method_call_map.items():
if method_name in callees and caller not in methods_with_calls:
methods_with_calls.add(caller)
find_calling_methods(caller)

for method in list(methods_with_calls):
find_calling_methods(method)

return methods_with_calls


def get_import_mappings(tree) -> dict[str, str]:
"""Retrieve a mapping of local import names to their fully qualified module paths from an AST tree.
:param tree: The AST tree to analyze for import statements.
Expand Down Expand Up @@ -62,7 +146,7 @@ def get_import_mappings(tree):


def _get_module_class_registry(
module_filepath: Path, module_name: str, class_extras: dict[str, Any]
module_filepath: Path, module_name: str, class_extras: dict[str, Callable]
) -> dict[str, dict[str, Any]]:
"""Extracts classes and its information from a Python module file.
Expand All @@ -87,7 +171,10 @@ def _get_module_class_registry(
for b in node.bases
if isinstance(b, ast.Name)
],
**class_extras,
**{
key: callable_(class_node=node, import_mappings=import_mappings)
for key, callable_ in class_extras.items()
},
}
for node in ast_obj.body
if isinstance(node, ast.ClassDef)
Expand Down Expand Up @@ -127,7 +214,9 @@ def _has_method(
return False


def _get_providers_class_registry() -> dict[str, dict[str, Any]]:
def _get_providers_class_registry(
class_extras: dict[str, Callable] | None = None,
) -> dict[str, dict[str, Any]]:
"""Builds a registry of classes from YAML configuration files.
This function scans through YAML configuration files to build a registry of classes.
Expand Down Expand Up @@ -157,7 +246,10 @@ def _get_providers_class_registry() -> dict[str, dict[str, Any]]:
.as_posix()
.replace("/", ".")
),
class_extras={"provider_name": provider_yaml_content["package-name"]},
class_extras={
"provider_name": lambda **kwargs: provider_yaml_content["package-name"],
**(class_extras or {}),
},
)
class_registry.update(module_registry)

Expand All @@ -170,27 +262,44 @@ def _render_openlineage_supported_classes_content():
"get_openlineage_database_info",
"get_openlineage_database_specific_lineage",
)
hook_lineage_collector_path = "airflow.providers.common.compat.lineage.hook.get_hook_lineage_collector"
hook_level_lineage_collector_calls = {
f"{hook_lineage_collector_path}.add_input_asset", # Airflow 3
f"{hook_lineage_collector_path}.add_output_asset", # Airflow 3
f"{hook_lineage_collector_path}.add_input_dataset", # Airflow 2
f"{hook_lineage_collector_path}.add_output_dataset", # Airflow 2
}

class_registry = _get_providers_class_registry(
class_extras={
"methods_with_hook_level_lineage": partial(
find_class_methods_with_specific_calls, target_calls=hook_level_lineage_collector_calls
)
}
)

class_registry = _get_providers_class_registry()
# These excluded classes will be included in docs directly
class_registry.pop("airflow.providers.common.sql.hooks.sql.DbApiHook")
class_registry.pop("airflow.providers.common.sql.operators.sql.SQLExecuteQueryOperator")

providers: dict[str, dict[str, list[str]]] = {}
providers: dict[str, dict[str, Any]] = {}
db_hooks: list[tuple[str, str]] = []
for class_path, info in class_registry.items():
class_name = class_path.split(".")[-1]
if class_name.startswith("_"):
continue
provider_entry = providers.setdefault(info["provider_name"], {"operators": []})
provider_entry = providers.setdefault(info["provider_name"], {"operators": {}, "hooks": {}})

if class_name.lower().endswith("operator"):
if _has_method(
class_path=class_path,
method_names=openlineage_operator_methods,
class_registry=class_registry,
):
provider_entry["operators"].append(class_path)
provider_entry["operators"][class_path] = [
f"{class_path}.{method}"
for method in set(openlineage_operator_methods) & set(info["methods"])
]
elif class_name.lower().endswith("hook"):
if _has_method(
class_path=class_path,
Expand All @@ -200,10 +309,28 @@ def _render_openlineage_supported_classes_content():
db_type = class_name.replace("SqlApiHook", "").replace("Hook", "")
db_hooks.append((db_type, class_path))

elif info["methods_with_hook_level_lineage"]:
provider_entry["hooks"][class_path] = [
f"{class_path}.{method}"
for method in info["methods_with_hook_level_lineage"]
if not method.startswith("_")
]

providers = {
provider: {key: sorted(set(value), key=lambda x: x.split(".")[-1]) for key, value in details.items()}
provider: {
"operators": {
operator: sorted(methods)
for operator, methods in sorted(
details["operators"].items(), key=lambda x: x[0].split(".")[-1]
)
},
"hooks": {
hook: sorted(methods)
for hook, methods in sorted(details["hooks"].items(), key=lambda x: x[0].split(".")[-1])
},
}
for provider, details in sorted(providers.items())
if any(details.values()) # This filters out providers with empty 'operators'
if any(details.values()) # This filters out providers with empty 'operators' and 'hooks'
}
db_hooks = sorted({db_type: hook for db_type, hook in db_hooks}.items(), key=lambda x: x[0])

Expand Down
26 changes: 21 additions & 5 deletions docs/exts/templates/openlineage.rst.jinja2
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
Core operators
==============
At the moment, two core operators supports OpenLineage. These operators function as a 'black box,'
capable of running any code, which might limit the extent of lineage extraction.
capable of running any code, which might limit the extent of lineage extraction. To enhance the extraction
of lineage information, operators can utilize the hooks listed below that support OpenLineage.

- :class:`~airflow.operators.python.PythonOperator` (via :class:`airflow.providers.openlineage.extractors.python.PythonExtractor`)
- :class:`~airflow.providers.standard.operators.bash.BashOperator` (via :class:`airflow.providers.openlineage.extractors.bash.BashExtractor`)
Expand All @@ -36,16 +37,31 @@ a dedicated Hook implementing OpenLineage methods is required. Currently, the fo
{% endfor %}


Provider's operators
====================
The operators listed below from each provider are natively equipped with OpenLineage support.
Providers
=========
The operators and hooks listed below from each provider are natively equipped with OpenLineage support.

{%for provider_name, provider_dict in providers.items() %}
{{ provider_name }}
{{ '"' * (provider_name|length) }}

{% for operator in provider_dict['operators'] %}
{% if provider_dict['operators'] %}
Operators
^^^^^^^^^
{% for operator, methods in provider_dict['operators'].items() %}
- :class:`~{{ operator }}`
{% endfor %}
{% endif %}

{% if provider_dict['hooks'] %}
Hooks
^^^^^
{% for hook, methods in provider_dict['hooks'].items() %}
- :class:`~{{ hook }}`
{% for method in methods %}
- :meth:`~{{ method }}`
{% endfor %}
{% endfor %}
{% endif %}

{% endfor %}

0 comments on commit 6b44750

Please sign in to comment.