Translate additionalProperties API properties into arrays. #544

Merged: 1 commit, Aug 20, 2020
69 changes: 69 additions & 0 deletions tools/asset-inventory/README.md
@@ -319,6 +319,75 @@ This repository contains some command line tools that let you run the export/imp
```


## Input Schema Changes

Sometimes the import schema will change. This can happen upstream when a backwards-incompatible change is made to an API, such as a change in the datatype of a value, or when the pipeline code changes the import format. This can cause imports with ```write_disposition=WRITE_APPEND``` to fail with an error similar to this:

```
"./asset_inventory/import_pipeline.py", line 422, in finish_bundle raise e
File "./asset_inventory/import_pipeline.py", line 419, in finish_bundle load_job.result()
File "/usr/local/lib/python3.7/site-packages/google/cloud/bigquery/job.py", line 733, in result return super(_AsyncJob, self).result(timeout=timeout)

File "/usr/local/lib/python3.7/site-packages/google/api_core/future/polling.py", line 127, in result raise self._exception google.api_core.exceptions.BadRequest: 400 Provided Schema does not match Table project-1:assets.run_googleapis_com_Revision. Field resource.data.spec.containers.livenessProbe.handler.exec.command has changed mode from NULLABLE to REPEATED [while running 'load_to_bigquery/load_to_bigquery']
```

To resume the import process, there are three options:

### 1. Delete the dataset and recreate it.

This is the simplest option: it discards all your data, losing prior history, but lets you continue the import. If only a few tables fail to import, you can also try deleting just those tables.


```
bq rm -r -d my_dataset_name
bq mk my_dataset_name
```

### 2. Copy the data to a new dataset, then delete and recreate the existing dataset.

This preserves all your data: the tables are copied to a new dataset with the BigQuery Data Transfer Service, so no prior history is lost.

```
# enable BigQuery Transfer Service (only needs to be done once)
gcloud services enable bigquerydatatransfer.googleapis.com

# create new dataset to copy tables to
bq mk my_new_dataset_name
bq mk --transfer_config --data_source=cross_region_copy --target_dataset=my_new_dataset_name --display_name=copy --params='{"source_dataset_id":"my_dataset_name","source_project_id":"my-project-id","overwrite_destination_table":"false"}'

# wait for transfer config to be completed.
bq show --transfer_config projects/123/locations/us/transferConfigs/my-transferconfigid
....

# delete old tables by deleting and recreating the dataset.
bq rm -r -d my_dataset_name
bq mk my_dataset_name
```

### 3. Change import configurations.

Import to a new dataset, or set ```write_disposition=WRITE_EMPTY```. Changing the dataset the pipeline imports to will create new tables, while setting write_disposition to WRITE_EMPTY (which will delete existing data) will allow imports to resume. Make this change either in ```config.yaml``` if you use the scheduled import process, or in the ```--parameters``` value of the gcloud command when invoking the template.
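
For example, a minimal sketch of re-running the Dataflow import template against a new dataset. The job name, template path, and parameter names below are illustrative; use the values from the import commands earlier in this document:

```
# Hypothetical invocation: substitute your bucket, template path, and the
# parameter names used by your deployment.
gcloud dataflow jobs run reimport-assets \
    --gcs-location gs://my-bucket/templates/import_pipeline \
    --parameters="dataset=my_new_dataset_name,write_disposition=WRITE_EMPTY"
```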

## Upgrading from version 1.0.0 to 2.0.0

The 2.0.0 pipeline release unfortunately changed the import schema to resolve [issue #533](https://github.com/GoogleCloudPlatform/professional-services/issues/533). Some user-specified and dynamic properties are now represented as record arrays of key-value pairs rather than flat records. This keeps the schema more regular and prevents an accumulation of columns from overflowing table limits. The change may require changes to how you query the data. For example, previously, to query the App Engine traffic split across two versions you would write:

```
SELECT resource.data.split.allocations._20190115t172957,
resource.data.split.allocations._20190921t233039
FROM `asset_inventory.appengine_googleapis_com_Service`
```

The new query would look like:

```
SELECT allocations.value
FROM `asset_inventory.appengine_googleapis_com_Service`
JOIN UNNEST(resource.data.split.allocations) AS allocations
WHERE allocations.name = '_20190115t172957' OR allocations.name = '_20190921t233039'
```
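
To illustrate the shape change (the allocation values below are made up), the same data as it might be loaded before and after the upgrade:

```
# 1.0.0: dynamic keys become individual columns of a flat record.
{'split': {'allocations': {'_20190115t172957': 0.6, '_20190921t233039': 0.4}}}

# 2.0.0: dynamic keys become a REPEATED record of name/value pairs.
{'split': {'allocations': [{'name': '_20190115t172957', 'value': 0.6},
                           {'name': '_20190921t233039', 'value': 0.4}]}}
```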

## Troubleshooting.

1. The Cloud Asset Inventory export operation failed with the error: "PERMISSION_DENIED. Failed to write to: gs://<my-export-path>" yet I know I have write permissions?
76 changes: 56 additions & 20 deletions tools/asset-inventory/asset_inventory/api_schema.py
@@ -177,10 +177,60 @@ def _get_properties_map_field_list(cls, property_name, property_value,
return cls._get_properties_map_field_list(
property_name, property_value['items'],
resources, seen_resources)
# we can't safely process labels or additionalProperties fields so
# skip them
# convert additionalProperties fields to a dict
# of name value pairs for a more regular schema.
if 'additionalProperties' in property_value:
fields = [{'name': 'name',
'field_type': 'STRING',
'description': 'additionalProperties name',
'mode': 'NULLABLE'}]
fields.append(
cls._property_to_field(
'value',
property_value['additionalProperties'],
resources, seen_resources))
return fields
# unknown property type.
return None

@classmethod
def _property_to_field(cls, property_name, property_value,
resources, seen_resources):
"""Convert api property to BigQuery field.

Args:
property_name: name of API property
property_value: value of the API property.
resources: dict of all other resources that might be referenced by
the API schema through reference types ($ref values).
seen_resources: dict of types we have processed to prevent endless recursion.
Returns:
BigQuery field or None if the field should be skipped.
"""
field = {'name': property_name}
property_type = property_value.get('type', None)
bigquery_type = cls._get_bigquery_type_for_property(
property_value, resources)
field['field_type'] = bigquery_type
if 'description' in property_value:
field['description'] = property_value['description'][:1024]

# array fields are BigQuery repeated fields, and convert
# additionalProperties to repeated lists of key value pairs.
if (property_type == 'array' or
'additionalProperties' in property_value):
field['mode'] = 'REPEATED'
else:
field['mode'] = 'NULLABLE'

if bigquery_type == 'RECORD':
fields_list = cls._get_properties_map_field_list(
property_name, property_value, resources, seen_resources)
if not fields_list:
return None
field['fields'] = fields_list
return field

@classmethod
def _properties_map_to_field_list(cls, properties_map, resources,
seen_resources):
@@ -198,24 +248,10 @@ def _properties_map_to_field_list(cls, properties_map, resources,
"""
fields = []
for property_name, property_value in properties_map.items():
field = {'name': property_name}
property_type = property_value.get('type', None)
bigquery_type = cls._get_bigquery_type_for_property(
property_value, resources)
field['field_type'] = bigquery_type
if 'description' in property_value:
field['description'] = property_value['description'][:1024]
if property_type == 'array':
field['mode'] = 'REPEATED'
else:
field['mode'] = 'NULLABLE'
if bigquery_type == 'RECORD':
fields_list = cls._get_properties_map_field_list(
property_name, property_value, resources, seen_resources)
if not fields_list:
continue
field['fields'] = fields_list
fields.append(field)
field = cls._property_to_field(property_name, property_value,
resources, seen_resources)
if field is not None:
fields.append(field)
return fields

@classmethod
9 changes: 8 additions & 1 deletion tools/asset-inventory/asset_inventory/bigquery_schema.py
@@ -293,6 +293,7 @@ def _sanitize_property(property_name, parent, depth, num_properties):
# prune the value.
parent.pop(new_property_name)


def remove_duplicates(properties):
"""Ensure no two property in properties share the same name.

@@ -441,7 +442,13 @@ def enforce_schema_data_types(resource, schema):
if field_name in resource:
resource_value = resource[field_name]
if field.get('mode', 'NULLABLE') == 'REPEATED':
if not isinstance(resource_value, list):
# satisfy array condition by converting dict into
# repeated name value records.
if (field['field_type'] == 'RECORD' and
isinstance(resource_value, dict)):
resource_value = [{'name': key, 'value': val}
for (key, val) in resource_value.items()]
elif not isinstance(resource_value, list):
resource_value = [resource_value]
new_array = []
for value in resource_value:
2 changes: 1 addition & 1 deletion tools/asset-inventory/setup.py
@@ -16,7 +16,7 @@

setup(
name='asset-inventory',
version='1.0.0',
version='2.0.0',
description=
'Generate Cloud Asset Inventory exports and Import To BigQuery.',
# pylint: disable=line-too-long
73 changes: 73 additions & 0 deletions tools/asset-inventory/tests/test_api_schema.py
@@ -270,3 +270,76 @@ def test_recursive_properties(self):
discovery_doc)
schema.sort()
self.assertEqual(schema, [])

def test_string_additional_properties(self):
api_properties = {
'property-1': {
'type': 'object',
'additionalProperties': {
'type': 'string',
'description': 'description-1.'
},
'description': 'description-1'
},
}
resources = {}
schema = APISchema._properties_map_to_field_list(api_properties,
resources, {})
schema.sort()
self.assertEqual(
schema,
[{'name': 'property-1',
'field_type': 'RECORD',
'description': 'description-1',
'mode': 'REPEATED',
'fields': [{'name': 'name',
'field_type': 'STRING',
'description': 'additionalProperties name',
'mode': 'NULLABLE'},
{'name': 'value',
'field_type': 'STRING',
'description': 'description-1.',
'mode': 'NULLABLE'}]}])

def test_nested_additional_properties(self):
api_properties = {
'property-1': {
'type': 'object',
'additionalProperties': {
'$ref': 'NestedObject',
'description': 'description-1.'
},
'description': 'description-1'
},
}
resources = {
'NestedObject': {
'properties': {
'property-2': {
'type': 'string',
'description': 'description-2.'
}
}
}
}
schema = APISchema._properties_map_to_field_list(api_properties,
resources, {})
schema.sort()
self.assertEqual(
schema,
[{'name': 'property-1',
'field_type': 'RECORD',
'description': 'description-1',
'mode': 'REPEATED',
'fields': [{'name': 'name',
'field_type': 'STRING',
'description': 'additionalProperties name',
'mode': 'NULLABLE'},
{'name': 'value',
'field_type': 'RECORD',
'description': 'description-1.',
'mode': 'NULLABLE',
'fields': [{'name': 'property-2',
'field_type': 'STRING',
'description': 'description-2.',
'mode': 'NULLABLE'}]}]}])
45 changes: 45 additions & 0 deletions tools/asset-inventory/tests/test_bigquery_schema.py
@@ -255,6 +255,51 @@ def test_enforce_schema_data_types(self):
self.assertEqual(bigquery_schema.enforce_schema_data_types(
{'property_7': [{'property_1': 'invalid'}, 33]}, schema), {})

def test_addtional_properties_repeated_string(self):
schema = [
{'name': 'property_1',
'field_type': 'RECORD',
'description': 'description-1',
'mode': 'REPEATED',
'fields': [{'name': 'name',
'field_type': 'STRING',
'description': 'additionalProperties name',
'mode': 'NULLABLE'},
{'name': 'value',
'field_type': 'STRING',
'description': 'description-1.',
'mode': 'NULLABLE'}]}]
self.assertEqual(
bigquery_schema.enforce_schema_data_types(
{'property_1': {'key1': 'a', 'key2': 'b'}}, schema),
{'property_1': [{'name': 'key1', 'value': 'a'},
{'name': 'key2', 'value': 'b'}]})

def test_addtional_properties_repeated_record(self):
schema = [
{'name': 'property_1',
'field_type': 'RECORD',
'description': 'description-1',
'mode': 'REPEATED',
'fields': [{'name': 'name',
'field_type': 'STRING',
'description': 'additionalProperties name',
'mode': 'NULLABLE'},
{'name': 'value',
'field_type': 'RECORD',
'description': 'description-1.',
'mode': 'NULLABLE',
'fields': [{'name': 'property_2',
'field_type': 'STRING',
'description': 'description-2.',
'mode': 'NULLABLE'}]}]}]
self.assertEqual(
bigquery_schema.enforce_schema_data_types(
{'property_1': {'key1': {'property_2': 'a'},
'key2': {'property_2': 'b'}}}, schema),
{'property_1': [{'name': 'key1', 'value': {'property_2': 'a'}},
{'name': 'key2', 'value': {'property_2': 'b'}}]})

def test_remove_duplicate_property(self):
doc = {
'ipAddress': 'value',