Skip to content

Commit

Permalink
Merge pull request #681 from lukemartinlogan/memory_only
Browse files Browse the repository at this point in the history
Hermes memory only first implementation
  • Loading branch information
lukemartinlogan authored Feb 20, 2024
2 parents accec29 + fb7274e commit b029d6a
Show file tree
Hide file tree
Showing 12 changed files with 447 additions and 374 deletions.
2 changes: 0 additions & 2 deletions ci/hermes/packages/hermes_shm/package.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ class HermesShm(CMakePackage):
homepage = "https://github.com/lukemartinlogan/hermes_shm/wiki"
git = "https://github.com/lukemartinlogan/hermes_shm.git"
version('master', branch='master')
version("1.1.0", sha256="080d5361cff22794b670e4544c532926ca8b6d6ec695af25596efe035bfffea5")
version("1.0.0", sha256="a79f01d531ce89985ad59a2f62b41d74c2385e48d929e2f4ad895ae34137573b")

# Main variants
variant('debug', default=False, description='Build shared libraries')
Expand Down
2 changes: 2 additions & 0 deletions hrun/include/hrun/api/hrun_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ class Client : public ConfigurationManager {
qm.shm_name_);
mem_mngr->AttachBackend(hipc::MemoryBackendType::kPosixShmMmap,
qm.data_shm_name_);
mem_mngr->AttachBackend(hipc::MemoryBackendType::kPosixShmMmap,
qm.rdata_shm_name_);
}
main_alloc_ = mem_mngr->GetAllocator(main_alloc_id_);
data_alloc_ = mem_mngr->GetAllocator(data_alloc_id_);
Expand Down
123 changes: 123 additions & 0 deletions hrun/include/hrun/hrun_map.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
//
// Created by lukemartinlogan on 2/20/24.
//

#ifndef HERMES_HRUN_INCLUDE_HRUN_HRUN_MAP_H_
#define HERMES_HRUN_INCLUDE_HRUN_HRUN_MAP_H_

#include "hrun_types.h"

namespace hrun {

template<typename Key, typename T>
class LockFreeMap;

/**
* Iterator for lock-free map
* */
template<typename Key, typename T>
struct LockFreeMapIterator {
public:
using Pair = hipc::pair<Key, T>;
size_t x_, y_;
Pair *val_;

bool operator==(const LockFreeMapIterator &other) const {
return x_ == other.x_ && y_ == other.y_;
}

bool operator!=(const LockFreeMapIterator &other) const {
return x_ != other.x_ || y_ != other.y_;
}
};

/**
* A lock-free unordered map implementation
* */
template<typename Key, typename T>
class LockFreeMap {
public:
using Pair = hipc::pair<Key, T>;
using Bucket = hipc::mpsc_queue<Pair>;
hipc::mptr<hipc::vector<Bucket>> map_;
int num_lanes_;
size_t bkts_per_lane_;

void Init(hipc::Allocator *alloc, int num_lanes, size_t bkts_per_lane,
size_t collisions) {
map_ = hipc::make_mptr<hipc::vector<Bucket>>(
alloc, num_lanes * bkts_per_lane, collisions);
}

void Connect(const hipc::Pointer &p) {
map_ = hipc::mptr<hipc::vector<Bucket>>(
p);
}

size_t FirstBucket(int lane_id) const {
return lane_id * bkts_per_lane_;
}

size_t LastBucket(int lane_id) const {
return FirstBucket(lane_id) + bkts_per_lane_;
}

Bucket& GetBucket(size_t bkt_id) {
return (*map_)[bkt_id];
}

hipc::Pointer GetShmPointer() {
return map_->template GetShmPointer<hipc::Pointer>();
}

void emplace(const Key &key, const T &val) {
size_t hash = std::hash<Key>{}(key);
size_t x = hash % map_->size();
Bucket &bkt = (*map_)[x];
bkt.emplace(key, val);
}

LockFreeMapIterator<Key, T> find(const Key &key) {
LockFreeMapIterator<Key, T> it;
size_t hash = std::hash<Key>{}(key);
it.x_ = hash % map_->size();
Bucket &bkt = (*map_)[it.x_];
size_t head = bkt.head_;
size_t tail = bkt.tail_;
size_t bkt_size = 0;
if (head <= tail) {
bkt_size = tail - head;
}
for (size_t i = 0; i < bkt_size; ++i) {
Pair *val;
if (bkt.peek(val, i).IsNull()) {
continue;
}
it.y_ = i;
it.val_ = val;
return it;
}
it.x_ = map_->size();
it.y_ = -1;
return it;
}

T& operator[](const Key &key) {
LockFreeMapIterator<Key, T> it = find(key);
if (it.x_ == map_->size()) {
throw std::runtime_error("Key not found");
}
return *it.val_->second_;
}

LockFreeMapIterator<Key, T> end() const {
LockFreeMapIterator<Key, T> it;
it.x_ = map_->size();
it.y_ = -1;
return it;
}
};

} // namespace hshm

