Skip to content

Commit

Permalink
in_xbee: Support another payload format
Browse files Browse the repository at this point in the history
in_xbee now recoginize [ time, { key => val, ... } ] format.

Signed-off-by: Takeshi HASEGAWA <[email protected]>
  • Loading branch information
hasegaw committed Jul 2, 2015
1 parent a4860a0 commit d3fafc0
Showing 1 changed file with 54 additions and 13 deletions.
67 changes: 54 additions & 13 deletions plugins/in_xbee/in_xbee.c
Original file line number Diff line number Diff line change
Expand Up @@ -75,36 +75,77 @@ void in_xbee_rx_queue_raw(struct flb_in_xbee_config *ctx, const char *buf ,int l
pthread_mutex_unlock(&ctx->mtx_mp);
}


/*
* This plugin accepts following formats of MessagePack:
* { map => val, map => val, map => val }
* or [ time, { map => val, map => val, map => val } ]
*/
int in_xbee_rx_queue_msgpack(struct flb_in_xbee_config *ctx, const char *buf ,int len)
{
msgpack_unpacked result;
msgpack_unpacked_init(&result);
msgpack_unpacked record;
msgpack_unpacked field;
msgpack_unpacked_init(&record);
msgpack_unpacked_init(&field);

size_t off = 0;
size_t start = 0;
size_t off2;
size_t mp_offset;
int queued = 0;
uint64_t t;

pthread_mutex_lock(&ctx->mtx_mp);

while (msgpack_unpack_next(&record, buf, len, &off)) {
if (record.data.type == MSGPACK_OBJECT_ARRAY && record.data.via.array.size == 2) {
/* [ time, { map => val, map => val, map => val } ] */

while (msgpack_unpack_next(&result, buf, len, &off)) {
if (result.data.type != MSGPACK_OBJECT_MAP)
break;
msgpack_unpacked_destroy(&field);
msgpack_unpacked_init(&field);
off2 = 0;

if (! msgpack_unpack_next(&field, buf + 1, len - 1, &off2))
break;

if (field.data.type != MSGPACK_OBJECT_POSITIVE_INTEGER)
break;

in_xbee_flush_if_needed(ctx);
t = field.data.via.u64;
mp_offset = off2;

/* Increase buffer position */
ctx->buffer_id++;
if (! msgpack_unpack_next(&field, buf + 1, len - 1, &off2))
break;

msgpack_pack_array(&ctx->mp_pck, 2);
msgpack_pack_uint64(&ctx->mp_pck, time(NULL));
msgpack_pack_bin_body(&ctx->mp_pck, buf + start, off - start);
if (field.data.type != MSGPACK_OBJECT_MAP)
break;

in_xbee_flush_if_needed(ctx);
ctx->buffer_id++;

msgpack_pack_array(&ctx->mp_pck, 2);
msgpack_pack_uint64(&ctx->mp_pck, t);
msgpack_pack_bin_body(&ctx->mp_pck, (char*) buf + 1 + mp_offset, off2 - mp_offset);

} else if (record.data.type == MSGPACK_OBJECT_MAP) {
/* { map => val, map => val, map => val } */

in_xbee_flush_if_needed(ctx);
ctx->buffer_id++;

msgpack_pack_array(&ctx->mp_pck, 2);
msgpack_pack_uint64(&ctx->mp_pck, time(NULL));
msgpack_pack_bin_body(&ctx->mp_pck, buf + start, off - start);

} else {
break;

}
start = off;
queued++;
}

msgpack_unpacked_destroy(&result);
msgpack_unpacked_destroy(&record);
msgpack_unpacked_destroy(&field);
pthread_mutex_unlock(&ctx->mtx_mp);
return queued;
}
Expand Down

0 comments on commit d3fafc0

Please sign in to comment.