Skip to content

Commit

Permalink
Introduce push-based task scheduling for Ballista (#1560)
Browse files Browse the repository at this point in the history
* Remove call_ip in the SchedulerServer

* Introduce push-based task scheduling

Co-authored-by: yangzhong <[email protected]>
  • Loading branch information
yahoNanJing and kyotoYaho authored Jan 23, 2022
1 parent 9c5ccae commit 71757bb
Show file tree
Hide file tree
Showing 17 changed files with 1,245 additions and 103 deletions.
2 changes: 2 additions & 0 deletions ballista/rust/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ tokio = "1.0"
tonic = "0.6"
uuid = { version = "0.8", features = ["v4"] }
chrono = { version = "0.4", default-features = false }
clap = "2"
parse_arg = "0.1.3"

arrow-flight = { version = "7.0.0" }
datafusion = { path = "../../../datafusion", version = "6.0.0" }
Expand Down
94 changes: 94 additions & 0 deletions ballista/rust/core/proto/ballista.proto
Original file line number Diff line number Diff line change
Expand Up @@ -838,6 +838,7 @@ message ExecutorMetadata {
string id = 1;
string host = 2;
uint32 port = 3;
uint32 grpc_port = 4;
}

message ExecutorRegistration {
Expand All @@ -848,12 +849,46 @@ message ExecutorRegistration {
string host = 2;
}
uint32 port = 3;
uint32 grpc_port = 4;
}

// Periodic status report sent from an executor to the scheduler.
message ExecutorHeartbeat {
ExecutorMetadata meta = 1;
// Unix epoch-based timestamp in seconds
uint64 timestamp = 2;
// Runtime metrics of the executor at the time of the heartbeat.
ExecutorState state = 3;
}

// Snapshot of an executor's runtime metrics (see ExecutorMetric).
message ExecutorState {
repeated ExecutorMetric metrics = 1;
}

// A single executor runtime metric, expressed as a oneof so new metric
// kinds can be added without breaking wire compatibility.
message ExecutorMetric {
// TODO add more metrics
oneof metric {
uint64 available_memory = 1;
}
}

// Static resource capacity of an executor (currently just task slots),
// supplied when the executor registers (see RegisterExecutorParams).
message ExecutorSpecification {
repeated ExecutorResource resources = 1;
}

// One kind of executor resource, expressed as a oneof so new resource
// kinds can be added without breaking wire compatibility.
message ExecutorResource {
// TODO add more resources
oneof resource {
uint32 task_slots = 1;
}
}

// Per-executor resource accounting: for each resource kind, the total
// capacity and the amount currently available.
message ExecutorData {
string executor_id = 1;
repeated ExecutorResourcePair resources = 2;
}

// Pairs the total and currently-available value of one resource kind.
message ExecutorResourcePair {
ExecutorResource total = 1;
ExecutorResource available = 2;
}

message RunningTask {
Expand Down Expand Up @@ -906,6 +941,41 @@ message PollWorkResult {
TaskDefinition task = 1;
}

// Request for an executor to register itself (identity plus resource
// capacity) with the scheduler.
message RegisterExecutorParams {
ExecutorRegistration metadata = 1;
ExecutorSpecification specification = 2;
}

// Whether the scheduler accepted the registration.
message RegisterExecutorResult {
bool success = 1;
}

// Heartbeat carrying the executor's identity and current runtime state.
message SendHeartBeatParams {
ExecutorRegistration metadata = 1;
ExecutorState state = 2;
}

// Scheduler's reply to a heartbeat.
message SendHeartBeatResult {
// TODO it's from Spark for BlockManager
// NOTE(review): presumably asks the executor to register again — confirm
bool reregister = 1;
}

// Request to stop an executor; carries no fields yet.
message StopExecutorParams {
}

// Reply to a stop request; carries no fields yet.
message StopExecutorResult {
}

// Batched task-status report from an executor to the scheduler.
message UpdateTaskStatusParams {
ExecutorRegistration metadata = 1;
// All tasks must be reported until they reach the failed or completed state
repeated TaskStatus task_status = 2;
}

// Whether the scheduler recorded the status update.
message UpdateTaskStatusResult {
bool success = 1;
}

message ExecuteQueryParams {
oneof query {
LogicalPlanNode logical_plan = 1;
Expand Down Expand Up @@ -965,17 +1035,41 @@ message FilePartitionMetadata {
repeated string filename = 1;
}

// Request from the scheduler to launch tasks on an executor.
message LaunchTaskParams {
// Allows a whole set of tasks to be launched on an executor in one call
repeated TaskDefinition task = 1;
}

// Whether the executor accepted the task set.
message LaunchTaskResult {
bool success = 1;
// TODO when part of the task set are scheduled successfully
}

// RPC interface exposed by the scheduler, used by executors (registration,
// heartbeats, status updates, polling) and by clients (query submission).
service SchedulerGrpc {
// Executors must poll the scheduler for heartbeat and to receive tasks
rpc PollWork (PollWorkParams) returns (PollWorkResult) {}

// Registers an executor's identity and resource capacity with the scheduler.
rpc RegisterExecutor(RegisterExecutorParams) returns (RegisterExecutorResult) {}

// Push-based task scheduler will only leverage this interface
// rather than the PollWork interface to report executor states
rpc SendHeartBeat (SendHeartBeatParams) returns (SendHeartBeatResult) {}

// Reports task state transitions until each task fails or completes.
rpc UpdateTaskStatus (UpdateTaskStatusParams) returns (UpdateTaskStatusResult) {}

// Returns file metadata for planning purposes.
rpc GetFileMetadata (GetFileMetadataParams) returns (GetFileMetadataResult) {}

// Submits a query (logical plan or SQL) for execution.
rpc ExecuteQuery (ExecuteQueryParams) returns (ExecuteQueryResult) {}

// Queries the current status of a previously submitted job.
rpc GetJobStatus (GetJobStatusParams) returns (GetJobStatusResult) {}
}

// RPC interface exposed by each executor, used by the scheduler in the
// push-based scheduling mode to assign tasks and to stop executors.
service ExecutorGrpc {
// Pushes a set of tasks to this executor for execution.
rpc LaunchTask (LaunchTaskParams) returns (LaunchTaskResult) {}

// Asks this executor to shut down.
rpc StopExecutor (StopExecutorParams) returns (StopExecutorResult) {}
}

///////////////////////////////////////////////////////////////////////////////////////////////////
// Arrow Data Types
///////////////////////////////////////////////////////////////////////////////////////////////////
Expand Down
17 changes: 17 additions & 0 deletions ballista/rust/core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

//! Ballista configuration
use clap::arg_enum;
use core::fmt;
use std::collections::HashMap;
use std::result;
Expand Down Expand Up @@ -196,6 +197,22 @@ impl BallistaConfig {
}
}

// An enum used to configure the task scheduling policy of the scheduler.
// It must stay `pub` because it is referenced by code generated by
// configure_me. `arg_enum!` (clap 2.x) also derives FromStr/Display so the
// variants can be parsed from command-line arguments.
// Note: only line comments are safe inside this macro — doc comments would
// become attributes and break arg_enum!'s token matching.
arg_enum! {
#[derive(Clone, Copy, Debug, serde::Deserialize)]
pub enum TaskSchedulingPolicy {
// Executors poll the scheduler for staged tasks.
PullStaged,
// The scheduler pushes staged tasks to executors.
PushStaged,
}
}

// Integration with the `parse_arg` crate (used by configure_me-generated
// option parsing): supplies the human-readable type description shown in
// help and error messages when this option fails to parse.
impl parse_arg::ParseArgFromStr for TaskSchedulingPolicy {
fn describe_type<W: fmt::Write>(mut writer: W) -> fmt::Result {
write!(writer, "The scheduler policy for the scheduler")
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
141 changes: 141 additions & 0 deletions ballista/rust/core/src/serde/scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ pub struct ExecutorMeta {
pub id: String,
pub host: String,
pub port: u16,
pub grpc_port: u16,
}

#[allow(clippy::from_over_into)]
Expand All @@ -86,6 +87,7 @@ impl Into<protobuf::ExecutorMetadata> for ExecutorMeta {
id: self.id,
host: self.host,
port: self.port as u32,
grpc_port: self.grpc_port as u32,
}
}
}
Expand All @@ -96,10 +98,149 @@ impl From<protobuf::ExecutorMetadata> for ExecutorMeta {
id: meta.id,
host: meta.host,
port: meta.port as u16,
grpc_port: meta.grpc_port as u16,
}
}
}

/// Static resource capacity of an executor — currently just the number of
/// tasks it can run concurrently. Mirrors `protobuf::ExecutorSpecification`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
pub struct ExecutorSpecification {
// Number of concurrent task slots the executor offers.
pub task_slots: u32,
}

/// Converts the in-memory specification into its protobuf form, encoding
/// the task-slot count as a single `ExecutorResource` entry.
#[allow(clippy::from_over_into)]
impl Into<protobuf::ExecutorSpecification> for ExecutorSpecification {
    fn into(self) -> protobuf::ExecutorSpecification {
        let slots = protobuf::executor_resource::Resource::TaskSlots(self.task_slots);
        protobuf::ExecutorSpecification {
            resources: vec![protobuf::ExecutorResource {
                resource: Some(slots),
            }],
        }
    }
}

/// Rebuilds an `ExecutorSpecification` from its protobuf form.
///
/// Scans every resource entry; if several `TaskSlots` entries are present
/// the last one wins, and if none is present the count defaults to 0 —
/// matching the original accumulation loop.
impl From<protobuf::ExecutorSpecification> for ExecutorSpecification {
    fn from(input: protobuf::ExecutorSpecification) -> Self {
        let task_slots = input
            .resources
            .into_iter()
            .filter_map(|entry| {
                if let Some(protobuf::executor_resource::Resource::TaskSlots(n)) =
                    entry.resource
                {
                    Some(n)
                } else {
                    None
                }
            })
            .last()
            .unwrap_or(0);
        Self { task_slots }
    }
}

/// Scheduler-side accounting of one executor's task slots: the total
/// capacity and how many slots are currently free.
#[derive(Debug, Clone, Serialize)]
pub struct ExecutorData {
pub executor_id: String,
pub total_task_slots: u32,
pub available_task_slots: u32,
}

// Private helper pairing the total and available value of one resource
// kind, used when converting `ExecutorData` to its protobuf form.
struct ExecutorResourcePair {
total: protobuf::executor_resource::Resource,
available: protobuf::executor_resource::Resource,
}

/// Converts scheduler-side slot accounting into the protobuf
/// representation: a single `ExecutorResourcePair` holding the total and
/// the currently available `TaskSlots`.
#[allow(clippy::from_over_into)]
impl Into<protobuf::ExecutorData> for ExecutorData {
    fn into(self) -> protobuf::ExecutorData {
        use protobuf::executor_resource::Resource;

        // Pair up the two slot counts, then wrap each side in the
        // `ExecutorResource` envelope required by the proto schema.
        let pair = ExecutorResourcePair {
            total: Resource::TaskSlots(self.total_task_slots),
            available: Resource::TaskSlots(self.available_task_slots),
        };
        let wrap = |r: Resource| Some(protobuf::ExecutorResource { resource: Some(r) });
        protobuf::ExecutorData {
            executor_id: self.executor_id,
            resources: vec![protobuf::ExecutorResourcePair {
                total: wrap(pair.total),
                available: wrap(pair.available),
            }],
        }
    }
}

/// Rebuilds `ExecutorData` from its protobuf form.
///
/// Walks every resource pair and records any `TaskSlots` values found on
/// the `total` and `available` sides; the last occurrence wins and missing
/// values default to 0 — matching the original accumulation loop.
impl From<protobuf::ExecutorData> for ExecutorData {
    fn from(input: protobuf::ExecutorData) -> Self {
        use protobuf::executor_resource::Resource;

        let mut total_task_slots = 0;
        let mut available_task_slots = 0;
        for pair in input.resources {
            if let Some(protobuf::ExecutorResource {
                resource: Some(Resource::TaskSlots(n)),
            }) = pair.total
            {
                total_task_slots = n;
            }
            if let Some(protobuf::ExecutorResource {
                resource: Some(Resource::TaskSlots(n)),
            }) = pair.available
            {
                available_task_slots = n;
            }
        }
        Self {
            executor_id: input.executor_id,
            total_task_slots,
            available_task_slots,
        }
    }
}

/// Runtime metrics of an executor. Mirrors `protobuf::ExecutorState`.
#[derive(Debug, Clone, Copy, Serialize)]
pub struct ExecutorState {
// in bytes
pub available_memory_size: u64,
}

/// Converts the in-memory state into its protobuf form, encoding the
/// available memory (bytes) as a single `ExecutorMetric` entry.
#[allow(clippy::from_over_into)]
impl Into<protobuf::ExecutorState> for ExecutorState {
    fn into(self) -> protobuf::ExecutorState {
        let mem = protobuf::executor_metric::Metric::AvailableMemory(self.available_memory_size);
        protobuf::ExecutorState {
            metrics: vec![protobuf::ExecutorMetric { metric: Some(mem) }],
        }
    }
}

/// Rebuilds `ExecutorState` from its protobuf form.
///
/// Scans every metric entry; if several `AvailableMemory` entries are
/// present the last one wins, and if none is present the value defaults to
/// `u64::MAX` (i.e. "unknown/unlimited") — matching the original loop.
impl From<protobuf::ExecutorState> for ExecutorState {
    fn from(input: protobuf::ExecutorState) -> Self {
        let available_memory_size = input
            .metrics
            .into_iter()
            .filter_map(|entry| {
                if let Some(protobuf::executor_metric::Metric::AvailableMemory(v)) =
                    entry.metric
                {
                    Some(v)
                } else {
                    None
                }
            })
            .last()
            .unwrap_or(u64::MAX);
        Self {
            available_memory_size,
        }
    }
}

/// Summary of executed partition
#[derive(Debug, Copy, Clone, Default)]
pub struct PartitionStats {
Expand Down
1 change: 1 addition & 0 deletions ballista/rust/executor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread"] }
tokio-stream = { version = "0.1", features = ["net"] }
tonic = "0.6"
uuid = { version = "0.8", features = ["v4"] }
hyper = "0.14.4"

[dev-dependencies]

Expand Down
13 changes: 13 additions & 0 deletions ballista/rust/executor/executor_config_spec.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ type = "u16"
default = "50051"
doc = "bind port"

[[param]]
name = "bind_grpc_port"
type = "u16"
default = "50052"
doc = "bind grpc service port"

[[param]]
name = "work_dir"
type = "String"
Expand All @@ -65,3 +71,10 @@ name = "concurrent_tasks"
type = "usize"
default = "4"
doc = "Max concurrent tasks."

[[param]]
abbr = "s"
name = "task_scheduling_policy"
type = "ballista_core::config::TaskSchedulingPolicy"
doc = "The task scheduling policy for the scheduler, see TaskSchedulingPolicy::variants() for options. Default: PullStaged"
default = "ballista_core::config::TaskSchedulingPolicy::PullStaged"
Loading

0 comments on commit 71757bb

Please sign in to comment.