Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Work subsystem overhaul #2063

Merged
merged 17 commits into from
Apr 26, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
f87f699
Move BatchWork to work subfolder
marta-lokhova Apr 22, 2019
8486074
Introduce work building blocks
marta-lokhova Apr 22, 2019
4583396
Refactor RunCommandWork and its implementers to comply with the new …
marta-lokhova Apr 22, 2019
59f2906
Refactor get history archive state work to comply with the new
marta-lokhova Apr 22, 2019
46cad63
Refactor catchup-related works to comply with the new interface
marta-lokhova Apr 22, 2019
92b382e
Remove BucketDownloadWork as catchup does not use it anymore
marta-lokhova Apr 22, 2019
375ac49
Implement RepairMissingBuckets work
marta-lokhova Apr 22, 2019
0e6c02d
Refactor publish-related works and update orchestration in
marta-lokhova Apr 22, 2019
2ca8c89
Tweak ProcessManager to allow Work to shutdown processes during abort
marta-lokhova Apr 22, 2019
f5c4b78
Refactor FetchRecentQset work to comply with the new interface
marta-lokhova Apr 22, 2019
2d52059
Update Application to employ WorkScheduler, and wait for its proper s…
marta-lokhova Apr 22, 2019
7c4e3a1
Refactor work test suite to consider possible failure modes
marta-lokhova Apr 22, 2019
38276e9
Update remaining tests to utilize WorkScheduler
marta-lokhova Apr 22, 2019
8d2387e
Remove obsolete include
marta-lokhova Apr 22, 2019
dfc4fcd
Update WorkScheduler comment and return log
marta-lokhova Apr 22, 2019
e148b0e
bugfix: use children status directly when publishing snapshot
marta-lokhova Apr 26, 2019
f3c9ef4
Disallow parent work to succeed if a child failed
marta-lokhova Apr 26, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/bucket/test/BucketManagerTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1017,7 +1017,6 @@ TEST_CASE("bucket persistence over app restart",
std::vector<stellar::LedgerKey> emptySet;
std::vector<stellar::LedgerEntry> emptySetEntry;

VirtualClock clock;
Config cfg0(getTestConfig(0, Config::TESTDB_ON_DISK_SQLITE));
for_versions_with_differing_bucket_logic(cfg0, [&](Config const& cfg0) {

Expand Down Expand Up @@ -1047,6 +1046,7 @@ TEST_CASE("bucket persistence over app restart",
// First, run an application through two ledger closes, picking up
// the bucket and ledger closes at each.
{
VirtualClock clock;
Application::pointer app = createTestApplication(clock, cfg0);
app->start();
BucketList& bl = app->getBucketManager().getBucketList();
Expand Down Expand Up @@ -1082,6 +1082,7 @@ TEST_CASE("bucket persistence over app restart",
// Next run a new app with a disjoint config one ledger close, and
// stop it. It should have acquired the same state and ledger.
{
VirtualClock clock;
Application::pointer app = createTestApplication(clock, cfg1);
app->start();
BucketList& bl = app->getBucketManager().getBucketList();
Expand All @@ -1107,6 +1108,7 @@ TEST_CASE("bucket persistence over app restart",
// pick up the bucket list correctly.
cfg1.FORCE_SCP = false;
{
VirtualClock clock;
Application::pointer app = Application::create(clock, cfg1, false);
app->start();
BucketList& bl = app->getBucketManager().getBucketList();
Expand Down
138 changes: 66 additions & 72 deletions src/catchup/ApplyBucketsWork.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ namespace stellar
{

ApplyBucketsWork::ApplyBucketsWork(
Application& app, WorkParent& parent,
Application& app,
std::map<std::string, std::shared_ptr<Bucket>> const& buckets,
HistoryArchiveState const& applyState, uint32_t maxProtocolVersion)
: Work(app, parent, std::string("apply-buckets"))
: BasicWork(app, "apply-buckets", RETRY_A_FEW)
, mBuckets(buckets)
, mApplyState(applyState)
, mApplying(false)
Expand All @@ -43,11 +43,6 @@ ApplyBucketsWork::ApplyBucketsWork(
{
}

ApplyBucketsWork::~ApplyBucketsWork()
{
clearChildren();
}

BucketLevel&
ApplyBucketsWork::getBucketLevel(uint32_t level)
{
Expand Down Expand Up @@ -76,18 +71,21 @@ ApplyBucketsWork::onReset()
mLastAppliedSizeMb = 0;
mLastPos = 0;

auto addBucket = [this](std::shared_ptr<Bucket const> const& bucket) {
if (bucket->getSize() > 0)
if (!isAborting())
{
auto addBucket = [this](std::shared_ptr<Bucket const> const& bucket) {
if (bucket->getSize() > 0)
{
mTotalBuckets++;
mTotalSize += bucket->getSize();
}
};

for (auto const& hsb : mApplyState.currentBuckets)
{
mTotalBuckets++;
mTotalSize += bucket->getSize();
addBucket(getBucket(hsb.snap));
addBucket(getBucket(hsb.curr));
}
};

for (auto const& hsb : mApplyState.currentBuckets)
{
addBucket(getBucket(hsb.snap));
addBucket(getBucket(hsb.curr));
}

mLevel = BucketList::kNumLevels - 1;
Expand All @@ -99,8 +97,11 @@ ApplyBucketsWork::onReset()
}

void
ApplyBucketsWork::onStart()
ApplyBucketsWork::startLevel()
{
assert(isLevelComplete());

CLOG(DEBUG, "History") << "ApplyBuckets : starting level " << mLevel;
auto& level = getBucketLevel(mLevel);
HistoryStateBucket const& i = mApplyState.currentBuckets.at(mLevel);

Expand Down Expand Up @@ -139,9 +140,15 @@ ApplyBucketsWork::onStart()
}
}

void
BasicWork::State
ApplyBucketsWork::onRun()
{
// Check if we're at the beginning of the new level
if (isLevelComplete())
{
startLevel();
}

// The structure of these if statements is motivated by the following:
// 1. mCurrApplicator should never be advanced if mSnapApplicator is
// not false. Otherwise it is possible for curr to modify the
Expand All @@ -150,24 +157,50 @@ ApplyBucketsWork::onRun()
// if there is nothing to be applied.
if (mSnapApplicator)
{
advance("snap", *mSnapApplicator);
if (*mSnapApplicator)
{
advance("snap", *mSnapApplicator);
return State::WORK_RUNNING;
}
mApp.getInvariantManager().checkOnBucketApply(
mSnapBucket, mApplyState.currentLedger, mLevel, false);
mSnapApplicator.reset();
mSnapBucket.reset();
mBucketApplySuccess.Mark();
}
else if (mCurrApplicator)
if (mCurrApplicator)
{
advance("curr", *mCurrApplicator);
if (*mCurrApplicator)
{
advance("curr", *mCurrApplicator);
return State::WORK_RUNNING;
}
mApp.getInvariantManager().checkOnBucketApply(
mCurrBucket, mApplyState.currentLedger, mLevel, true);
mCurrApplicator.reset();
mCurrBucket.reset();
mBucketApplySuccess.Mark();
}

mApp.getCatchupManager().logAndUpdateCatchupStatus(true);
if (mLevel != 0)
{
--mLevel;
CLOG(DEBUG, "History")
<< "ApplyBuckets : starting next level: " << mLevel;
return State::WORK_RUNNING;
}
scheduleSuccess();

CLOG(DEBUG, "History") << "ApplyBuckets : done, restarting merges";
mApp.getBucketManager().assumeState(mApplyState, mMaxProtocolVersion);
return State::WORK_SUCCESS;
}

void
ApplyBucketsWork::advance(std::string const& bucketName,
BucketApplicator& applicator)
{
if (!applicator)
{
return;
}

assert(applicator);
assert(mTotalSize != 0);
auto sz = applicator.advance(mCounters);
mAppliedEntries += sz;
Expand Down Expand Up @@ -206,60 +239,21 @@ ApplyBucketsWork::advance(std::string const& bucketName,
}
}

Work::State
ApplyBucketsWork::onSuccess()
bool
ApplyBucketsWork::isLevelComplete()
{
mApp.getCatchupManager().logAndUpdateCatchupStatus(true);

if (mSnapApplicator)
{
if (*mSnapApplicator)
{
return WORK_RUNNING;
}
mApp.getInvariantManager().checkOnBucketApply(
mSnapBucket, mApplyState.currentLedger, mLevel, false);
mSnapApplicator.reset();
mSnapBucket.reset();
mBucketApplySuccess.Mark();
}
if (mCurrApplicator)
{
if (*mCurrApplicator)
{
return WORK_RUNNING;
}
mApp.getInvariantManager().checkOnBucketApply(
mCurrBucket, mApplyState.currentLedger, mLevel, true);
mCurrApplicator.reset();
mCurrBucket.reset();
mBucketApplySuccess.Mark();
}

if (mLevel != 0)
{
--mLevel;
CLOG(DEBUG, "History")
<< "ApplyBuckets : starting next level: " << mLevel;
return WORK_PENDING;
}

CLOG(DEBUG, "History") << "ApplyBuckets : done, restarting merges";
mApp.getBucketManager().assumeState(mApplyState, mMaxProtocolVersion);
return WORK_SUCCESS;
return !(mApplying) || !(mSnapApplicator || mCurrApplicator);
}

void
ApplyBucketsWork::onFailureRetry()
ApplyBucketsWork::onFailureRaise()
{
mBucketApplyFailure.Mark();
Work::onFailureRetry();
}

void
ApplyBucketsWork::onFailureRaise()
ApplyBucketsWork::onFailureRetry()
{
mBucketApplyFailure.Mark();
Work::onFailureRaise();
}
}
24 changes: 15 additions & 9 deletions src/catchup/ApplyBucketsWork.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ class Bucket;
struct HistoryArchiveState;
struct LedgerHeaderHistoryEntry;

class ApplyBucketsWork : public Work
class ApplyBucketsWork : public BasicWork
{
std::map<std::string, std::shared_ptr<Bucket>> const& mBuckets;
const HistoryArchiveState& mApplyState;
HistoryArchiveState const& mApplyState;

bool mApplying;
size_t mTotalBuckets;
Expand All @@ -46,22 +46,28 @@ class ApplyBucketsWork : public Work
medida::Meter& mBucketApplyFailure;
BucketApplicator::Counters mCounters;

void advance(std::string const& name, BucketApplicator& applicator);
std::shared_ptr<Bucket const> getBucket(std::string const& bucketHash);
BucketLevel& getBucketLevel(uint32_t level);
void advance(std::string const& name, BucketApplicator& applicator);
void startLevel();
bool isLevelComplete();

public:
ApplyBucketsWork(
Application& app, WorkParent& parent,
Application& app,
std::map<std::string, std::shared_ptr<Bucket>> const& buckets,
HistoryArchiveState const& applyState, uint32_t maxProtocolVersion);
~ApplyBucketsWork();
~ApplyBucketsWork() = default;

protected:
void onReset() override;
void onStart() override;
void onRun() override;
Work::State onSuccess() override;
void onFailureRetry() override;
BasicWork::State onRun() override;
bool
onAbort() override
{
return true;
};
void onFailureRaise() override;
void onFailureRetry() override;
};
}
Loading