Skip to content

Commit

Permalink
add row number
Browse files Browse the repository at this point in the history
  • Loading branch information
Jiayu Liu committed May 21, 2021
1 parent bf5b8a5 commit 1723926
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 3 deletions.
70 changes: 70 additions & 0 deletions datafusion/src/physical_plan/expressions/row_number.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Defines physical expressions that can be evaluated at runtime during query execution
use std::any::Any;
use std::convert::TryFrom;
use std::sync::Arc;

use crate::error::{DataFusionError, Result};
use crate::physical_plan::{
    Accumulator, AggregateExpr, BuiltInWindowFunctionExpr, PhysicalExpr,
};
use crate::scalar::ScalarValue;
use arrow::compute;
use arrow::datatypes::{DataType, TimeUnit};
use arrow::{
    array::{
        ArrayRef, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array,
        LargeStringArray, StringArray, TimestampMicrosecondArray, TimestampMillisecondArray,
        TimestampNanosecondArray, TimestampSecondArray, UInt16Array, UInt32Array,
        UInt64Array, UInt8Array,
    },
    datatypes::Field,
};

pub struct RowNumber {
name: String,
expr: Arc<dyn PhysicalSortExpr>,
}

impl RowNumber {
/// Create a new `RowNumber` window function expression.
///
/// `expr` is the input expression stored on the struct and `name` is the
/// name given to the output column.
pub fn new(expr: Arc<dyn PhysicalExpr>, name: String) -> Self {
Self { name, expr }
}
}

impl BuiltInWindowFunctionExpr for RowNumber {
    /// Return a reference to Any that can be used for downcasting
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Describes the output column: a non-nullable `UInt64` field carrying
    /// this expression's name.
    fn field(&self) -> Result<Field> {
        Ok(Field::new(&self.name, DataType::UInt64, false))
    }

    /// The name this expression was constructed with.
    fn name(&self) -> &str {
        &self.name
    }
}

impl RowNumber {
    /// Returns the input expressions of this window function, i.e. the single
    /// expression passed to `new`.
    ///
    /// `BuiltInWindowFunctionExpr` does not declare an `expressions` method,
    /// so defining it inside the trait impl is rejected by the compiler
    /// (E0407). It is provided as an inherent method instead, which keeps it
    /// callable on a concrete `RowNumber`.
    pub fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
        vec![Arc::clone(&self.expr)]
    }
}
16 changes: 16 additions & 0 deletions datafusion/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,22 @@ pub trait WindowExpr: Send + Sync + Debug {
}
}

/// A window expression that is a built-in window function
pub trait BuiltInWindowFunctionExpr: Send + Sync + Debug {
/// Returns the window function expression as [`Any`](std::any::Any) so that it can be
/// downcast to a specific implementation.
fn as_any(&self) -> &dyn Any;

/// The field (name, data type, nullability) of the result of this window function.
fn field(&self) -> Result<Field>;

/// Human readable name such as `"MIN(c2)"` or `"RANK()"`. The default
/// implementation returns placeholder text.
fn name(&self) -> &str {
"BuiltInWindowFunctionExpr: default name"
}
}

/// An accumulator represents a stateful object that lives throughout the evaluation of multiple rows and
/// generically accumulates values. An accumulator knows how to:
/// * update its state from inputs via `update`
Expand Down
23 changes: 20 additions & 3 deletions datafusion/src/physical_plan/windows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
use crate::error::{DataFusionError, Result};
use crate::physical_plan::{
aggregates, expressions::Column, window_functions::WindowFunction, AggregateExpr,
Distribution, ExecutionPlan, Partitioning, PhysicalExpr, RecordBatchStream,
SendableRecordBatchStream, WindowExpr,
BuiltInWindowFunctionExpr, Distribution, ExecutionPlan, Partitioning, PhysicalExpr,
RecordBatchStream, SendableRecordBatchStream, WindowExpr,
};
use arrow::{
array::{Array, UInt32Builder},
Expand Down Expand Up @@ -80,7 +80,24 @@ pub fn create_window_expr(

/// A window expr that takes the form of a built in window function
#[derive(Debug)]
pub struct BuiltInWindowExpr {}
pub struct BuiltInWindowExpr {
window: Arc<dyn BuiltInWindowFunctionExpr>,
}

impl WindowExpr for BuiltInWindowExpr {
    /// Return a reference to Any that can be used for downcasting
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Name of the wrapped built-in window function.
    fn name(&self) -> &str {
        // `name()` already yields a `&str`; borrowing it again only produced
        // a `&&str` that had to be auto-dereferenced.
        self.window.name()
    }

    /// Output field of the wrapped built-in window function.
    fn field(&self) -> Result<Field> {
        self.window.field()
    }
}

/// A window expr that takes the form of an aggregate function
#[derive(Debug)]
Expand Down

0 comments on commit 1723926

Please sign in to comment.