Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Filterx parse csv #116

Merged
merged 6 commits into from
May 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions modules/csvparser/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ set(CSVPARSER_SOURCES
csvparser-parser.c
csvparser-parser.h
csvparser-plugin.c
filterx-func-parse-csv.h
filterx-func-parse-csv.c
)

add_module(
Expand Down
4 changes: 3 additions & 1 deletion modules/csvparser/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ modules_csvparser_libcsvparser_la_SOURCES = \
modules/csvparser/csvparser-grammar.y \
modules/csvparser/csvparser-parser.c \
modules/csvparser/csvparser-parser.h \
modules/csvparser/csvparser-plugin.c
modules/csvparser/csvparser-plugin.c \
modules/csvparser/filterx-func-parse-csv.h \
modules/csvparser/filterx-func-parse-csv.c

modules_csvparser_libcsvparser_la_CPPFLAGS = \
$(AM_CPPFLAGS) \
Expand Down
6 changes: 6 additions & 0 deletions modules/csvparser/csvparser-plugin.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "csvparser.h"
#include "plugin.h"
#include "plugin-types.h"
#include "filterx-func-parse-csv.h"

extern CfgParser csvparser_parser;

Expand All @@ -35,6 +36,11 @@ static Plugin csvparser_plugins[] =
.name = "csv-parser",
.parser = &csvparser_parser,
},
{
.type = LL_CONTEXT_FILTERX_FUNC,
.name = "parse_csv",
.construct = filterx_function_construct_parse_csv,
},
};

gboolean
Expand Down
350 changes: 350 additions & 0 deletions modules/csvparser/filterx-func-parse-csv.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,350 @@
/*
* Copyright (c) 2024 shifter
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 as published
* by the Free Software Foundation, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*
* As an additional exemption you are allowed to compile & link against the
* OpenSSL libraries as published by the OpenSSL project. See the file
* COPYING for details.
*
*/

#include "filterx-func-parse-csv.h"
#include "filterx/object-string.h"
#include "filterx/object-primitive.h"
#include "filterx/expr-literal.h"
#include "filterx/filterx-eval.h"
#include "filterx/filterx-globals.h"
#include "filterx/object-json.h"
#include "filterx/object-message-value.h"
#include "filterx/object-null.h"
#include "filterx/filterx-object.h"
#include "filterx/object-dict-interface.h"
#include "filterx/object-list-interface.h"

#include "scanner/csv-scanner/csv-scanner.h"
#include "parser/parser-expr.h"
#include "scratch-buffers.h"
#include "str-utils.h"
#include "csvparser.h"

/*
 * State of one parse_csv() filterx function instance.
 *
 * Instances are created by filterx_function_parse_csv_new() and handed out
 * as a pointer to the embedded base expression (&self->super.super); _eval()
 * and _free() cast that pointer back to this struct.
 */
typedef struct FilterXFunctionParseCSV_
{
/* Base filterx function object. */
FilterXFunction super;
/* Expression for the first positional argument: the text to be parsed. */
FilterXExpr *msg;
/* CSV scanner settings (delimiters, quote pairs, dialect, flags). Defaults
 * are set by the constructor and overridden by the named arguments; _eval()
 * additionally updates the expected column count on every call that has
 * column names. */
CSVScannerOptions options;
/* Optional expression for the named "columns" argument; NULL when it was
 * not supplied, in which case _eval() builds a list instead of a dict. */
FilterXExpr *columns;
} FilterXFunctionParseCSV;

/*
 * Evaluate the optional "columns" expression and append a copy of each
 * string element to *col_names.
 *
 * self:      function instance; self->columns may be NULL.
 * col_names: in/out list of newly allocated (g_strdup) column names. The
 *            caller owns the list and its elements, also when FALSE is
 *            returned after some names were already appended.
 *
 * Returns TRUE when no columns expression is configured or when the
 * expression evaluated to a json array whose length could be queried.
 * Returns FALSE when evaluation fails, the value is not a json array, or
 * its length cannot be determined. Non-string elements are skipped.
 */
