Skip to content

Commit

Permalink
[Core] Reconstruct actor to run lineage reconstruction triggered acto…
Browse files Browse the repository at this point in the history
…r task (ray-project#47396)

Currently if we need to rerun an actor task to recover a lost object but the actor is dead, the actor task will fail immediately. This PR allows the actor to be restarted (if it doesn't violate max_restarts) so that the actor task can run to recover lost objects.

In terms of the state machine, we add a state transition from DEAD to RESTARTING.

Signed-off-by: Jiajun Yao <[email protected]>
Signed-off-by: ujjawal-khare <[email protected]>
  • Loading branch information
jjyao authored and ujjawal-khare committed Oct 15, 2024
1 parent cb7beb6 commit 0eb741a
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 12 deletions.
24 changes: 19 additions & 5 deletions src/ray/gcs/gcs_server/gcs_actor_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1107,10 +1107,21 @@ void GcsActorManager::DestroyActor(const ActorID &actor_id,
RAY_CHECK_OK(gcs_table_storage_->ActorTable().Put(
actor->GetActorID(),
*actor_table_data,
[this, actor, actor_id, actor_table_data](Status status) {
[this,
actor,
actor_id,
actor_table_data,
is_restartable,
done_callback = std::move(done_callback)](Status status) {
if (done_callback) {
done_callback();
}
RAY_CHECK_OK(gcs_publisher_->PublishActor(
actor_id, *GenActorDataOnlyWithStates(*actor_table_data), nullptr));
RAY_CHECK_OK(gcs_table_storage_->ActorTaskSpecTable().Delete(actor_id, nullptr));
if (!is_restartable) {
RAY_CHECK_OK(
gcs_table_storage_->ActorTaskSpecTable().Delete(actor_id, nullptr));
}
actor->WriteActorExportEvent();
// Destroy placement group owned by this actor.
destroy_owned_placement_group_if_needed_(actor_id);
Expand Down Expand Up @@ -1352,8 +1363,8 @@ void GcsActorManager::RestartActor(const ActorID &actor_id,
// could've been destroyed and dereigstered before restart.
auto iter = registered_actors_.find(actor_id);
if (iter == registered_actors_.end()) {
RAY_LOG(DEBUG).WithField(actor_id.JobId()).WithField(actor_id)
<< "Actor is destroyed before restart";
RAY_LOG(DEBUG) << "Actor is destroyed before restart, actor id = " << actor_id
<< ", job id = " << actor_id.JobId();
if (done_callback) {
done_callback();
}
Expand Down Expand Up @@ -1399,7 +1410,10 @@ void GcsActorManager::RestartActor(const ActorID &actor_id,
RAY_CHECK_OK(gcs_table_storage_->ActorTable().Put(
actor_id,
*mutable_actor_table_data,
[this, actor, actor_id, mutable_actor_table_data](Status status) {
[this, actor, actor_id, mutable_actor_table_data, done_callback](Status status) {
if (done_callback) {
done_callback();
}
RAY_CHECK_OK(gcs_publisher_->PublishActor(
actor_id, *GenActorDataOnlyWithStates(*mutable_actor_table_data), nullptr));
actor->WriteActorExportEvent();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,9 @@ class MockWorkerClient : public rpc::CoreWorkerClientInterface {
public:
MockWorkerClient(instrumented_io_context &io_service) : io_service_(io_service) {}

void WaitForActorOutOfScope(
const rpc::WaitForActorOutOfScopeRequest &request,
const rpc::ClientCallback<rpc::WaitForActorOutOfScopeReply> &callback) override {
void WaitForActorRefDeleted(
const rpc::WaitForActorRefDeletedRequest &request,
const rpc::ClientCallback<rpc::WaitForActorRefDeletedReply> &callback) override {
callbacks_.push_back(callback);
}

Expand All @@ -93,12 +93,12 @@ class MockWorkerClient : public rpc::CoreWorkerClientInterface {

// The created_actors_ of gcs actor manager will be modified in io_service thread.
// In order to avoid multithreading reading and writing created_actors_, we also
// send the `WaitForActorOutOfScope` callback operation to io_service thread.
// send the `WaitForActorRefDeleted` callback operation to io_service thread.
std::promise<bool> promise;
io_service_.post(
[this, status, &promise]() {
auto callback = callbacks_.front();
auto reply = rpc::WaitForActorOutOfScopeReply();
auto reply = rpc::WaitForActorRefDeletedReply();
callback(status, std::move(reply));
promise.set_value(false);
},
Expand All @@ -109,7 +109,7 @@ class MockWorkerClient : public rpc::CoreWorkerClientInterface {
return true;
}

std::list<rpc::ClientCallback<rpc::WaitForActorOutOfScopeReply>> callbacks_;
std::list<rpc::ClientCallback<rpc::WaitForActorRefDeletedReply>> callbacks_;
std::vector<ActorID> killed_actors_;
instrumented_io_context &io_service_;
};
Expand Down Expand Up @@ -308,7 +308,8 @@ TEST_F(GcsActorManagerTest, TestBasic) {
// Verify death cause for last actor DEAD event
ASSERT_EQ(
event_data["death_cause"]["actor_died_error_context"]["error_message"],
"The actor is dead because all references to the actor were removed.");
"The actor is dead because all references to the actor were removed "
"including lineage ref count.");
}
}
return;
Expand Down

0 comments on commit 0eb741a

Please sign in to comment.