Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-32538: [C++][Parquet] Add JSON canonical extension type #13901

Merged
merged 26 commits into from
Sep 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cpp/src/arrow/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,7 @@ set(ARROW_SRCS
device_allocation_type_set.cc
extension_type.cc
extension/bool8.cc
extension/json.cc
extension/uuid.cc
pretty_print.cc
record_batch.cc
Expand Down
20 changes: 16 additions & 4 deletions cpp/src/arrow/array/validate.cc
Original file line number Diff line number Diff line change
Expand Up @@ -985,10 +985,22 @@ Status ValidateArrayFull(const Array& array) { return ValidateArrayFull(*array.d

ARROW_EXPORT
Status ValidateUTF8(const ArrayData& data) {
DCHECK(data.type->id() == Type::STRING || data.type->id() == Type::STRING_VIEW ||
data.type->id() == Type::LARGE_STRING);
UTF8DataValidator validator{data};
return VisitTypeInline(*data.type, &validator);
const auto& storage_type =
(data.type->id() == Type::EXTENSION)
? checked_cast<const ExtensionType&>(*data.type).storage_type()
: data.type;
DCHECK(storage_type->id() == Type::STRING || storage_type->id() == Type::STRING_VIEW ||
storage_type->id() == Type::LARGE_STRING);

if (data.type->id() == Type::EXTENSION) {
rok marked this conversation as resolved.
Show resolved Hide resolved
ArrayData ext_data(data);
ext_data.type = storage_type;
UTF8DataValidator validator{ext_data};
return VisitTypeInline(*storage_type, &validator);
} else {
UTF8DataValidator validator{data};
return VisitTypeInline(*storage_type, &validator);
}
}

ARROW_EXPORT
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/extension/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# specific language governing permissions and limitations
# under the License.

set(CANONICAL_EXTENSION_TESTS bool8_test.cc uuid_test.cc)
set(CANONICAL_EXTENSION_TESTS bool8_test.cc json_test.cc uuid_test.cc)

if(ARROW_JSON)
list(APPEND CANONICAL_EXTENSION_TESTS fixed_shape_tensor_test.cc opaque_test.cc)
Expand Down
6 changes: 3 additions & 3 deletions cpp/src/arrow/extension/fixed_shape_tensor_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ TEST_F(TestExtensionType, RoundtripBatch) {
std::shared_ptr<RecordBatch> read_batch;
auto ext_field = field(/*name=*/"f0", /*type=*/ext_type_);
auto batch = RecordBatch::Make(schema({ext_field}), ext_arr->length(), {ext_arr});
RoundtripBatch(batch, &read_batch);
ASSERT_OK(RoundtripBatch(batch, &read_batch));
CompareBatch(*batch, *read_batch, /*compare_metadata=*/true);

// Pass extension metadata and storage array, expect getting back extension array
Expand All @@ -216,7 +216,7 @@ TEST_F(TestExtensionType, RoundtripBatch) {
ext_field = field(/*name=*/"f0", /*type=*/element_type_, /*nullable=*/true,
/*metadata=*/ext_metadata);
auto batch2 = RecordBatch::Make(schema({ext_field}), fsla_arr->length(), {fsla_arr});
RoundtripBatch(batch2, &read_batch2);
ASSERT_OK(RoundtripBatch(batch2, &read_batch2));
CompareBatch(*batch, *read_batch2, /*compare_metadata=*/true);
}

Expand Down Expand Up @@ -469,7 +469,7 @@ TEST_F(TestExtensionType, RoundtripBatchFromTensor) {
auto ext_field = field("f0", ext_type_, true, ext_metadata);
auto batch = RecordBatch::Make(schema({ext_field}), ext_arr->length(), {ext_arr});
std::shared_ptr<RecordBatch> read_batch;
RoundtripBatch(batch, &read_batch);
ASSERT_OK(RoundtripBatch(batch, &read_batch));
CompareBatch(*batch, *read_batch, /*compare_metadata=*/true);
}

Expand Down
61 changes: 61 additions & 0 deletions cpp/src/arrow/extension/json.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "arrow/extension/json.h"

#include <string>

#include "arrow/extension_type.h"
#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/type_fwd.h"
#include "arrow/util/logging.h"

namespace arrow::extension {

bool JsonExtensionType::ExtensionEquals(const ExtensionType& other) const {
return other.extension_name() == this->extension_name();
}
Comment on lines +30 to +32
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This equality check does not take into account the storage type, but only the name.

As a consequence, a JsonExtensionType<string> type will be seen as equal to JsonExtensionType<large_string>. Was that intentional?

While from a user point of view, it certainly makes sense to have those seen as equal, but the same is true for string vs large_string itself. And in general in Arrow C++, the types are concrete types where variants of the same "logical" type (eg string vs large_string) are not seen as equal. So should the same logic be followed here?

I assume that such type equality will for example be used to check if schemas are equal to see if a set of batches can be concatenated or written to the same IPC stream, etc, and for those cases we require exact equality?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, that's certainly a bug. Sorry for not spotting this, and feel free to submit a fix :-)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I suppose I missed that when switching from string only to it being a parametric type. I can make a fix later today if no one started on it yet.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't start yet


Result<std::shared_ptr<DataType>> JsonExtensionType::Deserialize(
std::shared_ptr<DataType> storage_type, const std::string& serialized) const {
if (storage_type->id() != Type::STRING && storage_type->id() != Type::STRING_VIEW &&
storage_type->id() != Type::LARGE_STRING) {
return Status::Invalid("Invalid storage type for JsonExtensionType: ",
storage_type->ToString());
}
return std::make_shared<JsonExtensionType>(storage_type);
}

std::string JsonExtensionType::Serialize() const { return ""; }

std::shared_ptr<Array> JsonExtensionType::MakeArray(
std::shared_ptr<ArrayData> data) const {
DCHECK_EQ(data->type->id(), Type::EXTENSION);
DCHECK_EQ("arrow.json",
internal::checked_cast<const ExtensionType&>(*data->type).extension_name());
return std::make_shared<ExtensionArray>(data);
}

std::shared_ptr<DataType> json(const std::shared_ptr<DataType> storage_type) {
ARROW_CHECK(storage_type->id() != Type::STRING ||
storage_type->id() != Type::STRING_VIEW ||
storage_type->id() != Type::LARGE_STRING);
Comment on lines +55 to +57
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This check is not correct also.

return std::make_shared<JsonExtensionType>(storage_type);
}

} // namespace arrow::extension
56 changes: 56 additions & 0 deletions cpp/src/arrow/extension/json.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <stdexcept>
#include <string>

#include "arrow/extension_type.h"
#include "arrow/result.h"
#include "arrow/type_fwd.h"
#include "arrow/util/visibility.h"

namespace arrow::extension {

/// \brief Concrete type class for variable-size JSON data, utf8-encoded.
class ARROW_EXPORT JsonExtensionType : public ExtensionType {
public:
explicit JsonExtensionType(const std::shared_ptr<DataType>& storage_type)
: ExtensionType(storage_type), storage_type_(storage_type) {}

std::string extension_name() const override { return "arrow.json"; }

bool ExtensionEquals(const ExtensionType& other) const override;

Result<std::shared_ptr<DataType>> Deserialize(
std::shared_ptr<DataType> storage_type,
const std::string& serialized_data) const override;

std::string Serialize() const override;

std::shared_ptr<Array> MakeArray(std::shared_ptr<ArrayData> data) const override;

private:
std::shared_ptr<DataType> storage_type_;
};

/// \brief Return a JsonExtensionType instance.
ARROW_EXPORT std::shared_ptr<DataType> json(
std::shared_ptr<DataType> storage_type = utf8());

} // namespace arrow::extension
83 changes: 83 additions & 0 deletions cpp/src/arrow/extension/json_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "arrow/extension/json.h"

#include "arrow/array/validate.h"
#include "arrow/ipc/test_common.h"
#include "arrow/record_batch.h"
#include "arrow/testing/gtest_util.h"
#include "parquet/exception.h"

namespace arrow {

using arrow::ipc::test::RoundtripBatch;
using extension::json;

class TestJsonExtensionType : public ::testing::Test {};

std::shared_ptr<Array> ExampleJson(const std::shared_ptr<DataType>& storage_type) {
std::shared_ptr<Array> arr = ArrayFromJSON(storage_type, R"([
"null",
"1234",
"3.14159",
"true",
"false",
"\"a json string\"",
"[\"a\", \"json\", \"array\"]",
"{\"obj\": \"a simple json object\"}"
])");
return ExtensionType::WrapArray(arrow::extension::json(storage_type), arr);
}

TEST_F(TestJsonExtensionType, JsonRoundtrip) {
for (const auto& storage_type : {utf8(), large_utf8(), utf8_view()}) {
std::shared_ptr<Array> ext_arr = ExampleJson(storage_type);
auto batch =
RecordBatch::Make(schema({field("f0", json(storage_type))}), 8, {ext_arr});

std::shared_ptr<RecordBatch> read_batch;
ASSERT_OK(RoundtripBatch(batch, &read_batch));
ASSERT_OK(read_batch->ValidateFull());
CompareBatch(*batch, *read_batch, /*compare_metadata*/ true);

auto read_ext_arr = read_batch->column(0);
ASSERT_OK(internal::ValidateUTF8(*read_ext_arr));
ASSERT_OK(read_ext_arr->ValidateFull());
}
}

TEST_F(TestJsonExtensionType, InvalidUTF8) {
for (const auto& storage_type : {utf8(), large_utf8(), utf8_view()}) {
auto json_type = json(storage_type);
auto invalid_input = ArrayFromJSON(storage_type, "[\"Ⱥa\xFFⱭ\", \"Ɽ\xe1\xbdⱤaA\"]");
auto ext_arr = ExtensionType::WrapArray(json_type, invalid_input);

ASSERT_RAISES_WITH_MESSAGE(Invalid,
"Invalid: Invalid UTF8 sequence at string index 0",
ext_arr->ValidateFull());
ASSERT_RAISES_WITH_MESSAGE(Invalid,
"Invalid: Invalid UTF8 sequence at string index 0",
arrow::internal::ValidateUTF8(*ext_arr));

auto batch = RecordBatch::Make(schema({field("f0", json_type)}), 2, {ext_arr});
std::shared_ptr<RecordBatch> read_batch;
ASSERT_OK(RoundtripBatch(batch, &read_batch));
}
}
rok marked this conversation as resolved.
Show resolved Hide resolved

} // namespace arrow
4 changes: 2 additions & 2 deletions cpp/src/arrow/extension/uuid_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ TEST(TestUuuidExtensionType, RoundtripBatch) {
std::shared_ptr<RecordBatch> read_batch;
auto ext_field = field(/*name=*/"f0", /*type=*/ext_type);
auto batch = RecordBatch::Make(schema({ext_field}), ext_arr->length(), {ext_arr});
RoundtripBatch(batch, &read_batch);
ASSERT_OK(RoundtripBatch(batch, &read_batch));
CompareBatch(*batch, *read_batch, /*compare_metadata=*/true);

// Pass extension metadata and storage array, expect getting back extension array
Expand All @@ -65,7 +65,7 @@ TEST(TestUuuidExtensionType, RoundtripBatch) {
ext_field = field(/*name=*/"f0", /*type=*/exact_ext_type->storage_type(),
/*nullable=*/true, /*metadata=*/ext_metadata);
auto batch2 = RecordBatch::Make(schema({ext_field}), arr->length(), {arr});
RoundtripBatch(batch2, &read_batch2);
ASSERT_OK(RoundtripBatch(batch2, &read_batch2));
CompareBatch(*batch, *read_batch2, /*compare_metadata=*/true);
}

Expand Down
4 changes: 3 additions & 1 deletion cpp/src/arrow/extension_type.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
# include "arrow/extension/fixed_shape_tensor.h"
# include "arrow/extension/opaque.h"
#endif
#include "arrow/extension/json.h"
#include "arrow/extension/uuid.h"
#include "arrow/status.h"
#include "arrow/type.h"
Expand Down Expand Up @@ -148,7 +149,8 @@ static void CreateGlobalRegistry() {
// Register canonical extension types

g_registry = std::make_shared<ExtensionTypeRegistryImpl>();
std::vector<std::shared_ptr<DataType>> ext_types{extension::bool8(), extension::uuid()};
std::vector<std::shared_ptr<DataType>> ext_types{extension::bool8(), extension::json(),
extension::uuid()};

#ifdef ARROW_JSON
ext_types.push_back(extension::fixed_shape_tensor(int64(), {}));
rok marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
6 changes: 3 additions & 3 deletions cpp/src/arrow/extension_type_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -219,14 +219,14 @@ TEST_F(TestExtensionType, IpcRoundtrip) {
auto batch = RecordBatch::Make(schema({field("f0", uuid())}), 4, {ext_arr});

std::shared_ptr<RecordBatch> read_batch;
RoundtripBatch(batch, &read_batch);
ASSERT_OK(RoundtripBatch(batch, &read_batch));
CompareBatch(*batch, *read_batch, false /* compare_metadata */);

// Wrap type in a ListArray and ensure it also makes it
auto offsets_arr = ArrayFromJSON(int32(), "[0, 0, 2, 4]");
ASSERT_OK_AND_ASSIGN(auto list_arr, ListArray::FromArrays(*offsets_arr, *ext_arr));
batch = RecordBatch::Make(schema({field("f0", list(uuid()))}), 3, {list_arr});
RoundtripBatch(batch, &read_batch);
ASSERT_OK(RoundtripBatch(batch, &read_batch));
CompareBatch(*batch, *read_batch, false /* compare_metadata */);
}

Expand Down Expand Up @@ -289,7 +289,7 @@ TEST_F(TestExtensionType, ParametricTypes) {
4, {p1, p2, p3, p4});

std::shared_ptr<RecordBatch> read_batch;
RoundtripBatch(batch, &read_batch);
ASSERT_OK(RoundtripBatch(batch, &read_batch));
CompareBatch(*batch, *read_batch, false /* compare_metadata */);
}

Expand Down
17 changes: 9 additions & 8 deletions cpp/src/arrow/ipc/test_common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1236,18 +1236,19 @@ Status MakeRandomTensor(const std::shared_ptr<DataType>& type,
return Tensor::Make(type, buf, shape, strides).Value(out);
}

void RoundtripBatch(const std::shared_ptr<RecordBatch>& batch,
std::shared_ptr<RecordBatch>* out) {
ASSERT_OK_AND_ASSIGN(auto out_stream, io::BufferOutputStream::Create());
ASSERT_OK(ipc::WriteRecordBatchStream({batch}, ipc::IpcWriteOptions::Defaults(),
out_stream.get()));
Status RoundtripBatch(const std::shared_ptr<RecordBatch>& batch,
std::shared_ptr<RecordBatch>* out) {
ARROW_ASSIGN_OR_RAISE(auto out_stream, io::BufferOutputStream::Create());
RETURN_NOT_OK(ipc::WriteRecordBatchStream({batch}, ipc::IpcWriteOptions::Defaults(),
out_stream.get()));

ASSERT_OK_AND_ASSIGN(auto complete_ipc_stream, out_stream->Finish());
ARROW_ASSIGN_OR_RAISE(auto complete_ipc_stream, out_stream->Finish());

io::BufferReader reader(complete_ipc_stream);
std::shared_ptr<RecordBatchReader> batch_reader;
ASSERT_OK_AND_ASSIGN(batch_reader, ipc::RecordBatchStreamReader::Open(&reader));
ASSERT_OK(batch_reader->ReadNext(out));
ARROW_ASSIGN_OR_RAISE(batch_reader, ipc::RecordBatchStreamReader::Open(&reader));
RETURN_NOT_OK(batch_reader->ReadNext(out));
return Status::OK();
}

} // namespace test
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/arrow/ipc/test_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,8 @@ Status MakeRandomTensor(const std::shared_ptr<DataType>& type,
const std::vector<int64_t>& shape, bool row_major_p,
std::shared_ptr<Tensor>* out, uint32_t seed = 0);

ARROW_TESTING_EXPORT void RoundtripBatch(const std::shared_ptr<RecordBatch>& batch,
std::shared_ptr<RecordBatch>* out);
ARROW_TESTING_EXPORT Status RoundtripBatch(const std::shared_ptr<RecordBatch>& batch,
std::shared_ptr<RecordBatch>* out);

} // namespace test
} // namespace ipc
Expand Down
1 change: 1 addition & 0 deletions cpp/src/arrow/testing/gtest_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
#include "arrow/buffer.h"
#include "arrow/compute/api_vector.h"
#include "arrow/datum.h"
#include "arrow/extension/json.h"
#include "arrow/io/memory.h"
#include "arrow/ipc/json_simple.h"
#include "arrow/ipc/reader.h"
Expand Down
Loading
Loading