From 627c5abfd6e0ffd0fea196870cbbcf1b61f74625 Mon Sep 17 00:00:00 2001
From: sid-acryl <155424659+sid-acryl@users.noreply.github.com>
Date: Wed, 21 Aug 2024 00:12:00 +0530
Subject: [PATCH] feat(ingestion/bigquery): Add ability to filter GCP project
 ingestion based on project labels (#11169)

Co-authored-by: Alice Naghshineh
Co-authored-by: Alice Naghshineh <45885699+anaghshineh@users.noreply.github.com>
Co-authored-by: Tamas Nemeth
Co-authored-by: david-leifker <114954101+david-leifker@users.noreply.github.com>
---
 docs/quick-ingestion-guides/bigquery/setup.md |   4 +-
 metadata-ingestion/setup.py                   |   1 +
 .../ingestion/source/bigquery_v2/bigquery.py  |  32 +-
 .../source/bigquery_v2/bigquery_config.py     |  47 +-
 .../source/bigquery_v2/bigquery_report.py     |   1 +
 .../source/bigquery_v2/bigquery_schema.py     |  26 +-
 .../bigquery_v2/bigquery_test_connection.py   |   4 +-
 .../ingestion/source/bigquery_v2/lineage.py   |   4 +-
 .../bigquery_project_label_mcp_golden.json    | 452 ++++++++++++++++++
 .../integration/bigquery_v2/test_bigquery.py  | 150 +++++-
 .../tests/unit/test_bigquery_source.py        | 128 ++++-
 11 files changed, 786 insertions(+), 63 deletions(-)
 create mode 100644 metadata-ingestion/tests/integration/bigquery_v2/bigquery_project_label_mcp_golden.json

diff --git a/docs/quick-ingestion-guides/bigquery/setup.md b/docs/quick-ingestion-guides/bigquery/setup.md
index 10351d6572c53..96850f2deb68e 100644
--- a/docs/quick-ingestion-guides/bigquery/setup.md
+++ b/docs/quick-ingestion-guides/bigquery/setup.md
@@ -38,7 +38,9 @@ Please refer to the BigQuery [Permissions](https://cloud.google.com/iam/docs/per
 You can always add/remove roles to Service Accounts later on. Please refer to the BigQuery [Manage access to projects, folders, and organizations](https://cloud.google.com/iam/docs/granting-changing-revoking-access) guide for more details.
 :::
 
-3. Create and download a [Service Account Key](https://cloud.google.com/iam/docs/creating-managing-service-account-keys). We will use this to set up authentication within DataHub.
+3. To filter projects based on the `project_labels` configuration, first visit [cloudresourcemanager.googleapis.com](https://console.developers.google.com/apis/api/cloudresourcemanager.googleapis.com/overview) and enable the `Cloud Resource Manager API`.
+
+4. Create and download a [Service Account Key](https://cloud.google.com/iam/docs/creating-managing-service-account-keys). We will use this to set up authentication within DataHub.
The key file looks like this: diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index 7fb83fb6a8325..d59545694c324 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -181,6 +181,7 @@ "google-cloud-logging<=3.5.0", "google-cloud-bigquery", "google-cloud-datacatalog>=1.5.0", + "google-cloud-resource-manager", "more-itertools>=8.12.0", "sqlalchemy-bigquery>=1.4.1", } diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py index 7a96b2f0643ab..0d73c9ad02897 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py @@ -113,8 +113,9 @@ def __init__(self, ctx: PipelineContext, config: BigQueryV2Config): BigqueryTableIdentifier._BQ_SHARDED_TABLE_SUFFIX = "" self.bigquery_data_dictionary = BigQuerySchemaApi( - self.report.schema_api_perf, - self.config.get_bigquery_client(), + report=BigQueryV2Report().schema_api_perf, + projects_client=config.get_projects_client(), + client=config.get_bigquery_client(), ) if self.config.extract_policy_tags_from_catalog: self.bigquery_data_dictionary.datacatalog_client = ( @@ -257,14 +258,37 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: def _get_projects(self) -> List[BigqueryProject]: logger.info("Getting projects") + if self.config.project_ids or self.config.project_id: project_ids = self.config.project_ids or [self.config.project_id] # type: ignore return [ BigqueryProject(id=project_id, name=project_id) for project_id in project_ids ] - else: - return list(self._query_project_list()) + + if self.config.project_labels: + return list(self._query_project_list_from_labels()) + + return list(self._query_project_list()) + + def _query_project_list_from_labels(self) -> Iterable[BigqueryProject]: + projects = self.bigquery_data_dictionary.get_projects_with_labels( + self.config.project_labels + ) + + if not projects: # Report failure on exception and if empty list is returned + self.report.report_failure( + "metadata-extraction", + "Get projects didn't return any project with any of the specified label(s). " + "Maybe resourcemanager.projects.list permission is missing for the service account. 
" + "You can assign predefined roles/bigquery.metadataViewer role to your service account.", + ) + + for project in projects: + if self.config.project_id_pattern.allowed(project.id): + yield project + else: + self.report.report_dropped(project.id) def _query_project_list(self) -> Iterable[BigqueryProject]: try: diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py index fe961dbd780f6..af9256d8877f5 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py @@ -3,7 +3,7 @@ from datetime import timedelta from typing import Any, Dict, List, Optional, Union -from google.cloud import bigquery, datacatalog_v1 +from google.cloud import bigquery, datacatalog_v1, resourcemanager_v3 from google.cloud.logging_v2.client import Client as GCPLoggingClient from pydantic import Field, PositiveInt, PrivateAttr, root_validator, validator @@ -34,12 +34,16 @@ class BigQueryUsageConfig(BaseUsageConfig): max_query_duration: timedelta = Field( default=timedelta(minutes=15), - description="Correction to pad start_time and end_time with. For handling the case where the read happens within our time range but the query completion event is delayed and happens after the configured end time.", + description="Correction to pad start_time and end_time with. For handling the case where the read happens " + "within our time range but the query completion event is delayed and happens after the configured" + " end time.", ) apply_view_usage_to_tables: bool = Field( default=False, - description="Whether to apply view's usage to its base tables. If set to False, uses sql parser and applies usage to views / tables mentioned in the query. If set to True, usage is applied to base tables only.", + description="Whether to apply view's usage to its base tables. If set to False, uses sql parser and applies " + "usage to views / tables mentioned in the query. If set to True, usage is applied to base tables " + "only.", ) @@ -74,6 +78,9 @@ def get_bigquery_client(self) -> bigquery.Client: client_options = self.extra_client_options return bigquery.Client(self.project_on_behalf, **client_options) + def get_projects_client(self) -> resourcemanager_v3.ProjectsClient: + return resourcemanager_v3.ProjectsClient() + def get_policy_tag_manager_client(self) -> datacatalog_v1.PolicyTagManagerClient: return datacatalog_v1.PolicyTagManagerClient() @@ -143,12 +150,14 @@ class BigQueryV2Config( dataset_pattern: AllowDenyPattern = Field( default=AllowDenyPattern.allow_all(), - description="Regex patterns for dataset to filter in ingestion. Specify regex to only match the schema name. e.g. to match all tables in schema analytics, use the regex 'analytics'", + description="Regex patterns for dataset to filter in ingestion. Specify regex to only match the schema name. " + "e.g. 
     )
 
     match_fully_qualified_names: bool = Field(
         default=True,
-        description="[deprecated] Whether `dataset_pattern` is matched against fully qualified dataset name `<project_id>.<dataset_name>`.",
+        description="[deprecated] Whether `dataset_pattern` is matched against fully qualified dataset name "
+        "`<project_id>.<dataset_name>`.",
     )
 
     include_external_url: bool = Field(
@@ -169,7 +178,9 @@ class BigQueryV2Config(
     table_snapshot_pattern: AllowDenyPattern = Field(
         default=AllowDenyPattern.allow_all(),
-        description="Regex patterns for table snapshots to filter in ingestion. Specify regex to match the entire snapshot name in database.schema.snapshot format. e.g. to match all snapshots starting with customer in Customer database and public schema, use the regex 'Customer.public.customer.*'",
+        description="Regex patterns for table snapshots to filter in ingestion. Specify regex to match the entire "
+        "snapshot name in database.schema.snapshot format. e.g. to match all snapshots starting with "
+        "customer in Customer database and public schema, use the regex 'Customer.public.customer.*'",
     )
 
     debug_include_full_payloads: bool = Field(
@@ -180,17 +191,22 @@ class BigQueryV2Config(
     number_of_datasets_process_in_batch: int = Field(
         hidden_from_docs=True,
         default=10000,
-        description="Number of table queried in batch when getting metadata. This is a low level config property which should be touched with care.",
+        description="Number of tables queried in a batch when getting metadata. This is a low level config property "
+        "which should be touched with care.",
     )
 
     number_of_datasets_process_in_batch_if_profiling_enabled: int = Field(
         default=1000,
-        description="Number of partitioned table queried in batch when getting metadata. This is a low level config property which should be touched with care. This restriction is needed because we query partitions system view which throws error if we try to touch too many tables.",
+        description="Number of partitioned tables queried in a batch when getting metadata. This is a low level "
+        "config property which should be touched with care. This restriction is needed because we query "
+        "the partitions system view, which throws an error if we try to touch too many tables.",
     )
 
     use_tables_list_query_v2: bool = Field(
         default=False,
-        description="List tables using an improved query that extracts partitions and last modified timestamps more accurately. Requires the ability to read table data. Automatically enabled when profiling is enabled.",
+        description="List tables using an improved query that extracts partitions and last modified timestamps more "
+        "accurately. Requires the ability to read table data. Automatically enabled when profiling is "
+        "enabled.",
     )
 
     @property
@@ -199,7 +215,9 @@ def have_table_data_read_permission(self) -> bool:
     column_limit: int = Field(
         default=300,
-        description="Maximum number of columns to process in a table. This is a low level config property which should be touched with care. This restriction is needed because excessively wide tables can result in failure to ingest the schema.",
+        description="Maximum number of columns to process in a table. This is a low level config property which "
+        "should be touched with care. This restriction is needed because excessively wide tables can "
+        "result in failure to ingest the schema.",
     )
 
     # The inheritance hierarchy is wonky here, but these options need modifications.
     project_id: Optional[str] = Field(
@@ -214,6 +232,15 @@ def have_table_data_read_permission(self) -> bool:
             "Overrides `project_id_pattern`."
         ),
     )
+    project_labels: List[str] = Field(
+        default_factory=list,
+        description=(
+            "Ingests projects with the specified labels. Set values in the format of `key:value`. Use this property "
+            "to define which projects to ingest based "
+            "on project-level labels. If project_ids or project_id is set, this configuration has no effect. The "
+            "ingestion process filters projects by label first, and then applies the project_id_pattern."
+        ),
+    )
 
     storage_project_id: None = Field(default=None, hidden_from_docs=True)
 
diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py
index 4cfcc3922ddc3..807e99604f013 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py
@@ -31,6 +31,7 @@ class BigQuerySchemaApiPerfReport(Report):
     num_get_snapshots_for_dataset_api_requests: int = 0
     list_projects: PerfTimer = field(default_factory=PerfTimer)
+    list_projects_with_labels: PerfTimer = field(default_factory=PerfTimer)
     list_datasets: PerfTimer = field(default_factory=PerfTimer)
     get_columns_for_dataset_sec: float = 0
 
diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py
index d73ac46c862ea..4326ff7a35527 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py
@@ -5,7 +5,7 @@
 from typing import Any, Dict, Iterable, Iterator, List, Optional
 
 from google.api_core import retry
-from google.cloud import bigquery, datacatalog_v1
+from google.cloud import bigquery, datacatalog_v1, resourcemanager_v3
 from google.cloud.bigquery.table import (
     RowIterator,
     TableListItem,
@@ -144,9 +144,11 @@ def __init__(
         self,
         report: BigQuerySchemaApiPerfReport,
         client: bigquery.Client,
+        projects_client: resourcemanager_v3.ProjectsClient,
         datacatalog_client: Optional[datacatalog_v1.PolicyTagManagerClient] = None,
     ) -> None:
         self.bq_client = client
+        self.projects_client = projects_client
         self.report = report
         self.datacatalog_client = datacatalog_client
 
@@ -175,7 +177,7 @@ def _should_retry(exc: BaseException) -> bool:
             # 'Quota exceeded: Your user exceeded quota for concurrent project.lists requests.'
             # Hence, added the api request retry of 15 min.
             # We already tried adding rate_limit externally, proving max_result and page_size
-            # to restrict the request calls inside list_project but issue still occured.
+            # to restrict the request calls inside list_project but issue still occurred.
projects_iterator = self.bq_client.list_projects( max_results=max_results_per_page, page_token=page_token, @@ -202,6 +204,26 @@ def _should_retry(exc: BaseException) -> bool: return [] return projects + def get_projects_with_labels(self, labels: List[str]) -> List[BigqueryProject]: + with self.report.list_projects_with_labels: + try: + projects = [] + labels_query = " OR ".join([f"labels.{label}" for label in labels]) + for project in self.projects_client.search_projects(query=labels_query): + projects.append( + BigqueryProject( + id=project.project_id, name=project.display_name + ) + ) + + return projects + + except Exception as e: + logger.error( + f"Error getting projects with labels: {labels}. {e}", exc_info=True + ) + return [] + def get_datasets_for_project_id( self, project_id: str, maxResults: Optional[int] = None ) -> List[BigqueryDataset]: diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_test_connection.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_test_connection.py index 3aac78c154b2e..e21aadd91d7d5 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_test_connection.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_test_connection.py @@ -96,7 +96,9 @@ def metadata_read_capability_test( client: bigquery.Client = config.get_bigquery_client() assert client bigquery_data_dictionary = BigQuerySchemaApi( - BigQueryV2Report().schema_api_perf, client + report=BigQueryV2Report().schema_api_perf, + projects_client=config.get_projects_client(), + client=client, ) result = bigquery_data_dictionary.get_datasets_for_project_id( project_id, 10 diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py index 496bd64d3b4fe..9d15691491740 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py @@ -479,7 +479,9 @@ def lineage_via_catalog_lineage_api( lineage_client: lineage_v1.LineageClient = lineage_v1.LineageClient() data_dictionary = BigQuerySchemaApi( - self.report.schema_api_perf, self.config.get_bigquery_client() + self.report.schema_api_perf, + self.config.get_bigquery_client(), + self.config.get_projects_client(), ) # Filtering datasets diff --git a/metadata-ingestion/tests/integration/bigquery_v2/bigquery_project_label_mcp_golden.json b/metadata-ingestion/tests/integration/bigquery_v2/bigquery_project_label_mcp_golden.json new file mode 100644 index 0000000000000..a529ddc6221a7 --- /dev/null +++ b/metadata-ingestion/tests/integration/bigquery_v2/bigquery_project_label_mcp_golden.json @@ -0,0 +1,452 @@ +[ +{ + "entityType": "container", + "entityUrn": "urn:li:container:f284164f9a7db03ca6bbdb7bb17d5a7e", + "changeType": "UPSERT", + "aspectName": "containerProperties", + "aspect": { + "json": { + "customProperties": { + "platform": "bigquery", + "env": "PROD", + "project_id": "dev" + }, + "name": "dev" + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:f284164f9a7db03ca6bbdb7bb17d5a7e", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": 
"no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:f284164f9a7db03ca6bbdb7bb17d5a7e", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:bigquery" + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:f284164f9a7db03ca6bbdb7bb17d5a7e", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Project" + ] + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:f284164f9a7db03ca6bbdb7bb17d5a7e", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [] + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:ce17940c2d64e7e315e68f8d7d071b1e", + "changeType": "UPSERT", + "aspectName": "containerProperties", + "aspect": { + "json": { + "customProperties": { + "platform": "bigquery", + "env": "PROD", + "project_id": "dev", + "dataset_id": "bigquery-dataset-1" + }, + "externalUrl": "https://console.cloud.google.com/bigquery?project=dev&ws=!1m4!1m3!3m2!1sdev!2sbigquery-dataset-1", + "name": "bigquery-dataset-1" + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:ce17940c2d64e7e315e68f8d7d071b1e", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:ce17940c2d64e7e315e68f8d7d071b1e", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:bigquery" + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:ce17940c2d64e7e315e68f8d7d071b1e", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Dataset" + ] + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:ce17940c2d64e7e315e68f8d7d071b1e", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:f284164f9a7db03ca6bbdb7bb17d5a7e" + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:ce17940c2d64e7e315e68f8d7d071b1e", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "urn:li:container:f284164f9a7db03ca6bbdb7bb17d5a7e", + "urn": 
"urn:li:container:f284164f9a7db03ca6bbdb7bb17d5a7e" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,dev.bigquery-dataset-1.table-1,PROD)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,dev.bigquery-dataset-1.table-1,PROD)", + "changeType": "UPSERT", + "aspectName": "schemaMetadata", + "aspect": { + "json": { + "schemaName": "dev.bigquery-dataset-1.table-1", + "platform": "urn:li:dataPlatform:bigquery", + "version": 0, + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "hash": "", + "platformSchema": { + "com.linkedin.schema.MySqlDDL": { + "tableSchema": "" + } + }, + "fields": [ + { + "fieldPath": "age", + "nullable": false, + "description": "comment", + "type": { + "type": { + "com.linkedin.schema.NumberType": {} + } + }, + "nativeDataType": "INT", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Test Policy Tag" + } + ] + }, + "glossaryTerms": { + "terms": [ + { + "urn": "urn:li:glossaryTerm:Age" + } + ], + "auditStamp": { + "time": 1643871600000, + "actor": "urn:li:corpuser:datahub" + } + }, + "isPartOfKey": false + }, + { + "fieldPath": "email", + "nullable": false, + "description": "comment", + "type": { + "type": { + "com.linkedin.schema.StringType": {} + } + }, + "nativeDataType": "STRING", + "recursive": false, + "globalTags": { + "tags": [] + }, + "glossaryTerms": { + "terms": [ + { + "urn": "urn:li:glossaryTerm:Email_Address" + } + ], + "auditStamp": { + "time": 1643871600000, + "actor": "urn:li:corpuser:datahub" + } + }, + "isPartOfKey": false + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,dev.bigquery-dataset-1.table-1,PROD)", + "changeType": "UPSERT", + "aspectName": "datasetProperties", + "aspect": { + "json": { + "customProperties": {}, + "externalUrl": "https://console.cloud.google.com/bigquery?project=dev&ws=!1m5!1m4!4m3!1sdev!2sbigquery-dataset-1!3stable-1", + "name": "table-1", + "qualifiedName": "dev.bigquery-dataset-1.table-1", + "description": "", + "tags": [] + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,dev.bigquery-dataset-1.table-1,PROD)", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:ce17940c2d64e7e315e68f8d7d071b1e" + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,dev.bigquery-dataset-1.table-1,PROD)", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": 
"urn:li:dataPlatform:bigquery", + "instance": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:bigquery,dev)" + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,dev.bigquery-dataset-1.table-1,PROD)", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Table" + ] + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,dev.bigquery-dataset-1.table-1,PROD)", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "urn:li:container:f284164f9a7db03ca6bbdb7bb17d5a7e", + "urn": "urn:li:container:f284164f9a7db03ca6bbdb7bb17d5a7e" + }, + { + "id": "urn:li:container:ce17940c2d64e7e315e68f8d7d071b1e", + "urn": "urn:li:container:ce17940c2d64e7e315e68f8d7d071b1e" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "glossaryTerm", + "entityUrn": "urn:li:glossaryTerm:Age", + "changeType": "UPSERT", + "aspectName": "glossaryTermKey", + "aspect": { + "json": { + "name": "Age" + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "glossaryTerm", + "entityUrn": "urn:li:glossaryTerm:Email_Address", + "changeType": "UPSERT", + "aspectName": "glossaryTermKey", + "aspect": { + "json": { + "name": "Email_Address" + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "tag", + "entityUrn": "urn:li:tag:Test Policy Tag", + "changeType": "UPSERT", + "aspectName": "tagKey", + "aspect": { + "json": { + "name": "Test Policy Tag" + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +} +] \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/bigquery_v2/test_bigquery.py b/metadata-ingestion/tests/integration/bigquery_v2/test_bigquery.py index 762c73d2a55c6..dff7f18db6135 100644 --- a/metadata-ingestion/tests/integration/bigquery_v2/test_bigquery.py +++ b/metadata-ingestion/tests/integration/bigquery_v2/test_bigquery.py @@ -15,6 +15,7 @@ from datahub.ingestion.source.bigquery_v2.bigquery_schema import ( BigqueryColumn, BigqueryDataset, + BigqueryProject, BigQuerySchemaApi, BigqueryTable, ) @@ -39,6 +40,33 @@ def random_email(): ) +def recipe(mcp_output_path: str, override: dict = {}) -> dict: + return { + "source": { + "type": "bigquery", + "config": { + "project_ids": ["project-id-1"], + "include_usage_statistics": False, + "include_table_lineage": False, + "include_data_platform_instance": True, + "classification": ClassificationConfig( + enabled=True, + classifiers=[ + DynamicTypedClassifierConfig( + type="datahub", + config=DataHubClassifierConfig( + minimum_values_threshold=1, + ), + ) + ], + max_workers=1, + ).dict(), + }, + }, + "sink": {"type": "file", "config": {"filename": mcp_output_path}}, + } + + @freeze_time(FROZEN_TIME) @patch.object(BigQuerySchemaApi, 
"get_tables_for_dataset") @patch.object(BigQuerySchemaGenerator, "get_core_table_details") @@ -47,9 +75,11 @@ def random_email(): @patch.object(BigQueryDataReader, "get_sample_data_for_table") @patch("google.cloud.bigquery.Client") @patch("google.cloud.datacatalog_v1.PolicyTagManagerClient") +@patch("google.cloud.resourcemanager_v3.ProjectsClient") def test_bigquery_v2_ingest( client, policy_tag_manager_client, + projects_client, get_sample_data_for_table, get_columns_for_dataset, get_datasets_for_project_id, @@ -111,33 +141,105 @@ def test_bigquery_v2_ingest( ) get_tables_for_dataset.return_value = iter([bigquery_table]) - source_config_dict: Dict[str, Any] = { - "project_ids": ["project-id-1"], - "include_usage_statistics": False, - "include_table_lineage": False, - "include_data_platform_instance": True, - "classification": ClassificationConfig( - enabled=True, - classifiers=[ - DynamicTypedClassifierConfig( - type="datahub", - config=DataHubClassifierConfig( - minimum_values_threshold=1, - ), - ) - ], - max_workers=1, - ).dict(), - } + pipeline_config_dict: Dict[str, Any] = recipe(mcp_output_path=mcp_output_path) - pipeline_config_dict: Dict[str, Any] = { - "source": { - "type": "bigquery", - "config": source_config_dict, - }, - "sink": {"type": "file", "config": {"filename": mcp_output_path}}, + run_and_get_pipeline(pipeline_config_dict) + + mce_helpers.check_golden_file( + pytestconfig, + output_path=mcp_output_path, + golden_path=mcp_golden_path, + ) + + +@freeze_time(FROZEN_TIME) +@patch.object(BigQuerySchemaApi, attribute="get_projects_with_labels") +@patch.object(BigQuerySchemaApi, "get_tables_for_dataset") +@patch.object(BigQuerySchemaGenerator, "get_core_table_details") +@patch.object(BigQuerySchemaApi, "get_datasets_for_project_id") +@patch.object(BigQuerySchemaApi, "get_columns_for_dataset") +@patch.object(BigQueryDataReader, "get_sample_data_for_table") +@patch("google.cloud.bigquery.Client") +@patch("google.cloud.datacatalog_v1.PolicyTagManagerClient") +@patch("google.cloud.resourcemanager_v3.ProjectsClient") +def test_bigquery_v2_project_labels_ingest( + client, + policy_tag_manager_client, + projects_client, + get_sample_data_for_table, + get_columns_for_dataset, + get_datasets_for_project_id, + get_core_table_details, + get_tables_for_dataset, + get_projects_with_labels, + pytestconfig, + tmp_path, +): + test_resources_dir = pytestconfig.rootpath / "tests/integration/bigquery_v2" + mcp_golden_path = f"{test_resources_dir}/bigquery_project_label_mcp_golden.json" + mcp_output_path = "{}/{}".format(tmp_path, "bigquery_project_label_mcp_output.json") + + get_datasets_for_project_id.return_value = [ + BigqueryDataset(name="bigquery-dataset-1") + ] + + get_projects_with_labels.return_value = [ + BigqueryProject(id="dev", name="development") + ] + + table_list_item = TableListItem( + {"tableReference": {"projectId": "", "datasetId": "", "tableId": ""}} + ) + table_name = "table-1" + get_core_table_details.return_value = {table_name: table_list_item} + get_columns_for_dataset.return_value = { + table_name: [ + BigqueryColumn( + name="age", + ordinal_position=1, + is_nullable=False, + field_path="col_1", + data_type="INT", + comment="comment", + is_partition_column=False, + cluster_column_position=None, + policy_tags=["Test Policy Tag"], + ), + BigqueryColumn( + name="email", + ordinal_position=1, + is_nullable=False, + field_path="col_2", + data_type="STRING", + comment="comment", + is_partition_column=False, + cluster_column_position=None, + ), + ] + } + 
get_sample_data_for_table.return_value = { + "age": [random.randint(1, 80) for i in range(20)], + "email": [random_email() for i in range(20)], } + bigquery_table = BigqueryTable( + name=table_name, + comment=None, + created=None, + last_altered=None, + size_in_bytes=None, + rows_count=None, + ) + get_tables_for_dataset.return_value = iter([bigquery_table]) + + pipeline_config_dict: Dict[str, Any] = recipe(mcp_output_path=mcp_output_path) + + del pipeline_config_dict["source"]["config"]["project_ids"] + + pipeline_config_dict["source"]["config"]["project_labels"] = [ + "environment:development" + ] + run_and_get_pipeline(pipeline_config_dict) mce_helpers.check_golden_file( diff --git a/metadata-ingestion/tests/unit/test_bigquery_source.py b/metadata-ingestion/tests/unit/test_bigquery_source.py index 746cf9b0acfc3..d12ffbcbbcf10 100644 --- a/metadata-ingestion/tests/unit/test_bigquery_source.py +++ b/metadata-ingestion/tests/unit/test_bigquery_source.py @@ -170,7 +170,11 @@ def test_bigquery_uri_with_credential(): @patch.object(BigQueryV2Config, "get_bigquery_client") -def test_get_projects_with_project_ids(get_bq_client_mock): +@patch.object(BigQueryV2Config, "get_projects_client") +def test_get_projects_with_project_ids( + get_projects_client, + get_bq_client_mock, +): client_mock = MagicMock() get_bq_client_mock.return_value = client_mock config = BigQueryV2Config.parse_obj( @@ -197,8 +201,10 @@ def test_get_projects_with_project_ids(get_bq_client_mock): @patch.object(BigQueryV2Config, "get_bigquery_client") +@patch.object(BigQueryV2Config, "get_projects_client") def test_get_projects_with_project_ids_overrides_project_id_pattern( - get_bq_client_mock, + get_projects_client, + get_bigquery_client, ): config = BigQueryV2Config.parse_obj( { @@ -226,7 +232,11 @@ def test_platform_instance_config_always_none(): @patch.object(BigQueryV2Config, "get_bigquery_client") -def test_get_dataplatform_instance_aspect_returns_project_id(get_bq_client_mock): +@patch.object(BigQueryV2Config, "get_projects_client") +def test_get_dataplatform_instance_aspect_returns_project_id( + get_projects_client, + get_bq_client_mock, +): project_id = "project_id" expected_instance = ( f"urn:li:dataPlatformInstance:(urn:li:dataPlatform:bigquery,{project_id})" @@ -247,7 +257,11 @@ def test_get_dataplatform_instance_aspect_returns_project_id(get_bq_client_mock) @patch.object(BigQueryV2Config, "get_bigquery_client") -def test_get_dataplatform_instance_default_no_instance(get_bq_client_mock): +@patch.object(BigQueryV2Config, "get_projects_client") +def test_get_dataplatform_instance_default_no_instance( + get_projects_client, + get_bq_client_mock, +): config = BigQueryV2Config.parse_obj({}) source = BigqueryV2Source(config=config, ctx=PipelineContext(run_id="test")) schema_gen = source.bq_schema_extractor @@ -263,7 +277,11 @@ def test_get_dataplatform_instance_default_no_instance(get_bq_client_mock): @patch.object(BigQueryV2Config, "get_bigquery_client") -def test_get_projects_with_single_project_id(get_bq_client_mock): +@patch.object(BigQueryV2Config, "get_projects_client") +def test_get_projects_with_single_project_id( + get_projects_client, + get_bq_client_mock, +): client_mock = MagicMock() get_bq_client_mock.return_value = client_mock config = BigQueryV2Config.parse_obj({"project_id": "test-3"}) @@ -275,9 +293,10 @@ def test_get_projects_with_single_project_id(get_bq_client_mock): @patch.object(BigQueryV2Config, "get_bigquery_client") -def test_get_projects_by_list(get_bq_client_mock): +@patch.object(BigQueryV2Config, 
"get_projects_client") +def test_get_projects_by_list(get_projects_client, get_bigquery_client): client_mock = MagicMock() - get_bq_client_mock.return_value = client_mock + get_bigquery_client.return_value = client_mock first_page = MagicMock() first_page.__iter__.return_value = iter( @@ -296,6 +315,7 @@ def test_get_projects_by_list(get_bq_client_mock): ] ) second_page.next_page_token = None + client_mock.list_projects.side_effect = [first_page, second_page] config = BigQueryV2Config.parse_obj({}) @@ -311,7 +331,10 @@ def test_get_projects_by_list(get_bq_client_mock): @patch.object(BigQuerySchemaApi, "get_projects") @patch.object(BigQueryV2Config, "get_bigquery_client") -def test_get_projects_filter_by_pattern(get_bq_client_mock, get_projects_mock): +@patch.object(BigQueryV2Config, "get_projects_client") +def test_get_projects_filter_by_pattern( + get_projects_client, get_bq_client_mock, get_projects_mock +): get_projects_mock.return_value = [ BigqueryProject("test-project", "Test Project"), BigqueryProject("test-project-2", "Test Project 2"), @@ -329,7 +352,10 @@ def test_get_projects_filter_by_pattern(get_bq_client_mock, get_projects_mock): @patch.object(BigQuerySchemaApi, "get_projects") @patch.object(BigQueryV2Config, "get_bigquery_client") -def test_get_projects_list_empty(get_bq_client_mock, get_projects_mock): +@patch.object(BigQueryV2Config, "get_projects_client") +def test_get_projects_list_empty( + get_projects_client, get_bq_client_mock, get_projects_mock +): get_projects_mock.return_value = [] config = BigQueryV2Config.parse_obj( @@ -342,7 +368,9 @@ def test_get_projects_list_empty(get_bq_client_mock, get_projects_mock): @patch.object(BigQueryV2Config, "get_bigquery_client") +@patch.object(BigQueryV2Config, "get_projects_client") def test_get_projects_list_failure( + get_projects_client: MagicMock, get_bq_client_mock: MagicMock, caplog: pytest.LogCaptureFixture, ) -> None: @@ -366,7 +394,10 @@ def test_get_projects_list_failure( @patch.object(BigQuerySchemaApi, "get_projects") @patch.object(BigQueryV2Config, "get_bigquery_client") -def test_get_projects_list_fully_filtered(get_projects_mock, get_bq_client_mock): +@patch.object(BigQueryV2Config, "get_projects_client") +def test_get_projects_list_fully_filtered( + get_projects_mock, get_bq_client_mock, get_projects_client +): get_projects_mock.return_value = [BigqueryProject("test-project", "Test Project")] config = BigQueryV2Config.parse_obj( @@ -399,7 +430,10 @@ def bigquery_table() -> BigqueryTable: @patch.object(BigQueryV2Config, "get_bigquery_client") -def test_gen_table_dataset_workunits(get_bq_client_mock, bigquery_table): +@patch.object(BigQueryV2Config, "get_projects_client") +def test_gen_table_dataset_workunits( + get_projects_client, get_bq_client_mock, bigquery_table +): project_id = "test-project" dataset_name = "test-dataset" config = BigQueryV2Config.parse_obj( @@ -471,7 +505,8 @@ def test_gen_table_dataset_workunits(get_bq_client_mock, bigquery_table): @patch.object(BigQueryV2Config, "get_bigquery_client") -def test_simple_upstream_table_generation(get_bq_client_mock): +@patch.object(BigQueryV2Config, "get_projects_client") +def test_simple_upstream_table_generation(get_bq_client_mock, get_projects_client): a: BigQueryTableRef = BigQueryTableRef( BigqueryTableIdentifier( project_id="test-project", dataset="test-dataset", table="a" @@ -503,8 +538,10 @@ def test_simple_upstream_table_generation(get_bq_client_mock): @patch.object(BigQueryV2Config, "get_bigquery_client") +@patch.object(BigQueryV2Config, 
"get_projects_client") def test_upstream_table_generation_with_temporary_table_without_temp_upstream( get_bq_client_mock, + get_projects_client, ): a: BigQueryTableRef = BigQueryTableRef( BigqueryTableIdentifier( @@ -536,7 +573,10 @@ def test_upstream_table_generation_with_temporary_table_without_temp_upstream( @patch.object(BigQueryV2Config, "get_bigquery_client") -def test_upstream_table_column_lineage_with_temp_table(get_bq_client_mock): +@patch.object(BigQueryV2Config, "get_projects_client") +def test_upstream_table_column_lineage_with_temp_table( + get_bq_client_mock, get_projects_client +): from datahub.ingestion.api.common import PipelineContext a: BigQueryTableRef = BigQueryTableRef( @@ -611,8 +651,9 @@ def test_upstream_table_column_lineage_with_temp_table(get_bq_client_mock): @patch.object(BigQueryV2Config, "get_bigquery_client") +@patch.object(BigQueryV2Config, "get_projects_client") def test_upstream_table_generation_with_temporary_table_with_multiple_temp_upstream( - get_bq_client_mock, + get_bq_client_mock, get_projects_client ): a: BigQueryTableRef = BigQueryTableRef( BigqueryTableIdentifier( @@ -675,7 +716,10 @@ def test_upstream_table_generation_with_temporary_table_with_multiple_temp_upstr @patch.object(BigQuerySchemaApi, "get_tables_for_dataset") @patch.object(BigQueryV2Config, "get_bigquery_client") -def test_table_processing_logic(get_bq_client_mock, data_dictionary_mock): +@patch.object(BigQueryV2Config, "get_projects_client") +def test_table_processing_logic( + get_projects_client, get_bq_client_mock, data_dictionary_mock +): client_mock = MagicMock() get_bq_client_mock.return_value = client_mock config = BigQueryV2Config.parse_obj( @@ -747,8 +791,9 @@ def test_table_processing_logic(get_bq_client_mock, data_dictionary_mock): @patch.object(BigQuerySchemaApi, "get_tables_for_dataset") @patch.object(BigQueryV2Config, "get_bigquery_client") +@patch.object(BigQueryV2Config, "get_projects_client") def test_table_processing_logic_date_named_tables( - get_bq_client_mock, data_dictionary_mock + get_projects_client, get_bq_client_mock, data_dictionary_mock ): client_mock = MagicMock() get_bq_client_mock.return_value = client_mock @@ -859,8 +904,10 @@ def bigquery_view_2() -> BigqueryView: @patch.object(BigQuerySchemaApi, "get_query_result") @patch.object(BigQueryV2Config, "get_bigquery_client") +@patch.object(BigQueryV2Config, "get_projects_client") def test_get_views_for_dataset( get_bq_client_mock: Mock, + get_projects_client: MagicMock, query_mock: Mock, bigquery_view_1: BigqueryView, bigquery_view_2: BigqueryView, @@ -889,7 +936,9 @@ def test_get_views_for_dataset( ) query_mock.return_value = [row1, row2] bigquery_data_dictionary = BigQuerySchemaApi( - BigQueryV2Report().schema_api_perf, client_mock + report=BigQueryV2Report().schema_api_perf, + client=client_mock, + projects_client=MagicMock(), ) views = bigquery_data_dictionary.get_views_for_dataset( @@ -905,8 +954,9 @@ def test_get_views_for_dataset( BigQuerySchemaGenerator, "gen_dataset_workunits", lambda *args, **kwargs: [] ) @patch.object(BigQueryV2Config, "get_bigquery_client") +@patch.object(BigQueryV2Config, "get_projects_client") def test_gen_view_dataset_workunits( - get_bq_client_mock, bigquery_view_1, bigquery_view_2 + get_projects_client, get_bq_client_mock, bigquery_view_1, bigquery_view_2 ): project_id = "test-project" dataset_name = "test-dataset" @@ -963,7 +1013,9 @@ def bigquery_snapshot() -> BigqueryTableSnapshot: @patch.object(BigQuerySchemaApi, "get_query_result") @patch.object(BigQueryV2Config, 
"get_bigquery_client") +@patch.object(BigQueryV2Config, "get_projects_client") def test_get_snapshots_for_dataset( + get_projects_client: MagicMock, get_bq_client_mock: Mock, query_mock: Mock, bigquery_snapshot: BigqueryTableSnapshot, @@ -988,7 +1040,9 @@ def test_get_snapshots_for_dataset( ) query_mock.return_value = [row1] bigquery_data_dictionary = BigQuerySchemaApi( - BigQueryV2Report().schema_api_perf, client_mock + report=BigQueryV2Report().schema_api_perf, + client=client_mock, + projects_client=MagicMock(), ) snapshots = bigquery_data_dictionary.get_snapshots_for_dataset( @@ -1001,7 +1055,10 @@ def test_get_snapshots_for_dataset( @patch.object(BigQueryV2Config, "get_bigquery_client") -def test_gen_snapshot_dataset_workunits(get_bq_client_mock, bigquery_snapshot): +@patch.object(BigQueryV2Config, "get_projects_client") +def test_gen_snapshot_dataset_workunits( + get_bq_client_mock, get_projects_client, bigquery_snapshot +): project_id = "test-project" dataset_name = "test-dataset" config = BigQueryV2Config.parse_obj( @@ -1140,7 +1197,9 @@ def test_default_config_for_excluding_projects_and_datasets(): @patch.object(BigQueryConnectionConfig, "get_bigquery_client", new=lambda self: None) @patch.object(BigQuerySchemaApi, "get_datasets_for_project_id") +@patch.object(BigQueryV2Config, "get_projects_client") def test_excluding_empty_projects_from_ingestion( + get_projects_client, get_datasets_for_project_id_mock, ): project_id_with_datasets = "project-id-with-datasets" @@ -1173,3 +1232,32 @@ def get_datasets_for_project_id_side_effect( config = BigQueryV2Config.parse_obj({**base_config, "exclude_empty_projects": True}) source = BigqueryV2Source(config=config, ctx=PipelineContext(run_id="test-2")) assert len({wu.metadata.entityUrn for wu in source.get_workunits()}) == 1 # type: ignore + + +@patch.object(BigQueryV2Config, "get_bigquery_client") +@patch.object(BigQueryV2Config, "get_projects_client") +def test_get_projects_with_project_labels( + get_projects_client, + get_bq_client_mock, +): + client_mock = MagicMock() + + get_projects_client.return_value = client_mock + + client_mock.search_projects.return_value = [ + SimpleNamespace(project_id="dev", display_name="dev_project"), + SimpleNamespace(project_id="qa", display_name="qa_project"), + ] + + config = BigQueryV2Config.parse_obj( + { + "project_labels": ["environment:dev", "environment:qa"], + } + ) + + source = BigqueryV2Source(config=config, ctx=PipelineContext(run_id="test1")) + + assert source._get_projects() == [ + BigqueryProject("dev", "dev_project"), + BigqueryProject("qa", "qa_project"), + ]