Skip to content

Commit

Permalink
Merge branch 'oldrich-s-master'
Browse files Browse the repository at this point in the history
  • Loading branch information
davedoesdev committed May 6, 2022
2 parents 54af69c + 376261b commit 44fa0f4
Show file tree
Hide file tree
Showing 4 changed files with 242 additions and 85 deletions.
30 changes: 10 additions & 20 deletions aedes/qlobber-sub.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,23 @@ QlobberSub.prototype._initial_value = function (val)

let r = {
topic: val.topic,
clientMap: new Map().set(val.clientId, val.qos),
clientMap: new Map().set(val.clientId, { qos: val.qos, rh: val.rh, rap: val.rap, nl: val.nl }),
};

r[Symbol.iterator] = function* (topic)
{
if (topic === undefined)
{
for (let [clientId, qos] of r.clientMap)
for (let [clientId, sub] of r.clientMap)
{
yield { topic: r.topic, clientId, qos };
yield { topic: r.topic, clientId, ...sub };
}
}
else if (r.topic === topic)
{
for (let [clientId, qos] of r.clientMap)
for (let [clientId, sub] of r.clientMap)
{
yield { clientId, qos };
yield { clientId, ...sub };
}
}
};
Expand All @@ -48,7 +48,7 @@ QlobberSub.prototype._add_value = function (existing, val)
var clientMap = existing.clientMap,
size = clientMap.size;

clientMap.set(val.clientId, val.qos);
clientMap.set(val.clientId, { qos: val.qos, rh: val.rh, rap: val.rap, nl: val.nl });

if (clientMap.size > size)
{
Expand All @@ -58,28 +58,18 @@ QlobberSub.prototype._add_value = function (existing, val)

QlobberSub.prototype._add_values = function (dest, existing, topic)
{
var clientIdAndQos;
if (topic === undefined)
{
for (clientIdAndQos of existing.clientMap)
for (let [clientId, sub] of existing.clientMap)
{
dest.push(
{
clientId: clientIdAndQos[0],
topic: existing.topic,
qos: clientIdAndQos[1]
});
dest.push({ clientId, topic: existing.topic, ...sub });
}
}
else if (existing.topic === topic)
{
for (clientIdAndQos of existing.clientMap)
for (let [clientId, sub] of existing.clientMap)
{
dest.push(
{
clientId: clientIdAndQos[0],
qos: clientIdAndQos[1]
});
dest.push({ clientId,...sub });
}
}
};
Expand Down
34 changes: 26 additions & 8 deletions native/src/qlobber_sub.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@ class QlobberSub :
return {
val.Get("clientId").As<Napi::String>(),
val.Get("topic").As<Napi::String>(),
static_cast<QoS>(val.Get("qos").As<Napi::Number>().Uint32Value())
static_cast<QoS>(val.Get("qos").As<Napi::Number>().Uint32Value()),
static_cast<RetainHandling>(val.Get("rh").As<Napi::Number>().Uint32Value()),
val.Get("rap").As<Napi::Boolean>(),
val.Get("nl").As<Napi::Boolean>()
};
}

