diff --git a/paldb/src/main/java/com/linkedin/paldb/impl/StorageReader.java b/paldb/src/main/java/com/linkedin/paldb/impl/StorageReader.java index d7bdfcc..4fafb86 100644 --- a/paldb/src/main/java/com/linkedin/paldb/impl/StorageReader.java +++ b/paldb/src/main/java/com/linkedin/paldb/impl/StorageReader.java @@ -106,78 +106,82 @@ public class StorageReader implements Iterable> { } //Open file and read metadata + long createdAt = 0; + FormatVersion formatVersion = null; FileInputStream inputStream = new FileInputStream(path); DataInputStream dataInputStream = new DataInputStream(new BufferedInputStream(inputStream)); - int ignoredBytes = -2; - - //Byte mark - byte[] mark = FormatVersion.getPrefixBytes(); - int found = 0; - while (found != mark.length) { - byte b = dataInputStream.readByte(); - if (b == mark[found]) { - found++; - } else { - ignoredBytes += found + 1; - found = 0; + try { + int ignoredBytes = -2; + + //Byte mark + byte[] mark = FormatVersion.getPrefixBytes(); + int found = 0; + while (found != mark.length) { + byte b = dataInputStream.readByte(); + if (b == mark[found]) { + found++; + } else { + ignoredBytes += found + 1; + found = 0; + } } - } - //Version - byte[] versionFound = Arrays.copyOf(mark, FormatVersion.getLatestVersion().getBytes().length); - dataInputStream.readFully(versionFound, mark.length, versionFound.length - mark.length); + //Version + byte[] versionFound = Arrays.copyOf(mark, FormatVersion.getLatestVersion().getBytes().length); + dataInputStream.readFully(versionFound, mark.length, versionFound.length - mark.length); - FormatVersion formatVersion = FormatVersion.fromBytes(versionFound); - if (formatVersion == null || !formatVersion.is(FormatVersion.getLatestVersion())) { - throw new RuntimeException( - "Version mismatch, expected was '" + FormatVersion.getLatestVersion() + "' and found '" + formatVersion - + "'"); - } - - //Time - long createdAt = dataInputStream.readLong(); + formatVersion = FormatVersion.fromBytes(versionFound); + if (formatVersion == null || !formatVersion.is(FormatVersion.getLatestVersion())) { + throw new RuntimeException( + "Version mismatch, expected was '" + FormatVersion.getLatestVersion() + "' and found '" + formatVersion + + "'"); + } - //Metadata counters - keyCount = dataInputStream.readInt(); - keyLengthCount = dataInputStream.readInt(); - maxKeyLength = dataInputStream.readInt(); + //Time + createdAt = dataInputStream.readLong(); - //Read offset counts and keys - indexOffsets = new int[maxKeyLength + 1]; - dataOffsets = new long[maxKeyLength + 1]; - keyCounts = new int[maxKeyLength + 1]; - slots = new int[maxKeyLength + 1]; - slotSizes = new int[maxKeyLength + 1]; + //Metadata counters + keyCount = dataInputStream.readInt(); + keyLengthCount = dataInputStream.readInt(); + maxKeyLength = dataInputStream.readInt(); - int maxSlotSize = 0; - for (int i = 0; i < keyLengthCount; i++) { - int keyLength = dataInputStream.readInt(); + //Read offset counts and keys + indexOffsets = new int[maxKeyLength + 1]; + dataOffsets = new long[maxKeyLength + 1]; + keyCounts = new int[maxKeyLength + 1]; + slots = new int[maxKeyLength + 1]; + slotSizes = new int[maxKeyLength + 1]; - keyCounts[keyLength] = dataInputStream.readInt(); - slots[keyLength] = dataInputStream.readInt(); - slotSizes[keyLength] = dataInputStream.readInt(); - indexOffsets[keyLength] = dataInputStream.readInt(); - dataOffsets[keyLength] = dataInputStream.readLong(); + int maxSlotSize = 0; + for (int i = 0; i < keyLengthCount; i++) { + int keyLength = dataInputStream.readInt(); - maxSlotSize = Math.max(maxSlotSize, slotSizes[keyLength]); - } + keyCounts[keyLength] = dataInputStream.readInt(); + slots[keyLength] = dataInputStream.readInt(); + slotSizes[keyLength] = dataInputStream.readInt(); + indexOffsets[keyLength] = dataInputStream.readInt(); + dataOffsets[keyLength] = dataInputStream.readLong(); - slotBuffer = new byte[maxSlotSize]; + maxSlotSize = Math.max(maxSlotSize, slotSizes[keyLength]); + } - //Read serializers - try { - Serializers.deserialize(dataInputStream, config.getSerializers()); - } catch (Exception e) { - throw new RuntimeException(); - } + slotBuffer = new byte[maxSlotSize]; - //Read index and data offset - indexOffset = dataInputStream.readInt() + ignoredBytes; - dataOffset = dataInputStream.readLong() + ignoredBytes; + //Read serializers + try { + Serializers.deserialize(dataInputStream, config.getSerializers()); + } catch (Exception e) { + throw new RuntimeException(); + } - //Close metadata - dataInputStream.close(); - inputStream.close(); + //Read index and data offset + indexOffset = dataInputStream.readInt() + ignoredBytes; + dataOffset = dataInputStream.readLong() + ignoredBytes; + } finally { + //Close metadata + dataInputStream.close(); + inputStream.close(); + } //Create Mapped file in read-only mode mappedFile = new RandomAccessFile(path, "r"); diff --git a/paldb/src/main/java/com/linkedin/paldb/impl/StorageWriter.java b/paldb/src/main/java/com/linkedin/paldb/impl/StorageWriter.java index 6109ec0..c094fee 100644 --- a/paldb/src/main/java/com/linkedin/paldb/impl/StorageWriter.java +++ b/paldb/src/main/java/com/linkedin/paldb/impl/StorageWriter.java @@ -73,8 +73,7 @@ public class StorageWriter { // Number of collisions private int collisions; - StorageWriter(Configuration configuration, OutputStream stream) - throws IOException { + StorageWriter(Configuration configuration, OutputStream stream) { config = configuration; loadFactor = config.getDouble(Configuration.LOAD_FACTOR); if (loadFactor <= 0.0 || loadFactor >= 1.0) { @@ -279,78 +278,81 @@ private File buildIndex(int keyLength) // Init index File indexFile = new File(tempFolder, "index" + keyLength + ".dat"); RandomAccessFile indexAccessFile = new RandomAccessFile(indexFile, "rw"); - indexAccessFile.setLength(slots * slotSize); - FileChannel indexChannel = indexAccessFile.getChannel(); - MappedByteBuffer byteBuffer = indexChannel.map(FileChannel.MapMode.READ_WRITE, 0, indexAccessFile.length()); - - // Init reading stream - File tempIndexFile = indexFiles[keyLength]; - DataInputStream tempIndexStream = new DataInputStream(new BufferedInputStream(new FileInputStream(tempIndexFile))); try { - byte[] keyBuffer = new byte[keyLength]; - byte[] slotBuffer = new byte[slotSize]; - byte[] offsetBuffer = new byte[offsetLength]; - - // Read all keys - for (int i = 0; i < count; i++) { - // Read key - tempIndexStream.readFully(keyBuffer); - - // Read offset - long offset = LongPacker.unpackLong(tempIndexStream); - - // Hash - long hash = (long) HashUtils.hash(keyBuffer); - - boolean collision = false; - for (int probe = 0; probe < count; probe++) { - int slot = (int) ((hash + probe) % slots); - byteBuffer.position(slot * slotSize); - byteBuffer.get(slotBuffer); - - long found = LongPacker.unpackLong(slotBuffer, keyLength); - if (found == 0) { - // The spot is empty use it + indexAccessFile.setLength(slots * slotSize); + FileChannel indexChannel = indexAccessFile.getChannel(); + MappedByteBuffer byteBuffer = indexChannel.map(FileChannel.MapMode.READ_WRITE, 0, indexAccessFile.length()); + + // Init reading stream + File tempIndexFile = indexFiles[keyLength]; + DataInputStream tempIndexStream = new DataInputStream(new BufferedInputStream(new FileInputStream(tempIndexFile))); + try { + byte[] keyBuffer = new byte[keyLength]; + byte[] slotBuffer = new byte[slotSize]; + byte[] offsetBuffer = new byte[offsetLength]; + + // Read all keys + for (int i = 0; i < count; i++) { + // Read key + tempIndexStream.readFully(keyBuffer); + + // Read offset + long offset = LongPacker.unpackLong(tempIndexStream); + + // Hash + long hash = (long) HashUtils.hash(keyBuffer); + + boolean collision = false; + for (int probe = 0; probe < count; probe++) { + int slot = (int) ((hash + probe) % slots); byteBuffer.position(slot * slotSize); - byteBuffer.put(keyBuffer); - int pos = LongPacker.packLong(offsetBuffer, offset); - byteBuffer.put(offsetBuffer, 0, pos); - break; - } else { - collision = true; - // Check for duplicates - if (Arrays.equals(keyBuffer, Arrays.copyOf(slotBuffer, keyLength))) { - throw new RuntimeException( - String.format("A duplicate key has been found for for key bytes %s", Arrays.toString(keyBuffer))); + byteBuffer.get(slotBuffer); + + long found = LongPacker.unpackLong(slotBuffer, keyLength); + if (found == 0) { + // The spot is empty use it + byteBuffer.position(slot * slotSize); + byteBuffer.put(keyBuffer); + int pos = LongPacker.packLong(offsetBuffer, offset); + byteBuffer.put(offsetBuffer, 0, pos); + break; + } else { + collision = true; + // Check for duplicates + if (Arrays.equals(keyBuffer, Arrays.copyOf(slotBuffer, keyLength))) { + throw new RuntimeException( + String.format("A duplicate key has been found for for key bytes %s", Arrays.toString(keyBuffer))); + } } } - } - if (collision) { - collisions++; + if (collision) { + collisions++; + } } - } - String msg = " Max offset length: " + offsetLength + " bytes" + - "\n Slot size: " + slotSize + " bytes"; + String msg = " Max offset length: " + offsetLength + " bytes" + + "\n Slot size: " + slotSize + " bytes"; - LOGGER.log(Level.INFO, "Built index file {0}\n" + msg, indexFile.getName()); - } finally { - // Close input - tempIndexStream.close(); + LOGGER.log(Level.INFO, "Built index file {0}\n" + msg, indexFile.getName()); + } finally { + // Close input + tempIndexStream.close(); + + // Close index and make sure resources are liberated + indexChannel.close(); + indexChannel = null; + byteBuffer = null; - // Close index and make sure resources are liberated - indexChannel.close(); + // Delete temp index file + if (tempIndexFile.delete()) { + LOGGER.log(Level.INFO, "Temporary index file {0} has been deleted", tempIndexFile.getName()); + } + } + } finally{ indexAccessFile.close(); - indexChannel = null; indexAccessFile = null; - byteBuffer = null; System.gc(); - - // Delete temp index file - if (tempIndexFile.delete()) { - LOGGER.log(Level.INFO, "Temporary index file {0} has been deleted", tempIndexFile.getName()); - } } return indexFile; @@ -386,17 +388,18 @@ private void mergeFiles(List inputFiles, OutputStream outputStream) if (f.exists()) { FileInputStream fileInputStream = new FileInputStream(f); BufferedInputStream bufferedInputStream = new BufferedInputStream(fileInputStream); + try { + LOGGER.log(Level.INFO, "Merging {0} size={1}", new Object[]{f.getName(), f.length()}); - LOGGER.log(Level.INFO, "Merging {0} size={1}", new Object[]{f.getName(), f.length()}); - - byte[] buffer = new byte[8192]; - int length; - while ((length = bufferedInputStream.read(buffer)) > 0) { - outputStream.write(buffer, 0, length); + byte[] buffer = new byte[8192]; + int length; + while ((length = bufferedInputStream.read(buffer)) > 0) { + outputStream.write(buffer, 0, length); + } + } finally { + bufferedInputStream.close(); + fileInputStream.close(); } - - bufferedInputStream.close(); - fileInputStream.close(); } else { LOGGER.log(Level.INFO, "Skip merging file {0} because it doesn't exist", f.getName()); } diff --git a/paldb/src/main/java/com/linkedin/paldb/impl/WriterImpl.java b/paldb/src/main/java/com/linkedin/paldb/impl/WriterImpl.java index 35428a4..41421f0 100644 --- a/paldb/src/main/java/com/linkedin/paldb/impl/WriterImpl.java +++ b/paldb/src/main/java/com/linkedin/paldb/impl/WriterImpl.java @@ -77,13 +77,9 @@ private WriterImpl(Configuration config, OutputStream stream, File file) { this.file = file; // Open storage - try { - LOGGER.log(Level.INFO, "Opening writer storage"); - serialization = new StorageSerialization(config); - storage = new StorageWriter(config, outputStream); - } catch (IOException ex) { - throw new RuntimeException(ex); - } + LOGGER.log(Level.INFO, "Opening writer storage"); + serialization = new StorageSerialization(config); + storage = new StorageWriter(config, outputStream); opened = true; } diff --git a/paldb/src/main/java/com/linkedin/paldb/utils/TempUtils.java b/paldb/src/main/java/com/linkedin/paldb/utils/TempUtils.java index ff0ade0..5ea361e 100644 --- a/paldb/src/main/java/com/linkedin/paldb/utils/TempUtils.java +++ b/paldb/src/main/java/com/linkedin/paldb/utils/TempUtils.java @@ -52,7 +52,7 @@ public static File createTempDir(String prefix) { "Failed to create directory within " + 10000 + " attempts (tried " + baseName + "0 to " + baseName + (10000 - 1) + ')'); } - + /** * Copies inputStream into a temporary file fileName. * @@ -65,23 +65,27 @@ public static File copyIntoTempFile(String fileName, InputStream inputStream) throws IOException { BufferedInputStream bufferedStream = inputStream instanceof BufferedInputStream ? (BufferedInputStream) inputStream : new BufferedInputStream(inputStream); - File destFile = File.createTempFile(fileName, null); - destFile.deleteOnExit(); - - FileOutputStream fileOutputStream = new FileOutputStream(destFile); - BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(fileOutputStream); + File destFile = null; + try { + destFile = File.createTempFile(fileName, null); + destFile.deleteOnExit(); - byte[] buffer = new byte[8192]; - int length; - while ((length = bufferedStream.read(buffer)) > 0) { - bufferedOutputStream.write(buffer, 0, length); + FileOutputStream fileOutputStream = new FileOutputStream(destFile); + BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(fileOutputStream); + try { + byte[] buffer = new byte[8192]; + int length; + while ((length = bufferedStream.read(buffer)) > 0) { + bufferedOutputStream.write(buffer, 0, length); + } + } finally { + bufferedOutputStream.close(); + fileOutputStream.close(); + } + } finally { + bufferedStream.close(); + inputStream.close(); } - - bufferedOutputStream.close(); - fileOutputStream.close(); - - bufferedStream.close(); - inputStream.close(); return destFile; } }