[C++][Parquet] Slow column reading from multi-column parquet files #38149

Open

marcin-krystianc opened this issue Oct 9, 2023 · 9 comments

Describe the bug, including details regarding any error messages, version, and platform.

Hi,

this is related to #38087 but it covers a different problem.
Similar to the previous issue, in our use case, we read some columns (e.g. 100) from a parquet file containing many more columns (e.g. 20k).

The problem is that the more columns there are in the file, the more time is needed to read a particular column (repro code: https://github.com/marcin-krystianc/arrow_issue_2023-10-06).

In the graph below (produced with https://github.com/marcin-krystianc/arrow_issue_2023-10-06/blob/master/plot_results.py), we can clearly see that when we read 100 columns from a parquet file (the orange line), the more columns there are in the file, the longer it takes to read a single column.
However, when we read the entire file (all columns), the time to read a single column doesn't depend much on the number of columns in the file. There is still some correlation, but it is much weaker than before.
[Screenshots: plots of per-column read time vs. number of columns in the file]

Both Python and C++ exhibit the same problem, which is not a surprise since Python delegates Parquet file reading to C++ anyway.

According to my analysis, there is a simple explanation for the reported problem: when we create a FileReader, it reads and parses the entire metadata section of the file. Since the metadata section contains information about all columns, much of that metadata reading and parsing is wasted work when we read only a tiny fraction of the columns in the file.
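
As a quick local check of this explanation, the footer parse alone can be timed and compared with a 100-column read. This is only a sketch, assuming a wide test file already exists at /tmp/test_wide.parquet (e.g. one written by the repro script below):

import time
import pyarrow.parquet as pq

path = "/tmp/test_wide.parquet"  # placeholder: a wide file written by the repro below

t = time.time()
md = pq.read_metadata(path)  # reads and Thrift-decodes the entire footer
t_meta = time.time() - t

t = time.time()
pq.read_table(path, columns=[f"c{i}" for i in range(100)], use_threads=False)
t_read_100 = time.time() - t  # includes another footer parse plus the column reads

print(f"columns={md.num_columns}, row_groups={md.num_row_groups}, footer={md.serialized_size} bytes")
print(f"metadata parse: {t_meta * 1e6:.0f} us, 100-column read: {t_read_100 * 1e6:.0f} us")

If the footer parse accounts for most of the 100-column read time, that points at metadata parsing rather than column decoding.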

Python code:
import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq

import time
import polars as pl
import csv
import gc

t_write = []
t_read_100_pre_buffer = []

path = "/tmp/test_wide.parquet"

columns_list = [
                100, 200, 300, 400, 500, 600, 700, 800, 900,              
                1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000,
                10_000, 20_000, 30_000, 40_000, 50_000, 
                ]

chunks_list = [1000, 10_000]
rows_list = [5000]
with open('results_python.csv', 'w', encoding='UTF8', newline='') as f:

    writer = csv.writer(f)
    # write the header
    writer.writerow(['columns','rows','chunk_size','writing(μs)','reading_all(μs)','reading_100(μs)'])

    for chunk_size in chunks_list:
        for rows in rows_list:
            for columns in columns_list:
      
                table = pl.DataFrame(
                    data=np.random.randn(rows, columns),
                    schema=[f"c{i}" for i in range(columns)]).to_arrow()

                t = time.time()
                pq.write_table(table, path, row_group_size=chunk_size, use_dictionary=False, write_statistics=False)
                t_writing = time.time() - t
                t_write.append(t_writing)

                del table
                gc.collect()

                t_read = []
                t_read_100 = []

                for i in range(0, 3):

                    t = time.time()
                    res = pq.read_table(path, use_threads=False)
                    t_read.append(time.time() - t)
        
                    del res 
                    gc.collect()

                    t = time.time()               
                    res_100 = pq.read_table(path, columns=[f"c{i}" for i in range(100)], use_threads=False)
                    t_read_100.append(time.time() - t)    
                
                    del res_100
                    gc.collect()

                t_reading = min(t_read)
                t_reading_100 = min(t_read_100)

                data = [columns, rows, chunk_size, t_writing * 1_000_000, t_reading * 1_000_000, t_reading_100 * 1_000_000]
                writer.writerow(data)
                print(str(data))
C++ code:
#include "arrow/api.h"
#include "arrow/io/api.h"
#include "arrow/result.h"
#include "arrow/util/type_fwd.h"
#include "parquet/arrow/reader.h"
#include "parquet/arrow/writer.h"

#include <iostream>
#include <list>
#include <chrono>
#include <random>
#include <vector>
#include <fstream>
#include <iomanip>
#include <algorithm> // std::min_element
#include <numeric>   // std::iota

using arrow::Status;

namespace
{
  const char *FILE_NAME = "/tmp/my_cpp.parquet";

  std::shared_ptr<arrow::Table> GetTable(size_t nColumns, size_t nRows)
  {
    std::random_device dev;
    std::mt19937 rng(dev());
    std::uniform_real_distribution<> rand_gen(0.0, 1.0);

    std::vector<std::shared_ptr<arrow::Array>> arrays;
    std::vector<std::shared_ptr<arrow::Field>> fields;

    // For simplicity, we'll create float64 (double) columns. You can expand this to handle other types.
    for (int i = 0; i < nColumns; i++)
    {
      arrow::DoubleBuilder builder;
      for (auto j = 0; j < nRows; j++)
      {
        if (!builder.Append(rand_gen(rng)).ok())
          throw std::runtime_error("builder.Append");
      }

      std::shared_ptr<arrow::Array> array;
      if (!builder.Finish(&array).ok())
        throw std::runtime_error("builder.Finish");

      arrays.push_back(array);
      fields.push_back(arrow::field("c_" + std::to_string(i), arrow::float64(), false));
    }

    auto table = arrow::Table::Make(arrow::schema(fields), arrays);
    return table;
  }

  Status WriteTableToParquet(size_t nColumns, size_t nRows, const std::string &filename, std::chrono::microseconds *dt, int64_t chunkSize)
  {
    auto table = GetTable(nColumns, nRows);
    auto begin = std::chrono::steady_clock::now();
    auto result = arrow::io::FileOutputStream::Open(filename);
    auto outfile = result.ValueOrDie();
    parquet::WriterProperties::Builder builder;
    auto properties = builder
                          .max_row_group_length(chunkSize)
                          ->disable_dictionary()
                          ->build();
    PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable(*table, arrow::default_memory_pool(), outfile, chunkSize, properties));
    auto end = std::chrono::steady_clock::now();
    *dt = std::chrono::duration_cast<std::chrono::microseconds>(end - begin);
    return Status::OK();
  }

  Status ReadEntireTable(const std::string &filename, std::chrono::microseconds *dt)
  {
    auto begin = std::chrono::steady_clock::now();

    std::shared_ptr<arrow::io::ReadableFile> infile;
    ARROW_ASSIGN_OR_RAISE(infile, arrow::io::ReadableFile::Open(filename));
    std::unique_ptr<parquet::arrow::FileReader> reader;
    ARROW_RETURN_NOT_OK(parquet::arrow::OpenFile(infile, arrow::default_memory_pool(), &reader));
    std::shared_ptr<arrow::Table> parquet_table;

    // Read the table.
    PARQUET_THROW_NOT_OK(reader->ReadTable(&parquet_table));
    auto end = std::chrono::steady_clock::now();
    *dt = std::chrono::duration_cast<std::chrono::microseconds>(end - begin);
    return Status::OK();
  }

  Status ReadColumnsAsTable(const std::string &filename, std::vector<int> indices, std::chrono::microseconds *dt)
  {
    auto begin = std::chrono::steady_clock::now();

    std::shared_ptr<arrow::io::ReadableFile> infile;
    ARROW_ASSIGN_OR_RAISE(infile, arrow::io::ReadableFile::Open(filename));
    std::unique_ptr<parquet::arrow::FileReader> reader;
    ARROW_RETURN_NOT_OK(parquet::arrow::OpenFile(infile, arrow::default_memory_pool(), &reader));

    // Read the table.
    std::shared_ptr<arrow::Table> parquet_table;
    PARQUET_THROW_NOT_OK(reader->ReadTable(indices, &parquet_table));
    auto end = std::chrono::steady_clock::now();
    *dt = std::chrono::duration_cast<std::chrono::microseconds>(end - begin);
    return Status::OK();
  }

  Status RunMain(int argc, char **argv)
  {
    std::ofstream csvFile;
    csvFile.open("results_cpp.csv", std::ios_base::out); // overwrite any existing results file
    csvFile << "columns, rows, chunk_size, writing(μs), reading_all(μs), reading_100(μs)" << std::endl;

    std::list<int> nColumns = {
        100, 200, 300, 400, 500, 600, 700, 800, 900,
        1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000,
        10000, 20000, 30000, 40000, 50000};

    std::list<int64_t> chunk_sizes = {1000, 100000};
    std::list<int> rows_list = {5000};

    std::vector<int> indices(100);
    std::iota(indices.begin(), indices.end(), 0);

    for (auto chunk_size : chunk_sizes)
    {
      for (int nRow : rows_list)
      {
        for (int nColumn : nColumns)
        {
          std::chrono::microseconds writing_dt;
          ARROW_RETURN_NOT_OK(WriteTableToParquet(nColumn, nRow, FILE_NAME, &writing_dt, chunk_size));

          const int repeats = 3;
          std::vector<std::chrono::microseconds> reading_all_dts(repeats);
          std::vector<std::chrono::microseconds> reading_100_dts(repeats);
          for (int i = 0; i < repeats; i++)
          {
            ARROW_RETURN_NOT_OK(ReadEntireTable(FILE_NAME, &reading_all_dts[i]));
            ARROW_RETURN_NOT_OK(ReadColumnsAsTable(FILE_NAME, indices, &reading_100_dts[i]));
          }

          auto reading_all_dt = *std::min_element(reading_all_dts.begin(), reading_all_dts.end());
          auto reading_100_dt = *std::min_element(reading_100_dts.begin(), reading_100_dts.end());

          std::cerr << "(" << nColumn << ", " << nRow << ")"
                    << ", chunk_size=" << chunk_size
                    << ", writing_dt=" << writing_dt.count() / nColumn
                    << ", reading_all_dt=" << reading_all_dt.count() / nColumn
                    << ", reading_100_dt=" << reading_100_dt.count() / 100
                    << std::endl;

          csvFile << nColumn << ","
                  << nRow << ","
                  << chunk_size << ","
                  << writing_dt.count() << ","
                  << reading_all_dt.count() << ","
                  << reading_100_dt.count()
                  << std::endl;
        }
      }
    }

    return Status::OK();
  }
}

int main(int argc, char **argv)
{
  Status st = RunMain(argc, argv);
  if (!st.ok())
  {
    std::cerr << st << std::endl;
    return 1;
  }
  return 0;
}

Component(s)

C++, Parquet

mapleFU (Member) commented Oct 9, 2023

Are these tests running on a local filesystem or an object store?

marcin-krystianc (Author) commented

Are these tests running on a local filesystem or an object store?

These are all tests with the local filesystem.

mapleFU (Member) commented Nov 4, 2023

Sorry for the late reply. Have you solved this problem?

When the number of columns grows, the metadata grows. The metadata is Thrift-encoded, and Thrift needs to deserialize all of the data.

I must admit this is hard to optimize, because:

  1. It involves the logic of handling the Thrift format; maybe we need to parse the wire format directly.
  2. Although read_table supports reading a subset of columns, the C++ ParquetFileReader doesn't know which columns it needs, so it parses the whole footer.

Currently I don't know a proper way to solve this. Any ideas are welcome.

Advice: maybe caching the deserialized footer is OK in this case? @marcin-krystianc
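
For illustration, that caching workaround could look roughly like this in pyarrow (a sketch; it uses pq.read_metadata and ParquetFile's metadata= argument to parse the footer only once per file, and the path and column names are placeholders):

import pyarrow.parquet as pq

_metadata_cache = {}  # path -> parsed FileMetaData

def read_columns(path, columns):
    md = _metadata_cache.get(path)
    if md is None:
        md = pq.read_metadata(path)  # pay the footer-parsing cost only once per file
        _metadata_cache[path] = md
    # Reuse the already-parsed metadata instead of re-parsing the footer.
    pf = pq.ParquetFile(path, metadata=md)
    return pf.read(columns=columns, use_threads=False)

table = read_columns("/tmp/test_wide.parquet", [f"c{i}" for i in range(100)])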

marcin-krystianc (Author) commented

Hi @mapleFU,

We haven't found a solution to this problem yet, but we are still looking.
The size of the metadata footer depends on these three factors multiplied together:

  • number of columns
  • number of row groups
  • size of the metadata for each column chunk (~50 bytes)

In our scenario we use files with many thousands of columns and many row groups.
Our use case is that we know in advance which columns and which row groups we want to read from the file.
The ideal solution would be to make the metadata decoding lazy, so that it decodes only the relevant column chunks on demand and we pay the decoding cost only for what we use. But I don't think the metadata format even allows for this, because the whole metadata footer needs to be read sequentially. What do you think?

The workaround of caching the metadata is an option, but I think it will not work for everyone.
In our use case we want to be able to quickly read thousands of files. If we keep the files open, it costs us many megabytes for each file, which in total can be too many gigabytes of RAM to cache the metadata for all the files.
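
As a rough sanity check of the three-factor size estimate above (a sketch; the ~50 bytes per column chunk is only the approximation mentioned in this thread, and the path is a placeholder):

import pyarrow.parquet as pq

md = pq.read_metadata("/tmp/test_wide.parquet")  # placeholder path
estimate = md.num_columns * md.num_row_groups * 50  # ~50 bytes per column chunk
print(f"columns={md.num_columns}, row_groups={md.num_row_groups}")
print(f"estimated footer ~= {estimate} bytes, actual = {md.serialized_size} bytes")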

mapleFU (Member) commented Nov 7, 2023

Would you mind telling me the size of the Parquet footer in your files?

Also, we don't need to keep the file open; just cache the FileMetaData, like what ParquetFragment does.

marcin-krystianc (Author) commented

Would you mind telling me the size of the Parquet footer in your files?

In my tests I'm using 20k columns and 10 row groups, which gives about 9-10 megabytes of metadata (exactly 9,386,184 bytes). But in production we use even larger files (although I don't know the exact numbers at the moment).
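For reference, that works out to 9,386,184 bytes / (20,000 columns × 10 row groups) ≈ 47 bytes per column chunk, in line with the ~50-byte estimate above.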

mapleFU (Member) commented Nov 7, 2023

In my tests I'm using 20k columns and 10 row groups, which gives about 9-10 megabytes of metadata (exactly 9,386,184 bytes).

@marcin-krystianc Logic like ParquetFragment and ParquetDataset might work:

  1. A single reader parses the footer and gets the FileMetaData; this is CPU- and memory-bound and will take some time.
  2. Dispatch it to different row-group reads with the same FileMetaData. This avoids parsing the same footer again for each row-group read.

Also, deserialization introduces many virtual function calls; this could be optimized with some Thrift optimizations.
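
A rough pyarrow-level sketch of that idea (parse the footer once, then reuse the same FileMetaData for per-row-group reads; the path and column list are placeholders):

import pyarrow as pa
import pyarrow.parquet as pq

path = "/tmp/test_wide.parquet"          # placeholder path
columns = [f"c{i}" for i in range(100)]  # placeholder column subset

md = pq.read_metadata(path)  # 1. parse the footer once (CPU- and memory-bound)

pf = pq.ParquetFile(path, metadata=md)  # reuses the parsed metadata, no second footer parse
tables = [
    pf.read_row_group(rg, columns=columns, use_threads=False)  # 2. per-row-group reads
    for rg in range(md.num_row_groups)
]
table = pa.concat_tables(tables)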

mapleFU (Member) commented Nov 19, 2023

Also, you can try to profile the deserialization; let's find possible optimizations there. I think the current Thrift deserialization might have low performance.

marcin-krystianc (Author) commented

Also, we don't need to keep the file open; just cache the FileMetaData, like what ParquetFragment does.

That is quite a good workaround, but unfortunately it will not work for us.
In our scenario, it is not feasible to keep the metadata in memory for all the files we are working with; we would run out of RAM if we did that.

Also, you can try to profile the deserialization; let's find possible optimizations there. I think the current Thrift deserialization might have low performance.

It "feels" slow, but there are no obvious culprits:

  • I tried to use the "skip" method to skip reading some row groups and columns. Even if we skip everything, it only cuts the decoding time in half. Note that when we skip-read the metadata, there are basically no virtual calls, and many of the objects that store the column and row-group metadata don't need to be created.
  • I tried a profiler, but it doesn't show anything obvious.
  • My hunch (though I don't know how to easily change the implementation to prove it) is that the main inefficiency is caused by memory-copying the data from the input buffer "byte by byte", e.g.:
libparquet.so.1500!apache::thrift::transport::TBufferBase::readAll(apache::thrift::transport::TBufferBase * const this, uint8_t * buf, uint32_t len) (\usr\include\thrift\transport\TBufferTransports.h:81)
libparquet.so.1500!apache::thrift::transport::TMemoryBuffer::readAll(apache::thrift::transport::TMemoryBuffer * const this, uint8_t * buf, uint32_t len) (\usr\include\thrift\transport\TBufferTransports.h:696)
libparquet.so.1500!apache::thrift::protocol::TCompactProtocolT<apache::thrift::transport::TMemoryBuffer>::readByte(apache::thrift::protocol::TCompactProtocolT<apache::thrift::transport::TMemoryBuffer> * const this, int8_t & byte) (\usr\include\thrift\protocol\TCompactProtocol.tcc:620)
libparquet.so.1500!apache::thrift::protocol::TCompactProtocolT<apache::thrift::transport::TMemoryBuffer>::readFieldBegin(apache::thrift::protocol::TCompactProtocolT<apache::thrift::transport::TMemoryBuffer> * const this, std::string & name, apache::thrift::protocol::TType & fieldType, int16_t & fieldId) (\usr\include\thrift\protocol\TCompactProtocol.tcc:481)
libparquet.so.1500!apache::thrift::protocol::TVirtualProtocol<apache::thrift::protocol::TCompactProtocolT<apache::thrift::transport::TMemoryBuffer>, apache::thrift::protocol::TProtocolDefaults>::readFieldBegin_virt(apache::thrift::protocol::TVirtualProtocol<apache::thrift::protocol::TCompactProtocolT<apache::thrift::transport::TMemoryBuffer>, apache::thrift::protocol::TProtocolDefaults> * const this, std::string & name, apache::thrift::protocol::TType & fieldType, int16_t & fieldId) (\usr\include\thrift\protocol\TVirtualProtocol.h:415)
libparquet.so.1500!apache::thrift::protocol::TProtocol::readFieldBegin(apache::thrift::protocol::TProtocol * const this, std::string & name, apache::thrift::protocol::TType & fieldType, int16_t & fieldId) (\usr\include\thrift\protocol\TProtocol.h:423)
libparquet.so.1500!parquet::format::FileMetaData::read(parquet::format::FileMetaData * const this, apache::thrift::protocol::TProtocol * iprot) (\src\arrow\cpp\src\generated\parquet_types.cpp:8011)
libparquet.so.1500!parquet::ThriftDeserializer::DeserializeUnencryptedMessage<parquet::format::FileMetaData>(parquet::ThriftDeserializer * const this, const uint8_t * buf, uint32_t * len, parquet::format::FileMetaData * deserialized_msg) (\src\arrow\cpp\src\parquet\thrift_internal.h:455)
libparquet.so.1500!parquet::ThriftDeserializer::DeserializeMessage<parquet::format::FileMetaData>(parquet::ThriftDeserializer * const this, const uint8_t * buf, uint32_t * len, parquet::format::FileMetaData * deserialized_msg, parquet::Decryptor * decryptor) (\src\arrow\cpp\src\parquet\thrift_internal.h:409)
libparquet.so.1500!parquet::FileMetaData::FileMetaDataImpl::FileMetaDataImpl(parquet::FileMetaData::FileMetaDataImpl * const this, const void * metadata, uint32_t * metadata_len, parquet::ReaderProperties properties, std::shared_ptr<parquet::InternalFileDecryptor> file_decryptor) (\src\arrow\cpp\src\parquet\metadata.cc:606)
libparquet.so.1500!parquet::FileMetaData::FileMetaData(parquet::FileMetaData * const this, const void * metadata, uint32_t * metadata_len, const parquet::ReaderProperties & properties, std::shared_ptr<parquet::InternalFileDecryptor> file_decryptor) (\src\arrow\cpp\src\parquet\metadata.cc:884)
libparquet.so.1500!parquet::FileMetaData::Make(const void * metadata, uint32_t * metadata_len, const parquet::ReaderProperties & properties, std::shared_ptr<parquet::InternalFileDecryptor> file_decryptor) (\src\arrow\cpp\src\parquet\metadata.cc:871)
libparquet.so.1500!parquet::SerializedFile::ParseUnencryptedFileMetadata(parquet::SerializedFile * const this, const std::shared_ptr<arrow::Buffer> & metadata_buffer, const uint32_t metadata_len) (\src\arrow\cpp\src\parquet\file_reader.cc:626)
libparquet.so.1500!parquet::SerializedFile::ParseMetaData(parquet::SerializedFile * const this) (\src\arrow\cpp\src\parquet\file_reader.cc:444)
libparquet.so.1500!parquet::ParquetFileReader::Contents::Open(std::shared_ptr<arrow::io::RandomAccessFile> source, const parquet::ReaderProperties & props, std::shared_ptr<parquet::FileMetaData> metadata) (\src\arrow\cpp\src\parquet\file_reader.cc:764)
libparquet.so.1500!parquet::ParquetFileReader::Open(std::shared_ptr<arrow::io::RandomAccessFile> source, const parquet::ReaderProperties & props, std::shared_ptr<parquet::FileMetaData> metadata) (\src\arrow\cpp\src\parquet\file_reader.cc:802)
