diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/compression/AutomaticCompressionEngine.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/compression/AutomaticCompressionEngine.java index 0b119f2174..8fce5aab7e 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/compression/AutomaticCompressionEngine.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/compression/AutomaticCompressionEngine.java @@ -5,18 +5,25 @@ package org.opensearch.dataprepper.plugins.source.compression; +import org.opensearch.dataprepper.plugins.source.configuration.CompressionOption; + import java.io.IOException; import java.io.InputStream; -import java.util.zip.GZIPInputStream; public class AutomaticCompressionEngine implements CompressionEngine { @Override public InputStream createInputStream(final String s3Key, final InputStream responseInputStream) throws IOException { + return getCompressionOption(s3Key) + .getEngine() + .createInputStream(s3Key, responseInputStream); + } + + private CompressionOption getCompressionOption(final String s3Key) { if (s3Key.endsWith(".gz")) { - return new GZIPInputStream(responseInputStream); - } - else { - return responseInputStream; + return CompressionOption.GZIP; + } else { + return CompressionOption.NONE; } + } } diff --git a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/compression/AutomaticCompressionEngineTest.java b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/compression/AutomaticCompressionEngineTest.java index 5fcc11d371..464ddb722f 100644 --- a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/compression/AutomaticCompressionEngineTest.java +++ b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/compression/AutomaticCompressionEngineTest.java @@ -5,6 +5,7 @@ package org.opensearch.dataprepper.plugins.source.compression; +import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import software.amazon.awssdk.core.ResponseInputStream; @@ -16,7 +17,6 @@ import java.io.InputStream; import java.nio.charset.StandardCharsets; import java.util.UUID; -import java.util.zip.GZIPInputStream; import java.util.zip.GZIPOutputStream; import static org.hamcrest.CoreMatchers.instanceOf; @@ -38,19 +38,22 @@ void setUp() { responseInputStream = mock(ResponseInputStream.class); } + private AutomaticCompressionEngine createObjectUnderTest() { + return new AutomaticCompressionEngine(); + } + @Test void createInputStream_with_automatic_and_uncompressed_should_return_instance_of_ResponseInputStream() throws IOException { - compressionEngine = new AutomaticCompressionEngine(); when(responseInputStream.response()).thenReturn(mock(GetObjectResponse.class)); - final InputStream inputStream = compressionEngine.createInputStream(s3Key, responseInputStream); + final InputStream inputStream = createObjectUnderTest().createInputStream(s3Key, responseInputStream); assertThat(inputStream, sameInstance(responseInputStream)); verifyNoInteractions(responseInputStream); } @Test void createInputStream_with_automatic_and_compressed_should_return_instance_of_GZIPInputStream() throws IOException { - compressionEngine = new AutomaticCompressionEngine(); + compressionEngine = createObjectUnderTest(); s3Key = s3Key.concat(".gz"); final String testString = UUID.randomUUID().toString(); @@ -65,7 +68,7 @@ void createInputStream_with_automatic_and_compressed_should_return_instance_of_G final InputStream inputStream = compressionEngine.createInputStream(s3Key, byteInStream); - assertThat(inputStream, instanceOf(GZIPInputStream.class)); + assertThat(inputStream, instanceOf(GzipCompressorInputStream.class)); assertThat(inputStream.readAllBytes(), equalTo(testStringBytes)); } }