Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid dummy run() call in raw-tf-reader #12798

Merged
merged 1 commit into from
Mar 4, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 28 additions & 17 deletions Detectors/Raw/TFReaderDD/src/TFReaderSpec.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ class TFReaderSpec : public o2f::Task
long mTotalWaitTime = 0;
size_t mSelIDEntry = 0; // next TFID to select from the mInput.tfIDs (if non-empty)
bool mRunning = false;
bool mWaitSendingLast = false;
TFReaderInp mInput; // command line inputs
std::thread mTFBuilderThread{};
};
Expand Down Expand Up @@ -116,7 +117,6 @@ void TFReaderSpec::run(o2f::ProcessingContext& ctx)
mTFBuilderThread = std::thread(&TFReaderSpec::TFBuilder, this);
}
static auto tLastTF = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::system_clock::now()).time_since_epoch().count();
static long deltaSending = 0; // time correction for sending
auto device = ctx.services().get<o2f::RawDeviceService>().device();
assert(device);
if (device != mDevice) {
Expand Down Expand Up @@ -252,11 +252,10 @@ void TFReaderSpec::run(o2f::ProcessingContext& ctx)
}

auto tNow = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::system_clock::now()).time_since_epoch().count();
auto tDiff = tNow - tLastTF + 2 * deltaSending;
auto tDiff = tNow - tLastTF;
if (mTFCounter && tDiff < mInput.delay_us) {
std::this_thread::sleep_for(std::chrono::microseconds((size_t)(mInput.delay_us - tDiff))); // respect requested delay before sending
}
auto tSend = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::system_clock::now()).time_since_epoch().count();
for (auto& msgIt : *tfPtr.get()) {
size_t szPart = acknowledgeOutput(*msgIt.second.get(), false);
dataSize += szPart;
Expand All @@ -269,23 +268,22 @@ void TFReaderSpec::run(o2f::ProcessingContext& ctx)
// however this is a small enough hack for now.
ctx.services().get<o2f::MessageContext>().fakeDispatch();
tNow = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::system_clock::now()).time_since_epoch().count();
deltaSending = mTFCounter ? tNow - tLastTF : 0;
LOGP(info, "Sent TF {} of size {} with {} parts, {:.4f} s elapsed from previous TF.", mTFCounter, dataSize, nparts, double(deltaSending) * 1e-6);
deltaSending -= mInput.delay_us;
if (!mTFCounter || deltaSending < 0) {
deltaSending = 0; // correction for next delay
}
LOGP(info, "Sent TF {} of size {} with {} parts, {:.4f} s elapsed from previous TF., WaitSending={}", mTFCounter, dataSize, nparts, mTFCounter ? double(tNow - tLastTF) * 1e-6 : 0., mWaitSendingLast);
tLastTF = tNow;
++mTFCounter;

while (mTFQueue.size() == 0 && mWaitSendingLast) {
usleep(10000);
}
break;
}
if (!mRunning) { // no more TFs will be provided
stopProcessing(ctx);
break;
}
usleep(5000); // wait 5ms for new TF to be built
// usleep(5000); // wait 5ms for new TF to be built
}
if (mTFCounter >= mInput.maxTFs) { // done
if (mTFCounter >= mInput.maxTFs || (!mTFQueue.size() && !mRunning)) { // done
stopProcessing(ctx);
}
}
Expand All @@ -305,10 +303,17 @@ void TFReaderSpec::endOfStream(o2f::EndOfStreamContext& ec)
//___________________________________________________________
void TFReaderSpec::stopProcessing(o2f::ProcessingContext& ctx)
{
static bool stopDone = false;
if (stopDone) {
return;
}
stopDone = true;
LOGP(info, "{} TFs in {} loops were sent, spent {:.2} s in {} data waiting states", mTFCounter, mFileFetcher->getNLoops(), 1e-6 * mTotalWaitTime, mNWaits);
mRunning = false;
mFileFetcher->stop();
mFileFetcher.reset();
if (mFileFetcher) {
mFileFetcher->stop();
mFileFetcher.reset();
}
if (mTFBuilderThread.joinable()) {
mTFBuilderThread.join();
}
Expand Down Expand Up @@ -341,7 +346,9 @@ void TFReaderSpec::TFBuilder()
bool waitAcknowledged = false;
long startWait = 0;
while (mRunning && mDevice) {
LOGP(info, "mTFQueue.size()={} mWaitSendingLast = {}", mTFQueue.size(), mWaitSendingLast);
if (mTFQueue.size() >= size_t(mInput.maxTFCache)) {
mWaitSendingLast = false;
std::this_thread::sleep_for(sleepTime);
continue;
}
Expand All @@ -356,6 +363,7 @@ void TFReaderSpec::TFBuilder()
mFileFetcher->stop();
}
mRunning = false;
mWaitSendingLast = false;
break;
}
if (tfFileName.empty()) {
Expand All @@ -366,6 +374,7 @@ void TFReaderSpec::TFBuilder()
std::this_thread::sleep_for(10ms); // wait for the files cache to be filled
continue;
}
mWaitSendingLast = false;
if (waitAcknowledged) {
long waitTime = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::system_clock::now()).time_since_epoch().count() - startWait;
mTotalWaitTime += waitTime;
Expand All @@ -383,6 +392,9 @@ void TFReaderSpec::TFBuilder()
{
while (mRunning && mTFBuilderCounter < mInput.maxTFs) {
if (mTFQueue.size() >= size_t(mInput.maxTFCache)) {
if (mTFQueue.size() > 1) {
mWaitSendingLast = false;
}
std::this_thread::sleep_for(sleepTime);
continue;
}
Expand All @@ -393,6 +405,7 @@ void TFReaderSpec::TFBuilder()
if (!mInput.tfIDs.empty()) {
acceptTF = false;
if (mInput.tfIDs[mSelIDEntry] == mTFBuilderCounter) {
mWaitSendingLast = false;
acceptTF = true;
LOGP(info, "Retrieved TF#{} will be pushed as slice {} following user request", mTFBuilderCounter, mSelIDEntry);
mSelIDEntry++;
Expand All @@ -408,6 +421,7 @@ void TFReaderSpec::TFBuilder()
continue;
}
if (mRunning && tf) {
mWaitSendingLast = true;
mTFQueue.push(std::move(tf));
} else {
break;
Expand All @@ -417,10 +431,7 @@ void TFReaderSpec::TFBuilder()
if (mFileFetcher) {
mFileFetcher->popFromQueue(mFileFetcher->getNLoops() >= mInput.maxLoops);
}
} /*catch (...) {
LOGP(error, "Error when building {}-th TF from file {}", locID, tfFileName);
mFileFetcher->popFromQueue(mFileFetcher->getNLoops() >= mInput.maxLoops); // remove failed TF file
} */
}
}
}

Expand Down
Loading