Skip to content

Commit

Permalink
Merge pull request #3963 from marta-lokhova/tx_queue_updates
Browse files Browse the repository at this point in the history
Code cleanup, fixes to tx queue

Reviewed-by: dmkozh
  • Loading branch information
latobarita authored Oct 16, 2023
2 parents fcbbad4 + 69a69b0 commit 8e4c44f
Show file tree
Hide file tree
Showing 13 changed files with 259 additions and 183 deletions.
2 changes: 1 addition & 1 deletion src/herder/Herder.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ class Herder
// restores Herder's state from disk
virtual void start() = 0;

virtual void lastClosedLedgerIncreased() = 0;
virtual void lastClosedLedgerIncreased(bool latest) = 0;

// Setup Herder's state to fully participate in consensus
virtual void setTrackingSCPState(uint64_t index, StellarValue const& value,
Expand Down
164 changes: 105 additions & 59 deletions src/herder/HerderImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,6 @@ HerderImpl::HerderImpl(Application& app)
: mTransactionQueue(app, TRANSACTION_QUEUE_TIMEOUT_LEDGERS,
TRANSACTION_QUEUE_BAN_LEDGERS,
TRANSACTION_QUEUE_SIZE_MULTIPLIER)
, mSorobanTransactionQueue(app, TRANSACTION_QUEUE_TIMEOUT_LEDGERS,
TRANSACTION_QUEUE_BAN_LEDGERS,
SOROBAN_TRANSACTION_QUEUE_SIZE_MULTIPLIER)
, mPendingEnvelopes(app, *this)
, mHerderSCPDriver(app, *this, mUpgrades, mPendingEnvelopes)
, mLastSlotSaved(0)
Expand Down Expand Up @@ -268,7 +265,10 @@ HerderImpl::shutdown()
mLastQuorumMapIntersectionState.mInterruptFlag = true;
}
mTransactionQueue.shutdown();
mSorobanTransactionQueue.shutdown();
if (mSorobanTransactionQueue)
{
mSorobanTransactionQueue->shutdown();
}

mTxSetGarbageCollectTimer.cancel();
}
Expand Down Expand Up @@ -525,41 +525,38 @@ HerderImpl::recvTransaction(TransactionFrameBasePtr tx, bool submittedFromSelf)
ZoneScoped;
TransactionQueue::AddResult result;

auto const& lcl = mLedgerManager.getLastClosedLedgerHeader();
if (protocolVersionStartsFrom(lcl.header.ledgerVersion,
SOROBAN_PROTOCOL_VERSION))
// Allow txs of the same kind to reach the tx queue in case it can be
// replaced by fee
bool hasSoroban =
mSorobanTransactionQueue &&
mSorobanTransactionQueue->sourceAccountPending(tx->getSourceID()) &&
!tx->isSoroban();
bool hasClassic =
mTransactionQueue.sourceAccountPending(tx->getSourceID()) &&
tx->isSoroban();
if (hasSoroban || hasClassic)
{
// Allow txs of the same kind to reach the tx queue in case it can be
// replaced by fee
bool hasSoroban =
mSorobanTransactionQueue.sourceAccountPending(tx->getSourceID()) &&
!tx->isSoroban();
bool hasClassic =
mTransactionQueue.sourceAccountPending(tx->getSourceID()) &&
tx->isSoroban();
if (hasSoroban || hasClassic)
{
CLOG_DEBUG(
Herder,
"recv transaction {} for {} rejected due to 1 tx per source "
"account per ledger limit",
hexAbbrev(tx->getFullHash()),
KeyUtils::toShortString(tx->getSourceID()));
result = TransactionQueue::AddResult::ADD_STATUS_TRY_AGAIN_LATER;
}
else if (tx->isSoroban())
{
result = mSorobanTransactionQueue.tryAdd(tx, submittedFromSelf);
}
else
{
result = mTransactionQueue.tryAdd(tx, submittedFromSelf);
}
CLOG_DEBUG(Herder,
"recv transaction {} for {} rejected due to 1 tx per source "
"account per ledger limit",
hexAbbrev(tx->getFullHash()),
KeyUtils::toShortString(tx->getSourceID()));
result = TransactionQueue::AddResult::ADD_STATUS_TRY_AGAIN_LATER;
}
else
else if (!tx->isSoroban())
{
result = mTransactionQueue.tryAdd(tx, submittedFromSelf);
}
else if (mSorobanTransactionQueue)
{
result = mSorobanTransactionQueue->tryAdd(tx, submittedFromSelf);
}
else
{
// Received Soroban transaction before protocol 20; since this
// transaction isn't supported yet, return ERROR
result = TransactionQueue::AddResult::ADD_STATUS_ERROR;
}

