Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLASH-194] Fix Bug About ApplySnapshot #19

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 84 additions & 64 deletions dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -221,10 +221,19 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
const unsigned num_streams,
Int64 max_block_number_to_read) const
{
size_t part_index = 0;
MergeTreeData::DataPartsVector parts = data.getDataPartsVector();
RangesInDataParts parts_with_ranges;

bool is_txn_engine = data.merging_params.mode == MergeTreeData::MergingParams::Txn;

// TODO: set regions_query_info from setting.
std::vector<RegionQueryInfo> regions_query_info;
std::vector<bool> regions_query_res;
BlockInputStreams region_block_data;
String handle_col_name;
size_t region_cnt = 0;
std::vector<RangesInDataParts> region_range_parts;
std::vector<size_t> rows_in_mem;

/// If query contains restrictions on the virtual column `_part` or `_part_index`, select only parts suitable for it.
/// The virtual column `_sample_factor` (which is equal to 1 / used sample rate) can be requested in the query.
Names virt_column_names;
Expand Down Expand Up @@ -257,6 +266,61 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
}
}

if (is_txn_engine)
{
handle_col_name = data.getPrimarySortDescription()[0].column_name;
ASTSelectQuery & select = typeid_cast<ASTSelectQuery &>(*query_info.query);

TMTContext & tmt = context.getTMTContext();

if (!select.no_kvstore)
{
tmt.region_table.traverseRegionsByTable(data.table_info.id, [&](Regions regions) {
for (const auto & region : regions)
{
regions_query_info.push_back({region->id(), region->version(), region->getHandleRangeByTable(data.table_info.id)});
}
});
}

std::sort(regions_query_info.begin(), regions_query_info.end());
region_cnt = regions_query_info.size();
region_range_parts.assign(region_cnt, {});
regions_query_res.assign(region_cnt, true);
region_block_data.assign(region_cnt, nullptr);
rows_in_mem.assign(region_cnt, 0);

Names column_names_to_read = real_column_names;

extend_mutable_engine_column_names(column_names_to_read, data);

// get data block from region first.

for (size_t region_index = 0; region_index < region_cnt; ++region_index)
{
const RegionQueryInfo & region_query_info = regions_query_info[region_index];

auto [region_input_stream, status, tol] = tmt.region_table.getBlockInputStreamByRegion(
data.table_info.id, region_query_info.region_id, region_query_info.version,
data.table_info, data.getColumns(), column_names_to_read,
true, query_info.resolve_locks, query_info.read_tso);
if (status != RegionTable::OK)
{
regions_query_res[region_index] = false;
LOG_INFO(log, "Region " << region_query_info.region_id << ", version " << region_query_info.version
<< ", handle range [" << region_query_info.range_in_table.first
<< ", " << region_query_info.range_in_table.second << ") , status "
<< RegionTable::RegionReadStatusString(status));
continue;
}
region_block_data[region_index] = region_input_stream;
rows_in_mem[region_index] = tol;
}
}

size_t part_index = 0;
MergeTreeData::DataPartsVector parts = data.getDataPartsVector();

NamesAndTypesList available_real_columns = data.getColumns().getAllPhysical();

NamesAndTypesList available_real_and_virtual_columns = available_real_columns;
Expand Down Expand Up @@ -597,37 +661,6 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
SizeLimits(settings.max_rows_to_transfer, settings.max_bytes_to_transfer, settings.transfer_overflow_mode)).read();
}

/// @todo Make sure partition select works properly when sampling is used!

// TODO: set regions_query_info from setting.

std::vector<RegionQueryInfo> regions_query_info;
std::vector<bool> regions_query_res;
BlockInputStreams region_block_data;
String handle_col_name;

if (is_txn_engine)
{
handle_col_name = data.primary_expr_ast->children[0]->getColumnName();

TMTContext & tmt = context.getTMTContext();

tmt.region_table.traverseRegionsByTable(data.table_info.id, [&](Regions regions) {
for (const auto & region : regions)
{
regions_query_info.push_back({region->id(), region->version(), region->getHandleRangeByTable(data.table_info.id)});
}
});
}

std::sort(regions_query_info.begin(), regions_query_info.end());
size_t region_cnt = regions_query_info.size();
std::vector<RangesInDataParts> region_range_parts(region_cnt);
regions_query_res.assign(region_cnt, true);
region_block_data.assign(region_cnt, nullptr);

RangesInDataParts parts_with_ranges;

