
🎉 New destination: S3 #3672

Merged: 28 commits, Jun 3, 2021
Commits
- 1728c21: Update README icon links (tuliren, May 25, 2021)
- 88044a5: Update airbyte-specification doc (tuliren, May 25, 2021)
- b6084bf: Extend base connector (tuliren, May 25, 2021)
- 0818163: Remove redundant region (tuliren, May 27, 2021)
- 6f4aa9c: Separate warning from info (tuliren, May 27, 2021)
- fdaf213: Implement s3 destination (tuliren, May 27, 2021)
- 2d90864: Run format (tuliren, May 27, 2021)
- dc1220e: Clarify logging message (tuliren, May 29, 2021)
- f9d07a0: Rename variables and functions (tuliren, May 29, 2021)
- df040f7: Update documentation (tuliren, May 29, 2021)
- 8ec265a: Rename and annotate interface (tuliren, May 29, 2021)
- 0048bf7: Inject formatter factory (tuliren, May 30, 2021)
- dffae5a: Remove part size (tuliren, May 30, 2021)
- b026598: Fix spec field names and add unit tests (tuliren, May 31, 2021)
- fc3bba7: Add unit tests for csv output formatter (tuliren, May 31, 2021)
- 4efa629: Format code (tuliren, May 31, 2021)
- a6e6856: Complete acceptance test and fix bugs (tuliren, Jun 1, 2021)
- 9ac8fcc: Fix uuid (tuliren, Jun 2, 2021)
- d3949f0: Merge branch 'master' into liren/s3-destination-mvp (tuliren, Jun 2, 2021)
- d840504: Remove generator template files (tuliren, Jun 2, 2021)
- dbb2f79: Add unhappy test case (tuliren, Jun 2, 2021)
- 434c8a4: Checkin airbyte state message (tuliren, Jun 2, 2021)
- edbcfec: Adjust stream transfer manager parameters (tuliren, Jun 2, 2021)
- 8f940a3: Use underscore in filename (tuliren, Jun 2, 2021)
- 6cb86c0: Create csv sheet generator to handle data processing (tuliren, Jun 2, 2021)
- 60b252a: Format code (tuliren, Jun 2, 2021)
- 5373cd7: Add partition id to filename (tuliren, Jun 2, 2021)
- caa44f4: Rename date format variable (tuliren, Jun 3, 2021)
1 change: 1 addition & 0 deletions .github/workflows/publish-command.yml
@@ -100,6 +100,7 @@ jobs:
ZENDESK_TALK_TEST_CREDS: ${{ secrets.ZENDESK_TALK_TEST_CREDS }}
ZOOM_INTEGRATION_TEST_CREDS: ${{ secrets.ZOOM_INTEGRATION_TEST_CREDS }}
PLAID_INTEGRATION_TEST_CREDS: ${{ secrets.PLAID_INTEGRATION_TEST_CREDS }}
DESTINATION_S3_INTEGRATION_TEST_CREDS: ${{ secrets.DESTINATION_S3_INTEGRATION_TEST_CREDS }}
- run: |
docker login -u airbytebot -p ${DOCKER_PASSWORD}
./tools/integrations/manage.sh publish airbyte-integrations/${{ github.event.inputs.connector }}
1 change: 1 addition & 0 deletions .github/workflows/test-command.yml
@@ -104,6 +104,7 @@ jobs:
ZENDESK_TALK_TEST_CREDS: ${{ secrets.ZENDESK_TALK_TEST_CREDS }}
ZOOM_INTEGRATION_TEST_CREDS: ${{ secrets.ZOOM_INTEGRATION_TEST_CREDS }}
PLAID_INTEGRATION_TEST_CREDS: ${{ secrets.PLAID_INTEGRATION_TEST_CREDS }}
DESTINATION_S3_INTEGRATION_TEST_CREDS: ${{ secrets.DESTINATION_S3_INTEGRATION_TEST_CREDS }}
- run: |
./tools/bin/ci_integration_test.sh ${{ github.event.inputs.connector }}
name: test ${{ github.event.inputs.connector }}
2 changes: 1 addition & 1 deletion README.md
@@ -1,6 +1,6 @@
# Introduction

![GitHub Workflow Status](https://img.shields.io/github/workflow/status/airbytehq/airbyte/Airbyte%20CI) ![License](https://img.shields.io/github/license/airbytehq/airbyte)
[![GitHub Workflow Status](https://img.shields.io/github/workflow/status/airbytehq/airbyte/Airbyte%20CI)](https://github.com/airbytehq/airbyte/actions/workflows/gradle.yml) [![License](https://img.shields.io/github/license/airbytehq/airbyte)](./LICENSE)

![](docs/.gitbook/assets/airbyte_horizontal_color_white-background.svg)

@@ -0,0 +1,7 @@
{
"destinationDefinitionId": "4816b78f-1489-44c1-9060-4b19d5fa9362",
"name": "S3",
"dockerRepository": "airbyte/destination-s3",
"dockerImageTag": "0.1.0",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/s3"
}
7 changes: 7 additions & 0 deletions airbyte-config/init/src/main/resources/icons/s3.svg
(SVG file not rendered)

@@ -24,6 +24,11 @@
dockerRepository: airbyte/destination-snowflake
dockerImageTag: 0.3.6
documentationUrl: https://docs.airbyte.io/integrations/destinations/snowflake
- destinationDefinitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362
name: S3
dockerRepository: airbyte/destination-s3
dockerImageTag: 0.1.0
documentationUrl: https://docs.airbyte.io/integrations/destinations/s3
- destinationDefinitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
name: Redshift
dockerRepository: airbyte/destination-redshift
@@ -74,7 +74,11 @@ public void accept(AirbyteMessage msg) throws Exception {

@Override
public void close() throws Exception {
LOGGER.info("hasFailed: {}.", hasFailed);
if (hasFailed) {
LOGGER.warn("Airbyte message consumer: failed.");
} else {
LOGGER.info("Airbyte message consumer: succeeded.");
}
close(hasFailed);
}
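The change above replaces a bare `hasFailed` log line with a level-appropriate message before delegating to `close(hasFailed)`. A minimal sketch of this failure-tracking pattern, with illustrative names that are not the project's actual API:

```java
// Sketch of a failure-tracking consumer: accept() records any failure,
// and close() reports at a level matching the outcome before delegating
// to the subclass. Class and method names here are hypothetical.
abstract class FailureTrackingConsumer implements AutoCloseable {

  protected boolean hasFailed = false;

  public void accept(String message) {
    try {
      process(message);
    } catch (RuntimeException e) {
      hasFailed = true; // remember the failure so close() can react
      throw e;
    }
  }

  protected abstract void process(String message);

  @Override
  public void close() {
    if (hasFailed) {
      System.err.println("Airbyte message consumer: failed.");
    } else {
      System.out.println("Airbyte message consumer: succeeded.");
    }
    close(hasFailed); // subclass decides how to finalize (commit vs. abort)
  }

  protected abstract void close(boolean hasFailed);
}
```

Splitting the log by outcome means a failed sync surfaces as a warning in the logs rather than hiding inside an info-level `hasFailed: true`.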

@@ -136,10 +136,17 @@ public abstract class DestinationAcceptanceTest {
* @param streamName - name of the stream for which we are retrieving records.
* @param namespace - the destination namespace records are located in. Null if not applicable.
* Usually a JDBC schema.
* @param streamSchema - schema of the stream to be retrieved. This is only necessary for
* destinations in which data types cannot be accurately inferred (e.g. in CSV destination,
* every value is a string).
* @return All of the records in the destination at the time this method is invoked.
* @throws Exception - can throw any exception, test framework will handle.
*/
protected abstract List<JsonNode> retrieveRecords(TestDestinationEnv testEnv, String streamName, String namespace) throws Exception;
protected abstract List<JsonNode> retrieveRecords(TestDestinationEnv testEnv,
String streamName,
String namespace,
JsonNode streamSchema)
throws Exception;

/**
* Returns a destination's default schema. The default implementation assumes this corresponds to
@@ -221,10 +228,10 @@ protected boolean implementsRecordSizeLimitChecks() {
}

/**
* Same idea as {@link #retrieveRecords(TestDestinationEnv, String, String)}. Except this method
* should pull records from the table that contains the normalized records and convert them back
* into the data as it would appear in an {@link AirbyteRecordMessage}. Only need to override this
* method if {@link #implementsBasicNormalization} returns true.
* Same idea as {@link #retrieveRecords(TestDestinationEnv, String, String, JsonNode)}. Except this
* method should pull records from the table that contains the normalized records and convert them
* back into the data as it would appear in an {@link AirbyteRecordMessage}. Only need to override
* this method if {@link #implementsBasicNormalization} returns true.
*
* @param testEnv - information about the test environment.
* @param streamName - name of the stream for which we are retrieving records.
@@ -882,7 +889,7 @@ private void retrieveRawRecordsAndAssertSameMessages(AirbyteCatalog catalog, Lis
for (final AirbyteStream stream : catalog.getStreams()) {
final String streamName = stream.getName();
final String schema = stream.getNamespace() != null ? stream.getNamespace() : defaultSchema;
List<AirbyteRecordMessage> msgList = retrieveRecords(testEnv, streamName, schema)
List<AirbyteRecordMessage> msgList = retrieveRecords(testEnv, streamName, schema, stream.getJsonSchema())
.stream()
.map(data -> new AirbyteRecordMessage().withStream(streamName).withNamespace(schema).withData(data))
.collect(Collectors.toList());
@@ -922,7 +929,7 @@ private void assertSameData(List<JsonNode> expected, List<JsonNode> actual) {
final Iterator<Entry<String, JsonNode>> expectedDataIterator = expectedData.fields();
LOGGER.info("Expected row {}", expectedData);
LOGGER.info("Actual row {}", actualData);
assertEquals(expectedData.size(), actualData.size());
assertEquals(expectedData.size(), actualData.size(), "Unequal row size");
while (expectedDataIterator.hasNext()) {
final Entry<String, JsonNode> expectedEntry = expectedDataIterator.next();
final JsonNode expectedValue = expectedEntry.getValue();
@@ -1025,6 +1032,13 @@ public Path getLocalRoot() {
return localRoot;
}

@Override
public String toString() {
return "TestDestinationEnv{" +
"localRoot=" + localRoot +
'}';
}

}

}
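The new `streamSchema` parameter exists because, as the updated javadoc notes, some destinations cannot infer data types from stored values: a CSV destination reads everything back as strings, so acceptance tests need the declared schema to compare records faithfully. A hypothetical sketch of that coercion step, using a plain `Map` of column names to JSON-schema type names in place of the real `JsonNode` schema:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper showing why retrieveRecords needs the stream schema:
// values read back from CSV are all strings, so each one is coerced to its
// declared JSON-schema type ("integer", "number", "boolean") before
// comparison; strings and unknown types pass through unchanged.
class CsvTypeCoercion {

  static Map<String, Object> coerceRow(Map<String, String> row, Map<String, String> columnTypes) {
    Map<String, Object> coerced = new LinkedHashMap<>();
    for (Map.Entry<String, String> entry : row.entrySet()) {
      String type = columnTypes.getOrDefault(entry.getKey(), "string");
      Object value;
      if (type.equals("integer")) {
        value = Long.parseLong(entry.getValue());
      } else if (type.equals("number")) {
        value = Double.parseDouble(entry.getValue());
      } else if (type.equals("boolean")) {
        value = Boolean.parseBoolean(entry.getValue());
      } else {
        value = entry.getValue(); // leave strings (and unknown types) as-is
      }
      coerced.put(entry.getKey(), value);
    }
    return coerced;
  }
}
```

Destinations that preserve types natively (e.g. JDBC warehouses) can simply ignore the extra parameter, which is why the signature change is backward-friendly for most implementations.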
@@ -43,7 +43,7 @@
import com.google.cloud.bigquery.WriteChannelConfiguration;
import com.google.common.base.Charsets;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.integrations.BaseConnector;
import io.airbyte.integrations.base.AirbyteMessageConsumer;
import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair;
import io.airbyte.integrations.base.Destination;
@@ -56,7 +56,6 @@
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.protocol.models.DestinationSyncMode;
import java.io.ByteArrayInputStream;
import java.io.IOException;
@@ -70,7 +69,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BigQueryDestination implements Destination {
public class BigQueryDestination extends BaseConnector implements Destination {

private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryDestination.class);
static final String CONFIG_DATASET_ID = "dataset_id";
@@ -88,13 +87,6 @@ public BigQueryDestination() {
namingResolver = new StandardNameTransformer();
}

@Override
public ConnectorSpecification spec() throws IOException {
// return a jsonschema representation of the spec for the integration.
final String resourceString = MoreResources.readResource("spec.json");
return Jsons.deserialize(resourceString, ConnectorSpecification.class);
}

@Override
public AirbyteConnectionStatus check(JsonNode config) {
try {
@@ -115,7 +115,11 @@ protected List<JsonNode> retrieveNormalizedRecords(TestDestinationEnv testEnv, S
}

@Override
protected List<JsonNode> retrieveRecords(TestDestinationEnv env, String streamName, String namespace) throws Exception {
protected List<JsonNode> retrieveRecords(TestDestinationEnv env,
String streamName,
String namespace,
JsonNode streamSchema)
throws Exception {
return retrieveRecordsFromTable(namingResolver.getRawTableName(streamName), namingResolver.getIdentifier(namespace))
.stream()
.map(node -> node.get(JavaBaseConstants.COLUMN_NAME_DATA).asText())
@@ -27,7 +27,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Preconditions;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.integrations.BaseConnector;
import io.airbyte.integrations.base.AirbyteMessageConsumer;
import io.airbyte.integrations.base.CommitOnStateAirbyteMessageConsumer;
import io.airbyte.integrations.base.Destination;
@@ -41,7 +41,6 @@
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.protocol.models.DestinationSyncMode;
import java.io.FileWriter;
import java.io.IOException;
@@ -59,7 +58,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CsvDestination implements Destination {
public class CsvDestination extends BaseConnector implements Destination {

private static final Logger LOGGER = LoggerFactory.getLogger(CsvDestination.class);

@@ -71,12 +70,6 @@ public CsvDestination() {
namingResolver = new StandardNameTransformer();
}

@Override
public ConnectorSpecification spec() throws IOException {
final String resourceString = MoreResources.readResource("spec.json");
return Jsons.deserialize(resourceString, ConnectorSpecification.class);
}

@Override
public AirbyteConnectionStatus check(JsonNode config) {
try {
@@ -74,7 +74,11 @@ protected JsonNode getFailCheckConfig() {
public void testCheckConnectionInvalidCredentials() {}

@Override
protected List<JsonNode> retrieveRecords(TestDestinationEnv testEnv, String streamName, String namespace) throws Exception {
protected List<JsonNode> retrieveRecords(TestDestinationEnv testEnv,
String streamName,
String namespace,
JsonNode streamSchema)
throws Exception {
final List<Path> allOutputs = Files.list(testEnv.getLocalRoot().resolve(RELATIVE_PATH)).collect(Collectors.toList());
final Optional<Path> streamOutput =
allOutputs.stream().filter(path -> path.getFileName().toString().contains(new StandardNameTransformer().getRawTableName(streamName)))
@@ -198,7 +198,7 @@ private void closeAndWaitForUpload() throws IOException {
LOGGER.info("All data for {} stream uploaded.", streamName);
}

public static void attemptWriteToPersistence(S3Config s3Config) {
public static void attemptS3WriteAndDelete(S3Config s3Config) {
final String outputTableName = "_airbyte_connection_test_" + UUID.randomUUID().toString().replaceAll("-", "");
attemptWriteAndDeleteS3Object(s3Config, outputTableName);
}
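The renamed `attemptS3WriteAndDelete` builds a throwaway object name from a fixed prefix plus a UUID with the dashes stripped out. A small sketch of just that naming step (the class name here is made up; only the prefix and UUID handling mirror the diff):

```java
import java.util.UUID;

// Builds the throwaway object name used by the S3 connection check:
// a fixed prefix plus a random UUID with its dashes removed, so every
// check writes to a unique, clearly-labeled key.
class S3TestObjectName {

  static String testObjectName() {
    return "_airbyte_connection_test_" + UUID.randomUUID().toString().replaceAll("-", "");
  }
}
```

Writing and then deleting an object under a unique name like this exercises both write and delete permissions on the bucket without leaving residue behind, which is why the rename to `attemptS3WriteAndDelete` describes the behavior more precisely than `attemptWriteToPersistence`.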
@@ -78,7 +78,11 @@ protected JsonNode getFailCheckConfig() {
}

@Override
protected List<JsonNode> retrieveRecords(TestDestinationEnv env, String streamName, String namespace) throws Exception {
protected List<JsonNode> retrieveRecords(TestDestinationEnv env,
String streamName,
String namespace,
JsonNode streamSchema)
throws Exception {
return retrieveRecordsFromTable(namingResolver.getRawTableName(streamName), namingResolver.getIdentifier(namespace))
.stream()
.map(r -> Jsons.deserialize(r.get(JavaBaseConstants.COLUMN_NAME_DATA).asText()))