Skip to content

Commit

Permalink
Merge pull request #123 from garlick/matchtag
Browse files Browse the repository at this point in the history
implement matchtag protocol enhancement
  • Loading branch information
grondo committed Dec 19, 2014
2 parents cb132c6 + 72bd9e8 commit 49ba060
Show file tree
Hide file tree
Showing 9 changed files with 201 additions and 55 deletions.
50 changes: 49 additions & 1 deletion src/common/libflux/handle.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ struct flux_handle_struct {
void *impl;
reactor_t reactor;
zhash_t *aux;
bool matchtag_pool[256];
};

flux_t flux_handle_create (void *impl, const struct flux_handle_ops *ops, int flags)
Expand Down Expand Up @@ -102,6 +103,23 @@ void flux_aux_set (flux_t h, const char *name, void *aux, FluxFreeFn destroy)
zhash_freefn (h->aux, name, destroy);
}

uint8_t flux_matchtag_alloc (flux_t h)
{
uint8_t t;
for (t = 1; t > 0; t++) {
if (!h->matchtag_pool[t]) {
h->matchtag_pool[t] = true;
break;
}
}
return t;
}

void flux_matchtag_free (flux_t h, uint8_t t)
{
h->matchtag_pool[t] = false;
}

int flux_request_sendmsg (flux_t h, zmsg_t **zmsg)
{
int type;
Expand Down Expand Up @@ -156,7 +174,7 @@ int flux_response_sendmsg (flux_t h, zmsg_t **zmsg)
return h->ops->response_sendmsg (h->impl, zmsg);
}

zmsg_t *flux_response_recvmsg (flux_t h, bool nonblock)
static zmsg_t *flux_response_recvmsg_any (flux_t h, bool nonblock)
{
zmsg_t *zmsg;
int type;
Expand Down Expand Up @@ -189,6 +207,36 @@ int flux_response_putmsg (flux_t h, zmsg_t **zmsg)
return h->ops->response_putmsg (h->impl, zmsg);
}

zmsg_t *flux_response_recvmsg (flux_t h, uint8_t matchtag, bool nonblock)
{
zmsg_t *zmsg = NULL;
zlist_t *nomatch = NULL;

do {
if (!(zmsg = flux_response_recvmsg_any (h, nonblock)))
goto done;
if (matchtag != 0 && !flux_msg_cmp_matchtag (zmsg, matchtag)) {
if (!nomatch && !(nomatch = zlist_new ()))
oom ();
if (h->flags & FLUX_FLAGS_TRACE)
fprintf (stderr, "[deferred: matchtag != %d]\n", matchtag);
if (zlist_append (nomatch, zmsg) < 0)
oom ();
zmsg = NULL;
}
} while (!zmsg);
done:
if (nomatch) {
zmsg_t *z;
while ((z = zlist_pop (nomatch))) {
if (flux_response_putmsg (h, &z) < 0)
zmsg_destroy (&z);
}
zlist_destroy (&nomatch);
}
return zmsg;
}