if (result == TransactionQueue::AddResult::ADD_STATUS_PENDING)
{
Expand Down Expand Up @@ -839,10 +836,13 @@ HerderImpl::externalizeValue(TxSetFrameConstPtr txSet, uint32_t ledgerSeq,
bool
HerderImpl::sourceAccountPending(AccountID const& accountID) const
{
return mApp.getHerder().getTransactionQueue().sourceAccountPending(
accountID) ||
mApp.getHerder().getSorobanTransactionQueue().sourceAccountPending(
accountID);
bool accPending = mTransactionQueue.sourceAccountPending(accountID);
if (mSorobanTransactionQueue)
{
accPending = accPending ||
mSorobanTransactionQueue->sourceAccountPending(accountID);
}
return accPending;
}

#endif
Expand Down Expand Up @@ -1010,7 +1010,8 @@ HerderImpl::getTransactionQueue()
SorobanTransactionQueue&
HerderImpl::getSorobanTransactionQueue()
{
return mSorobanTransactionQueue;
releaseAssert(mSorobanTransactionQueue);
return *mSorobanTransactionQueue;
}
#endif

Expand Down Expand Up @@ -1057,14 +1058,20 @@ HerderImpl::safelyProcessSCPQueue(bool synchronous)
}

void
HerderImpl::lastClosedLedgerIncreased()
HerderImpl::lastClosedLedgerIncreased(bool latest)
{
releaseAssert(isTracking());
releaseAssert(trackingConsensusLedgerIndex() ==
mLedgerManager.getLastClosedLedgerNum());
releaseAssert(mLedgerManager.isSynced());
maybeSetupSorobanQueue(
mLedgerManager.getLastClosedLedgerHeader().header.ledgerVersion);

setupTriggerNextLedger();
if (latest)
{
releaseAssert(isTracking());
releaseAssert(trackingConsensusLedgerIndex() ==
mLedgerManager.getLastClosedLedgerNum());
releaseAssert(mLedgerManager.isSynced());

setupTriggerNextLedger();
}
}

void
Expand Down Expand Up @@ -1212,9 +1219,10 @@ HerderImpl::getMinLedgerSeqToAskPeers() const
SequenceNumber
HerderImpl::getMaxSeqInPendingTxs(AccountID const& acc)
{
if (mSorobanTransactionQueue.sourceAccountPending(acc))
if (mSorobanTransactionQueue &&
mSorobanTransactionQueue->sourceAccountPending(acc))
{
return mSorobanTransactionQueue.getAccountTransactionQueueInfo(acc)
return mSorobanTransactionQueue->getAccountTransactionQueueInfo(acc)
.mMaxSeq;
}
return mTransactionQueue.getAccountTransactionQueueInfo(acc).mMaxSeq;
Expand Down Expand Up @@ -1279,8 +1287,9 @@ HerderImpl::triggerNextLedger(uint32_t ledgerSeqToTrigger,
if (protocolVersionStartsFrom(lcl.header.ledgerVersion,
SOROBAN_PROTOCOL_VERSION))
{
releaseAssert(mSorobanTransactionQueue);
txPhases.emplace_back(
mSorobanTransactionQueue.getTransactions(lcl.header));
mSorobanTransactionQueue->getTransactions(lcl.header));
}

// We pick as next close time the current time unless it's before the last
Expand Down Expand Up @@ -1324,7 +1333,8 @@ HerderImpl::triggerNextLedger(uint32_t ledgerSeqToTrigger,
if (protocolVersionStartsFrom(lcl.header.ledgerVersion,
SOROBAN_PROTOCOL_VERSION))
{
mSorobanTransactionQueue.ban(
releaseAssert(mSorobanTransactionQueue);
mSorobanTransactionQueue->ban(
invalidTxPhases[static_cast<size_t>(TxSetFrame::Phase::SOROBAN)]);
}

Expand Down Expand Up @@ -2022,6 +2032,27 @@ HerderImpl::maybeHandleUpgrade()
}
}

void
HerderImpl::maybeSetupSorobanQueue(uint32_t protocolVersion)
{
if (protocolVersionStartsFrom(protocolVersion, SOROBAN_PROTOCOL_VERSION))
{
if (!mSorobanTransactionQueue)
{
mSorobanTransactionQueue =
std::make_unique<SorobanTransactionQueue>(
mApp, TRANSACTION_QUEUE_TIMEOUT_LEDGERS,
TRANSACTION_QUEUE_BAN_LEDGERS,
SOROBAN_TRANSACTION_QUEUE_SIZE_MULTIPLIER);
}
}
else if (mSorobanTransactionQueue)
{
throw std::runtime_error(
"Invalid state: Soroban queue initialized before v20");
}
}

void
HerderImpl::start()
{
Expand All @@ -2031,13 +2062,15 @@ HerderImpl::start()
/* shouldUpdateLastModified */ true,
TransactionMode::READ_ONLY_WITHOUT_SQL_TXN);

if (protocolVersionStartsFrom(ltx.loadHeader().current().ledgerVersion,
SOROBAN_PROTOCOL_VERSION))
uint32_t version = ltx.loadHeader().current().ledgerVersion;
if (protocolVersionStartsFrom(version, SOROBAN_PROTOCOL_VERSION))
{
auto const& conf =
mApp.getLedgerManager().getSorobanNetworkConfig(ltx);
mMaxTxSize = std::max(mMaxTxSize, conf.txMaxSizeBytes());
}

maybeSetupSorobanQueue(version);
}

auto const& cfg = mApp.getConfig();
Expand Down Expand Up @@ -2084,7 +2117,10 @@ HerderImpl::start()
// make sure that the transaction queue is setup against
// the lcl that we have right now
mTransactionQueue.maybeVersionUpgraded();
mSorobanTransactionQueue.maybeVersionUpgraded();
if (mSorobanTransactionQueue)
{
mSorobanTransactionQueue->maybeVersionUpgraded();
}

startTxSetGCTimer();
}
Expand Down Expand Up @@ -2182,9 +2218,13 @@ HerderImpl::updateTransactionQueue(TxSetFrameConstPtr txSet)

