Skip to content

Commit

Permalink
Add YAML path (to play/task) discovery utils for Transforms to use (#…
Browse files Browse the repository at this point in the history
…1970)

Co-authored-by: Sorin Sbarnea <[email protected]>
  • Loading branch information
cognifloyd and ssbarnea authored Mar 18, 2022
1 parent dbd10e7 commit edc0e04
Show file tree
Hide file tree
Showing 5 changed files with 870 additions and 2 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/tox.yml
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ jobs:
WSLENV: FORCE_COLOR:PYTEST_REQPASS:TOXENV:TOX_PARALLEL_NO_SPINNER
# Number of expected test passes, safety measure for accidental skip of
# tests. Update value if you add/remove tests.
PYTEST_REQPASS: 399
PYTEST_REQPASS: 503

steps:
- name: Activate WSL1
Expand Down
17 changes: 17 additions & 0 deletions examples/playbooks/tasks/empty_blocks.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
---
- name: a named block task
block:
- name: an assertion
ansible.builtin.assert:
fail_msg: foo
rescue: # null
always: {}
- block:
- name: another assertion
ansible.builtin.assert:
fail_msg: bar
rescue: {}
always:
- name: yet another assertion
ansible.builtin.assert:
fail_msg: baz
1 change: 1 addition & 0 deletions examples/playbooks/tasks/x.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@
action: funny value=clown
args:
key: value
- # a second null task, validates yaml_utils.get_path_to_task
217 changes: 217 additions & 0 deletions src/ansiblelint/yaml_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,22 @@
import re
from io import StringIO
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
Iterator,
List,
Optional,
Pattern,
Set,
Tuple,
Union,
cast,
)

import ruamel.yaml.events
from ruamel.yaml.comments import CommentedMap, CommentedSeq
from ruamel.yaml.constructor import RoundTripConstructor
from ruamel.yaml.emitter import Emitter, ScalarAnalysis

Expand All @@ -31,10 +34,15 @@
from yamllint.config import YamlLintConfig

import ansiblelint.skip_utils
from ansiblelint.constants import NESTED_TASK_KEYS, PLAYBOOK_TASK_KEYWORDS
from ansiblelint.errors import MatchError
from ansiblelint.file_utils import Lintable
from ansiblelint.utils import get_action_tasks, normalize_task, parse_yaml_linenumbers

if TYPE_CHECKING:
# noinspection PyProtectedMember
from ruamel.yaml.comments import LineCol # pylint: disable=ungrouped-imports

_logger = logging.getLogger(__name__)

