feat: replace arrow2 with official implementation 🎉 #753

Merged · Dec 15, 2022

Changes from all commits (45 commits)
571a84d
chore: kick off. change datafusion/arrow/parquet to target version
waynexia Nov 21, 2022
e3201a4
chore: replace one last datafusion dep
waynexia Nov 21, 2022
d7626fd
feat: arrow_array switch to arrow
evenyag Nov 21, 2022
61c4a36
chore: update dep of binary vector
evenyag Nov 21, 2022
221f3e9
Merge branch 'dev' into replace-arrow2
waynexia Nov 21, 2022
d0686f9
Merge branch 'replace-arrow2' of github.com:GreptimeTeam/greptimedb i…
waynexia Nov 21, 2022
7151deb
Merge branch 'dev' into replace-arrow2
waynexia Dec 5, 2022
504059a
chore: fix wrong merge commit
waynexia Dec 5, 2022
cc1ec26
feat: Switch to datatypes2
evenyag Dec 5, 2022
fe505fe
feat: Make recordbatch compile
evenyag Dec 6, 2022
99371fd
chore: sort Cargo.toml
evenyag Dec 6, 2022
8c66b7d
feat: Fix common::recordbatch compiler errors
evenyag Dec 6, 2022
3c0adb0
feat: Fix recordbatch test compiling issue
evenyag Dec 6, 2022
b48ae21
fix: api crate (#708)
v0y4g3r Dec 6, 2022
0ccb8b4
chore: delete datatypes based on arrow2
v0y4g3r Dec 6, 2022
b32438e
feat: Fix some compiler errors in common::query (#710)
evenyag Dec 6, 2022
829ff49
fix: common-query subcrate (#712)
waynexia Dec 6, 2022
653906d
fix: Fix common::query compiler errors (#713)
evenyag Dec 6, 2022
551cde2
Merge branch 'dev' into replace-arrow2
waynexia Dec 7, 2022
2ba9925
feat: Implements diff accumulator using WrapperType (#715)
evenyag Dec 7, 2022
fb0b4eb
fix: fix compile error for mean/polyval/pow/interp ops
waynexia Dec 7, 2022
a562199
Revert "fix: fix compile error for mean/polyval/pow/interp ops"
waynexia Dec 7, 2022
a898f84
fix: Fix compiler errors in argmax/rate/median/norm_cdf (#716)
evenyag Dec 7, 2022
6f3baf9
fix: fix compile error for mean/polyval/pow/interp ops (#717)
waynexia Dec 7, 2022
58c26de
fix: fix argmin/percentile/clip/interp/scipy_stats_norm_pdf errors (#…
evenyag Dec 7, 2022
587bdc9
fix: fix other compile error in common-function (#719)
waynexia Dec 8, 2022
3687bc7
fix: Fix tests and clippy for common-function subcrate (#726)
waynexia Dec 8, 2022
1bde1ba
fix: row group pruning (#725)
v0y4g3r Dec 8, 2022
b936d8b
fix: Fix common::grpc compiler errors (#722)
evenyag Dec 8, 2022
fff530c
fix common record batch
waynexia Dec 8, 2022
d0892bf
fix: Fix compile error in server subcrate (#727)
waynexia Dec 8, 2022
42fdc72
fix: Fix common grpc expr (#730)
waynexia Dec 9, 2022
95b2d86
fix: pre-cast to avoid tremendous match arms (#734)
waynexia Dec 9, 2022
4defde0
feat: upgrade storage crate to arrow and parquet offcial impl (#738)
v0y4g3r Dec 13, 2022
4b644aa
fix: Fix compiler errors in catalog and mito crates (#742)
evenyag Dec 13, 2022
a712382
Merge pull request #745
waynexia Dec 13, 2022
36c929e
fix: Fix imports in optimizer.rs
evenyag Dec 13, 2022
fa971c6
fix: errors in optimzer
v0y4g3r Dec 13, 2022
652d59a
fix: remove unwrap
v0y4g3r Dec 13, 2022
dbb3034
fix: Fix compiler errors in query crate (#746)
evenyag Dec 14, 2022
ce6d1cb
fix: frontend compile errors (#747)
v0y4g3r Dec 14, 2022
142dee4
fix: Fix compiler errors in script crate (#749)
evenyag Dec 15, 2022
7c696da
Merge branch 'develop' into replace-arrow2
evenyag Dec 15, 2022
0f3dcc1
fix: Fix All The Tests! (#752)
waynexia Dec 15, 2022
a8630cd
fix: clippy errors
v0y4g3r Dec 15, 2022
1,536 changes: 828 additions & 708 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion Cargo.toml
@@ -20,7 +20,6 @@ members = [
     "src/common/time",
     "src/datanode",
     "src/datatypes",
-    "src/datatypes2",
     "src/frontend",
     "src/log-store",
     "src/meta-client",
4 changes: 2 additions & 2 deletions benchmarks/Cargo.toml
@@ -5,10 +5,10 @@ edition = "2021"
 license = "Apache-2.0"
 
 [dependencies]
-arrow = "10"
+arrow = "26.0.0"
 clap = { version = "4.0", features = ["derive"] }
 client = { path = "../src/client" }
 indicatif = "0.17.1"
 itertools = "0.10.5"
-parquet = { version = "*" }
+parquet = "26.0.0"
 tokio = { version = "1.21", features = ["full"] }
22 changes: 12 additions & 10 deletions benchmarks/src/bin/nyc-taxi.rs
@@ -20,7 +20,6 @@
 
 use std::collections::HashMap;
 use std::path::{Path, PathBuf};
-use std::sync::Arc;
 use std::time::Instant;
 
 use arrow::array::{ArrayRef, PrimitiveArray, StringArray, TimestampNanosecondArray};
@@ -32,9 +31,7 @@ use client::api::v1::column::Values;
 use client::api::v1::{Column, ColumnDataType, ColumnDef, CreateExpr, InsertExpr};
 use client::{Client, Database, Select};
 use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
-use parquet::arrow::{ArrowReader, ParquetFileArrowReader};
-use parquet::file::reader::FileReader;
-use parquet::file::serialized_reader::SerializedFileReader;
+use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
 use tokio::task::JoinSet;
 
 const DATABASE_NAME: &str = "greptime";
@@ -86,10 +83,14 @@ async fn write_data(
     pb_style: ProgressStyle,
 ) -> u128 {
     let file = std::fs::File::open(&path).unwrap();
-    let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());
-    let row_num = file_reader.metadata().file_metadata().num_rows();
-    let record_batch_reader = ParquetFileArrowReader::new(file_reader)
-        .get_record_reader(batch_size)
+    let record_batch_reader_builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
+    let row_num = record_batch_reader_builder
+        .metadata()
+        .file_metadata()
+        .num_rows();
+    let record_batch_reader = record_batch_reader_builder
+        .with_batch_size(batch_size)
+        .build()
         .unwrap();
     let progress_bar = mpb.add(ProgressBar::new(row_num as _));
     progress_bar.set_style(pb_style);
@@ -210,9 +211,10 @@ fn build_values(column: &ArrayRef) -> Values {
         | DataType::FixedSizeList(_, _)
         | DataType::LargeList(_)
         | DataType::Struct(_)
-        | DataType::Union(_, _)
+        | DataType::Union(_, _, _)
         | DataType::Dictionary(_, _)
-        | DataType::Decimal(_, _)
+        | DataType::Decimal128(_, _)
+        | DataType::Decimal256(_, _)
         | DataType::Map(_, _) => todo!(),
     }
 }
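
A note on the read path above: the old parquet API required a SerializedFileReader wrapped in a ParquetFileArrowReader, while the official crate collapses both into a single builder that parses the footer once and exposes metadata before decoding. A minimal standalone sketch of the new API, assuming a hypothetical local file data.parquet and an arbitrary batch size:

use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Hypothetical input file; any parquet file works here.
    let file = std::fs::File::open("data.parquet")?;
    // The builder replaces SerializedFileReader + ParquetFileArrowReader:
    // footer metadata is available before any row decoding happens.
    let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
    let row_num = builder.metadata().file_metadata().num_rows();
    println!("rows in file: {row_num}");

    // build() returns an iterator of arrow RecordBatch results.
    let reader = builder.with_batch_size(4096).build()?;
    for batch in reader {
        println!("read a batch of {} rows", batch?.num_rows());
    }
    Ok(())
}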
10 changes: 8 additions & 2 deletions src/api/greptime/v1/column.proto
@@ -32,7 +32,10 @@ message Column {
 
     repeated int32 date_values = 14;
     repeated int64 datetime_values = 15;
-    repeated int64 ts_millis_values = 16;
+    repeated int64 ts_second_values = 16;
+    repeated int64 ts_millisecond_values = 17;
+    repeated int64 ts_microsecond_values = 18;
+    repeated int64 ts_nanosecond_values = 19;
   }
   // The array of non-null values in this column.
   //
@@ -75,5 +78,8 @@ enum ColumnDataType {
   STRING = 12;
   DATE = 13;
   DATETIME = 14;
-  TIMESTAMP = 15;
+  TIMESTAMP_SECOND = 15;
+  TIMESTAMP_MILLISECOND = 16;
+  TIMESTAMP_MICROSECOND = 17;
+  TIMESTAMP_NANOSECOND = 18;
 }
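
Why the single ts_millis_values field becomes four per-unit fields: the old wire format forced every timestamp through a millisecond conversion, which silently truncated finer-grained microsecond and nanosecond data. A hypothetical illustration with plain integer arithmetic (not code from this PR):

fn main() {
    // A nanosecond-precision timestamp, as the old wire format received it.
    let ts_nanos: i64 = 1_669_000_000_123_456_789;
    // Old path: everything was squeezed into ts_millis_values.
    let ts_millis = ts_nanos / 1_000_000;
    let restored = ts_millis * 1_000_000;
    // 456_789 nanoseconds are gone after the round trip.
    assert_ne!(ts_nanos, restored);
    println!("lost {} ns", ts_nanos - restored);
    // With ts_second_values..ts_nanosecond_values, the integer travels
    // verbatim and the unit is carried by the ColumnDataType enum instead.
}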
104 changes: 89 additions & 15 deletions src/api/src/helper.rs
@@ -15,6 +15,7 @@
 use common_base::BitVec;
 use common_time::timestamp::TimeUnit;
 use datatypes::prelude::ConcreteDataType;
+use datatypes::types::TimestampType;
 use datatypes::value::Value;
 use datatypes::vectors::VectorRef;
 use snafu::prelude::*;
@@ -56,7 +57,16 @@ impl From<ColumnDataTypeWrapper> for ConcreteDataType {
             ColumnDataType::String => ConcreteDataType::string_datatype(),
             ColumnDataType::Date => ConcreteDataType::date_datatype(),
             ColumnDataType::Datetime => ConcreteDataType::datetime_datatype(),
-            ColumnDataType::Timestamp => ConcreteDataType::timestamp_millis_datatype(),
+            ColumnDataType::TimestampSecond => ConcreteDataType::timestamp_second_datatype(),
+            ColumnDataType::TimestampMillisecond => {
+                ConcreteDataType::timestamp_millisecond_datatype()
+            }
+            ColumnDataType::TimestampMicrosecond => {
+                ConcreteDataType::timestamp_microsecond_datatype()
+            }
+            ColumnDataType::TimestampNanosecond => {
+                ConcreteDataType::timestamp_nanosecond_datatype()
+            }
         }
     }
 }
@@ -81,7 +91,12 @@ impl TryFrom<ConcreteDataType> for ColumnDataTypeWrapper {
             ConcreteDataType::String(_) => ColumnDataType::String,
             ConcreteDataType::Date(_) => ColumnDataType::Date,
             ConcreteDataType::DateTime(_) => ColumnDataType::Datetime,
-            ConcreteDataType::Timestamp(_) => ColumnDataType::Timestamp,
+            ConcreteDataType::Timestamp(unit) => match unit {
+                TimestampType::Second(_) => ColumnDataType::TimestampSecond,
+                TimestampType::Millisecond(_) => ColumnDataType::TimestampMillisecond,
+                TimestampType::Microsecond(_) => ColumnDataType::TimestampMicrosecond,
+                TimestampType::Nanosecond(_) => ColumnDataType::TimestampNanosecond,
+            },
             ConcreteDataType::Null(_) | ConcreteDataType::List(_) => {
                 return error::IntoColumnDataTypeSnafu { from: datatype }.fail()
             }
@@ -153,8 +168,20 @@ impl Values {
                 datetime_values: Vec::with_capacity(capacity),
                 ..Default::default()
             },
-            ColumnDataType::Timestamp => Values {
-                ts_millis_values: Vec::with_capacity(capacity),
+            ColumnDataType::TimestampSecond => Values {
+                ts_second_values: Vec::with_capacity(capacity),
+                ..Default::default()
+            },
+            ColumnDataType::TimestampMillisecond => Values {
+                ts_millisecond_values: Vec::with_capacity(capacity),
+                ..Default::default()
+            },
+            ColumnDataType::TimestampMicrosecond => Values {
+                ts_microsecond_values: Vec::with_capacity(capacity),
+                ..Default::default()
+            },
+            ColumnDataType::TimestampNanosecond => Values {
+                ts_nanosecond_values: Vec::with_capacity(capacity),
                 ..Default::default()
             },
         }
@@ -187,9 +214,12 @@ impl Column {
             Value::Binary(val) => values.binary_values.push(val.to_vec()),
             Value::Date(val) => values.date_values.push(val.val()),
             Value::DateTime(val) => values.datetime_values.push(val.val()),
-            Value::Timestamp(val) => values
-                .ts_millis_values
-                .push(val.convert_to(TimeUnit::Millisecond)),
+            Value::Timestamp(val) => match val.unit() {
+                TimeUnit::Second => values.ts_second_values.push(val.value()),
+                TimeUnit::Millisecond => values.ts_millisecond_values.push(val.value()),
+                TimeUnit::Microsecond => values.ts_microsecond_values.push(val.value()),
+                TimeUnit::Nanosecond => values.ts_nanosecond_values.push(val.value()),
+            },
             Value::List(_) => unreachable!(),
         });
         self.null_mask = null_mask.into_vec();
@@ -200,7 +230,10 @@ mod tests {
     use std::sync::Arc;
 
-    use datatypes::vectors::BooleanVector;
+    use datatypes::vectors::{
+        BooleanVector, TimestampMicrosecondVector, TimestampMillisecondVector,
+        TimestampNanosecondVector, TimestampSecondVector,
+    };
 
     use super::*;
 
@@ -258,8 +291,8 @@ mod tests {
         let values = values.datetime_values;
         assert_eq!(2, values.capacity());
 
-        let values = Values::with_capacity(ColumnDataType::Timestamp, 2);
-        let values = values.ts_millis_values;
+        let values = Values::with_capacity(ColumnDataType::TimestampMillisecond, 2);
+        let values = values.ts_millisecond_values;
         assert_eq!(2, values.capacity());
     }

@@ -326,8 +359,8 @@ mod tests {
             ColumnDataTypeWrapper(ColumnDataType::Datetime).into()
         );
         assert_eq!(
-            ConcreteDataType::timestamp_millis_datatype(),
-            ColumnDataTypeWrapper(ColumnDataType::Timestamp).into()
+            ConcreteDataType::timestamp_millisecond_datatype(),
+            ColumnDataTypeWrapper(ColumnDataType::TimestampMillisecond).into()
         );
     }

@@ -394,8 +427,8 @@ mod tests {
             ConcreteDataType::datetime_datatype().try_into().unwrap()
         );
         assert_eq!(
-            ColumnDataTypeWrapper(ColumnDataType::Timestamp),
-            ConcreteDataType::timestamp_millis_datatype()
+            ColumnDataTypeWrapper(ColumnDataType::TimestampMillisecond),
+            ConcreteDataType::timestamp_millisecond_datatype()
                 .try_into()
                 .unwrap()
         );
@@ -412,7 +445,48 @@ mod tests {
         assert!(result.is_err());
         assert_eq!(
             result.unwrap_err().to_string(),
-            "Failed to create column datatype from List(ListType { inner: Boolean(BooleanType) })"
+            "Failed to create column datatype from List(ListType { item_type: Boolean(BooleanType) })"
         );
     }
 
+    #[test]
+    fn test_column_put_timestamp_values() {
+        let mut column = Column {
+            column_name: "test".to_string(),
+            semantic_type: 0,
+            values: Some(Values {
+                ..Default::default()
+            }),
+            null_mask: vec![],
+            datatype: 0,
+        };
+
+        let vector = Arc::new(TimestampNanosecondVector::from_vec(vec![1, 2, 3]));
+        column.push_vals(3, vector);
+        assert_eq!(
+            vec![1, 2, 3],
+            column.values.as_ref().unwrap().ts_nanosecond_values
+        );
+
+        let vector = Arc::new(TimestampMillisecondVector::from_vec(vec![4, 5, 6]));
+        column.push_vals(3, vector);
+        assert_eq!(
+            vec![4, 5, 6],
+            column.values.as_ref().unwrap().ts_millisecond_values
+        );
+
+        let vector = Arc::new(TimestampMicrosecondVector::from_vec(vec![7, 8, 9]));
+        column.push_vals(3, vector);
+        assert_eq!(
+            vec![7, 8, 9],
+            column.values.as_ref().unwrap().ts_microsecond_values
+        );
+
+        let vector = Arc::new(TimestampSecondVector::from_vec(vec![10, 11, 12]));
+        column.push_vals(3, vector);
+        assert_eq!(
+            vec![10, 11, 12],
+            column.values.as_ref().unwrap().ts_second_values
+        );
+    }

4 changes: 1 addition & 3 deletions src/catalog/Cargo.toml
@@ -19,9 +19,7 @@ common-recordbatch = { path = "../common/recordbatch" }
 common-runtime = { path = "../common/runtime" }
 common-telemetry = { path = "../common/telemetry" }
 common-time = { path = "../common/time" }
-datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = [
-    "simd",
-] }
+datafusion = "14.0.0"
 datatypes = { path = "../datatypes" }
 futures = "0.3"
 futures-util = "0.3"
23 changes: 9 additions & 14 deletions src/catalog/src/error.rs
@@ -17,7 +17,7 @@ use std::any::Any;
 use common_error::ext::{BoxedError, ErrorExt};
 use common_error::prelude::{Snafu, StatusCode};
 use datafusion::error::DataFusionError;
-use datatypes::arrow;
+use datatypes::prelude::ConcreteDataType;
 use datatypes::schema::RawSchema;
 use snafu::{Backtrace, ErrorCompat};
 
@@ -51,14 +51,12 @@ pub enum Error {
     SystemCatalog { msg: String, backtrace: Backtrace },
 
     #[snafu(display(
-        "System catalog table type mismatch, expected: binary, found: {:?} source: {}",
+        "System catalog table type mismatch, expected: binary, found: {:?}",
         data_type,
-        source
     ))]
     SystemCatalogTypeMismatch {
-        data_type: arrow::datatypes::DataType,
-        #[snafu(backtrace)]
-        source: datatypes::error::Error,
+        data_type: ConcreteDataType,
+        backtrace: Backtrace,
     },
 
     #[snafu(display("Invalid system catalog entry type: {:?}", entry_type))]
@@ -222,10 +220,11 @@ impl ErrorExt for Error {
             | Error::ValueDeserialize { .. }
             | Error::Io { .. } => StatusCode::StorageUnavailable,
 
-            Error::RegisterTable { .. } => StatusCode::Internal,
+            Error::RegisterTable { .. } | Error::SystemCatalogTypeMismatch { .. } => {
+                StatusCode::Internal
+            }
 
             Error::ReadSystemCatalog { source, .. } => source.status_code(),
-            Error::SystemCatalogTypeMismatch { source, .. } => source.status_code(),
             Error::InvalidCatalogValue { source, .. } => source.status_code(),
 
             Error::TableExists { .. } => StatusCode::TableAlreadyExists,
@@ -265,7 +264,6 @@ impl From<Error> for DataFusionError {
 #[cfg(test)]
 mod tests {
     use common_error::mock::MockError;
-    use datatypes::arrow::datatypes::DataType;
     use snafu::GenerateImplicitData;
 
     use super::*;
@@ -314,11 +312,8 @@ mod tests {
         assert_eq!(
             StatusCode::Internal,
             Error::SystemCatalogTypeMismatch {
-                data_type: DataType::Boolean,
-                source: datatypes::error::Error::UnsupportedArrowType {
-                    arrow_type: DataType::Boolean,
-                    backtrace: Backtrace::generate()
-                }
+                data_type: ConcreteDataType::binary_datatype(),
+                backtrace: Backtrace::generate(),
             }
             .status_code()
         );
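
The variant change above also changes how the error is raised: with the wrapped source gone and an implicit backtrace added, the snafu context selector is built from data_type alone. A minimal sketch of that pattern, assuming a snafu 0.7-style derive and a String stand-in for ConcreteDataType:

use snafu::{ensure, Backtrace, Snafu};

#[derive(Debug, Snafu)]
enum Error {
    #[snafu(display("System catalog table type mismatch, expected: binary, found: {:?}", data_type))]
    SystemCatalogTypeMismatch {
        data_type: String,    // stand-in for ConcreteDataType
        backtrace: Backtrace, // filled in implicitly by the selector
    },
}

fn check(data_type: &str) -> Result<(), Error> {
    // ensure! plus the generated ...Snafu selector replaces manual
    // construction; no source field means no inner error to thread through.
    ensure!(
        data_type == "binary",
        SystemCatalogTypeMismatchSnafu { data_type }
    );
    Ok(())
}

fn main() {
    assert!(check("boolean").is_err());
}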
2 changes: 1 addition & 1 deletion src/catalog/src/helper.rs
@@ -138,7 +138,7 @@ impl TableGlobalKey {
 
 /// Table global info contains necessary info for a datanode to create table regions, including
 /// table id, table meta(schema...), region id allocation across datanodes.
-#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
 pub struct TableGlobalValue {
     /// Id of datanode that created the global table info kv. only for debugging.
     pub node_id: u64,
33 changes: 20 additions & 13 deletions src/catalog/src/local/manager.rs
@@ -145,27 +145,34 @@ impl LocalCatalogManager {
     /// Convert `RecordBatch` to a vector of `Entry`.
     fn record_batch_to_entry(rb: RecordBatch) -> Result<Vec<Entry>> {
         ensure!(
-            rb.df_recordbatch.columns().len() >= 6,
+            rb.num_columns() >= 6,
             SystemCatalogSnafu {
-                msg: format!("Length mismatch: {}", rb.df_recordbatch.columns().len())
+                msg: format!("Length mismatch: {}", rb.num_columns())
             }
         );
 
-        let entry_type = UInt8Vector::try_from_arrow_array(&rb.df_recordbatch.columns()[0])
-            .with_context(|_| SystemCatalogTypeMismatchSnafu {
-                data_type: rb.df_recordbatch.columns()[ENTRY_TYPE_INDEX]
-                    .data_type()
-                    .clone(),
+        let entry_type = rb
+            .column(ENTRY_TYPE_INDEX)
+            .as_any()
+            .downcast_ref::<UInt8Vector>()
+            .with_context(|| SystemCatalogTypeMismatchSnafu {
+                data_type: rb.column(ENTRY_TYPE_INDEX).data_type(),
             })?;
 
-        let key = BinaryVector::try_from_arrow_array(&rb.df_recordbatch.columns()[1])
-            .with_context(|_| SystemCatalogTypeMismatchSnafu {
-                data_type: rb.df_recordbatch.columns()[KEY_INDEX].data_type().clone(),
+        let key = rb
+            .column(KEY_INDEX)
+            .as_any()
+            .downcast_ref::<BinaryVector>()
+            .with_context(|| SystemCatalogTypeMismatchSnafu {
+                data_type: rb.column(KEY_INDEX).data_type(),
             })?;
 
-        let value = BinaryVector::try_from_arrow_array(&rb.df_recordbatch.columns()[3])
-            .with_context(|_| SystemCatalogTypeMismatchSnafu {
-                data_type: rb.df_recordbatch.columns()[VALUE_INDEX].data_type().clone(),
+        let value = rb
+            .column(VALUE_INDEX)
+            .as_any()
+            .downcast_ref::<BinaryVector>()
+            .with_context(|| SystemCatalogTypeMismatchSnafu {
+                data_type: rb.column(VALUE_INDEX).data_type(),
             })?;
 
         let mut res = Vec::with_capacity(rb.num_rows());
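
The rewrite above trades try_from_arrow_array for the as_any()/downcast_ref pattern. A minimal sketch of the same pattern against plain arrow 26 arrays (a standalone example; the PR applies it to datatypes::vectors types such as UInt8Vector and BinaryVector):

use std::sync::Arc;

use arrow::array::{Array, ArrayRef, UInt8Array};

// as_any() erases the array to &dyn Any; downcast_ref recovers the concrete
// type, returning None on mismatch (mapped to SystemCatalogTypeMismatch above).
fn column_as_u8(col: &ArrayRef) -> Option<&UInt8Array> {
    col.as_any().downcast_ref::<UInt8Array>()
}

fn main() {
    let col: ArrayRef = Arc::new(UInt8Array::from(vec![1u8, 2, 3]));
    let typed = column_as_u8(&col).expect("unexpected column type");
    assert_eq!(typed.value(2), 3);
}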