Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Destination Redshift: Fixed maximum record size for SUPER type #12940

Merged
merged 9 commits into from
May 20, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@
- name: Redshift
destinationDefinitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
dockerRepository: airbyte/destination-redshift
dockerImageTag: 0.3.34
dockerImageTag: 0.3.35
documentationUrl: https://docs.airbyte.io/integrations/destinations/redshift
icon: redshift.svg
resourceRequirements:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3612,7 +3612,7 @@
supported_destination_sync_modes:
- "overwrite"
- "append"
- dockerImage: "airbyte/destination-redshift:0.3.34"
- dockerImage: "airbyte/destination-redshift:0.3.35"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/redshift"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -709,7 +709,7 @@ void testSyncVeryBigRecords() throws Exception {
.withEmittedAt(Instant.now().toEpochMilli())
.withData(Jsons.jsonNode(ImmutableMap.builder()
.put("id", 3)
.put("currency", generateBigString(0))
.put("currency", generateBigString(getGenerateBigStringAddExtraCharacters()))
.put("date", "2020-10-10T00:00:00Z")
.put("HKD", 10.5)
.put("NZD", 1.14)
Expand All @@ -732,6 +732,10 @@ private String generateBigString(final int addExtraCharacters) {
.toString();
}

/**
 * Hook for destination-specific tests to adjust the size of the big string used by
 * {@code testSyncVeryBigRecords}; the returned value is passed to {@code generateBigString}.
 *
 * @return the number of extra characters to request for the generated big string; 0 by default.
 */
protected int getGenerateBigStringAddExtraCharacters() {
return 0;
}

/**
* @return the max limit length allowed for values in the destination.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION destination-redshift

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.3.34
LABEL io.airbyte.version=0.3.35
LABEL io.airbyte.name=airbyte/destination-redshift
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -27,6 +28,7 @@ public class RedshiftSqlOperations extends JdbcSqlOperations {

private static final Logger LOGGER = LoggerFactory.getLogger(RedshiftSqlOperations.class);
// Upper bound, in UTF-8 bytes, applied to each individual string value inside a record.
public static final int REDSHIFT_VARCHAR_MAX_BYTE_SIZE = 65535;
// Upper bound, in UTF-8 bytes, applied to the whole serialized record stored as SUPER.
public static final int REDSHIFT_SUPER_MAX_BYTE_SIZE = 1000000;

private static final String SELECT_ALL_TABLES_WITH_NOT_SUPER_TYPE_SQL_STATEMENT = """
select tablename, schemaname
Expand Down Expand Up @@ -92,9 +94,25 @@ public void insertRecordsInternal(final JdbcDatabase database,

/**
 * Checks whether a record fits the size limits enforced for Redshift SUPER columns.
 *
 * <p>
 * A record is valid when its serialized JSON is at most {@link #REDSHIFT_SUPER_MAX_BYTE_SIZE}
 * bytes (UTF-8) and every string value found in the flattened record is at most
 * {@link #REDSHIFT_VARCHAR_MAX_BYTE_SIZE} bytes (UTF-8).
 *
 * @param data the record to validate
 * @return {@code true} if the record is within both limits, {@code false} otherwise
 */
@Override
public boolean isValidData(final JsonNode data) {
  // check overall size of the SUPER data
  final String stringData = Jsons.serialize(data);
  final int dataSize = stringData.getBytes(StandardCharsets.UTF_8).length;
  if (dataSize > REDSHIFT_SUPER_MAX_BYTE_SIZE) {
    return false;
  }

  // check VARCHAR limits for VARCHAR fields within the SUPER object; only reached when the
  // overall object is valid, so oversized records skip the flattening work entirely
  // NOTE(review): which nested values Jsons.flatten exposes (e.g. strings inside arrays) is not
  // visible here — confirm against its implementation.
  final Map<String, Object> dataMap = Jsons.flatten(data);
  for (final Object value : dataMap.values()) {
    if (value instanceof String stringValue
        && stringValue.getBytes(StandardCharsets.UTF_8).length > REDSHIFT_VARCHAR_MAX_BYTE_SIZE) {
      return false;
    }
  }
  return true;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,4 +179,9 @@ protected int getMaxRecordValueLimit() {
return RedshiftSqlOperations.REDSHIFT_VARCHAR_MAX_BYTE_SIZE;
}

/**
 * Redshift-specific override: requests one extra character for the big string generated by the
 * very-big-record sync test.
 * NOTE(review): presumably this pushes the value just past the VARCHAR limit returned by
 * getMaxRecordValueLimit() — confirm against the base test's generateBigString.
 *
 * @return 1
 */
@Override
protected int getGenerateBigStringAddExtraCharacters() {
return 1;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.redshift.operations;

import static org.junit.jupiter.api.Assertions.assertEquals;

import java.util.Random;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;

import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;

import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.destination.redshift.enums.RedshiftDataTmpTableMode;

@DisplayName("RedshiftSqlOperations")
public class RedshiftSqlOperationsTest {

private static final Random RANDOM = new Random();

private static final RedshiftDataTmpTableMode redshiftDataTmpTableMode = RedshiftDataTmpTableMode.SUPER;

private String generateBigString(final int addExtraCharacters) {
final int length = RedshiftSqlOperations.REDSHIFT_VARCHAR_MAX_BYTE_SIZE + addExtraCharacters;
return RANDOM
.ints('a', 'z' + 1)
.limit(length)
.collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append)
.toString();
}

@Test
@DisplayName("isValidData should return true for valid data")
public void isValidDataForValid() {
JsonNode testNode = Jsons.jsonNode(ImmutableMap.builder()
.put("id", 3)
.put("currency", generateBigString(0))
.put("date", "2020-10-10T00:00:00Z")
.put("HKD", 10.5)
.put("NZD", 1.14)
.build());

RedshiftSqlOperations uut = new RedshiftSqlOperations(redshiftDataTmpTableMode);
boolean isValid = uut.isValidData(testNode);
assertEquals(true, isValid);
}

@Test
@DisplayName("isValidData should return false for invalid data - string too long")
public void isValidDataForInvalidNode() {
JsonNode testNode = Jsons.jsonNode(ImmutableMap.builder()
.put("id", 3)
.put("currency", generateBigString(1))
.put("date", "2020-10-10T00:00:00Z")
.put("HKD", 10.5)
.put("NZD", 1.14)
.build());

RedshiftSqlOperations uut = new RedshiftSqlOperations(redshiftDataTmpTableMode);
boolean isValid = uut.isValidData(testNode);
assertEquals(false, isValid);
}

@Test
@DisplayName("isValidData should return false for invalid data - total object too big")
public void isValidDataForInvalidObject() {
JsonNode testNode = Jsons.jsonNode(ImmutableMap.builder()
.put("key1", generateBigString(-1))
.put("key2", generateBigString(-1))
.put("key3", generateBigString(-1))
.put("key4", generateBigString(-1))
.put("key5", generateBigString(-1))
.put("key6", generateBigString(-1))
.put("key7", generateBigString(-1))
.put("key8", generateBigString(-1))
.put("key9", generateBigString(-1))
.put("key10", generateBigString(-1))
.put("key11", generateBigString(-1))
.put("key12", generateBigString(-1))
.put("key13", generateBigString(-1))
.put("key14", generateBigString(-1))
.put("key15", generateBigString(-1))
.put("key16", generateBigString(-1))
.put("key17", generateBigString(-1))
.put("key18", generateBigString(-1))
.put("key19", generateBigString(-1))
.put("key20", generateBigString(-1))
.build());

RedshiftSqlOperations uut = new RedshiftSqlOperations(redshiftDataTmpTableMode);
boolean isValid = uut.isValidData(testNode);
assertEquals(false, isValid);
}

}
5 changes: 3 additions & 2 deletions docs/integrations/destinations/redshift.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@ Therefore, Airbyte Redshift destination will create tables and schemas using the

### Data Size Limitations

Redshift specifies a maximum limit of 65535 bytes to store the raw JSON record data. Thus, when a row is too big to fit, the Redshift destination fails to load such data and currently ignores that record.
See [docs](https://docs.aws.amazon.com/redshift/latest/dg/r_Character_types.html)
Redshift limits the raw JSON record data to a maximum of 1MB, and limits any VARCHAR field within the JSON record to 65535 bytes. When a record exceeds either limit, the Redshift destination cannot load it and currently ignores that record.
See docs for [SUPER](https://docs.aws.amazon.com/redshift/latest/dg/r_SUPER_type.html) and [SUPER limitations](https://docs.aws.amazon.com/redshift/latest/dg/limitations-super.html)

### Encryption

Expand Down Expand Up @@ -138,6 +138,7 @@ Each stream will be output into its own raw table in Redshift. Each table will c

| Version | Date | Pull Request | Subject |
|:--------|:-----------| :----- | :------ |
| 0.3.35 | 2022-05-18 | [12940](https://github.com/airbytehq/airbyte/pull/12940) | Fixed maximum record size for SUPER type |
| 0.3.34 | 2022-05-16 | [12869](https://github.com/airbytehq/airbyte/pull/12869) | Fixed NPE in S3 staging check |
| 0.3.33 | 2022-05-04 | [12601](https://github.com/airbytehq/airbyte/pull/12601) | Apply buffering strategy for S3 staging |
| 0.3.32 | 2022-04-20 | [12085](https://github.com/airbytehq/airbyte/pull/12085) | Fixed bug with switching between INSERT and COPY config |
Expand Down