Skip to content

Commit

Permalink
Add concurrency for reward calculation to scale out (#1291)
Browse files Browse the repository at this point in the history
* Add concurrency for reward calculation to scale out
  • Loading branch information
prasannavl authored May 27, 2022
1 parent a7f17c4 commit 7dce4c2
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 10 deletions.
1 change: 1 addition & 0 deletions src/logging.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ const CLogCategoryDesc LogCategories[] =
{BCLog::LOAN, "loan"},
{BCLog::ACCOUNTCHANGE, "accountchange"},
{BCLog::FUTURESWAP, "futureswap"},
{BCLog::TOKEN_SPLIT, "tokensplit"},
{BCLog::ALL, "1"},
{BCLog::ALL, "all"},
};
Expand Down
1 change: 1 addition & 0 deletions src/logging.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ namespace BCLog {
LOAN = (1 << 25),
ACCOUNTCHANGE = (1 << 26),
FUTURESWAP = (1 << 27),
TOKEN_SPLIT = (1 << 28),
ALL = ~(uint32_t)0,
};

Expand Down
76 changes: 68 additions & 8 deletions src/validation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@

#include <boost/algorithm/string/replace.hpp>
#include <boost/thread.hpp>
#include <boost/asio.hpp>

#if defined(NDEBUG)
# error "Defi cannot be compiled without assertions."
Expand Down Expand Up @@ -184,6 +185,8 @@ namespace {
std::set<int> setDirtyFileInfo;
} // anon namespace

extern std::string ScriptToString(CScript const& script);

