Skip to content

Commit

Permalink
[Core] Fix the accidental protobuf backward incompatible changes (ray…
Browse files Browse the repository at this point in the history
…-project#37650) (ray-project#37826)

Move the JobTableData to "backward compatibility required" section since it is. However, we should generally be careful when we modify the protobuf to keep the backward compatibility.
https://github.com/ray-project/ray/pull/35188/files#diff-66e7c4b1cbb0a6ce505e17964332b0e4d26631f291bcbabd221d13fa09707d90 -> Fix the protobuf compatibility by
keep the driver_ip_address and add a new field driver_address at the end of the protobuf
Append fields instead of adding in the middle. To do this, we move the field is_running_tasks to the end.
We also mark the old address field as deprecated in a comment.

The resulting protobuf is backwards compatible with Ray 2.5.1 and any prior version, and breaks compatibility with Ray 2.6.0 and Ray 2.6.1. It's not possible to be backwards compatible with both simultaneously.

---------

Signed-off-by: SangBin Cho <[email protected]>
Signed-off-by: Archit Kulkarni <[email protected]>
Co-authored-by: SangBin Cho <[email protected]>
  • Loading branch information
architkulkarni and rkooo567 authored Jul 26, 2023
1 parent 0c7d455 commit b03a76b
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 25 deletions.
2 changes: 2 additions & 0 deletions src/ray/gcs/gcs_server/test/gcs_job_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,7 @@ TEST_F(GcsJobManagerTest, TestPreserveDriverInfo) {
address.set_port(8264);
address.set_raylet_id(NodeID::FromRandom().Binary());
address.set_worker_id(WorkerID::FromRandom().Binary());
add_job_request->mutable_data()->set_driver_ip_address("10.0.0.1");
add_job_request->mutable_data()->mutable_driver_address()->CopyFrom(address);

add_job_request->mutable_data()->set_driver_pid(8264);
Expand Down Expand Up @@ -548,6 +549,7 @@ TEST_F(GcsJobManagerTest, TestPreserveDriverInfo) {
ASSERT_EQ(all_job_info_reply.job_info_list().size(), 1);
rpc::JobTableData data = all_job_info_reply.job_info_list().Get(0);
ASSERT_EQ(data.driver_address().ip_address(), "10.0.0.1");
ASSERT_EQ(data.driver_ip_address(), "10.0.0.1");
ASSERT_EQ(data.driver_pid(), 8264);
}

Expand Down
1 change: 1 addition & 0 deletions src/ray/gcs/pb_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ inline std::shared_ptr<ray::rpc::JobTableData> CreateJobTableData(
job_info_ptr->set_job_id(job_id.Binary());
job_info_ptr->set_is_dead(is_dead);
*job_info_ptr->mutable_driver_address() = driver_address;
job_info_ptr->set_driver_ip_address(driver_address.ip_address());
job_info_ptr->set_driver_pid(driver_pid);
job_info_ptr->set_entrypoint(entrypoint);
*job_info_ptr->mutable_config() = job_config;
Expand Down
1 change: 1 addition & 0 deletions src/ray/gcs/test/gcs_test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ struct Mocker {
job_table_data->set_job_id(job_id.Binary());
job_table_data->set_is_dead(false);
job_table_data->set_timestamp(current_sys_time_ms());
job_table_data->set_driver_ip_address("127.0.0.1");
rpc::Address address;
address.set_ip_address("127.0.0.1");
address.set_port(1234);
Expand Down
53 changes: 28 additions & 25 deletions src/ray/protobuf/gcs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -377,31 +377,6 @@ message JobsAPIInfo {
optional string driver_node_id = 13;
}

message JobTableData {
// The job ID.
bytes job_id = 1;
// Whether it's dead.
bool is_dead = 2;
// The UNIX timestamp corresponding to this event (job added or removed).
int64 timestamp = 3;
// Address of the driver that started this job.
Address driver_address = 4;
// Process ID of the driver running this job.
int64 driver_pid = 5;
// The config of this job.
JobConfig config = 6;
// The timestamp the job was started at.
uint64 start_time = 7;
// The timestamp the job was ended at.
uint64 end_time = 8;
// The entrypoint of the job.
string entrypoint = 9;
// Whether this job has running tasks.
bool is_running_tasks = 10;
// The optional JobInfo from the Ray Job API.
optional JobsAPIInfo job_info = 11;
}

message WorkerTableData {
// Is this worker alive.
bool is_alive = 1;
Expand Down Expand Up @@ -688,4 +663,32 @@ message PlacementGroupTableData {
// effectively when the placement group has been "scheduled".
int64 placement_group_final_bundle_placement_timestamp_ms = 15;
}

message JobTableData {
// The job ID.
bytes job_id = 1;
// Whether it's dead.
bool is_dead = 2;
// The UNIX timestamp corresponding to this event (job added or removed).
int64 timestamp = 3;
// IP address of the driver that started this job.
// Deprecated
string driver_ip_address = 4;
// Process ID of the driver running this job.
int64 driver_pid = 5;
// The config of this job.
JobConfig config = 6;
// The timestamp the job was started at.
uint64 start_time = 7;
// The timestamp the job was ended at.
uint64 end_time = 8;
// The entrypoint of the job.
string entrypoint = 9;
// The optional JobInfo from the Ray Job API.
optional JobsAPIInfo job_info = 10;
// Whether this job has running tasks.
bool is_running_tasks = 11;
// Address of the driver that started this job.
Address driver_address = 12;
}
///////////////////////////////////////////////////////////////////////////////

0 comments on commit b03a76b

Please sign in to comment.