Skip to content

Commit

Permalink
rename is_csv to is_csv_or_json
Browse files Browse the repository at this point in the history
scaffold json methods

scaffold on the arrow json loader

Add benchmark support, add python tests, add JSON array -> JSONL converter
  • Loading branch information
timkpaine committed Sep 28, 2023
1 parent afa7438 commit 04030f5
Show file tree
Hide file tree
Showing 18 changed files with 682 additions and 58 deletions.
18 changes: 15 additions & 3 deletions cmake/arrow/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,20 @@ set(ARROW_SRCS
# ${CMAKE_BINARY_DIR}/arrow-src/cpp/src/arrow/csv/reader.cc
${CMAKE_BINARY_DIR}/arrow-src/cpp/src/arrow/csv/writer.cc

# JSON
${CMAKE_BINARY_DIR}/arrow-src/cpp/src/arrow/json/converter.cc
${CMAKE_BINARY_DIR}/arrow-src/cpp/src/arrow/json/chunker.cc
${CMAKE_BINARY_DIR}/arrow-src/cpp/src/arrow/json/chunked_builder.cc
${CMAKE_BINARY_DIR}/arrow-src/cpp/src/arrow/json/object_parser.cc
${CMAKE_BINARY_DIR}/arrow-src/cpp/src/arrow/json/object_writer.cc
${CMAKE_BINARY_DIR}/arrow-src/cpp/src/arrow/json/options.cc
${CMAKE_BINARY_DIR}/arrow-src/cpp/src/arrow/json/parser.cc
# ${CMAKE_BINARY_DIR}/arrow-src/cpp/src/arrow/json/reader.cc

# IPC
${CMAKE_BINARY_DIR}/arrow-src/cpp/src/arrow/ipc/dictionary.cc
${CMAKE_BINARY_DIR}/arrow-src/cpp/src/arrow/ipc/feather.cc
${CMAKE_BINARY_DIR}/arrow-src/cpp/src/arrow/ipc/json_simple.cc
${CMAKE_BINARY_DIR}/arrow-src/cpp/src/arrow/ipc/message.cc
${CMAKE_BINARY_DIR}/arrow-src/cpp/src/arrow/ipc/metadata_internal.cc
${CMAKE_BINARY_DIR}/arrow-src/cpp/src/arrow/ipc/options.cc
Expand All @@ -154,11 +165,11 @@ set(ARROW_SRCS
${CMAKE_BINARY_DIR}/arrow-src/cpp/src/arrow/compute/exec.cc
${CMAKE_BINARY_DIR}/arrow-src/cpp/src/arrow/compute/expression.cc
${CMAKE_BINARY_DIR}/arrow-src/cpp/src/arrow/compute/function.cc
${CMAKE_BINARY_DIR}/arrow-src/cpp/src/arrow/compute/function_internal.cc
${CMAKE_BINARY_DIR}/arrow-src/cpp/src/arrow/compute/function_internal.cc
${CMAKE_BINARY_DIR}/arrow-src/cpp/src/arrow/compute/kernel.cc
${CMAKE_BINARY_DIR}/arrow-src/cpp/src/arrow/compute/ordering.cc


# ${CMAKE_BINARY_DIR}/arrow-src/cpp/src/arrow/compute/registry.cc
# ${CMAKE_BINARY_DIR}/arrow-src/cpp/src/arrow/compute/kernels/aggregate_basic.cc
# ${CMAKE_BINARY_DIR}/arrow-src/cpp/src/arrow/compute/kernels/aggregate_mode.cc
Expand Down Expand Up @@ -239,7 +250,8 @@ else()
${ARROW_SRCS}
# Use our vendored reader that does not use threads.
${PSP_CPP_SRC}/src/cpp/vendor/single_threaded_reader.cpp
${PSP_CPP_SRC}/src/cpp/vendor/arrow_single_threaded_reader.cpp)
${PSP_CPP_SRC}/src/cpp/vendor/arrow_single_threaded_csv_reader.cpp
${PSP_CPP_SRC}/src/cpp/vendor/arrow_single_threaded_json_reader.cpp)
endif()

