Skip to content

Commit

Permalink
out_calyptia: add support for agent patching and session store
Browse files Browse the repository at this point in the history
Signed-off-by: Eduardo Silva <[email protected]>
  • Loading branch information
edsiper committed Aug 16, 2021
1 parent d7703b2 commit 59ddf18
Show file tree
Hide file tree
Showing 2 changed files with 232 additions and 29 deletions.
233 changes: 213 additions & 20 deletions plugins/out_calyptia/calyptia.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <fluent-bit/flb_pack.h>
#include <fluent-bit/flb_version.h>
#include <fluent-bit/flb_metrics.h>
#include <fluent-bit/flb_fstore.h>

#include "calyptia.h"

Expand Down Expand Up @@ -250,6 +251,16 @@ static int calyptia_http_do(struct flb_calyptia *ctx, struct flb_http_client *c,
CALYPTIA_H_PROJECT, sizeof(CALYPTIA_H_PROJECT) - 1,
ctx->api_key, flb_sds_len(ctx->api_key));
}
else if (type == CALYPTIA_ACTION_PATCH) {
flb_http_add_header(c,
CALYPTIA_H_CTYPE, sizeof(CALYPTIA_H_CTYPE) - 1,
CALYPTIA_H_CTYPE_JSON, sizeof(CALYPTIA_H_CTYPE_JSON) - 1);

flb_http_add_header(c,
CALYPTIA_H_AGENT_TOKEN,
sizeof(CALYPTIA_H_AGENT_TOKEN) - 1,
ctx->agent_token, flb_sds_len(ctx->agent_token));
}
else if (type == CALYPTIA_ACTION_METRICS) {
flb_http_add_header(c,
CALYPTIA_H_CTYPE, sizeof(CALYPTIA_H_CTYPE) - 1,
Expand All @@ -272,7 +283,7 @@ static int calyptia_http_do(struct flb_calyptia *ctx, struct flb_http_client *c,
return FLB_RETRY;
}

if (c->resp.status != 200 && c->resp.status != 201) {
if (c->resp.status != 200 && c->resp.status != 201 && c->resp.status != 204) {
if (c->resp.payload_size > 0) {
flb_plg_warn(ctx->ins, "http_status=%i:\n%s",
c->resp.status, c->resp.payload);
Expand Down Expand Up @@ -351,11 +362,146 @@ static flb_sds_t get_agent_info(char *buf, size_t size, char *k)
return v;
}

/* Set the session content */
static int store_session_set(struct flb_calyptia *ctx, char *buf, size_t size)
{
int ret;
int type;
char *mp_buf;
size_t mp_size;

/* remove any previous session file */
if (ctx->fs_file) {
flb_fstore_file_delete(ctx->fs, ctx->fs_file);
}

/* create session file */
ctx->fs_file = flb_fstore_file_create(ctx->fs, ctx->fs_stream,
CALYPTIA_SESSION_FILE, 1024);
if (!ctx->fs_file) {
flb_plg_error(ctx->ins, "could not create new session file");
return -1;
}

/* store meta */
flb_fstore_file_meta_set(ctx->fs, ctx->fs_file,
FLB_VERSION_STR "\n", sizeof(FLB_VERSION_STR) - 1);

/* encode */
ret = flb_pack_json(buf, size, &mp_buf, &mp_size, &type);
if (ret < 0) {
flb_plg_error(ctx->ins, "could not encode session information");
return -1;
}

/* store content */
ret = flb_fstore_file_append(ctx->fs_file, mp_buf, mp_size);
if (ret == -1) {
flb_plg_error(ctx->ins, "could not store session information");
flb_free(mp_buf);
return -1;
}

flb_free(mp_buf);
return 0;
}

static int store_session_get(struct flb_calyptia *ctx,
void **out_buf, size_t *out_size)
{
int ret;
void *buf;
size_t size;
flb_sds_t json;

ret = flb_fstore_file_content_copy(ctx->fs, ctx->fs_file,
&buf, &size);

if (size == 0) {
return -1;
}

/* decode */
json = flb_msgpack_raw_to_json_sds(buf, size);
flb_free(buf);
if (!json) {
return -1;
}

*out_buf = json;
*out_size = flb_sds_len(json);

return ret;
}

static int store_init(struct flb_calyptia *ctx)
{
int ret;
struct flb_fstore *fs;
struct flb_fstore_file *fsf;
void *buf;
size_t size;

/* store context */
fs = flb_fstore_create(ctx->store_path, FLB_FSTORE_FS);
if (!fs) {
flb_plg_error(ctx->ins,
"could not initialize 'store_path': %s",
ctx->store_path);
return -1;
}
ctx->fs = fs;

/* stream */
ctx->fs_stream = flb_fstore_stream_create(ctx->fs, "calyptia");
if (!ctx->fs_stream) {
flb_plg_error(ctx->ins, "could not create storage stream");
return -1;
}

/* lookup any previous file */
fsf = flb_fstore_file_get(ctx->fs, ctx->fs_stream, CALYPTIA_SESSION_FILE,
sizeof(CALYPTIA_SESSION_FILE) - 1);
if (!fsf) {
flb_plg_debug(ctx->ins, "no session file was found");
return 0;
}
ctx->fs_file = fsf;

/* retrieve session info */
ret = store_session_get(ctx, &buf, &size);
if (ret == 0) {
/* agent id */
ctx->agent_id = get_agent_info(buf, size, "id");

/* agent token */
ctx->agent_token = get_agent_info(buf, size, "token");

if (ctx->agent_id && ctx->agent_token) {
flb_plg_info(ctx->ins, "session setup OK");
}
else {
if (ctx->agent_id) {
flb_sds_destroy(ctx->agent_id);
}
if (ctx->agent_token) {
flb_sds_destroy(ctx->agent_token);
}
}
flb_sds_destroy(buf);
}

return 0;
}

/* Agent creation is perform on initialization using a sync upstream connection */
static int api_agent_create(struct flb_config *config, struct flb_calyptia *ctx)
{
int ret;
int flb_ret;
int flags;
int action = CALYPTIA_ACTION_REGISTER;
char uri[1024];
flb_sds_t meta;
struct flb_upstream *u;
struct flb_upstream_conn *u_conn;
Expand Down Expand Up @@ -391,31 +537,61 @@ static int api_agent_create(struct flb_config *config, struct flb_calyptia *ctx)
return -1;
}

/* Compose HTTP Client request */
c = flb_http_client(u_conn, FLB_HTTP_POST, CALYPTIA_ENDPOINT_CREATE,
meta, flb_sds_len(meta), NULL, 0, NULL, 0);
if (ctx->agent_id && ctx->agent_token) {
/* Patch */
action = CALYPTIA_ACTION_PATCH;
snprintf(uri, sizeof(uri) - 1, CALYPTIA_ENDPOINT_PATCH, ctx->agent_id);
c = flb_http_client(u_conn, FLB_HTTP_PATCH, uri,
meta, flb_sds_len(meta), NULL, 0, NULL, 0);
}
else {
/* Create */
action = CALYPTIA_ACTION_REGISTER;
c = flb_http_client(u_conn, FLB_HTTP_POST, CALYPTIA_ENDPOINT_CREATE,
meta, flb_sds_len(meta), NULL, 0, NULL, 0);
}

if (!c) {
flb_upstream_conn_release(u_conn);
flb_upstream_destroy(u);
return -1;
}

/* perform requst */
ret = calyptia_http_do(ctx, c, CALYPTIA_ACTION_REGISTER);
if (ret == FLB_OK && (c->resp.status == 200 || c->resp.status == 201)) {
/* perform request */
flb_ret = calyptia_http_do(ctx, c, action);
if (flb_ret == FLB_OK &&
(c->resp.status == 200 || c->resp.status == 201 || c->resp.status == 204)) {
if (c->resp.payload_size > 0) {
/* agent id */
ctx->agent_id = get_agent_info(c->resp.payload,
c->resp.payload_size,
"id");

/* agent token */
ctx->agent_token = get_agent_info(c->resp.payload,
c->resp.payload_size,
"token");

flb_plg_info(ctx->ins, "connected to Calyptia, agent_id='%s'",
ctx->agent_id);
if (action == CALYPTIA_ACTION_REGISTER) {
/* agent id */
ctx->agent_id = get_agent_info(c->resp.payload,
c->resp.payload_size,
"id");

/* agent token */
ctx->agent_token = get_agent_info(c->resp.payload,
c->resp.payload_size,
"token");

if (ctx->agent_id && ctx->agent_token) {
flb_plg_info(ctx->ins, "connected to Calyptia, agent_id='%s'",
ctx->agent_id);

if (ctx->store_path && ctx->fs) {
ret = store_session_set(ctx,
c->resp.payload,
c->resp.payload_size);
if (ret == -1) {
flb_plg_warn(ctx->ins,
"could not store Calyptia session");
}
}
}
}
}

if (action == CALYPTIA_ACTION_PATCH) {
flb_plg_info(ctx->ins, "known agent registration successful");
}
}

Expand All @@ -425,7 +601,7 @@ static int api_agent_create(struct flb_config *config, struct flb_calyptia *ctx)
flb_upstream_conn_release(u_conn);
flb_upstream_destroy(u);

return ret;
return flb_ret;
}

static struct flb_calyptia *config_init(struct flb_output_instance *ins,
Expand Down Expand Up @@ -473,6 +649,14 @@ static struct flb_calyptia *config_init(struct flb_output_instance *ins,
/* Set context */
flb_output_set_context(ins, ctx);

/* Initialize optional storage */
if (ctx->store_path) {
ret = store_init(ctx);
if (ret == -1) {
return NULL;
}
}

/* machine id */
ret = get_machine_id(ctx, &machine_id, &size);
if (ret == -1) {
Expand Down Expand Up @@ -658,6 +842,9 @@ static int cb_calyptia_exit(void *data, struct flb_config *config)
flb_sds_destroy(ctx->metrics_endpoint);
}

if (ctx->fs) {
flb_fstore_destroy(ctx->fs);
}
flb_free(ctx);

return 0;
Expand All @@ -683,6 +870,12 @@ static struct flb_config_map config_map[] = {
"Calyptia Cloud API Key."
},

{
FLB_CONFIG_MAP_STR, "store_path", NULL,
0, FLB_TRUE, offsetof(struct flb_calyptia, store_path),
""
},

/* EOF */
{0}
};
Expand Down
28 changes: 19 additions & 9 deletions plugins/out_calyptia/calyptia.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <fluent-bit/flb_output_plugin.h>
#include <fluent-bit/flb_upstream.h>
#include <fluent-bit/flb_env.h>
#include <fluent-bit/flb_fstore.h>
#include <mbedtls/sha256.h>

/* End point */
Expand All @@ -32,12 +33,17 @@

/* HTTP action types */
#define CALYPTIA_ACTION_REGISTER 0
#define CALYPTIA_ACTION_METRICS 1
#define CALYPTIA_ACTION_PATCH 1
#define CALYPTIA_ACTION_METRICS 2

/* Endpoints */
#define CALYPTIA_ENDPOINT_CREATE "/v1/agents"
#define CALYPTIA_ENDPOINT_PATCH "/v1/agents/%s"
#define CALYPTIA_ENDPOINT_METRICS "/v1/agents/%s/metrics"

/* Storage */
#define CALYPTIA_SESSION_FILE "session.CALYPTIA"

/* Headers */
#define CALYPTIA_H_PROJECT "X-Project-Token"
#define CALYPTIA_H_AGENT_TOKEN "X-Agent-Token"
Expand All @@ -48,20 +54,24 @@
struct flb_calyptia {
/* config map */
int cloud_port;
flb_sds_t cloud_host;
flb_sds_t api_key;
flb_sds_t cloud_host;
flb_sds_t store_path;
struct mk_list *add_labels;

/* internal */
flb_sds_t agent_id;
flb_sds_t agent_token;
flb_sds_t machine_id; /* machine-id */
flb_sds_t metrics_endpoint; /* metrics endpoint */
struct flb_env *env; /* environment */
struct flb_upstream *u; /* upstream connection */
struct mk_list kv_labels; /* parsed add_labels */
struct flb_output_instance *ins; /* plugin instance */
struct flb_config *config; /* Fluent Bit context */
flb_sds_t machine_id; /* machine-id */
flb_sds_t metrics_endpoint; /* metrics endpoint */
struct flb_fstore *fs; /* fstore ctx */
struct flb_fstore_stream *fs_stream; /* fstore stream */
struct flb_fstore_file *fs_file; /* fstore session file */
struct flb_env *env; /* environment */
struct flb_upstream *u; /* upstream connection */
struct mk_list kv_labels; /* parsed add_labels */
struct flb_output_instance *ins; /* plugin instance */
struct flb_config *config; /* Fluent Bit context */
};

#endif

0 comments on commit 59ddf18

Please sign in to comment.