Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

out_stackdriver: Support writing to textPayload field of Cloud Logging LogEntry. #8850

Merged
merged 1 commit into from
May 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 68 additions & 34 deletions plugins/out_stackdriver/stackdriver.c
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ static flb_sds_t get_google_token(struct flb_stackdriver *ctx)
if (time(NULL) >= cached_expiration) {
return output;
} else {
/*
/*
* Cached token is expired. Wait on lock to use up-to-date token
* by either waiting for it to be refreshed or refresh it ourselves.
*/
Expand Down Expand Up @@ -1068,7 +1068,7 @@ static int pack_resource_labels(struct flb_stackdriver *ctx,
if (rval != NULL && rval->o.type == MSGPACK_OBJECT_STR) {
flb_mp_map_header_append(mh);
msgpack_pack_str(mp_pck, flb_sds_len(label_kv->key));
msgpack_pack_str_body(mp_pck, label_kv->key,
msgpack_pack_str_body(mp_pck, label_kv->key,
flb_sds_len(label_kv->key));
msgpack_pack_str(mp_pck, flb_sds_len(rval->val.string));
msgpack_pack_str_body(mp_pck, rval->val.string,
Expand All @@ -1082,7 +1082,7 @@ static int pack_resource_labels(struct flb_stackdriver *ctx,
} else {
flb_mp_map_header_append(mh);
msgpack_pack_str(mp_pck, flb_sds_len(label_kv->key));
msgpack_pack_str_body(mp_pck, label_kv->key,
msgpack_pack_str_body(mp_pck, label_kv->key,
flb_sds_len(label_kv->key));
msgpack_pack_str(mp_pck, flb_sds_len(label_kv->val));
msgpack_pack_str_body(mp_pck, label_kv->val,
Expand Down Expand Up @@ -1284,7 +1284,7 @@ static int cb_stackdriver_init(struct flb_output_instance *ins,
return -1;
}

if (ctx->resource_type != RESOURCE_TYPE_GENERIC_NODE
if (ctx->resource_type != RESOURCE_TYPE_GENERIC_NODE
&& ctx->resource_type != RESOURCE_TYPE_GENERIC_TASK) {
ret = gce_metadata_read_zone(ctx);
if (ret == -1) {
Expand Down Expand Up @@ -1434,13 +1434,13 @@ static int get_trace_sampled(int * trace_sampled_value, const msgpack_object * s
{
msgpack_object tmp;
int ret = get_msgpack_obj(&tmp, src_obj, key, flb_sds_len(key), MSGPACK_OBJECT_BOOLEAN);

if (ret == 0 && tmp.via.boolean == true) {
*trace_sampled_value = FLB_TRUE;
return 0;
} else if (ret == 0 && tmp.via.boolean == false) {
*trace_sampled_value = FLB_FALSE;
return 0;
return 0;
}

return -1;
Expand Down Expand Up @@ -1476,15 +1476,16 @@ static insert_id_status validate_insert_id(msgpack_object * insert_id_value,
return ret;
}

static int pack_json_payload(int insert_id_extracted,
int operation_extracted, int operation_extra_size,
int source_location_extracted,
int source_location_extra_size,
int http_request_extracted,
int http_request_extra_size,
timestamp_status tms_status,
msgpack_packer *mp_pck, msgpack_object *obj,
struct flb_stackdriver *ctx)
static int pack_payload(int insert_id_extracted,
int operation_extracted,
int operation_extra_size,
int source_location_extracted,
int source_location_extra_size,
int http_request_extracted,
int http_request_extra_size,
timestamp_status tms_status,
msgpack_packer *mp_pck, msgpack_object *obj,
struct flb_stackdriver *ctx)
{
/* Specified fields include local_resource_id, operation, sourceLocation ... */
int i, j;
Expand All @@ -1495,10 +1496,14 @@ static int pack_json_payload(int insert_id_extracted,
int len;
int len_to_be_removed;
int key_not_found;
int text_payload_len = 0;
int is_string_text_payload = FLB_FALSE;
int write_to_textpayload_field = FLB_FALSE;
flb_sds_t removed;
flb_sds_t monitored_resource_key;
flb_sds_t local_resource_id_key;
flb_sds_t stream;
flb_sds_t text_payload = NULL;
msgpack_object_kv *kv = obj->via.map.ptr;
msgpack_object_kv *const kvend = obj->via.map.ptr + obj->via.map.size;

Expand Down Expand Up @@ -1565,14 +1570,36 @@ static int pack_json_payload(int insert_id_extracted,

new_map_size = map_size - to_remove;

ret = msgpack_pack_map(mp_pck, new_map_size);
if (ret < 0) {
goto error;
if (ctx->text_payload_key && get_string(&text_payload, obj, ctx->text_payload_key) == 0) {
is_string_text_payload = FLB_TRUE;
}

/* write to textPayload if text_payload_key is the only residual string field*/
if ((new_map_size == 1) && is_string_text_payload) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The way you had it was fine too, but I actually like this better for consistency with the lines above.

write_to_textpayload_field = FLB_TRUE;
}

if (write_to_textpayload_field) {
msgpack_pack_str(mp_pck, 11);
msgpack_pack_str_body(mp_pck, "textPayload", 11);

text_payload_len = flb_sds_len(text_payload);
msgpack_pack_str(mp_pck, text_payload_len);
msgpack_pack_str_body(mp_pck, text_payload, text_payload_len);
} else {
/* jsonPayload */
msgpack_pack_str(mp_pck, 11);
msgpack_pack_str_body(mp_pck, "jsonPayload", 11);

ret = msgpack_pack_map(mp_pck, new_map_size);
if (ret < 0) {
goto error;
}
}

/* points back to the beginning of map */
kv = obj->via.map.ptr;
for(; kv != kvend; ++kv ) {
for(; kv != kvend; ++kv) {
key_not_found = 1;

/* processing logging.googleapis.com/insertId */
Expand Down Expand Up @@ -1639,7 +1666,8 @@ static int pack_json_payload(int insert_id_extracted,
}
}

if (key_not_found) {
/* write residual log fields to jsonPayload */
if (key_not_found && !write_to_textpayload_field) {
ret = msgpack_pack_object(mp_pck, kv->key);
if (ret < 0) {
goto error;
Expand All @@ -1654,12 +1682,14 @@ static int pack_json_payload(int insert_id_extracted,
flb_sds_destroy(monitored_resource_key);
flb_sds_destroy(local_resource_id_key);
flb_sds_destroy(stream);
flb_sds_destroy(text_payload);
return 0;

error:
flb_sds_destroy(monitored_resource_key);
flb_sds_destroy(local_resource_id_key);
flb_sds_destroy(stream);
flb_sds_destroy(text_payload);
return ret;
}

Expand Down Expand Up @@ -1821,7 +1851,7 @@ static flb_sds_t stackdriver_format(struct flb_stackdriver *ctx,
msgpack_pack_str_body(&mp_pck, "labels", 6);

ret = pack_resource_labels(ctx, &mh, &mp_pck, data, bytes);
if (ret != 0) {
if (ret != 0) {
if (ctx->resource_type == RESOURCE_TYPE_K8S) {
ret = extract_local_resource_id(data, bytes, ctx, tag);
if (ret != 0) {
Expand Down Expand Up @@ -2314,7 +2344,7 @@ static flb_sds_t stackdriver_format(struct flb_stackdriver *ctx,
/* Extract httpRequest */
init_http_request(&http_request);
http_request_extra_size = 0;
http_request_extracted = extract_http_request(&http_request,
http_request_extracted = extract_http_request(&http_request,
ctx->http_request_key,
ctx->http_request_key_size,
obj, &http_request_extra_size);
Expand Down Expand Up @@ -2432,17 +2462,16 @@ static flb_sds_t stackdriver_format(struct flb_stackdriver *ctx,
flb_sds_destroy(source_location_function);
destroy_http_request(&http_request);

/* jsonPayload */
msgpack_pack_str(&mp_pck, 11);
msgpack_pack_str_body(&mp_pck, "jsonPayload", 11);
pack_json_payload(insert_id_extracted,
operation_extracted, operation_extra_size,
source_location_extracted,
source_location_extra_size,
http_request_extracted,
http_request_extra_size,
tms_status,
&mp_pck, obj, ctx);
/* both textPayload and jsonPayload are supported */
pack_payload(insert_id_extracted,
operation_extracted,
operation_extra_size,
source_location_extracted,
source_location_extra_size,
http_request_extracted,
http_request_extra_size,
tms_status,
&mp_pck, obj, ctx);

/* avoid modifying the original tag */
newtag = tag;
Expand Down Expand Up @@ -2594,7 +2623,7 @@ static void update_retry_metric(struct flb_stackdriver *ctx,
uint64_t ts,
int http_status)
{
char tmp[32];
char tmp[32];
char *name = (char *) flb_output_name(ctx->ins);

/* convert status to string format */
Expand Down Expand Up @@ -3154,6 +3183,11 @@ static struct flb_config_map config_map[] = {
0, FLB_TRUE, offsetof(struct flb_stackdriver, resource_labels),
"Set the resource labels"
},
{
FLB_CONFIG_MAP_STR, "text_payload_key", (char *)NULL,
0, FLB_TRUE, offsetof(struct flb_stackdriver, text_payload_key),
"Set key for extracting text payload"
},
{
FLB_CONFIG_MAP_BOOL, "test_log_entry_format", "false",
0, FLB_TRUE, offsetof(struct flb_stackdriver, test_log_entry_format),
Expand Down
3 changes: 3 additions & 0 deletions plugins/out_stackdriver/stackdriver.h
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,9 @@ struct flb_stackdriver {
/* upstream context for metadata end-point */
struct flb_upstream *metadata_u;

/* the key to extract unstructured text payload from */
flb_sds_t text_payload_key;

#ifdef FLB_HAVE_METRICS
/* metrics */
struct cmt_counter *cmt_successful_requests;
Expand Down
26 changes: 26 additions & 0 deletions tests/runtime/data/stackdriver/stackdriver_test_payload.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#define STRING_TEXT_PAYLOAD "[" \
"1595349600," \
"{" \
"\"message\": \"The application errored out\"," \
"\"logging.googleapis.com/severity\": \"ERROR\"" \
"}]"

#define STRING_TEXT_PAYLOAD_WITH_RESIDUAL_FIELDS "[" \
"1595349600," \
"{" \
"\"message\": \"The application errored out\"," \
"\"logging.googleapis.com/severity\": \"ERROR\"," \
"\"errorCode\": \"400\"" \
"}]"

#define NON_SCALAR_PAYLOAD_WITH_RESIDUAL_FIELDS "[" \
"1595349600," \
"{" \
"\"message\": " \
"{" \
"\"application_name\": \"my_application\"," \
"\"error_message\": \"The application errored out\"," \
"}," \
"\"logging.googleapis.com/severity\": \"ERROR\"," \
"\"errorCode\": \"400\"" \
"}]"
Loading
Loading