Nested record fields failing to load with storage_write #71

Open
alexcartwright opened this issue Oct 12, 2023 · 2 comments

@alexcartwright

I am experiencing an issue loading records that have nested fields when using BigQueryStorageWriteDenormalizedSink.

To replicate the error:

cat outfile.jsonl | meltano invoke target-bigquery

Using the following outfile.jsonl:

{"type":"SCHEMA","stream":"invoices","schema":{"properties":{"Type":{"type":["null","string"]},"Contact":{"properties":{"ContactID":{"type":["string"]},"Name":{"type":["null","string"]}},"type":["null","object"],"additionalProperties":false},"InvoiceID":{"type":["string"]}},"type":["null","object"],"additionalProperties":false},"key_properties":["InvoiceID"]} {"type": "RECORD", "stream": "invoices", "record": {"Type": "ACCPAY", "InvoiceID": "ec24df0c-14e4-4e31-ac63-4f7d50dd8312", "Contact": {"ContactID": "29fb219f-823d-42cd-aaf7-7b35479a627a", "Name": "A contact name"}}}

And the meltano config:

method: storage_write_api
denormalized: true
batch_size: 500
upsert: true
column_name_transforms.snake_case: false

The process hangs, with the last three lines of output reading as follows:

Target 'target-bigquery' completed reading 2 lines of input (1 records, 0 batch manifests, 0 state messages).
Adding worker 8f674397a7014e528bcf90bb24a95b68
Thread-ConsumeBidirectionalStream exiting

However, it never exits, and BigQuery contains two invoice tables, the second being the temporary table created as part of the worker process. I haven't yet had a chance to dive into the code; I will post an update if I find what's causing it.

@alexcartwright (Author)

UPDATE:

I traced the issue above to the `AppendRowsRequest`, which contains a reference to the Contact message but doesn't send the schema for it. I also found a helpful related question on Stack Overflow: How to use nested proto message with BigQuery storage api writer.

After trial and error and a few false turns (I am new to Python, threads and protocol buffers), my solution was to transform the proto_descriptor after line 120 in generate_template by recursively searching for nested field.message_type entries and adding their schemas as a nested_type. It solves the specific issue above, but it is currently failing tests on a larger dataset (possibly for unrelated reasons...). I'll post once I've bottomed it out, but I'm happy to share my interim solution in the meantime if anyone is struggling with the same problem.
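To illustrate the shape of the fix (a hand-built sketch using descriptor_pb2, not the target's actual descriptors): the Storage Write API only sees schemas that are embedded in the DescriptorProto itself, so a message-typed field needs both a type_name reference and a matching nested_type entry:

from google.protobuf import descriptor_pb2

# Hand-built, self-contained descriptor for the invoices row above.
row = descriptor_pb2.DescriptorProto(name="invoices")
row.field.add(
    name="InvoiceID",
    number=1,
    type=descriptor_pb2.FieldDescriptorProto.TYPE_STRING,
)

contact = descriptor_pb2.DescriptorProto(name="Contact")
contact.field.add(
    name="ContactID",
    number=1,
    type=descriptor_pb2.FieldDescriptorProto.TYPE_STRING,
)

row.nested_type.append(contact)  # embed the nested schema in the parent...
row.field.add(
    name="Contact",
    number=2,
    type=descriptor_pb2.FieldDescriptorProto.TYPE_MESSAGE,
    type_name="invoices.Contact",  # ...and reference it by its local path
)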

@alexcartwright (Author)

Sharing the code that worked for me to fix the above issue.

I replaced one line in the generate_template function of storage_write.py, as follows:

def generate_template(message: Type[Message]):
    """Generate a template for the storage write API from a proto message class."""
    from google.protobuf import descriptor_pb2

    template, proto_schema, proto_descriptor, proto_data = (
        types.AppendRowsRequest(),
        types.ProtoSchema(),
        descriptor_pb2.DescriptorProto(),
        types.AppendRowsRequest.ProtoData(),
    )
    ## message.DESCRIPTOR.CopyToProto(proto_descriptor) ## REPLACED THIS LINE WITH THE LINE BELOW
    proto_descriptor = CopyFromNestedDescriptor(message.DESCRIPTOR)

    proto_schema.proto_descriptor = proto_descriptor
    proto_data.writer_schema = proto_schema
    template.proto_rows = proto_data
    return template
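As a quick sanity check (hypothetical snippet; InvoiceRow stands in for whichever message class the sink actually generates), the resulting template should now carry the Contact schema:

template = generate_template(InvoiceRow)  # InvoiceRow is a placeholder name
descriptor = template.proto_rows.writer_schema.proto_descriptor
print([nested.name for nested in descriptor.nested_type])  # expect ['Contact']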

I created and added the following CopyFromNestedDescriptor definition:

def CopyFromNestedDescriptor(descriptor):
    """Copy a message Descriptor to a DescriptorProto, embedding the schema of
    every message-typed field as a nested_type entry (recursively)."""
    from google.protobuf import descriptor_pb2

    descriptor_proto = descriptor_pb2.DescriptorProto()
    descriptor.CopyToProto(descriptor_proto)

    def _transform_proto_descriptor(descriptor, descriptor_proto, local_path, nested):
        for ix, field in enumerate(descriptor.fields):
            if field.message_type:
                nested_proto = descriptor_pb2.DescriptorProto()
                field.message_type.CopyToProto(nested_proto)

                # Point the field at the copy we are about to embed under the parent.
                nested_type_full_name = local_path[-1] + '.' + nested_proto.name
                descriptor_proto.field[ix].type_name = nested_type_full_name

                # Recurse in case the nested message itself contains message fields.
                local_path.append(nested_type_full_name)
                _transform_proto_descriptor(field.message_type, nested_proto, local_path, nested)
                local_path.pop()

                # Embed each nested schema only once.
                if nested_type_full_name not in nested:
                    descriptor_proto.nested_type.add().MergeFrom(nested_proto)
                    nested.add(nested_type_full_name)

    _transform_proto_descriptor(descriptor, descriptor_proto, [descriptor_proto.name], set())

    return descriptor_proto
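For comparison, the difference from the stock CopyToProto behaviour is visible by dumping both descriptors (again using the hypothetical InvoiceRow, whose Contact message is generated as a sibling message rather than a true nested type):

from google.protobuf import descriptor_pb2

plain = descriptor_pb2.DescriptorProto()
InvoiceRow.DESCRIPTOR.CopyToProto(plain)
print(len(plain.nested_type))  # 0 here, so Contact's schema never gets sent

fixed = CopyFromNestedDescriptor(InvoiceRow.DESCRIPTOR)
print(len(fixed.nested_type))  # 1: Contact embedded by the recursion above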

The other issue I hit relates to the cached copy of the proto_schema class returned by the proto_schema property of the BigQueryStorageWriteSink class, which fails intermittently with nested RECORDs. Some runs succeed and others fail, which made it challenging to isolate. I ultimately tracked it down to a silent exception thrown by an attempt to get the Descriptor for the sub_message in json_format.ParseDict when called by the process_run method of BigQueryStorageWriteSink.

Whilst the solution to the ultimate cause is above my pay grade, a change to the definition of the proto_schema property to avoid returning the cached copy worked for my use case:

    @property
    def proto_schema(self) -> Type[Message]:
        if not hasattr(self, "_proto_schemaZZZ"): ## ADDED ZZZ SO IT WOULD NOT USE THE CACHED COPY
            self._proto_schema = proto_schema_factory_v2(
                self.table.get_resolved_schema(self.apply_transforms)
            )
        return self._proto_schema

My guess is it's either a threading issue or a problem in the underlying upb implementation of Protocol Buffers.
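If the cause does turn out to be a race between worker threads building the cache concurrently, an untested alternative sketch (same class layout assumed) would be to keep the cache but serialize first access with a lock instead of disabling it:

import threading

_PROTO_SCHEMA_LOCK = threading.Lock()  # module level; one lock suffices

@property
def proto_schema(self) -> Type[Message]:
    # Sketch only: same caching as before, but two workers can no longer
    # race to build self._proto_schema at the same time.
    with _PROTO_SCHEMA_LOCK:
        if not hasattr(self, "_proto_schema"):
            self._proto_schema = proto_schema_factory_v2(
                self.table.get_resolved_schema(self.apply_transforms)
            )
        return self._proto_schema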
