Skip to content

Commit

Permalink
[DataFusion] - Add show and show_limit function for DataFrame (#923)
Browse files Browse the repository at this point in the history
* feat: support show function for DataFrame

* fix: fix docs comments

* fix: fix typo

* fix: fix pre-commit

* fix: fix code format

* fix: improve show function implementation

* fix: change match pattern to 'if let' single pattern

* fix: Rewrite show function impl and add a new show_limit function

* fix: Add the show function to the sample code

* fix: fix cargo test error
  • Loading branch information
francis-du authored Aug 24, 2021
1 parent 1922130 commit 5871207
Show file tree
Hide file tree
Showing 13 changed files with 61 additions and 47 deletions.
9 changes: 3 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,7 @@ async fn main() -> datafusion::error::Result<()> {
let df = ctx.sql("SELECT a, MIN(b) FROM example GROUP BY a LIMIT 100")?;

// execute and print results
let results: Vec<RecordBatch> = df.collect().await?;
print_batches(&results)?;
df.show().await?;
Ok(())
}
```
Expand All @@ -102,12 +101,10 @@ async fn main() -> datafusion::error::Result<()> {
let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;

let df = df.filter(col("a").lt_eq(col("b")))?
.aggregate(vec![col("a")], vec![min(col("b"))])?
.limit(100)?;
.aggregate(vec![col("a")], vec![min(col("b"))])?;

// execute and print results
let results: Vec<RecordBatch> = df.collect().await?;
print_batches(&results)?;
df.show_limit(100).await?;
Ok(())
}
```
Expand Down
5 changes: 2 additions & 3 deletions ballista-examples/src/bin/ballista-dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
// under the License.

use ballista::prelude::*;
use datafusion::arrow::util::pretty;
use datafusion::prelude::{col, lit};

/// This example demonstrates executing a simple query against an Arrow data source (Parquet) and
Expand All @@ -38,8 +37,8 @@ async fn main() -> Result<()> {
.select_columns(&["id", "bool_col", "timestamp_col"])?
.filter(col("id").gt(lit(1)))?;

let results = df.collect().await?;
pretty::print_batches(&results)?;
// print the results
df.show().await?;

Ok(())
}
5 changes: 2 additions & 3 deletions ballista-examples/src/bin/ballista-sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
// under the License.

use ballista::prelude::*;
use datafusion::arrow::util::pretty;
use datafusion::prelude::CsvReadOptions;

/// This example demonstrates executing a simple query against an Arrow data source (CSV) and
Expand Down Expand Up @@ -45,8 +44,8 @@ async fn main() -> Result<()> {
GROUP BY c1",
)?;

let results = df.collect().await?;
pretty::print_batches(&results)?;
// print the results
df.show().await?;

Ok(())
}
5 changes: 1 addition & 4 deletions datafusion-examples/examples/csv_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
// specific language governing permissions and limitations
// under the License.

use datafusion::arrow::util::pretty;

use datafusion::error::Result;
use datafusion::prelude::*;

Expand All @@ -43,10 +41,9 @@ async fn main() -> Result<()> {
WHERE c11 > 0.1 AND c11 < 0.9 \
GROUP BY c1",
)?;
let results = df.collect().await?;

// print the results
pretty::print_batches(&results)?;
df.show().await?;

Ok(())
}
7 changes: 1 addition & 6 deletions datafusion-examples/examples/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
// specific language governing permissions and limitations
// under the License.

use datafusion::arrow::util::pretty;

use datafusion::error::Result;
use datafusion::prelude::*;

Expand All @@ -37,11 +35,8 @@ async fn main() -> Result<()> {
.select_columns(&["id", "bool_col", "timestamp_col"])?
.filter(col("id").gt(lit(1)))?;

// execute the query
let results = df.collect().await?;

// print the results
pretty::print_batches(&results)?;
df.show().await?;

Ok(())
}
6 changes: 1 addition & 5 deletions datafusion-examples/examples/dataframe_in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use std::sync::Arc;
use datafusion::arrow::array::{Int32Array, StringArray};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::arrow::util::pretty;

use datafusion::datasource::MemTable;
use datafusion::error::Result;
Expand Down Expand Up @@ -57,11 +56,8 @@ async fn main() -> Result<()> {

let df = df.select_columns(&["a", "b"])?.filter(filter)?;

// execute
let results = df.collect().await?;

// print the results
pretty::print_batches(&results)?;
df.show().await?;

Ok(())
}
2 changes: 1 addition & 1 deletion datafusion-examples/examples/flight_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ use std::convert::TryFrom;
use std::sync::Arc;

use datafusion::arrow::datatypes::Schema;
use datafusion::arrow::util::pretty;

use arrow_flight::flight_descriptor;
use arrow_flight::flight_service_client::FlightServiceClient;
use arrow_flight::utils::flight_data_to_arrow_batch;
use arrow_flight::{FlightDescriptor, Ticket};
use datafusion::arrow::util::pretty;

/// This example shows how to wrap DataFusion with `FlightService` to support looking up schema information for
/// Parquet files and executing SQL queries against them on a remote server.
Expand Down
5 changes: 1 addition & 4 deletions datafusion-examples/examples/parquet_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
// specific language governing permissions and limitations
// under the License.

use datafusion::arrow::util::pretty;

use datafusion::error::Result;
use datafusion::prelude::*;

Expand All @@ -41,10 +39,9 @@ async fn main() -> Result<()> {
FROM alltypes_plain \
WHERE id > 1 AND tinyint_col < double_col",
)?;
let results = df.collect().await?;

// print the results
pretty::print_batches(&results)?;
df.show().await?;

