feat: replace arrow2 with official implementation 🎉 (GreptimeTeam#753)
* chore: kick off. change datafusion/arrow/parquet to target version

Signed-off-by: Ruihang Xia <[email protected]>

* chore: replace one last datafusion dep

Signed-off-by: Ruihang Xia <[email protected]>

* feat: arrow_array switch to arrow

* chore: update dep of binary vector

* chore: fix wrong merge commit

Signed-off-by: Ruihang Xia <[email protected]>

* feat: Switch to datatypes2

* feat: Make recordbatch compile

* chore: sort Cargo.toml

* feat: Fix common::recordbatch compiler errors

* feat: Fix recordbatch test compiling issue

* fix: api crate (GreptimeTeam#708)

* fix: rename ConcreteDataType::timestamp_millis_type to ConcreteDataType::timestamp_millisecond_type; fix other timestamp-related warnings

* fix: revert changes in datatypes2

* fix: helper

* chore: delete datatypes based on arrow2

* feat: Fix some compiler errors in common::query (GreptimeTeam#710)

* feat: Fix some compiler errors in common::query

* feat: test_collect use vectors api

* fix: common-query subcrate (GreptimeTeam#712)

* fix: record batch adapter

Signed-off-by: Ruihang Xia <[email protected]>

* fix error enum

Signed-off-by: Ruihang Xia <[email protected]>

Signed-off-by: Ruihang Xia <[email protected]>

* fix: Fix common::query compiler errors (GreptimeTeam#713)

* feat: Move conversion to ScalarValue to value.rs

* fix: Fix common::query compiler errors

This commit also makes InnerError pub(crate).

* feat: Implements diff accumulator using WrapperType (GreptimeTeam#715)

* feat: Remove usage of opaque error from common::recordbatch

* feat: Remove opaque error from common::query

* feat: Fix diff compiler errors

Now common_function just uses common_query's Error and Result. Adds
a LargestType associated type to LogicalPrimitiveType to get the largest
type a logical primitive type can cast to.
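
A rough sketch of that associated type (the names follow the commit message, but the bounds and impls below are illustrative assumptions, not the datatypes crate's real definitions):

```rust
// Sketch only: each logical primitive type names the widest type it can
// cast to without loss, so accumulators like `diff` can widen up front.
trait LogicalPrimitiveType {
    /// Native (in-memory) value type of this logical type.
    type Native;
    /// The largest logical type this type can be cast to.
    type LargestType: LogicalPrimitiveType;
}

struct Int8Type;
struct Int64Type;

impl LogicalPrimitiveType for Int64Type {
    type Native = i64;
    type LargestType = Int64Type; // already the widest signed integer
}

impl LogicalPrimitiveType for Int8Type {
    type Native = i8;
    type LargestType = Int64Type; // e.g. diff over i8 accumulates in i64
}
```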

* feat: Remove LargestType from NativeType trait

* chore: Update comments

* feat: Restrict Scalar::RefType of WrapperType to itself

Add trait bound `for<'a> Scalar<RefType<'a> = Self>` to WrapperType
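
A minimal sketch of that higher-ranked bound, using a stand-in Scalar trait (the real trait in the datatypes crate has more members and different details):

```rust
// Sketch: `Scalar` ties a value type to its by-reference form; the
// `WrapperType` bound forces that form to be the type itself, which
// holds for Copy primitives.
trait Scalar: Sized + 'static {
    type RefType<'a>
    where
        Self: 'a;
    fn as_ref_type(&self) -> Self::RefType<'_>;
}

trait WrapperType: Copy + for<'a> Scalar<RefType<'a> = Self> {}

impl Scalar for i32 {
    type RefType<'a> = i32; // referencing a Copy primitive yields itself
    fn as_ref_type(&self) -> i32 {
        *self
    }
}

impl WrapperType for i32 {}
```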

* chore: Address CR comments

* chore: Format codes

* fix: fix compile error for mean/polyval/pow/interp ops

Signed-off-by: Ruihang Xia <[email protected]>

* Revert "fix: fix compile error for mean/polyval/pow/interp ops"

This reverts commit fb0b4eb.

* fix: Fix compiler errors in argmax/rate/median/norm_cdf (GreptimeTeam#716)

* fix: Fix compiler errors in argmax/rate/median/norm_cdf

* chore: Address CR comments

* fix: fix compile error for mean/polyval/pow/interp ops (GreptimeTeam#717)

* fix: fix compile error for mean/polyval/pow/interp ops

Signed-off-by: Ruihang Xia <[email protected]>

* simplify type bounds

Signed-off-by: Ruihang Xia <[email protected]>

Signed-off-by: Ruihang Xia <[email protected]>

* fix: fix argmin/percentile/clip/interp/scipy_stats_norm_pdf errors (GreptimeTeam#718)

fix: fix argmin/percentile/clip/interp/scipy_stats_norm_pdf compiler errors

* fix: fix other compile error in common-function (GreptimeTeam#719)

* further fixing

Signed-off-by: Ruihang Xia <[email protected]>

* fix all compile errors in common function

Signed-off-by: Ruihang Xia <[email protected]>

Signed-off-by: Ruihang Xia <[email protected]>

* fix: Fix tests and clippy for common-function subcrate (GreptimeTeam#726)

* further fixing

Signed-off-by: Ruihang Xia <[email protected]>

* fix all compile errors in common function

Signed-off-by: Ruihang Xia <[email protected]>

* fix tests

Signed-off-by: Ruihang Xia <[email protected]>

* fix clippy

Signed-off-by: Ruihang Xia <[email protected]>

* revert test changes

Signed-off-by: Ruihang Xia <[email protected]>

Signed-off-by: Ruihang Xia <[email protected]>

* fix: row group pruning (GreptimeTeam#725)

* fix: row group pruning

* chore: use macro to simplify stats implementation

* fix: CR comments

* fix: row group metadata length mismatch

* fix: simplify code

* fix: Fix common::grpc compiler errors (GreptimeTeam#722)

* fix: Fix common::grpc compiler errors

This commit refactors RecordBatch to hold vectors in the RecordBatch
struct, so we don't need to cast arrays to vectors when serializing
or iterating the batch.

Now we use the vector API instead of the arrow API in the grpc crate.
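
Conceptually the refactor looks something like this (all names are simplified stand-ins for the common-recordbatch and datatypes types, not their actual definitions):

```rust
// Sketch: the batch owns typed vectors next to its schema, so callers
// read `VectorRef`s directly instead of downcasting arrow arrays.
use std::sync::Arc;

trait Vector {
    fn len(&self) -> usize;
}

type VectorRef = Arc<dyn Vector>; // stand-in for datatypes::vectors::VectorRef

struct Schema; // stand-in for the schema type

struct RecordBatch {
    schema: Arc<Schema>,
    columns: Vec<VectorRef>, // vectors, not raw arrow arrays
}

impl RecordBatch {
    fn num_columns(&self) -> usize {
        self.columns.len()
    }
    fn column(&self, idx: usize) -> &VectorRef {
        &self.columns[idx]
    }
}
```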

* chore: Address CR comments

* fix common record batch

Signed-off-by: Ruihang Xia <[email protected]>

* fix: Fix compile error in server subcrate (GreptimeTeam#727)

* fix: Fix compile error in server subcrate

Signed-off-by: Ruihang Xia <[email protected]>

* remove unused type alias

Signed-off-by: Ruihang Xia <[email protected]>

* explicitly panic

Signed-off-by: Ruihang Xia <[email protected]>

* Update src/storage/src/sst/parquet.rs

Co-authored-by: Yingwen <[email protected]>

Signed-off-by: Ruihang Xia <[email protected]>
Co-authored-by: Yingwen <[email protected]>

* fix: Fix common grpc expr (GreptimeTeam#730)

* fix compile errors

Signed-off-by: Ruihang Xia <[email protected]>

* rename fn names

Signed-off-by: Ruihang Xia <[email protected]>

* fix styles

Signed-off-by: Ruihang Xia <[email protected]>

* fix warnings in common-time

Signed-off-by: Ruihang Xia <[email protected]>

Signed-off-by: Ruihang Xia <[email protected]>

* fix: pre-cast to avoid tremendous match arms (GreptimeTeam#734)

Signed-off-by: Ruihang Xia <[email protected]>

Signed-off-by: Ruihang Xia <[email protected]>
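
The title hints at the pattern: convert values to one canonical form first, so later code needs a single match arm rather than one per (type, unit) combination. An illustrative sketch of the idea only, not the actual change in GreptimeTeam#734:

```rust
// Sketch: pre-cast every timestamp to milliseconds once, so downstream
// matching no longer needs an arm for each time unit.
enum TimeUnit {
    Second,
    Millisecond,
    Microsecond,
    Nanosecond,
}

struct Timestamp {
    value: i64,
    unit: TimeUnit,
}

fn to_millis(ts: &Timestamp) -> i64 {
    match ts.unit {
        TimeUnit::Second => ts.value * 1_000,
        TimeUnit::Millisecond => ts.value,
        TimeUnit::Microsecond => ts.value / 1_000,
        TimeUnit::Nanosecond => ts.value / 1_000_000,
    }
}
```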

* feat: upgrade storage crate to arrow and parquet official impl (GreptimeTeam#738)

* fix: compile errors

* fix: parquet reader and writer

* fix: parquet reader and writer

* fix: WriteBatch IPC encode/decode

* fix: clippy errors in storage subcrate

* chore: remove suspicious unwrap

* fix: some cr comments

* fix: CR comments

* fix: CR comments
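
The WriteBatch codec itself is internal to the storage crate, but the arrow 26 IPC stream round-trip it builds on looks roughly like this (a sketch under that assumption, not the crate's code):

```rust
// Sketch: encode a RecordBatch to the Arrow IPC stream format and
// decode it back, using the official arrow crate.
use std::io::Cursor;
use std::sync::Arc;

use arrow::array::Int64Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::Result;
use arrow::ipc::reader::StreamReader;
use arrow::ipc::writer::StreamWriter;
use arrow::record_batch::RecordBatch;

fn main() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("ts", DataType::Int64, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int64Array::from(vec![1, 2, 3]))],
    )?;

    // Encode to an in-memory IPC stream.
    let mut buf = Vec::new();
    {
        let mut writer = StreamWriter::try_new(&mut buf, &schema)?;
        writer.write(&batch)?;
        writer.finish()?;
    }

    // Decode it back; the reader yields batches as it goes.
    let reader = StreamReader::try_new(Cursor::new(buf), None)?;
    for decoded in reader {
        assert_eq!(decoded?.num_rows(), 3);
    }
    Ok(())
}
```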

* fix: Fix compiler errors in catalog and mito crates (GreptimeTeam#742)

* fix: Fix compiler errors in mito

* fix: Fix compiler errors in catalog crate

* style: Fix clippy

* chore: Fix use

* Merge pull request GreptimeTeam#745

* fix nyc-taxi and util

* Merge branch 'replace-arrow2' into fix-others

* fix substrait

* fix warnings and error in test

* fix: Fix imports in optimizer.rs

* fix: errors in optimizer

* fix: remove unwrap

* fix: Fix compiler errors in query crate (GreptimeTeam#746)

* fix: Fix compiler errors in state.rs

* fix: fix compiler errors in state

* feat: upgrade sqlparser to 0.26

* fix: fix datafusion engine compiler errors

* fix: Fix some tests in query crate

* fix: Fix all warnings in tests

* feat: Remove `Type` from timestamp's type name

* fix: fix query tests

Now datafusion already supports median, so this commit also removes
the median function.

* style: Fix clippy

* feat: Remove RecordBatch::pretty_print

* chore: Address CR comments

* Update src/query/src/query_engine/state.rs

Co-authored-by: Ruihang Xia <[email protected]>

* fix: frontend compile errors (GreptimeTeam#747)

fix: fix compile errors in frontend

* fix: Fix compiler errors in script crate (GreptimeTeam#749)

* fix: Fix compiler errors in state.rs

* fix: fix compiler errors in state

* feat: upgrade sqlparser to 0.26

* fix: fix datafusion engine compiler errors

* fix: Fix some tests in query crate

* fix: Fix all warnings in tests

* feat: Remove `Type` from timestamp's type name

* fix: fix query tests

Now datafusion already supports median, so this commit also removes
the median function.

* style: Fix clippy

* feat: Remove RecordBatch::pretty_print

* chore: Address CR comments

* feat: Add column_by_name to RecordBatch

* feat: modify select_from_rb

* feat: Fix some compiler errors in vector.rs

* feat: Fix more compiler errors in vector.rs

* fix: fix table.rs

Signed-off-by: Ruihang Xia <[email protected]>

* fix: Fix compiler errors in coprocessor

* fix: Fix some compiler errors

* fix: Fix compiler errors in script

* chore: Remove unused imports and format code

* test: disable interval tests

* test: Fix test_compile_execute test

* style: Fix clippy

* feat: Support interval

* feat: Add RecordBatch::columns and fix clippy

Signed-off-by: Ruihang Xia <[email protected]>
Co-authored-by: Ruihang Xia <[email protected]>

* fix: Fix All The Tests! (GreptimeTeam#752)

* fix: Fix several tests compile errors

Signed-off-by: Ruihang Xia <[email protected]>

* fix: some compile errors in tests

Signed-off-by: Ruihang Xia <[email protected]>

* fix: compile errors in frontend tests

* fix: compile errors in frontend tests

* test: Fix tests in api and common-query

* test: Fix test in sql crate

* fix: resolve substrait error

Signed-off-by: Ruihang Xia <[email protected]>

* chore: add more tests

* test: Fix tests in servers

* fix instance_test

Signed-off-by: Ruihang Xia <[email protected]>

* test: Fix tests in tests-integration

Signed-off-by: Ruihang Xia <[email protected]>
Co-authored-by: Lei, HUANG <[email protected]>
Co-authored-by: evenyag <[email protected]>

* fix: clippy errors

Signed-off-by: Ruihang Xia <[email protected]>
Co-authored-by: Ruihang Xia <[email protected]>
Co-authored-by: evenyag <[email protected]>
3 people authored and paomian committed Oct 19, 2023
1 parent 5248251 commit cfffa68
Showing 272 changed files with 6,723 additions and 17,626 deletions.
1,536 changes: 828 additions & 708 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion Cargo.toml
@@ -20,7 +20,6 @@ members = [
"src/common/time",
"src/datanode",
"src/datatypes",
"src/datatypes2",
"src/frontend",
"src/log-store",
"src/meta-client",
4 changes: 2 additions & 2 deletions benchmarks/Cargo.toml
@@ -5,10 +5,10 @@ edition = "2021"
license = "Apache-2.0"

[dependencies]
arrow = "10"
arrow = "26.0.0"
clap = { version = "4.0", features = ["derive"] }
client = { path = "../src/client" }
indicatif = "0.17.1"
itertools = "0.10.5"
parquet = { version = "*" }
parquet = "26.0.0"
tokio = { version = "1.21", features = ["full"] }
22 changes: 12 additions & 10 deletions benchmarks/src/bin/nyc-taxi.rs
@@ -20,7 +20,6 @@

use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Instant;

use arrow::array::{ArrayRef, PrimitiveArray, StringArray, TimestampNanosecondArray};
@@ -32,9 +31,7 @@ use client::api::v1::column::Values;
use client::api::v1::{Column, ColumnDataType, ColumnDef, CreateExpr, InsertExpr};
use client::{Client, Database, Select};
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
use parquet::arrow::{ArrowReader, ParquetFileArrowReader};
use parquet::file::reader::FileReader;
use parquet::file::serialized_reader::SerializedFileReader;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use tokio::task::JoinSet;

const DATABASE_NAME: &str = "greptime";
@@ -86,10 +83,14 @@ async fn write_data(
pb_style: ProgressStyle,
) -> u128 {
let file = std::fs::File::open(&path).unwrap();
let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());
let row_num = file_reader.metadata().file_metadata().num_rows();
let record_batch_reader = ParquetFileArrowReader::new(file_reader)
.get_record_reader(batch_size)
let record_batch_reader_builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
let row_num = record_batch_reader_builder
.metadata()
.file_metadata()
.num_rows();
let record_batch_reader = record_batch_reader_builder
.with_batch_size(batch_size)
.build()
.unwrap();
let progress_bar = mpb.add(ProgressBar::new(row_num as _));
progress_bar.set_style(pb_style);
@@ -210,9 +211,10 @@ fn build_values(column: &ArrayRef) -> Values {
| DataType::FixedSizeList(_, _)
| DataType::LargeList(_)
| DataType::Struct(_)
| DataType::Union(_, _)
| DataType::Union(_, _, _)
| DataType::Dictionary(_, _)
| DataType::Decimal(_, _)
| DataType::Decimal128(_, _)
| DataType::Decimal256(_, _)
| DataType::Map(_, _) => todo!(),
}
}
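Standalone, the parquet 26 reader API adopted in the hunk above can be exercised like this (the path and batch size are placeholders):

```rust
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

fn main() {
    // Placeholder path: any local parquet file works.
    let file = std::fs::File::open("/tmp/example.parquet").unwrap();

    // Metadata is available from the builder before the reader is built.
    let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
    println!("rows: {}", builder.metadata().file_metadata().num_rows());

    // Stream the file as arrow RecordBatches of up to 4096 rows.
    let reader = builder.with_batch_size(4096).build().unwrap();
    for batch in reader {
        println!("read batch with {} rows", batch.unwrap().num_rows());
    }
}
```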
10 changes: 8 additions & 2 deletions src/api/greptime/v1/column.proto
@@ -32,7 +32,10 @@ message Column {

repeated int32 date_values = 14;
repeated int64 datetime_values = 15;
repeated int64 ts_millis_values = 16;
repeated int64 ts_second_values = 16;
repeated int64 ts_millisecond_values = 17;
repeated int64 ts_microsecond_values = 18;
repeated int64 ts_nanosecond_values = 19;
}
// The array of non-null values in this column.
//
@@ -75,5 +78,8 @@ enum ColumnDataType {
STRING = 12;
DATE = 13;
DATETIME = 14;
TIMESTAMP = 15;
TIMESTAMP_SECOND = 15;
TIMESTAMP_MILLISECOND = 16;
TIMESTAMP_MICROSECOND = 17;
TIMESTAMP_NANOSECOND = 18;
}
104 changes: 89 additions & 15 deletions src/api/src/helper.rs
@@ -15,6 +15,7 @@
use common_base::BitVec;
use common_time::timestamp::TimeUnit;
use datatypes::prelude::ConcreteDataType;
use datatypes::types::TimestampType;
use datatypes::value::Value;
use datatypes::vectors::VectorRef;
use snafu::prelude::*;
@@ -56,7 +57,16 @@ impl From<ColumnDataTypeWrapper> for ConcreteDataType {
ColumnDataType::String => ConcreteDataType::string_datatype(),
ColumnDataType::Date => ConcreteDataType::date_datatype(),
ColumnDataType::Datetime => ConcreteDataType::datetime_datatype(),
ColumnDataType::Timestamp => ConcreteDataType::timestamp_millis_datatype(),
ColumnDataType::TimestampSecond => ConcreteDataType::timestamp_second_datatype(),
ColumnDataType::TimestampMillisecond => {
ConcreteDataType::timestamp_millisecond_datatype()
}
ColumnDataType::TimestampMicrosecond => {
ConcreteDataType::timestamp_microsecond_datatype()
}
ColumnDataType::TimestampNanosecond => {
ConcreteDataType::timestamp_nanosecond_datatype()
}
}
}
}
@@ -81,7 +91,12 @@ impl TryFrom<ConcreteDataType> for ColumnDataTypeWrapper {
ConcreteDataType::String(_) => ColumnDataType::String,
ConcreteDataType::Date(_) => ColumnDataType::Date,
ConcreteDataType::DateTime(_) => ColumnDataType::Datetime,
ConcreteDataType::Timestamp(_) => ColumnDataType::Timestamp,
ConcreteDataType::Timestamp(unit) => match unit {
TimestampType::Second(_) => ColumnDataType::TimestampSecond,
TimestampType::Millisecond(_) => ColumnDataType::TimestampMillisecond,
TimestampType::Microsecond(_) => ColumnDataType::TimestampMicrosecond,
TimestampType::Nanosecond(_) => ColumnDataType::TimestampNanosecond,
},
ConcreteDataType::Null(_) | ConcreteDataType::List(_) => {
return error::IntoColumnDataTypeSnafu { from: datatype }.fail()
}
@@ -153,8 +168,20 @@ impl Values {
datetime_values: Vec::with_capacity(capacity),
..Default::default()
},
ColumnDataType::Timestamp => Values {
ts_millis_values: Vec::with_capacity(capacity),
ColumnDataType::TimestampSecond => Values {
ts_second_values: Vec::with_capacity(capacity),
..Default::default()
},
ColumnDataType::TimestampMillisecond => Values {
ts_millisecond_values: Vec::with_capacity(capacity),
..Default::default()
},
ColumnDataType::TimestampMicrosecond => Values {
ts_microsecond_values: Vec::with_capacity(capacity),
..Default::default()
},
ColumnDataType::TimestampNanosecond => Values {
ts_nanosecond_values: Vec::with_capacity(capacity),
..Default::default()
},
}
@@ -187,9 +214,12 @@ impl Column {
Value::Binary(val) => values.binary_values.push(val.to_vec()),
Value::Date(val) => values.date_values.push(val.val()),
Value::DateTime(val) => values.datetime_values.push(val.val()),
Value::Timestamp(val) => values
.ts_millis_values
.push(val.convert_to(TimeUnit::Millisecond)),
Value::Timestamp(val) => match val.unit() {
TimeUnit::Second => values.ts_second_values.push(val.value()),
TimeUnit::Millisecond => values.ts_millisecond_values.push(val.value()),
TimeUnit::Microsecond => values.ts_microsecond_values.push(val.value()),
TimeUnit::Nanosecond => values.ts_nanosecond_values.push(val.value()),
},
Value::List(_) => unreachable!(),
});
self.null_mask = null_mask.into_vec();
@@ -200,7 +230,10 @@
mod tests {
use std::sync::Arc;

use datatypes::vectors::BooleanVector;
use datatypes::vectors::{
BooleanVector, TimestampMicrosecondVector, TimestampMillisecondVector,
TimestampNanosecondVector, TimestampSecondVector,
};

use super::*;

@@ -258,8 +291,8 @@ mod tests {
let values = values.datetime_values;
assert_eq!(2, values.capacity());

let values = Values::with_capacity(ColumnDataType::Timestamp, 2);
let values = values.ts_millis_values;
let values = Values::with_capacity(ColumnDataType::TimestampMillisecond, 2);
let values = values.ts_millisecond_values;
assert_eq!(2, values.capacity());
}

@@ -326,8 +359,8 @@ mod tests {
ColumnDataTypeWrapper(ColumnDataType::Datetime).into()
);
assert_eq!(
ConcreteDataType::timestamp_millis_datatype(),
ColumnDataTypeWrapper(ColumnDataType::Timestamp).into()
ConcreteDataType::timestamp_millisecond_datatype(),
ColumnDataTypeWrapper(ColumnDataType::TimestampMillisecond).into()
);
}

@@ -394,8 +427,8 @@ mod tests {
ConcreteDataType::datetime_datatype().try_into().unwrap()
);
assert_eq!(
ColumnDataTypeWrapper(ColumnDataType::Timestamp),
ConcreteDataType::timestamp_millis_datatype()
ColumnDataTypeWrapper(ColumnDataType::TimestampMillisecond),
ConcreteDataType::timestamp_millisecond_datatype()
.try_into()
.unwrap()
);
@@ -412,7 +445,48 @@
assert!(result.is_err());
assert_eq!(
result.unwrap_err().to_string(),
"Failed to create column datatype from List(ListType { inner: Boolean(BooleanType) })"
"Failed to create column datatype from List(ListType { item_type: Boolean(BooleanType) })"
);
}

#[test]
fn test_column_put_timestamp_values() {
let mut column = Column {
column_name: "test".to_string(),
semantic_type: 0,
values: Some(Values {
..Default::default()
}),
null_mask: vec![],
datatype: 0,
};

let vector = Arc::new(TimestampNanosecondVector::from_vec(vec![1, 2, 3]));
column.push_vals(3, vector);
assert_eq!(
vec![1, 2, 3],
column.values.as_ref().unwrap().ts_nanosecond_values
);

let vector = Arc::new(TimestampMillisecondVector::from_vec(vec![4, 5, 6]));
column.push_vals(3, vector);
assert_eq!(
vec![4, 5, 6],
column.values.as_ref().unwrap().ts_millisecond_values
);

let vector = Arc::new(TimestampMicrosecondVector::from_vec(vec![7, 8, 9]));
column.push_vals(3, vector);
assert_eq!(
vec![7, 8, 9],
column.values.as_ref().unwrap().ts_microsecond_values
);

let vector = Arc::new(TimestampSecondVector::from_vec(vec![10, 11, 12]));
column.push_vals(3, vector);
assert_eq!(
vec![10, 11, 12],
column.values.as_ref().unwrap().ts_second_values
);
}

4 changes: 1 addition & 3 deletions src/catalog/Cargo.toml
@@ -19,9 +19,7 @@ common-recordbatch = { path = "../common/recordbatch" }
common-runtime = { path = "../common/runtime" }
common-telemetry = { path = "../common/telemetry" }
common-time = { path = "../common/time" }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = [
"simd",
] }
datafusion = "14.0.0"
datatypes = { path = "../datatypes" }
futures = "0.3"
futures-util = "0.3"
23 changes: 9 additions & 14 deletions src/catalog/src/error.rs
@@ -17,7 +17,7 @@ use std::any::Any;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::prelude::{Snafu, StatusCode};
use datafusion::error::DataFusionError;
use datatypes::arrow;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::RawSchema;
use snafu::{Backtrace, ErrorCompat};

@@ -51,14 +51,12 @@ pub enum Error {
SystemCatalog { msg: String, backtrace: Backtrace },

#[snafu(display(
"System catalog table type mismatch, expected: binary, found: {:?} source: {}",
"System catalog table type mismatch, expected: binary, found: {:?}",
data_type,
source
))]
SystemCatalogTypeMismatch {
data_type: arrow::datatypes::DataType,
#[snafu(backtrace)]
source: datatypes::error::Error,
data_type: ConcreteDataType,
backtrace: Backtrace,
},

#[snafu(display("Invalid system catalog entry type: {:?}", entry_type))]
@@ -222,10 +220,11 @@ impl ErrorExt for Error {
| Error::ValueDeserialize { .. }
| Error::Io { .. } => StatusCode::StorageUnavailable,

Error::RegisterTable { .. } => StatusCode::Internal,
Error::RegisterTable { .. } | Error::SystemCatalogTypeMismatch { .. } => {
StatusCode::Internal
}

Error::ReadSystemCatalog { source, .. } => source.status_code(),
Error::SystemCatalogTypeMismatch { source, .. } => source.status_code(),
Error::InvalidCatalogValue { source, .. } => source.status_code(),

Error::TableExists { .. } => StatusCode::TableAlreadyExists,
@@ -265,7 +264,6 @@ impl From<Error> for DataFusionError {
#[cfg(test)]
mod tests {
use common_error::mock::MockError;
use datatypes::arrow::datatypes::DataType;
use snafu::GenerateImplicitData;

use super::*;
@@ -314,11 +312,8 @@ mod tests {
assert_eq!(
StatusCode::Internal,
Error::SystemCatalogTypeMismatch {
data_type: DataType::Boolean,
source: datatypes::error::Error::UnsupportedArrowType {
arrow_type: DataType::Boolean,
backtrace: Backtrace::generate()
}
data_type: ConcreteDataType::binary_datatype(),
backtrace: Backtrace::generate(),
}
.status_code()
);
2 changes: 1 addition & 1 deletion src/catalog/src/helper.rs
@@ -138,7 +138,7 @@ impl TableGlobalKey {

/// Table global info contains necessary info for a datanode to create table regions, including
/// table id, table meta(schema...), region id allocation across datanodes.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct TableGlobalValue {
/// Id of datanode that created the global table info kv. only for debugging.
pub node_id: u64,
33 changes: 20 additions & 13 deletions src/catalog/src/local/manager.rs
@@ -145,27 +145,34 @@ impl LocalCatalogManager {
/// Convert `RecordBatch` to a vector of `Entry`.
fn record_batch_to_entry(rb: RecordBatch) -> Result<Vec<Entry>> {
ensure!(
rb.df_recordbatch.columns().len() >= 6,
rb.num_columns() >= 6,
SystemCatalogSnafu {
msg: format!("Length mismatch: {}", rb.df_recordbatch.columns().len())
msg: format!("Length mismatch: {}", rb.num_columns())
}
);

let entry_type = UInt8Vector::try_from_arrow_array(&rb.df_recordbatch.columns()[0])
.with_context(|_| SystemCatalogTypeMismatchSnafu {
data_type: rb.df_recordbatch.columns()[ENTRY_TYPE_INDEX]
.data_type()
.clone(),
let entry_type = rb
.column(ENTRY_TYPE_INDEX)
.as_any()
.downcast_ref::<UInt8Vector>()
.with_context(|| SystemCatalogTypeMismatchSnafu {
data_type: rb.column(ENTRY_TYPE_INDEX).data_type(),
})?;

let key = BinaryVector::try_from_arrow_array(&rb.df_recordbatch.columns()[1])
.with_context(|_| SystemCatalogTypeMismatchSnafu {
data_type: rb.df_recordbatch.columns()[KEY_INDEX].data_type().clone(),
let key = rb
.column(KEY_INDEX)
.as_any()
.downcast_ref::<BinaryVector>()
.with_context(|| SystemCatalogTypeMismatchSnafu {
data_type: rb.column(KEY_INDEX).data_type(),
})?;

let value = BinaryVector::try_from_arrow_array(&rb.df_recordbatch.columns()[3])
.with_context(|_| SystemCatalogTypeMismatchSnafu {
data_type: rb.df_recordbatch.columns()[VALUE_INDEX].data_type().clone(),
let value = rb
.column(VALUE_INDEX)
.as_any()
.downcast_ref::<BinaryVector>()
.with_context(|| SystemCatalogTypeMismatchSnafu {
data_type: rb.column(VALUE_INDEX).data_type(),
})?;

let mut res = Vec::with_capacity(rb.num_rows());
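Reduced to its essentials, the as_any/downcast_ref pattern introduced above looks like this (Vector is a stand-in trait here, and downcast_column is a hypothetical helper, not a greptimedb API):

```rust
// Sketch: the new code downcasts a column to a concrete vector type via
// `as_any`, instead of rebuilding a vector from the arrow array.
use std::any::Any;

trait Vector {
    fn as_any(&self) -> &dyn Any;
}

struct UInt8Vector(Vec<u8>);

impl Vector for UInt8Vector {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

/// Hypothetical helper: downcast a dynamically typed column, returning
/// None on a type mismatch so the caller can surface an error.
fn downcast_column<T: 'static>(col: &dyn Vector) -> Option<&T> {
    col.as_any().downcast_ref::<T>()
}

fn main() {
    let col = UInt8Vector(vec![1, 2, 3]);
    let v: &UInt8Vector = downcast_column(&col).expect("type mismatch");
    assert_eq!(v.0.len(), 3);
}
```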