static gboolean
_parse_columns(FilterXFunctionParseCSV *self, GList **col_names)
{
  gboolean result = FALSE;

  if (!self->columns)
    return TRUE;

  FilterXObject *cols_obj = filterx_expr_eval(self->columns);
  if (!cols_obj)
    return FALSE;

  if (!filterx_object_is_type(cols_obj, &FILTERX_TYPE_NAME(json_array)))
    {
      msg_error("columns argument must be a type of json array.",
                evt_tag_str("current_type", cols_obj->type->name ) );
      goto exit;
    }

  guint64 size;
  /* Leave through the common exit so the reference on cols_obj is dropped;
   * returning directly from here would leak it. */
  if (!filterx_object_len(cols_obj, &size))
    goto exit;

  for (guint64 i = 0; i < size; i++)
    {
      FilterXObject *col = filterx_list_get_subscript(cols_obj, i);

      /* Nothing to inspect or release if the element could not be fetched. */
      if (!col)
        continue;

      if (filterx_object_is_type(col, &FILTERX_TYPE_NAME(string)))
        {
          const gchar *col_name = filterx_string_get_value(col, NULL);
          *col_names = g_list_append(*col_names, g_strdup(col_name));
        }
      filterx_object_unref(col);
    }

  result = TRUE;

exit:
  filterx_object_unref(cols_obj);
  return result;
}

/*
 * Evaluate parse_csv(): split the input text into CSV fields.
 *
 * The first argument must evaluate to a filterx string or to a message
 * value of type LM_VT_STRING; any other type makes the call fail.
 *
 * When column names are available, the result is a json object mapping each
 * column name to the corresponding field; scanning stops once the names are
 * exhausted. Without column names the result is a json array holding every
 * field in order.
 *
 * Returns a new reference to the result, or NULL on failure. Note that `ok`
 * is only set by a successful insertion, so an input that yields no fields
 * at all also returns NULL.
 */
static FilterXObject *
_eval(FilterXExpr *s)
{
  FilterXFunctionParseCSV *self = (FilterXFunctionParseCSV *) s;

  FilterXObject *obj = filterx_expr_eval(self->msg);
  if (!obj)
    return NULL;

  CSVScanner scanner;
  /* The scanner may only be torn down if it was set up: several failure
   * paths below jump to `exit` before csv_scanner_init() is reached. */
  gboolean scanner_initialized = FALSE;
  gboolean ok = FALSE;
  FilterXObject *result = NULL;
  GList *cols = NULL;

  gsize len;
  const gchar *input;
  if (filterx_object_is_type(obj, &FILTERX_TYPE_NAME(string)))
    input = filterx_string_get_value(obj, &len);
  else if (filterx_object_is_type(obj, &FILTERX_TYPE_NAME(message_value))
           && filterx_message_value_get_type(obj) == LM_VT_STRING)
    input = filterx_message_value_get_value(obj, &len);
  else
    goto exit;

  /* NOTE(review): APPEND_ZERO presumably yields a NUL-terminated view of the
   * input, since the scanner is given no length - confirm in str-utils.h. */
  APPEND_ZERO(input, input, len);

  if (!_parse_columns(self, &cols))
    goto exit;

  if (cols)
    {
      /* NOTE(review): this writes to the shared options at evaluation time;
       * confirm that is safe if instances are evaluated concurrently. */
      csv_scanner_options_set_expected_columns(&self->options, g_list_length(cols));
      result = filterx_json_object_new_empty();
    }
  else
    result = filterx_json_array_new_empty();

  csv_scanner_init(&scanner, &self->options, input);
  scanner_initialized = TRUE;

  GList *col = cols;
  while (csv_scanner_scan_next(&scanner))
    {
      if (cols)
        {
          /* More fields than column names: ignore the remainder. */
          if (!col)
            break;
          FilterXObject *key = filterx_string_new(col->data, -1);
          FilterXObject *val = filterx_string_new(csv_scanner_get_current_value(&scanner),
                                                  csv_scanner_get_current_value_len(&scanner));

          ok = filterx_object_set_subscript(result, key, &val);

          filterx_object_unref(key);
          filterx_object_unref(val);

          if (!ok)
            goto exit;
          col = g_list_next(col);
        }
      else
        {
          const gchar *current_value = csv_scanner_get_current_value(&scanner);
          gint current_value_len = csv_scanner_get_current_value_len(&scanner);
          FilterXObject *val = filterx_string_new(current_value, current_value_len);

          ok = filterx_list_append(result, &val);

          filterx_object_unref(val);

          /* Stop at the first failed append; otherwise a later successful
           * append would overwrite `ok` and a partial list would be
           * returned as if nothing went wrong. */
          if (!ok)
            goto exit;
        }
    }

exit:
  if (!ok)
    {
      filterx_object_unref(result);
    }
  g_list_free_full(cols, (GDestroyNotify)g_free);
  filterx_object_unref(obj);
  if (scanner_initialized)
    csv_scanner_deinit(&scanner);
  return ok?result:NULL;
}

/*
 * Destructor installed as the expression's free_fn: drops the references
 * held on the argument expressions, cleans the scanner options and finally
 * invokes the base function's free method.
 */
