Skip to content

Commit

Permalink
vdk-impala: Add support for lineage in vdk-impala
Browse files Browse the repository at this point in the history
Add support for calculating data lineage for jobs that execute SQL queries
against impala.

Lineage is calculated by examining the query plan of the executed query. The
query plan is retrieved by executing RPC call to Impala coordinator after query
execution. The other option was to execute additional explain statements that
would practically double the amount of executed queries, which could be
problematic for loaded environments. Lineage is calculated only for
successfully executed queries that have actually read or written data from/to
the underlying storage.

Implemented functional tests that verify the behaviour with all templates and
simple jobs. As I hit problems starting Impala with pytest, I have bumped the
timeout waiting Impala to start to 120seconds. Wasn't able to make the change
work on python 3.10. Both tests and lineage collection is disabled for python
3.10. Will try to make it work in separate PR.

New feature (non-breaking change which adds functionality)

Signed-off-by: Vladimir Petkov <[email protected]>
  • Loading branch information
VladimirPetkov1 committed Aug 8, 2022
1 parent e16dc58 commit 4791717
Show file tree
Hide file tree
Showing 8 changed files with 510 additions and 6 deletions.
15 changes: 15 additions & 0 deletions projects/vdk-plugins/vdk-impala/README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
This plugin allows vdk-core to interface with and execute queries against an Impala database.
Additionally, it can collect lineage data, assuming a lineage logger has been provided through the vdk-core configuration.

# Usage

