Skip to content

Commit

Permalink
draft commit for refactoring checkers
Browse files Browse the repository at this point in the history
  • Loading branch information
Spedoske committed Jun 25, 2023
1 parent 8e5245b commit b8f016f
Show file tree
Hide file tree
Showing 42 changed files with 1,867 additions and 1,702 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ kind_config
acto/k8s_util/lib/k8sutil.so
acto/k8s_util/lib/test
kubernetes_config
**/.pytest_cache/
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ make

To reproduce the bug, run the following command:
```sh
python3 -m Acto.reproduce --reproduce-dir test/cassop-330/trial-demo --config data/cass-operator/config.json
python3 -m acto.reproduce --reproduce-dir test/cassop-330/trial-demo --config data/cass-operator/config.json
```
The files in the `test/cassop-330/trial-demo` directory are the sequence of CRs required to trigger
this bug.
Expand Down
6 changes: 2 additions & 4 deletions acto/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import sys
import threading
import time
from acto import common, config
from acto import common
from acto.engine import Acto, apply_testcase
from acto.input.input import DeterministicInputModel, InputModel
from acto.post_process import PostDiffTest
Expand Down Expand Up @@ -59,7 +59,6 @@
default=1,
help='Number of testcases to bundle each time')
parser.add_argument('--learn', dest='learn', action='store_true', help='Learn mode')
parser.add_argument('--blackbox', dest='blackbox', action='store_true', help='Blackbox mode')

parser.add_argument('--additional-semantic',
dest='additional_semantic',
Expand Down Expand Up @@ -99,7 +98,7 @@
threading.excepthook = thread_excepthook

if args.notify_crash:
config.NOTIFY_CRASH = True
logger.critical('Crash notification should be enabled in config.yaml')

with open(args.config, 'r') as config_file:
config = OperatorConfig(**json.load(config_file))
Expand Down Expand Up @@ -142,7 +141,6 @@
is_reproduce=is_reproduce,
input_model=input_model,
apply_testcase_f=apply_testcase_f,
blackbox=args.blackbox,
delta_from=args.delta_from)
generation_time = datetime.now()
logger.info('Acto initialization finished in %s', generation_time - start_time)
Expand Down
2 changes: 1 addition & 1 deletion acto/checker/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
from .checker import BlackBoxChecker, Checker, compare_system_equality

1,340 changes: 48 additions & 1,292 deletions acto/checker/checker.py

Large diffs are not rendered by default.

Empty file added acto/checker/impl/__init__.py
Empty file.
149 changes: 149 additions & 0 deletions acto/checker/impl/compare_state.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
import configparser
from typing import Any

from deepdiff.helper import NotPresent

from acto.k8s_util.k8sutil import canonicalize_quantity


def is_nullish(value: Any) -> bool:
    """
    Tell whether a value counts as "empty".

    A value is nullish when it is None, a deepdiff NotPresent instance, the empty string,
    a zero int or float, an empty list, or an empty dict.
    @param value: the value to classify
    @return: True if the value is nullish, False otherwise
    """
    # absent values
    if value is None or isinstance(value, NotPresent):
        return True

    # numeric zero; bool is a subclass of int, so False is treated as nullish as well
    if isinstance(value, (int, float)) and value == 0:
        return True

    if isinstance(value, str) and value == '':
        return True

    # empty containers
    if isinstance(value, (list, dict)) and len(value) == 0:
        return True

    return False


def either_is_nullish(left: Any, right: Any) -> bool:
    """
    Tell whether at least one of the two values is nullish according to is_nullish().

    left is examined first; right is only examined when left is not nullish.
    @param left: first value
    @param right: second value
    @return: True if left or right is nullish, False otherwise
    """
    return any(is_nullish(side) for side in (left, right))


def input_is_substring_of_output(input_value: Any, output_value: Any) -> bool:
    """
    Check whether the input value shows up inside the output value.

    Both values are converted with str() and compared case-insensitively.
    @param input_value: value taken from the input
    @param output_value: value taken from the output
    @return: True if the input equals the output or its string form is contained in the
             string form of the output, False otherwise (never None)
    """
    # Exact-match fast path for plain ints (the exact type check excludes bool).
    # NOTE(review): the original comment says ints should match exactly "to avoid mapping
    # 10 to 1000, 2 to 20, etc.", but the substring check below still runs for ints, so
    # 10 does match 1000. Left unchanged to keep current results -- confirm the intent.
    if type(input_value) is int and input_value == output_value:
        return True

    # Always return a real bool; the previous version fell off the end and returned None.
    return str(input_value).lower() in str(output_value).lower()


