Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support of consolidating rewards on start #1313

Merged
merged 8 commits into from
May 31, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions src/init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -597,6 +597,7 @@ void SetupServerArgs()
gArgs.AddArg("-server", "Accept command line and JSON-RPC commands", ArgsManager::ALLOW_ANY, OptionsCategory::RPC);
gArgs.AddArg("-rpcallowcors=<host>", "Allow CORS requests from the given host origin. Include scheme and port (eg: -rpcallowcors=http://127.0.0.1:5000)", ArgsManager::ALLOW_ANY, OptionsCategory::RPC);
gArgs.AddArg("-rpcstats", strprintf("Log RPC stats. (default: %u)", DEFAULT_RPC_STATS), ArgsManager::ALLOW_ANY, OptionsCategory::RPC);
gArgs.AddArg("-consolidaterewards=<token-or-pool-symbol>", "Consolidate rewards on startup. Accepted multiple times for each token symbol", ArgsManager::ALLOW_ANY, OptionsCategory::DEBUG_TEST);

#if HAVE_DECL_DAEMON
gArgs.AddArg("-daemon", "Run in the background as a daemon and accept commands", ArgsManager::ALLOW_ANY, OptionsCategory::OPTIONS);
Expand Down Expand Up @@ -1865,6 +1866,44 @@ bool AppInitMain(InitInterfaces& interfaces)
nLocalServices = ServiceFlags(nLocalServices | NODE_WITNESS);
}

if (gArgs.IsArgSet("-consolidaterewards")) {
const std::vector<std::string> tokenSymbolArgs = gArgs.GetArgs("-consolidaterewards");
auto fullRewardConsolidation = false;
for (const auto& tokenSymbolInput : tokenSymbolArgs) {
auto tokenSymbol = trim_ws(tokenSymbolInput);
if (tokenSymbol.empty()) {
fullRewardConsolidation = true;
continue;
}
LogPrintf("Consolidate rewards for token: %s\n", tokenSymbol);
auto token = pcustomcsview->GetToken(tokenSymbol);
if (!token) {
InitError(strprintf("Invalid token \"%s\" for reward consolidation.\n", tokenSymbol));
return false;
}

std::vector<std::pair<CScript, CAmount>> balancesToMigrate;
pcustomcsview->ForEachBalance([&, tokenId = token->first](CScript const& owner, CTokenAmount balance) {
if (tokenId.v == balance.nTokenId.v && balance.nValue > 0) {
balancesToMigrate.emplace_back(owner, balance.nValue);
}
return true;
});
ConsolidateRewards(*pcustomcsview, ::ChainActive().Height(), balancesToMigrate, true);
}
if (fullRewardConsolidation) {
LogPrintf("Consolidate rewards for all addresses..\n");
std::vector<std::pair<CScript, CAmount>> balancesToMigrate;
pcustomcsview->ForEachBalance([&](CScript const& owner, CTokenAmount balance) {
if (balance.nValue > 0) {
balancesToMigrate.emplace_back(owner, balance.nValue);
}
return true;
});
ConsolidateRewards(*pcustomcsview, ::ChainActive().Height(), balancesToMigrate, true);
}
}

// ********************************************************* Step 11: import blocks

