Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Fix common::grpc compiler errors #722

Merged
merged 2 commits into from
Dec 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/common/grpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ common-recordbatch = { path = "../recordbatch" }
common-runtime = { path = "../runtime" }
dashmap = "5.4"
datafusion = "14.0.0"
datatypes = { path = "../../datatypes" }
snafu = { version = "0.7", features = ["backtraces"] }
tokio = { version = "1.0", features = ["full"] }
tonic = "0.8"
Expand Down
211 changes: 136 additions & 75 deletions src/common/grpc/src/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use api::helper::ColumnDataTypeWrapper;
use api::result::{build_err_result, ObjectResultBuilder};
use api::v1::codec::SelectResult;
Expand All @@ -24,9 +22,14 @@ use common_error::prelude::ErrorExt;
use common_error::status_code::StatusCode;
use common_query::Output;
use common_recordbatch::{util, RecordBatches, SendableRecordBatchStream};
use datatypes::arrow::array::{Array, BooleanArray, PrimitiveArray};
use datatypes::arrow_array::{BinaryArray, StringArray};
use datatypes::schema::SchemaRef;
use datatypes::types::{TimestampType, WrapperType};
use datatypes::vectors::{
BinaryVector, BooleanVector, DateTimeVector, DateVector, Float32Vector, Float64Vector,
Int16Vector, Int32Vector, Int64Vector, Int8Vector, StringVector, TimestampMicrosecondVector,
TimestampMillisecondVector, TimestampNanosecondVector, TimestampSecondVector, UInt16Vector,
UInt32Vector, UInt64Vector, UInt8Vector, VectorRef,
};
use snafu::{OptionExt, ResultExt};

use crate::error::{self, ConversionSnafu, Result};
Expand All @@ -46,6 +49,7 @@ pub async fn to_object_result(output: std::result::Result<Output, impl ErrorExt>
Err(e) => build_err_result(&e),
}
}

