Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New work interface #1801

Closed
wants to merge 17 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 26 additions & 59 deletions src/catchup/DownloadBucketsWork.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,82 +12,49 @@

namespace stellar
{

DownloadBucketsWork::DownloadBucketsWork(
Application& app, WorkParent& parent,
Application& app, std::function<void()> callback,
std::map<std::string, std::shared_ptr<Bucket>>& buckets,
std::vector<std::string> hashes, TmpDir const& downloadDir)
: Work{app, parent, "download-and-verify-buckets"}
: Work{app, callback, "download-and-verify-buckets"}
, mBuckets{buckets}
, mHashes{std::move(hashes)}
, mDownloadDir{downloadDir}
, mDownloadBucketStart{app.getMetrics().NewMeter(
{"history", "download-bucket", "start"}, "event")}
, mDownloadBucketSuccess{app.getMetrics().NewMeter(
{"history", "download-bucket", "success"}, "event")}
, mDownloadBucketFailure{app.getMetrics().NewMeter(
{"history", "download-bucket", "failure"}, "event")}
{
}

DownloadBucketsWork::~DownloadBucketsWork()
BasicWork::State
DownloadBucketsWork::doWork()
{
clearChildren();
}

std::string
DownloadBucketsWork::getStatus() const
{
if (mState == WORK_RUNNING || mState == WORK_PENDING)
if (!mChildrenStarted)
{
return "downloading buckets";
for (auto const& hash : mHashes)
{
FileTransferInfo ft(mDownloadDir, HISTORY_FILE_TYPE_BUCKET, hash);
auto verify = addWork<VerifyBucketWork>(
mBuckets, ft.localPath_nogz(), hexToBin256(hash));
verify->addWork<GetAndUnzipRemoteFileWork>(ft);
}
mChildrenStarted = true;
}
return Work::getStatus();
}

void
DownloadBucketsWork::onReset()
{
clearChildren();

for (auto const& hash : mHashes)
else
{
FileTransferInfo ft(mDownloadDir, HISTORY_FILE_TYPE_BUCKET, hash);
// Each bucket gets its own work-chain of
// download->gunzip->verify

auto verify = addWork<VerifyBucketWork>(mBuckets, ft.localPath_nogz(),
hexToBin256(hash));
verify->addWork<GetAndUnzipRemoteFileWork>(ft);
mDownloadBucketStart.Mark();
if (allChildrenSuccessful())
{
return WORK_SUCCESS;
}
if (anyChildRaiseFailure() || anyChildFatalFailure())
{
return WORK_FAILURE_RETRY;
}
}
return WORK_RUNNING;
}

void
DownloadBucketsWork::notify(std::string const& child)
DownloadBucketsWork::onReset()
{
auto i = mChildren.find(child);
if (i == mChildren.end())
{
CLOG(WARNING, "Work")
<< "DownloadBucketsWork notified by unknown child " << child;
return;
}

switch (i->second->getState())
{
case Work::WORK_SUCCESS:
mDownloadBucketSuccess.Mark();
break;
case Work::WORK_FAILURE_RETRY:
case Work::WORK_FAILURE_FATAL:
case Work::WORK_FAILURE_RAISE:
mDownloadBucketFailure.Mark();
break;
default:
break;
}

advance();
mChildrenStarted = false;
Work::onReset();
}
}
14 changes: 6 additions & 8 deletions src/catchup/DownloadBucketsWork.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,17 @@ class DownloadBucketsWork : public Work
std::map<std::string, std::shared_ptr<Bucket>> mBuckets;
std::vector<std::string> mHashes;
TmpDir const& mDownloadDir;

medida::Meter& mDownloadBucketStart;
medida::Meter& mDownloadBucketSuccess;
medida::Meter& mDownloadBucketFailure;
bool mChildrenStarted{false};

public:
DownloadBucketsWork(Application& app, WorkParent& parent,
DownloadBucketsWork(Application& app, std::function<void()> callback,
std::map<std::string, std::shared_ptr<Bucket>>& buckets,
std::vector<std::string> hashes,
TmpDir const& downloadDir);
~DownloadBucketsWork();
std::string getStatus() const override;
~DownloadBucketsWork() = default;

protected:
BasicWork::State doWork() override;
void onReset() override;
void notify(std::string const& child) override;
};
}
99 changes: 55 additions & 44 deletions src/historywork/VerifyBucketWork.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,37 +16,77 @@

