Skip to content

Commit

Permalink
Merge pull request #15 from tejaspathak/master
Browse files Browse the repository at this point in the history
Fix exception hanlding
  • Loading branch information
mbastian authored Jun 20, 2016
2 parents e7c81c7 + 5cc0e54 commit 2c9af3f
Show file tree
Hide file tree
Showing 4 changed files with 159 additions and 152 deletions.
120 changes: 62 additions & 58 deletions paldb/src/main/java/com/linkedin/paldb/impl/StorageReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -106,78 +106,82 @@ public class StorageReader implements Iterable<Map.Entry<byte[], byte[]>> {
}

//Open file and read metadata
long createdAt = 0;
FormatVersion formatVersion = null;
FileInputStream inputStream = new FileInputStream(path);
DataInputStream dataInputStream = new DataInputStream(new BufferedInputStream(inputStream));
int ignoredBytes = -2;

//Byte mark
byte[] mark = FormatVersion.getPrefixBytes();
int found = 0;
while (found != mark.length) {
byte b = dataInputStream.readByte();
if (b == mark[found]) {
found++;
} else {
ignoredBytes += found + 1;
found = 0;
try {
int ignoredBytes = -2;

//Byte mark
byte[] mark = FormatVersion.getPrefixBytes();
int found = 0;
while (found != mark.length) {
byte b = dataInputStream.readByte();
if (b == mark[found]) {
found++;
} else {
ignoredBytes += found + 1;
found = 0;
}
}
}

//Version
byte[] versionFound = Arrays.copyOf(mark, FormatVersion.getLatestVersion().getBytes().length);
dataInputStream.readFully(versionFound, mark.length, versionFound.length - mark.length);
//Version
byte[] versionFound = Arrays.copyOf(mark, FormatVersion.getLatestVersion().getBytes().length);
dataInputStream.readFully(versionFound, mark.length, versionFound.length - mark.length);

FormatVersion formatVersion = FormatVersion.fromBytes(versionFound);
if (formatVersion == null || !formatVersion.is(FormatVersion.getLatestVersion())) {
throw new RuntimeException(
"Version mismatch, expected was '" + FormatVersion.getLatestVersion() + "' and found '" + formatVersion
+ "'");
}

//Time
long createdAt = dataInputStream.readLong();
formatVersion = FormatVersion.fromBytes(versionFound);
if (formatVersion == null || !formatVersion.is(FormatVersion.getLatestVersion())) {
throw new RuntimeException(
"Version mismatch, expected was '" + FormatVersion.getLatestVersion() + "' and found '" + formatVersion
+ "'");
}

//Metadata counters
keyCount = dataInputStream.readInt();
keyLengthCount = dataInputStream.readInt();
maxKeyLength = dataInputStream.readInt();
//Time
createdAt = dataInputStream.readLong();

//Read offset counts and keys
indexOffsets = new int[maxKeyLength + 1];
dataOffsets = new long[maxKeyLength + 1];
keyCounts = new int[maxKeyLength + 1];
slots = new int[maxKeyLength + 1];
slotSizes = new int[maxKeyLength + 1];
//Metadata counters
keyCount = dataInputStream.readInt();
keyLengthCount = dataInputStream.readInt();
maxKeyLength = dataInputStream.readInt();

int maxSlotSize = 0;
for (int i = 0; i < keyLengthCount; i++) {
int keyLength = dataInputStream.readInt();
//Read offset counts and keys
indexOffsets = new int[maxKeyLength + 1];
dataOffsets = new long[maxKeyLength + 1];
keyCounts = new int[maxKeyLength + 1];
slots = new int[maxKeyLength + 1];
slotSizes = new int[maxKeyLength + 1];

keyCounts[keyLength] = dataInputStream.readInt();
slots[keyLength] = dataInputStream.readInt();
slotSizes[keyLength] = dataInputStream.readInt();
indexOffsets[keyLength] = dataInputStream.readInt();
dataOffsets[keyLength] = dataInputStream.readLong();
int maxSlotSize = 0;
for (int i = 0; i < keyLengthCount; i++) {
int keyLength = dataInputStream.readInt();

maxSlotSize = Math.max(maxSlotSize, slotSizes[keyLength]);
}
keyCounts[keyLength] = dataInputStream.readInt();
slots[keyLength] = dataInputStream.readInt();
slotSizes[keyLength] = dataInputStream.readInt();
indexOffsets[keyLength] = dataInputStream.readInt();
dataOffsets[keyLength] = dataInputStream.readLong();

slotBuffer = new byte[maxSlotSize];
maxSlotSize = Math.max(maxSlotSize, slotSizes[keyLength]);
}

//Read serializers
try {
Serializers.deserialize(dataInputStream, config.getSerializers());
} catch (Exception e) {
throw new RuntimeException();
}
slotBuffer = new byte[maxSlotSize];

//Read index and data offset
indexOffset = dataInputStream.readInt() + ignoredBytes;
dataOffset = dataInputStream.readLong() + ignoredBytes;
//Read serializers
try {
Serializers.deserialize(dataInputStream, config.getSerializers());
} catch (Exception e) {
throw new RuntimeException();
}

//Close metadata
dataInputStream.close();
inputStream.close();
//Read index and data offset
indexOffset = dataInputStream.readInt() + ignoredBytes;
dataOffset = dataInputStream.readLong() + ignoredBytes;
} finally {
//Close metadata
dataInputStream.close();
inputStream.close();
}

//Create Mapped file in read-only mode
mappedFile = new RandomAccessFile(path, "r");
Expand Down
145 changes: 74 additions & 71 deletions paldb/src/main/java/com/linkedin/paldb/impl/StorageWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,7 @@ public class StorageWriter {
// Number of collisions
private int collisions;

StorageWriter(Configuration configuration, OutputStream stream)
throws IOException {
StorageWriter(Configuration configuration, OutputStream stream) {
config = configuration;
loadFactor = config.getDouble(Configuration.LOAD_FACTOR);
if (loadFactor <= 0.0 || loadFactor >= 1.0) {
Expand Down Expand Up @@ -279,78 +278,81 @@ private File buildIndex(int keyLength)
// Init index
File indexFile = new File(tempFolder, "index" + keyLength + ".dat");
RandomAccessFile indexAccessFile = new RandomAccessFile(indexFile, "rw");
indexAccessFile.setLength(slots * slotSize);
FileChannel indexChannel = indexAccessFile.getChannel();
MappedByteBuffer byteBuffer = indexChannel.map(FileChannel.MapMode.READ_WRITE, 0, indexAccessFile.length());

// Init reading stream
File tempIndexFile = indexFiles[keyLength];
DataInputStream tempIndexStream = new DataInputStream(new BufferedInputStream(new FileInputStream(tempIndexFile)));
try {
byte[] keyBuffer = new byte[keyLength];
byte[] slotBuffer = new byte[slotSize];
byte[] offsetBuffer = new byte[offsetLength];

// Read all keys
for (int i = 0; i < count; i++) {
// Read key
tempIndexStream.readFully(keyBuffer);

// Read offset
long offset = LongPacker.unpackLong(tempIndexStream);

// Hash
long hash = (long) HashUtils.hash(keyBuffer);

boolean collision = false;
for (int probe = 0; probe < count; probe++) {
int slot = (int) ((hash + probe) % slots);
byteBuffer.position(slot * slotSize);
byteBuffer.get(slotBuffer);

long found = LongPacker.unpackLong(slotBuffer, keyLength);
if (found == 0) {
// The spot is empty use it
indexAccessFile.setLength(slots * slotSize);
FileChannel indexChannel = indexAccessFile.getChannel();
MappedByteBuffer byteBuffer = indexChannel.map(FileChannel.MapMode.READ_WRITE, 0, indexAccessFile.length());

// Init reading stream
File tempIndexFile = indexFiles[keyLength];
DataInputStream tempIndexStream = new DataInputStream(new BufferedInputStream(new FileInputStream(tempIndexFile)));
try {
byte[] keyBuffer = new byte[keyLength];
byte[] slotBuffer = new byte[slotSize];
byte[] offsetBuffer = new byte[offsetLength];

// Read all keys
for (int i = 0; i < count; i++) {
// Read key
tempIndexStream.readFully(keyBuffer);

// Read offset
long offset = LongPacker.unpackLong(tempIndexStream);

// Hash
long hash = (long) HashUtils.hash(keyBuffer);

boolean collision = false;
for (int probe = 0; probe < count; probe++) {
int slot = (int) ((hash + probe) % slots);
byteBuffer.position(slot * slotSize);
byteBuffer.put(keyBuffer);
int pos = LongPacker.packLong(offsetBuffer, offset);
byteBuffer.put(offsetBuffer, 0, pos);
break;
} else {
collision = true;
// Check for duplicates
if (Arrays.equals(keyBuffer, Arrays.copyOf(slotBuffer, keyLength))) {
throw new RuntimeException(
String.format("A duplicate key has been found for for key bytes %s", Arrays.toString(keyBuffer)));
byteBuffer.get(slotBuffer);

long found = LongPacker.unpackLong(slotBuffer, keyLength);
if (found == 0) {
// The spot is empty use it
byteBuffer.position(slot * slotSize);
byteBuffer.put(keyBuffer);
int pos = LongPacker.packLong(offsetBuffer, offset);
byteBuffer.put(offsetBuffer, 0, pos);
break;
} else {
collision = true;
// Check for duplicates
if (Arrays.equals(keyBuffer, Arrays.copyOf(slotBuffer, keyLength))) {
throw new RuntimeException(
String.format("A duplicate key has been found for for key bytes %s", Arrays.toString(keyBuffer)));
}
}
}
}

if (collision) {
collisions++;
if (collision) {
collisions++;
}
}
}

String msg = " Max offset length: " + offsetLength + " bytes" +
"\n Slot size: " + slotSize + " bytes";
String msg = " Max offset length: " + offsetLength + " bytes" +
"\n Slot size: " + slotSize + " bytes";

LOGGER.log(Level.INFO, "Built index file {0}\n" + msg, indexFile.getName());
} finally {
// Close input
tempIndexStream.close();
LOGGER.log(Level.INFO, "Built index file {0}\n" + msg, indexFile.getName());
} finally {
// Close input
tempIndexStream.close();

// Close index and make sure resources are liberated
indexChannel.close();
indexChannel = null;
byteBuffer = null;

// Close index and make sure resources are liberated
indexChannel.close();
// Delete temp index file
if (tempIndexFile.delete()) {
LOGGER.log(Level.INFO, "Temporary index file {0} has been deleted", tempIndexFile.getName());
}
}
} finally{
indexAccessFile.close();
indexChannel = null;
indexAccessFile = null;
byteBuffer = null;
System.gc();

// Delete temp index file
if (tempIndexFile.delete()) {
LOGGER.log(Level.INFO, "Temporary index file {0} has been deleted", tempIndexFile.getName());
}
}

return indexFile;
Expand Down Expand Up @@ -386,17 +388,18 @@ private void mergeFiles(List<File> inputFiles, OutputStream outputStream)
if (f.exists()) {
FileInputStream fileInputStream = new FileInputStream(f);
BufferedInputStream bufferedInputStream = new BufferedInputStream(fileInputStream);
try {
LOGGER.log(Level.INFO, "Merging {0} size={1}", new Object[]{f.getName(), f.length()});

LOGGER.log(Level.INFO, "Merging {0} size={1}", new Object[]{f.getName(), f.length()});

byte[] buffer = new byte[8192];
int length;
while ((length = bufferedInputStream.read(buffer)) > 0) {
outputStream.write(buffer, 0, length);
byte[] buffer = new byte[8192];
int length;
while ((length = bufferedInputStream.read(buffer)) > 0) {
outputStream.write(buffer, 0, length);
}
} finally {
bufferedInputStream.close();
fileInputStream.close();
}

bufferedInputStream.close();
fileInputStream.close();
} else {
LOGGER.log(Level.INFO, "Skip merging file {0} because it doesn't exist", f.getName());
}
Expand Down
10 changes: 3 additions & 7 deletions paldb/src/main/java/com/linkedin/paldb/impl/WriterImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,9 @@ private WriterImpl(Configuration config, OutputStream stream, File file) {
this.file = file;

// Open storage
try {
LOGGER.log(Level.INFO, "Opening writer storage");
serialization = new StorageSerialization(config);
storage = new StorageWriter(config, outputStream);
} catch (IOException ex) {
throw new RuntimeException(ex);
}
LOGGER.log(Level.INFO, "Opening writer storage");
serialization = new StorageSerialization(config);
storage = new StorageWriter(config, outputStream);
opened = true;
}

Expand Down
36 changes: 20 additions & 16 deletions paldb/src/main/java/com/linkedin/paldb/utils/TempUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public static File createTempDir(String prefix) {
"Failed to create directory within " + 10000 + " attempts (tried " + baseName + "0 to " + baseName + (10000 - 1)
+ ')');
}

/**
* Copies <code>inputStream</code> into a temporary file <code>fileName</code>.
*
Expand All @@ -65,23 +65,27 @@ public static File copyIntoTempFile(String fileName, InputStream inputStream)
throws IOException {
BufferedInputStream bufferedStream = inputStream instanceof BufferedInputStream ? (BufferedInputStream) inputStream
: new BufferedInputStream(inputStream);
File destFile = File.createTempFile(fileName, null);
destFile.deleteOnExit();

FileOutputStream fileOutputStream = new FileOutputStream(destFile);
BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(fileOutputStream);
File destFile = null;
try {
destFile = File.createTempFile(fileName, null);
destFile.deleteOnExit();

byte[] buffer = new byte[8192];
int length;
while ((length = bufferedStream.read(buffer)) > 0) {
bufferedOutputStream.write(buffer, 0, length);
FileOutputStream fileOutputStream = new FileOutputStream(destFile);
BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(fileOutputStream);
try {
byte[] buffer = new byte[8192];
int length;
while ((length = bufferedStream.read(buffer)) > 0) {
bufferedOutputStream.write(buffer, 0, length);
}
} finally {
bufferedOutputStream.close();
fileOutputStream.close();
}
} finally {
bufferedStream.close();
inputStream.close();
}

bufferedOutputStream.close();
fileOutputStream.close();

bufferedStream.close();
inputStream.close();
return destFile;
}
}

0 comments on commit 2c9af3f

Please sign in to comment.