Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow ledger close path to run async during externalize #4506

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/bucket/test/BucketTestUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ closeLedger(Application& app, std::optional<SecretKey> skToSignValue,
app.getHerder().externalizeValue(TxSetXDRFrame::makeEmpty(lcl), ledgerNum,
lcl.header.scpValue.closeTime, upgrades,
skToSignValue);
testutil::crankUntil(
app,
[&lm, ledgerNum]() { return lm.getLastClosedLedgerNum() == ledgerNum; },
std::chrono::seconds(10));
return lm.getLastClosedLedgerHeader().hash;
}

Expand Down
3 changes: 2 additions & 1 deletion src/catchup/ApplyLedgerWork.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ BasicWork::State
ApplyLedgerWork::onRun()
{
ZoneScoped;
mApp.getLedgerManager().closeLedger(mLedgerCloseData);
mApp.getLedgerManager().closeLedger(mLedgerCloseData,
/* externalize */ false);
return BasicWork::State::WORK_SUCCESS;
}

Expand Down
7 changes: 6 additions & 1 deletion src/catchup/CatchupManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,12 @@ class CatchupManager

// Process ledgers that could not be applied, and determine if catchup
// should run
virtual void processLedger(LedgerCloseData const& ledgerData) = 0;

// Return true if latest ledger was applied, and there are no syncing
// ledgers. Return false if ledgers are buffered with gaps, and we need to
// start catchup
virtual bool processLedger(LedgerCloseData const& ledgerData,
bool isLatestSlot) = 0;

// Forcibly switch the application into catchup mode, treating `toLedger`
// as the destination ledger number and count as the number of past ledgers
Expand Down
111 changes: 77 additions & 34 deletions src/catchup/CatchupManagerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,12 @@ CatchupManagerImpl::getCatchupCount()
: mApp.getConfig().CATCHUP_RECENT;
}

void
CatchupManagerImpl::processLedger(LedgerCloseData const& ledgerData)
bool
CatchupManagerImpl::processLedger(LedgerCloseData const& ledgerData,
bool isLatestSlot)
{
maybeUpdateLastQueuedToApply();

ZoneScoped;
if (catchupWorkIsDone())
{
Expand All @@ -119,26 +122,32 @@ CatchupManagerImpl::processLedger(LedgerCloseData const& ledgerData)
logAndUpdateCatchupStatus(true);
}

// Always skip old ledgers
uint32_t lastReceivedLedgerSeq = ledgerData.getLedgerSeq();
if (lastReceivedLedgerSeq <= *mLastQueuedToApply)
{
// If LCL is already at-or-ahead of the ledger we just received from the
// network, we're up to date. Return early, nothing to do.
CLOG_INFO(
Ledger,
"Skipping close ledger: local state is {}, more recent than {}",
*mLastQueuedToApply, ledgerData.getLedgerSeq());
return true;
}

// Always add a newer ledger, maybe apply
mSyncingLedgers.emplace(lastReceivedLedgerSeq, ledgerData);
mLargestLedgerSeqHeard =
std::max(mLargestLedgerSeqHeard, lastReceivedLedgerSeq);

// 1. CatchupWork is not running yet
// 2. CatchupManager received ledger that was immediately applied by
// 2. CatchupManager received ledger that should be immediately applied by
// LedgerManager: check if we have any sequential ledgers.
// If so, attempt to apply mSyncingLedgers and possibly get back in sync
if (!mCatchupWork && lastReceivedLedgerSeq ==
mApp.getLedgerManager().getLastClosedLedgerNum())
if (!mCatchupWork && lastReceivedLedgerSeq == *mLastQueuedToApply + 1)
{
tryApplySyncingLedgers();
return;
}
else if (lastReceivedLedgerSeq <=
mApp.getLedgerManager().getLastClosedLedgerNum())
{
// If LCL is already at-or-ahead of the ledger we just received from the
// network, we're up to date. Return early, nothing to do.
return;
return true;
}

// For the rest of this method: we know LCL has fallen behind the network
Expand All @@ -151,6 +160,9 @@ CatchupManagerImpl::processLedger(LedgerCloseData const& ledgerData)
// to history and commence catchup, running the (checkpoint-driven) catchup
// state machine to ledger L-1 (the end of the checkpoint covering K) and
// then replay buffered ledgers from L onwards.
CLOG_INFO(Ledger,
"Close of ledger {} buffered. mSyncingLedgers has {} ledgers",
ledgerData.getLedgerSeq(), mSyncingLedgers.size());

// First: if CatchupWork has started, just buffer and return early.
if (mCatchupWork)
Expand All @@ -160,17 +172,17 @@ CatchupManagerImpl::processLedger(LedgerCloseData const& ledgerData)
auto const& config = mCatchupWork->getCatchupConfiguration();
if (ledgerData.getLedgerSeq() <= config.toLedger())
{
return;
return false;
}

addAndTrimSyncingLedgers(ledgerData);
trimSyncingLedgers();
logAndUpdateCatchupStatus(true);
return;
return false;
}

