From d73eb1c14492f49ea2448214b99f655a9f10af3c Mon Sep 17 00:00:00 2001 From: Felix Fontein Date: Mon, 11 Oct 2021 23:44:40 +0200 Subject: [PATCH] Add module_utils and filters for quoting and unquoting (#53) * Move splitting code to own file. * Move list to dictionary code to quoting as well. * Add quoting functionality. * Add quoting filters. * Add integration tests to CI. * Fix bugs, increase coverage. * Make parsing more strict. * Extract function parse_argument_value from split_routeros_command to make proper parsing of WHERE possible. * Adjust expected error message in integration tests. * Simplify code and improve coverage. * Add changelog fragment for WHERE strictness in api module. * Add documenation. * Fix typo. * Add documentation references. * Add example to api module which uses quote_argument_value. * Fix bug, and add tests which prevent this in the future. * Add more escape sequence tests. * Make sure all control characters are quoted. --- .github/workflows/ansible-test.yml | 73 +++++ changelogs/fragments/53-api-where.yml | 2 + changelogs/fragments/53-quoting-filters.yml | 12 + docs/docsite/extra-docs.yml | 1 + docs/docsite/rst/quoting.rst | 14 + plugins/filter/quoting.py | 113 ++++++++ plugins/module_utils/quoting.py | 206 +++++++++++++ plugins/modules/api.py | 156 +++------- plugins/modules/command.py | 5 + plugins/modules/facts.py | 3 + .../targets/filter_quoting/aliases | 2 + .../targets/filter_quoting/tasks/main.yml | 59 ++++ .../unit/plugins/module_utils/test_quoting.py | 272 ++++++++++++++++++ tests/unit/plugins/modules/test_api.py | 30 -- 14 files changed, 803 insertions(+), 145 deletions(-) create mode 100644 changelogs/fragments/53-api-where.yml create mode 100644 changelogs/fragments/53-quoting-filters.yml create mode 100644 docs/docsite/rst/quoting.rst create mode 100644 plugins/filter/quoting.py create mode 100644 plugins/module_utils/quoting.py create mode 100644 tests/integration/targets/filter_quoting/aliases create mode 100644 tests/integration/targets/filter_quoting/tasks/main.yml create mode 100644 tests/unit/plugins/module_utils/test_quoting.py diff --git a/.github/workflows/ansible-test.yml b/.github/workflows/ansible-test.yml index 9a00a3bb..5777dcd4 100644 --- a/.github/workflows/ansible-test.yml +++ b/.github/workflows/ansible-test.yml @@ -112,3 +112,76 @@ jobs: - uses: codecov/codecov-action@v1 with: fail_ci_if_error: false + +### +# Integration tests (RECOMMENDED) +# +# https://docs.ansible.com/ansible/latest/dev_guide/testing_integration.html + + integration: + runs-on: ubuntu-latest + name: I (Ⓐ${{ matrix.ansible }}+py${{ matrix.python }}) + strategy: + fail-fast: false + matrix: + ansible: + - stable-2.12 + - devel + python: + - 3.8 + - 3.9 + - "3.10" + include: + # 2.9 + - ansible: stable-2.9 + python: 2.7 + - ansible: stable-2.9 + python: 3.5 + - ansible: stable-2.9 + python: 3.6 + # 2.10 + - ansible: stable-2.10 + python: 3.5 + # 2.11 + - ansible: stable-2.11 + python: 2.7 + - ansible: stable-2.11 + python: 3.6 + - ansible: stable-2.11 + python: 3.9 + + steps: + - name: Check out code + uses: actions/checkout@v2 + with: + path: ansible_collections/community/routeros + + - name: Set up Python + uses: actions/setup-python@v2 + with: + # it is just required to run that once as "ansible-test integration" in the docker image + # will run on all python versions it supports. + python-version: 3.8 + + - name: Install ansible-core (${{ matrix.ansible }}) + run: pip install https://github.com/ansible/ansible/archive/${{ matrix.ansible }}.tar.gz --disable-pip-version-check + + - name: Install collection dependencies + run: git clone --depth=1 --single-branch https://github.com/ansible-collections/ansible.netcommon.git ansible_collections/ansible/netcommon + # NOTE: we're installing with git to work around Galaxy being a huge PITA (https://github.com/ansible/galaxy/issues/2429) + # run: ansible-galaxy collection install ansible.netcommon -p . + + # Run the integration tests + - name: Run integration test + run: ansible-test integration -v --color --retry-on-error --continue-on-error --diff --python ${{ matrix.python }} --docker --coverage + working-directory: ./ansible_collections/community/routeros + + # ansible-test support producing code coverage date + - name: Generate coverage report + run: ansible-test coverage xml -v --requirements --group-by command --group-by version + working-directory: ./ansible_collections/community/routeros + + # See the reports at https://codecov.io/gh/ansible-collections/community.routeros + - uses: codecov/codecov-action@v1 + with: + fail_ci_if_error: false diff --git a/changelogs/fragments/53-api-where.yml b/changelogs/fragments/53-api-where.yml new file mode 100644 index 00000000..a2b09e20 --- /dev/null +++ b/changelogs/fragments/53-api-where.yml @@ -0,0 +1,2 @@ +minor_changes: +- "api - make validation of ``WHERE`` for ``query`` more strict (https://github.com/ansible-collections/community.routeros/pull/53)." diff --git a/changelogs/fragments/53-quoting-filters.yml b/changelogs/fragments/53-quoting-filters.yml new file mode 100644 index 00000000..3cbb5cd2 --- /dev/null +++ b/changelogs/fragments/53-quoting-filters.yml @@ -0,0 +1,12 @@ +--- +add plugin.filter: + - name: split + description: Split a command into arguments + - name: quote_argument_value + description: Quote an argument value + - name: quote_argument + description: Quote an argument + - name: join + description: Join a list of arguments to a command + - name: list_to_dict + description: Convert a list of arguments to a list of dictionary diff --git a/docs/docsite/extra-docs.yml b/docs/docsite/extra-docs.yml index cb2ef6aa..385bd7e1 100644 --- a/docs/docsite/extra-docs.yml +++ b/docs/docsite/extra-docs.yml @@ -4,3 +4,4 @@ sections: toctree: - api-guide - ssh-guide + - quoting diff --git a/docs/docsite/rst/quoting.rst b/docs/docsite/rst/quoting.rst new file mode 100644 index 00000000..f17a8618 --- /dev/null +++ b/docs/docsite/rst/quoting.rst @@ -0,0 +1,14 @@ +.. _ansible_collections.community.routeros.docsite.quoting: + +How to quote and unquote commands and arguments +=============================================== + +When using the :ref:`community.routeros.command module ` or the :ref:`community.routeros.api module ` modules, you need to pass text data in quoted form. While in some cases quoting is not needed (when passing IP addresses or names without spaces, for example), in other cases it is required, like when passing a comment which contains a space. + +The community.routeros collection provides a set of Jinja2 filter plugins which helps you with these tasks: + +- The ``community.routeros.quote_argument_value`` filter quotes an argument value: ``'this is a "comment"' | community.routeros.quote_argument_value == '"this is a \\"comment\\""'``. +- The ``community.routeros.quote_argument`` filter quotes an argument with or without a value: ``'comment=this is a "comment"' | community.routeros.quote_argument == 'comment="this is a \\"comment\\""'``. +- The ``community.routeros.join`` filter quotes a list of arguments and joins them to one string: ``['foo=bar', 'comment=foo is bar'] | community.routeros.join == 'foo=bar comment="foo is bar"'``. +- The ``community.routeros.split`` filter splits a command into a list of arguments (with or without values): ``'foo=bar comment="foo is bar"' | community.routeros.split == ['foo=bar', 'comment=foo is bar']`` +- The ``community.routeros.list_to_dict`` filter splits a list of arguments with values into a dictionary: ``['foo=bar', 'comment=foo is bar'] | community.routeros.list_to_dict == {'foo': 'bar', 'comment': 'foo is bar'}``. It has two optional arguments: ``require_assignment`` (default value ``true``) allows to accept arguments without values when set to ``false``; and ``skip_empty_values`` (default value ``false``) allows to skip arguments whose value is empty. diff --git a/plugins/filter/quoting.py b/plugins/filter/quoting.py new file mode 100644 index 00000000..aabb421a --- /dev/null +++ b/plugins/filter/quoting.py @@ -0,0 +1,113 @@ +# -*- coding: utf-8 -*- + +# Copyright: (c) 2021, Felix Fontein +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + +from ansible.errors import AnsibleFilterError +from ansible.module_utils.common.text.converters import to_text + +from ansible_collections.community.routeros.plugins.module_utils.quoting import ( + ParseError, + convert_list_to_dictionary, + join_routeros_command, + quote_routeros_argument, + quote_routeros_argument_value, + split_routeros_command, +) + + +def wrap_exception(fn, *args, **kwargs): + try: + return fn(*args, **kwargs) + except ParseError as e: + raise AnsibleFilterError(to_text(e)) + + +def split(line): + ''' + Split a command into arguments. + + Example: + 'add name=wrap comment="with space"' + is converted to: + ['add', 'name=wrap', 'comment=with space'] + ''' + return wrap_exception(split_routeros_command, line) + + +def quote_argument_value(argument): + ''' + Quote an argument value. + + Example: + 'with "space"' + is converted to: + r'"with \"space\""' + ''' + return wrap_exception(quote_routeros_argument_value, argument) + + +def quote_argument(argument): + ''' + Quote an argument. + + Example: + 'comment=with "space"' + is converted to: + r'comment="with \"space\""' + ''' + return wrap_exception(quote_routeros_argument, argument) + + +def join(arguments): + ''' + Join a list of arguments to a command. + + Example: + ['add', 'name=wrap', 'comment=with space'] + is converted to: + 'add name=wrap comment="with space"' + ''' + return wrap_exception(join_routeros_command, arguments) + + +def list_to_dict(string_list, require_assignment=True, skip_empty_values=False): + ''' + Convert a list of arguments to a list of dictionary. + + Example: + ['foo=bar', 'comment=with space', 'additional='] + is converted to: + {'foo': 'bar', 'comment': 'with space', 'additional': ''} + + If require_assignment is True (default), arguments without assignments are + rejected. (Example: in ['add', 'name=foo'], 'add' is an argument without + assignment.) If it is False, these are given value None. + + If skip_empty_values is True, arguments with empty value are removed from + the result. (Example: in ['name='], 'name' has an empty value.) + If it is False (default), these are kept. + + ''' + return wrap_exception( + convert_list_to_dictionary, + string_list, + require_assignment=require_assignment, + skip_empty_values=skip_empty_values, + ) + + +class FilterModule(object): + '''Ansible jinja2 filters for RouterOS command quoting and unquoting''' + + def filters(self): + return { + 'split': split, + 'quote_argument': quote_argument, + 'quote_argument_value': quote_argument_value, + 'join': join, + 'list_to_dict': list_to_dict, + } diff --git a/plugins/module_utils/quoting.py b/plugins/module_utils/quoting.py new file mode 100644 index 00000000..61d67da5 --- /dev/null +++ b/plugins/module_utils/quoting.py @@ -0,0 +1,206 @@ +# -*- coding: utf-8 -*- + +# Copyright: (c) 2021, Felix Fontein (@felixfontein) +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + + +import sys + +from ansible.module_utils.common.text.converters import to_native, to_bytes + + +class ParseError(Exception): + pass + + +ESCAPE_SEQUENCES = { + b'"': b'"', + b'\\': b'\\', + b'?': b'?', + b'$': b'$', + b'_': b'_', + b'a': b'\a', + b'b': b'\b', + b'f': b'\xFF', + b'n': b'\n', + b'r': b'\r', + b't': b'\t', + b'v': b'\v', +} + +ESCAPE_SEQUENCE_REVERSED = dict([(v, k) for k, v in ESCAPE_SEQUENCES.items()]) + +ESCAPE_DIGITS = b'0123456789ABCDEF' + + +if sys.version_info[0] < 3: + _int_to_byte = chr +else: + def _int_to_byte(value): + return bytes((value, )) + + +def parse_argument_value(line, start_index=0, must_match_everything=True): + ''' + Parse an argument value (quoted or not quoted) from ``line``. + + Will start at offset ``start_index``. Returns pair ``(parsed_value, + end_index)``, where ``end_index`` is the first character after the + attribute. + + If ``must_match_everything`` is ``True`` (default), will fail if + ``end_index < len(line)``. + ''' + line = to_bytes(line) + length = len(line) + index = start_index + if index == length: + raise ParseError('Expected value, but found end of string') + quoted = False + if line[index:index + 1] == b'"': + quoted = True + index += 1 + current = [] + while index < length: + ch = line[index:index + 1] + index += 1 + if not quoted and ch == b' ': + index -= 1 + break + elif ch == b'"': + if quoted: + quoted = False + if line[index:index + 1] not in (b'', b' '): + raise ParseError('Ending \'"\' must be followed by space or end of string') + break + raise ParseError('\'"\' must not appear in an unquoted value') + elif ch == b'\\': + if not quoted: + raise ParseError('Escape sequences can only be used inside double quotes') + if index == length: + raise ParseError('\'\\\' must not be at the end of the line') + ch = line[index:index + 1] + index += 1 + if ch in ESCAPE_SEQUENCES: + current.append(ESCAPE_SEQUENCES[ch]) + else: + d1 = ESCAPE_DIGITS.find(ch) + if d1 < 0: + raise ParseError('Invalid escape sequence \'\\{0}\''.format(to_native(ch))) + if index == length: + raise ParseError('Hex escape sequence cut off at end of line') + ch2 = line[index:index + 1] + d2 = ESCAPE_DIGITS.find(ch2) + index += 1 + if d2 < 0: + raise ParseError('Invalid hex escape sequence \'\\{0}\''.format(to_native(ch + ch2))) + current.append(_int_to_byte(d1 * 16 + d2)) + else: + if not quoted and ch in (b"'", b'=', b'(', b')', b'$', b'[', b'{', b'`'): + raise ParseError('"{0}" can only be used inside double quotes'.format(to_native(ch))) + if ch == b'?': + raise ParseError('"{0}" can only be used in escaped form'.format(to_native(ch))) + current.append(ch) + if quoted: + raise ParseError('Unexpected end of string during escaped parameter') + if must_match_everything and index < length: + raise ParseError('Unexpected data at end of value') + return to_native(b''.join(current)), index + + +def split_routeros_command(line): + line = to_bytes(line) + result = [] + current = [] + index = 0 + length = len(line) + parsing_attribute_name = False + while index < length: + ch = line[index:index + 1] + index += 1 + if ch == b' ': + if parsing_attribute_name: + parsing_attribute_name = False + result.append(b''.join(current)) + current = [] + elif ch == b'=' and parsing_attribute_name: + current.append(ch) + value, index = parse_argument_value(line, start_index=index, must_match_everything=False) + current.append(to_bytes(value)) + parsing_attribute_name = False + result.append(b''.join(current)) + current = [] + elif ch in (b'"', b'\\', b"'", b'=', b'(', b')', b'$', b'[', b'{', b'`', b'?'): + raise ParseError('Found unexpected "{0}"'.format(to_native(ch))) + else: + current.append(ch) + parsing_attribute_name = True + if parsing_attribute_name and current: + result.append(b''.join(current)) + return [to_native(part) for part in result] + + +def quote_routeros_argument_value(argument): + argument = to_bytes(argument) + result = [] + quote = False + length = len(argument) + index = 0 + while index < length: + letter = argument[index:index + 1] + index += 1 + if letter in ESCAPE_SEQUENCE_REVERSED: + result.append(b'\\%s' % ESCAPE_SEQUENCE_REVERSED[letter]) + quote = True + continue + elif ord(letter) < 32: + v = ord(letter) + v1 = v % 16 + v2 = v // 16 + result.append(b'\\%s%s' % (ESCAPE_DIGITS[v2:v2 + 1], ESCAPE_DIGITS[v1:v1 + 1])) + quote = True + continue + elif letter in (b' ', b'=', b';', b"'"): + quote = True + result.append(letter) + argument = to_native(b''.join(result)) + if quote or not argument: + argument = '"%s"' % argument + return argument + + +def quote_routeros_argument(argument): + def check_attribute(attribute): + if ' ' in attribute: + raise ParseError('Attribute names must not contain spaces') + return attribute + + if '=' not in argument: + check_attribute(argument) + return argument + + attribute, value = argument.split('=', 1) + check_attribute(attribute) + value = quote_routeros_argument_value(value) + return '%s=%s' % (attribute, value) + + +def join_routeros_command(arguments): + return ' '.join([quote_routeros_argument(argument) for argument in arguments]) + + +def convert_list_to_dictionary(string_list, require_assignment=True, skip_empty_values=False): + dictionary = {} + for p in string_list: + if '=' not in p: + if require_assignment: + raise ParseError("missing '=' after '%s'" % p) + dictionary[p] = None + continue + p = p.split('=', 1) + if not skip_empty_values or p[1]: + dictionary[p[0]] = p[1] + return dictionary diff --git a/plugins/modules/api.py b/plugins/modules/api.py index 93ea2500..2fe904c8 100644 --- a/plugins/modules/api.py +++ b/plugins/modules/api.py @@ -119,6 +119,11 @@ - See also I(validate_cert_hostname). Only used when I(tls=true) and I(validate_certs=true). type: path version_added: 1.2.0 +seealso: + - ref: ansible_collections.community.routeros.docsite.api-guide + description: How to connect to RouterOS devices with the RouterOS API + - ref: ansible_collections.community.routeros.docsite.quoting + description: How to quote and unquote commands and arguments ''' EXAMPLES = ''' @@ -190,7 +195,10 @@ password: "{{ password }}" username: "{{ username }}" path: "{{ path }}" - update: ".id={{ query_id }} address={{ ip3 }}" + update: >- + .id={{ query_id }} + address={{ ip3 }} + comment={{ 'A comment with spaces' | community.routeros.quote_argument_value }} register: updateout - name: Dump "Update" output @@ -258,8 +266,16 @@ from ansible.module_utils.basic import AnsibleModule from ansible.module_utils.basic import missing_required_lib -from ansible.module_utils.common.text.converters import to_native, to_bytes +from ansible.module_utils.common.text.converters import to_native +from ansible_collections.community.routeros.plugins.module_utils.quoting import ( + ParseError, + convert_list_to_dictionary, + parse_argument_value, + split_routeros_command, +) + +import re import ssl import traceback @@ -274,95 +290,6 @@ LIB_IMP_ERR = traceback.format_exc() -class ParseError(Exception): - pass - - -ESCAPE_SEQUENCES = { - b'"': b'"', - b'\\': b'\\', - b'?': b'?', - b'$': b'$', - b'_': b'_', - b'a': b'\a', - b'b': b'\b', - b'f': b'\xFF', - b'n': b'\n', - b'r': b'\r', - b't': b'\t', - b'v': b'\v', -} - -ESCAPE_DIGITS = b'0123456789ABCDEF' - - -def split_routeros(line): - line = to_bytes(line) - result = [] - current = [] - index = 0 - length = len(line) - # States: - # 0 = outside param - # 1 = param before '=' - # 2 = param after '=' without quote - # 3 = param after '=' with quote - state = 0 - while index < length: - ch = line[index:index + 1] - index += 1 - if state == 0 and ch == b' ': - pass - elif state in (1, 2) and ch == b' ': - state = 0 - result.append(b''.join(current)) - current = [] - elif ch == b'=' and state == 1: - state = 2 - current.append(ch) - if index + 1 < length and line[index:index + 1] == b'"': - state = 3 - index += 1 - elif ch == b'"': - if state == 3: - state = 0 - result.append(b''.join(current)) - current = [] - if index + 1 < length and line[index:index + 1] != b' ': - raise ParseError('Ending \'"\' must be followed by space or end of string') - else: - raise ParseError('\'"\' must follow \'=\'') - elif ch == b'\\': - if index + 1 == length: - raise ParseError('\'\\\' must not be at the end of the line') - ch = line[index:index + 1] - index += 1 - if ch in ESCAPE_SEQUENCES: - current.append(ch) - else: - d1 = ESCAPE_DIGITS.find(ch) - if d1 < 0: - raise ParseError('Invalid escape sequence \'\\{0}\''.format(ch)) - if index + 1 == length: - raise ParseError('Hex escape sequence cut off at end of line') - ch2 = line[index:index + 1] - d2 = ESCAPE_DIGITS.find(ch2) - index += 1 - if d2 < 0: - raise ParseError('Invalid hex escape sequence \'\\{0}{1}\''.format(ch, ch2)) - result.append(chr(d1 * 16 + d2)) - else: - current.append(ch) - if state == 0: - state = 1 - if state in (1, 2): - if current: - result.append(b''.join(current)) - elif state == 3: - raise ParseError('Unexpected end of string during escaped parameter') - return [to_native(part) for part in result] - - class ROS_api_module: def __init__(self): module_args = dict( @@ -410,7 +337,24 @@ def __init__(self): self.where = None self.query = self.module.params['query'] if self.query: - self.query = self.list_remove_empty(self.split_params(self.query)) + where_index = self.query.find(' WHERE ') + if where_index < 0: + self.query = self.split_params(self.query) + else: + where = self.query[where_index + len(' WHERE '):] + self.query = self.split_params(self.query[:where_index]) + # where must be of the format ' ' + m = re.match(r'^\s*([^ ]+)\s+([^ ]+)\s+(.*)$', where) + if not m: + self.errors("invalid syntax for 'WHERE %s'" % where) + try: + self.where = [ + m.group(1), # attribute + m.group(2), # operator + parse_argument_value(m.group(3).rstrip())[0], # value + ] + except ParseError as exc: + self.errors("invalid syntax for 'WHERE %s': %s" % (where, exc)) try: idx = self.query.index('WHERE') self.where = self.query[idx + 1:] @@ -439,26 +383,14 @@ def __init__(self): else: self.api_get_all() - def list_remove_empty(self, check_list): - while("" in check_list): - check_list.remove("") - return check_list - def list_to_dic(self, ldict): - dict = {} - for p in ldict: - if '=' not in p: - self.errors("missing '=' after '%s'" % p) - p = p.split('=', 1) - if p[1]: - dict[p[0]] = p[1] - return dict + return convert_list_to_dictionary(ldict, skip_empty_values=True, require_assignment=True) def split_params(self, params): if not isinstance(params, str): raise AssertionError('Parameters can only be a string, received %s' % type(params)) try: - return split_routeros(params) + return split_routeros_command(params) except ParseError as e: self.module.fail_json(msg=to_native(e)) @@ -512,11 +444,6 @@ def api_query(self): keys[k] = Key(k) try: if self.where: - if len(self.where) < 3: - self.errors("invalid syntax for 'WHERE %s'" - % ' '.join(self.where)) - - where = [] if self.where[1] == '==': select = self.api_path.select(*keys).where(keys[self.where[0]] == self.where[2]) elif self.where[1] == '!=': @@ -528,11 +455,10 @@ def api_query(self): else: self.errors("'%s' is not operator for 'where'" % self.where[1]) - for row in select: - self.result['message'].append(row) else: - for row in self.api_path.select(*keys): - self.result['message'].append(row) + select = self.api_path.select(*keys) + for row in select: + self.result['message'].append(row) if len(self.result['message']) < 1: msg = "no results for '%s 'query' %s" % (' '.join(self.path), ' '.join(self.query)) diff --git a/plugins/modules/command.py b/plugins/modules/command.py index a96c7f0f..6b0c4998 100644 --- a/plugins/modules/command.py +++ b/plugins/modules/command.py @@ -58,6 +58,11 @@ conditions, the interval indicates how long to wait before trying the command again. default: 1 +seealso: + - ref: ansible_collections.community.routeros.docsite.ssh-guide + description: How to connect to RouterOS devices with SSH + - ref: ansible_collections.community.routeros.docsite.quoting + description: How to quote and unquote commands and arguments ''' EXAMPLES = """ diff --git a/plugins/modules/facts.py b/plugins/modules/facts.py index 6b37f23d..2de3c212 100644 --- a/plugins/modules/facts.py +++ b/plugins/modules/facts.py @@ -27,6 +27,9 @@ not be collected. required: false default: '!config' +seealso: + - ref: ansible_collections.community.routeros.docsite.ssh-guide + description: How to connect to RouterOS devices with SSH ''' EXAMPLES = """ diff --git a/tests/integration/targets/filter_quoting/aliases b/tests/integration/targets/filter_quoting/aliases new file mode 100644 index 00000000..54ea5a3b --- /dev/null +++ b/tests/integration/targets/filter_quoting/aliases @@ -0,0 +1,2 @@ +shippable/posix/group1 +skip/python2.6 diff --git a/tests/integration/targets/filter_quoting/tasks/main.yml b/tests/integration/targets/filter_quoting/tasks/main.yml new file mode 100644 index 00000000..a871cfc8 --- /dev/null +++ b/tests/integration/targets/filter_quoting/tasks/main.yml @@ -0,0 +1,59 @@ +--- +- name: "Test split filter" + assert: + that: + - "'' | community.routeros.split == []" + - "'foo bar' | community.routeros.split == ['foo', 'bar']" + - > + 'foo bar="a b c"' | community.routeros.split == ['foo', 'bar=a b c'] + +- name: "Test split filter error handling" + set_fact: + test: >- + {{ 'a="' | community.routeros.split }} + ignore_errors: true + register: result + +- name: "Verify split filter error handling" + assert: + that: + - >- + result.msg == "Unexpected end of string during escaped parameter" + +- name: "Test quote_argument filter" + assert: + that: + - > + 'a=' | community.routeros.quote_argument == 'a=""' + - > + 'a=b' | community.routeros.quote_argument == 'a=b' + - > + 'a=b c' | community.routeros.quote_argument == 'a="b c"' + - > + 'a=""' | community.routeros.quote_argument == 'a="\\"\\""' + +- name: "Test quote_argument_value filter" + assert: + that: + - > + '' | community.routeros.quote_argument_value == '""' + - > + 'foo' | community.routeros.quote_argument_value == 'foo' + - > + '"foo bar"' | community.routeros.quote_argument_value == '"\\"foo bar\\""' + +- name: "Test join filter" + assert: + that: + - > + ['a=', 'b=c d'] | community.routeros.join == 'a="" b="c d"' + +- name: "Test list_to_dict filter" + assert: + that: + - > + ['a=', 'b=c'] | community.routeros.list_to_dict == {'a': '', 'b': 'c'} + - > + ['a=', 'b=c'] | community.routeros.list_to_dict(skip_empty_values=True) == {'b': 'c'} + - > + ['a', 'b=c'] | community.routeros.list_to_dict(require_assignment=False) == {'a': none, 'b': 'c'} diff --git a/tests/unit/plugins/module_utils/test_quoting.py b/tests/unit/plugins/module_utils/test_quoting.py new file mode 100644 index 00000000..15d6142c --- /dev/null +++ b/tests/unit/plugins/module_utils/test_quoting.py @@ -0,0 +1,272 @@ +# -*- coding: utf-8 -*- + +# Copyright: (c) 2021, Felix Fontein (@felixfontein) +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + +import pytest + +from ansible.module_utils.common.text.converters import to_native + +from ansible_collections.community.routeros.plugins.module_utils.quoting import ( + ParseError, + convert_list_to_dictionary, + join_routeros_command, + parse_argument_value, + quote_routeros_argument, + quote_routeros_argument_value, + split_routeros_command, +) + + +TEST_PARSE_ARGUMENT_VALUE = [ + ('a', {}, ('a', 1)), + ('a ', {'must_match_everything': False}, ('a', 1)), + (r'"a b"', {}, ('a b', 5)), + (r'"b\"f"', {}, ('b"f', 6)), + (r'"\01"', {}, ('\x01', 5)), + (r'"\1F"', {}, ('\x1f', 5)), + (r'"\FF"', {}, (to_native(b'\xff'), 5)), + (r'"\"e"', {}, ('"e', 5)), + (r'"\""', {}, ('"', 4)), + (r'"\\"', {}, ('\\', 4)), + (r'"\?"', {}, ('?', 4)), + (r'"\$"', {}, ('$', 4)), + (r'"\_"', {}, ('_', 4)), + (r'"\a"', {}, ('\a', 4)), + (r'"\b"', {}, ('\b', 4)), + (r'"\f"', {}, (to_native(b'\xff'), 4)), + (r'"\n"', {}, ('\n', 4)), + (r'"\r"', {}, ('\r', 4)), + (r'"\t"', {}, ('\t', 4)), + (r'"\v"', {}, ('\v', 4)), + (r'"b=c"', {}, ('b=c', 5)), + (r'""', {}, ('', 2)), + (r'"" ', {'must_match_everything': False}, ('', 2)), + ("'e", {'start_index': 1}, ('e', 2)), +] + + +@pytest.mark.parametrize("command, kwargs, result", TEST_PARSE_ARGUMENT_VALUE) +def test_parse_argument_value(command, kwargs, result): + result_ = parse_argument_value(command, **kwargs) + print(result_, result) + assert result_ == result + + +TEST_PARSE_ARGUMENT_VALUE_ERRORS = [ + (r'"e', {}, 'Unexpected end of string during escaped parameter'), + ("'e", {}, '"\'" can only be used inside double quotes'), + (r'\FF', {}, 'Escape sequences can only be used inside double quotes'), + (r'\"e', {}, 'Escape sequences can only be used inside double quotes'), + ('e=f', {}, '"=" can only be used inside double quotes'), + ('e$', {}, '"$" can only be used inside double quotes'), + ('e(', {}, '"(" can only be used inside double quotes'), + ('e)', {}, '")" can only be used inside double quotes'), + ('e[', {}, '"[" can only be used inside double quotes'), + ('e{', {}, '"{" can only be used inside double quotes'), + ('e`', {}, '"`" can only be used inside double quotes'), + ('?', {}, '"?" can only be used in escaped form'), + (r'b"', {}, '\'"\' must not appear in an unquoted value'), + (r'""a', {}, "Ending '\"' must be followed by space or end of string"), + (r'"" ', {}, "Unexpected data at end of value"), + ('"\\', {}, r"'\' must not be at the end of the line"), + (r'"\A', {}, r'Hex escape sequence cut off at end of line'), + (r'"\Z"', {}, r"Invalid escape sequence '\Z'"), + (r'"\Aa"', {}, r"Invalid hex escape sequence '\Aa'"), +] + + +@pytest.mark.parametrize("command, kwargs, message", TEST_PARSE_ARGUMENT_VALUE_ERRORS) +def test_parse_argument_value_errors(command, kwargs, message): + with pytest.raises(ParseError) as exc: + parse_argument_value(command, **kwargs) + print(exc.value.args[0], message) + assert exc.value.args[0] == message + + +TEST_SPLIT_ROUTEROS_COMMAND = [ + ('', []), + (' ', []), + (r'a b c', ['a', 'b', 'c']), + (r'a=b c d=e', ['a=b', 'c', 'd=e']), + (r'a="b f" c d=e', ['a=b f', 'c', 'd=e']), + (r'a="b\"f" c="\FF" d="\"e"', ['a=b"f', to_native(b'c=\xff'), 'd="e']), + (r'a="b=c"', ['a=b=c']), + (r'a=b ', ['a=b']), +] + + +@pytest.mark.parametrize("command, result", TEST_SPLIT_ROUTEROS_COMMAND) +def test_split_routeros_command(command, result): + result_ = split_routeros_command(command) + print(result_, result) + assert result_ == result + + +TEST_SPLIT_ROUTEROS_COMMAND_ERRORS = [ + (r'a=', 'Expected value, but found end of string'), + (r'a="b\"f" d="e', 'Unexpected end of string during escaped parameter'), + ('d=\'e', '"\'" can only be used inside double quotes'), + (r'c\FF', r'Found unexpected "\"'), + (r'd=\"e', 'Escape sequences can only be used inside double quotes'), + ('d=e=f', '"=" can only be used inside double quotes'), + ('d=e$', '"$" can only be used inside double quotes'), + ('d=e(', '"(" can only be used inside double quotes'), + ('d=e)', '")" can only be used inside double quotes'), + ('d=e[', '"[" can only be used inside double quotes'), + ('d=e{', '"{" can only be used inside double quotes'), + ('d=e`', '"`" can only be used inside double quotes'), + ('d=?', '"?" can only be used in escaped form'), + (r'a=b"', '\'"\' must not appear in an unquoted value'), + (r'a=""a', "Ending '\"' must be followed by space or end of string"), + ('a="\\', r"'\' must not be at the end of the line"), + (r'a="\Z', r"Invalid escape sequence '\Z'"), + (r'a="\Aa', r"Invalid hex escape sequence '\Aa'"), +] + + +@pytest.mark.parametrize("command, message", TEST_SPLIT_ROUTEROS_COMMAND_ERRORS) +def test_split_routeros_command_errors(command, message): + with pytest.raises(ParseError) as exc: + split_routeros_command(command) + print(exc.value.args[0], message) + assert exc.value.args[0] == message + + +TEST_CONVERT_LIST_TO_DICTIONARY = [ + (['a=b', 'c=d=e', 'e='], {}, {'a': 'b', 'c': 'd=e', 'e': ''}), + (['a=b', 'c=d=e', 'e='], {'skip_empty_values': False}, {'a': 'b', 'c': 'd=e', 'e': ''}), + (['a=b', 'c=d=e', 'e='], {'skip_empty_values': True}, {'a': 'b', 'c': 'd=e'}), + (['a=b', 'c=d=e', 'e=', 'f'], {'require_assignment': False}, {'a': 'b', 'c': 'd=e', 'e': '', 'f': None}), +] + + +@pytest.mark.parametrize("list, kwargs, expected_dict", TEST_CONVERT_LIST_TO_DICTIONARY) +def test_convert_list_to_dictionary(list, kwargs, expected_dict): + result = convert_list_to_dictionary(list, **kwargs) + print(result, expected_dict) + assert result == expected_dict + + +TEST_CONVERT_LIST_TO_DICTIONARY_ERRORS = [ + (['a=b', 'c=d=e', 'e=', 'f'], {}, "missing '=' after 'f'"), +] + + +@pytest.mark.parametrize("list, kwargs, message", TEST_CONVERT_LIST_TO_DICTIONARY_ERRORS) +def test_convert_list_to_dictionary_errors(list, kwargs, message): + with pytest.raises(ParseError) as exc: + result = convert_list_to_dictionary(list, **kwargs) + print(exc.value.args[0], message) + assert exc.value.args[0] == message + + +TEST_JOIN_ROUTEROS_COMMAND = [ + (['a=b', 'c=d=e', 'e=', 'f', 'g=h i j', 'h="h"'], r'a=b c="d=e" e="" f g="h i j" h="\"h\""'), +] + + +@pytest.mark.parametrize("list, expected", TEST_JOIN_ROUTEROS_COMMAND) +def test_join_routeros_command(list, expected): + result = join_routeros_command(list) + print(result, expected) + assert result == expected + + +TEST_QUOTE_ROUTEROS_ARGUMENT = [ + (r'', r''), + (r'a', r'a'), + (r'a=b', r'a=b'), + (r'a=b c', r'a="b c"'), + (r'a="b c"', r'a="\"b c\""'), + (r"a='b", "a=\"'b\""), + (r"a=b'", "a=\"b'\""), + (r'a=""', r'a="\"\""'), +] + + +@pytest.mark.parametrize("argument, expected", TEST_QUOTE_ROUTEROS_ARGUMENT) +def test_quote_routeros_argument(argument, expected): + result = quote_routeros_argument(argument) + print(result, expected) + assert result == expected + + +TEST_QUOTE_ROUTEROS_ARGUMENT_ERRORS = [ + ('a b', 'Attribute names must not contain spaces'), + ('a b=c', 'Attribute names must not contain spaces'), +] + + +@pytest.mark.parametrize("argument, message", TEST_QUOTE_ROUTEROS_ARGUMENT_ERRORS) +def test_quote_routeros_argument_errors(argument, message): + with pytest.raises(ParseError) as exc: + result = quote_routeros_argument(argument) + print(exc.value.args[0], message) + assert exc.value.args[0] == message + + +TEST_QUOTE_ROUTEROS_ARGUMENT_VALUE = [ + (r'', r'""'), + (r";", r'";"'), + (r" ", r'" "'), + (r"=", r'"="'), + (r'a', r'a'), + (r'a=b', r'"a=b"'), + (r'b c', r'"b c"'), + (r'"b c"', r'"\"b c\""'), + ("'b", "\"'b\""), + ("b'", "\"b'\""), + ('"', r'"\""'), + ('\\', r'"\\"'), + ('?', r'"\?"'), + ('$', r'"\$"'), + ('_', r'"\_"'), + ('\a', r'"\a"'), + ('\b', r'"\b"'), + # (to_native(b'\xff'), r'"\f"'), + ('\n', r'"\n"'), + ('\r', r'"\r"'), + ('\t', r'"\t"'), + ('\v', r'"\v"'), + ('\x01', r'"\01"'), + ('\x1f', r'"\1F"'), +] + + +@pytest.mark.parametrize("argument, expected", TEST_QUOTE_ROUTEROS_ARGUMENT_VALUE) +def test_quote_routeros_argument_value(argument, expected): + result = quote_routeros_argument_value(argument) + print(result, expected) + assert result == expected + + +TEST_ROUNDTRIP = [ + {'a': 'b', 'c': 'd'}, + {'script': ''':local host value=[/system identity get name]; +:local date value=[/system clock get date]; +:local day [ :pick $date 4 6 ]; +:local month [ :pick $date 0 3 ]; +:local year [ :pick $date 7 11 ]; +:local name value=($host."-".$day."-".$month."-".$year); +/system backup save name=$name; +/export file=$name; +/tool fetch address="192.168.1.1" user=ros password="PASSWORD" mode=ftp dst-path=("/mikrotik/rsc/".$name.".rsc") src-path=($name.".rsc") upload=yes; +/tool fetch address="192.168.1.1" user=ros password="PASSWORD" mode=ftp dst-path=("/mikrotik/backup/".$name.".backup") src-path=($name.".backup") upload=yes; +'''}, +] + + +@pytest.mark.parametrize("dictionary", TEST_ROUNDTRIP) +def test_roundtrip(dictionary): + argument_list = ['%s=%s' % (k, v) for k, v in dictionary.items()] + command = join_routeros_command(argument_list) + resplit_list = split_routeros_command(command) + print(resplit_list, argument_list) + assert resplit_list == argument_list + re_dictionary = convert_list_to_dictionary(resplit_list) + print(re_dictionary, dictionary) + assert re_dictionary == dictionary diff --git a/tests/unit/plugins/modules/test_api.py b/tests/unit/plugins/modules/test_api.py index 0eabc049..cabb9183 100644 --- a/tests/unit/plugins/modules/test_api.py +++ b/tests/unit/plugins/modules/test_api.py @@ -288,33 +288,3 @@ def test_api_query_and_WHERE_no_cond(self): result = exc.exception.args[0] self.assertEqual(result['changed'], False) - - -TEST_SPLIT_ROUTEROS = [ - ('', []), - (' ', []), - (r'a b c', ['a', 'b', 'c']), - (r'a=b c d=e', ['a=b', 'c', 'd=e']), - (r'a="b f" c d=e', ['a=b f', 'c', 'd=e']), - (r'a="b\"f" c\FF d=\"e', ['a=b"f', '\xff', 'c', 'd="e']), -] - - -@pytest.mark.parametrize("command, result", TEST_SPLIT_ROUTEROS) -def test_split_routeros(command, result): - result_ = api.split_routeros(command) - print(result_, result) - assert result_ == result - - -TEST_SPLIT_ROUTEROS_ERRORS = [ - (r'a="b\"f" c\FF d="e', 'Unexpected end of string during escaped parameter'), -] - - -@pytest.mark.parametrize("command, message", TEST_SPLIT_ROUTEROS_ERRORS) -def test_split_routeros_errors(command, message): - with pytest.raises(api.ParseError) as exc: - api.split_routeros(command) - print(exc.value.args[0], message) - assert exc.value.args[0] == message