Skip to content

Commit

Permalink
Merge pull request #45434 from wddgit/exceptionBehaviorBeginEndJobAnd…
Browse files Browse the repository at this point in the history
…Stream

Improve Framework behavior after exceptions in begin/end transitions (Job, Stream, ProcessBlock)
  • Loading branch information
cmsbuild authored Jul 26, 2024
2 parents b61d899 + af8027d commit fd19e2b
Show file tree
Hide file tree
Showing 45 changed files with 1,663 additions and 588 deletions.
9 changes: 5 additions & 4 deletions FWCore/Framework/bin/cmsRun.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -267,26 +267,27 @@ int main(int argc, const char* argv[]) {
TaskCleanupSentry sentry{proc.get()};

alwaysAddContext = false;

proc.on();
context = "Calling beginJob";
proc->beginJob();

// EventSetupsController uses pointers to the ParameterSet
// owned by ProcessDesc while it is dealing with sharing of
// ESProducers among the top-level process and the
// SubProcesses. Therefore the ProcessDesc needs to be kept
// alive until the beginJob transition has finished.
processDesc.reset();

alwaysAddContext = false;
context =
"Calling EventProcessor::runToCompletion (which does almost everything after beginJob and before endJob)";
proc.on();
auto status = proc->runToCompletion();
if (status == edm::EventProcessor::epSignal) {
returnCode = edm::errors::CaughtSignal;
}
proc.off();

context = "Calling endJob";
proc.off();
context = "Calling endJob and endStream";
proc->endJob();
});
return returnCode;
Expand Down
9 changes: 9 additions & 0 deletions FWCore/Framework/interface/EventProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ configured in the user's main() function, and is set running.
#include "FWCore/Utilities/interface/get_underlying_safe.h"
#include "FWCore/Utilities/interface/propagate_const.h"

#include "oneapi/tbb/task_group.h"

#include <atomic>
#include <map>
#include <memory>
Expand All @@ -46,6 +48,7 @@ configured in the user's main() function, and is set running.

