-
Notifications
You must be signed in to change notification settings - Fork 26
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Support running multithreaded: Add an IOSvc, an algorithm for reading…
… and writing for functionals in k4FWCore (#173)
- Loading branch information
Showing
58 changed files
with
3,313 additions
and
409 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
/* | ||
* Copyright (c) 2014-2024 Key4hep-Project. | ||
* | ||
* This file is part of Key4hep. | ||
* See https://key4hep.github.io/key4hep-doc/ for further info. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
#ifndef FWCORE_IIOSVC_H | ||
#define FWCORE_IIOSVC_H | ||
|
||
#include "GaudiKernel/IInterface.h" | ||
|
||
#include "podio/CollectionBase.h" | ||
#include "podio/ROOTWriter.h" | ||
|
||
#include <memory> | ||
#include <vector> | ||
|
||
/** | ||
* The interface implemented by any class making IO and reading RawEvent Data | ||
*/ | ||
class IIOSvc : virtual public IInterface { | ||
public: | ||
struct EndOfInput : std::logic_error { | ||
EndOfInput() : logic_error("Reached end of input while more data were expected"){}; | ||
}; | ||
|
||
public: | ||
/// InterfaceID | ||
DeclareInterfaceID(IIOSvc, 1, 0); | ||
|
||
/** | ||
* @brief Read the next event from the input file | ||
* @return A tuple containing the collections read, the collection names and the frame that owns the collections | ||
*/ | ||
virtual std::tuple<std::vector<std::shared_ptr<podio::CollectionBase>>, std::vector<std::string>, podio::Frame> | ||
next() = 0; | ||
virtual std::shared_ptr<std::vector<std::string>> getCollectionNames() const = 0; | ||
|
||
virtual std::shared_ptr<podio::ROOTWriter> getWriter() = 0; | ||
virtual void deleteWriter() = 0; | ||
virtual void deleteReader() = 0; | ||
virtual bool checkIfWriteCollection(const std::string& collName) = 0; | ||
}; | ||
|
||
#endif |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,159 @@ | ||
/* | ||
* Copyright (c) 2014-2024 Key4hep-Project. | ||
* | ||
* This file is part of Key4hep. | ||
* See https://key4hep.github.io/key4hep-doc/ for further info. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
#include "IOSvc.h" | ||
|
||
#include "podio/Frame.h" | ||
#include "podio/FrameCategories.h" | ||
|
||
#include "k4FWCore/FunctionalUtils.h" | ||
#include "k4FWCore/KeepDropSwitch.h" | ||
|
||
#include "GaudiKernel/AnyDataWrapper.h" | ||
#include "GaudiKernel/IEventProcessor.h" | ||
|
||
#include <mutex> | ||
#include <tuple> | ||
|
||
StatusCode IOSvc::initialize() { | ||
StatusCode sc = Service::initialize(); | ||
if (sc.isFailure()) { | ||
error() << "Unable to initialize base class Service." << endmsg; | ||
return sc; | ||
} | ||
if (!m_readingFileNames.empty()) { | ||
m_reader = std::make_unique<podio::ROOTReader>(); | ||
try { | ||
m_reader->openFiles(m_readingFileNames); | ||
} catch (std::runtime_error& e) { | ||
error() << "Error when opening files: " << e.what() << endmsg; | ||
return StatusCode::FAILURE; | ||
} | ||
m_entries = m_reader->getEntries(podio::Category::Event); | ||
} | ||
|
||
m_switch = KeepDropSwitch(m_outputCommands); | ||
|
||
m_incidentSvc = service("IncidentSvc"); | ||
if (!m_incidentSvc) { | ||
error() << "Unable to locate IIncidentSvc interface" << endmsg; | ||
return StatusCode::FAILURE; | ||
} | ||
m_incidentSvc->addListener(this, IncidentType::EndEvent); | ||
|
||
m_dataSvc = service("EventDataSvc"); | ||
if (!m_dataSvc) { | ||
error() << "Unable to locate the EventDataSvc" << endmsg; | ||
return StatusCode::FAILURE; | ||
} | ||
|
||
m_hiveWhiteBoard = service("EventDataSvc"); | ||
|
||
return StatusCode::SUCCESS; | ||
} | ||
|
||
StatusCode IOSvc::finalize() { return Service::finalize(); } | ||
|
||
std::tuple<std::vector<std::shared_ptr<podio::CollectionBase>>, std::vector<std::string>, podio::Frame> IOSvc::next() { | ||
podio::Frame frame; | ||
{ | ||
std::scoped_lock<std::mutex> lock(m_changeBufferLock); | ||
info() << "m_nextEntry = " << m_nextEntry << " m_entries = " << m_entries << endmsg; | ||
if (m_nextEntry < m_entries) { | ||
frame = podio::Frame(std::move(m_reader->readEntry(podio::Category::Event, m_nextEntry))); | ||
} else { | ||
return std::make_tuple(std::vector<std::shared_ptr<podio::CollectionBase>>(), std::vector<std::string>(), | ||
std::move(frame)); | ||
} | ||
m_nextEntry++; | ||
if (m_collectionNames.empty()) { | ||
m_collectionNames = frame.getAvailableCollections(); | ||
} | ||
} | ||
|
||
if (m_nextEntry >= m_entries) { | ||
// if (true) { | ||
auto ep = serviceLocator()->as<IEventProcessor>(); | ||
StatusCode sc = ep->stopRun(); | ||
if (sc.isFailure()) { | ||
error() << "Error when stopping run" << endmsg; | ||
throw GaudiException("Error when stopping run", name(), StatusCode::FAILURE); | ||
} | ||
info() << "m_nextEntry = " << m_nextEntry << " m_entries = " << m_entries << endmsg; | ||
} | ||
|
||
std::vector<std::shared_ptr<podio::CollectionBase>> collections; | ||
|
||
for (const auto& name : m_collectionNames) { | ||
auto ptr = const_cast<podio::CollectionBase*>(frame.get(name)); | ||
collections.push_back(std::shared_ptr<podio::CollectionBase>(ptr)); | ||
} | ||
|
||
return std::make_tuple(collections, m_collectionNames, std::move(frame)); | ||
} | ||
|
||
// After every event if there is still a frame in the TES | ||
// that means it hasn't been written so the collections inside the Frame | ||
// should be removed so that they are deleted when the Frame is deleted | ||
// and not deleted when clearing the store | ||
void IOSvc::handle(const Incident& incident) { | ||
StatusCode code; | ||
if (m_hiveWhiteBoard) { | ||
if (!incident.context().valid()) { | ||
info() << "No context found in IOSvc" << endmsg; | ||
return; | ||
} | ||
debug() << "Setting store to " << incident.context().slot() << endmsg; | ||
code = m_hiveWhiteBoard->selectStore(incident.context().slot()); | ||
if (code.isFailure()) { | ||
error() << "Error when setting store" << endmsg; | ||
throw GaudiException("Error when setting store", name(), StatusCode::FAILURE); | ||
} | ||
} | ||
DataObject* p; | ||
code = m_dataSvc->retrieveObject("/Event" + k4FWCore::frameLocation, p); | ||
if (code.isFailure()) { | ||
return; | ||
} | ||
|
||
auto frame = dynamic_cast<AnyDataWrapper<podio::Frame>*>(p); | ||
if (!frame) { | ||
error() << "Expected Frame in " << k4FWCore::frameLocation << " but there was something else" << endmsg; | ||
return; | ||
} | ||
for (const auto& coll : frame->getData().getAvailableCollections()) { | ||
DataObject* collPtr; | ||
code = m_dataSvc->retrieveObject("/Event/" + coll, collPtr); | ||
if (code.isSuccess()) { | ||
debug() << "Removing the collection: " << coll << " from the store" << endmsg; | ||
code = m_dataSvc->unregisterObject(collPtr); | ||
} | ||
// else { | ||
// info() << "Collection not found: " << coll << endmsg; | ||
// } | ||
} | ||
} | ||
|
||
void IOSvc::setReadingCollectionNames(const std::vector<std::string>& names) { m_collectionNames = names; } | ||
|
||
void IOSvc::setReadingFileNames(const std::vector<std::string>& names) { m_readingFileNames = names; } | ||
|
||
bool IOSvc::checkIfWriteCollection(const std::string& collName) { return m_switch.isOn(collName); } | ||
|
||
DECLARE_COMPONENT(IOSvc) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,96 @@ | ||
/* | ||
* Copyright (c) 2014-2024 Key4hep-Project. | ||
* | ||
* This file is part of Key4hep. | ||
* See https://key4hep.github.io/key4hep-doc/ for further info. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
#ifndef FWCORE_IOSVC_H | ||
#define FWCORE_IOSVC_H | ||
|
||
#include "Gaudi/Property.h" | ||
#include "GaudiKernel/IDataProviderSvc.h" | ||
#include "GaudiKernel/IHiveWhiteBoard.h" | ||
#include "GaudiKernel/IIncidentListener.h" | ||
#include "GaudiKernel/IIncidentSvc.h" | ||
#include "GaudiKernel/Service.h" | ||
|
||
#include "podio/ROOTReader.h" | ||
#include "podio/ROOTWriter.h" | ||
|
||
#include "IIOSvc.h" | ||
#include "k4FWCore/KeepDropSwitch.h" | ||
|
||
#include <string> | ||
#include <vector> | ||
|
||
class IOSvc : public extends<Service, IIOSvc, IIncidentListener> { | ||
using extends::extends; | ||
|
||
public: | ||
~IOSvc() override = default; | ||
|
||
StatusCode initialize() override; | ||
StatusCode finalize() override; | ||
|
||
std::tuple<std::vector<std::shared_ptr<podio::CollectionBase>>, std::vector<std::string>, podio::Frame> next() | ||
override; | ||
|
||
std::shared_ptr<std::vector<std::string>> getCollectionNames() const override { | ||
return std::make_shared<std::vector<std::string>>(m_collectionNames); | ||
} | ||
|
||
void setReadingCollectionNames(const std::vector<std::string>& names); | ||
void setReadingFileNames(const std::vector<std::string>& names); | ||
|
||
protected: | ||
Gaudi::Property<std::vector<std::string>> m_collectionNames{ | ||
this, "CollectionNames", {}, "List of collections to read"}; | ||
Gaudi::Property<std::vector<std::string>> m_readingFileNames{this, "input", {}, "List of files to read"}; | ||
Gaudi::Property<std::string> m_writingFileName{this, "output", {}, "List of files to write output to"}; | ||
Gaudi::Property<std::vector<std::string>> m_outputCommands{ | ||
this, "outputCommands", {"keep *"}, "A set of commands to declare which collections to keep or drop."}; | ||
Gaudi::Property<std::string> m_inputType{this, "ioType", "ROOT", "Type of input file (ROOT, RNTuple)"}; | ||
|
||
std::mutex m_changeBufferLock; | ||
|
||
KeepDropSwitch m_switch; | ||
|
||
std::unique_ptr<podio::ROOTReader> m_reader{nullptr}; | ||
std::shared_ptr<podio::ROOTWriter> m_writer{nullptr}; | ||
|
||
std::shared_ptr<podio::ROOTWriter> getWriter() override { | ||
if (!m_writer) { | ||
m_writer = std::make_shared<podio::ROOTWriter>(m_writingFileName.value()); | ||
} | ||
return m_writer; | ||
} | ||
|
||
// Gaudi doesn't always run the destructor of the Services so we have to | ||
// manually ask for the writer to be deleted so it will call finish() | ||
void deleteWriter() override { m_writer.reset(); } | ||
void deleteReader() override { m_reader.reset(); } | ||
|
||
SmartIF<IDataProviderSvc> m_dataSvc; | ||
SmartIF<IIncidentSvc> m_incidentSvc; | ||
SmartIF<IHiveWhiteBoard> m_hiveWhiteBoard; | ||
void handle(const Incident& incident) override; | ||
|
||
int m_entries{0}; | ||
int m_nextEntry{0}; | ||
|
||
bool checkIfWriteCollection(const std::string& collName) override; | ||
}; | ||
|
||
#endif |
Oops, something went wrong.