async fn collect(stream: SendableRecordBatchStream) -> Result<ObjectResult> {
let schema = stream.schema();

Expand Down Expand Up @@ -82,20 +86,17 @@ fn try_convert(record_batches: RecordBatches) -> Result<SelectResult> {
let schema = record_batches.schema();
let record_batches = record_batches.take();

let row_count: usize = record_batches
.iter()
.map(|r| r.df_recordbatch.num_rows())
.sum();
let row_count: usize = record_batches.iter().map(|r| r.num_rows()).sum();

let schemas = schema.column_schemas();
let mut columns = Vec::with_capacity(schemas.len());

for (idx, column_schema) in schemas.iter().enumerate() {
let column_name = column_schema.name.clone();

let arrays: Vec<Arc<dyn Array>> = record_batches
let arrays: Vec<_> = record_batches
.iter()
.map(|r| r.df_recordbatch.columns()[idx].clone())
.map(|r| r.column(idx).clone())
.collect();

let column = Column {
Expand All @@ -116,7 +117,7 @@ fn try_convert(record_batches: RecordBatches) -> Result<SelectResult> {
})
}

pub fn null_mask(arrays: &Vec<Arc<dyn Array>>, row_count: usize) -> Vec<u8> {
pub fn null_mask(arrays: &[VectorRef], row_count: usize) -> Vec<u8> {
let null_count: usize = arrays.iter().map(|a| a.null_count()).sum();

if null_count == 0 {
Expand All @@ -126,18 +127,22 @@ pub fn null_mask(arrays: &Vec<Arc<dyn Array>>, row_count: usize) -> Vec<u8> {
let mut null_mask = BitVec::with_capacity(row_count);
for array in arrays {
let validity = array.validity();
if let Some(v) = validity {
v.iter().for_each(|x| null_mask.push(!x));
} else {
if validity.is_all_valid() {
null_mask.extend_from_bitslice(&BitVec::repeat(false, array.len()));
} else {
for i in 0..array.len() {
null_mask.push(!validity.is_set(i));
}
}
}
null_mask.into_vec()
}

macro_rules! convert_arrow_array_to_grpc_vals {
($data_type: expr, $arrays: ident, $(($Type: pat, $CastType: ty, $field: ident, $MapFunction: expr)), +) => {{
use datatypes::arrow::datatypes::{DataType, TimeUnit};
use datatypes::data_type::{ConcreteDataType};
use datatypes::prelude::ScalarVector;

match $data_type {
$(
$Type => {
Expand All @@ -147,52 +152,114 @@ macro_rules! convert_arrow_array_to_grpc_vals {
from: format!("{:?}", $data_type),
})?;
vals.$field.extend(array
.iter()
.iter_data()
.filter_map(|i| i.map($MapFunction))
.collect::<Vec<_>>());
}
return Ok(vals);
},
)+
_ => unimplemented!(),
ConcreteDataType::Null(_) | ConcreteDataType::List(_) => unreachable!("Should not send {:?} in gRPC", $data_type),
v0y4g3r marked this conversation as resolved.
Show resolved Hide resolved
}
}};
}

pub fn values(arrays: &[Arc<dyn Array>]) -> Result<Values> {
pub fn values(arrays: &[VectorRef]) -> Result<Values> {
if arrays.is_empty() {
return Ok(Values::default());
}
let data_type = arrays[0].data_type();

convert_arrow_array_to_grpc_vals!(
data_type, arrays,

(DataType::Boolean, BooleanArray, bool_values, |x| {x}),

(DataType::Int8, PrimitiveArray<i8>, i8_values, |x| {*x as i32}),
(DataType::Int16, PrimitiveArray<i16>, i16_values, |x| {*x as i32}),
(DataType::Int32, PrimitiveArray<i32>, i32_values, |x| {*x}),
(DataType::Int64, PrimitiveArray<i64>, i64_values, |x| {*x}),

(DataType::UInt8, PrimitiveArray<u8>, u8_values, |x| {*x as u32}),
(DataType::UInt16, PrimitiveArray<u16>, u16_values, |x| {*x as u32}),
(DataType::UInt32, PrimitiveArray<u32>, u32_values, |x| {*x}),
(DataType::UInt64, PrimitiveArray<u64>, u64_values, |x| {*x}),

(DataType::Float32, PrimitiveArray<f32>, f32_values, |x| {*x}),
(DataType::Float64, PrimitiveArray<f64>, f64_values, |x| {*x}),

(DataType::Binary, BinaryArray, binary_values, |x| {x.into()}),
(DataType::LargeBinary, BinaryArray, binary_values, |x| {x.into()}),

(DataType::Utf8, StringArray, string_values, |x| {x.into()}),
(DataType::LargeUtf8, StringArray, string_values, |x| {x.into()}),

(DataType::Date32, PrimitiveArray<i32>, date_values, |x| {*x as i32}),
(DataType::Date64, PrimitiveArray<i64>, datetime_values,|x| {*x as i64}),

(DataType::Timestamp(TimeUnit::Millisecond, _), PrimitiveArray<i64>, ts_millis_values, |x| {*x})
data_type,
evenyag marked this conversation as resolved.
Show resolved Hide resolved
arrays,
(
ConcreteDataType::Boolean(_),
BooleanVector,
bool_values,
|x| { x }
),
(ConcreteDataType::Int8(_), Int8Vector, i8_values, |x| {
i32::from(x)
}),
(ConcreteDataType::Int16(_), Int16Vector, i16_values, |x| {
i32::from(x)
}),
(ConcreteDataType::Int32(_), Int32Vector, i32_values, |x| {
x
}),
(ConcreteDataType::Int64(_), Int64Vector, i64_values, |x| {
x
}),
(ConcreteDataType::UInt8(_), UInt8Vector, u8_values, |x| {
u32::from(x)
}),
(ConcreteDataType::UInt16(_), UInt16Vector, u16_values, |x| {
u32::from(x)
}),
(ConcreteDataType::UInt32(_), UInt32Vector, u32_values, |x| {
x
}),
(ConcreteDataType::UInt64(_), UInt64Vector, u64_values, |x| {
x
}),
(
ConcreteDataType::Float32(_),
Float32Vector,
f32_values,
|x| { x }
),
(
ConcreteDataType::Float64(_),
Float64Vector,
f64_values,
|x| { x }
),
(
ConcreteDataType::Binary(_),
BinaryVector,
binary_values,
|x| { x.into() }
),
(
ConcreteDataType::String(_),
StringVector,
string_values,
|x| { x.into() }
),
(ConcreteDataType::Date(_), DateVector, date_values, |x| {
x.val()
}),
(
ConcreteDataType::DateTime(_),
DateTimeVector,
datetime_values,
|x| { x.val() }
),
(
ConcreteDataType::Timestamp(TimestampType::Second(_)),
TimestampSecondVector,
ts_second_values,
|x| { x.into_native() }
),
(
ConcreteDataType::Timestamp(TimestampType::Millisecond(_)),
TimestampMillisecondVector,
ts_millisecond_values,
|x| { x.into_native() }
),
(
ConcreteDataType::Timestamp(TimestampType::Microsecond(_)),
TimestampMicrosecondVector,
ts_microsecond_values,
|x| { x.into_native() }
),
(
ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)),
TimestampNanosecondVector,
ts_nanosecond_values,
|x| { x.into_native() }
)
)
}

Expand All @@ -201,14 +268,10 @@ mod tests {
use std::sync::Arc;

use common_recordbatch::{RecordBatch, RecordBatches};
use datafusion::field_util::SchemaExt;
use datatypes::arrow::array::{Array, BooleanArray, PrimitiveArray};
use datatypes::arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
use datatypes::arrow_array::StringArray;
use datatypes::schema::Schema;
use datatypes::vectors::{UInt32Vector, VectorRef};
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema};

use crate::select::{null_mask, try_convert, values};
use super::*;

#[test]
fn test_convert_record_batches_to_select_result() {
Expand All @@ -234,9 +297,8 @@ mod tests {

#[test]
fn test_convert_arrow_arrays_i32() {
let array: PrimitiveArray<i32> =
PrimitiveArray::from(vec![Some(1), Some(2), None, Some(3)]);
let array: Arc<dyn Array> = Arc::new(array);
let array = Int32Vector::from(vec![Some(1), Some(2), None, Some(3)]);
let array: VectorRef = Arc::new(array);

let values = values(&[array]).unwrap();

Expand All @@ -245,14 +307,14 @@ mod tests {

#[test]
fn test_convert_arrow_arrays_string() {
let array = StringArray::from(vec![
let array = StringVector::from(vec![
Some("1".to_string()),
Some("2".to_string()),
None,
Some("3".to_string()),
None,
]);
let array: Arc<dyn Array> = Arc::new(array);
let array: VectorRef = Arc::new(array);

let values = values(&[array]).unwrap();

Expand All @@ -261,8 +323,8 @@ mod tests {

#[test]
fn test_convert_arrow_arrays_bool() {
let array = BooleanArray::from(vec![Some(true), Some(false), None, Some(false), None]);
let array: Arc<dyn Array> = Arc::new(array);
let array = BooleanVector::from(vec![Some(true), Some(false), None, Some(false), None]);
let array: VectorRef = Arc::new(array);

let values = values(&[array]).unwrap();

Expand All @@ -271,43 +333,42 @@ mod tests {

#[test]
fn test_convert_arrow_arrays_empty() {
let array = BooleanArray::from(vec![None, None, None, None, None]);
let array: Arc<dyn Array> = Arc::new(array);
let array = BooleanVector::from(vec![None, None, None, None, None]);
let array: VectorRef = Arc::new(array);

let values = values(&[array]).unwrap();

assert_eq!(Vec::<bool>::default(), values.bool_values);
assert!(values.bool_values.is_empty());
}

#[test]
fn test_null_mask() {
let a1: Arc<dyn Array> = Arc::new(PrimitiveArray::from(vec![None, Some(2), None]));
let a2: Arc<dyn Array> =
Arc::new(PrimitiveArray::from(vec![Some(1), Some(2), None, Some(4)]));
let a1: VectorRef = Arc::new(Int32Vector::from(vec![None, Some(2), None]));
let a2: VectorRef = Arc::new(Int32Vector::from(vec![Some(1), Some(2), None, Some(4)]));
let mask = null_mask(&vec![a1, a2], 3 + 4);
assert_eq!(vec![0b0010_0101], mask);

let empty: Arc<dyn Array> = Arc::new(PrimitiveArray::<i32>::from(vec![None, None, None]));
let empty: VectorRef = Arc::new(Int32Vector::from(vec![None, None, None]));
let mask = null_mask(&vec![empty.clone(), empty.clone(), empty], 9);
assert_eq!(vec![0b1111_1111, 0b0000_0001], mask);

let a1: Arc<dyn Array> = Arc::new(PrimitiveArray::from(vec![Some(1), Some(2), Some(3)]));
let a2: Arc<dyn Array> = Arc::new(PrimitiveArray::from(vec![Some(4), Some(5), Some(6)]));
let a1: VectorRef = Arc::new(Int32Vector::from(vec![Some(1), Some(2), Some(3)]));
let a2: VectorRef = Arc::new(Int32Vector::from(vec![Some(4), Some(5), Some(6)]));
let mask = null_mask(&vec![a1, a2], 3 + 3);
assert_eq!(Vec::<u8>::default(), mask);

let a1: Arc<dyn Array> = Arc::new(PrimitiveArray::from(vec![Some(1), Some(2), Some(3)]));
let a2: Arc<dyn Array> = Arc::new(PrimitiveArray::from(vec![Some(4), Some(5), None]));
let a1: VectorRef = Arc::new(Int32Vector::from(vec![Some(1), Some(2), Some(3)]));
let a2: VectorRef = Arc::new(Int32Vector::from(vec![Some(4), Some(5), None]));
let mask = null_mask(&vec![a1, a2], 3 + 3);
assert_eq!(vec![0b0010_0000], mask);
}

fn mock_record_batch() -> RecordBatch {
let arrow_schema = Arc::new(ArrowSchema::new(vec![
Field::new("c1", DataType::UInt32, false),
Field::new("c2", DataType::UInt32, false),
]));
let schema = Arc::new(Schema::try_from(arrow_schema).unwrap());
let column_schemas = vec![
ColumnSchema::new("c1", ConcreteDataType::uint32_datatype(), true),
ColumnSchema::new("c2", ConcreteDataType::uint32_datatype(), true),
];
let schema = Arc::new(Schema::try_new(column_schemas).unwrap());

let v1 = Arc::new(UInt32Vector::from(vec![Some(1), Some(2), None]));
let v2 = Arc::new(UInt32Vector::from(vec![Some(1), None, None]));
Expand Down
Loading