static void
_free(FilterXExpr *s)
{
  FilterXFunctionParseCSV *parse_csv = (FilterXFunctionParseCSV *) s;

  /* Argument expressions owned by this instance. */
  filterx_expr_unref(parse_csv->msg);
  filterx_expr_unref(parse_csv->columns);

  csv_scanner_options_clean(&parse_csv->options);

  /* The base class teardown goes last. */
  filterx_function_free_method(&parse_csv->super);
}

/*
 * Fetch the expression of the first positional argument (the text to parse).
 *
 * Returns the expression, or NULL with *error set to a constructor failure
 * when the argument is missing.
 */
static FilterXExpr *
_extract_msg_expr(FilterXFunctionArgs *args, GError **error)
{
  FilterXExpr *first_arg = filterx_function_args_get_expr(args, 0);

  if (first_arg)
    return first_arg;

  g_set_error(error, FILTERX_FUNCTION_ERROR, FILTERX_FUNCTION_ERROR_CTOR_FAIL,
              "argument must be set: msg_str. " FILTERX_FUNC_PARSE_CSV_USAGE);
  return NULL;
}

/*
 * Look up the expression of the named "columns" argument.
 *
 * The result of the lookup is returned unchanged; `error` is never written
 * by this function.
 */
static FilterXExpr *
_extract_columns_expr(FilterXFunctionArgs *args, GError **error)
{
  FilterXExpr *columns_expr =
    filterx_function_args_get_named_expr(args, FILTERX_FUNC_PARSE_CSV_ARG_NAME_COLUMNS);

  return columns_expr;
}

/*
 * Apply the optional named arguments to self->options.
 *
 * Handled arguments:
 *   - delimiters:        non-empty string literal
 *   - dialect:           non-empty string literal naming a known dialect
 *   - greedy:            boolean literal, toggles CSV_SCANNER_GREEDY
 *   - strip whitespaces: boolean literal, toggles CSV_SCANNER_STRIP_WHITESPACE
 *
 * Arguments that are absent leave the current setting untouched.
 *
 * Returns TRUE on success. On an invalid argument *error is set to a
 * constructor failure that includes the usage text, and FALSE is returned.
 */
static gboolean
_extract_opts(FilterXFunctionParseCSV *self, FilterXFunctionArgs *args, GError **error)
{
  guint32 opt_flags = self->options.flags;

  const gchar *error_str = "";
  gboolean exists;
  gsize len = 0;
  const gchar *value;
  gboolean flag_err = FALSE;
  gboolean flag_val = FALSE;

  value = filterx_function_args_get_named_literal_string(args, FILTERX_FUNC_PARSE_CSV_ARG_NAME_DELIMITERS, &len, &exists);
  if (exists)
    {
      /* Test for NULL before looking at len: len is only meaningful when a
       * string was actually returned, and a non-literal argument should be
       * reported as such rather than as "empty". */
      if (!value)
        {
          error_str = FILTERX_FUNC_PARSE_CSV_ARG_NAME_DELIMITERS " must be a string literal";
          goto error;
        }
      if (len < 1)
        {
          error_str = FILTERX_FUNC_PARSE_CSV_ARG_NAME_DELIMITERS " can not be empty";
          goto error;
        }
      csv_scanner_options_set_delimiters(&self->options, value);
    }

  len = 0;
  value = filterx_function_args_get_named_literal_string(args, FILTERX_FUNC_PARSE_CSV_ARG_NAME_DIALECT, &len, &exists);
  if (exists)
    {
      /* Same ordering as above: NULL check first, then the length. */
      if (!value)
        {
          error_str = FILTERX_FUNC_PARSE_CSV_ARG_NAME_DIALECT " must be a string literal";
          goto error;
        }
      if (len < 1)
        {
          error_str = FILTERX_FUNC_PARSE_CSV_ARG_NAME_DIALECT " can not be empty";
          goto error;
        }
      CSVScannerDialect dialect = csv_parser_lookup_dialect(value);
      /* The lookup result is compared against -1 to detect an unknown name. */
      if (dialect == -1)
        {
          error_str = FILTERX_FUNC_PARSE_CSV_ARG_NAME_DIALECT " argument must be one of: [" \
                      "escape-none, " \
                      "escape-backslash, " \
                      "escape-backslash-with-sequences, " \
                      "escape-double-char]";
          goto error;
        }
      csv_scanner_options_set_dialect(&self->options, dialect);
    }

  flag_val = filterx_function_args_get_named_literal_boolean(args, FILTERX_FUNC_PARSE_CSV_ARG_NAME_GREEDY, &exists,
                                                             &flag_err);
  if (exists)
    {
      if (flag_err)
        {
          error_str = FILTERX_FUNC_PARSE_CSV_ARG_NAME_GREEDY " argument evaluation error";
          goto error;
        }

      if (flag_val)
        opt_flags |= CSV_SCANNER_GREEDY;
      else
        opt_flags &= ~CSV_SCANNER_GREEDY;
    }

  flag_val = filterx_function_args_get_named_literal_boolean(args, FILTERX_FUNC_PARSE_CSV_ARG_NAME_STRIP_WHITESPACES,
                                                             &exists,
                                                             &flag_err);
  if (exists)
    {
      if (flag_err)
        {
          error_str = FILTERX_FUNC_PARSE_CSV_ARG_NAME_STRIP_WHITESPACES " argument evaluation error";
          goto error;
        }

      if (flag_val)
        opt_flags |= CSV_SCANNER_STRIP_WHITESPACE;
      else
        opt_flags &= ~CSV_SCANNER_STRIP_WHITESPACE;
    }

  /* Flags are written back only once every argument has been validated. */
  csv_scanner_options_set_flags(&self->options, opt_flags);

  return TRUE;

error:
  g_set_error(error, FILTERX_FUNCTION_ERROR, FILTERX_FUNCTION_ERROR_CTOR_FAIL,
              "%s. %s", error_str, FILTERX_FUNC_PARSE_CSV_USAGE);
  return FALSE;
}

