From 1ee3560e2f5fb176faf891082ecb346a005f59f9 Mon Sep 17 00:00:00 2001
From: "Taro L. Saito"
Date: Wed, 17 Jan 2024 16:12:19 -0800
Subject: [PATCH] parquet (fix): Fix a bug in writing Map columns #3315 (#3342)

- Reproduction of #3315
- Fix map column writer
---
 .../parquet/ParquetObjectWriter.scala         |  9 ++-
 .../airframe/parquet/MapColumnWriteTest.scala | 76 +++++++++++++++++++
 2 files changed, 82 insertions(+), 3 deletions(-)
 create mode 100644 airframe-parquet/src/test/scala/wvlet/airframe/parquet/MapColumnWriteTest.scala

diff --git a/airframe-parquet/src/main/scala/wvlet/airframe/parquet/ParquetObjectWriter.scala b/airframe-parquet/src/main/scala/wvlet/airframe/parquet/ParquetObjectWriter.scala
index 5ac0660abf..6c08ba763d 100644
--- a/airframe-parquet/src/main/scala/wvlet/airframe/parquet/ParquetObjectWriter.scala
+++ b/airframe-parquet/src/main/scala/wvlet/airframe/parquet/ParquetObjectWriter.scala
@@ -82,8 +82,11 @@ class ParquetOptionWriter(parameterCodec: ParquetParameterWriter) extends Parque
     v match {
       case Some(x) =>
         parameterCodec.write(recordConsumer, x)
-      case _ => // None or null
-      // Skip writing Optional parameter
+      case null | None =>
+      // Skip writing None or null parameter
+      case _ =>
+        // Write other values (e.g., Map binary)
+        parameterCodec.write(recordConsumer, v)
     }
   }

@@ -256,7 +259,7 @@ case class ParquetObjectWriter(paramWriters: Seq[ParquetFieldWriter], params: Se
   }
 }

-object ParquetObjectWriter {
+object ParquetObjectWriter extends LogSupport {
   def buildFromSurface(surface: Surface, schema: MessageType): ParquetObjectWriter = {
     val paramCodecs = surface.params.zip(schema.getFields.asScala).map { case (param, tpe) =>
       // Resolve the element type X of Option[X], Seq[X], etc.
diff --git a/airframe-parquet/src/test/scala/wvlet/airframe/parquet/MapColumnWriteTest.scala b/airframe-parquet/src/test/scala/wvlet/airframe/parquet/MapColumnWriteTest.scala
new file mode 100644
index 0000000000..e4d4a0491a
--- /dev/null
+++ b/airframe-parquet/src/test/scala/wvlet/airframe/parquet/MapColumnWriteTest.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package wvlet.airframe.parquet
+
+import wvlet.airframe.surface.Surface
+import wvlet.airspec.AirSpec
+import wvlet.log.io.IOUtil
+
+import scala.util.Using
+
+object MapColumnWriteTest extends AirSpec {
+
+  case class MapRecord(id: Int, m: Map[Int, Long])
+
+  test("write map column") {
+    val schema = Parquet.toParquetSchema(Surface.of[MapRecord])
+    debug(schema)
+
+    IOUtil.withTempFile("target/tmp-map-record", ".parquet") { file =>
+      val data = Seq(
+        MapRecord(1, Map(1 -> 10L, 2 -> 20L)),
+        MapRecord(2, Map(1 -> 10L, 2 -> 20L, 3 -> 30L))
+      )
+      Using.resource(Parquet.newWriter[MapRecord](file.getPath)) { writer =>
+        data.foreach(writer.write(_))
+      }
+
+      Using.resource(Parquet.newReader[MapRecord](file.getPath)) { reader =>
+        val r0 = reader.read()
+        debug(s"${r0}")
+        val r1 = reader.read()
+        debug(s"${r1}")
+        r0 shouldBe data(0)
+        r1 shouldBe data(1)
+      }
+    }
+  }
+
+  import scala.collection.immutable.ListMap
+  case class ListMapRecord(id: Int, m: ListMap[Int, Long])
+
+  test("write ListMap column") {
+    val schema = Parquet.toParquetSchema(Surface.of[ListMapRecord])
+    debug(schema)
+
+    IOUtil.withTempFile("target/tmp-list-map-record", ".parquet") { file =>
+      val data = Seq(
+        ListMapRecord(1, ListMap(1 -> 10L, 2 -> 20L)),
+        ListMapRecord(2, ListMap(1 -> 10L, 2 -> 20L, 3 -> 30L))
+      )
+      Using.resource(Parquet.newWriter[ListMapRecord](file.getPath)) { writer =>
+        data.foreach(writer.write(_))
+      }
+
+      Using.resource(Parquet.newReader[ListMapRecord](file.getPath)) { reader =>
+        val r0 = reader.read()
+        debug(s"${r0}")
+        val r1 = reader.read()
+        debug(s"${r1}")
+        r0 shouldBe data(0)
+        r1 shouldBe data(1)
+      }
+    }
+  }
+}
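For reference, a minimal, Parquet-free sketch of what the match change in ParquetOptionWriter does. This is an illustration only, not part of the patch; writeValue is a hypothetical stand-in for parameterCodec.write(recordConsumer, x). With the old fall-through branch, any value that was not Some(x), including a Map passed to the writer unwrapped, was treated as a skipped Optional parameter; with the new ordering, only null and None are skipped and every other value is written.

// Illustration only: sketches the ParquetOptionWriter match change in isolation.
// `writeValue` is a hypothetical stand-in for parameterCodec.write(recordConsumer, x).
object OptionMatchSketch {
  def writeValue(x: Any): Unit = println(s"write: ${x}")

  // Old behavior: anything that is not Some(x) fell into the skip branch,
  // so a Map value passed in directly was silently dropped (#3315).
  def writeOld(v: Any): Unit = v match {
    case Some(x) => writeValue(x)
    case _       => // None or null: skip (this also skipped Map values)
  }

  // New behavior: only null and None are skipped; any other value is written.
  def writeNew(v: Any): Unit = v match {
    case Some(x)     => writeValue(x)
    case null | None => // skip
    case other       => writeValue(other)
  }

  def main(args: Array[String]): Unit = {
    writeOld(Map(1 -> 10L)) // prints nothing: the map column is lost
    writeNew(Map(1 -> 10L)) // prints: write: Map(1 -> 10)
  }
}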