#endif //HERMES_HRUN_INCLUDE_HRUN_HRUN_MAP_H_
64 changes: 45 additions & 19 deletions include/hermes/bucket.h
Original file line number Diff line number Diff line change
Expand Up @@ -465,29 +465,55 @@ class Bucket {
* Get \a blob_id Blob from the bucket (sync)
* */
BlobId BaseGet(const std::string &blob_name,
const BlobId &orig_blob_id,
BlobId blob_id,
Blob &blob,
size_t blob_off,
Context &ctx) {
// TODO(llogan): intercept mmap to avoid copy
size_t data_size = blob.size();
if (blob.size() == 0) {
data_size = blob_mdm_->GetBlobSizeRoot(
id_, hshm::charbuf(blob_name), orig_blob_id);
blob.resize(data_size);
// Get the blob ID
if (blob_id.IsNull()) {
auto &blob_id_map = HERMES_CONF->blob_mdm_.blob_id_map_;
auto blob_name_buf = blob_mdm::Client::GetBlobNameWithBucket(id_, blob_name);
auto it = blob_id_map.find(*blob_name_buf);
if (it != blob_id_map.end()) {
blob_id = *it.val_->second_;
}
}
if (!blob_id.IsNull()) {
auto &blob_map = HERMES_CONF->blob_mdm_.blob_map_;
auto it = blob_map.find(blob_id);
if (it != blob_map.end()) {
BlobInfo &blob_info = *it.val_->second_;
if (blob_off + blob.size() > blob_info.blob_size_) {
if (blob_info.blob_size_ < blob_off) {
return BlobId::GetNull();
}
blob.resize(blob_info.blob_size_ - blob_off);
}
char *data = HRUN_CLIENT->GetDataPointer(blob_info.data_.shm_);
memcpy(blob.data(), data + blob_off, blob.size());
return blob_id;
}
} else {
size_t data_size = blob.size();
if (blob.size() == 0) {
data_size = blob_mdm_->GetBlobSizeRoot(
id_, hshm::charbuf(blob_name), blob_id);
blob.resize(data_size);
}
HILOG(kDebug, "Getting blob of size {}", data_size);
BlobId blob_id;
LPointer<hrunpq::TypedPushTask<GetBlobTask>> push_task;
push_task = AsyncBaseGet(blob_name, blob_id, blob, blob_off, ctx);
push_task->Wait();
GetBlobTask *task = push_task->get();
blob_id = task->blob_id_;
char *data = HRUN_CLIENT->GetDataPointer(task->data_);
memcpy(blob.data(), data, task->data_size_);
blob.resize(task->data_size_);
HRUN_CLIENT->FreeBuffer(task->data_);
HRUN_CLIENT->DelTask(push_task);
return blob_id;
}
HILOG(kDebug, "Getting blob of size {}", data_size);
BlobId blob_id;
LPointer<hrunpq::TypedPushTask<GetBlobTask>> push_task;
push_task = AsyncBaseGet(blob_name, orig_blob_id, blob, blob_off, ctx);
push_task->Wait();
GetBlobTask *task = push_task->get();
blob_id = task->blob_id_;
char *data = HRUN_CLIENT->GetDataPointer(task->data_);
memcpy(blob.data(), data, task->data_size_);
blob.resize(task->data_size_);
HRUN_CLIENT->FreeBuffer(task->data_);
HRUN_CLIENT->DelTask(push_task);
return blob_id;
}

Expand Down
2 changes: 2 additions & 0 deletions include/hermes/config_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "hermes/config_server.h"
#include "data_stager/data_stager.h"


namespace hermes {

class ConfigurationManager {
Expand All @@ -49,6 +50,7 @@ class ConfigurationManager {
LoadServerConfig(config_path);
mdm_.CreateRoot(DomainId::GetGlobal(), "hermes_mdm");
blob_mdm_.CreateRoot(DomainId::GetGlobal(), "hermes_blob_mdm");
blob_mdm_.GetLocalTablesRoot(DomainId::GetLocal());
bkt_mdm_.CreateRoot(DomainId::GetGlobal(), "hermes_bkt_mdm");
op_mdm_.CreateRoot(DomainId::GetGlobal(), "hermes_op_mdm",
bkt_mdm_.id_, blob_mdm_.id_);
Expand Down
1 change: 1 addition & 0 deletions include/hermes/hermes_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ struct BlobInfo {
std::atomic<size_t> mod_count_; /**< The number of times blob modified */
std::atomic<size_t> last_flush_; /**< The last mod that was flushed */
bitfield32_t flags_; /**< Flags */
LPointer<char> data_;

/** Serialization */
template<typename Ar>
Expand Down
48 changes: 48 additions & 0 deletions tasks/hermes_blob_mdm/include/hermes_blob_mdm/hermes_blob_mdm.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,41 @@
#define HRUN_hermes_blob_mdm_H_

#include "hermes_blob_mdm_tasks.h"
#include "hrun/hrun_map.h"

namespace hermes::blob_mdm {

typedef hrun::LockFreeMap<hipc::charbuf, BlobId> BLOB_ID_MAP_T;
typedef hrun::LockFreeMap<BlobId, BlobInfo> BLOB_MAP_T;

/** Create hermes_blob_mdm requests */
class Client : public TaskLibClient {
public:
BLOB_ID_MAP_T blob_id_map_;
BLOB_MAP_T blob_map_;

public:
/** Get the globally unique blob name */
static hipc::uptr<hipc::charbuf>
GetBlobNameWithBucket(TagId tag_id, const hshm::charbuf &blob_name) {
auto new_name = hshm::charbuf(
sizeof(TagId) + blob_name.size());
hrun::LocalSerialize srl(new_name);
srl << tag_id;
srl << blob_name;
return hipc::make_uptr<hipc::charbuf>(new_name);
}

/** Get the globally unique blob name */
static hipc::uptr<hipc::charbuf>
GetBlobNameWithBucket(TagId tag_id, const std::string &blob_name) {
auto new_name = hshm::charbuf(
sizeof(TagId) + blob_name.size());
hrun::LocalSerialize srl(new_name);
srl << tag_id;
srl << blob_name;
return hipc::make_uptr<hipc::charbuf>(new_name);
}

public:
/** Default constructor */
Expand Down Expand Up @@ -56,6 +86,24 @@ class Client : public TaskLibClient {
* Blob Operations
* ===================================*/

/** Sets the BUCKET MDM */
void AsyncGetLocalTablesConstruct(GetLocalTablesTask *task,
const TaskNode &task_node,
const DomainId &domain_id) {
HRUN_CLIENT->ConstructTask<GetLocalTablesTask>(
task, task_node, domain_id, id_);
}
void GetLocalTablesRoot(const DomainId &domain_id) {
LPointer<hrunpq::TypedPushTask<GetLocalTablesTask>> push_task =
AsyncGetLocalTablesRoot(domain_id);
push_task->Wait();
GetLocalTablesTask *task = push_task->get();
blob_id_map_.Connect(task->blob_id_map_);
blob_map_.Connect(task->blob_map_);
HRUN_CLIENT->DelTask(push_task);
}
HRUN_TASK_NODE_PUSH_ROOT(GetLocalTables);

/** Sets the BUCKET MDM */
void AsyncSetBucketMdmConstruct(SetBucketMdmTask *task,
const TaskNode &task_node,
Expand Down
Loading

0 comments on commit b029d6a

Please sign in to comment.