Skip to content

Commit

Permalink
Fix: Updated GZipCompressionEngine to use GzipCompressorInputStream (#1570) (#1572)
Browse files Browse the repository at this point in the history

Signed-off-by: Asif Sohail Mohammed <[email protected]>
(cherry picked from commit bde1b52)

Co-authored-by: Asif Sohail Mohammed <[email protected]>
  • Loading branch information
1 parent 27f853b commit 13ab3f0
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 7 deletions.
1 change: 1 addition & 0 deletions data-prepper-plugins/s3-source/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ dependencies {
implementation 'software.amazon.awssdk:sts'
implementation 'software.amazon.awssdk:sqs'
implementation 'com.amazonaws:aws-java-sdk-s3:1.12.220'
implementation 'org.apache.commons:commons-compress:1.21'
implementation 'org.hibernate.validator:hibernate-validator:7.0.4.Final'
testImplementation 'org.apache.commons:commons-lang3:3.12.0'
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ public NewlineDelimitedCodec(final NewlineDelimitedConfig config) {

/**
 * Parses the given input stream by wrapping it in a {@link BufferedReader} and delegating to
 * {@code parseBufferedReader}.
 *
 * @param inputStream   the stream to read from
 * @param eventConsumer passed through unchanged to {@code parseBufferedReader}
 * @throws IOException propagated from the reader or from {@code parseBufferedReader}
 */
@Override
public void parse(final InputStream inputStream, final Consumer<Record<Event>> eventConsumer) throws IOException {
// NOTE(review): this is a rendered diff without +/- markers. The next three lines (reader declaration,
// blank line, direct call) are the pre-change version, in which this method never closes the reader.
final BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));

parseBufferedReader(reader, eventConsumer);
// Post-change version: try-with-resources closes the reader, and with it the wrapped input stream,
// whether parsing completes normally or throws.
try (final BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) {
parseBufferedReader(reader, eventConsumer);
}
}

private void parseBufferedReader(final BufferedReader reader, final Consumer<Record<Event>> eventConsumer) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,17 @@

package com.amazon.dataprepper.plugins.source.compression;

import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;

import java.io.IOException;
import java.io.InputStream;
import java.util.zip.GZIPInputStream;

/**
 * {@link CompressionEngine} implementation that wraps a response stream in a gzip-decompressing stream.
 */
public class GZipCompressionEngine implements CompressionEngine {
/**
 * Returns a decompressing stream layered over {@code responseInputStream}.
 *
 * @param s3Key               S3 object key (presumably of the object being read); unused in this method
 * @param responseInputStream the gzip-compressed stream to wrap
 * @return a stream that yields the decompressed bytes
 * @throws IOException propagated from constructing the decompressing stream
 */
@Override
public InputStream createInputStream(final String s3Key, final InputStream responseInputStream) throws IOException {
// NOTE(review): this is a rendered diff without +/- markers; the next line is the pre-change return
// that this commit removes.
return new GZIPInputStream(responseInputStream);
// GzipCompressorInputStream is used here instead of GZIPInputStream because, as described in the
// Commons Compress documentation, GZIPInputStream does not decompress concatenated .gz files: it stops
// after the first member, silently ignores the rest, and does not leave the read position at the
// beginning of the next member.
// The second constructor argument (true) requests decompression of all concatenated members.
return new GzipCompressorInputStream(responseInputStream, true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package com.amazon.dataprepper.plugins.source.compression;

import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.core.ResponseInputStream;
Expand All @@ -16,7 +17,6 @@
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

import static org.hamcrest.CoreMatchers.instanceOf;
Expand Down Expand Up @@ -51,7 +51,7 @@ void createInputStream_with_gzip_should_return_instance_of_GZIPInputStream() thr

final InputStream inputStream = compressionEngine.createInputStream(s3Key, byteInStream);

assertThat(inputStream, instanceOf(GZIPInputStream.class));
assertThat(inputStream, instanceOf(GzipCompressorInputStream.class));
assertThat(inputStream.readAllBytes(), equalTo(testStringBytes));
}
}

0 comments on commit 13ab3f0

Please sign in to comment.