Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sdk): Use google.protobuf.Value in v2 for passing parameters. #6804

Merged
merged 26 commits into from
Oct 28, 2021
Merged
Changes from 1 commit
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
4966b0b
Use google.protobuf.Value in v2 for passing parameters.
neuromage Oct 26, 2021
8f14fb1
retest samples.
neuromage Oct 26, 2021
728f922
Fix tests.
neuromage Oct 27, 2021
5b13589
Merge branch 'master' into protobuf-value.
neuromage Oct 27, 2021
cf3407d
Update release, more cleanup.
neuromage Oct 27, 2021
7e6d18f
Use github.com/kubeflow/pipelines/api from same repo.
neuromage Oct 27, 2021
bde385f
Run go mod tidy
neuromage Oct 27, 2021
ac5f6e3
chore: go mod tidy
Bobgy Oct 27, 2021
fd8ee77
fix v2 compile error and clean up unused code
Bobgy Oct 27, 2021
216944a
pr comments.
neuromage Oct 27, 2021
87319b8
Merge branch 'protobuf-value' of github.com:neuromage/pipelines into …
neuromage Oct 27, 2021
107a492
Merge branch 'master' into protobuf-value
neuromage Oct 27, 2021
df770b8
Merge branch 'master' into protobuf-value
neuromage Oct 27, 2021
9b5e012
update goldens
neuromage Oct 27, 2021
c8d49a2
Fix metadata recording.
neuromage Oct 27, 2021
de1f943
Update kfp mlmd client.
neuromage Oct 27, 2021
bfc380b
fix test again
neuromage Oct 27, 2021
ad3e7f9
another try.
neuromage Oct 27, 2021
917d979
chore: migrate v2 DAG driver input parameters to protobuf.Value + sma…
Bobgy Oct 28, 2021
b0718ec
fix v2 launcher + clean up
Bobgy Oct 28, 2021
1df554a
fix a compile error
Bobgy Oct 28, 2021
24360f2
fix a few more tests
Bobgy Oct 28, 2021
7b9a681
fix number parsing
Bobgy Oct 28, 2021
547747e
clean up
Bobgy Oct 28, 2021
f2426b0
disable cache_v2 test.
neuromage Oct 28, 2021
35f8448
Merge branch 'protobuf-value' of github.com:neuromage/pipelines into …
neuromage Oct 28, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
retest samples.
neuromage committed Oct 26, 2021
commit 8f14fb15dc9ef3be7f37e7bed3c0c2a84722d6a7
27 changes: 21 additions & 6 deletions sdk/python/kfp/compiler/v2_compat.py
Original file line number Diff line number Diff line change
@@ -141,12 +141,27 @@ def update_op(op: dsl.ContainerOp,
component_spec = op.component_spec
for parameter, spec in sorted(
component_spec.input_definitions.parameters.items()):
parameter_info = {
"type":
pipeline_spec_pb2.ParameterType.ParameterTypeEnum.Name(
spec.parameter_type),
}
op.command += [f"{parameter}={op._parameter_arguments[parameter]}"]
parameter_type = pipeline_spec_pb2.ParameterType.ParameterTypeEnum.Name(
spec.parameter_type)
parameter_info = {"type": parameter_type}

parameter_value = op._parameter_arguments[parameter]
# print('PAR VAL: ', parameter_value)
# if parameter_type in ['STRUCT', 'LIST']:
# parameter_value = json.dumps(op._parameter_arguments[parameter])
# print('NEW PAR VAL: ', parameter_value)
# if isinstance(default_value_or_pipeline_param, (dict, list, bool)):
# default_value_or_pipeline_param = json.dumps(
# default_value_or_pipeline_param)
# elif isinstance(default_value_or_pipeline_param, (int, float)):
# default_value_or_pipeline_param = str(
# default_value_or_pipeline_param)
# else:
# print('{} is instance of {}'.format(
# default_value_or_pipeline_param,
# type(default_value_or_pipeline_param)))
op.command += [f"{parameter}={parameter_value}"]

runtime_info["inputParameters"][parameter] = parameter_info
op.command += ["--"]

17 changes: 14 additions & 3 deletions sdk/python/kfp/dsl/_component_bridge.py
Original file line number Diff line number Diff line change
@@ -272,13 +272,24 @@ def _create_container_op_from_component_and_arguments(
name_to_spec_type = {}
if component_meta.inputs:
name_to_spec_type = {
input.name: input.type for input in component_meta.inputs
input.name: {
'type': input.type,
'default': input.default,
} for input in component_meta.inputs
}

if kfp.COMPILING_FOR_V2:
for name, spec_type in name_to_spec_type.items():
if (name in original_arguments and
type_utils.is_parameter_type(spec_type)):
task._parameter_arguments[name] = str(original_arguments[name])
type_utils.is_parameter_type(spec_type['type'])):
print('ARG: ', name_to_spec_type[name])
print('TYPEOF: ', type(original_arguments[name]))
if isinstance(original_arguments[name], (list, dict)):
task._parameter_arguments[name] = json.dumps(
original_arguments[name])
else:
task._parameter_arguments[name] = str(
original_arguments[name])

for name in list(task.artifact_arguments.keys()):
if name in task._parameter_arguments:
8 changes: 6 additions & 2 deletions sdk/python/kfp/v2/components/executor.py
Original file line number Diff line number Diff line change
@@ -141,8 +141,8 @@ def _write_output_parameter_value(self, name: str,
'Unable to serialize unknown type `{}` for parameter'
' input with value `{}`'.format(value, type(value)))

if not self._executor_output.get('parameters'):
self._executor_output['parameters'] = {}
if not self._executor_output.get('parameterValues'):
self._executor_output['parameterValues'] = {}

self._executor_output['parameterValues'][name] = output

@@ -303,3 +303,7 @@ def execute(self):

result = self._func(**func_kwargs)
self._write_executor_output(result)
# import time
# print('DONE EXECUTION. Sleeeping.....')
# time.sleep(10 * 60)
# print('DONE Sleeping!')
1 change: 1 addition & 0 deletions v2/component/launcher.go
Original file line number Diff line number Diff line change
@@ -515,6 +515,7 @@ func (l *Launcher) dumpOutputParameters(executorOutput *pipelinespec.ExecutorOut
wrap := func(err error) error {
return fmt.Errorf("failed to dump output parameter %q in executor output to disk: %w", name, err)
}

var value string
switch t := parameter.Kind.(type) {
case *structpb.Value_StringValue:
2 changes: 2 additions & 0 deletions v2/go.mod
Original file line number Diff line number Diff line change
@@ -22,3 +22,5 @@ require (
k8s.io/apimachinery v0.21.2
k8s.io/client-go v0.20.4
)

replace github.com/kubeflow/pipelines/api v0.0.0-20211020193552-20f28631517d => /build/api