Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

New work interface #1801

Closed
wants to merge 17 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 32 additions & 59 deletions src/catchup/DownloadBucketsWork.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,82 +12,55 @@

namespace stellar
{

// Constructs the work item named "download-and-verify-buckets".
//   app         - application handle; also supplies the metrics registry used
//                 for the three meters below
//   buckets     - bucket map, received by reference and used to initialise
//                 mBuckets
//   hashes      - list of hashes, received by value and moved into mHashes
//   downloadDir - temporary directory handle kept in mDownloadDir
// NOTE(review): two parameter-list openings (with and without
// `WorkParent& parent`) and two `Work{...}` base initialisers appear back to
// back in this span. It looks like both sides of a diff rendered without +/-
// markers -- confirm which variant is current before editing.
DownloadBucketsWork::DownloadBucketsWork(
Application& app, WorkParent& parent,
Application& app,
std::map<std::string, std::shared_ptr<Bucket>>& buckets,
std::vector<std::string> hashes, TmpDir const& downloadDir)
: Work{app, parent, "download-and-verify-buckets"}
: Work{app, "download-and-verify-buckets"}
, mBuckets{buckets}
, mHashes{std::move(hashes)}
, mDownloadDir{downloadDir}
// Meters registered under {"history", "download-bucket", <start|success|failure>}.
, mDownloadBucketStart{app.getMetrics().NewMeter(
{"history", "download-bucket", "start"}, "event")}
, mDownloadBucketSuccess{app.getMetrics().NewMeter(
{"history", "download-bucket", "success"}, "event")}
, mDownloadBucketFailure{app.getMetrics().NewMeter(
{"history", "download-bucket", "failure"}, "event")}
{
}

// NOTE(review): this span mixes lines from several member functions
// (~DownloadBucketsWork, doWork, getStatus, onReset). The braces do not
// balance and review-UI text is embedded near the end, so it reads as both
// sides of a diff shown without +/- markers. The comments below describe only
// what each visible statement does -- confirm against the real file.
DownloadBucketsWork::~DownloadBucketsWork()
BasicWork::State
DownloadBucketsWork::doWork()
{
clearChildren();
}

std::string
DownloadBucketsWork::getStatus() const
{
if (mState == WORK_RUNNING || mState == WORK_PENDING)
// mChildrenStarted gates the one-time creation of child work below.
if (!mChildrenStarted)
{
return "downloading buckets";
}
return Work::getStatus();
}

void
DownloadBucketsWork::onReset()
{
clearChildren();
// One FileTransferInfo per hash, typed HISTORY_FILE_TYPE_BUCKET and rooted in
// mDownloadDir.
for (auto const& hash : mHashes)
{
FileTransferInfo ft(mDownloadDir, HISTORY_FILE_TYPE_BUCKET, hash);

for (auto const& hash : mHashes)
// w1 (get and unzip) and w2 (verify) are placed, in that order, into a
// WorkSequence whose name is suffixed with the hash.
auto w1 = std::make_shared<GetAndUnzipRemoteFileWork>(mApp, ft);
auto w2 = std::make_shared<VerifyBucketWork>(
mApp, mBuckets, ft.localPath_nogz(), hexToBin256(hash));
std::vector<std::shared_ptr<BasicWork>> seq{w1, w2};
addWork<WorkSequence>("download-verify-sequence-" + hash, seq);
}
mChildrenStarted = true;
}
else
{
FileTransferInfo ft(mDownloadDir, HISTORY_FILE_TYPE_BUCKET, hash);
// Each bucket gets its own work-chain of
// download->gunzip->verify

auto verify = addWork<VerifyBucketWork>(mBuckets, ft.localPath_nogz(),
hexToBin256(hash));
verify->addWork<GetAndUnzipRemoteFileWork>(ft);
mDownloadBucketStart.Mark();
// Child-state aggregation: all successful -> WORK_SUCCESS; any raised
// failure -> WORK_FAILURE_RETRY; no child running -> WORK_WAITING; otherwise
// control reaches the final `return WORK_RUNNING`.
if (allChildrenSuccessful())
{
return WORK_SUCCESS;
}
else if (anyChildRaiseFailure())
{
return WORK_FAILURE_RETRY;
}
else if (!anyChildRunning())
{
return WORK_WAITING;
marta-lokhova marked this conversation as resolved.
Show resolved Hide resolved
}
}
return WORK_RUNNING;
}

