Skip to content

Commit

Permalink
Fix tests and style errors in local file implementations.
Browse files Browse the repository at this point in the history
  • Loading branch information
asuresh8 committed May 23, 2023
1 parent 0589c65 commit aa9c5ae
Show file tree
Hide file tree
Showing 8 changed files with 123 additions and 125 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,49 +6,58 @@

import org.apache.parquet.io.PositionOutputStream;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;

public class LocalFilePositionOutputStream extends PositionOutputStream {

private final RandomAccessFile stream;
private boolean isClosed = false;
private final File file;
private final RandomAccessFile fileStream;
private boolean closed = false;

public LocalFilePositionOutputStream(RandomAccessFile stream) {
this.stream = stream;
LocalFilePositionOutputStream(final File file, final RandomAccessFile fileStream) {
this.file = file;
this.fileStream = fileStream;
}

public static LocalFilePositionOutputStream create(final File file) throws IOException {
return new LocalFilePositionOutputStream(file, new RandomAccessFile(file, "rw"));
}

@Override
public long getPos() throws IOException {
if (isClosed) {
return stream.length();
if (this.closed) {
return file.length();
}
return stream.getFilePointer();
return fileStream.getFilePointer();
}

@Override
public void write(byte[] b) throws IOException {
stream.write(b);
fileStream.write(b);
}

@Override
public void write(byte[] b, int off, int len) throws IOException {
stream.write(b, off, len);
fileStream.write(b, off, len);
}

@Override
public void write(int b) throws IOException {
stream.write(b);
fileStream.write(b);
}

@Override
public void close() throws IOException {
stream.close();
this.isClosed = true;
if (!this.closed) {
fileStream.close();
this.closed = true;
}
}

public boolean isClosed() {
return this.isClosed;
return this.closed;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;

public class LocalInputFile implements InputFile {
/**
Expand All @@ -35,6 +34,6 @@ public long getLength() {

@Override
public SeekableInputStream newStream() throws IOException {
return new LocalInputStream(new RandomAccessFile(file, "r"));
return LocalInputStream.create(file);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import org.apache.parquet.io.SeekableInputStream;

import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
Expand All @@ -20,10 +21,14 @@ public class LocalInputStream extends SeekableInputStream {
private long markPos = 0;


public LocalInputStream(final RandomAccessFile input) {
LocalInputStream(final RandomAccessFile input) {
this.input = input;
}

public static LocalInputStream create(final File file) throws IOException {
return new LocalInputStream(new RandomAccessFile(file, "r"));
}

@Override
public int read() throws IOException {
return input.read();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;

public class LocalOutputFile implements OutputFile {

Expand All @@ -30,7 +29,7 @@ public PositionOutputStream create(long blockSizeHint) throws IOException {
}

try {
return new LocalFilePositionOutputStream(new RandomAccessFile(file, "rw"));
return LocalFilePositionOutputStream.create(file);
} catch (FileNotFoundException e) {
throw new IOException("Failed to create file: " + file.toString(), e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,81 +1,104 @@
package org.opensearch.dataprepper.plugins.fs;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.opensearch.dataprepper.plugins.fs.LocalFilePositionOutputStream;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class LocalFilePositionOutputStreamTest {

private RandomAccessFile mockStream;
private File testDataFile;
private LocalFilePositionOutputStream outputStream;

@BeforeEach
public void setup() throws IOException {
mockStream = mock(RandomAccessFile.class);
outputStream = new LocalFilePositionOutputStream(mockStream);
testDataFile = File.createTempFile( "LocalFilePositionOutputStreamTest-", "txt");
testDataFile.deleteOnExit();
outputStream = LocalFilePositionOutputStream.create(testDataFile);
}

@AfterEach
public void tearDown() throws IOException {
outputStream.close();
}

@Test
public void getPos_notClosed_returnsFilePointer() throws IOException {
long mockPos = 100L;
when(mockStream.getFilePointer()).thenReturn(mockPos);
final String inputString = "a".repeat(100);
final byte[] inputBytes = inputString.getBytes(StandardCharsets.UTF_8);

assertEquals(mockPos, outputStream.getPos());
verify(mockStream, times(1)).getFilePointer();
outputStream.write(inputBytes);

assertEquals(100, outputStream.getPos());
}

@Test
public void getPos_closed_returnsFileLength() throws IOException {
long mockPos = 200L;
when(mockStream.length()).thenReturn(mockPos);
final String inputString = "a".repeat(100);
final byte[] inputBytes = inputString.getBytes(StandardCharsets.UTF_8);

outputStream.write(inputBytes);
outputStream.close();

assertEquals(mockPos, outputStream.getPos());
verify(mockStream, times(1)).length();
verify(mockStream, times(1)).close();
assertEquals(100, outputStream.getPos());
}

@Test
public void write_bytes_passesThroughToStream() throws IOException {
byte[] data = new byte[] { 0, 1, 2, 3 };
outputStream.write(data);
final String inputString = "a".repeat(100);
final byte[] inputBytes = inputString.getBytes(StandardCharsets.UTF_8);

outputStream.write(inputBytes);
outputStream.close();

String actualContent = Files.readString(testDataFile.toPath());

verify(mockStream, times(1)).write(data);
assertEquals(inputString, actualContent);
}

@Test
public void write_bytesOffsetLength_passesThroughToStream() throws IOException {
byte[] data = new byte[] { 0, 1, 2, 3 };
int offset = 1;
int length = 2;
outputStream.write(data, offset, length);
final String inputString = "a".repeat(100);
final byte[] inputBytes = inputString.getBytes(StandardCharsets.UTF_8);
final int offset = 1;
final int length = 2;

verify(mockStream, times(1)).write(data, offset, length);
outputStream.write(inputBytes, offset, length);

outputStream.close();

final String actualContent = Files.readString(testDataFile.toPath());

assertEquals("aa", actualContent);
}

@Test
public void write_int_passesThroughToStream() throws IOException {
int data = 123;

outputStream.write(data);

verify(mockStream, times(1)).write(data);
outputStream.close();

final String stringContent = Files.readString(testDataFile.toPath());
final byte[] bytesContent = stringContent.getBytes(StandardCharsets.UTF_8);
int actualContent = bytesContent[0] & 0xFF;


assertEquals(data, actualContent);
}

@Test
public void close_closesStreamAndSetsClosed() throws IOException {
outputStream.close();

assertTrue(outputStream.isClosed());
verify(mockStream, times(1)).close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,14 @@
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.opensearch.dataprepper.plugins.fs.LocalInputFile;
import org.opensearch.dataprepper.plugins.fs.LocalInputStream;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class LocalInputFileTest {

Expand Down
Loading

0 comments on commit aa9c5ae

Please sign in to comment.