🐛 fix s3/gcs bucket cleanup #11728

Merged: 14 commits, Apr 5, 2022
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/destination-gcs/Dockerfile
@@ -16,5 +16,5 @@ ENV APPLICATION destination-gcs

COPY --from=build /airbyte /airbyte

-LABEL io.airbyte.version=0.2.1
+LABEL io.airbyte.version=0.2.2
LABEL io.airbyte.name=airbyte/destination-gcs
@@ -1,3 +1,7 @@
+/*
+ * Copyright (c) 2021 Airbyte, Inc., all rights reserved.
+ */

package io.airbyte.integrations.destination.gcs;

import io.airbyte.integrations.destination.s3.util.S3NameTransformer;
@@ -6,7 +6,6 @@

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion;
-import com.amazonaws.services.s3.model.ObjectListing;
import io.airbyte.integrations.destination.NamingConventionTransformer;
import io.airbyte.integrations.destination.s3.S3DestinationConfig;
import io.airbyte.integrations.destination.s3.S3StorageOperations;
@@ -25,29 +24,14 @@ public GcsStorageOperations(final NamingConventionTransformer nameTransformer,
}

  /**
-   * This method is overridden because GCS doesn't accept request to delete multiple objects. The
-   * only difference is that the AmazonS3#deleteObjects method is replaced with
-   * AmazonS3#deleteObject.
+   * This method is overridden because GCS doesn't accept request to delete multiple objects. The only
+   * difference is that the AmazonS3#deleteObjects method is replaced with AmazonS3#deleteObject.
   */
   @Override
-  public void cleanUpBucketObject(final String objectPath, final List<String> stagedFiles) {
-    final String bucket = s3Config.getBucketName();
-    ObjectListing objects = s3Client.listObjects(bucket, objectPath);
-    while (objects.getObjectSummaries().size() > 0) {
-      final List<KeyVersion> keysToDelete = objects.getObjectSummaries()
-          .stream()
-          .map(obj -> new KeyVersion(obj.getKey()))
-          .filter(obj -> stagedFiles.isEmpty() || stagedFiles.contains(obj.getKey()))
-          .toList();
-      for (final KeyVersion keyToDelete : keysToDelete) {
-        s3Client.deleteObject(bucket, keyToDelete.getKey());
-      }
-      LOGGER.info("Storage bucket {} has been cleaned-up ({} objects were deleted)...", objectPath, keysToDelete.size());
-      if (objects.isTruncated()) {
-        objects = s3Client.listNextBatchOfObjects(objects);
-      } else {
-        break;
-      }
-    }
-  }
+  protected void cleanUpObjects(final String bucket, final List<KeyVersion> keysToDelete) {
+    for (final KeyVersion keyToDelete : keysToDelete) {
+      LOGGER.info("Deleting object {}", keyToDelete.getKey());
+      s3Client.deleteObject(bucket, keyToDelete.getKey());
+    }
+  }
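
For context on the refactor: the listing, filtering, and pagination that previously lived in this GCS override now appear to move up into S3StorageOperations, which delegates only the final delete step to an overridable hook. A minimal sketch of that assumed parent-class flow, inferred from the removed code above rather than copied from the PR:

// Hypothetical sketch of the parent-class method the override above plugs into.
// Assumption: S3StorageOperations#cleanUpBucketObject keeps the old listing and
// filtering logic and delegates deletion to cleanUpObjects(...), which S3 can
// implement as one batch deleteObjects call while GCS (above) issues one
// deleteObject call per key, since GCS rejects multi-object delete requests.
public void cleanUpBucketObject(final String objectPath, final List<String> stagedFiles) {
  final String bucket = s3Config.getBucketName();
  ObjectListing objects = s3Client.listObjects(bucket, objectPath);
  while (objects.getObjectSummaries().size() > 0) {
    final List<KeyVersion> keysToDelete = objects.getObjectSummaries()
        .stream()
        .map(obj -> new KeyVersion(obj.getKey()))
        .filter(key -> stagedFiles.isEmpty() || stagedFiles.contains(key.getKey()))
        .toList();
    cleanUpObjects(bucket, keysToDelete); // hook overridden per storage backend
    LOGGER.info("Storage bucket {} has been cleaned-up ({} objects were deleted)...", objectPath, keysToDelete.size());
    if (objects.isTruncated()) {
      objects = s3Client.listNextBatchOfObjects(objects);
    } else {
      break;
    }
  }
}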

@@ -9,12 +9,7 @@
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "GCS Destination Spec",
"type": "object",
"required": [
"gcs_bucket_name",
"gcs_bucket_path",
"credential",
"format"
],
"required": ["gcs_bucket_name", "gcs_bucket_path", "credential", "format"],
"additionalProperties": false,
"properties": {
"gcs_bucket_name": {
@@ -9,7 +9,9 @@

import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.commons.json.Jsons;
@@ -18,6 +20,9 @@
import io.airbyte.integrations.destination.s3.avro.JsonFieldNameUpdater;
import io.airbyte.integrations.destination.s3.util.AvroRecordHelper;
import io.airbyte.integrations.standardtest.destination.DateTimeUtils;
+import io.airbyte.protocol.models.AirbyteMessage;
+import io.airbyte.protocol.models.AirbyteMessage.Type;
+import io.airbyte.protocol.models.AirbyteRecordMessage;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
@@ -28,6 +33,7 @@
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.generic.GenericDatumReader;
+import org.apache.commons.lang3.StringUtils;

public class GcsAvroDestinationAcceptanceTest extends GcsDestinationAcceptanceTest {

@@ -79,13 +85,13 @@ public boolean requiresDateTimeConversionForSync() {
}

@Override
-  public void convertDateTime(ObjectNode data, Map<String, String> dateTimeFieldNames) {
-    for (String path : dateTimeFieldNames.keySet()) {
+  public void convertDateTime(final ObjectNode data, final Map<String, String> dateTimeFieldNames) {
+    for (final String path : dateTimeFieldNames.keySet()) {
       if (!data.at(path).isMissingNode() && DateTimeUtils.isDateTimeValue(data.at(path).asText())) {
-        var pathFields = new ArrayList<>(Arrays.asList(path.split("/")));
+        final var pathFields = new ArrayList<>(Arrays.asList(path.split("/")));
         pathFields.remove(0); // first element always empty string
         // if pathFields.size() == 1 -> /field else /field/nestedField..
-        var pathWithoutLastField = pathFields.size() == 1 ? "/" + pathFields.get(0)
+        final var pathWithoutLastField = pathFields.size() == 1 ? "/" + pathFields.get(0)
: "/" + String.join("/", pathFields.subList(0, pathFields.size() - 1));
switch (dateTimeFieldNames.get(path)) {
case DATE_TIME -> {
@@ -114,4 +120,28 @@ public void convertDateTime(ObjectNode data, Map<String, String> dateTimeFieldNa
}
}

+  @Override
+  protected void deserializeNestedObjects(final List<AirbyteMessage> messages, final List<AirbyteRecordMessage> actualMessages) {
+    for (final AirbyteMessage message : messages) {
+      if (message.getType() == Type.RECORD) {
+        final var iterator = message.getRecord().getData().fieldNames();
+        while (iterator.hasNext()) {
+          final var fieldName = iterator.next();
+          if (message.getRecord().getData().get(fieldName).isContainerNode()) {
+            message.getRecord().getData().get(fieldName).fieldNames().forEachRemaining(f -> {
+              final var data = message.getRecord().getData().get(fieldName).get(f);
+              final var wrappedData = String.format("{\"%s\":%s,\"_airbyte_additional_properties\":null}", f,
+                  dateTimeFieldNames.containsKey(f) || !data.isTextual() ? data.asText() : StringUtils.wrap(data.asText(), "\""));
+              try {
+                ((ObjectNode) message.getRecord().getData()).set(fieldName, new ObjectMapper().readTree(wrappedData));
+              } catch (final JsonProcessingException e) {
+                e.printStackTrace();
+              }
+            });
+          }
+        }
+      }
+    }
+  }

Contributor Author commented on lines +124 to +146:

These seemed to be missing from #9816 for these tests to pass...

Same thing in all the other acceptanceTest files from this PR

}
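
To make deserializeNestedObjects above concrete, here is a small self-contained sketch of the same rewrite applied to one nested field; the record shape and field names are invented for illustration:

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class NestedWrapSketch {

  public static void main(final String[] args) throws Exception {
    final ObjectMapper mapper = new ObjectMapper();
    // Hypothetical record payload with a single nested object field.
    final ObjectNode data = (ObjectNode) mapper.readTree("{\"id\":1,\"profile\":{\"city\":\"Berlin\"}}");
    // Re-wrap the nested field with an explicit _airbyte_additional_properties
    // key, mirroring what the Avro/Parquet round-trip emits for expected records.
    final String wrapped = String.format("{\"%s\":%s,\"_airbyte_additional_properties\":null}", "city", "\"Berlin\"");
    data.set("profile", mapper.readTree(wrapped));
    System.out.println(data);
    // -> {"id":1,"profile":{"city":"Berlin","_airbyte_additional_properties":null}}
  }
}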
@@ -12,6 +12,9 @@
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.destination.s3.S3Format;
import io.airbyte.integrations.destination.s3.csv.S3CsvFormatConfig.Flattening;
+import io.airbyte.protocol.models.AirbyteCatalog;
+import io.airbyte.protocol.models.AirbyteMessage;
+import io.airbyte.protocol.models.AirbyteRecordMessage;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
@@ -109,4 +112,15 @@ protected List<JsonNode> retrieveRecords(final TestDestinationEnv testEnv,
return jsonRecords;
}

+  @Override
+  protected void retrieveRawRecordsAndAssertSameMessages(final AirbyteCatalog catalog,
+                                                         final List<AirbyteMessage> messages,
+                                                         final String defaultSchema)
+      throws Exception {
+    final List<AirbyteRecordMessage> actualMessages = retrieveRawRecords(catalog, defaultSchema);
+    deserializeNestedObjects(messages, actualMessages);
+
+    assertSameMessages(messages, actualMessages, false);
+  }

}
@@ -17,7 +17,6 @@
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.StandardCheckConnectionOutput.Status;
import io.airbyte.integrations.destination.NamingConventionTransformer;
-import io.airbyte.integrations.destination.s3.S3DestinationConstants;
import io.airbyte.integrations.destination.s3.S3Format;
import io.airbyte.integrations.destination.s3.S3FormatConfig;
import io.airbyte.integrations.destination.s3.S3StorageOperations;
@@ -9,7 +9,9 @@

import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.commons.json.Jsons;
@@ -19,6 +21,9 @@
import io.airbyte.integrations.destination.s3.avro.JsonFieldNameUpdater;
import io.airbyte.integrations.destination.s3.util.AvroRecordHelper;
import io.airbyte.integrations.standardtest.destination.DateTimeUtils;
+import io.airbyte.protocol.models.AirbyteMessage;
+import io.airbyte.protocol.models.AirbyteMessage.Type;
+import io.airbyte.protocol.models.AirbyteRecordMessage;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
@@ -28,6 +33,7 @@
import java.util.List;
import java.util.Map;
import org.apache.avro.generic.GenericData;
+import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.hadoop.ParquetReader;
@@ -85,13 +91,13 @@ public boolean requiresDateTimeConversionForSync() {
}

@Override
-  public void convertDateTime(ObjectNode data, Map<String, String> dateTimeFieldNames) {
-    for (String path : dateTimeFieldNames.keySet()) {
+  public void convertDateTime(final ObjectNode data, final Map<String, String> dateTimeFieldNames) {
+    for (final String path : dateTimeFieldNames.keySet()) {
       if (!data.at(path).isMissingNode() && DateTimeUtils.isDateTimeValue(data.at(path).asText())) {
-        var pathFields = new ArrayList<>(Arrays.asList(path.split("/")));
+        final var pathFields = new ArrayList<>(Arrays.asList(path.split("/")));
         pathFields.remove(0); // first element always empty string
         // if pathFields.size() == 1 -> /field else /field/nestedField..
-        var pathWithoutLastField = pathFields.size() == 1 ? "/" + pathFields.get(0)
+        final var pathWithoutLastField = pathFields.size() == 1 ? "/" + pathFields.get(0)
: "/" + String.join("/", pathFields.subList(0, pathFields.size() - 1));
switch (dateTimeFieldNames.get(path)) {
case DATE_TIME -> {
@@ -103,7 +109,6 @@ public void convertDateTime(ObjectNode data, Map<String, String> dateTimeFieldNa
((ObjectNode) data.at(pathWithoutLastField)).put(
pathFields.get(pathFields.size() - 1),
(DateTimeUtils.getEpochMicros(data.at(path).asText()) / 1000) * 1000);
-                ((ObjectNode) data.at(pathWithoutLastField)).set("_airbyte_additional_properties", null);
}
}
case DATE -> {
Expand All @@ -113,12 +118,35 @@ public void convertDateTime(ObjectNode data, Map<String, String> dateTimeFieldNa
} else {
((ObjectNode) data.at(pathWithoutLastField)).put(pathFields.get(pathFields.size() - 1),
DateTimeUtils.getEpochDay((data.at(path).asText())));
-                ((ObjectNode) data.at(pathWithoutLastField)).set("_airbyte_additional_properties", null);
}
}
}
}
}
}

+  @Override
+  protected void deserializeNestedObjects(final List<AirbyteMessage> messages, final List<AirbyteRecordMessage> actualMessages) {
+    for (final AirbyteMessage message : messages) {
+      if (message.getType() == Type.RECORD) {
+        final var iterator = message.getRecord().getData().fieldNames();
+        while (iterator.hasNext()) {
+          final var fieldName = iterator.next();
+          if (message.getRecord().getData().get(fieldName).isContainerNode()) {
+            message.getRecord().getData().get(fieldName).fieldNames().forEachRemaining(f -> {
+              final var data = message.getRecord().getData().get(fieldName).get(f);
+              final var wrappedData = String.format("{\"%s\":%s,\"_airbyte_additional_properties\":null}", f,
+                  dateTimeFieldNames.containsKey(f) || !data.isTextual() ? data.asText() : StringUtils.wrap(data.asText(), "\""));
+              try {
+                ((ObjectNode) message.getRecord().getData()).set(fieldName, new ObjectMapper().readTree(wrappedData));
+              } catch (final JsonProcessingException e) {
+                e.printStackTrace();
+              }
+            });
+          }
+        }
+      }
+    }
+  }

}
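
One detail worth calling out in the DATE_TIME branch above: (DateTimeUtils.getEpochMicros(...) / 1000) * 1000 truncates a microsecond timestamp to millisecond precision while keeping microsecond units, presumably to match the precision the Parquet reader emits. A worked example:

// Worked example of the truncation in the DATE_TIME branch above.
final long epochMicros = 1_609_502_400_123_456L; // 2021-01-01T12:00:00.123456Z
final long truncated = (epochMicros / 1000) * 1000; // integer division drops the last three digits
// truncated == 1_609_502_400_123_000L: sub-millisecond digits are zeroed,
// but the value is still expressed in microseconds.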
@@ -16,5 +16,5 @@ ENV APPLICATION destination-s3

COPY --from=build /airbyte /airbyte

-LABEL io.airbyte.version=0.3.0
+LABEL io.airbyte.version=0.3.1
LABEL io.airbyte.name=airbyte/destination-s3
@@ -14,10 +14,9 @@ public interface BlobStorageOperations {
String getBucketObjectPath(String namespace, String streamName, DateTime writeDatetime, String customFormat);

  /**
-   * Create a storage object where to store data in the destination for a @param streamName using
-   * location of @param objectPath
+   * Create a storage object where to store data in the destination for a @param objectPath
   */
-  void createBucketObjectIfNotExists(String streamName) throws Exception;
+  void createBucketObjectIfNotExists(String objectPath) throws Exception;

/**
* Upload the data files into the storage area.
Expand All @@ -29,9 +28,11 @@ public interface BlobStorageOperations {
/**
* Remove files that were just stored in the bucket
*/
-  void cleanUpBucketObject(String streamName, List<String> stagedFiles) throws Exception;
+  void cleanUpBucketObject(String objectPath, List<String> stagedFiles) throws Exception;
Contributor commented:

this is a better name


-  void dropBucketObject(String streamName);
+  void cleanUpBucketObject(String namespace, String streamName, String objectPath, String pathFormat);
+
+  void dropBucketObject(String objectPath);

boolean isValidData(JsonNode jsonNode);
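
The new four-argument cleanUpBucketObject is the core of this fix: rather than dropping everything under the configured bucket path, implementations can delete only the objects a given namespace/stream wrote under the configured path format. One plausible implementation sketch, assuming the path format uses ${NAMESPACE}/${STREAM_NAME}-style placeholders; the regex construction and names here are illustrative assumptions, not copied from the PR:

// Hypothetical implementation in an S3-backed BlobStorageOperations.
// Assumption: pathFormat contains placeholders such as "${NAMESPACE}" and
// "${STREAM_NAME}"; any other placeholder (dates, UUIDs) is matched with ".*".
public void cleanUpBucketObject(final String namespace,
                                final String streamName,
                                final String objectPath,
                                final String pathFormat) {
  final String bucket = s3Config.getBucketName();
  // Turn the configured path format into a regex matching every object this
  // stream has ever written, regardless of timestamp/UUID values at write time.
  final String regex = pathFormat
      .replace("${NAMESPACE}", java.util.regex.Pattern.quote(namespace))
      .replace("${STREAM_NAME}", java.util.regex.Pattern.quote(streamName))
      .replaceAll("\\$\\{[A-Z_]+\\}", ".*");
  final java.util.regex.Pattern pattern = java.util.regex.Pattern.compile(regex);
  ObjectListing objects = s3Client.listObjects(bucket, objectPath);
  while (true) {
    final List<KeyVersion> keysToDelete = objects.getObjectSummaries().stream()
        .map(summary -> new KeyVersion(summary.getKey()))
        .filter(key -> pattern.matcher(key.getKey()).matches())
        .toList();
    cleanUpObjects(bucket, keysToDelete); // per-backend delete hook, as above
    if (!objects.isTruncated()) {
      break;
    }
    objects = s3Client.listNextBatchOfObjects(objects);
  }
}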

@@ -81,7 +81,7 @@ private static Function<ConfiguredAirbyteStream, WriteConfig> toWriteConfig(
final String customOutputFormat = String.join("/", bucketPath, s3Config.getPathFormat());
final String fullOutputPath = storageOperations.getBucketObjectPath(namespace, streamName, SYNC_DATETIME, customOutputFormat);
final DestinationSyncMode syncMode = stream.getDestinationSyncMode();
-      final WriteConfig writeConfig = new WriteConfig(namespace, streamName, bucketPath, fullOutputPath, syncMode);
+      final WriteConfig writeConfig = new WriteConfig(namespace, streamName, bucketPath, customOutputFormat, fullOutputPath, syncMode);
LOGGER.info("Write config: {}", writeConfig);
return writeConfig;
};
@@ -95,10 +95,16 @@ private OnStartFunction onStartFunction(final BlobStorageOperations storageOpera
final String namespace = writeConfig.getNamespace();
final String stream = writeConfig.getStreamName();
final String outputBucketPath = writeConfig.getOutputBucketPath();
LOGGER.info("Clearing storage area in destination started for namespace {} stream {} bucketObject {}", namespace, stream, outputBucketPath);
final String pathFormat = writeConfig.getPathFormat();
LOGGER.info("Clearing storage area in destination started for namespace {} stream {} bucketObject {} pathFormat {}",
namespace, stream, outputBucketPath, pathFormat);
AirbyteSentry.executeWithTracing("PrepareStreamStorage",
() -> storageOperations.dropBucketObject(outputBucketPath),
Map.of("namespace", Objects.requireNonNullElse(namespace, "null"), "stream", stream, "storage", outputBucketPath));
() -> storageOperations.cleanUpBucketObject(namespace, stream, outputBucketPath, pathFormat),
Map.of(
"namespace", Objects.requireNonNullElse(namespace, "null"),
"stream", stream,
"storage", outputBucketPath,
"pathFormat", pathFormat));
LOGGER.info("Clearing storage area in destination completed for namespace {} stream {} bucketObject {}", namespace, stream,
outputBucketPath);
}
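
The extra constructor argument in toWriteConfig above implies WriteConfig now carries the path format alongside the fully resolved output path. A minimal sketch of the assumed shape, with field and accessor names inferred from the call sites in this diff rather than taken from the PR:

// Assumed shape of WriteConfig after this PR, inferred from
// new WriteConfig(namespace, streamName, bucketPath, customOutputFormat,
// fullOutputPath, syncMode) and writeConfig.getPathFormat() above.
public class WriteConfig {

  private final String namespace;
  private final String streamName;
  private final String outputBucketPath;
  private final String pathFormat; // newly threaded through for cleanup
  private final String fullOutputPath;
  private final DestinationSyncMode syncMode;

  public WriteConfig(final String namespace,
                     final String streamName,
                     final String outputBucketPath,
                     final String pathFormat,
                     final String fullOutputPath,
                     final DestinationSyncMode syncMode) {
    this.namespace = namespace;
    this.streamName = streamName;
    this.outputBucketPath = outputBucketPath;
    this.pathFormat = pathFormat;
    this.fullOutputPath = fullOutputPath;
    this.syncMode = syncMode;
  }

  public String getPathFormat() {
    return pathFormat;
  }

  // remaining getters (namespace, stream name, paths, sync mode) omitted
}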