Merge branch 'main' into bugfix/memory-usage
mildbyte authored Aug 30, 2022
2 parents 4dc61ae + 786a283 commit 207398b
Showing 4 changed files with 67 additions and 43 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/ci.yml
@@ -38,6 +38,8 @@ jobs:
with:
path: ~/.cache/pre-commit
key: pre-commit-${{ runner.os }}-pre-commit-${{ hashFiles('**/.pre-commit-config.yaml') }}
- name: Install cmake
run: apt-get update && apt-get install -y cmake
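# Likely needed because prost-build 0.10 (added below in Cargo.toml) compiles a
# vendored protoc with cmake; this is an inference from the dependency bump, not
# something stated in this commit.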

# Use https://github.com/marketplace/actions/rust-cache

2 changes: 2 additions & 0 deletions Cargo.toml
@@ -24,6 +24,7 @@ config = "0.13.1"
convergence = { git = "https://github.com/returnString/convergence", rev = "500ea8e1f03f63f20effb9b93cf414c1c6711515", optional = true }
convergence-arrow = { git = "https://github.com/returnString/convergence", rev = "500ea8e1f03f63f20effb9b93cf414c1c6711515", package = "convergence-arrow", optional = true }
datafusion = "10"
datafusion-proto = "10"
futures = "0.3"

# passing to DF's query planner
@@ -34,6 +35,7 @@ log = "0.4"
moka = { version = "0.9.3", default_features = false, features = ["future", "atomic64", "quanta"] }
object_store = "0.3.0"
pretty_env_logger = "0.4"
prost = "0.10"

# Needs to be in non-dev because repository::testutils can't be
# imported by tests::end_to_end if it's cfg(test).
70 changes: 34 additions & 36 deletions src/context.rs
@@ -21,6 +21,7 @@ use datafusion::execution::DiskManager;

use datafusion::logical_plan::plan::Projection;
use datafusion::logical_plan::{CreateExternalTable, DFField, DropTable, Expr, FileType};
use datafusion_proto::protobuf;

use crate::datafusion::parser::{DFParser, Statement as DFStatement};
use crate::datafusion::utils::{
@@ -45,6 +46,7 @@ use std::iter::zip;
use std::sync::Arc;

pub use datafusion::error::{DataFusionError as Error, Result};
use datafusion::scalar::ScalarValue;
use datafusion::{
arrow::{
datatypes::{Schema, SchemaRef},
@@ -65,6 +67,9 @@
prelude::SessionContext,
sql::{planner::SqlToRel, TableReference},
};

use log::warn;
use prost::Message;
use tempfile::TempPath;

use crate::catalog::{PartitionCatalog, DEFAULT_SCHEMA, STAGING_SCHEMA};
@@ -145,6 +150,17 @@ async fn get_parquet_file_statistics_bytes(
Ok(stats)
}

// Serialise a min/max stat, given as a ScalarValue, using the DataFusion protobuf format
pub fn scalar_value_to_bytes(value: &ScalarValue) -> Option<Vec<u8>> {
match <&ScalarValue as TryInto<protobuf::ScalarValue>>::try_into(value) {
Ok(proto) => Some(proto.encode_to_vec()),
Err(error) => {
warn!("Failed to serialise min/max value {:?}: {}", value, error);
None
}
}
}
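// A minimal round-trip sketch for the encoding above (assumes the `datafusion-proto`
// and `prost` versions pinned in Cargo.toml; the decode direction mirrors the path
// added to src/provider.rs below):
//
//     let bytes = scalar_value_to_bytes(&ScalarValue::Int64(Some(42))).unwrap();
//     let proto = protobuf::ScalarValue::decode(bytes.as_slice()).unwrap();
//     let value: ScalarValue = (&proto).try_into().unwrap();
//     assert_eq!(value, ScalarValue::Int64(Some(42)));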

/// Serialize data for the physical partition index from Parquet file statistics
fn build_partition_columns(
partition_stats: &Statistics,
@@ -157,19 +173,11 @@
// pruning will fail, and we will default to using all partitions.
Some(column_statistics) => zip(column_statistics, schema.fields())
.map(|(stats, column)| {
// TODO: the to_string will discard the timezone for Timestamp* values, and will
// therefore hinder the ability to recreate them once needed for partition pruning.
// However, since DF stats rely on Parquet stats that problem won't come up until
// 1) Parquet starts collecting stats for Timestamp* types (`parquet::file::statistics::Statistics` enum)
// Since DF stats rely on Parquet stats we won't have stats on Timestamp* values until
// 1) Parquet starts collecting stats for them (`parquet::file::statistics::Statistics` enum)
// 2) DF pattern matches those types in `summarize_min_max`.
let min_value = stats
.min_value
.as_ref()
.map(|m| m.to_string().as_bytes().into());
let max_value = stats
.max_value
.as_ref()
.map(|m| m.to_string().as_bytes().into());
let min_value = stats.min_value.as_ref().and_then(scalar_value_to_bytes);
let max_value = stats.max_value.as_ref().and_then(scalar_value_to_bytes);

PartitionColumn {
name: Arc::from(column.name().to_string()),
@@ -1443,8 +1451,8 @@ mod tests {
const EXPECTED_INSERT_FILE_NAME: &str =
"1592625fb7bb063580d94fe2eaf514d55e6b44f1bebd6c7f6b2e79f55477218b.parquet";

fn to_min_max_value<T: ToString>(item: T) -> Arc<Option<Vec<u8>>> {
Arc::from(Some(item.to_string().as_bytes().to_vec()))
fn to_min_max_value(value: ScalarValue) -> Arc<Option<Vec<u8>>> {
Arc::from(scalar_value_to_bytes(&value))
}

async fn assert_uploaded_objects(
@@ -1550,8 +1558,8 @@
"{\"name\":\"int\",\"bitWidth\":64,\"isSigned\":true}"
.to_string()
),
min_value: to_min_max_value(12),
max_value: to_min_max_value(42),
min_value: to_min_max_value(ScalarValue::Int64(Some(12))),
max_value: to_min_max_value(ScalarValue::Int64(Some(42))),
null_count: Some(0),
},
PartitionColumn {
@@ -1580,8 +1588,8 @@
"{\"name\":\"int\",\"bitWidth\":64,\"isSigned\":true}"
.to_string()
),
min_value: to_min_max_value(22),
max_value: to_min_max_value(32),
min_value: to_min_max_value(ScalarValue::Int64(Some(22))),
max_value: to_min_max_value(ScalarValue::Int64(Some(32))),
null_count: Some(0),
},
PartitionColumn {
@@ -1717,12 +1725,12 @@
r#type: Arc::from(
r#"{"name":"int","bitWidth":32,"isSigned":true}"#
),
min_value: to_min_max_value(
output_partitions[i].iter().min().unwrap()
),
max_value: to_min_max_value(
output_partitions[i].iter().max().unwrap()
),
min_value: to_min_max_value(ScalarValue::Int32(
output_partitions[i].iter().min().copied()
)),
max_value: to_min_max_value(ScalarValue::Int32(
output_partitions[i].iter().max().copied()
)),
null_count: Some(0),
}])
},
@@ -1854,18 +1862,8 @@
PartitionColumn {
name: Arc::from("value"),
r#type: Arc::from("{\"name\":\"floatingpoint\",\"precision\":\"DOUBLE\"}"),
min_value: Arc::new(Some(
vec![
52,
50,
],
)),
max_value: Arc::new(Some(
vec![
52,
50,
],
)),
min_value: Arc::new(scalar_value_to_bytes(&ScalarValue::Float64(Some(42.0)))),
max_value: Arc::new(scalar_value_to_bytes(&ScalarValue::Float64(Some(42.0)))),
null_count: Some(0),
},
],)
36 changes: 29 additions & 7 deletions src/provider.rs
@@ -32,9 +32,11 @@ use datafusion::{
SendableRecordBatchStream, Statistics,
},
};
use datafusion_proto::protobuf;

use futures::future;
use log::warn;
use prost::Message;

use object_store::path::Path;

@@ -294,6 +296,7 @@ impl SeafowlPruningStatistics {
/// use arrow::datatypes::DataType;
/// use datafusion::scalar::ScalarValue;
/// use seafowl::provider::SeafowlPruningStatistics;
/// use seafowl::context::scalar_value_to_bytes;
///
/// fn parse(value: &Arc<Option<Vec<u8>>>, dt: &DataType) -> ScalarValue {
/// SeafowlPruningStatistics::parse_bytes_value(&value, &dt).unwrap()
@@ -305,20 +308,34 @@
/// assert_eq!(parse(&val, &DataType::Boolean), ScalarValue::Boolean(None));
///
/// // Parse some actual value
/// let val = Arc::from(Some(42.to_string().as_bytes().to_vec()));
/// let val = Arc::from(scalar_value_to_bytes(&ScalarValue::Int32(Some(42))));
/// assert_eq!(parse(&val, &DataType::Int32), ScalarValue::Int32(Some(42)));
///
/// let val = Arc::from(scalar_value_to_bytes(&ScalarValue::Float32(Some(42.0))));
/// assert_eq!(parse(&val, &DataType::Float32), ScalarValue::Float32(Some(42.0)));
///
/// let val = Arc::from(scalar_value_to_bytes(&ScalarValue::Utf8(Some("42".to_string()))));
/// assert_eq!(parse(&val, &DataType::Utf8), ScalarValue::Utf8(Some("42".to_string())));
/// ```
pub fn parse_bytes_value(
bytes_value: &Arc<Option<Vec<u8>>>,
data_type: &DataType,
) -> Result<ScalarValue> {
match bytes_value.as_ref() {
Some(bytes) => match String::from_utf8(bytes.clone()) {
Ok(string_val) => ScalarValue::try_from_string(string_val, data_type),
Some(bytes) => match protobuf::ScalarValue::decode(bytes.as_slice()) {
Ok(proto) => {
match <&protobuf::ScalarValue as TryInto<ScalarValue>>::try_into(
&proto,
) {
Ok(value) => Ok(value),
Err(error) => Err(DataFusionError::Internal(format!(
"Failed to deserialize min/max value: {}",
error
))),
}
}
Err(error) => Err(DataFusionError::Internal(format!(
"Failed to parse min/max value: {}",
"Failed to decode min/max value: {}",
error
))),
},
@@ -478,6 +495,7 @@ mod tests {
use arrow::array::StringArray;
use arrow::datatypes::{DataType, Field, Schema};
use bytes::{BufMut, Bytes, BytesMut};
use datafusion::common::ScalarValue;
use datafusion::logical_expr::{col, lit, or, Expr};
use datafusion::{
arrow::{
@@ -497,7 +515,7 @@
use crate::provider::{PartitionColumn, SeafowlPruningStatistics};
use crate::{
catalog::MockPartitionCatalog,
context::INTERNAL_OBJECT_STORE_SCHEME,
context::{scalar_value_to_bytes, INTERNAL_OBJECT_STORE_SCHEME},
provider::{SeafowlPartition, SeafowlTable},
schema,
};
@@ -665,8 +683,12 @@
// Create some fake partitions
let mut partitions = vec![];
for (ind, (min, max, null_count)) in part_stats.iter().enumerate() {
let min_value = Arc::from(min.map(|v| v.to_string().as_bytes().to_vec()));
let max_value = Arc::from(max.map(|v| v.to_string().as_bytes().to_vec()));
let min_value = Arc::from(
min.and_then(|v| scalar_value_to_bytes(&ScalarValue::Int32(Some(v)))),
);
let max_value = Arc::from(
max.and_then(|v| scalar_value_to_bytes(&ScalarValue::Int32(Some(v)))),
);

partitions.push(SeafowlPartition {
object_storage_id: Arc::from(format!("par{}.parquet", ind)),