def input_config_is_subset_of_output_config(input_config: Any, output_config: Any) -> bool:
    """
    Check whether every key/value pair of an INI-style input config appears in the output config.

    Both arguments are parsed with configparser after prepending an artificial "[ACTO]"
    section header, so plain "key = value" text without any section can be parsed.
    @param input_config: config text from the input; anything that is not a str yields False
    @param output_config: config text from the output; anything that is not a str yields False
    @return: True if the input has at least one option and every input option exists in the
             output with the same value; False otherwise, including when either text
             cannot be parsed
    """
    if not (isinstance(input_config, str) and isinstance(output_config, str)):
        return False

    try:
        # interpolation=None compares the raw text. With the default BasicInterpolation a
        # value containing a bare '%' raises on read-out, which made identical configs
        # look different.
        input_parser = configparser.ConfigParser(interpolation=None)
        input_parser.read_string("[ACTO]\n" + input_config)
        if not input_parser.options("ACTO"):
            # nothing to compare: an empty input is not treated as a match
            return False

        output_parser = configparser.ConfigParser(interpolation=None)
        output_parser.read_string("[ACTO]\n" + output_config)

        # get() raises NoOptionError (a configparser.Error) for keys missing in the output,
        # which is turned into False below
        return all(
            output_parser.get("ACTO", key) == value
            for key, value in input_parser.items("ACTO")
        )
    except configparser.Error:
        return False


class CompareMethods:
    """
    Equality helpers used to compare input field values against output field values.
    """

    def __init__(self, enable_k8s_value_canonicalization=True):
        """
        @param enable_k8s_value_canonicalization: if True, canonicalize_quantity() is applied to
            values in transform_field_value(), and the substring / config-subset checkers are
            registered as custom equality checkers; if False, neither happens
        """
        self.enable_k8s_value_canonicalization = enable_k8s_value_canonicalization
        self.custom_equality_checkers = (
            [input_is_substring_of_output, input_config_is_subset_of_output_config]
            if enable_k8s_value_canonicalization else []
        )

    def equals(self, left: Any, right: Any) -> bool:
        """
        Compare two values, falling back to the custom equality checkers when `==` fails.
        @param left: first value (passed as the first argument to each checker)
        @param right: second value (passed as the second argument to each checker)
        @return: True if the values are equal or any custom checker accepts them
        """
        if left == right:
            return True
        # checkers are tried in registration order; the first truthy result wins
        return any(checker(left, right) for checker in self.custom_equality_checkers)

    def equals_after_transform(self, in_prev, in_curr, out_prev, out_curr) -> bool:
        """
        Decide whether an input change (in_prev -> in_curr) matches an output change
        (out_prev -> out_curr) after the values went through transform_field_value().
        @return: True if both sides match, or if one side matches while the other side
                 contains a nullish value
        """
        in_prev, in_curr, out_prev, out_curr = self.transform_field_value(
            in_prev, in_curr, out_prev, out_curr)

        return (
            # both the previous and the current values match
            (self.equals(in_prev, out_prev) and self.equals(in_curr, out_curr))
            # previous side has a nullish value, current values match
            or (either_is_nullish(in_prev, out_prev) and self.equals(in_curr, out_curr))
            # previous values match, current side has a nullish value
            or (self.equals(in_prev, out_prev) and either_is_nullish(in_curr, out_curr))
        )

    def transform_field_value(self, in_prev, in_curr, out_prev, out_curr):
        """
        Transform the field values if necessary.
        Only one transformer (canonicalize_quantity) exists, so it is applied to all four values
        when canonicalization is enabled; otherwise the values are returned untouched.
        @param in_prev:
        @param in_curr:
        @param out_prev:
        @param out_curr:
        @return: tuple (in_prev, in_curr, out_prev, out_curr), possibly transformed
        """
        values = (in_prev, in_curr, out_prev, out_curr)
        if self.enable_k8s_value_canonicalization:
            values = tuple(canonicalize_quantity(item) for item in values)
        return values


def delta_equals(prev, curr) -> bool:
    """
    Compare a previous value with a current value, tolerating missing values.
    @param prev: previous value; None or NotPresent matches anything
    @param curr: current value
    @return: True if prev is missing, if prev == curr, or if curr is missing while prev is
             nullish; False otherwise
    """
    if prev is None or isinstance(prev, NotPresent) or prev == curr:
        return True

    # prev is neither None nor NotPresent here: a missing curr only matches a nullish prev
    curr_missing = curr is None or isinstance(curr, NotPresent)
    return is_nullish(prev) if curr_missing else False
