Skip to content

Commit

Permalink
Add optional rule for no-same-owner (#1450)
Browse files Browse the repository at this point in the history
Add a new rule that identifies use of the same owner when transferring
files between hosts as a violation.

This rule is disabled by default and user needs to manually activate
it inside the config as it is needed only for very particular
use-cases.

Reference: https://zuul-ci.org/docs/zuul-jobs/policy.html#preservation-of-owner-between-executor-and-remote
  • Loading branch information
ssbarnea authored Mar 10, 2021
1 parent 56eb5b3 commit 4a94899
Show file tree
Hide file tree
Showing 9 changed files with 251 additions and 6 deletions.
5 changes: 5 additions & 0 deletions .ansible-lint
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ skip_list:
- skip_this_tag
- git-latest

# Any rule that has the 'opt-in' tag will not be loaded unless its 'id' is
# mentioned in the enable_list:
enable_list:
- no-same-owner

# Report only a subset of tags and fully ignore any others
# tags:
# - var-spacing
Expand Down
59 changes: 59 additions & 0 deletions examples/roles/role_for_no_same_owner/tasks/fail.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
- name: block
block:
- name: synchonize-in-block
synchronize:
src: dummy
dest: dummy

- name: synchronize
synchronize:
src: dummy
dest: dummy

- name: nested-block
block:
- block:
- name: synchonize-in-deep-block
synchronize:
src: dummy
dest: dummy

- name: unarchive-bz2
unarchive:
src: "{{ file }}.tar.bz2"
dest: "dummy"

- name: unarchive delegated
unarchive:
src: "{{ file }}.tar.bz2"
dest: "dummy"
delegate_to: localhost

- name: unarchive-gz
unarchive:
src: "{{ file }}.tar.gz"
dest: "dummy"

- name: unarchive-tar
unarchive:
src: "{{ file }}.tar"
dest: "dummy"

- name: unarchive-xz
unarchive:
src: "{{ file }}.tar.xz"
dest: "dummy"

- name: unarchive-zip
unarchive:
src: "{{ file }}.zip"
dest: dummy
extra_opts:
- '-X'

- name: unarchive-zip-same-owner
unarchive:
src: "{{ file }}.zip"
dest: dummy
extra_opts:
- '-X'
31 changes: 31 additions & 0 deletions examples/roles/role_for_no_same_owner/tasks/pass.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
- name: syncronize-delegate
synchronize:
src: dummy
dest: dummy
delegate_to: localhost

- name: synchronize-no-same-owner
synchronize:
src: dummy
dest: dummy
owner: false
group: false

- name: unarchive-no-same-owner
unarchive:
src: "{{ file }}.tar.gz"
dest: dummy
extra_opts:
- '--no-same-owner'

- name: unarchive-remote-src
unarchive:
src: "{{ file }}.tar.gz"
dest: dummy
extra_opts:
- '--no-same-owner'

- name: unarchive-unknown-file-ending
unarchive:
src: "{{ file }}"
dest: "dummy"
8 changes: 8 additions & 0 deletions src/ansiblelint/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,13 @@ def get_cli_parser() -> argparse.ArgumentParser:
help="only warn about these rules, unless overridden in "
"config file defaults to 'experimental'",
)
parser.add_argument(
'--enable-list',
dest='enable_list',
default=[],
action='append',
help="activate optional rules by their tag name",
)
# Do not use store_true/store_false because they create opposite defaults.
parser.add_argument(
'--nocolor',
Expand Down Expand Up @@ -313,6 +320,7 @@ def merge_config(file_config: Dict[Any, Any], cli_config: Namespace) -> Namespac
'warn_list': ['experimental'],
'mock_modules': [],
'mock_roles': [],
'enable_list': [],
}

scalar_map = {"loop_var_prefix": None, "project_dir": None}
Expand Down
1 change: 1 addition & 0 deletions src/ansiblelint/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
offline=False,
project_dir=None,
extra_vars=None,
enable_list=[],
skip_action_validation=True,
)

Expand Down
130 changes: 130 additions & 0 deletions src/ansiblelint/rules/NoSameOwnerRule.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
"""Optional rule for avoiding keeping owner/group when transferring files."""
import re
import sys
from typing import Any, List

from ansiblelint.errors import MatchError
from ansiblelint.file_utils import Lintable
from ansiblelint.rules import AnsibleLintRule


class NoSameOwnerRule(AnsibleLintRule):

id = 'no-same-owner'
shortdesc = 'Owner should not be kept between different hosts'
description = """
Optional rule that hihglight dangers of assuming that user/group on the remote
machines may not exist on ansible controller or vice versa. Owner and group
should not be preserved when transferring files between them.
This rule is not enabled by default and was inspired by Zuul execution policy.
See:
https://zuul-ci.org/docs/zuul-jobs/policy.html\
#preservation-of-owner-between-executor-and-remote
"""
severity = 'LOW'
tags = ['opt-in']

def matchplay(self, file: Lintable, data: Any) -> List[MatchError]:
"""Return matches found for a specific playbook."""
results: List[MatchError] = []
if file.kind not in ('tasks', 'handlers', 'playbook'):
return results

results.extend(self.handle_play(file, data))
return results

def handle_play(self, lintable: Lintable, task: Any) -> List[MatchError]:
"""Process a play."""
results = []
if 'block' in task:
results.extend(self.handle_playlist(lintable, task['block']))
else:
results.extend(self.handle_task(lintable, task))
return results

