From 74d6bae3b518c64125797ca0ff8e9b81650010c0 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Fri, 25 Mar 2016 22:32:42 -0700 Subject: [PATCH] Convert BYTE_ARRAY to StringType or List depending on the logical type --- cpp/src/arrow/parquet/CMakeLists.txt | 8 +- cpp/src/arrow/parquet/parquet-schema-test.cc | 49 +++++++++-- cpp/src/arrow/parquet/schema.cc | 90 ++++++++++++++------ cpp/src/arrow/parquet/schema.h | 20 +++-- 4 files changed, 124 insertions(+), 43 deletions(-) diff --git a/cpp/src/arrow/parquet/CMakeLists.txt b/cpp/src/arrow/parquet/CMakeLists.txt index f9479900bb135..0d5cf263ec3e2 100644 --- a/cpp/src/arrow/parquet/CMakeLists.txt +++ b/cpp/src/arrow/parquet/CMakeLists.txt @@ -19,15 +19,15 @@ # arrow_parquet : Arrow <-> Parquet adapter set(PARQUET_SRCS - schema.cc + schema.cc ) set(PARQUET_LIBS - arrow - ${PARQUET_SHARED_LIB} + arrow + ${PARQUET_SHARED_LIB} ) -add_library(arrow_parquet STATIC +add_library(arrow_parquet SHARED ${PARQUET_SRCS} ) target_link_libraries(arrow_parquet ${PARQUET_LIBS}) diff --git a/cpp/src/arrow/parquet/parquet-schema-test.cc b/cpp/src/arrow/parquet/parquet-schema-test.cc index e562df9c57704..4debb2964edb3 100644 --- a/cpp/src/arrow/parquet/parquet-schema-test.cc +++ b/cpp/src/arrow/parquet/parquet-schema-test.cc @@ -19,6 +19,9 @@ #include "arrow/parquet/schema.h" +#include "arrow/test-util.h" +#include "arrow/util/status.h" + namespace arrow { namespace parquet { @@ -28,21 +31,24 @@ using parquet_cpp::schema::NodePtr; using parquet_cpp::schema::PrimitiveNode; TEST(TestNodeConversion, Primitive) { + std::shared_ptr field; + NodePtr node = PrimitiveNode::Make("boolean", Repetition::REQUIRED, parquet_cpp::Type::BOOLEAN); - std::shared_ptr field = NodeToField(node); + + ASSERT_OK(NodeToField(node, &field)); ASSERT_EQ(field->name, "boolean"); ASSERT_TRUE(field->type->Equals(std::make_shared())); ASSERT_FALSE(field->nullable); node = PrimitiveNode::Make("int32", Repetition::REQUIRED, parquet_cpp::Type::INT32); - field = NodeToField(node); + ASSERT_OK(NodeToField(node, &field)); ASSERT_EQ(field->name, "int32"); ASSERT_TRUE(field->type->Equals(std::make_shared())); ASSERT_FALSE(field->nullable); node = PrimitiveNode::Make("int64", Repetition::REQUIRED, parquet_cpp::Type::INT64); - field = NodeToField(node); + ASSERT_OK(NodeToField(node, &field)); ASSERT_EQ(field->name, "int64"); ASSERT_TRUE(field->type->Equals(std::make_shared())); ASSERT_FALSE(field->nullable); @@ -55,14 +61,14 @@ TEST(TestNodeConversion, Primitive) { // case parquet_cpp::Type::FLOAT: node = PrimitiveNode::Make("float", Repetition::REQUIRED, parquet_cpp::Type::FLOAT); - field = NodeToField(node); + ASSERT_OK(NodeToField(node, &field)); ASSERT_EQ(field->name, "float"); ASSERT_TRUE(field->type->Equals(std::make_shared())); ASSERT_FALSE(field->nullable); // case parquet_cpp::Type::DOUBLE: node = PrimitiveNode::Make("double", Repetition::REQUIRED, parquet_cpp::Type::DOUBLE); - field = NodeToField(node); + ASSERT_OK(NodeToField(node, &field)); ASSERT_EQ(field->name, "double"); ASSERT_TRUE(field->type->Equals(std::make_shared())); ASSERT_FALSE(field->nullable); @@ -80,6 +86,39 @@ TEST(TestNodeConversion, Primitive) { // TODO: Assertions } +const auto UINT8 = std::make_shared(); + +TEST(TestNodeConversion, Int96Timestamp) { +} + +TEST(TestNodeConversion, ByteArray) { + std::shared_ptr field; + + NodePtr node = PrimitiveNode::Make("field0", Repetition::OPTIONAL, + parquet_cpp::Type::BYTE_ARRAY); + ASSERT_OK(NodeToField(node, &field)); + + std::shared_ptr ex_type = std::make_shared( + std::make_shared("", UINT8)); + + ASSERT_EQ(field->name, "field0"); + ASSERT_TRUE(field->type->Equals(ex_type)); + ASSERT_TRUE(field->nullable); + + node = PrimitiveNode::Make("field1", Repetition::OPTIONAL, + parquet_cpp::Type::BYTE_ARRAY, + parquet_cpp::LogicalType::UTF8); + ASSERT_OK(NodeToField(node, &field)); + ex_type = std::make_shared(); + + ASSERT_EQ(field->name, "field1"); + ASSERT_TRUE(field->type->Equals(ex_type)); + ASSERT_TRUE(field->nullable); +} + +TEST(TestNodeConversion, FixedLenByteArray) { +} + TEST(TestNodeConversion, Logical) { } diff --git a/cpp/src/arrow/parquet/schema.cc b/cpp/src/arrow/parquet/schema.cc index 3ac169d2fe1dc..63f7e47bf73bc 100644 --- a/cpp/src/arrow/parquet/schema.cc +++ b/cpp/src/arrow/parquet/schema.cc @@ -15,9 +15,13 @@ // specific language governing permissions and limitations // under the License. +#include "arrow/parquet/schema.h" + #include -#include "arrow/parquet/schema.h" +#include "parquet/api/schema.h" + +#include "arrow/util/status.h" #include "arrow/types/decimal.h" using parquet_cpp::schema::Node; @@ -25,65 +29,95 @@ using parquet_cpp::schema::NodePtr; using parquet_cpp::schema::GroupNode; using parquet_cpp::schema::PrimitiveNode; +using parquet_cpp::LogicalType; + namespace arrow { namespace parquet { +const auto BOOL = std::make_shared(); +const auto UINT8 = std::make_shared(); +const auto INT32 = std::make_shared(); +const auto INT64 = std::make_shared(); +const auto FLOAT = std::make_shared(); +const auto DOUBLE = std::make_shared(); +const auto UTF8 = std::make_shared(); +const auto BINARY = std::make_shared( + std::make_shared("", UINT8)); TypePtr MakeDecimalType(const PrimitiveNode* node) { int precision = node->decimal_metadata().precision; int scale = node->decimal_metadata().scale; - return TypePtr(new DecimalType(precision, scale)); + return std::make_shared(precision, scale); +} + +static Status FromByteArray(const PrimitiveNode* node, TypePtr* out) { + switch (node->logical_type()) { + case LogicalType::UTF8: + *out = UTF8; + break; + default: + // BINARY + *out = BINARY; + break; + } + return Status::OK(); +} + +static Status FromFLBA(const PrimitiveNode* node, TypePtr* out) { + switch (node->logical_type()) { + case LogicalType::DECIMAL: + *out = MakeDecimalType(node); + break; + default: + return Status::NotImplemented("unhandled type"); + break; + } + + return Status::OK(); } // TODO: Logical Type Handling -std::shared_ptr NodeToField(const NodePtr& node) { +Status NodeToField(const NodePtr& node, std::shared_ptr* out) { TypePtr type; if (node->is_group()) { const GroupNode* group = static_cast(node.get()); - std::vector> fields; + std::vector> fields(group->field_count()); for (int i = 0; i < group->field_count(); i++) { - fields.push_back(NodeToField(group->field(i))); + RETURN_NOT_OK(NodeToField(group->field(i), &fields[i])); } - type = TypePtr(new StructType(fields)); + type = std::make_shared(fields); } else { // Primitive (leaf) node const PrimitiveNode* primitive = static_cast(node.get()); switch (primitive->physical_type()) { case parquet_cpp::Type::BOOLEAN: - type = TypePtr(new BooleanType()); + type = BOOL; break; case parquet_cpp::Type::INT32: - type = TypePtr(new Int32Type()); + type = INT32; break; case parquet_cpp::Type::INT64: - type = TypePtr(new Int64Type()); + type = INT64; break; case parquet_cpp::Type::INT96: // TODO: Do we have that type in Arrow? // type = TypePtr(new Int96Type()); - break; + return Status::NotImplemented("int96"); case parquet_cpp::Type::FLOAT: - type = TypePtr(new FloatType()); + type = FLOAT; break; case parquet_cpp::Type::DOUBLE: - type = TypePtr(new DoubleType()); + type = DOUBLE; break; case parquet_cpp::Type::BYTE_ARRAY: // TODO: Do we have that type in Arrow? - // type = TypePtr(new Int96Type()); + RETURN_NOT_OK(FromByteArray(primitive, &type)); break; case parquet_cpp::Type::FIXED_LEN_BYTE_ARRAY: - switch (primitive->logical_type()) { - case parquet_cpp::LogicalType::DECIMAL: - type = MakeDecimalType(primitive); - break; - default: - // TODO: Do we have that type in Arrow? - break; - } + RETURN_NOT_OK(FromFLBA(primitive, &type)); break; } } @@ -92,21 +126,25 @@ std::shared_ptr NodeToField(const NodePtr& node) { type = TypePtr(new ListType(type)); } - return std::shared_ptr(new Field(node->name(), type, !node->is_required())); + *out = std::make_shared(node->name(), type, !node->is_required()); + + return Status::OK(); } -std::shared_ptr FromParquetSchema( - const parquet_cpp::SchemaDescriptor* parquet_schema) { +Status FromParquetSchema(const parquet_cpp::SchemaDescriptor* parquet_schema, + std::shared_ptr* out) { std::vector> fields; const GroupNode* schema_node = static_cast( parquet_schema->schema().get()); // TODO: What to with the head node? + fields.resize(schema_node->field_count()); for (int i = 0; i < schema_node->field_count(); i++) { - fields.push_back(NodeToField(schema_node->field(i))); + RETURN_NOT_OK(NodeToField(schema_node->field(i), &fields[i])); } - return std::shared_ptr(new Schema(fields)); + *out = std::make_shared(fields); + return Status::OK(); } } // namespace parquet diff --git a/cpp/src/arrow/parquet/schema.h b/cpp/src/arrow/parquet/schema.h index 0071681656b8c..61de193a33877 100644 --- a/cpp/src/arrow/parquet/schema.h +++ b/cpp/src/arrow/parquet/schema.h @@ -18,20 +18,24 @@ #ifndef ARROW_PARQUET_SCHEMA_H #define ARROW_PARQUET_SCHEMA_H -#include -#include -#include -#include - #include +#include "parquet/api/schema.h" + +#include "arrow/schema.h" +#include "arrow/type.h" + namespace arrow { +class Status; + namespace parquet { -std::shared_ptr NodeToField(const parquet_cpp::schema::NodePtr& node); -std::shared_ptr FromParquetSchema( - const parquet_cpp::SchemaDescriptor* parquet_schema); +Status NodeToField(const parquet_cpp::schema::NodePtr& node, + std::shared_ptr* out); + +Status FromParquetSchema(const parquet_cpp::SchemaDescriptor* parquet_schema, + std::shared_ptr* out); } // namespace parquet