Handle node/job data from pod scheduled events
jjnesbitt committed Nov 13, 2023
1 parent c35294a commit 219d737
Showing 1 changed file with 90 additions and 79 deletions.
analytics/analytics/management/commands/collect_running_jobs.py
@@ -1,4 +1,5 @@
 from dataclasses import dataclass
+from datetime import datetime, timezone
 from typing import Literal
 
 import djclick as click
@@ -9,7 +10,8 @@
 from analytics.models import Job
 
 # Ensure kubernetes API is setup
-kubernetes.config.load_incluster_config()
+kubernetes.config.load_config()
+client = kubernetes.client.CoreV1Api()
 
 
 @dataclass
@@ -21,8 +23,8 @@ class PodMetadata:
     job_size: str
     job_ref: str
     package_name: str
-    cpu_request: float
-    memory_request: int
+    cpu_request: float | None
+    memory_request: int | None
     package_version: str
     compiler_name: str
     compiler_version: str
Expand Down Expand Up @@ -52,7 +54,7 @@ def get_pod_metadata(pod: V1Pod) -> PodMetadata:
"""Get data from the pod that's necessary for storing a job."""
pod_dict = pod.to_dict()
pod_env = next(
(x.env for x in pod_dict["spec"]["containers"] if x.name == "build"),
(x["env"] for x in pod_dict["spec"]["containers"] if x["name"] == "build"),
None,
)
if pod_env is None:
@@ -66,6 +68,10 @@ def get_pod_metadata(pod: V1Pod) -> PodMetadata:
     # Retrieve labels
     labels: dict = pod_dict["metadata"]["labels"]
 
+    # Retrieve k8s resource requests, if they're set
+    cpu_request = pod_env.get("KUBERNETES_CPU_REQUEST")
+    memory_request = pod_env.get("KUBERNETES_MEMORY_REQUEST")
+
     # Return data in one place
     return PodMetadata(
         project_id=pod_env["CI_PROJECT_ID"],
@@ -76,8 +82,8 @@ def get_pod_metadata(pod: V1Pod) -> PodMetadata:
         job_ref=pod_env["CI_COMMIT_REF_NAME"],
         # Note: tags not provided here, will be populated in the gitlab webhook
         package_name=labels["metrics/spack_job_spec_pkg_name"],
-        cpu_request=float(parse_quantity(pod_env["KUBERNETES_CPU_REQUEST"])),
-        memory_request=int(parse_quantity(pod_env["KUBERNETES_MEMORY_REQUEST"])),
+        cpu_request=float(parse_quantity(cpu_request)) if cpu_request else None,
+        memory_request=int(parse_quantity(memory_request)) if memory_request else None,
        package_version=labels["metrics/spack_job_spec_pkg_version"],
         compiler_name=labels["metrics/spack_job_spec_compiler_name"],
         compiler_version=labels["metrics/spack_job_spec_compiler_version"],
@@ -102,86 +108,91 @@ def get_node_metadata(node: dict) -> NodeMetadata:
     )
 
 
-def get_running_build_pods():
-    """Returns pod running in the `pipeline` namespace that are on a valid stage, not `generate`."""
-    client = kubernetes.client.CoreV1Api()
-    return [
-        pod
-        for pod in client.list_namespaced_pod(namespace="pipeline").items
-        if pod.metadata.labels["metrics/gitlab_ci_job_stage"].startswith("stage-")
-    ]
+def handle_scheduled_pipeline_pod(wrapped_event: dict, start_time: datetime):
+    if wrapped_event["type"] != "ADDED":
+        return
 
-
-def get_running_job_metadata() -> list[JobMetadata]:
-    client = kubernetes.client.CoreV1Api()
-    pods = get_running_build_pods()
-    node_map = {
-        node.metadata.name: node.to_dict()
-        for node in client.list_node(label_selector="spack.io/pipeline=true").items
-    }
+    # Check that it's a current event
+    event: dict = wrapped_event["object"].to_dict()
+    created: datetime = event["metadata"]["creation_timestamp"]
+    if created < start_time:
+        click.echo(f"Skipping event from {created.isoformat()}")
+        return
 
-    results = []
-    for pod in pods:
-        node = node_map[pod.to_dict()["spec"]["node_name"]]
-        results.append(
-            JobMetadata(
-                node=get_node_metadata(node),
-                pod=get_pod_metadata(pod),
-            )
-        )
-
-    return results
+    # Retrieve pod
+    pod_name = event["involved_object"]["name"]
+    pod: V1Pod = client.read_namespaced_pod(namespace="pipeline", name=pod_name)  # type: ignore
+    node_name = pod.to_dict()["spec"]["node_name"]
 
-
-@click.command()
-def main():
-    job_metadata = get_running_job_metadata()
+    # Retrieve node
+    node = client.read_node(name=node_name).to_dict()  # type: ignore
+    item = JobMetadata(
+        node=get_node_metadata(node),
+        pod=get_pod_metadata(pod),
+    )
 
-    # Ensure we only act on new jobs
-    running_job_ids = [x.pod.job_id for x in job_metadata]
-    existing_job_ids = set(
-        Job.objects.filter(job_id__in=running_job_ids).values_list("job_id", flat=True)
-    )
-    new_jobs = [job for job in job_metadata if job.pod.job_id not in existing_job_ids]
+    # Check to make sure job hasn't already been recorded
+    if Job.objects.filter(
+        project_id=item.pod.project_id, job_id=item.pod.job_id
+    ).exists():
+        return
 
-    # Bulk create new jobs
-    Job.objects.bulk_create(
-        [
-            # Tags, duration intentionally left blank, as they will be updated once the job finishes
-            Job(
-                # Core data
-                job_id=item.pod.job_id,
-                project_id=item.pod.project_id,
-                name=item.pod.job_name,
-                started_at=item.pod.job_started_at,
-                duration=None,
-                ref=item.pod.job_ref,
-                package_name=item.pod.package_name,
-                job_cpu_request=item.pod.cpu_request,
-                job_memory_request=item.pod.memory_request,
-                # Node data
-                node_name=item.node.name,
-                node_uid=item.node.uid,
-                node_instance_type=item.node.instance_type,
-                node_capacity_type=item.node.capacity_type,
-                node_cpu=item.node.cpu,
-                node_mem=item.node.mem,
-                # Extra data
-                package_version=item.pod.package_version,
-                compiler_name=item.pod.compiler_name,
-                compiler_version=item.pod.compiler_version,
-                arch=item.pod.arch,
-                package_variants=item.pod.package_variants,
-                build_jobs=item.pod.build_jobs,
-                job_size=item.pod.job_size,
-                stack=item.pod.stack,
-                # By definition this is true, since this script runs in the cluster
-                aws=True,
-            )
-            for item in new_jobs
-        ]
-    )
+    # Tags, duration intentionally left blank, as they will be updated once the job finishes
+    job = Job.objects.create(
+        # Core data
+        job_id=item.pod.job_id,
+        project_id=item.pod.project_id,
+        name=item.pod.job_name,
+        started_at=item.pod.job_started_at,
+        duration=None,
+        ref=item.pod.job_ref,
+        package_name=item.pod.package_name,
+        job_cpu_request=item.pod.cpu_request,
+        job_memory_request=item.pod.memory_request,
+        # Node data
+        node_name=item.node.name,
+        node_uid=item.node.uid,
+        node_instance_type=item.node.instance_type,
+        node_capacity_type=item.node.capacity_type,
+        node_cpu=item.node.cpu,
+        node_mem=item.node.mem,
+        # Extra data
+        package_version=item.pod.package_version,
+        compiler_name=item.pod.compiler_name,
+        compiler_version=item.pod.compiler_version,
+        arch=item.pod.arch,
+        package_variants=item.pod.package_variants,
+        build_jobs=item.pod.build_jobs,
+        job_size=item.pod.job_size,
+        stack=item.pod.stack,
+        # By definition this is true, since this script runs in the cluster
+        aws=True,
+    )
 
+    click.echo(f"Processed job {job.job_id}")
+
+
+@click.command()
+def main():
+    start_time = datetime.now(timezone.utc)
+
+    # Setup event stream
+    watcher = kubernetes.watch.Watch()
+    events = watcher.stream(
+        client.list_namespaced_event,
+        namespace="pipeline",
+        field_selector="reason=Scheduled,involvedObject.kind=Pod",
+    )
+
+    click.echo("Listening for scheduled pipeline pods...")
+    click.echo(f"Start time is {start_time.isoformat()}")
+    click.echo("----------------------------------------")
+
+    # Get events yielded from generator
+    for event in events:
+        assert isinstance(event, dict)
+        handle_scheduled_pipeline_pod(event, start_time)
+
 
 if __name__ == "__main__":
     main()
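
A note on the resource-request handling above: parse_quantity, importable from kubernetes.utils in the official Python client, converts Kubernetes quantity strings into Decimal values, which is why the new code wraps its results in float() and int(). A minimal sketch of the conversion, with illustrative values:

from kubernetes.utils import parse_quantity

# CPU quantities use millicore suffixes: "500m" is half a core.
print(float(parse_quantity("500m")))  # 0.5

# Memory quantities use binary suffixes: "2Gi" is 2 * 1024**3 bytes.
print(int(parse_quantity("2Gi")))  # 2147483648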

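On the config change: swapping load_incluster_config() for load_config() presumably lets the same command run both inside the cluster and against a local kubeconfig, since recent versions of the Python client document load_config() as trying a kubeconfig file first and falling back to in-cluster configuration. A sketch of the equivalent explicit fallback, assuming a client pinned to an older version:

import kubernetes
from kubernetes.config import ConfigException

try:
    # Outside the cluster: read the local kubeconfig (e.g. ~/.kube/config)
    kubernetes.config.load_kube_config()
except ConfigException:
    # Inside a pod: use the mounted service-account credentials
    kubernetes.config.load_incluster_config()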
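
Finally, the items yielded by watcher.stream(...) in the new main() are dicts wrapping a CoreV1Event, which is why handle_scheduled_pipeline_pod checks wrapped_event["type"] and unwraps wrapped_event["object"]. A standalone sketch of that shape (namespace and field selector copied from the command; the printed fields are just one plausible thing to inspect):

import kubernetes

kubernetes.config.load_config()
client = kubernetes.client.CoreV1Api()

watcher = kubernetes.watch.Watch()
for wrapped_event in watcher.stream(
    client.list_namespaced_event,
    namespace="pipeline",
    field_selector="reason=Scheduled,involvedObject.kind=Pod",
):
    # Each yielded item is a dict like:
    #   {"type": "ADDED" | "MODIFIED" | "DELETED", "object": CoreV1Event, ...}
    event = wrapped_event["object"].to_dict()
    print(wrapped_event["type"], event["involved_object"]["name"])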