
[Core] Enable task to return object as if it's returned by its parent #26774

Closed · wants to merge 6 commits

Conversation

kira-lin
Contributor

Why are these changes needed?

This feature is crucial for implementing fault tolerance in raydp.

It solves the following issue: an object returned by an actor needs to be recovered, but the actor is dead and cannot be restarted. (In our case, the actors are Spark executors, and a restarted executor cannot re-register with Spark.) By adding a normal task that calls the actor's method, and forwarding the lineage to that task, the task can be resubmitted and choose another actor to run the method again.
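To make the failure mode concrete, here is a minimal sketch (the Executor class and its method are illustrative, not the actual raydp executor): the actor's return object is lost together with its node, and because the actor cannot be restarted, lineage reconstruction has nothing to resubmit.

import ray

@ray.remote(max_restarts=0)  # like a Spark executor: cannot come back
class Executor:
    def produce(self):
        return list(range(1000))

executor = Executor.remote()
ref = executor.produce.remote()
# If the node holding the result dies after the actor is gone, lineage
# reconstruction would need to resubmit produce() on the same actor,
# which no longer exists, so the object is unrecoverable.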

Related issue number

Checks

  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

@kira-lin
Contributor Author

kira-lin commented Jul 20, 2022

Hi @stephanie-wang,
I realized that it's possible to return Spark RDD data from executor actors through Ray calls, because RDD and Partition are serializable. I have implemented this, but in order for it to be fault tolerant, I need to implement this feature, which is very similar to what you suggested to me before.

I ended up with this prototype. With it, I can forward the returned result successfully, and the object is accessible later in Python when run on a single-node cluster. But when I run it on a multi-node cluster, the object cannot be retrieved in Python: the get hangs indefinitely.

I'm not sure which part I missed. Any help would be appreciated!

@jjyao
Collaborator

jjyao commented Aug 1, 2022

Do you have a test case showcasing what you are doing here?

@kira-lin
Contributor Author

kira-lin commented Aug 2, 2022

Hi @jjyao,
I'm having some problems making this work in a cluster right now, so I haven't added the tests yet.
But the effect we want is something like this:

import ray

@ray.remote
class a:
    def b(self):
        return 1

@ray.remote
def c():
    h = ray.get_actor("actor_a")
    # forward_to_parent is the new option proposed in this PR
    return h.b.options(forward_to_parent=1).remote()

if __name__ == "__main__":
    h = a.options(name="actor_a").remote()
    ref = ray.get(c.remote())

Normally we could call the b method directly. In our case, we want the value returned by b to be fault tolerant, but an actor of class a cannot be restarted. (If Spark loses an executor, it requests a new one.) So b cannot be resubmitted.
It's also important to note that Spark executors are interchangeable: we can resubmit the task to another Spark executor.
Therefore, we want to introduce a normal task c, which finds a Spark executor to submit the method call to, and which can itself be resubmitted when the returned value is lost.

This PR implements assigning the ownership of a.b's returned value to c's caller (the driver in this case), and using c's task ID to generate the object ID, so that c can be resubmitted when the object is lost.
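For contrast, a sketch of the closest workaround available today without this feature (illustrative, not part of this PR): materialize the value inside the wrapper task and re-return it. The result then becomes an ordinary return of the wrapper and is reconstructable by resubmitting it, at the cost of copying the data through the wrapper's process, which forward_to_parent is meant to avoid.

@ray.remote
def c_workaround():
    h = ray.get_actor("actor_a")
    # ray.get materializes the value here; returning it makes the result
    # an ordinary return of c_workaround, so a lost copy triggers a
    # resubmission of c_workaround. The cost is an extra copy of the
    # data through this worker.
    return ray.get(h.b.remote())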

@kfstorm
Member

kfstorm commented Aug 9, 2022

@kira-lin Is it possible to solve it with ray-project/enhancements#10?

@kira-lin
Contributor Author

It seems able to solve our problem. But it'll need an external HA storage, if I understand correctly?

How is it different from saving our Spark results to HDFS and using Ray to read them back?

@kfstorm
Member

kfstorm commented Aug 10, 2022

> It seems able to solve our problem. But it'll need an external HA storage, if I understand correctly?
>
> How is it different from saving our Spark results to HDFS and using Ray to read them back?

Yes, it needs external HA storage.

I don't think there's much difference if you only consider the results and availability. However, Ray makes it transparent to Ray users: you don't need to design and maintain the key / object ref / HDFS path mappings yourself.
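For example (a hypothetical sketch, not raydp or Ray code; the checkpoint/recover helpers and paths are made up), without Object HA the application has to own this kind of bookkeeping itself:

import pickle

# Hypothetical manual bookkeeping without Object HA: the application
# maintains its own key -> storage-path mapping.
result_paths = {}

def checkpoint(key, obj, root="/mnt/hdfs/results"):
    # Stand-in for an HDFS write; a real system would use an HDFS client.
    path = f"{root}/{key}.pkl"
    with open(path, "wb") as f:
        pickle.dump(obj, f)
    result_paths[key] = path

def recover(key):
    with open(result_paths[key], "rb") as f:
        return pickle.load(f)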

@kira-lin
Contributor Author

I see. This PR aims to apply Ray and Spark's lineage-based recovery to handle object loss. I think there is no conflict between this PR and the Object HA PR: if our users are OK with checkpointing all the training data, then going with Object HA is a very good choice, while some users may want better performance and choose lineage-based recovery.

@scv119
Contributor

scv119 commented Aug 15, 2022

@kira-lin do I understand correctly that this only applies to stateless actors? Also, it would be nice to add a unit test / integration test to show the problem it solves.

@scv119 added the @author-action-required label Aug 15, 2022
@kira-lin
Contributor Author

@scv119 Yes, as long as the actor's task can be executed by another actor and yields the same output.

I'll add a unit test soon.

@jjyao
Collaborator

jjyao commented Aug 16, 2022

Hi @kira-lin,

Sorry for the late reply, and thanks for your response. Now I understand your use case and motivation. My current concern is how generic the solution is: it seems you would only need it when the actor is not restartable and the task can be run by another equivalent actor that generates the same result. I'm not sure who else, besides your use case, would need it.

I may not know raydp very well, but is it possible to make the stateless actor restartable (e.g., by creating a new Spark executor)?
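For reference, a minimal sketch of the built-in restart path this alludes to (assuming the executor could re-register with the Spark driver on restart, which is exactly the part raydp currently lacks):

import ray

@ray.remote(max_restarts=-1, max_task_retries=-1)
class Executor:
    def __init__(self):
        # Hypothetical: a restarted replica would need to re-register
        # with the Spark driver here; raydp executors cannot do this
        # today, which is what motivates this PR.
        pass

    def b(self):
        return 1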

@jjyao
Collaborator

jjyao left a comment


I feel it might be worth a REP (https://github.com/ray-project/enhancements) to flesh out the problem statement, proposal, semantics of the proposed APIs, design, etc. WDYT? @stephanie-wang @scv119

if (parent_num_returns < 0) {
  return ObjectID::FromIndex(TaskId(), return_index + 1);
} else {
  return ObjectID::FromIndex(ParentTaskId(), parent_num_returns + return_index + 1);
}

This means we can only have one forward_to_parent task call inside one task, right?
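To make the potential collision concrete (my reading of the snippet above, with tuples standing in for real ObjectIDs):

def forwarded_object_id(parent_task_id, parent_num_returns, return_index):
    # Mirrors ObjectID::FromIndex(ParentTaskId(),
    #                             parent_num_returns + return_index + 1)
    return (parent_task_id, parent_num_returns + return_index + 1)

# Two single-return forwarded calls made from the same task c
# (which itself has one return, so parent_num_returns == 1):
first = forwarded_object_id("task_c", 1, 0)   # ("task_c", 2)
second = forwarded_object_id("task_c", 1, 0)  # ("task_c", 2) -- same ID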

@jjyao
Collaborator

jjyao commented Aug 16, 2022

Also, does this API align with the ownership transfer work we might want to do in the future? @stephanie-wang

@kira-lin
Contributor Author

@jjyao, as you said, this PR is not that generic. I'm now looking into whether the executor can be restarted in raydp. I initially thought this feature would be simpler, but it has turned out to be a large PR.

@stephanie-wang self-assigned this Aug 18, 2022
@kira-lin closed this Aug 19, 2022