updateQueue(mTransactionQueue,
txSet->getTxsForPhase(TxSetFrame::Phase::CLASSIC));
if (txSet->numPhases() > static_cast<size_t>(TxSetFrame::Phase::SOROBAN))
// Even if we're in protocol 20, still check for number of phases, in case
// we're dealing with the upgrade ledger that contains old-style transaction
// set
if (txSet->numPhases() > static_cast<size_t>(TxSetFrame::Phase::SOROBAN) &&
mSorobanTransactionQueue)
{
updateQueue(mSorobanTransactionQueue,
updateQueue(*mSorobanTransactionQueue,
txSet->getTxsForPhase(TxSetFrame::Phase::SOROBAN));
}
}
Expand Down Expand Up @@ -2299,23 +2339,29 @@ HerderImpl::getMaxQueueSizeOps() const
size_t
HerderImpl::getMaxQueueSizeSorobanOps() const
{
return mSorobanTransactionQueue.getMaxQueueSizeOps();
return mSorobanTransactionQueue
? mSorobanTransactionQueue->getMaxQueueSizeOps()
: 0;
}

bool
HerderImpl::isBannedTx(Hash const& hash) const
{
return mTransactionQueue.isBanned(hash) ||
mSorobanTransactionQueue.isBanned(hash);
auto banned = mTransactionQueue.isBanned(hash);
if (mSorobanTransactionQueue)
{
banned = banned || mSorobanTransactionQueue->isBanned(hash);
}
return banned;
}