// Next, we buffer every out of sync ledger to allow us to get back in sync
// in case the ledgers we're missing are received.
addAndTrimSyncingLedgers(ledgerData);
trimSyncingLedgers();

// Finally we wait some number of ledgers beyond the smallest buffered
// checkpoint ledger before we trigger the CatchupWork. This could be any
Expand Down Expand Up @@ -233,6 +245,7 @@ CatchupManagerImpl::processLedger(LedgerCloseData const& ledgerData)
}
}
logAndUpdateCatchupStatus(true, message);
return false;
}

void
Expand All @@ -241,7 +254,9 @@ CatchupManagerImpl::startCatchup(
std::set<std::shared_ptr<Bucket>> bucketsToRetain)
{
ZoneScoped;
auto lastClosedLedger = mApp.getLedgerManager().getLastClosedLedgerNum();
maybeUpdateLastQueuedToApply();

auto lastClosedLedger = *mLastQueuedToApply;
if ((configuration.toLedger() != CatchupConfiguration::CURRENT) &&
(configuration.toLedger() <= lastClosedLedger))
{
Expand Down Expand Up @@ -329,10 +344,13 @@ CatchupManagerImpl::logAndUpdateCatchupStatus(bool contiguous)
std::optional<LedgerCloseData>
CatchupManagerImpl::maybeGetNextBufferedLedgerToApply()
{
releaseAssert(threadIsMain());
// Since we just applied a ledger, refresh mLastQueuedToApply
maybeUpdateLastQueuedToApply();

trimSyncingLedgers();
if (!mSyncingLedgers.empty() &&
mSyncingLedgers.begin()->first ==
mApp.getLedgerManager().getLastClosedLedgerNum() + 1)
mSyncingLedgers.begin()->first == *mLastQueuedToApply + 1)
{
return std::make_optional<LedgerCloseData>(
mSyncingLedgers.begin()->second);
Expand Down Expand Up @@ -370,14 +388,18 @@ CatchupManagerImpl::syncMetrics()
}

void
CatchupManagerImpl::addAndTrimSyncingLedgers(LedgerCloseData const& ledgerData)
CatchupManagerImpl::maybeUpdateLastQueuedToApply()
{
mSyncingLedgers.emplace(ledgerData.getLedgerSeq(), ledgerData);
trimSyncingLedgers();

CLOG_INFO(Ledger,
"Close of ledger {} buffered. mSyncingLedgers has {} ledgers",
ledgerData.getLedgerSeq(), mSyncingLedgers.size());
// mLastQueuedToApply is an optional that starts out empty. On first use,
// seed it from LedgerManager's last closed ledger number.
if (!mLastQueuedToApply)
{
    mLastQueuedToApply = mApp.getLedgerManager().getLastClosedLedgerNum();
}
else
{
    // Already initialized: only ever move forward. The stored value may be
    // ahead of LCL when ledgers were queued to apply but have not closed
    // yet, so keep the larger of the two.
    mLastQueuedToApply =
        std::max(*mLastQueuedToApply,
                 mApp.getLedgerManager().getLastClosedLedgerNum());
}
}

void
Expand Down Expand Up @@ -408,7 +430,7 @@ CatchupManagerImpl::trimSyncingLedgers()
// This erases [begin, it).
mSyncingLedgers.erase(mSyncingLedgers.begin(), it);
};
removeLedgersLessThan(mApp.getLedgerManager().getLastClosedLedgerNum() + 1);
removeLedgersLessThan(*mLastQueuedToApply + 1);
auto& hm = mApp.getHistoryManager();
if (!mSyncingLedgers.empty())
{
Expand Down Expand Up @@ -439,8 +461,7 @@ void
CatchupManagerImpl::tryApplySyncingLedgers()
{
ZoneScoped;
auto const& ledgerHeader =
mApp.getLedgerManager().getLastClosedLedgerHeader();
uint32_t nextToClose = *mLastQueuedToApply + 1;

// We can apply multiple ledgers here, which might be slow. This is a rare
// occurrence so we should be fine.
Expand All @@ -450,16 +471,38 @@ CatchupManagerImpl::tryApplySyncingLedgers()
auto const& lcd = it->second;

// we still have a missing ledger
if (ledgerHeader.header.ledgerSeq + 1 != lcd.getLedgerSeq())
if (nextToClose != lcd.getLedgerSeq())
{
break;
}

mApp.getLedgerManager().closeLedger(lcd);
CLOG_INFO(History, "Closed buffered ledger: {}",
LedgerManager::ledgerAbbrev(ledgerHeader));
if (mApp.getConfig()
.ARTIFICIALLY_DELAY_LEDGER_CLOSE_FOR_TESTING.count() > 0)
{
// Close a ledger asynchronously, with an added delay.
// Useful to test the async externalize flow.
mApp.postOnMainThread(
[&app = mApp, lcd]() {
if (app.isStopping())
{
return;
}
std::this_thread::sleep_for(
app.getConfig()
.ARTIFICIALLY_DELAY_LEDGER_CLOSE_FOR_TESTING);
app.getLedgerManager().closeLedger(lcd,
/* externalize */ true);
},
"closeLedger");
}
else
{
mApp.getLedgerManager().closeLedger(lcd, /* externalize */ true);
}
mLastQueuedToApply = lcd.getLedgerSeq();

++it;
++nextToClose;
}

mSyncingLedgers.erase(mSyncingLedgers.cbegin(), it);
Expand Down
9 changes: 7 additions & 2 deletions src/catchup/CatchupManagerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,12 @@ class CatchupManagerImpl : public CatchupManager
// maintain the invariants above.
std::map<uint32_t, LedgerCloseData> mSyncingLedgers;
medida::Counter& mSyncingLedgersSize;
// Most recent ledger that was queued to be applied by CatchupManager.
// Once applied, and before a new ledger is scheduled, this is equivalent to
// LCL.
std::optional<uint32_t> mLastQueuedToApply;

void addAndTrimSyncingLedgers(LedgerCloseData const& ledgerData);
void maybeUpdateLastQueuedToApply();
void startOnlineCatchup();
void trimSyncingLedgers();
void tryApplySyncingLedgers();
Expand All @@ -61,7 +65,8 @@ class CatchupManagerImpl : public CatchupManager
CatchupManagerImpl(Application& app);
~CatchupManagerImpl() override;

void processLedger(LedgerCloseData const& ledgerData) override;
bool processLedger(LedgerCloseData const& ledgerData,
bool isLatestSlot) override;
void
startCatchup(CatchupConfiguration configuration,
std::shared_ptr<HistoryArchive> archive,
Expand Down
6 changes: 3 additions & 3 deletions src/catchup/DownloadApplyTxsWork.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ DownloadApplyTxsWork::DownloadApplyTxsWork(
: BatchWork(app, "download-apply-ledgers")
, mRange(range)
, mDownloadDir(downloadDir)
, mLastApplied(lastApplied)
, mLastQueuedToApply(lastApplied)
, mCheckpointToQueue(
app.getHistoryManager().checkpointContainingLedger(range.mFirst))
, mWaitForPublish(waitForPublish)
Expand Down Expand Up @@ -171,7 +171,7 @@ DownloadApplyTxsWork::resetIter()
mCheckpointToQueue =
mApp.getHistoryManager().checkpointContainingLedger(mRange.mFirst);
mLastYieldedWork.reset();
mLastApplied = mApp.getLedgerManager().getLastClosedLedgerHeader();
mLastQueuedToApply = mApp.getLedgerManager().getLastClosedLedgerHeader();
}

bool
Expand All @@ -189,7 +189,7 @@ DownloadApplyTxsWork::hasNext() const
void
DownloadApplyTxsWork::onSuccess()
{
mLastApplied = mApp.getLedgerManager().getLastClosedLedgerHeader();
mLastQueuedToApply = mApp.getLedgerManager().getLastClosedLedgerHeader();
}

std::string
Expand Down
2 changes: 1 addition & 1 deletion src/catchup/DownloadApplyTxsWork.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class DownloadApplyTxsWork : public BatchWork
{
LedgerRange const mRange;
TmpDir const& mDownloadDir;
LedgerHeaderHistoryEntry& mLastApplied;
LedgerHeaderHistoryEntry& mLastQueuedToApply;
uint32_t mCheckpointToQueue;
std::shared_ptr<BasicWork> mLastYieldedWork;
bool const mWaitForPublish;
Expand Down
3 changes: 2 additions & 1 deletion src/catchup/ReplayDebugMetaWork.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,8 @@ ReplayDebugMetaWork::applyLastLedger()
if (lcl + 1 == debugTxSet.ledgerSeq)
{
mApp.getLedgerManager().closeLedger(
LedgerCloseData::toLedgerCloseData(debugTxSet));
LedgerCloseData::toLedgerCloseData(debugTxSet),
/* externalize */ false);
}
else
{
Expand Down
3 changes: 2 additions & 1 deletion src/herder/Herder.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ class Herder
// restores Herder's state from disk
virtual void start() = 0;

virtual void lastClosedLedgerIncreased(bool latest) = 0;
virtual void lastClosedLedgerIncreased(bool latest,
TxSetXDRFrameConstPtr txSet) = 0;

// Setup Herder's state to fully participate in consensus
virtual void setTrackingSCPState(uint64_t index, StellarValue const& value,
Expand Down
21 changes: 11 additions & 10 deletions src/herder/HerderImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
// of this distribution or at http://www.apache.org/licenses/LICENSE-2.0

#include "herder/HerderImpl.h"
#include "bucket/BucketListSnapshot.h"
#include "bucket/BucketManager.h"
#include "bucket/BucketSnapshotManager.h"
#include "crypto/Hex.h"
#include "crypto/KeyUtils.h"
#include "crypto/SHA.h"
Expand All @@ -14,9 +17,6 @@
#include "herder/TxSetFrame.h"
#include "herder/TxSetUtils.h"
#include "ledger/LedgerManager.h"
#include "ledger/LedgerTxn.h"
#include "ledger/LedgerTxnEntry.h"
#include "ledger/LedgerTxnHeader.h"
#include "lib/json/json.h"
#include "main/Application.h"
#include "main/Config.h"
Expand Down Expand Up @@ -249,10 +249,6 @@ HerderImpl::newSlotExternalized(bool synchronous, StellarValue const& value)
// start timing next externalize from this point
mLastExternalize = mApp.getClock().now();

// In order to update the transaction queue we need to get the
// applied transactions.
updateTransactionQueue(mPendingEnvelopes.getTxSet(value.txSetHash));

// perform cleanups
// Evict slots that are outside of our ledger validity bracket
auto minSlotToRemember = getMinLedgerSeqToRemember();
Expand Down Expand Up @@ -359,7 +355,7 @@ HerderImpl::processExternalized(uint64 slotIndex, StellarValue const& value,
writeDebugTxSet(ledgerData);
}

mLedgerManager.valueExternalized(ledgerData);
mLedgerManager.valueExternalized(ledgerData, isLatestSlot);
}

void
Expand Down Expand Up @@ -1136,14 +1132,19 @@ HerderImpl::safelyProcessSCPQueue(bool synchronous)
}

void
HerderImpl::lastClosedLedgerIncreased(bool latest)
HerderImpl::lastClosedLedgerIncreased(bool latest, TxSetXDRFrameConstPtr txSet)
{
releaseAssert(threadIsMain());
maybeSetupSorobanQueue(
mLedgerManager.getLastClosedLedgerHeader().header.ledgerVersion);

// Ensure potential upgrades are handled in overlay
maybeHandleUpgrade();

// In order to update the transaction queue we need to get the
// applied transactions.
updateTransactionQueue(txSet);

if (latest)
{
releaseAssert(isTracking());
Expand Down Expand Up @@ -1531,7 +1532,7 @@ HerderImpl::getUpgradesJson()
void
HerderImpl::forceSCPStateIntoSyncWithLastClosedLedger()
{
auto const& header = mLedgerManager.getLastClosedLedgerHeader().header;
auto header = mLedgerManager.getLastClosedLedgerHeader().header;
setTrackingSCPState(header.ledgerSeq, header.scpValue,
/* isTrackingNetwork */ true);
}
Expand Down
Loading
Loading