Skip to content

Commit

Permalink
Merge #2749: [Budget] Use space and time efficient data structure to …
Browse files Browse the repository at this point in the history
…check for already sent item sync requests

d8e2c98 budget manager: use space and time efficient data structure for unknown item requests. (furszy)
f3a7fea budget manager: use g_netfulfilledman instead of mAskedUsForBudgetSync to prevent spam. (furszy)

Pull request description:

  In the arriving orphan vote process; the node, after trying to add the vote to the orphan map, tries to ask the proposal/budget to the peer that sent the vote, sending a single item sync request.
  The request, as the node can receive several votes going to the same proposal, is relayed only if the node haven't sent a sync request for the proposal already.  As we are using a map to perform this membership test, the time complexity is O(log n).

  This PR improves it migrating the map to a bloom filter, which time complexity for membership test is O(1).
  The false-positive rate, even when it's negligible (i set the rate to 0.001), isn't really a worry here because, as wrote above, this is only use to test for sync proposal request existence, which is not ban cause. It's just used to not send extra unneeded requests.

  Plus, aside from that, cleaned the other `mAskedUsForBudgetSync` map, using the already existent net request manager flow.

ACKs for top commit:
  random-zebra:
    ACK d8e2c98
  Fuzzbawls:
    ACK d8e2c98

Tree-SHA512: 8cc2dc945f9182aaa553588b7f2e741b6388f70e550d2a99946476796ff1885a153ec9b8284ef299aff418f7e095bfa96e679caf02963597158d5974a8e77b1d
  • Loading branch information
furszy committed Mar 8, 2022
2 parents fc63a6f + d8e2c98 commit 5a7b514
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 63 deletions.
60 changes: 14 additions & 46 deletions src/budget/budgetmanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,20 @@
#include "masternodeman.h"
#include "netmessagemaker.h"
#include "tiertwo/tiertwo_sync_state.h"
#include "tiertwo/netfulfilledman.h"
#include "util/validation.h"
#include "validation.h" // GetTransaction, cs_main

#ifdef ENABLE_WALLET
#include "wallet/wallet.h" // future: use interface instead.
#endif

// Peers can only request complete budget sync once per hour.
#define BUDGET_SYNC_REQUEST_ACCEPTANCE_SECONDS (60 * 60) // One hour.

CBudgetManager g_budgetman;
#define BUDGET_ORPHAN_VOTES_CLEANUP_SECONDS (60 * 60) // One hour.
// Request type used in the net requests manager to block peers asking budget sync too often
static const std::string BUDGET_SYNC_REQUEST_RECV = "budget-sync-recv";

std::map<uint256, int64_t> askedForSourceProposalOrBudget;
CBudgetManager g_budgetman;

// Used to check both proposals and finalized-budgets collateral txes
bool CheckCollateral(const uint256& nTxCollateralHash, const uint256& nExpectedHash, std::string& strError, int64_t& nTime, int nCurrentHeight, bool fBudgetFinalization);
Expand Down Expand Up @@ -958,11 +959,6 @@ bool CBudgetManager::AddAndRelayProposalVote(const CBudgetVote& vote, std::strin
}

void CBudgetManager::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload)
{
NewBlock();
}

void CBudgetManager::NewBlock()
{
if (g_tiertwo_sync_state.GetSyncPhase() <= MASTERNODE_SYNC_BUDGET) return;

Expand Down Expand Up @@ -993,15 +989,6 @@ void CBudgetManager::NewBlock()
// remove expired/heavily downvoted budgets
CheckAndRemove();

//remove invalid (from non-active masternode) votes once in a while
LogPrint(BCLog::MNBUDGET,"%s: askedForSourceProposalOrBudget cleanup - size: %d\n", __func__, askedForSourceProposalOrBudget.size());
for (auto it = askedForSourceProposalOrBudget.begin(); it != askedForSourceProposalOrBudget.end(); ) {
if (it->second <= GetTime() - (60 * 60 * 24)) {
it = askedForSourceProposalOrBudget.erase(it);
} else {
it++;
}
}
{
LOCK(cs_proposals);
LogPrint(BCLog::MNBUDGET,"%s: mapProposals cleanup - size: %d\n", __func__, mapProposals.size());
Expand All @@ -1017,26 +1004,12 @@ void CBudgetManager::NewBlock()
}
}

{
// Clean peers who asked for budget votes sync after an hour (BUDGET_SYNC_REQUEST_ACCEPTANCE_SECONDS)
LOCK2(cs_budgets, cs_proposals);
int64_t currentTime = GetTime();
auto itAskedBudSync = mAskedUsForBudgetSync.begin();
while (itAskedBudSync != mAskedUsForBudgetSync.end()) {
if ((*itAskedBudSync).second < currentTime) {
itAskedBudSync = mAskedUsForBudgetSync.erase(itAskedBudSync);
} else {
++itAskedBudSync;
}
}
}

