Skip to content

Commit

Permalink
DPL: add finaliseOutputs callback for services
Browse files Browse the repository at this point in the history
This is invoked after the processing is done (therefore no other
outputs are expected from user code) but before the postProcessing
(which is were data is actually sent to their consumers.
  • Loading branch information
ktf authored and davidrohr committed Oct 18, 2023
1 parent 655673d commit 2e6ed5f
Show file tree
Hide file tree
Showing 8 changed files with 53 additions and 1 deletion.
5 changes: 5 additions & 0 deletions Framework/Core/include/Framework/CallbackService.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ class CallbackService
NewTimeslice,
/// Invoked before the processing callback
PreProcessing,
/// Invoked after the processing callback and before the post processing
/// callback to allow for injecting data in the output stream.
FinaliseOutputs,
/// Invoked after the processing callback,
PostProcessing,
/// Invoked whenever an object from CCDB is deserialised via ROOT.
Expand All @@ -94,6 +97,7 @@ class CallbackService
using RegionInfoCallback = std::function<void(fair::mq::RegionInfo const&)>;
using NewTimesliceCallback = std::function<void(o2::header::DataHeader&, DataProcessingHeader&)>;
using PreProcessingCallback = std::function<void(ServiceRegistryRef, int)>;
using FinaliseOutputsCallback = std::function<void(ServiceRegistryRef, int)>;
using PostProcessingCallback = std::function<void(ServiceRegistryRef, int)>;
using CCDBDeserializedCallback = std::function<void(ConcreteDataMatcher&, void*)>;
using DomainInfoUpdatedCallback = std::function<void(ServiceRegistryRef, size_t timeslice, ChannelIndex index)>;
Expand All @@ -111,6 +115,7 @@ class CallbackService
RegistryPair<Id, Id::RegionInfoCallback, RegionInfoCallback>, //
RegistryPair<Id, Id::NewTimeslice, NewTimesliceCallback>, //
RegistryPair<Id, Id::PreProcessing, PreProcessingCallback>, //
RegistryPair<Id, Id::FinaliseOutputs, FinaliseOutputsCallback>, //
RegistryPair<Id, Id::PostProcessing, PostProcessingCallback>, //
RegistryPair<Id, Id::CCDBDeserialised, CCDBDeserializedCallback>, //
RegistryPair<Id, Id::DomainInfoUpdated, DomainInfoUpdatedCallback>, //
Expand Down
6 changes: 6 additions & 0 deletions Framework/Core/include/Framework/DataProcessingContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ struct DataProcessorContext {
void preStartCallbacks(ServiceRegistryRef);
/// Invoke callbacks to be executed before every process method invokation
void preProcessingCallbacks(ProcessingContext&);
/// Invoke callbacks to be executed after the outputs have been created
/// by the processing, but before the post processing callbacks.
void finaliseOutputsCallbacks(ProcessingContext&);
/// Invoke callbacks to be executed after every process method invokation
void postProcessingCallbacks(ProcessingContext&);
/// Invoke callbacks to be executed before every dangling check
Expand Down Expand Up @@ -91,6 +94,9 @@ struct DataProcessorContext {
mutable std::vector<ServiceProcessingHandle> preProcessingHandlers;
/// Callback for services to be executed after every processing.
/// The callback MUST BE REENTRANT and threadsafe.
mutable std::vector<ServiceProcessingHandle> finaliseOutputsHandles;
/// Callback for services to be executed after every processing.
/// The callback MUST BE REENTRANT and threadsafe.
mutable std::vector<ServiceProcessingHandle> postProcessingHandlers;
/// Callbacks for services to be executed before every dangling check
mutable std::vector<ServiceDanglingHandle> preDanglingHandles;
Expand Down
5 changes: 4 additions & 1 deletion Framework/Core/include/Framework/ServiceSpec.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,9 @@ struct ServiceSpec {
ServiceConfigureCallback configure = nullptr;
/// Callback executed before actual processing happens.
ServiceProcessingCallback preProcessing = nullptr;
/// Callback executed after the processing callback is completed
/// and the user provided outputs have been created.
ServiceProcessingCallback finaliseOutputs = nullptr;
/// Callback executed once actual processing happened.
ServiceProcessingCallback postProcessing = nullptr;
/// Callback executed before the dangling inputs loop
Expand All @@ -170,7 +173,7 @@ struct ServiceSpec {
ServicePreSchedule preSchedule = nullptr;
ServicePostSchedule postSchedule = nullptr;

///Callback executed after each metric is received by the driver.
/// Callback executed after each metric is received by the driver.
ServiceMetricHandling metricHandling = nullptr;

/// Callback executed after a given input record has been successfully
Expand Down
4 changes: 4 additions & 0 deletions Framework/Core/include/Framework/StreamContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ struct StreamContext {
void preStartStreamCallbacks(ServiceRegistryRef);

void preProcessingCallbacks(ProcessingContext& pcx);
/// Invoke callbacks to be executed after the outputs have been created
/// by the processing, but before the post processing callbacks.
void finaliseOutputsCallbacks(ProcessingContext&);
void postProcessingCallbacks(ProcessingContext& pcx);

/// Invoke callbacks to be executed before every EOS user callback invokation
Expand All @@ -47,6 +50,7 @@ struct StreamContext {

/// Callbacks for services to be executed before every process method invokation
std::vector<ServiceProcessingHandle> preProcessingHandles;
std::vector<ServiceProcessingHandle> finaliseOutputsHandles;
/// Callbacks for services to be executed after every process method invokation
std::vector<ServiceProcessingHandle> postProcessingHandles;

Expand Down
6 changes: 6 additions & 0 deletions Framework/Core/src/ContextHelpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ void ContextHelpers::bindStreamService(DataProcessorContext& dpContext, StreamCo
if (spec.preProcessing) {
context.preProcessingHandles.push_back(ServiceProcessingHandle{spec, spec.preProcessing, service});
}
if (spec.finaliseOutputs) {
context.finaliseOutputsHandles.push_back(ServiceProcessingHandle{spec, spec.finaliseOutputs, service});
}
if (spec.postProcessing) {
context.postProcessingHandles.push_back(ServiceProcessingHandle{spec, spec.postProcessing, service});
}
Expand All @@ -59,6 +62,9 @@ void ContextHelpers::bindProcessorService(DataProcessorContext& dataProcessorCon
if (spec.preProcessing) {
dataProcessorContext.preProcessingHandlers.push_back(ServiceProcessingHandle{spec, spec.preProcessing, service});
}
if (spec.finaliseOutputs) {
dataProcessorContext.finaliseOutputsHandles.push_back(ServiceProcessingHandle{spec, spec.finaliseOutputs, service});
}
if (spec.postProcessing) {
dataProcessorContext.postProcessingHandlers.push_back(ServiceProcessingHandle{spec, spec.postProcessing, service});
}
Expand Down
8 changes: 8 additions & 0 deletions Framework/Core/src/DataProcessingContext.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,14 @@ void DataProcessorContext::preProcessingCallbacks(ProcessingContext& ctx)
}
}

void DataProcessorContext::finaliseOutputsCallbacks(ProcessingContext& ctx)
{
for (auto& handle : finaliseOutputsHandles) {
LOGP(debug, "Invoking postProcessingCallback for service {}", handle.spec.name);
handle.callback(ctx, handle.service);
}
}

/// Invoke callbacks to be executed before every dangling check
void DataProcessorContext::postProcessingCallbacks(ProcessingContext& ctx)
{
Expand Down
9 changes: 9 additions & 0 deletions Framework/Core/src/DataProcessingDevice.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -2304,6 +2304,15 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
allocator.make<int>(OutputRef{"dpl-summary", compile_time_hash(spec.name.c_str())}, 1);
}

// Extra callback which allows a service to add extra outputs.
// This is needed e.g. to ensure that injected CCDB outputs are added
// before an end of stream.
{
ref.get<CallbackService>().call<CallbackService::Id::FinaliseOutputs>(o2::framework::ServiceRegistryRef{ref}, (int)action.op);
dpContext.finaliseOutputsCallbacks(processContext);
streamContext.finaliseOutputsCallbacks(processContext);
}

{
ZoneScopedN("service post processing");
ref.get<CallbackService>().call<CallbackService::Id::PostProcessing>(o2::framework::ServiceRegistryRef{ref}, (int)action.op);
Expand Down
11 changes: 11 additions & 0 deletions Framework/Core/src/StreamContext.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,17 @@ void StreamContext::preProcessingCallbacks(ProcessingContext& pcx)
}
}

/// Invoke callbacks to be executed after every process method invokation
void StreamContext::finaliseOutputsCallbacks(ProcessingContext& pcx)
{
for (auto& handle : finaliseOutputsHandles) {
LOG(debug) << "Invoking finaliseOutputsCallbacks for " << handle.service;
assert(handle.service);
assert(handle.callback);
handle.callback(pcx, handle.service);
}
}

/// Invoke callbacks to be executed after every process method invokation
void StreamContext::postProcessingCallbacks(ProcessingContext& pcx)
{
Expand Down

0 comments on commit 2e6ed5f

Please sign in to comment.