BigQuery/BigQuery denorm Destinations: Add possibility to use different types of GCS files #8788

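In short: this PR reworks the BigQuery and BigQuery-denormalized destinations around pluggable uploaders and record formatters, so that data staged in GCS can be written as different file types (Avro in addition to JSON).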
Merged Dec 20, 2021 · 58 commits

Commits
32025e1
impl BigQuery uploader
DoNotPanicUA Dec 8, 2021
d0d7c2f
rework BigQuery destination in order to use Uploaders
DoNotPanicUA Dec 9, 2021
b376986
fix constructor
DoNotPanicUA Dec 9, 2021
1accaaf
fix test
DoNotPanicUA Dec 9, 2021
7a7a369
add job waiting
DoNotPanicUA Dec 10, 2021
b86e8e2
fix correct closing
DoNotPanicUA Dec 10, 2021
547f194
Throw exception if something goes wrong.
DoNotPanicUA Dec 13, 2021
0719cf4
Move data format logic to the formatters.
DoNotPanicUA Dec 13, 2021
af46fbd
remove duplication from tests
DoNotPanicUA Dec 13, 2021
8e45603
JsonToAvro schema transformation
DoNotPanicUA Dec 14, 2021
ddd9d36
destination-bigquery: print BQ error when sync fails
mkhokh-33 Dec 14, 2021
2f4ef57
add uploader config
DoNotPanicUA Dec 14, 2021
e8bc1e9
uncommit table drop
DoNotPanicUA Dec 14, 2021
30bbe11
switch to S3 json converter + disable invalid tests
DoNotPanicUA Dec 14, 2021
9e0c5ad
fix test stuck
DoNotPanicUA Dec 15, 2021
14953cb
move getSchema to formatters
DoNotPanicUA Dec 15, 2021
5034c19
replace Date-time to Timestamp for the Denormalized Avro processing. …
DoNotPanicUA Dec 16, 2021
7b471cb
make test in line with GCS impl
DoNotPanicUA Dec 16, 2021
b7ecfa7
add string type for Ref fields (GCS impl)
DoNotPanicUA Dec 16, 2021
ebccd08
fix format root of NULL with nested date-time fields
mkhokh-33 Dec 16, 2021
cfec7ec
adopt array processing
DoNotPanicUA Dec 16, 2021
5ff3c90
minor fix
DoNotPanicUA Dec 16, 2021
1dfccad
fix incorrect array filtering
DoNotPanicUA Dec 16, 2021
121b15f
format
DoNotPanicUA Dec 16, 2021
a633a70
Connector throw error after close no more.
DoNotPanicUA Dec 16, 2021
6a91964
uncommit fixed tests
DoNotPanicUA Dec 16, 2021
6a7099d
format
DoNotPanicUA Dec 17, 2021
47777a9
Add GCS to denormalized spec
DoNotPanicUA Dec 17, 2021
48cd0ed
Increase version
DoNotPanicUA Dec 17, 2021
7aceb3a
Merge remote-tracking branch 'origin/master' into aleonets/oncall27-bigq
DoNotPanicUA Dec 17, 2021
3b2a5ba
fix s3 tests
DoNotPanicUA Dec 17, 2021
9686ef1
review upd
DoNotPanicUA Dec 17, 2021
4b550fc
fix gcs
DoNotPanicUA Dec 17, 2021
41619ad
format + minor beautifier
DoNotPanicUA Dec 17, 2021
a878bfd
revert docker file change
DoNotPanicUA Dec 17, 2021
8b6aaba
Merge remote-tracking branch 'origin/master' into aleonets/oncall27-bigq
DoNotPanicUA Dec 17, 2021
f88693f
correct condition
DoNotPanicUA Dec 17, 2021
704f361
Update airbyte-integrations/connectors/destination-s3/src/main/java/i…
DoNotPanicUA Dec 18, 2021
f52f43c
Update airbyte-integrations/connectors/destination-bigquery-denormali…
DoNotPanicUA Dec 18, 2021
0c7325b
Update airbyte-integrations/connectors/destination-bigquery-denormali…
DoNotPanicUA Dec 18, 2021
668eb8c
Update airbyte-integrations/connectors/destination-bigquery/src/main/…
DoNotPanicUA Dec 18, 2021
07c42d1
don't throw warns during record formatting
DoNotPanicUA Dec 18, 2021
d828e9d
add back adding String to all logical avro S3 types controlled by param.
DoNotPanicUA Dec 18, 2021
299ac94
add docs
DoNotPanicUA Dec 18, 2021
793d331
fix internal call.
DoNotPanicUA Dec 18, 2021
d737414
fix GCS AVRO integration test
DoNotPanicUA Dec 18, 2021
0981d77
Merge remote-tracking branch 'origin/aleonets/oncall27-bigq' into ale…
DoNotPanicUA Dec 18, 2021
3871cfe
Update airbyte-integrations/connectors/destination-bigquery/src/main/…
DoNotPanicUA Dec 18, 2021
68a1206
Update airbyte-integrations/connectors/destination-gcs/src/main/java/…
DoNotPanicUA Dec 18, 2021
661832d
remove unused code
DoNotPanicUA Dec 18, 2021
f6431e7
Merge remote-tracking branch 'origin/aleonets/oncall27-bigq' into ale…
DoNotPanicUA Dec 18, 2021
1a2dcdf
remove duplication in BigQuery formatters
DoNotPanicUA Dec 18, 2021
715163a
replace group file delete to specific file delete to avoid deleting u…
DoNotPanicUA Dec 18, 2021
0a06d81
restore docker files
DoNotPanicUA Dec 18, 2021
f31f8f6
Revert "restore docker files"
DoNotPanicUA Dec 18, 2021
218d421
Merge branch 'master' into aleonets/oncall27-bigq
alexandr-shegeda Dec 20, 2021
2af0e95
resolved conflicts
alexandr-shegeda Dec 20, 2021
0ca8fc6
Update airbyte-integrations/connectors/destination-bigquery/Dockerfile
sherifnada Dec 20, 2021
Files changed
destination-bigquery-denormalized/Dockerfile

@@ -4,7 +4,9 @@ WORKDIR /airbyte

ENV APPLICATION destination-bigquery-denormalized

-ADD build/distributions/${APPLICATION}*.tar /airbyte
+COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

-LABEL io.airbyte.version=0.1.11
+RUN tar xf ${APPLICATION}.tar --strip-components=1
+
+LABEL io.airbyte.version=0.2.0
LABEL io.airbyte.name=airbyte/destination-bigquery-denormalized
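The ADD-to-COPY switch is more than cosmetic: Docker's ADD auto-extracts local tar archives into the image, while COPY plus an explicit RUN tar xf ${APPLICATION}.tar --strip-components=1 keeps the unpacking step visible and under the connector's control. The io.airbyte.version label moves from 0.1.11 to 0.2.0 to release the change.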
destination-bigquery-denormalized/build.gradle

@@ -17,11 +17,11 @@ dependencies {
implementation project(':airbyte-integrations:bases:base-java')
implementation project(':airbyte-integrations:connectors:destination-bigquery')
implementation project(':airbyte-protocol:models')
implementation project(':airbyte-integrations:connectors:destination-s3')
implementation project(':airbyte-integrations:connectors:destination-gcs')

integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test')
integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-bigquery')
integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-bigquery-denormalized')


implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)
}
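Pulling in destination-s3 and destination-gcs as dependencies suggests the denormalized connector now reuses the staging-file writers and formatters those modules already provide instead of duplicating that logic.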
BigQueryDenormalizedDestination.java

@@ -5,204 +5,43 @@
package io.airbyte.integrations.destination.bigquery;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.Field.Builder;
import com.google.cloud.bigquery.Field.Mode;
import com.google.cloud.bigquery.FieldList;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardSQLTypeName;
import com.google.common.base.Preconditions;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.util.MoreIterators;
import io.airbyte.integrations.base.AirbyteMessageConsumer;
import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair;
import io.airbyte.integrations.base.Destination;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter;
import io.airbyte.integrations.destination.bigquery.formatter.DefaultBigQueryDenormalizedRecordFormatter;
import io.airbyte.integrations.destination.bigquery.formatter.GcsBigQueryDenormalizedRecordFormatter;
import io.airbyte.integrations.destination.bigquery.uploader.UploaderType;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BigQueryDenormalizedDestination extends BigQueryDestination {

private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryDenormalizedDestination.class);

public static final String NESTED_ARRAY_FIELD = "value";
protected static final String PROPERTIES_FIELD = "properties";
private static final String TYPE_FIELD = "type";
private static final String FORMAT_FIELD = "format";
private static final String REF_DEFINITION_KEY = "$ref";

private final Set<String> fieldsContainRefDefinitionValue = new HashSet<>();

@Override
protected String getTargetTableName(final String streamName) {
// This BigQuery destination does not write to a staging "raw" table but directly to a normalized
// table
return getNamingResolver().getIdentifier(streamName);
}

protected AirbyteMessageConsumer getRecordConsumer(final BigQuery bigquery,
final Map<AirbyteStreamNameNamespacePair, BigQueryWriteConfig> writeConfigs,
final ConfiguredAirbyteCatalog catalog,
final Consumer<AirbyteMessage> outputRecordCollector,
final boolean isGcsUploadingMode,
final boolean isKeepFilesInGcs) {
return new BigQueryDenormalizedRecordConsumer(bigquery, writeConfigs, catalog, outputRecordCollector, getNamingResolver(),
fieldsContainRefDefinitionValue);
}

@Override
protected Schema getBigQuerySchema(final JsonNode jsonSchema) {
final List<Field> fieldList = getSchemaFields(getNamingResolver(), jsonSchema);
if (fieldList.stream().noneMatch(f -> f.getName().equals(JavaBaseConstants.COLUMN_NAME_AB_ID))) {
fieldList.add(Field.of(JavaBaseConstants.COLUMN_NAME_AB_ID, StandardSQLTypeName.STRING));
}
if (fieldList.stream().noneMatch(f -> f.getName().equals(JavaBaseConstants.COLUMN_NAME_EMITTED_AT))) {
fieldList.add(Field.of(JavaBaseConstants.COLUMN_NAME_EMITTED_AT, StandardSQLTypeName.TIMESTAMP));
}
return com.google.cloud.bigquery.Schema.of(fieldList);
}

private List<Field> getSchemaFields(final BigQuerySQLNameTransformer namingResolver, final JsonNode jsonSchema) {
Preconditions.checkArgument(jsonSchema.isObject() && jsonSchema.has(PROPERTIES_FIELD));
final ObjectNode properties = (ObjectNode) jsonSchema.get(PROPERTIES_FIELD);
List<Field> tmpFields = Jsons.keys(properties).stream()
.peek(addToRefList(properties))
.map(key -> getField(namingResolver, key, properties.get(key))
.build())
.collect(Collectors.toList());
if (!fieldsContainRefDefinitionValue.isEmpty()) {
LOGGER.warn("Next fields contain \"$ref\" as Definition: {}. They are going to be saved as String Type column",
fieldsContainRefDefinitionValue);
}
return tmpFields;
protected Map<UploaderType, BigQueryRecordFormatter> getFormatterMap(JsonNode jsonSchema) {
return Map.of(UploaderType.STANDARD, new DefaultBigQueryDenormalizedRecordFormatter(jsonSchema, getNamingResolver()),
UploaderType.AVRO, new GcsBigQueryDenormalizedRecordFormatter(jsonSchema, getNamingResolver()));
}

/**
* @param properties - JSON schema with properties
*
* The method is responsible for population of fieldsContainRefDefinitionValue set with keys
* contain $ref definition
*
* Currently, AirByte doesn't support parsing value by $ref key definition. The issue to
* track this <a href="https://github.com/airbytehq/airbyte/issues/7725">7725</a>
* BigQuery might have different structure of the Temporary table.
* If this method returns TRUE, temporary table will have only three common Airbyte attributes.
* In case of FALSE, temporary table structure will be in line with Airbyte message JsonSchema.
* @return use default AirbyteSchema or build using JsonSchema
*/
private Consumer<String> addToRefList(ObjectNode properties) {
return key -> {
if (properties.get(key).has(REF_DEFINITION_KEY)) {
fieldsContainRefDefinitionValue.add(key);
}
};
}

private static Builder getField(final BigQuerySQLNameTransformer namingResolver, final String key, final JsonNode fieldDefinition) {

final String fieldName = namingResolver.getIdentifier(key);
final Builder builder = Field.newBuilder(fieldName, StandardSQLTypeName.STRING);
final List<JsonSchemaType> fieldTypes = getTypes(fieldName, fieldDefinition.get(TYPE_FIELD));
for (int i = 0; i < fieldTypes.size(); i++) {
final JsonSchemaType fieldType = fieldTypes.get(i);
if (fieldType == JsonSchemaType.NULL) {
builder.setMode(Mode.NULLABLE);
}
if (i == 0) {
// Treat the first type in the list with the widest scope as the primary type
final JsonSchemaType primaryType = fieldTypes.get(i);
switch (primaryType) {
case NULL -> {
builder.setType(StandardSQLTypeName.STRING);
}
case STRING, NUMBER, INTEGER, BOOLEAN -> {
builder.setType(primaryType.getBigQueryType());
}
case ARRAY -> {
final JsonNode items;
if (fieldDefinition.has("items")) {
items = fieldDefinition.get("items");
} else {
LOGGER.warn("Source connector provided schema for ARRAY with missed \"items\", will assume that it's a String type");
// this is handler for case when we get "array" without "items"
// (https://github.com/airbytehq/airbyte/issues/5486)
items = getTypeStringSchema();
}
final Builder subField = getField(namingResolver, fieldName, items).setMode(Mode.REPEATED);
// "Array of Array of" (nested arrays) are not permitted by BigQuery ("Array of Record of Array of"
// is)
// Turn all "Array of" into "Array of Record of" instead
return builder.setType(StandardSQLTypeName.STRUCT, subField.setName(NESTED_ARRAY_FIELD).build());
}
case OBJECT -> {
final JsonNode properties;
if (fieldDefinition.has(PROPERTIES_FIELD)) {
properties = fieldDefinition.get(PROPERTIES_FIELD);
} else {
properties = fieldDefinition;
}
final FieldList fieldList = FieldList.of(Jsons.keys(properties)
.stream()
.map(f -> getField(namingResolver, f, properties.get(f)).build())
.collect(Collectors.toList()));
if (fieldList.size() > 0) {
builder.setType(StandardSQLTypeName.STRUCT, fieldList);
} else {
builder.setType(StandardSQLTypeName.STRING);
}
}
default -> {
throw new IllegalStateException(
String.format("Unexpected type for field %s: %s", fieldName, primaryType));
}
}
}
}

// If a specific format is defined, use their specific type instead of the JSON's one
final JsonNode fieldFormat = fieldDefinition.get(FORMAT_FIELD);
if (fieldFormat != null) {
final JsonSchemaFormat schemaFormat = JsonSchemaFormat.fromJsonSchemaFormat(fieldFormat.asText());
if (schemaFormat != null) {
builder.setType(schemaFormat.getBigQueryType());
}
}

return builder;
}

private static JsonNode getTypeStringSchema() {
return Jsons.deserialize("{\n"
+ " \"type\": [\n"
+ " \"string\"\n"
+ " ]\n"
+ " }");
}

private static List<JsonSchemaType> getTypes(final String fieldName, final JsonNode type) {
if (type == null) {
LOGGER.warn("Field {} has no type defined, defaulting to STRING", fieldName);
return List.of(JsonSchemaType.STRING);
} else if (type.isArray()) {
return MoreIterators.toList(type.elements()).stream()
.map(s -> JsonSchemaType.fromJsonSchemaType(s.asText()))
// re-order depending to make sure wider scope types are first
.sorted(Comparator.comparingInt(JsonSchemaType::getOrder))
.collect(Collectors.toList());
} else if (type.isTextual()) {
return Collections.singletonList(JsonSchemaType.fromJsonSchemaType(type.asText()));
} else {
throw new IllegalStateException("Unexpected type: " + type);
}
@Override
protected boolean isDefaultAirbyteTmpTableSchema() {
// Build temporary table structure based on incoming JsonSchema
return false;
}

public static void main(final String[] args) throws Exception {
…
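For orientation, here is a minimal sketch, not code from this PR, of how the formatter map introduced in getFormatterMap() above can be consumed. The helper name formatterFor and the boolean flag are illustrative assumptions; only the types come from the PR.

import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter;
import io.airbyte.integrations.destination.bigquery.uploader.UploaderType;
import java.util.Map;

class FormatterSelectionSketch {

  // Hypothetical helper: pick the record formatter matching the configured
  // upload mode. UploaderType.AVRO corresponds to staging records in GCS as
  // Avro files; UploaderType.STANDARD to the direct BigQuery insert path.
  static BigQueryRecordFormatter formatterFor(final Map<UploaderType, BigQueryRecordFormatter> formatterMap,
                                              final boolean isGcsUploadingMode) {
    return formatterMap.get(isGcsUploadingMode ? UploaderType.AVRO : UploaderType.STANDARD);
  }

}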

This file was deleted.

JsonSchemaFormat.java

@@ -13,7 +13,8 @@ public enum JsonSchemaFormat {

DATE("date", StandardSQLTypeName.DATE),
DATETIME("date-time", StandardSQLTypeName.DATETIME),
TIME("time", StandardSQLTypeName.TIME);
TIME("time", StandardSQLTypeName.TIME),
TIMESTAMP("timestamp-micros", StandardSQLTypeName.TIMESTAMP);

private final String jsonSchemaFormat;
private final StandardSQLTypeName bigQueryType;
Expand Down
Loading
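To make the new entry concrete: a source field declared as {"type": "string", "format": "timestamp-micros"} now resolves to a BigQuery TIMESTAMP column through the same lookup that getField() already applies for date, date-time, and time. A small illustrative sketch (assuming same-package access to the enum; not part of the PR):

import com.google.cloud.bigquery.StandardSQLTypeName;

class TimestampFormatLookupSketch {

  public static void main(final String[] args) {
    // "timestamp-micros" is matched by the new enum constant instead of
    // falling through to the default STRING handling.
    final JsonSchemaFormat schemaFormat = JsonSchemaFormat.fromJsonSchemaFormat("timestamp-micros");
    final StandardSQLTypeName bigQueryType = schemaFormat.getBigQueryType();
    System.out.println(bigQueryType); // prints TIMESTAMP
  }

}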