
Commit

Merge pull request #78 from jameshcorbett/persistent-dw-directives
Persistent dw directives
mergify[bot] authored Jun 22, 2023
2 parents 08288cc + 6b29836 commit bbf8278
Showing 3 changed files with 109 additions and 75 deletions.
68 changes: 36 additions & 32 deletions src/modules/coral2_dws.py
@@ -69,8 +69,9 @@ def wrapper(fh, t, msg, k8s_api):
         except Exception as exc:
             try:
                 jobid = msg.payload["jobid"]
+                topic = msg.topic
             except Exception:
-                jobid = None
+                topic = jobid = None
             try:
                 # only k8s APIExceptions will have a JSON message body,
                 # but try to extract it out of every exception for simplicity
@@ -79,7 +80,7 @@ def wrapper(fh, t, msg, k8s_api):
                 errstr = str(exc)
             fh.log(syslog.LOG_ERR, f"{os.path.basename(__file__)}: {errstr}")
             fh.respond(msg, {"success": False, "errstr": errstr})
-            LOGGER.error("Error in responding to RPC for %s: %s", jobid, errstr)
+            LOGGER.error("Error in responding to %s RPC for %s: %s", topic, jobid, errstr)

    return wrapper
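
This wrapper is the module's catch-all decorator for RPC callbacks: it answers the failed RPC and logs the failure, now tagged with the message topic as well as the jobid so errors remain attributable to a specific service method. A condensed sketch of the pattern (the decorator name and logger setup here are illustrative, not from the patch):

    # Illustrative sketch of the decorator pattern used above: respond to the
    # RPC and log topic + jobid whenever a callback raises.
    import functools
    import logging

    LOGGER = logging.getLogger(__name__)

    def error_responder(func):
        @functools.wraps(func)
        def wrapper(fh, t, msg, k8s_api):
            try:
                return func(fh, t, msg, k8s_api)
            except Exception as exc:
                try:
                    jobid = msg.payload["jobid"]
                    topic = msg.topic
                except Exception:
                    topic = jobid = None  # message had no payload/topic
                fh.respond(msg, {"success": False, "errstr": str(exc)})
                LOGGER.error(
                    "Error in responding to %s RPC for %s: %s", topic, jobid, exc
                )
        return wrapper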

@@ -160,38 +161,41 @@ def setup_cb(fh, t, msg, k8s_api):
         {"data": compute_nodes},
     )
     for breakdown in workflow.breakdowns:
-        allocation_sets = []
-        for alloc_set in breakdown["status"]["storage"]["allocationSets"]:
-            storage_field = []
-            server_alloc_set = {
-                "allocationSize": alloc_set["minimumCapacity"],
-                "label": alloc_set["label"],
-                "storage": storage_field,
-            }
-            if (
-                alloc_set["allocationStrategy"]
-                == directivebreakdown.AllocationStrategy.PER_COMPUTE.value
-            ):
-                # make an allocation on every rabbit attached to compute nodes
-                # in the job
-                for nnf_name in nodes_per_nnf.keys():
-                    storage_field.append(
-                        {"allocationCount": nodes_per_nnf[nnf_name], "name": nnf_name}
-                    )
-            else:
-                # make a single allocation on a random rabbit
-                storage_field.append(
-                    {"allocationCount": 1, "name": next(iter(nodes_per_nnf.keys()))}
-                )
-            allocation_sets.append(server_alloc_set)
-        k8s_api.patch_namespaced_custom_object(
-            SERVER_CRD.group,
-            SERVER_CRD.version,
-            breakdown["status"]["storage"]["reference"]["namespace"],
-            SERVER_CRD.plural,
-            breakdown["status"]["storage"]["reference"]["name"],
-            {"spec": {"allocationSets": allocation_sets}},
-        )
+        # if a breakdown doesn't have a storage field (e.g. persistentdw directives)
+        # ignore it and proceed
+        if "storage" in breakdown["status"]:
+            allocation_sets = []
+            for alloc_set in breakdown["status"]["storage"]["allocationSets"]:
+                storage_field = []
+                server_alloc_set = {
+                    "allocationSize": alloc_set["minimumCapacity"],
+                    "label": alloc_set["label"],
+                    "storage": storage_field,
+                }
+                if (
+                    alloc_set["allocationStrategy"]
+                    == directivebreakdown.AllocationStrategy.PER_COMPUTE.value
+                ):
+                    # make an allocation on every rabbit attached to compute nodes
+                    # in the job
+                    for nnf_name in nodes_per_nnf.keys():
+                        storage_field.append(
+                            {"allocationCount": nodes_per_nnf[nnf_name], "name": nnf_name}
+                        )
+                else:
+                    # make a single allocation on a random rabbit
+                    storage_field.append(
+                        {"allocationCount": 1, "name": next(iter(nodes_per_nnf.keys()))}
+                    )
+                allocation_sets.append(server_alloc_set)
+            k8s_api.patch_namespaced_custom_object(
+                SERVER_CRD.group,
+                SERVER_CRD.version,
+                breakdown["status"]["storage"]["reference"]["namespace"],
+                SERVER_CRD.plural,
+                breakdown["status"]["storage"]["reference"]["name"],
+                {"spec": {"allocationSets": allocation_sets}},
+            )
     workflow.setup_rpc = msg
     move_workflow_desiredstate(workflow.name, "Setup", k8s_api)

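For context, setup_cb now skips any directive breakdown that lacks a storage section: persistentdw directives reference storage that already exists, so they need no new Server allocations. The guarded loop reduces to roughly the following standalone sketch (the helper name, the "AllocatePerCompute" string value, and the sample call are illustrative assumptions, not part of the patch):

    # Hypothetical helper mirroring the allocation-set construction in setup_cb.
    # "AllocatePerCompute" is the assumed value of AllocationStrategy.PER_COMPUTE.
    PER_COMPUTE = "AllocatePerCompute"

    def build_server_patch(breakdown_status, nodes_per_nnf):
        if "storage" not in breakdown_status:
            return None  # e.g. persistentdw: nothing to allocate
        allocation_sets = []
        for alloc_set in breakdown_status["storage"]["allocationSets"]:
            if alloc_set["allocationStrategy"] == PER_COMPUTE:
                # one allocation per rabbit attached to the job's compute nodes
                storage = [
                    {"allocationCount": count, "name": name}
                    for name, count in nodes_per_nnf.items()
                ]
            else:
                # a single allocation on an arbitrary rabbit
                storage = [{"allocationCount": 1, "name": next(iter(nodes_per_nnf))}]
            allocation_sets.append(
                {
                    "allocationSize": alloc_set["minimumCapacity"],
                    "label": alloc_set["label"],
                    "storage": storage,
                }
            )
        return {"spec": {"allocationSets": allocation_sets}}

    # e.g.: build_server_patch(breakdown["status"], {"rabbit1": 2, "rabbit2": 1})

The returned dict corresponds to the body that setup_cb passes to patch_namespaced_custom_object against the Server resource named in breakdown["status"]["storage"]["reference"].
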
2 changes: 1 addition & 1 deletion src/python/flux_k8s/directivebreakdown.py
@@ -76,7 +76,7 @@ def apply_breakdowns(k8s_api, workflow, resources):
 def _fetch_breakdowns(k8s_api, workflow):
     """Fetch all of the directive breakdowns associated with a workflow."""
     if not workflow["status"].get("directiveBreakdowns"):
-        raise ValueError(f"workflow {workflow} has no directive breakdowns")
+        return []  # destroy_persistent DW directives have no breakdowns
     for breakdown in workflow["status"]["directiveBreakdowns"]:
         yield k8s_api.get_namespaced_custom_object(
             DIRECTIVEBREAKDOWN_CRD.group,
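
One subtlety worth noting: _fetch_breakdowns contains a yield, so it is a generator function, and the new return [] does not hand [] back to the caller. It simply ends iteration; the [] rides along on the StopIteration and is discarded, so callers iterating over the result see an empty sequence instead of a ValueError. A minimal standalone sketch of that behavior:

    # Standalone sketch (not from the patch): "return <value>" inside a
    # generator stops iteration; the value is attached to StopIteration and
    # ignored by ordinary for-loops and list().
    def fetch(items):
        if not items:
            return []  # ends the generator; callers just see nothing
        for item in items:
            yield item

    assert list(fetch([])) == []  # no ValueError, no breakdowns
    assert list(fetch(["b1", "b2"])) == ["b1", "b2"]
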
114 changes: 72 additions & 42 deletions t/t1002-dws-workflow-obj.t
@@ -36,7 +36,7 @@ test_expect_success 'exec dws service-providing script' '
     R=$(flux R encode -r 0) &&
     DWS_JOBID=$(flux submit \
         --setattr=system.alloc-bypass.R="$R" \
-        -o per-resource.type=node --output=dws.out --error=dws.err \
+        -o per-resource.type=node --output=dws1.out --error=dws1.err \
         python ${DWS_MODULE_PATH} -e1) &&
     flux job wait-event -vt 15 -p guest.exec.eventlog ${DWS_JOBID} shell.start
 '
@@ -51,77 +51,80 @@ test_expect_success 'wait for service to register and send test RPC' '
 
 test_expect_success 'job submission without DW string works' '
     jobid=$(flux submit -n1 /bin/true) &&
-    flux job wait-event -vt 25 ${jobid} finish &&
+    flux job wait-event -vt 25 -m status=0 ${jobid} finish &&
     test_must_fail flux job wait-event -vt 5 -m description=${CREATE_DEP_NAME} \
         ${jobid} dependency-add
 '
 
 test_expect_success 'job submission with valid DW string works' '
-    jobid=$(flux submit --setattr=system.dw="#DW jobdw capacity=10KiB type=xfs name=project1" \
+    jobid=$(flux submit --setattr=system.dw="#DW jobdw capacity=10GiB type=xfs name=project1" \
         -N1 -n1 hostname) &&
     flux job wait-event -vt 10 -m description=${CREATE_DEP_NAME} \
         ${jobid} dependency-add &&
     flux job wait-event -t 10 -m description=${CREATE_DEP_NAME} \
         ${jobid} dependency-remove &&
     flux job wait-event -t 10 -m rabbit_workflow=fluxjob-$(flux job id ${jobid}) \
         ${jobid} memo &&
-    flux job wait-event -vt 5 ${jobid} depend &&
-    flux job wait-event -vt 5 ${jobid} priority &&
-    flux job wait-event -vt 5 -m description=${PROLOG_NAME} \
+    flux job wait-event -vt 15 ${jobid} depend &&
+    flux job wait-event -vt 15 ${jobid} priority &&
+    flux job wait-event -vt 15 -m description=${PROLOG_NAME} \
         ${jobid} prolog-start &&
-    flux job wait-event -vt 5 -m description=${PROLOG_NAME} \
+    flux job wait-event -vt 15 -m description=${PROLOG_NAME} \
         ${jobid} prolog-finish &&
-    flux job wait-event -vt 5 -m description=${EPILOG_NAME} \
+    flux job wait-event -vt 15 -m status=0 ${jobid} finish &&
+    flux job wait-event -vt 15 -m description=${EPILOG_NAME} \
         ${jobid} epilog-start &&
-    flux job wait-event -vt 5 -m description=${EPILOG_NAME} \
+    flux job wait-event -vt 30 -m description=${EPILOG_NAME} \
         ${jobid} epilog-finish &&
-    flux job wait-event -vt 5 ${jobid} clean
+    flux job wait-event -vt 15 ${jobid} clean
 '
 
 test_expect_success 'job submission with multiple valid DW strings on different lines works' '
     jobid=$(flux submit --setattr=system.dw="
-    #DW jobdw capacity=10KiB type=xfs name=project1
-    #DW jobdw capacity=20KiB type=gfs2 name=project2" \
+    #DW jobdw capacity=10GiB type=xfs name=project1
+    #DW jobdw capacity=20GiB type=gfs2 name=project2" \
         -N1 -n1 hostname) &&
     flux job wait-event -vt 10 -m description=${CREATE_DEP_NAME} \
        ${jobid} dependency-add &&
     flux job wait-event -t 10 -m description=${CREATE_DEP_NAME} \
        ${jobid} dependency-remove &&
     flux job wait-event -t 10 -m rabbit_workflow=fluxjob-$(flux job id ${jobid}) \
        ${jobid} memo &&
-    flux job wait-event -vt 5 ${jobid} depend &&
-    flux job wait-event -vt 5 ${jobid} priority &&
-    flux job wait-event -vt 5 -m description=${PROLOG_NAME} \
+    flux job wait-event -vt 15 ${jobid} depend &&
+    flux job wait-event -vt 15 ${jobid} priority &&
+    flux job wait-event -vt 15 -m description=${PROLOG_NAME} \
        ${jobid} prolog-start &&
-    flux job wait-event -vt 5 -m description=${PROLOG_NAME} \
+    flux job wait-event -vt 15 -m description=${PROLOG_NAME} \
        ${jobid} prolog-finish &&
-    flux job wait-event -vt 5 -m description=${EPILOG_NAME} \
+    flux job wait-event -vt 15 -m status=0 ${jobid} finish &&
+    flux job wait-event -vt 15 -m description=${EPILOG_NAME} \
        ${jobid} epilog-start &&
-    flux job wait-event -vt 5 -m description=${EPILOG_NAME} \
+    flux job wait-event -vt 30 -m description=${EPILOG_NAME} \
        ${jobid} epilog-finish &&
-    flux job wait-event -vt 5 ${jobid} clean
+    flux job wait-event -vt 15 ${jobid} clean
 '
 
 test_expect_success 'job submission with multiple valid DW strings on the same line works' '
-    jobid=$(flux submit --setattr=system.dw="#DW jobdw capacity=10KiB type=xfs name=project1 \
-    #DW jobdw capacity=20KiB type=gfs2 name=project2" \
+    jobid=$(flux submit --setattr=system.dw="#DW jobdw capacity=10GiB type=xfs name=project1 \
+    #DW jobdw capacity=20GiB type=gfs2 name=project2" \
        -N1 -n1 hostname) &&
     flux job wait-event -vt 10 -m description=${CREATE_DEP_NAME} \
        ${jobid} dependency-add &&
     flux job wait-event -t 10 -m description=${CREATE_DEP_NAME} \
        ${jobid} dependency-remove &&
-    flux job wait-event -vt 5 ${jobid} depend &&
-    flux job wait-event -vt 5 ${jobid} priority &&
-    flux job wait-event -vt 5 -m description=${PROLOG_NAME} \
+    flux job wait-event -vt 15 ${jobid} depend &&
+    flux job wait-event -vt 15 ${jobid} priority &&
+    flux job wait-event -vt 15 -m description=${PROLOG_NAME} \
        ${jobid} prolog-start &&
-    flux job wait-event -vt 5 -m description=${PROLOG_NAME} \
+    flux job wait-event -vt 15 -m description=${PROLOG_NAME} \
        ${jobid} prolog-finish &&
-    flux job wait-event -vt 5 -m description=${EPILOG_NAME} \
+    flux job wait-event -vt 15 -m status=0 ${jobid} finish &&
+    flux job wait-event -vt 15 -m description=${EPILOG_NAME} \
        ${jobid} epilog-start &&
-    flux job wait-event -vt 5 -m description=${EPILOG_NAME} \
+    flux job wait-event -vt 30 -m description=${EPILOG_NAME} \
        ${jobid} epilog-finish &&
-    flux job wait-event -vt 5 ${jobid} clean
+    flux job wait-event -vt 15 ${jobid} clean
 '
 
 test_expect_success 'job submission with multiple valid DW strings in a JSON file works' '
@@ -131,19 +134,20 @@ test_expect_success 'job submission with multiple valid DW strings in a JSON fil
        ${jobid} dependency-add &&
     flux job wait-event -t 10 -m description=${CREATE_DEP_NAME} \
        ${jobid} dependency-remove &&
-    flux job wait-event -vt 5 ${jobid} depend &&
-    flux job wait-event -vt 5 ${jobid} priority &&
+    flux job wait-event -vt 15 ${jobid} depend &&
+    flux job wait-event -vt 15 ${jobid} priority &&
     flux job wait-event -t 10 -m rabbit_workflow=fluxjob-$(flux job id ${jobid}) \
        ${jobid} memo &&
-    flux job wait-event -vt 5 -m description=${PROLOG_NAME} \
+    flux job wait-event -vt 15 -m description=${PROLOG_NAME} \
        ${jobid} prolog-start &&
-    flux job wait-event -vt 5 -m description=${PROLOG_NAME} \
+    flux job wait-event -vt 15 -m description=${PROLOG_NAME} \
        ${jobid} prolog-finish &&
-    flux job wait-event -vt 5 -m description=${EPILOG_NAME} \
+    flux job wait-event -vt 15 -m status=0 ${jobid} finish &&
+    flux job wait-event -vt 15 -m description=${EPILOG_NAME} \
        ${jobid} epilog-start &&
-    flux job wait-event -vt 5 -m description=${EPILOG_NAME} \
+    flux job wait-event -vt 30 -m description=${EPILOG_NAME} \
        ${jobid} epilog-finish &&
-    flux job wait-event -vt 5 ${jobid} clean
+    flux job wait-event -vt 15 ${jobid} clean
 '
 
 test_expect_success 'job-manager: dependency plugin works when validation fails' '
@@ -153,8 +157,8 @@ test_expect_success 'job-manager: dependency plugin works when validation fails'
     flux job wait-event -vt 10 ${jobid} exception
 '
 
-test_expect_success 'workflows in Error are killed properly' '
-    jobid=$(flux submit --setattr=system.dw="#DW jobdw capacity=10KiB type=xfs name=project1
+test_expect_success 'dws service kills workflows in Error properly' '
+    jobid=$(flux submit --setattr=system.dw="#DW jobdw capacity=10GiB type=xfs name=project1
     #DW copy_in source=/some/fake/dir destination=$DW_JOB_project1/" \
     -N1 -n1 hostname) &&
     flux job wait-event -vt 10 -m description=${CREATE_DEP_NAME} \
@@ -168,23 +172,49 @@ test_expect_success 'exec dws service-providing script with custom config path'
     cp $REAL_HOME/.kube/config ./kubeconfig
     DWS_JOBID=$(flux submit \
         --setattr=system.alloc-bypass.R="$R" \
-        -o per-resource.type=node --output=dws.out --error=dws.err \
+        -o per-resource.type=node --output=dws2.out --error=dws2.err \
         python ${DWS_MODULE_PATH} -e1 --kubeconfig $PWD/kubeconfig) &&
     flux job wait-event -vt 15 -m "note=dws watchers setup" ${DWS_JOBID} exception &&
     ${RPC} "dws.create"
 '
 
 test_expect_success 'job submission with valid DW string works after config change' '
-    jobid=$(flux submit --setattr=system.dw="#DW jobdw capacity=10KiB type=xfs name=project1" \
+    jobid=$(flux submit --setattr=system.dw="#DW jobdw capacity=10GiB type=xfs name=project1" \
        -N1 -n1 hostname) &&
     flux job wait-event -vt 15 -m description=${CREATE_DEP_NAME} \
        ${jobid} dependency-add &&
     flux job wait-event -vt 15 -m description=${PROLOG_NAME} \
        ${jobid} prolog-start &&
-    flux job wait-event -vt 15 -m description=${EPILOG_NAME} \
+    flux job wait-event -vt 15 -m status=0 ${jobid} finish &&
+    flux job wait-event -vt 5 -m description=${EPILOG_NAME} \
        ${jobid} epilog-start &&
-    flux job wait-event -vt 15 ${jobid} clean &&
-    flux job cancel ${DWS_JOBID}
+    flux job wait-event -vt 15 ${jobid} clean
 '
 
+test_expect_success 'job submission with persistent DW string works' '
+    flux run --setattr=system.dw="#DW create_persistent capacity=10GiB type=lustre name=project1" \
+       -N1 -n1 -c1 hostname &&
+    jobid=$(flux submit --setattr=system.dw="#DW persistentdw name=project1" \
+       -N1 -n1 hostname) &&
+    flux job wait-event -vt 30 -m description=${PROLOG_NAME} \
+       ${jobid} prolog-start &&
+    flux job wait-event -vt 30 -m status=0 ${jobid} finish &&
+    flux job wait-event -vt 30 -m description=${EPILOG_NAME} \
+       ${jobid} epilog-start &&
+    flux job wait-event -vt 30 ${jobid} clean &&
+    jobid=$(flux submit --setattr=system.dw="#DW persistentdw name=project1" \
+       -N1 -n1 hostname) &&
+    flux job wait-event -vt 30 -m description=${PROLOG_NAME} \
+       ${jobid} prolog-start &&
+    flux job wait-event -vt 30 -m status=0 ${jobid} finish &&
+    flux job wait-event -vt 30 -m description=${EPILOG_NAME} \
+       ${jobid} epilog-start &&
+    flux job wait-event -vt 30 ${jobid} clean &&
+    jobid=$(flux submit --setattr=system.dw="#DW destroy_persistent name=project1" \
+       -N1 -n1 -c1 hostname) &&
+    flux job wait-event -vt 30 -m description=${EPILOG_NAME} \
+       ${jobid} epilog-start &&
+    flux job wait-event -vt 30 ${jobid} clean
+'
+
 test_done
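
The new persistent-storage test walks the whole lifecycle: one job creates a persistent Lustre allocation with create_persistent, two subsequent jobs attach to it with persistentdw, and a final job deletes it with destroy_persistent. A rough Python equivalent using flux-core's bindings might look like the following (the bindings calls exist in flux-core, but this exact submission pattern is a sketch, not taken from this repository):

    # Hedged sketch: drive the same create/attach/destroy cycle from Python.
    import flux
    import flux.job

    def run_dw_job(handle, dw_string):
        # One-node, one-task job carrying the DW directive string in
        # attributes.system.dw, like --setattr=system.dw=... in the tests above.
        jobspec = flux.job.JobspecV1.from_command(
            ["hostname"], num_tasks=1, num_nodes=1
        )
        jobspec.setattr("system.dw", dw_string)
        jobid = flux.job.submit(handle, jobspec, waitable=True)
        return flux.job.wait(handle, jobid)  # block until the job completes

    h = flux.Flux()
    run_dw_job(h, "#DW create_persistent capacity=10GiB type=lustre name=project1")
    run_dw_job(h, "#DW persistentdw name=project1")  # attach; repeatable
    run_dw_job(h, "#DW destroy_persistent name=project1")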
