Officially maintained Arrow2 branch
    Co-authored-by: Jorge C. Leitao <[email protected]>
    Co-authored-by: Yijie Shen <[email protected]>
    Co-authored-by: Guillaume Balaine <[email protected]>
    Co-authored-by: Guillaume Balaine <[email protected]>
houqp committed Mar 22, 2022
1 parent ca952bd commit 744b262
Showing 240 changed files with 9,519 additions and 8,209 deletions.
3 changes: 1 addition & 2 deletions .github/workflows/rust.yml
@@ -383,8 +383,7 @@ jobs:
run: |
cargo miri setup
cargo clean
# Ignore MIRI errors until we can get a clean run
cargo miri test || true
cargo miri test
# Check answers are correct when hash values collide
hash-collisions:
4 changes: 4 additions & 0 deletions Cargo.toml
@@ -36,3 +36,7 @@ members = [
[profile.release]
lto = true
codegen-units = 1

[patch.crates-io]
arrow2 = { git = "https://github.com/jorgecarleitao/arrow2.git", rev = "v0.10.0" }
parquet2 = { git = "https://github.com/jorgecarleitao/parquet2.git", rev = "v0.10.1" }
2 changes: 1 addition & 1 deletion ballista-examples/src/bin/ballista-dataframe.rs
@@ -27,7 +27,7 @@ async fn main() -> Result<()> {
.build()?;
let ctx = BallistaContext::remote("localhost", 50050, &config);

let testdata = datafusion::arrow::util::test_util::parquet_test_data();
let testdata = datafusion::test_util::parquet_test_data();

let filename = &format!("{}/alltypes_plain.parquet", testdata);

2 changes: 1 addition & 1 deletion ballista-examples/src/bin/ballista-sql.rs
@@ -27,7 +27,7 @@ async fn main() -> Result<()> {
.build()?;
let ctx = BallistaContext::remote("localhost", 50050, &config);

let testdata = datafusion::arrow::util::test_util::arrow_test_data();
let testdata = datafusion::test_util::arrow_test_data();

// register csv file with the execution context
ctx.register_csv(
4 changes: 2 additions & 2 deletions ballista/rust/client/README.md
@@ -95,7 +95,7 @@ data set.

```rust,no_run
use ballista::prelude::*;
use datafusion::arrow::util::pretty;
use datafusion::arrow::io::print;
use datafusion::prelude::CsvReadOptions;
#[tokio::main]
@@ -125,7 +125,7 @@ async fn main() -> Result<()> {
// collect the results and print them to stdout
let results = df.collect().await?;
pretty::print_batches(&results)?;
print::print(&results);
Ok(())
}
```
12 changes: 7 additions & 5 deletions ballista/rust/client/src/columnar_batch.rs
@@ -21,9 +21,11 @@ use ballista_core::error::{ballista_error, Result};

use datafusion::arrow::{
array::ArrayRef,
compute::aggregate::estimated_bytes_size,
datatypes::{DataType, Schema},
record_batch::RecordBatch,
};
use datafusion::field_util::{FieldExt, SchemaExt};
use datafusion::record_batch::RecordBatch;
use datafusion::scalar::ScalarValue;

pub type MaybeColumnarBatch = Result<Option<ColumnarBatch>>;
@@ -43,14 +45,14 @@ impl ColumnarBatch {
.enumerate()
.map(|(i, array)| {
(
batch.schema().field(i).name().clone(),
batch.schema().field(i).name().to_string(),
ColumnarValue::Columnar(array.clone()),
)
})
.collect();

Self {
schema: batch.schema(),
schema: batch.schema().clone(),
columns,
}
}
@@ -60,7 +62,7 @@ impl ColumnarBatch {
.fields()
.iter()
.enumerate()
.map(|(i, f)| (f.name().clone(), values[i].clone()))
.map(|(i, f)| (f.name().to_string(), values[i].clone()))
.collect();

Self {
@@ -156,7 +158,7 @@ impl ColumnarValue {

pub fn memory_size(&self) -> usize {
match self {
ColumnarValue::Columnar(array) => array.get_array_memory_size(),
ColumnarValue::Columnar(array) => estimated_bytes_size(array.as_ref()),
_ => 0,
}
}
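The `memory_size` change above replaces arrow's `get_array_memory_size` method with arrow2's free function `estimated_bytes_size`, which takes a `&dyn Array`. A minimal sketch of the call, assuming arrow2 0.10 with its compute features enabled (the array contents are illustrative):

```rust
use arrow2::array::Int32Array;
use arrow2::compute::aggregate::estimated_bytes_size;

fn main() {
    let array = Int32Array::from_slice([1, 2, 3, 4]);
    // arrow2 estimates the footprint from the array's buffers and validity
    // bitmap rather than tracking an allocation size on the array itself.
    let bytes = estimated_bytes_size(&array);
    println!("~{} bytes in memory", bytes);
}
```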
4 changes: 3 additions & 1 deletion ballista/rust/core/Cargo.toml
@@ -46,7 +46,9 @@ chrono = { version = "0.4", default-features = false }
clap = { version = "3", features = ["derive", "cargo"] }
parse_arg = "0.1.3"

arrow-flight = { version = "10.0" }
arrow-format = { version = "0.4", features = ["flight-data", "flight-service"] }
arrow = { package = "arrow2", version="0.10", features = ["io_ipc", "io_flight"] }

datafusion = { path = "../../../datafusion", version = "7.0.0" }
datafusion-proto = { path = "../../../datafusion-proto", version = "7.0.0" }

51 changes: 34 additions & 17 deletions ballista/rust/core/src/client.rs
@@ -17,27 +17,29 @@

//! Client API for sending requests to executors.
use arrow::io::flight::deserialize_schemas;
use arrow::io::ipc::IpcSchema;
use std::collections::HashMap;
use std::sync::Arc;

use std::{
convert::{TryFrom, TryInto},
convert::TryInto,
task::{Context, Poll},
};

use crate::error::{ballista_error, BallistaError, Result};
use crate::serde::protobuf::{self};
use crate::serde::scheduler::Action;

use arrow_flight::utils::flight_data_to_arrow_batch;
use arrow_flight::Ticket;
use arrow_flight::{flight_service_client::FlightServiceClient, FlightData};
use arrow_format::flight::data::{FlightData, Ticket};
use arrow_format::flight::service::flight_service_client::FlightServiceClient;
use datafusion::arrow::{
datatypes::{Schema, SchemaRef},
datatypes::SchemaRef,
error::{ArrowError, Result as ArrowResult},
record_batch::RecordBatch,
};

use datafusion::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
use datafusion::field_util::SchemaExt;
use datafusion::physical_plan::RecordBatchStream;
use datafusion::physical_plan::SendableRecordBatchStream;
use datafusion::record_batch::RecordBatch;
use futures::{Stream, StreamExt};
use log::debug;
use prost::Message;
@@ -116,10 +118,12 @@ impl BallistaClient {
{
Some(flight_data) => {
// convert FlightData to a stream
let schema = Arc::new(Schema::try_from(&flight_data)?);
let (schema, ipc_schema) =
deserialize_schemas(flight_data.data_body.as_slice()).unwrap();
let schema = Arc::new(schema);

// all the remaining stream messages should be dictionary and record batches
Ok(Box::pin(FlightDataStream::new(stream, schema)))
Ok(Box::pin(FlightDataStream::new(stream, schema, ipc_schema)))
}
None => Err(ballista_error(
"Did not receive schema batch from flight server",
@@ -131,11 +135,20 @@
struct FlightDataStream {
stream: Streaming<FlightData>,
schema: SchemaRef,
ipc_schema: IpcSchema,
}

impl FlightDataStream {
pub fn new(stream: Streaming<FlightData>, schema: SchemaRef) -> Self {
Self { stream, schema }
pub fn new(
stream: Streaming<FlightData>,
schema: SchemaRef,
ipc_schema: IpcSchema,
) -> Self {
Self {
stream,
schema,
ipc_schema,
}
}
}

@@ -151,12 +164,16 @@ impl Stream for FlightDataStream {
let converted_chunk = flight_data_chunk_result
.map_err(|e| ArrowError::from_external_error(Box::new(e)))
.and_then(|flight_data_chunk| {
flight_data_to_arrow_batch(
let hm = HashMap::new();

arrow::io::flight::deserialize_batch(
&flight_data_chunk,
self.schema.clone(),
&[],
self.schema.fields(),
&self.ipc_schema,
&hm,
)
});
})
.map(|c| RecordBatch::new_with_chunk(&self.schema, c));
Some(converted_chunk)
}
None => None,
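The client change above no longer builds a `Schema` directly from the first `FlightData` message; arrow2's flight helper returns both the logical schema and the IPC-level schema, and the latter is kept on `FlightDataStream` so later batches can be decoded. A minimal sketch of that first step, assuming arrow2 0.10 with the `io_flight` feature (`data_body` stands in for the body of the first message read off the gRPC stream):

```rust
use arrow2::datatypes::Schema;
use arrow2::error::Result;
use arrow2::io::flight::deserialize_schemas;
use arrow2::io::ipc::IpcSchema;

// Decode the schema message that starts a DoGet stream. The IpcSchema
// (field layout, dictionary ids) is what deserialize_batch needs later to
// decode each record-batch message, together with a dictionary map.
fn schema_from_first_message(data_body: &[u8]) -> Result<(Schema, IpcSchema)> {
    deserialize_schemas(data_body)
}
```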
2 changes: 1 addition & 1 deletion ballista/rust/core/src/config.rs
@@ -135,7 +135,7 @@ impl BallistaConfig {
.map_err(|e| format!("{:?}", e))?;
}
_ => {
return Err(format!("not support data type: {}", data_type));
return Err(format!("not support data type: {:?}", data_type));
}
}

1 change: 0 additions & 1 deletion ballista/rust/core/src/execution_plans/shuffle_reader.rs
@@ -24,7 +24,6 @@ use crate::serde::scheduler::{PartitionLocation, PartitionStats};
use crate::utils::WrappedStream;
use async_trait::async_trait;
use datafusion::arrow::datatypes::SchemaRef;

use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::physical_plan::expressions::PhysicalSortExpr;
use datafusion::physical_plan::metrics::{
79 changes: 39 additions & 40 deletions ballista/rust/core/src/execution_plans/shuffle_writer.rs
@@ -20,8 +20,6 @@
//! partition is re-partitioned and streamed to disk in Arrow IPC format. Future stages of the query
//! will use the ShuffleReaderExec to read these results.
use datafusion::physical_plan::expressions::PhysicalSortExpr;

use std::any::Any;
use std::iter::Iterator;
use std::path::PathBuf;
@@ -33,16 +31,12 @@ use crate::utils;
use crate::serde::protobuf::ShuffleWritePartition;
use crate::serde::scheduler::PartitionStats;
use async_trait::async_trait;
use datafusion::arrow::array::{
Array, ArrayBuilder, ArrayRef, StringBuilder, StructBuilder, UInt32Builder,
UInt64Builder,
};
use datafusion::arrow::array::*;
use datafusion::arrow::compute::take;
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};

use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::{DataFusionError, Result};
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::field_util::SchemaExt;
use datafusion::physical_plan::common::IPCWriter;
use datafusion::physical_plan::hash_utils::create_hashes;
use datafusion::physical_plan::memory::MemoryStream;
@@ -53,8 +47,10 @@ use datafusion::physical_plan::metrics::{
use datafusion::physical_plan::{
DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
};
use datafusion::record_batch::RecordBatch;
use futures::StreamExt;

use datafusion::physical_plan::expressions::PhysicalSortExpr;
use log::{debug, info};

/// ShuffleWriterExec represents a section of a query plan that has consistent partitioning and
@@ -230,21 +226,24 @@ impl ShuffleWriterExec {
for (output_partition, partition_indices) in
indices.into_iter().enumerate()
{
let indices = partition_indices.into();

// Produce batches based on indices
let columns = input_batch
.columns()
.iter()
.map(|c| {
take(c.as_ref(), &indices, None).map_err(|e| {
DataFusionError::Execution(e.to_string())
})
take::take(
c.as_ref(),
&PrimitiveArray::<u64>::from_slice(
&partition_indices,
),
)
.map_err(|e| DataFusionError::Execution(e.to_string()))
.map(ArrayRef::from)
})
.collect::<Result<Vec<Arc<dyn Array>>>>()?;

let output_batch =
RecordBatch::try_new(input_batch.schema(), columns)?;
RecordBatch::try_new(input_batch.schema().clone(), columns)?;

// write non-empty batch out

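The repartitioning loop above switches from arrow's `take(array, indices, options)` to arrow2's `take::take`, which accepts the indices as a plain `PrimitiveArray<u64>` and returns a boxed dynamic array. A minimal sketch of that kernel in isolation, assuming arrow2 0.10 (column and index values are illustrative):

```rust
use std::sync::Arc;

use arrow2::array::{Array, Int32Array, PrimitiveArray};
use arrow2::compute::take::take;
use arrow2::error::Result;

fn main() -> Result<()> {
    let column = Int32Array::from_slice([10, 20, 30, 40]);
    // Indices are an ordinary u64 primitive array, mirroring what the hunk
    // builds from `partition_indices`.
    let indices = PrimitiveArray::<u64>::from_slice([3, 0, 2]);

    // `take` returns Box<dyn Array>; convert it to Arc to use it as a column.
    let taken: Arc<dyn Array> = take(&column, &indices)?.into();
    assert_eq!(taken.len(), 3);
    Ok(())
}
```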
@@ -364,36 +363,34 @@

// build metadata result batch
let num_writers = part_loc.len();
let mut partition_builder = UInt32Builder::new(num_writers);
let mut path_builder = StringBuilder::new(num_writers);
let mut num_rows_builder = UInt64Builder::new(num_writers);
let mut num_batches_builder = UInt64Builder::new(num_writers);
let mut num_bytes_builder = UInt64Builder::new(num_writers);
let mut partition_builder = UInt32Vec::with_capacity(num_writers);
let mut path_builder = MutableUtf8Array::<i32>::with_capacity(num_writers);
let mut num_rows_builder = UInt64Vec::with_capacity(num_writers);
let mut num_batches_builder = UInt64Vec::with_capacity(num_writers);
let mut num_bytes_builder = UInt64Vec::with_capacity(num_writers);

for loc in &part_loc {
path_builder.append_value(loc.path.clone())?;
partition_builder.append_value(loc.partition_id as u32)?;
num_rows_builder.append_value(loc.num_rows)?;
num_batches_builder.append_value(loc.num_batches)?;
num_bytes_builder.append_value(loc.num_bytes)?;
path_builder.push(Some(loc.path.clone()));
partition_builder.push(Some(loc.partition_id as u32));
num_rows_builder.push(Some(loc.num_rows));
num_batches_builder.push(Some(loc.num_batches));
num_bytes_builder.push(Some(loc.num_bytes));
}

// build arrays
let partition_num: ArrayRef = Arc::new(partition_builder.finish());
let path: ArrayRef = Arc::new(path_builder.finish());
let field_builders: Vec<Box<dyn ArrayBuilder>> = vec![
Box::new(num_rows_builder),
Box::new(num_batches_builder),
Box::new(num_bytes_builder),
let partition_num: ArrayRef = partition_builder.into_arc();
let path: ArrayRef = path_builder.into_arc();
let field_builders: Vec<Arc<dyn Array>> = vec![
num_rows_builder.into_arc(),
num_batches_builder.into_arc(),
num_bytes_builder.into_arc(),
];
let mut stats_builder = StructBuilder::new(
PartitionStats::default().arrow_struct_fields(),
let stats_builder = StructArray::from_data(
DataType::Struct(PartitionStats::default().arrow_struct_fields()),
field_builders,
None,
);
for _ in 0..num_writers {
stats_builder.append(true)?;
}
let stats = Arc::new(stats_builder.finish());
let stats = Arc::new(stats_builder);

// build result batch containing metadata
let schema = result_schema();
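The metadata-batch hunk above drops arrow's builders in favor of arrow2's mutable arrays (`UInt32Vec`/`UInt64Vec` are aliases for `MutablePrimitiveArray`), freezes them into `Arc<dyn Array>` with `into_arc`, and assembles the stats column with `StructArray::from_data` instead of a `StructBuilder`. A minimal sketch of the pattern, assuming arrow2 0.10 (the field name and values are illustrative):

```rust
use std::sync::Arc;

use arrow2::array::{Array, MutableUtf8Array, StructArray, UInt64Vec};
use arrow2::datatypes::{DataType, Field};

fn main() {
    let mut path = MutableUtf8Array::<i32>::with_capacity(2);
    let mut num_rows = UInt64Vec::with_capacity(2);

    // push(Some(..)) appends a valid value; push(None) would append a null.
    for (p, n) in vec![("part-0.arrow", 10u64), ("part-1.arrow", 12)] {
        path.push(Some(p));
        num_rows.push(Some(n));
    }

    // Freezing a mutable array yields an immutable, reference-counted array.
    let path: Arc<dyn Array> = path.into_arc();

    // StructArray::from_data takes the struct DataType, the child arrays,
    // and an optional validity bitmap (None = all rows valid).
    let fields = vec![Field::new("num_rows", DataType::UInt64, false)];
    let stats = StructArray::from_data(DataType::Struct(fields), vec![num_rows.into_arc()], None);

    assert_eq!(path.len(), 2);
    assert_eq!(stats.len(), 2);
}
```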
@@ -443,9 +440,11 @@ fn result_schema() -> SchemaRef {
#[cfg(test)]
mod tests {
use super::*;
use datafusion::arrow::array::{StringArray, StructArray, UInt32Array, UInt64Array};
use datafusion::arrow::array::{StructArray, UInt32Array, UInt64Array, Utf8Array};
use datafusion::field_util::StructArrayExt;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::expressions::Column;
use std::iter::FromIterator;

use datafusion::physical_plan::memory::MemoryExec;
use tempfile::TempDir;
@@ -473,7 +472,7 @@ mod tests {
assert_eq!(2, batch.num_rows());
let path = batch.columns()[1]
.as_any()
.downcast_ref::<StringArray>()
.downcast_ref::<Utf8Array<i32>>()
.unwrap();

let file0 = path.value(0);
@@ -551,8 +550,8 @@
let batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(UInt32Array::from(vec![Some(1), Some(2)])),
Arc::new(StringArray::from(vec![Some("hello"), Some("world")])),
Arc::new(UInt32Array::from_iter(vec![Some(1), Some(2)])),
Arc::new(Utf8Array::<i32>::from(vec![Some("hello"), Some("world")])),
],
)?;
let partition = vec![batch.clone(), batch];
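In the test changes above, `StringArray` becomes the offset-parameterized `Utf8Array<i32>` and primitive arrays are constructed with `from_iter`, so downcasts use the new type as well. A minimal sketch, assuming arrow2 0.10 (values are illustrative):

```rust
use std::iter::FromIterator;
use std::sync::Arc;

use arrow2::array::{Array, UInt32Array, Utf8Array};

fn main() {
    // Utf8Array<i32> uses 32-bit offsets, matching the old non-"large" StringArray.
    let strings = Utf8Array::<i32>::from(vec![Some("hello"), Some("world")]);
    let numbers = UInt32Array::from_iter(vec![Some(1u32), Some(2)]);

    // Downcasting a type-erased column works as before, just with the arrow2 type.
    let column: Arc<dyn Array> = Arc::new(strings);
    let strings = column.as_any().downcast_ref::<Utf8Array<i32>>().unwrap();

    assert_eq!(strings.value(0), "hello");
    assert_eq!(numbers.len(), 2);
}
```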
3 changes: 3 additions & 0 deletions ballista/rust/core/src/lib.rs
@@ -18,6 +18,9 @@
#![doc = include_str!("../README.md")]
pub const BALLISTA_VERSION: &str = env!("CARGO_PKG_VERSION");

#[macro_use]
extern crate async_trait;

pub fn print_version() {
println!("Ballista version: {}", BALLISTA_VERSION)
}
1 change: 1 addition & 0 deletions ballista/rust/core/src/memory_stream.rs
@@ -0,0 +1 @@
