diff --git a/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/fs/LocalFilePositionOutputStream.java b/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/fs/LocalFilePositionOutputStream.java index de17f3e78f..38fc31b3b6 100644 --- a/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/fs/LocalFilePositionOutputStream.java +++ b/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/fs/LocalFilePositionOutputStream.java @@ -6,49 +6,58 @@ import org.apache.parquet.io.PositionOutputStream; +import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; public class LocalFilePositionOutputStream extends PositionOutputStream { - private final RandomAccessFile stream; - private boolean isClosed = false; + private final File file; + private final RandomAccessFile fileStream; + private boolean closed = false; - public LocalFilePositionOutputStream(RandomAccessFile stream) { - this.stream = stream; + LocalFilePositionOutputStream(final File file, final RandomAccessFile fileStream) { + this.file = file; + this.fileStream = fileStream; + } + + public static LocalFilePositionOutputStream create(final File file) throws IOException { + return new LocalFilePositionOutputStream(file, new RandomAccessFile(file, "rw")); } @Override public long getPos() throws IOException { - if (isClosed) { - return stream.length(); + if (this.closed) { + return file.length(); } - return stream.getFilePointer(); + return fileStream.getFilePointer(); } @Override public void write(byte[] b) throws IOException { - stream.write(b); + fileStream.write(b); } @Override public void write(byte[] b, int off, int len) throws IOException { - stream.write(b, off, len); + fileStream.write(b, off, len); } @Override public void write(int b) throws IOException { - stream.write(b); + fileStream.write(b); } @Override public void close() throws IOException { - stream.close(); - this.isClosed = true; + if (!this.closed) { + fileStream.close(); + this.closed = true; + } } public boolean isClosed() { - return this.isClosed; + return this.closed; } } diff --git a/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/fs/LocalInputFile.java b/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/fs/LocalInputFile.java index e1f85e9087..893bc6cdc6 100644 --- a/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/fs/LocalInputFile.java +++ b/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/fs/LocalInputFile.java @@ -10,7 +10,6 @@ import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; -import java.io.RandomAccessFile; public class LocalInputFile implements InputFile { /** @@ -35,6 +34,6 @@ public long getLength() { @Override public SeekableInputStream newStream() throws IOException { - return new LocalInputStream(new RandomAccessFile(file, "r")); + return LocalInputStream.create(file); } } diff --git a/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/fs/LocalInputStream.java b/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/fs/LocalInputStream.java index 448d543da8..138af813de 100644 --- a/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/fs/LocalInputStream.java +++ b/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/fs/LocalInputStream.java @@ -7,6 +7,7 @@ import org.apache.parquet.io.SeekableInputStream; import java.io.EOFException; +import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; import java.nio.ByteBuffer; @@ -20,10 +21,14 @@ public class LocalInputStream extends SeekableInputStream { private long markPos = 0; - public LocalInputStream(final RandomAccessFile input) { + LocalInputStream(final RandomAccessFile input) { this.input = input; } + public static LocalInputStream create(final File file) throws IOException { + return new LocalInputStream(new RandomAccessFile(file, "r")); + } + @Override public int read() throws IOException { return input.read(); diff --git a/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/fs/LocalOutputFile.java b/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/fs/LocalOutputFile.java index b365309652..debcd1d6dc 100644 --- a/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/fs/LocalOutputFile.java +++ b/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/fs/LocalOutputFile.java @@ -10,7 +10,6 @@ import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; -import java.io.RandomAccessFile; public class LocalOutputFile implements OutputFile { @@ -30,7 +29,7 @@ public PositionOutputStream create(long blockSizeHint) throws IOException { } try { - return new LocalFilePositionOutputStream(new RandomAccessFile(file, "rw")); + return LocalFilePositionOutputStream.create(file); } catch (FileNotFoundException e) { throw new IOException("Failed to create file: " + file.toString(), e); } diff --git a/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/fs/LocalFilePositionOutputStreamTest.java b/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/fs/LocalFilePositionOutputStreamTest.java index e9fc7a4966..7cc735b23e 100644 --- a/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/fs/LocalFilePositionOutputStreamTest.java +++ b/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/fs/LocalFilePositionOutputStreamTest.java @@ -1,74 +1,98 @@ package org.opensearch.dataprepper.plugins.fs; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.opensearch.dataprepper.plugins.fs.LocalFilePositionOutputStream; +import java.io.File; import java.io.IOException; -import java.io.RandomAccessFile; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; public class LocalFilePositionOutputStreamTest { - private RandomAccessFile mockStream; + private File testDataFile; private LocalFilePositionOutputStream outputStream; @BeforeEach public void setup() throws IOException { - mockStream = mock(RandomAccessFile.class); - outputStream = new LocalFilePositionOutputStream(mockStream); + testDataFile = File.createTempFile( "LocalFilePositionOutputStreamTest-", "txt"); + testDataFile.deleteOnExit(); + outputStream = LocalFilePositionOutputStream.create(testDataFile); + } + + @AfterEach + public void tearDown() throws IOException { + outputStream.close(); } @Test public void getPos_notClosed_returnsFilePointer() throws IOException { - long mockPos = 100L; - when(mockStream.getFilePointer()).thenReturn(mockPos); + final String inputString = "a".repeat(100); + final byte[] inputBytes = inputString.getBytes(StandardCharsets.UTF_8); - assertEquals(mockPos, outputStream.getPos()); - verify(mockStream, times(1)).getFilePointer(); + outputStream.write(inputBytes); + + assertEquals(100, outputStream.getPos()); } @Test public void getPos_closed_returnsFileLength() throws IOException { - long mockPos = 200L; - when(mockStream.length()).thenReturn(mockPos); + final String inputString = "a".repeat(100); + final byte[] inputBytes = inputString.getBytes(StandardCharsets.UTF_8); + + outputStream.write(inputBytes); outputStream.close(); - assertEquals(mockPos, outputStream.getPos()); - verify(mockStream, times(1)).length(); - verify(mockStream, times(1)).close(); + assertEquals(100, outputStream.getPos()); } @Test public void write_bytes_passesThroughToStream() throws IOException { - byte[] data = new byte[] { 0, 1, 2, 3 }; - outputStream.write(data); + final String inputString = "a".repeat(100); + final byte[] inputBytes = inputString.getBytes(StandardCharsets.UTF_8); + + outputStream.write(inputBytes); + outputStream.close(); + + String actualContent = Files.readString(testDataFile.toPath()); - verify(mockStream, times(1)).write(data); + assertEquals(inputString, actualContent); } @Test public void write_bytesOffsetLength_passesThroughToStream() throws IOException { - byte[] data = new byte[] { 0, 1, 2, 3 }; - int offset = 1; - int length = 2; - outputStream.write(data, offset, length); + final String inputString = "a".repeat(100); + final byte[] inputBytes = inputString.getBytes(StandardCharsets.UTF_8); + final int offset = 1; + final int length = 2; - verify(mockStream, times(1)).write(data, offset, length); + outputStream.write(inputBytes, offset, length); + + outputStream.close(); + + final String actualContent = Files.readString(testDataFile.toPath()); + + assertEquals("aa", actualContent); } @Test public void write_int_passesThroughToStream() throws IOException { int data = 123; + outputStream.write(data); - verify(mockStream, times(1)).write(data); + outputStream.close(); + + final String stringContent = Files.readString(testDataFile.toPath()); + final byte[] bytesContent = stringContent.getBytes(StandardCharsets.UTF_8); + int actualContent = bytesContent[0] & 0xFF; + + + assertEquals(data, actualContent); } @Test @@ -76,6 +100,5 @@ public void close_closesStreamAndSetsClosed() throws IOException { outputStream.close(); assertTrue(outputStream.isClosed()); - verify(mockStream, times(1)).close(); } } diff --git a/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/fs/LocalInputFileTest.java b/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/fs/LocalInputFileTest.java index ae746d0be3..9182098a09 100644 --- a/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/fs/LocalInputFileTest.java +++ b/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/fs/LocalInputFileTest.java @@ -5,21 +5,14 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.opensearch.dataprepper.plugins.fs.LocalInputFile; -import org.opensearch.dataprepper.plugins.fs.LocalInputStream; import java.io.BufferedWriter; import java.io.File; import java.io.FileWriter; import java.io.IOException; -import java.io.RandomAccessFile; import java.nio.charset.StandardCharsets; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; public class LocalInputFileTest { diff --git a/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/fs/LocalInputStreamTest.java b/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/fs/LocalInputStreamTest.java index 2ec78fc91d..a5c7bb025d 100644 --- a/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/fs/LocalInputStreamTest.java +++ b/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/fs/LocalInputStreamTest.java @@ -2,28 +2,31 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.opensearch.dataprepper.plugins.fs.LocalInputStream; import java.io.EOFException; -import java.io.RandomAccessFile; +import java.io.File; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; public class LocalInputStreamTest { - private RandomAccessFile mockInput; + private File testDataFile; private LocalInputStream localInputStream; @BeforeEach public void setup() throws Exception { - mockInput = mock(RandomAccessFile.class); - localInputStream = new LocalInputStream(mockInput); + testDataFile = File.createTempFile( "LocalFilePositionOutputStreamTest-", "txt"); + testDataFile.deleteOnExit(); + + final String inputString = "a".repeat(100); + final byte[] inputBytes = inputString.getBytes(StandardCharsets.UTF_8); + Files.write(testDataFile.toPath(), inputBytes); + + localInputStream = LocalInputStream.create(testDataFile); } @Test @@ -33,132 +36,100 @@ public void read_byteArray_offset_length_callsInputRead() throws Exception { int length = 5; localInputStream.read(buffer, offset, length); - verify(mockInput, times(1)).read(buffer, offset, length); + final String actualContent = new String(buffer, StandardCharsets.UTF_8); + + assertEquals("aaaaa", actualContent); } @Test public void skip_skipsTheGivenAmountAndReturnsTheDifferenceInPosition() throws Exception { - when(mockInput.getFilePointer()).thenReturn(5L).thenReturn(10L); - when(mockInput.length()).thenReturn(20L); - assertEquals(5, localInputStream.skip(5)); - - verify(mockInput, times(2)).getFilePointer(); - verify(mockInput, times(1)).length(); - verify(mockInput, times(1)).seek(10L); } @Test public void mark_setsMarkPos() throws Exception { - when(mockInput.getFilePointer()).thenReturn(10L); + assertEquals(10, localInputStream.skip(10)); localInputStream.mark(0); assertEquals(10L, localInputStream.getMarkedPos()); } @Test public void reset_resetsToMarkPos() throws Exception { - when(mockInput.getFilePointer()).thenReturn(10L); - localInputStream.mark(100); + assertEquals(10, localInputStream.skip(10)); + localInputStream.mark(0); + localInputStream.reset(); - verify(mockInput, times(1)).seek(10L); + assertEquals(10, localInputStream.getPos()); } @Test public void getPos_returnsCurrentPos() throws Exception { - when(mockInput.getFilePointer()).thenReturn(10L); - assertEquals(10L, localInputStream.getPos()); + assertEquals(10, localInputStream.skip(10)); + assertEquals(10, localInputStream.getPos()); } @Test public void seek_changesPosition() throws Exception { localInputStream.seek(10L); - verify(mockInput, times(1)).seek(10L); + assertEquals(10, localInputStream.getPos()); } @Test public void readFully_bytes_callsInputReadFully() throws Exception { byte[] buffer = new byte[5]; localInputStream.readFully(buffer); - verify(mockInput, times(1)).readFully(buffer); + + assertEquals("aaaaa", new String(buffer, StandardCharsets.UTF_8)); } @Test public void read_singleByte_callsInputRead() throws Exception { - localInputStream.read(); - verify(mockInput, times(1)).read(); + int byteRead = localInputStream.read(); + byte b = (byte) (byteRead & 0xFF); + + assertEquals("a", new String(new byte[]{ b }, StandardCharsets.UTF_8)); + } @Test public void read_byteArray_callsInputRead() throws Exception { byte[] buffer = new byte[5]; localInputStream.read(buffer); - verify(mockInput, times(1)).read(buffer); + + assertEquals("aaaaa", new String(buffer, StandardCharsets.UTF_8)); } @Test public void readFully_bytes_offset_length_callsInputReadFully() throws Exception { byte[] buffer = new byte[5]; localInputStream.readFully(buffer, 0, 5); - verify(mockInput, times(1)).readFully(buffer, 0, 5); + + assertEquals("aaaaa", new String(buffer, StandardCharsets.UTF_8)); } @Test public void readDirectBuffer_readsIntoByteBuffer() throws Exception { - ByteBuffer buffer = ByteBuffer.allocate(8192); - byte[] page = new byte[10]; - byte[] data = "Some Data".getBytes(); - System.arraycopy(data, 0, page, 0, data.length); - - when(mockInput.read(page, 0, Math.min(buffer.remaining(), page.length))) - .thenAnswer(invocation -> { - byte[] output = invocation.getArgument(0); - int len = Math.min(output.length, data.length); - System.arraycopy(data, 0, output, 0, len); - return len; - }); - + ByteBuffer buffer = ByteBuffer.allocate(100); localInputStream.read(buffer); - buffer.flip(); - assertEquals("Some Data", new String(buffer.array(), 0, buffer.limit())); + + final String expectedContent = "a".repeat(100); + assertEquals(expectedContent, new String(buffer.array(), StandardCharsets.UTF_8)); } @Test public void readFullyDirectBuffer_readsFullyIntoByteBuffer() throws Exception { - ByteBuffer buffer = ByteBuffer.allocate(8192); - byte[] page = new byte[10]; - byte[] data = "Some Data".getBytes(); - System.arraycopy(data, 0, page, 0, data.length); - - when(mockInput.read(page, 0, Math.min(buffer.remaining(), page.length))) - .thenAnswer(invocation -> { - byte[] output = invocation.getArgument(0); - int len = Math.min(output.length, data.length); - System.arraycopy(data, 0, output, 0, len); - return len; - }); - + ByteBuffer buffer = ByteBuffer.allocate(100); localInputStream.readFully(buffer); - buffer.flip(); - assertEquals("Some Data", new String(buffer.array(), 0, buffer.limit())); + + final String expectedContent = "a".repeat(100); + assertEquals(expectedContent, new String(buffer.array(), StandardCharsets.UTF_8)); } @Test public void readFullyDirectBuffer_eofBeforeFullRead_throwsEOFException() throws Exception { ByteBuffer buffer = ByteBuffer.allocate(8192); - byte[] page = new byte[10]; - byte[] data = "Short".getBytes(); - System.arraycopy(data, 0, page, 0, data.length); - - when(mockInput.read(page, 0, Math.min(buffer.remaining(), page.length))) - .thenAnswer(invocation -> { - byte[] output = invocation.getArgument(0); - int len = Math.min(output.length, data.length); - System.arraycopy(data, 0, output, 0, len); - return len; - }) - .thenReturn(-1); assertThrows(EOFException.class, () -> localInputStream.readFully(buffer)); } diff --git a/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/fs/LocalOutputFileTest.java b/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/fs/LocalOutputFileTest.java index d9e0fcd899..f3493285f4 100644 --- a/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/fs/LocalOutputFileTest.java +++ b/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/fs/LocalOutputFileTest.java @@ -4,7 +4,6 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import org.mockito.Mockito; -import org.opensearch.dataprepper.plugins.fs.LocalOutputFile; import java.io.File; import java.io.IOException;