44 changes: 44 additions & 0 deletions acto/checker/impl/crash.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
from acto.checker.checker import Checker
from acto.common import OracleResult, ErrorResult, Oracle, PassResult
from acto.snapshot import Snapshot
from acto.lib.dict import visit_dict


def check_pod_status(pod):
    """
    Raise an ErrorResult if any container of the pod looks crashed.

    A container counts as crashed when its state is 'terminated' with reason 'Error' or
    'waiting' with reason 'CrashLoopBackOff'. Pods without container statuses, and
    containers without a 'state' entry, are skipped.
    @param pod: pod dict providing ['status']['container_statuses'] and ['metadata']['name']
    @return: None when no crashed container is found
    """
    container_statuses = pod['status']['container_statuses']
    pod_name = pod['metadata']['name']
    if not container_statuses:
        return

    # (state kind, reason) pairs that indicate a crash, checked in this order
    crash_signatures = (
        ('terminated', 'Error'),
        ('waiting', 'CrashLoopBackOff'),
    )
    for container_status in container_statuses:
        if 'state' not in container_status:
            continue
        for state_kind, reason in crash_signatures:
            # presumably visit_dict returns a (found, value) pair -- confirm against acto.lib.dict
            if visit_dict(container_status['state'], [state_kind, 'reason']) == (True, reason):
                raise ErrorResult(Oracle.CRASH, 'Pod %s crashed' % pod_name)


class CrashChecker(Checker):
    """Checker that reports a crash found in the operator log or in pod container states."""
    name = 'crash'

    def check(self, _: int, snapshot: Snapshot, __: Snapshot) -> OracleResult:
        """
        Look for crash evidence in the snapshot.
        @param _: unused
        @param snapshot: snapshot providing operator_log and system_state
        @param __: unused
        @return: ErrorResult for the first operator log line containing 'Bug!' or for the first
                 crashed pod reported by check_pod_status(); PassResult otherwise
        """
        operator_log = snapshot.operator_log
        if operator_log is not None:
            bug_line = next((line for line in operator_log if 'Bug!' in line), None)
            if bug_line is not None:
                return ErrorResult(Oracle.CRASH, bug_line)

        system_state = snapshot.system_state
        pods = system_state['pod']
        deployment_pods = system_state['deployment_pods']

        try:
            # standalone pods first, then the pods grouped under each deployment
            for pod in pods.values():
                check_pod_status(pod)

            for pods_of_deployment in deployment_pods.values():
                for pod in pods_of_deployment:
                    check_pod_status(pod)
        except ErrorResult as crash:
            # check_pod_status signals a crash by raising the result object
            return crash

        return PassResult()
94 changes: 94 additions & 0 deletions acto/checker/impl/health.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
from acto.checker.checker import Checker
from acto.common import OracleResult, PassResult, Oracle, UnhealthyResult
from acto.snapshot import Snapshot
from acto.utils import get_thread_logger


