
first sampling implementation #2

Merged
merged 3 commits on Nov 4, 2024
15 changes: 15 additions & 0 deletions __main__.py
@@ -102,6 +102,17 @@ def parse_cla() -> argparse.Namespace:
help="Disables execution of tests.",
)

parser.add_argument(
"-r",
"--resource-sample-size",
metavar="size",
dest="resource_sample_size",
type=float,
default=0.05,
help="Specifies the percent of machines in each resource to send tests to. Argument value "
+ "should be a float between 0.0 (exclusive) and 1.0 (inclusive).",
)

return parser.parse_args()


@@ -114,6 +125,10 @@ def process_cla(args: argparse.Namespace):
print("Error: Cannot select both --snapshot and --print-tests options at the same time")
sys.exit(1)

if (args.resource_sample_size <= 0.0) or (args.resource_sample_size > 1.0):
print("Error: Resource sample size must be between 0.0 (exclusive) and 1.0 (inclusive)")
sys.exit(1)

# process tests arg
# split the list around commas and remove duplicates
for item in args.tests:
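For reference, the bounds check in process_cla() treats the sample size as the half-open interval (0.0, 1.0]. A minimal standalone sketch of the same predicate (the helper name is hypothetical, not part of this PR):

```python
def sample_size_is_valid(value: float) -> bool:
    """Mirrors the process_cla() check: reject values outside (0.0, 1.0]."""
    return not (value <= 0.0 or value > 1.0)

assert sample_size_is_valid(0.05)      # the default
assert sample_size_is_valid(1.0)       # inclusive upper bound: sample every machine
assert not sample_size_is_valid(0.0)   # exclusive lower bound
assert not sample_size_is_valid(1.5)
```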
10 changes: 7 additions & 3 deletions docs/How_To_Add_Tests.md
@@ -26,8 +26,12 @@ run it.
1. $(ResourceName) - This will be replaced with the target GLIDEIN_ResourceName for each job. This
may be of use to you, but is mostly used by the exerciser to target a specific resource.

-2. $(uniq\_output\_dir) - Specifies a unique sub-directory of execution_dir/results/ResourceName
+2. $(resource\_dir) - Specifies a unique sub-directory: execution_dir/results/ResourceName,
corresponding to the targeted resource of the job. This is useful for organizing any output files
automatically. Note: These sub-directories are not created before submission so storing the user log
-within will not work. For an example on how to use this macro, you can inspect checksum.sub, which
-you can find in tests/checksum.
+within will not work.

+3. $(sample\_dir) - Further specifies the directory hierarchy below resource\_dir. When a test is
+submitted to the pool, the exerciser sends a number of identical "sample tests" to each resource.
+This macro expands to: execution_dir/results/ResourceName/sample_XXX, where XXX is the sample
+number for that unique instance of the test.
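To make the expansion concrete, a small sketch of the naming pattern (the resource name and sample count below are invented; the real values come from the exerciser's itemdata, shown in the src/general.py changes that follow):

```python
# Mirrors the f-strings used in execute_tests(); values are illustrative only.
resource = "ExampleResource"
for i in range(3):
    print(f"results/{resource}/sample_{i:03}")
# results/ExampleResource/sample_000
# results/ExampleResource/sample_001
# results/ExampleResource/sample_002
```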
35 changes: 25 additions & 10 deletions src/general.py
@@ -26,6 +26,7 @@
import shutil
from datetime import datetime
import argparse
from math import ceil


def get_resources() -> dict:
@@ -36,7 +37,7 @@ def get_resources() -> dict:
"""
collector = htcondor2.Collector("cm-1.ospool.osg-htc.org")
resources = collector.query(
-ad_type=htcondor2.AdTypes.Startd,
+ad_type=htcondor2.AdTypes.StartDaemon,
constraint="!isUndefined(GLIDEIN_ResourceName)",
projection=["GLIDEIN_ResourceName"],
)
@@ -117,7 +118,7 @@ def run_exerciser(args: argparse.Namespace):
# -b option
# controls whether the exerciser runs. set to True by default
if args.run:
-execute_tests(tests_dir, working_dir, args.tests)
+execute_tests(tests_dir, working_dir, args.tests, args.resource_sample_size)


def parse_date(date_from_cla: str) -> str:
@@ -154,11 +155,13 @@ def parse_date(date_from_cla: str) -> str:
return format_date


-def execute_tests(tests_dir: Path, working_dir: Path, test_list: list):
+def execute_tests(tests_dir: Path, working_dir: Path, test_list: list, sample_percent: float):
"""
Usage: builds working file system and submits tests
@param tests_dir: directory containing all exerciser tests
@param working_dir: directory for storing info on exerciser runs
@param test_list: list parsed from args of all the tests to run
@param sample_percent: percent of machines to send tests to in each resource
"""
# create top level working dir for exerciser run
curr_time = datetime.now().strftime("%Y-%m-%d_%H-%M")
@@ -193,10 +196,19 @@ def execute_tests(tests_dir: Path, working_dir: Path, test_list: list):

os.chdir(execute_dir)
job = generate_sub_object(sub_file, test.name, abs_timestamp_dir)
-item_data = [
-    {"ResourceName": resource, "uniq_output_dir": f"results/{resource}"}
-    for resource in resources.keys()
-]

+item_data = []
+for resource in resources.keys():
+    resource_size = resources[resource]
+    sample_size = ceil(resource_size * sample_percent)
+    for i in range(sample_size):
+        item = {
+            "ResourceName": resource,
+            "resource_dir": f"results/{resource}",
"sample_dir": f"results/{resource}/sample_{i:03}",
"SampleNumber": str(i)
}
item_data.append(item)

job.issue_credentials()
schedd.submit(job, itemdata=iter(item_data))
@@ -292,6 +304,8 @@ def generate_sub_object(sub_file: Path, test_name: str, timestamp_dir: str) -> h
print(f"Error: Invalid submit file for test {test_name}")
sys.exit(1)

job.setSubmitMethod(99, True)

# add requirement to land on target ResourceName
req_expr = 'TARGET.GLIDEIN_ResourceName == "$(ResourceName)"'
req = job.get("Requirements")
@@ -314,13 +328,14 @@ def generate_sub_object(sub_file: Path, test_name: str, timestamp_dir: str) -> h
job["dagman_log"] = os.path.join(timestamp_dir, "shared_exerciser.log")

# create submit notes to identify job by the testname and expected resource
job["submit_event_notes"] = f"exerciser_info:{test_name},$(ResourceName)"
job["submit_event_notes"] = f"exerciser_info:{test_name},$(ResourceName),$(SampleNumber)"

# add execute attributes
job["ulog_execute_attrs"] = "GLIDEIN_ResourceName"

# add pool exerciser identifier attributes
job["My.is_pool_exerciser"] = "true"
job["My.pool_exerciser_test"] = test_name
job["My.EXERCISER_Job"] = "true"
job["My.EXERCISER_TestName"] = test_name
job["My.EXERCISER_SampleNum"] = "$(SampleNumber)"

return job
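To see how the new sampling loop behaves end to end, here is a standalone sketch with invented resource names and machine counts (the real dictionary comes from get_resources()). Because ceil() rounds up, every resource receives at least one sample job:

```python
from math import ceil

# Hypothetical resources: GLIDEIN_ResourceName -> number of machines advertising it.
resources = {"LargeSite": 120, "TinySite": 3}
sample_percent = 0.05  # the default --resource-sample-size

item_data = []
for resource, resource_size in resources.items():
    # ceil() rounds up, so even a 3-machine resource gets one sample job.
    sample_size = ceil(resource_size * sample_percent)
    for i in range(sample_size):
        item_data.append({
            "ResourceName": resource,
            "resource_dir": f"results/{resource}",
            "sample_dir": f"results/{resource}/sample_{i:03}",
            "SampleNumber": str(i),
        })

# LargeSite -> 6 samples (ceil(6.0) == 6), TinySite -> 1 sample (ceil(0.15) == 1),
# so item_data holds 7 dicts that fill the $(ResourceName), $(resource_dir),
# $(sample_dir), and $(SampleNumber) macros at submit time.
print(len(item_data))  # 7
```

In effect, --resource-sample-size sets a minimum coverage fraction per resource rather than an exact job count.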
2 changes: 1 addition & 1 deletion src/monitor.py
@@ -155,7 +155,7 @@ def status(timestamp_dir: Path, verbosity: int):
if event.type is JobEventType.SUBMIT:
log_notes = event["LogNotes"]
if ":" in log_notes:
-testname, resource = log_notes.split(":")[1].split(",")
+testname, resource, sample_num = log_notes.split(":")[1].split(",")

# add info to clusters to utilize for future execute, term, and abort events
if event.cluster not in clusters:
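For context on the unpacking above, a small sketch of the note format written by generate_sub_object() and consumed here (the literal note value is made up for illustration):

```python
# submit_event_notes format: "exerciser_info:<testname>,<ResourceName>,<SampleNumber>"
log_notes = "exerciser_info:checksum,ExampleResource,2"  # hypothetical event note

if ":" in log_notes:
    testname, resource, sample_num = log_notes.split(":")[1].split(",")
    print(testname)    # checksum
    print(resource)    # ExampleResource
    print(sample_num)  # 2
```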
4 changes: 2 additions & 2 deletions tests/checksum/checksum.sub
@@ -1,8 +1,8 @@
executable = checksum.exe
arguments = input.h5 input.checksum

-output = $(uniq_output_dir)/checksum.out
-error = $(uniq_output_dir)/checksum.err
+output = $(sample_dir)/checksum.out
+error = $(sample_dir)/checksum.err
log = checksum.log

request_cpus = 1