🎉 BigQuery destination: use serialized buffer for gcs staging #11776

Merged on Apr 7, 2022 (12 commits)

Contributor

@tuliren tuliren commented Apr 7, 2022

What

How

  • The BigQuery direct uploading destination is unchanged.
  • The GCS staging destination uses the BufferedStreamConsumer, and reuses some of the BigQuery uploader, BigQueryUtils and BigQueryRecordFormatter logic.
  • A new BigQueryStagingOperations interface handles the staging operations that involve the BigQuery client. It is similar to StagingOperations, but BigQuery requires enough unique logic that a separate interface is necessary. The implementation wraps the behavior of the BigQuery uploaders.
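
For readers who have not opened the diff yet, the idea of a dedicated staging interface in the last bullet can be pictured with the minimal sketch below. It is not the code from this PR: the interface name `StagingOperationsSketch`, its four method names, and the in-memory implementation are all assumptions made for illustration. The real interface talks to the GCS and BigQuery clients, which the sketch replaces with plain maps.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: these method names are assumptions, not the PR's actual API.
interface StagingOperationsSketch {
  void createStageIfNotExists(String stageName);

  // Returns the name of the staged file so that it can be loaded later.
  String uploadRecordsToStage(String stageName, List<String> serializedRecords);

  void copyIntoTableFromStage(String stageName, List<String> stagedFiles, String tableName);

  void dropStageIfExists(String stageName);
}

// In-memory stand-in for the staging bucket and the warehouse; it only shows the call order.
final class InMemoryStagingOperations implements StagingOperationsSketch {
  final Map<String, Map<String, List<String>>> stages = new HashMap<>();
  final Map<String, List<String>> tables = new HashMap<>();

  @Override
  public void createStageIfNotExists(String stageName) {
    stages.computeIfAbsent(stageName, k -> new HashMap<>());
  }

  @Override
  public String uploadRecordsToStage(String stageName, List<String> serializedRecords) {
    Map<String, List<String>> stage = requireStage(stageName);
    // The ".avro" suffix is cosmetic here; the sketch stores plain strings.
    String fileName = stageName + "/" + stage.size() + ".avro";
    stage.put(fileName, new ArrayList<>(serializedRecords));
    return fileName;
  }

  @Override
  public void copyIntoTableFromStage(String stageName, List<String> stagedFiles, String tableName) {
    Map<String, List<String>> stage = requireStage(stageName);
    List<String> table = tables.computeIfAbsent(tableName, k -> new ArrayList<>());
    for (String file : stagedFiles) {
      List<String> records = stage.get(file);
      if (records == null) {
        throw new IllegalArgumentException("Unknown staged file: " + file);
      }
      table.addAll(records);
    }
  }

  @Override
  public void dropStageIfExists(String stageName) {
    stages.remove(stageName);
  }

  private Map<String, List<String>> requireStage(String stageName) {
    Map<String, List<String>> stage = stages.get(stageName);
    if (stage == null) {
      throw new IllegalStateException("Stage does not exist: " + stageName);
    }
    return stage;
  }
}
```

A consumer built on such an interface would create the stage at start-up, upload each flushed buffer, copy the staged files into the target table on close, and then drop the stage.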

Recommended reading order

  1. BigQueryDestination.java
  2. BigQueryStagingConsumerFactory.java
  3. BigQueryStagingOperations.java

🚨 User Impact 🚨

None expected.

TODO

  • Fix integration test for BigQuery denormalized destination.
  • Remove deprecated uploader code.

Pre-merge Checklist

Updating a connector

Community member or Airbyter

  • Grant edit access to maintainers (instructions)
  • Secrets in the connector's spec are annotated with airbyte_secret
  • Unit & integration tests added and passing. Community members, please provide proof of success locally, e.g. a screenshot or copy-pasted unit, integration, and acceptance test output. To run acceptance tests for a Python connector, follow the instructions in the README. For Java connectors, run ./gradlew :airbyte-integrations:connectors:<name>:integrationTest.
  • Code reviews completed
  • Documentation updated
    • Connector's README.md
    • Connector's bootstrap.md. See description and examples
    • Changelog updated in docs/integrations/<source or destination>/<name>.md including changelog. See changelog example
  • PR name follows PR naming conventions

Airbyter

If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.

  • Create a non-forked branch based on this PR and test the below items on it
  • Build is successful
  • If new credentials are required for use in CI, add them to GSM. Instructions.
  • /test connector=connectors/<name> command is passing
  • New Connector version released on Dockerhub by running the /publish command described here
  • After the new connector version is published, connector version bumped in the seed directory as described here
  • Seed specs have been re-generated by building the platform and committing the changes to the seed spec files, as described here

@github-actions github-actions bot added the area/connectors Connector related issues label Apr 7, 2022
Contributor Author

tuliren commented Apr 7, 2022

/test connector=connectors/destination-bigquery

🕑 connectors/destination-bigquery https://github.com/airbytehq/airbyte/actions/runs/2106256610
✅ connectors/destination-bigquery https://github.com/airbytehq/airbyte/actions/runs/2106256610
Python tests coverage:

Name                                                                                                                            Stmts   Miss  Cover
---------------------------------------------------------------------------------------------------------------------------------------------------
normalization/transform_config/__init__.py                                                                                          2      0   100%
normalization/transform_catalog/reserved_keywords.py                                                                               13      0   100%
normalization/transform_catalog/__init__.py                                                                                         2      0   100%
normalization/destination_type.py                                                                                                  13      0   100%
normalization/__init__.py                                                                                                           4      0   100%
/actions-runner/_work/airbyte/airbyte/airbyte-integrations/bases/airbyte-protocol/airbyte_protocol/models/airbyte_protocol.py     124      0   100%
/actions-runner/_work/airbyte/airbyte/airbyte-integrations/bases/airbyte-protocol/airbyte_protocol/models/__init__.py               1      0   100%
/actions-runner/_work/airbyte/airbyte/airbyte-integrations/bases/airbyte-protocol/airbyte_protocol/__init__.py                      2      0   100%
normalization/transform_catalog/destination_name_transformer.py                                                                   155      8    95%
normalization/transform_config/transform.py                                                                                       168     31    82%
normalization/transform_catalog/table_name_registry.py                                                                            174     34    80%
normalization/transform_catalog/utils.py                                                                                           33      7    79%
normalization/transform_catalog/catalog_processor.py                                                                              143     77    46%
normalization/transform_catalog/transform.py                                                                                       45     26    42%
normalization/transform_catalog/stream_processor.py                                                                               524    337    36%
---------------------------------------------------------------------------------------------------------------------------------------------------
TOTAL                                                                                                                            1403    520    63%

@github-actions github-actions bot added the area/documentation Improvements or additions to documentation label Apr 7, 2022
Contributor Author

tuliren commented Apr 7, 2022

/test connector=connectors/destination-bigquery

🕑 connectors/destination-bigquery https://github.com/airbytehq/airbyte/actions/runs/2108110638
✅ connectors/destination-bigquery https://github.com/airbytehq/airbyte/actions/runs/2108110638
Python tests coverage:

Name                                                                                                                            Stmts   Miss  Cover
---------------------------------------------------------------------------------------------------------------------------------------------------
normalization/transform_config/__init__.py                                                                                          2      0   100%
normalization/transform_catalog/reserved_keywords.py                                                                               13      0   100%
normalization/transform_catalog/__init__.py                                                                                         2      0   100%
normalization/destination_type.py                                                                                                  13      0   100%
normalization/__init__.py                                                                                                           4      0   100%
/actions-runner/_work/airbyte/airbyte/airbyte-integrations/bases/airbyte-protocol/airbyte_protocol/models/airbyte_protocol.py     124      0   100%
/actions-runner/_work/airbyte/airbyte/airbyte-integrations/bases/airbyte-protocol/airbyte_protocol/models/__init__.py               1      0   100%
/actions-runner/_work/airbyte/airbyte/airbyte-integrations/bases/airbyte-protocol/airbyte_protocol/__init__.py                      2      0   100%
normalization/transform_catalog/destination_name_transformer.py                                                                   155      8    95%
normalization/transform_config/transform.py                                                                                       168     31    82%
normalization/transform_catalog/table_name_registry.py                                                                            174     34    80%
normalization/transform_catalog/utils.py                                                                                           33      7    79%
normalization/transform_catalog/catalog_processor.py                                                                              143     77    46%
normalization/transform_catalog/transform.py                                                                                       45     26    42%
normalization/transform_catalog/stream_processor.py                                                                               524    337    36%
---------------------------------------------------------------------------------------------------------------------------------------------------
TOTAL                                                                                                                            1403    520    63%

Contributor Author

tuliren commented Apr 7, 2022

/test connector=connectors/destination-bigquery-denormalized

🕑 connectors/destination-bigquery-denormalized https://github.com/airbytehq/airbyte/actions/runs/2108111466
✅ connectors/destination-bigquery-denormalized https://github.com/airbytehq/airbyte/actions/runs/2108111466
Python tests coverage:

Name                                                                                                                            Stmts   Miss  Cover
---------------------------------------------------------------------------------------------------------------------------------------------------
normalization/transform_config/__init__.py                                                                                          2      0   100%
normalization/transform_catalog/reserved_keywords.py                                                                               13      0   100%
normalization/transform_catalog/__init__.py                                                                                         2      0   100%
normalization/destination_type.py                                                                                                  13      0   100%
normalization/__init__.py                                                                                                           4      0   100%
/actions-runner/_work/airbyte/airbyte/airbyte-integrations/bases/airbyte-protocol/airbyte_protocol/models/airbyte_protocol.py     124      0   100%
/actions-runner/_work/airbyte/airbyte/airbyte-integrations/bases/airbyte-protocol/airbyte_protocol/models/__init__.py               1      0   100%
/actions-runner/_work/airbyte/airbyte/airbyte-integrations/bases/airbyte-protocol/airbyte_protocol/__init__.py                      2      0   100%
normalization/transform_catalog/destination_name_transformer.py                                                                   155      8    95%
normalization/transform_config/transform.py                                                                                       168     31    82%
normalization/transform_catalog/table_name_registry.py                                                                            174     34    80%
normalization/transform_catalog/utils.py                                                                                           33      7    79%
normalization/transform_catalog/catalog_processor.py                                                                              143     77    46%
normalization/transform_catalog/transform.py                                                                                       45     26    42%
normalization/transform_catalog/stream_processor.py                                                                               524    337    36%
---------------------------------------------------------------------------------------------------------------------------------------------------
TOTAL                                                                                                                            1403    520    63%

@@ -12,9 +12,8 @@ application {
dependencies {
Contributor

I forgot to mention this on the previous S3/GCS PR, but can we make the integration tests of warehouse destinations (BigQuery, Snowflake, etc. in the future) dependent on the blob storage destinations that are used underneath for staging?

So if we run ./gradlew :airbyte-integrations:connectors:destination-gcs:integrationTest, it should also run: ./gradlew :airbyte-integrations:connectors:destination-bigquery*:integrationTest, etc?

Contributor Author

Good idea. I created an issue here: #11815

Comment on lines +26 to +30
/**
* This class differs from {@link AvroSerializedBuffer} in that 1) the Avro schema can be customized
* by the caller, and 2) the message is formatted by {@link BigQueryRecordFormatter}. In this way,
* this buffer satisfies the needs of both the standard and the denormalized BigQuery destinations.
*/
Contributor

Shouldn't we introduce a common concept to handle this though?

If I understand correctly, this sounds like the CsvSheetGenerator?

Contributor Author

At this point, only BigQuery needs a separate implementation, because the BigQuery schema is a bit hacky. I doubt that other destinations will ever need this, so I don't think something more generic is necessary. Maybe let's wait until there is another use case?

Contributor

👌
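
To make the two differences named in the Javadoc above concrete, here is a minimal sketch of a buffer whose record formatting is supplied by the caller. It is an illustration under stated assumptions, not the class from this PR: `FormattingBufferSketch` and its methods are hypothetical names, and it writes newline-delimited text instead of Avro so that it runs without any dependencies.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Hypothetical sketch: plain text lines stand in for Avro records.
final class FormattingBufferSketch {
  // Caller-supplied formatter, playing the role that BigQueryRecordFormatter has in the PR.
  private final Function<String, String> formatter;
  private final ByteArrayOutputStream out = new ByteArrayOutputStream();
  private int recordCount = 0;

  FormattingBufferSketch(Function<String, String> formatter) {
    this.formatter = formatter;
  }

  // Formats the raw record and appends it to the in-memory buffer as one line.
  void accept(String rawRecord) {
    byte[] bytes = (formatter.apply(rawRecord) + "\n").getBytes(StandardCharsets.UTF_8);
    out.write(bytes, 0, bytes.length);
    recordCount++;
  }

  int recordCount() {
    return recordCount;
  }

  // Number of bytes currently buffered; a consumer could use this to decide when to flush.
  long byteCount() {
    return out.size();
  }

  String contents() {
    return new String(out.toByteArray(), StandardCharsets.UTF_8);
  }
}
```

Because the formatter is injected, the standard and the denormalized destinations could share one buffer class and differ only in the function they pass in, which is the point the Javadoc makes.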

return getRecordConsumer(getUploaderMap(config, catalog), outputRecordCollector);
final UploadingMethod uploadingMethod = BigQueryUtils.getLoadingMethod(config);
if (uploadingMethod == UploadingMethod.STANDARD) {
return getStandardRecordConsumer(config, catalog, outputRecordCollector);
Contributor

@ChristopheDuong ChristopheDuong Apr 7, 2022

BTW, should a warning be printed in the logs to recommend staging options over direct upload methods (especially when dealing with larger streams)?

(we could even suggest which format should be used to stage the data in the message if that's exposed to users?)

Contributor

I guess it's always staging in avro...

Contributor Author

Will do. I will also update the documentation to mention that.
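
The warning discussed in this thread could be wired in roughly as follows. This is a sketch under assumptions, not the change that was eventually made: `UploadingMethodSketch` only mirrors the `UploadingMethod` enum visible in the diff (the `GCS` constant is an assumption), and the helper name and message wording are hypothetical.

```java
import java.util.function.Consumer;

// Hypothetical stand-in for the connector's UploadingMethod enum; the GCS constant is an assumption.
enum UploadingMethodSketch { STANDARD, GCS }

final class UploadWarningSketch {
  // Assumed wording; the PR does not show the actual message.
  static final String STANDARD_MODE_WARNING =
      "Standard (direct) uploading is selected. GCS staging is recommended, especially for large streams.";

  // Emits the warning through the given sink only when direct uploading is selected.
  // Returns true if a warning was emitted.
  static boolean warnIfStandard(UploadingMethodSketch method, Consumer<String> warn) {
    if (method == UploadingMethodSketch.STANDARD) {
      warn.accept(STANDARD_MODE_WARNING);
      return true;
    }
    return false;
  }
}
```

Passing the sink as a `Consumer<String>` keeps the sketch free of logging dependencies; in the connector the sink would presumably be the existing slf4j logger, for example `LOGGER::warn`.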

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BigQueryStagingConsumerFactory {
Contributor

@ChristopheDuong ChristopheDuong Apr 7, 2022

Do you think it would be possible to get a common staging consumer factory from snowflake and bigquery?

We probably don't want each destination (Redshift, etc.) to have its own staging consumer factory moving forward, no?

This could definitely be another PR though

Contributor Author

I did try to reuse the existing factory, or to modify the Snowflake factory to fit BigQuery. However, that complicated the factory a lot, because BigQuery is different and has its own client. I don't think we need another consumer factory for destinations like Redshift, because most destinations are JDBC-based and fit into the current StagingConsumerFactory.

/**
* @param datasetId the dataset ID is equivalent to output schema
*/
public record BigQueryWriteConfig(
Contributor

There might be an opportunity to DRY this up too?

Contributor Author

This class also has a lot of BigQuery-specific logic (e.g. TableId), so I'd rather not merge it with the other write configs.

@tuliren tuliren requested a review from subodh1810 April 7, 2022 16:37
Contributor Author

tuliren commented Apr 7, 2022

/publish connector=connectors/destination-bigquery

🕑 connectors/destination-bigquery https://github.com/airbytehq/airbyte/actions/runs/2111504304
🚀 Successfully published connectors/destination-bigquery
❌ Couldn't auto-bump version for connectors/destination-bigquery

Contributor Author

tuliren commented Apr 7, 2022

/publish connector=connectors/destination-bigquery-denormalized

🕑 connectors/destination-bigquery-denormalized https://github.com/airbytehq/airbyte/actions/runs/2111504603
🚀 Successfully published connectors/destination-bigquery-denormalized
🚀 Auto-bumped version for connectors/destination-bigquery-denormalized
✅ connectors/destination-bigquery-denormalized https://github.com/airbytehq/airbyte/actions/runs/2111504603

@octavia-squidington-iii octavia-squidington-iii temporarily deployed to more-secrets April 7, 2022 21:34 Inactive
Labels
area/connectors Connector related issues area/documentation Improvements or additions to documentation
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Apply buffering changes to BigQuery Destination when using staging
3 participants