CBlockIndex* LookupBlockIndex(const uint256& hash)
{
//std::cout << "!!!LookupBlockIndex : " << hash.ToString() << std::endl;
Expand Down Expand Up @@ -2727,7 +2730,10 @@ bool CChainState::ConnectBlock(const CBlock& block, CValidationState& state, CBl
CCustomCSView accountsView(mnview);
blockundo.vtxundo.reserve(block.vtx.size() - 1);
std::vector<PrecomputedTransactionData> txdata;

txdata.reserve(block.vtx.size()); // Required so that pointers to individual PrecomputedTransactionData don't get invalidated

// Execute TXs
for (unsigned int i = 0; i < block.vtx.size(); i++)
{
const CTransaction &tx = *(block.vtx[i]);
Expand Down Expand Up @@ -2868,6 +2874,7 @@ bool CChainState::ConnectBlock(const CBlock& block, CValidationState& state, CBl
}
UpdateCoins(tx, view, i == 0 ? undoDummy : blockundo.vtxundo.back(), pindex->nHeight);
}

int64_t nTime3 = GetTimeMicros(); nTimeConnect += nTime3 - nTime2;
LogPrint(BCLog::BENCH, " - Connect %u transactions: %.2fms (%.3fms/tx, %.3fms/txin) [%.2fs (%.2fms/blk)]\n", (unsigned)block.vtx.size(), MILLI * (nTime3 - nTime2), MILLI * (nTime3 - nTime2) / block.vtx.size(), nInputs <= 1 ? 0 : MILLI * (nTime3 - nTime2) / (nInputs-1), nTimeConnect * MICRO, nTimeConnect * MILLI / nBlocksTotal);

Expand Down Expand Up @@ -3943,30 +3950,73 @@ static Res PoolSplits(CCustomCSView& view, CAmount& totalBalance, ATTRIBUTES& at
}

std::vector<std::pair<CScript, CAmount>> balancesToMigrate;
view.ForEachBalance([&, oldPoolId = oldPoolId](CScript const & owner, CTokenAmount balance) {
uint64_t totalAccounts = 0;
view.ForEachBalance([&, oldPoolId = oldPoolId](CScript const& owner, CTokenAmount balance) {
if (oldPoolId.v == balance.nTokenId.v && balance.nValue > 0) {
balancesToMigrate.emplace_back(owner, balance.nValue);
}
totalAccounts++;
return true;
});

LogPrintf("Pool migration: Migrating %d balances.. \n", balancesToMigrate.size());
const auto workersMax = std::thread::hardware_concurrency() - 1;
auto nWorkers = workersMax > 2 ? workersMax: 3;

LogPrintf("Pool migration: Migrating balances (count: %d, total: %d, concurrency: %d)..\n",
balancesToMigrate.size(), totalAccounts, nWorkers);

// Largest first to make sure we are over MINIMUM_LIQUIDITY on first call to AddLiquidity
std::sort(balancesToMigrate.begin(), balancesToMigrate.end(), [](const std::pair<CScript, CAmount>&a, const std::pair<CScript, CAmount>& b){
std::sort(balancesToMigrate.begin(), balancesToMigrate.end(),
[](const std::pair<CScript, CAmount>&a, const std::pair<CScript, CAmount>& b){
return a.second > b.second;
});

if (!balancesToMigrate.empty()) {
auto rewardsTime = GetTimeMicros();

boost::asio::thread_pool workerPool(nWorkers);
boost::asio::thread_pool mergeWorker(1);
std::atomic<uint64_t> tasksCompleted{0};
std::atomic<uint64_t> reportedTs{0};

for (auto& [owner, amount] : balancesToMigrate) {
// See https://github.com/DeFiCh/ain/pull/1291
// https://github.com/DeFiCh/ain/pull/1291#issuecomment-1137638060
// Technically not fully synchronized, but avoid races
// due to the segregated areas of operation.
boost::asio::post(workerPool, [&, &account = owner]() {
auto tempView = std::make_unique<CCustomCSView>(view);
tempView->CalculateOwnerRewards(account, pindex->nHeight);

boost::asio::post(mergeWorker, [&, tempView = std::move(tempView)]() {
tempView->Flush();

auto itemsCompleted = tasksCompleted.fetch_add(1);
const auto logTimeIntervalMillis = 3 * 1000;
if (GetTimeMillis() - reportedTs > logTimeIntervalMillis) {
LogPrintf("Balance migration: %.2f%% completed (%d/%d)\n",
(itemsCompleted * 1.f / balancesToMigrate.size()) * 100.0,
itemsCompleted, balancesToMigrate.size());
reportedTs.store(GetTimeMillis());
}
});
});
}
workerPool.join();
mergeWorker.join();

auto itemsCompleted = tasksCompleted.load();
LogPrintf("Balance migration: 100%% completed (%d/%d, time: %dms)\n",
itemsCompleted, itemsCompleted, MILLI * (GetTimeMicros() - rewardsTime));
}

// Special case. No liquidity providers in a previously used pool.
if (balancesToMigrate.empty() && oldPoolPair->totalLiquidity == CPoolPair::MINIMUM_LIQUIDITY) {
balancesToMigrate.emplace_back(Params().GetConsensus().burnAddress, CAmount{CPoolPair::MINIMUM_LIQUIDITY});
}

for (auto& [owner, amount] : balancesToMigrate) {

if (owner != Params().GetConsensus().burnAddress) {
view.CalculateOwnerRewards(owner, pindex->nHeight);

res = view.SubBalance(owner, CTokenAmount{oldPoolId, amount});
if (!res.ok) {
throw std::runtime_error(strprintf("SubBalance failed: %s", res.msg));
Expand Down Expand Up @@ -4049,6 +4099,11 @@ static Res PoolSplits(CCustomCSView& view, CAmount& totalBalance, ATTRIBUTES& at
continue;
}

auto oldPoolLogStr = CTokenAmount{oldPoolId, amount}.ToString();
auto newPoolLogStr = CTokenAmount{newPoolId, liquidity}.ToString();
LogPrint(BCLog::TOKEN_SPLIT, "TokenSplit: LP (%s: %s => %s)\n",
ScriptToString(owner), oldPoolLogStr, newPoolLogStr);

view.SetShare(newPoolId, owner, pindex->nHeight);
}

Expand Down Expand Up @@ -4381,12 +4436,17 @@ void CChainState::ProcessTokenSplits(const CBlock& block, const CBlockIndex* pin
addAccounts[owner].Add({newTokenId, newBalance});
subAccounts[owner].Add(balance);
totalBalance += newBalance;

auto newBalanceStr = CTokenAmount{newTokenId, newBalance}.ToString();
LogPrint(BCLog::TOKEN_SPLIT, "TokenSplit: T (%s, v: %s => %s)\n",
ScriptToString(owner), balance.ToString(),
newBalanceStr);
}
return true;
});

LogPrintf("Token split info: Rebalance " /* Continued */
"(id: %d, symbol: %s, add: %d, sub: %d, total: %d)\n",
LogPrintf("Token split info: rebalance " /* Continued */
"(id: %d, symbol: %s, add-accounts: %d, sub-accounts: %d, val: %d)\n",
id, newToken.symbol, addAccounts.size(), subAccounts.size(), totalBalance);

res = view.AddMintedTokens(newTokenId, totalBalance);
Expand Down
5 changes: 3 additions & 2 deletions test/lint/lint-includes.sh
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,13 @@ EXPECTED_BOOST_INCLUDES=(
boost/algorithm/string/join.hpp
boost/algorithm/string/replace.hpp
boost/algorithm/string/split.hpp
boost/asio.hpp
boost/chrono/chrono.hpp
boost/circular_buffer.hpp
boost/date_time/posix_time/posix_time.hpp
boost/filesystem.hpp
boost/filesystem/fstream.hpp
boost/multiprecision/cpp_int.hpp
boost/multi_index/hashed_index.hpp
boost/multi_index/ordered_index.hpp
boost/multi_index/sequenced_index.hpp
Expand All @@ -85,8 +88,6 @@ EXPECTED_BOOST_INCLUDES=(
boost/variant.hpp
boost/variant/apply_visitor.hpp
boost/variant/static_visitor.hpp
boost/multiprecision/cpp_int.hpp
boost/circular_buffer.hpp
)

for BOOST_INCLUDE in $(git grep '^#include <boost/' -- "*.cpp" "*.h" | cut -f2 -d: | cut -f2 -d'<' | cut -f1 -d'>' | sort -u); do
Expand Down

0 comments on commit 7dce4c2

Please sign in to comment.