diff --git a/src/init.cpp b/src/init.cpp index bf48fabc15..028512c6e3 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -597,6 +597,7 @@ void SetupServerArgs() gArgs.AddArg("-server", "Accept command line and JSON-RPC commands", ArgsManager::ALLOW_ANY, OptionsCategory::RPC); gArgs.AddArg("-rpcallowcors=", "Allow CORS requests from the given host origin. Include scheme and port (eg: -rpcallowcors=http://127.0.0.1:5000)", ArgsManager::ALLOW_ANY, OptionsCategory::RPC); gArgs.AddArg("-rpcstats", strprintf("Log RPC stats. (default: %u)", DEFAULT_RPC_STATS), ArgsManager::ALLOW_ANY, OptionsCategory::RPC); + gArgs.AddArg("-consolidaterewards=", "Consolidate rewards on startup. Accepted multiple times for each token symbol", ArgsManager::ALLOW_ANY, OptionsCategory::DEBUG_TEST); #if HAVE_DECL_DAEMON gArgs.AddArg("-daemon", "Run in the background as a daemon and accept commands", ArgsManager::ALLOW_ANY, OptionsCategory::OPTIONS); @@ -1865,6 +1866,44 @@ bool AppInitMain(InitInterfaces& interfaces) nLocalServices = ServiceFlags(nLocalServices | NODE_WITNESS); } + if (gArgs.IsArgSet("-consolidaterewards")) { + const std::vector tokenSymbolArgs = gArgs.GetArgs("-consolidaterewards"); + auto fullRewardConsolidation = false; + for (const auto& tokenSymbolInput : tokenSymbolArgs) { + auto tokenSymbol = trim_ws(tokenSymbolInput); + if (tokenSymbol.empty()) { + fullRewardConsolidation = true; + continue; + } + LogPrintf("Consolidate rewards for token: %s\n", tokenSymbol); + auto token = pcustomcsview->GetToken(tokenSymbol); + if (!token) { + InitError(strprintf("Invalid token \"%s\" for reward consolidation.\n", tokenSymbol)); + return false; + } + + std::vector> balancesToMigrate; + pcustomcsview->ForEachBalance([&, tokenId = token->first](CScript const& owner, CTokenAmount balance) { + if (tokenId.v == balance.nTokenId.v && balance.nValue > 0) { + balancesToMigrate.emplace_back(owner, balance.nValue); + } + return true; + }); + ConsolidateRewards(*pcustomcsview, ::ChainActive().Height(), balancesToMigrate, true); + } + if (fullRewardConsolidation) { + LogPrintf("Consolidate rewards for all addresses..\n"); + std::vector> balancesToMigrate; + pcustomcsview->ForEachBalance([&](CScript const& owner, CTokenAmount balance) { + if (balance.nValue > 0) { + balancesToMigrate.emplace_back(owner, balance.nValue); + } + return true; + }); + ConsolidateRewards(*pcustomcsview, ::ChainActive().Height(), balancesToMigrate, true); + } + } + // ********************************************************* Step 11: import blocks if (!CheckDiskSpace(GetDataDir())) { diff --git a/src/validation.cpp b/src/validation.cpp index eea0cf3aa8..536534e3ed 100644 --- a/src/validation.cpp +++ b/src/validation.cpp @@ -3870,6 +3870,65 @@ static inline T CalculateNewAmount(const int multiplier, const T amount) { return multiplier < 0 ? amount / std::abs(multiplier) : amount * multiplier; } +size_t RewardConsolidationWorkersCount() { + const size_t workersMax = GetNumCores() - 1; + return workersMax > 2 ? workersMax : 3; +} + +// Note: Be careful with lambda captures and default args. GCC 11.2.0, appears the if the captures are +// unused in the function directly, but inside the lambda, it completely disassociates them from the fn +// possibly when the lambda is lifted up and with default args, ends up inling the default arg +// completely. TODO: verify with smaller test case. +// But scenario: If `interruptOnShutdown` is set as default arg to false, it will never be set true +// on the below as it's inlined by gcc 11.2.0 on Ubuntu 22.04 incorrectly. Behavior is correct +// in lower versions of gcc or across clang. +void ConsolidateRewards(CCustomCSView &view, int height, + const std::vector> &items, bool interruptOnShutdown, int numWorkers) { + int nWorkers = numWorkers < 1 ? RewardConsolidationWorkersCount() : numWorkers; + auto rewardsTime = GetTimeMicros(); + boost::asio::thread_pool workerPool(nWorkers); + boost::asio::thread_pool mergeWorker(1); + std::atomic tasksCompleted{0}; + std::atomic reportedTs{0}; + + for (auto& [owner, amount] : items) { + // See https://github.com/DeFiCh/ain/pull/1291 + // https://github.com/DeFiCh/ain/pull/1291#issuecomment-1137638060 + // Technically not fully synchronized, but avoid races + // due to the segregated areas of operation. + boost::asio::post(workerPool, [&, &account = owner]() { + if (interruptOnShutdown && ShutdownRequested()) return; + auto tempView = std::make_unique(view); + tempView->CalculateOwnerRewards(account, height); + + boost::asio::post(mergeWorker, [&, tempView = std::move(tempView)]() { + if (interruptOnShutdown && ShutdownRequested()) return; + tempView->Flush(); + + // This entire block is already serialized with single merge worker. + // So, relaxed ordering is more than sufficient - don't even need + // atomics really. + auto itemsCompleted = tasksCompleted.fetch_add(1, + std::memory_order::memory_order_relaxed); + const auto logTimeIntervalMillis = 3 * 1000; + if (GetTimeMillis() - reportedTs > logTimeIntervalMillis) { + LogPrintf("Reward consolidation: %.2f%% completed (%d/%d)\n", + (itemsCompleted * 1.f / items.size()) * 100.0, + itemsCompleted, items.size()); + reportedTs.store(GetTimeMillis(), + std::memory_order::memory_order_relaxed); + } + }); + }); + } + workerPool.join(); + mergeWorker.join(); + + auto itemsCompleted = tasksCompleted.load(); + LogPrintf("Reward consolidation: 100%% completed (%d/%d, time: %dms)\n", + itemsCompleted, itemsCompleted, MILLI * (GetTimeMicros() - rewardsTime)); +} + static Res PoolSplits(CCustomCSView& view, CAmount& totalBalance, ATTRIBUTES& attributes, const DCT_ID oldTokenId, const DCT_ID newTokenId, const CBlockIndex* pindex, const CreationTxs& creationTxs, const int32_t multiplier) { @@ -3960,10 +4019,8 @@ static Res PoolSplits(CCustomCSView& view, CAmount& totalBalance, ATTRIBUTES& at return true; }); - const auto workersMax = std::thread::hardware_concurrency() - 1; - auto nWorkers = workersMax > 2 ? workersMax: 3; - - LogPrintf("Pool migration: Migrating balances (count: %d, total: %d, concurrency: %d)..\n", + auto nWorkers = RewardConsolidationWorkersCount(); + LogPrintf("Pool migration: Consolidating rewards (count: %d, total: %d, concurrency: %d)..\n", balancesToMigrate.size(), totalAccounts, nWorkers); // Largest first to make sure we are over MINIMUM_LIQUIDITY on first call to AddLiquidity @@ -3972,44 +4029,7 @@ static Res PoolSplits(CCustomCSView& view, CAmount& totalBalance, ATTRIBUTES& at return a.second > b.second; }); - if (!balancesToMigrate.empty()) { - auto rewardsTime = GetTimeMicros(); - - boost::asio::thread_pool workerPool(nWorkers); - boost::asio::thread_pool mergeWorker(1); - std::atomic tasksCompleted{0}; - std::atomic reportedTs{0}; - - for (auto& [owner, amount] : balancesToMigrate) { - // See https://github.com/DeFiCh/ain/pull/1291 - // https://github.com/DeFiCh/ain/pull/1291#issuecomment-1137638060 - // Technically not fully synchronized, but avoid races - // due to the segregated areas of operation. - boost::asio::post(workerPool, [&, &account = owner]() { - auto tempView = std::make_unique(view); - tempView->CalculateOwnerRewards(account, pindex->nHeight); - - boost::asio::post(mergeWorker, [&, tempView = std::move(tempView)]() { - tempView->Flush(); - - auto itemsCompleted = tasksCompleted.fetch_add(1); - const auto logTimeIntervalMillis = 3 * 1000; - if (GetTimeMillis() - reportedTs > logTimeIntervalMillis) { - LogPrintf("Balance migration: %.2f%% completed (%d/%d)\n", - (itemsCompleted * 1.f / balancesToMigrate.size()) * 100.0, - itemsCompleted, balancesToMigrate.size()); - reportedTs.store(GetTimeMillis()); - } - }); - }); - } - workerPool.join(); - mergeWorker.join(); - - auto itemsCompleted = tasksCompleted.load(); - LogPrintf("Balance migration: 100%% completed (%d/%d, time: %dms)\n", - itemsCompleted, itemsCompleted, MILLI * (GetTimeMicros() - rewardsTime)); - } + ConsolidateRewards(view, pindex->nHeight, balancesToMigrate, false, nWorkers); // Special case. No liquidity providers in a previously used pool. if (balancesToMigrate.empty() && oldPoolPair->totalLiquidity == CPoolPair::MINIMUM_LIQUIDITY) { diff --git a/src/validation.h b/src/validation.h index 10b579fd78..72c1c2bf6a 100644 --- a/src/validation.h +++ b/src/validation.h @@ -856,4 +856,8 @@ inline CAmount CalculateCoinbaseReward(const CAmount blockReward, const uint32_t Res AddNonTxToBurnIndex(const CScript& from, const CBalances& amounts); +void ConsolidateRewards(CCustomCSView& view, int height, + const std::vector> &items, + bool interruptOnShutdown, int numWorkers = 0); + #endif // DEFI_VALIDATION_H