Skip to content

Commit

Permalink
Merge pull request #664 from lukemartinlogan/master
Browse files Browse the repository at this point in the history
Data staging will open and close files immediately
  • Loading branch information
lukemartinlogan authored Feb 5, 2024
2 parents e9f1c35 + 154ec54 commit cf2cc5b
Show file tree
Hide file tree
Showing 18 changed files with 318 additions and 138 deletions.
162 changes: 77 additions & 85 deletions CMake/HermesConfig.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -10,99 +10,91 @@
find_path(
Hermes_INCLUDE_DIR
hermes/hermes_types.h
HINTS ENV PATH ENV CPATH
)
message("Hermes_INCLUDE_DIR: ${Hermes_INCLUDE_DIR}")

if( Hermes_INCLUDE_DIR )
get_filename_component(Hermes_DIR ${Hermes_INCLUDE_DIR} PATH)

#-----------------------------------------------------------------------------
# Find all packages needed by Hermes
#-----------------------------------------------------------------------------
find_library(
Hermes_LIBRARY
NAMES hrun_client hrun_server
)

message("Hermes_LIBRARY: ${Hermes_LIBRARY}")
if (NOT Hermes_INCLUDE_DIR)
message(STATUS "FindHermes: Could not find Hermes.h")
set(Hermes_FOUND FALSE)
return()
endif()
get_filename_component(Hermes_DIR ${Hermes_INCLUDE_DIR} PATH)

# HermesShm
find_package(HermesShm CONFIG REQUIRED)
message(STATUS "found hermes_shm.h at ${HermesShm_INCLUDE_DIRS}")
#-----------------------------------------------------------------------------
# Find all packages needed by Hermes
#-----------------------------------------------------------------------------
find_library(
Hermes_LIBRARY
NAMES hrun_client hrun_server
HINTS ENV LD_LIBRARY_PATH ENV PATH
)
if (NOT Hermes_LIBRARY)
message(STATUS "FindHermes: Could not find libhrun_client.so")
set(Hermes_FOUND FALSE)
message(STATUS "LIBS: $ENV{LD_LIBRARY_PATH}")
return()
endif()

# YAML-CPP
find_package(yaml-cpp REQUIRED)
message(STATUS "found yaml-cpp at ${yaml-cpp_DIR}")
# HermesShm
find_package(HermesShm CONFIG REQUIRED)
message(STATUS "found hermes_shm.h at ${HermesShm_INCLUDE_DIRS}")

# Catch2
find_package(Catch2 3.0.1 REQUIRED)
message(STATUS "found catch2.h at ${Catch2_CXX_INCLUDE_DIRS}")
# YAML-CPP
find_package(yaml-cpp REQUIRED)
message(STATUS "found yaml-cpp at ${yaml-cpp_DIR}")

# MPICH
if(BUILD_MPI_TESTS)
find_package(MPI REQUIRED COMPONENTS C CXX)
message(STATUS "found mpi.h at ${MPI_CXX_INCLUDE_DIRS}")
endif()
# Catch2
find_package(Catch2 3.0.1 REQUIRED)
message(STATUS "found catch2.h at ${Catch2_CXX_INCLUDE_DIRS}")

# OpenMP
if(BUILD_OpenMP_TESTS)
find_package(OpenMP REQUIRED COMPONENTS C CXX)
message(STATUS "found omp.h at ${OpenMP_CXX_INCLUDE_DIRS}")
endif()
# MPICH
if(BUILD_MPI_TESTS)
find_package(MPI REQUIRED COMPONENTS C CXX)
message(STATUS "found mpi.h at ${MPI_CXX_INCLUDE_DIRS}")
endif()

# Cereal
find_package(cereal REQUIRED)
if(cereal)
message(STATUS "found cereal")
endif()
# OpenMP
if(BUILD_OpenMP_TESTS)
find_package(OpenMP REQUIRED COMPONENTS C CXX)
message(STATUS "found omp.h at ${OpenMP_CXX_INCLUDE_DIRS}")
endif()

# Boost
find_package(Boost REQUIRED COMPONENTS regex system filesystem fiber REQUIRED)
if (Boost_FOUND)
message(STATUS "found boost at ${Boost_INCLUDE_DIRS}")
endif()
# Cereal
find_package(cereal REQUIRED)
if(cereal)
message(STATUS "found cereal")
endif()

