Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce push-based task scheduling for Ballista #1560

Merged
merged 3 commits into from
Jan 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions ballista/rust/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ tokio = "1.0"
tonic = "0.6"
uuid = { version = "0.8", features = ["v4"] }
chrono = { version = "0.4", default-features = false }
clap = "2"
parse_arg = "0.1.3"
Comment on lines +45 to +46
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would be nice if we can avoid depending on CLI related dependencies like clap and parse_arg in the core, but I think it's a minor issue we can clean up as follow ups later.


arrow-flight = { version = "7.0.0" }
datafusion = { path = "../../../datafusion", version = "6.0.0" }
Expand Down
94 changes: 94 additions & 0 deletions ballista/rust/core/proto/ballista.proto
Original file line number Diff line number Diff line change
Expand Up @@ -838,6 +838,7 @@ message ExecutorMetadata {
string id = 1;
string host = 2;
uint32 port = 3;
uint32 grpc_port = 4;
}

message ExecutorRegistration {
Expand All @@ -848,12 +849,46 @@ message ExecutorRegistration {
string host = 2;
}
uint32 port = 3;
uint32 grpc_port = 4;
}

message ExecutorHeartbeat {
ExecutorMetadata meta = 1;
// Unix epoch-based timestamp in seconds
uint64 timestamp = 2;
ExecutorState state = 3;
}

message ExecutorState {
repeated ExecutorMetric metrics = 1;
}

message ExecutorMetric {
// TODO add more metrics
oneof metric {
uint64 available_memory = 1;
}
}

message ExecutorSpecification {
repeated ExecutorResource resources = 1;
}

message ExecutorResource {
// TODO add more resources
oneof resource {
uint32 task_slots = 1;
}
}

message ExecutorData {
string executor_id = 1;
repeated ExecutorResourcePair resources = 2;
}

message ExecutorResourcePair {
ExecutorResource total = 1;
ExecutorResource available = 2;
}