Ok(())
}
6 changes: 1 addition & 5 deletions datafusion-examples/examples/simple_udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use datafusion::arrow::{
array::{ArrayRef, Float32Array, Float64Array},
datatypes::DataType,
record_batch::RecordBatch,
util::pretty,
};

use datafusion::prelude::*;
Expand Down Expand Up @@ -141,11 +140,8 @@ async fn main() -> Result<()> {

// note that "b" is f32, not f64. DataFusion coerces the types to match the UDF's signature.

// execute the query
let results = df.collect().await?;

// print the results
pretty::print_batches(&results)?;
df.show().await?;

Ok(())
}
30 changes: 30 additions & 0 deletions datafusion/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,36 @@ pub trait DataFrame: Send + Sync {
/// ```
async fn collect(&self) -> Result<Vec<RecordBatch>>;

/// Print all results to stdout, formatted as a table.
///
/// Executes this DataFrame (collecting every partition) and
/// pretty-prints the resulting record batches.
///
/// ```
/// # use datafusion::prelude::*;
/// # use datafusion::error::Result;
/// # #[tokio::main]
/// # async fn main() -> Result<()> {
/// let mut ctx = ExecutionContext::new();
/// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
/// df.show().await?;
/// # Ok(())
/// # }
/// ```
async fn show(&self) -> Result<()>;

/// Print results to stdout, limited to at most `n` rows.
///
/// Applies a limit before execution, so no more than `n` rows are
/// collected and printed.
///
/// ```
/// # use datafusion::prelude::*;
/// # use datafusion::error::Result;
/// # #[tokio::main]
/// # async fn main() -> Result<()> {
/// let mut ctx = ExecutionContext::new();
/// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
/// df.show_limit(10).await?;
/// # Ok(())
/// # }
/// ```
async fn show_limit(&self, n: usize) -> Result<()>;

/// Executes this DataFrame and returns a stream over a single partition
///
/// ```
Expand Down
13 changes: 13 additions & 0 deletions datafusion/src/execution/dataframe_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use crate::{
physical_plan::{collect, collect_partitioned},
};

use crate::arrow::util::pretty;
use crate::physical_plan::{
execute_stream, execute_stream_partitioned, ExecutionPlan, SendableRecordBatchStream,
};
Expand Down Expand Up @@ -156,6 +157,18 @@ impl DataFrame for DataFrameImpl {
Ok(collect(plan).await?)
}

/// Print all results to stdout, formatted as a table.
///
/// Executes the DataFrame via `collect` and pretty-prints every
/// resulting record batch.
async fn show(&self) -> Result<()> {
    let results = self.collect().await?;
    // Use `?` for the error conversion (ArrowError -> DataFusionError via
    // `From`) and return `Ok(())` explicitly, instead of the
    // `Ok(expr?)` form flagged by clippy::needless_question_mark.
    pretty::print_batches(&results)?;
    Ok(())
}

/// Print results to stdout, limited to at most `n` rows.
///
/// A `LIMIT n` is applied to the plan before collecting, so only the
/// first `n` rows are materialized and printed.
// NOTE: parameter renamed `num` -> `n` to match the trait declaration;
// this does not affect callers (Rust calls are positional).
async fn show_limit(&self, n: usize) -> Result<()> {
    let results = self.limit(n)?.collect().await?;
    // Avoid the `Ok(expr?)` pattern (clippy::needless_question_mark).
    pretty::print_batches(&results)?;
    Ok(())
}

/// Convert the logical plan represented by this DataFrame into a physical plan and
/// execute it, returning a stream over a single partition
async fn execute_stream(&self) -> Result<SendableRecordBatchStream> {
Expand Down
9 changes: 3 additions & 6 deletions docs/user-guide/src/example-usage.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ async fn main() -> datafusion::error::Result<()> {
let df = ctx.sql("SELECT a, MIN(b) FROM example GROUP BY a LIMIT 100")?;

// execute and print results
let results: Vec<RecordBatch> = df.collect().await?;
print_batches(&results)?;
df.show().await?;
Ok(())
}
```
Expand All @@ -56,12 +55,10 @@ async fn main() -> datafusion::error::Result<()> {
let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;

let df = df.filter(col("a").lt_eq(col("b")))?
.aggregate(vec![col("a")], vec![min(col("b"))])?
.limit(100)?;
.aggregate(vec![col("a")], vec![min(col("b"))])?;

// execute and print results
let results: Vec<RecordBatch> = df.collect().await?;
print_batches(&results)?;
df.show_limit(100).await?;
Ok(())
}
```
Expand Down
6 changes: 2 additions & 4 deletions pre-commit.sh
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
# This file is git pre-commit hook.
#
# Soft link it as git hook under top dir of apache arrow git repository:
# $ ln -s ../../rust/pre-commit.sh .git/hooks/pre-commit
# $ ln -s ../../pre-commit.sh .git/hooks/pre-commit
#
# This file can be run directly:
# $ ./pre-commit.sh
Expand All @@ -37,14 +37,12 @@ function BYELLOW() {
echo "\033[1;33m$@\033[0m"
}

RUST_DIR="rust"

# env GIT_DIR is set by git when run a pre-commit hook.
if [ -z "${GIT_DIR}" ]; then
GIT_DIR=$(git rev-parse --show-toplevel)
fi

cd ${GIT_DIR}/${RUST_DIR}
cd ${GIT_DIR}

NUM_CHANGES=$(git diff --cached --name-only . |
grep -e ".*/*.rs$" |
Expand Down

0 comments on commit 5871207

Please sign in to comment.