Skip to content

Commit

Permalink
Merge pull request #2063 from marta-lokhova/work_subsystem_overhaul
Browse files Browse the repository at this point in the history
Work subsystem overhaul

Reviewed-by: graydon
  • Loading branch information
latobarita authored Apr 26, 2019
2 parents 236f831 + f3c9ef4 commit 1966a24
Show file tree
Hide file tree
Showing 93 changed files with 3,603 additions and 2,531 deletions.
4 changes: 3 additions & 1 deletion src/bucket/test/BucketManagerTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1017,7 +1017,6 @@ TEST_CASE("bucket persistence over app restart",
std::vector<stellar::LedgerKey> emptySet;
std::vector<stellar::LedgerEntry> emptySetEntry;

VirtualClock clock;
Config cfg0(getTestConfig(0, Config::TESTDB_ON_DISK_SQLITE));
for_versions_with_differing_bucket_logic(cfg0, [&](Config const& cfg0) {

Expand Down Expand Up @@ -1047,6 +1046,7 @@ TEST_CASE("bucket persistence over app restart",
// First, run an application through two ledger closes, picking up
// the bucket and ledger closes at each.
{
VirtualClock clock;
Application::pointer app = createTestApplication(clock, cfg0);
app->start();
BucketList& bl = app->getBucketManager().getBucketList();
Expand Down Expand Up @@ -1082,6 +1082,7 @@ TEST_CASE("bucket persistence over app restart",
// Next run a new app with a disjoint config one ledger close, and
// stop it. It should have acquired the same state and ledger.
{
VirtualClock clock;
Application::pointer app = createTestApplication(clock, cfg1);
app->start();
BucketList& bl = app->getBucketManager().getBucketList();
Expand All @@ -1107,6 +1108,7 @@ TEST_CASE("bucket persistence over app restart",
// pick up the bucket list correctly.
cfg1.FORCE_SCP = false;
{
VirtualClock clock;
Application::pointer app = Application::create(clock, cfg1, false);
app->start();
BucketList& bl = app->getBucketManager().getBucketList();
Expand Down
138 changes: 66 additions & 72 deletions src/catchup/ApplyBucketsWork.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ namespace stellar
{

ApplyBucketsWork::ApplyBucketsWork(
Application& app, WorkParent& parent,
Application& app,
std::map<std::string, std::shared_ptr<Bucket>> const& buckets,
HistoryArchiveState const& applyState, uint32_t maxProtocolVersion)
: Work(app, parent, std::string("apply-buckets"))
: BasicWork(app, "apply-buckets", RETRY_A_FEW)
, mBuckets(buckets)
, mApplyState(applyState)
, mApplying(false)
Expand All @@ -43,11 +43,6 @@ ApplyBucketsWork::ApplyBucketsWork(
{
}

ApplyBucketsWork::~ApplyBucketsWork()
{
clearChildren();
}

BucketLevel&
ApplyBucketsWork::getBucketLevel(uint32_t level)
{
Expand Down Expand Up @@ -76,18 +71,21 @@ ApplyBucketsWork::onReset()
mLastAppliedSizeMb = 0;
mLastPos = 0;

auto addBucket = [this](std::shared_ptr<Bucket const> const& bucket) {
if (bucket->getSize() > 0)
if (!isAborting())
{
auto addBucket = [this](std::shared_ptr<Bucket const> const& bucket) {
if (bucket->getSize() > 0)
{
mTotalBuckets++;
mTotalSize += bucket->getSize();
}
};

for (auto const& hsb : mApplyState.currentBuckets)
{
mTotalBuckets++;
mTotalSize += bucket->getSize();
addBucket(getBucket(hsb.snap));
addBucket(getBucket(hsb.curr));
}
};

for (auto const& hsb : mApplyState.currentBuckets)
{
addBucket(getBucket(hsb.snap));
addBucket(getBucket(hsb.curr));
}

mLevel = BucketList::kNumLevels - 1;
Expand All @@ -99,8 +97,11 @@ ApplyBucketsWork::onReset()
}

void
ApplyBucketsWork::onStart()
ApplyBucketsWork::startLevel()
{
assert(isLevelComplete());

CLOG(DEBUG, "History") << "ApplyBuckets : starting level " << mLevel;
auto& level = getBucketLevel(mLevel);
HistoryStateBucket const& i = mApplyState.currentBuckets.at(mLevel);

Expand Down Expand Up @@ -139,9 +140,15 @@ ApplyBucketsWork::onStart()
}
}

void
BasicWork::State
ApplyBucketsWork::onRun()
{
// Check if we're at the beginning of the new level
if (isLevelComplete())
{
startLevel();
}

// The structure of these if statements is motivated by the following:
// 1. mCurrApplicator should never be advanced if mSnapApplicator is
// not false. Otherwise it is possible for curr to modify the
Expand All @@ -150,24 +157,50 @@ ApplyBucketsWork::onRun()
// if there is nothing to be applied.
if (mSnapApplicator)
{
advance("snap", *mSnapApplicator);
if (*mSnapApplicator)
{
advance("snap", *mSnapApplicator);
return State::WORK_RUNNING;
}
mApp.getInvariantManager().checkOnBucketApply(
mSnapBucket, mApplyState.currentLedger, mLevel, false);
mSnapApplicator.reset();
mSnapBucket.reset();
mBucketApplySuccess.Mark();
}
else if (mCurrApplicator)
if (mCurrApplicator)
{
advance("curr", *mCurrApplicator);
if (*mCurrApplicator)
{
advance("curr", *mCurrApplicator);
return State::WORK_RUNNING;
}
mApp.getInvariantManager().checkOnBucketApply(
mCurrBucket, mApplyState.currentLedger, mLevel, true);
mCurrApplicator.reset();
mCurrBucket.reset();
mBucketApplySuccess.Mark();
}

mApp.getCatchupManager().logAndUpdateCatchupStatus(true);
if (mLevel != 0)
{
--mLevel;
CLOG(DEBUG, "History")
<< "ApplyBuckets : starting next level: " << mLevel;
return State::WORK_RUNNING;
}
scheduleSuccess();

CLOG(DEBUG, "History") << "ApplyBuckets : done, restarting merges";
mApp.getBucketManager().assumeState(mApplyState, mMaxProtocolVersion);
return State::WORK_SUCCESS;
}

void
ApplyBucketsWork::advance(std::string const& bucketName,
BucketApplicator& applicator)
{
if (!applicator)
{
return;
}

assert(applicator);
assert(mTotalSize != 0);
auto sz = applicator.advance(mCounters);
mAppliedEntries += sz;
Expand Down Expand Up @@ -206,60 +239,21 @@ ApplyBucketsWork::advance(std::string const& bucketName,
}
}

Work::State
ApplyBucketsWork::onSuccess()
bool
ApplyBucketsWork::isLevelComplete()
{
mApp.getCatchupManager().logAndUpdateCatchupStatus(true);

if (mSnapApplicator)
{
if (*mSnapApplicator)
{
return WORK_RUNNING;
}
mApp.getInvariantManager().checkOnBucketApply(
mSnapBucket, mApplyState.currentLedger, mLevel, false);
mSnapApplicator.reset();
mSnapBucket.reset();
mBucketApplySuccess.Mark();
}
if (mCurrApplicator)
{
if (*mCurrApplicator)
{
return WORK_RUNNING;
}
mApp.getInvariantManager().checkOnBucketApply(
mCurrBucket, mApplyState.currentLedger, mLevel, true);
mCurrApplicator.reset();
mCurrBucket.reset();
mBucketApplySuccess.Mark();
}

if (mLevel != 0)
{
--mLevel;
CLOG(DEBUG, "History")
<< "ApplyBuckets : starting next level: " << mLevel;
return WORK_PENDING;
}

CLOG(DEBUG, "History") << "ApplyBuckets : done, restarting merges";
mApp.getBucketManager().assumeState(mApplyState, mMaxProtocolVersion);
return WORK_SUCCESS;
return !(mApplying) || !(mSnapApplicator || mCurrApplicator);
}

void
ApplyBucketsWork::onFailureRetry()
ApplyBucketsWork::onFailureRaise()
{
mBucketApplyFailure.Mark();
Work::onFailureRetry();
}

void
ApplyBucketsWork::onFailureRaise()
ApplyBucketsWork::onFailureRetry()
{
mBucketApplyFailure.Mark();
Work::onFailureRaise();
}
}
24 changes: 15 additions & 9 deletions src/catchup/ApplyBucketsWork.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ class Bucket;
struct HistoryArchiveState;
struct LedgerHeaderHistoryEntry;

class ApplyBucketsWork : public Work
class ApplyBucketsWork : public BasicWork
{
std::map<std::string, std::shared_ptr<Bucket>> const& mBuckets;
const HistoryArchiveState& mApplyState;
HistoryArchiveState const& mApplyState;

bool mApplying;
size_t mTotalBuckets;
Expand All @@ -46,22 +46,28 @@ class ApplyBucketsWork : public Work
medida::Meter& mBucketApplyFailure;
BucketApplicator::Counters mCounters;

void advance(std::string const& name, BucketApplicator& applicator);
std::shared_ptr<Bucket const> getBucket(std::string const& bucketHash);
BucketLevel& getBucketLevel(uint32_t level);
void advance(std::string const& name, BucketApplicator& applicator);
void startLevel();
bool isLevelComplete();

public:
ApplyBucketsWork(
Application& app, WorkParent& parent,
Application& app,
std::map<std::string, std::shared_ptr<Bucket>> const& buckets,
HistoryArchiveState const& applyState, uint32_t maxProtocolVersion);
~ApplyBucketsWork();
~ApplyBucketsWork() = default;

protected:
void onReset() override;
void onStart() override;
void onRun() override;
Work::State onSuccess() override;
void onFailureRetry() override;
BasicWork::State onRun() override;
bool
onAbort() override
{
return true;
};
void onFailureRaise() override;
void onFailureRetry() override;
};
}
Loading

0 comments on commit 1966a24

Please sign in to comment.