Skip to content

Commit

Permalink
fix(ingest/bigquery): Filter projects for lineage and usage (#7954)
Browse files Browse the repository at this point in the history
  • Loading branch information
asikowitz authored May 4, 2023
1 parent 3f8a532 commit 8019d17
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -492,29 +492,14 @@ def gen_dataset_containers(
)

def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
logger.info("Getting projects")
conn: bigquery.Client = get_bigquery_client(self.config)
self.add_config_to_report()

projects = self._get_projects(conn)
if len(projects) == 0:
logger.error(
"Get projects didn't return any project. "
"Maybe resourcemanager.projects.get permission is missing for the service account. "
"You can assign predefined roles/bigquery.metadataViewer role to your service account."
)
self.report.report_failure(
"metadata-extraction",
"Get projects didn't return any project. "
"Maybe resourcemanager.projects.get permission is missing for the service account. "
"You can assign predefined roles/bigquery.metadataViewer role to your service account.",
)
if not projects:
return

for project_id in projects:
if not self.config.project_id_pattern.allowed(project_id.id):
self.report.report_dropped(project_id.id)
continue
logger.info(f"Processing project: {project_id.id}")
self.report.set_ingestion_stage(project_id.id, "Metadata Extraction")
yield from self._process_project(conn, project_id)
Expand Down Expand Up @@ -584,27 +569,37 @@ def _should_ingest_lineage(self) -> bool:
return True

def _get_projects(self, conn: bigquery.Client) -> List[BigqueryProject]:
logger.info("Getting projects")
if self.config.project_ids or self.config.project_id:
project_ids = self.config.project_ids or [self.config.project_id] # type: ignore
return [
BigqueryProject(id=project_id, name=project_id)
for project_id in project_ids
]
else:
try:
return BigQueryDataDictionary.get_projects(conn)
except Exception as e:
# TODO: Merge with error logging in `get_workunits_internal`
trace = traceback.format_exc()
logger.error(
f"Get projects didn't return any project. Maybe resourcemanager.projects.get permission is missing for the service account. You can assign predefined roles/bigquery.metadataViewer role to your service account. The error was: {e}"
)
logger.error(trace)
self.report.report_failure(
"metadata-extraction",
f"Get projects didn't return any project. Maybe resourcemanager.projects.get permission is missing for the service account. You can assign predefined roles/bigquery.metadataViewer role to your service account. The error was: {e} Stacktrace: {trace}",
)
return []
return list(self._get_project_list(conn))

def _get_project_list(self, conn: bigquery.Client) -> Iterable[BigqueryProject]:
    """Yield the listed projects that pass ``project_id_pattern``.

    Projects are fetched with ``BigQueryDataDictionary.get_projects(conn)``.
    Any exception raised by that call is logged and treated the same as an
    empty listing. An empty listing is reported as a ``metadata-extraction``
    failure on ``self.report`` and nothing is yielded.

    Projects rejected by ``self.config.project_id_pattern`` are not yielded;
    they are recorded through ``self.report.report_dropped`` instead.

    Args:
        conn: BigQuery client passed through to the project listing call.

    Yields:
        Each listed project whose id is allowed by the configured pattern.
    """
    try:
        projects = BigQueryDataDictionary.get_projects(conn)
    except Exception as e:
        logger.error(f"Error getting projects. {e}", exc_info=True)
        projects = []

    if not projects:  # Report failure on exception and if empty list is returned
        self.report.report_failure(
            "metadata-extraction",
            "Get projects didn't return any project. "
            "Maybe resourcemanager.projects.get permission is missing for the service account. "
            "You can assign predefined roles/bigquery.metadataViewer role to your service account.",
        )
        # This function is a generator: a value given to `return` is never
        # seen by code iterating it, so end the iteration with a bare return.
        return

    for project in projects:
        if self.config.project_id_pattern.allowed(project.id):
            yield project
        else:
            self.report.report_dropped(project.id)

def _process_project(
self, conn: bigquery.Client, bigquery_project: BigqueryProject
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,11 @@ class BigQueryV2Config(
)
project_ids: List[str] = Field(
default_factory=list,
description="Ingests specified project_ids. Use this property if you only want to ingest one project and don't want to give project resourcemanager.projects.list to your service account.",
description=(
"Ingests specified project_ids. Use this property if you want to specify what projects to ingest or "
"don't want to give project resourcemanager.projects.list to your service account. "
"Overrides `project_id_pattern`."
),
)

project_on_behalf: Optional[str] = Field(
Expand Down
84 changes: 82 additions & 2 deletions metadata-ingestion/tests/unit/test_bigquery_source.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import json
import logging
import os
from datetime import datetime, timedelta, timezone
from types import SimpleNamespace
from typing import Any, Dict, Optional, cast
from unittest.mock import Mock, patch
from unittest.mock import MagicMock, Mock, patch

import pytest
from google.api_core.exceptions import GoogleAPICallError
from google.cloud.bigquery.table import Row, TableListItem

from datahub.ingestion.api.common import PipelineContext
Expand Down Expand Up @@ -111,6 +113,21 @@ def test_get_projects_with_project_ids(client_mock):
assert client_mock.list_projects.call_count == 0


def test_get_projects_with_project_ids_overrides_project_id_pattern():
    """Explicit ``project_ids`` are returned even if ``project_id_pattern`` denies one."""
    raw_config = {
        "project_ids": ["test-project", "test-project-2"],
        "project_id_pattern": {"deny": ["^test-project$"]},
    }
    bq_source = BigqueryV2Source(
        config=BigQueryV2Config.parse_obj(raw_config),
        ctx=PipelineContext(run_id="test"),
    )

    projects = bq_source._get_projects(MagicMock())

    # The denied id "test-project" must still be present: project_ids wins.
    expected = [
        BigqueryProject(id=project_id, name=project_id)
        for project_id in ("test-project", "test-project-2")
    ]
    assert projects == expected


@patch("google.cloud.bigquery.client.Client")
def test_get_projects_with_single_project_id(client_mock):
config = BigQueryV2Config.parse_obj({"project_id": "test-3"})
Expand All @@ -122,7 +139,7 @@ def test_get_projects_with_single_project_id(client_mock):


@patch("google.cloud.bigquery.client.Client")
def test_get_projects(client_mock):
def test_get_projects_by_list(client_mock):
client_mock.list_projects.return_value = [
SimpleNamespace(
project_id="test-1",
Expand All @@ -143,6 +160,69 @@ def test_get_projects(client_mock):
assert client_mock.list_projects.call_count == 1


@patch.object(BigQueryDataDictionary, "get_projects")
def test_get_projects_filter_by_pattern(get_projects_mock):
    """Listed projects denied by ``project_id_pattern`` are filtered out."""
    listed = [
        BigqueryProject("test-project", "Test Project"),
        BigqueryProject("test-project-2", "Test Project 2"),
    ]
    get_projects_mock.return_value = listed

    deny_exact_match = {"project_id_pattern": {"deny": ["^test-project$"]}}
    bq_source = BigqueryV2Source(
        config=BigQueryV2Config.parse_obj(deny_exact_match),
        ctx=PipelineContext(run_id="test"),
    )

    projects = bq_source._get_projects(MagicMock())

    # Only the project whose id does not match the anchored deny regex remains.
    expected = [BigqueryProject(id="test-project-2", name="Test Project 2")]
    assert projects == expected


@patch.object(BigQueryDataDictionary, "get_projects")
def test_get_projects_list_empty(get_projects_mock):
    """An empty project listing records one failure and yields no projects."""
    get_projects_mock.return_value = []

    deny_exact_match = {"project_id_pattern": {"deny": ["^test-project$"]}}
    bq_source = BigqueryV2Source(
        config=BigQueryV2Config.parse_obj(deny_exact_match),
        ctx=PipelineContext(run_id="test"),
    )

    projects = bq_source._get_projects(MagicMock())

    failure_count = len(bq_source.report.failures)
    assert failure_count == 1
    assert projects == []


@patch.object(BigQueryDataDictionary, "get_projects")
def test_get_projects_list_failure(
    get_projects_mock: MagicMock, caplog: pytest.LogCaptureFixture
) -> None:
    """A listing error is logged once, recorded as a failure, and yields nothing."""
    error_str = "my error"
    get_projects_mock.side_effect = GoogleAPICallError(error_str)

    deny_exact_match = {"project_id_pattern": {"deny": ["^test-project$"]}}
    bq_source = BigqueryV2Source(
        config=BigQueryV2Config.parse_obj(deny_exact_match),
        ctx=PipelineContext(run_id="test"),
    )

    # Drop anything logged during source construction so only the listing
    # error is counted below.
    caplog.records.clear()
    with caplog.at_level(logging.ERROR):
        projects = bq_source._get_projects(MagicMock())
        captured = caplog.records
        assert len(captured) == 1
        assert error_str in captured[0].msg
        assert len(bq_source.report.failures) == 1
        assert projects == []


@patch.object(BigQueryDataDictionary, "get_projects")
def test_get_projects_list_fully_filtered(get_projects_mock):
    """Filtering out every listed project is not a failure; the result is empty."""
    only_project = BigqueryProject("test-project", "Test Project")
    get_projects_mock.return_value = [only_project]

    deny_exact_match = {"project_id_pattern": {"deny": ["^test-project$"]}}
    bq_source = BigqueryV2Source(
        config=BigQueryV2Config.parse_obj(deny_exact_match),
        ctx=PipelineContext(run_id="test"),
    )

    projects = bq_source._get_projects(MagicMock())

    # A non-empty listing that the pattern empties out must not be reported
    # as a failure.
    failure_count = len(bq_source.report.failures)
    assert failure_count == 0
    assert projects == []


def test_simple_upstream_table_generation():
a: BigQueryTableRef = BigQueryTableRef(
BigqueryTableIdentifier(
Expand Down

0 comments on commit 8019d17

Please sign in to comment.