if (!CheckDiskSpace(GetDataDir())) {
Expand Down
104 changes: 62 additions & 42 deletions src/validation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3870,6 +3870,65 @@ static inline T CalculateNewAmount(const int multiplier, const T amount) {
return multiplier < 0 ? amount / std::abs(multiplier) : amount * multiplier;
}

size_t RewardConsolidationWorkersCount() {
const size_t workersMax = GetNumCores() - 1;
return workersMax > 2 ? workersMax : 3;
}

// Note: Be careful with lambda captures and default args. GCC 11.2.0, appears the if the captures are
// unused in the function directly, but inside the lambda, it completely disassociates them from the fn
// possibly when the lambda is lifted up and with default args, ends up inling the default arg
// completely. TODO: verify with smaller test case.
// But scenario: If `interruptOnShutdown` is set as default arg to false, it will never be set true
// on the below as it's inlined by gcc 11.2.0 on Ubuntu 22.04 incorrectly. Behavior is correct
// in lower versions of gcc or across clang.
void ConsolidateRewards(CCustomCSView &view, int height,
const std::vector<std::pair<CScript, CAmount>> &items, bool interruptOnShutdown, int numWorkers) {
int nWorkers = numWorkers < 1 ? RewardConsolidationWorkersCount() : numWorkers;
auto rewardsTime = GetTimeMicros();
boost::asio::thread_pool workerPool(nWorkers);
boost::asio::thread_pool mergeWorker(1);
std::atomic<uint64_t> tasksCompleted{0};
std::atomic<uint64_t> reportedTs{0};

for (auto& [owner, amount] : items) {
// See https://github.com/DeFiCh/ain/pull/1291
// https://github.com/DeFiCh/ain/pull/1291#issuecomment-1137638060
// Technically not fully synchronized, but avoid races
// due to the segregated areas of operation.
boost::asio::post(workerPool, [&, &account = owner]() {
if (interruptOnShutdown && ShutdownRequested()) return;
auto tempView = std::make_unique<CCustomCSView>(view);
tempView->CalculateOwnerRewards(account, height);

boost::asio::post(mergeWorker, [&, tempView = std::move(tempView)]() {
if (interruptOnShutdown && ShutdownRequested()) return;
tempView->Flush();

// This entire block is already serialized with single merge worker.
// So, relaxed ordering is more than sufficient - don't even need
// atomics really.
auto itemsCompleted = tasksCompleted.fetch_add(1,
std::memory_order::memory_order_relaxed);
const auto logTimeIntervalMillis = 3 * 1000;
if (GetTimeMillis() - reportedTs > logTimeIntervalMillis) {
LogPrintf("Reward consolidation: %.2f%% completed (%d/%d)\n",
(itemsCompleted * 1.f / items.size()) * 100.0,
itemsCompleted, items.size());
reportedTs.store(GetTimeMillis(),
std::memory_order::memory_order_relaxed);
}
});
});
}
workerPool.join();
mergeWorker.join();

auto itemsCompleted = tasksCompleted.load();
LogPrintf("Reward consolidation: 100%% completed (%d/%d, time: %dms)\n",
itemsCompleted, itemsCompleted, MILLI * (GetTimeMicros() - rewardsTime));
}

static Res PoolSplits(CCustomCSView& view, CAmount& totalBalance, ATTRIBUTES& attributes, const DCT_ID oldTokenId, const DCT_ID newTokenId,
const CBlockIndex* pindex, const CreationTxs& creationTxs, const int32_t multiplier) {

Expand Down Expand Up @@ -3960,10 +4019,8 @@ static Res PoolSplits(CCustomCSView& view, CAmount& totalBalance, ATTRIBUTES& at
return true;
});

const auto workersMax = std::thread::hardware_concurrency() - 1;
auto nWorkers = workersMax > 2 ? workersMax: 3;

LogPrintf("Pool migration: Migrating balances (count: %d, total: %d, concurrency: %d)..\n",
auto nWorkers = RewardConsolidationWorkersCount();
LogPrintf("Pool migration: Consolidating rewards (count: %d, total: %d, concurrency: %d)..\n",
balancesToMigrate.size(), totalAccounts, nWorkers);

// Largest first to make sure we are over MINIMUM_LIQUIDITY on first call to AddLiquidity
Expand All @@ -3972,44 +4029,7 @@ static Res PoolSplits(CCustomCSView& view, CAmount& totalBalance, ATTRIBUTES& at
return a.second > b.second;
});

if (!balancesToMigrate.empty()) {
auto rewardsTime = GetTimeMicros();

boost::asio::thread_pool workerPool(nWorkers);
boost::asio::thread_pool mergeWorker(1);
std::atomic<uint64_t> tasksCompleted{0};
std::atomic<uint64_t> reportedTs{0};

for (auto& [owner, amount] : balancesToMigrate) {
// See https://github.com/DeFiCh/ain/pull/1291
// https://github.com/DeFiCh/ain/pull/1291#issuecomment-1137638060
// Technically not fully synchronized, but avoid races
// due to the segregated areas of operation.
boost::asio::post(workerPool, [&, &account = owner]() {
auto tempView = std::make_unique<CCustomCSView>(view);
tempView->CalculateOwnerRewards(account, pindex->nHeight);

boost::asio::post(mergeWorker, [&, tempView = std::move(tempView)]() {
tempView->Flush();

auto itemsCompleted = tasksCompleted.fetch_add(1);
const auto logTimeIntervalMillis = 3 * 1000;
if (GetTimeMillis() - reportedTs > logTimeIntervalMillis) {
LogPrintf("Balance migration: %.2f%% completed (%d/%d)\n",
(itemsCompleted * 1.f / balancesToMigrate.size()) * 100.0,
itemsCompleted, balancesToMigrate.size());
reportedTs.store(GetTimeMillis());
}
});
});
}
workerPool.join();
mergeWorker.join();

auto itemsCompleted = tasksCompleted.load();
LogPrintf("Balance migration: 100%% completed (%d/%d, time: %dms)\n",
itemsCompleted, itemsCompleted, MILLI * (GetTimeMicros() - rewardsTime));
}
ConsolidateRewards(view, pindex->nHeight, balancesToMigrate, false, nWorkers);

// Special case. No liquidity providers in a previously used pool.
if (balancesToMigrate.empty() && oldPoolPair->totalLiquidity == CPoolPair::MINIMUM_LIQUIDITY) {
Expand Down
4 changes: 4 additions & 0 deletions src/validation.h
Original file line number Diff line number Diff line change
Expand Up @@ -856,4 +856,8 @@ inline CAmount CalculateCoinbaseReward(const CAmount blockReward, const uint32_t

Res AddNonTxToBurnIndex(const CScript& from, const CBalances& amounts);

void ConsolidateRewards(CCustomCSView& view, int height,
const std::vector<std::pair<CScript, CAmount>> &items,
bool interruptOnShutdown, int numWorkers = 0);

#endif // DEFI_VALIDATION_H