From e0a82d9a0e894b8b699492be7f185a3c4d025c67 Mon Sep 17 00:00:00 2001 From: Masahiro Sakamoto Date: Thu, 14 Mar 2019 16:16:17 +0900 Subject: [PATCH] Support Producer.flush() --- examples/producer.js | 7 +++---- package-lock.json | 2 +- perf/perf_producer.js | 7 +++---- src/Producer.cc | 39 +++++++++++++++++++++++++++++++++++++-- src/Producer.h | 1 + 5 files changed, 45 insertions(+), 11 deletions(-) diff --git a/examples/producer.js b/examples/producer.js index 8730719d546e80..2dda9cc8359545 100644 --- a/examples/producer.js +++ b/examples/producer.js @@ -34,15 +34,14 @@ const Pulsar = require('../index.js'); }); // Send messages - const results = []; for (let i = 0; i < 10; i += 1) { const msg = `my-message-${i}`; - results.push(producer.send({ + producer.send({ data: Buffer.from(msg), - })); + }); console.log(`Sent message: ${msg}`); } - await Promise.all(results); + await producer.flush(); await producer.close(); await client.close(); diff --git a/package-lock.json b/package-lock.json index 25d495ee8985ee..0adfd94aa31c60 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,6 +1,6 @@ { "name": "pulsar-client", - "version": "0.0.1", + "version": "2.4.0-SNAPSHOT", "lockfileVersion": 1, "requires": true, "dependencies": { diff --git a/perf/perf_producer.js b/perf/perf_producer.js index 19f251c8a88fd0..fbe12f14568882 100644 --- a/perf/perf_producer.js +++ b/perf/perf_producer.js @@ -85,17 +85,16 @@ const Pulsar = require('../index.js'); // measure await delay(1000); const startMeasureTimeMilliSeconds = performance.now(); - const results = []; for (let mi = 0; mi < numOfMessages; mi += 1) { const startSendTimeMilliSeconds = performance.now(); - results.push(producer.send({ + producer.send({ data: message, }).then(() => { // add latency histogram.recordValue((performance.now() - startSendTimeMilliSeconds)); - })); + }); } - await Promise.all(results); + await producer.flush(); const endMeasureTimeMilliSeconds = performance.now(); // result diff --git a/src/Producer.cc b/src/Producer.cc index a19d828069e1e2..041695eae78794 100644 --- a/src/Producer.cc +++ b/src/Producer.cc @@ -27,8 +27,10 @@ Napi::FunctionReference Producer::constructor; void Producer::Init(Napi::Env env, Napi::Object exports) { Napi::HandleScope scope(env); - Napi::Function func = DefineClass( - env, "Producer", {InstanceMethod("send", &Producer::Send), InstanceMethod("close", &Producer::Close)}); + Napi::Function func = + DefineClass(env, "Producer", + {InstanceMethod("send", &Producer::Send), InstanceMethod("flush", &Producer::Flush), + InstanceMethod("close", &Producer::Close)}); constructor = Napi::Persistent(func); constructor.SuppressDestruct(); @@ -119,6 +121,39 @@ Napi::Value Producer::Send(const Napi::CallbackInfo &info) { return deferred.Promise(); } +class ProducerFlushWorker : public Napi::AsyncWorker { + public: + ProducerFlushWorker(const Napi::Promise::Deferred &deferred, pulsar_producer_t *cProducer) + : AsyncWorker(Napi::Function::New(deferred.Promise().Env(), [](const Napi::CallbackInfo &info) {})), + deferred(deferred), + cProducer(cProducer) {} + + ~ProducerFlushWorker() {} + + void Execute() { + pulsar_result result = pulsar_producer_flush(this->cProducer); + if (result != pulsar_result_Ok) SetError(pulsar_result_str(result)); + } + + void OnOK() { this->deferred.Resolve(Env().Null()); } + + void OnError(const Napi::Error &e) { + this->deferred.Reject( + Napi::Error::New(Env(), std::string("Failed to flush producer: ") + e.Message()).Value()); + } + + private: + Napi::Promise::Deferred deferred; + pulsar_producer_t *cProducer; +}; + +Napi::Value Producer::Flush(const Napi::CallbackInfo &info) { + Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env()); + ProducerFlushWorker *wk = new ProducerFlushWorker(deferred, this->cProducer); + wk->Queue(); + return deferred.Promise(); +} + class ProducerCloseWorker : public Napi::AsyncWorker { public: ProducerCloseWorker(const Napi::Promise::Deferred &deferred, pulsar_producer_t *cProducer) diff --git a/src/Producer.h b/src/Producer.h index 5d31cfcf1c14c3..ea9b112e816096 100644 --- a/src/Producer.h +++ b/src/Producer.h @@ -36,6 +36,7 @@ class Producer : public Napi::ObjectWrap { private: pulsar_producer_t *cProducer; Napi::Value Send(const Napi::CallbackInfo &info); + Napi::Value Flush(const Napi::CallbackInfo &info); Napi::Value Close(const Napi::CallbackInfo &info); };