
[sdk] Unable to aggregate results over ParallelFor in Kubeflow V2 using V1 workarounds such as .after() #10050

Open
TristanGreathouse opened this issue Oct 2, 2023 · 16 comments

@TristanGreathouse commented Oct 2, 2023

Environment

  • KFP version: V2 backend. 2.0.1.
  • KFP SDK version: 2.3.0 (failing), 1.8.21 (working)
  • All dependencies version:
kfp                        2.3.0     
kfp-pipeline-spec          0.2.2     
kfp-server-api             2.0.1 

Steps to reproduce

The new Kubeflow V2 backend does not yet support dsl.Collected(), a feature I was really looking forward to. I have a process where I run several components within a dsl.ParallelFor and then want to aggregate the results of all of those components after the dsl.ParallelFor finishes.

In V1, using SDK version kfp==1.8.21, I was able to work around the lack of any released fan-in mechanism by using the .after() method. A dummy snippet would look as follows: comp_in_parfor executes some process and saves its results to S3, and then collector, which runs after comp_in_parfor has finished, reads those results back from S3.

with dsl.ParallelFor(parfor_list) as args:
    # saves out some artifacts to S3 to make DIY fan-in logic
    comp_in_parfor = comp_template(a=args.a, b=args.b)

# reads in those same artifacts from S3
collector = collector_template(...).after(comp_in_parfor)
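For illustration only, here is the shape of that DIY fan-in in plain Python, with a temporary local directory standing in for S3 (the function names and file layout here are hypothetical stand-ins, not kfp API):

```python
import json
import tempfile
from pathlib import Path

def comp_in_parfor(out_dir: Path, idx: int, a: float, b: float) -> None:
    # Each parallel "component" writes its result to shared storage.
    (out_dir / f"result_{idx}.json").write_text(json.dumps(a + b))

def collector(out_dir: Path) -> float:
    # Runs only after every parallel task has finished (.after() semantics),
    # then reads all of the intermediate results back and aggregates them.
    return sum(
        json.loads(p.read_text())
        for p in sorted(out_dir.glob("result_*.json"))
    )

with tempfile.TemporaryDirectory() as tmp:
    out = Path(tmp)
    for i, (a, b) in enumerate([(1.0, 2.0), (3.0, 4.0)]):
        comp_in_parfor(out, i, a, b)
    total = collector(out)  # 3.0 + 7.0
```

The key point is that the DAG-level dependency (collector after the loop) carries no data; the data flows through external storage, which is exactly why only `.after()` is needed.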

However, if I try to do the same thing with kfp==2.3.0, I get the following error, which means I cannot use the .after() method this way in V2. And because the new dsl.Collected also does not work on the V2 open-source backend (only on Vertex AI, from what I can tell), there is no way to fan in from a ParallelFor at all, either DIY-style or properly.

kfp.compiler.compiler_utils.InvalidTopologyException: Illegal task dependency across DSL context managers. A downstream task cannot depend on an upstream task within a dsl.ParallelFor context unless the downstream is within that context too or the outputs are begin fanned-in to a list using dsl.Collected. Found task square-root which depends on upstream task square-and-sum within an uncommon dsl.ParallelFor context.

This error can easily be reproduced with this example:

import os
import kfp
from kfp import dsl
from typing import List


@dsl.component
def add(x: float, y: float) -> float:
    return x + y

@dsl.component
def square_root(x: float) -> float:
    return x ** .5

@dsl.pipeline
def dummy(parfor_list: List[float] = [1.2, 2.4, 3.6], b: float = 1.2) -> float:
    with dsl.ParallelFor(parfor_list) as args:
        sq_and_sum_task = add(x=args, y=args)
    square_root_comp = square_root(x=b).after(sq_and_sum_task)
    return square_root_comp.output
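As a point of reference, the fan-in computation this repro is reaching for can be sketched in plain local Python. This is only the semantics, not kfp code; the list below is what dsl.Collected() would hand to the downstream task, and the sum-then-square-root aggregation is an assumed example:

```python
from typing import List

def add(x: float, y: float) -> float:
    return x + y

def square_root(x: float) -> float:
    return x ** 0.5

def fan_in_locally(parfor_list: List[float]) -> float:
    # One add() result per ParallelFor iteration; this list is the
    # fanned-in value a dsl.Collected()-style mechanism would provide.
    collected = [add(x=v, y=v) for v in parfor_list]
    return square_root(x=sum(collected))

result = fan_in_locally([1.2, 2.4, 3.6])  # sqrt(2 * 7.2)
```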

Expected result

My expected result can be one of two things:

  1. dsl.Collected() is properly implemented in the V2 KFP backend, rendering the .after() workaround moot.
  2. .after() works, so that users can properly lay out the sequential structure of their DAG and work around the lack of fan-in logic in Kubeflow.

Am I missing a workaround that allows users to fan in from a ParallelFor in the V2 SDK? If not, is there any way that .after() can be restored until dsl.Collected() is implemented properly in the backend? Until then, everyone blocked by this issue will be unable to use Kubeflow V2 even with their own DIY fan-in logic. It's a major bummer because V2 has some great functionality. Namely, I'm quite excited about sub-DAGs to logically break up my repeated ParallelFor units and reduce UI strain from overly large DAG structures.

Materials and Reference

Impacted by this bug? Give it a 👍.

@zijianjoy
Collaborator

/assign @connor-mccarthy



github-actions bot commented Jan 4, 2024

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

@connor-mccarthy
Member

This is fixed by #10257 and will be released in kfp==2.5.0.

@mitchell-lawson

This issue is present in the newest release, kfp==2.6.0, but not in kfp==2.5.0.

@espoirMur

I can confirm the presence of the issue on 2.6.0

@papagala
Contributor

papagala commented Aug 1, 2024

We are also impacted by this. We see it as a critical blocker, since we have more and more tasks that we want to run in parallel, especially when dealing with gigantic images in the GB range per image, so we cannot afford to run them sequentially. However, we must aggregate the results after we are done.

@gregsheremeta
Contributor

so it was fixed in 2.5.0 but regressed in 2.6.0?

are we missing a test?

@papagala
Contributor

I just tested this on Kubeflow 1.8.1 using KFP SDK 2.5.0 and 2.8.0, but neither works. Here's a sample dummy pipeline:

from kfp import compiler, dsl
from typing import List


@dsl.component
def print_op(message: str):
    print(message)


@dsl.pipeline()
def my_pipeline():
    with dsl.ParallelFor([1, 2, 3]):
        one = print_op(message='foo')
    two = print_op(message='bar').after(one)

if __name__ == "__main__":
    compiler.Compiler().compile(my_pipeline, __file__ + ".yaml")

There are no compilation errors, but when I try to run the pipeline I get this error:

{"error":"Failed to create a new run: InternalServerError: Failed to validate workflow for (): templates.entrypoint.tasks.root templates.root sorting failed: invalid dependency for-loop-2","code":13,"message":"Failed to create a new run: InternalServerError: Failed to validate workflow for (): templates.entrypoint.tasks.root templates.root sorting failed: invalid dependency for-loop-2","details":[{"@type":"type.googleapis.com/google.rpc.Status","code":13,"message":"Internal Server Error"}]}

My guess is that something changed in the KFP backend, not the KFP SDK, that broke this.

@papagala
Contributor

@connor-mccarthy could we reopen this issue based on the comments above?

@gregsheremeta
Contributor

/reopen


@gregsheremeta: Reopened this issue.

In response to this:

/reopen


@github-project-automation github-project-automation bot moved this to Needs triage in KFP Runtime Triage Aug 29, 2024
@gregsheremeta
Contributor

/lifecycle frozen

@asaff1

asaff1 commented Oct 27, 2024

I use kfp==2.9.0.
This is a major issue for me in V2. I have a V1 pipeline that used .after() to collect ParallelFor results. Is there any workaround for this? It is not working now and I don't see an alternative...
I really don't understand the V2 development priorities. I'm upgrading a Kubeflow 1.x installation, and I'd expect the very first thing for a V2 release to be supporting the V1 capabilities. Really frustrating...

@papagala
Contributor

papagala commented Nov 4, 2024

We have many new ML use cases that we are onboarding, and we are forced to do so as V1 pipelines because of this bug. Thanks everybody for the hard work 👍; I hope we can get a permanent solution for this soon.
