🐛 fix s3/gcs bucket cleanup (#11728)
* Restrict bucket clean up

* bumpversion

* Fix GCS acceptance tests
ChristopheDuong authored Apr 5, 2022
1 parent 0fa9f12 commit 393ba35
Showing 18 changed files with 340 additions and 90 deletions.
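Note on the change: the cleanup routines previously deleted everything under a bucket path at the start of a sync, which could remove data written by other streams or connections sharing that path. This commit scopes deletion to the keys a given stream actually writes, derived from its namespace, stream name, and configured path format. A minimal sketch of the idea in Java (getRegexFormat and its variable substitutions are illustrative, not the connector's exact helper):

import java.util.List;
import java.util.regex.Pattern;

public class RestrictedCleanupSketch {

  // Hypothetical helper: turns a path format such as
  // "${NAMESPACE}/${STREAM_NAME}/${YEAR}_${MONTH}_${DAY}" into a regex
  // matching only this stream's objects.
  static String getRegexFormat(final String namespace, final String streamName, final String pathFormat) {
    return pathFormat
        .replace("${NAMESPACE}", Pattern.quote(namespace))
        .replace("${STREAM_NAME}", Pattern.quote(streamName))
        .replaceAll("\\$\\{[A-Z_]+\\}", ".*");
  }

  // Keep only the keys under the listed prefix that belong to this stream.
  static List<String> keysToDelete(final List<String> keysUnderPrefix,
                                   final String namespace,
                                   final String streamName,
                                   final String pathFormat) {
    final Pattern pattern = Pattern.compile(getRegexFormat(namespace, streamName, pathFormat));
    return keysUnderPrefix.stream()
        .filter(key -> pattern.matcher(key).matches())
        .toList();
  }
}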
airbyte-integrations/connectors/destination-gcs/Dockerfile (2 changes: 1 addition & 1 deletion)
@@ -16,5 +16,5 @@ ENV APPLICATION destination-gcs
 
 COPY --from=build /airbyte /airbyte
 
-LABEL io.airbyte.version=0.2.1
+LABEL io.airbyte.version=0.2.2
 LABEL io.airbyte.name=airbyte/destination-gcs
GcsStorageOperations.java

@@ -6,7 +6,6 @@
 
 import com.amazonaws.services.s3.AmazonS3;
 import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion;
-import com.amazonaws.services.s3.model.ObjectListing;
 import io.airbyte.integrations.destination.NamingConventionTransformer;
 import io.airbyte.integrations.destination.s3.S3DestinationConfig;
 import io.airbyte.integrations.destination.s3.S3StorageOperations;
@@ -29,24 +28,10 @@ public GcsStorageOperations(final NamingConventionTransformer nameTransformer,
    * difference is that the AmazonS3#deleteObjects method is replaced with AmazonS3#deleteObject.
    */
   @Override
-  public void cleanUpBucketObject(final String objectPath, final List<String> stagedFiles) {
-    final String bucket = s3Config.getBucketName();
-    ObjectListing objects = s3Client.listObjects(bucket, objectPath);
-    while (objects.getObjectSummaries().size() > 0) {
-      final List<KeyVersion> keysToDelete = objects.getObjectSummaries()
-          .stream()
-          .map(obj -> new KeyVersion(obj.getKey()))
-          .filter(obj -> stagedFiles.isEmpty() || stagedFiles.contains(obj.getKey()))
-          .toList();
-      for (final KeyVersion keyToDelete : keysToDelete) {
-        s3Client.deleteObject(bucket, keyToDelete.getKey());
-      }
-      LOGGER.info("Storage bucket {} has been cleaned-up ({} objects were deleted)...", objectPath, keysToDelete.size());
-      if (objects.isTruncated()) {
-        objects = s3Client.listNextBatchOfObjects(objects);
-      } else {
-        break;
-      }
+  protected void cleanUpObjects(final String bucket, final List<KeyVersion> keysToDelete) {
+    for (final KeyVersion keyToDelete : keysToDelete) {
+      LOGGER.info("Deleting object {}", keyToDelete.getKey());
+      s3Client.deleteObject(bucket, keyToDelete.getKey());
     }
   }
 
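For contrast, the S3-side parent presumably performs the same cleanup as a single batched multi-object delete, an API that GCS's S3-interoperability layer does not accept, hence the per-key override above. A sketch of the batched variant, assuming the AWS SDK v1 client used in this class:

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion;
import java.util.List;

class BatchedCleanupSketch {

  // Sketch only: one DeleteObjects request instead of N DeleteObject calls.
  static void cleanUpObjects(final AmazonS3 s3Client, final String bucket, final List<KeyVersion> keysToDelete) {
    if (!keysToDelete.isEmpty()) {
      s3Client.deleteObjects(new DeleteObjectsRequest(bucket).withKeys(keysToDelete));
    }
  }
}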
spec.json (GCS Destination Spec)

@@ -9,12 +9,7 @@
   "$schema": "http://json-schema.org/draft-07/schema#",
   "title": "GCS Destination Spec",
   "type": "object",
-  "required": [
-    "gcs_bucket_name",
-    "gcs_bucket_path",
-    "credential",
-    "format"
-  ],
+  "required": ["gcs_bucket_name", "gcs_bucket_path", "credential", "format"],
   "additionalProperties": false,
   "properties": {
     "gcs_bucket_name": {
GcsAvroDestinationAcceptanceTest.java

@@ -9,7 +9,9 @@
 
 import com.amazonaws.services.s3.model.S3Object;
 import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.ObjectReader;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import io.airbyte.commons.json.Jsons;
@@ -18,6 +20,9 @@
 import io.airbyte.integrations.destination.s3.avro.JsonFieldNameUpdater;
 import io.airbyte.integrations.destination.s3.util.AvroRecordHelper;
 import io.airbyte.integrations.standardtest.destination.DateTimeUtils;
+import io.airbyte.protocol.models.AirbyteMessage;
+import io.airbyte.protocol.models.AirbyteMessage.Type;
+import io.airbyte.protocol.models.AirbyteRecordMessage;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.LinkedList;
@@ -28,6 +33,7 @@
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericData.Record;
 import org.apache.avro.generic.GenericDatumReader;
+import org.apache.commons.lang3.StringUtils;
 
 public class GcsAvroDestinationAcceptanceTest extends GcsDestinationAcceptanceTest {
 
@@ -79,13 +85,13 @@ public boolean requiresDateTimeConversionForSync() {
   }
 
   @Override
-  public void convertDateTime(ObjectNode data, Map<String, String> dateTimeFieldNames) {
-    for (String path : dateTimeFieldNames.keySet()) {
+  public void convertDateTime(final ObjectNode data, final Map<String, String> dateTimeFieldNames) {
+    for (final String path : dateTimeFieldNames.keySet()) {
       if (!data.at(path).isMissingNode() && DateTimeUtils.isDateTimeValue(data.at(path).asText())) {
-        var pathFields = new ArrayList<>(Arrays.asList(path.split("/")));
+        final var pathFields = new ArrayList<>(Arrays.asList(path.split("/")));
         pathFields.remove(0); // first element always empty string
         // if pathFields.size() == 1 -> /field else /field/nestedField..
-        var pathWithoutLastField = pathFields.size() == 1 ? "/" + pathFields.get(0)
+        final var pathWithoutLastField = pathFields.size() == 1 ? "/" + pathFields.get(0)
             : "/" + String.join("/", pathFields.subList(0, pathFields.size() - 1));
         switch (dateTimeFieldNames.get(path)) {
           case DATE_TIME -> {
@@ -114,4 +120,28 @@ public void convertDateTime(ObjectNode data, Map<String, String> dateTimeFieldNames) {
     }
   }
 
+  @Override
+  protected void deserializeNestedObjects(final List<AirbyteMessage> messages, final List<AirbyteRecordMessage> actualMessages) {
+    for (final AirbyteMessage message : messages) {
+      if (message.getType() == Type.RECORD) {
+        final var iterator = message.getRecord().getData().fieldNames();
+        while (iterator.hasNext()) {
+          final var fieldName = iterator.next();
+          if (message.getRecord().getData().get(fieldName).isContainerNode()) {
+            message.getRecord().getData().get(fieldName).fieldNames().forEachRemaining(f -> {
+              final var data = message.getRecord().getData().get(fieldName).get(f);
+              final var wrappedData = String.format("{\"%s\":%s,\"_airbyte_additional_properties\":null}", f,
+                  dateTimeFieldNames.containsKey(f) || !data.isTextual() ? data.asText() : StringUtils.wrap(data.asText(), "\""));
+              try {
+                ((ObjectNode) message.getRecord().getData()).set(fieldName, new ObjectMapper().readTree(wrappedData));
+              } catch (final JsonProcessingException e) {
+                e.printStackTrace();
+              }
+            });
+          }
+        }
+      }
+    }
+  }
+
 }
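Note: the deserializeNestedObjects override rewrites each expected record so that every nested object carries an explicit _airbyte_additional_properties entry, the shape records come back in after the Avro round-trip. An illustrative before/after, with made-up field names:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class NestedShapeExample {

  public static void main(final String[] args) throws Exception {
    final ObjectMapper mapper = new ObjectMapper();
    // Expected record as supplied by the test:
    final JsonNode expected = mapper.readTree("{\"profile\":{\"city\":\"Oslo\"}}");
    // Shape after the rewrite, matching the deserialized output:
    final JsonNode rewritten = mapper.readTree(
        "{\"profile\":{\"city\":\"Oslo\",\"_airbyte_additional_properties\":null}}");
    System.out.println(expected + "  ->  " + rewritten);
  }
}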
GcsCsvDestinationAcceptanceTest.java

@@ -12,6 +12,9 @@
 import io.airbyte.integrations.base.JavaBaseConstants;
 import io.airbyte.integrations.destination.s3.S3Format;
 import io.airbyte.integrations.destination.s3.csv.S3CsvFormatConfig.Flattening;
+import io.airbyte.protocol.models.AirbyteCatalog;
+import io.airbyte.protocol.models.AirbyteMessage;
+import io.airbyte.protocol.models.AirbyteRecordMessage;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.Reader;
@@ -109,4 +112,15 @@ protected List<JsonNode> retrieveRecords(final TestDestinationEnv testEnv,
     return jsonRecords;
   }
 
+  @Override
+  protected void retrieveRawRecordsAndAssertSameMessages(final AirbyteCatalog catalog,
+                                                         final List<AirbyteMessage> messages,
+                                                         final String defaultSchema)
+      throws Exception {
+    final List<AirbyteRecordMessage> actualMessages = retrieveRawRecords(catalog, defaultSchema);
+    deserializeNestedObjects(messages, actualMessages);
+
+    assertSameMessages(messages, actualMessages, false);
+  }
+
 }
GcsParquetDestinationAcceptanceTest.java

@@ -9,7 +9,9 @@
 
 import com.amazonaws.services.s3.model.S3Object;
 import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.ObjectReader;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import io.airbyte.commons.json.Jsons;
@@ -19,6 +21,9 @@
 import io.airbyte.integrations.destination.s3.avro.JsonFieldNameUpdater;
 import io.airbyte.integrations.destination.s3.util.AvroRecordHelper;
 import io.airbyte.integrations.standardtest.destination.DateTimeUtils;
+import io.airbyte.protocol.models.AirbyteMessage;
+import io.airbyte.protocol.models.AirbyteMessage.Type;
+import io.airbyte.protocol.models.AirbyteRecordMessage;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -28,6 +33,7 @@
 import java.util.List;
 import java.util.Map;
 import org.apache.avro.generic.GenericData;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.parquet.avro.AvroReadSupport;
 import org.apache.parquet.hadoop.ParquetReader;
@@ -85,13 +91,13 @@ public boolean requiresDateTimeConversionForSync() {
   }
 
   @Override
-  public void convertDateTime(ObjectNode data, Map<String, String> dateTimeFieldNames) {
-    for (String path : dateTimeFieldNames.keySet()) {
+  public void convertDateTime(final ObjectNode data, final Map<String, String> dateTimeFieldNames) {
+    for (final String path : dateTimeFieldNames.keySet()) {
       if (!data.at(path).isMissingNode() && DateTimeUtils.isDateTimeValue(data.at(path).asText())) {
-        var pathFields = new ArrayList<>(Arrays.asList(path.split("/")));
+        final var pathFields = new ArrayList<>(Arrays.asList(path.split("/")));
         pathFields.remove(0); // first element always empty string
         // if pathFields.size() == 1 -> /field else /field/nestedField..
-        var pathWithoutLastField = pathFields.size() == 1 ? "/" + pathFields.get(0)
+        final var pathWithoutLastField = pathFields.size() == 1 ? "/" + pathFields.get(0)
             : "/" + String.join("/", pathFields.subList(0, pathFields.size() - 1));
         switch (dateTimeFieldNames.get(path)) {
           case DATE_TIME -> {
@@ -103,7 +109,6 @@ public void convertDateTime(ObjectNode data, Map<String, String> dateTimeFieldNames) {
             ((ObjectNode) data.at(pathWithoutLastField)).put(
                 pathFields.get(pathFields.size() - 1),
                 (DateTimeUtils.getEpochMicros(data.at(path).asText()) / 1000) * 1000);
-            ((ObjectNode) data.at(pathWithoutLastField)).set("_airbyte_additional_properties", null);
           }
         }
         case DATE -> {
@@ -113,12 +118,35 @@ public void convertDateTime(ObjectNode data, Map<String, String> dateTimeFieldNames) {
           } else {
             ((ObjectNode) data.at(pathWithoutLastField)).put(pathFields.get(pathFields.size() - 1),
                 DateTimeUtils.getEpochDay((data.at(path).asText())));
-            ((ObjectNode) data.at(pathWithoutLastField)).set("_airbyte_additional_properties", null);
           }
         }
       }
     }
   }
 
+  @Override
+  protected void deserializeNestedObjects(final List<AirbyteMessage> messages, final List<AirbyteRecordMessage> actualMessages) {
+    for (final AirbyteMessage message : messages) {
+      if (message.getType() == Type.RECORD) {
+        final var iterator = message.getRecord().getData().fieldNames();
+        while (iterator.hasNext()) {
+          final var fieldName = iterator.next();
+          if (message.getRecord().getData().get(fieldName).isContainerNode()) {
+            message.getRecord().getData().get(fieldName).fieldNames().forEachRemaining(f -> {
+              final var data = message.getRecord().getData().get(fieldName).get(f);
+              final var wrappedData = String.format("{\"%s\":%s,\"_airbyte_additional_properties\":null}", f,
+                  dateTimeFieldNames.containsKey(f) || !data.isTextual() ? data.asText() : StringUtils.wrap(data.asText(), "\""));
+              try {
+                ((ObjectNode) message.getRecord().getData()).set(fieldName, new ObjectMapper().readTree(wrappedData));
+              } catch (final JsonProcessingException e) {
+                e.printStackTrace();
+              }
+            });
+          }
+        }
+      }
+    }
+  }
+
 }
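Note that the two set("_airbyte_additional_properties", null) calls removed from convertDateTime are superseded by the deserializeNestedObjects pass above, which normalizes whole nested objects in one place rather than patching them during date-time conversion.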
airbyte-integrations/connectors/destination-s3/Dockerfile (2 changes: 1 addition & 1 deletion)
@@ -16,5 +16,5 @@ ENV APPLICATION destination-s3
 
 COPY --from=build /airbyte /airbyte
 
-LABEL io.airbyte.version=0.3.0
+LABEL io.airbyte.version=0.3.1
 LABEL io.airbyte.name=airbyte/destination-s3
BlobStorageOperations.java

@@ -14,10 +14,9 @@ public interface BlobStorageOperations {
   String getBucketObjectPath(String namespace, String streamName, DateTime writeDatetime, String customFormat);
 
   /**
-   * Create a storage object where to store data in the destination for a @param streamName using
-   * location of @param objectPath
+   * Create a storage object where to store data in the destination for a @param objectPath
    */
-  void createBucketObjectIfNotExists(String streamName) throws Exception;
+  void createBucketObjectIfNotExists(String objectPath) throws Exception;
 
   /**
    * Upload the data files into the storage area.
@@ -29,9 +28,11 @@ public interface BlobStorageOperations {
   /**
    * Remove files that were just stored in the bucket
    */
-  void cleanUpBucketObject(String streamName, List<String> stagedFiles) throws Exception;
+  void cleanUpBucketObject(String objectPath, List<String> stagedFiles) throws Exception;
 
-  void dropBucketObject(String streamName);
+  void cleanUpBucketObject(String namespace, String StreamName, String objectPath, String pathFormat);
+
+  void dropBucketObject(String objectPath);
 
   boolean isValidData(JsonNode jsonNode);
 
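The new four-argument cleanUpBucketObject is the hook that scopes deletion to a single stream. A sketch of how an implementation might satisfy it (the helper names are assumptions, not the connector's exact code):

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion;
import com.amazonaws.services.s3.model.ObjectListing;
import java.util.List;
import java.util.regex.Pattern;

abstract class ScopedCleanupSketch {

  protected AmazonS3 s3Client;
  protected String bucketName;

  public void cleanUpBucketObject(final String namespace,
                                  final String streamName,
                                  final String objectPath,
                                  final String pathFormat) {
    // List everything under the prefix, but delete only this stream's keys.
    final Pattern regex = Pattern.compile(getRegexFormat(namespace, streamName, pathFormat));
    ObjectListing objects = s3Client.listObjects(bucketName, objectPath);
    while (true) {
      final List<KeyVersion> keysToDelete = objects.getObjectSummaries().stream()
          .map(summary -> new KeyVersion(summary.getKey()))
          .filter(key -> regex.matcher(key.getKey()).matches())
          .toList();
      cleanUpObjects(keysToDelete); // batched on S3, per-key on GCS (see above)
      if (!objects.isTruncated()) {
        break;
      }
      objects = s3Client.listNextBatchOfObjects(objects);
    }
  }

  // Assumed helpers; see the earlier sketches for possible bodies.
  abstract String getRegexFormat(String namespace, String streamName, String pathFormat);

  abstract void cleanUpObjects(List<KeyVersion> keysToDelete);
}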
@@ -81,7 +81,7 @@ private static Function<ConfiguredAirbyteStream, WriteConfig> toWriteConfig(
       final String customOutputFormat = String.join("/", bucketPath, s3Config.getPathFormat());
       final String fullOutputPath = storageOperations.getBucketObjectPath(namespace, streamName, SYNC_DATETIME, customOutputFormat);
       final DestinationSyncMode syncMode = stream.getDestinationSyncMode();
-      final WriteConfig writeConfig = new WriteConfig(namespace, streamName, bucketPath, fullOutputPath, syncMode);
+      final WriteConfig writeConfig = new WriteConfig(namespace, streamName, bucketPath, customOutputFormat, fullOutputPath, syncMode);
       LOGGER.info("Write config: {}", writeConfig);
       return writeConfig;
     };
@@ -95,10 +95,16 @@ private OnStartFunction onStartFunction(final BlobStorageOperations storageOperations,
       final String namespace = writeConfig.getNamespace();
       final String stream = writeConfig.getStreamName();
       final String outputBucketPath = writeConfig.getOutputBucketPath();
-      LOGGER.info("Clearing storage area in destination started for namespace {} stream {} bucketObject {}", namespace, stream, outputBucketPath);
+      final String pathFormat = writeConfig.getPathFormat();
+      LOGGER.info("Clearing storage area in destination started for namespace {} stream {} bucketObject {} pathFormat {}",
+          namespace, stream, outputBucketPath, pathFormat);
       AirbyteSentry.executeWithTracing("PrepareStreamStorage",
-          () -> storageOperations.dropBucketObject(outputBucketPath),
-          Map.of("namespace", Objects.requireNonNullElse(namespace, "null"), "stream", stream, "storage", outputBucketPath));
+          () -> storageOperations.cleanUpBucketObject(namespace, stream, outputBucketPath, pathFormat),
+          Map.of(
+              "namespace", Objects.requireNonNullElse(namespace, "null"),
+              "stream", stream,
+              "storage", outputBucketPath,
+              "pathFormat", pathFormat));
       LOGGER.info("Clearing storage area in destination completed for namespace {} stream {} bucketObject {}", namespace, stream,
           outputBucketPath);
     }
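Net effect at the start of a sync: instead of dropping everything under outputBucketPath, the consumer cleans only keys matching the stream-scoped path format, so streams or connections sharing a bucket path no longer delete each other's data. A toy demonstration of the scoping, with illustrative names and the same assumed regex derivation as above:

public class PathFormatScopeDemo {

  public static void main(final String[] args) {
    // Illustrative path format; the configured or default format may differ.
    final String pathFormat = "data/${NAMESPACE}/${STREAM_NAME}/${YEAR}_${MONTH}_${DAY}";
    final String regex = pathFormat
        .replace("${NAMESPACE}", "public")
        .replace("${STREAM_NAME}", "users")
        .replaceAll("\\$\\{[A-Z_]+\\}", ".*");

    System.out.println("data/public/users/2022_04_05".matches(regex));  // true  -> cleaned up
    System.out.println("data/public/orders/2022_04_05".matches(regex)); // false -> left alone
  }
}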
[Diff truncated: the remaining changed files in this commit are not shown in this view.]
