Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ingest/databricks): support hive metastore schemas with special char #10049

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions metadata-ingestion/docs/sources/databricks/README.md
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
DataHub supports integration with Databricks ecosystem using a multitude of connectors, depending on your exact setup.

## Databricks Hive
## Databricks Unity Catalog (new)

The simplest way to integrate is usually via the Hive connector. The [Hive starter recipe](http://datahubproject.io/docs/generated/ingestion/sources/hive#starter-recipe) has a section describing how to connect to your Databricks workspace.
The recently introduced [Unity Catalog](https://www.databricks.com/product/unity-catalog) provides a new way to govern your assets within the Databricks lakehouse. If you have Unity Catalog Enabled Workspace, you can use the `unity-catalog` source (aka `databricks` source, see below for details) to integrate your metadata into DataHub as an alternate to the Hive pathway. This also ingests hive metastore catalog in Databricks and is recommended approach to ingest Databricks ecosystem in DataHub.

## Databricks Unity Catalog (new)
## Databricks Hive (old)

The alternative way to integrate is via the Hive connector. The [Hive starter recipe](http://datahubproject.io/docs/generated/ingestion/sources/hive#starter-recipe) has a section describing how to connect to your Databricks workspace.

The recently introduced [Unity Catalog](https://www.databricks.com/product/unity-catalog) provides a new way to govern your assets within the Databricks lakehouse. If you have enabled Unity Catalog, you can use the `unity-catalog` source (see below) to integrate your metadata into DataHub as an alternate to the Hive pathway.

## Databricks Spark

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
TableProfile,
TableReference,
)
from datahub.ingestion.source.unity.report import UnityCatalogReport

logger = logging.getLogger(__name__)
HIVE_METASTORE = "hive_metastore"
Expand Down Expand Up @@ -66,9 +67,12 @@ class HiveMetastoreProxy(Closeable):
as unity catalog apis do not return details about this legacy metastore.
"""

def __init__(self, sqlalchemy_url: str, options: dict) -> None:
def __init__(
self, sqlalchemy_url: str, options: dict, report: UnityCatalogReport
) -> None:
try:
self.inspector = HiveMetastoreProxy.get_inspector(sqlalchemy_url, options)
self.report = report
except Exception:
# This means that there is no `hive_metastore` catalog in databricks workspace
# Not tested but seems like the logical conclusion.
Expand Down Expand Up @@ -100,22 +104,53 @@ def hive_metastore_schemas(self, catalog: Catalog) -> Iterable[Schema]:
)

def hive_metastore_tables(self, schema: Schema) -> Iterable[Table]:
views = self.inspector.get_view_names(schema.name)
# NOTE: Ideally, we use `inspector.get_view_names` and `inspector.get_table_names` here instead of
# making show queries in this class however Databricks dialect for databricks-sql-connector<3.0.0 does not
# back-quote schemas with special char such as hyphen.
# Currently, databricks-sql-connector is pinned to <3.0.0 due to requirement of SQLAlchemy > 2.0.21 for
# later versions.
views = self.get_view_names(schema.name)
mayurinehate marked this conversation as resolved.
Show resolved Hide resolved
for table_name in views:
yield self._get_table(schema, table_name, True)

for table_name in self.inspector.get_table_names(schema.name):
for table_name in self.get_table_names(schema.name):
if table_name in views:
continue
yield self._get_table(schema, table_name, False)

def get_table_names(self, schema_name: str) -> List[str]:
try:
rows = self._execute_sql(f"SHOW TABLES FROM `{schema_name}`")
# 3 columns - database, tableName, isTemporary
return [row.tableName for row in rows]
except Exception as e:
self.report.report_warning(
"Failed to get tables for schema", f"{HIVE_METASTORE}.{schema_name}"
)
logger.warning(
f"Failed to get tables {schema_name} due to {e}", exc_info=True
)
return []

def get_view_names(self, schema_name: str) -> List[str]:
try:
rows = self._execute_sql(f"SHOW VIEWS FROM `{schema_name}`")
# 3 columns - database, tableName, isTemporary
return [row.tableName for row in rows]
except Exception as e:
self.report.report_warning("Failed to get views for schema", schema_name)
logger.warning(
f"Failed to get views {schema_name} due to {e}", exc_info=True
)
return []

def _get_table(
self,
schema: Schema,
table_name: str,
is_view: bool = False,
) -> Table:
columns = self._get_columns(schema, table_name)
columns = self._get_columns(schema.name, table_name)
detailed_info = self._get_table_info(schema.name, table_name)

comment = detailed_info.pop("Comment", None)
Expand All @@ -134,9 +169,9 @@ def _get_table(
columns=columns,
storage_location=storage_location,
data_source_format=datasource_format,
view_definition=self._get_view_definition(schema.name, table_name)
if is_view
else None,
view_definition=(
self._get_view_definition(schema.name, table_name) if is_view else None
),
properties=detailed_info,
owner=None,
generation=None,
Expand All @@ -150,61 +185,69 @@ def _get_table(

def get_table_profile(
self, ref: TableReference, include_column_stats: bool = False
) -> TableProfile:
) -> Optional[TableProfile]:
columns = self._get_columns(
Schema(
id=ref.schema,
name=ref.schema,
# This is okay, as none of this is used in profiling
catalog=self.hive_metastore_catalog(None),
comment=None,
owner=None,
),
ref.schema,
ref.table,
)
detailed_info = self._get_table_info(ref.schema, ref.table)

if not columns and not detailed_info:
return None

table_stats = (
self._get_cached_table_statistics(detailed_info["Statistics"])
if detailed_info.get("Statistics")
else {}
)

column_profiles: List[ColumnProfile] = []
if include_column_stats:
for column in columns:
column_profile = self._get_column_profile(column.name, ref)
if column_profile:
column_profiles.append(column_profile)

return TableProfile(
num_rows=int(table_stats[ROWS])
if table_stats.get(ROWS) is not None
else None,
total_size=int(table_stats[BYTES])
if table_stats.get(BYTES) is not None
else None,
num_rows=(
int(table_stats[ROWS]) if table_stats.get(ROWS) is not None else None
),
total_size=(
int(table_stats[BYTES]) if table_stats.get(BYTES) is not None else None
),
num_columns=len(columns),
column_profiles=[
self._get_column_profile(column.name, ref) for column in columns
]
if include_column_stats
else [],
column_profiles=column_profiles,
)

def _get_column_profile(self, column: str, ref: TableReference) -> ColumnProfile:

props = self._column_describe_extended(ref.schema, ref.table, column)
col_stats = {}
for prop in props:
col_stats[prop[0]] = prop[1]
return ColumnProfile(
name=column,
null_count=int(col_stats[NUM_NULLS])
if col_stats.get(NUM_NULLS) is not None
else None,
distinct_count=int(col_stats[DISTINCT_COUNT])
if col_stats.get(DISTINCT_COUNT) is not None
else None,
min=col_stats.get(MIN),
max=col_stats.get(MAX),
avg_len=col_stats.get(AVG_COL_LEN),
max_len=col_stats.get(MAX_COL_LEN),
version=col_stats.get(VERSION),
)
def _get_column_profile(
self, column: str, ref: TableReference
) -> Optional[ColumnProfile]:
try:
props = self._column_describe_extended(ref.schema, ref.table, column)
col_stats = {}
for prop in props:
col_stats[prop[0]] = prop[1]
return ColumnProfile(
name=column,
null_count=(
int(col_stats[NUM_NULLS])
if col_stats.get(NUM_NULLS) is not None
else None
),
distinct_count=(
int(col_stats[DISTINCT_COUNT])
if col_stats.get(DISTINCT_COUNT) is not None
else None
),
min=col_stats.get(MIN),
max=col_stats.get(MAX),
avg_len=col_stats.get(AVG_COL_LEN),
max_len=col_stats.get(MAX_COL_LEN),
version=col_stats.get(VERSION),
)
except Exception as e:
logger.debug(f"Failed to get column profile for {ref}.{column} due to {e}")
return None

def _get_cached_table_statistics(self, statistics: str) -> dict:
# statistics is in format "xx bytes" OR "1382 bytes, 2 rows"
Expand Down Expand Up @@ -242,9 +285,14 @@ def _get_view_definition(self, schema_name: str, table_name: str) -> Optional[st
)
for row in rows:
return row[0]
except Exception:
except Exception as e:
self.report.report_warning(
"Failed to get view definition for table",
f"{HIVE_METASTORE}.{schema_name}.{table_name}",
)
logger.debug(
f"Failed to get view definition for {schema_name}.{table_name}"
f"Failed to get view definition for {schema_name}.{table_name} due to {e}",
exc_info=True,
)
return None

Expand All @@ -258,60 +306,81 @@ def _get_table_type(self, type: Optional[str]) -> HiveTableType:
else:
return HiveTableType.UNKNOWN

@lru_cache(maxsize=1)
def _get_table_info(self, schema_name: str, table_name: str) -> dict:
rows = self._describe_extended(schema_name, table_name)

index = rows.index(("# Detailed Table Information", "", ""))
rows = rows[index + 1 :]
# Copied from https://github.com/acryldata/PyHive/blob/master/pyhive/sqlalchemy_hive.py#L375
# Generate properties dictionary.
properties = {}
active_heading = None
for col_name, data_type, value in rows:
col_name = col_name.rstrip()
if col_name.startswith("# "):
continue
elif col_name == "" and data_type is None:
active_heading = None
continue
elif col_name != "" and data_type is None:
active_heading = col_name
elif col_name != "" and data_type is not None:
properties[col_name] = data_type.strip()
else:
# col_name == "", data_type is not None
prop_name = "{} {}".format(active_heading, data_type.rstrip())
properties[prop_name] = value.rstrip()

try:
rows = self._describe_extended(schema_name, table_name)

index = rows.index(("# Detailed Table Information", "", ""))
rows = rows[index + 1 :]
# Copied from https://github.com/acryldata/PyHive/blob/master/pyhive/sqlalchemy_hive.py#L375

active_heading = None
for col_name, data_type, value in rows:
col_name = col_name.rstrip()
if col_name.startswith("# "):
continue
elif col_name == "" and data_type is None:
active_heading = None
continue
elif col_name != "" and data_type is None:
active_heading = col_name
elif col_name != "" and data_type is not None:
properties[col_name] = data_type.strip()
else:
# col_name == "", data_type is not None
prop_name = "{} {}".format(active_heading, data_type.rstrip())
properties[prop_name] = value.rstrip()
except Exception as e:
self.report.report_warning(
"Failed to get detailed info for table",
f"{HIVE_METASTORE}.{schema_name}.{table_name}",
)
logger.debug(
f"Failed to get detailed info for table {schema_name}.{table_name} due to {e}",
exc_info=True,
)
return properties

def _get_columns(self, schema: Schema, table_name: str) -> List[Column]:
rows = self._describe_extended(schema.name, table_name)

@lru_cache(maxsize=1)
def _get_columns(self, schema_name: str, table_name: str) -> List[Column]:
columns: List[Column] = []
for i, row in enumerate(rows):
if i == 0 and row[0].strip() == "col_name":
continue # first row
if row[0].strip() in (
"",
"# Partition Information",
"# Detailed Table Information",
):
break
columns.append(
Column(
name=row[0].strip(),
id=f"{schema.id}.{table_name}.{row[0].strip()}",
type_text=row[1].strip(),
type_name=type_map.get(row[1].strip().lower()),
type_scale=None,
type_precision=None,
position=None,
nullable=None,
comment=row[2],
try:
rows = self._describe_extended(schema_name, table_name)
for i, row in enumerate(rows):
if i == 0 and row[0].strip() == "col_name":
continue # first row
if row[0].strip() in (
"",
"# Partition Information",
"# Detailed Table Information",
):
break
columns.append(
Column(
name=row[0].strip(),
id=f"{HIVE_METASTORE}.{schema_name}.{table_name}.{row[0].strip()}",
type_text=row[1].strip(),
type_name=type_map.get(row[1].strip().lower()),
type_scale=None,
type_precision=None,
position=None,
nullable=None,
comment=row[2],
)
)
except Exception as e:
self.report.report_warning(
"Failed to get columns for table",
f"{HIVE_METASTORE}.{schema_name}.{table_name}",
)
logger.debug(
f"Failed to get columns for table {schema_name}.{table_name} due to {e}",
exc_info=True,
)

return columns

@lru_cache(maxsize=1)
Expand Down
Loading
Loading