Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add concurrency for reward calculation to scale out #1291

Merged
merged 20 commits into from
May 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/logging.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ const CLogCategoryDesc LogCategories[] =
{BCLog::LOAN, "loan"},
{BCLog::ACCOUNTCHANGE, "accountchange"},
{BCLog::FUTURESWAP, "futureswap"},
{BCLog::TOKEN_SPLIT, "tokensplit"},
{BCLog::ALL, "1"},
{BCLog::ALL, "all"},
};
Expand Down
1 change: 1 addition & 0 deletions src/logging.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ namespace BCLog {
LOAN = (1 << 25),
ACCOUNTCHANGE = (1 << 26),
FUTURESWAP = (1 << 27),
TOKEN_SPLIT = (1 << 28),
ALL = ~(uint32_t)0,
};

Expand Down
76 changes: 68 additions & 8 deletions src/validation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@

#include <boost/algorithm/string/replace.hpp>
#include <boost/thread.hpp>
#include <boost/asio.hpp>

#if defined(NDEBUG)
# error "Defi cannot be compiled without assertions."
Expand Down Expand Up @@ -184,6 +185,8 @@ namespace {
std::set<int> setDirtyFileInfo;
} // anon namespace

extern std::string ScriptToString(CScript const& script);

CBlockIndex* LookupBlockIndex(const uint256& hash)
{
//std::cout << "!!!LookupBlockIndex : " << hash.ToString() << std::endl;
Expand Down Expand Up @@ -2727,7 +2730,10 @@ bool CChainState::ConnectBlock(const CBlock& block, CValidationState& state, CBl
CCustomCSView accountsView(mnview);
blockundo.vtxundo.reserve(block.vtx.size() - 1);
std::vector<PrecomputedTransactionData> txdata;

txdata.reserve(block.vtx.size()); // Required so that pointers to individual PrecomputedTransactionData don't get invalidated

// Execute TXs
for (unsigned int i = 0; i < block.vtx.size(); i++)
{
const CTransaction &tx = *(block.vtx[i]);
Expand Down Expand Up @@ -2868,6 +2874,7 @@ bool CChainState::ConnectBlock(const CBlock& block, CValidationState& state, CBl
}
UpdateCoins(tx, view, i == 0 ? undoDummy : blockundo.vtxundo.back(), pindex->nHeight);
}

int64_t nTime3 = GetTimeMicros(); nTimeConnect += nTime3 - nTime2;
LogPrint(BCLog::BENCH, " - Connect %u transactions: %.2fms (%.3fms/tx, %.3fms/txin) [%.2fs (%.2fms/blk)]\n", (unsigned)block.vtx.size(), MILLI * (nTime3 - nTime2), MILLI * (nTime3 - nTime2) / block.vtx.size(), nInputs <= 1 ? 0 : MILLI * (nTime3 - nTime2) / (nInputs-1), nTimeConnect * MICRO, nTimeConnect * MILLI / nBlocksTotal);

Expand Down Expand Up @@ -3943,30 +3950,73 @@ static Res PoolSplits(CCustomCSView& view, CAmount& totalBalance, ATTRIBUTES& at
}

std::vector<std::pair<CScript, CAmount>> balancesToMigrate;
view.ForEachBalance([&, oldPoolId = oldPoolId](CScript const & owner, CTokenAmount balance) {
uint64_t totalAccounts = 0;
view.ForEachBalance([&, oldPoolId = oldPoolId](CScript const& owner, CTokenAmount balance) {
if (oldPoolId.v == balance.nTokenId.v && balance.nValue > 0) {
balancesToMigrate.emplace_back(owner, balance.nValue);
}
totalAccounts++;
return true;
});

LogPrintf("Pool migration: Migrating %d balances.. \n", balancesToMigrate.size());
const auto workersMax = std::thread::hardware_concurrency() - 1;
auto nWorkers = workersMax > 2 ? workersMax: 3;

LogPrintf("Pool migration: Migrating balances (count: %d, total: %d, concurrency: %d)..\n",
balancesToMigrate.size(), totalAccounts, nWorkers);

// Largest first to make sure we are over MINIMUM_LIQUIDITY on first call to AddLiquidity
std::sort(balancesToMigrate.begin(), balancesToMigrate.end(), [](const std::pair<CScript, CAmount>&a, const std::pair<CScript, CAmount>& b){
std::sort(balancesToMigrate.begin(), balancesToMigrate.end(),
[](const std::pair<CScript, CAmount>&a, const std::pair<CScript, CAmount>& b){
return a.second > b.second;
});

if (!balancesToMigrate.empty()) {
auto rewardsTime = GetTimeMicros();

boost::asio::thread_pool workerPool(nWorkers);
boost::asio::thread_pool mergeWorker(1);
std::atomic<uint64_t> tasksCompleted{0};
std::atomic<uint64_t> reportedTs{0};

for (auto& [owner, amount] : balancesToMigrate) {
// See https://github.com/DeFiCh/ain/pull/1291
// https://github.com/DeFiCh/ain/pull/1291#issuecomment-1137638060
// Technically not fully synchronized, but avoid races
// due to the segregated areas of operation.
boost::asio::post(workerPool, [&, &account = owner]() {
auto tempView = std::make_unique<CCustomCSView>(view);
tempView->CalculateOwnerRewards(account, pindex->nHeight);

boost::asio::post(mergeWorker, [&, tempView = std::move(tempView)]() {
tempView->Flush();

auto itemsCompleted = tasksCompleted.fetch_add(1);
const auto logTimeIntervalMillis = 3 * 1000;
if (GetTimeMillis() - reportedTs > logTimeIntervalMillis) {
LogPrintf("Balance migration: %.2f%% completed (%d/%d)\n",
(itemsCompleted * 1.f / balancesToMigrate.size()) * 100.0,
itemsCompleted, balancesToMigrate.size());
reportedTs.store(GetTimeMillis());
}
});
});
}
workerPool.join();
mergeWorker.join();

auto itemsCompleted = tasksCompleted.load();
LogPrintf("Balance migration: 100%% completed (%d/%d, time: %dms)\n",
itemsCompleted, itemsCompleted, MILLI * (GetTimeMicros() - rewardsTime));
}

// Special case. No liquidity providers in a previously used pool.
if (balancesToMigrate.empty() && oldPoolPair->totalLiquidity == CPoolPair::MINIMUM_LIQUIDITY) {
balancesToMigrate.emplace_back(Params().GetConsensus().burnAddress, CAmount{CPoolPair::MINIMUM_LIQUIDITY});
}

for (auto& [owner, amount] : balancesToMigrate) {

if (owner != Params().GetConsensus().burnAddress) {
view.CalculateOwnerRewards(owner, pindex->nHeight);

res = view.SubBalance(owner, CTokenAmount{oldPoolId, amount});
if (!res.ok) {
throw std::runtime_error(strprintf("SubBalance failed: %s", res.msg));
Expand Down Expand Up @@ -4049,6 +4099,11 @@ static Res PoolSplits(CCustomCSView& view, CAmount& totalBalance, ATTRIBUTES& at
continue;
}

auto oldPoolLogStr = CTokenAmount{oldPoolId, amount}.ToString();
auto newPoolLogStr = CTokenAmount{newPoolId, liquidity}.ToString();
LogPrint(BCLog::TOKEN_SPLIT, "TokenSplit: LP (%s: %s => %s)\n",
ScriptToString(owner), oldPoolLogStr, newPoolLogStr);

view.SetShare(newPoolId, owner, pindex->nHeight);
}

Expand Down Expand Up @@ -4381,12 +4436,17 @@ void CChainState::ProcessTokenSplits(const CBlock& block, const CBlockIndex* pin
addAccounts[owner].Add({newTokenId, newBalance});
subAccounts[owner].Add(balance);
totalBalance += newBalance;

auto newBalanceStr = CTokenAmount{newTokenId, newBalance}.ToString();
LogPrint(BCLog::TOKEN_SPLIT, "TokenSplit: T (%s, v: %s => %s)\n",
ScriptToString(owner), balance.ToString(),
newBalanceStr);
}
return true;
});

LogPrintf("Token split info: Rebalance " /* Continued */
"(id: %d, symbol: %s, add: %d, sub: %d, total: %d)\n",
LogPrintf("Token split info: rebalance " /* Continued */
"(id: %d, symbol: %s, add-accounts: %d, sub-accounts: %d, val: %d)\n",
id, newToken.symbol, addAccounts.size(), subAccounts.size(), totalBalance);

res = view.AddMintedTokens(newTokenId, totalBalance);
Expand Down
5 changes: 3 additions & 2 deletions test/lint/lint-includes.sh
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,13 @@ EXPECTED_BOOST_INCLUDES=(
boost/algorithm/string/join.hpp
boost/algorithm/string/replace.hpp
boost/algorithm/string/split.hpp
boost/asio.hpp
boost/chrono/chrono.hpp
boost/circular_buffer.hpp
boost/date_time/posix_time/posix_time.hpp
boost/filesystem.hpp
boost/filesystem/fstream.hpp
boost/multiprecision/cpp_int.hpp
boost/multi_index/hashed_index.hpp
boost/multi_index/ordered_index.hpp
boost/multi_index/sequenced_index.hpp
Expand All @@ -85,8 +88,6 @@ EXPECTED_BOOST_INCLUDES=(
boost/variant.hpp
boost/variant/apply_visitor.hpp
boost/variant/static_visitor.hpp
boost/multiprecision/cpp_int.hpp
boost/circular_buffer.hpp
)

for BOOST_INCLUDE in $(git grep '^#include <boost/' -- "*.cpp" "*.h" | cut -f2 -d: | cut -f2 -d'<' | cut -f1 -d'>' | sort -u); do
Expand Down