TransactionFrameBaseConstPtr
HerderImpl::getTx(Hash const& hash) const
{
auto classic = mTransactionQueue.getTx(hash);
if (!classic)
if (!classic && mSorobanTransactionQueue)
{
return mSorobanTransactionQueue.getTx(hash);
return mSorobanTransactionQueue->getTx(hash);
}
return classic;
}
Expand Down
5 changes: 3 additions & 2 deletions src/herder/HerderImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ class HerderImpl : public Herder

void start() override;

void lastClosedLedgerIncreased() override;
void lastClosedLedgerIncreased(bool latest) override;

SCP& getSCP();
HerderSCPDriver&
Expand Down Expand Up @@ -230,9 +230,10 @@ class HerderImpl : public Herder
void writeDebugTxSet(LedgerCloseData const& lcd);

ClassicTransactionQueue mTransactionQueue;
SorobanTransactionQueue mSorobanTransactionQueue;
std::unique_ptr<SorobanTransactionQueue> mSorobanTransactionQueue;

void updateTransactionQueue(TxSetFrameConstPtr txSet);
void maybeSetupSorobanQueue(uint32_t protocolVersion);

PendingEnvelopes mPendingEnvelopes;
Upgrades mUpgrades;
Expand Down
3 changes: 3 additions & 0 deletions src/herder/PendingEnvelopes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ PendingEnvelopes::peerDoesntHave(MessageType type, Hash const& itemID,
{
switch (type)
{
// Subtle: it is important to treat both TX_SET and GENERALIZED_TX_SET the
// same way here, since the sending node may have the type wrong depending
// on the protocol version
case TX_SET:
case GENERALIZED_TX_SET:
mTxSetFetcher.doesntHave(itemID, peer);
Expand Down
22 changes: 5 additions & 17 deletions src/herder/TransactionQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1231,14 +1231,6 @@ SorobanTransactionQueue::getMaxResourcesToFloodThisPeriod() const

LedgerTxn ltx(mApp.getLedgerTxnRoot(), /* shouldUpdateLastModified */ true,
TransactionMode::READ_ONLY_WITHOUT_SQL_TXN);

// If we're not on the right protocol yet, there's nothing to broadcast
if (protocolVersionIsBefore(ltx.loadHeader().current().ledgerVersion,
SOROBAN_PROTOCOL_VERSION))
{
return std::make_pair(Resource::makeEmpty(true), std::nullopt);
}

auto sorRes = mApp.getLedgerManager().maxLedgerResources(true, ltx);

auto totalFloodPerLedger = multiplyByDouble(sorRes, ratePerLedger);
Expand Down Expand Up @@ -1307,16 +1299,12 @@ SorobanTransactionQueue::broadcastSome()

LedgerTxn ltx(mApp.getLedgerTxnRoot(), true,
TransactionMode::READ_ONLY_WITHOUT_SQL_TXN);
if (protocolVersionStartsFrom(ltx.loadHeader().current().ledgerVersion,
SOROBAN_PROTOCOL_VERSION))
Resource maxPerTx = mApp.getLedgerManager().maxTransactionResources(
/* isSoroban */ true, ltx);
for (auto& resLeft : mBroadcastOpCarryover)
{
Resource maxPerTx = mApp.getLedgerManager().maxTransactionResources(
/* isSoroban */ true, ltx);
for (auto& resLeft : mBroadcastOpCarryover)
{
// Limit carry-over to 1 maximum resource transaction
resLeft = limitTo(resLeft, maxPerTx);
}
// Limit carry-over to 1 maximum resource transaction
resLeft = limitTo(resLeft, maxPerTx);
}
return !totalResToFlood.isZero();
}
Expand Down
Loading

0 comments on commit 8e4c44f

Please sign in to comment.