Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor TMTTableFlusher #2

Merged
merged 1 commit into from
Feb 22, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions dbms/src/Debug/DBGInvoker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ DBGInvoker::DBGInvoker()
regFunc("mock_tidb_table", dbgFuncMockTiDBTable);
regFunc("drop_tidb_table", dbgFuncDropTiDBTable);

regFunc("set_flush_rows", dbgFuncSetFlushRows);
regFunc("set_deadline_seconds", dbgFuncSetDeadlineSeconds);
regFunc("set_flush_threshold", dbgFuncSetFlushThreshold);

regFunc("raft_insert_row", dbgFuncRaftInsertRow);
regFunc("raft_insert_rows", dbgFuncRaftInsertRows);
Expand Down
27 changes: 7 additions & 20 deletions dbms/src/Debug/dbgFuncMockTiDBData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,32 +17,19 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
}

void dbgFuncSetFlushRows(Context & context, const ASTs & args, DBGInvoker::Printer output)
void dbgFuncSetFlushThreshold(Context & context, const ASTs & args, DBGInvoker::Printer output)
{
if (args.size() != 1)
throw Exception("Args not matched, should be: threshold-rows", ErrorCodes::BAD_ARGUMENTS);
if (args.size() != 2)
throw Exception("Args not matched, should be: bytes, seconds", ErrorCodes::BAD_ARGUMENTS);

auto rows = safeGet<UInt64>(typeid_cast<const ASTLiteral &>(*args[0]).value);
TMTContext & tmt = context.getTMTContext();
tmt.table_flushers.setFlushThresholdRows(rows);

std::stringstream ss;
ss << "set flush threshold to " << rows << " rows";
output(ss.str());
}

void dbgFuncSetDeadlineSeconds(Context & context, const ASTs & args, DBGInvoker::Printer output)
{
if (args.size() != 1)
throw Exception("Args not matched, should be: second-uint", ErrorCodes::BAD_ARGUMENTS);

const UInt64 second = safeGet<UInt64>(typeid_cast<const ASTLiteral &>(*args[0]).value);
auto bytes = safeGet<UInt64>(typeid_cast<const ASTLiteral &>(*args[0]).value);
auto seconds = safeGet<UInt64>(typeid_cast<const ASTLiteral &>(*args[1]).value);

TMTContext & tmt = context.getTMTContext();
tmt.table_flushers.setDeadlineSeconds(second);
tmt.region_partition.setFlushThresholds({{bytes, Seconds(seconds)}});

std::stringstream ss;
ss << "set deadline seconds to " << second << "s";
ss << "set flush threshold to (" << bytes << " bytes, " << seconds << " seconds)";
output(ss.str());
}

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Debug/dbgFuncMockTiDBData.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace DB
// Change flush threshold rows
// Usage:
// ./storages-client.sh "DBGInvoke set_flush_rows(threshold_rows)"
void dbgFuncSetFlushRows(Context & context, const ASTs & args, DBGInvoker::Printer output);
void dbgFuncSetFlushThreshold(Context & context, const ASTs & args, DBGInvoker::Printer output);

// Change flush deadline seconds
// Usage:
Expand Down
1 change: 0 additions & 1 deletion dbms/src/Debug/dbgFuncMockTiDBTable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ void dbgFuncDropTiDBTable(Context & context, const ASTs & args, DBGInvoker::Prin

TMTContext & tmt = context.getTMTContext();
tmt.region_partition.dropRegionsInTable(table_id);
tmt.table_flushers.dropRegionsInTable(table_id);

MockTiDB::instance().dropTable(database_name, table_name);

Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Debug/dbgFuncRegion.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ std::vector<std::tuple<HandleID, HandleID, RegionID>> getPartitionRegionRanges(
};

TMTContext & tmt = context.getTMTContext();
tmt.region_partition.traverseRegionsByTablePartition(table_id, partition_id, context, callback);
tmt.region_partition.traverseRegionsByTablePartition(table_id, partition_id, callback);
return handle_ranges;
}

