From 8c1cc3457d0a3d96ca74a169638c9fa0354bf30b Mon Sep 17 00:00:00 2001 From: nishipy <41185206+nishipy@users.noreply.github.com> Date: Tue, 12 Jul 2022 22:52:53 +0900 Subject: [PATCH] no-same-owner: refactor to use matchtask() (#2233) Signed-off-by: nishipy Co-authored-by: Jacob Floyd --- .../role_for_no_same_owner/tasks/fail.yml | 10 ++ src/ansiblelint/rules/no_same_owner.py | 106 +++++++----------- 2 files changed, 50 insertions(+), 66 deletions(-) diff --git a/examples/roles/role_for_no_same_owner/tasks/fail.yml b/examples/roles/role_for_no_same_owner/tasks/fail.yml index bc286e4330..76208ebb82 100644 --- a/examples/roles/role_for_no_same_owner/tasks/fail.yml +++ b/examples/roles/role_for_no_same_owner/tasks/fail.yml @@ -19,6 +19,16 @@ ansible.posix.synchronize: src: dummy dest: dummy + rescue: + - name: synchronize-in-rescue + ansible.posix.synchronize: + src: dummy + dest: dummy + always: + - name: synchronize-in-always + ansible.posix.synchronize: + src: dummy + dest: dummy - name: unarchive-bz2 ansible.builtin.unarchive: diff --git a/src/ansiblelint/rules/no_same_owner.py b/src/ansiblelint/rules/no_same_owner.py index 6d4a89b1ca..268ae8842a 100644 --- a/src/ansiblelint/rules/no_same_owner.py +++ b/src/ansiblelint/rules/no_same_owner.py @@ -1,12 +1,16 @@ """Optional rule for avoiding keeping owner/group when transferring files.""" import re import sys -from typing import Any, List +from typing import TYPE_CHECKING, Any, Dict, Union + +from ansible.utils.sentinel import Sentinel -from ansiblelint.errors import MatchError -from ansiblelint.file_utils import Lintable from ansiblelint.rules import AnsibleLintRule -from ansiblelint.utils import LINE_NUMBER_KEY + +if TYPE_CHECKING: + from typing import Optional + + from ansiblelint.file_utils import Lintable class NoSameOwnerRule(AnsibleLintRule): @@ -26,83 +30,53 @@ class NoSameOwnerRule(AnsibleLintRule): severity = "LOW" tags = ["opt-in"] - def matchplay(self, file: Lintable, data: Any) -> List[MatchError]: - """Return matches found for a specific playbook.""" - results: List[MatchError] = [] - if file.kind not in ("tasks", "handlers", "playbook"): - return results - - results.extend(self.handle_play(file, data)) - return results - - def handle_play(self, lintable: Lintable, task: Any) -> List[MatchError]: - """Process a play.""" - results = [] - if "block" in task: - results.extend(self.handle_playlist(lintable, task["block"])) - else: - results.extend(self.handle_task(lintable, task)) - return results - - def handle_playlist(self, lintable: Lintable, playlist: Any) -> List[MatchError]: - """Process a playlist.""" - results = [] - for play in playlist: - results.extend(self.handle_play(lintable, play)) - return results - - def handle_task(self, lintable: Lintable, task: Any) -> List[MatchError]: - """Process a task.""" - results = [] - action = [e for e in ("synchronize", "ansible.posix.synchronize") if e in task] - if action: - if self.handle_synchronize(task, action[0]): - print(task) - results.append( - self.create_matcherror( - filename=lintable, linenumber=task[LINE_NUMBER_KEY] - ) - ) - action = [e for e in ("unarchive", "ansible.builtin.unarchive") if e in task] - if action: - if self.handle_unarchive(task, action[0]): - results.append( - self.create_matcherror( - filename=lintable, linenumber=task[LINE_NUMBER_KEY] - ) - ) - - return results + def matchtask( + self, task: Dict[str, Any], file: "Optional[Lintable]" = None + ) -> Union[bool, str]: + """Return matches for a task.""" + action = task.get("action") + if not isinstance(action, dict): + return False + + module = action["__ansible_module__"] + + if module in ["synchronize", "ansible.posix.synchronize"]: + return self.handle_synchronize(task, action) + + if module in ["unarchive", "ansible.builtin.unarchive"]: + return self.handle_unarchive(task, action) + + return False @staticmethod - def handle_synchronize(task: Any, action: str) -> bool: + def handle_synchronize(task: Any, action: Dict[str, Any]) -> bool: """Process a synchronize task.""" - if task.get("delegate_to") is not None: + if task.get("delegate_to") != Sentinel: return False - synchronize = task[action] - archive = synchronize.get("archive", True) - - if synchronize.get("owner", archive) or synchronize.get("group", archive): + archive = action.get("archive", True) + if action.get("owner", archive) or action.get("group", archive): return True return False @staticmethod - def handle_unarchive(task: Any, action: str) -> bool: + def handle_unarchive(task: Any, action: Dict[str, Any]) -> bool: """Process unarchive task.""" - unarchive = task[action] delegate_to = task.get("delegate_to") - if ( delegate_to == "localhost" or delegate_to != "localhost" - and "remote_src" not in unarchive + and not action.get("remote_src") ): - if unarchive["src"].endswith("zip"): - if "-X" in unarchive.get("extra_opts", []): + src = action.get("src") + if not isinstance(src, str): + return False + + if src.endswith("zip"): + if "-X" in action.get("extra_opts", []): return True - if re.search(r".*\.tar(\.(gz|bz2|xz))?$", unarchive["src"]): - if "--no-same-owner" not in unarchive.get("extra_opts", []): + if re.search(r".*\.tar(\.(gz|bz2|xz))?$", src): + if "--no-same-owner" not in action.get("extra_opts", []): return True return False @@ -119,7 +93,7 @@ def handle_unarchive(task: Any, action: str) -> bool: ("test_file", "failures"), ( pytest.param( - "examples/roles/role_for_no_same_owner/tasks/fail.yml", 10, id="fail" + "examples/roles/role_for_no_same_owner/tasks/fail.yml", 12, id="fail" ), pytest.param( "examples/roles/role_for_no_same_owner/tasks/pass.yml", 0, id="pass"