Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Combine Cargo workspaces #23

Merged
merged 6 commits into from
Apr 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@

[workspace]
members = [
"datafusion",
"datafusion-examples",
"datafusion",
"datafusion-examples",
"benchmarks",
]

# this package is excluded because it requires different compilation flags, thereby significantly changing
# how it is compiled within the workspace, causing the whole workspace to be compiled from scratch
# this way, this is a stand-alone package that compiles independently of the others.
exclude = ["ballista"]
"ballista/rust/benchmarks/tpch",
"ballista/rust/client",
"ballista/rust/core",
"ballista/rust/executor",
"ballista/rust/scheduler",
]
30 changes: 0 additions & 30 deletions ballista/rust/Cargo.toml

This file was deleted.

2 changes: 1 addition & 1 deletion ballista/rust/benchmarks/tpch/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

[package]
name = "tpch"
version = "0.4.2-SNAPSHOT"
version = "0.5.0-SNAPSHOT"
homepage = "https://github.com/apache/arrow"
repository = "https://github.com/apache/arrow"
authors = ["Apache Arrow <dev@arrow.apache.org>"]
Expand Down
2 changes: 1 addition & 1 deletion ballista/rust/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
name = "ballista"
description = "Ballista Distributed Compute"
license = "Apache-2.0"
version = "0.4.2-SNAPSHOT"
version = "0.5.0-SNAPSHOT"
homepage = "https://github.com/apache/arrow"
repository = "https://github.com/apache/arrow"
authors = ["Apache Arrow <dev@arrow.apache.org>"]
Expand Down
4 changes: 2 additions & 2 deletions ballista/rust/client/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use ballista_core::serde::protobuf::{
};
use ballista_core::{
client::BallistaClient,
datasource::DFTableAdapter,
datasource::DfTableAdapter,
error::{BallistaError, Result},
memory_stream::MemoryStream,
utils::create_datafusion_context,
Expand Down Expand Up @@ -151,7 +151,7 @@ impl BallistaContext {
let execution_plan = ctx.create_physical_plan(&plan)?;
ctx.register_table(
TableReference::Bare { table: name },
Arc::new(DFTableAdapter::new(plan, execution_plan)),
Arc::new(DfTableAdapter::new(plan, execution_plan)),
)?;
}
let df = ctx.sql(sql)?;
Expand Down
2 changes: 1 addition & 1 deletion ballista/rust/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
name = "ballista-core"
description = "Ballista Distributed Compute"
license = "Apache-2.0"
version = "0.4.2-SNAPSHOT"
version = "0.5.0-SNAPSHOT"
homepage = "https://github.com/apache/arrow"
repository = "https://github.com/apache/arrow"
authors = ["Apache Arrow <dev@arrow.apache.org>"]
Expand Down
6 changes: 3 additions & 3 deletions ballista/rust/core/src/datasource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,20 +30,20 @@ use datafusion::{
/// TableProvider which is effectively a wrapper around a physical plan. We need to be able to
/// register tables so that we can create logical plans from SQL statements that reference these
/// tables.
pub struct DFTableAdapter {
pub struct DfTableAdapter {
/// DataFusion logical plan
pub logical_plan: LogicalPlan,
/// DataFusion execution plan
plan: Arc<dyn ExecutionPlan>,
}

impl DFTableAdapter {
impl DfTableAdapter {
pub fn new(logical_plan: LogicalPlan, plan: Arc<dyn ExecutionPlan>) -> Self {
Self { logical_plan, plan }
}
}

impl TableProvider for DFTableAdapter {
impl TableProvider for DfTableAdapter {
fn as_any(&self) -> &dyn Any {
self
}
Expand Down
1 change: 1 addition & 0 deletions ballista/rust/core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ pub enum BallistaError {
TokioError(tokio::task::JoinError),
}

#[allow(clippy::from_over_into)]
impl<T> Into<Result<T>> for BallistaError {
fn into(self) -> Result<T> {
Err(self)
Expand Down
2 changes: 2 additions & 0 deletions ballista/rust/core/src/serde/logical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,7 @@ impl TryInto<arrow::datatypes::DataType> for &protobuf::arrow_type::ArrowTypeEnu
}
}

#[allow(clippy::from_over_into)]
impl Into<arrow::datatypes::DataType> for protobuf::PrimitiveScalarType {
fn into(self) -> arrow::datatypes::DataType {
use arrow::datatypes::DataType;
Expand Down Expand Up @@ -1170,6 +1171,7 @@ impl TryFrom<i32> for protobuf::FileType {
}
}

#[allow(clippy::from_over_into)]
impl Into<datafusion::sql::parser::FileType> for protobuf::FileType {
fn into(self) -> datafusion::sql::parser::FileType {
use datafusion::sql::parser::FileType;
Expand Down
7 changes: 4 additions & 3 deletions ballista/rust/core/src/serde/logical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use std::{
convert::{TryFrom, TryInto},
};

use crate::datasource::DFTableAdapter;
use crate::datasource::DfTableAdapter;
use crate::serde::{protobuf, BallistaError};

use arrow::datatypes::{DataType, Schema};
Expand Down Expand Up @@ -679,7 +679,7 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {

// unwrap the DfTableAdapter to get to the real TableProvider
let source = if let Some(adapter) =
source.as_any().downcast_ref::<DFTableAdapter>()
source.as_any().downcast_ref::<DfTableAdapter>()
{
match &adapter.logical_plan {
LogicalPlan::TableScan { source, .. } => Ok(source.as_any()),
Expand Down Expand Up @@ -1020,7 +1020,7 @@ impl TryInto<protobuf::LogicalExprNode> for &Expr {
let fun: protobuf::ScalarFunction = fun.try_into()?;
let expr: Vec<protobuf::LogicalExprNode> = args
.iter()
.map(|e| Ok(e.try_into()?))
.map(|e| e.try_into())
.collect::<Result<Vec<protobuf::LogicalExprNode>, BallistaError>>()?;
Ok(protobuf::LogicalExprNode {
expr_type: Some(
Expand Down Expand Up @@ -1163,6 +1163,7 @@ impl TryInto<protobuf::LogicalExprNode> for &Expr {
}
}

#[allow(clippy::from_over_into)]
impl Into<protobuf::Schema> for &Schema {
fn into(self) -> protobuf::Schema {
protobuf::Schema {
Expand Down
1 change: 1 addition & 0 deletions ballista/rust/core/src/serde/scheduler/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ impl TryInto<PartitionId> for protobuf::PartitionId {
}
}

#[allow(clippy::from_over_into)]
impl Into<PartitionStats> for protobuf::PartitionStats {
fn into(self) -> PartitionStats {
PartitionStats::new(
Expand Down
1 change: 1 addition & 0 deletions ballista/rust/core/src/serde/scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ pub struct ExecutorMeta {
pub port: u16,
}

#[allow(clippy::from_over_into)]
impl Into<protobuf::ExecutorMetadata> for ExecutorMeta {
fn into(self) -> protobuf::ExecutorMetadata {
protobuf::ExecutorMetadata {
Expand Down
2 changes: 2 additions & 0 deletions ballista/rust/core/src/serde/scheduler/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ impl TryInto<protobuf::ExecutePartition> for ExecutePartition {
}
}

#[allow(clippy::from_over_into)]
impl Into<protobuf::PartitionId> for PartitionId {
fn into(self) -> protobuf::PartitionId {
protobuf::PartitionId {
Expand All @@ -77,6 +78,7 @@ impl TryInto<protobuf::PartitionLocation> for PartitionLocation {
}
}

#[allow(clippy::from_over_into)]
impl Into<protobuf::PartitionStats> for PartitionStats {
fn into(self) -> protobuf::PartitionStats {
let none_value = -1_i64;
Expand Down
3 changes: 1 addition & 2 deletions ballista/rust/executor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,13 @@
name = "ballista-executor"
description = "Ballista Distributed Compute - Executor"
license = "Apache-2.0"
version = "0.4.2-SNAPSHOT"
version = "0.5.0-SNAPSHOT"
homepage = "https://github.com/apache/arrow"
repository = "https://github.com/apache/arrow"
authors = ["Apache Arrow <dev@arrow.apache.org>"]
edition = "2018"

[features]
default = ["snmalloc"]
snmalloc = ["snmalloc-rs"]

[dependencies]
Expand Down
2 changes: 1 addition & 1 deletion ballista/rust/scheduler/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
name = "ballista-scheduler"
description = "Ballista Distributed Compute - Scheduler"
license = "Apache-2.0"
version = "0.4.2-SNAPSHOT"
version = "0.5.0-SNAPSHOT"
homepage = "https://github.com/apache/arrow"
repository = "https://github.com/apache/arrow"
authors = ["Apache Arrow <dev@arrow.apache.org>"]
Expand Down
4 changes: 2 additions & 2 deletions ballista/rust/scheduler/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use std::time::Instant;
use std::{collections::HashMap, future::Future};

use ballista_core::client::BallistaClient;
use ballista_core::datasource::DFTableAdapter;
use ballista_core::datasource::DfTableAdapter;
use ballista_core::error::{BallistaError, Result};
use ballista_core::serde::scheduler::ExecutorMeta;
use ballista_core::serde::scheduler::PartitionId;
Expand Down Expand Up @@ -138,7 +138,7 @@ impl DistributedPlanner {
stages.append(&mut child_stages);
}

if let Some(adapter) = execution_plan.as_any().downcast_ref::<DFTableAdapter>() {
if let Some(adapter) = execution_plan.as_any().downcast_ref::<DfTableAdapter>() {
// remove Repartition rule because that isn't supported yet
let rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> = vec![
Arc::new(CoalesceBatches::new()),
Expand Down
8 changes: 3 additions & 5 deletions dev/docker/rust.dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -59,20 +59,18 @@ ARG RELEASE_FLAG=--release
# force build.rs to run to generate configure_me code.
ENV FORCE_REBUILD='true'
RUN cargo build $RELEASE_FLAG
RUN cd ballista/rust && \
cargo build $RELEASE_FLAG

# put the executor on /executor (need to be copied from different places depending on FLAG)
ENV RELEASE_FLAG=${RELEASE_FLAG}
RUN if [ -z "$RELEASE_FLAG" ]; then mv /tmp/ballista/ballista/rust/target/debug/ballista-executor /executor; else mv /tmp/ballista/ballista/rust/target/release/ballista-executor /executor; fi
RUN if [ -z "$RELEASE_FLAG" ]; then mv /tmp/ballista/target/debug/ballista-executor /executor; else mv /tmp/ballista/target/release/ballista-executor /executor; fi

# put the scheduler on /scheduler (need to be copied from different places depending on FLAG)
ENV RELEASE_FLAG=${RELEASE_FLAG}
RUN if [ -z "$RELEASE_FLAG" ]; then mv /tmp/ballista/ballista/rust/target/debug/ballista-scheduler /scheduler; else mv /tmp/ballista/ballista/rust/target/release/ballista-scheduler /scheduler; fi
RUN if [ -z "$RELEASE_FLAG" ]; then mv /tmp/ballista/target/debug/ballista-scheduler /scheduler; else mv /tmp/ballista/target/release/ballista-scheduler /scheduler; fi

# put the tpch on /tpch (need to be copied from different places depending on FLAG)
ENV RELEASE_FLAG=${RELEASE_FLAG}
RUN if [ -z "$RELEASE_FLAG" ]; then mv /tmp/ballista/ballista/rust/target/debug/tpch /tpch; else mv /tmp/ballista/ballista/rust/target/release/tpch /tpch; fi
RUN if [ -z "$RELEASE_FLAG" ]; then mv /tmp/ballista/target/debug/tpch /tpch; else mv /tmp/ballista/target/release/tpch /tpch; fi

# Copy the binary into a new container for a smaller docker image
FROM ballistacompute/rust-base:0.4.0-20210213
Expand Down