# Thallium
find_package(thallium CONFIG REQUIRED)
if(thallium_FOUND)
message(STATUS "found thallium at ${thallium_DIR}")
endif()
# Boost
find_package(Boost REQUIRED COMPONENTS regex system filesystem fiber REQUIRED)
if (Boost_FOUND)
message(STATUS "found boost at ${Boost_INCLUDE_DIRS}")
endif()

#-----------------------------------------------------------------------------
# Mark hermes as found and set all needed packages
#-----------------------------------------------------------------------------
if( Hermes_LIBRARY )
set(Hermes_LIBRARY_DIR "")
get_filename_component(Hermes_LIBRARY_DIRS ${Hermes_LIBRARY} PATH)
# Set uncached variables as per standard.
set(Hermes_FOUND ON)
set(Hermes_INCLUDE_DIRS ${Boost_INCLUDE_DIRS} ${Hermes_INCLUDE_DIR})
set(Hermes_LIBRARIES
${HermesShm_LIBRARIES}
yaml-cpp
cereal::cereal
-ldl -lrt -lc -pthread
thallium
hermes
${Boost_LIBRARIES} ${Hermes_LIBRARY})
set(Hermes_CLIENT_LIBRARIES ${Hermes_LIBRARIES})
set(Hermes_RUNTIME_LIBRARIES
${Hermes_CLIENT_LIBRARIES}
hrun_runtime
${Boost_LIBRARIES})
set(Hermes_RUNTIME_DEPS "")
endif(Hermes_LIBRARY)
# Thallium
find_package(thallium CONFIG REQUIRED)
if(thallium_FOUND)
message(STATUS "found thallium at ${thallium_DIR}")
endif()

else(Hermes_INCLUDE_DIR)
message(STATUS "FindHermes: Could not find Hermes.h")
endif(Hermes_INCLUDE_DIR)

if(Hermes_FOUND)
if(NOT Hermes_FIND_QUIETLY)
message(STATUS "FindHermes: Found both Hermes.h and libhrun_client.so")
endif(NOT Hermes_FIND_QUIETLY)
else(Hermes_FOUND)
if(Hermes_FIND_REQUIRED)
message(STATUS "FindHermes: Could not find Hermes.h and/or libhrun_client.so")
endif(Hermes_FIND_REQUIRED)
endif(Hermes_FOUND)
#-----------------------------------------------------------------------------
# Mark hermes as found and set all needed packages
#-----------------------------------------------------------------------------
set(Hermes_LIBRARY_DIR "")
get_filename_component(Hermes_LIBRARY_DIRS ${Hermes_LIBRARY} PATH)
# Set uncached variables as per standard.
set(Hermes_FOUND ON)
set(Hermes_INCLUDE_DIRS ${Boost_INCLUDE_DIRS} ${Hermes_INCLUDE_DIR})
set(Hermes_LIBRARIES
${HermesShm_LIBRARIES}
yaml-cpp
cereal::cereal
-ldl -lrt -lc -pthread
thallium
hermes
${Boost_LIBRARIES} ${Hermes_LIBRARY})
set(Hermes_CLIENT_LIBRARIES ${Hermes_LIBRARIES})
set(Hermes_RUNTIME_LIBRARIES
${Hermes_CLIENT_LIBRARIES}
hrun_runtime
${Boost_LIBRARIES})
set(Hermes_RUNTIME_DEPS "")
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class Client : public TaskLibClient {
LPointer<ConstructTask> task =
AsyncCreateRoot(std::forward<Args>(args)...);
task->Wait();
Init(id_, HRUN_ADMIN->queue_id_);
Init(task->id_, HRUN_ADMIN->queue_id_);
HRUN_CLIENT->DelTask(task);
}

