
Commit

fix(ingest): bigquery-beta - Lowering a bit memory footprint of bigquery usage (#6095)

* Lower the memory footprint of BigQuery usage extraction a bit
* Filter tables not seen during ingestion out of usage generation
treff7es authored Oct 1, 2022
1 parent 2662163 commit 05f5c12
Showing 2 changed files with 45 additions and 17 deletions.
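Both changes in this commit follow one pattern: usage aggregation state that previously lived on the extractor instance (and so persisted across all projects) now lives in a function-local dict that is built, consumed, and dropped per project, and read events are checked against the tables actually discovered during ingestion before they are aggregated. The sketch below illustrates that pattern under simplified assumptions; the `ReadEvent` dataclass, the dataset-to-table-name map, and this pared-down `generate_usage_for_project` signature are illustrative stand-ins, not the real DataHub classes.

```python
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, Iterable, List, Tuple


@dataclass
class ReadEvent:
    # Illustrative stand-in for the audit-log read event in the real extractor.
    dataset: str
    table: str
    bucket: datetime  # start of the usage time bucket this read falls into


def generate_usage_for_project(
    events: Iterable[ReadEvent], tables: Dict[str, List[str]]
) -> Iterable[Tuple[datetime, str, int]]:
    # Function-local aggregation state: it is released as soon as this generator
    # is exhausted, instead of accumulating on the extractor instance for the
    # whole ingestion run.
    aggregated: Dict[datetime, Dict[str, int]] = defaultdict(dict)

    for event in events:
        # Drop usage for tables that were never seen during ingestion.
        if event.table not in tables.get(event.dataset, []):
            continue
        bucket = aggregated[event.bucket]
        key = f"{event.dataset}.{event.table}"
        bucket[key] = bucket.get(key, 0) + 1

    # Hand the per-bucket aggregates back before the local state goes away.
    for bucket_start, per_table in aggregated.items():
        for table_key, read_count in per_table.items():
            yield bucket_start, table_key, read_count
```

Yielding the aggregates from inside the per-project call lets the caller turn them into work units and discard them before moving on to the next project, which is where the memory saving comes from.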
@@ -445,9 +445,6 @@ def get_workunits(self) -> Iterable[WorkUnit]:
logger.info(f"Processing project: {project_id.id}")
yield from self._process_project(conn, project_id)

if self.config.include_usage_statistics:
yield from self.usage_extractor.get_workunits()

if self.config.profiling.enabled:
yield from self.profiler.get_workunits(self.db_tables)

@@ -493,7 +490,26 @@ def _process_project(

if self.config.include_usage_statistics:
logger.info(f"Generate usage for {project_id}")
yield from self.usage_extractor.generate_usage_for_project(project_id)
tables: Dict[str, List[str]] = {}

for dataset in self.db_tables[project_id]:
tables[dataset] = [
table.name for table in self.db_tables[project_id][dataset]
]

for dataset in self.db_views[project_id]:
if not tables[dataset]:
tables[dataset] = [
table.name for table in self.db_views[project_id][dataset]
]
else:
tables[dataset].extend(
[table.name for table in self.db_views[project_id][dataset]]
)

yield from self.usage_extractor.generate_usage_for_project(
project_id, tables
)

def _process_schema(
self, conn: bigquery.Client, project_id: str, bigquery_dataset: BigqueryDataset
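For context, the new loops in _process_project above just flatten the already-fetched db_tables and db_views caches into one dataset-to-table-name map before handing it to the usage extractor. A compact equivalent, assuming both caches map dataset names to objects exposing a .name attribute as the diff does (build_table_map itself is an illustrative helper, not code from the source):

```python
from itertools import chain
from typing import Dict, Iterable, List


def build_table_map(
    db_tables: Dict[str, Iterable],  # dataset name -> table objects with a .name
    db_views: Dict[str, Iterable],   # dataset name -> view objects with a .name
) -> Dict[str, List[str]]:
    # Flatten both caches into one dataset -> [table/view names] mapping;
    # .get() covers datasets that appear in only one of the two caches.
    return {
        dataset: [
            obj.name
            for obj in chain(db_tables.get(dataset, []), db_views.get(dataset, []))
        ]
        for dataset in set(chain(db_tables, db_views))
    }
```

The remaining hunks touch the BigQueryUsageExtractor class, which consumes this map.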
@@ -150,15 +150,10 @@ class BigQueryUsageExtractor:
* Aggregation of these statistics into buckets, by day or hour granularity
:::note
1. This source only does usage statistics. To get the tables, views, and schemas in your BigQuery project, use the `bigquery` plugin.
2. Depending on the compliance policies setup for the bigquery instance, sometimes logging.read permission is not sufficient. In that case, use either admin or private log viewer permission.
1. Depending on the compliance policies setup for the bigquery instance, sometimes logging.read permission is not sufficient. In that case, use either admin or private log viewer permission.
:::
"""

aggregated_info: Dict[
datetime, Dict[BigQueryTableRef, AggregatedDataset]
] = collections.defaultdict(dict)

def __init__(self, config: BigQueryV2Config, report: BigQueryV2Report):
self.config: BigQueryV2Config = config
self.report: BigQueryV2Report = report
@@ -173,7 +168,13 @@ def _is_table_allowed(self, table_ref: Optional[BigQueryTableRef]) -> bool:
and self.config.table_pattern.allowed(table_ref.table_identifier.table)
)

def generate_usage_for_project(self, project_id: str) -> Iterable[MetadataWorkUnit]:
def generate_usage_for_project(
self, project_id: str, tables: Dict[str, List[str]]
) -> Iterable[MetadataWorkUnit]:
aggregated_info: Dict[
datetime, Dict[BigQueryTableRef, AggregatedDataset]
] = collections.defaultdict(dict)

parsed_bigquery_log_events: Iterable[
Union[ReadEvent, QueryEvent, MetadataWorkUnit]
]
@@ -221,24 +222,26 @@ def generate_usage_for_project(self, project_id: str) -> Iterable[MetadataWorkUnit]:
yield operational_wu
self.report.num_operational_stats_workunits_emitted += 1
if event.read_event:
self.aggregated_info = self._aggregate_enriched_read_events(
self.aggregated_info, event
aggregated_info = self._aggregate_enriched_read_events(
aggregated_info, event, tables
)
num_aggregated += 1
logger.info(f"Total number of events aggregated = {num_aggregated}.")
bucket_level_stats: str = "\n\t" + "\n\t".join(
[
f'bucket:{db.strftime("%m-%d-%Y:%H:%M:%S")}, size={len(ads)}'
for db, ads in self.aggregated_info.items()
for db, ads in aggregated_info.items()
]
)
logger.debug(
f"Number of buckets created = {len(self.aggregated_info)}. Per-bucket details:{bucket_level_stats}"
f"Number of buckets created = {len(aggregated_info)}. Per-bucket details:{bucket_level_stats}"
)

self.report.usage_extraction_sec[project_id] = round(
timer.elapsed_seconds(), 2
)

yield from self.get_workunits(aggregated_info)
except Exception as e:
self.report.usage_failed_extraction.append(project_id)
logger.error(
Expand Down Expand Up @@ -746,6 +749,7 @@ def _aggregate_enriched_read_events(
self,
datasets: Dict[datetime, Dict[BigQueryTableRef, AggregatedDataset]],
event: AuditEvent,
tables: Dict[str, List[str]],
) -> Dict[datetime, Dict[BigQueryTableRef, AggregatedDataset]]:
if not event.read_event:
return datasets
@@ -756,6 +760,12 @@
resource: Optional[BigQueryTableRef] = None
try:
resource = event.read_event.resource.get_sanitized_table_ref()
if (
resource.table_identifier.get_table_display_name()
not in tables[resource.table_identifier.dataset]
):
logger.debug(f"Skipping non existing {resource} from usage")
return datasets
except Exception as e:
self.report.report_warning(
str(event.read_event.resource), f"Failed to clean up resource, {e}"
@@ -787,9 +797,11 @@

return datasets

def get_workunits(self):
def get_workunits(
self, aggregated_info: Dict[datetime, Dict[BigQueryTableRef, AggregatedDataset]]
) -> Iterable[MetadataWorkUnit]:
self.report.num_usage_workunits_emitted = 0
for time_bucket in self.aggregated_info.values():
for time_bucket in aggregated_info.values():
for aggregate in time_bucket.values():
wu = self._make_usage_stat(aggregate)
self.report.report_workunit(wu)

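The reworked get_workunits is the consumer side of that function-local state: it walks the dict it is handed, one time bucket at a time, and emits one usage-statistics work unit per table aggregate. A rough sketch of that emission loop, with a formatted string standing in for the real MetadataWorkUnit and for _make_usage_stat (names and value types here are illustrative):

```python
from datetime import datetime
from typing import Dict, Iterable


def emit_usage_workunits(
    aggregated_info: Dict[datetime, Dict[str, int]],
) -> Iterable[str]:
    # One "work unit" per (time bucket, table) aggregate; a formatted string
    # stands in here for the real MetadataWorkUnit built by _make_usage_stat.
    for bucket_start, per_table in aggregated_info.items():
        for table_key, read_count in per_table.items():
            yield f"{bucket_start.isoformat()} {table_key}: {read_count} reads"
```

Because the dict is passed in explicitly rather than read from the instance, each project's aggregates can be emitted and then garbage-collected as soon as this loop finishes.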