Skip to content

Commit

Permalink
no-same-owner: refactor to use matchtask() (#2233)
Browse files Browse the repository at this point in the history
Signed-off-by: nishipy <[email protected]>

Co-authored-by: Jacob Floyd <[email protected]>
  • Loading branch information
nishipy and cognifloyd authored Jul 12, 2022
1 parent 3aa0b27 commit 8c1cc34
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 66 deletions.
10 changes: 10 additions & 0 deletions examples/roles/role_for_no_same_owner/tasks/fail.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,16 @@
ansible.posix.synchronize:
src: dummy
dest: dummy
rescue:
- name: synchronize-in-rescue
ansible.posix.synchronize:
src: dummy
dest: dummy
always:
- name: synchronize-in-always
ansible.posix.synchronize:
src: dummy
dest: dummy

- name: unarchive-bz2
ansible.builtin.unarchive:
Expand Down
106 changes: 40 additions & 66 deletions src/ansiblelint/rules/no_same_owner.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
"""Optional rule for avoiding keeping owner/group when transferring files."""
import re
import sys
from typing import Any, List
from typing import TYPE_CHECKING, Any, Dict, Union

from ansible.utils.sentinel import Sentinel

from ansiblelint.errors import MatchError
from ansiblelint.file_utils import Lintable
from ansiblelint.rules import AnsibleLintRule
from ansiblelint.utils import LINE_NUMBER_KEY

if TYPE_CHECKING:
from typing import Optional

from ansiblelint.file_utils import Lintable


class NoSameOwnerRule(AnsibleLintRule):
Expand All @@ -26,83 +30,53 @@ class NoSameOwnerRule(AnsibleLintRule):
severity = "LOW"
tags = ["opt-in"]

def matchplay(self, file: Lintable, data: Any) -> List[MatchError]:
"""Return matches found for a specific playbook."""
results: List[MatchError] = []
if file.kind not in ("tasks", "handlers", "playbook"):
return results

results.extend(self.handle_play(file, data))
return results

def handle_play(self, lintable: Lintable, task: Any) -> List[MatchError]:
"""Process a play."""
results = []
if "block" in task:
results.extend(self.handle_playlist(lintable, task["block"]))
else:
results.extend(self.handle_task(lintable, task))
return results

def handle_playlist(self, lintable: Lintable, playlist: Any) -> List[MatchError]:
"""Process a playlist."""
results = []
for play in playlist:
results.extend(self.handle_play(lintable, play))
return results

def handle_task(self, lintable: Lintable, task: Any) -> List[MatchError]:
"""Process a task."""
results = []
action = [e for e in ("synchronize", "ansible.posix.synchronize") if e in task]
if action:
if self.handle_synchronize(task, action[0]):
print(task)
results.append(
self.create_matcherror(
filename=lintable, linenumber=task[LINE_NUMBER_KEY]
)
)
action = [e for e in ("unarchive", "ansible.builtin.unarchive") if e in task]
if action:
if self.handle_unarchive(task, action[0]):
results.append(
self.create_matcherror(
filename=lintable, linenumber=task[LINE_NUMBER_KEY]
)
)

return results
def matchtask(
self, task: Dict[str, Any], file: "Optional[Lintable]" = None
) -> Union[bool, str]:
"""Return matches for a task."""
action = task.get("action")
if not isinstance(action, dict):
return False

module = action["__ansible_module__"]

if module in ["synchronize", "ansible.posix.synchronize"]:
return self.handle_synchronize(task, action)

if module in ["unarchive", "ansible.builtin.unarchive"]:
return self.handle_unarchive(task, action)

return False

@staticmethod
def handle_synchronize(task: Any, action: str) -> bool:
def handle_synchronize(task: Any, action: Dict[str, Any]) -> bool:
"""Process a synchronize task."""
if task.get("delegate_to") is not None:
if task.get("delegate_to") != Sentinel:
return False

synchronize = task[action]
archive = synchronize.get("archive", True)

if synchronize.get("owner", archive) or synchronize.get("group", archive):
archive = action.get("archive", True)
if action.get("owner", archive) or action.get("group", archive):
return True
return False

@staticmethod
def handle_unarchive(task: Any, action: str) -> bool:
def handle_unarchive(task: Any, action: Dict[str, Any]) -> bool:
"""Process unarchive task."""
unarchive = task[action]
delegate_to = task.get("delegate_to")

if (
delegate_to == "localhost"
or delegate_to != "localhost"
and "remote_src" not in unarchive
and not action.get("remote_src")
):
if unarchive["src"].endswith("zip"):
if "-X" in unarchive.get("extra_opts", []):
src = action.get("src")
if not isinstance(src, str):
return False

if src.endswith("zip"):
if "-X" in action.get("extra_opts", []):
return True
if re.search(r".*\.tar(\.(gz|bz2|xz))?$", unarchive["src"]):
if "--no-same-owner" not in unarchive.get("extra_opts", []):
if re.search(r".*\.tar(\.(gz|bz2|xz))?$", src):
if "--no-same-owner" not in action.get("extra_opts", []):
return True
return False

Expand All @@ -119,7 +93,7 @@ def handle_unarchive(task: Any, action: str) -> bool:
("test_file", "failures"),
(
pytest.param(
"examples/roles/role_for_no_same_owner/tasks/fail.yml", 10, id="fail"
"examples/roles/role_for_no_same_owner/tasks/fail.yml", 12, id="fail"
),
pytest.param(
"examples/roles/role_for_no_same_owner/tasks/pass.yml", 0, id="pass"
Expand Down

0 comments on commit 8c1cc34

Please sign in to comment.