Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement iterate_upper_bound for txn iterators #5105

Closed
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 25 additions & 5 deletions db/c.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1763,18 +1763,38 @@ void rocksdb_writebatch_wi_rollback_to_save_point(rocksdb_writebatch_wi_t* b,
rocksdb_iterator_t* rocksdb_writebatch_wi_create_iterator_with_base(
rocksdb_writebatch_wi_t* wbwi,
rocksdb_iterator_t* base_iterator) {
rocksdb_iterator_t* result = new rocksdb_iterator_t;
result->rep = wbwi->rep->NewIteratorWithBase(base_iterator->rep);
delete base_iterator;
return result;
rocksdb_readoptions_t options;
return rocksdb_writebatch_wi_create_iterator_with_base_and_readoptions(
&options, wbwi, base_iterator);
}

rocksdb_iterator_t* rocksdb_writebatch_wi_create_iterator_with_base_cf(
rocksdb_writebatch_wi_t* wbwi, rocksdb_iterator_t* base_iterator,
rocksdb_column_family_handle_t* column_family) {
rocksdb_readoptions_t options;
return rocksdb_writebatch_wi_create_iterator_with_base_cf_and_readoptions(
&options, wbwi, base_iterator, column_family);
}

rocksdb_iterator_t*
rocksdb_writebatch_wi_create_iterator_with_base_and_readoptions(
const rocksdb_readoptions_t* options, rocksdb_writebatch_wi_t* wbwi,
rocksdb_iterator_t* base_iterator) {
rocksdb_iterator_t* result = new rocksdb_iterator_t;
result->rep =
wbwi->rep->NewIteratorWithBase(column_family->rep, base_iterator->rep);
wbwi->rep->NewIteratorWithBase(options->rep, base_iterator->rep);
delete base_iterator;
return result;
}

rocksdb_iterator_t*
rocksdb_writebatch_wi_create_iterator_with_base_cf_and_readoptions(
const rocksdb_readoptions_t* options, rocksdb_writebatch_wi_t* wbwi,
rocksdb_iterator_t* base_iterator,
rocksdb_column_family_handle_t* column_family) {
rocksdb_iterator_t* result = new rocksdb_iterator_t;
result->rep = wbwi->rep->NewIteratorWithBase(options->rep, column_family->rep,
base_iterator->rep);
delete base_iterator;
return result;
}
Expand Down
4 changes: 3 additions & 1 deletion db/c_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -918,7 +918,8 @@ int main(int argc, char** argv) {
rocksdb_writebatch_wi_put(wbi, "bar", 3, "b", 1);
rocksdb_writebatch_wi_delete(wbi, "foo", 3);
rocksdb_iterator_t* iter =
rocksdb_writebatch_wi_create_iterator_with_base(wbi, base_iter);
rocksdb_writebatch_wi_create_iterator_with_base_and_readoptions(
roptions, wbi, base_iter);
CheckCondition(!rocksdb_iter_valid(iter));
rocksdb_iter_seek_to_first(iter);
CheckCondition(rocksdb_iter_valid(iter));
Expand Down Expand Up @@ -1644,6 +1645,7 @@ int main(int argc, char** argv) {
// Check iterator with column family
rocksdb_transaction_put_cf(txn, cfh1, "key1_cf", 7, "val1_cf", 7, &err);
CheckNoError(err);
rocksdb_readoptions_set_iterate_upper_bound(roptions, "key2", 4);
rocksdb_iterator_t* iter =
rocksdb_transaction_create_iterator_cf(txn, roptions, cfh1);
CheckCondition(!rocksdb_iter_valid(iter));
Expand Down
8 changes: 8 additions & 0 deletions include/rocksdb/c.h
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,14 @@ extern ROCKSDB_LIBRARY_API rocksdb_iterator_t* rocksdb_writebatch_wi_create_iter
rocksdb_writebatch_wi_t* wbwi,
rocksdb_iterator_t* base_iterator,
rocksdb_column_family_handle_t* cf);
extern ROCKSDB_LIBRARY_API rocksdb_iterator_t*
rocksdb_writebatch_wi_create_iterator_with_base_and_readoptions(
const rocksdb_readoptions_t* options, rocksdb_writebatch_wi_t* wbwi,
rocksdb_iterator_t* base_iterator);
extern ROCKSDB_LIBRARY_API rocksdb_iterator_t*
rocksdb_writebatch_wi_create_iterator_with_base_cf_and_readoptions(
const rocksdb_readoptions_t* options, rocksdb_writebatch_wi_t* wbwi,
rocksdb_iterator_t* base_iterator, rocksdb_column_family_handle_t* cf);

/* Block based table options */

Expand Down
2 changes: 2 additions & 0 deletions include/rocksdb/iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ class Iterator : public Cleanable {
// Position at the last key in the source that at or before target.
// The iterator is Valid() after this call iff the source contains
// an entry that comes at or before target.
// Note: If iterate_upper_bound is set and SeekForPrev is called with target
// greater or equal to iterate_upper_bound, the behavior is undefined
virtual void SeekForPrev(const Slice& target) = 0;

// Moves to the next entry in the source. After this call, Valid() is
Expand Down
6 changes: 6 additions & 0 deletions include/rocksdb/utilities/write_batch_with_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,15 @@ class WriteBatchWithIndex : public WriteBatchBase {
// key() and value() of the iterator. This invalidation happens even before
// the write batch update finishes. The state may recover after Next() is
// called.
Iterator* NewIteratorWithBase(const ReadOptions& read_options,
ColumnFamilyHandle* column_family,
Iterator* base_iterator);

Iterator* NewIteratorWithBase(ColumnFamilyHandle* column_family,
Iterator* base_iterator);
// default column family
Iterator* NewIteratorWithBase(const ReadOptions& read_options,
Iterator* base_iterator);
Iterator* NewIteratorWithBase(Iterator* base_iterator);

// Similar to DB::Get() but will only read the key from this batch.
Expand Down
12 changes: 6 additions & 6 deletions java/rocksjni/write_batch_with_index.cc
Original file line number Diff line number Diff line change
Expand Up @@ -453,15 +453,15 @@ jlong Java_org_rocksdb_WriteBatchWithIndex_iterator1(JNIEnv* /*env*/,
* Method: iteratorWithBase
* Signature: (JJJ)J
*/
jlong Java_org_rocksdb_WriteBatchWithIndex_iteratorWithBase(JNIEnv* /*env*/,
jobject /*jobj*/,
jlong jwbwi_handle,
jlong jcf_handle,
jlong jbi_handle) {
jlong Java_org_rocksdb_WriteBatchWithIndex_iteratorWithBase(
JNIEnv* /*env*/, jobject /*jobj*/, jlong jwbwi_handle, jlong jcf_handle,
jlong jbi_handle, jlong jreadopt_handle) {
auto* wbwi = reinterpret_cast<rocksdb::WriteBatchWithIndex*>(jwbwi_handle);
auto* cf_handle = reinterpret_cast<rocksdb::ColumnFamilyHandle*>(jcf_handle);
auto* base_iterator = reinterpret_cast<rocksdb::Iterator*>(jbi_handle);
auto* iterator = wbwi->NewIteratorWithBase(cf_handle, base_iterator);
auto* readopt = reinterpret_cast<rocksdb::ReadOptions*>(jreadopt_handle);
auto* iterator =
wbwi->NewIteratorWithBase(*readopt, cf_handle, base_iterator);
return reinterpret_cast<jlong>(iterator);
}

Expand Down
63 changes: 53 additions & 10 deletions java/src/main/java/org/rocksdb/WriteBatchWithIndex.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@
*
* A user can call {@link org.rocksdb.WriteBatchWithIndex#newIterator()} to
* create an iterator over the write batch or
* {@link org.rocksdb.WriteBatchWithIndex#newIteratorWithBase(org.rocksdb.RocksIterator)}
* to get an iterator for the database with Read-Your-Own-Writes like capability
* {@link org.rocksdb.WriteBatchWithIndex#newIteratorWithBase(org.rocksdb.ReadOptions,
* org.rocksdb.RocksIterator)} to get an iterator for the database with Read-Your-Own-Writes like
* capability
*/
public class WriteBatchWithIndex extends AbstractWriteBatch {
/**
Expand Down Expand Up @@ -120,23 +121,64 @@ public WBWIRocksIterator newIterator() {
* the write batch update finishes. The state may recover after Next() is
* called.
*
* @param read_opts The read options to use
* @param columnFamilyHandle The column family to iterate over
* @param baseIterator The base iterator,
* e.g. {@link org.rocksdb.RocksDB#newIterator()}
* @return An iterator which shows a view comprised of both the database
* point-in-time from baseIterator and modifications made in this write batch.
*/
public RocksIterator newIteratorWithBase(
final ColumnFamilyHandle columnFamilyHandle,
final RocksIterator baseIterator) {
public RocksIterator newIteratorWithBase(final ReadOptions read_opts,
final ColumnFamilyHandle columnFamilyHandle, final RocksIterator baseIterator) {
RocksIterator iterator = new RocksIterator(baseIterator.parent_,
iteratorWithBase(
nativeHandle_, columnFamilyHandle.nativeHandle_, baseIterator.nativeHandle_));
iteratorWithBase(nativeHandle_, columnFamilyHandle.nativeHandle_,
baseIterator.nativeHandle_, read_opts.nativeHandle_));
// when the iterator is deleted it will also delete the baseIterator
baseIterator.disOwnNativeHandle();
return iterator;
}

/**
* Provides Read-Your-Own-Writes like functionality by
* creating a new Iterator that will use {@link org.rocksdb.WBWIRocksIterator}
* as a delta and baseIterator as a base
*
* Updating write batch with the current key of the iterator is not safe.
* We strongly recommand users not to do it. It will invalidate the current
* key() and value() of the iterator. This invalidation happens even before
* the write batch update finishes. The state may recover after Next() is
* called.
*
* @param columnFamilyHandle The column family to iterate over
* @param baseIterator The base iterator,
* e.g. {@link org.rocksdb.RocksDB#newIterator()}
* @return An iterator which shows a view comprised of both the database
* point-in-time from baseIterator and modifications made in this write batch.
*/
public RocksIterator newIteratorWithBase(
final ColumnFamilyHandle columnFamilyHandle, final RocksIterator baseIterator) {
ReadOptions read_opts = new ReadOptions();
return newIteratorWithBase(read_opts, columnFamilyHandle, baseIterator);
}

/**
* Provides Read-Your-Own-Writes like functionality by
* creating a new Iterator that will use {@link org.rocksdb.WBWIRocksIterator}
* as a delta and baseIterator as a base. Operates on the default column
* family.
*
* @param read_opts The read options to use
* @param baseIterator The base iterator,
* e.g. {@link org.rocksdb.RocksDB#newIterator()}
* @return An iterator which shows a view comprised of both the database
* point-in-timefrom baseIterator and modifications made in this write batch.
*/
public RocksIterator newIteratorWithBase(
final ReadOptions read_opts, final RocksIterator baseIterator) {
return newIteratorWithBase(
read_opts, baseIterator.parent_.getDefaultColumnFamily(), baseIterator);
}

/**
* Provides Read-Your-Own-Writes like functionality by
* creating a new Iterator that will use {@link org.rocksdb.WBWIRocksIterator}
Expand All @@ -149,7 +191,8 @@ public RocksIterator newIteratorWithBase(
* point-in-timefrom baseIterator and modifications made in this write batch.
*/
public RocksIterator newIteratorWithBase(final RocksIterator baseIterator) {
return newIteratorWithBase(baseIterator.parent_.getDefaultColumnFamily(), baseIterator);
ReadOptions read_opts = new ReadOptions();
return newIteratorWithBase(read_opts, baseIterator);
}

/**
Expand Down Expand Up @@ -292,8 +335,8 @@ private native static long newWriteBatchWithIndex(
final boolean overwriteKey);
private native long iterator0(final long handle);
private native long iterator1(final long handle, final long cfHandle);
private native long iteratorWithBase(
final long handle, final long baseIteratorHandle, final long cfHandle);
private native long iteratorWithBase(final long handle, final long baseIteratorHandle,
final long cfHandle, final long jreadopt_handle);
private native byte[] getFromBatch(final long handle, final long optHandle,
final byte[] key, final int keyLen);
private native byte[] getFromBatch(final long handle, final long optHandle,
Expand Down
45 changes: 45 additions & 0 deletions utilities/transactions/optimistic_transaction_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -852,6 +852,51 @@ TEST_F(OptimisticTransactionTest, UntrackedWrites) {
delete txn;
}

TEST_F(OptimisticTransactionTest, IteratorUpperBoundTest) {
WriteOptions write_options;
auto txn = unique_ptr<Transaction>(txn_db->BeginTransaction(write_options));

string key1 = "a1";
string key2 = "a3";
string key3 = "b1";
string val = "123";
txn->Put(key1, val);
txn->Put(key2, val);

Status s = txn->Commit();
ASSERT_OK(s);
txn = unique_ptr<Transaction>(txn_db->BeginTransaction(write_options));
txn->Put(key3, val);

string ubKey("a2");
Slice upperbound(ubKey);
ReadOptions read_options;
read_options.iterate_upper_bound = &upperbound;
auto it = unique_ptr<Iterator>(txn->GetIterator(read_options));
for (it->SeekToFirst(); it->Valid(); it->Next()) {
EXPECT_LT(it->key().ToString(), ubKey);
}
EXPECT_GE(it->key().ToString(), ubKey);
int key_count = 0;
for (it->SeekToFirst(); it->Valid(); it->Next()) {
EXPECT_LT(it->key().ToString(), ubKey);
key_count++;
}
ASSERT_EQ(key_count, 1);
// Test Seek to a key equal or over upper bound
it->Seek("a2");
ASSERT_FALSE(it->Valid());
it->Seek("a3");
ASSERT_FALSE(it->Valid());
it->Seek("a1");
ASSERT_TRUE(it->Valid());
it.reset();

s = txn->Commit();
ASSERT_OK(s);
txn.reset();
}

TEST_F(OptimisticTransactionTest, IteratorTest) {
WriteOptions write_options;
ReadOptions read_options, snapshot_read_options;
Expand Down
4 changes: 2 additions & 2 deletions utilities/transactions/transaction_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -313,15 +313,15 @@ Iterator* TransactionBaseImpl::GetIterator(const ReadOptions& read_options) {
Iterator* db_iter = db_->NewIterator(read_options);
assert(db_iter);

return write_batch_.NewIteratorWithBase(db_iter);
return write_batch_.NewIteratorWithBase(read_options, db_iter);
}

Iterator* TransactionBaseImpl::GetIterator(const ReadOptions& read_options,
ColumnFamilyHandle* column_family) {
Iterator* db_iter = db_->NewIterator(read_options, column_family);
assert(db_iter);

return write_batch_.NewIteratorWithBase(column_family, db_iter);
return write_batch_.NewIteratorWithBase(read_options, column_family, db_iter);
}

Status TransactionBaseImpl::Put(ColumnFamilyHandle* column_family,
Expand Down
4 changes: 2 additions & 2 deletions utilities/transactions/write_prepared_txn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ Iterator* WritePreparedTxn::GetIterator(const ReadOptions& options) {
Iterator* db_iter = wpt_db_->NewIterator(options);
assert(db_iter);

return write_batch_.NewIteratorWithBase(db_iter);
return write_batch_.NewIteratorWithBase(options, db_iter);
}

Iterator* WritePreparedTxn::GetIterator(const ReadOptions& options,
Expand All @@ -76,7 +76,7 @@ Iterator* WritePreparedTxn::GetIterator(const ReadOptions& options,
Iterator* db_iter = wpt_db_->NewIterator(options, column_family);
assert(db_iter);

return write_batch_.NewIteratorWithBase(column_family, db_iter);
return write_batch_.NewIteratorWithBase(options, column_family, db_iter);
}

Status WritePreparedTxn::PrepareInternal() {
Expand Down
2 changes: 1 addition & 1 deletion utilities/transactions/write_unprepared_txn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ Iterator* WriteUnpreparedTxn::GetIterator(const ReadOptions& options,
Iterator* db_iter = wupt_db_->NewIterator(options, column_family, this);
assert(db_iter);

return write_batch_.NewIteratorWithBase(column_family, db_iter);
return write_batch_.NewIteratorWithBase(options, column_family, db_iter);
}

const std::map<SequenceNumber, size_t>&
Expand Down
Loading