Skip to content

Commit

Permalink
Add Plasma Metrics API (apache#4)
Browse files Browse the repository at this point in the history
* add jni metrics interface

* add metrics related protocol, update fbs file and generated header file

* add Plasma metrics message functions

* impl metrcis method (server side)

* impl metrics method (client side)

* impl metrics method (jni layer)

* add ut

* fix code style
  • Loading branch information
jikunshang authored and zhztheplayer committed Feb 28, 2022
1 parent 031e8e1 commit 4d893aa
Show file tree
Hide file tree
Showing 13 changed files with 353 additions and 5 deletions.
12 changes: 12 additions & 0 deletions cpp/src/plasma/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,8 @@ class PlasmaClient::Impl : public std::enable_shared_from_this<PlasmaClient::Imp

int64_t store_capacity() { return store_capacity_; }

Status Metrics(PlasmaMetrics* metrics);

private:
/// Check if store_fd has already been received from the store. If yes,
/// return it. Otherwise, receive it from the store (see analogous logic
Expand Down Expand Up @@ -1114,6 +1116,14 @@ std::string PlasmaClient::Impl::DebugString() {
return debug_string;
}

Status PlasmaClient::Impl::Metrics(PlasmaMetrics* metrics) {
std::lock_guard<std::recursive_mutex> guard(client_mutex_);
RETURN_NOT_OK(SendMetricsRequest(store_conn_));
std::vector<uint8_t> buffer;
RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType::PlasmaMetricsReply, &buffer));
return ReadMetricsReply(buffer.data(), buffer.size(), metrics);
}

// ----------------------------------------------------------------------
// PlasmaClient

Expand Down Expand Up @@ -1221,4 +1231,6 @@ bool PlasmaClient::IsInUse(const ObjectID& object_id) {

int64_t PlasmaClient::store_capacity() { return impl_->store_capacity(); }

Status PlasmaClient::Metrics(PlasmaMetrics* metrics) { return impl_->Metrics(metrics); }

} // namespace plasma
9 changes: 9 additions & 0 deletions cpp/src/plasma/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,15 @@ class ARROW_EXPORT PlasmaClient {
/// \return Memory capacity of the store in bytes.
int64_t store_capacity();

/// Get PlasmaStore memory usage metrics.
///
/// This API is experimental and might change in the future.
///
/// \param[out] metrics PlasmaStore memory uasge, including total share memory,
/// used share memory, total external memory, used external memory.
/// \return The return status.
Status Metrics(PlasmaMetrics* metrics);

private:
friend class PlasmaBuffer;
friend class PlasmaMutableBuffer;
Expand Down
8 changes: 8 additions & 0 deletions cpp/src/plasma/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,14 @@ typedef std::unordered_map<ObjectID, std::unique_ptr<ObjectTableEntry>> ObjectTa
/// by making it possible to pass a context object through dlmalloc.
struct PlasmaStoreInfo;
extern const PlasmaStoreInfo* plasma_config;

/// This type is used to transfer plasma metrics between server and client.
struct PlasmaMetrics {
int64_t share_mem_total;
int64_t share_mem_used;
int64_t external_total;
int64_t external_used;
};
} // namespace plasma

namespace std {
Expand Down
24 changes: 24 additions & 0 deletions cpp/src/plasma/lib/java/org_apache_arrow_plasma_PlasmaClientJNI.cc
Original file line number Diff line number Diff line change
Expand Up @@ -261,3 +261,27 @@ Java_org_apache_arrow_plasma_PlasmaClientJNI_list(JNIEnv* env, jclass cls, jlong

return ret;
}

JNIEXPORT jint JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_metrics(
JNIEnv* env, jclass cls, jlong conn, jlongArray result) {
plasma::PlasmaClient* client = reinterpret_cast<plasma::PlasmaClient*>(conn);
int64_t* metrics;
plasma::PlasmaMetrics plasmaMetrics;
client->Metrics(&plasmaMetrics);
if (result != nullptr) {
metrics = (int64_t*)env->GetPrimitiveArrayCritical(result, 0);
}
if (metrics == nullptr) {
return -1;
}
metrics[0] = plasmaMetrics.share_mem_total;
metrics[1] = plasmaMetrics.share_mem_used;
metrics[2] = plasmaMetrics.external_total;
metrics[3] = plasmaMetrics.external_used;

if (result != nullptr) {
env->ReleasePrimitiveArrayCritical(result, (void*)metrics, 0);
}

return 0;
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,15 @@ JNIEXPORT jobjectArray JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_list
jclass,
jlong);

/*
* Class: org_apache_arrow_plasma_PlasmaClientJNI
* Method: metrics
* Signature: (J[J)I
*/
JNIEXPORT jint JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_metrics(JNIEnv*,
jclass, jlong,
jlongArray);

#ifdef __cplusplus
}
#endif
Expand Down
22 changes: 22 additions & 0 deletions cpp/src/plasma/plasma.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ enum MessageType:long {
// Touch a number of objects to bump their position in the LRU cache.
PlasmaRefreshLRURequest,
PlasmaRefreshLRUReply,
// Get Plasma metrics info
PlasmaMetricsRequest,
PlasmaMetricsReply,
}

enum PlasmaError:int {
Expand Down Expand Up @@ -116,6 +119,17 @@ struct PlasmaObjectSpec {
device_num: int;
}

struct PlasmaMetricsSpec {
// total share memory allocated.
share_mem_total: long;
// used share memory currently.
share_mem_used: long;
// total external store size.
external_total: long;
// used external store size.
external_used: long;
}

table PlasmaSetOptionsRequest {
// The name of the client.
client_name: string;
Expand Down Expand Up @@ -355,3 +369,11 @@ table PlasmaRefreshLRURequest {

table PlasmaRefreshLRUReply {
}

table PlasmaMetricsRequest {
}

table PlasmaMetricsReply {
// Metrics info to reply
metrics: PlasmaMetricsSpec;
}
Loading

0 comments on commit 4d893aa

Please sign in to comment.