From 2e6ed5ff4183d8f5684fc6b83352a00932f0c84e Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Mon, 16 Oct 2023 22:10:17 +0200 Subject: [PATCH] DPL: add finaliseOutputs callback for services This is invoked after the processing is done (therefore no other outputs are expected from user code) but before the postProcessing (which is were data is actually sent to their consumers. --- Framework/Core/include/Framework/CallbackService.h | 5 +++++ .../Core/include/Framework/DataProcessingContext.h | 6 ++++++ Framework/Core/include/Framework/ServiceSpec.h | 5 ++++- Framework/Core/include/Framework/StreamContext.h | 4 ++++ Framework/Core/src/ContextHelpers.h | 6 ++++++ Framework/Core/src/DataProcessingContext.cxx | 8 ++++++++ Framework/Core/src/DataProcessingDevice.cxx | 9 +++++++++ Framework/Core/src/StreamContext.cxx | 11 +++++++++++ 8 files changed, 53 insertions(+), 1 deletion(-) diff --git a/Framework/Core/include/Framework/CallbackService.h b/Framework/Core/include/Framework/CallbackService.h index 91115e92af129..6243b15795fd3 100644 --- a/Framework/Core/include/Framework/CallbackService.h +++ b/Framework/Core/include/Framework/CallbackService.h @@ -71,6 +71,9 @@ class CallbackService NewTimeslice, /// Invoked before the processing callback PreProcessing, + /// Invoked after the processing callback and before the post processing + /// callback to allow for injecting data in the output stream. + FinaliseOutputs, /// Invoked after the processing callback, PostProcessing, /// Invoked whenever an object from CCDB is deserialised via ROOT. @@ -94,6 +97,7 @@ class CallbackService using RegionInfoCallback = std::function; using NewTimesliceCallback = std::function; using PreProcessingCallback = std::function; + using FinaliseOutputsCallback = std::function; using PostProcessingCallback = std::function; using CCDBDeserializedCallback = std::function; using DomainInfoUpdatedCallback = std::function; @@ -111,6 +115,7 @@ class CallbackService RegistryPair, // RegistryPair, // RegistryPair, // + RegistryPair, // RegistryPair, // RegistryPair, // RegistryPair, // diff --git a/Framework/Core/include/Framework/DataProcessingContext.h b/Framework/Core/include/Framework/DataProcessingContext.h index 2f7308a0ce810..d71ad203b1580 100644 --- a/Framework/Core/include/Framework/DataProcessingContext.h +++ b/Framework/Core/include/Framework/DataProcessingContext.h @@ -53,6 +53,9 @@ struct DataProcessorContext { void preStartCallbacks(ServiceRegistryRef); /// Invoke callbacks to be executed before every process method invokation void preProcessingCallbacks(ProcessingContext&); + /// Invoke callbacks to be executed after the outputs have been created + /// by the processing, but before the post processing callbacks. + void finaliseOutputsCallbacks(ProcessingContext&); /// Invoke callbacks to be executed after every process method invokation void postProcessingCallbacks(ProcessingContext&); /// Invoke callbacks to be executed before every dangling check @@ -91,6 +94,9 @@ struct DataProcessorContext { mutable std::vector preProcessingHandlers; /// Callback for services to be executed after every processing. /// The callback MUST BE REENTRANT and threadsafe. + mutable std::vector finaliseOutputsHandles; + /// Callback for services to be executed after every processing. + /// The callback MUST BE REENTRANT and threadsafe. mutable std::vector postProcessingHandlers; /// Callbacks for services to be executed before every dangling check mutable std::vector preDanglingHandles; diff --git a/Framework/Core/include/Framework/ServiceSpec.h b/Framework/Core/include/Framework/ServiceSpec.h index d30280a2e2f85..910620fcd0655 100644 --- a/Framework/Core/include/Framework/ServiceSpec.h +++ b/Framework/Core/include/Framework/ServiceSpec.h @@ -146,6 +146,9 @@ struct ServiceSpec { ServiceConfigureCallback configure = nullptr; /// Callback executed before actual processing happens. ServiceProcessingCallback preProcessing = nullptr; + /// Callback executed after the processing callback is completed + /// and the user provided outputs have been created. + ServiceProcessingCallback finaliseOutputs = nullptr; /// Callback executed once actual processing happened. ServiceProcessingCallback postProcessing = nullptr; /// Callback executed before the dangling inputs loop @@ -170,7 +173,7 @@ struct ServiceSpec { ServicePreSchedule preSchedule = nullptr; ServicePostSchedule postSchedule = nullptr; - ///Callback executed after each metric is received by the driver. + /// Callback executed after each metric is received by the driver. ServiceMetricHandling metricHandling = nullptr; /// Callback executed after a given input record has been successfully diff --git a/Framework/Core/include/Framework/StreamContext.h b/Framework/Core/include/Framework/StreamContext.h index 5e4dc5f1b325c..0ab353e147276 100644 --- a/Framework/Core/include/Framework/StreamContext.h +++ b/Framework/Core/include/Framework/StreamContext.h @@ -38,6 +38,9 @@ struct StreamContext { void preStartStreamCallbacks(ServiceRegistryRef); void preProcessingCallbacks(ProcessingContext& pcx); + /// Invoke callbacks to be executed after the outputs have been created + /// by the processing, but before the post processing callbacks. + void finaliseOutputsCallbacks(ProcessingContext&); void postProcessingCallbacks(ProcessingContext& pcx); /// Invoke callbacks to be executed before every EOS user callback invokation @@ -47,6 +50,7 @@ struct StreamContext { /// Callbacks for services to be executed before every process method invokation std::vector preProcessingHandles; + std::vector finaliseOutputsHandles; /// Callbacks for services to be executed after every process method invokation std::vector postProcessingHandles; diff --git a/Framework/Core/src/ContextHelpers.h b/Framework/Core/src/ContextHelpers.h index cb7af43d5006e..12ccfcca83fe1 100644 --- a/Framework/Core/src/ContextHelpers.h +++ b/Framework/Core/src/ContextHelpers.h @@ -41,6 +41,9 @@ void ContextHelpers::bindStreamService(DataProcessorContext& dpContext, StreamCo if (spec.preProcessing) { context.preProcessingHandles.push_back(ServiceProcessingHandle{spec, spec.preProcessing, service}); } + if (spec.finaliseOutputs) { + context.finaliseOutputsHandles.push_back(ServiceProcessingHandle{spec, spec.finaliseOutputs, service}); + } if (spec.postProcessing) { context.postProcessingHandles.push_back(ServiceProcessingHandle{spec, spec.postProcessing, service}); } @@ -59,6 +62,9 @@ void ContextHelpers::bindProcessorService(DataProcessorContext& dataProcessorCon if (spec.preProcessing) { dataProcessorContext.preProcessingHandlers.push_back(ServiceProcessingHandle{spec, spec.preProcessing, service}); } + if (spec.finaliseOutputs) { + dataProcessorContext.finaliseOutputsHandles.push_back(ServiceProcessingHandle{spec, spec.finaliseOutputs, service}); + } if (spec.postProcessing) { dataProcessorContext.postProcessingHandlers.push_back(ServiceProcessingHandle{spec, spec.postProcessing, service}); } diff --git a/Framework/Core/src/DataProcessingContext.cxx b/Framework/Core/src/DataProcessingContext.cxx index 281a92134b5f6..365975c706722 100644 --- a/Framework/Core/src/DataProcessingContext.cxx +++ b/Framework/Core/src/DataProcessingContext.cxx @@ -22,6 +22,14 @@ void DataProcessorContext::preProcessingCallbacks(ProcessingContext& ctx) } } +void DataProcessorContext::finaliseOutputsCallbacks(ProcessingContext& ctx) +{ + for (auto& handle : finaliseOutputsHandles) { + LOGP(debug, "Invoking postProcessingCallback for service {}", handle.spec.name); + handle.callback(ctx, handle.service); + } +} + /// Invoke callbacks to be executed before every dangling check void DataProcessorContext::postProcessingCallbacks(ProcessingContext& ctx) { diff --git a/Framework/Core/src/DataProcessingDevice.cxx b/Framework/Core/src/DataProcessingDevice.cxx index e3c29035e9d21..23bdc9e75806b 100644 --- a/Framework/Core/src/DataProcessingDevice.cxx +++ b/Framework/Core/src/DataProcessingDevice.cxx @@ -2304,6 +2304,15 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v allocator.make(OutputRef{"dpl-summary", compile_time_hash(spec.name.c_str())}, 1); } + // Extra callback which allows a service to add extra outputs. + // This is needed e.g. to ensure that injected CCDB outputs are added + // before an end of stream. + { + ref.get().call(o2::framework::ServiceRegistryRef{ref}, (int)action.op); + dpContext.finaliseOutputsCallbacks(processContext); + streamContext.finaliseOutputsCallbacks(processContext); + } + { ZoneScopedN("service post processing"); ref.get().call(o2::framework::ServiceRegistryRef{ref}, (int)action.op); diff --git a/Framework/Core/src/StreamContext.cxx b/Framework/Core/src/StreamContext.cxx index e11e658f38558..c7f28a3dbde1a 100644 --- a/Framework/Core/src/StreamContext.cxx +++ b/Framework/Core/src/StreamContext.cxx @@ -35,6 +35,17 @@ void StreamContext::preProcessingCallbacks(ProcessingContext& pcx) } } +/// Invoke callbacks to be executed after every process method invokation +void StreamContext::finaliseOutputsCallbacks(ProcessingContext& pcx) +{ + for (auto& handle : finaliseOutputsHandles) { + LOG(debug) << "Invoking finaliseOutputsCallbacks for " << handle.service; + assert(handle.service); + assert(handle.callback); + handle.callback(pcx, handle.service); + } +} + /// Invoke callbacks to be executed after every process method invokation void StreamContext::postProcessingCallbacks(ProcessingContext& pcx) {