Deprecate PART_SIZE_MB in connectors using S3/GCS storage #13753

Merged 22 commits on Jun 20, 2022

Commits
c853af3
Removed part_size from connectors that use StreamTransferManager
VitaliiMaltsev Jun 14, 2022
416ae7a
fixed S3DestinationConfigTest
VitaliiMaltsev Jun 14, 2022
00dbd77
fixed S3JsonlFormatConfigTest
VitaliiMaltsev Jun 14, 2022
bb6fbbf
Merge branch 'master' into vmaltsev/11389-deprecate-part-size
VitaliiMaltsev Jun 14, 2022
189b276
Merge branch 'master' into vmaltsev/11389-deprecate-part-size
VitaliiMaltsev Jun 16, 2022
4e02f0c
update changelog and bump version
VitaliiMaltsev Jun 17, 2022
0a8de1a
auto-bump connector version
octavia-squidington-iii Jun 17, 2022
1124955
auto-bump connector version
octavia-squidington-iii Jun 17, 2022
9d92ed5
Merge branch 'vmaltsev/11389-deprecate-part-size' of https://github.c…
octavia-squidington-iii Jun 17, 2022
9b86a1d
auto-bump connector version
octavia-squidington-iii Jun 17, 2022
78e1dcb
Merge branch 'vmaltsev/11389-deprecate-part-size' of https://github.c…
octavia-squidington-iii Jun 17, 2022
fce483d
auto-bump connector version
octavia-squidington-iii Jun 17, 2022
664dc1a
Merge branch 'vmaltsev/11389-deprecate-part-size' of https://github.c…
octavia-squidington-iii Jun 17, 2022
74aa129
Merge branch 'master' into vmaltsev/11389-deprecate-part-size
VitaliiMaltsev Jun 19, 2022
88577d7
update changelog and bump version for Redshift and Snowflake destina…
VitaliiMaltsev Jun 19, 2022
fdaa38e
Merge remote-tracking branch 'origin/vmaltsev/11389-deprecate-part-si…
VitaliiMaltsev Jun 19, 2022
0477041
auto-bump connector version
octavia-squidington-iii Jun 19, 2022
b2c7592
Merge branch 'master' into vmaltsev/11389-deprecate-part-size
VitaliiMaltsev Jun 20, 2022
e4a42e5
fix GCS staging test
VitaliiMaltsev Jun 20, 2022
a63c43e
Merge remote-tracking branch 'origin/vmaltsev/11389-deprecate-part-si…
VitaliiMaltsev Jun 20, 2022
738548d
fix GCS staging test
VitaliiMaltsev Jun 20, 2022
b4c72e8
auto-bump connector version
octavia-squidington-iii Jun 20, 2022
@@ -27,7 +27,7 @@
- name: BigQuery
destinationDefinitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
dockerRepository: airbyte/destination-bigquery
dockerImageTag: 1.1.8
dockerImageTag: 1.1.9
documentationUrl: https://docs.airbyte.io/integrations/destinations/bigquery
icon: bigquery.svg
resourceRequirements:
@@ -40,7 +40,7 @@
- name: BigQuery (denormalized typed struct)
destinationDefinitionId: 079d5540-f236-4294-ba7c-ade8fd918496
dockerRepository: airbyte/destination-bigquery-denormalized
dockerImageTag: 1.1.8
dockerImageTag: 1.1.9
documentationUrl: https://docs.airbyte.io/integrations/destinations/bigquery
icon: bigquery.svg
resourceRequirements:
@@ -100,7 +100,7 @@
- name: Google Cloud Storage (GCS)
destinationDefinitionId: ca8f6566-e555-4b40-943a-545bf123117a
dockerRepository: airbyte/destination-gcs
dockerImageTag: 0.2.7
dockerImageTag: 0.2.8
documentationUrl: https://docs.airbyte.io/integrations/destinations/gcs
icon: googlecloudstorage.svg
resourceRequirements:
@@ -225,7 +225,7 @@
- name: Redshift
destinationDefinitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
dockerRepository: airbyte/destination-redshift
dockerImageTag: 0.3.39
dockerImageTag: 0.3.40
documentationUrl: https://docs.airbyte.io/integrations/destinations/redshift
icon: redshift.svg
resourceRequirements:
@@ -244,7 +244,7 @@
- name: S3
destinationDefinitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362
dockerRepository: airbyte/destination-s3
dockerImageTag: 0.3.7
dockerImageTag: 0.3.8
documentationUrl: https://docs.airbyte.io/integrations/destinations/s3
icon: s3.svg
resourceRequirements:
@@ -264,7 +264,7 @@
- name: Snowflake
destinationDefinitionId: 424892c4-daac-4491-b35d-c6688ba547ba
dockerRepository: airbyte/destination-snowflake
dockerImageTag: 0.4.28
dockerImageTag: 0.4.29
documentationUrl: https://docs.airbyte.io/integrations/destinations/snowflake
icon: snowflake.svg
resourceRequirements:
107 changes: 10 additions & 97 deletions airbyte-config/init/src/main/resources/seed/destination_specs.yaml
@@ -285,7 +285,7 @@
supported_destination_sync_modes:
- "overwrite"
- "append"
- dockerImage: "airbyte/destination-bigquery:1.1.8"
- dockerImage: "airbyte/destination-bigquery:1.1.9"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/bigquery"
connectionSpecification:
@@ -438,19 +438,6 @@
examples:
- "data_sync/test"
order: 3
part_size_mb:
title: "Block Size (MB) for GCS Multipart Upload (Optional)"
description: "This is the size of a \"Part\" being buffered in memory.\
\ It limits the memory usage when writing. Larger values will allow\
\ to upload a bigger files and improve the speed, but consumes more\
\ memory. Allowed values: min=5MB, max=525MB Default: 5MB."
type: "integer"
default: 5
minimum: 5
maximum: 525
examples:
- 5
order: 4
keep_files_in_gcs-bucket:
type: "string"
description: "This upload method is supposed to temporary store records\
@@ -462,7 +449,7 @@
enum:
- "Delete all tmp files from GCS"
- "Keep all tmp files in GCS"
order: 5
order: 4
credentials_json:
type: "string"
description: "The contents of the JSON service account key. Check out the\
@@ -510,7 +497,7 @@
- "overwrite"
- "append"
- "append_dedup"
- dockerImage: "airbyte/destination-bigquery-denormalized:1.1.8"
- dockerImage: "airbyte/destination-bigquery-denormalized:1.1.9"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/bigquery"
connectionSpecification:
@@ -627,19 +614,6 @@
examples:
- "data_sync/test"
order: 3
part_size_mb:
title: "Block Size (MB) for GCS Multipart Upload (Optional)"
description: "This is the size of a \"Part\" being buffered in memory.\
\ It limits the memory usage when writing. Larger values will allow\
\ to upload a bigger files and improve the speed, but consumes more\
\ memory. Allowed values: min=5MB, max=525MB Default: 5MB."
type: "integer"
default: 5
minimum: 5
maximum: 525
examples:
- 5
order: 4
keep_files_in_gcs-bucket:
type: "string"
description: "This upload method is supposed to temporary store records\
@@ -651,7 +625,7 @@
enum:
- "Delete all tmp files from GCS"
- "Keep all tmp files in GCS"
order: 5
order: 4
credentials_json:
type: "string"
description: "The contents of the JSON service account key. Check out the\
@@ -1486,7 +1460,7 @@
- "overwrite"
- "append"
supportsNamespaces: true
- dockerImage: "airbyte/destination-gcs:0.2.7"
- dockerImage: "airbyte/destination-gcs:0.2.8"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/gcs"
connectionSpecification:
@@ -1720,16 +1694,6 @@
enum:
- "snappy"
default: "snappy"
part_size_mb:
title: "Block Size (MB) for GCS multipart upload (Optional)"
description: "This is the size of a \"Part\" being buffered in memory.\
\ It limits the memory usage when writing. Larger values will allow\
\ to upload a bigger files and improve the speed, but consumes9\
\ more memory. Allowed values: min=5MB, max=525MB Default: 5MB."
type: "integer"
default: 5
examples:
- 5
- title: "CSV: Comma-Separated Values"
required:
- "format_type"
@@ -1748,16 +1712,6 @@
enum:
- "No flattening"
- "Root level flattening"
part_size_mb:
title: "Block Size (MB) for GCS multipart upload (Optional)"
description: "This is the size of a \"Part\" being buffered in memory.\
\ It limits the memory usage when writing. Larger values will allow\
\ to upload a bigger files and improve the speed, but consumes9\
\ more memory. Allowed values: min=5MB, max=525MB Default: 5MB."
type: "integer"
default: 5
examples:
- 5
compression:
title: "Compression"
type: "object"
@@ -1792,16 +1746,6 @@
enum:
- "JSONL"
default: "JSONL"
part_size_mb:
title: "Block Size (MB) for GCS multipart upload (Optional)"
description: "This is the size of a \"Part\" being buffered in memory.\
\ It limits the memory usage when writing. Larger values will allow\
\ to upload a bigger files and improve the speed, but consumes9\
\ more memory. Allowed values: min=5MB, max=525MB Default: 5MB."
type: "integer"
default: 5
examples:
- 5
compression:
title: "Compression"
type: "object"
@@ -3678,7 +3622,7 @@
supported_destination_sync_modes:
- "overwrite"
- "append"
- dockerImage: "airbyte/destination-redshift:0.3.39"
- dockerImage: "airbyte/destination-redshift:0.3.40"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/redshift"
connectionSpecification:
@@ -3822,22 +3766,6 @@
\ key."
title: "S3 Access Key"
airbyte_secret: true
part_size:
type: "integer"
minimum: 10
maximum: 100
examples:
- "10"
description: "Increase this if syncing tables larger than 100GB. Only\
\ relevant for COPY. Files are streamed to S3 in parts. This determines\
\ the size of each part, in MBs. As S3 has a limit of 10,000 parts\
\ per file, part size affects the table size. This is 10MB by default,\
\ resulting in a default limit of 100GB tables. Note: a larger part\
\ size will result in larger memory requirements. A rule of thumb\
\ is to multiply the part size by 10 to get the memory requirement.\
\ Modify this with care. See <a href=\"https://docs.airbyte.com/integrations/destinations/redshift/#:~:text=above%20key%20id.-,Part%20Size,-Affects%20the%20size\"\
,> docs</a> for details."
title: "Stream Part Size (Optional)"
purge_staging_data:
title: "Purge Staging Files and Tables (Optional)"
type: "boolean"
@@ -3895,7 +3823,7 @@
supported_destination_sync_modes:
- "append"
- "overwrite"
- dockerImage: "airbyte/destination-s3:0.3.7"
- dockerImage: "airbyte/destination-s3:0.3.8"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/s3"
connectionSpecification:
@@ -4314,7 +4242,7 @@
supported_destination_sync_modes:
- "overwrite"
- "append"
- dockerImage: "airbyte/destination-snowflake:0.4.28"
- dockerImage: "airbyte/destination-snowflake:0.4.29"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/snowflake"
connectionSpecification:
@@ -4546,36 +4474,21 @@
title: "S3 Access Key"
airbyte_secret: true
order: 4
part_size:
type: "integer"
default: 5
examples:
- 5
description: "Optional. Increase this if syncing tables larger than\
\ 100GB. Only relevant for COPY. Files are streamed to S3 in parts.\
\ This determines the size of each part, in MBs. As S3 has a limit\
\ of 10,000 parts per file, part size affects the table size. This\
\ is 10MB by default, resulting in a default limit of 100GB tables.\
\ Note, a larger part size will result in larger memory requirements.\
\ A rule of thumb is to multiply the part size by 10 to get the\
\ memory requirement. Modify this with care."
title: "Stream Part Size"
order: 5
purge_staging_data:
title: "Purge Staging Files and Tables"
type: "boolean"
description: "Whether to delete the staging files from S3 after completing\
\ the sync. See the docs for details. Only relevant for COPY. Defaults\
\ to true."
default: true
order: 6
order: 5
encryption:
title: "Encryption"
type: "object"
description: "How to encrypt the staging data"
default:
encryption_type: "none"
order: 7
order: 6
oneOf:
- title: "No encryption"
description: "Staging data will be stored in plaintext."
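The part_size descriptions removed from the Redshift and Snowflake specs above explain the trade-off the option used to expose: S3 allows at most 10,000 parts per multipart upload, so the part size caps the largest table that can be staged, while the quoted rule of thumb puts memory use at roughly ten times the part size. A minimal sketch of that arithmetic, using the values quoted in the removed description (illustrative only, not connector code):

public class PartSizeArithmetic {

  public static void main(String[] args) {
    final long partSizeMb = 10;     // old default quoted in the removed description
    final long maxParts = 10_000;   // S3 limit on parts per multipart upload

    // Largest object that can be staged at this part size (the "100GB tables" limit).
    final double maxObjectGb = partSizeMb * maxParts / 1024.0;   // about 98 GB

    // Rule of thumb from the removed description: memory is roughly 10x the part size.
    final long approxMemoryMb = partSizeMb * 10;                 // about 100 MB per stream

    System.out.printf("max staged object: %.0f GB, approximate memory: %d MB%n",
        maxObjectGb, approxMemoryMb);
  }
}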
@@ -112,16 +112,6 @@
"examples": ["data_sync/test"],
"order": 3
},
"part_size_mb": {
"title": "Block Size (MB) for GCS Multipart Upload (Optional)",
"description": "This is the size of a \"Part\" being buffered in memory. It limits the memory usage when writing. Larger values will allow to upload a bigger files and improve the speed, but consumes more memory. Allowed values: min=5MB, max=525MB Default: 5MB.",
"type": "integer",
"default": 5,
"minimum": 5,
"maximum": 525,
"examples": [5],
"order": 4
},
"keep_files_in_gcs-bucket": {
"type": "string",
"description": "This upload method is supposed to temporary store records in GCS bucket. By this select you can chose if these records should be removed from GCS when migration has finished. The default \"Delete all tmp files from GCS\" value is used if not set explicitly.",
@@ -131,7 +121,7 @@
"Delete all tmp files from GCS",
"Keep all tmp files in GCS"
],
"order": 5
"order": 4
}
}
}
@@ -37,7 +37,6 @@ protected JsonNode createConfig() throws IOException {
.put(BigQueryConsts.METHOD, BigQueryConsts.GCS_STAGING)
.put(BigQueryConsts.GCS_BUCKET_NAME, gcsConfigFromSecretFile.get(BigQueryConsts.GCS_BUCKET_NAME))
.put(BigQueryConsts.GCS_BUCKET_PATH, gcsConfigFromSecretFile.get(BigQueryConsts.GCS_BUCKET_PATH).asText() + System.currentTimeMillis())
.put(BigQueryConsts.PART_SIZE, gcsConfigFromSecretFile.get(BigQueryConsts.PART_SIZE))
.put(BigQueryConsts.CREDENTIAL, credential)
.build());

@@ -23,7 +23,6 @@ public class BigQueryConsts {
public static final String FORMAT = "format";
public static final String KEEP_GCS_FILES = "keep_files_in_gcs-bucket";
public static final String KEEP_GCS_FILES_VAL = "Keep all tmp files in GCS";
public static final String PART_SIZE = "part_size_mb";

public static final String NAMESPACE_PREFIX = "n";

@@ -143,8 +143,7 @@ public static JsonNode getGcsJsonNodeConfig(final JsonNode config) {
.put(BigQueryConsts.CREDENTIAL, loadingMethod.get(BigQueryConsts.CREDENTIAL))
.put(BigQueryConsts.FORMAT, Jsons.deserialize("{\n"
+ " \"format_type\": \"CSV\",\n"
+ " \"flattening\": \"No flattening\",\n"
+ " \"part_size_mb\": \"" + loadingMethod.get(BigQueryConsts.PART_SIZE) + "\"\n"
+ " \"flattening\": \"No flattening\"\n"
+ "}"))
.build());

@@ -165,8 +164,7 @@ public static JsonNode getGcsAvroJsonNodeConfig(final JsonNode config) {
.put(BigQueryConsts.CREDENTIAL, loadingMethod.get(BigQueryConsts.CREDENTIAL))
.put(BigQueryConsts.FORMAT, Jsons.deserialize("{\n"
+ " \"format_type\": \"AVRO\",\n"
+ " \"flattening\": \"No flattening\",\n"
+ " \"part_size_mb\": \"" + loadingMethod.get(BigQueryConsts.PART_SIZE) + "\"\n"
+ " \"flattening\": \"No flattening\"\n"
+ "}"))
.build());

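After this change, the format block that getGcsJsonNodeConfig and getGcsAvroJsonNodeConfig hand to the GCS writer carries only format_type and flattening. A self-contained sketch of the same shape built with plain Jackson rather than the connector's Jsons helper (illustrative only):

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class GcsFormatBlockExample {

  public static void main(String[] args) {
    final ObjectMapper mapper = new ObjectMapper();

    // Same fields as the JSON string assembled in getGcsJsonNodeConfig after this PR;
    // part_size_mb is no longer included.
    final ObjectNode csvFormat = mapper.createObjectNode()
        .put("format_type", "CSV")
        .put("flattening", "No flattening");

    System.out.println(csvFormat);  // {"format_type":"CSV","flattening":"No flattening"}
  }
}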
@@ -149,16 +149,6 @@
"examples": ["data_sync/test"],
"order": 3
},
"part_size_mb": {
"title": "Block Size (MB) for GCS Multipart Upload (Optional)",
"description": "This is the size of a \"Part\" being buffered in memory. It limits the memory usage when writing. Larger values will allow to upload a bigger files and improve the speed, but consumes more memory. Allowed values: min=5MB, max=525MB Default: 5MB.",
"type": "integer",
"default": 5,
"minimum": 5,
"maximum": 525,
"examples": [5],
"order": 4
},
"keep_files_in_gcs-bucket": {
"type": "string",
"description": "This upload method is supposed to temporary store records in GCS bucket. By this select you can chose if these records should be removed from GCS when migration has finished. The default \"Delete all tmp files from GCS\" value is used if not set explicitly.",
@@ -168,7 +158,7 @@
"Delete all tmp files from GCS",
"Keep all tmp files in GCS"
],
"order": 5
"order": 4
}
}
}
@@ -181,7 +181,7 @@ private List<JsonNode> retrieveRecordsFromTable(final String tableName, final St
final FieldList fields = queryResults.getSchema().getFields();
BigQuerySourceOperations sourceOperations = new BigQuerySourceOperations();

return Streams.stream(queryResults.iterateAll())
return Streams.stream(queryResults.iterateAll())
.map(fieldValues -> sourceOperations.rowToJson(new BigQueryResultSet(fieldValues, fields))).collect(Collectors.toList());
}

@@ -45,7 +45,6 @@ protected void setup(final TestDestinationEnv testEnv) throws Exception {
.put(BigQueryConsts.METHOD, BigQueryConsts.GCS_STAGING)
.put(BigQueryConsts.GCS_BUCKET_NAME, gcsConfigFromSecretFile.get(BigQueryConsts.GCS_BUCKET_NAME))
.put(BigQueryConsts.GCS_BUCKET_PATH, gcsConfigFromSecretFile.get(BigQueryConsts.GCS_BUCKET_PATH).asText() + System.currentTimeMillis())
.put(BigQueryConsts.PART_SIZE, gcsConfigFromSecretFile.get(BigQueryConsts.PART_SIZE))
.put(BigQueryConsts.CREDENTIAL, credential)
.build());

@@ -107,4 +107,5 @@ protected void compareObjects(JsonNode expectedObject, JsonNode actualObject) {
JsonNode actualJsonNode = (actualObject.isTextual() ? Jsons.deserialize(actualObject.textValue()) : actualObject);
super.compareObjects(expectedObject, actualJsonNode);
}

}
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/destination-gcs/Dockerfile
@@ -16,5 +16,5 @@ ENV APPLICATION destination-gcs

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.2.7
LABEL io.airbyte.version=0.2.8
LABEL io.airbyte.name=airbyte/destination-gcs