🎉 BigQuery destinations with partitioned/clustered keys #7240
Changes from all commits
121d744
6d02bf5
019e95d
ee8207c
cd6437c
d4ab94e
392c41c
e3aa2a5
ba9767d
ea4caeb
bb7f7e5
c0ba015
20a8334
@@ -14,11 +14,13 @@
 import com.google.cloud.bigquery.BigQueryException;
 import com.google.cloud.bigquery.CopyJobConfiguration;
 import com.google.cloud.bigquery.CsvOptions;
+import com.google.cloud.bigquery.Field;
 import com.google.cloud.bigquery.Job;
 import com.google.cloud.bigquery.JobInfo;
 import com.google.cloud.bigquery.JobInfo.CreateDisposition;
 import com.google.cloud.bigquery.JobInfo.WriteDisposition;
 import com.google.cloud.bigquery.LoadJobConfiguration;
+import com.google.cloud.bigquery.QueryJobConfiguration;
 import com.google.cloud.bigquery.QueryParameterValue;
 import com.google.cloud.bigquery.Schema;
 import com.google.cloud.bigquery.TableDataWriteChannel;

@@ -27,6 +29,7 @@
 import com.google.common.collect.ImmutableMap;
 import io.airbyte.commons.json.Jsons;
 import io.airbyte.commons.lang.Exceptions;
+import io.airbyte.commons.string.Strings;
 import io.airbyte.integrations.base.AirbyteMessageConsumer;
 import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair;
 import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer;

@@ -119,7 +122,7 @@ public void acceptTracked(final AirbyteMessage message) throws IOException {
       } else {
         // GCS uploading way, this data will be moved to bigquery in close method
         final GcsCsvWriter gcsCsvWriter = writer.getGcsCsvWriter();
-        gcsCsvWriter.write(UUID.randomUUID(), recordMessage);
+        writeRecordToCsv(gcsCsvWriter, recordMessage);
       }
     } else {
       LOGGER.warn("Unexpected message: " + message.getType());

@@ -138,6 +141,23 @@ protected JsonNode formatRecord(final Schema schema, final AirbyteRecordMessage
         JavaBaseConstants.COLUMN_NAME_EMITTED_AT, formattedEmittedAt));
   }

+  protected void writeRecordToCsv(final GcsCsvWriter gcsCsvWriter, final AirbyteRecordMessage recordMessage) {
+    // BigQuery represents TIMESTAMP to microsecond precision, so we convert to microseconds then
+    // use BQ helpers to string-format correctly.
+    final long emittedAtMicroseconds = TimeUnit.MICROSECONDS.convert(recordMessage.getEmittedAt(), TimeUnit.MILLISECONDS);
+    final String formattedEmittedAt = QueryParameterValue.timestamp(emittedAtMicroseconds).getValue();
+    final JsonNode formattedData = StandardNameTransformer.formatJsonPath(recordMessage.getData());
+    try {
+      gcsCsvWriter.getCsvPrinter().printRecord(
+          UUID.randomUUID().toString(),
+          formattedEmittedAt,
+          Jsons.serialize(formattedData));
+    } catch (final IOException e) {
+      // Attach the exception to the log entry rather than printing the stack trace to stderr.
+      LOGGER.warn("An error occurred writing CSV file.", e);
+    }
+  }

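To make the staged-CSV layout concrete, here is a minimal, self-contained sketch of the three-column row this method produces (UUID id, formatted emitted_at, serialized JSON). It assumes the writer's printer is an Apache Commons CSV CSVPrinter, which the printRecord call above suggests; all values are invented.

```java
import java.io.IOException;
import java.io.StringWriter;
import java.util.UUID;

import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVPrinter;

public class CsvRowSketch {

  public static void main(final String[] args) throws IOException {
    final StringWriter out = new StringWriter();
    try (CSVPrinter printer = new CSVPrinter(out, CSVFormat.DEFAULT)) {
      // Same three-column layout as writeRecordToCsv: id, emitted_at, raw data.
      printer.printRecord(
          UUID.randomUUID().toString(),
          "2021-10-20 12:00:00.000000+00:00", // stand-in for QueryParameterValue.timestamp(...).getValue()
          "{\"user_id\":42}");                // stand-in for Jsons.serialize(formattedData)
    }
    // Prints something like:
    // 3f1c8a2e-...,2021-10-20 12:00:00.000000+00:00,"{""user_id"":42}"
    System.out.print(out);
  }
}
```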
   @Override
   public void close(final boolean hasFailed) {
     LOGGER.info("Started closing all connections");

@@ -181,7 +201,7 @@ private void closeGcsStreamsAndCopyDataToBigQuery(final boolean hasFailed) {
       try {
         loadCsvFromGcsTruncate(pair);
       } catch (final Exception e) {
-        LOGGER.error("Failed to load data from GCS CSV file to BibQuery tmp table with reason: " + e.getMessage());
+        LOGGER.error("Failed to load data from GCS CSV file to BigQuery tmp table with reason: " + e.getMessage());
         throw new RuntimeException(e);
       }
     });

@@ -198,7 +218,7 @@ private void loadCsvFromGcsTruncate(final BigQueryWriteConfig bigQueryWriteConfi

     // Initialize client that will be used to send requests. This client only needs to be created
     // once, and can be reused for multiple requests.
-    LOGGER.info(String.format("Started coping data from %s GCS csv file to %s tmp BigQuery table with schema: \n %s",
+    LOGGER.info(String.format("Started copying data from %s GCS csv file to %s tmp BigQuery table with schema: \n %s",
         csvFile, tmpTable, schema));

     final CsvOptions csvOptions = CsvOptions.newBuilder().setEncoding(UTF8).setSkipLeadingRows(1).build();

@@ -215,7 +235,7 @@ private void loadCsvFromGcsTruncate(final BigQueryWriteConfig bigQueryWriteConfi
     // Load the table
     final Job loadJob = bigquery.create(JobInfo.of(configuration));

-    LOGGER.info("Crated a new job GCS csv file to tmp BigQuery table: " + loadJob);
+    LOGGER.info("Created a new job GCS csv file to tmp BigQuery table: " + loadJob);
     LOGGER.info("Waiting for job to complete...");

     // Load data from a GCS parquet file into the table

@@ -272,15 +292,20 @@ private void closeNormalBigqueryStreams(final boolean hasFailed) {
           }));

       if (!hasFailed) {
-        LOGGER.info("Migration finished with no explicit errors. Copying data from tmp tables to permanent");
+        LOGGER.info("Replication finished with no explicit errors. Copying data from tmp tables to permanent");
         writeConfigs.values()
             .forEach(
-                bigQueryWriteConfig -> copyTable(bigquery, bigQueryWriteConfig.getTmpTable(), bigQueryWriteConfig.getTable(),
-                    bigQueryWriteConfig.getSyncMode()));
+                bigQueryWriteConfig -> {
+                  if (bigQueryWriteConfig.getSyncMode().equals(WriteDisposition.WRITE_APPEND)) {
+                    partitionIfUnpartitioned(bigQueryWriteConfig, bigquery, bigQueryWriteConfig.getTable());
+                  }
+                  copyTable(bigquery, bigQueryWriteConfig.getTmpTable(), bigQueryWriteConfig.getTable(),
+                      bigQueryWriteConfig.getSyncMode());
+                });
         // BQ is still all or nothing if a failure happens in the destination.
         outputRecordCollector.accept(lastStateMessage);
       } else {
-        LOGGER.warn("Had errors while migrations");
+        LOGGER.warn("Had errors while replicating");
       }
     } finally {
       // clean up tmp tables;

@@ -324,7 +349,6 @@ private static void copyTable(
       final TableId sourceTableId,
       final TableId destinationTableId,
       final WriteDisposition syncMode) {
-
     final CopyJobConfiguration configuration = CopyJobConfiguration.newBuilder(destinationTableId, sourceTableId)
         .setCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
         .setWriteDisposition(syncMode)

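The two dispositions carry the sync semantics: CREATE_IF_NEEDED lets the first sync create the final table, while the WriteDisposition passed in as syncMode selects append versus overwrite. A condensed sketch of the full copy step, as an illustrative wrapper rather than the PR's code:

```java
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.CopyJobConfiguration;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.JobInfo.CreateDisposition;
import com.google.cloud.bigquery.JobInfo.WriteDisposition;
import com.google.cloud.bigquery.TableId;

public class CopyTableSketch {

  static void copy(final BigQuery bigquery,
                   final TableId source,
                   final TableId destination,
                   final WriteDisposition syncMode) throws InterruptedException {
    final CopyJobConfiguration configuration = CopyJobConfiguration.newBuilder(destination, source)
        .setCreateDisposition(CreateDisposition.CREATE_IF_NEEDED) // create the destination on first sync
        .setWriteDisposition(syncMode)                            // WRITE_APPEND, WRITE_TRUNCATE, or WRITE_EMPTY
        .build();
    final Job job = bigquery.create(JobInfo.of(configuration)).waitFor();
    if (job.getStatus().getError() != null) {
      throw new RuntimeException("BigQuery was unable to copy table due to an error: \n" + job.getStatus().getError());
    }
  }
}
```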
@@ -336,7 +360,67 @@ private static void copyTable(
       LOGGER.error("Failed on copy tables with error:" + job.getStatus());
       throw new RuntimeException("BigQuery was unable to copy table due to an error: \n" + job.getStatus().getError());
     }
-    LOGGER.info("successfully copied tmp table: {} to final table: {}", sourceTableId, destinationTableId);
+    LOGGER.info("successfully copied table: {} to table: {}", sourceTableId, destinationTableId);
   }

+  private void partitionIfUnpartitioned(final BigQueryWriteConfig bigQueryWriteConfig,
+                                        final BigQuery bigquery,
+                                        final TableId destinationTableId) {
+    try {
+      final QueryJobConfiguration queryConfig = QueryJobConfiguration
+          .newBuilder(
+              String.format("SELECT max(is_partitioning_column) as is_partitioned FROM `%s.%s.INFORMATION_SCHEMA.COLUMNS` WHERE TABLE_NAME = '%s';",
+                  bigquery.getOptions().getProjectId(),
+                  destinationTableId.getDataset(),
+                  destinationTableId.getTable()))
+          .setUseLegacySql(false)
+          .build();
+      final ImmutablePair<Job, String> result = BigQueryUtils.executeQuery(bigquery, queryConfig);
+      result.getLeft().getQueryResults().getValues().forEach(row -> {
+        if (!row.get("is_partitioned").isNull() && row.get("is_partitioned").getStringValue().equals("NO")) {
+          LOGGER.info("Partitioning existing destination table {}", destinationTableId);
+          final String tmpPartitionTable = Strings.addRandomSuffix("_airbyte_partitioned_table", "_", 5);
+          final TableId tmpPartitionTableId = TableId.of(destinationTableId.getDataset(), tmpPartitionTable);
+          // make sure tmpPartitionTable does not already exist
+          bigquery.delete(tmpPartitionTableId);
+          // Use BigQuery SQL to copy, because the Java API's copy jobs do not support creating a table
+          // from a select query, see:
+          // https://cloud.google.com/bigquery/docs/creating-partitioned-tables#create_a_partitioned_table_from_a_query_result
+          final QueryJobConfiguration partitionQuery = QueryJobConfiguration
+              .newBuilder(
+                  getCreatePartitionedTableFromSelectQuery(bigQueryWriteConfig.getSchema(), bigquery.getOptions().getProjectId(), destinationTableId,
+                      tmpPartitionTable))
+              .setUseLegacySql(false)
+              .build();
+          BigQueryUtils.executeQuery(bigquery, partitionQuery);
+          // Copying data from a partitioned tmp table into an existing non-partitioned table does not make it
+          // partitioned... thus, we force re-create from scratch by completely deleting and creating new
+          // table.
+          bigquery.delete(destinationTableId);
+          copyTable(bigquery, tmpPartitionTableId, destinationTableId, WriteDisposition.WRITE_EMPTY);

Review thread on this block:

Reviewer: shouldn't we be copying data from the destinationTableId into tmpPartitionedTableId instead of the other way around?

Author: first, it does a [create-table-from-select, via getCreatePartitionedTableFromSelectQuery, which copies the data from destinationTableId into the partitioned tmpPartitionTableId]. Finally, we simply need to "rename" the tmp back to destinationTableId (and make a last simple delete/copy for that)

Reviewer: I see, so we: [create the partitioned tmp table from a select over the destination, delete the non-partitioned destination, copy the tmp table back under the destination's name, and finally delete the tmp table].
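To summarize the thread, here are the five operations in this block in execution order; the calls are taken from the diff above, with comments added for clarity:

```java
// 1. Make sure the scratch table name is free.
bigquery.delete(tmpPartitionTableId);
// 2. CREATE TABLE `...tmpPartitionTable` (...) PARTITION BY date(_airbyte_emitted_at)
//    AS SELECT ... FROM `...destination`  -- data flows destination -> tmp here.
BigQueryUtils.executeQuery(bigquery, partitionQuery);
// 3. Drop the old, non-partitioned destination.
bigquery.delete(destinationTableId);
// 4. "Rename": copy the partitioned tmp table back under the destination's name.
copyTable(bigquery, tmpPartitionTableId, destinationTableId, WriteDisposition.WRITE_EMPTY);
// 5. Clean up the scratch table (this happens just below in the diff).
bigquery.delete(tmpPartitionTableId);
```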
+          bigquery.delete(tmpPartitionTableId);
+        }
+      });
+    } catch (final InterruptedException e) {
+      LOGGER.warn("Had errors while partitioning: ", e);
+    }
+  }

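A note on the detection query above: in BigQuery's INFORMATION_SCHEMA.COLUMNS, is_partitioning_column is the string 'YES' or 'NO' for each column, so max() over those values is 'YES' exactly when some column partitions the table. With invented names, the generated SQL looks like this:

```java
public class PartitionCheckSketch {

  public static void main(final String[] args) {
    // Hypothetical expansion of the String.format in partitionIfUnpartitioned
    // (project, dataset, and table names are invented):
    final String partitionCheck =
        "SELECT max(is_partitioning_column) as is_partitioned "
            + "FROM `my-project.my_dataset.INFORMATION_SCHEMA.COLUMNS` "
            + "WHERE TABLE_NAME = 'users';";
    // For an unpartitioned table every row is 'NO', so is_partitioned comes back 'NO'
    // and the repartitioning path runs; a partitioned table yields 'YES'.
    System.out.println(partitionCheck);
  }
}
```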
+  protected String getCreatePartitionedTableFromSelectQuery(final Schema schema,
+                                                            final String projectId,
+                                                            final TableId destinationTableId,
+                                                            final String tmpPartitionTable) {
+    return String.format("create table `%s.%s.%s` (", projectId, destinationTableId.getDataset(), tmpPartitionTable)
+        + schema.getFields().stream()
+            .map(field -> String.format("%s %s", field.getName(), field.getType()))
+            .collect(Collectors.joining(", "))
+        + ") partition by date("
+        + JavaBaseConstants.COLUMN_NAME_EMITTED_AT
+        + ") as select "
+        + schema.getFields().stream()
+            .map(Field::getName)
+            .collect(Collectors.joining(", "))
+        + String.format(" from `%s.%s.%s`", projectId, destinationTableId.getDataset(), destinationTableId.getTable());
+  }
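For a destination whose schema holds the three standard Airbyte columns, the method above would produce DDL roughly like the following. The names are invented and the column types assumed; this is illustrative, not captured output:

```java
public class PartitionDdlSketch {

  public static void main(final String[] args) {
    // Roughly what getCreatePartitionedTableFromSelectQuery would return for a
    // destination `my-project.my_dataset.users` with the standard Airbyte columns:
    final String ddl =
        "create table `my-project.my_dataset._airbyte_partitioned_table_abcde` ("
            + "_airbyte_ab_id STRING, _airbyte_emitted_at TIMESTAMP, _airbyte_data STRING"
            + ") partition by date(_airbyte_emitted_at) as select "
            + "_airbyte_ab_id, _airbyte_emitted_at, _airbyte_data"
            + " from `my-project.my_dataset.users`";
    System.out.println(ddl);
  }
}
```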

   private void printHeapMemoryConsumption() {

Review comment (on the emitted_at conversion in writeRecordToCsv):

In this other PR, it's converted into seconds instead of microseconds? These changes should be somehow equivalent, right?
WDYT @etsybaev @andresbravog?
https://github.com/airbytehq/airbyte/pull/5981/files#r734312334
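On the conversion question: QueryParameterValue.timestamp(Long) interprets its argument as microseconds since the epoch (my reading of the google-cloud-bigquery client, worth double-checking), which is why this PR converts milliseconds to microseconds before formatting. A quick sketch:

```java
import java.util.concurrent.TimeUnit;

import com.google.cloud.bigquery.QueryParameterValue;

public class TimestampFormatSketch {

  public static void main(final String[] args) {
    final long emittedAtMillis = 1634731200000L; // hypothetical emitted_at: 2021-10-20 12:00:00 UTC
    // Same conversion as writeRecordToCsv: milliseconds -> microseconds.
    final long emittedAtMicros = TimeUnit.MICROSECONDS.convert(emittedAtMillis, TimeUnit.MILLISECONDS);
    // The client formats microseconds-since-epoch into a BigQuery TIMESTAMP literal,
    // e.g. "2021-10-20 12:00:00.000000+00:00".
    System.out.println(QueryParameterValue.timestamp(emittedAtMicros).getValue());
  }
}
```

Whether the seconds-based variant in #5981 ends up equivalent depends on what that code passes the value to; a seconds value fed to this same helper would be off by a factor of a million.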