Expand Down Expand Up @@ -494,7 +494,7 @@ void dbgFuncCheckRegionCorrect(Context & context, const ASTs & args, DBGInvoker:
for (UInt64 partition_id = 0; partition_id < partition_number; ++partition_id)
{
std::unordered_map<RegionID, RegionPtr> partition_regions;
tmt.region_partition.traverseRegionsByTablePartition(table_id, partition_id, context, [&](Regions regions)
tmt.region_partition.traverseRegionsByTablePartition(table_id, partition_id, [&](Regions regions)
{
for (auto region : regions)
partition_regions[region->id()] = region;
Expand Down
6 changes: 1 addition & 5 deletions dbms/src/Debug/dbgTools.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,6 @@ void insert(const TiDB::TableInfo & table_info, RegionID region_id, HandleID han
addRequestsToRaftCmd(cmds.add_requests(), region_id, key, value, prewrite_ts, commit_ts, false);
tmt.kvstore->onServiceCommand(cmds, raft_ctx);
}

tmt.table_flushers.onPutTryFlush(region);
}

void remove(const TiDB::TableInfo & table_info, RegionID region_id, HandleID handle_id, Context & context)
Expand All @@ -167,7 +165,6 @@ void remove(const TiDB::TableInfo & table_info, RegionID region_id, HandleID han
addRequestsToRaftCmd(cmds.add_requests(), region_id, key, value, prewrite_ts, commit_ts, true);

tmt.kvstore->onServiceCommand(cmds, raft_ctx);
tmt.table_flushers.onPutTryFlush(region);
}

struct BatchCtrl
Expand Down Expand Up @@ -271,7 +268,6 @@ void batchInsert(const TiDB::TableInfo & table_info, std::unique_ptr<BatchCtrl>
}

tmt.kvstore->onServiceCommand(cmds, raft_ctx);
tmt.table_flushers.onPutTryFlush(region);
}
}

Expand Down Expand Up @@ -322,7 +318,7 @@ Int64 concurrentRangeOperate(const TiDB::TableInfo & table_info, HandleID start_
for (UInt64 partition_id = 0; partition_id < partition_number; ++partition_id)
{
TMTContext & tmt = context.getTMTContext();
tmt.region_partition.traverseRegionsByTablePartition(table_info.id, partition_id, context, [&](Regions d){
tmt.region_partition.traverseRegionsByTablePartition(table_info.id, partition_id, [&](Regions d){
regions.insert(regions.end(), d.begin(), d.end());
});
}
Expand Down
7 changes: 7 additions & 0 deletions dbms/src/Raft/RaftService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,16 @@ grpc::Status RaftService::ApplyCommandBatch(grpc::ServerContext * grpc_context,
{
RaftContext rctx(&db_context, grpc_context, stream);
BackgroundProcessingPool::TaskHandle persist_handle;
BackgroundProcessingPool::TaskHandle flush_handle;

RegionPartition & region_partition = db_context.getTMTContext().region_partition;

try
{
kvstore->report(rctx);

persist_handle = background_pool.addTask([&, this] { return kvstore->tryPersistAndReport(rctx); });
flush_handle = background_pool.addTask([&] { return region_partition.tryFlushRegions(); });

enginepb::CommandRequestBatch request;
while (stream->Read(&request))
Expand All @@ -53,6 +58,8 @@ grpc::Status RaftService::ApplyCommandBatch(grpc::ServerContext * grpc_context,

if (persist_handle)
background_pool.removeTask(persist_handle);
if (flush_handle)
background_pool.removeTask(flush_handle);
zanmato1984 marked this conversation as resolved.
Show resolved Hide resolved

return grpc::Status::CANCELLED;
}
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -622,7 +622,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
continue;
auto region_input_stream = tmt.region_partition.getBlockInputStreamByPartition(
data.table_info.id, partition_id, data.table_info, data.getColumns(), column_names_to_read,
const_cast<Context &>(context), false, true, query_info.resolve_locks, query_info.read_tso);
false, true, query_info.resolve_locks, query_info.read_tso);
if (region_input_stream)
res.emplace_back(region_input_stream);
}
Expand Down Expand Up @@ -671,7 +671,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
{
auto region_input_stream = tmt.region_partition.getBlockInputStreamByPartition(
data.table_info.id, partition_id, data.table_info, data.getColumns(), column_names_to_read,
const_cast<Context &>(context), false, true, query_info.resolve_locks, query_info.read_tso);
false, true, query_info.resolve_locks, query_info.read_tso);
if (region_input_stream)
{
BlockInputStreamPtr version_filtered_stream = std::make_shared<VersionFilterBlockInputStream>(
Expand Down
39 changes: 19 additions & 20 deletions dbms/src/Storages/Transaction/KVStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ extern const int LOGICAL_ERROR;
}

// TODO move to Settings.h
//static constexpr Int64 REGION_PERSIST_PERIOD = 60 * 1000 * 1000; // 1 minutes
//static constexpr Int64 KVSTORE_TRY_PERSIST_PERIOD = 10 * 1000 * 1000; // 10 seconds
static Seconds REGION_PERSIST_PERIOD(60); // 1 minutes
static Seconds KVSTORE_TRY_PERSIST_PERIOD(10); // 10 seconds

KVStore::KVStore(const std::string & data_dir, Context * context) : region_persister(data_dir), log(&Logger::get("KVStore"))
{
Expand Down Expand Up @@ -65,7 +65,7 @@ void KVStore::onSnapshot(const RegionPtr & region, Context * context)
}

if (tmt_ctx && old_region)
tmt_ctx->region_partition.removeRegion(old_region, *context);
tmt_ctx->region_partition.removeRegion(old_region);

region_persister.persist(region);

Expand All @@ -75,7 +75,7 @@ void KVStore::onSnapshot(const RegionPtr & region, Context * context)
}

if (tmt_ctx)
tmt_ctx->table_flushers.onPutTryFlush(region);
tmt_ctx->region_partition.applySnapshotRegion(region);
}

void KVStore::onServiceCommand(const enginepb::CommandRequestBatch & cmds, RaftContext & raft_ctx)
Expand Down Expand Up @@ -134,7 +134,9 @@ void KVStore::onServiceCommand(const enginepb::CommandRequestBatch & cmds, RaftC
continue;
}

auto [new_region, split_regions, sync] = curr_region->onCommand(cmd, callback);
auto before_cache_bytes = curr_region->dataSize();

auto [new_region, split_regions, table_ids, sync] = curr_region->onCommand(cmd, callback);

if (curr_region->isPendingRemove())
{
Expand All @@ -147,10 +149,11 @@ void KVStore::onServiceCommand(const enginepb::CommandRequestBatch & cmds, RaftC
continue;
}

// Persist current region and split regions, and mange data in partition
// Add to regions map so that queries can see them.

if (!split_regions.empty())
{
// Persist current region and split regions, and mange data in partition
// Add to regions map so that queries can see them.
// TODO: support atomic or idempotent operation.
{
std::lock_guard<std::mutex> lock(mutex);
Expand All @@ -168,23 +171,19 @@ void KVStore::onServiceCommand(const enginepb::CommandRequestBatch & cmds, RaftC
}

if (tmt_ctx)
{
tmt_ctx->region_partition.splitRegion(curr_region, split_regions, *context);

tmt_ctx->table_flushers.onPutTryFlush(curr_region);
for (const auto & region : split_regions)
tmt_ctx->table_flushers.onPutTryFlush(region);
}
tmt_ctx->region_partition.splitRegion(curr_region, split_regions);

region_persister.persist(curr_region);
for (const auto & region : split_regions)
region_persister.persist(region);
}
else if (sync)
else
{
if (tmt_ctx)
tmt_ctx->table_flushers.onPutTryFlush(curr_region);
region_persister.persist(curr_region);
tmt_ctx->region_partition.updateRegion(curr_region, before_cache_bytes, table_ids);

if (sync)
region_persister.persist(curr_region);
}

if (sync)
Expand Down Expand Up @@ -218,7 +217,7 @@ bool KVStore::tryPersistAndReport(RaftContext & context)
{
std::lock_guard<std::mutex> lock(mutex);

Poco::Timestamp now;
Timepoint now = Clock::now();
if (now < (last_try_persist_time + KVSTORE_TRY_PERSIST_PERIOD))
return false;
last_try_persist_time = now;
Expand All @@ -229,7 +228,7 @@ bool KVStore::tryPersistAndReport(RaftContext & context)
for (const auto & p : regions)
{
const auto region = p.second;
if (Poco::Timestamp() < (region->lastPersistTime() + REGION_PERSIST_PERIOD))
if (now < (region->lastPersistTime() + REGION_PERSIST_PERIOD))
continue;

persist_job = true;
Expand Down Expand Up @@ -263,7 +262,7 @@ void KVStore::removeRegion(RegionID region_id, Context * context)

region_persister.drop(region_id);
if (context)
context->getTMTContext().region_partition.removeRegion(region, *context);
context->getTMTContext().region_partition.removeRegion(region);
}

} // namespace DB
7 changes: 1 addition & 6 deletions dbms/src/Storages/Transaction/KVStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,11 @@
#include <Storages/Transaction/Consistency.h>
#include <Storages/Transaction/Region.h>
#include <Storages/Transaction/RegionPersister.h>
#include <Storages/Transaction/TMTTableFlusher.h>
#include <Storages/Transaction/TiKVKeyValue.h>


namespace DB
{
// TODO move to Settings.h
static constexpr Int64 REGION_PERSIST_PERIOD = 60 * 1000 * 1000; // 1 minutes
static constexpr Int64 KVSTORE_TRY_PERSIST_PERIOD = 10 * 1000 * 1000; // 10 seconds

/// TODO: brief design document.
class KVStore final : private boost::noncopyable
Expand All @@ -40,7 +36,6 @@ class KVStore final : private boost::noncopyable
// Currently we also trigger region files GC in it.
bool tryPersistAndReport(RaftContext & context);

// TODO: Value copy instead of value ref
// For test, please do NOT remove.
RegionMap & _regions() { return regions; }

Expand All @@ -54,7 +49,7 @@ class KVStore final : private boost::noncopyable
std::mutex mutex;

Consistency consistency;
Poco::Timestamp last_try_persist_time{};
Timepoint last_try_persist_time = Clock::now();

Logger * log;
};
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Storages/Transaction/LockException.h
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
#pragma once

#include <Common/Exception.h>
#include <Storages/Transaction/Region.h>

namespace DB
{

class LockException : public Exception
{
public:
explicit LockException(Region::LockInfos && lock_infos_) : lock_infos(std::move(lock_infos_))
{}
explicit LockException(Region::LockInfos && lock_infos_) : lock_infos(std::move(lock_infos_)) {}

Region::LockInfos lock_infos;
};

}
} // namespace DB
7 changes: 5 additions & 2 deletions dbms/src/Storages/Transaction/PartitionDataMover.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
#pragma once

#include "Storages/Transaction/Region.h"
#include "Storages/Transaction/TiKVKeyValue.h"
#include <Interpreters/Context.h>

#include <Storages/Transaction/Region.h>
#include <Storages/Transaction/TiKVKeyValue.h>
#include <Storages/StorageMergeTree.h>

namespace DB
{
Expand Down
Loading