Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix exception hanlding #15

Merged
merged 2 commits into from
Jun 20, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 62 additions & 58 deletions paldb/src/main/java/com/linkedin/paldb/impl/StorageReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -106,78 +106,82 @@ public class StorageReader implements Iterable<Map.Entry<byte[], byte[]>> {
}

//Open file and read metadata
long createdAt = 0;
FormatVersion formatVersion = null;
FileInputStream inputStream = new FileInputStream(path);
DataInputStream dataInputStream = new DataInputStream(new BufferedInputStream(inputStream));
int ignoredBytes = -2;

//Byte mark
byte[] mark = FormatVersion.getPrefixBytes();
int found = 0;
while (found != mark.length) {
byte b = dataInputStream.readByte();
if (b == mark[found]) {
found++;
} else {
ignoredBytes += found + 1;
found = 0;
try {
int ignoredBytes = -2;

//Byte mark
byte[] mark = FormatVersion.getPrefixBytes();
int found = 0;
while (found != mark.length) {
byte b = dataInputStream.readByte();
if (b == mark[found]) {
found++;
} else {
ignoredBytes += found + 1;
found = 0;
}
}
}

//Version
byte[] versionFound = Arrays.copyOf(mark, FormatVersion.getLatestVersion().getBytes().length);
dataInputStream.readFully(versionFound, mark.length, versionFound.length - mark.length);
//Version
byte[] versionFound = Arrays.copyOf(mark, FormatVersion.getLatestVersion().getBytes().length);
dataInputStream.readFully(versionFound, mark.length, versionFound.length - mark.length);

FormatVersion formatVersion = FormatVersion.fromBytes(versionFound);
if (formatVersion == null || !formatVersion.is(FormatVersion.getLatestVersion())) {
throw new RuntimeException(
"Version mismatch, expected was '" + FormatVersion.getLatestVersion() + "' and found '" + formatVersion
+ "'");
}

//Time
long createdAt = dataInputStream.readLong();
formatVersion = FormatVersion.fromBytes(versionFound);
if (formatVersion == null || !formatVersion.is(FormatVersion.getLatestVersion())) {
throw new RuntimeException(
"Version mismatch, expected was '" + FormatVersion.getLatestVersion() + "' and found '" + formatVersion
+ "'");
}

//Metadata counters
keyCount = dataInputStream.readInt();
keyLengthCount = dataInputStream.readInt();
maxKeyLength = dataInputStream.readInt();
//Time
createdAt = dataInputStream.readLong();

//Read offset counts and keys
indexOffsets = new int[maxKeyLength + 1];
dataOffsets = new long[maxKeyLength + 1];
keyCounts = new int[maxKeyLength + 1];
slots = new int[maxKeyLength + 1];
slotSizes = new int[maxKeyLength + 1];
//Metadata counters
keyCount = dataInputStream.readInt();
keyLengthCount = dataInputStream.readInt();
maxKeyLength = dataInputStream.readInt();

int maxSlotSize = 0;
for (int i = 0; i < keyLengthCount; i++) {
int keyLength = dataInputStream.readInt();
//Read offset counts and keys
indexOffsets = new int[maxKeyLength + 1];
dataOffsets = new long[maxKeyLength + 1];
keyCounts = new int[maxKeyLength + 1];
slots = new int[maxKeyLength + 1];
slotSizes = new int[maxKeyLength + 1];

keyCounts[keyLength] = dataInputStream.readInt();
slots[keyLength] = dataInputStream.readInt();
slotSizes[keyLength] = dataInputStream.readInt();
indexOffsets[keyLength] = dataInputStream.readInt();
dataOffsets[keyLength] = dataInputStream.readLong();
int maxSlotSize = 0;
for (int i = 0; i < keyLengthCount; i++) {
int keyLength = dataInputStream.readInt();

maxSlotSize = Math.max(maxSlotSize, slotSizes[keyLength]);
}
keyCounts[keyLength] = dataInputStream.readInt();
slots[keyLength] = dataInputStream.readInt();
slotSizes[keyLength] = dataInputStream.readInt();
indexOffsets[keyLength] = dataInputStream.readInt();
dataOffsets[keyLength] = dataInputStream.readLong();

slotBuffer = new byte[maxSlotSize];
maxSlotSize = Math.max(maxSlotSize, slotSizes[keyLength]);
}

//Read serializers
try {
Serializers.deserialize(dataInputStream, config.getSerializers());
} catch (Exception e) {
throw new RuntimeException();
}
slotBuffer = new byte[maxSlotSize];

//Read index and data offset
indexOffset = dataInputStream.readInt() + ignoredBytes;
dataOffset = dataInputStream.readLong() + ignoredBytes;
//Read serializers
try {
Serializers.deserialize(dataInputStream, config.getSerializers());
} catch (Exception e) {
throw new RuntimeException();
}

//Close metadata
dataInputStream.close();
inputStream.close();
//Read index and data offset
indexOffset = dataInputStream.readInt() + ignoredBytes;
dataOffset = dataInputStream.readLong() + ignoredBytes;
} finally {
//Close metadata
dataInputStream.close();
inputStream.close();
}

//Create Mapped file in read-only mode
mappedFile = new RandomAccessFile(path, "r");
Expand Down
145 changes: 74 additions & 71 deletions paldb/src/main/java/com/linkedin/paldb/impl/StorageWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,7 @@ public class StorageWriter {
// Number of collisions
private int collisions;

StorageWriter(Configuration configuration, OutputStream stream)
throws IOException {
StorageWriter(Configuration configuration, OutputStream stream) {
config = configuration;
loadFactor = config.getDouble(Configuration.LOAD_FACTOR);
if (loadFactor <= 0.0 || loadFactor >= 1.0) {
Expand Down Expand Up @@ -279,78 +278,81 @@ private File buildIndex(int keyLength)
// Init index
File indexFile = new File(tempFolder, "index" + keyLength + ".dat");
RandomAccessFile indexAccessFile = new RandomAccessFile(indexFile, "rw");
indexAccessFile.setLength(slots * slotSize);
FileChannel indexChannel = indexAccessFile.getChannel();
MappedByteBuffer byteBuffer = indexChannel.map(FileChannel.MapMode.READ_WRITE, 0, indexAccessFile.length());

// Init reading stream
File tempIndexFile = indexFiles[keyLength];
DataInputStream tempIndexStream = new DataInputStream(new BufferedInputStream(new FileInputStream(tempIndexFile)));
try {
byte[] keyBuffer = new byte[keyLength];
byte[] slotBuffer = new byte[slotSize];
byte[] offsetBuffer = new byte[offsetLength];

// Read all keys
for (int i = 0; i < count; i++) {
// Read key
tempIndexStream.readFully(keyBuffer);

// Read offset
long offset = LongPacker.unpackLong(tempIndexStream);

// Hash
long hash = (long) HashUtils.hash(keyBuffer);

boolean collision = false;
for (int probe = 0; probe < count; probe++) {
int slot = (int) ((hash + probe) % slots);
byteBuffer.position(slot * slotSize);
byteBuffer.get(slotBuffer);

long found = LongPacker.unpackLong(slotBuffer, keyLength);
if (found == 0) {
// The spot is empty use it
indexAccessFile.setLength(slots * slotSize);
FileChannel indexChannel = indexAccessFile.getChannel();
MappedByteBuffer byteBuffer = indexChannel.map(FileChannel.MapMode.READ_WRITE, 0, indexAccessFile.length());

// Init reading stream
File tempIndexFile = indexFiles[keyLength];
DataInputStream tempIndexStream = new DataInputStream(new BufferedInputStream(new FileInputStream(tempIndexFile)));
try {
byte[] keyBuffer = new byte[keyLength];
byte[] slotBuffer = new byte[slotSize];
byte[] offsetBuffer = new byte[offsetLength];

// Read all keys
for (int i = 0; i < count; i++) {
// Read key
tempIndexStream.readFully(keyBuffer);

// Read offset
long offset = LongPacker.unpackLong(tempIndexStream);

// Hash
long hash = (long) HashUtils.hash(keyBuffer);

boolean collision = false;
for (int probe = 0; probe < count; probe++) {
int slot = (int) ((hash + probe) % slots);
byteBuffer.position(slot * slotSize);
byteBuffer.put(keyBuffer);
int pos = LongPacker.packLong(offsetBuffer, offset);
byteBuffer.put(offsetBuffer, 0, pos);
break;
} else {
collision = true;
// Check for duplicates
if (Arrays.equals(keyBuffer, Arrays.copyOf(slotBuffer, keyLength))) {
throw new RuntimeException(
String.format("A duplicate key has been found for for key bytes %s", Arrays.toString(keyBuffer)));
byteBuffer.get(slotBuffer);

long found = LongPacker.unpackLong(slotBuffer, keyLength);
if (found == 0) {
// The spot is empty use it
byteBuffer.position(slot * slotSize);
byteBuffer.put(keyBuffer);
int pos = LongPacker.packLong(offsetBuffer, offset);
byteBuffer.put(offsetBuffer, 0, pos);
break;
} else {
collision = true;
// Check for duplicates
if (Arrays.equals(keyBuffer, Arrays.copyOf(slotBuffer, keyLength))) {
throw new RuntimeException(
String.format("A duplicate key has been found for for key bytes %s", Arrays.toString(keyBuffer)));
}
}
}
}

if (collision) {
collisions++;
if (collision) {
collisions++;
}
}
}

String msg = " Max offset length: " + offsetLength + " bytes" +
"\n Slot size: " + slotSize + " bytes";
String msg = " Max offset length: " + offsetLength + " bytes" +
"\n Slot size: " + slotSize + " bytes";

LOGGER.log(Level.INFO, "Built index file {0}\n" + msg, indexFile.getName());
} finally {
// Close input
tempIndexStream.close();
LOGGER.log(Level.INFO, "Built index file {0}\n" + msg, indexFile.getName());
} finally {
// Close input
tempIndexStream.close();

// Close index and make sure resources are liberated
indexChannel.close();
indexChannel = null;
byteBuffer = null;

// Close index and make sure resources are liberated
indexChannel.close();
// Delete temp index file
if (tempIndexFile.delete()) {
LOGGER.log(Level.INFO, "Temporary index file {0} has been deleted", tempIndexFile.getName());
}
}
} finally{
indexAccessFile.close();
indexChannel = null;
indexAccessFile = null;
byteBuffer = null;
System.gc();

// Delete temp index file
if (tempIndexFile.delete()) {
LOGGER.log(Level.INFO, "Temporary index file {0} has been deleted", tempIndexFile.getName());
}
}

return indexFile;
Expand Down Expand Up @@ -386,17 +388,18 @@ private void mergeFiles(List<File> inputFiles, OutputStream outputStream)
if (f.exists()) {
FileInputStream fileInputStream = new FileInputStream(f);
BufferedInputStream bufferedInputStream = new BufferedInputStream(fileInputStream);
try {
LOGGER.log(Level.INFO, "Merging {0} size={1}", new Object[]{f.getName(), f.length()});

LOGGER.log(Level.INFO, "Merging {0} size={1}", new Object[]{f.getName(), f.length()});

byte[] buffer = new byte[8192];
int length;
while ((length = bufferedInputStream.read(buffer)) > 0) {
outputStream.write(buffer, 0, length);
byte[] buffer = new byte[8192];
int length;
while ((length = bufferedInputStream.read(buffer)) > 0) {
outputStream.write(buffer, 0, length);
}
} finally {
bufferedInputStream.close();
fileInputStream.close();
}

bufferedInputStream.close();
fileInputStream.close();
} else {
LOGGER.log(Level.INFO, "Skip merging file {0} because it doesn't exist", f.getName());
}
Expand Down
10 changes: 3 additions & 7 deletions paldb/src/main/java/com/linkedin/paldb/impl/WriterImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,9 @@ private WriterImpl(Configuration config, OutputStream stream, File file) {
this.file = file;

// Open storage
try {
LOGGER.log(Level.INFO, "Opening writer storage");
serialization = new StorageSerialization(config);
storage = new StorageWriter(config, outputStream);
} catch (IOException ex) {
throw new RuntimeException(ex);
}
LOGGER.log(Level.INFO, "Opening writer storage");
serialization = new StorageSerialization(config);
storage = new StorageWriter(config, outputStream);
opened = true;
}

Expand Down
36 changes: 20 additions & 16 deletions paldb/src/main/java/com/linkedin/paldb/utils/TempUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public static File createTempDir(String prefix) {
"Failed to create directory within " + 10000 + " attempts (tried " + baseName + "0 to " + baseName + (10000 - 1)
+ ')');
}

/**
* Copies <code>inputStream</code> into a temporary file <code>fileName</code>.
*
Expand All @@ -65,23 +65,27 @@ public static File copyIntoTempFile(String fileName, InputStream inputStream)
throws IOException {
BufferedInputStream bufferedStream = inputStream instanceof BufferedInputStream ? (BufferedInputStream) inputStream
: new BufferedInputStream(inputStream);
File destFile = File.createTempFile(fileName, null);
destFile.deleteOnExit();

FileOutputStream fileOutputStream = new FileOutputStream(destFile);
BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(fileOutputStream);
File destFile = null;
try {
destFile = File.createTempFile(fileName, null);
destFile.deleteOnExit();

byte[] buffer = new byte[8192];
int length;
while ((length = bufferedStream.read(buffer)) > 0) {
bufferedOutputStream.write(buffer, 0, length);
FileOutputStream fileOutputStream = new FileOutputStream(destFile);
BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(fileOutputStream);
try {
byte[] buffer = new byte[8192];
int length;
while ((length = bufferedStream.read(buffer)) > 0) {
bufferedOutputStream.write(buffer, 0, length);
}
} finally {
bufferedOutputStream.close();
fileOutputStream.close();
}
} finally {
bufferedStream.close();
inputStream.close();
}

bufferedOutputStream.close();
fileOutputStream.close();

bufferedStream.close();
inputStream.close();
return destFile;
}
}