Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add refresh_iter to release pinned MemTable and SST #1212

Open
wants to merge 1 commit into
base: fb-mysql-8.0.28
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions storage/rocksdb/ha_rocksdb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4061,6 +4061,32 @@ class Rdb_transaction {
return get_iterator(options, column_family, table_type);
}

// Builds a new iterator on `column_family` that reads through the
// caller-supplied `snapshot` rather than one chosen by this function.
//
// Read options start from m_read_opts and are then adjusted:
//  - fill_cache is the inverse of the session's skip_fill_cache setting;
//  - when `skip_bloom_filter` is true, total-order seek is enabled and the
//    eq-cond bounds are installed as iterate bounds (only if the session's
//    enable_iterate_bounds setting is on, otherwise the bounds are cleared);
//  - otherwise prefix_same_as_start is enabled.
//
// The bounds are stored by address, so the referenced slices must outlive
// the returned iterator.
rocksdb::Iterator *refresh_iterator(
    const rocksdb::Snapshot* snapshot,
    rocksdb::ColumnFamilyHandle *const column_family, bool skip_bloom_filter,
    const rocksdb::Slice &eq_cond_lower_bound,
    const rocksdb::Slice &eq_cond_upper_bound, TABLE_TYPE table_type) {
  rocksdb::ReadOptions read_opts = m_read_opts;
  read_opts.fill_cache = !THDVAR(get_thd(), skip_fill_cache);
  read_opts.snapshot = snapshot;

  if (!skip_bloom_filter) {
    // With this option, Iterator::Valid() returns false if key
    // is outside of the prefix bloom filter range set at Seek().
    // Must not be set to true if not using bloom filter.
    read_opts.prefix_same_as_start = true;
    return get_iterator(read_opts, column_family, table_type);
  }

  const bool use_bounds = THDVAR(get_thd(), enable_iterate_bounds);
  read_opts.total_order_seek = true;
  if (use_bounds) {
    read_opts.iterate_lower_bound = &eq_cond_lower_bound;
    read_opts.iterate_upper_bound = &eq_cond_upper_bound;
  } else {
    read_opts.iterate_lower_bound = nullptr;
    read_opts.iterate_upper_bound = nullptr;
  }
  return get_iterator(read_opts, column_family, table_type);
}

virtual bool is_tx_started(TABLE_TYPE table_type) const = 0;
virtual void start_tx(TABLE_TYPE table_type) = 0;
virtual void start_stmt() = 0;
Expand Down Expand Up @@ -17485,6 +17511,16 @@ rocksdb::Iterator *rdb_tx_get_iterator(
}
}

// Free-function entry point for iterator refresh: resolves the transaction
// attached to `thd` and forwards every argument to
// Rdb_transaction::refresh_iterator(), returning whatever iterator it builds.
// Note the argument order differs: the transaction method takes `snapshot`
// first.
rocksdb::Iterator *rdb_tx_refresh_iterator(
    THD *thd, rocksdb::ColumnFamilyHandle *const cf, bool skip_bloom_filter,
    const rocksdb::Slice &eq_cond_lower_bound,
    const rocksdb::Slice &eq_cond_upper_bound,
    const rocksdb::Snapshot *snapshot, TABLE_TYPE table_type) {
  return get_tx_from_thd(thd)->refresh_iterator(
      snapshot, cf, skip_bloom_filter, eq_cond_lower_bound,
      eq_cond_upper_bound, table_type);
}

bool rdb_tx_started(Rdb_transaction *tx, TABLE_TYPE table_type) {
return tx->is_tx_started(table_type);
}
Expand Down
6 changes: 6 additions & 0 deletions storage/rocksdb/ha_rocksdb.h
Original file line number Diff line number Diff line change
Expand Up @@ -1148,6 +1148,12 @@ rocksdb::Iterator *rdb_tx_get_iterator(
const rocksdb::Snapshot **snapshot, TABLE_TYPE table_type,
bool read_current = false, bool create_snapshot = true);

