Skip to content

Commit

Permalink
feat(ingest): working with multiple bigquery projects (datahub-projec…
Browse files Browse the repository at this point in the history
  • Loading branch information
anshbansal authored and alexey-kravtsov committed Jul 8, 2022
1 parent e124215 commit 95bf8a2
Show file tree
Hide file tree
Showing 7 changed files with 123 additions and 54 deletions.
10 changes: 10 additions & 0 deletions docs/how/updating-datahub.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,17 @@ This file documents any backwards-incompatible changes in DataHub and assists pe
## Next

### Breaking Changes
- #5240 `lineage_client_project_id` in `bigquery` source is removed. Use `storage_project_id` instead.

### Potential Downtime

### Deprecations

### Other notable Changes

## `v0.8.39`

### Breaking Changes
- Refactored the `health` field of the `Dataset` GraphQL Type to be of type **list of HealthStatus** (was type **HealthStatus**). See [this PR](https://github.com/datahub-project/datahub/pull/5222/files) for more details.

### Potential Downtime
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ source:
# - "schema.table.column"
# deny:
# - "*.*.*"
#lineage_client_project_id: project-id-1234567
#storage_project_id: project-id-1234567

## see https://datahubproject.io/docs/metadata-ingestion/sink_docs/datahub for complete documentation
sink:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -731,6 +731,7 @@ def generate_profiles(
requests: List[GEProfilerRequest],
max_workers: int,
platform: Optional[str] = None,
profiler_args: Optional[Dict] = None,
) -> Iterable[Tuple[GEProfilerRequest, Optional[DatasetProfileClass]]]:
with PerfTimer() as timer, concurrent.futures.ThreadPoolExecutor(
max_workers=max_workers
Expand Down Expand Up @@ -758,6 +759,7 @@ def generate_profiles(
query_combiner,
request,
platform=platform,
profiler_args=profiler_args,
)
for request in requests
]
Expand Down Expand Up @@ -817,11 +819,13 @@ def _generate_profile_from_request(
query_combiner: SQLAlchemyQueryCombiner,
request: GEProfilerRequest,
platform: Optional[str] = None,
profiler_args: Optional[Dict] = None,
) -> Tuple[GEProfilerRequest, Optional[DatasetProfileClass]]:
return request, self._generate_single_profile(
query_combiner=query_combiner,
pretty_name=request.pretty_name,
platform=platform,
profiler_args=profiler_args,
**request.batch_kwargs,
)

Expand All @@ -844,8 +848,12 @@ def _generate_single_profile(
partition: Optional[str] = None,
custom_sql: Optional[str] = None,
platform: Optional[str] = None,
profiler_args: Optional[Dict] = None,
**kwargs: Any,
) -> Optional[DatasetProfileClass]:
logger.debug(
f"Received single profile request for {pretty_name} for {schema}, {table}, {custom_sql}"
)
bigquery_temp_table: Optional[str] = None

ge_config = {
Expand All @@ -858,16 +866,19 @@ def _generate_single_profile(

# We have to create temporary tables if offset or limit or custom sql is set on Bigquery
if custom_sql or self.config.limit or self.config.offset:
if profiler_args is not None:
temp_table_db = profiler_args.get("temp_table_db", schema)
if platform is not None and platform == "bigquery":
ge_config["schema"] = temp_table_db

if self.config.bigquery_temp_table_schema:
bigquery_temp_table = (
f"{self.config.bigquery_temp_table_schema}.ge-temp-{uuid.uuid4()}"
)
bigquery_temp_table = f"{temp_table_db}.{self.config.bigquery_temp_table_schema}.ge-temp-{uuid.uuid4()}"
else:
assert table
table_parts = table.split(".")
if len(table_parts) == 2:
bigquery_temp_table = (
f"{schema}.{table_parts[0]}.ge-temp-{uuid.uuid4()}"
f"{temp_table_db}.{table_parts[0]}.ge-temp-{uuid.uuid4()}"
)

# With this pr there is no option anymore to set the bigquery temp table:
Expand Down Expand Up @@ -941,6 +952,7 @@ def _get_ge_dataset(
# },
# )

logger.debug(f"Got pretty_name={pretty_name}, kwargs={batch_kwargs}")
expectation_suite_name = ge_context.datasource_name + "." + pretty_name

ge_context.data_context.create_expectation_suite(
Expand All @@ -955,4 +967,15 @@ def _get_ge_dataset(
**batch_kwargs,
},
)
if platform is not None and platform == "bigquery":
# This is done as GE makes the name as DATASET.TABLE
# but we want it to be PROJECT.DATASET.TABLE instead for multi-project setups
logger.debug(f"Setting table name to be {pretty_name}")
batch._table = sa.text(pretty_name)
name_parts = pretty_name.split(".")
if len(name_parts) != 3:
logger.error(
f"Unexpected {pretty_name} while profiling. Should have 3 parts but has {len(name_parts)} parts."
)

return batch
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ class GEProfilingConfig(ConfigModel):
partition_profiling_enabled: bool = Field(default=True, description="")
bigquery_temp_table_schema: Optional[str] = Field(
default=None,
description="On bigquery for profiling partitioned tables needs to create temporary views. You have to define a schema where these will be created. Views will be cleaned up after profiler runs. (Great expectation tech details about this (https://legacy.docs.greatexpectations.io/en/0.9.0/reference/integrations/bigquery.html#custom-queries-with-sql-datasource).",
description="On bigquery for profiling partitioned tables needs to create temporary views. You have to define a dataset where these will be created. Views will be cleaned up after profiler runs. (Great expectation tech details about this (https://legacy.docs.greatexpectations.io/en/0.9.0/reference/integrations/bigquery.html#custom-queries-with-sql-datasource).",
)
partition_datetime: Optional[datetime.datetime] = Field(
default=None,
Expand Down
97 changes: 56 additions & 41 deletions metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from google.cloud.logging_v2.client import Client as GCPLoggingClient
from ratelimiter import RateLimiter
from sqlalchemy import create_engine, inspect
from sqlalchemy.engine import Engine
from sqlalchemy.engine.reflection import Inspector

from datahub.emitter import mce_builder
Expand All @@ -36,6 +37,7 @@
support_status,
)
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.ge_data_profiler import DatahubGEProfiler
from datahub.ingestion.source.sql.sql_common import (
SQLAlchemyConfig,
SQLAlchemySource,
Expand Down Expand Up @@ -332,8 +334,17 @@ def __init__(self, config, ctx):
self.partition_info: Dict[str, str] = dict()
atexit.register(cleanup, config)

def get_db_name(self, inspector: Inspector = None) -> str:
if self.config.project_id:
def get_db_name(
self, inspector: Inspector = None, for_sql_queries: bool = True
) -> str:
"""
for_sql_queries - Used mainly for multi-project setups with different permissions
- should be set to True if this is to be used to run sql queries
- should be set to False if this is to inspect contents and not run sql queries
"""
if for_sql_queries and self.config.storage_project_id:
return self.config.storage_project_id
elif self.config.project_id:
return self.config.project_id
else:
return self._get_project_id(inspector)
Expand All @@ -342,13 +353,10 @@ def _compute_big_query_lineage(self) -> None:
if not self.config.include_table_lineage:
return

lineage_client_project_id = self._get_lineage_client_project_id()
if self.config.use_exported_bigquery_audit_metadata:
self._compute_bigquery_lineage_via_exported_bigquery_audit_metadata(
lineage_client_project_id
)
self._compute_bigquery_lineage_via_exported_bigquery_audit_metadata()
else:
self._compute_bigquery_lineage_via_gcp_logging(lineage_client_project_id)
self._compute_bigquery_lineage_via_gcp_logging()

if self.lineage_metadata is None:
self.lineage_metadata = {}
Expand All @@ -359,14 +367,11 @@ def _compute_big_query_lineage(self) -> None:
)
logger.debug(f"lineage metadata is {self.lineage_metadata}")

def _compute_bigquery_lineage_via_gcp_logging(
self, lineage_client_project_id: Optional[str]
) -> None:
def _compute_bigquery_lineage_via_gcp_logging(self) -> None:
project_id = self.get_db_name()
logger.info("Populating lineage info via GCP audit logs")
try:
_clients: List[GCPLoggingClient] = self._make_bigquery_client(
lineage_client_project_id
)
_clients: List[GCPLoggingClient] = self._make_gcp_logging_client(project_id)
template: str = BQ_FILTER_RULE_TEMPLATE

if self.config.use_v2_audit_metadata:
Expand All @@ -386,12 +391,11 @@ def _compute_bigquery_lineage_via_gcp_logging(
f"Error was {e}",
)

def _compute_bigquery_lineage_via_exported_bigquery_audit_metadata(
self, lineage_client_project_id: Optional[str]
) -> None:
def _compute_bigquery_lineage_via_exported_bigquery_audit_metadata(self) -> None:
project_id = self.get_db_name(for_sql_queries=True)
logger.info("Populating lineage info via exported GCP audit logs")
try:
_client: BigQueryClient = BigQueryClient(project=lineage_client_project_id)
_client: BigQueryClient = BigQueryClient(project=project_id)
exported_bigquery_audit_metadata: Iterable[
BigQueryAuditMetadata
] = self._get_exported_bigquery_audit_metadata(_client)
Expand All @@ -408,28 +412,18 @@ def _compute_bigquery_lineage_via_exported_bigquery_audit_metadata(
f"Error: {e}",
)

def _make_bigquery_client(
self, lineage_client_project_id: Optional[str]
def _make_gcp_logging_client(
self, project_id: Optional[str]
) -> List[GCPLoggingClient]:
# See https://github.com/googleapis/google-cloud-python/issues/2674 for
# why we disable gRPC here.
client_options = self.config.extra_client_options.copy()
client_options["_use_grpc"] = False
if lineage_client_project_id is not None:
return [
GCPLoggingClient(**client_options, project=lineage_client_project_id)
]
if project_id is not None:
return [GCPLoggingClient(**client_options, project=project_id)]
else:
return [GCPLoggingClient(**client_options)]

def _get_lineage_client_project_id(self) -> Optional[str]:
project_id: Optional[str] = (
self.config.lineage_client_project_id
if self.config.lineage_client_project_id
else self.config.project_id
)
return project_id

def _get_bigquery_log_entries(
self,
clients: List[GCPLoggingClient],
Expand Down Expand Up @@ -665,8 +659,7 @@ def is_table_partitioned(
if database:
project_id = database
else:
url = self.config.get_sql_alchemy_url()
engine = create_engine(url, **self.config.options)
engine = self._get_engine(for_run_sql=False)
with engine.connect() as con:
inspector = inspect(con)
project_id = self.get_db_name(inspector)
Expand All @@ -675,8 +668,8 @@ def is_table_partitioned(
def get_latest_partition(
self, schema: str, table: str
) -> Optional[BigQueryPartitionColumn]:
url = self.config.get_sql_alchemy_url()
engine = create_engine(url, **self.config.options)
logger.debug(f"get_latest_partition for {schema} and {table}")
engine = self._get_engine(for_run_sql=True)
with engine.connect() as con:
inspector = inspect(con)
project_id = self.get_db_name(inspector)
Expand All @@ -685,7 +678,9 @@ def get_latest_partition(
):
return None
sql = BQ_GET_LATEST_PARTITION_TEMPLATE.format(
project_id=project_id, schema=schema, table=table
project_id=self.get_db_name(inspector, for_sql_queries=True),
schema=schema,
table=table,
)
result = con.execute(sql)
# Bigquery only supports one partition column
Expand All @@ -709,8 +704,7 @@ def is_latest_shard(self, project_id: str, schema: str, table: str) -> bool:
table_name, shard = self.get_shard_from_table(table)
if shard:
logger.debug(f"{table_name} is sharded and shard id is: {shard}")
url = self.config.get_sql_alchemy_url()
engine = create_engine(url, **self.config.options)
engine = self._get_engine(for_run_sql=True)
if f"{project_id}.{schema}.{table_name}" not in self.maximum_shard_ids:
with engine.connect() as con:
sql = BQ_GET_LATEST_SHARD.format(
Expand All @@ -734,9 +728,13 @@ def is_latest_shard(self, project_id: str, schema: str, table: str) -> bool:
else:
return True

def _get_engine(self, for_run_sql: bool) -> Engine:
url = self.config.get_sql_alchemy_url(for_run_sql=for_run_sql)
logger.debug(f"sql_alchemy_url={url}")
return create_engine(url, **self.config.options)

def add_information_for_schema(self, inspector: Inspector, schema: str) -> None:
url = self.config.get_sql_alchemy_url()
engine = create_engine(url, **self.config.options)
engine = self._get_engine(for_run_sql=True)
project_id = self.get_db_name(inspector)
with engine.connect() as con:
inspector = inspect(con)
Expand Down Expand Up @@ -821,6 +819,22 @@ def generate_partition_profiler_query(
return shard, None
return None, None

def get_profiler_instance(self, inspector: Inspector) -> "DatahubGEProfiler":
logger.debug("Getting profiler instance from bigquery")
engine = self._get_engine(for_run_sql=True)
with engine.connect() as conn:
inspector = inspect(conn)

return DatahubGEProfiler(
conn=inspector.bind,
report=self.report,
config=self.config.profiling,
platform=self.platform,
)

def get_profile_args(self) -> Dict:
return {"temp_table_db": self.config.project_id}

def is_dataset_eligible_for_profiling(
self,
dataset_name: str,
Expand Down Expand Up @@ -884,6 +898,7 @@ def get_workunits(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit]]:
isinstance(wu, SqlWorkUnit)
and isinstance(wu.metadata, MetadataChangeEvent)
and isinstance(wu.metadata.proposedSnapshot, DatasetSnapshot)
and self.config.include_table_lineage
):
lineage_mcp = self.get_lineage_mcp(wu.metadata.proposedSnapshot.urn)
if lineage_mcp is not None:
Expand Down Expand Up @@ -977,7 +992,7 @@ def prepare_profiler_args(
custom_sql: Optional[str] = None,
) -> dict:
return dict(
schema=self.config.project_id,
schema=self.get_db_name(for_sql_queries=True),
table=f"{schema}.{table}",
partition=partition,
custom_sql=custom_sql,
Expand Down
16 changes: 13 additions & 3 deletions metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -713,7 +713,7 @@ def get_workunits(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit]]:
profiler = None
profile_requests: List["GEProfilerRequest"] = []
if sql_config.profiling.enabled:
profiler = self._get_profiler_instance(inspector)
profiler = self.get_profiler_instance(inspector)

db_name = self.get_db_name(inspector)
yield from self.gen_database_containers(db_name)
Expand Down Expand Up @@ -1309,7 +1309,7 @@ def add_table_to_schema_container(
self.report.report_workunit(wu)
yield wu

def _get_profiler_instance(self, inspector: Inspector) -> "DatahubGEProfiler":
def get_profiler_instance(self, inspector: Inspector) -> "DatahubGEProfiler":
from datahub.ingestion.source.ge_data_profiler import DatahubGEProfiler

return DatahubGEProfiler(
Expand All @@ -1319,6 +1319,10 @@ def _get_profiler_instance(self, inspector: Inspector) -> "DatahubGEProfiler":
platform=self.platform,
)

def get_profile_args(self) -> Dict:
"""Passed down to GE profiler"""
return {}

# Override if needed
def generate_partition_profiler_query(
self, schema: str, table: str, partition_datetime: Optional[datetime.datetime]
Expand Down Expand Up @@ -1426,6 +1430,9 @@ def loop_profiler_requests(
continue

self.report.report_entity_profiled(dataset_name)
logger.debug(
f"Preparing profiling request for {schema}, {table}, {partition}"
)
yield GEProfilerRequest(
pretty_name=dataset_name,
batch_kwargs=self.prepare_profiler_args(
Expand All @@ -1443,7 +1450,10 @@ def loop_profiler(
platform: Optional[str] = None,
) -> Iterable[MetadataWorkUnit]:
for request, profile in profiler.generate_profiles(
profile_requests, self.config.profiling.max_workers, platform=platform
profile_requests,
self.config.profiling.max_workers,
platform=platform,
profiler_args=self.get_profile_args(),
):
if profile is None:
continue
Expand Down
Loading

0 comments on commit 95bf8a2

Please sign in to comment.