zmsg_t *flux_event_recvmsg (flux_t h, bool nonblock)
{
zmsg_t *zmsg;
Expand Down
7 changes: 7 additions & 0 deletions src/common/libflux/handle.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#ifndef _FLUX_CORE_HANDLE_H
#define _FLUX_CORE_HANDLE_H

#include <stdint.h>

typedef struct flux_handle_struct *flux_t;

/* Flags for handle creation and flux_flags_set()/flux_flags_unset.
Expand All @@ -23,6 +25,11 @@ void flux_aux_set (flux_t h, const char *name, void *aux, FluxFreeFn destroy);
void flux_flags_set (flux_t h, int flags);
void flux_flags_unset (flux_t h, int flags);

/* Alloc/free a matchtag for matched requests.
*/
uint8_t flux_matchtag_alloc (flux_t h);
void flux_matchtag_free (flux_t h, uint8_t t);

#endif /* !_FLUX_CORE_HANDLE_H */

/*
Expand Down
90 changes: 82 additions & 8 deletions src/common/libflux/message.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,17 @@
#include "src/common/libutil/log.h"

/* Begin manual codec
* We have 4 byte values followed by a 32 bit int (network order).
*/
#define PROTO_MAGIC 0x8e
#define PROTO_VERSION 1
#define PROTO_SIZE 8
#define PROTO_OFF_MAGIC 0
#define PROTO_OFF_VERSION 1
#define PROTO_OFF_TYPE 2
#define PROTO_OFF_FLAGS 3
#define PROTO_OFF_BIGINT 4
#define PROTO_SIZE 9
#define PROTO_OFF_MAGIC 0 /* 1 byte */
#define PROTO_OFF_VERSION 1 /* 1 byte */
#define PROTO_OFF_TYPE 2 /* 1 byte */
#define PROTO_OFF_FLAGS 3 /* 1 byte */
#define PROTO_OFF_BIGINT 4 /* 4 bytes */
#define PROTO_OFF_MATCHTAG 8 /* 1 byte */

static int proto_set_bigint (uint8_t *data, int len, uint32_t bigint);

static int proto_set_type (uint8_t *data, int len, int type)
Expand Down Expand Up @@ -113,6 +114,22 @@ static int proto_get_bigint (uint8_t *data, int len, uint32_t *bigint)
*bigint = ntohl (x);
return 0;
}
static int proto_set_matchtag (uint8_t *data, int len, uint8_t matchtag)
{
if (len < PROTO_SIZE || data[PROTO_OFF_MAGIC] != PROTO_MAGIC
|| data[PROTO_OFF_VERSION] != PROTO_VERSION)
return -1;
data[PROTO_OFF_MATCHTAG] = matchtag;
return 0;
}
static int proto_get_matchtag (uint8_t *data, int len, uint8_t *val)
{
if (len < PROTO_SIZE || data[PROTO_OFF_MAGIC] != PROTO_MAGIC
|| data[PROTO_OFF_VERSION] != PROTO_VERSION)
return -1;
*val = data[PROTO_OFF_MATCHTAG];
return 0;
}
static void proto_init (uint8_t *data, int len, uint8_t flags)
{
assert (len >= PROTO_SIZE);
Expand Down Expand Up @@ -276,6 +293,44 @@ int flux_msg_get_seq (zmsg_t *zmsg, uint32_t *seq)
return 0;
}

int flux_msg_set_matchtag (zmsg_t *zmsg, uint8_t t)
{
zframe_t *zf = zmsg_last (zmsg);
int type;

if (!zf || proto_get_type (zframe_data (zf), zframe_size (zf), &type) < 0
|| (type != FLUX_MSGTYPE_REQUEST && type != FLUX_MSGTYPE_RESPONSE)
|| proto_set_matchtag (zframe_data (zf), zframe_size (zf), t) < 0) {
errno = EINVAL;
return -1;
}
return 0;
}

int flux_msg_get_matchtag (zmsg_t *zmsg, uint8_t *t)
{
zframe_t *zf = zmsg_last (zmsg);
int type;

if (!zf || proto_get_type (zframe_data (zf), zframe_size (zf), &type) < 0
|| (type != FLUX_MSGTYPE_REQUEST && type != FLUX_MSGTYPE_RESPONSE)
|| proto_get_matchtag (zframe_data (zf), zframe_size (zf), t) < 0) {
errno = EPROTO;
return -1;
}
return 0;
}

bool flux_msg_cmp_matchtag (zmsg_t *zmsg, uint8_t t)
{
uint8_t q;
if (flux_msg_get_matchtag (zmsg, &q) < 0)
return false;
if (q != t)
return false;
return true;
}

int flux_msg_enable_route (zmsg_t *zmsg)
{
uint8_t flags;
Expand Down Expand Up @@ -1195,9 +1250,27 @@ void check_proto (void)
zmsg_destroy (&zmsg);
}

void check_matchtag (void)
{
zmsg_t *zmsg;
uint8_t t;

ok ((zmsg = flux_msg_create (FLUX_MSGTYPE_REQUEST)) != NULL,
"flux_msg_create works");
ok (flux_msg_get_matchtag (zmsg, &t) == 0 && t == 0,
"flux_msg_get_matchtag returns 0 when uninitialized");
ok (flux_msg_set_matchtag (zmsg, 42) == 0,
"flux_msg_set_matchtag works");
ok (flux_msg_get_matchtag (zmsg, &t) == 0 && t == 42,
"flux_msg_get_matchtag returns set value");
ok (flux_msg_cmp_matchtag (zmsg, 42) && !flux_msg_cmp_matchtag (zmsg, 0),
"flux_msg_cmp_matchtag works");
zmsg_destroy (&zmsg);
}

