diff --git a/leveldb-benchmark/src/main/java/org/iq80/leveldb/benchmark/DbBenchmark.java b/leveldb-benchmark/src/main/java/org/iq80/leveldb/benchmark/DbBenchmark.java index c2f17443..d415fad9 100644 --- a/leveldb-benchmark/src/main/java/org/iq80/leveldb/benchmark/DbBenchmark.java +++ b/leveldb-benchmark/src/main/java/org/iq80/leveldb/benchmark/DbBenchmark.java @@ -29,7 +29,6 @@ import org.iq80.leveldb.ReadOptions; import org.iq80.leveldb.WriteBatch; import org.iq80.leveldb.WriteOptions; -import org.iq80.leveldb.impl.DbImpl; import org.iq80.leveldb.table.BloomFilterPolicy; import org.iq80.leveldb.util.Closeables; import org.iq80.leveldb.util.FileUtils; @@ -55,7 +54,6 @@ import static com.google.common.base.Preconditions.checkArgument; import static java.nio.charset.StandardCharsets.UTF_8; -import static org.iq80.leveldb.impl.DbConstants.NUM_LEVELS; public class DbBenchmark { @@ -214,7 +212,7 @@ else if (benchmark.equals("heapprofile")) { heapProfile(); } else if (benchmark.equals("stats")) { - printStats(); + printStats("leveldb.stats"); } else { System.err.println("Unknown benchmark: " + benchmark); @@ -570,12 +568,7 @@ private void readWhileWriting(ThreadState thread) private void compact(ThreadState thread) throws IOException { - if (db instanceof DbImpl) { - ((DbImpl) db).testCompactMemTable(); - for (int level = 0; level < NUM_LEVELS - 1; level++) { - ((DbImpl) db).compactRange(level, Slices.copiedBuffer("", UTF_8), Slices.copiedBuffer("~", UTF_8)); - } - } + db.compactRange(null, null); } private void crc32c(final ThreadState thread) @@ -727,8 +720,12 @@ private void destroyDb() FileUtils.deleteRecursively(databaseDir); } - private void printStats() + private void printStats(String name) { + final String property = db.getProperty(name); + if (property != null) { + System.out.print(property); + } //To change body of created methods use File | Settings | File Templates. } @@ -782,13 +779,10 @@ private enum Flag // stats -- Print DB stats // heapprofile -- Dump a heap profile (if supported by this port) benchmarks(ImmutableList.of( - "fillseq", - "fillseq", "fillseq", "fillsync", "fillrandom", "overwrite", - "fillseq", "readrandom", "readrandom", // Extra run to allow previous compactions to quiesce "readseq", @@ -801,7 +795,8 @@ private enum Flag // "crc32c", "snappycomp", "unsnap-array", - "unsnap-direct" + "unsnap-direct", + "stats" // "acquireload" )) { @Override diff --git a/leveldb/src/main/java/org/iq80/leveldb/impl/Compaction.java b/leveldb/src/main/java/org/iq80/leveldb/impl/Compaction.java index 243ab2cf..eeb6089a 100644 --- a/leveldb/src/main/java/org/iq80/leveldb/impl/Compaction.java +++ b/leveldb/src/main/java/org/iq80/leveldb/impl/Compaction.java @@ -197,11 +197,6 @@ public boolean shouldStopBefore(InternalKey internalKey) } } - public List[] getInputs() - { - return inputs; - } - @Override public void close() { @@ -210,4 +205,9 @@ public void close() inputVersion = null; } } + + public List input(int which) + { + return inputs[which]; + } } diff --git a/leveldb/src/main/java/org/iq80/leveldb/impl/DbImpl.java b/leveldb/src/main/java/org/iq80/leveldb/impl/DbImpl.java index 5e96961f..95d618f9 100644 --- a/leveldb/src/main/java/org/iq80/leveldb/impl/DbImpl.java +++ b/leveldb/src/main/java/org/iq80/leveldb/impl/DbImpl.java @@ -76,7 +76,6 @@ import static java.util.Objects.requireNonNull; import static org.iq80.leveldb.impl.DbConstants.L0_SLOWDOWN_WRITES_TRIGGER; import static org.iq80.leveldb.impl.DbConstants.L0_STOP_WRITES_TRIGGER; -import static org.iq80.leveldb.impl.DbConstants.NUM_LEVELS; import static org.iq80.leveldb.impl.SequenceNumber.MAX_SEQUENCE_NUMBER; import static org.iq80.leveldb.impl.ValueType.DELETION; import static org.iq80.leveldb.impl.ValueType.VALUE; @@ -104,6 +103,7 @@ public class DbImpl private final Deque writers = new LinkedList<>(); private final SnapshotList snapshots = new SnapshotList(mutex); private final WriteBatchImpl tmpBatch = new WriteBatchImpl(); + private final Env env; private LogWriter log; @@ -118,9 +118,12 @@ public class DbImpl private ManualCompaction manualCompaction; - public DbImpl(Options options, File databaseDir) + private CompactionStats[] stats = new CompactionStats[DbConstants.NUM_LEVELS]; + + public DbImpl(Options options, File databaseDir, Env env) throws IOException { + this.env = env; requireNonNull(options, "options is null"); requireNonNull(databaseDir, "databaseDir is null"); this.options = options; @@ -176,6 +179,10 @@ public void uncaughtException(Thread t, Throwable e) checkArgument(databaseDir.exists(), "Database directory '%s' does not exist and could not be created", databaseDir); checkArgument(databaseDir.isDirectory(), "Database directory '%s' is not a directory", databaseDir); + for (int i = 0; i < DbConstants.NUM_LEVELS; i++) { + stats[i] = new CompactionStats(); + } + mutex.lock(); try { // lock the database dir @@ -301,7 +308,28 @@ public String getProperty(String name) final int level = Integer.valueOf(matcher.group(1)); return String.valueOf(versions.numberOfFilesInLevel(level)); } - //TODO implement stats + matcher = Pattern.compile("stats") + .matcher(key); + if (matcher.matches()) { + final StringBuilder stringBuilder = new StringBuilder(); + stringBuilder.append(" Compactions\n"); + stringBuilder.append("Level Files Size(MB) Time(sec) Read(MB) Write(MB)\n"); + stringBuilder.append("--------------------------------------------------\n"); + for (int level = 0; level < DbConstants.NUM_LEVELS; level++) { + int files = versions.numberOfFilesInLevel(level); + if (stats[level].micros > 0 || files > 0) { + stringBuilder.append(String.format( + "%3d %8d %8.0f %9.0f %8.0f %9.0f\n", + level, + files, + versions.numberOfBytesInLevel(level) / 1048576.0, + stats[level].micros / 1e6, + stats[level].bytesRead / 1048576.0, + stats[level].bytesWritten / 1048576.0)); + } + } + return stringBuilder.toString(); + } //TODO implement sstables //TODO implement approximate-memory-usage } @@ -387,7 +415,7 @@ public void flushMemTable() public void compactRange(int level, Slice start, Slice end) { checkArgument(level >= 0, "level is negative"); - checkArgument(level + 1 < NUM_LEVELS, "level is greater than or equal to %s", NUM_LEVELS); + checkArgument(level + 1 < DbConstants.NUM_LEVELS, "level is greater than or equal to %s", DbConstants.NUM_LEVELS); requireNonNull(start, "start is null"); requireNonNull(end, "end is null"); @@ -1050,7 +1078,6 @@ private void compactMemTable() versions.logAndApply(edit, mutex); immutableMemTable = null; - deleteObsoleteFiles(); } finally { @@ -1061,6 +1088,7 @@ private void compactMemTable() private void writeLevel0Table(MemTable mem, VersionEdit edit, Version base) throws IOException { + final long startMicros = env.nowMicros(); checkState(mutex.isHeldByCurrentThread()); // skip empty mem table @@ -1092,6 +1120,7 @@ private void writeLevel0Table(MemTable mem, VersionEdit edit, Version base) } edit.addFile(level, meta); } + this.stats[level].Add(env.nowMicros() - startMicros, 0, meta.getFileSize()); } private FileMetaData buildTable(SeekingIterable data, long fileNumber) @@ -1139,6 +1168,8 @@ private FileMetaData buildTable(SeekingIterable data, long f private void doCompactionWork(CompactionState compactionState) throws IOException { + final long startMicros = env.nowMicros(); + long immMicros = 0; // Micros spent doing imm_ compactions checkState(mutex.isHeldByCurrentThread()); checkArgument(versions.numberOfBytesInLevel(compactionState.getCompaction().getLevel()) > 0); checkArgument(compactionState.builder == null); @@ -1157,6 +1188,7 @@ private void doCompactionWork(CompactionState compactionState) long lastSequenceForKey = MAX_SEQUENCE_NUMBER; while (iterator.hasNext() && !shuttingDown.get()) { // always give priority to compacting the current mem table + long immStart = env.nowMicros(); if (immutableMemTable != null) { mutex.lock(); try { @@ -1166,7 +1198,7 @@ private void doCompactionWork(CompactionState compactionState) mutex.unlock(); } } - + immMicros += (env.nowMicros() - immStart); InternalKey key = iterator.peek().getKey(); if (compactionState.compaction.shouldStopBefore(key) && compactionState.builder != null) { finishCompactionOutputFile(compactionState); @@ -1237,11 +1269,20 @@ else if (key.getValueType() == DELETION && } } finally { + long micros = env.nowMicros() - startMicros - immMicros; + long bytesRead = 0; + for (int which = 0; which < 2; which++) { + for (int i = 0; i < compactionState.compaction.input(which).size(); i++) { + bytesRead += compactionState.compaction.input(which, i).getFileSize(); + } + } + long bytesWritten = 0; + for (int i = 0; i < compactionState.outputs.size(); i++) { + bytesWritten += compactionState.outputs.get(i).getFileSize(); + } mutex.lock(); + this.stats[compactionState.compaction.getLevel() + 1].Add(micros, bytesRead, bytesWritten); } - - // todo port CompactionStats code - installCompactionResults(compactionState); } @@ -1429,6 +1470,29 @@ private ManualCompaction(int level, InternalKey begin, InternalKey end) } } + // Per level compaction stats. stats[level] stores the stats for + // compactions that produced data for the specified "level". + private static class CompactionStats + { + long micros; + long bytesRead; + long bytesWritten; + + CompactionStats() + { + this.micros = 0; + this.bytesRead = 0; + this.bytesWritten = 0; + } + + public void Add(long micros, long bytesRead, long bytesWritten) + { + this.micros += micros; + this.bytesRead += bytesRead; + this.bytesWritten += bytesWritten; + } + } + private WriteBatchImpl readWriteBatch(SliceInput record, int updateSize) throws IOException { diff --git a/leveldb/src/main/java/org/iq80/leveldb/impl/Env.java b/leveldb/src/main/java/org/iq80/leveldb/impl/Env.java new file mode 100644 index 00000000..36989713 --- /dev/null +++ b/leveldb/src/main/java/org/iq80/leveldb/impl/Env.java @@ -0,0 +1,23 @@ +/* + * Copyright (C) 2011 the original author or authors. + * See the notice.md file distributed with this work for additional + * information regarding copyright ownership. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.iq80.leveldb.impl; + +public interface Env +{ + long nowMicros(); +} diff --git a/leveldb/src/main/java/org/iq80/leveldb/impl/EnvImpl.java b/leveldb/src/main/java/org/iq80/leveldb/impl/EnvImpl.java new file mode 100644 index 00000000..c6029227 --- /dev/null +++ b/leveldb/src/main/java/org/iq80/leveldb/impl/EnvImpl.java @@ -0,0 +1,29 @@ +/* + * Copyright (C) 2011 the original author or authors. + * See the notice.md file distributed with this work for additional + * information regarding copyright ownership. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.iq80.leveldb.impl; + +import java.util.concurrent.TimeUnit; + +public class EnvImpl implements Env +{ + @Override + public long nowMicros() + { + return TimeUnit.NANOSECONDS.convert(System.nanoTime(), TimeUnit.MICROSECONDS); + } +} diff --git a/leveldb/src/main/java/org/iq80/leveldb/impl/Iq80DBFactory.java b/leveldb/src/main/java/org/iq80/leveldb/impl/Iq80DBFactory.java index 4a8244ac..400eddc0 100644 --- a/leveldb/src/main/java/org/iq80/leveldb/impl/Iq80DBFactory.java +++ b/leveldb/src/main/java/org/iq80/leveldb/impl/Iq80DBFactory.java @@ -80,7 +80,7 @@ public class Iq80DBFactory public DB open(File path, Options options) throws IOException { - return new DbImpl(options, path); + return new DbImpl(options, path, new EnvImpl()); } @Override diff --git a/leveldb/src/main/java/org/iq80/leveldb/impl/VersionSet.java b/leveldb/src/main/java/org/iq80/leveldb/impl/VersionSet.java index b850e02b..90ab3e77 100644 --- a/leveldb/src/main/java/org/iq80/leveldb/impl/VersionSet.java +++ b/leveldb/src/main/java/org/iq80/leveldb/impl/VersionSet.java @@ -207,14 +207,14 @@ public MergingIterator makeInputIterator(Compaction c) // TODO(opt): use concatenating iterator for level-0 if there is no overlap List list = new ArrayList<>(); for (int which = 0; which < 2; which++) { - if (!c.getInputs()[which].isEmpty()) { + List files = c.input(which); + if (!files.isEmpty()) { if (c.getLevel() + which == 0) { - List files = c.getInputs()[which]; list.add(new Level0Iterator(tableCache, files, internalKeyComparator)); } else { // Create concatenating iterator for the files from this level - list.add(Level.createLevelConcatIterator(tableCache, c.getInputs()[which], internalKeyComparator)); + list.add(Level.createLevelConcatIterator(tableCache, files, internalKeyComparator)); } } } diff --git a/leveldb/src/test/java/org/iq80/leveldb/impl/DbImplTest.java b/leveldb/src/test/java/org/iq80/leveldb/impl/DbImplTest.java index 22a16ae1..2f7bb41a 100644 --- a/leveldb/src/test/java/org/iq80/leveldb/impl/DbImplTest.java +++ b/leveldb/src/test/java/org/iq80/leveldb/impl/DbImplTest.java @@ -96,7 +96,7 @@ public void testBackgroundCompaction(final String desc, final Options options) { options.maxOpenFiles(100); options.createIfMissing(true); - DbImpl db = new DbImpl(options, this.databaseDir); + DbImpl db = new DbImpl(options, this.databaseDir, new EnvImpl()); Random random = new Random(301); for (int i = 0; i < 200000 * STRESS_FACTOR; i++) { db.put(randomString(random, 64).getBytes(), new byte[] {0x01}, new WriteOptions().sync(false)); @@ -113,7 +113,7 @@ public void testConcurrentWrite() throws Exception Options options = new Options(); options.maxOpenFiles(50); options.createIfMissing(true); - final DbImpl db = new DbImpl(options, this.databaseDir); + final DbImpl db = new DbImpl(options, this.databaseDir, new EnvImpl()); ExecutorService ex = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 4); try { final int numEntries = 1000000; @@ -169,7 +169,7 @@ public void testCompactionsOnBigDataSet(final String desc, final Options options throws Exception { options.createIfMissing(true); - DbImpl db = new DbImpl(options, databaseDir); + DbImpl db = new DbImpl(options, databaseDir, new EnvImpl()); for (int index = 0; index < 5000000; index++) { String key = "Key LOOOOOOOOOOOOOOOOOONG KEY " + index; String value = "This is element " + index + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABZASDFASDKLFJASDFKJSDFLKSDJFLKJSDHFLKJHSDJFSDFHJASDFLKJSDF"; @@ -1327,7 +1327,7 @@ private DbStringWrapper(Options options, File databaseDir) { this.options = options.verifyChecksums(true).createIfMissing(true).errorIfExists(true); this.databaseDir = databaseDir; - this.db = new DbImpl(options, databaseDir); + this.db = new DbImpl(options, databaseDir, new EnvImpl()); opened.add(this); } @@ -1438,7 +1438,7 @@ public void reopen(Options options) throws IOException { db.close(); - db = new DbImpl(options.verifyChecksums(true).createIfMissing(false).errorIfExists(false), databaseDir); + db = new DbImpl(options.verifyChecksums(true).createIfMissing(false).errorIfExists(false), databaseDir, new EnvImpl()); } private List allEntriesFor(String userKey) diff --git a/leveldb/src/test/java/org/iq80/leveldb/table/TableTest.java b/leveldb/src/test/java/org/iq80/leveldb/table/TableTest.java index c4abfae5..cad9b437 100644 --- a/leveldb/src/test/java/org/iq80/leveldb/table/TableTest.java +++ b/leveldb/src/test/java/org/iq80/leveldb/table/TableTest.java @@ -23,6 +23,7 @@ import org.iq80.leveldb.Options; import org.iq80.leveldb.impl.DbConstants; import org.iq80.leveldb.impl.DbImpl; +import org.iq80.leveldb.impl.EnvImpl; import org.iq80.leveldb.impl.InternalEntry; import org.iq80.leveldb.impl.InternalKey; import org.iq80.leveldb.impl.InternalKeyComparator; @@ -789,7 +790,7 @@ protected void finish(Options options, UserComparator comparator, KVMap kvMap) t .errorIfExists(true) .writeBufferSize(10000); // Something small to force merging tmpDir = FileUtils.createTempDir("leveldb"); - this.db = new DbImpl(options, tmpDir); + this.db = new DbImpl(options, tmpDir, new EnvImpl()); for (Map.Entry entry : kvMap.entrySet()) { db.put(entry.getKey().getBytes(), entry.getValue().getBytes()); }