From 12417900dc302ecb6c0784e1fc9337f77a095049 Mon Sep 17 00:00:00 2001 From: jikunshang Date: Fri, 12 Mar 2021 16:37:48 +0800 Subject: [PATCH] Add Plasma Metrics API (#4) * add jni metrics interface * add metrics related protocol, update fbs file and generated header file * add Plasma metrics message functions * impl metrcis method (server side) * impl metrics method (client side) * impl metrics method (jni layer) * add ut * fix code style --- cpp/src/plasma/client.cc | 12 ++ cpp/src/plasma/client.h | 9 + cpp/src/plasma/common.h | 8 + ...org_apache_arrow_plasma_PlasmaClientJNI.cc | 24 +++ .../org_apache_arrow_plasma_PlasmaClientJNI.h | 9 + cpp/src/plasma/plasma.fbs | 22 ++ cpp/src/plasma/plasma_generated.h | 204 +++++++++++++++++- cpp/src/plasma/protocol.cc | 27 +++ cpp/src/plasma/protocol.h | 8 + cpp/src/plasma/store.cc | 14 ++ cpp/src/plasma/store.h | 2 + cpp/src/plasma/test/client_tests.cc | 17 ++ .../apache/arrow/plasma/PlasmaClientJNI.java | 2 + 13 files changed, 353 insertions(+), 5 deletions(-) diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc index 260999922f554..a698aac6a6166 100644 --- a/cpp/src/plasma/client.cc +++ b/cpp/src/plasma/client.cc @@ -276,6 +276,8 @@ class PlasmaClient::Impl : public std::enable_shared_from_this guard(client_mutex_); + RETURN_NOT_OK(SendMetricsRequest(store_conn_)); + std::vector buffer; + RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType::PlasmaMetricsReply, &buffer)); + return ReadMetricsReply(buffer.data(), buffer.size(), metrics); +} + // ---------------------------------------------------------------------- // PlasmaClient @@ -1221,4 +1231,6 @@ bool PlasmaClient::IsInUse(const ObjectID& object_id) { int64_t PlasmaClient::store_capacity() { return impl_->store_capacity(); } +Status PlasmaClient::Metrics(PlasmaMetrics* metrics) { return impl_->Metrics(metrics); } + } // namespace plasma diff --git a/cpp/src/plasma/client.h b/cpp/src/plasma/client.h index 7a70bba5fa0ba..b912555fbad7d 100644 --- a/cpp/src/plasma/client.h +++ b/cpp/src/plasma/client.h @@ -293,6 +293,15 @@ class ARROW_EXPORT PlasmaClient { /// \return Memory capacity of the store in bytes. int64_t store_capacity(); + /// Get PlasmaStore memory usage metrics. + /// + /// This API is experimental and might change in the future. + /// + /// \param[out] metrics PlasmaStore memory uasge, including total share memory, + /// used share memory, total external memory, used external memory. + /// \return The return status. + Status Metrics(PlasmaMetrics* metrics); + private: friend class PlasmaBuffer; friend class PlasmaMutableBuffer; diff --git a/cpp/src/plasma/common.h b/cpp/src/plasma/common.h index 071e55ea30e27..9332857f42a1e 100644 --- a/cpp/src/plasma/common.h +++ b/cpp/src/plasma/common.h @@ -145,6 +145,14 @@ typedef std::unordered_map> ObjectTa /// by making it possible to pass a context object through dlmalloc. struct PlasmaStoreInfo; extern const PlasmaStoreInfo* plasma_config; + +/// This type is used to transfer plasma metrics between server and client. +struct PlasmaMetrics { + int64_t share_mem_total; + int64_t share_mem_used; + int64_t external_total; + int64_t external_used; +}; } // namespace plasma namespace std { diff --git a/cpp/src/plasma/lib/java/org_apache_arrow_plasma_PlasmaClientJNI.cc b/cpp/src/plasma/lib/java/org_apache_arrow_plasma_PlasmaClientJNI.cc index 10e0fcb371df7..32b48e33e62d8 100644 --- a/cpp/src/plasma/lib/java/org_apache_arrow_plasma_PlasmaClientJNI.cc +++ b/cpp/src/plasma/lib/java/org_apache_arrow_plasma_PlasmaClientJNI.cc @@ -261,3 +261,27 @@ Java_org_apache_arrow_plasma_PlasmaClientJNI_list(JNIEnv* env, jclass cls, jlong return ret; } + +JNIEXPORT jint JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_metrics( + JNIEnv* env, jclass cls, jlong conn, jlongArray result) { + plasma::PlasmaClient* client = reinterpret_cast(conn); + int64_t* metrics; + plasma::PlasmaMetrics plasmaMetrics; + client->Metrics(&plasmaMetrics); + if (result != nullptr) { + metrics = (int64_t*)env->GetPrimitiveArrayCritical(result, 0); + } + if (metrics == nullptr) { + return -1; + } + metrics[0] = plasmaMetrics.share_mem_total; + metrics[1] = plasmaMetrics.share_mem_used; + metrics[2] = plasmaMetrics.external_total; + metrics[3] = plasmaMetrics.external_used; + + if (result != nullptr) { + env->ReleasePrimitiveArrayCritical(result, (void*)metrics, 0); + } + + return 0; +} diff --git a/cpp/src/plasma/lib/java/org_apache_arrow_plasma_PlasmaClientJNI.h b/cpp/src/plasma/lib/java/org_apache_arrow_plasma_PlasmaClientJNI.h index 8a18be91debe9..31fb0c49b485a 100644 --- a/cpp/src/plasma/lib/java/org_apache_arrow_plasma_PlasmaClientJNI.h +++ b/cpp/src/plasma/lib/java/org_apache_arrow_plasma_PlasmaClientJNI.h @@ -135,6 +135,15 @@ JNIEXPORT jobjectArray JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_list jclass, jlong); +/* + * Class: org_apache_arrow_plasma_PlasmaClientJNI + * Method: metrics + * Signature: (J[J)I + */ +JNIEXPORT jint JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_metrics(JNIEnv*, + jclass, jlong, + jlongArray); + #ifdef __cplusplus } #endif diff --git a/cpp/src/plasma/plasma.fbs b/cpp/src/plasma/plasma.fbs index 62c02b96a4e9f..ead54f20ea26d 100644 --- a/cpp/src/plasma/plasma.fbs +++ b/cpp/src/plasma/plasma.fbs @@ -81,6 +81,9 @@ enum MessageType:long { // Touch a number of objects to bump their position in the LRU cache. PlasmaRefreshLRURequest, PlasmaRefreshLRUReply, + // Get Plasma metrics info + PlasmaMetricsRequest, + PlasmaMetricsReply, } enum PlasmaError:int { @@ -116,6 +119,17 @@ struct PlasmaObjectSpec { device_num: int; } +struct PlasmaMetricsSpec { + // total share memory allocated. + share_mem_total: long; + // used share memory currently. + share_mem_used: long; + // total external store size. + external_total: long; + // used external store size. + external_used: long; +} + table PlasmaSetOptionsRequest { // The name of the client. client_name: string; @@ -355,3 +369,11 @@ table PlasmaRefreshLRURequest { table PlasmaRefreshLRUReply { } + +table PlasmaMetricsRequest { +} + +table PlasmaMetricsReply { + // Metrics info to reply + metrics: PlasmaMetricsSpec; +} diff --git a/cpp/src/plasma/plasma_generated.h b/cpp/src/plasma/plasma_generated.h index 340f043bc833c..5ee10db88450b 100644 --- a/cpp/src/plasma/plasma_generated.h +++ b/cpp/src/plasma/plasma_generated.h @@ -13,6 +13,8 @@ namespace flatbuf { struct PlasmaObjectSpec; +struct PlasmaMetricsSpec; + struct PlasmaSetOptionsRequest; struct PlasmaSetOptionsRequestBuilder; struct PlasmaSetOptionsRequestT; @@ -153,6 +155,14 @@ struct PlasmaRefreshLRUReply; struct PlasmaRefreshLRUReplyBuilder; struct PlasmaRefreshLRUReplyT; +struct PlasmaMetricsRequest; +struct PlasmaMetricsRequestBuilder; +struct PlasmaMetricsRequestT; + +struct PlasmaMetricsReply; +struct PlasmaMetricsReplyBuilder; +struct PlasmaMetricsReplyT; + enum class MessageType : int64_t { PlasmaDisconnectClient = 0, PlasmaCreateRequest = 1LL, @@ -190,11 +200,13 @@ enum class MessageType : int64_t { PlasmaCreateAndSealBatchReply = 33LL, PlasmaRefreshLRURequest = 34LL, PlasmaRefreshLRUReply = 35LL, + PlasmaMetricsRequest = 36LL, + PlasmaMetricsReply = 37LL, MIN = PlasmaDisconnectClient, - MAX = PlasmaRefreshLRUReply + MAX = PlasmaMetricsReply }; -inline const MessageType (&EnumValuesMessageType())[36] { +inline const MessageType (&EnumValuesMessageType())[38] { static const MessageType values[] = { MessageType::PlasmaDisconnectClient, MessageType::PlasmaCreateRequest, @@ -231,13 +243,15 @@ inline const MessageType (&EnumValuesMessageType())[36] { MessageType::PlasmaCreateAndSealBatchRequest, MessageType::PlasmaCreateAndSealBatchReply, MessageType::PlasmaRefreshLRURequest, - MessageType::PlasmaRefreshLRUReply + MessageType::PlasmaRefreshLRUReply, + MessageType::PlasmaMetricsRequest, + MessageType::PlasmaMetricsReply }; return values; } inline const char * const *EnumNamesMessageType() { - static const char * const names[37] = { + static const char * const names[39] = { "PlasmaDisconnectClient", "PlasmaCreateRequest", "PlasmaCreateReply", @@ -274,13 +288,15 @@ inline const char * const *EnumNamesMessageType() { "PlasmaCreateAndSealBatchReply", "PlasmaRefreshLRURequest", "PlasmaRefreshLRUReply", + "PlasmaMetricsRequest", + "PlasmaMetricsReply", nullptr }; return names; } inline const char *EnumNameMessageType(MessageType e) { - if (flatbuffers::IsOutRange(e, MessageType::PlasmaDisconnectClient, MessageType::PlasmaRefreshLRUReply)) return ""; + if (flatbuffers::IsOutRange(e, MessageType::PlasmaDisconnectClient, MessageType::PlasmaMetricsReply)) return ""; const size_t index = static_cast(e); return EnumNamesMessageType()[index]; } @@ -375,6 +391,38 @@ FLATBUFFERS_MANUALLY_ALIGNED_STRUCT(8) PlasmaObjectSpec FLATBUFFERS_FINAL_CLASS }; FLATBUFFERS_STRUCT_END(PlasmaObjectSpec, 48); +FLATBUFFERS_MANUALLY_ALIGNED_STRUCT(8) PlasmaMetricsSpec FLATBUFFERS_FINAL_CLASS { + private: + int64_t share_mem_total_; + int64_t share_mem_used_; + int64_t external_total_; + int64_t external_used_; + + public: + PlasmaMetricsSpec() { + memset(static_cast(this), 0, sizeof(PlasmaMetricsSpec)); + } + PlasmaMetricsSpec(int64_t _share_mem_total, int64_t _share_mem_used, int64_t _external_total, int64_t _external_used) + : share_mem_total_(flatbuffers::EndianScalar(_share_mem_total)), + share_mem_used_(flatbuffers::EndianScalar(_share_mem_used)), + external_total_(flatbuffers::EndianScalar(_external_total)), + external_used_(flatbuffers::EndianScalar(_external_used)) { + } + int64_t share_mem_total() const { + return flatbuffers::EndianScalar(share_mem_total_); + } + int64_t share_mem_used() const { + return flatbuffers::EndianScalar(share_mem_used_); + } + int64_t external_total() const { + return flatbuffers::EndianScalar(external_total_); + } + int64_t external_used() const { + return flatbuffers::EndianScalar(external_used_); + } +}; +FLATBUFFERS_STRUCT_END(PlasmaMetricsSpec, 32); + struct PlasmaSetOptionsRequestT : public flatbuffers::NativeTable { typedef PlasmaSetOptionsRequest TableType; std::string client_name; @@ -2981,6 +3029,103 @@ inline flatbuffers::Offset CreatePlasmaRefreshLRUReply( flatbuffers::Offset CreatePlasmaRefreshLRUReply(flatbuffers::FlatBufferBuilder &_fbb, const PlasmaRefreshLRUReplyT *_o, const flatbuffers::rehasher_function_t *_rehasher = nullptr); +struct PlasmaMetricsRequestT : public flatbuffers::NativeTable { + typedef PlasmaMetricsRequest TableType; + PlasmaMetricsRequestT() { + } +}; + +struct PlasmaMetricsRequest FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { + typedef PlasmaMetricsRequestT NativeTableType; + typedef PlasmaMetricsRequestBuilder Builder; + bool Verify(flatbuffers::Verifier &verifier) const { + return VerifyTableStart(verifier) && + verifier.EndTable(); + } + PlasmaMetricsRequestT *UnPack(const flatbuffers::resolver_function_t *_resolver = nullptr) const; + void UnPackTo(PlasmaMetricsRequestT *_o, const flatbuffers::resolver_function_t *_resolver = nullptr) const; + static flatbuffers::Offset Pack(flatbuffers::FlatBufferBuilder &_fbb, const PlasmaMetricsRequestT* _o, const flatbuffers::rehasher_function_t *_rehasher = nullptr); +}; + +struct PlasmaMetricsRequestBuilder { + typedef PlasmaMetricsRequest Table; + flatbuffers::FlatBufferBuilder &fbb_; + flatbuffers::uoffset_t start_; + explicit PlasmaMetricsRequestBuilder(flatbuffers::FlatBufferBuilder &_fbb) + : fbb_(_fbb) { + start_ = fbb_.StartTable(); + } + PlasmaMetricsRequestBuilder &operator=(const PlasmaMetricsRequestBuilder &); + flatbuffers::Offset Finish() { + const auto end = fbb_.EndTable(start_); + auto o = flatbuffers::Offset(end); + return o; + } +}; + +inline flatbuffers::Offset CreatePlasmaMetricsRequest( + flatbuffers::FlatBufferBuilder &_fbb) { + PlasmaMetricsRequestBuilder builder_(_fbb); + return builder_.Finish(); +} + +flatbuffers::Offset CreatePlasmaMetricsRequest(flatbuffers::FlatBufferBuilder &_fbb, const PlasmaMetricsRequestT *_o, const flatbuffers::rehasher_function_t *_rehasher = nullptr); + +struct PlasmaMetricsReplyT : public flatbuffers::NativeTable { + typedef PlasmaMetricsReply TableType; + std::unique_ptr metrics; + PlasmaMetricsReplyT() { + } +}; + +struct PlasmaMetricsReply FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { + typedef PlasmaMetricsReplyT NativeTableType; + typedef PlasmaMetricsReplyBuilder Builder; + enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE { + VT_METRICS = 4 + }; + const plasma::flatbuf::PlasmaMetricsSpec *metrics() const { + return GetStruct(VT_METRICS); + } + bool Verify(flatbuffers::Verifier &verifier) const { + return VerifyTableStart(verifier) && + VerifyField(verifier, VT_METRICS) && + verifier.EndTable(); + } + PlasmaMetricsReplyT *UnPack(const flatbuffers::resolver_function_t *_resolver = nullptr) const; + void UnPackTo(PlasmaMetricsReplyT *_o, const flatbuffers::resolver_function_t *_resolver = nullptr) const; + static flatbuffers::Offset Pack(flatbuffers::FlatBufferBuilder &_fbb, const PlasmaMetricsReplyT* _o, const flatbuffers::rehasher_function_t *_rehasher = nullptr); +}; + +struct PlasmaMetricsReplyBuilder { + typedef PlasmaMetricsReply Table; + flatbuffers::FlatBufferBuilder &fbb_; + flatbuffers::uoffset_t start_; + void add_metrics(const plasma::flatbuf::PlasmaMetricsSpec *metrics) { + fbb_.AddStruct(PlasmaMetricsReply::VT_METRICS, metrics); + } + explicit PlasmaMetricsReplyBuilder(flatbuffers::FlatBufferBuilder &_fbb) + : fbb_(_fbb) { + start_ = fbb_.StartTable(); + } + PlasmaMetricsReplyBuilder &operator=(const PlasmaMetricsReplyBuilder &); + flatbuffers::Offset Finish() { + const auto end = fbb_.EndTable(start_); + auto o = flatbuffers::Offset(end); + return o; + } +}; + +inline flatbuffers::Offset CreatePlasmaMetricsReply( + flatbuffers::FlatBufferBuilder &_fbb, + const plasma::flatbuf::PlasmaMetricsSpec *metrics = 0) { + PlasmaMetricsReplyBuilder builder_(_fbb); + builder_.add_metrics(metrics); + return builder_.Finish(); +} + +flatbuffers::Offset CreatePlasmaMetricsReply(flatbuffers::FlatBufferBuilder &_fbb, const PlasmaMetricsReplyT *_o, const flatbuffers::rehasher_function_t *_rehasher = nullptr); + inline PlasmaSetOptionsRequestT *PlasmaSetOptionsRequest::UnPack(const flatbuffers::resolver_function_t *_resolver) const { std::unique_ptr _o = std::unique_ptr(new PlasmaSetOptionsRequestT()); UnPackTo(_o.get(), _resolver); @@ -3978,6 +4123,55 @@ inline flatbuffers::Offset CreatePlasmaRefreshLRUReply(fl _fbb); } +inline PlasmaMetricsRequestT *PlasmaMetricsRequest::UnPack(const flatbuffers::resolver_function_t *_resolver) const { + std::unique_ptr _o = std::unique_ptr(new PlasmaMetricsRequestT()); + UnPackTo(_o.get(), _resolver); + return _o.release(); +} + +inline void PlasmaMetricsRequest::UnPackTo(PlasmaMetricsRequestT *_o, const flatbuffers::resolver_function_t *_resolver) const { + (void)_o; + (void)_resolver; +} + +inline flatbuffers::Offset PlasmaMetricsRequest::Pack(flatbuffers::FlatBufferBuilder &_fbb, const PlasmaMetricsRequestT* _o, const flatbuffers::rehasher_function_t *_rehasher) { + return CreatePlasmaMetricsRequest(_fbb, _o, _rehasher); +} + +inline flatbuffers::Offset CreatePlasmaMetricsRequest(flatbuffers::FlatBufferBuilder &_fbb, const PlasmaMetricsRequestT *_o, const flatbuffers::rehasher_function_t *_rehasher) { + (void)_rehasher; + (void)_o; + struct _VectorArgs { flatbuffers::FlatBufferBuilder *__fbb; const PlasmaMetricsRequestT* __o; const flatbuffers::rehasher_function_t *__rehasher; } _va = { &_fbb, _o, _rehasher}; (void)_va; + return plasma::flatbuf::CreatePlasmaMetricsRequest( + _fbb); +} + +inline PlasmaMetricsReplyT *PlasmaMetricsReply::UnPack(const flatbuffers::resolver_function_t *_resolver) const { + std::unique_ptr _o = std::unique_ptr(new PlasmaMetricsReplyT()); + UnPackTo(_o.get(), _resolver); + return _o.release(); +} + +inline void PlasmaMetricsReply::UnPackTo(PlasmaMetricsReplyT *_o, const flatbuffers::resolver_function_t *_resolver) const { + (void)_o; + (void)_resolver; + { auto _e = metrics(); if (_e) _o->metrics = std::unique_ptr(new plasma::flatbuf::PlasmaMetricsSpec(*_e)); } +} + +inline flatbuffers::Offset PlasmaMetricsReply::Pack(flatbuffers::FlatBufferBuilder &_fbb, const PlasmaMetricsReplyT* _o, const flatbuffers::rehasher_function_t *_rehasher) { + return CreatePlasmaMetricsReply(_fbb, _o, _rehasher); +} + +inline flatbuffers::Offset CreatePlasmaMetricsReply(flatbuffers::FlatBufferBuilder &_fbb, const PlasmaMetricsReplyT *_o, const flatbuffers::rehasher_function_t *_rehasher) { + (void)_rehasher; + (void)_o; + struct _VectorArgs { flatbuffers::FlatBufferBuilder *__fbb; const PlasmaMetricsReplyT* __o; const flatbuffers::rehasher_function_t *__rehasher; } _va = { &_fbb, _o, _rehasher}; (void)_va; + auto _metrics = _o->metrics ? _o->metrics.get() : 0; + return plasma::flatbuf::CreatePlasmaMetricsReply( + _fbb, + _metrics); +} + } // namespace flatbuf } // namespace plasma diff --git a/cpp/src/plasma/protocol.cc b/cpp/src/plasma/protocol.cc index 735636cda9fcd..eaf4272e94504 100644 --- a/cpp/src/plasma/protocol.cc +++ b/cpp/src/plasma/protocol.cc @@ -826,4 +826,31 @@ Status ReadRefreshLRUReply(const uint8_t* data, size_t size) { return Status::OK(); } +// Metrics message +Status SendMetricsRequest(int sock) { + flatbuffers::FlatBufferBuilder fbb; + auto message = fb::CreatePlasmaMetricsRequest(fbb); + return PlasmaSend(sock, MessageType::PlasmaMetricsRequest, &fbb, message); +} + +Status ReadMetricsRequest(const uint8_t* data, size_t size) { return Status::OK(); } + +Status SendMetricsReply(int sock, const PlasmaMetrics* metrics) { + flatbuffers::FlatBufferBuilder fbb; + plasma::flatbuf::PlasmaMetricsSpec metrics_( + metrics->share_mem_total, metrics->share_mem_used, metrics->external_total, + metrics->external_used); + auto message = fb::CreatePlasmaMetricsReply(fbb, &metrics_); + fbb.Finish(message); + return PlasmaSend(sock, MessageType::PlasmaMetricsReply, &fbb, message); +} + +Status ReadMetricsReply(const uint8_t* data, size_t size, PlasmaMetrics* metrics) { + DCHECK(data); + auto message = flatbuffers::GetRoot(data); + DCHECK(VerifyFlatbuffer(message, data, size)); + memcpy(metrics, message->metrics(), sizeof(PlasmaMetrics)); + return Status::OK(); +} + } // namespace plasma diff --git a/cpp/src/plasma/protocol.h b/cpp/src/plasma/protocol.h index 31257be472ce8..da4cbcb5f6dc5 100644 --- a/cpp/src/plasma/protocol.h +++ b/cpp/src/plasma/protocol.h @@ -248,4 +248,12 @@ Status SendRefreshLRUReply(int sock); Status ReadRefreshLRUReply(const uint8_t* data, size_t size); +Status SendMetricsRequest(int sock); + +Status ReadMetricsRequest(const uint8_t* data, size_t size); + +Status SendMetricsReply(int sock, const PlasmaMetrics* metrics); + +Status ReadMetricsReply(const uint8_t* data, size_t size, PlasmaMetrics* metrics); + } // namespace plasma diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc index 032a12fcfac36..fef697499c41c 100644 --- a/cpp/src/plasma/store.cc +++ b/cpp/src/plasma/store.cc @@ -893,6 +893,14 @@ void PlasmaStore::PushNotification(fb::ObjectInfoT* object_info, int client_fd) } } +void PlasmaStore::UpdateMetrics(PlasmaMetrics* metrics) { + metrics->share_mem_total = PlasmaAllocator::GetFootprintLimit(); + metrics->share_mem_used = PlasmaAllocator::Allocated(); + // TODO: get external store info. + metrics->external_total = 0; + metrics->external_used = 0; +} + // Subscribe to notifications about sealed objects. void PlasmaStore::SubscribeToUpdates(Client* client) { ARROW_LOG(DEBUG) << "subscribing to updates on fd " << client->fd; @@ -1136,6 +1144,12 @@ Status PlasmaStore::ProcessMessage(Client* client) { HANDLE_SIGPIPE(SendGetDebugStringReply(client->fd, eviction_policy_.DebugString()), client->fd); } break; + case fb::MessageType::PlasmaMetricsRequest: { + RETURN_NOT_OK(ReadMetricsRequest(input, input_size)); + PlasmaMetrics metrics; + UpdateMetrics(&metrics); + HANDLE_SIGPIPE(SendMetricsReply(client->fd, &metrics), client->fd); + } break; default: // This code should be unreachable. ARROW_CHECK(0); diff --git a/cpp/src/plasma/store.h b/cpp/src/plasma/store.h index 18279893887d7..7d45f3df2b5c4 100644 --- a/cpp/src/plasma/store.h +++ b/cpp/src/plasma/store.h @@ -184,6 +184,8 @@ class PlasmaStore { void AddToClientObjectIds(const ObjectID& object_id, ObjectTableEntry* entry, Client* client); + void UpdateMetrics(PlasmaMetrics* metrics); + /// Remove a GetRequest and clean up the relevant data structures. /// /// \param get_request The GetRequest to remove. diff --git a/cpp/src/plasma/test/client_tests.cc b/cpp/src/plasma/test/client_tests.cc index e3d517b0a0606..03ebc7d53fe87 100644 --- a/cpp/src/plasma/test/client_tests.cc +++ b/cpp/src/plasma/test/client_tests.cc @@ -864,6 +864,23 @@ TEST_F(TestPlasmaStore, ManyObjectTest) { } } +TEST_F(TestPlasmaStore, MetricsTest) { + // Create an object + ObjectID object_id = random_object_id(); + std::vector data(100, 0); + CreateObject(client_, object_id, {42}, data); + + bool has_object; + ARROW_CHECK_OK(client_.Contains(object_id, &has_object)); + ASSERT_TRUE(has_object); + + PlasmaMetrics plasmaMetrics; + ARROW_CHECK_OK(client_.Metrics(&plasmaMetrics)); + + ASSERT_GT(plasmaMetrics.share_mem_used, 0); + ASSERT_LE(plasmaMetrics.share_mem_used, plasmaMetrics.share_mem_total); +} + #ifdef PLASMA_CUDA using arrow::cuda::CudaBuffer; using arrow::cuda::CudaBufferReader; diff --git a/java/plasma/src/main/java/org/apache/arrow/plasma/PlasmaClientJNI.java b/java/plasma/src/main/java/org/apache/arrow/plasma/PlasmaClientJNI.java index da5c17e6be02e..c293e5ab7a50c 100644 --- a/java/plasma/src/main/java/org/apache/arrow/plasma/PlasmaClientJNI.java +++ b/java/plasma/src/main/java/org/apache/arrow/plasma/PlasmaClientJNI.java @@ -54,4 +54,6 @@ public static native byte[][] wait(long conn, byte[][] objectIds, int timeoutMs, public static native long evict(long conn, long numBytes); public static native byte[][] list(long conn); + + public static native int metrics(long conn, long[] metrics); }