int main (int argc, char *argv[])
{
plan (91);
plan (96);

lives_ok ({zmsg_test (false);}, // 1
"zmsg_test doesn't assert");
Expand All @@ -1206,6 +1279,7 @@ int main (int argc, char *argv[])
check_routes (); // 26
check_topic (); // 11
check_payload (); // 21
check_matchtag (); // 5

check_legacy_encode (); // 5
check_legacy_encode_json (); // 8
Expand Down
6 changes: 6 additions & 0 deletions src/common/libflux/message.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@ int flux_msg_get_errnum (zmsg_t *zmsg, int *errnum);
int flux_msg_set_seq (zmsg_t *zmsg, uint32_t seq);
int flux_msg_get_seq (zmsg_t *zmsg, uint32_t *seq);

/* Get/set/compare match tag (request/response only)
*/
int flux_msg_set_matchtag (zmsg_t *zmsg, uint8_t matchtag);
int flux_msg_get_matchtag (zmsg_t *zmsg, uint8_t *matchtag);
bool flux_msg_cmp_matchtag (zmsg_t *zmsg, uint8_t matchtag);

/* NOTE: routing frames are pushed on a message traveling dealer
* to router, and popped off a message traveling router to dealer.
* A message intended for dealer-router sockets must first be enabled for
Expand Down
49 changes: 15 additions & 34 deletions src/common/libflux/request.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,33 +34,6 @@
#include "src/common/libutil/shortjson.h"


static zmsg_t *response_matched_recvmsg (flux_t h, const char *match, bool nb)
{
zmsg_t *zmsg, *response = NULL;
zlist_t *nomatch = NULL;

do {
if (!(response = flux_response_recvmsg (h, nb)))
goto done;
if (!flux_msg_streq_topic (response, match)) {
if (!nomatch && !(nomatch = zlist_new ()))
oom ();
if (zlist_append (nomatch, response) < 0)
oom ();
response = NULL;
}
} while (!response);
done:
if (nomatch) {
while ((zmsg = zlist_pop (nomatch))) {
if (flux_response_putmsg (h, &zmsg) < 0)
zmsg_destroy (&zmsg);
}
zlist_destroy (&nomatch);
}
return response;
}

/* If 'o' is non-NULL, encode to string and set as payload in 'zmsg'.
* Otherwise, clear payload in 'zmsg', if any.
* Return 0 on success, -1 on failure with errno set.
Expand Down Expand Up @@ -194,7 +167,8 @@ int flux_response_decode (zmsg_t *zmsg)
return rc;
}

int flux_json_request (flux_t h, uint32_t nodeid, const char *topic, JSON in)
int flux_json_request (flux_t h, uint32_t nodeid, uint8_t matchtag,
const char *topic, JSON in)
{
zmsg_t *zmsg;
int rc = -1;
Expand All @@ -207,6 +181,8 @@ int flux_json_request (flux_t h, uint32_t nodeid, const char *topic, JSON in)
goto done;
if (flux_msg_set_nodeid (zmsg, nodeid) < 0)
goto done;
if (flux_msg_set_matchtag (zmsg, matchtag) < 0)
goto done;
if (flux_msg_set_topic (zmsg, topic) < 0)
goto done;
if (msg_set_payload_json (zmsg, in) < 0)
Expand All @@ -219,18 +195,22 @@ int flux_json_request (flux_t h, uint32_t nodeid, const char *topic, JSON in)
return rc;
}


int flux_json_rpc (flux_t h, uint32_t nodeid, const char *topic,
JSON in, JSON *out)
{
zmsg_t *zmsg = NULL;
int rc = -1;
int errnum;
JSON o;
uint8_t matchtag;

if (flux_json_request (h, nodeid, topic, in) < 0)
if (!(matchtag = flux_matchtag_alloc (h))) {
errno = EAGAIN;
goto done;
}
if (flux_json_request (h, nodeid, matchtag, topic, in) < 0)
goto done;
if (!(zmsg = response_matched_recvmsg (h, topic, false)))
if (!(zmsg = flux_response_recvmsg (h, matchtag, false)))
goto done;
if (flux_msg_get_errnum (zmsg, &errnum) < 0)
goto done;
Expand All @@ -257,6 +237,7 @@ int flux_json_rpc (flux_t h, uint32_t nodeid, const char *topic,
*out = o;
rc = 0;
done:
flux_matchtag_free (h, matchtag);
return rc;
}

Expand Down Expand Up @@ -329,7 +310,7 @@ int flux_rank_request_send (flux_t h, int rank, JSON o, const char *fmt, ...)
topic = xvasprintf (fmt, ap);
va_end (ap);

rc = flux_json_request (h, nodeid, topic, o);
rc = flux_json_request (h, nodeid, 0, topic, o);
free (topic);
return rc;
}
Expand Down Expand Up @@ -360,7 +341,7 @@ int flux_request_send (flux_t h, JSON o, const char *fmt, ...)
topic = xvasprintf (fmt, ap);
va_end (ap);

rc = flux_json_request (h, FLUX_NODEID_ANY, topic, o);
rc = flux_json_request (h, FLUX_NODEID_ANY, 0, topic, o);
free (topic);
return rc;
}
Expand All @@ -370,7 +351,7 @@ int flux_response_recv (flux_t h, JSON *respp, char **tagp, bool nb)
zmsg_t *zmsg;
int rc = -1;

if (!(zmsg = flux_response_recvmsg (h, nb)))
if (!(zmsg = flux_response_recvmsg (h, 0, nb)))
goto done;
if (flux_msg_get_errnum (zmsg, &errno) < 0 || errno != 0)
goto done;
Expand Down
16 changes: 10 additions & 6 deletions src/common/libflux/request.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,12 @@ zmsg_t *flux_request_recvmsg (flux_t h, bool nonblock);
*/
int flux_response_sendmsg (flux_t h, zmsg_t **zmsg);

/* Receive a response message, blocking until one is available.
* If 'nonblock' and none is available, return NULL with errno == EAGAIN.
/* Receive a response message matching 'matchtag', blocking until one is
* available. If 'nonblock' and none is * available, return NULL
* with errno == EAGAIN. If 'matchtag' is 0, match any message.
* Returns message on success, or NULL on failure with errno set.
*/
zmsg_t *flux_response_recvmsg (flux_t h, bool nonblock);
zmsg_t *flux_response_recvmsg (flux_t h, uint8_t matchtag, bool nonblock);

/* Put a response message in the handle's inbound message queue for processing
* in FIFO order, before other unprocessed messages. The handle will become
Expand All @@ -48,11 +49,13 @@ int flux_response_putmsg (flux_t h, zmsg_t **zmsg);

/* Send a request to 'nodeid' (may be FLUX_NODEID_ANY) addressed to 'topic'.
* If 'in' is non-NULL, attach JSON payload, caller retains ownership.
* Do not wait for a response.
* Set 'matchtag' to zero to disable tag matching, or allocate/free one
* from the handle with flux_matchtag_alloc()/flux_matchtag_free().
* This function does not wait for a response message.
* Returns 0 on success, or -1 on failure with errno set.
*/
int flux_json_request (flux_t h, uint32_t nodeid, const char *topic,
json_object *in);
int flux_json_request (flux_t h, uint32_t nodeid, uint8_t matchtag,
const char *topic, json_object *in);

/* Send a request to 'nodeid' (may be FLUX_NODEID_ANY) addressed to 'topic'.
* If 'in' is non-NULL, attach JSON payload, caller retains ownership.
Expand Down Expand Up @@ -99,6 +102,7 @@ int flux_json_response_decode (zmsg_t *zmsg, json_object **out);
*/
int flux_response_decode (zmsg_t *zmsg);


/**
** Deprecated interfaces.
**/
Expand Down
Loading

0 comments on commit 49ba060

Please sign in to comment.