diff --git a/python/ray/tests/BUILD b/python/ray/tests/BUILD index 1360ca9d87a2..06641d0d5777 100644 --- a/python/ray/tests/BUILD +++ b/python/ray/tests/BUILD @@ -447,6 +447,7 @@ py_test_module_list( py_test_module_list( files = [ "test_actor.py", + "test_actor_lineage_reconstruction.py", "test_actor_retry1.py", "test_actor_retry2.py", "test_actor_failures.py", diff --git a/src/ray/gcs/gcs_server/gcs_actor_manager.cc b/src/ray/gcs/gcs_server/gcs_actor_manager.cc index 90c9c090cba9..3b75e2dfd932 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_actor_manager.cc @@ -1107,10 +1107,21 @@ void GcsActorManager::DestroyActor(const ActorID &actor_id, RAY_CHECK_OK(gcs_table_storage_->ActorTable().Put( actor->GetActorID(), *actor_table_data, - [this, actor, actor_id, actor_table_data](Status status) { + [this, + actor, + actor_id, + actor_table_data, + is_restartable, + done_callback = std::move(done_callback)](Status status) { + if (done_callback) { + done_callback(); + } RAY_CHECK_OK(gcs_publisher_->PublishActor( actor_id, *GenActorDataOnlyWithStates(*actor_table_data), nullptr)); - RAY_CHECK_OK(gcs_table_storage_->ActorTaskSpecTable().Delete(actor_id, nullptr)); + if (!is_restartable) { + RAY_CHECK_OK( + gcs_table_storage_->ActorTaskSpecTable().Delete(actor_id, nullptr)); + } actor->WriteActorExportEvent(); // Destroy placement group owned by this actor. destroy_owned_placement_group_if_needed_(actor_id); @@ -1352,8 +1363,8 @@ void GcsActorManager::RestartActor(const ActorID &actor_id, // could've been destroyed and dereigstered before restart. auto iter = registered_actors_.find(actor_id); if (iter == registered_actors_.end()) { - RAY_LOG(DEBUG).WithField(actor_id.JobId()).WithField(actor_id) - << "Actor is destroyed before restart"; + RAY_LOG(DEBUG) << "Actor is destroyed before restart, actor id = " << actor_id + << ", job id = " << actor_id.JobId(); if (done_callback) { done_callback(); } @@ -1399,7 +1410,10 @@ void GcsActorManager::RestartActor(const ActorID &actor_id, RAY_CHECK_OK(gcs_table_storage_->ActorTable().Put( actor_id, *mutable_actor_table_data, - [this, actor, actor_id, mutable_actor_table_data](Status status) { + [this, actor, actor_id, mutable_actor_table_data, done_callback](Status status) { + if (done_callback) { + done_callback(); + } RAY_CHECK_OK(gcs_publisher_->PublishActor( actor_id, *GenActorDataOnlyWithStates(*mutable_actor_table_data), nullptr)); actor->WriteActorExportEvent(); diff --git a/src/ray/gcs/gcs_server/test/export_api/gcs_actor_manager_export_event_test.cc b/src/ray/gcs/gcs_server/test/export_api/gcs_actor_manager_export_event_test.cc index 0d2dc48cc763..2e91f34bcf73 100644 --- a/src/ray/gcs/gcs_server/test/export_api/gcs_actor_manager_export_event_test.cc +++ b/src/ray/gcs/gcs_server/test/export_api/gcs_actor_manager_export_event_test.cc @@ -75,9 +75,9 @@ class MockWorkerClient : public rpc::CoreWorkerClientInterface { public: MockWorkerClient(instrumented_io_context &io_service) : io_service_(io_service) {} - void WaitForActorOutOfScope( - const rpc::WaitForActorOutOfScopeRequest &request, - const rpc::ClientCallback &callback) override { + void WaitForActorRefDeleted( + const rpc::WaitForActorRefDeletedRequest &request, + const rpc::ClientCallback &callback) override { callbacks_.push_back(callback); } @@ -93,12 +93,12 @@ class MockWorkerClient : public rpc::CoreWorkerClientInterface { // The created_actors_ of gcs actor manager will be modified in io_service thread. // In order to avoid multithreading reading and writing created_actors_, we also - // send the `WaitForActorOutOfScope` callback operation to io_service thread. + // send the `WaitForActorRefDeleted` callback operation to io_service thread. std::promise promise; io_service_.post( [this, status, &promise]() { auto callback = callbacks_.front(); - auto reply = rpc::WaitForActorOutOfScopeReply(); + auto reply = rpc::WaitForActorRefDeletedReply(); callback(status, std::move(reply)); promise.set_value(false); }, @@ -109,7 +109,7 @@ class MockWorkerClient : public rpc::CoreWorkerClientInterface { return true; } - std::list> callbacks_; + std::list> callbacks_; std::vector killed_actors_; instrumented_io_context &io_service_; }; @@ -308,7 +308,8 @@ TEST_F(GcsActorManagerTest, TestBasic) { // Verify death cause for last actor DEAD event ASSERT_EQ( event_data["death_cause"]["actor_died_error_context"]["error_message"], - "The actor is dead because all references to the actor were removed."); + "The actor is dead because all references to the actor were removed " + "including lineage ref count."); } } return;