diff --git a/src/stream_processor/flb_sp.c b/src/stream_processor/flb_sp.c index 00eb2f18b78..41a7ac8f432 100644 --- a/src/stream_processor/flb_sp.c +++ b/src/stream_processor/flb_sp.c @@ -266,8 +266,8 @@ static int sp_cmd_aggregated_keys(struct flb_sp_cmd *cmd) } /* - * if some aggregated function is required, not aggregated keys are - * not allowed so we return an error (-1). + * If aggregated functions are included in the query, non-aggregated keys are + * not allowed (except for the ones inside GROUP BY statement). */ if (aggr > 0 && not_aggr == 0) { return aggr; @@ -490,7 +490,7 @@ struct flb_sp_task *flb_sp_task_create(struct flb_sp *sp, const char *name, /* Check and validate aggregated keys */ ret = sp_cmd_aggregated_keys(task->cmd); if (ret == -1) { - flb_error("[sp] aggregated query cannot mix not aggregated keys: %s", + flb_error("[sp] aggregated query cannot include the aggregated keys: %s", query); flb_sp_task_destroy(task); return NULL; @@ -506,10 +506,10 @@ struct flb_sp_task *flb_sp_task_create(struct flb_sp *sp, const char *name, event = &task->window.event; MK_EVENT_ZERO(event); - /* Run every 'size' seconds */ + /* Run every 'window size' seconds */ fd = mk_event_timeout_create(sp->config->evl, cmd->window.size, (long) 0, - &task->window.event); + event); if (fd == -1) { flb_error("[sp] registration for task %s failed", task->name); flb_free(task); @@ -525,7 +525,7 @@ struct flb_sp_task *flb_sp_task_create(struct flb_sp *sp, const char *name, /* Run every 'size' seconds */ fd = mk_event_timeout_create(sp->config->evl, cmd->window.advance_by, (long) 0, - &task->window.event_hop); + event); if (fd == -1) { flb_error("[sp] registration for task %s failed", task->name); flb_free(task); @@ -624,8 +624,7 @@ void flb_sp_aggregate_node_destroy(struct flb_sp_cmd *cmd, flb_free(aggr_node); } -void flb_sp_window_destroy(struct flb_sp_cmd *cmd, - struct flb_sp_task_window *window) +void flb_sp_window_destroy(struct flb_sp_task *task) { struct flb_sp_window_data *data; struct aggregate_node *aggr_node; @@ -635,39 +634,45 @@ void flb_sp_window_destroy(struct flb_sp_cmd *cmd, struct mk_list *head_hs; struct mk_list *tmp_hs; - mk_list_foreach_safe(head, tmp, &window->data) { + mk_list_foreach_safe(head, tmp, &task->window.data) { data = mk_list_entry(head, struct flb_sp_window_data, _head); flb_free(data->buf_data); mk_list_del(&data->_head); flb_free(data); } - mk_list_foreach_safe(head, tmp, &window->aggregate_list) { + mk_list_foreach_safe(head, tmp, &task->window.aggregate_list) { aggr_node = mk_list_entry(head, struct aggregate_node, _head); mk_list_del(&aggr_node->_head); - flb_sp_aggregate_node_destroy(cmd, aggr_node); + flb_sp_aggregate_node_destroy(task->cmd, aggr_node); } - mk_list_foreach_safe(head, tmp, &window->hopping_slot) { + mk_list_foreach_safe(head, tmp, &task->window.hopping_slot) { hs = mk_list_entry(head, struct flb_sp_hopping_slot, _head); mk_list_foreach_safe(head_hs, tmp_hs, &hs->aggregate_list) { aggr_node = mk_list_entry(head_hs, struct aggregate_node, _head); mk_list_del(&aggr_node->_head); - flb_sp_aggregate_node_destroy(cmd, aggr_node); + flb_sp_aggregate_node_destroy(task->cmd, aggr_node); } rb_tree_destroy(&hs->aggregate_tree); flb_free(hs); } - rb_tree_destroy(&window->aggregate_tree); + if (task->window.fd > 0) { + mk_event_timeout_destroy(task->sp->config->evl, &task->window.event); + mk_event_closesocket(task->window.fd); + } + + rb_tree_destroy(&task->window.aggregate_tree); } void flb_sp_task_destroy(struct flb_sp_task *task) { flb_sds_destroy(task->name); flb_sds_destroy(task->query); - flb_sp_window_destroy(task->cmd, &task->window); + flb_sp_window_destroy(task); flb_sp_snapshot_destroy(task->snapshot); + mk_list_del(&task->_head); if (task->stream) { @@ -1114,6 +1119,7 @@ void package_results(const char *tag, int tag_len, char **out_buf, size_t *out_size, struct flb_sp_task *task) { + char *c_name; int i; int len; int map_entries; @@ -1165,14 +1171,13 @@ void package_results(const char *tag, int tag_len, flb_sds_len(ckey->alias)); } else { - len = 0; - char *c_name; if (!ckey->name) { c_name = "*"; } else { c_name = ckey->name; } + len = strlen(c_name); msgpack_pack_str(&mp_pck, len); msgpack_pack_str_body(&mp_pck, c_name, len); diff --git a/src/stream_processor/parser/flb_sp_parser.c b/src/stream_processor/parser/flb_sp_parser.c index 429d0b4c105..a72cf04efb0 100644 --- a/src/stream_processor/parser/flb_sp_parser.c +++ b/src/stream_processor/parser/flb_sp_parser.c @@ -139,8 +139,8 @@ struct flb_sp_cmd_key *flb_sp_key_create(struct flb_sp_cmd *cmd, int func, struct flb_sp_cmd_key *key; struct flb_slist_entry *entry; - /* aggregation function ? */ if (func >= FLB_SP_AVG && func <= FLB_SP_FORECAST) { + /* Aggregation function */ aggr_func = func; } else if (func >= FLB_SP_NOW && func <= FLB_SP_UNIX_TIMESTAMP) { diff --git a/tests/internal/include/sp_cb_functions.h b/tests/internal/include/sp_cb_functions.h index aee3fb393e6..346a4bb7e12 100644 --- a/tests/internal/include/sp_cb_functions.h +++ b/tests/internal/include/sp_cb_functions.h @@ -535,6 +535,20 @@ static void cb_select_groupby(int id, struct task_check *check, ret = mp_count_rows(buf, size); TEST_CHECK(ret == 2); + /* bool is 1 for record 0 (bool=true) */ + ret = mp_record_key_cmp(buf, size, + 0, "bool", + MSGPACK_OBJECT_POSITIVE_INTEGER, + NULL, 1, 0); + TEST_CHECK(ret == FLB_TRUE); + + /* bool is 0 for record 1 (bool=false) */ + ret = mp_record_key_cmp(buf, size, + 1, "bool", + MSGPACK_OBJECT_POSITIVE_INTEGER, + NULL, 0, 0); + TEST_CHECK(ret == FLB_TRUE); + /* MIN(id) is 0 for record 0 (bool=true) */ ret = mp_record_key_cmp(buf, size, 0, "MIN(id)", @@ -556,7 +570,7 @@ static void cb_select_groupby(int id, struct task_check *check, NULL, 8, 0); TEST_CHECK(ret == FLB_TRUE); - /* MAX(id) is i9 for record 1 (bool=false) */ + /* MAX(id) is 9 for record 1 (bool=false) */ ret = mp_record_key_cmp(buf, size, 1, "MAX(id)", MSGPACK_OBJECT_POSITIVE_INTEGER,