From 6db963cfad79dbdd612f95d829e945cfe1fff8d1 Mon Sep 17 00:00:00 2001 From: shuaichen Date: Tue, 21 May 2024 11:37:43 -0700 Subject: [PATCH] stackdriver: Support writing to textPayload field of Cloud Logging LogEntry. Write payload to textPayload field of LogEntry if the text_payload_key is string format and the only field after stripping special fields. Signed-off-by: shuaichen --- plugins/out_stackdriver/stackdriver.c | 102 ++++--- plugins/out_stackdriver/stackdriver.h | 3 + .../stackdriver/stackdriver_test_payload.h | 26 ++ tests/runtime/out_stackdriver.c | 249 +++++++++++++++++- 4 files changed, 345 insertions(+), 35 deletions(-) create mode 100644 tests/runtime/data/stackdriver/stackdriver_test_payload.h diff --git a/plugins/out_stackdriver/stackdriver.c b/plugins/out_stackdriver/stackdriver.c index 503fc9e68bd..b21f5b9476c 100644 --- a/plugins/out_stackdriver/stackdriver.c +++ b/plugins/out_stackdriver/stackdriver.c @@ -372,7 +372,7 @@ static flb_sds_t get_google_token(struct flb_stackdriver *ctx) if (time(NULL) >= cached_expiration) { return output; } else { - /* + /* * Cached token is expired. Wait on lock to use up-to-date token * by either waiting for it to be refreshed or refresh it ourselves. */ @@ -1068,7 +1068,7 @@ static int pack_resource_labels(struct flb_stackdriver *ctx, if (rval != NULL && rval->o.type == MSGPACK_OBJECT_STR) { flb_mp_map_header_append(mh); msgpack_pack_str(mp_pck, flb_sds_len(label_kv->key)); - msgpack_pack_str_body(mp_pck, label_kv->key, + msgpack_pack_str_body(mp_pck, label_kv->key, flb_sds_len(label_kv->key)); msgpack_pack_str(mp_pck, flb_sds_len(rval->val.string)); msgpack_pack_str_body(mp_pck, rval->val.string, @@ -1082,7 +1082,7 @@ static int pack_resource_labels(struct flb_stackdriver *ctx, } else { flb_mp_map_header_append(mh); msgpack_pack_str(mp_pck, flb_sds_len(label_kv->key)); - msgpack_pack_str_body(mp_pck, label_kv->key, + msgpack_pack_str_body(mp_pck, label_kv->key, flb_sds_len(label_kv->key)); msgpack_pack_str(mp_pck, flb_sds_len(label_kv->val)); msgpack_pack_str_body(mp_pck, label_kv->val, @@ -1284,7 +1284,7 @@ static int cb_stackdriver_init(struct flb_output_instance *ins, return -1; } - if (ctx->resource_type != RESOURCE_TYPE_GENERIC_NODE + if (ctx->resource_type != RESOURCE_TYPE_GENERIC_NODE && ctx->resource_type != RESOURCE_TYPE_GENERIC_TASK) { ret = gce_metadata_read_zone(ctx); if (ret == -1) { @@ -1434,13 +1434,13 @@ static int get_trace_sampled(int * trace_sampled_value, const msgpack_object * s { msgpack_object tmp; int ret = get_msgpack_obj(&tmp, src_obj, key, flb_sds_len(key), MSGPACK_OBJECT_BOOLEAN); - + if (ret == 0 && tmp.via.boolean == true) { *trace_sampled_value = FLB_TRUE; return 0; } else if (ret == 0 && tmp.via.boolean == false) { *trace_sampled_value = FLB_FALSE; - return 0; + return 0; } return -1; @@ -1476,15 +1476,16 @@ static insert_id_status validate_insert_id(msgpack_object * insert_id_value, return ret; } -static int pack_json_payload(int insert_id_extracted, - int operation_extracted, int operation_extra_size, - int source_location_extracted, - int source_location_extra_size, - int http_request_extracted, - int http_request_extra_size, - timestamp_status tms_status, - msgpack_packer *mp_pck, msgpack_object *obj, - struct flb_stackdriver *ctx) +static int pack_payload(int insert_id_extracted, + int operation_extracted, + int operation_extra_size, + int source_location_extracted, + int source_location_extra_size, + int http_request_extracted, + int http_request_extra_size, + timestamp_status tms_status, + msgpack_packer *mp_pck, msgpack_object *obj, + struct flb_stackdriver *ctx) { /* Specified fields include local_resource_id, operation, sourceLocation ... */ int i, j; @@ -1495,10 +1496,14 @@ static int pack_json_payload(int insert_id_extracted, int len; int len_to_be_removed; int key_not_found; + int text_payload_len = 0; + int is_string_text_payload = FLB_FALSE; + int write_to_textpayload_field = FLB_FALSE; flb_sds_t removed; flb_sds_t monitored_resource_key; flb_sds_t local_resource_id_key; flb_sds_t stream; + flb_sds_t text_payload = NULL; msgpack_object_kv *kv = obj->via.map.ptr; msgpack_object_kv *const kvend = obj->via.map.ptr + obj->via.map.size; @@ -1565,14 +1570,36 @@ static int pack_json_payload(int insert_id_extracted, new_map_size = map_size - to_remove; - ret = msgpack_pack_map(mp_pck, new_map_size); - if (ret < 0) { - goto error; + if (ctx->text_payload_key && get_string(&text_payload, obj, ctx->text_payload_key) == 0) { + is_string_text_payload = FLB_TRUE; + } + + /* write to textPayload if text_payload_key is the only residual string field*/ + if ((new_map_size == 1) && is_string_text_payload) { + write_to_textpayload_field = FLB_TRUE; + } + + if (write_to_textpayload_field) { + msgpack_pack_str(mp_pck, 11); + msgpack_pack_str_body(mp_pck, "textPayload", 11); + + text_payload_len = flb_sds_len(text_payload); + msgpack_pack_str(mp_pck, text_payload_len); + msgpack_pack_str_body(mp_pck, text_payload, text_payload_len); + } else { + /* jsonPayload */ + msgpack_pack_str(mp_pck, 11); + msgpack_pack_str_body(mp_pck, "jsonPayload", 11); + + ret = msgpack_pack_map(mp_pck, new_map_size); + if (ret < 0) { + goto error; + } } /* points back to the beginning of map */ kv = obj->via.map.ptr; - for(; kv != kvend; ++kv ) { + for(; kv != kvend; ++kv) { key_not_found = 1; /* processing logging.googleapis.com/insertId */ @@ -1639,7 +1666,8 @@ static int pack_json_payload(int insert_id_extracted, } } - if (key_not_found) { + /* write residual log fields to jsonPayload */ + if (key_not_found && !write_to_textpayload_field) { ret = msgpack_pack_object(mp_pck, kv->key); if (ret < 0) { goto error; @@ -1654,12 +1682,14 @@ static int pack_json_payload(int insert_id_extracted, flb_sds_destroy(monitored_resource_key); flb_sds_destroy(local_resource_id_key); flb_sds_destroy(stream); + flb_sds_destroy(text_payload); return 0; error: flb_sds_destroy(monitored_resource_key); flb_sds_destroy(local_resource_id_key); flb_sds_destroy(stream); + flb_sds_destroy(text_payload); return ret; } @@ -1821,7 +1851,7 @@ static flb_sds_t stackdriver_format(struct flb_stackdriver *ctx, msgpack_pack_str_body(&mp_pck, "labels", 6); ret = pack_resource_labels(ctx, &mh, &mp_pck, data, bytes); - if (ret != 0) { + if (ret != 0) { if (ctx->resource_type == RESOURCE_TYPE_K8S) { ret = extract_local_resource_id(data, bytes, ctx, tag); if (ret != 0) { @@ -2314,7 +2344,7 @@ static flb_sds_t stackdriver_format(struct flb_stackdriver *ctx, /* Extract httpRequest */ init_http_request(&http_request); http_request_extra_size = 0; - http_request_extracted = extract_http_request(&http_request, + http_request_extracted = extract_http_request(&http_request, ctx->http_request_key, ctx->http_request_key_size, obj, &http_request_extra_size); @@ -2432,17 +2462,16 @@ static flb_sds_t stackdriver_format(struct flb_stackdriver *ctx, flb_sds_destroy(source_location_function); destroy_http_request(&http_request); - /* jsonPayload */ - msgpack_pack_str(&mp_pck, 11); - msgpack_pack_str_body(&mp_pck, "jsonPayload", 11); - pack_json_payload(insert_id_extracted, - operation_extracted, operation_extra_size, - source_location_extracted, - source_location_extra_size, - http_request_extracted, - http_request_extra_size, - tms_status, - &mp_pck, obj, ctx); + /* both textPayload and jsonPayload are supported */ + pack_payload(insert_id_extracted, + operation_extracted, + operation_extra_size, + source_location_extracted, + source_location_extra_size, + http_request_extracted, + http_request_extra_size, + tms_status, + &mp_pck, obj, ctx); /* avoid modifying the original tag */ newtag = tag; @@ -2594,7 +2623,7 @@ static void update_retry_metric(struct flb_stackdriver *ctx, uint64_t ts, int http_status) { - char tmp[32]; + char tmp[32]; char *name = (char *) flb_output_name(ctx->ins); /* convert status to string format */ @@ -3154,6 +3183,11 @@ static struct flb_config_map config_map[] = { 0, FLB_TRUE, offsetof(struct flb_stackdriver, resource_labels), "Set the resource labels" }, + { + FLB_CONFIG_MAP_STR, "text_payload_key", (char *)NULL, + 0, FLB_TRUE, offsetof(struct flb_stackdriver, text_payload_key), + "Set key for extracting text payload" + }, { FLB_CONFIG_MAP_BOOL, "test_log_entry_format", "false", 0, FLB_TRUE, offsetof(struct flb_stackdriver, test_log_entry_format), diff --git a/plugins/out_stackdriver/stackdriver.h b/plugins/out_stackdriver/stackdriver.h index 2a645c16402..76f5a7598ea 100644 --- a/plugins/out_stackdriver/stackdriver.h +++ b/plugins/out_stackdriver/stackdriver.h @@ -208,6 +208,9 @@ struct flb_stackdriver { /* upstream context for metadata end-point */ struct flb_upstream *metadata_u; + /* the key to extract unstructured text payload from */ + flb_sds_t text_payload_key; + #ifdef FLB_HAVE_METRICS /* metrics */ struct cmt_counter *cmt_successful_requests; diff --git a/tests/runtime/data/stackdriver/stackdriver_test_payload.h b/tests/runtime/data/stackdriver/stackdriver_test_payload.h new file mode 100644 index 00000000000..75b771f449e --- /dev/null +++ b/tests/runtime/data/stackdriver/stackdriver_test_payload.h @@ -0,0 +1,26 @@ +#define STRING_TEXT_PAYLOAD "[" \ + "1595349600," \ + "{" \ + "\"message\": \"The application errored out\"," \ + "\"logging.googleapis.com/severity\": \"ERROR\"" \ + "}]" + +#define STRING_TEXT_PAYLOAD_WITH_RESIDUAL_FIELDS "[" \ + "1595349600," \ + "{" \ + "\"message\": \"The application errored out\"," \ + "\"logging.googleapis.com/severity\": \"ERROR\"," \ + "\"errorCode\": \"400\"" \ + "}]" + +#define NON_SCALAR_PAYLOAD_WITH_RESIDUAL_FIELDS "[" \ + "1595349600," \ + "{" \ + "\"message\": " \ + "{" \ + "\"application_name\": \"my_application\"," \ + "\"error_message\": \"The application errored out\"," \ + "}," \ + "\"logging.googleapis.com/severity\": \"ERROR\"," \ + "\"errorCode\": \"400\"" \ + "}]" diff --git a/tests/runtime/out_stackdriver.c b/tests/runtime/out_stackdriver.c index 585e734478d..2bd79f31aaf 100644 --- a/tests/runtime/out_stackdriver.c +++ b/tests/runtime/out_stackdriver.c @@ -44,7 +44,7 @@ #include "data/stackdriver/stackdriver_test_http_request.h" #include "data/stackdriver/stackdriver_test_timestamp.h" #include "data/stackdriver/stackdriver_test_monitored_resource.h" - +#include "data/stackdriver/stackdriver_test_payload.h" /* * Fluent Bit Stackdriver plugin, always set as payload a JSON strings contained in a @@ -2292,6 +2292,85 @@ static void cb_check_timestamp_format_duo_fields_incorrect_type(void *ctx, int f flb_sds_destroy(res_data); } +static void cb_check_string_text_payload_with_matched_text_payload_key(void *ctx, int ffd, + int res_ret, void *res_data, size_t res_size, + void *data) +{ + int ret; + + ret = mp_kv_cmp(res_data, res_size, "$entries[0]['timestamp']", "2020-07-21T16:40:00.000000000Z"); + TEST_CHECK(ret == FLB_TRUE); + + ret = mp_kv_cmp_integer(res_data, res_size, "$entries[0]['severity']", 500); + TEST_CHECK(ret == FLB_TRUE); + + /* check payload is written to textPayload field */ + ret = mp_kv_cmp(res_data, res_size, "$entries[0]['textPayload']", "The application errored out"); + TEST_CHECK(ret == FLB_TRUE); + + flb_sds_destroy(res_data); +} + +static void cb_check_string_text_payload_with_mismatched_text_payload_key(void *ctx, int ffd, + int res_ret, void *res_data, size_t res_size, + void *data) +{ + int ret; + + ret = mp_kv_cmp(res_data, res_size, "$entries[0]['timestamp']", "2020-07-21T16:40:00.000000000Z"); + TEST_CHECK(ret == FLB_TRUE); + + ret = mp_kv_cmp_integer(res_data, res_size, "$entries[0]['severity']", 500); + TEST_CHECK(ret == FLB_TRUE); + + /* check payload is written to jsonPayload field */ + ret = mp_kv_cmp(res_data, res_size, "$entries[0]['jsonPayload']['message']", "The application errored out"); + TEST_CHECK(ret == FLB_TRUE); + + flb_sds_destroy(res_data); +} + +static void cb_check_string_text_payload_with_residual_fields(void *ctx, int ffd, + int res_ret, void *res_data, size_t res_size, + void *data) +{ + int ret; + + ret = mp_kv_cmp(res_data, res_size, "$entries[0]['timestamp']", "2020-07-21T16:40:00.000000000Z"); + TEST_CHECK(ret == FLB_TRUE); + + ret = mp_kv_cmp_integer(res_data, res_size, "$entries[0]['severity']", 500); + TEST_CHECK(ret == FLB_TRUE); + + /* check payload is written to jsonPayload field */ + ret = mp_kv_cmp(res_data, res_size, "$entries[0]['jsonPayload']['message']", "The application errored out"); + ret = mp_kv_cmp(res_data, res_size, "$entries[0]['jsonPayload']['errorCode']", "400"); + TEST_CHECK(ret == FLB_TRUE); + + flb_sds_destroy(res_data); +} + +static void cb_check_non_scalar_payload_with_residual_fields(void *ctx, int ffd, + int res_ret, void *res_data, size_t res_size, + void *data) +{ + int ret; + + ret = mp_kv_cmp(res_data, res_size, "$entries[0]['timestamp']", "2020-07-21T16:40:00.000000000Z"); + TEST_CHECK(ret == FLB_TRUE); + + ret = mp_kv_cmp_integer(res_data, res_size, "$entries[0]['severity']", 500); + TEST_CHECK(ret == FLB_TRUE); + + /* check payload is written to jsonPayload field */ + ret = mp_kv_cmp(res_data, res_size, "$entries[0]['jsonPayload']['application_name']", "my_application"); + ret = mp_kv_cmp(res_data, res_size, "$entries[0]['jsonPayload']['message']", "The application errored out"); + ret = mp_kv_cmp(res_data, res_size, "$entries[0]['jsonPayload']['errorCode']", "400"); + TEST_CHECK(ret == FLB_TRUE); + + flb_sds_destroy(res_data); +} + void flb_test_monitored_resource_common() { int ret; @@ -6294,6 +6373,170 @@ void flb_test_timestamp_format_duo_fields_incorrect_type() flb_destroy(ctx); } +void flb_test_string_text_payload_with_matched_text_payload_key() +{ + int ret; + int size = sizeof(STRING_TEXT_PAYLOAD) - 1; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + + /* Create context, flush every second (some checks omitted here) */ + ctx = flb_create(); + flb_service_set(ctx, "flush", "1", "grace", "1", NULL); + + /* Lib input mode */ + in_ffd = flb_input(ctx, (char *) "lib", NULL); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + /* Stackdriver output */ + out_ffd = flb_output(ctx, (char *) "stackdriver", NULL); + flb_output_set(ctx, out_ffd, + "match", "test", + "resource", "gce_instance", + "text_payload_key", "message", + NULL); + + /* Enable test mode */ + ret = flb_output_set_test(ctx, out_ffd, "formatter", + cb_check_string_text_payload_with_matched_text_payload_key, + NULL, NULL); + + /* Start */ + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Ingest data sample */ + flb_lib_push(ctx, in_ffd, (char *) STRING_TEXT_PAYLOAD, size); + + sleep(2); + flb_stop(ctx); + flb_destroy(ctx); +} + +void flb_test_string_text_payload_with_mismatched_text_payload_key() +{ + int ret; + int size = sizeof(STRING_TEXT_PAYLOAD) - 1; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + + /* Create context, flush every second (some checks omitted here) */ + ctx = flb_create(); + flb_service_set(ctx, "flush", "1", "grace", "1", NULL); + + /* Lib input mode */ + in_ffd = flb_input(ctx, (char *) "lib", NULL); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + /* Stackdriver output */ + out_ffd = flb_output(ctx, (char *) "stackdriver", NULL); + flb_output_set(ctx, out_ffd, + "match", "test", + "resource", "gce_instance", + "text_payload_key", "msg", + NULL); + + /* Enable test mode */ + ret = flb_output_set_test(ctx, out_ffd, "formatter", + cb_check_string_text_payload_with_mismatched_text_payload_key, + NULL, NULL); + + /* Start */ + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Ingest data sample */ + flb_lib_push(ctx, in_ffd, (char *) STRING_TEXT_PAYLOAD, size); + + sleep(2); + flb_stop(ctx); + flb_destroy(ctx); +} + +void flb_test_string_text_payload_with_residual_fields() +{ + int ret; + int size = sizeof(STRING_TEXT_PAYLOAD) - 1; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + + /* Create context, flush every second (some checks omitted here) */ + ctx = flb_create(); + flb_service_set(ctx, "flush", "1", "grace", "1", NULL); + + /* Lib input mode */ + in_ffd = flb_input(ctx, (char *) "lib", NULL); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + /* Stackdriver output */ + out_ffd = flb_output(ctx, (char *) "stackdriver", NULL); + flb_output_set(ctx, out_ffd, + "match", "test", + "resource", "gce_instance", + "text_payload_key", "message", + NULL); + + /* Enable test mode */ + ret = flb_output_set_test(ctx, out_ffd, "formatter", + cb_check_string_text_payload_with_residual_fields, + NULL, NULL); + + /* Start */ + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Ingest data sample */ + flb_lib_push(ctx, in_ffd, (char *) STRING_TEXT_PAYLOAD_WITH_RESIDUAL_FIELDS, size); + + sleep(2); + flb_stop(ctx); + flb_destroy(ctx); +} + +void flb_test_non_scalar_payload_with_residual_fields() +{ + int ret; + int size = sizeof(STRING_TEXT_PAYLOAD) - 1; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + + /* Create context, flush every second (some checks omitted here) */ + ctx = flb_create(); + flb_service_set(ctx, "flush", "1", "grace", "1", NULL); + + /* Lib input mode */ + in_ffd = flb_input(ctx, (char *) "lib", NULL); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + /* Stackdriver output */ + out_ffd = flb_output(ctx, (char *) "stackdriver", NULL); + flb_output_set(ctx, out_ffd, + "match", "test", + "resource", "gce_instance", + "text_payload_key", "message", + NULL); + + /* Enable test mode */ + ret = flb_output_set_test(ctx, out_ffd, "formatter", + cb_check_non_scalar_payload_with_residual_fields, + NULL, NULL); + + /* Start */ + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Ingest data sample */ + flb_lib_push(ctx, in_ffd, (char *) NON_SCALAR_PAYLOAD_WITH_RESIDUAL_FIELDS, size); + + sleep(2); + flb_stop(ctx); + flb_destroy(ctx); +} + /* Test list */ TEST_LIST = { {"severity_multi_entries", flb_test_multi_entries_severity }, @@ -6424,5 +6667,9 @@ TEST_LIST = { {"timestamp_format_duo_fields_missing_nanos", flb_test_timestamp_format_duo_fields_missing_nanos}, {"timestamp_format_duo_fields_incorrect_type", flb_test_timestamp_format_duo_fields_incorrect_type}, + {"string_text_payload_with_matched_text_payload_key", flb_test_string_text_payload_with_matched_text_payload_key}, + {"string_text_payload_with_mismatched_text_payload_key", flb_test_string_text_payload_with_mismatched_text_payload_key}, + {"string_text_payload_with_residual_fields", flb_test_string_text_payload_with_residual_fields}, + {"non_scalar_payload_with_residual_fields", flb_test_non_scalar_payload_with_residual_fields}, {NULL, NULL} };