/// Let's find what range to read from each part.
size_t sum_marks = 0;
size_t sum_ranges = 0;
Expand Down Expand Up @@ -658,50 +691,35 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
{
TMTContext & tmt = context.getTMTContext();

extend_mutable_engine_column_names(column_names_to_read, data);

// get data block from region first.
std::vector<size_t> rows_in_mem(region_cnt, 0);

for (size_t region_index = 0; region_index < region_cnt; ++region_index)
{
if (select.no_kvstore)
continue;

if (!regions_query_res[region_index])
continue;

const RegionQueryInfo & region_query_info = regions_query_info[region_index];

auto [region_input_stream, status, tol] = tmt.region_table.getBlockInputStreamByRegion(
data.table_info.id, region_query_info.region_id, region_query_info.version,
data.table_info, data.getColumns(), column_names_to_read,
true, query_info.resolve_locks, query_info.read_tso);
if (status != RegionTable::OK)
if (tmt.kvstore->getRegion(region_query_info.region_id) == nullptr)
{
// Region may be removed.
// If region is in kvstore, even if its state is pending_remove, the new parts with del data are not flushed into ch.
regions_query_res[region_index] = false;
LOG_INFO(log, "Region " << region_query_info.region_id << ", version " << regions_query_info[region_index].version
<< ", handle range [" << regions_query_info[region_index].range_in_table.first
<< ", " << regions_query_info[region_index].range_in_table.second << ") , status "
<< RegionTable::RegionReadStatusString(status));
LOG_INFO(log, "Region " << region_query_info.region_id << ", version " << region_query_info.version
<< ", handle range [" << region_query_info.range_in_table.first
<< ", " << region_query_info.range_in_table.second << ") , status "
<< RegionTable::RegionReadStatusString(RegionTable::RegionReadStatus::NOT_FOUND));
continue;
}
region_block_data[region_index] = region_input_stream;
rows_in_mem[region_index] = tol;
}

for (size_t region_index = 0; region_index < region_cnt; ++region_index)
{
if (select.no_kvstore)
continue;

if (!regions_query_res[region_index])
continue;

size_t sum_marks = 0;
size_t sum_ranges = 0;
for (const RangesInDataPart & ranges : parts_with_ranges)
{
MarkRanges mark_ranges = MarkRangesFromRegionRange(ranges.data_part->index,
regions_query_info[region_index].range_in_table.first,
regions_query_info[region_index].range_in_table.second,
region_query_info.range_in_table.first,
region_query_info.range_in_table.second,
ranges.ranges,
(settings.merge_tree_min_rows_for_seek
+ data.index_granularity - 1)
Expand All @@ -715,10 +733,10 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
sum_marks += range.end - range.begin;
}

LOG_TRACE(log, "Region " << regions_query_info[region_index].region_id << ", version "
<< regions_query_info[region_index].version
<< ", handle range [" << regions_query_info[region_index].range_in_table.first
<< ", " << regions_query_info[region_index].range_in_table.second << "), selected "
LOG_TRACE(log, "Region " << region_query_info.region_id << ", version "
<< region_query_info.version
<< ", handle range [" << region_query_info.range_in_table.first
<< ", " << region_query_info.range_in_table.second << "), selected "
<< region_range_parts[region_index].size() << " parts, " << sum_marks << " marks to read from "
<< sum_ranges << " ranges, read " << rows_in_mem[region_index] << " rows from memory");
}
Expand All @@ -741,6 +759,8 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
if (sum_marks > max_marks_to_use_cache)
use_uncompressed_cache = false;

extend_mutable_engine_column_names(column_names_to_read, data);

if (select.raw_for_mutable)
{
res = spreadMarkRangesAmongStreams(
Expand Down
53 changes: 30 additions & 23 deletions dbms/src/Storages/Transaction/KVStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}

KVStore::KVStore(const std::string & data_dir) : region_persister(data_dir), log(&Logger::get("KVStore"))
{
}
KVStore::KVStore(const std::string & data_dir) : region_persister(data_dir), log(&Logger::get("KVStore")) {}

void KVStore::restore(const Region::RegionClientCreateFunc & region_client_create, std::vector<RegionID> * regions_to_remove)
{
Expand Down Expand Up @@ -51,31 +49,38 @@ void KVStore::traverseRegions(std::function<void(const RegionID region_id, const
callback(it->first, it->second);
}

void KVStore::onSnapshot(const RegionPtr & region, Context * context)
void KVStore::onSnapshot(RegionPtr new_region, Context * context)
{
TMTContext * tmt_ctx = (bool)(context) ? &(context->getTMTContext()) : nullptr;
auto region_id = region->id();
TMTContext * tmt_ctx = context ? &(context->getTMTContext()) : nullptr;

RegionPtr old_region;
{
std::lock_guard<std::mutex> lock(mutex);
auto it = regions.find(region_id);
if (it != regions.end())
old_region = it->second;
}
std::lock_guard<std::mutex> lock(task_mutex);

if (old_region != nullptr && old_region->getIndex() >= region->getIndex())
return;

region_persister.persist(region);
RegionID region_id = new_region->id();
RegionPtr old_region = getRegion(region_id);
if (old_region != nullptr)
{
LOG_DEBUG(log, "KVStore::onSnapshot: previous " << old_region->toString(true) << " ; new " << new_region->toString(true));

{
std::lock_guard<std::mutex> lock(mutex);
regions.insert_or_assign(region_id, region);
if (old_region->getIndex() >= new_region->getIndex())
{
LOG_DEBUG(log, "KVStore::onSnapshot: discard new region because of index is outdated");
return;
}
old_region->reset(std::move(*new_region));
new_region = old_region;
}
else
{
std::lock_guard<std::mutex> lock(mutex);
regions[region_id] = new_region;
}
}

region_persister.persist(new_region);

if (tmt_ctx)
tmt_ctx->region_table.applySnapshotRegion(region);
tmt_ctx->region_table.applySnapshotRegion(new_region);
}

void KVStore::onServiceCommand(const enginepb::CommandRequestBatch & cmds, RaftContext & raft_ctx)
Expand All @@ -96,6 +101,9 @@ void KVStore::onServiceCommand(const enginepb::CommandRequestBatch & cmds, RaftC
{
auto & header = cmd.header();
auto curr_region_id = header.region_id();

std::lock_guard<std::mutex> lock(task_mutex);

RegionPtr curr_region;
{
std::lock_guard<std::mutex> lock(mutex);
Expand Down Expand Up @@ -258,9 +266,8 @@ void KVStore::removeRegion(RegionID region_id, Context * context)
void KVStore::checkRegion(RegionTable & region_table)
{
std::unordered_set<RegionID> region_in_table;
region_table.traverseRegions([&](TableID, RegionTable::InternalRegion & internal_region){
region_in_table.insert(internal_region.region_id);
});
region_table.traverseRegions(
[&](TableID, RegionTable::InternalRegion & internal_region) { region_in_table.insert(internal_region.region_id); });
for (auto && [id, region] : regions)
{
if (region_in_table.count(id))
Expand Down
13 changes: 8 additions & 5 deletions dbms/src/Storages/Transaction/KVStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@
#include <Storages/Transaction/Consistency.h>
#include <Storages/Transaction/Region.h>
#include <Storages/Transaction/RegionPersister.h>
#include <Storages/Transaction/TiKVKeyValue.h>
#include <Storages/Transaction/RegionTable.h>
#include <Storages/Transaction/TiKVKeyValue.h>


namespace DB
{

// TODO move to Settings.h
static const Seconds REGION_PERSIST_PERIOD(120); // 2 minutes
static const Seconds REGION_PERSIST_PERIOD(120); // 2 minutes
static const Seconds KVSTORE_TRY_PERSIST_PERIOD(20); // 20 seconds

/// TODO: brief design document.
Expand All @@ -32,7 +32,7 @@ class KVStore final : private boost::noncopyable
RegionPtr getRegion(RegionID region_id);
void traverseRegions(std::function<void(const RegionID region_id, const RegionPtr & region)> callback);

void onSnapshot(const RegionPtr & region, Context * context);
void onSnapshot(RegionPtr region, Context * context);
// TODO: remove RaftContext and use Context + CommandServerReaderWriter
void onServiceCommand(const enginepb::CommandRequestBatch & cmds, RaftContext & context);

Expand All @@ -41,8 +41,8 @@ class KVStore final : private boost::noncopyable

// Persist and report those expired regions.
// Currently we also trigger region files GC in it.
bool tryPersistAndReport(RaftContext & context, const Seconds kvstore_try_persist_period=KVSTORE_TRY_PERSIST_PERIOD,
const Seconds region_persist_period=REGION_PERSIST_PERIOD);
bool tryPersistAndReport(RaftContext & context, const Seconds kvstore_try_persist_period = KVSTORE_TRY_PERSIST_PERIOD,
const Seconds region_persist_period = REGION_PERSIST_PERIOD);

const RegionMap & getRegions();

Expand All @@ -59,6 +59,9 @@ class KVStore final : private boost::noncopyable
Consistency consistency;
std::atomic<Timepoint> last_try_persist_time = Clock::now();

// onServiceCommand and onSnapshot should not be called concurrently
mutable std::mutex task_mutex;

Logger * log;
};

Expand Down
20 changes: 18 additions & 2 deletions dbms/src/Storages/Transaction/Region.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,9 @@ RegionPtr Region::splitInto(const RegionMeta & meta)
auto [start_key, end_key] = meta.getRange();
RegionPtr new_region;
if (client != nullptr)
new_region = std::make_shared<Region>(
meta, [&](pingcap::kv::RegionVerID) { return std::make_shared<pingcap::kv::RegionClient>(client->cache, client->client, meta.getRegionVerID()); });
new_region = std::make_shared<Region>(meta, [&](pingcap::kv::RegionVerID) {
return std::make_shared<pingcap::kv::RegionClient>(client->cache, client->client, meta.getRegionVerID());
});
else
new_region = std::make_shared<Region>(meta);

Expand Down Expand Up @@ -612,4 +613,19 @@ std::pair<HandleID, HandleID> getHandleRangeByTable(const TiKVKey & start_key, c
return {start_handle, end_handle};
}

void Region::reset(Region && new_region)
{
std::lock_guard<std::mutex> lock(mutex);

data_cf = std::move(new_region.data_cf);
write_cf = std::move(new_region.write_cf);
lock_cf = std::move(new_region.lock_cf);

cf_data_size = new_region.cf_data_size.load();

persist_parm++;

meta.swap(new_region.meta);
}

} // namespace DB
Loading