diff --git a/ci/hermes/packages/hermes_shm/package.py b/ci/hermes/packages/hermes_shm/package.py index fab60e16f..596d448c1 100644 --- a/ci/hermes/packages/hermes_shm/package.py +++ b/ci/hermes/packages/hermes_shm/package.py @@ -4,8 +4,6 @@ class HermesShm(CMakePackage): homepage = "https://github.com/lukemartinlogan/hermes_shm/wiki" git = "https://github.com/lukemartinlogan/hermes_shm.git" version('master', branch='master') - version("1.1.0", sha256="080d5361cff22794b670e4544c532926ca8b6d6ec695af25596efe035bfffea5") - version("1.0.0", sha256="a79f01d531ce89985ad59a2f62b41d74c2385e48d929e2f4ad895ae34137573b") # Main variants variant('debug', default=False, description='Build shared libraries') diff --git a/hrun/include/hrun/api/hrun_client.h b/hrun/include/hrun/api/hrun_client.h index d9826a946..45eb8822e 100644 --- a/hrun/include/hrun/api/hrun_client.h +++ b/hrun/include/hrun/api/hrun_client.h @@ -79,6 +79,8 @@ class Client : public ConfigurationManager { qm.shm_name_); mem_mngr->AttachBackend(hipc::MemoryBackendType::kPosixShmMmap, qm.data_shm_name_); + mem_mngr->AttachBackend(hipc::MemoryBackendType::kPosixShmMmap, + qm.rdata_shm_name_); } main_alloc_ = mem_mngr->GetAllocator(main_alloc_id_); data_alloc_ = mem_mngr->GetAllocator(data_alloc_id_); diff --git a/hrun/include/hrun/hrun_map.h b/hrun/include/hrun/hrun_map.h new file mode 100644 index 000000000..e86030bc2 --- /dev/null +++ b/hrun/include/hrun/hrun_map.h @@ -0,0 +1,123 @@ +// +// Created by lukemartinlogan on 2/20/24. +// + +#ifndef HERMES_HRUN_INCLUDE_HRUN_HRUN_MAP_H_ +#define HERMES_HRUN_INCLUDE_HRUN_HRUN_MAP_H_ + +#include "hrun_types.h" + +namespace hrun { + +template +class LockFreeMap; + +/** + * Iterator for lock-free map + * */ +template +struct LockFreeMapIterator { + public: + using Pair = hipc::pair; + size_t x_, y_; + Pair *val_; + + bool operator==(const LockFreeMapIterator &other) const { + return x_ == other.x_ && y_ == other.y_; + } + + bool operator!=(const LockFreeMapIterator &other) const { + return x_ != other.x_ || y_ != other.y_; + } +}; + +/** + * A lock-free unordered map implementation + * */ +template +class LockFreeMap { + public: + using Pair = hipc::pair; + using Bucket = hipc::mpsc_queue; + hipc::mptr> map_; + int num_lanes_; + size_t bkts_per_lane_; + + void Init(hipc::Allocator *alloc, int num_lanes, size_t bkts_per_lane, + size_t collisions) { + map_ = hipc::make_mptr>( + alloc, num_lanes * bkts_per_lane, collisions); + } + + void Connect(const hipc::Pointer &p) { + map_ = hipc::mptr>( + p); + } + + size_t FirstBucket(int lane_id) const { + return lane_id * bkts_per_lane_; + } + + size_t LastBucket(int lane_id) const { + return FirstBucket(lane_id) + bkts_per_lane_; + } + + Bucket& GetBucket(size_t bkt_id) { + return (*map_)[bkt_id]; + } + + hipc::Pointer GetShmPointer() { + return map_->template GetShmPointer(); + } + + void emplace(const Key &key, const T &val) { + size_t hash = std::hash{}(key); + size_t x = hash % map_->size(); + Bucket &bkt = (*map_)[x]; + bkt.emplace(key, val); + } + + LockFreeMapIterator find(const Key &key) { + LockFreeMapIterator it; + size_t hash = std::hash{}(key); + it.x_ = hash % map_->size(); + Bucket &bkt = (*map_)[it.x_]; + size_t head = bkt.head_; + size_t tail = bkt.tail_; + size_t bkt_size = 0; + if (head <= tail) { + bkt_size = tail - head; + } + for (size_t i = 0; i < bkt_size; ++i) { + Pair *val; + if (bkt.peek(val, i).IsNull()) { + continue; + } + it.y_ = i; + it.val_ = val; + return it; + } + it.x_ = map_->size(); + it.y_ = -1; + return it; + } + + T& operator[](const Key &key) { + LockFreeMapIterator it = find(key); + if (it.x_ == map_->size()) { + throw std::runtime_error("Key not found"); + } + return *it.val_->second_; + } + + LockFreeMapIterator end() const { + LockFreeMapIterator it; + it.x_ = map_->size(); + it.y_ = -1; + return it; + } +}; + +} // namespace hshm + +#endif //HERMES_HRUN_INCLUDE_HRUN_HRUN_MAP_H_ diff --git a/include/hermes/bucket.h b/include/hermes/bucket.h index effbfda14..2fb680407 100644 --- a/include/hermes/bucket.h +++ b/include/hermes/bucket.h @@ -465,29 +465,55 @@ class Bucket { * Get \a blob_id Blob from the bucket (sync) * */ BlobId BaseGet(const std::string &blob_name, - const BlobId &orig_blob_id, + BlobId blob_id, Blob &blob, size_t blob_off, Context &ctx) { - // TODO(llogan): intercept mmap to avoid copy - size_t data_size = blob.size(); - if (blob.size() == 0) { - data_size = blob_mdm_->GetBlobSizeRoot( - id_, hshm::charbuf(blob_name), orig_blob_id); - blob.resize(data_size); + // Get the blob ID + if (blob_id.IsNull()) { + auto &blob_id_map = HERMES_CONF->blob_mdm_.blob_id_map_; + auto blob_name_buf = blob_mdm::Client::GetBlobNameWithBucket(id_, blob_name); + auto it = blob_id_map.find(*blob_name_buf); + if (it != blob_id_map.end()) { + blob_id = *it.val_->second_; + } + } + if (!blob_id.IsNull()) { + auto &blob_map = HERMES_CONF->blob_mdm_.blob_map_; + auto it = blob_map.find(blob_id); + if (it != blob_map.end()) { + BlobInfo &blob_info = *it.val_->second_; + if (blob_off + blob.size() > blob_info.blob_size_) { + if (blob_info.blob_size_ < blob_off) { + return BlobId::GetNull(); + } + blob.resize(blob_info.blob_size_ - blob_off); + } + char *data = HRUN_CLIENT->GetDataPointer(blob_info.data_.shm_); + memcpy(blob.data(), data + blob_off, blob.size()); + return blob_id; + } + } else { + size_t data_size = blob.size(); + if (blob.size() == 0) { + data_size = blob_mdm_->GetBlobSizeRoot( + id_, hshm::charbuf(blob_name), blob_id); + blob.resize(data_size); + } + HILOG(kDebug, "Getting blob of size {}", data_size); + BlobId blob_id; + LPointer> push_task; + push_task = AsyncBaseGet(blob_name, blob_id, blob, blob_off, ctx); + push_task->Wait(); + GetBlobTask *task = push_task->get(); + blob_id = task->blob_id_; + char *data = HRUN_CLIENT->GetDataPointer(task->data_); + memcpy(blob.data(), data, task->data_size_); + blob.resize(task->data_size_); + HRUN_CLIENT->FreeBuffer(task->data_); + HRUN_CLIENT->DelTask(push_task); + return blob_id; } - HILOG(kDebug, "Getting blob of size {}", data_size); - BlobId blob_id; - LPointer> push_task; - push_task = AsyncBaseGet(blob_name, orig_blob_id, blob, blob_off, ctx); - push_task->Wait(); - GetBlobTask *task = push_task->get(); - blob_id = task->blob_id_; - char *data = HRUN_CLIENT->GetDataPointer(task->data_); - memcpy(blob.data(), data, task->data_size_); - blob.resize(task->data_size_); - HRUN_CLIENT->FreeBuffer(task->data_); - HRUN_CLIENT->DelTask(push_task); return blob_id; } diff --git a/include/hermes/config_manager.h b/include/hermes/config_manager.h index 525bd26a2..1877e1c67 100644 --- a/include/hermes/config_manager.h +++ b/include/hermes/config_manager.h @@ -23,6 +23,7 @@ #include "hermes/config_server.h" #include "data_stager/data_stager.h" + namespace hermes { class ConfigurationManager { @@ -49,6 +50,7 @@ class ConfigurationManager { LoadServerConfig(config_path); mdm_.CreateRoot(DomainId::GetGlobal(), "hermes_mdm"); blob_mdm_.CreateRoot(DomainId::GetGlobal(), "hermes_blob_mdm"); + blob_mdm_.GetLocalTablesRoot(DomainId::GetLocal()); bkt_mdm_.CreateRoot(DomainId::GetGlobal(), "hermes_bkt_mdm"); op_mdm_.CreateRoot(DomainId::GetGlobal(), "hermes_op_mdm", bkt_mdm_.id_, blob_mdm_.id_); diff --git a/include/hermes/hermes_types.h b/include/hermes/hermes_types.h index de8fd8cbe..74b993085 100644 --- a/include/hermes/hermes_types.h +++ b/include/hermes/hermes_types.h @@ -284,6 +284,7 @@ struct BlobInfo { std::atomic mod_count_; /**< The number of times blob modified */ std::atomic last_flush_; /**< The last mod that was flushed */ bitfield32_t flags_; /**< Flags */ + LPointer data_; /** Serialization */ template diff --git a/tasks/hermes_blob_mdm/include/hermes_blob_mdm/hermes_blob_mdm.h b/tasks/hermes_blob_mdm/include/hermes_blob_mdm/hermes_blob_mdm.h index 6bb285e14..78a81969d 100644 --- a/tasks/hermes_blob_mdm/include/hermes_blob_mdm/hermes_blob_mdm.h +++ b/tasks/hermes_blob_mdm/include/hermes_blob_mdm/hermes_blob_mdm.h @@ -6,11 +6,41 @@ #define HRUN_hermes_blob_mdm_H_ #include "hermes_blob_mdm_tasks.h" +#include "hrun/hrun_map.h" namespace hermes::blob_mdm { +typedef hrun::LockFreeMap BLOB_ID_MAP_T; +typedef hrun::LockFreeMap BLOB_MAP_T; + /** Create hermes_blob_mdm requests */ class Client : public TaskLibClient { + public: + BLOB_ID_MAP_T blob_id_map_; + BLOB_MAP_T blob_map_; + + public: + /** Get the globally unique blob name */ + static hipc::uptr + GetBlobNameWithBucket(TagId tag_id, const hshm::charbuf &blob_name) { + auto new_name = hshm::charbuf( + sizeof(TagId) + blob_name.size()); + hrun::LocalSerialize srl(new_name); + srl << tag_id; + srl << blob_name; + return hipc::make_uptr(new_name); + } + + /** Get the globally unique blob name */ + static hipc::uptr + GetBlobNameWithBucket(TagId tag_id, const std::string &blob_name) { + auto new_name = hshm::charbuf( + sizeof(TagId) + blob_name.size()); + hrun::LocalSerialize srl(new_name); + srl << tag_id; + srl << blob_name; + return hipc::make_uptr(new_name); + } public: /** Default constructor */ @@ -56,6 +86,24 @@ class Client : public TaskLibClient { * Blob Operations * ===================================*/ + /** Sets the BUCKET MDM */ + void AsyncGetLocalTablesConstruct(GetLocalTablesTask *task, + const TaskNode &task_node, + const DomainId &domain_id) { + HRUN_CLIENT->ConstructTask( + task, task_node, domain_id, id_); + } + void GetLocalTablesRoot(const DomainId &domain_id) { + LPointer> push_task = + AsyncGetLocalTablesRoot(domain_id); + push_task->Wait(); + GetLocalTablesTask *task = push_task->get(); + blob_id_map_.Connect(task->blob_id_map_); + blob_map_.Connect(task->blob_map_); + HRUN_CLIENT->DelTask(push_task); + } + HRUN_TASK_NODE_PUSH_ROOT(GetLocalTables); + /** Sets the BUCKET MDM */ void AsyncSetBucketMdmConstruct(SetBucketMdmTask *task, const TaskNode &task_node, diff --git a/tasks/hermes_blob_mdm/include/hermes_blob_mdm/hermes_blob_mdm_lib_exec.h b/tasks/hermes_blob_mdm/include/hermes_blob_mdm/hermes_blob_mdm_lib_exec.h index 88abb5f58..82e4dca33 100644 --- a/tasks/hermes_blob_mdm/include/hermes_blob_mdm/hermes_blob_mdm_lib_exec.h +++ b/tasks/hermes_blob_mdm/include/hermes_blob_mdm/hermes_blob_mdm_lib_exec.h @@ -84,6 +84,10 @@ void Run(u32 method, Task *task, RunContext &rctx) override { PollTargetMetadata(reinterpret_cast(task), rctx); break; } + case Method::kGetLocalTables: { + GetLocalTables(reinterpret_cast(task), rctx); + break; + } } } /** Execute a task */ @@ -169,6 +173,10 @@ void Monitor(u32 mode, Task *task, RunContext &rctx) override { MonitorPollTargetMetadata(mode, reinterpret_cast(task), rctx); break; } + case Method::kGetLocalTables: { + MonitorGetLocalTables(mode, reinterpret_cast(task), rctx); + break; + } } } /** Delete a task */ @@ -254,6 +262,10 @@ void Del(u32 method, Task *task) override { HRUN_CLIENT->DelTask(reinterpret_cast(task)); break; } + case Method::kGetLocalTables: { + HRUN_CLIENT->DelTask(reinterpret_cast(task)); + break; + } } } /** Duplicate a task */ @@ -339,6 +351,10 @@ void Dup(u32 method, Task *orig_task, std::vector> &dups) overrid hrun::CALL_DUPLICATE(reinterpret_cast(orig_task), dups); break; } + case Method::kGetLocalTables: { + hrun::CALL_DUPLICATE(reinterpret_cast(orig_task), dups); + break; + } } } /** Register the duplicate output with the origin task */ @@ -424,6 +440,10 @@ void DupEnd(u32 method, u32 replica, Task *orig_task, Task *dup_task) override { hrun::CALL_DUPLICATE_END(replica, reinterpret_cast(orig_task), reinterpret_cast(dup_task)); break; } + case Method::kGetLocalTables: { + hrun::CALL_DUPLICATE_END(replica, reinterpret_cast(orig_task), reinterpret_cast(dup_task)); + break; + } } } /** Ensure there is space to store replicated outputs */ @@ -509,6 +529,10 @@ void ReplicateStart(u32 method, u32 count, Task *task) override { hrun::CALL_REPLICA_START(count, reinterpret_cast(task)); break; } + case Method::kGetLocalTables: { + hrun::CALL_REPLICA_START(count, reinterpret_cast(task)); + break; + } } } /** Determine success and handle failures */ @@ -594,6 +618,10 @@ void ReplicateEnd(u32 method, Task *task) override { hrun::CALL_REPLICA_END(reinterpret_cast(task)); break; } + case Method::kGetLocalTables: { + hrun::CALL_REPLICA_END(reinterpret_cast(task)); + break; + } } } /** Serialize a task when initially pushing into remote */ @@ -679,6 +707,10 @@ std::vector SaveStart(u32 method, BinaryOutputArchive &ar, T ar << *reinterpret_cast(task); break; } + case Method::kGetLocalTables: { + ar << *reinterpret_cast(task); + break; + } } return ar.Get(); } @@ -786,6 +818,11 @@ TaskPointer LoadStart(u32 method, BinaryInputArchive &ar) override { ar >> *reinterpret_cast(task_ptr.ptr_); break; } + case Method::kGetLocalTables: { + task_ptr.ptr_ = HRUN_CLIENT->NewEmptyTask(task_ptr.shm_); + ar >> *reinterpret_cast(task_ptr.ptr_); + break; + } } return task_ptr; } @@ -872,6 +909,10 @@ std::vector SaveEnd(u32 method, BinaryOutputArchive &ar, Ta ar << *reinterpret_cast(task); break; } + case Method::kGetLocalTables: { + ar << *reinterpret_cast(task); + break; + } } return ar.Get(); } @@ -958,6 +999,10 @@ void LoadEnd(u32 replica, u32 method, BinaryInputArchive &ar, Task *task) ar.Deserialize(replica, *reinterpret_cast(task)); break; } + case Method::kGetLocalTables: { + ar.Deserialize(replica, *reinterpret_cast(task)); + break; + } } } /** Get the grouping of the task */ @@ -1023,6 +1068,9 @@ u32 GetGroup(u32 method, Task *task, hshm::charbuf &group) override { case Method::kPollTargetMetadata: { return reinterpret_cast(task)->GetGroup(group); } + case Method::kGetLocalTables: { + return reinterpret_cast(task)->GetGroup(group); + } } return -1; } diff --git a/tasks/hermes_blob_mdm/include/hermes_blob_mdm/hermes_blob_mdm_methods.h b/tasks/hermes_blob_mdm/include/hermes_blob_mdm/hermes_blob_mdm_methods.h index 7bd0cc751..1399f9223 100644 --- a/tasks/hermes_blob_mdm/include/hermes_blob_mdm/hermes_blob_mdm_methods.h +++ b/tasks/hermes_blob_mdm/include/hermes_blob_mdm/hermes_blob_mdm_methods.h @@ -21,6 +21,7 @@ struct Method : public TaskMethod { TASK_METHOD_T kFlushData = kLast + 17; TASK_METHOD_T kPollBlobMetadata = kLast + 18; TASK_METHOD_T kPollTargetMetadata = kLast + 19; + TASK_METHOD_T kGetLocalTables = kLast + 20; }; #endif // HRUN_HERMES_BLOB_MDM_METHODS_H_ \ No newline at end of file diff --git a/tasks/hermes_blob_mdm/include/hermes_blob_mdm/hermes_blob_mdm_methods.yaml b/tasks/hermes_blob_mdm/include/hermes_blob_mdm/hermes_blob_mdm_methods.yaml index a886bf876..a50ed7529 100644 --- a/tasks/hermes_blob_mdm/include/hermes_blob_mdm/hermes_blob_mdm_methods.yaml +++ b/tasks/hermes_blob_mdm/include/hermes_blob_mdm/hermes_blob_mdm_methods.yaml @@ -17,4 +17,5 @@ kReorganizeBlob: 15 kSetBucketMdm: 16 kFlushData: 17 kPollBlobMetadata: 18 -kPollTargetMetadata: 19 \ No newline at end of file +kPollTargetMetadata: 19 +kGetLocalTables: 20 \ No newline at end of file diff --git a/tasks/hermes_blob_mdm/include/hermes_blob_mdm/hermes_blob_mdm_tasks.h b/tasks/hermes_blob_mdm/include/hermes_blob_mdm/hermes_blob_mdm_tasks.h index 0e217aee2..52fe1737f 100644 --- a/tasks/hermes_blob_mdm/include/hermes_blob_mdm/hermes_blob_mdm_tasks.h +++ b/tasks/hermes_blob_mdm/include/hermes_blob_mdm/hermes_blob_mdm_tasks.h @@ -152,6 +152,36 @@ struct SetBucketMdmTask : public Task, TaskFlags { void ReplicateEnd() {} }; +/** Set the BUCKET MDM ID */ +struct GetLocalTablesTask : public Task, TaskFlags { + OUT hipc::Pointer blob_id_map_; + OUT hipc::Pointer blob_map_; + + /** SHM default constructor */ + HSHM_ALWAYS_INLINE explicit + GetLocalTablesTask(hipc::Allocator *alloc) : Task(alloc) {} + + /** Emplace constructor */ + HSHM_ALWAYS_INLINE explicit + GetLocalTablesTask(hipc::Allocator *alloc, + const TaskNode &task_node, + const DomainId &domain_id, + const TaskStateId &state_id) : Task(alloc) { + // Initialize task + task_node_ = task_node; + lane_hash_ = 0; + prio_ = TaskPrio::kAdmin; + task_state_ = state_id; + method_ = Method::kGetLocalTables; + task_flags_.SetBits(TASK_LOW_LATENCY); + domain_id_ = domain_id; + } + + /** Destructor */ + ~GetLocalTablesTask() {} +}; + + /**==================================== * Blob Operations * ===================================*/ diff --git a/tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc b/tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc index 1aa9e3bfe..55b4eb5e9 100644 --- a/tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc +++ b/tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc @@ -12,12 +12,13 @@ #include "data_stager/data_stager.h" #include "hermes_data_op/hermes_data_op.h" #include "hermes/score_histogram.h" +#include "hrun/hrun_map.h" namespace hermes::blob_mdm { /** Type name simplification for the various map types */ -typedef std::unordered_map BLOB_ID_MAP_T; -typedef std::unordered_map BLOB_MAP_T; +typedef hrun::LockFreeMap BLOB_ID_MAP_T; +typedef hrun::LockFreeMap BLOB_MAP_T; typedef hipc::mpsc_queue IO_PATTERN_LOG_T; class Server : public TaskLib { @@ -30,8 +31,10 @@ class Server : public TaskLib { /**==================================== * Maps * ===================================*/ - std::vector blob_id_map_; - std::vector blob_map_; + // std::vector blob_id_map_; + // std::vector blob_map_; + BLOB_ID_MAP_T blob_id_map_; + BLOB_MAP_T blob_map_; std::atomic id_alloc_; /**==================================== @@ -62,8 +65,10 @@ class Server : public TaskLib { id_alloc_ = 0; node_id_ = HRUN_CLIENT->node_id_; // Initialize blob maps - blob_id_map_.resize(HRUN_QM_RUNTIME->max_lanes_); - blob_map_.resize(HRUN_QM_RUNTIME->max_lanes_); + blob_id_map_.Init(HRUN_CLIENT->main_alloc_, + HRUN_QM_RUNTIME->max_lanes_, 65536, 32); + blob_map_.Init(HRUN_CLIENT->main_alloc_, + HRUN_QM_RUNTIME->max_lanes_, 65536, 32); // Initialize targets target_tasks_.reserve(HERMES_SERVER_CONF.devices_.size()); for (DeviceInfo &dev : HERMES_SERVER_CONF.devices_) { @@ -121,17 +126,18 @@ class Server : public TaskLib { void MonitorDestruct(u32 mode, DestructTask *task, RunContext &rctx) { } - private: - /** Get the globally unique blob name */ - const hshm::charbuf GetBlobNameWithBucket(TagId tag_id, const hshm::charbuf &blob_name) { - hshm::charbuf new_name(sizeof(TagId) + blob_name.size()); - hrun::LocalSerialize srl(new_name); - srl << tag_id; - srl << blob_name; - return new_name; + public: + /** + * Get local tables map + * */ + void GetLocalTables(GetLocalTablesTask *task, RunContext &rctx) { + task->blob_id_map_ = blob_id_map_.GetShmPointer(); + task->blob_map_ = blob_map_.GetShmPointer(); + task->SetModuleComplete(); + } + void MonitorGetLocalTables(u32 mode, GetLocalTablesTask *task, RunContext &rctx) { } - public: /** * Set the Bucket MDM * */ @@ -264,57 +270,67 @@ class Server : public TaskLib { hshm::Timepoint now; now.Now(); // Get the blob info data structure - BLOB_MAP_T &blob_map = blob_map_[rctx.lane_id_]; + BLOB_MAP_T &blob_map = blob_map_; std::vector stage_tasks; stage_tasks.reserve(256); - for (auto &it : blob_map) { - BlobInfo &blob_info = it.second; - // Update blob scores - float new_score = MakeScore(blob_info, now); - blob_info.score_ = new_score; - if (ShouldReorganize(blob_info, new_score, task->task_node_)) { - Context ctx; - LPointer reorg_task = - blob_mdm_.AsyncReorganizeBlob(task->task_node_ + 1, - blob_info.tag_id_, - hshm::charbuf(""), - blob_info.blob_id_, - new_score, false, ctx, - TASK_LOW_LATENCY); - reorg_task->Wait(task); - HRUN_CLIENT->DelTask(reorg_task); - } - blob_info.access_freq_ = 0; - - // Flush data - FlushInfo flush_info; - flush_info.blob_info_ = &blob_info; - flush_info.mod_count_ = blob_info.mod_count_; - if (blob_info.last_flush_ > 0 && - flush_info.mod_count_ > blob_info.last_flush_) { - HILOG(kDebug, "Flushing blob {} (mod_count={}, last_flush={})", - blob_info.blob_id_, flush_info.mod_count_, blob_info.last_flush_); - LPointer data = HRUN_CLIENT->AllocateBufferServer( - blob_info.blob_size_, task); - LPointer get_blob = - blob_mdm_.AsyncGetBlob(task->task_node_ + 1, - blob_info.tag_id_, - blob_info.name_, - blob_info.blob_id_, - 0, blob_info.blob_size_, - data.shm_); - get_blob->Wait(task); - HRUN_CLIENT->DelTask(get_blob); - flush_info.stage_task_ = - stager_mdm_.AsyncStageOut(task->task_node_ + 1, - blob_info.tag_id_, - blob_info.name_, - data.shm_, blob_info.blob_size_, - TASK_DATA_OWNER); - stage_tasks.emplace_back(flush_info); - } - if (stage_tasks.size() == 256) { - break; + for (size_t bkt_id = blob_map.FirstBucket(rctx.lane_id_); + bkt_id < blob_map.LastBucket(rctx.lane_id_); ++bkt_id) { + BLOB_MAP_T::Bucket &bkt = blob_map.GetBucket(bkt_id); + BLOB_MAP_T::Pair *blob_infop; + size_t i = 0; + while (!bkt.peek(blob_infop, i).IsNull()) { + BlobInfo &blob_info = *blob_infop->second_; + // Update blob scores + float new_score = MakeScore(blob_info, now); + blob_info.score_ = new_score; + if (ShouldReorganize(blob_info, new_score, task->task_node_)) { + Context ctx; + LPointer reorg_task = + blob_mdm_.AsyncReorganizeBlob(task->task_node_ + 1, + blob_info.tag_id_, + hshm::charbuf(""), + blob_info.blob_id_, + new_score, false, ctx, + TASK_LOW_LATENCY); + reorg_task->Wait(task); + HRUN_CLIENT->DelTask(reorg_task); + } + blob_info.access_freq_ = 0; + + // Flush data + FlushInfo flush_info; + flush_info.blob_info_ = &blob_info; + flush_info.mod_count_ = blob_info.mod_count_; + if (blob_info.last_flush_ > 0 && + flush_info.mod_count_ > blob_info.last_flush_) { + HILOG(kDebug, + "Flushing blob {} (mod_count={}, last_flush={})", + blob_info.blob_id_, + flush_info.mod_count_, + blob_info.last_flush_); + LPointer + data = HRUN_CLIENT->AllocateBufferServer( + blob_info.blob_size_, task); + LPointer get_blob = + blob_mdm_.AsyncGetBlob(task->task_node_ + 1, + blob_info.tag_id_, + blob_info.name_, + blob_info.blob_id_, + 0, blob_info.blob_size_, + data.shm_); + get_blob->Wait(task); + HRUN_CLIENT->DelTask(get_blob); + flush_info.stage_task_ = + stager_mdm_.AsyncStageOut(task->task_node_ + 1, + blob_info.tag_id_, + blob_info.name_, + data.shm_, blob_info.blob_size_, + TASK_DATA_OWNER); + stage_tasks.emplace_back(flush_info); + } + if (stage_tasks.size() == 256) { + break; + } } } @@ -326,13 +342,19 @@ class Server : public TaskLib { } } void MonitorFlushData(u32 mode, FlushDataTask *task, RunContext &rctx) { - BLOB_MAP_T &blob_map = blob_map_[rctx.lane_id_]; - for (auto &it : blob_map) { - BlobInfo &blob_info = it.second; - if (blob_info.last_flush_ > 0 && - blob_info.mod_count_ > blob_info.last_flush_) { - rctx.flush_->count_ += 1; - return; + BLOB_MAP_T &blob_map = blob_map_; + for (size_t bkt_id = blob_map.FirstBucket(rctx.lane_id_); + bkt_id < blob_map.LastBucket(rctx.lane_id_); ++bkt_id) { + BLOB_MAP_T::Bucket &bkt = blob_map.GetBucket(bkt_id); + BLOB_MAP_T::Pair *blob_infop; + size_t i = 0; + while (!bkt.peek(blob_infop, i).IsNull()) { + BlobInfo &blob_info = *blob_infop->second_; + if (blob_info.last_flush_ > 0 && + blob_info.mod_count_ > blob_info.last_flush_) { + rctx.flush_->count_ += 1; + return; + } } } } @@ -349,7 +371,7 @@ class Server : public TaskLib { } HILOG(kDebug, "Beginning PUT for (hash: {})", std::hash{}(blob_name)); - BLOB_MAP_T &blob_map = blob_map_[rctx.lane_id_]; + BLOB_MAP_T &blob_map = blob_map_; BlobInfo &blob_info = blob_map[task->blob_id_]; blob_info.score_ = task->score_; blob_info.user_score_ = task->score_; @@ -367,134 +389,28 @@ class Server : public TaskLib { blob_info.mod_count_ = 1; HRUN_CLIENT->DelTask(stage_task); } - if (task->flags_.Any(HERMES_SHOULD_STAGE)) { - HILOG(kDebug, "This is marked as a file: {} {}", - blob_info.mod_count_, blob_info.last_flush_); - } - ssize_t bkt_size_diff = 0; - if (task->flags_.Any(HERMES_BLOB_REPLACE)) { - bkt_size_diff -= blob_info.blob_size_; - PutBlobFreeBuffersPhase(blob_info, task, rctx); - } - - // Determine amount of additional buffering space needed - size_t needed_space = task->blob_off_ + task->data_size_; - size_t size_diff = 0; - if (needed_space > blob_info.max_blob_size_) { - size_diff = needed_space - blob_info.max_blob_size_; - } - size_t min_blob_size = task->blob_off_ + task->data_size_; - if (min_blob_size > blob_info.blob_size_) { - blob_info.blob_size_ = task->blob_off_ + task->data_size_; - } - bkt_size_diff += (ssize_t)size_diff; - HILOG(kDebug, "The size diff is {} bytes (bkt diff {})", size_diff, bkt_size_diff) - - // Use DPE - std::vector schema_vec; - if (size_diff > 0) { - Context ctx; - auto *dpe = DpeFactory::Get(ctx.dpe_); - ctx.blob_score_ = task->score_; - dpe->Placement({size_diff}, targets_, ctx, schema_vec); - } - - // Allocate blob buffers - for (PlacementSchema &schema : schema_vec) { - schema.plcmnts_.emplace_back(0, fallback_target_->id_); - for (size_t sub_idx = 0; sub_idx < schema.plcmnts_.size(); ++sub_idx) { - SubPlacement &placement = schema.plcmnts_[sub_idx]; - TargetInfo &bdev = *target_map_[placement.tid_]; - LPointer alloc_task = - bdev.AsyncAllocate(task->task_node_ + 1, - blob_info.score_, - placement.size_, - blob_info.buffers_); - alloc_task->Wait(task); -// HILOG(kInfo, "(node {}) Placing {}/{} bytes in target {} of bw {}", -// HRUN_CLIENT->node_id_, -// alloc_task->alloc_size_, task->data_size_, -// placement.tid_, bdev.bandwidth_) - if (alloc_task->alloc_size_ < alloc_task->size_) { - SubPlacement &next_placement = schema.plcmnts_[sub_idx + 1]; - size_t diff = alloc_task->size_ - alloc_task->alloc_size_; - next_placement.size_ += diff; - } - // bdev.monitor_task_->rem_cap_ -= alloc_task->alloc_size_; - HRUN_CLIENT->DelTask(alloc_task); - } - } - - // Place blob in buffers - std::vector> write_tasks; - write_tasks.reserve(blob_info.buffers_.size()); - size_t blob_off = task->blob_off_, buf_off = 0; - size_t buf_left = 0, buf_right = 0; - size_t blob_right = task->blob_off_ + task->data_size_; - char *blob_buf = HRUN_CLIENT->GetDataPointer(task->data_); - HILOG(kDebug, "Number of buffers {}", blob_info.buffers_.size()); - bool found_left = false; - for (BufferInfo &buf : blob_info.buffers_) { - buf_right = buf_left + buf.t_size_; - if (blob_off >= blob_right) { - break; - } - if (buf_left <= blob_off && blob_off < buf_right) { - found_left = true; - } - if (found_left) { - size_t rel_off = blob_off - buf_left; - size_t tgt_off = buf.t_off_ + rel_off; - size_t buf_size = buf.t_size_ - rel_off; - if (buf_right > blob_right) { - buf_size = blob_right - (buf_left + rel_off); - } - HILOG(kDebug, "Writing {} bytes at off {} from target {}", buf_size, tgt_off, buf.tid_) - TargetInfo &target = *target_map_[buf.tid_]; - LPointer write_task = - target.AsyncWrite(task->task_node_ + 1, - blob_buf + buf_off, - tgt_off, buf_size); - write_tasks.emplace_back(write_task); - buf_off += buf_size; - blob_off = buf_right; - } - buf_left += buf.t_size_; - } - blob_info.max_blob_size_ = blob_off; - // Wait for the placements to complete - for (LPointer &write_task : write_tasks) { - write_task->Wait(task); - HRUN_CLIENT->DelTask(write_task); - } - // Update information - if (task->flags_.Any(HERMES_SHOULD_STAGE)) { - stager_mdm_.AsyncUpdateSize(task->task_node_ + 1, - task->tag_id_, - blob_info.name_, - task->blob_off_, - task->data_size_, 0); - } else { - bkt_mdm_.AsyncUpdateSize(task->task_node_ + 1, - task->tag_id_, - bkt_size_diff, - bucket_mdm::UpdateSizeMode::kAdd); - } + size_t new_size = task->blob_off_ + task->data_size_; if (task->flags_.Any(HERMES_BLOB_DID_CREATE)) { - bkt_mdm_.AsyncTagAddBlob(task->task_node_ + 1, - task->tag_id_, - task->blob_id_); - } - if (task->flags_.Any(HERMES_HAS_DERIVED)) { - op_mdm_.AsyncRegisterData(task->task_node_ + 1, - task->tag_id_, - task->blob_name_->str(), - task->blob_id_, - task->blob_off_, - task->data_size_); - } + blob_info.data_ = HRUN_CLIENT->AllocateBufferServer( + new_size, task); + blob_info.blob_size_ = task->data_size_; + blob_info.max_blob_size_ = task->data_size_; + } else if (blob_info.max_blob_size_ < new_size) { + LPointer old_data = blob_info.data_; + blob_info.data_ = HRUN_CLIENT->AllocateBufferServer( + new_size, task); + memcpy(blob_info.data_.ptr_, old_data.ptr_, blob_info.blob_size_); + HRUN_CLIENT->FreeBuffer(old_data); + blob_info.max_blob_size_ = new_size; + blob_info.blob_size_ = new_size; + } + + // Copy data to memory + char *data = HRUN_CLIENT->GetDataPointer(task->data_); + char *blob_data = blob_info.data_.ptr_; + memcpy(blob_data + task->blob_off_, data, task->data_size_); // Free data HILOG(kDebug, "Completing PUT for {}", blob_name.str()); @@ -504,20 +420,6 @@ class Server : public TaskLib { void MonitorPutBlob(u32 mode, PutBlobTask *task, RunContext &rctx) { } - /** Release buffers */ - void PutBlobFreeBuffersPhase(BlobInfo &blob_info, PutBlobTask *task, RunContext &rctx) { - for (BufferInfo &buf : blob_info.buffers_) { - TargetInfo &target = *target_map_[buf.tid_]; - std::vector buf_vec = {buf}; - target.AsyncFree(task->task_node_ + 1, - blob_info.score_, - std::move(buf_vec), true); - } - blob_info.buffers_.clear(); - blob_info.max_blob_size_ = 0; - blob_info.blob_size_ = 0; - } - /** Get a blob's data */ void GetBlob(GetBlobTask *task, RunContext &rctx) { if (task->blob_id_.IsNull()) { @@ -525,7 +427,7 @@ class Server : public TaskLib { task->blob_id_ = GetOrCreateBlobId(task->tag_id_, task->lane_hash_, blob_name, rctx, task->flags_); } - BLOB_MAP_T &blob_map = blob_map_[rctx.lane_id_]; + BLOB_MAP_T &blob_map = blob_map_; BlobInfo &blob_info = blob_map[task->blob_id_]; // Stage Blob @@ -541,48 +443,10 @@ class Server : public TaskLib { HRUN_CLIENT->DelTask(stage_task); } - // Read blob from buffers - std::vector read_tasks; - read_tasks.reserve(blob_info.buffers_.size()); - HILOG(kDebug, "Getting blob {} of size {} starting at offset {} (total_blob_size={}, buffers={})", - task->blob_id_, task->data_size_, task->blob_off_, blob_info.blob_size_, blob_info.buffers_.size()); - size_t blob_off = task->blob_off_; - size_t buf_left = 0, buf_right = 0; - size_t buf_off = 0; - size_t blob_right = task->blob_off_ + task->data_size_; - bool found_left = false; - char *blob_buf = HRUN_CLIENT->GetDataPointer(task->data_); - for (BufferInfo &buf : blob_info.buffers_) { - buf_right = buf_left + buf.t_size_; - if (blob_off >= blob_right) { - break; - } - if (buf_left <= blob_off && blob_off < buf_right) { - found_left = true; - } - if (found_left) { - size_t rel_off = blob_off - buf_left; - size_t tgt_off = buf.t_off_ + rel_off; - size_t buf_size = buf.t_size_ - rel_off; - if (buf_right > blob_right) { - buf_size = blob_right - (buf_left + rel_off); - } - HILOG(kDebug, "Loading {} bytes at off {} from target {}", buf_size, tgt_off, buf.tid_) - TargetInfo &target = *target_map_[buf.tid_]; - bdev::ReadTask *read_task = target.AsyncRead(task->task_node_ + 1, - blob_buf + buf_off, - tgt_off, buf_size).ptr_; - read_tasks.emplace_back(read_task); - buf_off += buf_size; - blob_off = buf_right; - } - buf_left += buf.t_size_; - } - for (bdev::ReadTask *&read_task : read_tasks) { - read_task->Wait(task); - HRUN_CLIENT->DelTask(read_task); - } - task->data_size_ = buf_off; + // Copy data from memory + char *data = HRUN_CLIENT->GetDataPointer(task->data_); + char *blob_data = blob_info.data_.ptr_; + memcpy(data, blob_data + task->blob_off_, task->data_size_); task->SetModuleComplete(); } void MonitorGetBlob(u32 mode, GetBlobTask *task, RunContext &rctx) { @@ -592,13 +456,13 @@ class Server : public TaskLib { * Tag a blob * */ void TagBlob(TagBlobTask *task, RunContext &rctx) { - BLOB_MAP_T &blob_map = blob_map_[rctx.lane_id_]; + BLOB_MAP_T &blob_map = blob_map_; auto it = blob_map.find(task->blob_id_); if (it == blob_map.end()) { task->SetModuleComplete(); return; } - BlobInfo &blob = it->second; + BlobInfo &blob = *it.val_->second_; blob.tags_.push_back(task->tag_); task->SetModuleComplete(); } @@ -609,13 +473,13 @@ class Server : public TaskLib { * Check if blob has a tag * */ void BlobHasTag(BlobHasTagTask *task, RunContext &rctx) { - BLOB_MAP_T &blob_map = blob_map_[rctx.lane_id_]; + BLOB_MAP_T &blob_map = blob_map_; auto it = blob_map.find(task->blob_id_); if (it == blob_map.end()) { task->SetModuleComplete(); return; } - BlobInfo &blob = it->second; + BlobInfo &blob = *it.val_->second_; task->has_tag_ = std::find(blob.tags_.begin(), blob.tags_.end(), task->tag_) != blob.tags_.end(); @@ -630,14 +494,15 @@ class Server : public TaskLib { BlobId GetOrCreateBlobId(TagId &tag_id, u32 lane_hash, const hshm::charbuf &blob_name, RunContext &rctx, bitfield32_t &flags) { - hshm::charbuf blob_name_unique = GetBlobNameWithBucket(tag_id, blob_name); - BLOB_ID_MAP_T &blob_id_map = blob_id_map_[rctx.lane_id_]; - auto it = blob_id_map.find(blob_name_unique); + hipc::uptr blob_name_unique = + Client::GetBlobNameWithBucket(tag_id, blob_name); + BLOB_ID_MAP_T &blob_id_map = blob_id_map_; + auto it = blob_id_map.find(*blob_name_unique); if (it == blob_id_map.end()) { BlobId blob_id = BlobId(node_id_, lane_hash, id_alloc_.fetch_add(1)); - blob_id_map.emplace(blob_name_unique, blob_id); + blob_id_map.emplace(*blob_name_unique, blob_id); flags.SetBits(HERMES_BLOB_DID_CREATE); - BLOB_MAP_T &blob_map = blob_map_[rctx.lane_id_]; + BLOB_MAP_T &blob_map = blob_map_; blob_map.emplace(blob_id, BlobInfo()); BlobInfo &blob_info = blob_map[blob_id]; blob_info.name_ = blob_name; @@ -651,7 +516,7 @@ class Server : public TaskLib { blob_info.last_flush_ = 0; return blob_id; } - return it->second; + return *it.val_->second_; } void GetOrCreateBlobId(GetOrCreateBlobIdTask *task, RunContext &rctx) { hshm::charbuf blob_name = hshm::to_charbuf(*task->blob_name_); @@ -668,16 +533,17 @@ class Server : public TaskLib { HSHM_ALWAYS_INLINE void GetBlobId(GetBlobIdTask *task, RunContext &rctx) { hshm::charbuf blob_name = hshm::to_charbuf(*task->blob_name_); - hshm::charbuf blob_name_unique = GetBlobNameWithBucket(task->tag_id_, blob_name); - BLOB_ID_MAP_T &blob_id_map = blob_id_map_[rctx.lane_id_]; - auto it = blob_id_map.find(blob_name_unique); + hipc::uptr blob_name_unique = + Client::GetBlobNameWithBucket(task->tag_id_, blob_name); + BLOB_ID_MAP_T &blob_id_map = blob_id_map_; + auto it = blob_id_map.find(*blob_name_unique); if (it == blob_id_map.end()) { task->blob_id_ = BlobId::GetNull(); task->SetModuleComplete(); HILOG(kDebug, "Failed to find blob {} in {}", blob_name.str(), task->tag_id_); return; } - task->blob_id_ = it->second; + task->blob_id_ = *it.val_->second_; HILOG(kDebug, "Found blob {} / {} in {}", task->blob_id_, blob_name.str(), task->tag_id_); task->SetModuleComplete(); } @@ -688,13 +554,13 @@ class Server : public TaskLib { * Get \a blob_name BLOB name from \a blob_id BLOB id * */ void GetBlobName(GetBlobNameTask *task, RunContext &rctx) { - BLOB_MAP_T &blob_map = blob_map_[rctx.lane_id_]; + BLOB_MAP_T &blob_map = blob_map_; auto it = blob_map.find(task->blob_id_); if (it == blob_map.end()) { task->SetModuleComplete(); return; } - BlobInfo &blob = it->second; + BlobInfo &blob = *it.val_->second_; (*task->blob_name_) = blob.name_; task->SetModuleComplete(); } @@ -711,14 +577,14 @@ class Server : public TaskLib { hshm::to_charbuf(*task->blob_name_), rctx, flags); } - BLOB_MAP_T &blob_map = blob_map_[rctx.lane_id_]; + BLOB_MAP_T &blob_map = blob_map_; auto it = blob_map.find(task->blob_id_); if (it == blob_map.end()) { task->size_ = 0; task->SetModuleComplete(); return; } - BlobInfo &blob = it->second; + BlobInfo &blob = *it.val_->second_; task->size_ = blob.blob_size_; task->SetModuleComplete(); } @@ -729,13 +595,13 @@ class Server : public TaskLib { * Get \a score from \a blob_id BLOB id * */ void GetBlobScore(GetBlobScoreTask *task, RunContext &rctx) { - BLOB_MAP_T &blob_map = blob_map_[rctx.lane_id_]; + BLOB_MAP_T &blob_map = blob_map_; auto it = blob_map.find(task->blob_id_); if (it == blob_map.end()) { task->SetModuleComplete(); return; } - BlobInfo &blob = it->second; + BlobInfo &blob = *it.val_->second_; task->score_ = blob.score_; task->SetModuleComplete(); } @@ -746,13 +612,13 @@ class Server : public TaskLib { * Get \a blob_id blob's buffers * */ void GetBlobBuffers(GetBlobBuffersTask *task, RunContext &rctx) { - BLOB_MAP_T &blob_map = blob_map_[rctx.lane_id_]; + BLOB_MAP_T &blob_map = blob_map_; auto it = blob_map.find(task->blob_id_); if (it == blob_map.end()) { task->SetModuleComplete(); return; } - BlobInfo &blob = it->second; + BlobInfo &blob = *it.val_->second_; (*task->buffers_) = blob.buffers_; task->SetModuleComplete(); } @@ -764,17 +630,6 @@ class Server : public TaskLib { * in \a bkt_id bucket. * */ void RenameBlob(RenameBlobTask *task, RunContext &rctx) { - BLOB_MAP_T &blob_map = blob_map_[rctx.lane_id_]; - auto it = blob_map.find(task->blob_id_); - if (it == blob_map.end()) { - task->SetModuleComplete(); - return; - } - BLOB_ID_MAP_T &blob_id_map = blob_id_map_[rctx.lane_id_]; - BlobInfo &blob = it->second; - blob_id_map.erase(blob.name_); - blob_id_map[blob.name_] = task->blob_id_; - blob.name_ = hshm::to_charbuf(*task->new_blob_name_); task->SetModuleComplete(); } void MonitorRenameBlob(u32 mode, RenameBlobTask *task, RunContext &rctx) { @@ -784,14 +639,6 @@ class Server : public TaskLib { * Truncate a blob to a new size * */ void TruncateBlob(TruncateBlobTask *task, RunContext &rctx) { - BLOB_MAP_T &blob_map = blob_map_[rctx.lane_id_]; - auto it = blob_map.find(task->blob_id_); - if (it == blob_map.end()) { - task->SetModuleComplete(); - return; - } - BlobInfo &blob_info = it->second; - // TODO(llogan): truncate blob task->SetModuleComplete(); } void MonitorTruncateBlob(u32 mode, TruncateBlobTask *task, RunContext &rctx) { @@ -801,53 +648,7 @@ class Server : public TaskLib { * Destroy \a blob_id blob in \a bkt_id bucket * */ void DestroyBlob(DestroyBlobTask *task, RunContext &rctx) { - switch (task->phase_) { - case DestroyBlobPhase::kFreeBuffers: { - BLOB_MAP_T &blob_map = blob_map_[rctx.lane_id_]; - auto it = blob_map.find(task->blob_id_); - if (it == blob_map.end()) { - task->SetModuleComplete(); - return; - } - BLOB_ID_MAP_T &blob_id_map = blob_id_map_[rctx.lane_id_]; - BlobInfo &blob_info = it->second; - hshm::charbuf unique_name = GetBlobNameWithBucket(blob_info.tag_id_, blob_info.name_); - blob_id_map.erase(unique_name); - HSHM_MAKE_AR0(task->free_tasks_, nullptr); - task->free_tasks_->reserve(blob_info.buffers_.size()); - for (BufferInfo &buf : blob_info.buffers_) { - TargetInfo &tgt_info = *target_map_[buf.tid_]; - std::vector buf_vec = {buf}; - bdev::FreeTask *free_task = tgt_info.AsyncFree( - task->task_node_ + 1, blob_info.score_, - std::move(buf_vec), false).ptr_; - task->free_tasks_->emplace_back(free_task); - } - task->phase_ = DestroyBlobPhase::kWaitFreeBuffers; - } - case DestroyBlobPhase::kWaitFreeBuffers: { - std::vector &free_tasks = *task->free_tasks_; - for (bdev::FreeTask *&free_task : free_tasks) { - if (!free_task->IsComplete()) { - return; - } - } - for (bdev::FreeTask *&free_task : free_tasks) { - HRUN_CLIENT->DelTask(free_task); - } - BLOB_MAP_T &blob_map = blob_map_[rctx.lane_id_]; - BlobInfo &blob_info = blob_map[task->blob_id_]; - if (task->update_size_) { - bkt_mdm_.AsyncUpdateSize(task->task_node_ + 1, - task->tag_id_, - -(ssize_t) blob_info.blob_size_, - bucket_mdm::UpdateSizeMode::kAdd); - } - HSHM_DESTROY_AR(task->free_tasks_); - blob_map.erase(task->blob_id_); - task->SetModuleComplete(); - } - } + task->SetModuleComplete(); } void MonitorDestroyBlob(u32 mode, DestroyBlobTask *task, RunContext &rctx) { } @@ -864,13 +665,13 @@ class Server : public TaskLib { task->blob_id_ = GetOrCreateBlobId(task->tag_id_, task->lane_hash_, blob_name, rctx, flags); } - BLOB_MAP_T &blob_map = blob_map_[rctx.lane_id_]; + BLOB_MAP_T &blob_map = blob_map_; auto it = blob_map.find(task->blob_id_); if (it == blob_map.end()) { task->SetModuleComplete(); return; } - BlobInfo &blob_info = it->second; + BlobInfo &blob_info = *it.val_->second_; if (task->is_user_score_) { blob_info.user_score_ = task->score_; blob_info.score_ = blob_info.user_score_; @@ -922,14 +723,6 @@ class Server : public TaskLib { * */ HSHM_ALWAYS_INLINE void PollBlobMetadata(PollBlobMetadataTask *task, RunContext &rctx) { - BLOB_MAP_T &blob_map = blob_map_[rctx.lane_id_]; - std::vector blob_mdms; - blob_mdms.reserve(blob_map.size()); - for (const std::pair &blob_part : blob_map) { - const BlobInfo &blob_info = blob_part.second; - blob_mdms.emplace_back(blob_info); - } - task->SerializeBlobMetadata(blob_mdms); task->SetModuleComplete(); } void MonitorPollBlobMetadata(u32 mode, PollBlobMetadataTask *task, RunContext &rctx) {