Skip to content

Commit

Permalink
feat(ingest/vertica): use 3 part naming (#10636)
Browse files Browse the repository at this point in the history
  • Loading branch information
Rajasekhar-Vuppala authored Jun 15, 2024
1 parent edb9cf6 commit a2d8a09
Show file tree
Hide file tree
Showing 5 changed files with 2,735 additions and 2,849 deletions.
Binary file removed datahub-web-react/src/images/verticalogo copy.png
Binary file not shown.
2 changes: 1 addition & 1 deletion metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@
"nifi": {"requests", "packaging", "requests-gssapi"},
"powerbi": microsoft_common | {"lark[regex]==1.1.4", "sqlparse"} | sqlglot_lib,
"powerbi-report-server": powerbi_report_server,
"vertica": sql_common | {"vertica-sqlalchemy-dialect[vertica-python]==0.0.8.1"},
"vertica": sql_common | {"vertica-sqlalchemy-dialect[vertica-python]==0.0.8.2"},
"unity-catalog": databricks | sql_common | sqllineage_lib,
# databricks is alias for unity-catalog and needs to be kept in sync
"databricks": databricks | sql_common | sqllineage_lib,
Expand Down
25 changes: 10 additions & 15 deletions metadata-ingestion/src/datahub/ingestion/source/sql/vertica.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,17 +133,8 @@ def create(cls, config_dict: Dict, ctx: PipelineContext) -> "VerticaSource":
return cls(config, ctx)

def get_workunits_internal(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit]]:
yield from super().get_workunits_internal()
sql_config = self.config
if logger.isEnabledFor(logging.DEBUG):
# If debug logging is enabled, we also want to echo each SQL query issued.
sql_config.options.setdefault("echo", True)

# Extra default SQLAlchemy option for better connection pooling and threading.
# https://docs.sqlalchemy.org/en/14/core/pooling.html#sqlalchemy.pool.QueuePool.params.max_overflow
if sql_config.is_profiling_enabled():
sql_config.options.setdefault(
"max_overflow", sql_config.profiling.max_workers
)

for inspector in self.get_inspectors():
profiler = None
Expand All @@ -170,11 +161,6 @@ def get_workunits_internal(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit
),
)

if sql_config.include_tables:
yield from self.loop_tables(inspector, schema, sql_config)

if sql_config.include_views:
yield from self.loop_views(inspector, schema, sql_config)
if sql_config.include_projections:
yield from self.loop_projections(inspector, schema, sql_config)
if sql_config.include_models:
Expand All @@ -190,6 +176,15 @@ def get_workunits_internal(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit
profile_requests, profiler, platform=self.platform
)

def get_identifier(
self, *, schema: str, entity: str, inspector: VerticaInspector, **kwargs: Any
) -> str:
regular = f"{schema}.{entity}"
if self.config.database:
return f"{self.config.database}.{regular}"
current_database = self.get_db_name(inspector)
return f"{current_database}.{regular}"

def get_database_properties(
self, inspector: VerticaInspector, database: str
) -> Optional[Dict[str, str]]:
Expand Down
Loading

0 comments on commit a2d8a09

Please sign in to comment.