set_property(SOURCE util/io_util.cc
Expand Down
5 changes: 3 additions & 2 deletions cpp/perspective/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,7 @@ set(SOURCE_FILES
${PSP_CPP_SRC}/src/cpp/aggregate.cpp
${PSP_CPP_SRC}/src/cpp/aggspec.cpp
${PSP_CPP_SRC}/src/cpp/arg_sort.cpp
${PSP_CPP_SRC}/src/cpp/arrow_format.cpp
${PSP_CPP_SRC}/src/cpp/arrow_loader.cpp
${PSP_CPP_SRC}/src/cpp/arrow_writer.cpp
${PSP_CPP_SRC}/src/cpp/base.cpp
Expand Down Expand Up @@ -501,15 +502,15 @@ set(SOURCE_FILES
${PSP_CPP_SRC}/src/cpp/view.cpp
${PSP_CPP_SRC}/src/cpp/view_config.cpp
${PSP_CPP_SRC}/src/cpp/vocab.cpp
${PSP_CPP_SRC}/src/cpp/arrow_csv.cpp
)

set(PYTHON_SOURCE_FILES ${SOURCE_FILES}
${PSP_PYTHON_SRC}/src/column.cpp
)

set(WASM_SOURCE_FILES ${SOURCE_FILES}
# ${PSP_CPP_SRC}/src/cpp/vendor/arrow_single_threaded_reader.cpp
# ${PSP_CPP_SRC}/src/cpp/vendor/arrow_single_threaded_csv_reader.cpp
# ${PSP_CPP_SRC}/src/cpp/vendor/arrow_single_threaded_json_reader.cpp
)

set(PYTHON_BINDING_SOURCE_FILES
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,21 @@
// ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛

#include <perspective/base.h>
#include <perspective/arrow_csv.h>
#include <perspective/arrow_format.h>
#include <arrow/util/value_parsing.h>
#include <arrow/io/memory.h>
#include <rapidjson/document.h>
#include <rapidjson/writer.h>
#include <rapidjson/stringbuffer.h>

#ifdef PSP_ENABLE_WASM
// This causes build warnings
// https://github.com/emscripten-core/emscripten/issues/8574
#include <perspective/vendor/arrow_single_threaded_reader.h>
#include <perspective/vendor/arrow_single_threaded_csv_reader.h>
#include <perspective/vendor/arrow_single_threaded_json_reader.h>
#else
#include <arrow/csv/reader.h>
#include <arrow/json/reader.h>
#endif

template <class TimePoint>
Expand Down Expand Up @@ -431,5 +436,63 @@ namespace apachearrow {
return *maybe_table;
}

/**
 * @brief Parse a JSON document into an Arrow table.
 *
 * Arrow's JSON reader only understands newline-delimited JSON (JSONL/NDJSON),
 * so when the input is a top-level JSON array it is first rewritten as one
 * object per line and the result replaces the contents of `json`.
 *
 * @param json JSON text, either JSONL or a top-level array of objects.
 *     Overwritten with the JSONL form when an array is detected.
 * @param is_update Not used by this function.
 * @param schema Not used by this function.
 * @return The table produced by `arrow::json::TableReader::Read()`.
 *
 * Failures (input array that cannot be parsed, non-object array elements,
 * reader construction or read errors) are reported through
 * `PSP_COMPLAIN_AND_ABORT`.
 */
std::shared_ptr<::arrow::Table>
jsonToTable(std::string& json, bool is_update,
    std::unordered_map<std::string, std::shared_ptr<arrow::DataType>>&
        schema) {

    // NOTE: Arrow only supports JSONL/NDJSON as of 12.0.0, so convert if
    // needed. This incurs some overhead, but in general it should still be
    // better in C++ than doing it from the host language.
    //
    // Skip leading JSON whitespace so that an array preceded by blanks or
    // newlines is still detected.
    const auto first_char = json.find_first_not_of(" \t\r\n");
    if (first_char != std::string::npos && json[first_char] == '[') {
        rapidjson::Document document;
        document.Parse(json.c_str());

        // A parse failure leaves the document as a non-array value, so this
        // single check covers both malformed input and non-array roots.
        if (!document.IsArray()) {
            PSP_COMPLAIN_AND_ABORT(
                "Unable to convert detected JSON array to JSONL");
        }

        std::string new_json;
        new_json.reserve(json.size());

        rapidjson::StringBuffer buffer;
        rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
        auto arr = document.GetArray();
        for (rapidjson::Value::ConstValueIterator itr = arr.Begin();
             itr != arr.End(); ++itr) {
            if (!itr->IsObject()) {
                PSP_COMPLAIN_AND_ABORT("Unable to convert detected JSON array to JSONL - Values of JSON array must be objects");
            }

            itr->Accept(writer);
            new_json.append(buffer.GetString(), buffer.GetSize());
            new_json += "\n";

            // A Writer accepts exactly one root value; it must be reset
            // (not just have its buffer cleared) before it is reused for
            // the next element.
            buffer.Clear();
            writer.Reset(buffer);
        }

        // Hand the converted text back without copying it.
        json.swap(new_json);
    }

    arrow::io::IOContext io_context = arrow::io::default_io_context();
    auto input = std::make_shared<arrow::io::BufferReader>(json);
    auto read_options = arrow::json::ReadOptions::Defaults();
    auto parse_options = arrow::json::ParseOptions::Defaults();

    // Threading is always disabled here (the PSP_PARALLEL_FOR switch is
    // intentionally not consulted).
    read_options.use_threads = false;
    parse_options.newlines_in_values = true;

    auto maybe_reader = arrow::json::TableReader::Make(
        io_context.pool(), input, read_options, parse_options);
    if (!maybe_reader.ok()) {
        // Report the failure explicitly instead of dereferencing an errored
        // Result.
        PSP_COMPLAIN_AND_ABORT(maybe_reader.status().ToString());
    }

    std::shared_ptr<arrow::json::TableReader> reader = *maybe_reader;

    auto maybe_table = reader->Read();
    if (!maybe_table.ok()) {
        PSP_COMPLAIN_AND_ABORT(maybe_table.status().ToString());
    }
    return *maybe_table;
}

} // namespace apachearrow
} // namespace perspective
15 changes: 15 additions & 0 deletions cpp/perspective/src/cpp/arrow_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,21 @@ namespace apachearrow {
}
}

