fix(ingestion/tableau): restructure the tableau graphql datasource query #11230

Merged
Changes from 13 commits

Commits (28)
fcc2450 cursor based pagination (sid-acryl, Aug 23, 2024)
752cd95 lint fix (sid-acryl, Aug 23, 2024)
ff54319 error message (sid-acryl, Aug 23, 2024)
7213497 add a debug log (sid-acryl, Aug 23, 2024)
d75912c add fetch size (sid-acryl, Aug 24, 2024)
1dff5d8 commented upstreamFields and upstreamColumns of published and embedde… (sid-acryl, Aug 26, 2024)
f182099 existing test-cases are working (sid-acryl, Aug 27, 2024)
0d2431c fix test case (sid-acryl, Aug 27, 2024)
fe70845 fix unit test case (sid-acryl, Aug 27, 2024)
8cb1d03 Merge branch 'master' into cus-2491-tableau-ingestion-hitting-20000-n… (sid-acryl, Aug 27, 2024)
464bb43 doc updated (sid-acryl, Aug 27, 2024)
902aa3c Merge branch 'cus-2491-tableau-ingestion-hitting-20000-node-limit' of… (sid-acryl, Aug 27, 2024)
2ec4ca4 Merge branch 'master' into cus-2491-tableau-ingestion-hitting-20000-n… (sid-acryl, Aug 27, 2024)
dd533b4 Merge branch 'master' into cus-2491-tableau-ingestion-hitting-20000-n… (sid-acryl, Aug 28, 2024)
127391d fix the retries_remaining (sid-acryl, Aug 28, 2024)
b25c6f1 change parent container emit sequence (sid-acryl, Aug 28, 2024)
a093b15 log message (sid-acryl, Aug 29, 2024)
02d4655 update sequence (sid-acryl, Aug 29, 2024)
9cce6c0 Merge branch 'master' into cus-2491-tableau-ingestion-hitting-20000-n… (sid-acryl, Aug 30, 2024)
439a375 address review comments (sid-acryl, Aug 30, 2024)
29f9406 generate container for parent first (sid-acryl, Sep 3, 2024)
dc28a3e doc updates (sid-acryl, Sep 4, 2024)
3fcedb7 revert the file (sid-acryl, Sep 4, 2024)
b8f4428 Merge branch 'master' into cus-2491-tableau-ingestion-hitting-20000-n… (sid-acryl, Sep 5, 2024)
c5afb8a address review comments (sid-acryl, Sep 5, 2024)
1559493 Update metadata-ingestion/src/datahub/ingestion/source/tableau/tablea… (sid-acryl, Sep 9, 2024)
45a2e07 Merge branch 'master' into cus-2491-tableau-ingestion-hitting-20000-n… (sid-acryl, Sep 9, 2024)
07baf75 address review comments (sid-acryl, Sep 9, 2024)
@@ -0,0 +1,6 @@
from datahub.ingestion.source.tableau.tableau import (
TableauConfig,
TableauSiteSource,
TableauSource,
TableauSourceReport,
)
Contributor:
Remove unused imports.

The following imports are unused and should be removed to maintain clean and efficient code:

  • TableauConfig
  • TableauSiteSource
  • TableauSource
  • TableauSourceReport

Apply this diff to remove the unused imports:

-from datahub.ingestion.source.tableau.tableau import (
-    TableauConfig,
-    TableauSiteSource,
-    TableauSource,
-    TableauSourceReport,
-)
Tools: Ruff

2-2: datahub.ingestion.source.tableau.tableau.TableauConfig imported but unused; consider removing, adding to __all__, or using a redundant alias (F401)
3-3: datahub.ingestion.source.tableau.tableau.TableauSiteSource imported but unused; consider removing, adding to __all__, or using a redundant alias (F401)
4-4: datahub.ingestion.source.tableau.tableau.TableauSource imported but unused; consider removing, adding to __all__, or using a redundant alias (F401)
5-5: datahub.ingestion.source.tableau.tableau.TableauSourceReport imported but unused; consider removing, adding to __all__, or using a redundant alias (F401)
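If these imports are intentional re-exports from the new tableau package rather than dead code, the __all__ alternative that the Ruff hint mentions looks roughly like the sketch below; the module path comes from the diff, while the re-export intent is an assumption.

# Hypothetical __init__.py for the tableau package, assuming the names are
# meant as public re-exports rather than unused imports.
from datahub.ingestion.source.tableau.tableau import (
    TableauConfig,
    TableauSiteSource,
    TableauSource,
    TableauSourceReport,
)

# Declaring __all__ (or using redundant aliases such as
# "TableauConfig as TableauConfig") marks the names as intentional exports,
# which silences Ruff's F401 warning.
__all__ = [
    "TableauConfig",
    "TableauSiteSource",
    "TableauSource",
    "TableauSourceReport",
]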

@@ -69,7 +69,6 @@
TestConnectionReport,
)
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source import tableau_constant as c
from datahub.ingestion.source.common.subtypes import (
BIContainerSubTypes,
DatasetSubTypes,
@@ -83,7 +82,8 @@
StatefulIngestionConfigBase,
StatefulIngestionSourceBase,
)
from datahub.ingestion.source.tableau_common import (
from datahub.ingestion.source.tableau import tableau_constant as c
from datahub.ingestion.source.tableau.tableau_common import (
FIELD_TYPE_MAPPING,
MetadataQueryException,
TableauLineageOverrides,
@@ -93,6 +93,7 @@
dashboard_graphql_query,
database_servers_graphql_query,
database_tables_graphql_query,
datasource_upstream_fields_graphql_query,
embedded_datasource_graphql_query,
get_filter_pages,
get_overridden_info,
@@ -101,7 +102,7 @@
make_fine_grained_lineage_class,
make_upstream_class,
published_datasource_graphql_query,
query_metadata,
query_metadata_cursor_based_pagination,
sheet_graphql_query,
tableau_field_to_schema_field,
workbook_graphql_query,
@@ -345,6 +346,12 @@ class TableauConfig(
default=10,
description="[advanced] Number of metadata objects (e.g. CustomSQLTable, PublishedDatasource, etc) to query at a time using the Tableau API.",
)

fetch_size: int = Field(
default=250,
description="Specifies the number of records to retrieve in each batch during a query execution.",
)

# We've found that even with a small workbook page size (e.g. 10), the Tableau API often
# returns warnings like this:
# {
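To show where the new option sits alongside the existing page size, here is a minimal sketch of a source-config fragment; fetch_size and its default of 250 come from the diff above, while the other keys and values are assumptions about a typical recipe, not part of this PR.

# Hypothetical Tableau source-config fragment (key names other than
# fetch_size are assumptions, not taken from this change).
tableau_source_config = {
    "connect_uri": "https://tableau.example.com",  # assumed server URL
    "page_size": 10,     # existing option: metadata objects queried per batch
    "fetch_size": 250,   # new option: records returned per GraphQL request
}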
@@ -972,22 +979,28 @@ def get_connection_object_page(
query: str,
connection_type: str,
query_filter: str,
count: int = 0,
offset: int = 0,
current_cursor: Optional[str], # initial value is None
fetch_size: int = 0,
Contributor:
Improve error handling for "NODE_LIMIT_EXCEEDED".

The function is correctly implemented, but the error handling for "NODE_LIMIT_EXCEEDED" could be improved by suggesting actionable steps to the user.

Apply this diff to suggest actionable steps for the "NODE_LIMIT_EXCEEDED" error:

if node_limit_errors:
    logger.debug(f"Node Limit Error. query_data {query_data}")
    self.report.warning(
        title="Node Limit Errors",
        message="Increase your tableau node limit or reduce the fetch size in the configuration.",
        context=f"""{{
                "errors": {node_limit_errors},
                "connection_type": {connection_type},
                "query_filter": {query_filter},
                "query": {query},
        }}""",
    )

Also applies to: 986-987, 990-991, 995-1001, 1003-1003, 1013-1017, 1031-1035, 1037-1037, 1051-1051, 1059-1063, 1082-1093, 1131-1137

retry_on_auth_error: bool = True,
retries_remaining: Optional[int] = None,
) -> Tuple[dict, int, int]:
) -> Tuple[dict, Optional[str], int]:
retries_remaining = retries_remaining or self.config.max_retries

logger.debug(
f"Query {connection_type} to get {count} objects with offset {offset}"
f"Query {connection_type} to get {fetch_size} objects with offset {current_cursor}"
f" and filter {query_filter}"
)
try:
assert self.server is not None
query_data = query_metadata(
self.server, query, connection_type, count, offset, query_filter
query_data = query_metadata_cursor_based_pagination(
server=self.server,
main_query=query,
connection_name=connection_type,
first=fetch_size,
after=current_cursor,
qry_filter=query_filter,
)

except REAUTHENTICATE_ERRORS:
if not retry_on_auth_error:
raise
@@ -997,11 +1010,11 @@ def get_connection_object_page(
# will be thrown and we need to re-authenticate and retry.
self._re_authenticate()
return self.get_connection_object_page(
query,
connection_type,
query_filter,
count,
offset,
query=query,
connection_type=connection_type,
query_filter=query_filter,
fetch_size=fetch_size,
current_cursor=current_cursor,
retry_on_auth_error=False,
retries_remaining=retries_remaining,
)
@@ -1015,13 +1028,13 @@ def get_connection_object_page(
if retries_remaining <= 0:
raise
return self.get_connection_object_page(
query,
connection_type,
query_filter,
count,
offset,
query=query,
connection_type=connection_type,
query_filter=query_filter,
fetch_size=fetch_size,
current_cursor=current_cursor,
retry_on_auth_error=False,
retries_remaining=retries_remaining - 1,
retries_remaining=retries_remaining,
)

if c.ERRORS in query_data:
@@ -1035,13 +1048,19 @@
# filter out PERMISSIONS_MODE_SWITCHED to report error in human-readable format
other_errors = []
permission_mode_errors = []
node_limit_errors = []
for error in errors:
if (
error.get("extensions")
and error["extensions"].get("code")
== "PERMISSIONS_MODE_SWITCHED"
):
permission_mode_errors.append(error)
elif (
error.get("extensions")
and error["extensions"].get("code") == "NODE_LIMIT_EXCEEDED"
):
node_limit_errors.append(error)
else:
other_errors.append(error)

@@ -1060,6 +1079,18 @@ def get_connection_object_page(
context=f"{permission_mode_errors}",
)

if node_limit_errors:
logger.debug(f"Node Limit Error. query_data {query_data}")
self.report.warning(
title="Node Limit Errors",
message="Increase your tableau node limit",
context=f"""{{
"errors": {node_limit_errors},
"connection_type": {connection_type},
"query_filter": {query_filter},
"query": {query},
}}""",
)
else:
# As of Tableau Server 2024.2, the metadata API sporadically returns a 30 second
# timeout error. It doesn't reliably happen, so retrying a couple times makes sense.
@@ -1082,23 +1113,28 @@
)
time.sleep(backoff_time)
return self.get_connection_object_page(
query,
connection_type,
query_filter,
count,
offset,
query=query,
connection_type=connection_type,
query_filter=query_filter,
fetch_size=fetch_size,
current_cursor=current_cursor,
retry_on_auth_error=False,
retries_remaining=retries_remaining - 1,
retries_remaining=retries_remaining,
)
raise RuntimeError(f"Query {connection_type} error: {errors}")

connection_object = query_data.get(c.DATA, {}).get(connection_type, {})

total_count = connection_object.get(c.TOTAL_COUNT, 0)
has_next_page = connection_object.get(c.PAGE_INFO, {}).get(
c.HAS_NEXT_PAGE, False
)
return connection_object, total_count, has_next_page

next_cursor = connection_object.get(c.PAGE_INFO, {}).get(
"endCursor",
None,
)

return connection_object, next_cursor, has_next_page

def get_connection_objects(
self,
@@ -1114,29 +1150,23 @@ def get_connection_objects(
filter_pages = get_filter_pages(query_filter, page_size)

for filter_page in filter_pages:
total_count = page_size
has_next_page = 1
offset = 0
current_cursor: Optional[str] = None
while has_next_page:
count = (
page_size
if offset + page_size < total_count
else total_count - offset
)
filter_: str = make_filter(filter_page)

(
connection_objects,
total_count,
current_cursor,
has_next_page,
) = self.get_connection_object_page(
query,
connection_type,
make_filter(filter_page),
count,
offset,
query=query,
connection_type=connection_type,
query_filter=filter_,
fetch_size=self.config.fetch_size,
current_cursor=current_cursor,
)

offset += count

yield from connection_objects.get(c.NODES) or []

def emit_workbooks(self) -> Iterable[MetadataWorkUnit]:
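The loop above drops the old offset/count bookkeeping and instead feeds the endCursor and hasNextPage values returned by get_connection_object_page back into the next request. A self-contained sketch of that cursor-based pagination pattern follows; run_query is a hypothetical stand-in for the GraphQL call, not the PR's API.

from typing import Any, Callable, Dict, Iterator, Optional

def paginate_with_cursor(
    run_query: Callable[[int, Optional[str]], Dict[str, Any]],
    fetch_size: int = 250,
) -> Iterator[Dict[str, Any]]:
    # Yield nodes page by page, following pageInfo.endCursor until exhausted.
    cursor: Optional[str] = None  # the first request passes no cursor
    while True:
        connection = run_query(fetch_size, cursor)
        yield from connection.get("nodes") or []

        page_info = connection.get("pageInfo") or {}
        if not page_info.get("hasNextPage"):
            break
        cursor = page_info.get("endCursor")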
@@ -2291,6 +2321,41 @@ def _get_datasource_container_key(

return container_key

def update_datasource_for_field_upstream(
self,
datasource: dict,
field_upstream_query: str,
) -> dict:
# Collect field ids to fetch field upstreams
field_ids: List[str] = []
for field in datasource.get(c.FIELDS, []):
if field.get(c.ID):
field_ids.append(field.get(c.ID))

# Fetch field upstreams and arrange them in map
field_vs_upstream: Dict[str, dict] = {}
for field_upstream in self.get_connection_objects(
field_upstream_query,
c.FIELDS_CONNECTION,
{c.ID_WITH_IN: field_ids},
):
if field_upstream.get(c.ID):
field_id = field_upstream[c.ID]
# delete the field id, we don't need it for further processing
del field_upstream[c.ID]
field_vs_upstream[field_id] = field_upstream

# update datasource's field for its upstream
for field_dict in datasource.get(c.FIELDS, []):
field_upstream_dict: Optional[dict] = field_vs_upstream.get(
field_dict.get(c.ID)
)
if field_upstream_dict:
# Add upstream fields to field
field_dict.update(field_upstream_dict)

return datasource

Comment on lines +2340 to +2374
Contributor:

Simplify the logic using a dictionary comprehension.

The function is correctly implemented, but the logic could be simplified by using a dictionary comprehension.

Apply this diff to simplify the logic:

# Fetch field upstreams and arrange them in map
field_vs_upstream: Dict[str, dict] = {
    field_upstream[c.ID]: {k: v for k, v in field_upstream.items() if k != c.ID}
    for field_upstream in self.get_connection_objects(
        field_upstream_query,
        c.FIELDS_CONNECTION,
        {c.ID_WITH_IN: field_ids},
    )
    if field_upstream.get(c.ID)
}

# update datasource's field for its upstream
for field_dict in datasource.get(c.FIELDS, []):
    field_upstream_dict: Optional[dict] = field_vs_upstream.get(field_dict.get(c.ID))
    if field_upstream_dict:
        # Add upstream fields to field
        field_dict.update(field_upstream_dict)

def emit_published_datasources(self) -> Iterable[MetadataWorkUnit]:
datasource_filter = {c.ID_WITH_IN: self.datasource_ids_being_used}

@@ -2299,6 +2364,11 @@ def emit_published_datasources(self) -> Iterable[MetadataWorkUnit]:
c.PUBLISHED_DATA_SOURCES_CONNECTION,
datasource_filter,
):
datasource = self.update_datasource_for_field_upstream(
datasource=datasource,
field_upstream_query=datasource_upstream_fields_graphql_query,
)

yield from self.emit_datasource(datasource)

def emit_upstream_tables(self) -> Iterable[MetadataWorkUnit]:
@@ -2940,6 +3010,10 @@ def emit_embedded_datasources(self) -> Iterable[MetadataWorkUnit]:
c.EMBEDDED_DATA_SOURCES_CONNECTION,
datasource_filter,
):
datasource = self.update_datasource_for_field_upstream(
datasource=datasource,
field_upstream_query=datasource_upstream_fields_graphql_query,
)
Comment on lines +3029 to +3032
Contributor:

Extract logic to a helper function.

The function is correctly implemented, but the logic for updating the datasource fields with their upstream fields could be extracted to a helper function to improve readability.

Apply this diff to extract the logic to a helper function:

def update_datasource_with_upstream_fields(self, datasource: dict) -> dict:
    return self.update_datasource_for_field_upstream(
        datasource=datasource,
        field_upstream_query=datasource_upstream_fields_graphql_query,
    )

def emit_embedded_datasources(self) -> Iterable[MetadataWorkUnit]:
    datasource_filter = {c.ID_WITH_IN: self.embedded_datasource_ids_being_used}

    for datasource in self.get_connection_objects(
        embedded_datasource_graphql_query,
        c.EMBEDDED_DATA_SOURCES_CONNECTION,
        datasource_filter,
    ):
        datasource = self.update_datasource_with_upstream_fields(datasource)
        yield from self.emit_datasource(
            datasource,
            datasource.get(c.WORKBOOK),
            is_embedded_ds=True,
        )

yield from self.emit_datasource(
datasource,
datasource.get(c.WORKBOOK),