Skip to content

Commit

Permalink
Merge branch 'master' into security-policy
Browse files Browse the repository at this point in the history
  • Loading branch information
dejanzele authored Dec 3, 2024
2 parents 2314599 + 7a76f24 commit eb09bef
Show file tree
Hide file tree
Showing 77 changed files with 3,587 additions and 812 deletions.
4 changes: 2 additions & 2 deletions .run/Executor.run.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<env name="ARMADA_EXECUTORAPICONNECTION_FORCENOTLS" value="true" />
<env name="ARMADA_HTTPPORT" value="8085" />
<env name="HOME" value="$USER_HOME$/" />
<env name="KUBECONFIG" value="$PROJECT_DIR$/.kube/internal/config" />
<env name="KUBECONFIG" value="$PROJECT_DIR$/.kube/external/config" />
</envs>
<pass_parent_env value="false" />
<kind value="FILE" />
Expand All @@ -17,4 +17,4 @@
<filePath value="$PROJECT_DIR$/cmd/executor/main.go" />
<method v="2" />
</configuration>
</component>
</component>
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,4 @@ For more information about contributing to Armada see [CONTRIBUTING.md](https://
## Discussion

If you are interested in discussing Armada you can find us on [![slack](https://img.shields.io/badge/slack-armada-brightgreen.svg?logo=slack)](https://cloud-native.slack.com/?redir=%2Farchives%2FC03T9CBCEMC)

33 changes: 33 additions & 0 deletions client/python/armada_client/asyncio_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,39 @@ async def get_job_status(self, job_ids: List[str]) -> job_pb2.JobStatusResponse:
resp = await self.job_stub.GetJobStatus(req)
return resp

async def get_job_status_by_external_job_uri(
self, queue: str, job_set_id: str, external_job_uri: str
) -> job_pb2.JobDetailsResponse:
"""
Retrieves the status of a job based on externalJobUri annotation.
:param queue: The name of the queue
:param job_set_id: The name of the job set (a grouping of jobs)
:param external_job_uri: externalJobUri annotation value
:returns: The response from the server containing the job status.
:rtype: JobStatusResponse
"""
req = job_pb2.JobStatusUsingExternalJobUriRequest(
queue=queue, jobset=job_set_id, external_job_uri=external_job_uri
)
resp = await self.job_stub.GetJobStatusUsingExternalJobUri(req)
return resp

async def get_job_errors(self, job_ids: List[str]) -> job_pb2.JobErrorsResponse:
"""
Retrieves termination reason from query api.
:param job_ids: A list of unique job identifiers.
:type job_ids: List[str]
:returns: The response from the server containing the job errors.
:rtype: JobErrorsResponse
"""
req = job_pb2.JobErrorsRequest(job_ids=job_ids)
resp = await self.job_stub.GetJobErrors(req)
return resp

async def get_job_details(self, job_ids: List[str]) -> job_pb2.JobDetailsResponse:
"""
Asynchronously retrieves the details of a job from Armada.
Expand Down
32 changes: 32 additions & 0 deletions client/python/armada_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,38 @@ def get_job_status(self, job_ids: List[str]) -> job_pb2.JobStatusResponse:
req = job_pb2.JobStatusRequest(job_ids=job_ids)
return self.job_stub.GetJobStatus(req)

def get_job_status_by_external_job_uri(
self, queue: str, job_set_id: str, external_job_uri: str
) -> job_pb2.JobDetailsResponse:
"""
Retrieves the status of a job based on externalJobUri annotation.
:param queue: The name of the queue
:param job_set_id: The name of the job set (a grouping of jobs)
:param external_job_uri: externalJobUri annotation value
:returns: The response from the server containing the job status.
:rtype: JobStatusResponse
"""
req = job_pb2.JobStatusUsingExternalJobUriRequest(
queue=queue, jobset=job_set_id, external_job_uri=external_job_uri
)
return self.job_stub.GetJobStatusUsingExternalJobUri(req)

def get_job_errors(self, job_ids: List[str]) -> job_pb2.JobErrorsResponse:
"""
Retrieves termination reason from query api.
:param queue: The name of the queue
:param job_set_id: The name of the job set (a grouping of jobs)
:param external_job_uri: externalJobUri annotation value
:returns: The response from the server containing the job errors.
:rtype: JobErrorsResponse
"""
req = job_pb2.JobErrorsRequest(job_ids=job_ids)
return self.job_stub.GetJobErrors(req)

def get_job_details(self, job_ids: List[str]) -> job_pb2.JobDetailsResponse:
"""
Retrieves the details of a job from Armada.
Expand Down
2 changes: 1 addition & 1 deletion client/python/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "armada_client"
version = "0.4.6"
version = "0.4.8"
description = "Armada gRPC API python client"
readme = "README.md"
requires-python = ">=3.7"
Expand Down
1 change: 1 addition & 0 deletions config/scheduler/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ scheduling:
resolution: "1"
disableScheduling: false
enableAssertions: false
enablePreferLargeJobOrdering: false
protectedFractionOfFairShare: 1.0
nodeIdLabel: "kubernetes.io/hostname"
priorityClasses:
Expand Down
7 changes: 6 additions & 1 deletion docs/python_airflow_operator.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ This class provides integration with Airflow and Armada
## armada.operators.armada module


### _class_ armada.operators.armada.ArmadaOperator(name, channel_args, armada_queue, job_request, job_set_prefix='', lookout_url_template=None, poll_interval=30, container_logs=None, k8s_token_retriever=None, deferrable=False, job_acknowledgement_timeout=300, dry_run=False, \*\*kwargs)
### _class_ armada.operators.armada.ArmadaOperator(name, channel_args, armada_queue, job_request, job_set_prefix='', lookout_url_template=None, poll_interval=30, container_logs=None, k8s_token_retriever=None, deferrable=False, job_acknowledgement_timeout=300, dry_run=False, reattach_policy=None, \*\*kwargs)
Bases: `BaseOperator`, `LoggingMixin`

An Airflow operator that manages Job submission to Armada.
Expand Down Expand Up @@ -60,6 +60,9 @@ and handles job cancellation if the Airflow task is killed.
* **dry_run** (*bool*) –


* **reattach_policy** (*Optional**[**str**] **| **Callable**[**[**JobState**, **str**]**, **bool**]*) –



#### execute(context)
Submits the job to Armada and polls for completion.
Expand Down Expand Up @@ -167,6 +170,8 @@ acknowledged by Armada.
:type job_acknowledgement_timeout: int
:param dry_run: Run Operator in dry-run mode - render Armada request and terminate.
:type dry_run: bool
:param reattach_policy: Operator reattach policy to use (defaults to: never)
:type reattach_policy: Optional[str] | Callable[[JobState, str], bool]
:param kwargs: Additional keyword arguments to pass to the BaseOperator.


Expand Down
61 changes: 61 additions & 0 deletions docs/python_armada_client.md
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,38 @@ Retrieves the details of a job from Armada.



#### get_job_errors(job_ids)
Retrieves termination reason from query api.


* **Parameters**


* **queue** – The name of the queue


* **job_set_id** – The name of the job set (a grouping of jobs)


* **external_job_uri** – externalJobUri annotation value


* **job_ids** (*List**[**str**]*) –



* **Returns**

The response from the server containing the job errors.



* **Return type**

JobErrorsResponse



#### get_job_events_stream(queue, job_set_id, from_message_id=None)
Get event stream for a job set.

Expand Down Expand Up @@ -362,6 +394,35 @@ Retrieves the status of a list of jobs from Armada.



#### get_job_status_by_external_job_uri(queue, job_set_id, external_job_uri)
Retrieves the status of a job based on externalJobUri annotation.


* **Parameters**


* **queue** (*str*) – The name of the queue


* **job_set_id** (*str*) – The name of the job set (a grouping of jobs)


* **external_job_uri** (*str*) – externalJobUri annotation value



* **Returns**

The response from the server containing the job status.



* **Return type**

JobStatusResponse



#### get_queue(name)
Get the queue by name.

Expand Down
3 changes: 3 additions & 0 deletions e2e/setup/kind.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ apiVersion: kind.x-k8s.io/v1alpha4
name: armada-test
featureGates:
"KubeletInUserNamespace": true
networking:
apiServerAddress: 0.0.0.0
nodes:
- role: worker
image: kindest/node:v1.26.15
Expand All @@ -28,3 +30,4 @@ nodes:
- containerPort: 6443 # control plane
hostPort: 6443 # exposes control plane on localhost:6443
protocol: TCP

15 changes: 11 additions & 4 deletions internal/common/eventutil/eventutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,13 @@ func ApiJobFromLogSubmitJob(ownerId string, groups []string, queueName string, j
podSpecs = k8sPodSpecs
}

var priceInfo *api.ExperimentalPriceInfo
if e.ExperimentalPriceInfo != nil {
priceInfo = &api.ExperimentalPriceInfo{
BidPrice: e.ExperimentalPriceInfo.BidPrice,
}
}

return &api.Job{
Id: e.JobId,
ClientId: e.DeduplicationId,
Expand All @@ -170,10 +177,10 @@ func ApiJobFromLogSubmitJob(ownerId string, groups []string, queueName string, j
Labels: e.ObjectMeta.Labels,
Annotations: e.ObjectMeta.Annotations,

K8SIngress: k8sIngresses,
K8SService: k8sServices,

Priority: float64(e.Priority),
K8SIngress: k8sIngresses,
K8SService: k8sServices,
ExperimentalPriceInfo: priceInfo,
Priority: float64(e.Priority),

PodSpec: podSpec,
PodSpecs: podSpecs,
Expand Down
9 changes: 9 additions & 0 deletions internal/common/metrics/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@ type QueueMetrics struct {
PriorityClass string
Resources ResourceMetrics
Durations *FloatMetrics
BidPrices *FloatMetrics
}

type QueueMetricsRecorder struct {
Pool string
PriorityClass string
resourceRecorder *ResourceMetricsRecorder
durationRecorder *FloatMetricsRecorder
bidPriceRecorder *FloatMetricsRecorder
}

type JobMetricsRecorder struct {
Expand All @@ -46,6 +48,11 @@ func (r *JobMetricsRecorder) RecordResources(pool string, priorityClass string,
recorder.resourceRecorder.Record(resources)
}

func (r *JobMetricsRecorder) RecordBidPrice(pool string, priorityClass string, price float64) {
recorder := r.getOrCreateRecorder(pool, priorityClass)
recorder.bidPriceRecorder.Record(price)
}

func (r *JobMetricsRecorder) Metrics() []*QueueMetrics {
result := make([]*QueueMetrics, 0, len(r.recordersByPoolAndPriorityClass))
for _, v := range r.recordersByPoolAndPriorityClass {
Expand All @@ -54,6 +61,7 @@ func (r *JobMetricsRecorder) Metrics() []*QueueMetrics {
PriorityClass: v.PriorityClass,
Resources: v.resourceRecorder.GetMetrics(),
Durations: v.durationRecorder.GetMetrics(),
BidPrices: v.bidPriceRecorder.GetMetrics(),
})
}
return result
Expand All @@ -68,6 +76,7 @@ func (r *JobMetricsRecorder) getOrCreateRecorder(pool string, pritorityClass str
PriorityClass: pritorityClass,
resourceRecorder: NewResourceMetricsRecorder(),
durationRecorder: NewDefaultJobDurationMetricsRecorder(),
bidPriceRecorder: NewFloatMetricsRecorder(),
}
r.recordersByPoolAndPriorityClass[recorderKey] = qmr
}
Expand Down
Loading

0 comments on commit eb09bef

Please sign in to comment.