Expand All @@ -67,18 +70,24 @@ class QlobberSub :
const SubStorage& existing,
const std::optional<const std::string>& topic) override {
if (!topic) {
for (const auto& clientIdAndQos : existing.clientMap) {
for (const auto& clientIdAndNode : existing.clientMap) {
Napi::Object obj = Napi::Object::New(dest.Env());
obj.Set("clientId", clientIdAndQos.first);
obj.Set("clientId", clientIdAndNode.first);
obj.Set("topic", existing.topic);
obj.Set("qos", static_cast<uint32_t>(clientIdAndQos.second));
obj.Set("qos", static_cast<uint32_t>(clientIdAndNode.second.qos));
obj.Set("rh", static_cast<uint32_t>(clientIdAndNode.second.rh));
obj.Set("rap", clientIdAndNode.second.rap);
obj.Set("nl", clientIdAndNode.second.nl);
dest.Set(dest.Length(), obj);
}
} else if (existing.topic == topic.value()) {
for (const auto& clientIdAndQos : existing.clientMap) {
for (const auto& clientIdAndNode : existing.clientMap) {
Napi::Object obj = Napi::Object::New(dest.Env());
obj.Set("clientId", clientIdAndQos.first);
obj.Set("qos", static_cast<uint32_t>(clientIdAndQos.second));
obj.Set("clientId", clientIdAndNode.first);
obj.Set("qos", static_cast<uint32_t>(clientIdAndNode.second.qos));
obj.Set("rh", static_cast<uint32_t>(clientIdAndNode.second.rh));
obj.Set("rap", clientIdAndNode.second.rap);
obj.Set("nl", clientIdAndNode.second.nl);
dest.Set(dest.Length(), obj);
}
}
Expand All @@ -103,6 +112,9 @@ Napi::Object FromValue<Sub, Napi::Object>(const Napi::Env& env, const Sub& sub)
obj.Set("clientId", sub.clientId);
obj.Set("topic", sub.topic);
obj.Set("qos", static_cast<uint32_t>(sub.qos));
obj.Set("rh", static_cast<uint32_t>(sub.rh));
obj.Set("rap", sub.rap);
obj.Set("nl", sub.nl);
return obj;
}

Expand All @@ -114,6 +126,9 @@ Napi::Object FromValue<IterSub, Napi::Object>(const Napi::Env& env, const IterSu
obj.Set("topic", sub.topic.value());
}
obj.Set("qos", static_cast<uint32_t>(sub.qos));
obj.Set("rh", static_cast<uint32_t>(sub.rh));
obj.Set("rap", sub.rap);
obj.Set("nl", sub.nl);
return obj;
}

Expand All @@ -122,6 +137,9 @@ Sub ToValue<Sub, Napi::Object>(const Napi::Object& v) {
return Sub {
v.Get("clientId").As<Napi::String>(),
v.Get("topic").As<Napi::String>(),
static_cast<QoS>(v.Get("qos").As<Napi::Number>().Uint32Value())
static_cast<QoS>(v.Get("qos").As<Napi::Number>().Uint32Value()),
static_cast<RetainHandling>(v.Get("rh").As<Napi::Number>().Uint32Value()),
v.Get("rap").As<Napi::Boolean>(),
v.Get("nl").As<Napi::Boolean>()
};
}
58 changes: 48 additions & 10 deletions native/src/qlobber_sub_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,49 @@ enum QoS {
exactly_once = 2
};

enum RetainHandling {
send = 0,
send_if_not_exist = 1,
do_not_send = 2
};

struct Sub {
std::string clientId;
std::string topic;
QoS qos;
RetainHandling rh;
bool rap;
bool nl;
};

struct IterSub {
std::string clientId;
std::optional<std::string> topic;
QoS qos;
RetainHandling rh;
bool rap;
bool nl;
};

struct SubStorageNode {
QoS qos;
RetainHandling rh;
bool rap;
bool nl;
};

struct SubStorage {
SubStorage(const Sub& sub) :
topic(sub.topic) {
clientMap.insert_or_assign(sub.clientId, sub.qos);
clientMap.insert_or_assign(sub.clientId, SubStorageNode {
sub.qos,
sub.rh,
sub.rap,
sub.nl
});
}
std::string topic;
std::unordered_map<std::string, QoS> clientMap;
std::unordered_map<std::string, SubStorageNode> clientMap;
};

struct SubTest {
Expand Down Expand Up @@ -98,7 +122,12 @@ class QlobberSubBase<Sub, MatchResult, Context, std::string, SubTest, IterSub> :
}

void add_value(SubStorage& existing, const Sub& sub) override {
if (existing.clientMap.insert_or_assign(sub.clientId, sub.qos).second) {
if (existing.clientMap.insert_or_assign(sub.clientId, SubStorageNode {
sub.qos,
sub.rh,
sub.rap,
sub.nl
}).second) {
++this->state->subscriptionsCount;
}
}
Expand All @@ -123,19 +152,25 @@ class QlobberSubBase<Sub, MatchResult, Context, std::string, SubTest, IterSub> :
const SubStorage& storage,
const std::optional<const std::string>& topic) override {
if (!topic) {
for (const auto& clientIdAndQos : storage.clientMap) {
for (const auto& clientIdAndNode : storage.clientMap) {
sink(IterSub {
clientIdAndQos.first,
clientIdAndNode.first,
std::optional<std::string>(storage.topic),
clientIdAndQos.second
clientIdAndNode.second.qos,
clientIdAndNode.second.rh,
clientIdAndNode.second.rap,
clientIdAndNode.second.nl
});
}
} else if (storage.topic == topic.value()) {
for (const auto& clientIdAndQos : storage.clientMap) {
for (const auto& clientIdAndNode : storage.clientMap) {
sink(IterSub {
clientIdAndQos.first,
clientIdAndNode.first,
std::nullopt,
clientIdAndQos.second
clientIdAndNode.second.qos,
clientIdAndNode.second.rh,
clientIdAndNode.second.rap,
clientIdAndNode.second.nl
});
}
}
Expand All @@ -152,7 +187,10 @@ class QlobberSubBase<Sub, MatchResult, Context, std::string, SubTest, IterSub> :
Sub {
entry.first,
storage.topic,
entry.second
entry.second.qos,
entry.second.rh,
entry.second.rap,
entry.second.nl
})
}
});
Expand Down
Loading

0 comments on commit 44fa0f4

Please sign in to comment.