Skip to content

Commit

Permalink
Combine Cargo workspaces (#23)
Browse files Browse the repository at this point in the history
  • Loading branch information
andygrove authored Apr 23, 2021
1 parent 9d720f4 commit 74cdf6f
Show file tree
Hide file tree
Showing 17 changed files with 34 additions and 59 deletions.
16 changes: 8 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@

[workspace]
members = [
"datafusion",
"datafusion-examples",
"datafusion",
"datafusion-examples",
"benchmarks",
]

# this package is excluded because it requires different compilation flags, thereby significantly changing
# how it is compiled within the workspace, causing the whole workspace to be compiled from scratch
# this way, this is a stand-alone package that compiles independently of the others.
exclude = ["ballista"]
"ballista/rust/benchmarks/tpch",
"ballista/rust/client",
"ballista/rust/core",
"ballista/rust/executor",
"ballista/rust/scheduler",
]
30 changes: 0 additions & 30 deletions ballista/rust/Cargo.toml

This file was deleted.

2 changes: 1 addition & 1 deletion ballista/rust/benchmarks/tpch/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

[package]
name = "tpch"
version = "0.4.2-SNAPSHOT"
version = "0.5.0-SNAPSHOT"
homepage = "https://github.com/apache/arrow"
repository = "https://github.com/apache/arrow"
authors = ["Apache Arrow <[email protected]>"]
Expand Down
2 changes: 1 addition & 1 deletion ballista/rust/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
name = "ballista"
description = "Ballista Distributed Compute"
license = "Apache-2.0"
version = "0.4.2-SNAPSHOT"
version = "0.5.0-SNAPSHOT"
homepage = "https://github.com/apache/arrow"
repository = "https://github.com/apache/arrow"
authors = ["Apache Arrow <[email protected]>"]
Expand Down
4 changes: 2 additions & 2 deletions ballista/rust/client/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use ballista_core::serde::protobuf::{
};
use ballista_core::{
client::BallistaClient,
datasource::DFTableAdapter,
datasource::DfTableAdapter,
error::{BallistaError, Result},
memory_stream::MemoryStream,
utils::create_datafusion_context,
Expand Down Expand Up @@ -151,7 +151,7 @@ impl BallistaContext {
let execution_plan = ctx.create_physical_plan(&plan)?;
ctx.register_table(
TableReference::Bare { table: name },
Arc::new(DFTableAdapter::new(plan, execution_plan)),
Arc::new(DfTableAdapter::new(plan, execution_plan)),
)?;
}
let df = ctx.sql(sql)?;
Expand Down
2 changes: 1 addition & 1 deletion ballista/rust/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
name = "ballista-core"
description = "Ballista Distributed Compute"
license = "Apache-2.0"
version = "0.4.2-SNAPSHOT"
version = "0.5.0-SNAPSHOT"
homepage = "https://github.com/apache/arrow"
repository = "https://github.com/apache/arrow"
authors = ["Apache Arrow <[email protected]>"]
Expand Down
6 changes: 3 additions & 3 deletions ballista/rust/core/src/datasource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,20 +30,20 @@ use datafusion::{
/// TableProvider which is effectively a wrapper around a physical plan. We need to be able to
/// register tables so that we can create logical plans from SQL statements that reference these
/// tables.
pub struct DFTableAdapter {
pub struct DfTableAdapter {
/// DataFusion logical plan
pub logical_plan: LogicalPlan,
/// DataFusion execution plan
plan: Arc<dyn ExecutionPlan>,
}

impl DFTableAdapter {
impl DfTableAdapter {
pub fn new(logical_plan: LogicalPlan, plan: Arc<dyn ExecutionPlan>) -> Self {
Self { logical_plan, plan }
}
}

impl TableProvider for DFTableAdapter {
impl TableProvider for DfTableAdapter {
fn as_any(&self) -> &dyn Any {
self
}
Expand Down
1 change: 1 addition & 0 deletions ballista/rust/core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ pub enum BallistaError {
TokioError(tokio::task::JoinError),
}

#[allow(clippy::from_over_into)]
impl<T> Into<Result<T>> for BallistaError {
fn into(self) -> Result<T> {
Err(self)
Expand Down
2 changes: 2 additions & 0 deletions ballista/rust/core/src/serde/logical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,7 @@ impl TryInto<arrow::datatypes::DataType> for &protobuf::arrow_type::ArrowTypeEnu
}
}

#[allow(clippy::from_over_into)]
impl Into<arrow::datatypes::DataType> for protobuf::PrimitiveScalarType {
fn into(self) -> arrow::datatypes::DataType {
use arrow::datatypes::DataType;
Expand Down Expand Up @@ -1170,6 +1171,7 @@ impl TryFrom<i32> for protobuf::FileType {
}
}

#[allow(clippy::from_over_into)]
impl Into<datafusion::sql::parser::FileType> for protobuf::FileType {
fn into(self) -> datafusion::sql::parser::FileType {
use datafusion::sql::parser::FileType;
Expand Down
7 changes: 4 additions & 3 deletions ballista/rust/core/src/serde/logical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use std::{
convert::{TryFrom, TryInto},
};

use crate::datasource::DFTableAdapter;
use crate::datasource::DfTableAdapter;
use crate::serde::{protobuf, BallistaError};

use arrow::datatypes::{DataType, Schema};
Expand Down Expand Up @@ -679,7 +679,7 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {

// unwrap the DFTableAdapter to get to the real TableProvider
let source = if let Some(adapter) =
source.as_any().downcast_ref::<DFTableAdapter>()
source.as_any().downcast_ref::<DfTableAdapter>()
{
match &adapter.logical_plan {
LogicalPlan::TableScan { source, .. } => Ok(source.as_any()),
Expand Down Expand Up @@ -1021,7 +1021,7 @@ impl TryInto<protobuf::LogicalExprNode> for &Expr {
let fun: protobuf::ScalarFunction = fun.try_into()?;
let expr: Vec<protobuf::LogicalExprNode> = args
.iter()
.map(|e| Ok(e.try_into()?))
.map(|e| e.try_into())
.collect::<Result<Vec<protobuf::LogicalExprNode>, BallistaError>>()?;
Ok(protobuf::LogicalExprNode {
expr_type: Some(
Expand Down Expand Up @@ -1164,6 +1164,7 @@ impl TryInto<protobuf::LogicalExprNode> for &Expr {
}
}

#[allow(clippy::from_over_into)]
impl Into<protobuf::Schema> for &Schema {
fn into(self) -> protobuf::Schema {
protobuf::Schema {
Expand Down
1 change: 1 addition & 0 deletions ballista/rust/core/src/serde/scheduler/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ impl TryInto<PartitionId> for protobuf::PartitionId {
}
}

#[allow(clippy::from_over_into)]
impl Into<PartitionStats> for protobuf::PartitionStats {
fn into(self) -> PartitionStats {
PartitionStats::new(
Expand Down
1 change: 1 addition & 0 deletions ballista/rust/core/src/serde/scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ pub struct ExecutorMeta {
pub port: u16,
}

#[allow(clippy::from_over_into)]
impl Into<protobuf::ExecutorMetadata> for ExecutorMeta {
fn into(self) -> protobuf::ExecutorMetadata {
protobuf::ExecutorMetadata {
Expand Down
2 changes: 2 additions & 0 deletions ballista/rust/core/src/serde/scheduler/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ impl TryInto<protobuf::ExecutePartition> for ExecutePartition {
}
}

#[allow(clippy::from_over_into)]
impl Into<protobuf::PartitionId> for PartitionId {
fn into(self) -> protobuf::PartitionId {
protobuf::PartitionId {
Expand All @@ -77,6 +78,7 @@ impl TryInto<protobuf::PartitionLocation> for PartitionLocation {
}
}

#[allow(clippy::from_over_into)]
impl Into<protobuf::PartitionStats> for PartitionStats {
fn into(self) -> protobuf::PartitionStats {
let none_value = -1_i64;
Expand Down
3 changes: 1 addition & 2 deletions ballista/rust/executor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,13 @@
name = "ballista-executor"
description = "Ballista Distributed Compute - Executor"
license = "Apache-2.0"
version = "0.4.2-SNAPSHOT"
version = "0.5.0-SNAPSHOT"
homepage = "https://github.com/apache/arrow"
repository = "https://github.com/apache/arrow"
authors = ["Apache Arrow <[email protected]>"]
edition = "2018"

[features]
default = ["snmalloc"]
snmalloc = ["snmalloc-rs"]

[dependencies]
Expand Down
2 changes: 1 addition & 1 deletion ballista/rust/scheduler/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
name = "ballista-scheduler"
description = "Ballista Distributed Compute - Scheduler"
license = "Apache-2.0"
version = "0.4.2-SNAPSHOT"
version = "0.5.0-SNAPSHOT"
homepage = "https://github.com/apache/arrow"
repository = "https://github.com/apache/arrow"
authors = ["Apache Arrow <[email protected]>"]
Expand Down
4 changes: 2 additions & 2 deletions ballista/rust/scheduler/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use std::time::Instant;
use std::{collections::HashMap, future::Future};

use ballista_core::client::BallistaClient;
use ballista_core::datasource::DFTableAdapter;
use ballista_core::datasource::DfTableAdapter;
use ballista_core::error::{BallistaError, Result};
use ballista_core::serde::scheduler::ExecutorMeta;
use ballista_core::serde::scheduler::PartitionId;
Expand Down Expand Up @@ -138,7 +138,7 @@ impl DistributedPlanner {
stages.append(&mut child_stages);
}

if let Some(adapter) = execution_plan.as_any().downcast_ref::<DFTableAdapter>() {
if let Some(adapter) = execution_plan.as_any().downcast_ref::<DfTableAdapter>() {
// remove Repartition rule because that isn't supported yet
let rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> = vec![
Arc::new(CoalesceBatches::new()),
Expand Down
8 changes: 3 additions & 5 deletions dev/docker/rust.dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -59,20 +59,18 @@ ARG RELEASE_FLAG=--release
# force build.rs to run to generate configure_me code.
ENV FORCE_REBUILD='true'
RUN cargo build $RELEASE_FLAG
RUN cd ballista/rust && \
cargo build $RELEASE_FLAG

# put the executor on /executor (need to be copied from different places depending on FLAG)
ENV RELEASE_FLAG=${RELEASE_FLAG}
RUN if [ -z "$RELEASE_FLAG" ]; then mv /tmp/ballista/ballista/rust/target/debug/ballista-executor /executor; else mv /tmp/ballista/ballista/rust/target/release/ballista-executor /executor; fi
RUN if [ -z "$RELEASE_FLAG" ]; then mv /tmp/ballista/target/debug/ballista-executor /executor; else mv /tmp/ballista/target/release/ballista-executor /executor; fi

# put the scheduler on /scheduler (need to be copied from different places depending on FLAG)
ENV RELEASE_FLAG=${RELEASE_FLAG}
RUN if [ -z "$RELEASE_FLAG" ]; then mv /tmp/ballista/ballista/rust/target/debug/ballista-scheduler /scheduler; else mv /tmp/ballista/ballista/rust/target/release/ballista-scheduler /scheduler; fi
RUN if [ -z "$RELEASE_FLAG" ]; then mv /tmp/ballista/target/debug/ballista-scheduler /scheduler; else mv /tmp/ballista/target/release/ballista-scheduler /scheduler; fi

# put the tpch on /tpch (need to be copied from different places depending on FLAG)
ENV RELEASE_FLAG=${RELEASE_FLAG}
RUN if [ -z "$RELEASE_FLAG" ]; then mv /tmp/ballista/ballista/rust/target/debug/tpch /tpch; else mv /tmp/ballista/ballista/rust/target/release/tpch /tpch; fi
RUN if [ -z "$RELEASE_FLAG" ]; then mv /tmp/ballista/target/debug/tpch /tpch; else mv /tmp/ballista/target/release/tpch /tpch; fi

# Copy the binary into a new container for a smaller docker image
FROM ballistacompute/rust-base:0.4.0-20210213
Expand Down

0 comments on commit 74cdf6f

Please sign in to comment.