Skip to content

Commit

Permalink
Revert "DPL: If in OnlineMode, input proxies should only send EoS if …
Browse files Browse the repository at this point in the history
…they receive it"

This reverts commit ed52430.
  • Loading branch information
davidrohr committed Oct 18, 2023
1 parent dbb5102 commit 655673d
Showing 1 changed file with 1 addition and 12 deletions.
13 changes: 1 addition & 12 deletions Framework/Core/src/ExternalFairMQDeviceProxy.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@
#include "Framework/TimingInfo.h"
#include "Framework/DeviceState.h"
#include "Framework/Monitoring.h"
#include "Framework/DefaultsHelpers.h"
#include "Framework/DataTakingContext.h"
#include "Headers/DataHeader.h"
#include "Headers/Stack.h"
#include "DecongestionService.h"
Expand All @@ -54,10 +52,6 @@

namespace o2::framework
{
// We do not allow to send EoS in online runs, until we receive (which is checked below) them upstream (to prevent premature shutdown of calibration)
static bool gAllowEoSdefault = !(DefaultsHelpers::deploymentMode() == DeploymentMode::OnlineDDS || DefaultsHelpers::deploymentMode() == DeploymentMode::OnlineECS);
static bool gAllowEoS = gAllowEoSdefault;

using DataHeader = o2::header::DataHeader;

std::string formatExternalChannelConfiguration(InputChannelSpec const& spec)
Expand Down Expand Up @@ -797,7 +791,6 @@ DataProcessorSpec specifyExternalFairMQDeviceProxy(char const* name,
if (newRun) {
std::fill(numberOfEoS.begin(), numberOfEoS.end(), 0);
std::fill(eosPeersCount.begin(), eosPeersCount.end(), 0);
gAllowEoS = gAllowEoSdefault;
}
numberOfEoS[ci] += nEos;
if (numberOfEoS[ci]) {
Expand Down Expand Up @@ -828,8 +821,7 @@ DataProcessorSpec specifyExternalFairMQDeviceProxy(char const* name,
}

if (everyEoS) {
LOG(info) << "Received " << numberOfEoS[ci] << " end-of-stream from " << eosPeersCount[ci] << " peers, forwarding end-of-stream (shouldstop " << (int)shouldstop << ", nEos " << nEos << ", newRun " << (int)newRun << ")";
gAllowEoS = true;
LOG(info) << "Received (on channel " << ci << ") " << numberOfEoS[ci] << " end-of-stream from " << eosPeersCount[ci] << " peers, forwarding end-of-stream (shouldstop " << (int)shouldstop << ", nEos " << nEos << ", newRun " << (int)newRun << ")";
// Mark all input channels as closed
for (auto& info : deviceState->inputChannelInfos) {
info.state = InputChannelState::Completed;
Expand Down Expand Up @@ -957,9 +949,6 @@ DataProcessorSpec specifyFairMQDeviceOutputProxy(char const* name,
// DPL implements an internal end of stream signal, which is propagated through
// all downstream channels if a source is dry, make it available to other external
// devices via a message of type {DPL/EOS/0}
if (!gAllowEoS) {
return;
}
for (auto& channelInfo : device->GetChannels()) {
auto& channelName = channelInfo.first;
if (channelName != outputChannelName) {
Expand Down

0 comments on commit 655673d

Please sign in to comment.