Expand Down
24 changes: 10 additions & 14 deletions include/hermes/bucket.h
Original file line number Diff line number Diff line change
Expand Up @@ -389,28 +389,21 @@ class Bucket {
/**
* Reorganize a blob to a new score or node
* */
void ReorganizeBlob(const BlobId &blob_id,
float score) {
blob_mdm_->AsyncReorganizeBlobRoot(id_, blob_id, score, 0, true);
}

/**
* Reorganize a blob to a new score or node
* */
void ReorganizeBlob(const BlobId &blob_id,
void ReorganizeBlob(const std::string &name,
float score,
Context &ctx) {
blob_mdm_->AsyncReorganizeBlobRoot(id_, blob_id, score, 0, true);
const Context &ctx = Context()) {
blob_mdm_->AsyncReorganizeBlobRoot(
id_, hshm::charbuf(name), BlobId::GetNull(), score, true, ctx);
}

/**
* Reorganize a blob to a new score or node
* */
void ReorganizeBlob(const BlobId &blob_id,
float score,
u32 node_id,
Context &ctx) {
blob_mdm_->AsyncReorganizeBlobRoot(id_, blob_id, score, node_id, true);
const Context &ctx = Context()) {
blob_mdm_->AsyncReorganizeBlobRoot(
id_, hshm::charbuf(""), blob_id, score, true, ctx);
}

/**
Expand Down Expand Up @@ -494,6 +487,9 @@ class Bucket {
Context &ctx) {
Blob blob;
BlobId blob_id = BaseGet(blob_name, orig_blob_id, blob, 0, ctx);
if (blob.size() == 0) {
return BlobId::GetNull();
}
std::stringstream ss(std::string(blob.data(), blob.size()));
cereal::BinaryInputArchive ar(ss);
ar >> data;
Expand Down
4 changes: 3 additions & 1 deletion include/hermes/config_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ class ConfigurationManager {
op_mdm_.CreateRoot(DomainId::GetGlobal(), "hermes_op_mdm",
bkt_mdm_.id_, blob_mdm_.id_);
stager_mdm_.CreateRoot(DomainId::GetGlobal(),
"hermes_stager_mdm", blob_mdm_.id_);
"hermes_stager_mdm",
blob_mdm_.id_,
bkt_mdm_.id_);
blob_mdm_.SetBucketMdmRoot(DomainId::GetGlobal(),
bkt_mdm_.id_,
stager_mdm_.id_, op_mdm_.id_);
Expand Down
28 changes: 26 additions & 2 deletions tasks/data_stager/include/data_stager/data_stager.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@ class Client : public TaskLibClient {
LPointer<ConstructTask> AsyncCreate(const TaskNode &task_node,
const DomainId &domain_id,
const std::string &state_name,
const TaskStateId &blob_mdm) {
const TaskStateId &blob_mdm,
const TaskStateId &bkt_mdm) {
id_ = TaskStateId::GetNull();
QueueManagerInfo &qm = HRUN_CLIENT->server_config_.queue_manager_;
std::vector<PriorityInfo> queue_info;
return HRUN_ADMIN->AsyncCreateTaskState<ConstructTask>(
task_node, domain_id, state_name, id_, queue_info, blob_mdm);
task_node, domain_id, state_name, id_, queue_info, blob_mdm, bkt_mdm);
}
HRUN_TASK_NODE_ROOT(AsyncCreate)
template<typename ...Args>
Expand Down Expand Up @@ -132,6 +133,29 @@ class Client : public TaskLibClient {
}
HRUN_TASK_NODE_PUSH_ROOT(StageOut);

/** Stage out data to a remote source */
HSHM_ALWAYS_INLINE
void AsyncUpdateSizeConstruct(UpdateSizeTask *task,
const TaskNode &task_node,
const BucketId &bkt_id,
const hshm::charbuf &blob_name,
size_t blob_off,
size_t data_size,
u32 task_flags) {
HRUN_CLIENT->ConstructTask<UpdateSizeTask>(
task, task_node, id_, bkt_id,
blob_name, blob_off, data_size, task_flags);
}
HSHM_ALWAYS_INLINE
void UpdateSizeRoot(const BucketId &bkt_id,
const hshm::charbuf &blob_name,
size_t blob_off,
size_t data_size,
u32 task_flags) {
AsyncUpdateSizeRoot(bkt_id, blob_name, blob_off, data_size, task_flags);
}
HRUN_TASK_NODE_PUSH_ROOT(UpdateSize);

/** Parse url */
static inline
bool GetUrlProtocolAndAction(const std::string &url,
Expand Down
Loading

0 comments on commit cf2cc5b

Please sign in to comment.