🎉 BigQuery destinations with partitioned/clustered keys #7240

Merged
merged 13 commits into from
Oct 25, 2021

Conversation

ChristopheDuong
Contributor

@ChristopheDuong ChristopheDuong commented Oct 21, 2021

What

Follow-up on community contribution from @andresbravog: #7141
Adding docs and releasing #7118 as well

Closes #2579
Closes #5959 by reverting the column type back to timestamp

How

Describe the solution

Recommended reading order

  1. x.java
  2. y.python

Pre-merge Checklist

Expand the relevant checklist and delete the others.

Updating a connector

Community member or Airbyter

  • Grant edit access to maintainers (instructions)
  • Secrets in the connector's spec are annotated with airbyte_secret
  • Unit & integration tests added and passing. Community members, please provide proof of success locally e.g: screenshot or copy-paste unit, integration, and acceptance test output. To run acceptance tests for a Python connector, follow instructions in the README. For java connectors run ./gradlew :airbyte-integrations:connectors:<name>:integrationTest.
  • Code reviews completed
  • Documentation updated
    • Connector's README.md
    • Connector's bootstrap.md. See description and examples
    • Changelog updated in docs/integrations/<source or destination>/<name>.md including changelog. See changelog example
  • PR name follows PR naming conventions
  • Connector version bumped like described here

Airbyter

If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.

  • Create a non-forked branch based on this PR and test the below items on it
  • Build is successful
  • Credentials added to Github CI. Instructions.
  • /test connector=connectors/<name> command is passing.
  • New Connector version released on Dockerhub by running the /publish command described here

@ChristopheDuong ChristopheDuong marked this pull request as draft October 21, 2021 12:17
@github-actions github-actions bot added the area/connectors Connector related issues label Oct 21, 2021
@ChristopheDuong ChristopheDuong temporarily deployed to more-secrets October 21, 2021 13:07 Inactive
@ChristopheDuong
Contributor Author

ChristopheDuong commented Oct 21, 2021

/test connector=destination-bigquery

🕑 destination-bigquery https://github.com/airbytehq/airbyte/actions/runs/1368059530
❌ destination-bigquery https://github.com/airbytehq/airbyte/actions/runs/1368059530
🐛 https://gradle.com/s/hq7kscscdheuy
🕑 destination-bigquery https://github.com/airbytehq/airbyte/actions/runs/1368059530
❌ destination-bigquery https://github.com/airbytehq/airbyte/actions/runs/1368059530
🐛 https://gradle.com/s/gdj5ub4edfpq4

@jrhizor jrhizor temporarily deployed to more-secrets October 21, 2021 13:14 Inactive
@jrhizor jrhizor temporarily deployed to more-secrets October 21, 2021 14:16 Inactive
@ChristopheDuong ChristopheDuong temporarily deployed to more-secrets October 21, 2021 19:40 Inactive
@ChristopheDuong ChristopheDuong temporarily deployed to more-secrets October 21, 2021 20:36 Inactive
@ChristopheDuong ChristopheDuong marked this pull request as ready for review October 21, 2021 20:43
@ChristopheDuong ChristopheDuong temporarily deployed to more-secrets October 21, 2021 20:43 Inactive
@ChristopheDuong
Contributor Author

I still need to fix the integration tests and add extra ones to verify the migration of existing non-partitioned tables in the destination into partitioned ones.

@ChristopheDuong ChristopheDuong temporarily deployed to more-secrets October 21, 2021 20:51 Inactive
Contributor

@sherifnada sherifnada left a comment

@ChristopheDuong could you fill out the connector checklist? The one thing that stood out is that we should describe this change in the .md docs file for the BQ connectors.

@@ -16,6 +16,7 @@ dependencies {
implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)
implementation project(':airbyte-integrations:connectors:source-relational-db')
implementation project(':airbyte-integrations:connectors:source-mongodb-v2')
implementation 'org.mongodb:mongodb-driver-sync:4.3.0'
Contributor

how come this was added here? was it just missing from before?

Contributor Author

@ChristopheDuong ChristopheDuong Oct 22, 2021

I don't know how you build the project in the IDE, but when I use "Build project" it runs a global ./gradlew build.

So, without this line (as on the master branch), it fails:
[Screenshot 2021-10-22 at 09 09 46]

@@ -13,6 +13,11 @@ dependencies {
implementation 'com.google.cloud:google-cloud-bigquery:1.122.2'
implementation 'org.apache.commons:commons-lang3:3.11'

// csv
implementation 'com.amazonaws:aws-java-sdk-s3:1.11.978'
Contributor

why were these added?

Contributor Author

implementation 'org.apache.commons:commons-csv:1.4' is needed by writeRecordToCsv to use the CSVPrinter, similarly to how it is used in destination-gcs.

The rest seems unnecessary...

Is that correct @andresbravog?

Contributor

@andresbravog andresbravog Oct 25, 2021

That is correct! In my tests, it was not compiling without the other libs.

// Copying data from partitioned tmp table into a non-partitioned table does not make it
// partitioned... we need to force re-create from 0...
bigquery.delete(destinationTableId);
copyTable(bigquery, tmpPartitionTableId, destinationTableId, WriteDisposition.WRITE_EMPTY);
Contributor

shouldn't we be copying data from the destinationTableId into tmpPartitionedTableId instead of the other way around?

Contributor Author

@ChristopheDuong ChristopheDuong Oct 22, 2021

First, it runs a create table ... as select ... from statement, which copies destinationTableId into tmpPartitionedTableId and converts the data from unpartitioned to partitioned. The SQL approach is used because BigQuery copy jobs don't carry over the table's partitioning mode when copying.

Finally, we simply need to "rename" the tmp table back to destinationTableId (which takes one last simple delete/copy).

Contributor

I see, so we:

  1. Create an empty partitioned tmpPartitionedTable as select * from destinationTable
  2. delete destination table
  3. copy tmpPartitionedTable into destination table
  4. copy new data from the tmp table (this tmp contains new data from the sync) into the destinationTable
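The order of operations discussed above can be sketched as follows. This is a minimal illustration and not the connector's actual code: the class `PartitionMigrationSketch`, its method names, and the example table names are hypothetical, the partition column `_airbyte_emitted_at` is taken from the commit message of this PR, and clustering on the same column is an assumption. It only builds the `CREATE TABLE ... AS SELECT` statement and describes the remaining steps as plain strings; executing them would require a BigQuery client.

```java
import java.util.List;

/** Hypothetical sketch of the unpartitioned-to-partitioned migration order. */
final class PartitionMigrationSketch {

  /** Builds a BigQuery DDL statement that copies a table into a new day-partitioned table. */
  static String buildPartitionedCopySql(String sourceTable, String tmpTable, String timestampColumn) {
    return "CREATE TABLE `" + tmpTable + "` "
        + "PARTITION BY DATE(" + timestampColumn + ") "
        // Assumption: clustering on the same column; the PR may choose differently.
        + "CLUSTER BY " + timestampColumn + " "
        + "AS SELECT * FROM `" + sourceTable + "`";
  }

  /** Lists the steps in order; steps 2 and 3 are descriptions, not executable commands. */
  static List<String> migrationSteps(String destinationTable, String tmpTable) {
    return List.of(
        // 1. SQL is used because a copy job alone does not change the partitioning mode.
        buildPartitionedCopySql(destinationTable, tmpTable, "_airbyte_emitted_at"),
        // 2. Drop the old unpartitioned table so it can be re-created from scratch.
        "delete " + destinationTable,
        // 3. "Rename" the tmp table back by copying it into the now-missing destination.
        "copy " + tmpTable + " into " + destinationTable + " (WRITE_EMPTY)");
  }
}
```

Calling `PartitionMigrationSketch.migrationSteps("ds.users", "ds.users_tmp")` returns the three steps in the order they would need to run.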

protected void writeRecordToCsv(final GcsCsvWriter gcsCsvWriter, final AirbyteRecordMessage recordMessage) {
// Bigquery represents TIMESTAMP to the microsecond precision, so we convert to microseconds then
// use BQ helpers to string-format correctly.
final long emittedAtMicroseconds = TimeUnit.MICROSECONDS.convert(recordMessage.getEmittedAt(), TimeUnit.MILLISECONDS);
Contributor Author

@ChristopheDuong ChristopheDuong Oct 22, 2021

In this other PR, it's converted into seconds instead of microseconds? These changes should be somehow equivalent, right?

WDYT @etsybaev @andresbravog?
https://github.com/airbytehq/airbyte/pull/5981/files#r734312334
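
To make the seconds-versus-microseconds question concrete, here is a small standard-library sketch. The class `EmittedAtSketch` and its methods are hypothetical; the connector formats the value with BigQuery helpers, which are replaced here by `java.time` so the example runs on its own. Whether the two PRs end up equivalent depends on what the downstream formatting step expects, which is not shown in this conversation.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper; java.time stands in for the BigQuery formatting helpers. */
final class EmittedAtSketch {

  private static final DateTimeFormatter MICROS_UTC =
      DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss.SSSSSS").withZone(ZoneOffset.UTC);

  /** Same conversion as in the diff: epoch milliseconds to epoch microseconds. */
  static long toMicros(long emittedAtMillis) {
    return TimeUnit.MICROSECONDS.convert(emittedAtMillis, TimeUnit.MILLISECONDS);
  }

  /** Converting to seconds instead truncates the sub-second part. */
  static long toSeconds(long emittedAtMillis) {
    return TimeUnit.SECONDS.convert(emittedAtMillis, TimeUnit.MILLISECONDS);
  }

  /** Formats epoch microseconds as a UTC timestamp string with six fractional digits. */
  static String formatMicros(long epochMicros) {
    long seconds = Math.floorDiv(epochMicros, 1_000_000L);
    long nanos = Math.floorMod(epochMicros, 1_000_000L) * 1_000L;
    return MICROS_UTC.format(Instant.ofEpochSecond(seconds, nanos));
  }
}
```

For an emitted-at value of 1500 ms, `toMicros` keeps the half second (1,500,000 µs) while `toSeconds` yields 1, so the two conversions only agree if the sub-second part is not needed afterwards.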

@ChristopheDuong ChristopheDuong temporarily deployed to more-secrets October 22, 2021 09:07 Inactive
@ChristopheDuong
Contributor Author

ChristopheDuong commented Oct 22, 2021

/test connector=destination-bigquery

🕑 destination-bigquery https://github.com/airbytehq/airbyte/actions/runs/1371545426
✅ destination-bigquery https://github.com/airbytehq/airbyte/actions/runs/1371545426
Python tests coverage:

	 ---------- coverage: platform linux, python 3.8.10-final-0 -----------
	 Name                                                              Stmts   Miss  Cover
	 -------------------------------------------------------------------------------------
	 main_dev_transform_catalog.py                                         3      3     0%
	 main_dev_transform_config.py                                          3      3     0%
	 normalization/__init__.py                                             4      0   100%
	 normalization/destination_type.py                                    12      0   100%
	 normalization/transform_catalog/__init__.py                           2      0   100%
	 normalization/transform_catalog/catalog_processor.py                143     77    46%
	 normalization/transform_catalog/destination_name_transformer.py     120      6    95%
	 normalization/transform_catalog/reserved_keywords.py                 11      0   100%
	 normalization/transform_catalog/stream_processor.py                 370    218    41%
	 normalization/transform_catalog/table_name_registry.py              174     34    80%
	 normalization/transform_catalog/transform.py                         45     26    42%
	 normalization/transform_catalog/utils.py                             33      7    79%
	 normalization/transform_config/__init__.py                            2      0   100%
	 normalization/transform_config/transform.py                         140     29    79%
	 -------------------------------------------------------------------------------------
	 TOTAL                                                              1062    403    62%

@jrhizor jrhizor temporarily deployed to more-secrets October 22, 2021 09:17 Inactive
@ChristopheDuong
Contributor Author

ChristopheDuong commented Oct 22, 2021

/test connector=destination-bigquery-denormalized

🕑 destination-bigquery-denormalized https://github.com/airbytehq/airbyte/actions/runs/1371844008
✅ destination-bigquery-denormalized https://github.com/airbytehq/airbyte/actions/runs/1371844008
No Python unittests run

@jrhizor jrhizor temporarily deployed to more-secrets October 22, 2021 10:48 Inactive
@ChristopheDuong ChristopheDuong temporarily deployed to more-secrets October 22, 2021 12:02 Inactive
@github-actions github-actions bot added the area/documentation Improvements or additions to documentation label Oct 22, 2021
@ChristopheDuong ChristopheDuong temporarily deployed to more-secrets October 22, 2021 12:32 Inactive
@@ -148,7 +155,8 @@ Therefore, Airbyte BigQuery destination will convert any invalid characters into

| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
| 0.4.0 | 2021-10-04 | [\#6733](https://github.com/airbytehq/airbyte/issues/6733) | Support dataset starting with numbers |
| 0.4.2 | 2021-10-26 | [\#7240](https://github.com/airbytehq/airbyte/issues/7240) | Output partitioned/clustered tables |
Contributor Author

Should this be a minor release, since it reverts the type back to timestamp?

Suggested change
| 0.4.2 | 2021-10-26 | [\#7240](https://github.com/airbytehq/airbyte/issues/7240) | Output partitioned/clustered tables |
| 0.5.0 | 2021-10-26 | [\#7240](https://github.com/airbytehq/airbyte/issues/7240) | Output partitioned/clustered tables |

Contributor

there's no functional difference (the app allows you to upgrade to whatever version) but that seems more "tidy"

* **Google BigQuery client chunk size**: Google BigQuery client's chunk\(buffer\) size \(MIN = 1, MAX = 15\) for each table. The default value of 15 MiB is used if not set explicitly. Decreasing the value is recommended when migrating large data sets, to reduce heap memory consumption and avoid crashes. For more details refer to [https://googleapis.dev/python/bigquery/latest/generated/google.cloud.bigquery.client.Client.html](https://googleapis.dev/python/bigquery/latest/generated/google.cloud.bigquery.client.Client.html)
* **Transformation Priority**: configures the priority of queries run for transformations. Refer to [https://cloud.google.com/bigquery/docs/running-queries](https://cloud.google.com/bigquery/docs/running-queries). By default, Airbyte runs interactive query jobs on BigQuery, which means that the query is executed as soon as possible and counts towards daily concurrent quotas and limits. If set to batch, BigQuery queues the query on your behalf and starts it as soon as idle resources are available in the BigQuery shared resource pool. This usually occurs within a few minutes. If BigQuery hasn't started the query within 24 hours, BigQuery changes the job priority to interactive. Batch queries don't count towards your concurrent rate limit, which can make it easier to start many queries at once.
Contributor

When this option is configured, the only impact on Airbyte's sync is that it takes longer, right?

Contributor Author

Yes, and as a trade-off it's probably cheaper under more concurrent load.
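
For the chunk size option in the same documentation hunk (MIN = 1, MAX = 15, default 15 MiB), the bounds could be checked along these lines. This is only a sketch: the class `ChunkSizeSketch` is hypothetical, and rejecting out-of-range values with an exception is an assumption, since the conversation does not say how the connector handles them.

```java
/** Hypothetical validation of the chunk-size option described in the docs. */
final class ChunkSizeSketch {
  static final int MIN_MIB = 1;
  static final int MAX_MIB = 15;
  static final int DEFAULT_MIB = 15;

  /** Returns the chunk size in bytes; null means "not set", so the default applies. */
  static int chunkSizeBytes(Integer configuredMib) {
    int mib = configuredMib == null ? DEFAULT_MIB : configuredMib;
    if (mib < MIN_MIB || mib > MAX_MIB) {
      // Assumption: out-of-range values are rejected rather than clamped.
      throw new IllegalArgumentException(
          "chunk size must be between " + MIN_MIB + " and " + MAX_MIB + " MiB, got " + mib);
    }
    return mib * 1024 * 1024;
  }
}
```

With no value configured, `chunkSizeBytes(null)` falls back to 15 MiB; a smaller value such as 5 trades upload throughput for lower heap usage.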

@@ -296,4 +305,68 @@ private void assertTmpTablesNotPresent(final List<String> tableNames) throws Int
.collect(Collectors.toList());
}

@Test
void testWritePartitionOverUnpartitioned() throws Exception {
Contributor

love me a good test case 💪🏼

@ChristopheDuong ChristopheDuong temporarily deployed to more-secrets October 25, 2021 08:33 Inactive
@ChristopheDuong
Contributor Author

ChristopheDuong commented Oct 25, 2021

/publish connector=connectors/destination-bigquery

🕑 connectors/destination-bigquery https://github.com/airbytehq/airbyte/actions/runs/1380240051
✅ connectors/destination-bigquery https://github.com/airbytehq/airbyte/actions/runs/1380240051

@ChristopheDuong
Contributor Author

ChristopheDuong commented Oct 25, 2021

/publish connector=connectors/destination-bigquery-denormalized

🕑 connectors/destination-bigquery-denormalized https://github.com/airbytehq/airbyte/actions/runs/1380240541
✅ connectors/destination-bigquery-denormalized https://github.com/airbytehq/airbyte/actions/runs/1380240541

@jrhizor jrhizor temporarily deployed to more-secrets October 25, 2021 08:38 Inactive
@jrhizor jrhizor temporarily deployed to more-secrets October 25, 2021 08:38 Inactive
@ChristopheDuong ChristopheDuong temporarily deployed to more-secrets October 25, 2021 09:13 Inactive
@ChristopheDuong ChristopheDuong merged commit 27df558 into master Oct 25, 2021
@ChristopheDuong ChristopheDuong deleted the chris/bq-partitions branch October 25, 2021 10:41
schlattk pushed a commit to schlattk/airbyte that referenced this pull request Jan 4, 2022
)

* [ airbytehq#5959 ][ airbytehq#2579 ] Add support of partitioned tables by _airbyte_emitted_at field (airbytehq#7141)

Co-authored-by: Andrés Bravo <[email protected]>
5 participants