class HealthChecker(Checker):
    """Checker that reports unhealthy stateful sets, deployments, pods and CR conditions."""
    name = 'health'

    def check(self, _: int, snapshot: Snapshot, prev_snapshot: Snapshot) -> OracleResult:
        '''System health oracle

        Walks snapshot.system_state and collects a description of every unhealthy resource.
        Returns UnhealthyResult with one line per resource kind when anything was collected,
        PassResult otherwise. prev_snapshot is not used.
        '''
        logger = get_thread_logger(with_prefix=True)

        system_state = snapshot.system_state
        # insertion order here is the order of the lines in the final error message
        unhealthy_resources = {
            'statefulset': [],
            'deployment': [],
            'pod': [],
            'cr': []
        }

        # check Health of Statefulsets
        for sfs in system_state['stateful_set'].values():
            ready_replicas = sfs['status']['ready_replicas']
            desired_replicas = sfs['spec']['replicas']
            # replicas could be 0, in which case ready_replicas is reported as None
            scaled_to_zero = ready_replicas is None and desired_replicas == 0
            if not scaled_to_zero and desired_replicas != ready_replicas:
                unhealthy_resources['statefulset'].append(
                    '%s replicas [%s] ready_replicas [%s]' %
                    (sfs['metadata']['name'], sfs['status']['replicas'], ready_replicas))

        # check Health of Deployments
        for dp in system_state['deployment'].values():
            desired_replicas = dp['spec']['replicas']
            if desired_replicas == 0:
                continue

            ready_replicas = dp['status']['ready_replicas']
            if desired_replicas != ready_replicas:
                unhealthy_resources['deployment'].append(
                    '%s replicas [%s] ready_replicas [%s]' %
                    (dp['metadata']['name'], dp['status']['replicas'], ready_replicas))

            for condition in dp['status']['conditions']:
                # Available and Progressing are both expected to have status 'True'
                if (condition['type'] in ('Available', 'Progressing')
                        and condition['status'] != 'True'):
                    unhealthy_resources['deployment'].append(
                        '%s condition [%s] status [%s] message [%s]' %
                        (dp['metadata']['name'], condition['type'], condition['status'],
                         condition['message']))

        # check Health of Pods
        for pod in system_state['pod'].values():
            if pod['status']['phase'] not in ['Running', 'Completed', 'Succeeded']:
                unhealthy_resources['pod'].append(pod['metadata']['name'])

        # pods that belong to deployments: any container restart is reported
        for pods_of_deployment in system_state['deployment_pods'].values():
            for pod in pods_of_deployment:
                pod_status = pod['status']
                if pod_status['phase'] in ['Completed', 'Succeeded']:
                    continue

                if 'container_statuses' in pod_status and pod_status['container_statuses']:
                    for container in pod_status['container_statuses']:
                        if container['restart_count'] > 0:
                            unhealthy_resources['pod'].append(
                                '%s container [%s] restart_count [%s]' %
                                (pod['metadata']['name'], container['name'],
                                 container['restart_count']))

        # check Health of CRs
        cr_status = system_state['custom_resource_status']
        if cr_status is not None and 'conditions' in cr_status:
            for condition in cr_status['conditions']:
                # only a not-Ready condition whose message mentions "is forbidden" is reported
                if (condition['type'] == 'Ready' and condition['status'] != 'True'
                        and 'is forbidden' in condition['message'].lower()):
                    unhealthy_resources['cr'].append(
                        '%s condition [%s] status [%s] message [%s]' %
                        ('CR status unhealthy', condition['type'], condition['status'],
                         condition['message']))

        report_lines = []
        for kind, resources in unhealthy_resources.items():
            if resources:
                joined = ', '.join(resources)
                report_lines.append(f"{kind}: {joined}\n")
                logger.error(f"Found {kind}: {joined} with unhealthy status")

        if report_lines:
            return UnhealthyResult(Oracle.SYSTEM_HEALTH, ''.join(report_lines))

        return PassResult()
41 changes: 41 additions & 0 deletions acto/checker/impl/kubectl_cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
from acto.checker.checker import Checker
from acto.common import OracleResult, PassResult, InvalidInputResult, UnchangedInputResult, ConnectionRefusedResult, invalid_input_message
from acto.snapshot import Snapshot
from acto.utils import get_thread_logger

"""
This checker is used to check the output of kubectl cli command.
If kubectl rejects the mutation, an error result will be returned.
"""


class KubectlCliChecker(Checker):
    """Checker that classifies the stdout/stderr captured from the kubectl CLI call."""
    name = 'input'

    def check(self, _: int, snapshot: Snapshot, prev_snapshot: Snapshot) -> OracleResult:
        """
        Inspect snapshot.cli_result and decide whether the mutation was accepted.
        @param _: unused
        @param snapshot: snapshot providing cli_result ('stdout' / 'stderr') and delta()
        @param prev_snapshot: previous snapshot, passed to snapshot.delta()
        @return: ConnectionRefusedResult if stderr mentions 'connection refused' or
                 'deadline exceeded'; InvalidInputResult if the input is judged invalid or
                 stderr is non-empty; UnchangedInputResult if the output mentions 'unchanged';
                 PassResult otherwise
        """
        logger = get_thread_logger(with_prefix=True)

        cli_result = snapshot.cli_result
        stdout = cli_result['stdout']
        stderr = cli_result['stderr']

        if 'connection refused' in stderr or 'deadline exceeded' in stderr:
            logger.info('Connection refused, reject mutation')
            return ConnectionRefusedResult()

        input_delta, _system_delta = snapshot.delta(prev_snapshot)
        is_invalid, invalid_field_path = invalid_input_message(stderr, input_delta)

        # any stderr output is treated as an indication of invalid input
        if is_invalid or len(stderr) > 0:
            logger.info('Invalid input, reject mutation')
            logger.info('STDOUT: ' + stdout)
            logger.info('STDERR: ' + stderr)
            return InvalidInputResult(invalid_field_path)

        if 'unchanged' in stdout or 'unchanged' in stderr:
            logger.info('CR unchanged, continue')
            return UnchangedInputResult()

        return PassResult()
Loading

0 comments on commit b8f016f

Please sign in to comment.