Skip to content

Commit

Permalink
[#1955] imporvement(core): Fix resource leak in RocksDBKvBackend.java (
Browse files Browse the repository at this point in the history
…#2138)

<!--
1. Title: [#<issue>] <type>(<scope>): <subject>
   Examples:
     - "[#123] feat(operator): support xxx"
     - "[#233] fix: check null before access result in xxx"
     - "[MINOR] refactor: fix typo in variable name"
     - "[MINOR] docs: fix typo in README"
     - "[#255] test: fix flaky test NameOfTheTest"
   Reference: https://www.conventionalcommits.org/en/v1.0.0/
2. If the PR is unfinished, please mark this PR as draft.
-->

### What changes were proposed in this pull request?

close #1955

### Why are the changes needed?

(Please clarify why the changes are needed. For instance,
  1. If you propose a new API, clarify the use case for a new API.
  2. If you fix a bug, describe the bug.)

Fix: #1955 

### Does this PR introduce _any_ user-facing change?
- no

### How was this patch tested?

- unit
  • Loading branch information
coolderli authored Feb 17, 2024
1 parent baa4355 commit b11a32f
Showing 1 changed file with 56 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@ public class RocksDBKvBackend implements KvBackend {
*/
private RocksDB initRocksDB(Config config) throws RocksDBException {
RocksDB.loadLibrary();
final Options options = new Options();
options.setCreateIfMissing(true);

String dbPath = config.get(Configs.ENTRY_KV_ROCKSDB_BACKEND_PATH);
File dbDir = new File(dbPath, "instance");
try {
try (final Options options = new Options()) {
options.setCreateIfMissing(true);

if (!dbDir.exists() && !dbDir.mkdirs()) {
throw new RocksDBException(
String.format("Can't create RocksDB path '%s'", dbDir.getAbsolutePath()));
Expand Down Expand Up @@ -109,43 +109,45 @@ public byte[] get(byte[] key) throws IOException {
@Override
public List<Pair<byte[], byte[]>> scan(KvRange scanRange) throws IOException {
RocksIterator rocksIterator = db.newIterator();
rocksIterator.seek(scanRange.getStart());
try {
rocksIterator.seek(scanRange.getStart());

List<Pair<byte[], byte[]>> result = Lists.newArrayList();
int count = 0;
while (count < scanRange.getLimit() && rocksIterator.isValid()) {
byte[] key = rocksIterator.key();
List<Pair<byte[], byte[]>> result = Lists.newArrayList();
int count = 0;
while (count < scanRange.getLimit() && rocksIterator.isValid()) {
byte[] key = rocksIterator.key();

// Break if the key is out of the scan range
if (Bytes.wrap(key).compareTo(scanRange.getEnd()) > 0) {
break;
}
// Break if the key is out of the scan range
if (Bytes.wrap(key).compareTo(scanRange.getEnd()) > 0) {
break;
}

if (!scanRange.getPredicate().test(key, rocksIterator.value())) {
rocksIterator.next();
continue;
}
if (!scanRange.getPredicate().test(key, rocksIterator.value())) {
rocksIterator.next();
continue;
}

if (Bytes.wrap(key).compareTo(scanRange.getStart()) == 0) {
if (scanRange.isStartInclusive()) {
if (Bytes.wrap(key).compareTo(scanRange.getStart()) == 0) {
if (scanRange.isStartInclusive()) {
result.add(Pair.of(key, rocksIterator.value()));
count++;
}
} else if (Bytes.wrap(key).compareTo(scanRange.getEnd()) == 0) {
if (scanRange.isEndInclusive()) {
result.add(Pair.of(key, rocksIterator.value()));
}
break;
} else {
result.add(Pair.of(key, rocksIterator.value()));
count++;
}
} else if (Bytes.wrap(key).compareTo(scanRange.getEnd()) == 0) {
if (scanRange.isEndInclusive()) {
result.add(Pair.of(key, rocksIterator.value()));
}
break;
} else {
result.add(Pair.of(key, rocksIterator.value()));
count++;
}

rocksIterator.next();
rocksIterator.next();
}
return result;
} finally {
rocksIterator.close();
}

rocksIterator.close();
return result;
}

@Override
Expand All @@ -161,33 +163,36 @@ public boolean delete(byte[] key) throws IOException {
@Override
public boolean deleteRange(KvRange deleteRange) throws IOException {
RocksIterator rocksIterator = db.newIterator();
rocksIterator.seek(deleteRange.getStart());

while (rocksIterator.isValid()) {
byte[] key = rocksIterator.key();
// Break if the key is out of the scan range
if (Bytes.wrap(key).compareTo(deleteRange.getEnd()) > 0) {
break;
}
try {
rocksIterator.seek(deleteRange.getStart());

if (Bytes.wrap(key).compareTo(deleteRange.getStart()) == 0) {
if (deleteRange.isStartInclusive()) {
delete(key);
while (rocksIterator.isValid()) {
byte[] key = rocksIterator.key();
// Break if the key is out of the scan range
if (Bytes.wrap(key).compareTo(deleteRange.getEnd()) > 0) {
break;
}
} else if (Bytes.wrap(key).compareTo(deleteRange.getEnd()) == 0) {
if (deleteRange.isEndInclusive()) {

if (Bytes.wrap(key).compareTo(deleteRange.getStart()) == 0) {
if (deleteRange.isStartInclusive()) {
delete(key);
}
} else if (Bytes.wrap(key).compareTo(deleteRange.getEnd()) == 0) {
if (deleteRange.isEndInclusive()) {
delete(key);
}
break;
} else {
delete(key);
}
break;
} else {
delete(key);
}

rocksIterator.next();
rocksIterator.next();
}
return true;
} finally {
rocksIterator.close();
}

rocksIterator.close();
return true;
}

@Override
Expand Down

0 comments on commit b11a32f

Please sign in to comment.