fix(ingest/databricks): support hive metastore schemas with special char #10049

Merged
Changes from 3 commits
4 changes: 2 additions & 2 deletions metadata-ingestion/docs/sources/databricks/README.md
@@ -1,12 +1,12 @@
DataHub supports integration with the Databricks ecosystem using a multitude of connectors, depending on your exact setup.

## Databricks Hive
## Databricks Hive (old)

The simplest way to integrate is usually via the Hive connector. The [Hive starter recipe](http://datahubproject.io/docs/generated/ingestion/sources/hive#starter-recipe) has a section describing how to connect to your Databricks workspace.

## Databricks Unity Catalog (new)

The recently introduced [Unity Catalog](https://www.databricks.com/product/unity-catalog) provides a new way to govern your assets within the Databricks lakehouse. If you have enabled Unity Catalog, you can use the `unity-catalog` source (see below) to integrate your metadata into DataHub as an alternate to the Hive pathway.
The recently introduced [Unity Catalog](https://www.databricks.com/product/unity-catalog) provides a new way to govern your assets within the Databricks lakehouse. If your workspace has Unity Catalog enabled, you can use the `unity-catalog` source (also known as the `databricks` source; see below for details) to integrate your metadata into DataHub as an alternative to the Hive pathway. This source also ingests the `hive_metastore` catalog in Databricks and is the recommended approach for ingesting the Databricks ecosystem into DataHub.
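As context for the recommendation above, here is a minimal sketch of driving the `unity-catalog` source programmatically with DataHub's `Pipeline` API. The placeholder URLs/token and the `include_hive_metastore` flag are assumptions for illustration; check the source documentation for the exact config keys in your version.

```python
# A minimal sketch of the recommended Unity Catalog ingestion path described above.
# Config keys such as `include_hive_metastore` and the placeholder values are
# assumptions for illustration; consult the unity-catalog source docs for your version.
from datahub.ingestion.run.pipeline import Pipeline

pipeline = Pipeline.create(
    {
        "source": {
            "type": "unity-catalog",
            "config": {
                "workspace_url": "https://<your-workspace>.cloud.databricks.com",
                "token": "<databricks-personal-access-token>",
                # Assumed flag: also ingest the legacy hive_metastore catalog.
                "include_hive_metastore": True,
            },
        },
        "sink": {
            "type": "datahub-rest",
            "config": {"server": "http://localhost:8080"},
        },
    }
)
pipeline.run()
pipeline.raise_from_status()
```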

## Databricks Spark

@@ -100,14 +100,47 @@ def hive_metastore_schemas(self, catalog: Catalog) -> Iterable[Schema]:
)

def hive_metastore_tables(self, schema: Schema) -> Iterable[Table]:
views = self.inspector.get_view_names(schema.name)
views = self.get_view_names(schema.name)
for table_name in views:
yield self._get_table(schema, table_name, True)
try:
yield self._get_table(schema, table_name, True)
except Exception as e:
logger.debug(
f"Failed to get table {schema.name}.{table_name} due to {e}",
exc_info=True,
)

for table_name in self.inspector.get_table_names(schema.name):
for table_name in self.get_table_names(schema.name):
if table_name in views:
continue
yield self._get_table(schema, table_name, False)
try:
yield self._get_table(schema, table_name, False)
except Exception as e:
logger.debug(
f"Failed to get table {schema.name}.{table_name} due to {e}",
exc_info=True,
)

def get_table_names(self, schema_name: str) -> List[str]:
try:
rows = self._execute_sql(f"SHOW TABLES FROM `{schema_name}`")
# 3 columns - database, tableName, isTemporary
return [row.tableName for row in rows]
except Exception as e:
logger.debug(
f"Failed to get tables {schema_name} due to {e}", exc_info=True
)
return []

def get_view_names(self, schema_name: str) -> List[str]:
try:
rows = self._execute_sql(f"SHOW VIEWS FROM `{schema_name}`")
# 3 columns - database, tableName, isTemporary
return [row.tableName for row in rows]
except Exception as e:
logger.debug(f"Failed to get views {schema_name} due to {e}", exc_info=True)
return []
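The new `get_table_names` / `get_view_names` methods above issue `SHOW TABLES FROM` / `SHOW VIEWS FROM` with the schema name wrapped in backticks, which is what makes schemas containing special characters work. A standalone sketch of that quoting idea, using a hypothetical `quote_identifier` helper that is not part of the connector and assuming Spark SQL's convention of doubling embedded backticks:

```python
# Hypothetical helper (not part of the connector) illustrating the backtick
# quoting used in the SHOW TABLES / SHOW VIEWS statements above. Embedded
# backticks are doubled, which is the usual Spark SQL escaping convention.
def quote_identifier(name: str) -> str:
    return "`" + name.replace("`", "``") + "`"


def show_tables_sql(schema_name: str) -> str:
    # e.g. a schema named "bronze-kambi.v2" becomes `bronze-kambi.v2`
    return f"SHOW TABLES FROM {quote_identifier(schema_name)}"


assert show_tables_sql("bronze-kambi.v2") == "SHOW TABLES FROM `bronze-kambi.v2`"
```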

def _get_table(
self,
@@ -134,9 +167,9 @@ def _get_table(
columns=columns,
storage_location=storage_location,
data_source_format=datasource_format,
view_definition=self._get_view_definition(schema.name, table_name)
if is_view
else None,
view_definition=(
self._get_view_definition(schema.name, table_name) if is_view else None
),
properties=detailed_info,
owner=None,
generation=None,
@@ -170,41 +203,53 @@ def get_table_profile(
else {}
)

column_profiles: List[ColumnProfile] = []
if include_column_stats:
for column in columns:
column_profile = self._get_column_profile(column.name, ref)
if column_profile:
column_profiles.append(column_profile)

return TableProfile(
num_rows=int(table_stats[ROWS])
if table_stats.get(ROWS) is not None
else None,
total_size=int(table_stats[BYTES])
if table_stats.get(BYTES) is not None
else None,
num_rows=(
int(table_stats[ROWS]) if table_stats.get(ROWS) is not None else None
),
total_size=(
int(table_stats[BYTES]) if table_stats.get(BYTES) is not None else None
),
num_columns=len(columns),
column_profiles=[
self._get_column_profile(column.name, ref) for column in columns
]
if include_column_stats
else [],
column_profiles=column_profiles,
)

def _get_column_profile(self, column: str, ref: TableReference) -> ColumnProfile:

props = self._column_describe_extended(ref.schema, ref.table, column)
col_stats = {}
for prop in props:
col_stats[prop[0]] = prop[1]
return ColumnProfile(
name=column,
null_count=int(col_stats[NUM_NULLS])
if col_stats.get(NUM_NULLS) is not None
else None,
distinct_count=int(col_stats[DISTINCT_COUNT])
if col_stats.get(DISTINCT_COUNT) is not None
else None,
min=col_stats.get(MIN),
max=col_stats.get(MAX),
avg_len=col_stats.get(AVG_COL_LEN),
max_len=col_stats.get(MAX_COL_LEN),
version=col_stats.get(VERSION),
)
def _get_column_profile(
self, column: str, ref: TableReference
) -> Optional[ColumnProfile]:
try:
props = self._column_describe_extended(ref.schema, ref.table, column)
col_stats = {}
for prop in props:
col_stats[prop[0]] = prop[1]
return ColumnProfile(
name=column,
null_count=(
int(col_stats[NUM_NULLS])
if col_stats.get(NUM_NULLS) is not None
else None
),
distinct_count=(
int(col_stats[DISTINCT_COUNT])
if col_stats.get(DISTINCT_COUNT) is not None
else None
),
min=col_stats.get(MIN),
max=col_stats.get(MAX),
avg_len=col_stats.get(AVG_COL_LEN),
max_len=col_stats.get(MAX_COL_LEN),
version=col_stats.get(VERSION),
)
except Exception as e:
logger.debug(f"Failed to get column profile for {ref}.{column} due to {e}")
return None

def _get_cached_table_statistics(self, statistics: str) -> dict:
# statistics is in format "xx bytes" OR "1382 bytes, 2 rows"
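The comment above documents the cached-statistics format ("xx bytes" or "1382 bytes, 2 rows"). Purely as an illustration of that format, and not the connector's actual `_get_cached_table_statistics` implementation, a naive parser might look like this:

```python
# Illustrative only: a naive parser for the cached-statistics format noted above,
# e.g. "1382 bytes, 2 rows" -> {"bytes": 1382, "rows": 2}. The real
# _get_cached_table_statistics implementation may differ.
def parse_cached_statistics(statistics: str) -> dict:
    result = {}
    for part in statistics.split(","):
        value, unit = part.strip().split(" ", 1)
        result[unit] = int(value)
    return result


assert parse_cached_statistics("1382 bytes, 2 rows") == {"bytes": 1382, "rows": 2}
assert parse_cached_statistics("99 bytes") == {"bytes": 99}
```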
@@ -242,9 +287,10 @@ def _get_view_definition(self, schema_name: str, table_name: str) -> Optional[str]:
)
for row in rows:
return row[0]
except Exception:
except Exception as e:
logger.debug(
f"Failed to get view definition for {schema_name}.{table_name}"
f"Failed to get view definition for {schema_name}.{table_name} due to {e}",
exc_info=True,
)
return None

53 changes: 31 additions & 22 deletions metadata-ingestion/src/datahub/ingestion/source/unity/proxy.py
@@ -1,6 +1,7 @@
"""
Manage the communication with the Databricks server and provide equivalent dataclasses for dependent modules
"""

import dataclasses
import logging
from datetime import datetime, timezone
@@ -204,16 +205,16 @@ def workspace_notebooks(self) -> Iterable[Notebook]:
id=obj.object_id,
path=obj.path,
language=obj.language,
created_at=datetime.fromtimestamp(
obj.created_at / 1000, tz=timezone.utc
)
if obj.created_at
else None,
modified_at=datetime.fromtimestamp(
obj.modified_at / 1000, tz=timezone.utc
)
if obj.modified_at
else None,
created_at=(
datetime.fromtimestamp(obj.created_at / 1000, tz=timezone.utc)
if obj.created_at
else None
),
modified_at=(
datetime.fromtimestamp(obj.modified_at / 1000, tz=timezone.utc)
if obj.modified_at
else None
),
)

def query_history(
Expand Down Expand Up @@ -268,12 +269,14 @@ def _query_history(
response: dict = self._workspace_client.api_client.do( # type: ignore
method, path, body={**body, "filter_by": filter_by.as_dict()}
)
# we use default raw=False in above request, therefore will always get dict
# we use the default raw=False in the request above, therefore we always get a dict
while True:
if "res" not in response or not response["res"]:
return
for v in response["res"]:
yield QueryInfo.from_dict(v)
if not response.get("next_page_token"): # last page
return
response = self._workspace_client.api_client.do( # type: ignore
method, path, body={**body, "page_token": response["next_page_token"]}
)
@@ -434,22 +437,28 @@ def _create_table(
schema=schema,
storage_location=obj.storage_location,
data_source_format=obj.data_source_format,
columns=list(self._extract_columns(obj.columns, table_id))
if obj.columns
else [],
columns=(
list(self._extract_columns(obj.columns, table_id))
if obj.columns
else []
),
view_definition=obj.view_definition or None,
properties=obj.properties or {},
owner=obj.owner,
generation=obj.generation,
created_at=datetime.fromtimestamp(obj.created_at / 1000, tz=timezone.utc)
if obj.created_at
else None,
created_at=(
datetime.fromtimestamp(obj.created_at / 1000, tz=timezone.utc)
if obj.created_at
else None
),
created_by=obj.created_by,
updated_at=datetime.fromtimestamp(obj.updated_at / 1000, tz=timezone.utc)
if obj.updated_at
else None
if obj.updated_at
else None,
updated_at=(
datetime.fromtimestamp(obj.updated_at / 1000, tz=timezone.utc)
if obj.updated_at
else None
if obj.updated_at
else None
),
updated_by=obj.updated_by,
table_id=obj.table_id,
comment=obj.comment,
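Several changes in this file wrap the same conversion: Databricks returns epoch timestamps in milliseconds, which are turned into timezone-aware UTC datetimes only when present. A small standalone sketch of that pattern, with a hypothetical helper name:

```python
from datetime import datetime, timezone
from typing import Optional


# Hypothetical helper mirroring the pattern above: Databricks timestamps are
# epoch milliseconds, so divide by 1000 and attach an explicit UTC timezone.
def ms_to_utc_datetime(epoch_ms: Optional[int]) -> Optional[datetime]:
    if not epoch_ms:  # missing (and falsy) timestamps map to None, as in the code above
        return None
    return datetime.fromtimestamp(epoch_ms / 1000, tz=timezone.utc)


assert ms_to_utc_datetime(None) is None
assert ms_to_utc_datetime(1_700_000_000_000).year == 2023
```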
@@ -111,6 +111,12 @@ def get_table_stats(
)
else:
return None
except Exception as e:
logger.debug(f"Failed to get table stats due to {e}", exc_info=True)
self.report.profile_table_errors.setdefault(
"miscellaneous errors", LossyList()
).append((str(ref), str(e)))
return None
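The new error handling above buckets profiling failures into the source report with `setdefault` and a `LossyList`, so the report stays bounded even if many tables fail. A rough sketch of that pattern in isolation, assuming `LossyList` is importable from `datahub.utilities.lossy_collections` and behaves like a list that retains only a bounded sample of appended entries:

```python
from typing import Dict

# Assumed import path; LossyList behaves like a list that keeps only a bounded
# sample of appended entries, which keeps ingestion reports small.
from datahub.utilities.lossy_collections import LossyList

profile_table_errors: Dict[str, LossyList] = {}

for ref, err in [("cat.schema.t1", "boom"), ("cat.schema.t2", "bad stats")]:
    # Same bucketing pattern as in get_table_stats above: one LossyList per error category.
    profile_table_errors.setdefault("miscellaneous errors", LossyList()).append((ref, err))

print(profile_table_errors["miscellaneous errors"])
```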

def _should_retry_unsupported_column(
self, ref: TableReference, e: DatabricksError
@@ -185,12 +191,14 @@ def _create_table_profile(
num_rows=self._get_int(table_info, "spark.sql.statistics.numRows"),
total_size=self._get_int(table_info, "spark.sql.statistics.totalSize"),
num_columns=len(columns_names),
column_profiles=[
self._create_column_profile(column, table_info)
for column in columns_names
]
if include_columns
else [],
column_profiles=(
[
self._create_column_profile(column, table_info)
for column in columns_names
]
if include_columns
else []
),
)

def _create_column_profile(
Expand Down Expand Up @@ -237,12 +245,16 @@ def _raise_if_error(
StatementState.CLOSED,
]:
raise DatabricksError(
response.status.error.message
if response.status.error and response.status.error.message
else "Unknown Error",
error_code=response.status.error.error_code.value
if response.status.error and response.status.error.error_code
else "Unknown Error Code",
(
response.status.error.message
if response.status.error and response.status.error.message
else "Unknown Error"
),
error_code=(
response.status.error.error_code.value
if response.status.error and response.status.error.error_code
else "Unknown Error Code"
),
status=response.status.state.value,
context=key,
)
@@ -147,6 +147,7 @@ def _get_queries(self) -> Iterable[Query]:
self.config.start_time, self.config.end_time
)
except Exception as e:
logger.warning("Error getting queries", exc_info=True)
self.report.report_warning("get-queries", str(e))

@@ -1,4 +1,5 @@
import uuid
from collections import namedtuple
from unittest import mock
from unittest.mock import patch

@@ -272,6 +273,9 @@ def register_mock_data(workspace_client):
]


TableEntry = namedtuple("TableEntry", ["database", "tableName", "isTemporary"])


def mock_hive_sql(query):

if query == "DESCRIBE EXTENDED `bronze_kambi`.`bet` betStatusId":
@@ -371,6 +375,13 @@ def mock_hive_sql(query):
"CREATE VIEW `hive_metastore`.`bronze_kambi`.`view1` AS SELECT * FROM `hive_metastore`.`bronze_kambi`.`bet`",
)
]
elif query == "SHOW TABLES FROM `bronze_kambi`":
return [
TableEntry("bronze_kambi", "bet", False),
TableEntry("bronze_kambi", "view1", False),
]
elif query == "SHOW VIEWS FROM `bronze_kambi`":
return [TableEntry("bronze_kambi", "view1", False)]

return []
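The test mocks `SHOW TABLES FROM` / `SHOW VIEWS FROM` results with a `TableEntry` namedtuple because the proxy reads rows by attribute (`row.tableName`). A brief self-contained illustration of why that is sufficient:

```python
from collections import namedtuple

# Rows returned by SHOW TABLES / SHOW VIEWS are read by attribute in the proxy
# (row.tableName), so a namedtuple with matching field names is enough to mock them.
TableEntry = namedtuple("TableEntry", ["database", "tableName", "isTemporary"])

rows = [TableEntry("bronze_kambi", "bet", False), TableEntry("bronze_kambi", "view1", False)]
assert [row.tableName for row in rows] == ["bet", "view1"]
```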

@@ -392,8 +403,6 @@ def test_ingestion(pytestconfig, tmp_path, requests_mock):

inspector = mock.MagicMock()
inspector.get_schema_names.return_value = ["bronze_kambi"]
inspector.get_view_names.return_value = ["view1"]
inspector.get_table_names.return_value = ["bet", "view1"]
get_inspector.return_value = inspector

execute_sql.side_effect = mock_hive_sql