diff --git a/src/catchup/DownloadBucketsWork.cpp b/src/catchup/DownloadBucketsWork.cpp index e88bb6285a..43918f8cf4 100644 --- a/src/catchup/DownloadBucketsWork.cpp +++ b/src/catchup/DownloadBucketsWork.cpp @@ -12,82 +12,55 @@ namespace stellar { - DownloadBucketsWork::DownloadBucketsWork( - Application& app, WorkParent& parent, + Application& app, std::map>& buckets, std::vector hashes, TmpDir const& downloadDir) - : Work{app, parent, "download-and-verify-buckets"} + : Work{app, "download-and-verify-buckets"} , mBuckets{buckets} , mHashes{std::move(hashes)} , mDownloadDir{downloadDir} - , mDownloadBucketStart{app.getMetrics().NewMeter( - {"history", "download-bucket", "start"}, "event")} - , mDownloadBucketSuccess{app.getMetrics().NewMeter( - {"history", "download-bucket", "success"}, "event")} - , mDownloadBucketFailure{app.getMetrics().NewMeter( - {"history", "download-bucket", "failure"}, "event")} { } -DownloadBucketsWork::~DownloadBucketsWork() +BasicWork::State +DownloadBucketsWork::doWork() { - clearChildren(); -} - -std::string -DownloadBucketsWork::getStatus() const -{ - if (mState == WORK_RUNNING || mState == WORK_PENDING) + if (!mChildrenStarted) { - return "downloading buckets"; - } - return Work::getStatus(); -} - -void -DownloadBucketsWork::onReset() -{ - clearChildren(); + for (auto const& hash : mHashes) + { + FileTransferInfo ft(mDownloadDir, HISTORY_FILE_TYPE_BUCKET, hash); - for (auto const& hash : mHashes) + auto w1 = std::make_shared(mApp, ft); + auto w2 = std::make_shared( + mApp, mBuckets, ft.localPath_nogz(), hexToBin256(hash)); + std::vector> seq{w1, w2}; + addWork("download-verify-sequence-" + hash, seq); + } + mChildrenStarted = true; + } + else { - FileTransferInfo ft(mDownloadDir, HISTORY_FILE_TYPE_BUCKET, hash); - // Each bucket gets its own work-chain of - // download->gunzip->verify - - auto verify = addWork(mBuckets, ft.localPath_nogz(), - hexToBin256(hash)); - verify->addWork(ft); - mDownloadBucketStart.Mark(); + if (allChildrenSuccessful()) + { + return WORK_SUCCESS; + } + else if (anyChildRaiseFailure()) + { + return WORK_FAILURE_RETRY; + } + else if (!anyChildRunning()) + { + return WORK_WAITING; + } } + return WORK_RUNNING; } void -DownloadBucketsWork::notify(std::string const& child) +DownloadBucketsWork::doReset() { - auto i = mChildren.find(child); - if (i == mChildren.end()) - { - CLOG(WARNING, "Work") - << "DownloadBucketsWork notified by unknown child " << child; - return; - } - - switch (i->second->getState()) - { - case Work::WORK_SUCCESS: - mDownloadBucketSuccess.Mark(); - break; - case Work::WORK_FAILURE_RETRY: - case Work::WORK_FAILURE_FATAL: - case Work::WORK_FAILURE_RAISE: - mDownloadBucketFailure.Mark(); - break; - default: - break; - } - - advance(); + mChildrenStarted = false; } } diff --git a/src/catchup/DownloadBucketsWork.h b/src/catchup/DownloadBucketsWork.h index 0d4ff2c9ba..3d2aace6f0 100644 --- a/src/catchup/DownloadBucketsWork.h +++ b/src/catchup/DownloadBucketsWork.h @@ -22,19 +22,17 @@ class DownloadBucketsWork : public Work std::map> mBuckets; std::vector mHashes; TmpDir const& mDownloadDir; - - medida::Meter& mDownloadBucketStart; - medida::Meter& mDownloadBucketSuccess; - medida::Meter& mDownloadBucketFailure; + bool mChildrenStarted{false}; public: - DownloadBucketsWork(Application& app, WorkParent& parent, + DownloadBucketsWork(Application& app, std::map>& buckets, std::vector hashes, TmpDir const& downloadDir); - ~DownloadBucketsWork(); - std::string getStatus() const override; - void onReset() override; - void notify(std::string const& child) override; + ~DownloadBucketsWork() = default; + + protected: + BasicWork::State doWork() override; + void doReset() override; }; } diff --git a/src/history/HistoryTests.cpp b/src/history/HistoryTests.cpp index 2e24780da6..af074859de 100644 --- a/src/history/HistoryTests.cpp +++ b/src/history/HistoryTests.cpp @@ -19,7 +19,11 @@ #include "test/test.h" #include "util/Fs.h" #include "work/WorkManager.h" +#include "work/WorkScheduler.h" +#include "catchup/DownloadBucketsWork.h" +#include "historywork/GetHistoryArchiveStateWork.h" +#include "historywork/VerifyBucketWork.h" #include #include diff --git a/src/historywork/GetAndUnzipRemoteFileWork.cpp b/src/historywork/GetAndUnzipRemoteFileWork.cpp index e90cba1303..15f708a3c6 100644 --- a/src/historywork/GetAndUnzipRemoteFileWork.cpp +++ b/src/historywork/GetAndUnzipRemoteFileWork.cpp @@ -10,11 +10,10 @@ namespace stellar { - GetAndUnzipRemoteFileWork::GetAndUnzipRemoteFileWork( - Application& app, WorkParent& parent, FileTransferInfo ft, + Application& app, std::function callback, FileTransferInfo ft, std::shared_ptr archive, size_t maxRetries) - : Work(app, parent, + : Work(app, callback, std::string("get-and-unzip-remote-file ") + ft.remoteName(), maxRetries) , mFt(std::move(ft)) @@ -22,15 +21,10 @@ GetAndUnzipRemoteFileWork::GetAndUnzipRemoteFileWork( { } -GetAndUnzipRemoteFileWork::~GetAndUnzipRemoteFileWork() -{ - clearChildren(); -} - std::string GetAndUnzipRemoteFileWork::getStatus() const { - if (mState == WORK_PENDING) + if (!isDone()) { if (mGunzipFileWork) { @@ -41,86 +35,133 @@ GetAndUnzipRemoteFileWork::getStatus() const return mGetRemoteFileWork->getStatus(); } } - return Work::getStatus(); + return BasicWork::getStatus(); } void -GetAndUnzipRemoteFileWork::onReset() +GetAndUnzipRemoteFileWork::doReset() { - clearChildren(); mGetRemoteFileWork.reset(); mGunzipFileWork.reset(); +} - CLOG(DEBUG, "History") << "Downloading and unzipping " << mFt.remoteName() - << ": downloading"; - mGetRemoteFileWork = addWork( - mFt.remoteName(), mFt.localPath_gz_tmp(), nullptr, RETRY_NEVER); +void +GetAndUnzipRemoteFileWork::onFailureRaise() +{ + std::remove(mFt.localPath_nogz().c_str()); + std::remove(mFt.localPath_gz().c_str()); + std::remove(mFt.localPath_gz_tmp().c_str()); + Work::onFailureRaise(); } -Work::State -GetAndUnzipRemoteFileWork::onSuccess() +void +GetAndUnzipRemoteFileWork::onFailureRetry() { + std::remove(mFt.localPath_nogz().c_str()); + std::remove(mFt.localPath_gz().c_str()); + std::remove(mFt.localPath_gz_tmp().c_str()); + Work::onFailureRetry(); +} + +BasicWork::State +GetAndUnzipRemoteFileWork::doWork() +{ + CLOG(DEBUG, "History") << "Downloading and unzipping " << mFt.remoteName(); + if (mGunzipFileWork) { - if (!fs::exists(mFt.localPath_nogz())) + // Download completed, unzipping started + + auto state = mGunzipFileWork->getState(); + if (state == WORK_SUCCESS) { - CLOG(ERROR, "History") << "Downloading and unzipping " - << mFt.remoteName() << ": .xdr not found"; - return WORK_FAILURE_RETRY; + assert(mGetRemoteFileWork); + assert(mGetRemoteFileWork->getState() == WORK_SUCCESS); + if (!fs::exists(mFt.localPath_nogz())) + { + CLOG(ERROR, "History") + << "Downloading and unzipping " << mFt.remoteName() + << ": .xdr not found"; + return WORK_FAILURE_RETRY; + } + return WORK_SUCCESS; } - else + else if (state == WORK_FAILURE_RAISE) { - return WORK_SUCCESS; + return WORK_FAILURE_RETRY; } + return state; } - - if (fs::exists(mFt.localPath_gz_tmp())) + else if (mGetRemoteFileWork) { - CLOG(TRACE, "History") - << "Downloading and unzipping " << mFt.remoteName() - << ": renaming .gz.tmp to .gz"; - if (fs::exists(mFt.localPath_gz()) && - std::remove(mFt.localPath_gz().c_str())) + // Download started + + auto state = mGetRemoteFileWork->getState(); + if (state == WORK_SUCCESS) { - CLOG(ERROR, "History") - << "Downloading and unzipping " << mFt.remoteName() - << ": failed to remove .gz"; - return WORK_FAILURE_RETRY; - } + if (!validateFile()) + { + return WORK_FAILURE_RETRY; + } - if (std::rename(mFt.localPath_gz_tmp().c_str(), - mFt.localPath_gz().c_str())) + mGunzipFileWork = + addWork(mFt.localPath_gz(), false, RETRY_NEVER); + return WORK_RUNNING; + } + else if (state == WORK_FAILURE_RAISE) { - CLOG(ERROR, "History") - << "Downloading and unzipping " << mFt.remoteName() - << ": failed to rename .gz.tmp to .gz"; return WORK_FAILURE_RETRY; } + return state; + } + else + { + // Download hasn't started + + mGetRemoteFileWork = addWork( + mFt.remoteName(), mFt.localPath_gz_tmp(), nullptr, RETRY_NEVER); + return WORK_RUNNING; + } +} - CLOG(TRACE, "History") +bool +GetAndUnzipRemoteFileWork::validateFile() +{ + if (!fs::exists(mFt.localPath_gz_tmp())) + { + CLOG(ERROR, "History") << "Downloading and unzipping " + << mFt.remoteName() << ": .tmp file not found"; + return false; + } + + CLOG(TRACE, "History") << "Downloading and unzipping " << mFt.remoteName() + << ": renaming .gz.tmp to .gz"; + if (fs::exists(mFt.localPath_gz()) && + std::remove(mFt.localPath_gz().c_str())) + { + CLOG(ERROR, "History") << "Downloading and unzipping " + << mFt.remoteName() << ": failed to remove .gz"; + return false; + } + + if (std::rename(mFt.localPath_gz_tmp().c_str(), mFt.localPath_gz().c_str())) + { + CLOG(ERROR, "History") << "Downloading and unzipping " << mFt.remoteName() - << ": renamed .gz.tmp to .gz"; + << ": failed to rename .gz.tmp to .gz"; + return false; } + CLOG(TRACE, "History") << "Downloading and unzipping " << mFt.remoteName() + << ": renamed .gz.tmp to .gz"; + if (!fs::exists(mFt.localPath_gz())) { CLOG(ERROR, "History") << "Downloading and unzipping " << mFt.remoteName() << ": .gz not found"; - return WORK_FAILURE_RETRY; + return false; } - CLOG(DEBUG, "History") << "Downloading and unzipping " << mFt.remoteName() - << ": unzipping"; - mGunzipFileWork = - addWork(mFt.localPath_gz(), false, RETRY_NEVER); - return WORK_PENDING; -} - -void -GetAndUnzipRemoteFileWork::onFailureRaise() -{ - std::remove(mFt.localPath_nogz().c_str()); - std::remove(mFt.localPath_gz().c_str()); - std::remove(mFt.localPath_gz_tmp().c_str()); + return true; } } diff --git a/src/historywork/GetAndUnzipRemoteFileWork.h b/src/historywork/GetAndUnzipRemoteFileWork.h index 6d6fcf4e22..2393d88627 100644 --- a/src/historywork/GetAndUnzipRemoteFileWork.h +++ b/src/historywork/GetAndUnzipRemoteFileWork.h @@ -4,9 +4,8 @@ #pragma once -#include "work/Work.h" - #include "history/FileTransferInfo.h" +#include "work/Work.h" namespace stellar { @@ -15,24 +14,29 @@ class HistoryArchive; class GetAndUnzipRemoteFileWork : public Work { - std::shared_ptr mGetRemoteFileWork; - std::shared_ptr mGunzipFileWork; + std::shared_ptr mGetRemoteFileWork; + std::shared_ptr mGunzipFileWork; FileTransferInfo mFt; std::shared_ptr mArchive; + bool validateFile(); + public: // Passing `nullptr` for the archive argument will cause the work to // select a new readable history archive at random each time it runs / // retries. - GetAndUnzipRemoteFileWork(Application& app, WorkParent& parent, + GetAndUnzipRemoteFileWork(Application& app, std::function callback, FileTransferInfo ft, std::shared_ptr archive = nullptr, - size_t maxRetries = Work::RETRY_A_LOT); - ~GetAndUnzipRemoteFileWork(); + size_t maxRetries = BasicWork::RETRY_A_LOT); + ~GetAndUnzipRemoteFileWork() = default; std::string getStatus() const override; - void onReset() override; - Work::State onSuccess() override; + + protected: + void doReset() override; void onFailureRaise() override; + void onFailureRetry() override; + State doWork() override; }; } diff --git a/src/historywork/GetHistoryArchiveStateWork.cpp b/src/historywork/GetHistoryArchiveStateWork.cpp index 63598eb94c..6f65a9f238 100644 --- a/src/historywork/GetHistoryArchiveStateWork.cpp +++ b/src/historywork/GetHistoryArchiveStateWork.cpp @@ -14,12 +14,11 @@ namespace stellar { - GetHistoryArchiveStateWork::GetHistoryArchiveStateWork( - Application& app, WorkParent& parent, std::string uniqueName, + Application& app, std::function callback, HistoryArchiveState& state, uint32_t seq, std::shared_ptr archive, size_t maxRetries) - : Work(app, parent, std::move(uniqueName), maxRetries) + : Work(app, callback, "get-archive-state", maxRetries) , mState(state) , mSeq(seq) , mArchive(archive) @@ -27,77 +26,45 @@ GetHistoryArchiveStateWork::GetHistoryArchiveStateWork( archive ? HistoryArchiveState::localName(app, archive->getName()) : app.getHistoryManager().localFilename( HistoryArchiveState::baseName())) - , mGetHistoryArchiveStateStart(app.getMetrics().NewMeter( - {"history", "download-history-archive-state", "start"}, "event")) - , mGetHistoryArchiveStateSuccess(app.getMetrics().NewMeter( - {"history", "download-history-archive-state", "success"}, "event")) - , mGetHistoryArchiveStateFailure(app.getMetrics().NewMeter( - {"history", "download-history-archive-state", "failure"}, "event")) -{ -} - -GetHistoryArchiveStateWork::~GetHistoryArchiveStateWork() -{ - clearChildren(); -} - -std::string -GetHistoryArchiveStateWork::getStatus() const -{ - if (getState() == WORK_FAILURE_RETRY) - { - auto eta = getRetryETA(); - return fmt::format("Awaiting checkpoint (ETA: {:d} seconds)", eta); - } - return Work::getStatus(); -} - -void -GetHistoryArchiveStateWork::onReset() { - clearChildren(); - std::remove(mLocalFilename.c_str()); - addWork(mSeq == 0 - ? HistoryArchiveState::wellKnownRemoteName() - : HistoryArchiveState::remoteName(mSeq), - mLocalFilename, mArchive, getMaxRetries()); - - mGetHistoryArchiveStateStart.Mark(); } -void -GetHistoryArchiveStateWork::onRun() +BasicWork::State +GetHistoryArchiveStateWork::doWork() { - try + if (mGetRemoteFile) { - mState.load(mLocalFilename); - scheduleSuccess(); + auto state = mGetRemoteFile->getState(); + if (state == WORK_SUCCESS) + { + try + { + mState.load(mLocalFilename); + return WORK_SUCCESS; + } + catch (std::runtime_error& e) + { + CLOG(ERROR, "History") + << "error loading history state: " << e.what(); + return WORK_FAILURE_RETRY; + } + } + return state; } - catch (std::runtime_error& e) + else { - CLOG(ERROR, "History") << "error loading history state: " << e.what(); - scheduleFailure(); + auto name = mSeq == 0 ? HistoryArchiveState::wellKnownRemoteName() + : HistoryArchiveState::remoteName(mSeq); + mGetRemoteFile = addWork(name, mLocalFilename, + mArchive, getMaxRetries()); + return WORK_RUNNING; } } -Work::State -GetHistoryArchiveStateWork::onSuccess() -{ - mGetHistoryArchiveStateSuccess.Mark(); - return Work::onSuccess(); -} - -void -GetHistoryArchiveStateWork::onFailureRetry() -{ - mGetHistoryArchiveStateFailure.Mark(); - Work::onFailureRetry(); -} - void -GetHistoryArchiveStateWork::onFailureRaise() +GetHistoryArchiveStateWork::doReset() { - mGetHistoryArchiveStateFailure.Mark(); - Work::onFailureRaise(); + mGetRemoteFile.reset(); + std::remove(mLocalFilename.c_str()); } } diff --git a/src/historywork/GetHistoryArchiveStateWork.h b/src/historywork/GetHistoryArchiveStateWork.h index 046cd2cd93..765f09fb30 100644 --- a/src/historywork/GetHistoryArchiveStateWork.h +++ b/src/historywork/GetHistoryArchiveStateWork.h @@ -19,28 +19,22 @@ struct HistoryArchiveState; class GetHistoryArchiveStateWork : public Work { + std::shared_ptr mGetRemoteFile; + HistoryArchiveState& mState; uint32_t mSeq; std::shared_ptr mArchive; std::string mLocalFilename; - medida::Meter& mGetHistoryArchiveStateStart; - medida::Meter& mGetHistoryArchiveStateSuccess; - medida::Meter& mGetHistoryArchiveStateFailure; - public: - GetHistoryArchiveStateWork( - Application& app, WorkParent& parent, std::string uniqueName, - HistoryArchiveState& state, uint32_t seq = 0, - std::shared_ptr archive = nullptr, - size_t maxRetries = Work::RETRY_A_FEW); - ~GetHistoryArchiveStateWork(); - std::string getStatus() const override; - void onReset() override; - void onRun() override; - - State onSuccess() override; - void onFailureRetry() override; - void onFailureRaise() override; + GetHistoryArchiveStateWork(Application& app, std::function callback, + HistoryArchiveState& state, uint32_t seq, + std::shared_ptr archive, + size_t maxRetries = BasicWork::RETRY_A_FEW); + ~GetHistoryArchiveStateWork() = default; + + protected: + BasicWork::State doWork() override; + void doReset() override; }; } diff --git a/src/historywork/GetRemoteFileWork.cpp b/src/historywork/GetRemoteFileWork.cpp index 9f95f5f5b4..19c5a63789 100644 --- a/src/historywork/GetRemoteFileWork.cpp +++ b/src/historywork/GetRemoteFileWork.cpp @@ -10,13 +10,13 @@ namespace stellar { - -GetRemoteFileWork::GetRemoteFileWork(Application& app, WorkParent& parent, +GetRemoteFileWork::GetRemoteFileWork(Application& app, + std::function callback, std::string const& remote, std::string const& local, std::shared_ptr archive, size_t maxRetries) - : RunCommandWork(app, parent, std::string("get-remote-file ") + remote, + : RunCommandWork(app, callback, std::string("get-remote-file ") + remote, maxRetries) , mRemote(remote) , mLocal(local) @@ -24,13 +24,8 @@ GetRemoteFileWork::GetRemoteFileWork(Application& app, WorkParent& parent, { } -GetRemoteFileWork::~GetRemoteFileWork() -{ - clearChildren(); -} - -void -GetRemoteFileWork::getCommand(std::string& cmdLine, std::string& outFile) +RunCommandInfo +GetRemoteFileWork::getCommand() { mCurrentArchive = mArchive; if (!mCurrentArchive) @@ -40,21 +35,25 @@ GetRemoteFileWork::getCommand(std::string& cmdLine, std::string& outFile) } assert(mCurrentArchive); assert(mCurrentArchive->hasGetCmd()); - cmdLine = mCurrentArchive->getFileCmd(mRemote, mLocal); + auto cmdLine = mCurrentArchive->getFileCmd(mRemote, mLocal); + + return RunCommandInfo(cmdLine, std::string()); } void -GetRemoteFileWork::onReset() +GetRemoteFileWork::onFailureRetry() { + assert(mCurrentArchive); std::remove(mLocal.c_str()); + RunCommandWork::onFailureRetry(); } -Work::State +void GetRemoteFileWork::onSuccess() { assert(mCurrentArchive); mCurrentArchive->markSuccess(); - return RunCommandWork::onSuccess(); + RunCommandWork::onSuccess(); } void @@ -62,6 +61,7 @@ GetRemoteFileWork::onFailureRaise() { assert(mCurrentArchive); mCurrentArchive->markFailure(); + std::remove(mLocal.c_str()); RunCommandWork::onFailureRaise(); } } diff --git a/src/historywork/GetRemoteFileWork.h b/src/historywork/GetRemoteFileWork.h index 0888701fcf..2d89c140dc 100644 --- a/src/historywork/GetRemoteFileWork.h +++ b/src/historywork/GetRemoteFileWork.h @@ -17,20 +17,21 @@ class GetRemoteFileWork : public RunCommandWork std::string mLocal; std::shared_ptr mArchive; std::shared_ptr mCurrentArchive; - void getCommand(std::string& cmdLine, std::string& outFile) override; + RunCommandInfo getCommand() override; public: // Passing `nullptr` for the archive argument will cause the work to // select a new readable history archive at random each time it runs / // retries. - GetRemoteFileWork(Application& app, WorkParent& parent, + GetRemoteFileWork(Application& app, std::function callback, std::string const& remote, std::string const& local, std::shared_ptr archive = nullptr, - size_t maxRetries = Work::RETRY_A_LOT); - ~GetRemoteFileWork(); - void onReset() override; + size_t maxRetries = BasicWork::RETRY_A_LOT); + ~GetRemoteFileWork() = default; - Work::State onSuccess() override; + protected: + void onSuccess() override; void onFailureRaise() override; + void onFailureRetry() override; }; } diff --git a/src/historywork/GunzipFileWork.cpp b/src/historywork/GunzipFileWork.cpp index 856c4d8f51..f6896cdf1f 100644 --- a/src/historywork/GunzipFileWork.cpp +++ b/src/historywork/GunzipFileWork.cpp @@ -7,11 +7,10 @@ namespace stellar { - -GunzipFileWork::GunzipFileWork(Application& app, WorkParent& parent, +GunzipFileWork::GunzipFileWork(Application& app, std::function callback, std::string const& filenameGz, bool keepExisting, size_t maxRetries) - : RunCommandWork(app, parent, std::string("gunzip-file ") + filenameGz, + : RunCommandWork(app, callback, std::string("gunzip-file ") + filenameGz, maxRetries) , mFilenameGz(filenameGz) , mKeepExisting(keepExisting) @@ -19,14 +18,10 @@ GunzipFileWork::GunzipFileWork(Application& app, WorkParent& parent, fs::checkGzipSuffix(mFilenameGz); } -GunzipFileWork::~GunzipFileWork() -{ - clearChildren(); -} - -void -GunzipFileWork::getCommand(std::string& cmdLine, std::string& outFile) +RunCommandInfo +GunzipFileWork::getCommand() { + std::string cmdLine, outFile; cmdLine = "gzip -d "; if (mKeepExisting) { @@ -34,11 +29,21 @@ GunzipFileWork::getCommand(std::string& cmdLine, std::string& outFile) outFile = mFilenameGz.substr(0, mFilenameGz.size() - 3); } cmdLine += mFilenameGz; + return RunCommandInfo(cmdLine, outFile); +} + +void +GunzipFileWork::onFailureRaise() +{ + std::string filenameNoGz = mFilenameGz.substr(0, mFilenameGz.size() - 3); + std::remove(filenameNoGz.c_str()); } void -GunzipFileWork::onReset() +GunzipFileWork::onFailureRetry() { + // We shouldn't rely on atomicity of gzip (it's not anyways), + // hence clean up on failure std::string filenameNoGz = mFilenameGz.substr(0, mFilenameGz.size() - 3); std::remove(filenameNoGz.c_str()); } diff --git a/src/historywork/GunzipFileWork.h b/src/historywork/GunzipFileWork.h index 1b9ecc5fa6..75bbdbc6cf 100644 --- a/src/historywork/GunzipFileWork.h +++ b/src/historywork/GunzipFileWork.h @@ -13,13 +13,16 @@ class GunzipFileWork : public RunCommandWork { std::string mFilenameGz; bool mKeepExisting; - void getCommand(std::string& cmdLine, std::string& outFile) override; + RunCommandInfo getCommand() override; public: - GunzipFileWork(Application& app, WorkParent& parent, + GunzipFileWork(Application& app, std::function callback, std::string const& filenameGz, bool keepExisting = false, size_t maxRetries = Work::RETRY_NEVER); - ~GunzipFileWork(); - void onReset() override; + ~GunzipFileWork() = default; + + protected: + void onFailureRaise() override; + void onFailureRetry() override; }; } diff --git a/src/historywork/GzipFileWork.cpp b/src/historywork/GzipFileWork.cpp index 1dd5b24bf7..8a4cbe08de 100644 --- a/src/historywork/GzipFileWork.cpp +++ b/src/historywork/GzipFileWork.cpp @@ -8,9 +8,9 @@ namespace stellar { -GzipFileWork::GzipFileWork(Application& app, WorkParent& parent, +GzipFileWork::GzipFileWork(Application& app, std::function callback, std::string const& filenameNoGz, bool keepExisting) - : RunCommandWork(app, parent, std::string("gzip-file ") + filenameNoGz) + : RunCommandWork(app, callback, std::string("gzip-file ") + filenameNoGz) , mFilenameNoGz(filenameNoGz) , mKeepExisting(keepExisting) { @@ -19,25 +19,34 @@ GzipFileWork::GzipFileWork(Application& app, WorkParent& parent, GzipFileWork::~GzipFileWork() { - clearChildren(); } void -GzipFileWork::onReset() +GzipFileWork::onFailureRetry() { std::string filenameGz = mFilenameNoGz + ".gz"; std::remove(filenameGz.c_str()); } void -GzipFileWork::getCommand(std::string& cmdLine, std::string& outFile) +GzipFileWork::onFailureRaise() { - cmdLine = "gzip "; + std::string filenameGz = mFilenameNoGz + ".gz"; + std::remove(filenameGz.c_str()); +} + +RunCommandInfo +GzipFileWork::getCommand() +{ + std::string cmdLine = "gzip "; + std::string outFile; if (mKeepExisting) { cmdLine += "-c "; outFile = mFilenameNoGz + ".gz"; } cmdLine += mFilenameNoGz; + + return RunCommandInfo(cmdLine, outFile); } } diff --git a/src/historywork/GzipFileWork.h b/src/historywork/GzipFileWork.h index b200e29cea..8593203cf9 100644 --- a/src/historywork/GzipFileWork.h +++ b/src/historywork/GzipFileWork.h @@ -13,12 +13,15 @@ class GzipFileWork : public RunCommandWork { std::string mFilenameNoGz; bool mKeepExisting; - void getCommand(std::string& cmdLine, std::string& outFile) override; + RunCommandInfo getCommand() override; public: - GzipFileWork(Application& app, WorkParent& parent, + GzipFileWork(Application& app, std::function callback, std::string const& filenameNoGz, bool keepExisting = false); ~GzipFileWork(); - void onReset() override; + + protected: + void onFailureRetry() override; + void onFailureRaise() override; }; } diff --git a/src/historywork/MakeRemoteDirWork.cpp b/src/historywork/MakeRemoteDirWork.cpp index 4479b8fdc5..81196c7d7b 100644 --- a/src/historywork/MakeRemoteDirWork.cpp +++ b/src/historywork/MakeRemoteDirWork.cpp @@ -9,10 +9,11 @@ namespace stellar { -MakeRemoteDirWork::MakeRemoteDirWork(Application& app, WorkParent& parent, +MakeRemoteDirWork::MakeRemoteDirWork(Application& app, + std::function callback, std::string const& dir, std::shared_ptr archive) - : RunCommandWork(app, parent, std::string("make-remote-dir ") + dir) + : RunCommandWork(app, callback, std::string("make-remote-dir ") + dir) , mDir(dir) , mArchive(archive) { @@ -21,23 +22,23 @@ MakeRemoteDirWork::MakeRemoteDirWork(Application& app, WorkParent& parent, MakeRemoteDirWork::~MakeRemoteDirWork() { - clearChildren(); } -void -MakeRemoteDirWork::getCommand(std::string& cmdLine, std::string& outFile) +RunCommandInfo +MakeRemoteDirWork::getCommand() { + std::string cmdLine; if (mArchive->hasMkdirCmd()) { cmdLine = mArchive->mkdirCmd(mDir); } + return RunCommandInfo(cmdLine, std::string()); } -Work::State +void MakeRemoteDirWork::onSuccess() { mArchive->markSuccess(); - return RunCommandWork::onSuccess(); } void diff --git a/src/historywork/MakeRemoteDirWork.h b/src/historywork/MakeRemoteDirWork.h index 00512c5a1c..6a69abf464 100644 --- a/src/historywork/MakeRemoteDirWork.h +++ b/src/historywork/MakeRemoteDirWork.h @@ -15,15 +15,16 @@ class MakeRemoteDirWork : public RunCommandWork { std::string mDir; std::shared_ptr mArchive; - void getCommand(std::string& cmdLine, std::string& outFile) override; + RunCommandInfo getCommand() override; public: - MakeRemoteDirWork(Application& app, WorkParent& parent, + MakeRemoteDirWork(Application& app, std::function callback, std::string const& dir, std::shared_ptr archive); ~MakeRemoteDirWork(); - Work::State onSuccess() override; + protected: + void onSuccess() override; void onFailureRaise() override; }; } diff --git a/src/historywork/PutHistoryArchiveStateWork.cpp b/src/historywork/PutHistoryArchiveStateWork.cpp index 2096f82cb0..5c66f059a8 100644 --- a/src/historywork/PutHistoryArchiveStateWork.cpp +++ b/src/historywork/PutHistoryArchiveStateWork.cpp @@ -10,74 +10,93 @@ namespace stellar { - PutHistoryArchiveStateWork::PutHistoryArchiveStateWork( - Application& app, WorkParent& parent, HistoryArchiveState const& state, - std::shared_ptr archive) - : Work(app, parent, "put-history-archive-state") + Application& app, + HistoryArchiveState const& state, std::shared_ptr archive) + : Work(app, "put-history-archive-state") , mState(state) , mArchive(archive) , mLocalFilename(HistoryArchiveState::localName(app, archive->getName())) { } -PutHistoryArchiveStateWork::~PutHistoryArchiveStateWork() +void +PutHistoryArchiveStateWork::doReset() { - clearChildren(); + mPutRemoteFileWork.reset(); + std::remove(mLocalFilename.c_str()); } void -PutHistoryArchiveStateWork::onReset() +PutHistoryArchiveStateWork::onFailureRetry() { - clearChildren(); - mPutRemoteFileWork.reset(); std::remove(mLocalFilename.c_str()); } void -PutHistoryArchiveStateWork::onRun() +PutHistoryArchiveStateWork::onFailureRaise() +{ + std::remove(mLocalFilename.c_str()); +} + +BasicWork::State +PutHistoryArchiveStateWork::doWork() { if (!mPutRemoteFileWork) { try { mState.save(mLocalFilename); - scheduleSuccess(); + spawnPublishWork(); } catch (std::runtime_error& e) { CLOG(ERROR, "History") << "error loading history state: " << e.what(); - scheduleFailure(); + return WORK_FAILURE_RETRY; } } else { - scheduleSuccess(); + if (allChildrenSuccessful()) + { + return WORK_SUCCESS; + } + else if (anyChildRaiseFailure()) + { + return WORK_FAILURE_RETRY; + } + else if (!anyChildRunning()) + { + return WORK_WAITING; + } } + return WORK_RUNNING; } -Work::State -PutHistoryArchiveStateWork::onSuccess() +void +PutHistoryArchiveStateWork::spawnPublishWork() { - if (!mPutRemoteFileWork) - { - // Put the file in the history/ww/xx/yy/history-wwxxyyzz.json file - auto seqName = HistoryArchiveState::remoteName(mState.currentLedger); - auto seqDir = HistoryArchiveState::remoteDir(mState.currentLedger); - mPutRemoteFileWork = - addWork(mLocalFilename, seqName, mArchive); - mPutRemoteFileWork->addWork(seqDir, mArchive); + // Put the file in the history/ww/xx/yy/history-wwxxyyzz.json file + auto seqName = HistoryArchiveState::remoteName(mState.currentLedger); + auto seqDir = HistoryArchiveState::remoteDir(mState.currentLedger); - // Also put it in the .well-known/stellar-history.json file - auto wkName = HistoryArchiveState::wellKnownRemoteName(); - auto wkDir = HistoryArchiveState::wellKnownRemoteDir(); - auto wkWork = - addWork(mLocalFilename, wkName, mArchive); - wkWork->addWork(wkDir, mArchive); + auto w1 = std::make_shared(mApp, seqDir, mArchive); + auto w2 = std::make_shared(mApp, mLocalFilename, + seqName, mArchive); - return WORK_PENDING; - } - return WORK_SUCCESS; + std::vector> seq{w1, w2}; + mPutRemoteFileWork = + addWork("put-history-file-sequence", seq); + + // Also put it in the .well-known/stellar-history.json file + auto wkName = HistoryArchiveState::wellKnownRemoteName(); + auto wkDir = HistoryArchiveState::wellKnownRemoteDir(); + + auto w3 = std::make_shared(mApp, wkDir, mArchive); + auto w4 = std::make_shared(mApp, mLocalFilename, + wkName, mArchive); + std::vector> seqWk{w3, w4}; + addWork("put-history-well-known-sequence", seqWk); } } diff --git a/src/historywork/PutHistoryArchiveStateWork.h b/src/historywork/PutHistoryArchiveStateWork.h index 3c5c4d64d4..a55b15c7cf 100644 --- a/src/historywork/PutHistoryArchiveStateWork.h +++ b/src/historywork/PutHistoryArchiveStateWork.h @@ -16,16 +16,21 @@ class PutHistoryArchiveStateWork : public Work { HistoryArchiveState const& mState; std::shared_ptr mArchive; - std::string mLocalFilename; - std::shared_ptr mPutRemoteFileWork; + std::string const mLocalFilename; + std::shared_ptr mPutRemoteFileWork; + + void spawnPublishWork(); public: - PutHistoryArchiveStateWork(Application& app, WorkParent& parent, + PutHistoryArchiveStateWork(Application& app, HistoryArchiveState const& state, std::shared_ptr archive); - ~PutHistoryArchiveStateWork(); - void onReset() override; - void onRun() override; - Work::State onSuccess() override; + ~PutHistoryArchiveStateWork() = default; + + protected: + void doReset() override; + State doWork() override; + void onFailureRetry() override; + void onFailureRaise() override; }; -} +} \ No newline at end of file diff --git a/src/historywork/PutRemoteFileWork.cpp b/src/historywork/PutRemoteFileWork.cpp index c2a977bd10..dab7be6568 100644 --- a/src/historywork/PutRemoteFileWork.cpp +++ b/src/historywork/PutRemoteFileWork.cpp @@ -9,11 +9,12 @@ namespace stellar { -PutRemoteFileWork::PutRemoteFileWork(Application& app, WorkParent& parent, +PutRemoteFileWork::PutRemoteFileWork(Application& app, + std::function callback, std::string const& local, std::string const& remote, std::shared_ptr archive) - : RunCommandWork(app, parent, std::string("put-remote-file ") + remote) + : RunCommandWork(app, callback, std::string("put-remote-file ") + remote) , mRemote(remote) , mLocal(local) , mArchive(archive) @@ -24,20 +25,19 @@ PutRemoteFileWork::PutRemoteFileWork(Application& app, WorkParent& parent, PutRemoteFileWork::~PutRemoteFileWork() { - clearChildren(); } -void -PutRemoteFileWork::getCommand(std::string& cmdLine, std::string& outFile) +RunCommandInfo +PutRemoteFileWork::getCommand() { - cmdLine = mArchive->putFileCmd(mLocal, mRemote); + auto cmdLine = mArchive->putFileCmd(mLocal, mRemote); + return RunCommandInfo(cmdLine, std::string()); } -Work::State +void PutRemoteFileWork::onSuccess() { mArchive->markSuccess(); - return RunCommandWork::onSuccess(); } void diff --git a/src/historywork/PutRemoteFileWork.h b/src/historywork/PutRemoteFileWork.h index f94cdf3752..114dcc3434 100644 --- a/src/historywork/PutRemoteFileWork.h +++ b/src/historywork/PutRemoteFileWork.h @@ -13,18 +13,19 @@ class HistoryArchive; class PutRemoteFileWork : public RunCommandWork { - std::string mRemote; - std::string mLocal; + std::string const mRemote; + std::string const mLocal; std::shared_ptr mArchive; - void getCommand(std::string& cmdLine, std::string& outFile) override; + RunCommandInfo getCommand() override; public: - PutRemoteFileWork(Application& app, WorkParent& parent, - std::string const& remote, std::string const& local, + PutRemoteFileWork(Application& app, std::function callback, + std::string const& local, std::string const& remote, std::shared_ptr archive); ~PutRemoteFileWork(); - Work::State onSuccess() override; + protected: + void onSuccess() override; void onFailureRaise() override; }; } diff --git a/src/historywork/PutSnapshotFilesWork.cpp b/src/historywork/PutSnapshotFilesWork.cpp index 44d51d274d..41a3d0deac 100644 --- a/src/historywork/PutSnapshotFilesWork.cpp +++ b/src/historywork/PutSnapshotFilesWork.cpp @@ -12,51 +12,71 @@ #include "historywork/PutHistoryArchiveStateWork.h" #include "historywork/PutRemoteFileWork.h" #include "main/Application.h" +#include "util/format.h" namespace stellar { - PutSnapshotFilesWork::PutSnapshotFilesWork( - Application& app, WorkParent& parent, + Application& app, std::function callback, std::shared_ptr archive, std::shared_ptr snapshot) - : Work(app, parent, "put-snapshot-files-" + archive->getName()) + : Work(app, callback, "put-snapshot-files-" + archive->getName()) , mArchive(archive) , mSnapshot(snapshot) { } -PutSnapshotFilesWork::~PutSnapshotFilesWork() +void +PutSnapshotFilesWork::doReset() { - clearChildren(); + mPublishSnapshot.reset(); } -void -PutSnapshotFilesWork::onReset() +BasicWork::State +PutSnapshotFilesWork::doWork() { - clearChildren(); + if (!mPublishSnapshot) + { + mPublishSnapshot = addWork("publish-snapshots-files"); - mGetHistoryArchiveStateWork.reset(); - mPutFilesWork.reset(); - mPutHistoryArchiveStateWork.reset(); -} + // Phase 1: fetch remote history archive state + mPublishSnapshot->addToSequence( + mRemoteState, 0, mArchive); -Work::State -PutSnapshotFilesWork::onSuccess() -{ - // Phase 1: fetch remote history archive state - if (!mGetHistoryArchiveStateWork) + // Phase 2: put all requisite data files + mPublishSnapshot->addToSequence( + mArchive, mSnapshot, mRemoteState); + + // Phase 3: update remote history archive state + mPublishSnapshot->addToSequence( + mSnapshot->mLocalState, mArchive); + + return WORK_RUNNING; + } + else { - mGetHistoryArchiveStateWork = addWork( - "get-history-archive-state", mRemoteState, 0, mArchive); - return WORK_PENDING; + auto state = mPublishSnapshot->getState(); + return state == WORK_FAILURE_RAISE ? WORK_FAILURE_RETRY : state; } +} - // Phase 2: put all requisite data files - if (!mPutFilesWork) - { - mPutFilesWork = addWork("put-files"); +GzipAndPutFilesWork::GzipAndPutFilesWork( + Application& app, std::function callback, + std::shared_ptr archive, + std::shared_ptr snapshot, + HistoryArchiveState const& remoteState) + : Work(app, callback, "helper-put-files-" + archive->getName()) + , mArchive(archive) + , mSnapshot(snapshot) + , mRemoteState(remoteState) +{ +} +BasicWork::State +GzipAndPutFilesWork::doWork() +{ + if (!mChildrenSpawned) + { std::vector> files = { mSnapshot->mLedgerSnapFile, mSnapshot->mTransactionSnapFile, mSnapshot->mTransactionResultSnapFile, @@ -73,26 +93,40 @@ PutSnapshotFilesWork::onSuccess() } for (auto f : files) { + // Empty files are removed and shouldn't be uploaded if (f && fs::exists(f->localPath_nogz())) { - auto put = mPutFilesWork->addWork( + auto publish = addWork("publish-snapshots"); + publish->addToSequence(f->localPath_nogz(), true); + publish->addToSequence(f->remoteDir(), + mArchive); + publish->addToSequence( f->localPath_gz(), f->remoteName(), mArchive); - auto mkdir = - put->addWork(f->remoteDir(), mArchive); - mkdir->addWork(f->localPath_nogz(), true); } } - return WORK_PENDING; + mChildrenSpawned = true; } - - // Phase 3: update remote history archive state - if (!mPutHistoryArchiveStateWork) + else { - mPutHistoryArchiveStateWork = addWork( - mSnapshot->mLocalState, mArchive); - return WORK_PENDING; + if (allChildrenSuccessful()) + { + return WORK_SUCCESS; + } + else if (anyChildRaiseFailure()) + { + return WORK_FAILURE_RETRY; + } + else if (!anyChildRunning()) + { + return WORK_WAITING; + } } - - return WORK_SUCCESS; + return WORK_RUNNING; } + +void +GzipAndPutFilesWork::doReset() +{ + mChildrenSpawned = false; } +} \ No newline at end of file diff --git a/src/historywork/PutSnapshotFilesWork.h b/src/historywork/PutSnapshotFilesWork.h index 65e63ac1b2..bc7b49ac88 100644 --- a/src/historywork/PutSnapshotFilesWork.h +++ b/src/historywork/PutSnapshotFilesWork.h @@ -18,16 +18,36 @@ class PutSnapshotFilesWork : public Work std::shared_ptr mSnapshot; HistoryArchiveState mRemoteState; - std::shared_ptr mGetHistoryArchiveStateWork; - std::shared_ptr mPutFilesWork; - std::shared_ptr mPutHistoryArchiveStateWork; + std::shared_ptr mPublishSnapshot; public: - PutSnapshotFilesWork(Application& app, WorkParent& parent, + PutSnapshotFilesWork(Application& app, std::function callback, std::shared_ptr archive, std::shared_ptr snapshot); - ~PutSnapshotFilesWork(); - void onReset() override; - Work::State onSuccess() override; + ~PutSnapshotFilesWork() = default; + + protected: + void doReset() override; + State doWork() override; +}; + +class GzipAndPutFilesWork : public Work +{ + std::shared_ptr mArchive; + std::shared_ptr mSnapshot; + HistoryArchiveState const& mRemoteState; + + bool mChildrenSpawned{false}; + + public: + GzipAndPutFilesWork(Application& app, std::function callback, + std::shared_ptr archive, + std::shared_ptr snapshot, + HistoryArchiveState const& remoteState); + ~GzipAndPutFilesWork() = default; + + protected: + void doReset() override; + State doWork() override; }; } diff --git a/src/historywork/ResolveSnapshotWork.cpp b/src/historywork/ResolveSnapshotWork.cpp index 053ec6586e..92baa3b684 100644 --- a/src/historywork/ResolveSnapshotWork.cpp +++ b/src/historywork/ResolveSnapshotWork.cpp @@ -11,19 +11,15 @@ namespace stellar { ResolveSnapshotWork::ResolveSnapshotWork( - Application& app, WorkParent& parent, + Application& app, std::function callback, std::shared_ptr snapshot) - : Work(app, parent, "prepare-snapshot", Work::RETRY_FOREVER) + : BasicWork(app, callback, "prepare-snapshot", Work::RETRY_NEVER) , mSnapshot(snapshot) + , mTimer(std::make_unique(app.getClock())) { } -ResolveSnapshotWork::~ResolveSnapshotWork() -{ - clearChildren(); -} - -void +BasicWork::State ResolveSnapshotWork::onRun() { mSnapshot->mLocalState.resolveAnyReadyFutures(); @@ -32,11 +28,22 @@ ResolveSnapshotWork::onRun() mSnapshot->mLocalState.currentLedger) && mSnapshot->mLocalState.futuresAllResolved()) { - scheduleSuccess(); + return WORK_SUCCESS; } else { - scheduleFailure(); + std::weak_ptr weak( + std::static_pointer_cast(shared_from_this())); + auto handler = [weak](asio::error_code const& ec) { + auto self = weak.lock(); + if (self) + { + self->wakeUp(); + } + }; + mTimer->expires_from_now(mDelay); + mTimer->async_wait(handler); + return WORK_WAITING; } } } diff --git a/src/historywork/ResolveSnapshotWork.h b/src/historywork/ResolveSnapshotWork.h index 96fde50758..d6e936bb47 100644 --- a/src/historywork/ResolveSnapshotWork.h +++ b/src/historywork/ResolveSnapshotWork.h @@ -11,14 +11,18 @@ namespace stellar struct StateSnapshot; -class ResolveSnapshotWork : public Work +class ResolveSnapshotWork : public BasicWork { std::shared_ptr mSnapshot; + std::unique_ptr mTimer; + std::chrono::seconds const mDelay{1}; public: - ResolveSnapshotWork(Application& app, WorkParent& parent, + ResolveSnapshotWork(Application& app, std::function callback, std::shared_ptr snapshot); - ~ResolveSnapshotWork(); - void onRun() override; + ~ResolveSnapshotWork() = default; + + protected: + State onRun() override; }; } diff --git a/src/historywork/RunCommandWork.cpp b/src/historywork/RunCommandWork.cpp index 9e9ee7591b..589f394159 100644 --- a/src/historywork/RunCommandWork.cpp +++ b/src/historywork/RunCommandWork.cpp @@ -9,36 +9,51 @@ namespace stellar { -RunCommandWork::RunCommandWork(Application& app, WorkParent& parent, - std::string const& uniqueName, size_t maxRetries) - : Work(app, parent, uniqueName, maxRetries) +RunCommandWork::RunCommandWork(Application& app, + std::string const& name, size_t maxRetries) + : BasicWork(app, name, maxRetries) { } -RunCommandWork::~RunCommandWork() -{ - clearChildren(); -} - -void -RunCommandWork::onStart() +BasicWork::State +RunCommandWork::onRun() { - std::string cmd, outfile; - getCommand(cmd, outfile); - if (!cmd.empty()) + if (mDone) { - auto exit = mApp.getProcessManager().runProcess(cmd, outfile); - exit.async_wait(callComplete()); + return mEc ? BasicWork::WORK_FAILURE_RETRY : BasicWork::WORK_SUCCESS; } else { - scheduleSuccess(); + std::string cmd, outfile; + std::tie(cmd, outfile) = getCommand(); + if (!cmd.empty()) + { + auto exit = mApp.getProcessManager().runProcess(cmd, outfile); + + std::weak_ptr weak( + std::static_pointer_cast(shared_from_this())); + exit.async_wait([weak](asio::error_code const& ec) { + auto self = weak.lock(); + if (self) + { + self->mEc = ec; + self->mDone = true; + self->wakeUp(); + } + }); + return BasicWork::WORK_WAITING; + } + else + { + return BasicWork::WORK_SUCCESS; + } } } void -RunCommandWork::onRun() +RunCommandWork::onReset() { - // Do nothing: we ran the command in onStart(). + mDone = false; + mEc = asio::error_code(); } } diff --git a/src/historywork/RunCommandWork.h b/src/historywork/RunCommandWork.h index e5017a5cba..8ce0fa3d02 100644 --- a/src/historywork/RunCommandWork.h +++ b/src/historywork/RunCommandWork.h @@ -8,23 +8,27 @@ namespace stellar { +using RunCommandInfo = std::pair; -// This subclass exists for two reasons: first, to factor out a little code -// around running commands, and second to ensure that command-running -// happens from onStart rather than onRun, and that onRun is an empty -// method; this way we only run a command _once_ (when it's first -// scheduled) rather than repeatedly (racing with other copies of itself) -// when rescheduled. -class RunCommandWork : public Work +/** + * This class helps run various commands, that require + * process spawning. This work is not scheduled while it's + * waiting for a process to exit, and wakes up when it's ready + * to be scheduled again. + */ +class RunCommandWork : public BasicWork { - virtual void getCommand(std::string& cmdLine, std::string& outFile) = 0; + bool mDone{false}; + asio::error_code mEc; + virtual RunCommandInfo getCommand() = 0; public: - RunCommandWork(Application& app, WorkParent& parent, - std::string const& uniqueName, - size_t maxRetries = Work::RETRY_A_FEW); - ~RunCommandWork(); - void onStart() override; - void onRun() override; + RunCommandWork(Application& app, std::string const& name, + size_t maxRetries = BasicWork::RETRY_A_FEW); + ~RunCommandWork() = default; + + protected: + void onReset() override; + BasicWork::State onRun() override; }; } diff --git a/src/historywork/VerifyBucketWork.cpp b/src/historywork/VerifyBucketWork.cpp index 9e4220b937..003202fee3 100644 --- a/src/historywork/VerifyBucketWork.cpp +++ b/src/historywork/VerifyBucketWork.cpp @@ -16,37 +16,71 @@ namespace stellar { - VerifyBucketWork::VerifyBucketWork( - Application& app, WorkParent& parent, + Application& app, std::function callback, std::map>& buckets, std::string const& bucketFile, uint256 const& hash) - : Work(app, parent, std::string("verify-bucket-hash ") + bucketFile, - RETRY_NEVER) + : BasicWork(app, callback, "verify-bucket-hash-" + bucketFile, RETRY_NEVER) , mBuckets(buckets) , mBucketFile(bucketFile) , mHash(hash) - , mVerifyBucketSuccess{app.getMetrics().NewMeter( - {"history", "verify-bucket", "success"}, "event")} - , mVerifyBucketFailure{app.getMetrics().NewMeter( - {"history", "verify-bucket", "failure"}, "event")} { - fs::checkNoGzipSuffix(mBucketFile); } VerifyBucketWork::~VerifyBucketWork() { - clearChildren(); +} + +BasicWork::State +VerifyBucketWork::onRun() +{ + if (mDone) + { + if (mEc) + { + return WORK_FAILURE_RETRY; + } + + adoptBucket(); + return WORK_SUCCESS; + } + + spawnVerifier(); + return WORK_WAITING; +} + +void +VerifyBucketWork::adoptBucket() +{ + assert(mDone); + assert(!mEc); + + auto b = mApp.getBucketManager().adoptFileAsBucket(mBucketFile, mHash); + mBuckets[binToHex(mHash)] = b; } void -VerifyBucketWork::onStart() +VerifyBucketWork::spawnVerifier() { std::string filename = mBucketFile; uint256 hash = mHash; - Application& app = this->mApp; - auto handler = callComplete(); - app.getWorkerIOService().post([&app, filename, handler, hash]() { + Application& app = mApp; + + std::weak_ptr weak( + std::static_pointer_cast(shared_from_this())); + auto handler = [weak](asio::error_code const& ec) { + return [weak, ec]() { + auto self = weak.lock(); + if (self) + { + self->mEc = ec; + self->mDone = true; + self->wakeUp(); + } + }; + }; + + mApp.getWorkerIOService().post([&app, filename, handler, hash]() { auto hasher = SHA256::create(); asio::error_code ec; char buf[4096]; @@ -75,36 +109,11 @@ VerifyBucketWork::onStart() ec = std::make_error_code(std::errc::io_error); } } - app.getClock().getIOService().post([ec, handler]() { handler(ec); }); - }); -} - -void -VerifyBucketWork::onRun() -{ - // Do nothing: we spawned the verifier in onStart(). -} -Work::State -VerifyBucketWork::onSuccess() -{ - auto b = mApp.getBucketManager().adoptFileAsBucket(mBucketFile, mHash); - mBuckets[binToHex(mHash)] = b; - mVerifyBucketSuccess.Mark(); - return WORK_SUCCESS; -} - -void -VerifyBucketWork::onFailureRetry() -{ - mVerifyBucketFailure.Mark(); - Work::onFailureRetry(); -} - -void -VerifyBucketWork::onFailureRaise() -{ - mVerifyBucketFailure.Mark(); - Work::onFailureRaise(); + // TODO (mlo) Not ideal, but needed to prevent race conditions with + // main thread, since BasicWork's state is not thread-safe. This is a + // temporary workaround, as a cleaner solution is needed. + app.getClock().getIOService().post(handler(ec)); + }); } } diff --git a/src/historywork/VerifyBucketWork.h b/src/historywork/VerifyBucketWork.h index 8a42ab990d..6dec149512 100644 --- a/src/historywork/VerifyBucketWork.h +++ b/src/historywork/VerifyBucketWork.h @@ -17,24 +17,24 @@ namespace stellar class Bucket; -class VerifyBucketWork : public Work +class VerifyBucketWork : public BasicWork { std::map>& mBuckets; std::string mBucketFile; uint256 mHash; + bool mDone{false}; + std::error_code mEc; - medida::Meter& mVerifyBucketSuccess; - medida::Meter& mVerifyBucketFailure; + void adoptBucket(); + void spawnVerifier(); public: - VerifyBucketWork(Application& app, WorkParent& parent, + VerifyBucketWork(Application& app, std::function callback, std::map>& buckets, std::string const& bucketFile, uint256 const& hash); ~VerifyBucketWork(); - void onRun() override; - void onStart() override; - Work::State onSuccess() override; - void onFailureRetry() override; - void onFailureRaise() override; + + protected: + BasicWork::State onRun() override; }; } diff --git a/src/historywork/WriteSnapshotWork.cpp b/src/historywork/WriteSnapshotWork.cpp index a97d29f171..939de62b28 100644 --- a/src/historywork/WriteSnapshotWork.cpp +++ b/src/historywork/WriteSnapshotWork.cpp @@ -13,48 +13,69 @@ namespace stellar { -WriteSnapshotWork::WriteSnapshotWork(Application& app, WorkParent& parent, +WriteSnapshotWork::WriteSnapshotWork(Application& app, + std::function callback, std::shared_ptr snapshot) - : Work(app, parent, "write-snapshot", Work::RETRY_A_LOT) + : BasicWork(app, callback, "write-snapshot", Work::RETRY_A_LOT) , mSnapshot(snapshot) { } -WriteSnapshotWork::~WriteSnapshotWork() +BasicWork::State +WriteSnapshotWork::onRun() { - clearChildren(); -} + if (mDone) + { + return mSuccess ? WORK_SUCCESS : WORK_FAILURE_RETRY; + } -void -WriteSnapshotWork::onStart() -{ - auto handler = callComplete(); - auto snap = mSnapshot; - auto work = [handler, snap]() { - asio::error_code ec; + std::weak_ptr weak( + std::static_pointer_cast(shared_from_this())); + + auto handler = [weak](bool success) { + return [weak, success]() { + auto self = weak.lock(); + if (self) + { + self->mDone = true; + self->mSuccess = success; + self->wakeUp(); + } + }; + }; + + auto work = [weak, handler]() { + auto self = weak.lock(); + if (!self) + { + return; + } + + auto snap = self->mSnapshot; + bool success = true; if (!snap->writeHistoryBlocks()) { - ec = std::make_error_code(std::errc::io_error); + success = false; } - snap->mApp.getClock().getIOService().post( - [handler, ec]() { handler(ec); }); + + // TODO (mlo) Not ideal, but needed to prevent race conditions with + // main thread, since BasicWork's state is not thread-safe. This is a + // temporary workaround, as a cleaner solution is needed. + self->mApp.getClock().getIOService().post(handler(success)); }; // Throw the work over to a worker thread if we can use DB pools, // otherwise run on main thread. + // TODO check if this is actually correct if (mApp.getDatabase().canUsePool()) { mApp.getWorkerIOService().post(work); + return WORK_WAITING; } else { work(); + return WORK_RUNNING; } } - -void -WriteSnapshotWork::onRun() -{ - // Do nothing: we spawned the writer in onStart(). -} } diff --git a/src/historywork/WriteSnapshotWork.h b/src/historywork/WriteSnapshotWork.h index 6a2665b013..831278a320 100644 --- a/src/historywork/WriteSnapshotWork.h +++ b/src/historywork/WriteSnapshotWork.h @@ -11,15 +11,18 @@ namespace stellar struct StateSnapshot; -class WriteSnapshotWork : public Work +class WriteSnapshotWork : public BasicWork { std::shared_ptr mSnapshot; + bool mDone{false}; + bool mSuccess{true}; public: - WriteSnapshotWork(Application& app, WorkParent& parent, + WriteSnapshotWork(Application& app, std::function callback, std::shared_ptr snapshot); - ~WriteSnapshotWork(); - void onStart() override; - void onRun() override; + ~WriteSnapshotWork() = default; + + protected: + State onRun() override; }; } diff --git a/src/main/Application.h b/src/main/Application.h index 34505231d4..a715f9b1e0 100644 --- a/src/main/Application.h +++ b/src/main/Application.h @@ -39,6 +39,7 @@ class PersistentState; class LoadGenerator; class CommandHandler; class WorkManager; +class WorkScheduler; class BanManager; class StatusManager; @@ -205,6 +206,7 @@ class Application virtual PersistentState& getPersistentState() = 0; virtual CommandHandler& getCommandHandler() = 0; virtual WorkManager& getWorkManager() = 0; + virtual WorkScheduler& getWorkScheduler() = 0; virtual BanManager& getBanManager() = 0; virtual StatusManager& getStatusManager() = 0; diff --git a/src/main/ApplicationImpl.cpp b/src/main/ApplicationImpl.cpp index 13a896458e..5357b0499d 100644 --- a/src/main/ApplicationImpl.cpp +++ b/src/main/ApplicationImpl.cpp @@ -44,6 +44,7 @@ #include "simulation/LoadGenerator.h" #include "util/StatusManager.h" #include "work/WorkManager.h" +#include "work/WorkScheduler.h" #include "util/Logging.h" #include "util/TmpDir.h" @@ -119,6 +120,7 @@ ApplicationImpl::initialize() mProcessManager = ProcessManager::create(*this); mCommandHandler = std::make_unique(*this); mWorkManager = WorkManager::create(*this); + mWorkScheduler = WorkScheduler::create(*this); mBanManager = BanManager::create(*this); mStatusManager = std::make_unique(); @@ -293,6 +295,10 @@ ApplicationImpl::~ApplicationImpl() { mNtpSynchronizationChecker->shutdown(); } + if (mWorkScheduler) + { + mWorkScheduler->shutdown(); + } if (mProcessManager) { mProcessManager->shutdown(); @@ -419,6 +425,10 @@ ApplicationImpl::gracefulStop() { mNtpSynchronizationChecker->shutdown(); } + if (mWorkScheduler) + { + mWorkScheduler->shutdown(); + } if (mProcessManager) { mProcessManager->shutdown(); @@ -728,6 +738,12 @@ ApplicationImpl::getWorkManager() return *mWorkManager; } +WorkScheduler& +ApplicationImpl::getWorkScheduler() +{ + return *mWorkScheduler; +} + BanManager& ApplicationImpl::getBanManager() { diff --git a/src/main/ApplicationImpl.h b/src/main/ApplicationImpl.h index 238e2973f9..37e963c272 100644 --- a/src/main/ApplicationImpl.h +++ b/src/main/ApplicationImpl.h @@ -68,6 +68,7 @@ class ApplicationImpl : public Application virtual PersistentState& getPersistentState() override; virtual CommandHandler& getCommandHandler() override; virtual WorkManager& getWorkManager() override; + virtual WorkScheduler& getWorkScheduler() override; virtual BanManager& getBanManager() override; virtual StatusManager& getStatusManager() override; @@ -143,6 +144,7 @@ class ApplicationImpl : public Application std::shared_ptr mProcessManager; std::unique_ptr mCommandHandler; std::shared_ptr mWorkManager; + std::shared_ptr mWorkScheduler; std::unique_ptr mPersistentState; std::unique_ptr mLoadGenerator; std::unique_ptr mBanManager; diff --git a/src/work/BasicWork.cpp b/src/work/BasicWork.cpp new file mode 100644 index 0000000000..bb685013ef --- /dev/null +++ b/src/work/BasicWork.cpp @@ -0,0 +1,296 @@ +// Copyright 2018 Stellar Development Foundation and contributors. Licensed +// under the Apache License, Version 2.0. See the COPYING file at the root +// of this distribution or at http://www.apache.org/licenses/LICENSE-2.0 + +#include "work/BasicWork.h" +#include "lib/util/format.h" +#include "util/Logging.h" +#include "util/Math.h" + +namespace stellar +{ + +size_t const BasicWork::RETRY_NEVER = 0; +size_t const BasicWork::RETRY_ONCE = 1; +size_t const BasicWork::RETRY_A_FEW = 5; +size_t const BasicWork::RETRY_A_LOT = 32; +size_t const BasicWork::RETRY_FOREVER = 0xffffffff; + +BasicWork::BasicWork(Application& app, std::string name, size_t maxRetries) + : mApp(app) + , mName(std::move(name)) + , mMaxRetries(maxRetries) +{ +} + +BasicWork::~BasicWork() +{ +} + +void +BasicWork::shutdown() +{ + mState = WORK_DESTRUCTING; +} + +std::string const& +BasicWork::getName() const +{ + return mName; +} + +std::string +BasicWork::getStatus() const +{ + // Work is in `WAITING` state when retrying + auto state = mRetryTimer ? WORK_FAILURE_RETRY : mState; + + switch (state) + { + case WORK_PENDING: + return fmt::format("Pending: {:s}", getName()); + case WORK_RUNNING: + return fmt::format("Running: {:s}", getName()); + case WORK_WAITING: + return fmt::format("Waiting: {:s}", getName()); + case WORK_SUCCESS: + return fmt::format("Succeeded: {:s}", getName()); + case WORK_FAILURE_RETRY: + { + auto eta = getRetryETA(); + return fmt::format("Retrying in {:d} sec: {:s}", eta, getName()); + } + case WORK_FAILURE_RAISE: + return fmt::format("Failed: {:s}", getName()); + default: + assert(false); + return ""; + } +} + +bool +BasicWork::isDone() const +{ + return mState == WORK_SUCCESS || mState == WORK_FAILURE_RAISE; +} + +std::string +BasicWork::stateName(State st) +{ + switch (st) + { + case WORK_PENDING: + return "WORK_PENDING"; + case WORK_WAITING: + return "WORK_WAITING"; + case WORK_RUNNING: + return "WORK_RUNNING"; + case WORK_SUCCESS: + return "WORK_SUCCESS"; + case WORK_FAILURE_RETRY: + return "WORK_FAILURE_RETRY"; + case WORK_FAILURE_RAISE: + return "WORK_FAILURE_RAISE"; + default: + throw std::runtime_error("Unknown Work::State"); + } +} + +void +BasicWork::reset() +{ + CLOG(DEBUG, "Work") << "resetting " << getName(); + + if (mRetryTimer) + { + mRetryTimer->cancel(); + mRetryTimer.reset(); + } + + onReset(); +} + +std::function +BasicWork::wakeUpCallback() +{ + std::weak_ptr weak = shared_from_this(); + auto callback = [weak]() { + auto self = weak.lock(); + if (self) + { + self->wakeUp(); + } + }; + return callback; +} + +void +BasicWork::restartWork(std::function wakeUpParent) +{ + if (mState != WORK_PENDING) + { + // Only restart if work is in terminal state + assert(isDone()); + reset(); + setState(WORK_PENDING); + } + + mNotifyCallback = wakeUpParent; + setState(WORK_RUNNING); + reset(); +} + +void +BasicWork::waitForRetry() +{ + if (mRetryTimer) + { + throw std::runtime_error( + fmt::format("Retry timer for {} already exists!", getName())); + } + + mRetryTimer = std::make_unique(mApp.getClock()); + std::weak_ptr weak = shared_from_this(); + auto t = getRetryDelay(); + mRetryTimer->expires_from_now(t); + CLOG(WARNING, "Work") + << "Scheduling retry #" << (mRetries + 1) << "/" << mMaxRetries + << " in " << std::chrono::duration_cast(t).count() + << " sec, for " << getName(); + setState(WORK_WAITING); + mRetryTimer->async_wait( + [weak]() { + auto self = weak.lock(); + if (!self) + { + return; + } + assert(self->getState() == WORK_WAITING); + self->mRetries++; + self->mRetryTimer.reset(); + self->wakeUp(); + }, + // TODO for abort logic: what needs to happen on cancellation? + VirtualTimer::onFailureNoop); +} + +void +BasicWork::onReset() +{ +} + +void +BasicWork::onSuccess() +{ +} + +void +BasicWork::onFailureRetry() +{ +} + +void +BasicWork::onFailureRaise() +{ +} + +void +BasicWork::onWakeUp() +{ +} + +BasicWork::State +BasicWork::getState() const +{ + return mState; +} + +void +BasicWork::setState(BasicWork::State st) +{ + // TODO Enforce proper state transitions + // by adding extra asserts + + auto maxR = getMaxRetries(); + if (st == WORK_FAILURE_RETRY && (mRetries >= maxR)) + { + CLOG(WARNING, "Work") + << "Reached retry limit " << maxR << " for " << getName(); + st = WORK_FAILURE_RAISE; + } + + if (st != mState) + { + CLOG(DEBUG, "Work") << "work " << getName() << " : " + << stateName(mState) << " -> " << stateName(st); + mState = st; + } + + switch (mState) + { + case WORK_SUCCESS: + onSuccess(); + reset(); + break; + case WORK_FAILURE_RAISE: + onFailureRaise(); + reset(); + break; + case WORK_FAILURE_RETRY: + onFailureRetry(); + reset(); + waitForRetry(); + break; + default: + break; + } +} + +void +BasicWork::wakeUp() +{ + if (mState == WORK_RUNNING || mState == WORK_DESTRUCTING) + { + return; + } + + setState(WORK_RUNNING); + onWakeUp(); + if (mNotifyCallback) + { + mNotifyCallback(); + } +} + +void +BasicWork::crankWork() +{ + assert(!isDone() && mState != WORK_WAITING); + + auto nextState = onRun(); + setState(nextState); +} + +size_t +BasicWork::getMaxRetries() const +{ + return mMaxRetries; +} + +VirtualClock::duration +BasicWork::getRetryDelay() const +{ + // Cap to 4096sec == a little over an hour. + uint64_t m = 2 << std::min(uint64_t(12), uint64_t(mRetries)); + return std::chrono::seconds(rand_uniform(1ULL, m)); +} + +uint64_t +BasicWork::getRetryETA() const +{ + uint64_t now = mApp.timeNow(); + uint64_t retry = + mRetryTimer ? VirtualClock::to_time_t(mRetryTimer->expiry_time()) : 0; + return now > retry ? 0 : retry - now; +} +} \ No newline at end of file diff --git a/src/work/BasicWork.h b/src/work/BasicWork.h new file mode 100644 index 0000000000..c165034be5 --- /dev/null +++ b/src/work/BasicWork.h @@ -0,0 +1,121 @@ +// Copyright 2018 Stellar Development Foundation and contributors. Licensed +// under the Apache License, Version 2.0. See the COPYING file at the root +// of this distribution or at http://www.apache.org/licenses/LICENSE-2.0 +#pragma once + +#include "main/Application.h" + +namespace stellar +{ + +/** BasicWork is an implementation of a finite state machine, + * that is used for async or long-running tasks that: + * - May need to be broken into steps, so as not to block the main thread + * - May fail and need to retry, after some delay + * + * BasicWork manages all state transitions via `crankWork` as well as retries. + * While customers can trigger cranking and check on BasicWork's status, + * it is implementers' responsibility to implement `onRun`, that + * hints the next desired state. + */ +class BasicWork : public std::enable_shared_from_this, + private NonMovableOrCopyable +{ + public: + static size_t const RETRY_NEVER; + static size_t const RETRY_ONCE; + static size_t const RETRY_A_FEW; + static size_t const RETRY_A_LOT; + static size_t const RETRY_FOREVER; + + // TODO this needs to be split into 2 enums: + // one for publicly exposed state and one for + // internal transitions within BasicWork + enum State + { + WORK_PENDING, + WORK_RUNNING, + WORK_SUCCESS, + WORK_WAITING, + WORK_FAILURE_RETRY, + WORK_FAILURE_RAISE, + WORK_DESTRUCTING + }; + + BasicWork(Application& app, std::string name, size_t maxRetries); + virtual ~BasicWork(); + + std::string const& getName() const; + virtual std::string getStatus() const; + State getState() const; + bool isDone() const; + + size_t getMaxRetries() const; + uint64_t getRetryETA() const; + + // Main method for state transition, mostly dictated by `onRun` + void crankWork(); + void restartWork(std::function wakeUpParent); + virtual void shutdown(); + + protected: + // Implementers can override these callbacks to customize functionality + // `onReset` is called on any important state transition, e.g. when new work + // is added, on a retry, success and failure. This method ensures that + // all the references (including children) are properly cleaned up for + // safe destruction. `onRun` performs necessary work logic, and hints the + // next state transition. `onSuccess` is called when work transitions into + // WORK_SUCCESS state. Similarly, `onFailure*` methods are called upon + // transitioning into appropriate failure state. An example usage would be + // cleaning up side effects, like downloaded files in these methods. If you + // want to force failure (and reset / retry) you can return + // WORK_FAILURE_RETRY or WORK_FAILURE_RAISE. After a retry count is + // passed, WORK_FAILURE_RETRY means WORK_FAILURE_RAISE anyways. + // WORK_FAILURE_FATAL is equivalent to WORK_FAILURE_RAISE passed up in + // the work chain - when WORK_FAILURE_FATAL is raised in one work item, + // all work items that leaded to this one will also fail without retrying. + + virtual void onReset(); + virtual State onRun() = 0; + virtual void onWakeUp(); + + // TODO potentially remove these if they are not actually helpful + virtual void onFailureRetry(); + virtual void onFailureRaise(); + virtual void onSuccess(); + + // A helper method that implementers can use if they plan to + // utilize WAITING state. This tells the work to return to RUNNING + // state, and propagate the notification up to the scheduler + // An example use of this would be RunCommandWork: + // a timer is used to async_wait for a process to exit, + // with a call to `wakeUp` upon completion. + void wakeUp(); + + // Default wakeUp callback that implementers can use + std::function wakeUpCallback(); + + Application& mApp; + + private: + // TODO (mlo) look into uses of reset in current interface + // Proper cleanup needs to happen at some point, and `reset` + // might not be the best place. For instance, `GetRemoteFileWork` + // calls std::remove is `onReset`, so an attempt to remove a non-existent + // file happens every time a child is added, which is inefficient. + void reset(); + + VirtualClock::duration getRetryDelay() const; + static std::string stateName(State st); + void setState(State s); + void waitForRetry(); + + std::function mNotifyCallback; + std::string const mName; + std::unique_ptr mRetryTimer; + + State mState{WORK_PENDING}; + size_t mRetries{0}; + size_t const mMaxRetries{RETRY_A_FEW}; +}; +} \ No newline at end of file diff --git a/src/work/Work.cpp b/src/work/Work.cpp index 489f1f3d6d..ec3eeef06e 100644 --- a/src/work/Work.cpp +++ b/src/work/Work.cpp @@ -4,16 +4,7 @@ #include "work/Work.h" #include "lib/util/format.h" -#include "main/Application.h" #include "util/Logging.h" -#include "util/Math.h" -#include "work/WorkManager.h" -#include "work/WorkParent.h" - -#include "medida/meter.h" -#include "medida/metrics_registry.h" - -#include namespace stellar { @@ -24,414 +15,203 @@ size_t const Work::RETRY_A_FEW = 5; size_t const Work::RETRY_A_LOT = 32; size_t const Work::RETRY_FOREVER = 0xffffffff; -Work::Work(Application& app, WorkParent& parent, std::string uniqueName, +Work::Work(Application& app, std::string name, size_t maxRetries) - : WorkParent(app) - , mParent(parent.shared_from_this()) - , mUniqueName(uniqueName) - , mMaxRetries(maxRetries) + : BasicWork(app, std::move(name), maxRetries) + , mNextChild(mChildren.begin()) { } Work::~Work() { - clearChildren(); + // TODO consider this assert when abort logic is implemented + // assert(!hasChildren()); } -std::string -Work::getUniqueName() const +void +Work::shutdown() { - return mUniqueName; + for (auto& c : mChildren) + { + c->shutdown(); + } + BasicWork::shutdown(); } -std::string -Work::getStatus() const +BasicWork::State +Work::onRun() { - switch (mState) - { - case WORK_PENDING: + auto child = yieldNextRunningChild(); + if (child) { - size_t i = 0; - for (auto const& c : mChildren) - { - if (c.second->isDone()) - { - ++i; - } - } - auto total = mChildren.size(); - return fmt::format("Awaiting {:d}/{:d} prerequisites of: {:s}", - total - i, total, getUniqueName()); + CLOG(DEBUG, "Work") << "Running next child " << child->getName(); + child->crankWork(); + return BasicWork::WORK_RUNNING; } - case WORK_RUNNING: - return fmt::format("Running: {:s}", getUniqueName()); - case WORK_SUCCESS: - return fmt::format("Succeded: {:s}", getUniqueName()); - case WORK_FAILURE_RETRY: + else { - auto eta = getRetryETA(); - return fmt::format("Retrying in {:d} sec: {:s}", eta, getUniqueName()); - } - case WORK_FAILURE_RAISE: - case WORK_FAILURE_FATAL: - return fmt::format("Failed: {:s}", getUniqueName()); - default: - assert(false); - return ""; + CLOG(DEBUG, "Work") << "Running self " << getName(); + return doWork(); } } -uint64_t -Work::getRetryETA() const +void +Work::onReset() { - uint64_t now = mApp.timeNow(); - uint64_t retry = - mRetryTimer ? VirtualClock::to_time_t(mRetryTimer->expiry_time()) : 0; - return now > retry ? 0 : retry - now; + // TODO upon proper implementation of WorkScheduler shutdown, + // this assert needs to move to `clearChildren` + // assert(allChildrenDone()); + clearChildren(); + doReset(); } -VirtualClock::duration -Work::getRetryDelay() const +void +Work::doReset() { - // Cap to 4096sec == a little over an hour. - uint64_t m = 2 << std::min(uint64_t(12), uint64_t(mRetries)); - return std::chrono::seconds(rand_uniform(1ULL, m)); } -size_t -Work::getMaxRetries() const +void +Work::clearChildren() { - return mMaxRetries; + mChildren.clear(); + mNextChild = mChildren.begin(); } -std::string -Work::stateName(State st) +void +Work::addChild(std::shared_ptr child) { - switch (st) + // TODO (mlo) potentially check for child duplication + + bool resetIter = !hasChildren(); + mChildren.push_back(child); + + if (resetIter) { - case WORK_PENDING: - return "WORK_PENDING"; - case WORK_RUNNING: - return "WORK_RUNNING"; - case WORK_SUCCESS: - return "WORK_SUCCESS"; - case WORK_FAILURE_RETRY: - return "WORK_FAILURE_RETRY"; - case WORK_FAILURE_RAISE: - return "WORK_FAILURE_RAISE"; - case WORK_FAILURE_FATAL: - return "WORK_FAILURE_FATAL"; - default: - throw std::runtime_error("Unknown Work::State"); + mNextChild = mChildren.begin(); } } -std::function -Work::callComplete() +bool +Work::allChildrenSuccessful() const { - std::weak_ptr weak( - std::static_pointer_cast(shared_from_this())); - return [weak](asio::error_code const& ec) { - auto self = weak.lock(); - if (!self) + for (auto& c : mChildren) + { + if (c->getState() != BasicWork::WORK_SUCCESS) { - return; + return false; } - self->complete(ec ? WORK_COMPLETE_FAILURE : WORK_COMPLETE_OK); - }; + } + return true; } -void -Work::scheduleRun() +bool +Work::allChildrenDone() const { - if (mScheduled) + for (auto& c : mChildren) { - return; - } - - std::weak_ptr weak( - std::static_pointer_cast(shared_from_this())); - CLOG(DEBUG, "Work") << "scheduling run of " << getUniqueName(); - mScheduled = true; - mApp.getClock().getIOService().post([weak]() { - auto self = weak.lock(); - if (!self) + if (!c->isDone()) { - return; + return false; } - self->mScheduled = false; - self->run(); - }); + } + return true; } -void -Work::scheduleComplete(CompleteResult result) +bool +Work::anyChildRunning() const { - if (mScheduled) + for (auto& c : mChildren) { - return; - } - - std::weak_ptr weak( - std::static_pointer_cast(shared_from_this())); - CLOG(DEBUG, "Work") << "scheduling completion of " << getUniqueName(); - mScheduled = true; - mApp.getClock().getIOService().post([weak, result]() { - auto self = weak.lock(); - if (!self) + if (c->getState() == BasicWork::WORK_RUNNING) { - return; + return true; } - self->mScheduled = false; - self->complete(result); - }); -} - -void -Work::scheduleRetry() -{ - if (mScheduled) - { - return; } - - if (getState() != WORK_FAILURE_RETRY) - { - std::string msg = fmt::format("retrying {} in state {}", - getUniqueName(), stateName(getState())); - CLOG(ERROR, "Work") << msg; - throw std::runtime_error(msg); - } - - if (!mRetryTimer) - { - mRetryTimer = std::make_unique(mApp.getClock()); - } - - std::weak_ptr weak( - std::static_pointer_cast(shared_from_this())); - auto t = getRetryDelay(); - mRetryTimer->expires_from_now(t); - CLOG(WARNING, "Work") - << "Scheduling retry #" << (mRetries + 1) << "/" << mMaxRetries - << " in " << std::chrono::duration_cast(t).count() - << " sec, for " << getUniqueName(); - mScheduled = true; - mRetryTimer->async_wait( - [weak]() { - auto self = weak.lock(); - if (!self) - { - return; - } - self->mScheduled = false; - self->mRetries++; - self->reset(); - self->advance(); - }, - VirtualTimer::onFailureNoop); + return false; } -void -Work::reset() -{ - CLOG(DEBUG, "Work") << "resetting " << getUniqueName(); - setState(WORK_PENDING); - onReset(); -} - -void -Work::advance() +bool +Work::hasChildren() const { - if (getState() != WORK_PENDING) - { - return; - } - - CLOG(DEBUG, "Work") << "advancing " << getUniqueName(); - advanceChildren(); - if (allChildrenSuccessful()) - { - CLOG(DEBUG, "Work") << "all " << mChildren.size() << " children of " - << getUniqueName() << " successful, scheduling run"; - scheduleRun(); - } - else if (anyChildFatalFailure()) - { - CLOG(DEBUG, "Work") << "some of " << mChildren.size() << " children of " - << getUniqueName() << " fatally failed, scheduling " - << "fatal failure"; - scheduleFatalFailure(); - } - else if (anyChildRaiseFailure()) - { - CLOG(DEBUG, "Work") << "some of " << mChildren.size() << " children of " - << getUniqueName() << " failed, scheduling failure"; - scheduleFailure(); - } + return !mChildren.empty(); } -void -Work::run() +bool +Work::anyChildRaiseFailure() const { - if (getState() == WORK_PENDING) + for (auto& c : mChildren) { - CLOG(DEBUG, "Work") << "starting " << getUniqueName(); - mApp.getMetrics().NewMeter({"work", "unit", "start"}, "unit").Mark(); - onStart(); + if (c->getState() == BasicWork::WORK_FAILURE_RAISE) + { + return true; + } } - CLOG(DEBUG, "Work") << "running " << getUniqueName(); - mApp.getMetrics().NewMeter({"work", "unit", "run"}, "unit").Mark(); - setState(WORK_RUNNING); - onRun(); + return false; } -void -Work::complete(CompleteResult result) +std::shared_ptr +Work::yieldNextRunningChild() { - CLOG(DEBUG, "Work") << "completed " << getUniqueName(); - auto& succ = - mApp.getMetrics().NewMeter({"work", "unit", "success"}, "unit"); - auto& fail = - mApp.getMetrics().NewMeter({"work", "unit", "failure"}, "unit"); - - switch (result) + while (mNextChild != mChildren.end()) { - case WORK_COMPLETE_OK: - setState(onSuccess()); - break; - case WORK_COMPLETE_FAILURE: - setState(WORK_FAILURE_RETRY); - break; - case WORK_COMPLETE_FATAL: - setState(WORK_FAILURE_FATAL); - break; - } - - switch (getState()) - { - case WORK_SUCCESS: - succ.Mark(); - CLOG(DEBUG, "Work") - << "notifying parent of successful " << getUniqueName(); - notifyParent(); - break; - - case WORK_FAILURE_RETRY: - fail.Mark(); - onFailureRetry(); - scheduleRetry(); - break; - - case WORK_FAILURE_RAISE: - case WORK_FAILURE_FATAL: - fail.Mark(); - onFailureRaise(); - CLOG(DEBUG, "Work") << "notifying parent of failed " << getUniqueName(); - notifyParent(); - break; - - case WORK_PENDING: - succ.Mark(); - advance(); - break; - - case WORK_RUNNING: - succ.Mark(); - scheduleRun(); - break; - - default: - assert(false); - break; + auto next = *mNextChild; + mNextChild++; + if (next->getState() == BasicWork::WORK_RUNNING) + { + return next; + } } + mNextChild = mChildren.begin(); + return nullptr; } -void -Work::onReset() -{ -} - -void -Work::onStart() -{ -} - -void -Work::onRun() +WorkSequence::WorkSequence(Application& app, std::string name, + std::vector> sequence) + : Work(app, std::move(name), RETRY_A_FEW) + , mSequenceOfWork(sequence) + , mNextInSequence(mSequenceOfWork.begin()) { - scheduleSuccess(); } -Work::State -Work::onSuccess() +BasicWork::State +WorkSequence::doWork() { - return WORK_SUCCESS; -} - -void -Work::onFailureRetry() -{ -} - -void -Work::onFailureRaise() -{ -} - -Work::State -Work::getState() const -{ - return mState; -} - -bool -Work::isDone() const -{ - return mState == WORK_SUCCESS || mState == WORK_FAILURE_RAISE || - mState == WORK_FAILURE_FATAL; -} - -void -Work::setState(Work::State st) -{ - auto maxR = getMaxRetries(); - if (st == WORK_FAILURE_RETRY && (mRetries >= maxR)) + if (mNextInSequence == mSequenceOfWork.end()) { - CLOG(WARNING, "Work") - << "Reached retry limit " << maxR << " for " << getUniqueName(); - st = WORK_FAILURE_RAISE; + // Completed all the work + assert(!hasChildren()); + return WORK_SUCCESS; } - if (st != mState) + auto w = *mNextInSequence; + assert(w); + if (!hasChildren()) { - CLOG(DEBUG, "Work") << "work " << getUniqueName() << " : " - << stateName(mState) << " -> " << stateName(st); - mState = st; + w->restartWork(wakeUpCallback()); + addChild(w); + return WORK_RUNNING; } -} - -void -Work::notifyParent() -{ - auto parent = mParent.lock(); - if (parent) + else { - parent->notify(getUniqueName()); + auto state = w->getState(); + if (state == WORK_SUCCESS) + { + clearChildren(); + mNextInSequence++; + return WORK_RUNNING; + } + else if (state == WORK_FAILURE_RAISE) + { + // Work will do all the cleanup + return WORK_FAILURE_RETRY; + } + return state; } } void -Work::notify(std::string const& child) +WorkSequence::doReset() { - auto i = mChildren.find(child); - if (i == mChildren.end()) - { - CLOG(ERROR, "Work") << "work " << getUniqueName() - << " notified by unknown child " << child; - } - CLOG(DEBUG, "Work") << "notified " << getUniqueName() - << " of completed child " << child; - advance(); -} + mNextInSequence = mSequenceOfWork.begin(); } +} \ No newline at end of file diff --git a/src/work/Work.h b/src/work/Work.h index 1d7a8853e7..ce865eab13 100644 --- a/src/work/Work.h +++ b/src/work/Work.h @@ -1,140 +1,95 @@ -#pragma once - -// Copyright 2015 Stellar Development Foundation and contributors. Licensed +// Copyright 2018 Stellar Development Foundation and contributors. Licensed // under the Apache License, Version 2.0. See the COPYING file at the root // of this distribution or at http://www.apache.org/licenses/LICENSE-2.0 +#pragma once -#include "util/Timer.h" -#include "work/WorkParent.h" -#include -#include -#include +#include "main/Application.h" +#include "work/BasicWork.h" +#include +#include namespace stellar { - -class Application; -class WorkParent; - -/** Class 'Work' (and its friends 'WorkManager' and 'WorkParent') support - * structured dispatch of async or long-running activities that: - * - * - May depend on other Work before starting - * - May have parts that can run in parallel, other parts in serial - * - May need to be broken into steps, so as not to block the main thread - * - May fail and need to retry, after some delay - * - * Formerly this was managed through ad-hoc sprinkled-through-the-code - * copies of each of these facets of work-management. 'Work' is an attempt - * to make those facets uniform, systematic, and out-of-the-way of the - * logic of each piece of work. +/** + * Work is an extension of BasicWork, + * which additionally manages children. This allows the following: + * - Work might be dependent on the state of its children before performing + * its duties + * - Work may have children that are independent and could run in parallel, + * or dispatch children serially. */ - -class Work : public WorkParent +class Work : public BasicWork { - public: - static size_t const RETRY_NEVER; - static size_t const RETRY_ONCE; - static size_t const RETRY_A_FEW; - static size_t const RETRY_A_LOT; - static size_t const RETRY_FOREVER; - - enum State - { - WORK_PENDING, - WORK_RUNNING, - WORK_SUCCESS, - WORK_FAILURE_RETRY, - WORK_FAILURE_RAISE, - WORK_FAILURE_FATAL - }; - - enum CompleteResult - { - WORK_COMPLETE_OK, - WORK_COMPLETE_FAILURE, - WORK_COMPLETE_FATAL - }; + virtual ~Work(); - Work(Application& app, WorkParent& parent, std::string uniqueName, - size_t maxRetries = RETRY_A_FEW); + // Children status helper methods + bool allChildrenSuccessful() const; + bool allChildrenDone() const; + bool anyChildRaiseFailure() const; + bool anyChildRunning() const; + bool hasChildren() const; - virtual ~Work(); + void shutdown() override; - virtual std::string getUniqueName() const; - virtual std::string getStatus() const; - virtual size_t getMaxRetries() const; - uint64_t getRetryETA() const; + protected: + Work(Application& app, std::string name, + size_t retries = BasicWork::RETRY_A_FEW); + + // Note: `shared_from_this` assumes there exists a shared_ptr to the + // references class. This relates to who owns what in Work interface. + // Thus, `addWork` should be used to create Work (then the parent holds + // the reference). + template + std::shared_ptr + addWork(Args&&... args) + { + auto child = std::make_shared(mApp, std::forward(args)...); + child->restartWork(wakeUpCallback()); + addChild(child); + wakeUp(); + return child; + } - // Customize work behavior via these callbacks. onReset is called - // before any work starts (on addition, or retry). onStart is called - // when transitioning from WORK_PENDING -> WORK_RUNNING; onRun is - // called when run either from PENDING state _or_ after a SUCCESS - // callback reschedules running (see below). onFailure is only called - // on a failure before retrying; usually you can ignore it. - virtual void onReset(); - virtual void onStart(); - virtual void onRun(); - virtual void onFailureRetry(); - virtual void onFailureRaise(); + State onRun() final; + void onReset() final; - // onSuccess is a little different than the others: it's called on - // WORK_SUCCESS, but it also returns the next sate desired: if you want - // to restart or keep going, return WORK_PENDING or WORK_RUNNING - // (respectively) from onSuccess and you'll be rescheduled to run more. - // If you want to force failure (and reset / retry) you can return - // WORK_FAILURE_RETRY or WORK_FAILURE_RAISE. After a retry count is - // passed, WORK_FAILURE_RETRY means WORK_FAILURE_RAISE anyways. - // WORK_FAILURE_FATAL is equivalent to WORK_FAILURE_RAISE passed up in - // the work chain - when WORK_FAILURE_FATAL is raised in one work item, - // all work items that leaded to this one will also fail without retrying. - virtual State onSuccess(); + // Implementers decide what they want to do: spawn more children, + // wait for all children to finish, or perform Work + virtual BasicWork::State doWork() = 0; - static std::string stateName(State st); - State getState() const; - bool isDone() const; - void advance(); - void reset(); + // Provide additional cleanup logic for reset + virtual void doReset(); - protected: - std::weak_ptr mParent; - std::string mUniqueName; - size_t mMaxRetries{RETRY_A_FEW}; - size_t mRetries{0}; - State mState{WORK_PENDING}; - bool mScheduled{false}; + private: + std::list> mChildren; + std::list>::const_iterator mNextChild; - std::unique_ptr mRetryTimer; + std::shared_ptr yieldNextRunningChild(); + void addChild(std::shared_ptr child); + void clearChildren(); - std::function callComplete(); - void run(); - void complete(CompleteResult result); - void scheduleComplete(CompleteResult result = WORK_COMPLETE_OK); - void scheduleRetry(); - void scheduleRun(); - void - scheduleSuccess() - { - scheduleComplete(); - } - void - scheduleFailure() - { - scheduleComplete(WORK_COMPLETE_FAILURE); - } - void - scheduleFatalFailure() - { - scheduleComplete(WORK_COMPLETE_FATAL); - } + friend class WorkSequence; +}; - void setState(State s); +/* + * WorkSequence is a helper class, that implementers can use if they + * wish to enforce the order of work execution. It also allows parent works + * to construct more complex work trees by exposing public `addToSequence` + * method. + */ +class WorkSequence : public Work +{ + std::vector> mSequenceOfWork; + std::vector>::const_iterator mNextInSequence; - void notifyParent(); - virtual void notify(std::string const& childChanged) override; +public: + WorkSequence(Application& app, std::string name, + std::vector> sequence); + ~WorkSequence() = default; - private: - VirtualClock::duration getRetryDelay() const; +protected: + State doWork() final; + void doReset() final; }; -} +} \ No newline at end of file diff --git a/src/work/WorkScheduler.cpp b/src/work/WorkScheduler.cpp new file mode 100644 index 0000000000..97541fd4e0 --- /dev/null +++ b/src/work/WorkScheduler.cpp @@ -0,0 +1,64 @@ +// Copyright 2018 Stellar Development Foundation and contributors. Licensed +// under the Apache License, Version 2.0. See the COPYING file at the root +// of this distribution or at http://www.apache.org/licenses/LICENSE-2.0 + +#include "work/WorkScheduler.h" +#include "lib/util/format.h" +#include "util/Logging.h" + +namespace stellar +{ +WorkScheduler::WorkScheduler(Application& app) + : Work(app, "work-scheduler", BasicWork::RETRY_NEVER) +{ +} + +WorkScheduler::~WorkScheduler() +{ +} + +std::shared_ptr +WorkScheduler::create(Application& app, std::function wakeUpCallback) +{ + auto work = std::shared_ptr(new WorkScheduler(app)); + work->restartWork(wakeUpCallback); + work->crankWork(); + return work; +}; + +void +WorkScheduler::onWakeUp() +{ + if (mScheduled) + { + return; + } + + std::weak_ptr weak( + std::static_pointer_cast(shared_from_this())); + mScheduled = true; + mApp.getClock().getIOService().post([weak]() { + auto self = weak.lock(); + if (!self) + { + return; + } + self->mScheduled = false; + self->crankWork(); + if (self->getState() == WORK_RUNNING) + { + self->onWakeUp(); + } + }); +} + +BasicWork::State +WorkScheduler::doWork() +{ + if (anyChildRunning()) + { + return BasicWork::WORK_RUNNING; + } + return BasicWork::WORK_WAITING; +} +} \ No newline at end of file diff --git a/src/work/WorkScheduler.h b/src/work/WorkScheduler.h new file mode 100644 index 0000000000..86744fd6eb --- /dev/null +++ b/src/work/WorkScheduler.h @@ -0,0 +1,51 @@ +// Copyright 2018 Stellar Development Foundation and contributors. Licensed +// under the Apache License, Version 2.0. See the COPYING file at the root +// of this distribution or at http://www.apache.org/licenses/LICENSE-2.0 +#pragma once + +#include "main/Application.h" +#include "work/Work.h" + +namespace stellar +{ + +/** + * WorkScheduler is a top level Work, that is in charge of scheduling + * cranks (posting to the IO service); this is done via custom + * implementation of `onWakeUp`, which schedules a crank if necessary. + */ +class WorkScheduler : public Work +{ + WorkScheduler(Application& app); + bool mScheduled{false}; + + public: + virtual ~WorkScheduler(); + static std::shared_ptr create(Application& app, std::function wakeUpCallback = nullptr); + + template + std::shared_ptr + executeWork(Args&&... args) + { + auto work = addWork(std::forward(args)...); + auto& clock = mApp.getClock(); + while (!clock.getIOService().stopped() && !allChildrenDone()) + { + clock.crank(true); + } + return work; + } + + // TODO this probably needs a better name + template + std::shared_ptr + addWorkTree(Args&&... args) + { + return addWork(std::forward(args)...); + } + + protected: + void onWakeUp() override; + State doWork() override; +}; +} \ No newline at end of file diff --git a/src/work/WorkTests.cpp b/src/work/WorkTests.cpp index 1896d0ce2d..beb603b5ec 100644 --- a/src/work/WorkTests.cpp +++ b/src/work/WorkTests.cpp @@ -10,7 +10,9 @@ #include "test/test.h" #include "util/Fs.h" #include "work/WorkManager.h" +#include "work/WorkScheduler.h" +#include "historywork/RunCommandWork.h" #include #include #include @@ -52,6 +54,329 @@ TEST_CASE("work manager", "[work]") } } +class TestWork : public BasicWork +{ + bool mRetry; + + public: + int mSteps{3}; + int mRunningCount{0}; + int mSuccessCount{0}; + int mFailureCount{0}; + int mRetryCount{0}; + + TestWork(Application& app, std::function callback, std::string name, + bool retry = false) + : BasicWork(app, callback, name, RETRY_ONCE), mRetry(retry) + { + } + + BasicWork::State + onRun() override + { + if (mSteps-- > 0) + { + mRunningCount++; + return BasicWork::WORK_RUNNING; + } + return mRetry ? BasicWork::WORK_FAILURE_RETRY : BasicWork::WORK_SUCCESS; + } + + void + onSuccess() override + { + mSuccessCount++; + } + + void + onFailureRaise() override + { + mFailureCount++; + } + + void + onFailureRetry() override + { + mRetryCount++; + } + + void + onReset() override + { + mSteps = 3; + } +}; + +class TestWorkWaitForChildren : public Work +{ + bool mWorkAdded{false}; + + public: + TestWorkWaitForChildren(Application& app, std::function callback, + std::string name) + : Work(app, callback, name, RETRY_ONCE) + { + } + + BasicWork::State + doWork() override + { + if (allChildrenSuccessful()) + { + return BasicWork::WORK_SUCCESS; + } + + if (!mWorkAdded) + { + addWork("test-work"); + mWorkAdded = true; + } + return BasicWork::WORK_RUNNING; + } +}; + +TEST_CASE("Work scheduling", "[work]") +{ + VirtualClock clock; + Config const& cfg = getTestConfig(); + Application::pointer appPtr = createTestApplication(clock, cfg); + auto& wm = appPtr->getWorkScheduler(); + + auto checkSuccess = [](TestWork const& w) { + REQUIRE(w.mRunningCount == w.mSteps); + REQUIRE(w.mSuccessCount == 1); + REQUIRE(w.mFailureCount == 0); + REQUIRE(w.mRetryCount == 0); + }; + + SECTION("basic one work") + { + auto w = wm.addWorkTree("test-work"); + while (!wm.allChildrenSuccessful()) + { + clock.crank(); + } + checkSuccess(*w); + } + + SECTION("2 works round robin") + { + auto w1 = wm.addWorkTree("test-work1"); + auto w2 = wm.addWorkTree("test-work2"); + while (!wm.allChildrenSuccessful()) + { + clock.crank(); + } + checkSuccess(*w1); + checkSuccess(*w2); + } + + SECTION("another work added midway") + { + int crankCount = 0; + auto w1 = wm.addWorkTree("test-work1"); + auto w2 = wm.addWorkTree("test-work2"); + while (!wm.allChildrenSuccessful()) + { + if (++crankCount == 2) + { + wm.addWorkTree("test-work3"); + }; + clock.crank(); + } + checkSuccess(*w1); + checkSuccess(*w2); + } + + SECTION("new work wakes up scheduler") + { + // Scheduler wakes up to execute work, + // and goes back into a waiting state once child is done + while (wm.getState() != BasicWork::WORK_WAITING) + { + clock.crank(); + } + + auto w = wm.addWorkTree("test-work"); + REQUIRE(wm.getState() == BasicWork::WORK_RUNNING); + while (!wm.allChildrenSuccessful()) + { + clock.crank(); + } + checkSuccess(*w); + } + + SECTION("work retries and fails") + { + auto w = wm.addWorkTree("test-work", true); + while (!wm.allChildrenDone()) + { + clock.crank(); + } + REQUIRE(w->getState() == BasicWork::WORK_FAILURE_RAISE); + + // Work of 3 steps retried once + REQUIRE(w->mRunningCount == w->mSteps * 2); + REQUIRE(w->mSuccessCount == 0); + REQUIRE(w->mFailureCount == 1); + REQUIRE(w->mRetryCount == 1); + } +} + +TEST_CASE("work with children scheduling", "[work]") +{ + VirtualClock clock; + Config const& cfg = getTestConfig(); + Application::pointer appPtr = createTestApplication(clock, cfg); + auto& wm = appPtr->getWorkScheduler(); + + auto w1 = wm.addWorkTree("test-work1"); + auto w2 = wm.addWorkTree("test-work2"); + while (!wm.allChildrenSuccessful()) + { + clock.crank(); + } +} + +class TestRunCommandWork : public RunCommandWork +{ + std::string mCommand; + + public: + TestRunCommandWork(Application& app, std::function callback, + std::string name, std::string command) + : RunCommandWork(app, callback, name), mCommand(command) + { + } + ~TestRunCommandWork() override = default; + + RunCommandInfo + getCommand() override + { + return RunCommandInfo(mCommand, std::string()); + } +}; + +TEST_CASE("RunCommandWork test", "[work]") +{ + VirtualClock clock; + Config const& cfg = getTestConfig(); + Application::pointer appPtr = createTestApplication(clock, cfg); + auto& wm = appPtr->getWorkScheduler(); + + SECTION("one run command work") + { + wm.addWorkTree("test-run-command", "date"); + while (!wm.allChildrenSuccessful()) + { + clock.crank(); + } + } + SECTION("round robin with other work") + { + wm.addWorkTree("test-run-command", "date"); + wm.addWorkTree("test-work"); + while (!wm.allChildrenSuccessful()) + { + clock.crank(); + } + } + SECTION("invalid run command") + { + auto w = + wm.addWorkTree("test-run-command", "_invalid_"); + while (!wm.allChildrenDone()) + { + clock.crank(); + } + REQUIRE(w->getState() == BasicWork::WORK_FAILURE_RAISE); + } +} + +TEST_CASE("WorkSequence test", "[work]") +{ + VirtualClock clock; + Config const& cfg = getTestConfig(); + Application::pointer appPtr = createTestApplication(clock, cfg); + auto& wm = appPtr->getWorkScheduler(); + + auto work = wm.addWorkTree("test-work-sequence"); + int cranks = 0; + auto w1 = work->addToSequence("test-work-1"); + auto w2 = work->addToSequence("test-work-2"); + auto w3 = work->addToSequence("test-work-3"); + + SECTION("basic") + { + while (!wm.allChildrenSuccessful()) + { + if (!w1->mSuccessCount) + { + REQUIRE(!w2->mRunningCount); + REQUIRE(!w3->mRunningCount); + } + else if (!w2->mSuccessCount) + { + REQUIRE(w1->mSuccessCount); + REQUIRE(!w3->mRunningCount); + } + else + { + REQUIRE(w1->mSuccessCount); + REQUIRE(w2->mSuccessCount); + } + clock.crank(); + } + } + SECTION("new work added midway") + { + while (!wm.allChildrenSuccessful()) + { + if (++cranks == 2) + { + auto w4 = work->addToSequence("test-work-3"); + } + // TODO refactor duplication + if (!w1->mSuccessCount) + { + REQUIRE(!w2->mRunningCount); + REQUIRE(!w3->mRunningCount); + } + else if (!w2->mSuccessCount) + { + REQUIRE(w1->mSuccessCount); + REQUIRE(!w3->mRunningCount); + } + else + { + REQUIRE(w1->mSuccessCount); + REQUIRE(w2->mSuccessCount); + } + clock.crank(); + } + } +} + +TEST_CASE("WorkSequence work in sequence fails", "[work]") +{ + VirtualClock clock; + Config const& cfg = getTestConfig(); + Application::pointer appPtr = createTestApplication(clock, cfg); + auto& wm = appPtr->getWorkScheduler(); + + auto work = wm.addWorkTree("test-work-sequence"); + auto w1 = work->addToSequence("test-work-1", true); + auto w2 = work->addToSequence("test-work-2"); + auto w3 = work->addToSequence("test-work-3"); + + while (!wm.allChildrenDone()) + { + clock.crank(); + } + REQUIRE_FALSE(work->hasChildren()); + REQUIRE(work->getState() == BasicWork::WORK_FAILURE_RAISE); +} + class CountDownWork : public Work { size_t mCount;