Skip to content

Commit

Permalink
out_websocket: Bug fixing for out_ws for recovery from server failure…
Browse files Browse the repository at this point in the history
… and other side-effect bugs (#3010)

* bug fixing for out_websocket plugin recovery after server recovers from failure
* comment out func call flb_output_upstream_set(ctx->u, ins);  revert #2965
* bug fixing by adding config_map mechanism

Signed-off-by: Fenggang <[email protected]>
  • Loading branch information
ginobiliwang authored Feb 8, 2021
1 parent d767f2d commit 4014194
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 17 deletions.
20 changes: 20 additions & 0 deletions plugins/out_websocket/websocket.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include <fluent-bit/flb_pack.h>
#include <fluent-bit/flb_sds.h>
#include <fluent-bit/flb_http_client.h>
#include <fluent-bit/flb_config_map.h>
#include <msgpack.h>

#include "websocket.h"
Expand Down Expand Up @@ -257,6 +258,7 @@ static void cb_ws_flush(const void *data, size_t bytes,
}
if (ret == -1) {
flb_error("[out_ws] dataFrameHeader sent failed");
ctx->handshake = 1;
flb_upstream_conn_release(u_conn);
FLB_OUTPUT_RETURN(FLB_RETRY);
}
Expand All @@ -272,6 +274,7 @@ static void cb_ws_flush(const void *data, size_t bytes,

//flb_info("[out_ws] sendDataFrame number of bytes sent = %i", ret);
if (ret == -1) {
ctx->handshake = 1;
flb_upstream_conn_release(u_conn);
FLB_OUTPUT_RETURN(FLB_RETRY);
}
Expand All @@ -281,12 +284,29 @@ static void cb_ws_flush(const void *data, size_t bytes,
FLB_OUTPUT_RETURN(FLB_OK);
}

/* Configuration properties map */
static struct flb_config_map config_map[] = {
{
FLB_CONFIG_MAP_STR, "uri", NULL,
0, FLB_TRUE, offsetof(struct flb_out_ws, uri),
"Specify an optional URI for the target web socket server, e.g: /something"
},
{
FLB_CONFIG_MAP_STR, "format", NULL,
0, FLB_FALSE, 0,
"Set desired payload format: json, json_stream, json_lines, gelf or msgpack"
},
/* EOF */
{0}
};

/* Plugin reference */
struct flb_output_plugin out_websocket_plugin = {
.name = "websocket",
.description = "Websocket",
.cb_init = cb_ws_init,
.cb_flush = cb_ws_flush,
.cb_exit = cb_ws_exit,
.config_map = config_map,
.flags = FLB_OUTPUT_NET | FLB_IO_OPT_TLS,
};
4 changes: 3 additions & 1 deletion plugins/out_websocket/websocket.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
#include <fluent-bit/flb_info.h>
#include <fluent-bit/flb_sds.h>

#define WEBSOCKET_INPUT_IDLE_INTERVAL 20
/*
* Configuration: we put this separate from the main
* context so every Upstream Node can have it own configuration
Expand All @@ -48,6 +47,9 @@ struct flb_out_ws {
int handshake;
time_t last_input_timestamp;
int idle_interval;

/* Plugin instance */
struct flb_output_instance *ins;
};

#endif
34 changes: 18 additions & 16 deletions plugins/out_websocket/websocket_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,15 @@ struct flb_out_ws *flb_ws_conf_create(struct flb_output_instance *ins,
flb_errno();
return NULL;
}
ctx->ins = ins;

ret = flb_output_config_map_set(ins, (void *) ctx);
if (ret == -1) {
flb_free(ctx);
return NULL;
}

//flb_output_net_default("127.0.0.1", 8080, ins);

/* Check if SSL/TLS is enabled */
#ifdef FLB_HAVE_TLS
Expand All @@ -59,10 +68,6 @@ struct flb_out_ws *flb_ws_conf_create(struct flb_output_instance *ins,
io_flags = FLB_IO_TCP;
#endif

if (ins->flags & FLB_IO_TCP_KA) {
io_flags |= FLB_IO_TCP_KA;
}

upstream = flb_upstream_create(config, ins->host.name, ins->host.port, io_flags, (void *)&ins->tls);
if (!upstream) {
flb_free(ctx);
Expand Down Expand Up @@ -127,23 +132,21 @@ struct flb_out_ws *flb_ws_conf_create(struct flb_output_instance *ins,
uri = tmp_uri;
}

/* Idle Interval */
tmp = flb_output_get_property("idle_interval", ins);
if (!tmp) {
idle_interval = WEBSOCKET_INPUT_IDLE_INTERVAL; /* 20 seconds */
}
else {
/* Convert to integer */
idle_interval = atoi(tmp);
idle_interval = ins->net_setup.keepalive_idle_timeout;
if (idle_interval > 5) {
ctx->idle_interval = idle_interval - 5;
} else if (idle_interval <= 2) {
flb_error("[out_ws] the keepalive timeout value is smaller than 2, which is meaningless! Please set it higher than 10 seconds. Current value will bring disorder for websocket plugin.");
ctx->idle_interval = idle_interval;
} else {
ctx->idle_interval = idle_interval - 2;
}

ctx->u = upstream;
ctx->uri = uri;
ctx->host = ins->host.name;
ctx->port = ins->host.port;
ctx->idle_interval = idle_interval;

/* Set instance flags into upstream */

flb_output_upstream_set(ctx->u, ins);

flb_info("[out_ws] we have following parameter %s, %s, %d, %d", ctx->uri, ctx->host, ctx->port, ctx->idle_interval);
Expand All @@ -166,5 +169,4 @@ void flb_ws_conf_destroy(struct flb_out_ws *ctx)
}
flb_free(ctx->uri);
flb_free(ctx);
ctx = NULL;
}

0 comments on commit 4014194

Please sign in to comment.