
Major performance regression in reading partitioned Parquet data on master #1363

Closed
Dandandan opened this issue Nov 26, 2021 · 16 comments · Fixed by #1366
Labels
performance Make DataFusion faster

Comments

@Dandandan
Contributor

Dandandan commented Nov 26, 2021

Reading (partitioned) Parquet data got slower when executing queries against Parquet data.

It seems to have started with #1010 - the commit before doesn't have the performance regression.

This is visible when running TPCH benchmarks, for example, executing query 6 became much slower.

I am not sure what the cause is - it would help to find the commit where the "slowness" was introduced.

Looking back at the earlier results I posted - it looks like the main difference is that the underlying arrow / parquet reading got slower. I am not sure what the cause is.

Originally posted by @Dandandan in #68 (comment)

@Dandandan Dandandan changed the title Parquet reading got slower in newer version Parquet reading got slower on master Nov 26, 2021
@Dandandan Dandandan changed the title Parquet reading got slower on master Major performance regression in reading Parquet on master Nov 26, 2021
@Dandandan Dandandan added the performance Make DataFusion faster label Nov 26, 2021
@Dandandan
Contributor Author

@rdettai It seems the PR #1010 introduced this regression.

Do you have any idea maybe what the cause could be?

@rdettai
Contributor

rdettai commented Nov 26, 2021

Hi @Dandandan, sorry if it's #1010 that created this regression. I think I only ran the TPCH benchmarks on CSV for performance testing. Did you run your tests on the latest master, after #1347 was merged?

@Dandandan
Contributor Author

Dandandan commented Nov 26, 2021

@rdettai

No problem - just want to figure out what could be the reason :)!

So far I tested:

  • master - performance regression
  • 2454e46 - regression
  • d2d47d3 - fast again

after #1347 was merged

For TPCH, I remember that collecting stats doesn't have a big effect now, as the data is very evenly distributed, and I think collecting them also doesn't take long.

To reproduce:

  • Create partitioned Parquet dataset (slowness seems to increase with the number of partitions? - not 100% sure yet)
  • Run some queries cargo run --release --bin tpch -- benchmark datafusion --iterations 5 --path [data] --format parquet --query 6 --batch-size 8192 -p 16

@Dandandan Dandandan changed the title Major performance regression in reading Parquet on master Major performance regression in reading partitioned Parquet data on master Nov 26, 2021
@xudong963
Member

Off-topic: how about introducing bench results in every PR?

@rdettai
Contributor

rdettai commented Nov 26, 2021

One possible reason might be that #1010 introduces the use of the ObjectStore: https://github.com/apache/arrow-datafusion/blob/414c826bf06fd22e0bb52edbb497791b5fe558e0/datafusion/src/physical_plan/file_format/parquet.rs#L408-L411

The abstraction requires the use of dynamic dispatch on the reader (fn sync_chunk_reader(&self, start: u64, length: usize) -> Result<Box<dyn Read + Send + Sync>>), which can indeed reduce performance if read() is called frequently. Actually, now that I think about it, some old memories are coming back: if I remember correctly, two years ago when I first played with the parquet reader I noticed something like this happening: read() was often called in a way that fetched only 1 byte at a time.

EDIT: I tried reverting just that part back to the original std::fs::File wrapped with a SerializedFileReader, and performance remains just as bad! Hypothesis invalidated! The issue is not the dynamic dispatch 😕.
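For what it's worth, the "read() getting only 1 byte at a time" effect described above is easy to demonstrate in isolation. The sketch below is not DataFusion code: a hypothetical CountingReader wrapper stands in for counting syscalls on a real File, and we compare byte-by-byte reads against the same reads routed through a BufReader.

```rust
use std::io::{BufReader, Read};

/// Wrapper that counts how many times `read` reaches the underlying source
/// (a stand-in for counting syscalls on a real `File`).
struct CountingReader<R> {
    inner: R,
    calls: usize,
}

impl<R: Read> Read for CountingReader<R> {
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        self.calls += 1;
        self.inner.read(buf)
    }
}

/// Read 64 KiB one byte at a time, unbuffered vs. through a `BufReader`,
/// and return how many underlying `read` calls each approach made.
fn demo_counts() -> (usize, usize) {
    let data = vec![0u8; 64 * 1024];
    let mut byte = [0u8; 1];

    // Unbuffered: every 1-byte read hits the source directly.
    let mut raw = CountingReader { inner: &data[..], calls: 0 };
    while raw.read(&mut byte).unwrap() > 0 {}

    // Buffered: BufReader services the 1-byte reads from its internal buffer,
    // hitting the source only once per buffer refill.
    let mut buffered = BufReader::new(CountingReader { inner: &data[..], calls: 0 });
    while buffered.read(&mut byte).unwrap() > 0 {}

    (raw.calls, buffered.get_ref().calls)
}

fn main() {
    let (unbuffered, buffered) = demo_counts();
    println!("unbuffered: {unbuffered} calls, buffered: {buffered} calls");
}
```

With an unbuffered reader the call count scales with the number of bytes; with buffering it scales with the number of buffer refills, which matches the BufRead fix discussed below in the thread.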

@Dandandan
Contributor Author

@rdettai It seems changing Read into BufRead everywhere mostly recovers the performance here. Does it make sense to you to make that change in this place?
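One reason this change is low-risk: BufRead is a supertrait of Read, so swapping the trait object type keeps existing Read-based consumers working. An illustrative standalone sketch (the names here are not DataFusion's):

```rust
use std::io::{BufRead, BufReader, Cursor, Read};

/// Build a boxed `BufRead` trait object, mirroring the shape of the
/// proposed `sync_chunk_reader` return type.
fn make_reader(data: Vec<u8>) -> Box<dyn BufRead + Send + Sync> {
    Box::new(BufReader::new(Cursor::new(data)))
}

/// A downstream consumer that only needs `Read`.
fn consume(mut r: impl Read) -> usize {
    let mut buf = Vec::new();
    r.read_to_end(&mut buf).unwrap();
    buf.len()
}

fn main() {
    let r = make_reader(vec![1, 2, 3]);
    // `BufRead: Read`, so the boxed BufRead still satisfies Read-based
    // callers, while small reads are now coalesced by the buffer.
    let n = consume(r);
    println!("read {n} bytes");
}
```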

@Dandandan
Contributor Author

Hm - maybe too early. It seems that for query 6 this brings performance roughly back to the earlier level, but for other queries it's doing even worse 🤔

@Dandandan
Contributor Author

FTR - this is the entire diff:

diff --git a/datafusion/src/datasource/file_format/parquet.rs b/datafusion/src/datasource/file_format/parquet.rs
index 7976be791..1d493c274 100644
--- a/datafusion/src/datasource/file_format/parquet.rs
+++ b/datafusion/src/datasource/file_format/parquet.rs
@@ -18,6 +18,7 @@
 //! Parquet format abstractions
 
 use std::any::Any;
+use std::io::BufRead;
 use std::io::Read;
 use std::sync::Arc;
 
@@ -321,7 +322,7 @@ impl Length for ChunkObjectReader {
 }
 
 impl ChunkReader for ChunkObjectReader {
-    type T = Box<dyn Read + Send + Sync>;
+    type T = Box<dyn BufRead + Send + Sync>;
 
     fn get_read(&self, start: u64, length: usize) -> ParquetResult<Self::T> {
         self.0
diff --git a/datafusion/src/datasource/object_store/local.rs b/datafusion/src/datasource/object_store/local.rs
index b2a2ddfa9..5b87bee1a 100644
--- a/datafusion/src/datasource/object_store/local.rs
+++ b/datafusion/src/datasource/object_store/local.rs
@@ -18,7 +18,7 @@
 //! Object store that represents the Local File System.
 
 use std::fs::{self, File, Metadata};
-use std::io::{Read, Seek, SeekFrom};
+use std::io::{BufRead, BufReader, Read, Seek, SeekFrom};
 use std::sync::Arc;
 
 use async_trait::async_trait;
@@ -82,12 +82,15 @@ impl ObjectReader for LocalFileReader {
         &self,
         start: u64,
         length: usize,
-    ) -> Result<Box<dyn Read + Send + Sync>> {
+    ) -> Result<Box<dyn BufRead + Send + Sync>> {
         // A new file descriptor is opened for each chunk reader.
         // This okay because chunks are usually fairly large.
         let mut file = File::open(&self.file.path)?;
         file.seek(SeekFrom::Start(start))?;
-        Ok(Box::new(file.take(length as u64)))
+        
+        let file = BufReader::new(file.take(length as u64));
+        
+        Ok(Box::new(file))
     }
 
     fn length(&self) -> u64 {
diff --git a/datafusion/src/datasource/object_store/mod.rs b/datafusion/src/datasource/object_store/mod.rs
index 59e184103..97085c5df 100644
--- a/datafusion/src/datasource/object_store/mod.rs
+++ b/datafusion/src/datasource/object_store/mod.rs
@@ -21,7 +21,7 @@ pub mod local;
 
 use std::collections::HashMap;
 use std::fmt::{self, Debug};
-use std::io::Read;
+use std::io::{BufRead, Read};
 use std::pin::Pin;
 use std::sync::{Arc, RwLock};
 
@@ -48,10 +48,10 @@ pub trait ObjectReader: Send + Sync {
         &self,
         start: u64,
         length: usize,
-    ) -> Result<Box<dyn Read + Send + Sync>>;
+    ) -> Result<Box<dyn BufRead + Send + Sync>>;
 
     /// Get reader for the entire file
-    fn sync_reader(&self) -> Result<Box<dyn Read + Send + Sync>> {
+    fn sync_reader(&self) -> Result<Box<dyn BufRead + Send + Sync>> {
         self.sync_chunk_reader(0, self.length() as usize)
     }
 
diff --git a/datafusion/src/physical_plan/file_format/file_stream.rs b/datafusion/src/physical_plan/file_format/file_stream.rs
index 958b1721b..ca3a69a6e 100644
--- a/datafusion/src/physical_plan/file_format/file_stream.rs
+++ b/datafusion/src/physical_plan/file_format/file_stream.rs
@@ -32,13 +32,7 @@ use arrow::{
     record_batch::RecordBatch,
 };
 use futures::Stream;
-use std::{
-    io::Read,
-    iter,
-    pin::Pin,
-    sync::Arc,
-    task::{Context, Poll},
-};
+use std::{io::{BufRead, Read}, iter, pin::Pin, sync::Arc, task::{Context, Poll}};
 
 use super::PartitionColumnProjector;
 
@@ -48,12 +42,12 @@ pub type BatchIter = Box<dyn Iterator<Item = ArrowResult<RecordBatch>> + Send +
 /// A closure that creates a file format reader (iterator over `RecordBatch`) from a `Read` object
 /// and an optional number of required records.
 pub trait FormatReaderOpener:
-    FnMut(Box<dyn Read + Send + Sync>, &Option<usize>) -> BatchIter + Send + Unpin + 'static
+    FnMut(Box<dyn BufRead + Send + Sync>, &Option<usize>) -> BatchIter + Send + Unpin + 'static
 {
 }
 
 impl<T> FormatReaderOpener for T where
-    T: FnMut(Box<dyn Read + Send + Sync>, &Option<usize>) -> BatchIter
+    T: FnMut(Box<dyn BufRead + Send + Sync>, &Option<usize>) -> BatchIter
         + Send
         + Unpin
         + 'static

@rdettai
Contributor

rdettai commented Nov 26, 2021

Interesting fact:
Before #1010, the read_partition method was where 99% of the time was spent:

Query read_partition took 191.6 ms
Query 6 iteration 0 took 193.1 ms

Query read_partition took 178.3 ms
Query 6 iteration 1 took 180.2 ms

Query read_partition took 179.7 ms
Query 6 iteration 2 took 181.2 ms

After #1010, this is not the case anymore, but read_partition itself is also slower:

Query read_partition took 253.3 ms
Query 6 iteration 0 took 354.6 ms

Query read_partition took 252.4 ms
Query 6 iteration 1 took 354.6 ms

Query read_partition took 258.9 ms
Query 6 iteration 2 took 357.8 ms

So everything is slowed down, both on the thread where the reading occurs and in the Tokio threads... WeEEeeIiiiIrd 😅

@Dandandan
Contributor Author

I opened a PR here https://github.com/apache/arrow-datafusion/pull/1366/files so you can have a look too.

It seems the slowdown of query 10 might be unrelated to the reading performance.

@Dandandan
Contributor Author

Off-topic: how about introducing bench results in every PR?

Yes - some continuous benchmarking would definitely be something valuable and could avoid regressions.

@mingmwang
Contributor

Why was the issue closed? Was it resolved, and did performance come back?

@Dandandan
Contributor Author

Why was the issue closed? Was it resolved, and did performance come back?

Yes, according to my benchmarks, performance is around the same as it was before the regression was introduced.

@mingmwang
Contributor

mingmwang commented Dec 23, 2021

I'm not sure whether it is relevant or not. In the current implementation, the sync_chunk_reader() method is invoked for every parquet column chunk, which causes lots of unnecessary file open and seek calls.

FilePageIterator.next() -> FileReader.get_row_group().get_column_page_reader() -> SerializedRowGroupReader.get_column_page_reader() -> ChunkObjectReader.get_read() -> LocalFileReader.sync_chunk_reader()

     fn sync_chunk_reader(
        &self,
        start: u64,
        length: usize,
    ) -> Result<Box<dyn Read + Send + Sync>> {
        // A new file descriptor is opened for each chunk reader.
        // This okay because chunks are usually fairly large.
        let mut file = File::open(&self.file.path)?;
        file.seek(SeekFrom::Start(start))?;

        let file = BufReader::new(file.take(length as u64));

        Ok(Box::new(file))
    }

TPCH Q1:

Read parquet file lineitem.parquet time spent: 590639777 ns, row group count 60, skipped row group 0
total open/seek count 421, bytes read from FS: 97028517
memory alloc size: 1649375985 memory alloc count: 499533 during parquet read.

Query 1 iteration 0 took 679.9 ms

@Dandandan
Contributor Author

I'm not sure whether it is relevant or not. In the current implementation, the sync_chunk_reader() method was invoked for every parquet column chunk which will cause lots of unnecessary file open and seek calls. [...]

I think that might be relevant. How about opening a new issue to track improving on that (reusing the file descriptor)?
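A minimal sketch of the file-descriptor reuse idea, with hypothetical names (ReusableChunkReader, read_chunk) that are not DataFusion's API. Note that read_chunk needs &mut self to seek, which is exactly why the &self-based sync_chunk_reader trait method re-opens the file per chunk; a real fix would need interior mutability or positioned reads.

```rust
use std::fs::File;
use std::io::{Read, Result, Seek, SeekFrom};

/// Hypothetical sketch: keep one open file descriptor and reuse it for
/// every column chunk instead of calling File::open per chunk.
struct ReusableChunkReader {
    file: File, // opened once, seeked per chunk
}

impl ReusableChunkReader {
    fn open(path: &std::path::Path) -> Result<Self> {
        Ok(Self { file: File::open(path)? })
    }

    /// Seek to `start` and read exactly `length` bytes through the
    /// already-open descriptor.
    fn read_chunk(&mut self, start: u64, length: usize) -> Result<Vec<u8>> {
        self.file.seek(SeekFrom::Start(start))?;
        let mut buf = vec![0u8; length];
        self.file.read_exact(&mut buf)?;
        Ok(buf)
    }
}

fn main() -> Result<()> {
    let path = std::env::temp_dir().join("reusable_chunk_reader_demo.bin");
    std::fs::write(&path, b"0123456789")?;

    let mut reader = ReusableChunkReader::open(&path)?;
    // Two chunk reads share one descriptor: 1 open instead of 2.
    assert_eq!(reader.read_chunk(2, 3)?, b"234");
    assert_eq!(reader.read_chunk(0, 2)?, b"01");
    println!("ok");
    Ok(())
}
```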

@mingmwang
Contributor

Sure, I can open a new issue to track it. But please take a look and confirm that my understanding is correct.