message RunningTask {
Expand Down Expand Up @@ -906,6 +941,41 @@ message PollWorkResult {
TaskDefinition task = 1;
}

message RegisterExecutorParams {
ExecutorRegistration metadata = 1;
ExecutorSpecification specification = 2;
}

message RegisterExecutorResult {
bool success = 1;
}

message SendHeartBeatParams {
ExecutorRegistration metadata = 1;
ExecutorState state = 2;
}

message SendHeartBeatResult {
// TODO it's from Spark for BlockManager
bool reregister = 1;
}

message StopExecutorParams {
}

message StopExecutorResult {
}

message UpdateTaskStatusParams {
ExecutorRegistration metadata = 1;
// All tasks must be reported until they reach the failed or completed state
repeated TaskStatus task_status = 2;
}

message UpdateTaskStatusResult {
bool success = 1;
}

message ExecuteQueryParams {
oneof query {
LogicalPlanNode logical_plan = 1;
Expand Down Expand Up @@ -965,17 +1035,41 @@ message FilePartitionMetadata {
repeated string filename = 1;
}

message LaunchTaskParams {
// Allow to launch a task set to an executor at once
repeated TaskDefinition task = 1;
}

message LaunchTaskResult {
bool success = 1;
// TODO when part of the task set are scheduled successfully
}

service SchedulerGrpc {
// Executors must poll the scheduler for heartbeat and to receive tasks
rpc PollWork (PollWorkParams) returns (PollWorkResult) {}

rpc RegisterExecutor(RegisterExecutorParams) returns (RegisterExecutorResult) {}

// Push-based task scheduler will only leverage this interface
// rather than the PollWork interface to report executor states
rpc SendHeartBeat (SendHeartBeatParams) returns (SendHeartBeatResult) {}

rpc UpdateTaskStatus (UpdateTaskStatusParams) returns (UpdateTaskStatusResult) {}

rpc GetFileMetadata (GetFileMetadataParams) returns (GetFileMetadataResult) {}

rpc ExecuteQuery (ExecuteQueryParams) returns (ExecuteQueryResult) {}

rpc GetJobStatus (GetJobStatusParams) returns (GetJobStatusResult) {}
}

service ExecutorGrpc {
rpc LaunchTask (LaunchTaskParams) returns (LaunchTaskResult) {}

rpc StopExecutor (StopExecutorParams) returns (StopExecutorResult) {}
}

///////////////////////////////////////////////////////////////////////////////////////////////////
// Arrow Data Types
///////////////////////////////////////////////////////////////////////////////////////////////////
Expand Down
17 changes: 17 additions & 0 deletions ballista/rust/core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

//! Ballista configuration

use clap::arg_enum;
use core::fmt;
use std::collections::HashMap;
use std::result;
Expand Down Expand Up @@ -196,6 +197,22 @@ impl BallistaConfig {
}
}

// an enum used to configure the scheduler policy
// needs to be visible to code generated by configure_me
arg_enum! {
#[derive(Clone, Copy, Debug, serde::Deserialize)]
pub enum TaskSchedulingPolicy {
PullStaged,
PushStaged,
}
}

impl parse_arg::ParseArgFromStr for TaskSchedulingPolicy {
fn describe_type<W: fmt::Write>(mut writer: W) -> fmt::Result {
write!(writer, "The scheduler policy for the scheduler")
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
141 changes: 141 additions & 0 deletions ballista/rust/core/src/serde/scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ pub struct ExecutorMeta {
pub id: String,
pub host: String,
pub port: u16,
pub grpc_port: u16,
}

#[allow(clippy::from_over_into)]
Expand All @@ -86,6 +87,7 @@ impl Into<protobuf::ExecutorMetadata> for ExecutorMeta {
id: self.id,
host: self.host,
port: self.port as u32,
grpc_port: self.grpc_port as u32,
}
}
}
Expand All @@ -96,10 +98,149 @@ impl From<protobuf::ExecutorMetadata> for ExecutorMeta {
id: meta.id,
host: meta.host,
port: meta.port as u16,
grpc_port: meta.grpc_port as u16,
}
}
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
pub struct ExecutorSpecification {
pub task_slots: u32,
}

#[allow(clippy::from_over_into)]
impl Into<protobuf::ExecutorSpecification> for ExecutorSpecification {
fn into(self) -> protobuf::ExecutorSpecification {
protobuf::ExecutorSpecification {
resources: vec![protobuf::executor_resource::Resource::TaskSlots(
self.task_slots,
)]
.into_iter()
.map(|r| protobuf::ExecutorResource { resource: Some(r) })
.collect(),
}
}
}

impl From<protobuf::ExecutorSpecification> for ExecutorSpecification {
fn from(input: protobuf::ExecutorSpecification) -> Self {
let mut ret = Self { task_slots: 0 };
for resource in input.resources {
if let Some(protobuf::executor_resource::Resource::TaskSlots(task_slots)) =
resource.resource
{
ret.task_slots = task_slots
}
}
ret
}
}

#[derive(Debug, Clone, Serialize)]
pub struct ExecutorData {
pub executor_id: String,
pub total_task_slots: u32,
pub available_task_slots: u32,
}

struct ExecutorResourcePair {
total: protobuf::executor_resource::Resource,
available: protobuf::executor_resource::Resource,
}

#[allow(clippy::from_over_into)]
impl Into<protobuf::ExecutorData> for ExecutorData {
fn into(self) -> protobuf::ExecutorData {
protobuf::ExecutorData {
executor_id: self.executor_id,
resources: vec![ExecutorResourcePair {
total: protobuf::executor_resource::Resource::TaskSlots(
self.total_task_slots,
),
available: protobuf::executor_resource::Resource::TaskSlots(
self.available_task_slots,
),
}]
.into_iter()
.map(|r| protobuf::ExecutorResourcePair {
total: Some(protobuf::ExecutorResource {
resource: Some(r.total),
}),
available: Some(protobuf::ExecutorResource {
resource: Some(r.available),
}),
})
.collect(),
}
}
}

impl From<protobuf::ExecutorData> for ExecutorData {
fn from(input: protobuf::ExecutorData) -> Self {
let mut ret = Self {
executor_id: input.executor_id,
total_task_slots: 0,
available_task_slots: 0,
};
for resource in input.resources {
if let Some(task_slots) = resource.total {
if let Some(protobuf::executor_resource::Resource::TaskSlots(
task_slots,
)) = task_slots.resource
{
ret.total_task_slots = task_slots
}
};
if let Some(task_slots) = resource.available {
if let Some(protobuf::executor_resource::Resource::TaskSlots(
task_slots,
)) = task_slots.resource
{
ret.available_task_slots = task_slots
}
};
}
ret
}
}

#[derive(Debug, Clone, Copy, Serialize)]
pub struct ExecutorState {
// in bytes
pub available_memory_size: u64,
}

#[allow(clippy::from_over_into)]
impl Into<protobuf::ExecutorState> for ExecutorState {
fn into(self) -> protobuf::ExecutorState {
protobuf::ExecutorState {
metrics: vec![protobuf::executor_metric::Metric::AvailableMemory(
self.available_memory_size,
)]
.into_iter()
.map(|m| protobuf::ExecutorMetric { metric: Some(m) })
.collect(),
}
}
}

impl From<protobuf::ExecutorState> for ExecutorState {
fn from(input: protobuf::ExecutorState) -> Self {
let mut ret = Self {
available_memory_size: u64::MAX,
};
for metric in input.metrics {
if let Some(protobuf::executor_metric::Metric::AvailableMemory(
available_memory_size,
)) = metric.metric
{
ret.available_memory_size = available_memory_size
}
}
ret
}
}

/// Summary of executed partition
#[derive(Debug, Copy, Clone, Default)]
pub struct PartitionStats {
Expand Down
1 change: 1 addition & 0 deletions ballista/rust/executor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread"] }
tokio-stream = { version = "0.1", features = ["net"] }
tonic = "0.6"
uuid = { version = "0.8", features = ["v4"] }
hyper = "0.14.4"

[dev-dependencies]

Expand Down
13 changes: 13 additions & 0 deletions ballista/rust/executor/executor_config_spec.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ type = "u16"
default = "50051"
doc = "bind port"

[[param]]
name = "bind_grpc_port"
type = "u16"
default = "50052"
doc = "bind grpc service port"

[[param]]
name = "work_dir"
type = "String"
Expand All @@ -65,3 +71,10 @@ name = "concurrent_tasks"
type = "usize"
default = "4"
doc = "Max concurrent tasks."

[[param]]
abbr = "s"
name = "task_scheduling_policy"
type = "ballista_core::config::TaskSchedulingPolicy"
doc = "The task scheduing policy for the scheduler, see TaskSchedulingPolicy::variants() for options. Default: PullStaged"
default = "ballista_core::config::TaskSchedulingPolicy::PullStaged"
Loading