Expand All @@ -19,6 +20,20 @@ For example
job_input.execute_query("select 'Hi Impala!'")
```

### Lineage

The package can gather lineage data for all successful Impala SQL queries that have actually read or written data.
Other plugins can read and optionally send the lineage data to separate system.
They need to provide ILineageLogger implementation and hook this way:
```python
@hookimpl
def vdk_initialize(context: CoreContext) -> None:
context.state.set(StoreKey[ILineageLogger]("impala-lineage-logger"), MyLogger())
```

Lineage is calculated based on the query profile which is retrieved by executing additional RPC request against Impala.
If enabled, query plan is retrieved for every successfully executed query against Impala.

<!-- ## Ingestion - not yet implemented so this part is commented out
This plugin allows users to [ingest](https://github.com/vmware/versatile-data-kit/blob/main/projects/vdk-core/src/vdk/api/job_input.py#L90) data to an Impala database,
Expand Down
1 change: 1 addition & 0 deletions projects/vdk-plugins/vdk-impala/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ pydantic
pytest-docker
tabulate
vdk-core
vdk-lineage-model
vdk-test-utils
11 changes: 9 additions & 2 deletions projects/vdk-plugins/vdk-impala/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import setuptools


__version__ = "0.3.0"
__version__ = "0.4.0"

setuptools.setup(
name="vdk-impala",
Expand All @@ -14,7 +14,14 @@
description="Versatile Data Kit SDK plugin provides support for Impala database.",
long_description=pathlib.Path("README.md").read_text(),
long_description_content_type="text/markdown",
install_requires=["vdk-core", "impyla", "tabulate", "pydantic", "pyarrow"],
install_requires=[
"vdk-core",
"vdk-lineage-model",
"impyla",
"tabulate",
"pydantic",
"pyarrow",
],
package_dir={"": "src"},
packages=setuptools.find_namespace_packages(where="src"),
include_package_data=True,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,3 @@ def _connect(self):
)

return conn

def execute_query(self, query: str):
return super().execute_query(query)
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
# Copyright 2021 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import logging
import re
import sys
from typing import cast
from typing import Optional
from typing import Tuple

from impala._thrift_gen.RuntimeProfile.ttypes import TRuntimeProfileFormat
from impala.hiveserver2 import HiveServer2Cursor
from vdk.api.lineage.model.logger.lineage_logger import ILineageLogger
from vdk.api.lineage.model.sql.model import LineageData
from vdk.api.lineage.model.sql.model import LineageTable
from vdk.api.plugin.hook_markers import hookimpl
from vdk.internal.builtin_plugins.connection.execution_cursor import ExecutionCursor
from vdk.internal.core.context import CoreContext
from vdk.internal.core.statestore import StoreKey

LINEAGE_LOGGER_KEY = StoreKey[ILineageLogger]("impala-lineage-logger")

log = logging.getLogger(__name__)


class ImpalaLineagePlugin:
"""
Calculates lineage information from the query plan of executed queries.
Lineage is calculated only if the query has finished successfully and has resulted actual data read or write from/to
the underlying storage.
"""

def __init__(self, lineage_logger: ILineageLogger = None):
self._lineage_logger = lineage_logger

# the purpose of the below hook is to get reference to any registered lineage loggers by other plugins
@hookimpl(
trylast=True
) # trylast because we want to have any lineage loggers already initialized
def vdk_initialize(self, context: CoreContext) -> None:
self._lineage_logger = context.state.get(LINEAGE_LOGGER_KEY)

# the purpose of the below hook is to calculate lineage information after a query has been successfully executed
@hookimpl(hookwrapper=True)
def db_connection_execute_operation(
self, execution_cursor: ExecutionCursor
) -> Optional[int]:
yield # let the query execute first
try:
if self._lineage_logger and sys.version_info < (3, 10):
# calculate and send lineage data only if there is a registered lineage logger
# at the time of writing SystemError occurs "PY_SSIZE_T_CLEAN macro must be defined for '#' formats"
# when trying to calculate lineage information, TODO resolve the above error for python 3.10
hive_cursor = cast(HiveServer2Cursor, execution_cursor)
lineage_data = self._get_lineage_data(hive_cursor)
if (
lineage_data
): # some queries do not have data lineage - select 1, create table, compute stats, etc
self._lineage_logger.send(lineage_data)
# do not stop job execution if error occurs during lineage processing
except Exception:
log.warning(
"Error occurred during lineage calculation. No lineage information will be generated."
" Job execution will continue",
exc_info=True,
)

def _get_lineage_data(self, cursor: HiveServer2Cursor) -> Optional[LineageData]:
query_statement = cursor._cursor.query_string
if self._is_keepalive_statement(query_statement):
return None # do not capture lineage for connection keepalive queries like "select 1"
query_profile = cursor.get_profile(profile_format=TRuntimeProfileFormat.STRING)
if "Query Status: OK" not in query_profile:
return None # do not capture lineage for failed queries
inputs, output = self._parse_inputs_outputs(query_profile)
if not inputs and not output:
return None # no lineage was present in the query plan, e.g. select 1 query
return LineageData(
query=query_statement,
query_status="OK",
query_type=None, # TODO specify query_type once there is clear spec for it
input_tables=[
self._get_lineage_table_from_table_name(table_name)
for table_name in inputs
],
output_table=self._get_lineage_table_from_table_name(output),
)

@staticmethod
def _parse_inputs_outputs(query_profile: str) -> Tuple[list, str]:
inputs = []
output = None
for line in query_profile.splitlines():
match = re.search(r"(?<=SCAN HDFS \[)\S*(?=,)", line)
if match:
inputs.append(match.group(0))
else:
match = re.search(r"(?<=WRITE TO HDFS \[)\S*(?=,)", line)
if match:
output = match.group(0)
return list(set(inputs)), output

@staticmethod
def _get_lineage_table_from_table_name(table_name: str) -> LineageTable:
split_name = table_name.split(".")
return LineageTable(schema=split_name[0], table=split_name[1], catalog=None)

@staticmethod
def _is_keepalive_statement(query_statement):
return query_statement.endswith(
"\n select 1 -- Testing if connection is alive."
)
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from vdk.plugin.impala.impala_connection import ImpalaConnection
from vdk.plugin.impala.impala_error_classifier import is_impala_user_error
from vdk.plugin.impala.impala_error_handler import ImpalaErrorHandler
from vdk.plugin.impala.impala_lineage_plugin import ImpalaLineagePlugin


def _connection_by_configuration(configuration: ImpalaPluginConfiguration):
Expand Down Expand Up @@ -149,6 +150,9 @@ def db_connection_decorate_operation(self, decoration_cursor: DecorationCursor):
@hookimpl
def vdk_start(plugin_registry: IPluginRegistry, command_line_args: List):
plugin_registry.load_plugin_with_hooks_impl(ImpalaPlugin(), "impala-plugin")
plugin_registry.load_plugin_with_hooks_impl(
ImpalaLineagePlugin(), "impala-lineage-plugin"
)


def get_jobs_parent_directory() -> pathlib.Path:
Expand Down
2 changes: 1 addition & 1 deletion projects/vdk-plugins/vdk-impala/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,6 @@ def impala_service(docker_ip, docker_services):
time.sleep(3)

docker_services.wait_until_responsive(
timeout=90.0, pause=0.3, check=lambda: _is_responsive(runner)
timeout=120.0, pause=0.3, check=lambda: _is_responsive(runner)
)
time.sleep(10)
Loading

0 comments on commit 4791717

Please sign in to comment.