Skip to content

Commit

Permalink
Merge pull request #994 from garlick/event_error
Browse files Browse the repository at this point in the history
return event subscribe/unsubscribe errors to local connector users
  • Loading branch information
grondo authored Feb 27, 2017
2 parents 8ff2115 + 325780f commit 1a1ab10
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 13 deletions.
1 change: 1 addition & 0 deletions src/bindings/python/test/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ def test_t1_0_sub(self):

def test_t1_1_unsub(self):
"""Unsubscribe from an event"""
self.assertGreaterEqual(self.f.event_subscribe("testevent.1"), 0)
self.assertGreaterEqual(self.f.event_unsubscribe("testevent.1"), 0)

def test_full_event(self):
Expand Down
5 changes: 3 additions & 2 deletions src/cmd/flux-event.c
Original file line number Diff line number Diff line change
Expand Up @@ -209,8 +209,9 @@ static int event_sub (optparse_t *p, int argc, char **argv)
}
/* FIXME: add SIGINT handler to exit above loop and clean up.
*/
if (argc > 1)
unsubscribe_all (h, argc - 1, argv + 1);
n = optparse_option_index (p);
if (n < argc)
unsubscribe_all (h, argc - n, argv + n);
else if (flux_event_unsubscribe (h, "") < 0)
log_err_exit ("flux_event_subscribe");
return (0);
Expand Down
10 changes: 6 additions & 4 deletions src/connectors/local/local.c
Original file line number Diff line number Diff line change
Expand Up @@ -124,13 +124,15 @@ static flux_msg_t *op_recv (void *impl, int flags)
static int op_event (void *impl, const char *topic, const char *msg_topic)
{
local_ctx_t *c = impl;
flux_rpc_t *rpc;
int rc = -1;

assert (c->magic == CTX_MAGIC);
flux_rpc_t *rpc = NULL;
int rc = 0;

if (!(rpc = flux_rpcf (c->h, msg_topic, FLUX_NODEID_ANY, 0,
"{s:s}", "topic", topic))
|| flux_rpc_get (rpc, NULL) < 0)
"{s:s}", "topic", topic)))
goto done;
if (flux_rpc_get (rpc, NULL) < 0)
goto done;
rc = 0;
done:
Expand Down
4 changes: 3 additions & 1 deletion src/modules/connector-local/local.c
Original file line number Diff line number Diff line change
Expand Up @@ -318,8 +318,10 @@ static int client_unsubscribe (client_t *c, const char *topic)
subscription_t *sub;
int rc = -1;

if (!(sub = zhash_lookup (c->subscriptions, topic)))
if (!(sub = zhash_lookup (c->subscriptions, topic))) {
errno = ENOENT;
goto done;
}
if (--sub->usecount == 0) {
zhash_delete (c->subscriptions, topic);
//flux_log (c->ctx->h, LOG_DEBUG, "%s: %s", __FUNCTION__, topic);
Expand Down
17 changes: 11 additions & 6 deletions t/lua/t0003-events.t
Original file line number Diff line number Diff line change
@@ -1,25 +1,30 @@
#!/usr/bin/env lua
--
-- Basic flux reactor testing using ping interface to kvs
-- Basic flux event testing
--
local test = require 'fluxometer'.init (...)
test:start_session {}

local fmt = string.format

plan (20)
plan (22)

local flux = require_ok ('flux')
local f, err = flux.new()
type_ok (f, 'userdata', "create new flux handle")
is (err, nil, "error is nil")

local rc, err = f:subscribe ("testevent.")
isnt (rc, -1, "subscribe: return code >= 0")
isnt (rc, nil, "subscribe: return code != nil")
is (err, nil, "subscribe: error is nil")

local rc, err = f:unsubscribe ("notmytopic")
is (rc, nil, "unsubscribe: return code == nil")
is (err, "No such file or directory",
"unsubscribe: error is No such file or directory")

local rc, err = f:sendevent ({ test = "xxx" }, "testevent.1")
isnt (rc, -1, "sendevent: return code >= 0")
isnt (rc, nil, "sendevent: return code != nil")
is (err, nil, "sendevent: error is nil")

local msg, tag = f:recv_event ()
Expand All @@ -30,7 +35,7 @@ is (msg.test, "xxx", "recv_event: got payload intact")

-- sendevent takes string.format args...
local rc, err = f:sendevent ({ test = "yyy"}, "testevent.%d", 2)
isnt (rc, -1, "sendevent: return code >= 0")
isnt (rc, nil, "sendevent: return code != nil")
is (err, nil, "sendevent: error is nil")

local msg, tag = f:recv_event ()
Expand All @@ -41,7 +46,7 @@ is (msg.test, "yyy", "recv_event: got payload intact")

-- sendevent with empty payload...
local rc, err = f:sendevent ("testevent.%d", 2)
isnt (rc, -1, "sendevent: return code >= 0")
isnt (rc, nil, "sendevent: return code != nil")
is (err, nil, "sendevent: error is nil")

local msg, tag = f:recv_event ()
Expand Down

0 comments on commit 1a1ab10

Please sign in to comment.