From 7d2f1b8b62298244c56c0af7900a872077f263d2 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Wed, 12 Jun 2024 09:32:37 -0700 Subject: [PATCH] fix(ingest): fix redshift query urns + reduce memory usage Fixes a regression from https://github.com/datahub-project/datahub/pull/10619. In reality, it's not a full regression but rather an existing bug that the above PR exposed. Also improves memory usage, which should help with https://github.com/datahub-project/datahub/issues/10435. --- metadata-ingestion/setup.py | 2 +- .../src/datahub/ingestion/source/redshift/lineage.py | 12 ++++-------- .../datahub/ingestion/source/redshift/lineage_v2.py | 6 +++++- .../src/datahub/ingestion/source/redshift/query.py | 2 +- .../datahub/sql_parsing/sql_parsing_aggregator.py | 5 +++++ .../tests/unit/test_redshift_lineage.py | 2 +- 6 files changed, 17 insertions(+), 12 deletions(-) diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index ade1e1a6ee5ba..cd90716257733 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -34,7 +34,7 @@ "importlib_metadata>=4.0.0; python_version < '3.10'", "docker", "expandvars>=0.6.5", - "avro-gen3==0.7.12", + "avro-gen3==0.7.13", # "avro-gen3 @ git+https://github.com/acryldata/avro_gen@master#egg=avro-gen3", "avro>=1.11.3,<1.12", "python-dateutil>=2.8.0", diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage.py index 87deab72284c0..852deac13e516 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage.py @@ -4,7 +4,7 @@ from dataclasses import dataclass, field from datetime import datetime from enum import Enum -from typing import Dict, List, Optional, Set, Tuple, Union +from typing import Dict, Iterable, List, Optional, Set, Tuple, Union from urllib.parse import urlparse import humanfriendly @@ -661,7 +661,7 @@ def populate_lineage( if self.config.resolve_temp_table_in_lineage: self._init_temp_table_schema( database=database, - temp_tables=self.get_temp_tables(connection=connection), + temp_tables=list(self.get_temp_tables(connection=connection)), ) populate_calls: List[Tuple[str, LineageCollectorType]] = [] @@ -893,7 +893,7 @@ def _process_table_renames( def get_temp_tables( self, connection: redshift_connector.Connection - ) -> List[TempTableRow]: + ) -> Iterable[TempTableRow]: ddl_query: str = self.queries.temp_table_ddl_query( start_time=self.config.start_time, end_time=self.config.end_time, @@ -901,15 +901,11 @@ def get_temp_tables( logger.debug(f"Temporary table ddl query = {ddl_query}") - temp_table_rows: List[TempTableRow] = [] - for row in RedshiftDataDictionary.get_temporary_rows( conn=connection, query=ddl_query, ): - temp_table_rows.append(row) - - return temp_table_rows + yield row def find_temp_tables( self, temp_table_rows: List[TempTableRow], temp_table_names: List[str] diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage_v2.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage_v2.py index 2c7ebb613c57a..062a99de6b735 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage_v2.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage_v2.py @@ -116,7 +116,11 @@ def build( default_schema=self.config.default_schema, session_id=temp_row.session_id, query_timestamp=temp_row.start_time, - is_known_temp_table=True, + # The "temp table" query actually returns all CREATE TABLE statements, even if they + # aren't explicitly a temp table. As such, setting is_known_temp_table=True + # would not be correct. We already have mechanisms to autodetect temp tables, + # so we won't lose anything by not setting it. + is_known_temp_table=False, ) populate_calls: List[Tuple[LineageCollectorType, str, Callable]] = [] diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/query.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/query.py index 1bc82556ce4bc..3bd69d72be605 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/query.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/query.py @@ -502,7 +502,7 @@ def list_insert_create_queries_sql( usename as username, ddl, sq.query as query_id, - min(si.starttime) as starttime, + min(si.starttime) as timestamp, ANY_VALUE(pid) as session_id from stl_insert as si diff --git a/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py b/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py index 774f0dfce3b87..27daae11e2295 100644 --- a/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py +++ b/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py @@ -870,6 +870,7 @@ def _query_type_precedence(cls, query_type: str) -> int: models.DatasetLineageTypeClass.TRANSFORMED, ] + # Lower value = higher precedence. idx = query_precedence.index(query_type) if idx == -1: return len(query_precedence) @@ -885,13 +886,17 @@ def _gen_lineage_for_downstream( ] # Sort the queries by highest precedence first, then by latest timestamp. + # In case of ties, prefer queries with a known query type. # Tricky: by converting the timestamp to a number, we also can ignore the # differences between naive and aware datetimes. queries = sorted( + # Sorted is a stable sort, so in the case of total ties, we want + # to prefer the most recently added query. reversed(queries), key=lambda query: ( self._query_type_precedence(query.lineage_type), -(make_ts_millis(query.latest_timestamp) or 0), + query.query_type == QueryType.UNKNOWN, ), ) diff --git a/metadata-ingestion/tests/unit/test_redshift_lineage.py b/metadata-ingestion/tests/unit/test_redshift_lineage.py index 366a6009ee46a..78b7169a93f3c 100644 --- a/metadata-ingestion/tests/unit/test_redshift_lineage.py +++ b/metadata-ingestion/tests/unit/test_redshift_lineage.py @@ -241,7 +241,7 @@ def test_collapse_temp_lineage(): lineage_extractor._init_temp_table_schema( database=lineage_extractor.config.database, - temp_tables=lineage_extractor.get_temp_tables(connection=connection), + temp_tables=list(lineage_extractor.get_temp_tables(connection=connection)), ) lineage_extractor._populate_lineage_map(