def handle_playlist(self, lintable: Lintable, playlist: Any) -> List[MatchError]:
"""Process a playlist."""
results = []
for play in playlist:
results.extend(self.handle_play(lintable, play))
return results

def handle_task(self, lintable: Lintable, task: Any) -> List[MatchError]:
"""Process a task."""
results = []
if 'synchronize' in task:
if self.handle_synchronize(task):
print(task)
results.append(
self.create_matcherror(
filename=lintable, linenumber=task['__line__']
)
)
elif 'unarchive' in task:
if self.handle_unarchive(task):
results.append(
self.create_matcherror(
filename=lintable, linenumber=task['__line__']
)
)

return results

@staticmethod
def handle_synchronize(task: Any) -> bool:
"""Process a synchronize task."""
if task.get('delegate_to') is not None:
return False

synchronize = task['synchronize']
archive = synchronize.get('archive', True)

if synchronize.get('owner', archive) or synchronize.get('group', archive):
return True
return False

@staticmethod
def handle_unarchive(task: Any) -> bool:
"""Process unarchive task."""
unarchive = task['unarchive']
delegate_to = task.get('delegate_to')

if (
delegate_to == 'localhost'
or delegate_to != 'localhost'
and 'remote_src' not in unarchive
):
if unarchive['src'].endswith('zip'):
if '-X' in unarchive.get('extra_opts', []):
return True
if re.search(r'.*\.tar(\.(gz|bz2|xz))?$', unarchive['src']):
if '--no-same-owner' not in unarchive.get('extra_opts', []):
return True
return False


# testing code to be loaded only with pytest or when executed the rule file
if "pytest" in sys.modules:

import pytest

from ansiblelint.runner import Runner # pylint: disable=ungrouped-imports

@pytest.mark.parametrize(
("test_file", "failures"),
(
pytest.param(
'examples/roles/role_for_no_same_owner/tasks/fail.yml', 10, id='fail'
),
pytest.param(
'examples/roles/role_for_no_same_owner/tasks/pass.yml', 0, id='pass'
),
),
)
def test_no_same_owner_rule(default_rules_collection, test_file, failures) -> None:
"""Test rule matches."""
results = Runner(test_file, rules=default_rules_collection).run()
assert len(results) == failures
for result in results:
assert result.message == NoSameOwnerRule.shortdesc
11 changes: 8 additions & 3 deletions src/ansiblelint/rules/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
LoadingFailureRule,
RuntimeErrorRule,
)
from ansiblelint.config import options
from ansiblelint.errors import MatchError
from ansiblelint.file_utils import Lintable
from ansiblelint.skip_utils import append_skipped_rules, get_rule_skips_from_line
Expand Down Expand Up @@ -186,8 +187,9 @@ def load_plugins(directory: str) -> List[AnsibleLintRule]:


class RulesCollection:
def __init__(self, rulesdirs: Optional[List[str]] = None) -> None:
def __init__(self, rulesdirs: Optional[List[str]] = None, options=options) -> None:
"""Initialize a RulesCollection instance."""
self.options = options
if rulesdirs is None:
rulesdirs = []
self.rulesdirs = ansiblelint.file_utils.expand_paths_vars(rulesdirs)
Expand All @@ -199,11 +201,14 @@ def __init__(self, rulesdirs: Optional[List[str]] = None) -> None:
)
for rulesdir in self.rulesdirs:
_logger.debug("Loading rules from %s", rulesdir)
self.extend(load_plugins(rulesdir))
for rule in load_plugins(rulesdir):
self.register(rule)
self.rules = sorted(self.rules)

def register(self, obj: AnsibleLintRule) -> None:
self.rules.append(obj)
# We skip opt-in rules which were not manually enabled
if 'opt-in' not in obj.tags or obj.id in self.options.enable_list:
self.rules.append(obj)

def __iter__(self) -> Iterator[BaseRule]:
"""Return the iterator over the rules in the RulesCollection."""
Expand Down
4 changes: 3 additions & 1 deletion src/ansiblelint/testing/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ def runner(play_file_path, default_rules_collection):
def default_rules_collection():
"""Return default rule collection."""
assert os.path.isdir(DEFAULT_RULESDIR)
return RulesCollection(rulesdirs=[DEFAULT_RULESDIR])
# For testing we want to manually enable opt-in rules
options.enable_list = ['no-same-owner']
return RulesCollection(rulesdirs=[DEFAULT_RULESDIR], options=options)


@pytest.fixture
Expand Down
8 changes: 6 additions & 2 deletions test/TestRulesCollection.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import pytest

from ansiblelint.config import options
from ansiblelint.file_utils import Lintable
from ansiblelint.rules import RulesCollection
from ansiblelint.testing import run_ansible_lint
Expand Down Expand Up @@ -119,9 +120,12 @@ def test_rich_rule_listing():
def test_rules_id_format() -> None:
"""Assure all our rules have consistent format."""
rule_id_re = re.compile("^[a-z-]{4,30}$")
rules = RulesCollection([os.path.abspath('./src/ansiblelint/rules')])
options.enable_list = ['no-same-owner']
rules = RulesCollection(
[os.path.abspath('./src/ansiblelint/rules')], options=options
)
for rule in rules:
assert rule_id_re.match(
rule.id
), f"R rule id {rule.id} did not match our required format."
assert len(rules) == 37
assert len(rules) == 38

0 comments on commit 4a94899

Please sign in to comment.