namespace edm {

class ExceptionCollector;
class ExceptionToActionTable;
class BranchIDListHelper;
class MergeableRunProductMetadata;
Expand Down Expand Up @@ -120,6 +123,10 @@ namespace edm {
*/
void beginJob();

void beginStreams();

void endStreams(ExceptionCollector&) noexcept;

/**This should be called before the EventProcessor is destroyed.
Throws if any module's endJob throws an exception.
*/
Expand Down Expand Up @@ -351,6 +358,8 @@ namespace edm {
std::shared_ptr<std::recursive_mutex> sourceMutex_;
PrincipalCache principalCache_;
bool beginJobCalled_;
bool beginJobStartedModules_ = false;
bool beginJobSucceeded_ = false;
bool shouldWeStop_;
bool fileModeNoMerge_;
std::string exceptionMessageFiles_;
Expand Down
26 changes: 15 additions & 11 deletions FWCore/Framework/interface/GlobalSchedule.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "FWCore/MessageLogger/interface/ExceptionMessages.h"
#include "FWCore/ServiceRegistry/interface/GlobalContext.h"
#include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
#include "FWCore/ServiceRegistry/interface/ServiceRegistryfwd.h"
#include "FWCore/ServiceRegistry/interface/ServiceToken.h"
#include "FWCore/Utilities/interface/Algorithms.h"
#include "FWCore/Utilities/interface/BranchType.h"
Expand All @@ -38,9 +39,7 @@

namespace edm {

class ActivityRegistry;
class ExceptionCollector;
class ProcessContext;
class PreallocationConfiguration;
class ModuleRegistry;
class TriggerResultInserter;
Expand Down Expand Up @@ -76,7 +75,9 @@ namespace edm {

void beginJob(ProductRegistry const&,
eventsetup::ESRecordsToProductResolverIndices const&,
ProcessBlockHelperBase const&);
ProcessBlockHelperBase const&,
PathsAndConsumesOfModulesBase const&,
ProcessContext const&);
void endJob(ExceptionCollector& collector);

/// Return a vector allowing const access to all the
Expand Down Expand Up @@ -118,7 +119,14 @@ namespace edm {
std::shared_ptr<ActivityRegistry> actReg_; // We do not use propagate_const because the registry itself is mutable.
std::vector<edm::propagate_const<WorkerPtr>> extraWorkers_;
ProcessContext const* processContext_;

// The next 4 variables use the same naming convention, even though we have no intention
// to ever have concurrent ProcessBlocks or Jobs. They are all related to the number of
// WorkerManagers needed for global transitions.
unsigned int numberOfConcurrentLumis_;
unsigned int numberOfConcurrentRuns_;
static constexpr unsigned int numberOfConcurrentProcessBlocks_ = 1;
static constexpr unsigned int numberOfConcurrentJobs_ = 1;
};

template <typename T>
Expand Down Expand Up @@ -155,6 +163,8 @@ namespace edm {
unsigned int managerIndex = principal.index();
if constexpr (T::branchType_ == InRun) {
managerIndex += numberOfConcurrentLumis_;
} else if constexpr (T::branchType_ == InProcess) {
managerIndex += (numberOfConcurrentLumis_ + numberOfConcurrentRuns_);
}
WorkerManager& workerManager = workerManagers_[managerIndex];
workerManager.resetAll();
Expand Down Expand Up @@ -184,10 +194,7 @@ namespace edm {
ServiceRegistry::Operate op(token);
convertException::wrap([this, globalContext]() { T::preScheduleSignal(actReg_.get(), globalContext); });
} catch (cms::Exception& ex) {
std::ostringstream ost;
ex.addContext("Handling pre signal, likely in a service function");
exceptionContext(ost, *globalContext);
ex.addContext(ost.str());
exceptionContext(ex, *globalContext, "Handling pre signal, likely in a service function");
throw;
}
}
Expand All @@ -205,10 +212,7 @@ namespace edm {
});
} catch (cms::Exception& ex) {
if (not excpt) {
std::ostringstream ost;
ex.addContext("Handling post signal, likely in a service function");
exceptionContext(ost, *globalContext);
ex.addContext(ost.str());
exceptionContext(ex, *globalContext, "Handling post signal, likely in a service function");
excpt = std::current_exception();
}
}
Expand Down
14 changes: 8 additions & 6 deletions FWCore/Framework/interface/Schedule.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,9 @@
#include "FWCore/MessageLogger/interface/JobReport.h"
#include "FWCore/MessageLogger/interface/MessageLogger.h"
#include "FWCore/ServiceRegistry/interface/Service.h"
#include "FWCore/ServiceRegistry/interface/ServiceRegistryfwd.h"
#include "FWCore/Utilities/interface/Algorithms.h"
#include "FWCore/Utilities/interface/BranchType.h"
#include "FWCore/Utilities/interface/ConvertException.h"
#include "FWCore/Utilities/interface/Exception.h"
#include "FWCore/Utilities/interface/StreamID.h"
#include "FWCore/Utilities/interface/get_underlying_safe.h"
Expand All @@ -85,6 +85,7 @@
#include <array>
#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <string>
#include <vector>
Expand All @@ -100,13 +101,11 @@ namespace edm {
class ESRecordsToProductResolverIndices;
}

class ActivityRegistry;
class BranchIDListHelper;
class EventTransitionInfo;
class ExceptionCollector;
class MergeableRunProductMetadata;
class OutputModuleCommunicator;
class ProcessContext;
class ProductRegistry;
class PreallocationConfiguration;
class StreamSchedule;
Expand Down Expand Up @@ -171,11 +170,14 @@ namespace edm {

void beginJob(ProductRegistry const&,
eventsetup::ESRecordsToProductResolverIndices const&,
ProcessBlockHelperBase const&);
ProcessBlockHelperBase const&,
PathsAndConsumesOfModulesBase const&,
ProcessContext const&);
void endJob(ExceptionCollector& collector);
void sendFwkSummaryToMessageLogger() const;

void beginStream(unsigned int);
void endStream(unsigned int);
void beginStream(unsigned int streamID);
void endStream(unsigned int streamID, ExceptionCollector& collector, std::mutex& collectorMutex) noexcept;

// Write the luminosity block
void writeLumiAsync(WaitingTaskHolder iTask,
Expand Down
41 changes: 21 additions & 20 deletions FWCore/Framework/interface/StreamSchedule.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@
#include "FWCore/MessageLogger/interface/JobReport.h"
#include "FWCore/MessageLogger/interface/MessageLogger.h"
#include "FWCore/ServiceRegistry/interface/Service.h"
#include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
#include "FWCore/ServiceRegistry/interface/ServiceRegistryfwd.h"
#include "FWCore/ServiceRegistry/interface/ServiceToken.h"
#include "FWCore/ServiceRegistry/interface/StreamContext.h"
#include "FWCore/Concurrency/interface/FunctorTask.h"
#include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
Expand All @@ -88,8 +90,10 @@
#include "FWCore/Utilities/interface/propagate_const.h"
#include "FWCore/Utilities/interface/thread_safety_macros.h"

#include <exception>
#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <string>
#include <vector>
Expand All @@ -111,8 +115,6 @@ namespace edm {
class PathStatusInserter;
class EndPathStatusInserter;
class PreallocationConfiguration;
class WaitingTaskHolder;

class ConditionalTaskHelper;

namespace service {
Expand All @@ -123,7 +125,6 @@ namespace edm {
public:
typedef std::vector<std::string> vstring;
typedef std::vector<Path> TrigPaths;
typedef std::vector<Path> NonTrigPaths;
typedef std::shared_ptr<HLTGlobalStatus> TrigResPtr;
typedef std::shared_ptr<HLTGlobalStatus const> TrigResConstPtr;
typedef std::shared_ptr<Worker> WorkerPtr;
Expand Down Expand Up @@ -162,7 +163,7 @@ namespace edm {
bool cleaningUpAfterException = false);

void beginStream();
void endStream();
void endStream(ExceptionCollector& collector, std::mutex& collectorMutex) noexcept;

StreamID streamID() const { return streamID_; }

Expand Down Expand Up @@ -306,12 +307,9 @@ namespace edm {
void preScheduleSignal(StreamContext const*) const;

template <typename T>
void postScheduleSignal(StreamContext const*, ServiceWeakToken const&, std::exception_ptr&) const noexcept;
void postScheduleSignal(StreamContext const*, std::exception_ptr&) const noexcept;

void handleException(StreamContext const&,
ServiceWeakToken const&,
bool cleaningUpAfterException,
std::exception_ptr&) const noexcept;
void handleException(StreamContext const&, bool cleaningUpAfterException, std::exception_ptr&) const noexcept;

WorkerManager workerManagerBeginEnd_;
WorkerManager workerManagerRuns_;
Expand Down Expand Up @@ -370,11 +368,15 @@ namespace edm {
auto doneTask = make_waiting_task([this, iHolder = std::move(iHolder), cleaningUpAfterException, weakToken](
std::exception_ptr const* iPtr) mutable {
std::exception_ptr excpt;
if (iPtr) {
excpt = *iPtr;
handleException(streamContext_, weakToken, cleaningUpAfterException, excpt);
}
postScheduleSignal<T>(&streamContext_, weakToken, excpt);
{
ServiceRegistry::Operate op(weakToken.lock());

if (iPtr) {
excpt = *iPtr;
handleException(streamContext_, cleaningUpAfterException, excpt);
}
postScheduleSignal<T>(&streamContext_, excpt);
} // release service token before calling doneWaiting
iHolder.doneWaiting(excpt);
});

Expand All @@ -391,7 +393,10 @@ namespace edm {
preScheduleSignal<T>(&streamContext_);
workerManager->resetAll();
} catch (...) {
h.doneWaiting(std::current_exception());
// Just remember the exception at this point and
// let the destructor of h call doneWaiting(), so that the
// ServiceRegistry::Operate object is destroyed first.
h.presetTaskAsFailed(std::current_exception());
return;
}

Expand Down Expand Up @@ -430,13 +435,9 @@ namespace edm {

template <typename T>
void StreamSchedule::postScheduleSignal(StreamContext const* streamContext,
ServiceWeakToken const& weakToken,
std::exception_ptr& excpt) const noexcept {
try {
convertException::wrap([this, &weakToken, streamContext]() {
ServiceRegistry::Operate op(weakToken.lock());
T::postScheduleSignal(actReg_.get(), streamContext);
});
convertException::wrap([this, streamContext]() { T::postScheduleSignal(actReg_.get(), streamContext); });
} catch (cms::Exception& ex) {
if (not excpt) {
std::ostringstream ost;
Expand Down
11 changes: 7 additions & 4 deletions FWCore/Framework/interface/SubProcess.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "FWCore/Framework/interface/ProductSelector.h"
#include "FWCore/ServiceRegistry/interface/ProcessContext.h"
#include "FWCore/ServiceRegistry/interface/ServiceLegacy.h"
#include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
#include "FWCore/ServiceRegistry/interface/ServiceToken.h"
#include "FWCore/Utilities/interface/Algorithms.h"
#include "FWCore/Utilities/interface/BranchType.h"
Expand All @@ -23,6 +24,7 @@

#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <vector>

Expand All @@ -32,6 +34,7 @@ namespace edm {
class BranchIDListHelper;
class EventPrincipal;
class EventSetupImpl;
class ExceptionCollector;
class HistoryAppender;
class LuminosityBlockPrincipal;
class LumiTransitionInfo;
Expand Down Expand Up @@ -86,7 +89,7 @@ namespace edm {
std::vector<ModuleProcessName> keepOnlyConsumedUnscheduledModules(bool deleteModules);

void doBeginJob();
void doEndJob();
void doEndJob(ExceptionCollector&);

void doEventAsync(WaitingTaskHolder iHolder,
EventPrincipal const& principal,
Expand All @@ -113,8 +116,8 @@ namespace edm {
LumiTransitionInfo const& iTransitionInfo,
bool cleaningUpAfterException);

void doBeginStream(unsigned int);
void doEndStream(unsigned int);
void doBeginStream(unsigned int streamID);
void doEndStream(unsigned int streamID, ExceptionCollector& collector, std::mutex& collectorMutex) noexcept;
void doStreamBeginRunAsync(WaitingTaskHolder iHolder, unsigned int iID, RunTransitionInfo const&);

void doStreamEndRunAsync(WaitingTaskHolder iHolder,
Expand Down Expand Up @@ -238,7 +241,7 @@ namespace edm {

private:
void beginJob();
void endJob();
void endJob(ExceptionCollector&);
void processAsync(WaitingTaskHolder iHolder,
EventPrincipal const& e,
std::vector<std::shared_ptr<const EventSetupImpl>> const*);
Expand Down
12 changes: 7 additions & 5 deletions FWCore/Framework/interface/WorkerManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@
#include "FWCore/Framework/interface/Frameworkfwd.h"
#include "FWCore/Framework/interface/UnscheduledCallProducer.h"
#include "FWCore/Framework/interface/WorkerRegistry.h"
#include "FWCore/ServiceRegistry/interface/ParentContext.h"
#include "FWCore/ServiceRegistry/interface/ServiceRegistryfwd.h"
#include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
#include "FWCore/Utilities/interface/StreamID.h"

#include <memory>
#include <mutex>
#include <set>
#include <string>
#include <utility>
Expand Down Expand Up @@ -70,12 +72,12 @@ namespace edm {

void beginJob(ProductRegistry const& iRegistry,
eventsetup::ESRecordsToProductResolverIndices const&,
ProcessBlockHelperBase const&);
void endJob();
void endJob(ExceptionCollector& collector);
ProcessBlockHelperBase const&,
GlobalContext const&);
void endJob(ExceptionCollector&, GlobalContext const&);

void beginStream(StreamID iID, StreamContext& streamContext);
void endStream(StreamID iID, StreamContext& streamContext);
void beginStream(StreamID, StreamContext const&);
void endStream(StreamID, StreamContext const&, ExceptionCollector&, std::mutex& collectorMutex) noexcept;

AllWorkers const& allWorkers() const { return allWorkers_; }
AllWorkers const& unscheduledWorkers() const { return unscheduled_.workers(); }
Expand Down
Loading

0 comments on commit fd19e2b

Please sign in to comment.