void
ArrowLoader::init_json(std::string& json, bool is_update,
std::unordered_map<std::string, std::shared_ptr<arrow::DataType>>&
psp_schema) {
m_table = jsonToTable(json, is_update, psp_schema);

std::shared_ptr<arrow::Schema> schema = m_table->schema();
std::vector<std::shared_ptr<arrow::Field>> fields = schema->fields();

for (auto field : fields) {
m_names.push_back(field->name());
m_types.push_back(convert_type(field->type()->name()));
}
}

void
ArrowLoader::fill_table(t_data_table& tbl, const t_schema& input_schema,
const std::string& index, std::uint32_t offset, std::uint32_t limit,
Expand Down
7 changes: 4 additions & 3 deletions cpp/perspective/src/cpp/emscripten.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include <perspective/arrow_loader.h>
#include <perspective/arrow_writer.h>
#include <arrow/csv/api.h>
#include <arrow/json/api.h>
#include <boost/optional.hpp>

using namespace emscripten;
Expand Down Expand Up @@ -1135,7 +1136,7 @@ namespace binding {
std::shared_ptr<Table>
make_table(t_val table, t_data_accessor accessor, std::uint32_t limit,
const std::string& index, t_op op, bool is_update, bool is_arrow,
bool is_csv, t_uindex port_id) {
bool is_csv_or_json, t_uindex port_id) {
bool table_initialized = has_value(table);
std::shared_ptr<t_pool> pool;
std::shared_ptr<Table> tbl;
Expand All @@ -1160,7 +1161,7 @@ namespace binding {
bool is_delete = op == OP_DELETE;

if (is_arrow && !is_delete) {
if (is_csv) {
if (is_csv_or_json) {
std::string s = accessor.as<std::string>();
auto map = std::unordered_map<std::string,
std::shared_ptr<arrow::DataType>>();
Expand Down Expand Up @@ -1355,7 +1356,7 @@ namespace binding {
is_update);
}

if (is_arrow && !is_csv) {
if (is_arrow && !is_csv_or_json) {
free((void*)ptr);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
* This file is part of the Perspective library, distributed under the terms of
* the Apache License 2.0. The full license can be found in the LICENSE file.
*
* Originally forked from
* Originally forked from
* https://github.com/apache/arrow/blob/apache-arrow-1.0.1/cpp/src/arrow/csv/reader.cc
* Currently using
* https://github.com/apache/arrow/blob/apache-arrow-12.0.0/cpp/src/arrow/csv/reader.cc
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
Expand All @@ -27,17 +29,17 @@
*/

/* * * * WARNING * * *
*
* This file and respective header is a fork of
* https://github.com/apache/arrow/blob/apache-arrow-1.0.1/cpp/src/arrow/csv/reader.cc
*
* This file and respective header is a fork of
* https://github.com/apache/arrow/blob/apache-arrow-12.0.0/cpp/src/arrow/csv/reader.cc
* which removes references to `std::thread` such that compilation under
* Emscripten is possible. It should not be modified directly.
*
*
* TODO Pending a better solution or upstream fix ..
*
*
*/

#include <perspective/vendor/arrow_single_threaded_reader.h>
#include <perspective/vendor/arrow_single_threaded_csv_reader.h>

#include <cstdint>
#include <cstring>
Expand Down
Loading

0 comments on commit 04030f5

Please sign in to comment.