Skip to content

Commit

Permalink
PR to fix the ARI inline replace functionality issue observed under i…
Browse files Browse the repository at this point in the history
…ssue 247 (#248)

* fix ari 247 issue

* fix flak8

Signed-off-by: Sumit Jaiswal <[email protected]>

* update readme n fn

Signed-off-by: Sumit Jaiswal <[email protected]>

* fix variables

Signed-off-by: Sumit Jaiswal <[email protected]>

---------

Signed-off-by: Sumit Jaiswal <[email protected]>
  • Loading branch information
justjais authored Jun 25, 2024
1 parent 62beb6d commit b28ba60
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 23 deletions.
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,17 @@ $ ari ram generate -f ram_input_list.txt

```

## ARI supported options to run against

Name | Description
--- | ---
playbook | Scan a playbook (e.g. `ari playbook path/to/playbook.yml` )
collection | Scan a collection (e.g. `ari collection collection.name` )
role | Scan a role (e.g. `ari role role.name` )
project | Scan a project (e.g. `ari project path/to/project`)
taskfile | Scan a taskfile (e.g. `ari taskfile path/to/taskfile.yml`)
ram | Operate the backend data (e.g. `ari ram generate -f input.txt`)

## Installation (for development)

```
Expand Down
23 changes: 18 additions & 5 deletions ansible_risk_insight/cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,21 +237,34 @@ def run(self):
line_number_list = []
mutated_yaml_list = []
target_file_path = ''
temp_file_path = ''
for j in range(1, len(nodes)):
node_rules = nodes[j]['rules']
for k in range(len(node_rules)):
for k in reversed(range(len(node_rules))): # loop through from rule 11, as that has the mutation
w007_rule = node_rules[k]
if (w007_rule['rule']['rule_id']).lower() == 'w007':
if not w007_rule.get('verdict') and w007_rule:
break
mutated_yaml = w007_rule['detail']['mutated_yaml']
if mutated_yaml == '':
break
mutated_yaml_list.append(mutated_yaml)
if w007_rule['file'][0] not in index_data[each]:
target_file_path = os.path.join(args.target_name, index_data[each], w007_rule['file'][0])
temp_data = index_data[each]
if w007_rule['file'][0] not in temp_data:
target_file_path = os.path.join(args.target_name, temp_data, w007_rule['file'][0])
if temp_file_path != '' and target_file_path != temp_file_path:
update_the_yaml_target(target_file_path, line_number_list, mutated_yaml_list)
line_number_list = []
mutated_yaml_list = []
mutated_yaml_list.append(mutated_yaml)
temp_file_path = target_file_path
else:
target_file_path = os.path.join(args.target_name, index_data[each])
target_file_path = os.path.join(args.target_name, temp_data)
if temp_file_path != '' and target_file_path != temp_file_path:
update_the_yaml_target(target_file_path, line_number_list, mutated_yaml_list)
line_number_list = []
mutated_yaml_list = []
mutated_yaml_list.append(mutated_yaml)
temp_file_path = target_file_path
line_number = w007_rule['file'][1]
line_number_list.append(line_number)
break # w007 rule with mutated yaml is processed, breaking out of iteration
Expand Down
71 changes: 53 additions & 18 deletions ansible_risk_insight/finder.py
Original file line number Diff line number Diff line change
Expand Up @@ -739,7 +739,7 @@ def list_scan_target(root_dir: str, task_num_threshold: int = -1):
return all_targets


def update_line_logic(new_line_content, old_line_content, leading_spaces=0):
def update_line_with_space(new_line_content, old_line_content, leading_spaces=0):
"""
Returns the line of the input lines with mutation, having spaces
exactly same as input yaml lines
Expand Down Expand Up @@ -776,6 +776,30 @@ def check_and_add_diff_lines(start_line, stop_line, lines, data_copy):
data_copy.append(line)


def check_diff_and_copy_olddata_to_newdata(line_number_list, lines, new_data):
"""
Function to find the old lines which weren't mutated by ARI rules,
it need to be copied to new content as is
"""
if line_number_list and isinstance(line_number_list, list):
new_content_last_set = line_number_list[-1]
new_content_last_line = int(new_content_last_set.lstrip("L").split("-")[1])
if new_content_last_line < len(lines) - 1:
for i in range(new_content_last_line, len(lines) - 1):
new_data.append(lines[i])
return new_data


def update_and_append_new_line(new_line, old_line, leading_spaces, data_copy):
"""
Function to get the leading space for the new ARI mutated line, with
its equivaltent old line with space similar to the old line
"""
line_with_adjusted_space = update_line_with_space(new_line, old_line, leading_spaces)
data_copy.append(line_with_adjusted_space)
return ''


def update_the_yaml_target(file_path, line_number_list, new_content_list):
try:
# Read the original YAML file
Expand All @@ -786,26 +810,38 @@ def update_the_yaml_target(file_path, line_number_list, new_content_list):
)
data_copy = populate_new_data_list(data, line_number_list)
stop_line_number = 0
new_lines = []
for iter in range(len(line_number_list)):
line_number = line_number_list[iter]
new_content = new_content_list[iter]
input_line_number = line_number.lstrip("L").split("-")
lines = data.splitlines(keepends=True)
if new_lines:
for i in range(len(new_lines)):
try:
data_copy.append(new_lines.pop(i))
except IndexError:
break
new_lines = new_content.splitlines(keepends=True)
# Update the specific line with new content
j = 0
start_line_number = int(input_line_number[0])
if stop_line_number > 0 and (start_line_number - stop_line_number) > 1:
check_and_add_diff_lines(stop_line_number, start_line_number, lines, data_copy)
stop_line_number = int(input_line_number[1])
diff_in_lines = stop_line_number - start_line_number
temp_content = []
data_copy.append('\n')
for i in range(start_line_number - 1, stop_line_number - 1):
start = start_line_number - 1
end = stop_line_number - 1
for i in range(start, end):
line_number = i
if len(lines) == i or j >= len(new_lines):
if len(lines) == i:
break
try:
# always pop 1st element of the new lines list
new_line_content = new_lines.pop(0)
except IndexError:
break
new_line_content = new_lines[j]
if 0 <= line_number < len(lines):
# Preserve the original indentation
old_line_content = lines[line_number]
Expand All @@ -819,14 +855,14 @@ def update_the_yaml_target(file_path, line_number_list, new_content_list):
data_copy.append(lines[line_number])
else:
new_line_key = new_line_content.split(':')
for k in range(start_line_number - 1, stop_line_number - 1):
new_key = new_line_key[0].strip(' ')
for k in range(start, end):
if k < len(lines):
old_line_key = lines[k].split(':')
if '---' in old_line_key[0]:
continue
old_key = old_line_key[0].strip(' ')
new_key = new_line_key[0].strip(' ')
if '-' in old_line_key[0] and ':' not in lines[k]:
if '-' in old_line_key[0] and ':' not in lines[k] and '-' in new_key:
# diff_in_lines = len(lines) - len(new_lines)
leading_spaces = len(lines[k]) - len(lines[k].lstrip())
if diff_in_lines > len(lines):
Expand All @@ -838,24 +874,24 @@ def update_the_yaml_target(file_path, line_number_list, new_content_list):
lines.pop(i)
else:
break
lines[k] = update_line_logic(new_line_content, lines[k], leading_spaces)
data_copy.append(lines[k])
new_line_content = update_and_append_new_line(new_line_content, lines[k], leading_spaces, data_copy)
break
elif old_key == new_key:
lines[k] = update_line_logic(new_line_content, lines[k])
data_copy.append(lines[k])
new_line_content = update_and_append_new_line(new_line_content, lines[k], 0, data_copy)
break
elif old_key.rstrip('\n') == new_key:
lines[k] = update_line_logic(new_line_content, lines[k])
data_copy.append(lines[k])
new_line_content = update_and_append_new_line(new_line_content, lines[k], 0, data_copy)
break
elif old_key.rstrip('\n') in new_key.split('.'):
lines[k] = update_line_logic(new_line_content, lines[k])
data_copy.append(lines[k])
new_line_content = update_and_append_new_line(new_line_content, lines[k], 0, data_copy)
break
if new_line_content: # if there wasn't a match with old line, so this seems updated by ARI and added to w/o any change
data_copy.append(new_line_content)
else:
return IndexError("Line number out of range.")
j += 1
# check for diff b/w new content and old contents,
# and copy the old content that's not updated by ARI mutation
data_copy = check_diff_and_copy_olddata_to_newdata(line_number_list, lines, data_copy)
# Join the lines back to a single string
updated_data = ''.join(data_copy)
# Parse the updated YAML content to ensure it is valid
Expand All @@ -866,4 +902,3 @@ def update_the_yaml_target(file_path, line_number_list, new_content_list):
yaml.dump(updated_parsed_data, file)
except Exception as ex:
logger.warning("YAML LINES: ARI fix update yaml by lines failed for file: '%s', with error: '%s'", file_path, ex)
raise ex

0 comments on commit b28ba60

Please sign in to comment.