Skip to content

Commit

Permalink
Support of consolidating rewards on start (#1313)
Browse files Browse the repository at this point in the history
* Minor cleanups, log fds

* Add missing

* Fix up mixed up default args

* Add compiler bug notes
  • Loading branch information
prasannavl authored May 31, 2022
1 parent 9a570e7 commit 2dd940d
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 42 deletions.
39 changes: 39 additions & 0 deletions src/init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -597,6 +597,7 @@ void SetupServerArgs()
gArgs.AddArg("-server", "Accept command line and JSON-RPC commands", ArgsManager::ALLOW_ANY, OptionsCategory::RPC);
gArgs.AddArg("-rpcallowcors=<host>", "Allow CORS requests from the given host origin. Include scheme and port (eg: -rpcallowcors=http://127.0.0.1:5000)", ArgsManager::ALLOW_ANY, OptionsCategory::RPC);
gArgs.AddArg("-rpcstats", strprintf("Log RPC stats. (default: %u)", DEFAULT_RPC_STATS), ArgsManager::ALLOW_ANY, OptionsCategory::RPC);
gArgs.AddArg("-consolidaterewards=<token-or-pool-symbol>", "Consolidate rewards on startup. Accepted multiple times for each token symbol", ArgsManager::ALLOW_ANY, OptionsCategory::DEBUG_TEST);

#if HAVE_DECL_DAEMON
gArgs.AddArg("-daemon", "Run in the background as a daemon and accept commands", ArgsManager::ALLOW_ANY, OptionsCategory::OPTIONS);
Expand Down Expand Up @@ -1865,6 +1866,44 @@ bool AppInitMain(InitInterfaces& interfaces)
nLocalServices = ServiceFlags(nLocalServices | NODE_WITNESS);
}

if (gArgs.IsArgSet("-consolidaterewards")) {
const std::vector<std::string> tokenSymbolArgs = gArgs.GetArgs("-consolidaterewards");
auto fullRewardConsolidation = false;
for (const auto& tokenSymbolInput : tokenSymbolArgs) {
auto tokenSymbol = trim_ws(tokenSymbolInput);
if (tokenSymbol.empty()) {
fullRewardConsolidation = true;
continue;
}
LogPrintf("Consolidate rewards for token: %s\n", tokenSymbol);
auto token = pcustomcsview->GetToken(tokenSymbol);
if (!token) {
InitError(strprintf("Invalid token \"%s\" for reward consolidation.\n", tokenSymbol));
return false;
}

std::vector<std::pair<CScript, CAmount>> balancesToMigrate;
pcustomcsview->ForEachBalance([&, tokenId = token->first](CScript const& owner, CTokenAmount balance) {
if (tokenId.v == balance.nTokenId.v && balance.nValue > 0) {
balancesToMigrate.emplace_back(owner, balance.nValue);
}
return true;
});
ConsolidateRewards(*pcustomcsview, ::ChainActive().Height(), balancesToMigrate, true);
}
if (fullRewardConsolidation) {
LogPrintf("Consolidate rewards for all addresses..\n");
std::vector<std::pair<CScript, CAmount>> balancesToMigrate;
pcustomcsview->ForEachBalance([&](CScript const& owner, CTokenAmount balance) {
if (balance.nValue > 0) {
balancesToMigrate.emplace_back(owner, balance.nValue);
}
return true;
});
ConsolidateRewards(*pcustomcsview, ::ChainActive().Height(), balancesToMigrate, true);
}
}

// ********************************************************* Step 11: import blocks