namespace stellar
{

VerifyBucketWork::VerifyBucketWork(
Application& app, WorkParent& parent,
Application& app, std::function<void()> callback,
std::map<std::string, std::shared_ptr<Bucket>>& buckets,
std::string const& bucketFile, uint256 const& hash)
: Work(app, parent, std::string("verify-bucket-hash ") + bucketFile,
RETRY_NEVER)
: Work(app, callback, "verify-bucket-hash-" + bucketFile, RETRY_NEVER)
, mBuckets(buckets)
, mBucketFile(bucketFile)
, mHash(hash)
, mVerifyBucketSuccess{app.getMetrics().NewMeter(
{"history", "verify-bucket", "success"}, "event")}
, mVerifyBucketFailure{app.getMetrics().NewMeter(
{"history", "verify-bucket", "failure"}, "event")}
{
fs::checkNoGzipSuffix(mBucketFile);
}

VerifyBucketWork::~VerifyBucketWork()
{
clearChildren();
}

BasicWork::State
VerifyBucketWork::doWork()
{
// FIXME (mlo) this a logical problem, as this check shouldn't be here
// VerifyBucketWork shouldn't be required to know that it depends on
// GetAndUnzipRemoteFileWork. This will be resolved with batching though,
// as DownloadBuckets work will take care of when to dispatch download
// and verify tasks.
marta-lokhova marked this conversation as resolved.
Show resolved Hide resolved
if (!allChildrenSuccessful())
{
return BasicWork::WORK_RUNNING;
}

if (mDone)
{
if (mEc)
{
return WORK_FAILURE_RETRY;
}

adoptBucket();
return WORK_SUCCESS;
}

spawnVerifier();
return WORK_WAITING;
}

void
VerifyBucketWork::adoptBucket()
{
assert(mDone);
assert(!mEc);

auto b = mApp.getBucketManager().adoptFileAsBucket(mBucketFile, mHash);
mBuckets[binToHex(mHash)] = b;
}

void
VerifyBucketWork::onStart()
VerifyBucketWork::spawnVerifier()
{
std::string filename = mBucketFile;
uint256 hash = mHash;
Application& app = this->mApp;
auto handler = callComplete();
app.getWorkerIOService().post([&app, filename, handler, hash]() {
std::weak_ptr<VerifyBucketWork> weak(
std::static_pointer_cast<VerifyBucketWork>(shared_from_this()));
auto handler = [weak](asio::error_code const& ec) {
auto self = weak.lock();
if (self)
{
self->mEc = ec;
self->mDone = true;
self->wakeUp();
}
};

mApp.getWorkerIOService().post([filename, handler, hash]() {
auto hasher = SHA256::create();
asio::error_code ec;
char buf[4096];
Expand Down Expand Up @@ -75,36 +115,7 @@ VerifyBucketWork::onStart()
ec = std::make_error_code(std::errc::io_error);
}
}
app.getClock().getIOService().post([ec, handler]() { handler(ec); });
handler(ec);
});
}

void
VerifyBucketWork::onRun()
{
// Do nothing: we spawned the verifier in onStart().
}

Work::State
VerifyBucketWork::onSuccess()
{
auto b = mApp.getBucketManager().adoptFileAsBucket(mBucketFile, mHash);
mBuckets[binToHex(mHash)] = b;
mVerifyBucketSuccess.Mark();
return WORK_SUCCESS;
}

void
VerifyBucketWork::onFailureRetry()
{
mVerifyBucketFailure.Mark();
Work::onFailureRetry();
}

void
VerifyBucketWork::onFailureRaise()
{
mVerifyBucketFailure.Mark();
Work::onFailureRaise();
}
}
16 changes: 8 additions & 8 deletions src/historywork/VerifyBucketWork.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,19 @@ class VerifyBucketWork : public Work
std::map<std::string, std::shared_ptr<Bucket>>& mBuckets;
std::string mBucketFile;
uint256 mHash;
bool mDone{false};
std::error_code mEc;

medida::Meter& mVerifyBucketSuccess;
medida::Meter& mVerifyBucketFailure;
void adoptBucket();
void spawnVerifier();

public:
VerifyBucketWork(Application& app, WorkParent& parent,
VerifyBucketWork(Application& app, std::function<void()> callback,
std::map<std::string, std::shared_ptr<Bucket>>& buckets,
std::string const& bucketFile, uint256 const& hash);
~VerifyBucketWork();
void onRun() override;
void onStart() override;
Work::State onSuccess() override;
void onFailureRetry() override;
void onFailureRaise() override;

protected:
BasicWork::State doWork() override;
};
}