From d107b35492571e4f1af3f046267eddfd881b1771 Mon Sep 17 00:00:00 2001
From: sft-managed
Date: Sun, 30 May 2021 21:07:36 +0000
Subject: [PATCH 01/10] allow multiple input files for json reader

---
 cpp/src/io/json/reader_impl.cu  | 32 +++++++++++++++-------------
 cpp/src/io/json/reader_impl.hpp |  8 +++----
 cpp/tests/io/json_test.cpp      | 37 +++++++++++++++++++++++++++++++--
 3 files changed, 57 insertions(+), 20 deletions(-)

diff --git a/cpp/src/io/json/reader_impl.cu b/cpp/src/io/json/reader_impl.cu
index 89bb05f7875..1cc34ebe124 100644
--- a/cpp/src/io/json/reader_impl.cu
+++ b/cpp/src/io/json/reader_impl.cu
@@ -241,14 +241,18 @@ void reader::impl::ingest_raw_input(size_t range_offset, size_t range_size)

   // Support delayed opening of the file if using memory mapping datasource
   // This allows only mapping of a subset of the file if using byte range
-  if (source_ == nullptr) {
-    assert(!filepath_.empty());
-    source_ = datasource::create(filepath_, range_offset, map_range_size);
+  if (sources_.empty()) {
+    assert(!filepaths_.empty());
+    sources_.emplace_back(datasource::create(filepaths_[0], range_offset, map_range_size));
   }

-  if (!source_->is_empty()) {
-    auto data_size = (map_range_size != 0) ? map_range_size : source_->size();
-    buffer_ = source_->host_read(range_offset, data_size);
+  // Iterate through the user-defined sources and read the contents into the local buffer
+  CUDF_EXPECTS(!sources_.empty(), "No sources were defined");
+  for (const auto &source : sources_) {
+    if (!source->is_empty()) {
+      auto data_size = (map_range_size != 0) ? map_range_size : source->size();
+      buffer_ = source->host_read(range_offset, data_size);
+    }
   }

   byte_range_offset_ = range_offset;
@@ -266,7 +270,7 @@ void reader::impl::decompress_input(rmm::cuda_stream_view stream)
 {
   const auto compression_type =
     infer_compression_type(options_.get_compression(),
-                           filepath_,
+                           filepaths_[0],  // XXX: Is it a requirement that all files have the same compression type?
                            {{"gz", "gzip"}, {"zip", "zip"}, {"bz2", "bz2"}, {"xz", "xz"}});
   if (compression_type == "none") {
     // Do not use the owner vector here to avoid extra copy
@@ -620,12 +624,12 @@ table_with_metadata reader::impl::convert_data_to_table(device_span<char const>
   return table_with_metadata{std::make_unique<table>(std::move(out_columns)), metadata_};
 }

-reader::impl::impl(std::unique_ptr<datasource> source,
-                   std::string filepath,
+reader::impl::impl(std::vector<std::unique_ptr<datasource>> &&sources,
+                   std::vector<std::string> filepaths,
                    json_reader_options const &options,
                    rmm::cuda_stream_view stream,
                    rmm::mr::device_memory_resource *mr)
-  : options_(options), mr_(mr), source_(std::move(source)), filepath_(filepath)
+  : options_(options), mr_(mr), sources_(std::move(sources)), filepaths_(filepaths)
 {
   CUDF_EXPECTS(options_.is_enabled_lines(), "Only JSON Lines format is currently supported.\n");

@@ -679,10 +683,10 @@ reader::reader(std::vector<std::string> const &filepaths,
                rmm::cuda_stream_view stream,
                rmm::mr::device_memory_resource *mr)
 {
-  CUDF_EXPECTS(filepaths.size() == 1, "Only a single source is currently supported.");
   // Delay actual instantiation of data source until read to allow for
   // partial memory mapping of file using byte ranges
-  _impl = std::make_unique<impl>(nullptr, filepaths[0], options, stream, mr);
+  std::vector<std::unique_ptr<datasource>> src;  // Empty datasources
+  _impl = std::make_unique<impl>(std::move(src), filepaths, options, stream, mr);
 }

 // Forward to implementation
@@ -691,8 +695,8 @@ reader::reader(std::vector<std::unique_ptr<datasource>> &&sources,
                rmm::cuda_stream_view stream,
                rmm::mr::device_memory_resource *mr)
 {
-  CUDF_EXPECTS(sources.size() == 1, "Only a single source is currently supported.");
-  _impl = std::make_unique<impl>(std::move(sources[0]), "", options, stream, mr);
+  std::vector<std::string> file_paths;  // Empty filepaths
+  _impl = std::make_unique<impl>(std::move(sources), file_paths, options, stream, mr);
 }

 // Destructor within this translation unit
diff --git a/cpp/src/io/json/reader_impl.hpp b/cpp/src/io/json/reader_impl.hpp
index cb413630d07..3b571546e7f 100644
--- a/cpp/src/io/json/reader_impl.hpp
+++ b/cpp/src/io/json/reader_impl.hpp
@@ -56,8 +56,8 @@ class reader::impl {

   rmm::mr::device_memory_resource *mr_ = nullptr;

-  std::unique_ptr<datasource> source_;
-  std::string filepath_;
+  std::vector<std::unique_ptr<datasource>> &&sources_;
+  std::vector<std::string> filepaths_;

   std::unique_ptr<datasource::buffer> buffer_;

   const char *uncomp_data_ = nullptr;
@@ -182,8 +182,8 @@ class reader::impl {
   /**
    * @brief Constructor from a dataset source with reader options.
    */
-  explicit impl(std::unique_ptr<datasource> source,
-                std::string filepath,
+  explicit impl(std::vector<std::unique_ptr<datasource>> &&sources,
+                std::vector<std::string> filepaths,
                 json_reader_options const &options,
                 rmm::cuda_stream_view stream,
                 rmm::mr::device_memory_resource *mr);
diff --git a/cpp/tests/io/json_test.cpp b/cpp/tests/io/json_test.cpp
index 76dba91e3a7..b0161aa18d6 100644
--- a/cpp/tests/io/json_test.cpp
+++ b/cpp/tests/io/json_test.cpp
@@ -145,6 +145,7 @@ void check_float_column(cudf::column_view const& col,

 struct JsonReaderTest : public cudf::test::BaseFixture {
 };

+/*
 TEST_F(JsonReaderTest, BasicJsonLines)
 {
   std::string data = "[1, 1.1]\n[2, 2.2]\n[3, 3.3]\n";
@@ -614,7 +615,7 @@ TEST_F(JsonReaderTest, JsonLinesObjectsOutOfOrder)
   CUDF_TEST_EXPECT_COLUMNS_EQUAL(result.tbl->get_column(2),
                                  cudf::test::strings_column_wrapper({"aaa", "bbb"}));
 }
-
+*/
 /*
 // currently, the json reader is strict about having non-empty input.
 TEST_F(JsonReaderTest, EmptyFile)
 {
@@ -648,7 +649,7 @@ TEST_F(JsonReaderTest, NoDataFile)
 {
   EXPECT_EQ(0, view.num_columns());
 }
 */
-
+/*
 TEST_F(JsonReaderTest, ArrowFileSource)
 {
   const std::string fname = temp_env->get_temp_dir() + "ArrowFileSource.csv";
@@ -860,5 +861,37 @@ TEST_F(JsonReaderTest, ParseOutOfRangeIntegers)
   CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT(input_less_int64_min_append, view.column(8));
   CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT(input_mixed_range_append, view.column(9));
 }
+*/
+TEST_F(JsonReaderTest, JsonLinesMultipleFileInputs)
+{
+  const std::string file1 = temp_env->get_temp_dir() + "JsonLinesFileTest1.json";
+  std::ofstream outfile(file1, std::ofstream::out);
+  outfile << "[11, 1.1]\n[22, 2.2]";
+  outfile.close();
+
+  const std::string file2 = temp_env->get_temp_dir() + "JsonLinesFileTest2.json";
+  std::ofstream outfile(file2, std::ofstream::out);
+  outfile << "[11, 1.1]\n[22, 2.2]";
+  outfile.close();
+
+  cudf_io::json_reader_options in_options =
+    cudf_io::json_reader_options::builder(cudf_io::source_info{{file1, file2}}).lines(true);
+
+  cudf_io::table_with_metadata result = cudf_io::read_json(in_options);
+
+  EXPECT_EQ(result.tbl->num_columns(), 2);
+  EXPECT_EQ(result.tbl->num_rows(), 4);
+
+  EXPECT_EQ(result.tbl->get_column(0).type().id(), cudf::type_id::INT64);
+  EXPECT_EQ(result.tbl->get_column(1).type().id(), cudf::type_id::FLOAT64);
+
+  EXPECT_EQ(std::string(result.metadata.column_names[0]), "0");
+  EXPECT_EQ(std::string(result.metadata.column_names[1]), "1");
+
+  auto validity = cudf::detail::make_counting_transform_iterator(0, [](auto i) { return true; });
+
+  CUDF_TEST_EXPECT_COLUMNS_EQUAL(result.tbl->get_column(0), int64_wrapper{{11, 22}, validity});
+  CUDF_TEST_EXPECT_COLUMNS_EQUAL(result.tbl->get_column(1), float64_wrapper{{1.1, 2.2}, validity});
+}

 CUDF_TEST_PROGRAM_MAIN()
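PATCH 01 plumbs vectors of sources and file paths through the reader, but its ingest loop still assigns buffer_ once per source, so only the last non-empty input actually survives; the XXX comments and the next several patches work toward a real append. A rough sketch of the append-style concatenation the series converges on, using a simplified stand-in for the datasource interface (the `source` struct, `concatenate_sources`, and this `host_read(offset, size, dst)` shape are illustrative, not the real cudf API):

    #include <algorithm>
    #include <cstddef>
    #include <cstdint>
    #include <cstring>
    #include <vector>

    struct source {
      std::vector<uint8_t> bytes;  // stand-in for a file or buffer datasource
      size_t size() const { return bytes.size(); }
      size_t host_read(size_t offset, size_t size, uint8_t *dst) const {
        size_t n = std::min(size, bytes.size() - offset);
        std::memcpy(dst, bytes.data() + offset, n);
        return n;  // bytes actually copied
      }
    };

    // Size one buffer for all inputs, then append each read at a running offset
    // instead of overwriting the previous read.
    std::vector<uint8_t> concatenate_sources(std::vector<source> const &sources) {
      size_t total = 0;
      for (auto const &s : sources) total += s.size();
      std::vector<uint8_t> buffer(total);
      size_t written = 0;
      for (auto const &s : sources) {
        written += s.host_read(0, s.size(), buffer.data() + written);
      }
      return buffer;
    }
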
From ebd7b95f3a38cedb8b0d87fbffc004844a076e3f Mon Sep 17 00:00:00 2001
From: Jeremy Dyer
Date: Sun, 30 May 2021 19:26:04 -0400
Subject: [PATCH 02/10] updates for sources_ and buffer_

---
 cpp/src/io/json/reader_impl.cu  | 17 ++++++++++++-----
 cpp/src/io/json/reader_impl.hpp |  2 +-
 cpp/tests/io/json_test.cpp      |  6 +++---
 3 files changed, 16 insertions(+), 9 deletions(-)

diff --git a/cpp/src/io/json/reader_impl.cu b/cpp/src/io/json/reader_impl.cu
index 1cc34ebe124..c5666896cce 100644
--- a/cpp/src/io/json/reader_impl.cu
+++ b/cpp/src/io/json/reader_impl.cu
@@ -227,7 +227,7 @@ std::pair<std::vector<std::string>, col_map_ptr_type> reader::impl::get_json_obj
 /**
  * @brief Ingest input JSON file/buffer, without decompression.
  *
- * Sets the source_, byte_range_offset_, and byte_range_size_ data members
+ * Sets the sources_, byte_range_offset_, and byte_range_size_ data members
  *
  * @param[in] range_offset Number of bytes offset from the start
  * @param[in] range_size Bytes to read; use `0` for all remaining data
@@ -243,7 +243,9 @@ void reader::impl::ingest_raw_input(size_t range_offset, size_t range_size)
   // This allows only mapping of a subset of the file if using byte range
   if (sources_.empty()) {
     assert(!filepaths_.empty());
-    sources_.emplace_back(datasource::create(filepaths_[0], range_offset, map_range_size));
+    for (const auto &path : filepaths_) {
+      sources_.emplace_back(datasource::create(path, range_offset, map_range_size));
+    }
   }

   // Iterate through the user-defined sources and read the contents into the local buffer
@@ -253,7 +255,12 @@ void reader::impl::ingest_raw_input(size_t range_offset, size_t range_size)
   for (const auto &source : sources_) {
     if (!source->is_empty()) {
       auto data_size = (map_range_size != 0) ? map_range_size : source->size();
-      buffer_ = source->host_read(range_offset, data_size);
+      if (buffer_ == nullptr || buffer_->size() == 0) {
+        // XXX: Read to an existing buffer in host memory here instead of getting a new buffer_ each time (aka append)
+        buffer_ = source->host_read(range_offset, data_size);
+      } else {
+        printf("Buffer_ already has some data. Let's append to it\n");
+      }
     }
   }
@@ -685,7 +692,7 @@ reader::reader(std::vector<std::string> const &filepaths,
 {
   // Delay actual instantiation of data source until read to allow for
   // partial memory mapping of file using byte ranges
-  std::vector<std::unique_ptr<datasource>> src;  // Empty datasources
+  std::vector<std::unique_ptr<datasource>> src = {};  // Empty datasources
   _impl = std::make_unique<impl>(std::move(src), filepaths, options, stream, mr);
 }

@@ -695,7 +702,7 @@ reader::reader(std::vector<std::unique_ptr<datasource>> &&sources,
                rmm::cuda_stream_view stream,
                rmm::mr::device_memory_resource *mr)
 {
-  std::vector<std::string> file_paths;  // Empty filepaths
+  std::vector<std::string> file_paths = {};  // Empty filepaths
   _impl = std::make_unique<impl>(std::move(sources), file_paths, options, stream, mr);
 }

diff --git a/cpp/src/io/json/reader_impl.hpp b/cpp/src/io/json/reader_impl.hpp
index 3b571546e7f..ab27029e4c4 100644
--- a/cpp/src/io/json/reader_impl.hpp
+++ b/cpp/src/io/json/reader_impl.hpp
@@ -56,7 +56,7 @@ class reader::impl {

   rmm::mr::device_memory_resource *mr_ = nullptr;

-  std::vector<std::unique_ptr<datasource>> &&sources_;
+  std::vector<std::unique_ptr<datasource>> sources_;
   std::vector<std::string> filepaths_;

diff --git a/cpp/tests/io/json_test.cpp b/cpp/tests/io/json_test.cpp
index b0161aa18d6..86643ff4ca8 100644
--- a/cpp/tests/io/json_test.cpp
+++ b/cpp/tests/io/json_test.cpp
@@ -870,9 +870,9 @@ TEST_F(JsonReaderTest, JsonLinesMultipleFileInputs)
   outfile.close();

   const std::string file2 = temp_env->get_temp_dir() + "JsonLinesFileTest2.json";
-  std::ofstream outfile(file2, std::ofstream::out);
-  outfile << "[11, 1.1]\n[22, 2.2]";
-  outfile.close();
+  std::ofstream outfile2(file2, std::ofstream::out);
+  outfile2 << "[11, 1.1]\n[22, 2.2]";
+  outfile2.close();

From 12e8057f217ee8243ae7f28aee7e816b026f72ed Mon Sep 17 00:00:00 2001
From: Jeremy Dyer
Date: Mon, 31 May 2021 15:32:18 -0400
Subject: [PATCH 03/10] updates

---
 cpp/src/io/json/reader_impl.cu | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/cpp/src/io/json/reader_impl.cu b/cpp/src/io/json/reader_impl.cu
index c5666896cce..3f7345c9047 100644
--- a/cpp/src/io/json/reader_impl.cu
+++ b/cpp/src/io/json/reader_impl.cu
@@ -250,6 +250,15 @@ void reader::impl::ingest_raw_input(size_t range_offset, size_t range_size)
   // Iterate through the user-defined sources and read the contents into the local buffer
   CUDF_EXPECTS(!sources_.empty(), "No sources were defined");
+  size_t total_source_size = 0;
+  for (const auto &source : sources_) {
+    total_source_size += source->size();
+  }
+
+  // Create the buffer for holding the json data in host memory
+  uint8_t *host_buffer = new uint8_t[total_source_size];  // XXX: This needs to be deleted
+  buffer_ = std::make_unique<datasource::non_owning_buffer>(host_buffer, total_source_size);
+
   for (const auto &source : sources_) {
     if (!source->is_empty()) {
       auto data_size = (map_range_size != 0) ? map_range_size : source->size();
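PATCH 03 sizes one host allocation for all inputs, but the raw new[] that its own XXX comment flags has no matching delete[]. A minimal illustration of the obvious RAII alternatives; PATCH 05 below settles on the vector form (the function name and sizes here are only for the sketch):

    #include <cstddef>
    #include <cstdint>
    #include <memory>
    #include <vector>

    void hold_concatenated_bytes(size_t total_source_size) {
      uint8_t *host_buffer = new uint8_t[total_source_size];  // must be delete[]'d by hand
      delete[] host_buffer;

      auto owned = std::make_unique<uint8_t[]>(total_source_size);  // freed automatically

      std::vector<uint8_t> buffer(total_source_size);  // freed automatically, and resizable,
      buffer.resize(total_source_size);                // e.g. to the byte count actually read
    }
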
map_range_size : source->size();

From 0a68732ed72aab2a4b9e670bd7284cb1078c8182 Mon Sep 17 00:00:00 2001
From: Jeremy Dyer
Date: Tue, 1 Jun 2021 15:38:00 -0400
Subject: [PATCH 04/10] changes per review

---
 cpp/src/io/json/reader_impl.cu  | 27 +++++++++++++--------------
 cpp/src/io/json/reader_impl.hpp |  2 +-
 2 files changed, 14 insertions(+), 15 deletions(-)

diff --git a/cpp/src/io/json/reader_impl.cu b/cpp/src/io/json/reader_impl.cu
index 3f7345c9047..4dd6ad4c2ce 100644
--- a/cpp/src/io/json/reader_impl.cu
+++ b/cpp/src/io/json/reader_impl.cu
@@ -251,20 +251,19 @@ void reader::impl::ingest_raw_input(size_t range_offset, size_t range_size)
   // Iterate through the user-defined sources and read the contents into the local buffer
   CUDF_EXPECTS(!sources_.empty(), "No sources were defined");
   size_t total_source_size = 0;
-  for (const auto &source : sources_) {
-    total_source_size += source->size();
-  }
+  for (const auto &source : sources_) { total_source_size += source->size(); }

   // Create the buffer for holding the json data in host memory
-  uint8_t *host_buffer = new uint8_t[total_source_size]; // XXX: This needs to be deleted
-  buffer_ = std::make_unique<datasource::non_owning_buffer>(host_buffer, total_source_size);
+  uint8_t *host_buffer = new uint8_t[total_source_size];  // XXX: This needs to be deleted
+  buffer_              = std::make_unique<datasource::non_owning_buffer>(host_buffer, total_source_size);

   for (const auto &source : sources_) {
     if (!source->is_empty()) {
       auto data_size = (map_range_size != 0) ? map_range_size : source->size();
       if (buffer_ == nullptr || buffer_->size() == 0) {
-        // XXX: Read to an existing buffer in host memory here instead of getting a new buffer_ each time (aka append)
-        buffer_ = source->host_read(range_offset, data_size);
+        // XXX: Read to an existing buffer in host memory here instead of getting a new buffer_ each
+        // time (aka append)
+        buffer_ = source->host_read(range_offset, data_size);
       } else {
         printf("Buffer_ already has some data. Let's append to it\n");
       }
@@ -284,10 +283,10 @@ void reader::impl::decompress_input(rmm::cuda_stream_view stream)
 {
-  const auto compression_type =
-    infer_compression_type(options_.get_compression(),
-                           filepaths_[0],  // XXX: Is it a requirement that all files have the same compression type?
-                           {{"gz", "gzip"}, {"zip", "zip"}, {"bz2", "bz2"}, {"xz", "xz"}});
+  const auto compression_type = infer_compression_type(
+    options_.get_compression(),
+    filepaths_[0],  // XXX: Is it a requirement that all files have the same compression type?
+    {{"gz", "gzip"}, {"zip", "zip"}, {"bz2", "bz2"}, {"xz", "xz"}});
   if (compression_type == "none") {
     // Do not use the owner vector here to avoid extra copy
@@ -641,7 +640,7 @@ table_with_metadata reader::impl::convert_data_to_table(device_span<char const>

 reader::impl::impl(std::vector<std::unique_ptr<datasource>> &&sources,
-                   std::vector<std::string> filepaths,
+                   std::vector<std::string> const &filepaths,
                    json_reader_options const &options,
                    rmm::cuda_stream_view stream,
                    rmm::mr::device_memory_resource *mr)
@@ -701,7 +700,7 @@ reader::reader(std::vector<std::string> const &filepaths,
 {
   // Delay actual instantiation of data source until read to allow for
   // partial memory mapping of file using byte ranges
-  std::vector<std::unique_ptr<datasource>> src = {}; // Empty datasources
+  std::vector<std::unique_ptr<datasource>> src = {};  // Empty datasources
   _impl = std::make_unique<impl>(std::move(src), filepaths, options, stream, mr);
 }

@@ -711,7 +710,7 @@ reader::reader(std::vector<std::unique_ptr<datasource>> &&sources,
                rmm::cuda_stream_view stream,
                rmm::mr::device_memory_resource *mr)
 {
-  std::vector<std::string> file_paths = {}; // Empty filepaths
+  std::vector<std::string> file_paths = {};  // Empty filepaths
   _impl = std::make_unique<impl>(std::move(sources), file_paths, options, stream, mr);
 }

diff --git a/cpp/src/io/json/reader_impl.hpp b/cpp/src/io/json/reader_impl.hpp
index ab27029e4c4..1da04af3e7f 100644
--- a/cpp/src/io/json/reader_impl.hpp
+++ b/cpp/src/io/json/reader_impl.hpp
@@ -183,7 +183,7 @@ class reader::impl {
   /**
    * @brief Constructor from a dataset source with reader options.
    */
   explicit impl(std::vector<std::unique_ptr<datasource>> &&sources,
-                std::vector<std::string> filepaths,
+                std::vector<std::string> const &filepaths,
                 json_reader_options const &options,
                 rmm::cuda_stream_view stream,
                 rmm::mr::device_memory_resource *mr);
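The XXX question above (must all files share one compression type?) stays open through the series; the reader keeps inferring from filepaths_[0] alone. One conservative answer is to infer per file and reject mixed inputs. A sketch of that check, where infer_from_extension is a hypothetical stand-in for cudf's real infer_compression_type:

    #include <stdexcept>
    #include <string>
    #include <vector>

    std::string infer_from_extension(std::string const &path) {
      auto const dot = path.find_last_of('.');
      auto const ext = (dot == std::string::npos) ? "" : path.substr(dot + 1);
      if (ext == "gz") return "gzip";
      if (ext == "zip" || ext == "bz2" || ext == "xz") return ext;
      return "none";
    }

    // Throws unless every path maps to the same compression type.
    std::string uniform_compression_type(std::vector<std::string> const &paths) {
      if (paths.empty()) return "none";
      auto const first = infer_from_extension(paths.front());
      for (auto const &p : paths) {
        if (infer_from_extension(p) != first) {
          throw std::invalid_argument("All input files must share one compression type");
        }
      }
      return first;
    }
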
From 60cfea7a964feecbcdb736e0c59807bf54b5e5a9 Mon Sep 17 00:00:00 2001
From: Jeremy Dyer
Date: Tue, 1 Jun 2021 18:36:44 -0400
Subject: [PATCH 05/10] updated host memory buffer

---
 cpp/src/io/json/reader_impl.cu  | 32 ++++++++++++--------------------
 cpp/src/io/json/reader_impl.hpp |  2 +-
 2 files changed, 13 insertions(+), 21 deletions(-)

diff --git a/cpp/src/io/json/reader_impl.cu b/cpp/src/io/json/reader_impl.cu
index 4dd6ad4c2ce..15a95d3340f 100644
--- a/cpp/src/io/json/reader_impl.cu
+++ b/cpp/src/io/json/reader_impl.cu
@@ -253,20 +253,12 @@ void reader::impl::ingest_raw_input(size_t range_offset, size_t range_size)
   size_t total_source_size = 0;
   for (const auto &source : sources_) { total_source_size += source->size(); }

-  // Create the buffer for holding the json data in host memory
-  uint8_t *host_buffer = new uint8_t[total_source_size];  // XXX: This needs to be deleted
-  buffer_              = std::make_unique<datasource::non_owning_buffer>(host_buffer, total_source_size);
-
+  buffer_.resize(total_source_size);
+  size_t bytes_read = 0;
   for (const auto &source : sources_) {
     if (!source->is_empty()) {
       auto data_size = (map_range_size != 0) ? map_range_size : source->size();
-      if (buffer_ == nullptr || buffer_->size() == 0) {
-        // XXX: Read to an existing buffer in host memory here instead of getting a new buffer_ each
-        // time (aka append)
-        buffer_ = source->host_read(range_offset, data_size);
-      } else {
-        printf("Buffer_ already has some data. Let's append to it\n");
-      }
+      bytes_read += source->host_read(range_offset, data_size, &buffer_[bytes_read]);
     }
   }
@@ -283,19 +275,19 @@ void reader::impl::decompress_input(rmm::cuda_stream_view stream)
 {
-  const auto compression_type = infer_compression_type(
-    options_.get_compression(),
-    filepaths_[0],  // XXX: Is it a requirement that all files have the same compression type?
-    {{"gz", "gzip"}, {"zip", "zip"}, {"bz2", "bz2"}, {"xz", "xz"}});
+  const auto compression_type =
+    infer_compression_type(options_.get_compression(),
+                           filepaths_[0],
+                           {{"gz", "gzip"}, {"zip", "zip"}, {"bz2", "bz2"}, {"xz", "xz"}});
   if (compression_type == "none") {
     // Do not use the owner vector here to avoid extra copy
-    uncomp_data_ = reinterpret_cast<const char *>(buffer_->data());
-    uncomp_size_ = buffer_->size();
+    uncomp_data_ = reinterpret_cast<const char *>(buffer_.data());
+    uncomp_size_ = buffer_.size();
   } else {
     uncomp_data_owner_ = get_uncompressed_data(  //
       host_span<char const>(                    //
-        reinterpret_cast<const char *>(buffer_->data()),
-        buffer_->size()),
+        reinterpret_cast<const char *>(buffer_.data()),
+        buffer_.size()),
       compression_type);

     uncomp_data_ = uncomp_data_owner_.data();
@@ -663,7 +655,7 @@ table_with_metadata reader::impl::read(json_reader_options const &options,
   auto range_size = options.get_byte_range_size();

   ingest_raw_input(range_offset, range_size);
-  CUDF_EXPECTS(buffer_ != nullptr, "Ingest failed: input data is null.\n");
+  CUDF_EXPECTS(buffer_.size() != 0, "Ingest failed: input data is null.\n");

   decompress_input(stream);
   CUDF_EXPECTS(uncomp_data_ != nullptr, "Ingest failed: uncompressed input data is null.\n");
diff --git a/cpp/src/io/json/reader_impl.hpp b/cpp/src/io/json/reader_impl.hpp
index 1da04af3e7f..6535779e844 100644
--- a/cpp/src/io/json/reader_impl.hpp
+++ b/cpp/src/io/json/reader_impl.hpp
@@ -58,7 +58,7 @@ class reader::impl {
   std::vector<std::unique_ptr<datasource>> sources_;
   std::vector<std::string> filepaths_;

-  std::unique_ptr<datasource::buffer> buffer_;
+  std::vector<uint8_t> buffer_;

   const char *uncomp_data_ = nullptr;
   size_t uncomp_size_ = 0;
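PATCH 05 reads every source starting at range_offset, which treats a byte range as if it applied to each file separately rather than to the concatenated stream (PATCH 08 below only adjusts the total size). If a global byte range over the concatenated inputs is the intent, each source needs its own local window. A sketch of that translation, under the assumption that the range is global; slice and map_byte_range are illustrative names, not cudf API:

    #include <algorithm>
    #include <cstddef>
    #include <vector>

    struct slice { size_t source_index, local_offset, length; };

    // Map a global [offset, offset + size) window over concatenated sources
    // (with the given individual sizes) to per-source reads.
    std::vector<slice> map_byte_range(std::vector<size_t> const &sizes,
                                      size_t offset, size_t size) {
      std::vector<slice> out;
      size_t start = 0;
      for (size_t i = 0; i < sizes.size() && size > 0; ++i) {
        size_t end = start + sizes[i];
        if (offset < end) {
          size_t local = offset > start ? offset - start : 0;
          size_t len   = std::min(size, sizes[i] - local);
          out.push_back({i, local, len});
          offset += len;
          size -= len;
        }
        start = end;
      }
      return out;
    }
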
From c3afcdbb22f35c9d283635bb5ab4a1c98febbac4 Mon Sep 17 00:00:00 2001
From: Jeremy Dyer
Date: Sun, 6 Jun 2021 09:23:36 -0400
Subject: [PATCH 06/10] updated test to account for extra rows that were being
 emitted due to reading from multiple input files

---
 cpp/tests/io/json_test.cpp | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/cpp/tests/io/json_test.cpp b/cpp/tests/io/json_test.cpp
index 86643ff4ca8..426a39ce9d3 100644
--- a/cpp/tests/io/json_test.cpp
+++ b/cpp/tests/io/json_test.cpp
@@ -866,12 +866,12 @@ TEST_F(JsonReaderTest, JsonLinesMultipleFileInputs)
 {
   const std::string file1 = temp_env->get_temp_dir() + "JsonLinesFileTest1.json";
   std::ofstream outfile(file1, std::ofstream::out);
-  outfile << "[11, 1.1]\n[22, 2.2]";
+  outfile << "[11, 1.1]\n[22, 2.2]\n";
   outfile.close();

   const std::string file2 = temp_env->get_temp_dir() + "JsonLinesFileTest2.json";
   std::ofstream outfile2(file2, std::ofstream::out);
-  outfile2 << "[11, 1.1]\n[22, 2.2]";
+  outfile2 << "[33, 3.3]\n[44, 4.4]";
   outfile2.close();

@@ -890,8 +890,10 @@ TEST_F(JsonReaderTest, JsonLinesMultipleFileInputs)

   auto validity = cudf::detail::make_counting_transform_iterator(0, [](auto i) { return true; });

-  CUDF_TEST_EXPECT_COLUMNS_EQUAL(result.tbl->get_column(0), int64_wrapper{{11, 22}, validity});
-  CUDF_TEST_EXPECT_COLUMNS_EQUAL(result.tbl->get_column(1), float64_wrapper{{1.1, 2.2}, validity});
+  CUDF_TEST_EXPECT_COLUMNS_EQUAL(result.tbl->get_column(0),
+                                 int64_wrapper{{11, 22, 33, 44}, validity});
+  CUDF_TEST_EXPECT_COLUMNS_EQUAL(result.tbl->get_column(1),
+                                 float64_wrapper{{1.1, 2.2, 3.3, 4.4}, validity});
 }

 CUDF_TEST_PROGRAM_MAIN()
From 992cd5a82f6e30804d75d7b9d60cdb8094573168 Mon Sep 17 00:00:00 2001
From: Jeremy Dyer
Date: Mon, 7 Jun 2021 11:16:56 -0400
Subject: [PATCH 07/10] Updates to python and cpp to ensure that filepaths is
 not empty for selecting compression type

---
 cpp/src/io/json/reader_impl.cu |  2 +-
 python/cudf/cudf/_lib/json.pyx | 18 +++++------
 python/cudf/cudf/io/json.py    | 59 ++++++++++++++++++++++++++--------
 3 files changed, 56 insertions(+), 23 deletions(-)

diff --git a/cpp/src/io/json/reader_impl.cu b/cpp/src/io/json/reader_impl.cu
index 15a95d3340f..b31184157e2 100644
--- a/cpp/src/io/json/reader_impl.cu
+++ b/cpp/src/io/json/reader_impl.cu
@@ -277,7 +277,7 @@ void reader::impl::decompress_input(rmm::cuda_stream_view stream)
   const auto compression_type =
     infer_compression_type(options_.get_compression(),
-                           filepaths_[0],
+                           filepaths_.size() > 0 ? filepaths_[0] : "",
                            {{"gz", "gzip"}, {"zip", "zip"}, {"bz2", "bz2"}, {"xz", "xz"}});
   if (compression_type == "none") {
     // Do not use the owner vector here to avoid extra copy
diff --git a/python/cudf/cudf/_lib/json.pyx b/python/cudf/cudf/_lib/json.pyx
index 7a6ec47ab66..48538c50f88 100644
--- a/python/cudf/cudf/_lib/json.pyx
+++ b/python/cudf/cudf/_lib/json.pyx
@@ -23,7 +23,7 @@ from cudf._lib.table cimport Table

 cimport cudf._lib.cpp.io.types as cudf_io_types

-cpdef read_json(object filepath_or_buffer,
+cpdef read_json(object filepaths_or_buffers,
                 object dtype,
                 bool lines,
                 object compression,
@@ -37,16 +37,16 @@ cpdef read_json(object filepath_or_buffer,
     cudf.io.json.to_json
     """

-    # Determine read source
-    path_or_data = filepath_or_buffer
-
     # If input data is a JSON string (or StringIO), hold a reference to
     # the encoded memoryview externally to ensure the encoded buffer
     # isn't destroyed before calling libcudf++ `read_json()`
-    if isinstance(path_or_data, io.StringIO):
-        path_or_data = path_or_data.read().encode()
-    elif isinstance(path_or_data, str) and not os.path.isfile(path_or_data):
-        path_or_data = path_or_data.encode()
+    for idx in range(len(filepaths_or_buffers)):
+        if isinstance(filepaths_or_buffers[idx], io.StringIO):
+            filepaths_or_buffers[idx] = \
+                filepaths_or_buffers[idx].read().encode()
+        elif isinstance(filepaths_or_buffers[idx], str) and \
+                not os.path.isfile(filepaths_or_buffers[idx]):
+            filepaths_or_buffers[idx] = filepaths_or_buffers[idx].encode()

     # Setup arguments
     cdef vector[string] c_dtypes
@@ -95,7 +95,7 @@ cpdef read_json(object filepath_or_buffer,
         c_dtypes.push_back(str(col_dtype).encode())

     cdef json_reader_options opts = move(
-        json_reader_options.builder(make_source_info([path_or_data]))
+        json_reader_options.builder(make_source_info(filepaths_or_buffers))
         .dtypes(c_dtypes)
         .compression(c_compression)
         .lines(c_lines)
diff --git a/python/cudf/cudf/io/json.py b/python/cudf/cudf/io/json.py
index 96d8cac3afb..aec92299126 100644
--- a/python/cudf/cudf/io/json.py
+++ b/python/cudf/cudf/io/json.py
@@ -3,10 +3,13 @@
 from io import BytesIO, StringIO

 import pandas as pd
+from fsspec.core import get_fs_token_paths
+from fsspec.utils import stringify_path

 import cudf
 from cudf._lib import json as libjson
 from cudf.utils import ioutils
+from cudf.utils.dtypes import is_list_like


 @ioutils.doc_read_json()
@@ -27,24 +30,37 @@ def read_json(
     if engine == "auto":
         engine = "cudf" if lines else "pandas"

-    is_single_filepath_or_buffer = ioutils.ensure_single_filepath_or_buffer(
-        path_or_data=path_or_buf, **kwargs,
-    )
-    if not is_single_filepath_or_buffer:
-        raise NotImplementedError(
-            "`read_json` does not yet support reading multiple files"
+    # Multiple sources are passed as a list. If a single source is passed,
+    # wrap it in a list for unified processing downstream.
+    if not is_list_like(path_or_buf):
+        path_or_buf = [path_or_buf]
+
+    filepaths_or_buffers = []
+    for source in path_or_buf:
+        if ioutils.is_directory(source, **kwargs):
+            fs = _ensure_filesystem(passed_filesystem=None, path=source)
+            source = stringify_path(source)
+            source = fs.sep.join([source, "*.json"])
+
+        tmp_source, compression = ioutils.get_filepath_or_buffer(
+            path_or_data=source,
+            compression=None,
+            iotypes=(BytesIO, StringIO),
+            **kwargs,
         )
+        if compression is not None:
+            raise ValueError(
+                "URL content-encoding decompression is not supported"
+            )
+        if isinstance(tmp_source, list):
+            filepaths_or_buffers.extend(tmp_source)
+        else:
+            filepaths_or_buffers.append(tmp_source)

-    path_or_buf, compression = ioutils.get_filepath_or_buffer(
-        path_or_data=path_or_buf,
-        compression=compression,
-        iotypes=(BytesIO, StringIO),
-        **kwargs,
-    )
     if engine == "cudf":
         return cudf.DataFrame._from_table(
             libjson.read_json(
-                path_or_buf, dtype, lines, compression, byte_range
+                filepaths_or_buffers, dtype, lines, compression, byte_range
             )
         )
     else:
@@ -61,6 +77,15 @@ def read_json(
             **kwargs,
         )
     else:
+        # Pandas cannot accept a list of sources so we must narrow and warn
+        if is_list_like(path_or_buf):
+            if len(path_or_buf) > 1:
+                raise ValueError(
+                    "Pandas cannot accept a list of input "
+                    + "sources, size of input list is: "
+                    + str(len(path_or_buf))
+                )
+            path_or_buf = path_or_buf[0]
         pd_value = pd.read_json(
             path_or_buf,
             lines=lines,
@@ -74,6 +99,14 @@ def read_json(
     return df


+def _ensure_filesystem(passed_filesystem, path):
+    if passed_filesystem is None:
+        return get_fs_token_paths(path[0] if isinstance(path, list) else path)[
+            0
+        ]
+    return passed_filesystem
+
+
 @ioutils.doc_to_json()
 def to_json(cudf_val, path_or_buf=None, *args, **kwargs):
     """{docstring}"""
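With the Cython and Python layers now accepting lists, the multi-source path is usable end to end. A small, self-contained usage sketch (engine="cudf" and lines=True per the constraints above; in-memory buffers are used so nothing touches disk, and the values mirror the C++ test data):

    from io import StringIO

    import cudf

    part1 = StringIO("[11, 1.1]\n[22, 2.2]\n")
    part2 = StringIO("[33, 3.3]\n[44, 4.4]\n")

    # Sources are read in order and their rows concatenated
    df = cudf.read_json([part1, part2], engine="cudf", lines=True)
    assert len(df) == 4
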
From 81c444eabcdb8203c61d5e18d9d2663c65cddf54 Mon Sep 17 00:00:00 2001
From: Jeremy Dyer
Date: Mon, 7 Jun 2021 16:24:43 -0400
Subject: [PATCH 08/10] test fixes

---
 cpp/src/io/json/reader_impl.cu |  1 +
 python/cudf/cudf/io/json.py    | 60 +++++++++--------------------
 2 files changed, 17 insertions(+), 44 deletions(-)

diff --git a/cpp/src/io/json/reader_impl.cu b/cpp/src/io/json/reader_impl.cu
index b31184157e2..28d03896e44 100644
--- a/cpp/src/io/json/reader_impl.cu
+++ b/cpp/src/io/json/reader_impl.cu
@@ -252,6 +252,7 @@ void reader::impl::ingest_raw_input(size_t range_offset, size_t range_size)
   CUDF_EXPECTS(!sources_.empty(), "No sources were defined");
   size_t total_source_size = 0;
   for (const auto &source : sources_) { total_source_size += source->size(); }
+  total_source_size = total_source_size - range_offset;

   buffer_.resize(total_source_size);
   size_t bytes_read = 0;
diff --git a/python/cudf/cudf/io/json.py b/python/cudf/cudf/io/json.py
index aec92299126..d0660e35a0d 100644
--- a/python/cudf/cudf/io/json.py
+++ b/python/cudf/cudf/io/json.py
@@ -3,8 +3,6 @@
 from io import BytesIO, StringIO

 import pandas as pd
-from fsspec.core import get_fs_token_paths
-from fsspec.utils import stringify_path

 import cudf
 from cudf._lib import json as libjson
@@ -30,34 +28,25 @@ def read_json(
     if engine == "auto":
         engine = "cudf" if lines else "pandas"

-    # Multiple sources are passed as a list. If a single source is passed,
-    # wrap it in a list for unified processing downstream.
-    if not is_list_like(path_or_buf):
-        path_or_buf = [path_or_buf]
-
-    filepaths_or_buffers = []
-    for source in path_or_buf:
-        if ioutils.is_directory(source, **kwargs):
-            fs = _ensure_filesystem(passed_filesystem=None, path=source)
-            source = stringify_path(source)
-            source = fs.sep.join([source, "*.json"])
-
-        tmp_source, compression = ioutils.get_filepath_or_buffer(
-            path_or_data=source,
-            compression=None,
-            iotypes=(BytesIO, StringIO),
-            **kwargs,
-        )
-        if compression is not None:
-            raise ValueError(
-                "URL content-encoding decompression is not supported"
-            )
-        if isinstance(tmp_source, list):
-            filepaths_or_buffers.extend(tmp_source)
-        else:
-            filepaths_or_buffers.append(tmp_source)
-
     if engine == "cudf":
+        # Multiple sources are passed as a list. If a single source is passed,
+        # wrap it in a list for unified processing downstream.
+        if not is_list_like(path_or_buf):
+            path_or_buf = [path_or_buf]
+
+        filepaths_or_buffers = []
+        for source in path_or_buf:
+            tmp_source, compression = ioutils.get_filepath_or_buffer(
+                path_or_data=source,
+                compression=compression,
+                iotypes=(BytesIO, StringIO),
+                **kwargs,
+            )
+            if isinstance(tmp_source, list):
+                filepaths_or_buffers.extend(tmp_source)
+            else:
+                filepaths_or_buffers.append(tmp_source)
+
         return cudf.DataFrame._from_table(
             libjson.read_json(
                 filepaths_or_buffers, dtype, lines, compression, byte_range
             )
         )
     else:
@@ -66,15 +55,6 @@ def read_json(
             **kwargs,
         )
     else:
-        # Pandas cannot accept a list of sources so we must narrow and warn
-        if is_list_like(path_or_buf):
-            if len(path_or_buf) > 1:
-                raise ValueError(
-                    "Pandas cannot accept a list of input "
-                    + "sources, size of input list is: "
-                    + str(len(path_or_buf))
-                )
-            path_or_buf = path_or_buf[0]
         pd_value = pd.read_json(
             path_or_buf,
             lines=lines,
@@ -99,14 +79,6 @@ def read_json(
     return df


-def _ensure_filesystem(passed_filesystem, path):
-    if passed_filesystem is None:
-        return get_fs_token_paths(path[0] if isinstance(path, list) else path)[
-            0
-        ]
-    return passed_filesystem
-
-
 @ioutils.doc_to_json()
 def to_json(cudf_val, path_or_buf=None, *args, **kwargs):
     """{docstring}"""

From a7452c61d6aa2f49f234b7ac318c9e8aad3958e8 Mon Sep 17 00:00:00 2001
From: Jeremy Dyer
Date: Tue, 15 Jun 2021 13:51:04 +0000
Subject: [PATCH 09/10] Added test for multiple inputs and read_json

---
 python/cudf/cudf/io/json.py         | 17 ++++++++++++++++-
 python/cudf/cudf/tests/test_json.py | 19 +++++++++++++++++++
 2 files changed, 35 insertions(+), 1 deletion(-)

diff --git a/python/cudf/cudf/io/json.py b/python/cudf/cudf/io/json.py
index d0660e35a0d..6d8fcad7364 100644
--- a/python/cudf/cudf/io/json.py
+++ b/python/cudf/cudf/io/json.py
@@ -27,7 +27,6 @@ def read_json(
         raise ValueError("cudf engine only supports JSON Lines format")
     if engine == "auto":
         engine = "cudf" if lines else "pandas"
-
     if engine == "cudf":
         # Multiple sources are passed as a list. If a single source is passed,
         # wrap it in a list for unified processing downstream.
@@ -57,6 +56,22 @@ def read_json(
             "Using CPU via Pandas to read JSON dataset, this may "
             "be GPU accelerated in the future"
         )
+
+        if not ioutils.ensure_single_filepath_or_buffer(
+            path_or_data=path_or_buf, **kwargs,
+        ):
+            raise NotImplementedError(
+                "`read_json` does not yet support reading "
+                "multiple files via pandas"
+            )
+
+        path_or_buf, compression = ioutils.get_filepath_or_buffer(
+            path_or_data=path_or_buf,
+            compression=compression,
+            iotypes=(BytesIO, StringIO),
+            **kwargs,
+        )
+
         if kwargs.get("orient") == "table":
             pd_value = pd.read_json(
                 path_or_buf,
diff --git a/python/cudf/cudf/tests/test_json.py b/python/cudf/cudf/tests/test_json.py
index e0a922f35fe..4cd906377d6 100644
--- a/python/cudf/cudf/tests/test_json.py
+++ b/python/cudf/cudf/tests/test_json.py
@@ -193,6 +193,25 @@ def test_json_lines_basic(json_input, engine):
         np.testing.assert_array_equal(pd_df[pd_col], cu_df[cu_col].to_array())


+@pytest.mark.filterwarnings("ignore:Using CPU")
+@pytest.mark.parametrize("engine", ["auto", "cudf"])
+def test_json_lines_multiple(tmpdir, json_input, engine):
+    tmp_file1 = tmpdir.join("MultiInputs1.json")
+    tmp_file2 = tmpdir.join("MultiInputs2.json")
+
+    pdf = pd.read_json(json_input, lines=True)
+    pdf.to_json(tmp_file1, compression="infer", lines=True, orient="records")
+    pdf.to_json(tmp_file2, compression="infer", lines=True, orient="records")
+
+    cu_df = cudf.read_json([tmp_file1, tmp_file2], engine=engine, lines=True)
+    pd_df = pd.concat([pdf, pdf])
+
+    assert all(cu_df.dtypes == ["int64", "int64", "int64"])
+    for cu_col, pd_col in zip(cu_df.columns, pd_df.columns):
+        assert str(cu_col) == str(pd_col)
+        np.testing.assert_array_equal(pd_df[pd_col], cu_df[cu_col].to_array())
+
+
 def test_json_lines_byte_range(json_input):
     # include the first row and half of the second row
     # should parse the first two rows

From b89f08545b7a4b9d6c062d9a9e37362187ac17f1 Mon Sep 17 00:00:00 2001
From: Jeremy Dyer
Date: Wed, 16 Jun 2021 21:48:39 +0000
Subject: [PATCH 10/10] Added documentation, extra test cases, and a change to
 ioutils for when an unknown file protocol is encountered

---
 python/cudf/cudf/io/json.py         | 14 +++++++++++++
 python/cudf/cudf/tests/test_json.py | 31 +++++++++++++++++++++++++++++
 python/cudf/cudf/utils/ioutils.py   |  8 +++++---
 3 files changed, 50 insertions(+), 3 deletions(-)

diff --git a/python/cudf/cudf/io/json.py b/python/cudf/cudf/io/json.py
index 6d8fcad7364..a6a6c05a54e 100644
--- a/python/cudf/cudf/io/json.py
+++ b/python/cudf/cudf/io/json.py
@@ -3,6 +3,7 @@
 from io import BytesIO, StringIO

 import pandas as pd
+from fsspec.core import get_fs_token_paths

 import cudf
 from cudf._lib import json as libjson
@@ -10,6 +11,14 @@
 from cudf.utils.dtypes import is_list_like


+def _ensure_filesystem(passed_filesystem, path):
+    if passed_filesystem is None:
+        return get_fs_token_paths(path[0] if isinstance(path, list) else path)[
+            0
+        ]
+    return passed_filesystem
+
+
 @ioutils.doc_read_json()
 def read_json(
     path_or_buf,
@@ -35,6 +44,11 @@ def read_json(

         filepaths_or_buffers = []
         for source in path_or_buf:
+            if ioutils.is_directory(source, **kwargs):
+                fs = _ensure_filesystem(passed_filesystem=None, path=source)
+                source = ioutils.stringify_pathlike(source)
+                source = fs.sep.join([source, "*.json"])
+
             tmp_source, compression = ioutils.get_filepath_or_buffer(
                 path_or_data=source,
                 compression=compression,
diff --git a/python/cudf/cudf/tests/test_json.py b/python/cudf/cudf/tests/test_json.py
index 4cd906377d6..2da2cea164f 100644
--- a/python/cudf/cudf/tests/test_json.py
+++ b/python/cudf/cudf/tests/test_json.py
@@ -212,6 +212,37 @@ def test_json_lines_multiple(tmpdir, json_input, engine):
         np.testing.assert_array_equal(pd_df[pd_col], cu_df[cu_col].to_array())


+@pytest.mark.parametrize("engine", ["auto", "cudf"])
+def test_json_read_directory(tmpdir, json_input, engine):
+    pdf = pd.read_json(json_input, lines=True)
+    pdf.to_json(
+        tmpdir.join("MultiInputs1.json"),
+        compression="infer",
+        lines=True,
+        orient="records",
+    )
+    pdf.to_json(
+        tmpdir.join("MultiInputs2.json"),
+        compression="infer",
+        lines=True,
+        orient="records",
+    )
+    pdf.to_json(
+        tmpdir.join("MultiInputs3.json"),
+        compression="infer",
+        lines=True,
+        orient="records",
+    )
+
+    cu_df = cudf.read_json(tmpdir, engine=engine, lines=True)
+    pd_df = pd.concat([pdf, pdf, pdf])
+
+    assert all(cu_df.dtypes == ["int64", "int64", "int64"])
+    for cu_col, pd_col in zip(cu_df.columns, pd_df.columns):
+        assert str(cu_col) == str(pd_col)
+        np.testing.assert_array_equal(pd_df[pd_col], cu_df[cu_col].to_array())
+
+
 def test_json_lines_byte_range(json_input):
     # include the first row and half of the second row
     # should parse the first two rows
diff --git a/python/cudf/cudf/utils/ioutils.py b/python/cudf/cudf/utils/ioutils.py
index 15120fd8fab..d77ba93f9e8 100644
--- a/python/cudf/cudf/utils/ioutils.py
+++ b/python/cudf/cudf/utils/ioutils.py
@@ -390,11 +390,13 @@

 Parameters
 ----------
-path_or_buf : str, path object, or file-like object
+path_or_buf : list, str, path object, or file-like object
     Either JSON data in a `str`, path to a file (a `str`, `pathlib.Path`, or
     `py._path.local.LocalPath`), URL (including http, ftp, and S3 locations),
     or any object with a `read()` method (such as builtin `open()` file handler
-    function or `StringIO`).
+    function or `StringIO`). Multiple inputs may be provided as a list. If a
+    list is specified, each entry may be a different input type, as long as
+    every entry is a valid input type and all inputs share the same JSON schema.
 engine : {{ 'auto', 'cudf', 'pandas' }}, default 'auto'
     Parser engine to use. If 'auto' is passed, the engine will be automatically
     selected based on the other parameters.
@@ -1086,7 +1088,7 @@ def is_directory(path_or_data, **kwargs):
         )
     except ValueError as e:
         if str(e).startswith("Protocol not known"):
-            return True
+            return False
         else:
             raise e
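
With PATCH 10, a directory input is expanded to "<dir>/*.json" before reading, and an unrecognized protocol is no longer mistaken for a directory. A final usage sketch, assuming a writable temporary directory and files that share a schema (paths here are illustrative):

    import os

    import cudf
    import pandas as pd

    os.makedirs("/tmp/json_parts", exist_ok=True)
    pdf = pd.DataFrame({"a": [1, 2], "b": [1.1, 2.2]})
    pdf.to_json("/tmp/json_parts/part1.json", lines=True, orient="records")
    pdf.to_json("/tmp/json_parts/part2.json", lines=True, orient="records")

    # Every *.json file in the directory is read and concatenated in order
    df = cudf.read_json("/tmp/json_parts", engine="cudf", lines=True)
    assert len(df) == 4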