diff --git a/data-prepper-plugins/s3-source/build.gradle b/data-prepper-plugins/s3-source/build.gradle index f53a94459b..f6b8c5485b 100644 --- a/data-prepper-plugins/s3-source/build.gradle +++ b/data-prepper-plugins/s3-source/build.gradle @@ -18,6 +18,7 @@ dependencies { implementation 'software.amazon.awssdk:sts' implementation 'software.amazon.awssdk:sqs' implementation 'com.amazonaws:aws-java-sdk-s3:1.12.220' + implementation 'org.apache.commons:commons-compress:1.21' implementation 'org.hibernate.validator:hibernate-validator:7.0.4.Final' testImplementation 'org.apache.commons:commons-lang3:3.12.0' } diff --git a/data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/codec/NewlineDelimitedCodec.java b/data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/codec/NewlineDelimitedCodec.java index aa65823cb9..f99d3854c1 100644 --- a/data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/codec/NewlineDelimitedCodec.java +++ b/data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/codec/NewlineDelimitedCodec.java @@ -36,9 +36,9 @@ public NewlineDelimitedCodec(final NewlineDelimitedConfig config) { @Override public void parse(final InputStream inputStream, final Consumer> eventConsumer) throws IOException { - final BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream)); - - parseBufferedReader(reader, eventConsumer); + try (final BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) { + parseBufferedReader(reader, eventConsumer); + } } private void parseBufferedReader(final BufferedReader reader, final Consumer> eventConsumer) throws IOException { diff --git a/data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/compression/GZipCompressionEngine.java b/data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/compression/GZipCompressionEngine.java index 
dcbe7e7738..bf448930ff 100644 --- a/data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/compression/GZipCompressionEngine.java +++ b/data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/compression/GZipCompressionEngine.java @@ -5,13 +5,17 @@ package com.amazon.dataprepper.plugins.source.compression; +import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream; + import java.io.IOException; import java.io.InputStream; -import java.util.zip.GZIPInputStream; public class GZipCompressionEngine implements CompressionEngine { @Override public InputStream createInputStream(final String s3Key, final InputStream responseInputStream) throws IOException { - return new GZIPInputStream(responseInputStream); + // We use GzipCompressorInputStream here because GZIPInputStream doesn't decompress concatenated .gz files: + // it stops after the first member and silently ignores the rest, + // and it does not leave the read position at the beginning of the next member. 
+ return new GzipCompressorInputStream(responseInputStream, true); } } diff --git a/data-prepper-plugins/s3-source/src/test/java/com/amazon/dataprepper/plugins/source/compression/GZipCompressionEngineTest.java b/data-prepper-plugins/s3-source/src/test/java/com/amazon/dataprepper/plugins/source/compression/GZipCompressionEngineTest.java index 35a0cd7a85..45e47b5427 100644 --- a/data-prepper-plugins/s3-source/src/test/java/com/amazon/dataprepper/plugins/source/compression/GZipCompressionEngineTest.java +++ b/data-prepper-plugins/s3-source/src/test/java/com/amazon/dataprepper/plugins/source/compression/GZipCompressionEngineTest.java @@ -5,6 +5,7 @@ package com.amazon.dataprepper.plugins.source.compression; +import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import software.amazon.awssdk.core.ResponseInputStream; @@ -16,7 +17,6 @@ import java.io.InputStream; import java.nio.charset.StandardCharsets; import java.util.UUID; -import java.util.zip.GZIPInputStream; import java.util.zip.GZIPOutputStream; import static org.hamcrest.CoreMatchers.instanceOf; @@ -51,7 +51,7 @@ void createInputStream_with_gzip_should_return_instance_of_GZIPInputStream() thr final InputStream inputStream = compressionEngine.createInputStream(s3Key, byteInStream); - assertThat(inputStream, instanceOf(GZIPInputStream.class)); + assertThat(inputStream, instanceOf(GzipCompressorInputStream.class)); assertThat(inputStream.readAllBytes(), equalTo(testStringBytes)); } }