YAMLLINT_CONFIG = """
Expand Down Expand Up @@ -225,6 +233,215 @@ def _nested_items_path(
)


def get_path_to_play(
lintable: Lintable,
line_number: int, # 1-based
ruamel_data: Union[CommentedMap, CommentedSeq],
) -> List[Union[str, int]]:
"""Get the path to the play in the given file at the given line number."""
if line_number < 1:
raise ValueError(f"expected line_number >= 1, got {line_number}")
if lintable.kind != "playbook" or not isinstance(ruamel_data, CommentedSeq):
return []
lc: "LineCol" # lc uses 0-based counts # pylint: disable=invalid-name
# line_number is 1-based. Convert to 0-based.
line_index = line_number - 1

prev_play_line_index = ruamel_data.lc.line
last_play_index = len(ruamel_data)
for play_index, play in enumerate(ruamel_data):
next_play_index = play_index + 1
if last_play_index > next_play_index:
next_play_line_index = ruamel_data[next_play_index].lc.line
else:
next_play_line_index = None

lc = play.lc # pylint: disable=invalid-name
assert isinstance(lc.line, int)
if lc.line == line_index:
return [play_index]
if play_index > 0 and prev_play_line_index < line_index < lc.line:
return [play_index - 1]
# The previous play check (above) can't catch the last play,
# so, handle the last play separately.
if (
next_play_index == last_play_index
and line_index > lc.line
and (next_play_line_index is None or line_index < next_play_line_index)
):
# part of this (last) play
return [play_index]
prev_play_line_index = play.lc.line
return []


def get_path_to_task(
lintable: Lintable,
line_number: int, # 1-based
ruamel_data: Union[CommentedMap, CommentedSeq],
) -> List[Union[str, int]]:
"""Get the path to the task in the given file at the given line number."""
if line_number < 1:
raise ValueError(f"expected line_number >= 1, got {line_number}")
if lintable.kind in ("tasks", "handlers"):
assert isinstance(ruamel_data, CommentedSeq)
return _get_path_to_task_in_tasks_block(line_number, ruamel_data)
if lintable.kind == "playbook":
assert isinstance(ruamel_data, CommentedSeq)
return _get_path_to_task_in_playbook(line_number, ruamel_data)
# if lintable.kind in ["yaml", "requirements", "vars", "meta", "reno"]:

return []


def _get_path_to_task_in_playbook(
line_number: int, # 1-based
ruamel_data: CommentedSeq,
) -> List[Union[str, int]]:
"""Get the path to the task in the given playbook data at the given line number."""
last_play_index = len(ruamel_data)
for play_index, play in enumerate(ruamel_data):
next_play_index = play_index + 1
if last_play_index > next_play_index:
next_play_line_index = ruamel_data[next_play_index].lc.line
else:
next_play_line_index = None

play_keys = list(play.keys())
for tasks_keyword in PLAYBOOK_TASK_KEYWORDS:
if not play.get(tasks_keyword):
continue

try:
next_keyword = play_keys[play_keys.index(tasks_keyword) + 1]
except IndexError:
next_block_line_index = None
else:
next_block_line_index = play.lc.data[next_keyword][0]
# last_line_number_in_block is 1-based; next_*_line_index is 0-based
# next_*_line_index - 1 to get line before next_*_line_index.
# Then + 1 to make it a 1-based number.
# So, last_line_number_in_block = next_*_line_index - 1 + 1
if next_block_line_index is not None:
last_line_number_in_block = next_block_line_index
elif next_play_line_index is not None:
last_line_number_in_block = next_play_line_index
else:
last_line_number_in_block = None

task_path = _get_path_to_task_in_tasks_block(
line_number, play[tasks_keyword], last_line_number_in_block
)
if task_path:
# mypy gets confused without this typehint
tasks_keyword_path: List[Union[int, str]] = [
play_index,
tasks_keyword,
]
return tasks_keyword_path + list(task_path)
# line_number is before first play or no tasks keywords in any of the plays
return []


def _get_path_to_task_in_tasks_block(
line_number: int, # 1-based
tasks_block: CommentedSeq,
last_line_number: Optional[int] = None, # 1-based
) -> List[Union[str, int]]:
"""Get the path to the task in the given tasks block at the given line number."""
task: Optional[CommentedMap]
# line_number and last_line_number are 1-based. Convert to 0-based.
line_index = line_number - 1
last_line_index = None if last_line_number is None else last_line_number - 1

# lc (LineCol) uses 0-based counts
prev_task_line_index = tasks_block.lc.line
last_task_index = len(tasks_block)
for task_index, task in enumerate(tasks_block):
next_task_index = task_index + 1
if last_task_index > next_task_index:
if tasks_block[next_task_index] is not None:
next_task_line_index = tasks_block[next_task_index].lc.line
else:
next_task_line_index = tasks_block.lc.item(next_task_index)[0]
else:
next_task_line_index = None

if task is None:
# create a dummy task to represent the null task
task = CommentedMap()
task.lc.line, task.lc.col = tasks_block.lc.item(task_index)

nested_task_keys = set(task.keys()).intersection(set(NESTED_TASK_KEYS))
if nested_task_keys:
subtask_path = _get_path_to_task_in_nested_tasks_block(
line_number, task, nested_task_keys, next_task_line_index
)
if subtask_path:
# mypy gets confused without this typehint
task_path: List[Union[str, int]] = [task_index]
return task_path + list(subtask_path)

assert isinstance(task.lc.line, int)
if task.lc.line == line_index:
return [task_index]
if task_index > 0 and prev_task_line_index < line_index < task.lc.line:
return [task_index - 1]
# The previous task check can't catch the last task,
# so, handle the last task separately (also after subtask checks).
# pylint: disable=too-many-boolean-expressions
if (
next_task_index == last_task_index
and line_index > task.lc.line
and (next_task_line_index is None or line_index < next_task_line_index)
and (last_line_index is None or line_index <= last_line_index)
):
# part of this (last) task
return [task_index]
prev_task_line_index = task.lc.line
# line is not part of this tasks block
return []


def _get_path_to_task_in_nested_tasks_block(
line_number: int, # 1-based
task: CommentedMap,
nested_task_keys: Set[str],
next_task_line_index: Optional[int] = None, # 0-based
) -> List[Union[str, int]]:
"""Get the path to the task in the given nested tasks block."""
# loop through the keys in line order
task_keys = list(task.keys())
task_keys_by_index = dict(enumerate(task_keys))
for task_index, task_key in enumerate(task_keys):
nested_task_block = task[task_key]
if task_key not in nested_task_keys or not nested_task_block:
continue
next_task_key = task_keys_by_index.get(task_index + 1, None)
if next_task_key is not None:
next_task_key_line_index = task.lc.data[next_task_key][0]
else:
next_task_key_line_index = None
# last_line_number_in_block is 1-based; next_*_line_index is 0-based
# next_*_line_index - 1 to get line before next_*_line_index.
# Then + 1 to make it a 1-based number.
# So, last_line_number_in_block = next_*_line_index - 1 + 1
last_line_number_in_block = (
next_task_key_line_index
if next_task_key_line_index is not None
else next_task_line_index
)
subtask_path = _get_path_to_task_in_tasks_block(
line_number,
nested_task_block,
last_line_number_in_block, # 1-based
)
if subtask_path:
return [task_key] + list(subtask_path)
# line is not part of this nested tasks block
return []


class OctalIntYAML11(ScalarInt):
"""OctalInt representation for YAML 1.1."""

Expand Down
Loading

0 comments on commit edc0e04

Please sign in to comment.