Skip to content

Commit

Permalink
Protect language boundary between C++ and Rust codes (#6974)
Browse files Browse the repository at this point in the history
close #6973
  • Loading branch information
CalvinNeo authored Mar 7, 2023
1 parent 79be764 commit ecafa26
Showing 1 changed file with 89 additions and 23 deletions.
112 changes: 89 additions & 23 deletions dbms/src/Storages/Transaction/ProxyFFI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,51 +161,109 @@ uint8_t TryFlushData(EngineStoreServerWrap * server, uint64_t region_id, uint8_t

RawCppPtr CreateWriteBatch(const EngineStoreServerWrap * dummy)
{
UNUSED(dummy);
return GenRawCppPtr(new UniversalWriteBatch(), RawCppPtrTypeImpl::WriteBatch);
try
{
// Don't move the dummy argument, it is useful on proxy's side.
// This function is not protected by try-catch, since it's rarely throw.
UNUSED(dummy);
return GenRawCppPtr(new UniversalWriteBatch(), RawCppPtrTypeImpl::WriteBatch);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
exit(-1);
}
}

void WriteBatchPutPage(RawVoidPtr ptr, BaseBuffView page_id, BaseBuffView value)
{
LOG_TRACE(&Poco::Logger::get("ProxyFFI"), fmt::format("FFI write page {}", UniversalPageId(page_id.data, page_id.len)));
auto * wb = reinterpret_cast<UniversalWriteBatch *>(ptr);
MemoryWriteBuffer buf(0, value.len);
buf.write(value.data, value.len);
auto data_size = buf.count();
assert(data_size == value.len);
wb->putPage(UniversalPageId(page_id.data, page_id.len), 0, buf.tryGetReadBuffer(), data_size);
try
{
LOG_TRACE(&Poco::Logger::get("ProxyFFI"), fmt::format("FFI write page {}", UniversalPageId(page_id.data, page_id.len)));
auto * wb = reinterpret_cast<UniversalWriteBatch *>(ptr);
MemoryWriteBuffer buf(0, value.len);
buf.write(value.data, value.len);
auto data_size = buf.count();
assert(data_size == value.len);
wb->putPage(UniversalPageId(page_id.data, page_id.len), 0, buf.tryGetReadBuffer(), data_size);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
exit(-1);
}
}

void WriteBatchDelPage(RawVoidPtr ptr, BaseBuffView page_id)
{
LOG_TRACE(&Poco::Logger::get("ProxyFFI"), fmt::format("FFI delete page {}", UniversalPageId(page_id.data, page_id.len)));
auto * wb = reinterpret_cast<UniversalWriteBatch *>(ptr);
wb->delPage(UniversalPageId(page_id.data, page_id.len));
try
{
LOG_TRACE(&Poco::Logger::get("ProxyFFI"), fmt::format("FFI delete page {}", UniversalPageId(page_id.data, page_id.len)));
auto * wb = reinterpret_cast<UniversalWriteBatch *>(ptr);
wb->delPage(UniversalPageId(page_id.data, page_id.len));
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
exit(-1);
}
}

uint64_t GetWriteBatchSize(RawVoidPtr ptr)
{
auto * wb = reinterpret_cast<UniversalWriteBatch *>(ptr);
return wb->getTotalDataSize();
try
{
auto * wb = reinterpret_cast<UniversalWriteBatch *>(ptr);
return wb->getTotalDataSize();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
exit(-1);
}
}

uint8_t IsWriteBatchEmpty(RawVoidPtr ptr)
{
auto * wb = reinterpret_cast<UniversalWriteBatch *>(ptr);
return wb->empty();
try
{
auto * wb = reinterpret_cast<UniversalWriteBatch *>(ptr);
return wb->empty();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
exit(-1);
}
}

void HandleMergeWriteBatch(RawVoidPtr lhs, RawVoidPtr rhs)
{
auto * lwb = reinterpret_cast<UniversalWriteBatch *>(lhs);
auto * rwb = reinterpret_cast<UniversalWriteBatch *>(rhs);
lwb->merge(*rwb);
try
{
auto * lwb = reinterpret_cast<UniversalWriteBatch *>(lhs);
auto * rwb = reinterpret_cast<UniversalWriteBatch *>(rhs);
lwb->merge(*rwb);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
exit(-1);
}
}

void HandleClearWriteBatch(RawVoidPtr ptr)
{
auto * wb = reinterpret_cast<UniversalWriteBatch *>(ptr);
wb->clear();
try
{
auto * wb = reinterpret_cast<UniversalWriteBatch *>(ptr);
wb->clear();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
exit(-1);
}
}

void HandleConsumeWriteBatch(const EngineStoreServerWrap * server, RawVoidPtr ptr)
Expand Down Expand Up @@ -799,8 +857,16 @@ raft_serverpb::RegionLocalState TiFlashRaftProxyHelper::getRegionLocalState(uint

void HandleSafeTSUpdate(EngineStoreServerWrap * server, uint64_t region_id, uint64_t self_safe_ts, uint64_t leader_safe_ts)
{
RegionTable & region_table = server->tmt->getRegionTable();
region_table.updateSafeTS(region_id, leader_safe_ts, self_safe_ts);
try
{
RegionTable & region_table = server->tmt->getRegionTable();
region_table.updateSafeTS(region_id, leader_safe_ts, self_safe_ts);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
exit(-1);
}
}


Expand Down

0 comments on commit ecafa26

Please sign in to comment.