feat(spill): Align with the multi IO compression codec in spill #657

Open · wants to merge 1 commit into base: master
1 change: 1 addition & 0 deletions native-engine/blaze-jni-bridge/src/conf.rs
@@ -41,6 +41,7 @@
 define_conf!(IntConf, PARTIAL_AGG_SKIPPING_MIN_ROWS);
 define_conf!(BooleanConf, PARQUET_ENABLE_PAGE_FILTERING);
 define_conf!(BooleanConf, PARQUET_ENABLE_BLOOM_FILTER);
 define_conf!(StringConf, SPARK_IO_COMPRESSION_CODEC);
+define_conf!(StringConf, SPILL_COMPRESSION_CODEC);

 pub trait BooleanConf {
     fn key(&self) -> &'static str;
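
The new conf is read on the native side the same way as the existing SPARK_IO_COMPRESSION_CODEC. A minimal sketch, assuming the blaze-jni-bridge crate layout in this diff; the value() accessor on StringConf is taken from the conf::SPILL_COMPRESSION_CODEC.value() call that appears in spill.rs below:

    use blaze_jni_bridge::{conf, conf::StringConf, is_jni_bridge_inited};
    use datafusion::common::Result;

    // Read the codec name configured on the JVM side, falling back to lz4
    // when running native-only tests without an initialized JNI bridge.
    fn configured_spill_codec() -> Result<String> {
        if is_jni_bridge_inited() {
            conf::SPILL_COMPRESSION_CODEC.value()
        } else {
            Ok("lz4".to_string())
        }
    }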
3 changes: 2 additions & 1 deletion native-engine/datafusion-ext-plans/src/agg/agg_table.rs
@@ -508,10 +508,10 @@ impl HashingData
             // next bucket
             begin = end;
         }

         // EOF
         write_len(NUM_SPILL_BUCKETS, &mut writer)?;
         write_len(0, &mut writer)?;
+        writer.flush()?;
         Ok(())
     }
 }
@@ -621,6 +621,7 @@ impl MergingData {
         // EOF
         write_len(NUM_SPILL_BUCKETS, &mut writer)?;
         write_len(0, &mut writer)?;
+        writer.flush()?;
         Ok(())
     }
 }
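
These writer.flush() calls (and the matching ones in sort_exec.rs below) are presumably needed because SpillCompressedWriter is no longer an lz4 AutoFinishEncoder that finalizes itself on drop (see the type-alias change in spill.rs): with a plain buffered codec writer, pending bytes must be pushed out explicitly. A generic illustration of the pattern, over any W: Write:

    use std::io::{Result, Write};

    // Bytes written through a buffering writer can sit in its internal
    // buffer indefinitely; an explicit flush() pushes them to the underlying
    // sink and surfaces I/O errors instead of losing them on drop.
    fn write_spill_eof<W: Write>(writer: &mut W) -> Result<()> {
        writer.write_all(&[0u8])?; // illustrative EOF marker
        writer.flush()?;
        Ok(())
    }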
18 changes: 11 additions & 7 deletions native-engine/datafusion-ext-plans/src/common/ipc_compression.rs
@@ -168,25 +168,25 @@ impl<R: Read> IpcCompressionReader<R> {
     }
 }

-enum IoCompressionWriter<W: Write> {
+pub enum IoCompressionWriter<W: Write> {
     LZ4(lz4_flex::frame::FrameEncoder<W>),
     ZSTD(zstd::Encoder<'static, W>),
 }

 impl<W: Write> IoCompressionWriter<W> {
-    fn new_with_configured_codec(inner: W) -> Self {
+    pub fn new_with_configured_codec(inner: W) -> Self {
         Self::try_new(io_compression_codec(), inner).expect("error creating compression encoder")
     }

-    fn try_new(codec: &str, inner: W) -> Result<Self> {
+    pub fn try_new(codec: &str, inner: W) -> Result<Self> {
         match codec {
             "lz4" => Ok(Self::LZ4(lz4_flex::frame::FrameEncoder::new(inner))),
             "zstd" => Ok(Self::ZSTD(zstd::Encoder::new(inner, ZSTD_LEVEL)?)),
             _ => df_execution_err!("unsupported codec: {}", codec),
         }
     }

-    fn finish(&mut self) -> Result<()> {
+    pub fn finish(&mut self) -> Result<()> {
         match self {
             IoCompressionWriter::LZ4(w) => {
                 w.try_finish()
@@ -216,21 +216,25 @@ impl<W: Write> Write for IoCompressionWriter<W> {
     }
 }

-enum IoCompressionReader<R: Read> {
+pub enum IoCompressionReader<R: Read> {
     LZ4(lz4_flex::frame::FrameDecoder<R>),
     ZSTD(zstd::Decoder<'static, BufReader<R>>),
 }

 impl<R: Read> IoCompressionReader<R> {
-    fn try_new(codec: &str, inner: R) -> Result<Self> {
+    pub fn new_with_configured_codec(inner: R) -> Self {
+        Self::try_new(io_compression_codec(), inner).expect("error creating compression decoder")
+    }
+
+    pub fn try_new(codec: &str, inner: R) -> Result<Self> {
         match codec {
             "lz4" => Ok(Self::LZ4(lz4_flex::frame::FrameDecoder::new(inner))),
             "zstd" => Ok(Self::ZSTD(zstd::Decoder::new(inner)?)),
             _ => df_execution_err!("unsupported codec: {}", codec),
         }
     }

-    fn finish_into_inner(self) -> Result<R> {
+    pub fn finish_into_inner(self) -> Result<R> {
         match self {
             Self::LZ4(r) => Ok(r.into_inner()),
             Self::ZSTD(r) => Ok(r.finish().into_inner()),
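
For orientation, a round trip through the now-public API might look like the following. This is a minimal sketch, assuming these types are in scope, that IoCompressionReader implements Read (its use as SpillCompressedReader below implies it), and that the lz4_flex and zstd crates are available:

    use std::io::{Read, Write};

    fn codec_roundtrip() -> datafusion::common::Result<()> {
        // Compress into an in-memory buffer with an explicitly chosen codec.
        let mut buf = Vec::new();
        let mut writer = IoCompressionWriter::try_new("lz4", &mut buf)?;
        writer.write_all(b"spill payload")?;
        writer.finish()?; // write the frame footer before dropping the encoder
        drop(writer);

        // Decode the same bytes back and check the payload survives.
        let mut reader = IoCompressionReader::try_new("lz4", buf.as_slice())?;
        let mut out = Vec::new();
        reader.read_to_end(&mut out)?;
        assert_eq!(out, b"spill payload");
        Ok(())
    }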
@@ -121,7 +121,12 @@ impl<const P: JoinerParams> FullJoiner<P> {
         Ok(probed_key_columns)
     }

-    async fn flush(&self, probe_cols: Vec<ArrayRef>, build_cols: Vec<ArrayRef>, num_rows: usize) -> Result<()> {
+    async fn flush(
+        &self,
+        probe_cols: Vec<ArrayRef>,
+        build_cols: Vec<ArrayRef>,
+        num_rows: usize,
+    ) -> Result<()> {
         let output_batch = RecordBatch::try_new_with_options(
             self.join_params.output_schema.clone(),
             match P.probe_side {
36 changes: 27 additions & 9 deletions native-engine/datafusion-ext-plans/src/memmgr/spill.rs
@@ -22,19 +22,21 @@ use std::{
 };

 use blaze_jni_bridge::{
-    is_jni_bridge_inited, jni_bridge::LocalRef, jni_call, jni_call_static, jni_get_string,
-    jni_new_direct_byte_buffer, jni_new_global_ref,
+    conf, conf::StringConf, is_jni_bridge_inited, jni_bridge::LocalRef, jni_call, jni_call_static,
+    jni_get_string, jni_new_direct_byte_buffer, jni_new_global_ref,
 };
 use datafusion::{common::Result, parquet::file::reader::Length, physical_plan::metrics::Time};
 use jni::{objects::GlobalRef, sys::jlong};
 use log::warn;
+use once_cell::sync::OnceCell;

-use crate::memmgr::metrics::SpillMetrics;
+use crate::{
+    common::ipc_compression::{IoCompressionReader, IoCompressionWriter},
+    memmgr::metrics::SpillMetrics,
+};

-pub type SpillCompressedReader<'a> =
-    lz4_flex::frame::FrameDecoder<BufReader<Box<dyn Read + Send + 'a>>>;
-pub type SpillCompressedWriter<'a> =
-    lz4_flex::frame::AutoFinishEncoder<BufWriter<Box<dyn Write + Send + 'a>>>;
+pub type SpillCompressedReader<'a> = IoCompressionReader<BufReader<Box<dyn Read + Send + 'a>>>;
+pub type SpillCompressedWriter<'a> = IoCompressionWriter<BufWriter<Box<dyn Write + Send + 'a>>>;

 pub trait Spill: Send + Sync {
     fn as_any(&self) -> &dyn Any;
@@ -43,11 +45,13 @@ pub trait Spill: Send + Sync {
     fn get_buf_writer<'a>(&'a mut self) -> BufWriter<Box<dyn Write + Send + 'a>>;

     fn get_compressed_reader(&self) -> SpillCompressedReader<'_> {
-        lz4_flex::frame::FrameDecoder::new(self.get_buf_reader())
+        IoCompressionReader::try_new(spill_compression_codec(), self.get_buf_reader())
+            .expect("error creating compression reader")
     }

     fn get_compressed_writer(&mut self) -> SpillCompressedWriter<'_> {
-        lz4_flex::frame::FrameEncoder::new(self.get_buf_writer()).auto_finish()
+        IoCompressionWriter::try_new(spill_compression_codec(), self.get_buf_writer())
+            .expect("error creating compression writer")
     }
 }

@@ -69,6 +73,20 @@ impl Spill for Vec<u8> {
     }
 }

+fn spill_compression_codec() -> &'static str {
+    static CODEC: OnceCell<String> = OnceCell::new();
+    CODEC
+        .get_or_try_init(|| {
+            if is_jni_bridge_inited() {
+                conf::SPILL_COMPRESSION_CODEC.value()
+            } else {
+                Ok("lz4".to_string()) // for testing
+            }
+        })
+        .expect("error reading spark.blaze.spill.compression.codec")
+        .as_str()
+}
+
 pub fn try_new_spill(spill_metrics: &SpillMetrics) -> Result<Box<dyn Spill>> {
     if !is_jni_bridge_inited() || jni_call_static!(JniBridge.isDriverSide() -> bool)? {
         Ok(Box::new(FileSpill::try_new(spill_metrics)?))
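
To illustrate the codec-aware helpers end to end, a sketch of a spill round trip; this assumes the module is in scope and relies on Vec<u8> implementing Spill as shown above (with the JNI bridge uninitialized, spill_compression_codec() falls back to lz4):

    use std::io::{Read, Write};

    fn spill_roundtrip() -> datafusion::common::Result<()> {
        let mut spill: Vec<u8> = Vec::new();
        {
            // The writer borrows the spill mutably; finish() writes the
            // codec frame footer before the writer is dropped.
            let mut w = spill.get_compressed_writer();
            w.write_all(b"sorted run bytes")?;
            w.finish()?;
        }
        let mut r = spill.get_compressed_reader();
        let mut out = Vec::new();
        r.read_to_end(&mut out)?;
        assert_eq!(out, b"sorted run bytes");
        Ok(())
    }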
2 changes: 2 additions & 0 deletions native-engine/datafusion-ext-plans/src/sort_exec.rs
@@ -277,6 +277,7 @@ impl BufferedData {
             write_one_batch(batch.num_rows(), batch.columns(), &mut writer)?;
             writer.write_all(&key_collector.store)?;
         }
+        writer.flush()?;
         Ok(())
     }

@@ -946,6 +947,7 @@ fn merge_spills(
         )?;
         output_writer.write_all(&key_collector.store)?;
     }
+    output_writer.flush()?;
     drop(output_writer);
     Ok(output_spill)
 }
@@ -61,7 +61,10 @@ public enum BlazeConf {
     SPARK_IO_COMPRESSION_CODEC("spark.io.compression.codec", "lz4"),

     // replace all sort-merge joins with shuffled-hash joins, only used for benchmarking
-    FORCE_SHUFFLED_HASH_JOIN("spark.blaze.forceShuffledHashJoin", false);
+    FORCE_SHUFFLED_HASH_JOIN("spark.blaze.forceShuffledHashJoin", false),
+
+    // spark spill compression codec
+    SPILL_COMPRESSION_CODEC("spark.blaze.spill.compression.codec", "lz4");

     public final String key;
     private final Object defaultValue;
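
With this entry in place, the spill codec can be switched independently of spark.io.compression.codec, for example by setting spark.blaze.spill.compression.codec=zstd in spark-defaults.conf; the native side keeps lz4 as the default, both via this enum's default value and via the testing fallback in spill_compression_codec() when the JNI bridge is not initialized.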