// NOTE(review): two signatures (`notify(std::string const& child)` and
// `doReset()`) share a single body here. The body appears to combine a
// child-notification handler with a lone `mChildrenStarted = false;` reset
// statement -- confirm which lines belong to which function.
void
DownloadBucketsWork::notify(std::string const& child)
DownloadBucketsWork::doReset()
{
// Look the notifying child up by name; an unknown name is logged and ignored.
auto i = mChildren.find(child);
if (i == mChildren.end())
{
CLOG(WARNING, "Work")
<< "DownloadBucketsWork notified by unknown child " << child;
return;
}

// Mark the success meter on WORK_SUCCESS and the failure meter on any of the
// three WORK_FAILURE_* states; other states mark nothing.
switch (i->second->getState())
{
case Work::WORK_SUCCESS:
mDownloadBucketSuccess.Mark();
break;
case Work::WORK_FAILURE_RETRY:
case Work::WORK_FAILURE_FATAL:
case Work::WORK_FAILURE_RAISE:
mDownloadBucketFailure.Mark();
break;
default:
break;
}

advance();
// Clears the flag that doWork() checks before creating child work.
mChildrenStarted = false;
}
}
16 changes: 7 additions & 9 deletions src/catchup/DownloadBucketsWork.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,17 @@ class DownloadBucketsWork : public Work
std::map<std::string, std::shared_ptr<Bucket>> mBuckets;
std::vector<std::string> mHashes;
TmpDir const& mDownloadDir;

medida::Meter& mDownloadBucketStart;
medida::Meter& mDownloadBucketSuccess;
medida::Meter& mDownloadBucketFailure;
bool mChildrenStarted{false};

public:
DownloadBucketsWork(Application& app, WorkParent& parent,
DownloadBucketsWork(Application& app,
std::map<std::string, std::shared_ptr<Bucket>>& buckets,
std::vector<std::string> hashes,
TmpDir const& downloadDir);
~DownloadBucketsWork();
std::string getStatus() const override;
void onReset() override;
void notify(std::string const& child) override;
~DownloadBucketsWork() = default;

protected:
BasicWork::State doWork() override;
void doReset() override;
};
}
4 changes: 4 additions & 0 deletions src/history/HistoryTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@
#include "test/test.h"
#include "util/Fs.h"
#include "work/WorkManager.h"
#include "work/WorkScheduler.h"

#include "catchup/DownloadBucketsWork.h"
#include "historywork/GetHistoryArchiveStateWork.h"
#include "historywork/VerifyBucketWork.h"
#include <lib/catch.hpp>
#include <lib/util/format.h>

Expand Down
155 changes: 98 additions & 57 deletions src/historywork/GetAndUnzipRemoteFileWork.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,27 +10,21 @@

