Skip to content

Commit

Permalink
feat(sdk): add pipeline job placholders for scheduled runs (kubeflow#…
Browse files Browse the repository at this point in the history
…8447)

* add pipeline job placeholders

* simplify example code in docs

* add _PLACEHOLDER suffix to variable names
  • Loading branch information
connor-mccarthy authored and gkcalat committed Nov 23, 2022
1 parent 3f99b13 commit 7981629
Showing 1 changed file with 34 additions and 7 deletions.
41 changes: 34 additions & 7 deletions sdk/python/kfp/dsl/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@
Example:
::
@dsl.pipeline(name='my-pipeline')
@dsl.pipeline
def my_pipeline():
print_op(
msg='Job name:',
Expand All @@ -100,7 +100,7 @@ def my_pipeline():
Example:
::
@dsl.pipeline(name='my-pipeline')
@dsl.pipeline
def my_pipeline():
print_op(
msg='Job resource name:',
Expand All @@ -114,7 +114,7 @@ def my_pipeline():
Example:
::
@dsl.pipeline(name='my-pipeline')
@dsl.pipeline
def my_pipeline():
print_op(
msg='Job ID:',
Expand All @@ -128,7 +128,7 @@ def my_pipeline():
Example:
::
@dsl.pipeline(name='my-pipeline')
@dsl.pipeline
def my_pipeline():
print_op(
msg='Task name:',
Expand All @@ -142,13 +142,40 @@ def my_pipeline():
Example:
::
@dsl.pipeline(name='my-pipeline')
@dsl.pipeline
def my_pipeline():
print_op(
msg='Task ID:',
value=dsl.PIPELINE_TASK_ID_PLACEHOLDER,
)
"""
PIPELINE_JOB_CREATE_TIME_UTC_PLACEHOLDER = '{{$.pipeline_job_create_time_utc}}'
"""A placeholder used to obtain the time that a pipeline job was created.
Example:
::
@dsl.pipeline
def my_pipeline():
print_op(
msg='Job created at:',
value=dsl.PIPELINE_JOB_CREATE_TIME_UTC,
)
"""
PIPELINE_JOB_SCHEDULE_TIME_UTC_PLACEHOLDER = '{{$.pipeline_job_schedule_time_utc}}'
"""A placeholder used to obtain the time for which a pipeline job is scheduled.
Example:
::
@dsl.pipeline
def my_pipeline():
print_op(
msg='Job scheduled at:',
value=dsl.PIPELINE_JOB_SCHEDULE_TIME_UTC,
)
"""

T = TypeVar('T')
Input = Annotated[T, InputAnnotation]
"""Type generic used to represent an input artifact of type ``T``, where ``T`` is an artifact class.
Expand All @@ -170,7 +197,7 @@ def artifact_producer(model: Output[Artifact]):
def artifact_consumer(model: Input[Artifact]):
print(model)
@dsl.pipeline(name='my-pipeline')
@dsl.pipeline
def my_pipeline():
producer_task = artifact_producer()
artifact_consumer(model=producer_task.output)
Expand All @@ -196,7 +223,7 @@ def artifact_producer(model: Output[Artifact]):
def artifact_consumer(model: Input[Artifact]):
print(model)
@dsl.pipeline(name='my-pipeline')
@dsl.pipeline
def my_pipeline():
producer_task = artifact_producer()
artifact_consumer(model=producer_task.output)
Expand Down

0 comments on commit 7981629

Please sign in to comment.