// Creates a replacement iterator on column family `cf` using the transaction
// attached to `thd`. The iterator reads through the caller-supplied
// `snapshot`; `skip_bloom_filter` and the two eq-cond bound slices are
// forwarded unchanged to Rdb_transaction::refresh_iterator(). The bound
// slices are passed by reference and may be retained by address in the read
// options, so they must stay alive while the returned iterator is in use.
rocksdb::Iterator *rdb_tx_refresh_iterator(
THD *thd, rocksdb::ColumnFamilyHandle *const cf, bool skip_bloom_filter,
const rocksdb::Slice &eq_cond_lower_bound,
const rocksdb::Slice &eq_cond_upper_bound,
const rocksdb::Snapshot *snapshot, TABLE_TYPE table_type);

rocksdb::Status rdb_tx_get(Rdb_transaction *tx,
rocksdb::ColumnFamilyHandle *const column_family,
const rocksdb::Slice &key,
Expand Down
30 changes: 30 additions & 0 deletions storage/rocksdb/rdb_iterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,30 @@ void Rdb_iterator_base::setup_scan_iterator(const rocksdb::Slice *const slice,
}
}

// This function is intended to release the MemTable and SST objects held by
// the rocksdb::Version object that is referenced by the old
// rocksdb::Iterator. The newly created iterator may reference a newer
// rocksdb::Version object. It is created with the same snapshot, bounds and
// bloom-filter setting as the old one, so the data views of the two
// iterators are expected to be identical.
//
// If the old iterator was positioned on a key, the new iterator is
// repositioned on that same key; otherwise the new iterator is left
// unpositioned.
//
// The refresh is skipped (m_scan_it is left untouched) when there is no scan
// iterator yet, or when the current iterator is invalid because of an error,
// so that the caller can still observe that error through status().
void Rdb_iterator_base::refresh_iter() {
  // Nothing to refresh before a scan iterator has been set up.
  if (m_scan_it == nullptr) {
    return;
  }

  const bool was_valid = m_scan_it->Valid();
  // Replacing an iterator that failed would silently discard its error
  // status; keep it so the normal error path still sees the failure.
  if (!was_valid && !m_scan_it->status().ok()) {
    return;
  }

  // Copy the key out: the Slice returned by key() refers to storage owned by
  // the iterator, which is destroyed below.
  std::string curr_key;
  if (was_valid) {
    curr_key = m_scan_it->key().ToString();
  }

  // Drop the old iterator first so whatever it pins can be released before
  // the replacement is created.
  delete m_scan_it;
  m_scan_it = rdb_tx_refresh_iterator(
      m_thd, m_kd->get_cf(), m_scan_it_skips_bloom,
      m_scan_it_lower_bound_slice, m_scan_it_upper_bound_slice,
      m_scan_it_snapshot, m_table_type);

  if (was_valid) {
    // Same snapshot => the key must still be visible at the same position.
    m_scan_it->Seek(curr_key);
    SHIP_ASSERT(m_scan_it->Valid());
    SHIP_ASSERT(m_scan_it->key() == curr_key);
  } else {
    SHIP_ASSERT(!m_scan_it->Valid());
  }
}

int Rdb_iterator_base::calc_eq_cond_len(enum ha_rkey_function find_flag,
const rocksdb::Slice &start_key,
const int bytes_changed_by_succ,
Expand Down Expand Up @@ -214,6 +238,12 @@ int Rdb_iterator_base::next_with_direction(bool move_forward, bool skip_next) {
const auto &kd = *m_kd;
Rdb_transaction *const tx = get_tx_from_thd(m_thd);

const uint32_t refresh_interval = 100000;
if (++m_call_cnt >= refresh_interval) {
refresh_iter();
m_call_cnt = 0;
}

for (;;) {
DEBUG_SYNC(m_thd, "rocksdb.check_flags_nwd");
if (thd_killed(m_thd)) {
Expand Down
2 changes: 2 additions & 0 deletions storage/rocksdb/rdb_iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ class Rdb_iterator_base : public Rdb_iterator {

/* Iterator used for range scans and for full table/index scans */
rocksdb::Iterator *m_scan_it;
// Number of next_with_direction() calls since m_scan_it was last refreshed.
// When it reaches the refresh interval, next_with_direction() calls
// refresh_iter() and resets this counter to zero.
uint32_t m_call_cnt = 0; // for refresh_iter
// Replaces m_scan_it with a newly created iterator built from the same
// snapshot, bounds and bloom-filter setting, repositioned on the key the old
// iterator pointed at (if it was valid).
void refresh_iter();

/* Whether m_scan_it was created with skip_bloom=true */
bool m_scan_it_skips_bloom;
Expand Down