int64_t now = GetTime();
const auto cleanOrphans = [now](auto& mutex, auto& mapOrphans, auto& mapSeen) {
LOCK(mutex);
for (auto it = mapOrphans.begin() ; it != mapOrphans.end();) {
int64_t lastReceivedVoteTime = it->second.second;
if (lastReceivedVoteTime + BUDGET_SYNC_REQUEST_ACCEPTANCE_SECONDS < now) {
if (lastReceivedVoteTime + BUDGET_ORPHAN_VOTES_CLEANUP_SECONDS < now) {
// Clean seen votes
for (const auto& voteIt : it->second.first) {
mapSeen.erase(voteIt.GetHash());
Expand Down Expand Up @@ -1067,12 +1040,9 @@ int CBudgetManager::ProcessBudgetVoteSync(const uint256& nProp, CNode* pfrom)
if (nProp.IsNull()) {
LOCK2(cs_budgets, cs_proposals);
if (!(pfrom->addr.IsRFC1918() || pfrom->addr.IsLocal())) {
auto itLastRequest = mAskedUsForBudgetSync.find(pfrom->addr);
if (itLastRequest != mAskedUsForBudgetSync.end() && GetTime() < (*itLastRequest).second) {
if (g_netfulfilledman.HasFulfilledRequest(pfrom->addr, BUDGET_SYNC_REQUEST_RECV)) {
LogPrint(BCLog::MASTERNODE, "budgetsync - peer %i already asked for budget sync\n", pfrom->GetId());
// The peers sync requests information is not stored on disk (for now), so
// the budget sync could be re-requested in less than the allowed time (due a node restart for example).
// So, for now, let's not be so hard with the node.
// let's not be so hard with the node for now.
return 10;
}
}
Expand Down Expand Up @@ -1475,11 +1445,9 @@ void CBudgetManager::Sync(CNode* pfrom, bool fPartial)
relayInventoryItems<CFinalizedBudget>(pfrom, cs_budgets, mapFinalizedBudgets, fPartial, MSG_BUDGET_FINALIZED, MASTERNODE_SYNC_BUDGET_FIN);

if (!fPartial) {
// Now that budget full sync request was handled, mark it as completed.
// We are not going to answer full budget sync requests for an hour (BUDGET_SYNC_REQUEST_ACCEPTANCE_SECONDS).
// We are not going to answer full budget sync requests for an hour (chainparams.FulfilledRequestExpireTime()).
// The remote peer can still do single prop and mnv sync requests if needed.
LOCK2(cs_budgets, cs_proposals);
mAskedUsForBudgetSync[pfrom->addr] = GetTime() + BUDGET_SYNC_REQUEST_ACCEPTANCE_SECONDS;
g_netfulfilledman.AddFulfilledRequest(pfrom->addr, BUDGET_SYNC_REQUEST_RECV);
}
}

Expand Down Expand Up @@ -1528,9 +1496,9 @@ bool CBudgetManager::UpdateProposal(const CBudgetVote& vote, CNode* pfrom, std::
TryAppendOrphanVoteMap<CBudgetVote>(vote, nProposalHash, mapOrphanProposalVotes, mapSeenProposalVotes);
}

if (!askedForSourceProposalOrBudget.count(nProposalHash)) {
if (!g_netfulfilledman.HasItemRequest(pfrom->addr, nProposalHash)) {
g_connman->PushMessage(pfrom, CNetMsgMaker(pfrom->GetSendVersion()).Make(NetMsgType::BUDGETVOTESYNC, nProposalHash));
askedForSourceProposalOrBudget[nProposalHash] = GetTime();
g_netfulfilledman.AddItemRequest(pfrom->addr, nProposalHash);
}
}

Expand Down Expand Up @@ -1559,9 +1527,9 @@ bool CBudgetManager::UpdateFinalizedBudget(const CFinalizedBudgetVote& vote, CNo
TryAppendOrphanVoteMap<CFinalizedBudgetVote>(vote, nBudgetHash, mapOrphanFinalizedBudgetVotes, mapSeenFinalizedBudgetVotes);
}

if (!askedForSourceProposalOrBudget.count(nBudgetHash)) {
if (!g_netfulfilledman.HasItemRequest(pfrom->addr, nBudgetHash)) {
g_connman->PushMessage(pfrom, CNetMsgMaker(pfrom->GetSendVersion()).Make(NetMsgType::BUDGETVOTESYNC, nBudgetHash));
askedForSourceProposalOrBudget[nBudgetHash] = GetTime();
g_netfulfilledman.AddItemRequest(pfrom->addr, nBudgetHash);
}
}

Expand Down
12 changes: 1 addition & 11 deletions src/budget/budgetmanager.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,6 @@ class CBudgetManager : public CValidationInterface
// Memory Only. Updated in NewBlock (blocks arrive in order)
std::atomic<int> nBestHeight;

// Spam protection
// who's asked for the complete budget sync and the last time
std::map<CNetAddr, int64_t> mAskedUsForBudgetSync; // guarded by cs_budgets and cs_proposals.

struct HighestFinBudget {
const CFinalizedBudget* m_budget_fin{nullptr};
int m_vote_count{0};
Expand All @@ -67,7 +63,7 @@ class CBudgetManager : public CValidationInterface
mutable RecursiveMutex cs_votes;

// budget finalization
std::string strBudgetMode = "";
std::string strBudgetMode;

CBudgetManager() {}

Expand Down Expand Up @@ -114,7 +110,6 @@ class CBudgetManager : public CValidationInterface
bool ProcessMessage(CNode* pfrom, std::string& strCommand, CDataStream& vRecv, int& banScore);
/// Process the message and returns the ban score (0 if no banning is needed)
int ProcessMessageInner(CNode* pfrom, std::string& strCommand, CDataStream& vRecv);
void NewBlock();

int ProcessBudgetVoteSync(const uint256& nProp, CNode* pfrom);
int ProcessProposal(CBudgetProposal& proposal);
Expand Down Expand Up @@ -183,11 +178,6 @@ class CBudgetManager : public CValidationInterface
mapSeenFinalizedBudgetVotes.clear();
mapOrphanFinalizedBudgetVotes.clear();
}
{
LOCK2(cs_budgets, cs_proposals);
mAskedUsForBudgetSync.clear();
}

LogPrintf("Budget object cleared\n");
}
void CheckAndRemove();
Expand Down
28 changes: 26 additions & 2 deletions src/test/netfulfilledman_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ BOOST_AUTO_TEST_CASE(netfulfilledman_simple_add_and_expire)
int64_t now = GetTime();
SetMockTime(now);

CNetFulfilledRequestManager fulfilledMan;
CNetFulfilledRequestManager fulfilledMan(DEFAULT_ITEMS_FILTER_SIZE);
CService service = LookupNumeric("1.1.1.1", 9999);
std::string request = "request";
BOOST_ASSERT(!fulfilledMan.HasFulfilledRequest(service, request));
Expand All @@ -27,7 +27,7 @@ BOOST_AUTO_TEST_CASE(netfulfilledman_simple_add_and_expire)
BOOST_ASSERT(fulfilledMan.HasFulfilledRequest(service, request));

// Advance mock time to surpass FulfilledRequestExpireTime
SetMockTime(now + 60 * 60 + 1);
SetMockTime(GetMockTime() + 60 * 60 + 1);

// Verify that the request exists and expired now
BOOST_CHECK(fulfilledMan.Size() == 1);
Expand All @@ -36,6 +36,30 @@ BOOST_AUTO_TEST_CASE(netfulfilledman_simple_add_and_expire)
// Verify request removal
fulfilledMan.CheckAndRemove();
BOOST_CHECK(fulfilledMan.Size() == 0);

// Items filter, insertion and lookup.
uint256 item(g_insecure_rand_ctx.rand256());
fulfilledMan.AddItemRequest(service, item);
BOOST_CHECK(fulfilledMan.HasItemRequest(service, item));

CService service2 = LookupNumeric("1.1.1.2", 9999);
BOOST_CHECK(!fulfilledMan.HasItemRequest(service2, item));

// Advance mock time to surpass the expiration time
SetMockTime(GetMockTime() + DEFAULT_ITEMS_FILTER_CLEANUP + 1);
fulfilledMan.CheckAndRemove();
BOOST_CHECK(!fulfilledMan.HasItemRequest(service, item));

// Now test filling up the filter
fulfilledMan.AddItemRequest(service, item);
for (int i = 0; i < 300; i++) {
uint256 element(g_insecure_rand_ctx.rand256());
fulfilledMan.AddItemRequest(service, element);
BOOST_CHECK(fulfilledMan.HasItemRequest(service, element));
}

fulfilledMan.CheckAndRemove();
BOOST_CHECK(!fulfilledMan.HasItemRequest(service, item));
}

BOOST_AUTO_TEST_SUITE_END()
2 changes: 1 addition & 1 deletion src/tiertwo/init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ bool LoadTierTwo(int chain_active_height, bool load_cache_files)
LogPrintf("Failed to load network requests cache from %s", netRequestsDb.GetDbPath().string());
}
} else {
CNetFulfilledRequestManager netfulfilledmanTmp;
CNetFulfilledRequestManager netfulfilledmanTmp(0);
if (!netRequestsDb.Dump(netfulfilledmanTmp)) {
LogPrintf("Failed to clear network requests cache at %s", netRequestsDb.GetDbPath().string());
}
Expand Down
39 changes: 38 additions & 1 deletion src/tiertwo/netfulfilledman.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,15 @@
#include "shutdown.h"
#include "utiltime.h"

CNetFulfilledRequestManager g_netfulfilledman;
CNetFulfilledRequestManager g_netfulfilledman(DEFAULT_ITEMS_FILTER_SIZE);

CNetFulfilledRequestManager::CNetFulfilledRequestManager(unsigned int _itemsFilterSize)
{
itemsFilterSize = _itemsFilterSize;
if (itemsFilterSize != 0) {
itemsFilter = std::make_unique<CBloomFilter>(itemsFilterSize, 0.001, 0, BLOOM_UPDATE_ALL);
}
}

void CNetFulfilledRequestManager::AddFulfilledRequest(const CService& addr, const std::string& strRequest)
{
Expand All @@ -30,6 +38,29 @@ bool CNetFulfilledRequestManager::HasFulfilledRequest(const CService& addr, cons
return false;
}

static std::vector<unsigned char> convertElement(const CService& addr, const uint256& itemHash)
{
CDataStream stream(SER_NETWORK, PROTOCOL_VERSION);
stream << addr.GetAddrBytes();
stream << itemHash;
return {stream.begin(), stream.end()};
}

void CNetFulfilledRequestManager::AddItemRequest(const CService& addr, const uint256& itemHash)
{
LOCK(cs_mapFulfilledRequests);
assert(itemsFilter);
itemsFilter->insert(convertElement(addr, itemHash));
itemsFilterCount++;
}

bool CNetFulfilledRequestManager::HasItemRequest(const CService& addr, const uint256& itemHash) const
{
LOCK(cs_mapFulfilledRequests);
assert(itemsFilter);
return itemsFilter->contains(convertElement(addr, itemHash));
}

void CNetFulfilledRequestManager::CheckAndRemove()
{
LOCK(cs_mapFulfilledRequests);
Expand All @@ -48,6 +79,12 @@ void CNetFulfilledRequestManager::CheckAndRemove()
it++;
}
}

if (now > lastFilterCleanup || itemsFilterCount >= itemsFilterSize) {
itemsFilter->clear();
itemsFilterCount = 0;
lastFilterCleanup = now + filterCleanupTime;
}
}

void CNetFulfilledRequestManager::Clear()
Expand Down
19 changes: 17 additions & 2 deletions src/tiertwo/netfulfilledman.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,21 @@
#ifndef PIVX_NETFULFILLEDMAN_H
#define PIVX_NETFULFILLEDMAN_H

#include "bloom.h"
#include "serialize.h"
#include "sync.h"

#include <map>

class CBloomFilter;
class CService;

static const std::string NET_REQUESTS_CACHE_FILENAME = "netrequests.dat";
static const std::string NET_REQUESTS_CACHE_FILE_ID = "magicNetRequestsCache";

static const unsigned int DEFAULT_ITEMS_FILTER_SIZE = 250;
static const unsigned int DEFAULT_ITEMS_FILTER_CLEANUP = 60 * 60;

// Fulfilled requests are used to prevent nodes from asking the same data on sync
// and being banned for doing it too often.
class CNetFulfilledRequestManager
Expand All @@ -25,11 +30,17 @@ class CNetFulfilledRequestManager
typedef std::map<CService, fulfilledreqmapentry_t> fulfilledreqmap_t;

// Keep track of what node has/was asked for and when
fulfilledreqmap_t mapFulfilledRequests;
fulfilledreqmap_t mapFulfilledRequests GUARDED_BY(cs_mapFulfilledRequests);
mutable Mutex cs_mapFulfilledRequests;

std::unique_ptr<CBloomFilter> itemsFilter GUARDED_BY(cs_mapFulfilledRequests){nullptr};
unsigned int itemsFilterSize{0};
unsigned int itemsFilterCount{0};
int64_t filterCleanupTime{DEFAULT_ITEMS_FILTER_CLEANUP}; // for now, fixed cleanup time
int64_t lastFilterCleanup{0};

public:
CNetFulfilledRequestManager() = default;
CNetFulfilledRequestManager(unsigned int itemsFilterSize);

SERIALIZE_METHODS(CNetFulfilledRequestManager, obj) {
LOCK(obj.cs_mapFulfilledRequests);
Expand All @@ -39,6 +50,10 @@ class CNetFulfilledRequestManager
void AddFulfilledRequest(const CService& addr, const std::string& strRequest);
bool HasFulfilledRequest(const CService& addr, const std::string& strRequest) const;

// Faster lookup using bloom filter
void AddItemRequest(const CService& addr, const uint256& itemHash);
bool HasItemRequest(const CService& addr, const uint256& itemHash) const;

void CheckAndRemove();
void Clear();

Expand Down

0 comments on commit 5a7b514

Please sign in to comment.