Skip to content

Commit

Permalink
support retention for cloudwatch_logs output
Browse files Browse the repository at this point in the history
  • Loading branch information
shogo82148 committed Oct 31, 2020
1 parent 4cace3d commit 02d3169
Show file tree
Hide file tree
Showing 4 changed files with 220 additions and 1 deletion.
86 changes: 85 additions & 1 deletion plugins/out_cloudwatch_logs/cloudwatch_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,13 @@ static struct flb_aws_header create_group_header = {
.val_len = 28,
};

static struct flb_aws_header put_retention_policy_header = {
.key = "X-Amz-Target",
.key_len = 12,
.val = "Logs_20140328.PutRetentionPolicy",
.val_len = 32,
};

static struct flb_aws_header create_stream_header = {
.key = "X-Amz-Target",
.key_len = 12,
Expand Down Expand Up @@ -1013,6 +1020,83 @@ struct log_stream *get_log_stream(struct flb_cloudwatch *ctx,
return get_dynamic_log_stream(ctx, tag, tag_len);
}

static int set_log_group_retention(struct flb_cloudwatch *ctx)
{
if (ctx->log_retention_days <= 0) {
/* no need to set */
return 0;
}

struct flb_http_client *c = NULL;
struct flb_aws_client *cw_client;
flb_sds_t body;
flb_sds_t tmp;
flb_sds_t error;

flb_plg_info(ctx->ins, "Set retention policy on log group %s to %dd", ctx->log_group, ctx->log_retention_days);

body = flb_sds_create_size(68 + strlen(ctx->log_group));
if (!body) {
flb_sds_destroy(body);
flb_errno();
return -1;
}

/* construct CreateLogGroup request body */
tmp = flb_sds_printf(&body, "{\"logGroupName\":\"%s\",\"retentionInDays\":%d}", ctx->log_group, ctx->log_retention_days);
if (!tmp) {
flb_sds_destroy(body);
flb_errno();
return -1;
}
body = tmp;

if (plugin_under_test() == FLB_TRUE) {
c = mock_http_call("TEST_PUT_RETENTION_POLICY_ERROR", "PutRetentionPolicy");
}
else {
cw_client = ctx->cw_client;
c = cw_client->client_vtable->request(cw_client, FLB_HTTP_POST,
"/", body, strlen(body),
&put_retention_policy_header, 1);
}

if (c) {
flb_plg_debug(ctx->ins, "PutRetentionPolicy http status=%d", c->resp.status);

if (c->resp.status == 200) {
/* success */
flb_plg_info(ctx->ins, "Set retention policy to %d", ctx->log_retention_days);
flb_sds_destroy(body);
flb_http_client_destroy(c);
return 0;
}

/* Check error */
if (c->resp.payload_size > 0) {
error = flb_aws_error(c->resp.payload, c->resp.payload_size);
if (error != NULL) {
/* some other error occurred; notify user */
flb_aws_print_error(c->resp.payload, c->resp.payload_size,
"PutRetentionPolicy", ctx->ins);
flb_sds_destroy(error);
}
else {
/* error can not be parsed, print raw response to debug */
flb_plg_debug(ctx->ins, "Raw response: %s", c->resp.payload);
}
}
}

flb_plg_error(ctx->ins, "Failed to putRetentionPolicy");
if (c) {
flb_http_client_destroy(c);
}
flb_sds_destroy(body);

return 0;
}

int create_log_group(struct flb_cloudwatch *ctx)
{
struct flb_http_client *c = NULL;
Expand Down Expand Up @@ -1058,7 +1142,7 @@ int create_log_group(struct flb_cloudwatch *ctx)
ctx->group_created = FLB_TRUE;
flb_sds_destroy(body);
flb_http_client_destroy(c);
return 0;
return set_log_group_retention(ctx);
}

/* Check error */
Expand Down
13 changes: 13 additions & 0 deletions plugins/out_cloudwatch_logs/cloudwatch_logs.c
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,12 @@ static int cb_cloudwatch_init(struct flb_output_instance *ins,
ctx->create_group = FLB_TRUE;
}

ctx->log_retention_days = 0;
tmp = flb_output_get_property("log_retention_days", ins);
if (tmp) {
ctx->log_retention_days = atoi(tmp);
}

tmp = flb_output_get_property("role_arn", ins);
if (tmp) {
ctx->role_arn = tmp;
Expand Down Expand Up @@ -528,6 +534,13 @@ static struct flb_config_map config_map[] = {
" be created)"
},

{
FLB_CONFIG_MAP_INT, "log_retention_days", "0",
0, FLB_FALSE, 0,
"If set to a number greater than zero, and newly create log group's "
"retention policy is set to this many days."
},

{
FLB_CONFIG_MAP_STR, "endpoint", NULL,
0, FLB_FALSE, 0,
Expand Down
3 changes: 3 additions & 0 deletions plugins/out_cloudwatch_logs/cloudwatch_logs.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,9 @@ struct flb_cloudwatch {
/* Should the plugin create the log group */
int create_group;

/* If set to a number greater than zero, and newly create log group's retention policy is set to this many days. */
int log_retention_days;

/* has the log group successfully been created */
int group_created;

Expand Down
119 changes: 119 additions & 0 deletions tests/runtime/out_cloudwatch.c
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,122 @@ void flb_test_cloudwatch_error_put_log_events(void)
flb_destroy(ctx);
}

void flb_test_cloudwatch_put_retention_policy_success(void)
{
int ret;
flb_ctx_t *ctx;
int in_ffd;
int out_ffd;

/* mocks calls- signals that we are in test mode */
setenv("FLB_CLOUDWATCH_PLUGIN_UNDER_TEST", "true", 1);

ctx = flb_create();

in_ffd = flb_input(ctx, (char *) "lib", NULL);
TEST_CHECK(in_ffd >= 0);
flb_input_set(ctx,in_ffd, "tag", "test", NULL);

out_ffd = flb_output(ctx, (char *) "cloudwatch_logs", NULL);
TEST_CHECK(out_ffd >= 0);
flb_output_set(ctx, out_ffd,"match", "test", NULL);
flb_output_set(ctx, out_ffd,"region", "us-west-2", NULL);
flb_output_set(ctx, out_ffd,"log_group_name", "fluent", NULL);
flb_output_set(ctx, out_ffd,"log_stream_prefix", "from-fluent-", NULL);
flb_output_set(ctx, out_ffd,"auto_create_group", "On", NULL);
flb_output_set(ctx, out_ffd,"log_retention_days", "14", NULL);
flb_output_set(ctx, out_ffd,"net.keepalive", "Off", NULL);
flb_output_set(ctx, out_ffd,"Retry_Limit", "1", NULL);

ret = flb_start(ctx);
TEST_CHECK(ret == 0);

flb_lib_push(ctx, in_ffd, (char *) JSON_TD , (int) sizeof(JSON_TD) - 1);

sleep(2);
flb_stop(ctx);
flb_destroy(ctx);
}

void flb_test_cloudwatch_already_exists_create_group_put_retention_policy(void)
{
int ret;
flb_ctx_t *ctx;
int in_ffd;
int out_ffd;

/* mocks calls- signals that we are in test mode */
setenv("FLB_CLOUDWATCH_PLUGIN_UNDER_TEST", "true", 1);
setenv("TEST_CREATE_LOG_GROUP_ERROR", ERROR_ALREADY_EXISTS, 1);

/* PutRetentionPolicy is not called if the group already exists */
setenv("TEST_PUT_RETENTION_POLICY_ERROR", ERROR_UNKNOWN, 1);

ctx = flb_create();

in_ffd = flb_input(ctx, (char *) "lib", NULL);
TEST_CHECK(in_ffd >= 0);
flb_input_set(ctx,in_ffd, "tag", "test", NULL);

out_ffd = flb_output(ctx, (char *) "cloudwatch_logs", NULL);
TEST_CHECK(out_ffd >= 0);
flb_output_set(ctx, out_ffd,"match", "test", NULL);
flb_output_set(ctx, out_ffd,"region", "us-west-2", NULL);
flb_output_set(ctx, out_ffd,"log_group_name", "fluent", NULL);
flb_output_set(ctx, out_ffd,"log_stream_prefix", "from-fluent-", NULL);
flb_output_set(ctx, out_ffd,"auto_create_group", "On", NULL);
flb_output_set(ctx, out_ffd,"log_retention_days", "14", NULL);
flb_output_set(ctx, out_ffd,"net.keepalive", "Off", NULL);
flb_output_set(ctx, out_ffd,"Retry_Limit", "1", NULL);

ret = flb_start(ctx);
TEST_CHECK(ret == 0);

flb_lib_push(ctx, in_ffd, (char *) JSON_TD , (int) sizeof(JSON_TD) - 1);

sleep(2);
flb_stop(ctx);
flb_destroy(ctx);
}

void flb_test_cloudwatch_error_put_retention_policy(void)
{
int ret;
flb_ctx_t *ctx;
int in_ffd;
int out_ffd;

/* mocks calls- signals that we are in test mode */
setenv("FLB_CLOUDWATCH_PLUGIN_UNDER_TEST", "true", 1);
setenv("TEST_PUT_RETENTION_POLICY_ERROR", ERROR_UNKNOWN, 1);

ctx = flb_create();

in_ffd = flb_input(ctx, (char *) "lib", NULL);
TEST_CHECK(in_ffd >= 0);
flb_input_set(ctx,in_ffd, "tag", "test", NULL);

out_ffd = flb_output(ctx, (char *) "cloudwatch_logs", NULL);
TEST_CHECK(out_ffd >= 0);
flb_output_set(ctx, out_ffd,"match", "test", NULL);
flb_output_set(ctx, out_ffd,"region", "us-west-2", NULL);
flb_output_set(ctx, out_ffd,"log_group_name", "fluent", NULL);
flb_output_set(ctx, out_ffd,"log_stream_prefix", "from-fluent-", NULL);
flb_output_set(ctx, out_ffd,"auto_create_group", "On", NULL);
flb_output_set(ctx, out_ffd,"log_retention_days", "14", NULL);
flb_output_set(ctx, out_ffd,"net.keepalive", "Off", NULL);
flb_output_set(ctx, out_ffd,"Retry_Limit", "1", NULL);

ret = flb_start(ctx);
TEST_CHECK(ret == 0);

flb_lib_push(ctx, in_ffd, (char *) JSON_TD , (int) sizeof(JSON_TD) - 1);

sleep(2);
flb_stop(ctx);
flb_destroy(ctx);
}

/* Test list */
TEST_LIST = {
{"success", flb_test_cloudwatch_success },
Expand All @@ -239,5 +355,8 @@ TEST_LIST = {
{"create_group_error", flb_test_cloudwatch_error_create_group },
{"create_stream_error", flb_test_cloudwatch_error_create_stream },
{"put_log_events_error", flb_test_cloudwatch_error_put_log_events },
{"put_retention_policy_success", flb_test_cloudwatch_put_retention_policy_success },
{"already_exists_create_group_put_retention_policy", flb_test_cloudwatch_already_exists_create_group_put_retention_policy },
{"error_put_retention_policy", flb_test_cloudwatch_error_put_retention_policy },
{NULL, NULL}
};

0 comments on commit 02d3169

Please sign in to comment.