diff --git a/plugins/in_xbee/in_xbee.c b/plugins/in_xbee/in_xbee.c index 0f8e8d831b1..0a6312c2454 100644 --- a/plugins/in_xbee/in_xbee.c +++ b/plugins/in_xbee/in_xbee.c @@ -75,36 +75,77 @@ void in_xbee_rx_queue_raw(struct flb_in_xbee_config *ctx, const char *buf ,int l pthread_mutex_unlock(&ctx->mtx_mp); } - +/* + * This plugin accepts following formats of MessagePack: + * { map => val, map => val, map => val } + * or [ time, { map => val, map => val, map => val } ] + */ int in_xbee_rx_queue_msgpack(struct flb_in_xbee_config *ctx, const char *buf ,int len) { - msgpack_unpacked result; - msgpack_unpacked_init(&result); + msgpack_unpacked record; + msgpack_unpacked field; + msgpack_unpacked_init(&record); + msgpack_unpacked_init(&field); size_t off = 0; size_t start = 0; + size_t off2; + size_t mp_offset; int queued = 0; + uint64_t t; pthread_mutex_lock(&ctx->mtx_mp); + + while (msgpack_unpack_next(&record, buf, len, &off)) { + if (record.data.type == MSGPACK_OBJECT_ARRAY && record.data.via.array.size == 2) { + /* [ time, { map => val, map => val, map => val } ] */ - while (msgpack_unpack_next(&result, buf, len, &off)) { - if (result.data.type != MSGPACK_OBJECT_MAP) - break; + msgpack_unpacked_destroy(&field); + msgpack_unpacked_init(&field); + off2 = 0; + + if (! msgpack_unpack_next(&field, buf + 1, len - 1, &off2)) + break; + + if (field.data.type != MSGPACK_OBJECT_POSITIVE_INTEGER) + break; - in_xbee_flush_if_needed(ctx); + t = field.data.via.u64; + mp_offset = off2; - /* Increase buffer position */ - ctx->buffer_id++; + if (! msgpack_unpack_next(&field, buf + 1, len - 1, &off2)) + break; - msgpack_pack_array(&ctx->mp_pck, 2); - msgpack_pack_uint64(&ctx->mp_pck, time(NULL)); - msgpack_pack_bin_body(&ctx->mp_pck, buf + start, off - start); + if (field.data.type != MSGPACK_OBJECT_MAP) + break; + + in_xbee_flush_if_needed(ctx); + ctx->buffer_id++; + + msgpack_pack_array(&ctx->mp_pck, 2); + msgpack_pack_uint64(&ctx->mp_pck, t); + msgpack_pack_bin_body(&ctx->mp_pck, (char*) buf + 1 + mp_offset, off2 - mp_offset); + + } else if (record.data.type == MSGPACK_OBJECT_MAP) { + /* { map => val, map => val, map => val } */ + + in_xbee_flush_if_needed(ctx); + ctx->buffer_id++; + + msgpack_pack_array(&ctx->mp_pck, 2); + msgpack_pack_uint64(&ctx->mp_pck, time(NULL)); + msgpack_pack_bin_body(&ctx->mp_pck, buf + start, off - start); + + } else { + break; + } start = off; queued++; } - msgpack_unpacked_destroy(&result); + msgpack_unpacked_destroy(&record); + msgpack_unpacked_destroy(&field); pthread_mutex_unlock(&ctx->mtx_mp); return queued; }