Skip to content

Commit

Permalink
Prune columns over 10,000. (#535)
Browse files Browse the repository at this point in the history
Corrects issue 533 by removing any properties over the 10,000 limit.
Also speed up remove_duplicates by processing each dict once.

Co-authored-by: Jacob Ferriero <[email protected]>
  • Loading branch information
bmenasha and Jacob Ferriero authored Aug 11, 2020
1 parent dedc299 commit cf37d06
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 30 deletions.
75 changes: 45 additions & 30 deletions tools/asset-inventory/asset_inventory/bigquery_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
"""

import copy
from collections import defaultdict
from numbers import Number
import re

Expand All @@ -48,8 +49,10 @@
DATE_REGEX = re.compile(r'^\d\d\d\d-\d\d-\d\d$')
BQ_MAX_NUMERIC = 99999999999999999999999999999.999999999
BQ_MIN_NUMERIC = -99999999999999999999999999999.999999999
MAX_BQ_COL_NAME_LENGTH = 128
BQ_MAX_COL_NAME_LENGTH = 128
BQ_NUMERIC_SCALE_DIGITS = 9
BQ_MAX_DEPTH = 15
BQ_MAX_COLUMNS = 10000


def is_number(s):
Expand Down Expand Up @@ -223,10 +226,10 @@ def _convert_labels_dict_to_list(parent):
return parent


def _sanitize_property(property_name, parent, depth):
def _sanitize_property(property_name, parent, depth, num_properties):
"""Clean up json property for import into BigQuery.
Enforces some BigQuery requirements (see _santize_property_value for some
Enforces some BigQuery requirements (see _sanitize_property_value for some
others):
1. Covert all properties named "labels" from maps into list of "name",
Expand All @@ -250,6 +253,7 @@ def _sanitize_property(property_name, parent, depth):
property_name: Name of the property in the json oject.
parent: The json object containing the property.
depth: How nested within the original document we are.
num_properties: How many properties into the document we are.
"""
# if property was removed earlier, nothing to sanitize.
if property_name not in parent:
Expand All @@ -260,7 +264,7 @@ def _sanitize_property(property_name, parent, depth):
first_character = new_property_name[0]
if not first_character.isalpha() and first_character != '_':
new_property_name = '_' + new_property_name
new_property_name = new_property_name[:MAX_BQ_COL_NAME_LENGTH]
new_property_name = new_property_name[:BQ_MAX_COL_NAME_LENGTH]

# check if property was changed.
if property_name != new_property_name:
Expand All @@ -275,7 +279,8 @@ def _sanitize_property(property_name, parent, depth):
property_value = parent[new_property_name]

# recursivly descend.
sanitized = sanitize_property_value(property_value, depth=depth + 1)
sanitized = sanitize_property_value(property_value, depth=depth + 1,
num_properties=num_properties)

# else the value could have changed.
parent[new_property_name] = sanitized
Expand All @@ -288,32 +293,31 @@ def _sanitize_property(property_name, parent, depth):
# prune the value.
parent.pop(new_property_name)

# remove duplicates (condition #4)
remove_duplicates(new_property_name, parent)


def remove_duplicates(property_name, properties):
"""Ensure no other property in properties share the same name.
def remove_duplicates(properties):
"""Ensure no two property in properties share the same name.
Args:
property_name: name of property to check for.
properties: dictionary to modify.
BigQuery is case insensitive, remove any lexically greater property
in the dictionary that differ only by case.
"""
duplicates = []
for k in properties.keys():
if k.lower() == property_name.lower():
duplicates.append(k)
if len(duplicates) > 1:
selected_property = min(duplicates)
for p in duplicates:
if p != selected_property:
properties.pop(p)


def sanitize_property_value(property_value, depth=0):
duplicates = defaultdict(list)
# find duplicate properties
for k in properties:
duplicates[k.casefold()] += [k]

for k in duplicates:
duplicate_properties = duplicates[k]
# remove any properties that are duplicate
if len(duplicate_properties) > 1:
selected_property = min(duplicate_properties)
for p in duplicate_properties:
if p != selected_property:
properties.pop(p)


def sanitize_property_value(property_value, depth=0, num_properties=0):
"""Modifies supplied json object for BigQuery load.
Traverses the json object and modifies it to conform to BigQuery
Expand All @@ -326,17 +330,24 @@ def sanitize_property_value(property_value, depth=0):
2. rounds/truncates numeric values to be between BigQuery limits for NUMERIC
values.
3. Prunes any value after the 10,000'th property.
Args:
property_value: Json object.
depth: Level of embedding within the json document.
depth: Level of embedding within the document.
num_properties: Number of properties processed within the document.
Returns:
Modified json object.
"""

# BigQuery can't deal with more then 15 nested fields.
# BigQuery can't deal with too many nested fields.
# prune it.
if depth > 15:
if depth > BQ_MAX_DEPTH:
return {}

# BigQuery can't handle too many columns.
if num_properties > BQ_MAX_COLUMNS:
return {}

# NUMERIC data type is an exact numeric value with 38 digits of precision
Expand All @@ -350,13 +361,17 @@ def sanitize_property_value(property_value, depth=0):
# sanitize each nested list element.
if isinstance(property_value, list):
for array_item in property_value:
sanitize_property_value(array_item, depth)
sanitize_property_value(array_item, depth, num_properties)

# and each nested json object.
if isinstance(property_value, dict):
remove_duplicates(property_value)
for child_property in dict(property_value):
_sanitize_property(child_property, property_value, depth)

# count it.
num_properties += 1
# sanitize each property.
_sanitize_property(child_property, property_value, depth,
num_properties)
return property_value


Expand Down
17 changes: 17 additions & 0 deletions tools/asset-inventory/tests/test_bigquery_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,23 @@ def test_remove_duplicate_property(self):
self.assertEqual(sanitized['IPAddress'], 'other_value')
self.assertEqual(sanitized['array'], [{'IPAddress': 'other_value'}])

def test_prune_max_properties(self):
doc = {'prop-' + str(i): 'value' for i in range(0, 10000)}
sanitized = bigquery_schema.sanitize_property_value(doc)
self.assertEqual(len(sanitized), 10000)

# prune the 10,000'th
doc['prop-10001'] = 'value'
sanitized = bigquery_schema.sanitize_property_value(doc)
self.assertEqual(len(sanitized), 10000)

# prune last added property
doc['z'] = 'value'
sanitized = bigquery_schema.sanitize_property_value(doc)
self.assertEqual(len(sanitized), 10000)
self.assertNotIn('z', sanitized)



if __name__ == '__main__':
unittest.main()

0 comments on commit cf37d06

Please sign in to comment.