/*
 * Validate the argument list and populate the instance from it.
 *
 * The argument count reported by filterx_function_args_len() must be exactly
 * one. After that the message expression, the optional columns expression
 * and the scanner options are extracted, in that order.
 *
 * Returns TRUE on success; FALSE when the count is wrong, the message
 * expression is missing, or an option is invalid.
 */
static gboolean
_extract_args(FilterXFunctionParseCSV *self, FilterXFunctionArgs *args, GError **error)
{
  if (filterx_function_args_len(args) != 1)
    {
      g_set_error(error, FILTERX_FUNCTION_ERROR, FILTERX_FUNCTION_ERROR_CTOR_FAIL,
                  "invalid number of arguments. " FILTERX_FUNC_PARSE_CSV_USAGE);
      return FALSE;
    }

  self->msg = _extract_msg_expr(args, error);
  if (self->msg == NULL)
    return FALSE;

  /* Optional: stays NULL when no columns argument was given. */
  self->columns = _extract_columns_expr(args, error);

  if (_extract_opts(self, args, error))
    return TRUE;

  return FALSE;
}

/*
 * Construct a parse_csv() function expression.
 *
 * function_name: name the function is registered under.
 * args:          argument list; it is freed by this function on both the
 *                success and the failure path.
 * error:         receives the failure reason when NULL is returned.
 *
 * Scanner defaults applied before the arguments are processed: a single
 * space as delimiter, double and single quotes as quote pairs, the
 * strip-whitespace flag, and the escape-none dialect.
 *
 * Returns the new expression, or NULL when the arguments are invalid.
 */
FilterXExpr *
filterx_function_parse_csv_new(const gchar *function_name, FilterXFunctionArgs *args, GError **error)
{
  FilterXFunctionParseCSV *self = g_new0(FilterXFunctionParseCSV, 1);
  FilterXExpr *expr = &self->super.super;

  filterx_function_init_instance(&self->super, function_name);
  self->super.super.eval = _eval;
  self->super.super.free_fn = _free;

  csv_scanner_options_set_delimiters(&self->options, " ");
  csv_scanner_options_set_quote_pairs(&self->options, "\"\"''");
  csv_scanner_options_set_flags(&self->options, CSV_SCANNER_STRIP_WHITESPACE);
  csv_scanner_options_set_dialect(&self->options, CSV_SCANNER_ESCAPE_NONE);

  gboolean args_ok = _extract_args(self, args, error);

  /* The argument container is released regardless of the outcome. */
  filterx_function_args_free(args);

  if (!args_ok)
    {
      /* Dropping the reference disposes of the half-built instance. */
      filterx_expr_unref(expr);
      return NULL;
    }

  return expr;
}

/*
 * Plugin construct hook for the "parse_csv" entry: returns the address of
 * the filterx_function_parse_csv_new constructor as an opaque pointer.
 * The plugin argument is not used.
 */
gpointer
filterx_function_construct_parse_csv(Plugin *self)
{
  gpointer constructor = (gpointer) filterx_function_parse_csv_new;

  return constructor;
}
Loading
Loading