diff --git a/src/historywork/RunCommandWork.cpp b/src/historywork/RunCommandWork.cpp index 9e9ee7591b..8043265ca7 100644 --- a/src/historywork/RunCommandWork.cpp +++ b/src/historywork/RunCommandWork.cpp @@ -28,7 +28,8 @@ RunCommandWork::onStart() if (!cmd.empty()) { auto exit = mApp.getProcessManager().runProcess(cmd, outfile); - exit.async_wait(callComplete()); + exit->async_wait(callComplete()); + mExitEvent = std::weak_ptr(exit); } else { @@ -41,4 +42,15 @@ RunCommandWork::onRun() { // Do nothing: we ran the command in onStart(). } + +void +RunCommandWork::onAbort(CompleteResult result) +{ + auto ptr = mExitEvent.lock(); + if (ptr) + { + mApp.getProcessManager().shutdownProcess(ptr); + } + Work::onAbort(result); +} } diff --git a/src/historywork/RunCommandWork.h b/src/historywork/RunCommandWork.h index e5017a5cba..166a4f6364 100644 --- a/src/historywork/RunCommandWork.h +++ b/src/historywork/RunCommandWork.h @@ -4,6 +4,7 @@ // under the Apache License, Version 2.0. See the COPYING file at the root // of this distribution or at http://www.apache.org/licenses/LICENSE-2.0 +#include "process/ProcessManager.h" #include "work/Work.h" namespace stellar @@ -18,6 +19,7 @@ namespace stellar class RunCommandWork : public Work { virtual void getCommand(std::string& cmdLine, std::string& outFile) = 0; + std::weak_ptr mExitEvent; public: RunCommandWork(Application& app, WorkParent& parent, @@ -26,5 +28,6 @@ class RunCommandWork : public Work ~RunCommandWork(); void onStart() override; void onRun() override; + void onAbort(CompleteResult result) override; }; } diff --git a/src/process/ProcessManager.h b/src/process/ProcessManager.h index 9f58a9b61b..917299669f 100644 --- a/src/process/ProcessManager.h +++ b/src/process/ProcessManager.h @@ -49,11 +49,12 @@ class ProcessManager : public std::enable_shared_from_this, { public: static std::shared_ptr create(Application& app); - virtual ProcessExitEvent runProcess(std::string const& cmdLine, - std::string outputFile = "") = 0; + virtual std::shared_ptr + runProcess(std::string const& cmdLine, std::string outputFile = "") = 0; virtual size_t getNumRunningProcesses() = 0; virtual bool isShutdown() const = 0; virtual void shutdown() = 0; + virtual void shutdownProcess(std::shared_ptr pe) = 0; virtual ~ProcessManager() { } diff --git a/src/process/ProcessManagerImpl.cpp b/src/process/ProcessManagerImpl.cpp index 4138785b0a..27d7e1467d 100644 --- a/src/process/ProcessManagerImpl.cpp +++ b/src/process/ProcessManagerImpl.cpp @@ -107,16 +107,16 @@ ProcessManagerImpl::~ProcessManagerImpl() forceShutdown(impl); }; // Use SIGKILL on any processes we already used SIGINT on - while (!mKillableImpls.empty()) + while (!mKillable.empty()) { - auto impl = std::move(mKillableImpls.front()); - mKillableImpls.pop_front(); - killProcess(*impl); + auto pe = std::move(mKillable.front()); + mKillable.pop_front(); + killProcess(*pe->mImpl); } // Use SIGKILL on any processes we haven't politely asked to exit yet for (auto& pair : mImpls) { - killProcess(*pair.second); + killProcess(*pair.second->mImpl); } } @@ -136,20 +136,20 @@ ProcessManagerImpl::shutdown() // Cancel all pending. std::lock_guard guard(mImplsMutex); - for (auto& pending : mPendingImpls) + for (auto& pending : mPending) { - pending->cancel(ec); + pending->mImpl->cancel(ec); } - mPendingImpls.clear(); + mPending.clear(); // Cancel all running. for (auto& pair : mImpls) { // Mark it as "ready to be killed" - mKillableImpls.push_back(pair.second); + mKillable.push_back(pair.second); // Cancel any pending events and shut down the process cleanly - pair.second->cancel(ec); - cleanShutdown(*pair.second); + pair.second->mImpl->cancel(ec); + cleanShutdown(*pair.second->mImpl); } mImpls.clear(); gNumProcessesActive = 0; @@ -159,6 +159,41 @@ ProcessManagerImpl::shutdown() } } +void +ProcessManagerImpl::shutdownProcess(std::shared_ptr pev) +{ + // Shutdown a ProcessExitEvent, which could be either in pending queue + // or running + + auto ec = ABORT_ERROR_CODE; + auto impl = pev->mImpl; + + std::lock_guard guard(mImplsMutex); + auto pendingIt = find(mPending.begin(), mPending.end(), pev); + if (pendingIt != mPending.end()) + { + (*pendingIt)->mImpl->cancel(ec); + mPending.erase(pendingIt); + } + else + { + auto pid = impl->getProcessId(); + auto runningIt = mImpls.find(pid); + if (runningIt != mImpls.end()) + { + impl->cancel(ec); + forceShutdown(*runningIt->second->mImpl); + mImpls.erase(pid); + gNumProcessesActive--; + } + else + { + CLOG(DEBUG, "Process") + << "failed to find pending or running process"; + } + } +} + #ifdef _WIN32 #include #include @@ -388,7 +423,7 @@ ProcessManagerImpl::handleProcessTermination(int pid, int status) CLOG(DEBUG, "Process") << "failed to find process with pid " << pid; return; } - auto impl = pair->second; + auto impl = pair->second->mImpl; asio::error_code ec; if (WIFEXITED(status)) @@ -533,7 +568,7 @@ ProcessExitEvent::Impl::run() #endif -ProcessExitEvent +std::shared_ptr ProcessManagerImpl::runProcess(std::string const& cmdLine, std::string outFile) { std::lock_guard guard(mImplsMutex); @@ -543,10 +578,12 @@ ProcessManagerImpl::runProcess(std::string const& cmdLine, std::string outFile) std::weak_ptr weakSelf(self); pe.mImpl = std::make_shared( pe.mTimer, pe.mEc, cmdLine, outFile, weakSelf); - mPendingImpls.push_back(pe.mImpl); + + auto pePtr = std::make_shared(pe); + mPending.push_back(pePtr); maybeRunPendingProcesses(); - return pe; + return pePtr; } void @@ -557,23 +594,28 @@ ProcessManagerImpl::maybeRunPendingProcesses() return; } std::lock_guard guard(mImplsMutex); - while (!mPendingImpls.empty() && gNumProcessesActive < mMaxProcesses) + while (!mPending.empty() && gNumProcessesActive < mMaxProcesses) { - auto i = mPendingImpls.front(); - mPendingImpls.pop_front(); + auto i = std::move(mPending.front()); + mPending.pop_front(); + if (!i || !i->mImpl) + { + throw std::runtime_error( + "Cannot start process: invalid ProcessExitEvent"); + } try { - CLOG(DEBUG, "Process") << "Running: " << i->mCmdLine; + CLOG(DEBUG, "Process") << "Running: " << i->mImpl->mCmdLine; - i->run(); - mImpls[i->getProcessId()] = i; + i->mImpl->run(); + mImpls[i->mImpl->getProcessId()] = i; ++gNumProcessesActive; } catch (std::runtime_error& e) { - i->cancel(std::make_error_code(std::errc::io_error)); + i->mImpl->cancel(std::make_error_code(std::errc::io_error)); CLOG(ERROR, "Process") << "Error starting process: " << e.what(); - CLOG(ERROR, "Process") << "When running: " << i->mCmdLine; + CLOG(ERROR, "Process") << "When running: " << i->mImpl->mCmdLine; } } } diff --git a/src/process/ProcessManagerImpl.h b/src/process/ProcessManagerImpl.h index 6ecd0b9a89..c230cb4c39 100644 --- a/src/process/ProcessManagerImpl.h +++ b/src/process/ProcessManagerImpl.h @@ -27,14 +27,14 @@ class ProcessManagerImpl : public ProcessManager // Subprocesses will be removed asynchronously, hence the lock on // just this member std::recursive_mutex mImplsMutex; - std::map> mImpls; + std::map> mImpls; bool mIsShutdown{false}; size_t mMaxProcesses; asio::io_service& mIOService; - std::deque> mPendingImpls; - std::deque> mKillableImpls; + std::deque> mPending; + std::deque> mKillable; void maybeRunPendingProcesses(); // These are only used on POSIX, but they're harmless here. @@ -49,12 +49,13 @@ class ProcessManagerImpl : public ProcessManager public: ProcessManagerImpl(Application& app); - ProcessExitEvent runProcess(std::string const& cmdLine, - std::string outFile = "") override; + std::shared_ptr + runProcess(std::string const& cmdLine, std::string outFile = "") override; size_t getNumRunningProcesses() override; bool isShutdown() const override; void shutdown() override; + void shutdownProcess(std::shared_ptr pe) override; ~ProcessManagerImpl() override; }; diff --git a/src/process/ProcessTests.cpp b/src/process/ProcessTests.cpp index f2a0aa92d2..8e1b0aac7d 100644 --- a/src/process/ProcessTests.cpp +++ b/src/process/ProcessTests.cpp @@ -28,7 +28,7 @@ TEST_CASE("subprocess", "[process]") auto evt = app->getProcessManager().runProcess("hostname"); bool exited = false; bool failed = false; - evt.async_wait([&](asio::error_code ec) { + evt->async_wait([&](asio::error_code ec) { CLOG(DEBUG, "Process") << "process exited: " << ec; if (ec) { @@ -53,7 +53,7 @@ TEST_CASE("subprocess fails", "[process]") auto evt = app->getProcessManager().runProcess("hostname -xsomeinvalid"); bool exited = false; bool failed = false; - evt.async_wait([&](asio::error_code ec) { + evt->async_wait([&](asio::error_code ec) { CLOG(DEBUG, "Process") << "process exited: " << ec; if (ec) { @@ -79,7 +79,7 @@ TEST_CASE("subprocess redirect to file", "[process]") std::string filename("hostname.txt"); auto evt = app.getProcessManager().runProcess("hostname", filename); bool exited = false; - evt.async_wait([&](asio::error_code ec) { + evt->async_wait([&](asio::error_code ec) { CLOG(DEBUG, "Process") << "process exited: " << ec; if (ec) { @@ -127,7 +127,7 @@ TEST_CASE("subprocess storm", "[process]") out << i; } auto evt = app.getProcessManager().runProcess("mv " + src + " " + dst); - evt.async_wait([&](asio::error_code ec) { + evt->async_wait([&](asio::error_code ec) { CLOG(INFO, "Process") << "process exited: " << ec; if (ec) { @@ -171,14 +171,14 @@ TEST_CASE("shutdown while process running", "[process]") #else std::string command = "sleep 10"; #endif - std::vector events = { + std::vector> events = { app1->getProcessManager().runProcess(command), app2->getProcessManager().runProcess(command)}; std::vector errorCodes; size_t exitedCount = 0; for (auto& event : events) { - event.async_wait([&](asio::error_code ec) { + event->async_wait([&](asio::error_code ec) { CLOG(DEBUG, "Process") << "process exited: " << ec; if (ec) { @@ -192,17 +192,90 @@ TEST_CASE("shutdown while process running", "[process]") // Wait, just in case the processes haven't started yet std::this_thread::sleep_for(std::chrono::seconds(1)); - // Shutdown so we force the command execution to fail - app1->getProcessManager().shutdown(); - app2->getProcessManager().shutdown(); + SECTION("shutdown process manager") + { + // Shutdown so we force the command execution to fail + app1->getProcessManager().shutdown(); + app2->getProcessManager().shutdown(); - while (exitedCount < events.size() && !clock.getIOService().stopped()) + while (exitedCount < events.size() && !clock.getIOService().stopped()) + { + clock.crank(true); + } + REQUIRE(exitedCount == events.size()); + for (auto const& errorCode : errorCodes) + { + REQUIRE(errorCode == asio::error::operation_aborted); + } + } + SECTION("shutdown single process") { - clock.crank(true); + // Shutdown processes individually + app1->getProcessManager().shutdownProcess(events[0]); + while (exitedCount < 1 && !clock.getIOService().stopped()) + { + clock.crank(true); + } + REQUIRE(exitedCount == 1); + REQUIRE(errorCodes.size() == 1); + REQUIRE(errorCodes[0] == asio::error::operation_aborted); + + app2->getProcessManager().shutdownProcess(events[1]); + while (exitedCount < 2 && !clock.getIOService().stopped()) + { + clock.crank(true); + } + REQUIRE(exitedCount == 2); + REQUIRE(errorCodes.size() == 2); + REQUIRE(errorCodes[1] == asio::error::operation_aborted); + } +} + +TEST_CASE("remove pending ProcessExitEvent", "[process]") +{ + // Shutdown ProcessExitEvent while it is in pending + // queue, meaning process hasn't spawned yet + + VirtualClock clock; + auto const& cfg1 = getTestConfig(0); + auto app = createTestApplication(clock, cfg1); +#ifdef _WIN32 + std::string command = "waitfor /T 10 pause"; +#else + std::string command = "sleep 10"; +#endif + auto maxProcesses = app->getConfig().MAX_CONCURRENT_SUBPROCESSES; + auto count = 0; + std::deque> events; + + // Run extra process outside of capacity so that it's in pending queue + while (count < maxProcesses + 1) + { + events.push_back(app->getProcessManager().runProcess(command)); + count++; } - REQUIRE(exitedCount == events.size()); - for (auto const& errorCode : errorCodes) + + asio::error_code errorCode; + bool exited = false; + events.back()->async_wait([&](asio::error_code ec) { + exited = true; + errorCode = ec; + }); + + // Wait, just in case the processes haven't started yet + std::this_thread::sleep_for(std::chrono::seconds(1)); + + app->getProcessManager().shutdownProcess(events.back()); + while (!exited && !clock.getIOService().stopped()) { - REQUIRE(errorCode == asio::error::operation_aborted); + clock.crank(true); } + + REQUIRE(exited); + REQUIRE(errorCode == asio::error::operation_aborted); + // Running processes are not affected + REQUIRE(app->getProcessManager().getNumRunningProcesses() == maxProcesses); + + // Clean up + app->getProcessManager().shutdown(); } diff --git a/src/work/Work.cpp b/src/work/Work.cpp index 489f1f3d6d..a31f4ea891 100644 --- a/src/work/Work.cpp +++ b/src/work/Work.cpp @@ -75,6 +75,8 @@ Work::getStatus() const case WORK_FAILURE_RAISE: case WORK_FAILURE_FATAL: return fmt::format("Failed: {:s}", getUniqueName()); + case WORK_FAILURE_ABORTED: + return fmt::format("Aborted: {:s}", getUniqueName()); default: assert(false); return ""; @@ -121,6 +123,8 @@ Work::stateName(State st) return "WORK_FAILURE_RAISE"; case WORK_FAILURE_FATAL: return "WORK_FAILURE_FATAL"; + case WORK_FAILURE_ABORTED: + return "WORK_FAILURE_ABORTED"; default: throw std::runtime_error("Unknown Work::State"); } @@ -137,14 +141,31 @@ Work::callComplete() { return; } - self->complete(ec ? WORK_COMPLETE_FAILURE : WORK_COMPLETE_OK); + auto status = ec ? WORK_COMPLETE_FAILURE : WORK_COMPLETE_OK; + if (self->mAborting) + { + status = WORK_COMPLETE_ABORTED; + } + self->complete(status); }; } +void +Work::scheduleAbort(CompleteResult result) +{ + if (result != WORK_COMPLETE_FATAL && result != WORK_COMPLETE_FAILURE && + result != WORK_COMPLETE_ABORTED) + { + CLOG(ERROR, "Work") << "Cannot schedule abort with non-failure state"; + return; + } + scheduleComplete(result); +} + void Work::scheduleRun() { - if (mScheduled) + if (mScheduled || mAborting) { return; } @@ -183,6 +204,7 @@ Work::scheduleComplete(CompleteResult result) return; } self->mScheduled = false; + self->mAborting = false; self->complete(result); }); } @@ -190,7 +212,7 @@ Work::scheduleComplete(CompleteResult result) void Work::scheduleRetry() { - if (mScheduled) + if (mScheduled || mAborting) { return; } @@ -249,31 +271,59 @@ Work::advance() } CLOG(DEBUG, "Work") << "advancing " << getUniqueName(); - advanceChildren(); - if (allChildrenSuccessful()) + + // If necessary, propagate abort signal before advancing children + // This is to prevent scheduling any children to run if they are about + // to be in WORK_ABORTING state (such children are scheduled to abort + // properly instead) + if (anyChildFatalFailure()) { - CLOG(DEBUG, "Work") << "all " << mChildren.size() << " children of " - << getUniqueName() << " successful, scheduling run"; - scheduleRun(); + CLOG(DEBUG, "Work") + << "some of " << mChildren.size() << " children of " + << getUniqueName() << " FATALLY failed, propagating " + << "abort"; + abort(WORK_COMPLETE_FATAL); } - else if (anyChildFatalFailure()) + else if (anyChildRaiseFailure()) { CLOG(DEBUG, "Work") << "some of " << mChildren.size() << " children of " - << getUniqueName() << " fatally failed, scheduling " - << "fatal failure"; - scheduleFatalFailure(); + << getUniqueName() << " failed, propagating " + << "abort"; + abort(WORK_COMPLETE_FAILURE); } - else if (anyChildRaiseFailure()) + else if (anyChildAborted()) { CLOG(DEBUG, "Work") << "some of " << mChildren.size() << " children of " - << getUniqueName() << " failed, scheduling failure"; - scheduleFailure(); + << getUniqueName() << " aborted, propagating " + << "abort"; + abort(WORK_COMPLETE_ABORTED); + } + + advanceChildren(); + if (allChildrenSuccessful()) + { + if (mAborting) + { + scheduleAbort(WORK_COMPLETE_ABORTED); + } + else + { + scheduleRun(); + } } } void Work::run() { + if (mAborting) + { + CLOG(DEBUG, "Work") << "aborting " << getUniqueName(); + mApp.getMetrics().NewMeter({"work", "unit", "abort"}, "unit").Mark(); + onAbort(); + return; + } + if (getState() == WORK_PENDING) { CLOG(DEBUG, "Work") << "starting " << getUniqueName(); @@ -294,6 +344,8 @@ Work::complete(CompleteResult result) mApp.getMetrics().NewMeter({"work", "unit", "success"}, "unit"); auto& fail = mApp.getMetrics().NewMeter({"work", "unit", "failure"}, "unit"); + auto& aborted = + mApp.getMetrics().NewMeter({"work", "unit", "abort"}, "unit"); switch (result) { @@ -306,6 +358,9 @@ Work::complete(CompleteResult result) case WORK_COMPLETE_FATAL: setState(WORK_FAILURE_FATAL); break; + case WORK_COMPLETE_ABORTED: + setState(WORK_FAILURE_ABORTED); + break; } switch (getState()) @@ -331,6 +386,13 @@ Work::complete(CompleteResult result) notifyParent(); break; + case WORK_FAILURE_ABORTED: + aborted.Mark(); + CLOG(DEBUG, "Work") + << "notifying parent of completed abort " << getUniqueName(); + notifyParent(); + break; + case WORK_PENDING: succ.Mark(); advance(); @@ -379,6 +441,12 @@ Work::onFailureRaise() { } +void +Work::onAbort(CompleteResult result) +{ + scheduleAbort(result); +} + Work::State Work::getState() const { @@ -389,7 +457,7 @@ bool Work::isDone() const { return mState == WORK_SUCCESS || mState == WORK_FAILURE_RAISE || - mState == WORK_FAILURE_FATAL; + mState == WORK_FAILURE_FATAL || mState == WORK_FAILURE_ABORTED; } void @@ -434,4 +502,43 @@ Work::notify(std::string const& child) << " of completed child " << child; advance(); } + +void +Work::abort(CompleteResult result) +{ + // When `abort` signal is issued, pending/running work is in either + // one of two states: + // 1. It hasn't been scheduled to run yet. If some children are still + // running, this is handled in advance where work is scheduled to abort. + // Otherwise, work is scheduled to abort right away. + // 2. Work is already in IO service queue, but hasn't started running yet. + // This scenario is handled in `run` method, where abort is scheduled + // instead of success. + + assert(getState() == WORK_PENDING || getState() == WORK_RUNNING); + mAborting = true; + bool allDone = true; + + for (auto const& c : mChildren) + { + if (!c.second->isDone()) + { + allDone = false; + } + + // Abort pending or running work. If work has finished with success or + // fail, there's nothing to do + if (c.second->getState() == Work::WORK_PENDING || + c.second->getState() == WORK_RUNNING) + { + c.second->abort(); + } + } + + if (allDone) + { + // Children are ready, schedule abort for work itself. + onAbort(result); + } +} } diff --git a/src/work/Work.h b/src/work/Work.h index 1d7a8853e7..9d5c4fdaf4 100644 --- a/src/work/Work.h +++ b/src/work/Work.h @@ -47,14 +47,16 @@ class Work : public WorkParent WORK_SUCCESS, WORK_FAILURE_RETRY, WORK_FAILURE_RAISE, - WORK_FAILURE_FATAL + WORK_FAILURE_FATAL, + WORK_FAILURE_ABORTED }; enum CompleteResult { WORK_COMPLETE_OK, WORK_COMPLETE_FAILURE, - WORK_COMPLETE_FATAL + WORK_COMPLETE_FATAL, + WORK_COMPLETE_ABORTED }; Work(Application& app, WorkParent& parent, std::string uniqueName, @@ -78,6 +80,7 @@ class Work : public WorkParent virtual void onRun(); virtual void onFailureRetry(); virtual void onFailureRaise(); + virtual void onAbort(CompleteResult result = WORK_COMPLETE_ABORTED); // onSuccess is a little different than the others: it's called on // WORK_SUCCESS, but it also returns the next sate desired: if you want @@ -94,6 +97,7 @@ class Work : public WorkParent static std::string stateName(State st); State getState() const; bool isDone() const; + void abort(CompleteResult result = WORK_COMPLETE_ABORTED); void advance(); void reset(); @@ -104,6 +108,7 @@ class Work : public WorkParent size_t mRetries{0}; State mState{WORK_PENDING}; bool mScheduled{false}; + bool mAborting{false}; std::unique_ptr mRetryTimer; @@ -129,6 +134,7 @@ class Work : public WorkParent scheduleComplete(WORK_COMPLETE_FATAL); } + void scheduleAbort(CompleteResult result = WORK_COMPLETE_ABORTED); void setState(State s); void notifyParent(); diff --git a/src/work/WorkManagerImpl.cpp b/src/work/WorkManagerImpl.cpp index 9e5c0e9831..ac79e43df4 100644 --- a/src/work/WorkManagerImpl.cpp +++ b/src/work/WorkManagerImpl.cpp @@ -61,6 +61,13 @@ WorkManagerImpl::notify(std::string const& child) mApp.getMetrics().NewMeter({"work", "root", "failure"}, "unit").Mark(); mChildren.erase(child); } + else if (i->second->getState() == Work::WORK_FAILURE_ABORTED) + { + CLOG(WARNING, "Work") + << "WorkManager got FAILURE_ABORTED from " << child; + mApp.getMetrics().NewMeter({"work", "root", "abort"}, "unit").Mark(); + mChildren.erase(child); + } advanceChildren(); } diff --git a/src/work/WorkParent.cpp b/src/work/WorkParent.cpp index 70a193066c..a2375c0cb1 100644 --- a/src/work/WorkParent.cpp +++ b/src/work/WorkParent.cpp @@ -48,6 +48,15 @@ WorkParent::advanceChildren() } } +void +WorkParent::abortChildren() +{ + for (auto& c : mChildren) + { + c.second->abort(); + } +} + bool WorkParent::anyChildRaiseFailure() const { @@ -87,14 +96,25 @@ WorkParent::allChildrenSuccessful() const return true; } +bool +WorkParent::anyChildAborted() const +{ + for (auto const& c : mChildren) + { + if (c.second->getState() == Work::WORK_FAILURE_ABORTED) + { + return true; + } + } + return false; +} + bool WorkParent::allChildrenDone() const { for (auto& c : mChildren) { - if (c.second->getState() != Work::WORK_SUCCESS && - c.second->getState() != Work::WORK_FAILURE_RAISE && - c.second->getState() != Work::WORK_FAILURE_FATAL) + if (!c.second->isDone()) { return false; } diff --git a/src/work/WorkParent.h b/src/work/WorkParent.h index a3bc170920..014922fc77 100644 --- a/src/work/WorkParent.h +++ b/src/work/WorkParent.h @@ -39,10 +39,12 @@ class WorkParent : public std::enable_shared_from_this, void addChild(std::shared_ptr child); void clearChildren(); void advanceChildren(); + void abortChildren(); bool anyChildRaiseFailure() const; bool anyChildFatalFailure() const; bool allChildrenSuccessful() const; bool allChildrenDone() const; + bool anyChildAborted() const; Application& app() const; diff --git a/src/work/WorkTests.cpp b/src/work/WorkTests.cpp index 1896d0ce2d..323d4e9c73 100644 --- a/src/work/WorkTests.cpp +++ b/src/work/WorkTests.cpp @@ -2,6 +2,7 @@ // under the Apache License, Version 2.0. See the COPYING file at the root // of this distribution or at http://www.apache.org/licenses/LICENSE-2.0 +#include "historywork/RunCommandWork.h" #include "lib/catch.hpp" #include "main/Application.h" #include "main/Config.h" @@ -9,6 +10,7 @@ #include "test/TestUtils.h" #include "test/test.h" #include "util/Fs.h" +#include "util/format.h" #include "work/WorkManager.h" #include @@ -18,20 +20,24 @@ using namespace stellar; -class CallCmdWork : public Work +class CallCmdWork : public RunCommandWork { std::string mCommand; public: - CallCmdWork(Application& app, WorkParent& parent, std::string command) - : Work(app, parent, std::string("call-") + command), mCommand(command) + CallCmdWork(Application& app, WorkParent& parent, std::string command, + uint32_t count = 0) + : RunCommandWork(app, parent, + fmt::format("call-{:s}-{:d}", command, count), + Work::RETRY_A_FEW) + , mCommand(command) { } + virtual void - onRun() override + getCommand(std::string& cmdLine, std::string& outFile) override { - auto evt = mApp.getProcessManager().runProcess(mCommand); - evt.async_wait(callComplete()); + cmdLine = mCommand; } }; @@ -52,6 +58,51 @@ TEST_CASE("work manager", "[work]") } } +TEST_CASE("work aborted while running or in process queue", "[work][process]") +{ + VirtualClock clock; + Config const& cfg = getTestConfig(); + Application::pointer appPtr = createTestApplication(clock, cfg); + auto& wm = appPtr->getWorkManager(); + auto maxProcesses = appPtr->getConfig().MAX_CONCURRENT_SUBPROCESSES; + + auto w = wm.addWork("hostname"); + std::deque> workQueue; + + // intentionally longer work so that we can abort while it's running +#ifdef _WIN32 + std::string command = "waitfor /T 10 pause"; +#else + std::string command = "sleep 10"; +#endif + + // 5 extra processes that won't be spawned right away + for (auto i = 0; i < 5 + maxProcesses; i++) + { + workQueue.push_back(w->addWork(command, i)); + } + + wm.advanceChildren(); + + // Wait for at least one work to be "running" + while (workQueue.front()->getState() != Work::WORK_RUNNING) + { + clock.crank(); + } + + wm.abortChildren(); + while (!wm.allChildrenDone()) + { + clock.crank(); + } + + REQUIRE(appPtr->getProcessManager().getNumRunningProcesses() == 0); + for (auto work : workQueue) + { + REQUIRE(work->getState() == Work::WORK_FAILURE_ABORTED); + } +} + class CountDownWork : public Work { size_t mCount; @@ -221,7 +272,7 @@ class WorkWith2Subworks : public Work bool mCalledSuccessWithPendingSubwork{false}; }; -TEST_CASE("sub-subwork items succed at the same time", "[work]") +TEST_CASE("sub-subwork items succeed at the same time", "[work]") { VirtualClock clock; auto const& cfg = getTestConfig(); @@ -254,3 +305,124 @@ TEST_CASE("sub-subwork items succed at the same time", "[work]") REQUIRE(!work1->mCalledSuccessWithPendingSubwork); } + +TEST_CASE("subwork triggers abort", "[work][workabort]") +{ + VirtualClock clock; + auto const& cfg = getTestConfig(); + auto app = createTestApplication(clock, cfg); + auto& wm = app->getWorkManager(); + auto w = wm.addWork("main-work"); + + SECTION("other child pending") + { + auto failWork = w->addWork("work-1"); + auto normalWork = w->addWork("work-2"); + failWork->advance(); + while (!wm.allChildrenDone()) + { + clock.crank(); + } + + REQUIRE(normalWork->getState() == Work::WORK_FAILURE_ABORTED); + REQUIRE(w->getState() == Work::WORK_FAILURE_RAISE); + } + SECTION("other child already succeeded") + { + auto failWork = w->addWork("work-1"); + auto normalWork = w->addWork("work-2"); + normalWork->forceSuccess(); + + while (!wm.allChildrenDone()) + { + clock.crank(); + } + + REQUIRE(normalWork->getState() == Work::WORK_SUCCESS); + REQUIRE(w->getState() == Work::WORK_FAILURE_RAISE); + } + SECTION("other child running") + { + auto work1 = w->addWork(10); + wm.advanceChildren(); + auto work2 = w->addWork("work-2"); + wm.advanceChildren(); + + while (!wm.allChildrenDone()) + { + clock.crank(); + } + + REQUIRE(work1->getState() == Work::WORK_FAILURE_ABORTED); + REQUIRE(work2->getState() == Work::WORK_FAILURE_RAISE); + REQUIRE(w->getState() == Work::WORK_FAILURE_RAISE); + } +} + +TEST_CASE("work manual abort", "[work][workabort]") +{ + VirtualClock clock; + auto const& cfg = getTestConfig(); + auto app = createTestApplication(clock, cfg); + auto& wm = app->getWorkManager(); + auto w = wm.addWork("main-work"); + + auto work1 = w->addWork("work-1"); + auto work2 = w->addWork("work-2"); + + SECTION("main work aborts") + { + w->abort(); + while (!wm.allChildrenDone()) + { + clock.crank(); + } + + REQUIRE(work1->getState() == Work::WORK_FAILURE_ABORTED); + REQUIRE(work2->getState() == Work::WORK_FAILURE_ABORTED); + REQUIRE(w->getState() == Work::WORK_FAILURE_ABORTED); + } + SECTION("abort propagates to the top") + { + work2->abort(); + while (!wm.allChildrenDone()) + { + clock.crank(); + } + + REQUIRE(work1->getState() == Work::WORK_FAILURE_ABORTED); + REQUIRE(work2->getState() == Work::WORK_FAILURE_ABORTED); + REQUIRE(w->getState() == Work::WORK_FAILURE_ABORTED); + } +} + +TEST_CASE("work abort with complex structure", "[work][workabort]") +{ + VirtualClock clock; + auto const& cfg = getTestConfig(); + auto app = createTestApplication(clock, cfg); + auto& wm = app->getWorkManager(); + auto w = wm.addWork("main-work"); + + auto work1 = w->addWork("work-1"); + auto work2 = w->addWork("work-2"); + auto work3 = w->addWork("work-3"); + + auto w1_work4 = work1->addWork("work-4"); + auto w1_work5 = work1->addWork("work-5"); + + auto w2_work6 = work2->addWork("work-6"); + + work2->advance(); + while (!wm.allChildrenDone()) + { + clock.crank(); + } + + REQUIRE(work1->getState() == Work::WORK_FAILURE_ABORTED); + REQUIRE(w1_work4->getState() == Work::WORK_FAILURE_ABORTED); + REQUIRE(w1_work5->getState() == Work::WORK_FAILURE_ABORTED); + REQUIRE(work2->getState() == Work::WORK_FAILURE_RAISE); + REQUIRE(w2_work6->getState() == Work::WORK_FAILURE_RAISE); + REQUIRE(work3->getState() == Work::WORK_FAILURE_ABORTED); +}