Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLASH-206] Refactor Process About Region Data #36

Merged
merged 12 commits into from
Apr 8, 2019
Prev Previous commit
Next Next commit
use template to make code better.
solotzg committed Apr 4, 2019
commit 8595a25ccfec9e6acdd88ef25b1e7c2c8c45c4c7
2 changes: 1 addition & 1 deletion dbms/src/Storages/Transaction/PartitionStreams.cpp
Original file line number Diff line number Diff line change
@@ -66,7 +66,7 @@ std::tuple<BlockInputStreamPtr, RegionTable::RegionReadStatus, size_t> RegionTab
{
Region::LockInfoPtr lock_info = nullptr;
if (resolve_locks)
lock_info = scanner->getLockInfo(table_id, start_ts);
lock_info = scanner->getLockInfo(start_ts);
if (lock_info)
{
Region::LockInfos lock_infos;
14 changes: 9 additions & 5 deletions dbms/src/Storages/Transaction/Region.cpp
Original file line number Diff line number Diff line change
@@ -19,11 +19,15 @@ const String Region::lock_cf_name = "lock";
const String Region::default_cf_name = "default";
const String Region::write_cf_name = "write";

RegionData::WriteCFIter Region::removeDataByWriteIt(const RegionData::WriteCFIter & write_it) { return data.removeDataByWriteIt(write_it); }
RegionData::WriteCFIter Region::removeDataByWriteIt(const TableID & table_id, const RegionData::WriteCFIter & write_it)
{
return data.removeDataByWriteIt(table_id, write_it);
}

RegionData::ReadInfo Region::readDataByWriteIt(const RegionData::ConstWriteCFIter & write_it, std::vector<RegionWriteCFData::Key> * keys)
RegionData::ReadInfo Region::readDataByWriteIt(
const TableID & table_id, const RegionData::ConstWriteCFIter & write_it, RegionWriteCFDataTrait::Keys * keys)
{
return data.readDataByWriteIt(write_it, keys);
return data.readDataByWriteIt(table_id, write_it, keys);
}

Region::LockInfoPtr Region::getLockInfo(TableID expected_table_id, UInt64 start_ts)
@@ -379,9 +383,9 @@ std::unique_ptr<Region::CommittedScanner> Region::createCommittedScanner(TableID
return std::make_unique<Region::CommittedScanner>(this->shared_from_this(), expected_table_id);
}

std::unique_ptr<Region::CommittedRemover> Region::createCommittedRemover()
std::unique_ptr<Region::CommittedRemover> Region::createCommittedRemover(TableID expected_table_id)
{
return std::make_unique<Region::CommittedRemover>(this->shared_from_this());
return std::make_unique<Region::CommittedRemover>(this->shared_from_this(), expected_table_id);
}

std::string Region::toString(bool dump_status) const { return meta.toString(dump_status); }
66 changes: 41 additions & 25 deletions dbms/src/Storages/Transaction/Region.h
Original file line number Diff line number Diff line change
@@ -46,55 +46,71 @@ class Region : public std::enable_shared_from_this<Region>
{
public:
CommittedScanner(const RegionPtr & store_, TableID expected_table_id_)
: store(store_), lock(store_->mutex), expected_table_id(expected_table_id_), write_map_it(store->data.write_cf.map.cbegin())
{}

/// Check if next kv exists.
/// Return InvalidTableID if not.
TableID hasNext()
: store(store_), lock(store_->mutex), expected_table_id(expected_table_id_)
{
if (expected_table_id != InvalidTableID)
const auto & data = store->data.write_cf.getData();
if (auto it = data.find(expected_table_id); it != data.end())
{
for (; write_map_it != store->data.write_cf.map.cend(); ++write_map_it)
{
if (likely(std::get<0>(write_map_it->first) == expected_table_id))
return expected_table_id;
}
found = true;
write_map_it = it->second.begin();
write_map_it_end = it->second.end();
}
else
{
if (write_map_it != store->data.write_cf.map.cend())
return std::get<0>(write_map_it->first);
}
return InvalidTableID;
found = false;
}

auto next(std::vector<RegionWriteCFData::Key> * keys = nullptr) { return store->readDataByWriteIt(write_map_it++, keys); }
bool hasNext() const { return found && write_map_it != write_map_it_end; }

LockInfoPtr getLockInfo(TableID expected_table_id, UInt64 start_ts) { return store->getLockInfo(expected_table_id, start_ts); }
auto next(RegionWriteCFDataTrait::Keys * keys = nullptr)
{
if (!found)
throw Exception(String() + "table: " + DB::toString(expected_table_id) + " is not found", ErrorCodes::LOGICAL_ERROR);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why we need String() ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, that's a mistake

return store->readDataByWriteIt(expected_table_id, write_map_it++, keys);
}

LockInfoPtr getLockInfo(UInt64 start_ts) { return store->getLockInfo(expected_table_id, start_ts); }

private:
RegionPtr store;
std::shared_lock<std::shared_mutex> lock;

bool found;
TableID expected_table_id;
RegionData::ConstWriteCFIter write_map_it;
RegionData::ConstWriteCFIter write_map_it_end;
};

class CommittedRemover : private boost::noncopyable
{
public:
CommittedRemover(const RegionPtr & store_) : store(store_), lock(store_->mutex) {}
CommittedRemover(const RegionPtr & store_, TableID expected_table_id_)
: store(store_), lock(store_->mutex), expected_table_id(expected_table_id_)
{
auto & data = store->data.write_cf.getDataMut();
if (auto it = data.find(expected_table_id); it != data.end())
{
found = true;
data_map = &(it->second);
}
else
found = false;
}

void remove(const RegionWriteCFData::Key & key)
{
if (auto it = store->data.write_cf.map.find(key); it != store->data.write_cf.map.end())
store->removeDataByWriteIt(it);
if (!found)
return;
if (auto it = data_map->find(key); it != data_map->end())
store->removeDataByWriteIt(expected_table_id, it);
}

private:
RegionPtr store;
std::unique_lock<std::shared_mutex> lock;

bool found;
TableID expected_table_id;
RegionWriteCFData::Map * data_map;
};

public:
@@ -121,7 +137,7 @@ class Region : public std::enable_shared_from_this<Region>
std::tuple<std::vector<RegionPtr>, TableIDSet, bool> onCommand(const enginepb::CommandRequest & cmd);

std::unique_ptr<CommittedScanner> createCommittedScanner(TableID expected_table_id);
std::unique_ptr<CommittedRemover> createCommittedRemover();
std::unique_ptr<CommittedRemover> createCommittedRemover(TableID expected_table_id);

size_t serialize(WriteBuffer & buf, enginepb::CommandResponse * response = nullptr);
static RegionPtr deserialize(ReadBuffer & buf, const RegionClientCreateFunc * region_client_create = nullptr);
@@ -176,8 +192,8 @@ class Region : public std::enable_shared_from_this<Region>
ColumnFamilyType getCf(const String & cf);

RegionData::ReadInfo readDataByWriteIt(
const RegionData::ConstWriteCFIter & write_it, std::vector<RegionWriteCFData::Key> * keys = nullptr);
RegionData::WriteCFIter removeDataByWriteIt(const RegionData::WriteCFIter & write_it);
const TableID & table_id, const RegionData::ConstWriteCFIter & write_it, RegionWriteCFDataTrait::Keys * keys = nullptr);
RegionData::WriteCFIter removeDataByWriteIt(const TableID & table_id, const RegionData::WriteCFIter & write_it);

LockInfoPtr getLockInfo(TableID expected_table_id, UInt64 start_ts);

548 changes: 211 additions & 337 deletions dbms/src/Storages/Transaction/RegionData.h

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion dbms/src/Storages/Transaction/RegionTable.cpp
Original file line number Diff line number Diff line change
@@ -206,7 +206,7 @@ void RegionTable::flushRegion(TableID table_id, RegionID region_id, size_t & cac
auto region = tmt.kvstore->getRegion(region_id);
if (!region)
return;
auto remover = region->createCommittedRemover();
auto remover = region->createCommittedRemover(table_id);
for (const auto & key : keys_to_remove)
remover->remove(key);
cache_size = region->dataSize();