Skip to content

Commit

Permalink
feat(sdk+api) Implement Semaphores and Mutexes Pipeline Configuration
Browse files Browse the repository at this point in the history
Signed-off-by: ddalvi <[email protected]>
  • Loading branch information
DharmitD committed Nov 13, 2024
1 parent cac3739 commit 8a7b0e5
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 14 deletions.
36 changes: 30 additions & 6 deletions api/v2alpha1/go/pipelinespec/pipeline_spec.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion api/v2alpha1/pipeline_spec.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1103,5 +1103,9 @@ message PlatformDeploymentConfig {

// Spec for pipeline-level config options. See PipelineConfig DSL class.
message PipelineConfig {
// TODO add pipeline-level configs
// Name of the semaphore key to control pipeline concurrency
string semaphore_key = 1;

// Name of the mutex to ensure mutual exclusion
string mutex_name = 2;
}
12 changes: 7 additions & 5 deletions sdk/python/kfp/compiler/pipeline_spec_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -2011,11 +2011,13 @@ def write_pipeline_spec_to_file(

def _merge_pipeline_config(pipelineConfig: pipeline_config.PipelineConfig,
platformSpec: pipeline_spec_pb2.PlatformSpec):
# TODO: add pipeline config options (ttl, semaphore, etc.) to the dict
# json_format.ParseDict(
# {'pipelineConfig': {
# '<some pipeline config option>': pipelineConfig.<get that value>,
# }}, platformSpec.platforms['kubernetes'])
json_format.ParseDict(
{
'pipelineConfig': {
'semaphoreName': pipelineConfig.semaphore_key,
'mutexName': pipelineConfig.mutex_name,
}
}, platformSpec.platforms['kubernetes'])

return platformSpec

Expand Down
19 changes: 17 additions & 2 deletions sdk/python/kfp/dsl/pipeline_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,21 @@ class PipelineConfig:
"""PipelineConfig contains pipeline-level config options."""

def __init__(self):
pass
self.semaphore_key = None
self.mutex_name = None

# TODO add pipeline level configs
def set_semaphore_key(self, semaphore_key: str):
"""Set the name of the semaphore to control pipeline concurrency.
Args:
semaphore_key (str): Name of the semaphore.
"""
self.semaphore_key = semaphore_key.strip()

def set_mutex_name(self, mutex_name: str):
"""Set the name of the mutex to ensure mutual exclusion.
Args:
mutex_name (str): Name of the mutex.
"""
self.mutex_name = mutex_name.strip()

0 comments on commit 8a7b0e5

Please sign in to comment.