namespace stellar
{

// Constructs a work item named "get-and-unzip-remote-file <remote name>".
//   app        - application handle forwarded to the Work base
//   ft         - file transfer descriptor, received by value and moved into mFt
//   archive    - shared pointer stored in mArchive
//   maxRetries - forwarded to the Work base
// The name is built from ft.remoteName() in the base-class initialiser, which
// runs before the member initialiser that moves `ft` into mFt.
// NOTE(review): two parameter openings (`WorkParent& parent` vs
// `std::function<void()> callback`) and two `Work(...)` initialiser openings
// appear back to back. It looks like both sides of a diff without +/- markers
// -- confirm which variant is current.
GetAndUnzipRemoteFileWork::GetAndUnzipRemoteFileWork(
Application& app, WorkParent& parent, FileTransferInfo ft,
Application& app, std::function<void()> callback, FileTransferInfo ft,
std::shared_ptr<HistoryArchive> archive, size_t maxRetries)
: Work(app, parent,
: Work(app, callback,
std::string("get-and-unzip-remote-file ") + ft.remoteName(),
maxRetries)
, mFt(std::move(ft))
, mArchive(archive)
{
}

// Destructor: its only action is a call to clearChildren().
GetAndUnzipRemoteFileWork::~GetAndUnzipRemoteFileWork()
{
clearChildren();
}

std::string
GetAndUnzipRemoteFileWork::getStatus() const
{
if (mState == WORK_PENDING)
if (!isDone())
{
if (mGunzipFileWork)
{
Expand All @@ -41,86 +35,133 @@ GetAndUnzipRemoteFileWork::getStatus() const
return mGetRemoteFileWork->getStatus();
}
}
return Work::getStatus();
return BasicWork::getStatus();
}

// Reset hook: clears child work and releases both child handles
// (mGetRemoteFileWork, mGunzipFileWork).
// NOTE(review): two signatures (`onReset()` and `doReset()`) precede one body,
// and the statements after the closing brace sit outside any function. This
// looks like both sides of a diff without +/- markers -- confirm which lines
// are current.
void
GetAndUnzipRemoteFileWork::onReset()
GetAndUnzipRemoteFileWork::doReset()
{
clearChildren();
mGetRemoteFileWork.reset();
mGunzipFileWork.reset();
}

// Logs the start of the download, then adds a GetRemoteFileWork child that
// fetches mFt.remoteName() into mFt.localPath_gz_tmp() with RETRY_NEVER.
CLOG(DEBUG, "History") << "Downloading and unzipping " << mFt.remoteName()
<< ": downloading";
mGetRemoteFileWork = addWork<GetRemoteFileWork>(
mFt.remoteName(), mFt.localPath_gz_tmp(), nullptr, RETRY_NEVER);
void
GetAndUnzipRemoteFileWork::onFailureRaise()
{
std::remove(mFt.localPath_nogz().c_str());
std::remove(mFt.localPath_gz().c_str());
std::remove(mFt.localPath_gz_tmp().c_str());
Work::onFailureRaise();
}

Work::State
GetAndUnzipRemoteFileWork::onSuccess()
void
GetAndUnzipRemoteFileWork::onFailureRetry()
{
std::remove(mFt.localPath_nogz().c_str());
std::remove(mFt.localPath_gz().c_str());
std::remove(mFt.localPath_gz_tmp().c_str());
Work::onFailureRetry();
}

// Drives the download-then-unzip steps, branching on which child handle is set:
//   - mGunzipFileWork set:    report based on the gunzip child's state
//   - mGetRemoteFileWork set: report based on the download child's state and,
//                             once it succeeds, validate the file and add the
//                             gunzip child
//   - neither set:            add the download child
// In both child branches a child state of WORK_FAILURE_RAISE is translated to
// WORK_FAILURE_RETRY; any other non-success state is returned unchanged.
// NOTE(review): this span also contains lines that do not fit that structure
// (dangling `if` headers, a bare `else`, duplicate log/return statements). It
// looks like both sides of a diff without +/- markers -- confirm against the
// real file before changing logic.
BasicWork::State
GetAndUnzipRemoteFileWork::doWork()
{
CLOG(DEBUG, "History") << "Downloading and unzipping " << mFt.remoteName();

if (mGunzipFileWork)
{
if (!fs::exists(mFt.localPath_nogz()))
// Download completed, unzipping started

auto state = mGunzipFileWork->getState();
if (state == WORK_SUCCESS)
{
CLOG(ERROR, "History") << "Downloading and unzipping "
<< mFt.remoteName() << ": .xdr not found";
return WORK_FAILURE_RETRY;
// Gunzip succeeded, so the download child must exist and have succeeded too.
assert(mGetRemoteFileWork);
assert(mGetRemoteFileWork->getState() == WORK_SUCCESS);
// Success is only reported if the unzipped file is actually on disk.
if (!fs::exists(mFt.localPath_nogz()))
{
CLOG(ERROR, "History")
<< "Downloading and unzipping " << mFt.remoteName()
<< ": .xdr not found";
return WORK_FAILURE_RETRY;
}
return WORK_SUCCESS;
}
else
else if (state == WORK_FAILURE_RAISE)
{
return WORK_SUCCESS;
return WORK_FAILURE_RETRY;
}
return state;
}

if (fs::exists(mFt.localPath_gz_tmp()))
else if (mGetRemoteFileWork)
{
CLOG(TRACE, "History")
<< "Downloading and unzipping " << mFt.remoteName()
<< ": renaming .gz.tmp to .gz";
if (fs::exists(mFt.localPath_gz()) &&
std::remove(mFt.localPath_gz().c_str()))
// Download started

auto state = mGetRemoteFileWork->getState();
if (state == WORK_SUCCESS)
{
CLOG(ERROR, "History")
<< "Downloading and unzipping " << mFt.remoteName()
<< ": failed to remove .gz";
return WORK_FAILURE_RETRY;
}
// validateFile() returning false aborts this attempt with a retry.
if (!validateFile())
{
return WORK_FAILURE_RETRY;
}

if (std::rename(mFt.localPath_gz_tmp().c_str(),
mFt.localPath_gz().c_str()))
// Adds the gunzip child for the .gz path with RETRY_NEVER.
mGunzipFileWork =
addWork<GunzipFileWork>(mFt.localPath_gz(), false, RETRY_NEVER);
return WORK_RUNNING;
}
else if (state == WORK_FAILURE_RAISE)
{
CLOG(ERROR, "History")
<< "Downloading and unzipping " << mFt.remoteName()
<< ": failed to rename .gz.tmp to .gz";
return WORK_FAILURE_RETRY;
}
return state;
}
else
{
// Download hasn't started

// Adds the download child, writing to the .gz.tmp path with RETRY_NEVER.
mGetRemoteFileWork = addWork<GetRemoteFileWork>(
mFt.remoteName(), mFt.localPath_gz_tmp(), nullptr, RETRY_NEVER);
return WORK_RUNNING;
}
}

CLOG(TRACE, "History")
// Moves the downloaded .gz.tmp file to its .gz path and checks the result.
// Logs an error and returns false when:
//   - the .gz.tmp file does not exist,
//   - a pre-existing .gz file cannot be removed,
//   - the rename from .gz.tmp to .gz fails, or
//   - the .gz file does not exist after the rename.
// std::remove and std::rename return non-zero on failure, which is why their
// results are used directly as error conditions.
// NOTE(review): this span also carries statements that do not belong to a
// bool-returning function (`return WORK_FAILURE_RETRY;`,
// `return WORK_PENDING;`, a duplicated log-message continuation, and a second
// function header before the final `return true;`). It looks like both sides
// of a diff without +/- markers -- confirm against the real file.
bool
GetAndUnzipRemoteFileWork::validateFile()
{
if (!fs::exists(mFt.localPath_gz_tmp()))
{
CLOG(ERROR, "History") << "Downloading and unzipping "
<< mFt.remoteName() << ": .tmp file not found";
return false;
}

CLOG(TRACE, "History") << "Downloading and unzipping " << mFt.remoteName()
<< ": renaming .gz.tmp to .gz";
// Remove any existing .gz file first; a non-zero result from std::remove is
// treated as an error.
if (fs::exists(mFt.localPath_gz()) &&
std::remove(mFt.localPath_gz().c_str()))
{
CLOG(ERROR, "History") << "Downloading and unzipping "
<< mFt.remoteName() << ": failed to remove .gz";
return false;
}

if (std::rename(mFt.localPath_gz_tmp().c_str(), mFt.localPath_gz().c_str()))
{
CLOG(ERROR, "History")
<< "Downloading and unzipping " << mFt.remoteName()
<< ": renamed .gz.tmp to .gz";
<< ": failed to rename .gz.tmp to .gz";
return false;
}

CLOG(TRACE, "History") << "Downloading and unzipping " << mFt.remoteName()
<< ": renamed .gz.tmp to .gz";

// Final check that the renamed file is actually present.
if (!fs::exists(mFt.localPath_gz()))
{
CLOG(ERROR, "History") << "Downloading and unzipping "
<< mFt.remoteName() << ": .gz not found";
return WORK_FAILURE_RETRY;
return false;
}

CLOG(DEBUG, "History") << "Downloading and unzipping " << mFt.remoteName()
<< ": unzipping";
mGunzipFileWork =
addWork<GunzipFileWork>(mFt.localPath_gz(), false, RETRY_NEVER);
return WORK_PENDING;
}

void
GetAndUnzipRemoteFileWork::onFailureRaise()
{
std::remove(mFt.localPath_nogz().c_str());
std::remove(mFt.localPath_gz().c_str());
std::remove(mFt.localPath_gz_tmp().c_str());
return true;
}
}
Loading