Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace constructors in stream transfer manager helper with a builder #13253

Merged
merged 1 commit into from
May 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import io.airbyte.integrations.destination.s3.avro.AvroRecordFactory;
import io.airbyte.integrations.destination.s3.avro.JsonToAvroSchemaConverter;
import io.airbyte.integrations.destination.s3.avro.S3AvroFormatConfig;
import io.airbyte.integrations.destination.s3.util.StreamTransferManagerHelper;
import io.airbyte.integrations.destination.s3.util.StreamTransferManagerFactory;
import io.airbyte.integrations.destination.s3.writer.DestinationFileWriter;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
Expand Down Expand Up @@ -74,8 +74,10 @@ public GcsAvroWriter(final GcsDestinationConfig config,
objectKey);

this.avroRecordFactory = new AvroRecordFactory(schema, converter);
this.uploadManager = StreamTransferManagerHelper.getDefault(
config.getBucketName(), objectKey, s3Client, config.getFormatConfig().getPartSize());
this.uploadManager = StreamTransferManagerFactory
.create(config.getBucketName(), objectKey, s3Client)
.setPartSize(config.getFormatConfig().getPartSize())
.get();
// We only need one output stream as we only have one input stream. This is reasonably performant.
this.outputStream = uploadManager.getMultiPartOutputStreams().get(0);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import io.airbyte.integrations.destination.s3.S3Format;
import io.airbyte.integrations.destination.s3.csv.CsvSheetGenerator;
import io.airbyte.integrations.destination.s3.csv.S3CsvFormatConfig;
import io.airbyte.integrations.destination.s3.util.StreamTransferManagerHelper;
import io.airbyte.integrations.destination.s3.util.StreamTransferManagerFactory;
import io.airbyte.integrations.destination.s3.writer.DestinationFileWriter;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
Expand Down Expand Up @@ -56,8 +56,10 @@ public GcsCsvWriter(final GcsDestinationConfig config,
LOGGER.info("Full GCS path for stream '{}': {}/{}", stream.getName(), config.getBucketName(),
objectKey);

this.uploadManager = StreamTransferManagerHelper.getDefault(
config.getBucketName(), objectKey, s3Client, config.getFormatConfig().getPartSize());
this.uploadManager = StreamTransferManagerFactory
.create(config.getBucketName(), objectKey, s3Client)
.setPartSize(config.getFormatConfig().getPartSize())
.get();
// We only need one output stream as we only have one input stream. This is reasonably performant.
this.outputStream = uploadManager.getMultiPartOutputStreams().get(0);
this.csvPrinter = new CSVPrinter(new PrintWriter(outputStream, true, StandardCharsets.UTF_8),
Expand All @@ -71,7 +73,7 @@ public void write(final UUID id, final AirbyteRecordMessage recordMessage) throw
}

@Override
public void write(JsonNode formattedData) throws IOException {
public void write(final JsonNode formattedData) throws IOException {
csvPrinter.printRecord(csvSheetGenerator.getDataRow(formattedData));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import io.airbyte.integrations.destination.gcs.GcsDestinationConfig;
import io.airbyte.integrations.destination.gcs.writer.BaseGcsWriter;
import io.airbyte.integrations.destination.s3.S3Format;
import io.airbyte.integrations.destination.s3.util.StreamTransferManagerHelper;
import io.airbyte.integrations.destination.s3.util.StreamTransferManagerFactory;
import io.airbyte.integrations.destination.s3.writer.DestinationFileWriter;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
Expand Down Expand Up @@ -52,8 +52,10 @@ public GcsJsonlWriter(final GcsDestinationConfig config,
gcsFileLocation = String.format("gs://%s/%s", config.getBucketName(), objectKey);
LOGGER.info("Full GCS path for stream '{}': {}/{}", stream.getName(), config.getBucketName(), objectKey);

this.uploadManager = StreamTransferManagerHelper.getDefault(
config.getBucketName(), objectKey, s3Client, config.getFormatConfig().getPartSize());
this.uploadManager = StreamTransferManagerFactory
.create(config.getBucketName(), objectKey, s3Client)
.setPartSize(config.getFormatConfig().getPartSize())
.get();

// We only need one output stream as we only have one input stream. This is reasonably performant.
this.outputStream = uploadManager.getMultiPartOutputStreams().get(0);
Expand All @@ -70,7 +72,7 @@ public void write(final UUID id, final AirbyteRecordMessage recordMessage) {
}

@Override
public void write(JsonNode formattedData) throws IOException {
public void write(final JsonNode formattedData) throws IOException {
printWriter.println(Jsons.serialize(formattedData));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import io.airbyte.integrations.destination.s3.S3DestinationConstants;
import io.airbyte.integrations.destination.s3.S3FormatConfig;
import io.airbyte.integrations.destination.s3.avro.S3AvroFormatConfig;
import io.airbyte.integrations.destination.s3.util.StreamTransferManagerHelper;
import io.airbyte.integrations.destination.s3.util.StreamTransferManagerFactory;
import java.util.List;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileConstants;
Expand Down Expand Up @@ -116,9 +116,10 @@ public void testHandlePartSizeConfig() throws IllegalAccessException {
assertEquals("AVRO", formatConfig.getFormat().name());
assertEquals(6, formatConfig.getPartSize());
// Assert that is set properly in config
final StreamTransferManager streamTransferManager = StreamTransferManagerHelper.getDefault(
gcsDestinationConfig.getBucketName(), "objectKey", null,
gcsDestinationConfig.getFormatConfig().getPartSize());
final StreamTransferManager streamTransferManager = StreamTransferManagerFactory
.create(gcsDestinationConfig.getBucketName(), "objectKey", null)
.setPartSize(gcsDestinationConfig.getFormatConfig().getPartSize())
.get();

final Integer partSizeBytes = (Integer) FieldUtils.readField(streamTransferManager, "partSize", true);
assertEquals(MB * 6, partSizeBytes);
Expand All @@ -135,9 +136,10 @@ public void testHandleAbsenceOfPartSizeConfig() throws IllegalAccessException {
.getGcsDestinationConfig(config);
ConfigTestUtils.assertBaseConfig(gcsDestinationConfig);

final StreamTransferManager streamTransferManager = StreamTransferManagerHelper.getDefault(
gcsDestinationConfig.getBucketName(), "objectKey", null,
gcsDestinationConfig.getFormatConfig().getPartSize());
final StreamTransferManager streamTransferManager = StreamTransferManagerFactory
.create(gcsDestinationConfig.getBucketName(), "objectKey", null)
.setPartSize(gcsDestinationConfig.getFormatConfig().getPartSize())
.get();

final Integer partSizeBytes = (Integer) FieldUtils.readField(streamTransferManager, "partSize", true);
assertEquals(MB * S3DestinationConstants.DEFAULT_PART_SIZE_MB, partSizeBytes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import io.airbyte.integrations.destination.s3.S3DestinationConstants;
import io.airbyte.integrations.destination.s3.S3FormatConfig;
import io.airbyte.integrations.destination.s3.csv.S3CsvFormatConfig.Flattening;
import io.airbyte.integrations.destination.s3.util.StreamTransferManagerHelper;
import io.airbyte.integrations.destination.s3.util.StreamTransferManagerFactory;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -52,9 +52,10 @@ public void testHandlePartSizeConfig() throws IllegalAccessException {
assertEquals("CSV", formatConfig.getFormat().name());
assertEquals(6, formatConfig.getPartSize());
// Assert that is set properly in config
final StreamTransferManager streamTransferManager = StreamTransferManagerHelper.getDefault(
gcsDestinationConfig.getBucketName(), "objectKey", null,
gcsDestinationConfig.getFormatConfig().getPartSize());
final StreamTransferManager streamTransferManager = StreamTransferManagerFactory
.create(gcsDestinationConfig.getBucketName(), "objectKey", null)
.setPartSize(gcsDestinationConfig.getFormatConfig().getPartSize())
.get();

final Integer partSizeBytes = (Integer) FieldUtils.readField(streamTransferManager, "partSize", true);
assertEquals(MB * 6, partSizeBytes);
Expand All @@ -71,9 +72,10 @@ public void testHandleAbsenceOfPartSizeConfig() throws IllegalAccessException {
final GcsDestinationConfig gcsDestinationConfig = GcsDestinationConfig.getGcsDestinationConfig(config);
ConfigTestUtils.assertBaseConfig(gcsDestinationConfig);

final StreamTransferManager streamTransferManager = StreamTransferManagerHelper.getDefault(
gcsDestinationConfig.getBucketName(), "objectKey", null,
gcsDestinationConfig.getFormatConfig().getPartSize());
final StreamTransferManager streamTransferManager = StreamTransferManagerFactory
.create(gcsDestinationConfig.getBucketName(), "objectKey", null)
.setPartSize(gcsDestinationConfig.getFormatConfig().getPartSize())
.get();

final Integer partSizeBytes = (Integer) FieldUtils.readField(streamTransferManager, "partSize", true);
assertEquals(MB * S3DestinationConstants.DEFAULT_PART_SIZE_MB, partSizeBytes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import io.airbyte.integrations.destination.gcs.util.ConfigTestUtils;
import io.airbyte.integrations.destination.s3.S3DestinationConstants;
import io.airbyte.integrations.destination.s3.S3FormatConfig;
import io.airbyte.integrations.destination.s3.util.StreamTransferManagerHelper;
import io.airbyte.integrations.destination.s3.util.StreamTransferManagerFactory;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
Expand All @@ -39,9 +39,10 @@ public void testHandlePartSizeConfig() throws IllegalAccessException {
assertEquals(6, formatConfig.getPartSize());

// Assert that is set properly in config
final StreamTransferManager streamTransferManager = StreamTransferManagerHelper.getDefault(
gcsDestinationConfig.getBucketName(), "objectKey", null,
gcsDestinationConfig.getFormatConfig().getPartSize());
final StreamTransferManager streamTransferManager = StreamTransferManagerFactory
.create(gcsDestinationConfig.getBucketName(), "objectKey", null)
.setPartSize(gcsDestinationConfig.getFormatConfig().getPartSize())
.get();

final Integer partSizeBytes = (Integer) FieldUtils.readField(streamTransferManager, "partSize", true);
assertEquals(MB * 6, partSizeBytes);
Expand All @@ -58,9 +59,10 @@ public void testHandleAbsenceOfPartSizeConfig() throws IllegalAccessException {
.getGcsDestinationConfig(config);
ConfigTestUtils.assertBaseConfig(gcsDestinationConfig);

final StreamTransferManager streamTransferManager = StreamTransferManagerHelper.getDefault(
gcsDestinationConfig.getBucketName(), "objectKey", null,
gcsDestinationConfig.getFormatConfig().getPartSize());
final StreamTransferManager streamTransferManager = StreamTransferManagerFactory
.create(gcsDestinationConfig.getBucketName(), "objectKey", null)
.setPartSize(gcsDestinationConfig.getFormatConfig().getPartSize())
.get();

final Integer partSizeBytes = (Integer) FieldUtils.readField(streamTransferManager, "partSize", true);
assertEquals(MB * S3DestinationConstants.DEFAULT_PART_SIZE_MB, partSizeBytes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import io.airbyte.integrations.destination.NamingConventionTransformer;
import io.airbyte.integrations.destination.record_buffer.FileBuffer;
import io.airbyte.integrations.destination.s3.util.S3NameTransformer;
import io.airbyte.integrations.destination.s3.util.StreamTransferManagerHelper;
import io.airbyte.integrations.destination.s3.util.StreamTransferManagerFactory;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;
import io.airbyte.protocol.models.AirbyteMessage;
Expand Down Expand Up @@ -77,7 +77,7 @@ public AirbyteConnectionStatus check(final JsonNode config) {
}
}

public static void testSingleUpload(final AmazonS3 s3Client, final String bucketName, String bucketPath) {
public static void testSingleUpload(final AmazonS3 s3Client, final String bucketName, final String bucketPath) {
LOGGER.info("Started testing if all required credentials assigned to user for single file uploading");
if (bucketPath.endsWith("/")) {
throw new RuntimeException("Bucket Path should not end with /");
Expand All @@ -91,17 +91,13 @@ public static void testSingleUpload(final AmazonS3 s3Client, final String bucket
LOGGER.info("Finished checking for normal upload mode");
}

public static void testMultipartUpload(final AmazonS3 s3Client, final String bucketName, String bucketPath) throws IOException {
public static void testMultipartUpload(final AmazonS3 s3Client, final String bucketName, final String bucketPath) throws IOException {
LOGGER.info("Started testing if all required credentials assigned to user for multipart upload");
if (bucketPath.endsWith("/")) {
throw new RuntimeException("Bucket Path should not end with /");
}
final String testFile = bucketPath + "/" + "test_" + System.currentTimeMillis();
final StreamTransferManager manager = StreamTransferManagerHelper.getDefault(
bucketName,
testFile,
s3Client,
(long) StreamTransferManagerHelper.DEFAULT_PART_SIZE_MB);
final StreamTransferManager manager = StreamTransferManagerFactory.create(bucketName, testFile, s3Client).get();
boolean success = false;
try (final MultiPartOutputStream outputStream = manager.getMultiPartOutputStreams().get(0);
final CSVPrinter csvPrinter = new CSVPrinter(new PrintWriter(outputStream, true, StandardCharsets.UTF_8), CSVFormat.DEFAULT)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import io.airbyte.commons.string.Strings;
import io.airbyte.integrations.destination.NamingConventionTransformer;
import io.airbyte.integrations.destination.record_buffer.SerializableBuffer;
import io.airbyte.integrations.destination.s3.util.StreamTransferManagerHelper;
import io.airbyte.integrations.destination.s3.util.StreamTransferManagerFactory;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
Expand Down Expand Up @@ -141,8 +141,10 @@ private String loadDataIntoBucket(final String objectPath, final SerializableBuf
for (final BlobDecorator blobDecorator : blobDecorators) {
blobDecorator.updateMetadata(metadata, getMetadataMapping());
}
final StreamTransferManager uploadManager = StreamTransferManagerHelper
.getDefault(bucket, fullObjectKey, s3Client, partSize, metadata)
final StreamTransferManager uploadManager = StreamTransferManagerFactory.create(bucket, fullObjectKey, s3Client)
.setPartSize(partSize)
.setUserMetadata(metadata)
.get()
.checkIntegrity(true)
.numUploadThreads(DEFAULT_UPLOAD_THREADS)
.queueCapacity(DEFAULT_QUEUE_CAPACITY);
Expand Down Expand Up @@ -293,7 +295,7 @@ protected Map<String, String> getMetadataMapping() {
AesCbcEnvelopeEncryptionBlobDecorator.INITIALIZATION_VECTOR, "x-amz-iv");
}

public void uploadManifest(String bucketName, String manifestFilePath, String manifestContents) {
public void uploadManifest(final String bucketName, final String manifestFilePath, final String manifestContents) {
s3Client.putObject(s3Config.getBucketName(), manifestFilePath, manifestContents);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.integrations.destination.s3.S3DestinationConfig;
import io.airbyte.integrations.destination.s3.S3Format;
import io.airbyte.integrations.destination.s3.util.StreamTransferManagerHelper;
import io.airbyte.integrations.destination.s3.util.StreamTransferManagerFactory;
import io.airbyte.integrations.destination.s3.writer.BaseS3Writer;
import io.airbyte.integrations.destination.s3.writer.DestinationFileWriter;
import io.airbyte.protocol.models.AirbyteRecordMessage;
Expand Down Expand Up @@ -54,8 +54,10 @@ public S3AvroWriter(final S3DestinationConfig config,
gcsFileLocation = String.format("gs://%s/%s", config.getBucketName(), objectKey);

this.avroRecordFactory = new AvroRecordFactory(schema, converter);
this.uploadManager = StreamTransferManagerHelper.getDefault(
config.getBucketName(), objectKey, s3Client, config.getFormatConfig().getPartSize());
this.uploadManager = StreamTransferManagerFactory
.create(config.getBucketName(), objectKey, s3Client)
.setPartSize(config.getFormatConfig().getPartSize())
.get();
// We only need one output stream as we only have one input stream. This is reasonably performant.
this.outputStream = uploadManager.getMultiPartOutputStreams().get(0);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.integrations.destination.s3.S3DestinationConfig;
import io.airbyte.integrations.destination.s3.S3Format;
import io.airbyte.integrations.destination.s3.util.StreamTransferManagerHelper;
import io.airbyte.integrations.destination.s3.util.StreamTransferManagerFactory;
import io.airbyte.integrations.destination.s3.writer.BaseS3Writer;
import io.airbyte.integrations.destination.s3.writer.DestinationFileWriter;
import io.airbyte.protocol.models.AirbyteRecordMessage;
Expand Down Expand Up @@ -59,7 +59,10 @@ private S3CsvWriter(final S3DestinationConfig config,
objectKey);
gcsFileLocation = String.format("gs://%s/%s", config.getBucketName(), objectKey);

this.uploadManager = StreamTransferManagerHelper.getDefault(config.getBucketName(), objectKey, s3Client, config.getFormatConfig().getPartSize())
this.uploadManager = StreamTransferManagerFactory
.create(config.getBucketName(), objectKey, s3Client)
.setPartSize(config.getFormatConfig().getPartSize())
.get()
.numUploadThreads(uploadThreads)
.queueCapacity(queueCapacity);
// We only need one output stream as we only have one input stream. This is reasonably performant.
Expand All @@ -76,8 +79,8 @@ public static class Builder {
private final AmazonS3 s3Client;
private final ConfiguredAirbyteStream configuredStream;
private final Timestamp uploadTimestamp;
private int uploadThreads = StreamTransferManagerHelper.DEFAULT_UPLOAD_THREADS;
private int queueCapacity = StreamTransferManagerHelper.DEFAULT_QUEUE_CAPACITY;
private int uploadThreads = StreamTransferManagerFactory.DEFAULT_UPLOAD_THREADS;
private int queueCapacity = StreamTransferManagerFactory.DEFAULT_QUEUE_CAPACITY;
private boolean withHeader = true;
private CSVFormat csvSettings = CSVFormat.DEFAULT.withQuoteMode(QuoteMode.ALL);
private CsvSheetGenerator csvSheetGenerator;
Expand Down
Loading