Skip to content

Commit

Permalink
Merge pull request apache#23 from massakam/producer-flush
Browse files Browse the repository at this point in the history
[Issue 2] Support Producer.flush()
  • Loading branch information
merlimat authored Mar 14, 2019
2 parents 2f977b9 + e0a82d9 commit 87e7666
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 11 deletions.
7 changes: 3 additions & 4 deletions examples/producer.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,14 @@ const Pulsar = require('../index.js');
});

// Send messages
const results = [];
for (let i = 0; i < 10; i += 1) {
const msg = `my-message-${i}`;
results.push(producer.send({
producer.send({
data: Buffer.from(msg),
}));
});
console.log(`Sent message: ${msg}`);
}
await Promise.all(results);
await producer.flush();

await producer.close();
await client.close();
Expand Down
2 changes: 1 addition & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 3 additions & 4 deletions perf/perf_producer.js
Original file line number Diff line number Diff line change
Expand Up @@ -85,17 +85,16 @@ const Pulsar = require('../index.js');
// measure
await delay(1000);
const startMeasureTimeMilliSeconds = performance.now();
const results = [];
for (let mi = 0; mi < numOfMessages; mi += 1) {
const startSendTimeMilliSeconds = performance.now();
results.push(producer.send({
producer.send({
data: message,
}).then(() => {
// add latency
histogram.recordValue((performance.now() - startSendTimeMilliSeconds));
}));
});
}
await Promise.all(results);
await producer.flush();
const endMeasureTimeMilliSeconds = performance.now();

// result
Expand Down
39 changes: 37 additions & 2 deletions src/Producer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@ Napi::FunctionReference Producer::constructor;
void Producer::Init(Napi::Env env, Napi::Object exports) {
Napi::HandleScope scope(env);

Napi::Function func = DefineClass(
env, "Producer", {InstanceMethod("send", &Producer::Send), InstanceMethod("close", &Producer::Close)});
Napi::Function func =
DefineClass(env, "Producer",
{InstanceMethod("send", &Producer::Send), InstanceMethod("flush", &Producer::Flush),
InstanceMethod("close", &Producer::Close)});

constructor = Napi::Persistent(func);
constructor.SuppressDestruct();
Expand Down Expand Up @@ -119,6 +121,39 @@ Napi::Value Producer::Send(const Napi::CallbackInfo &info) {
return deferred.Promise();
}

class ProducerFlushWorker : public Napi::AsyncWorker {
public:
ProducerFlushWorker(const Napi::Promise::Deferred &deferred, pulsar_producer_t *cProducer)
: AsyncWorker(Napi::Function::New(deferred.Promise().Env(), [](const Napi::CallbackInfo &info) {})),
deferred(deferred),
cProducer(cProducer) {}

~ProducerFlushWorker() {}

void Execute() {
pulsar_result result = pulsar_producer_flush(this->cProducer);
if (result != pulsar_result_Ok) SetError(pulsar_result_str(result));
}

void OnOK() { this->deferred.Resolve(Env().Null()); }

void OnError(const Napi::Error &e) {
this->deferred.Reject(
Napi::Error::New(Env(), std::string("Failed to flush producer: ") + e.Message()).Value());
}

private:
Napi::Promise::Deferred deferred;
pulsar_producer_t *cProducer;
};

Napi::Value Producer::Flush(const Napi::CallbackInfo &info) {
Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env());
ProducerFlushWorker *wk = new ProducerFlushWorker(deferred, this->cProducer);
wk->Queue();
return deferred.Promise();
}

class ProducerCloseWorker : public Napi::AsyncWorker {
public:
ProducerCloseWorker(const Napi::Promise::Deferred &deferred, pulsar_producer_t *cProducer)
Expand Down
1 change: 1 addition & 0 deletions src/Producer.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class Producer : public Napi::ObjectWrap<Producer> {
private:
pulsar_producer_t *cProducer;
Napi::Value Send(const Napi::CallbackInfo &info);
Napi::Value Flush(const Napi::CallbackInfo &info);
Napi::Value Close(const Napi::CallbackInfo &info);
};

Expand Down

0 comments on commit 87e7666

Please sign in to comment.