if (!CheckDiskSpace(GetDataDir())) {
Expand Down
104 changes: 62 additions & 42 deletions src/validation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3870,6 +3870,65 @@ static inline T CalculateNewAmount(const int multiplier, const T amount) {
return multiplier < 0 ? amount / std::abs(multiplier) : amount * multiplier;
}

size_t RewardConsolidationWorkersCount() {
const size_t workersMax = GetNumCores() - 1;
return workersMax > 2 ? workersMax : 3;
}

// Note: Be careful with lambda captures and default args. GCC 11.2.0, appears the if the captures are
// unused in the function directly, but inside the lambda, it completely disassociates them from the fn
// possibly when the lambda is lifted up and with default args, ends up inling the default arg
// completely. TODO: verify with smaller test case.
// But scenario: If `interruptOnShutdown` is set as default arg to false, it will never be set true
// on the below as it's inlined by gcc 11.2.0 on Ubuntu 22.04 incorrectly. Behavior is correct
// in lower versions of gcc or across clang.
void ConsolidateRewards(CCustomCSView &view, int height,
const std::vector<std::pair<CScript, CAmount>> &items, bool interruptOnShutdown, int numWorkers) {
int nWorkers = numWorkers < 1 ? RewardConsolidationWorkersCount() : numWorkers;
auto rewardsTime = GetTimeMicros();
boost::asio::thread_pool workerPool(nWorkers);
boost::asio::thread_pool mergeWorker(1);
std::atomic<uint64_t> tasksCompleted{0};
std::atomic<uint64_t> reportedTs{0};

for (auto& [owner, amount] : items) {
// See https://github.com/DeFiCh/ain/pull/1291
// https://github.com/DeFiCh/ain/pull/1291#issuecomment-1137638060
// Technically not fully synchronized, but avoid races
// due to the segregated areas of operation.
boost::asio::post(workerPool, [&, &account = owner]() {
if (interruptOnShutdown && ShutdownRequested()) return;
auto tempView = std::make_unique<CCustomCSView>(view);
tempView->CalculateOwnerRewards(account, height);

boost::asio::post(mergeWorker, [&, tempView = std::move(tempView)]() {
if (interruptOnShutdown && ShutdownRequested()) return;
tempView->Flush();

// This entire block is already serialized with single merge worker.
// So, relaxed ordering is more than sufficient - don't even need
// atomics really.
auto itemsCompleted = tasksCompleted.fetch_add(1,
std::memory_order::memory_order_relaxed);
const auto logTimeIntervalMillis = 3 * 1000;
if (GetTimeMillis() - reportedTs > logTimeIntervalMillis) {
LogPrintf("Reward consolidation: %.2f%% completed (%d/%d)\n",
(itemsCompleted * 1.f / items.size()) * 100.0,
itemsCompleted, items.size());
reportedTs.store(GetTimeMillis(),
std::memory_order::memory_order_relaxed);
}
});
});
}
workerPool.join();
mergeWorker.join();

auto itemsCompleted = tasksCompleted.load();
LogPrintf("Reward consolidation: 100%% completed (%d/%d, time: %dms)\n",
itemsCompleted, itemsCompleted, MILLI * (GetTimeMicros() - rewardsTime));
}

static Res PoolSplits(CCustomCSView& view, CAmount& totalBalance, ATTRIBUTES& attributes, const DCT_ID oldTokenId, const DCT_ID newTokenId,
const CBlockIndex* pindex, const CreationTxs& creationTxs, const int32_t multiplier) {

Expand Down Expand Up @@ -3960,10 +4019,8 @@ static Res PoolSplits(CCustomCSView& view, CAmount& totalBalance, ATTRIBUTES& at
return true;
});

const auto workersMax = std::thread::hardware_concurrency() - 1;
auto nWorkers = workersMax > 2 ? workersMax: 3;

LogPrintf("Pool migration: Migrating balances (count: %d, total: %d, concurrency: %d)..\n",
auto nWorkers = RewardConsolidationWorkersCount();
LogPrintf("Pool migration: Consolidating rewards (count: %d, total: %d, concurrency: %d)..\n",
balancesToMigrate.size(), totalAccounts, nWorkers);

// Largest first to make sure we are over MINIMUM_LIQUIDITY on first call to AddLiquidity
Expand All @@ -3972,44 +4029,7 @@ static Res PoolSplits(CCustomCSView& view, CAmount& totalBalance, ATTRIBUTES& at
return a.second > b.second;
});

if (!balancesToMigrate.empty()) {
auto rewardsTime = GetTimeMicros();

boost::asio::thread_pool workerPool(nWorkers);
boost::asio::thread_pool mergeWorker(1);
std::atomic<uint64_t> tasksCompleted{0};
std::atomic<uint64_t> reportedTs{0};

for (auto& [owner, amount] : balancesToMigrate) {
// See https://github.com/DeFiCh/ain/pull/1291
// https://github.com/DeFiCh/ain/pull/1291#issuecomment-1137638060
// Technically not fully synchronized, but avoid races
// due to the segregated areas of operation.
boost::asio::post(workerPool, [&, &account = owner]() {
auto tempView = std::make_unique<CCustomCSView>(view);
tempView->CalculateOwnerRewards(account, pindex->nHeight);

boost::asio::post(mergeWorker, [&, tempView = std::move(tempView)]() {
tempView->Flush();

auto itemsCompleted = tasksCompleted.fetch_add(1);
const auto logTimeIntervalMillis = 3 * 1000;
if (GetTimeMillis() - reportedTs > logTimeIntervalMillis) {
LogPrintf("Balance migration: %.2f%% completed (%d/%d)\n",
(itemsCompleted * 1.f / balancesToMigrate.size()) * 100.0,
itemsCompleted, balancesToMigrate.size());
reportedTs.store(GetTimeMillis());
}
});
});
}
workerPool.join();
mergeWorker.join();

auto itemsCompleted = tasksCompleted.load();
LogPrintf("Balance migration: 100%% completed (%d/%d, time: %dms)\n",
itemsCompleted, itemsCompleted, MILLI * (GetTimeMicros() - rewardsTime));
}
ConsolidateRewards(view, pindex->nHeight, balancesToMigrate, false, nWorkers);

// Special case. No liquidity providers in a previously used pool.
if (balancesToMigrate.empty() && oldPoolPair->totalLiquidity == CPoolPair::MINIMUM_LIQUIDITY) {
Expand Down
4 changes: 4 additions & 0 deletions src/validation.h
Original file line number Diff line number Diff line change
Expand Up @@ -856,4 +856,8 @@ inline CAmount CalculateCoinbaseReward(const CAmount blockReward, const uint32_t

Res AddNonTxToBurnIndex(const CScript& from, const CBalances& amounts);

void ConsolidateRewards(CCustomCSView& view, int height,
const std::vector<std::pair<CScript, CAmount>> &items,
bool interruptOnShutdown, int numWorkers = 0);

#endif // DEFI_VALIDATION_H

0 comments on commit 2dd940d

Please sign in to comment.