Skip to content

Commit

Permalink
more fix of smoke tests (#4548)
Browse files Browse the repository at this point in the history
* fix release smoke tests

* robust backward compatibility

* bug fix

* support gke mark

* update retry field

* fix

* fix smoke tests issue

* update command of GCP

* comment for azure fix

* clear resources

* remove legacy_credentials

* bug fix

* resolve PR comment

* update comment

* text param for subprocess

* longer timeout

* rename to is_on_gcp

* revert initial_delay

* cache based on boot time

* fix command

* test_job_pipeline

* fix bug

* separate different param to different step

* fix bug

* increase azure delay test

* fix

* increase initial delay for azure

* prevent check

* skip certain test

* fix test_inferentia

* Revert "increase initial delay for azure"

This reverts commit 94976c8.

* Revert "fix"

This reverts commit 376259c.

* Revert "increase azure delay test"

This reverts commit 068f409.

* align with pytest

* SKYPILOT_DISABLE_USAGE_COLLECTION

* rename to avoid run by pytest

* filter cloud

* remove yaml

* mypy

* verify

* support -k

* generic cloud

* comment

* remove

* Update tests/smoke_tests/test_cluster_job.py

Co-authored-by: Zhanghao Wu <[email protected]>

* import module

* test script

* bug fix

* fix comment

* comment of BUMP_UP_SECONDS

* skip

* pass kubernetes test

* fix config azure

* bug fix

* remove duplicated test

* longer timeout

* fix azure

* rename test

* remove print

* consistent with pytest

* one more test for empty arg

* resolve PR comment

* fix aws READY status

* add todo

* argparse fix

* comment out print

---------

Co-authored-by: Zhanghao Wu <[email protected]>
  • Loading branch information
zpoint and Michaelvll authored Jan 24, 2025
1 parent 97b8e8f commit 1c94d0f
Show file tree
Hide file tree
Showing 9 changed files with 409 additions and 147 deletions.
201 changes: 141 additions & 60 deletions .buildkite/generate_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@
clouds are not supported yet, smoke tests for those clouds are not generated.
"""

import argparse
import collections
import os
import random
import re
import subprocess
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Optional, Tuple

import click
from conftest import cloud_to_pytest_keyword
Expand Down Expand Up @@ -64,8 +65,59 @@
'edit directly.\n')


def _extract_marked_tests(file_path: str,
filter_marks: List[str]) -> Dict[str, List[str]]:
def _parse_args(
        args: Optional[str] = None) -> Tuple[List[str], Optional[str]]:
    """Parse pytest-style arguments to decide which clouds to run on.

    Args:
        args: An argument string as passed to pytest, e.g.
            '--aws --managed-jobs -k test_name'. None or empty means no
            filtering, so the default clouds are used.

    Returns:
        A tuple (clouds_to_run, k_pattern) where clouds_to_run is a
        non-empty list of cloud names and k_pattern is the value of the
        pytest -k option, or None if it was not given.
    """
    # shlex preserves quoted values such as -k "a and b"; a plain
    # str.split() would break such a pattern into separate tokens.
    import shlex
    args_list = shlex.split(args) if args else []

    parser = argparse.ArgumentParser(
        description="Process cloud arguments for tests")

    # One boolean flag per recognized cloud, e.g. --aws, --gcp.
    for cloud in PYTEST_TO_CLOUD_KEYWORD:
        parser.add_argument(f"--{cloud}", action="store_true")

    # Generic cloud argument, which takes a value (e.g., --generic-cloud aws)
    parser.add_argument("--generic-cloud")

    # -k argument for a test selection pattern
    parser.add_argument("-k")

    # Ignore unrecognized pytest options instead of erroring out, since the
    # same argument string is also forwarded verbatim to pytest itself.
    parsed_args, _ = parser.parse_known_args(args_list)

    # Collect the chosen clouds from the flags, preserving declaration order
    # so the generated pipeline is deterministic (the previous
    # list(set(...) & set(...)) produced a hash-dependent order, and only the
    # first cloud of each test is ultimately kept).
    # TODO(zpoint): get default clouds from the conftest.py
    clouds_to_run = [
        cloud for cloud in PYTEST_TO_CLOUD_KEYWORD
        if getattr(parsed_args, cloud)
    ]
    # Drop clouds that have no buildkite queue; if the user only passed
    # unsupported clouds, we fall back to the defaults below.
    clouds_to_run = [
        cloud for cloud in clouds_to_run if cloud in CLOUD_QUEUE_MAP
    ]

    # If a supported generic cloud is specified, it overrides any chosen
    # clouds.
    if (parsed_args.generic_cloud and
            parsed_args.generic_cloud in CLOUD_QUEUE_MAP):
        clouds_to_run = [parsed_args.generic_cloud]

    # Single catch-all fallback (the original had this check duplicated).
    if not clouds_to_run:
        clouds_to_run = DEFAULT_CLOUDS_TO_RUN

    return clouds_to_run, parsed_args.k


def _extract_marked_tests(
file_path: str, args: str
) -> Dict[str, Tuple[List[str], List[str], List[Optional[str]]]]:
"""Extract test functions and filter clouds using pytest.mark
from a Python test file.
Expand All @@ -79,43 +131,58 @@ def _extract_marked_tests(file_path: str,
rerun failures. Additionally, the parallelism would be controlled by pytest
instead of the buildkite job queue.
"""
cmd = f'pytest {file_path} --collect-only'
cmd = f'pytest {file_path} --collect-only {args}'
output = subprocess.run(cmd, shell=True, capture_output=True, text=True)
matches = re.findall('Collected .+?\.py::(.+?) with marks: \[(.*?)\]',
output.stdout)
function_name_marks_map = {}
print(f'args: {args}')
default_clouds_to_run, k_value = _parse_args(args)

print(f'default_clouds_to_run: {default_clouds_to_run}, k_value: {k_value}')
function_name_marks_map = collections.defaultdict(set)
function_name_param_map = collections.defaultdict(list)

for function_name, marks in matches:
function_name = re.sub(r'\[.*?\]', '', function_name)
clean_function_name = re.sub(r'\[.*?\]', '', function_name)
clean_function_name = re.sub(r'@.*?$', '', clean_function_name)
# The skip mark is generated by pytest naturally, and print in
# conftest.py
if 'skip' in marks:
continue
if k_value is not None and k_value not in function_name:
# TODO(zpoint): support and/or in k_value
continue

marks = marks.replace('\'', '').split(',')
marks = [i.strip() for i in marks]
if function_name not in function_name_marks_map:
function_name_marks_map[function_name] = set(marks)
else:
function_name_marks_map[function_name].update(marks)

function_name_marks_map[clean_function_name].update(marks)

# extract parameter from function name
# example: test_skyserve_new_autoscaler_update[rolling]
# param: rolling
# function_name: test_skyserve_new_autoscaler_update
param = None
if '[' in function_name and 'serve' in marks:
# Only serve tests are slow and flaky, so we separate them
# to different steps for parallel execution
param = re.search('\[(.+?)\]', function_name).group(1)
if param:
function_name_param_map[clean_function_name].append(param)

function_cloud_map = {}
filter_marks = set(filter_marks)
for function_name, marks in function_name_marks_map.items():
if filter_marks and not filter_marks & marks:
continue
clouds_to_include = []
clouds_to_exclude = []
is_serve_test = 'serve' in marks
run_on_gke = 'requires_gke' in marks
for mark in marks:
if mark.startswith('no_'):
clouds_to_exclude.append(mark[3:])
else:
if mark not in PYTEST_TO_CLOUD_KEYWORD:
# This mark does not specify a cloud, so we skip it.
continue
clouds_to_include.append(PYTEST_TO_CLOUD_KEYWORD[mark])
if mark not in PYTEST_TO_CLOUD_KEYWORD:
# This mark does not specify a cloud, so we skip it.
continue
clouds_to_include.append(PYTEST_TO_CLOUD_KEYWORD[mark])

clouds_to_include = (clouds_to_include
if clouds_to_include else DEFAULT_CLOUDS_TO_RUN)
clouds_to_include = [
cloud for cloud in clouds_to_include
if cloud not in clouds_to_exclude
]
if clouds_to_include else default_clouds_to_run)
cloud_queue_map = SERVE_CLOUD_QUEUE_MAP if is_serve_test else CLOUD_QUEUE_MAP
final_clouds_to_include = [
cloud for cloud in clouds_to_include if cloud in cloud_queue_map
Expand All @@ -132,82 +199,101 @@ def _extract_marked_tests(file_path: str,
f'Warning: {function_name} is marked to run on {clouds_to_include}, '
f'but we only have credentials for {final_clouds_to_include}. '
f'clouds {excluded_clouds} are skipped.')

# pytest will only run the first cloud if there are multiple clouds
# make it consistent with pytest behavior
# print(f"final_clouds_to_include: {final_clouds_to_include}")
final_clouds_to_include = [final_clouds_to_include[0]]
param_list = function_name_param_map.get(function_name, [None])
if len(param_list) < len(final_clouds_to_include):
# align, so we can zip them together
param_list += [None
] * (len(final_clouds_to_include) - len(param_list))
function_cloud_map[function_name] = (final_clouds_to_include, [
QUEUE_GKE if run_on_gke else cloud_queue_map[cloud]
for cloud in final_clouds_to_include
])
], param_list)

return function_cloud_map


def _generate_pipeline(test_file: str,
filter_marks: List[str],
args: str,
auto_retry: bool = False) -> Dict[str, Any]:
"""Generate a Buildkite pipeline from test files."""
steps = []
function_cloud_map = _extract_marked_tests(test_file, filter_marks)
for test_function, clouds_and_queues in function_cloud_map.items():
for cloud, queue in zip(*clouds_and_queues):
generated_function_set = set()
function_cloud_map = _extract_marked_tests(test_file, args)
for test_function, clouds_queues_param in function_cloud_map.items():
for cloud, queue, param in zip(*clouds_queues_param):
if test_function in generated_function_set:
# Skip duplicate nested function tests under the same class
continue
label = f'{test_function} on {cloud}'
command = f'pytest {test_file}::{test_function} --{cloud}'
if param:
label += f' with param {param}'
command += f' -k {param}'
step = {
'label': f'{test_function} on {cloud}',
'command': f'pytest {test_file}::{test_function} --{cloud}',
'label': label,
'command': command,
'agents': {
# Separate agent pool for each cloud.
# Since they require different amount of resources and
# concurrency control.
'queue': queue
},
'if': f'build.env("{cloud}") == "1"'
}
}
if auto_retry:
step['retry'] = {
# Automatically retry 2 times on any failure by default.
'automatic': True
}
generated_function_set.add(test_function)
steps.append(step)
return {'steps': steps}


def _dump_pipeline_to_file(yaml_file_path: str,
pipelines: List[Dict[str, Any]],
extra_env: Optional[Dict[str, str]] = None):
default_env = {'LOG_TO_STDOUT': '1', 'PYTHONPATH': '${PYTHONPATH}:$(pwd)'}
default_env = {
'LOG_TO_STDOUT': '1',
'PYTHONPATH': '${PYTHONPATH}:$(pwd)',
'SKYPILOT_DISABLE_USAGE_COLLECTION': '1'
}
if extra_env:
default_env.update(extra_env)
with open(yaml_file_path, 'w', encoding='utf-8') as file:
file.write(GENERATED_FILE_HEAD)
all_steps = []
for pipeline in pipelines:
all_steps.extend(pipeline['steps'])
        # Shuffle the steps to avoid flakiness: consecutive runs of the same
        # kind of test may fail for requiring locks on the same resources.
random.shuffle(all_steps)
final_pipeline = {'steps': all_steps, 'env': default_env}
yaml.dump(final_pipeline, file, default_flow_style=False)


def _convert_release(test_files: List[str], filter_marks: List[str]):
def _convert_release(test_files: List[str], args: str):
yaml_file_path = '.buildkite/pipeline_smoke_tests_release.yaml'
output_file_pipelines = []
for test_file in test_files:
print(f'Converting {test_file} to {yaml_file_path}')
pipeline = _generate_pipeline(test_file, filter_marks, auto_retry=True)
pipeline = _generate_pipeline(test_file, args, auto_retry=True)
output_file_pipelines.append(pipeline)
print(f'Converted {test_file} to {yaml_file_path}\n\n')
# Enable all clouds by default for release pipeline.
_dump_pipeline_to_file(yaml_file_path,
output_file_pipelines,
extra_env={cloud: '1' for cloud in CLOUD_QUEUE_MAP})
_dump_pipeline_to_file(yaml_file_path, output_file_pipelines)


def _convert_quick_tests_core(test_files: List[str], filter_marks: List[str]):
def _convert_quick_tests_core(test_files: List[str], args: List[str]):
yaml_file_path = '.buildkite/pipeline_smoke_tests_quick_tests_core.yaml'
output_file_pipelines = []
for test_file in test_files:
print(f'Converting {test_file} to {yaml_file_path}')
        # We want to enable all clouds by default for each test function
        # for pre-merge, and let the author control which clouds
        # to run via parameters.
pipeline = _generate_pipeline(test_file, filter_marks)
pipeline = _generate_pipeline(test_file, args)
pipeline['steps'].append({
'label': 'Backward compatibility test',
'command': 'bash tests/backward_compatibility_tests.sh',
Expand All @@ -223,11 +309,10 @@ def _convert_quick_tests_core(test_files: List[str], filter_marks: List[str]):


@click.command()
@click.option(
'--filter-marks',
type=str,
help='Filter to include only a subset of pytest marks, e.g., managed_jobs')
def main(filter_marks):
@click.option('--args',
type=str,
help='Args to pass to pytest, e.g., --managed-jobs --aws')
def main(args):
test_files = os.listdir('tests/smoke_tests')
release_files = []
quick_tests_core_files = []
Expand All @@ -240,15 +325,11 @@ def main(filter_marks):
else:
release_files.append(test_file_path)

filter_marks = filter_marks or os.getenv('FILTER_MARKS')
if filter_marks:
filter_marks = filter_marks.split(',')
print(f'Filter marks: {filter_marks}')
else:
filter_marks = []
args = args or os.getenv('ARGS', '')
print(f'args: {args}')

_convert_release(release_files, filter_marks)
_convert_quick_tests_core(quick_tests_core_files, filter_marks)
_convert_release(release_files, args)
_convert_quick_tests_core(quick_tests_core_files, args)


if __name__ == '__main__':
Expand Down
Loading

0 comments on commit 1c94d0f

Please sign in to comment.