Skip to content

Commit

Permalink
parquet (fix): Fix a bug in writing Map columns #3315 (#3342)
Browse files Browse the repository at this point in the history
- Reproduction of #3315
- Fix map column writer
  • Loading branch information
xerial authored Jan 18, 2024
1 parent e29dae0 commit 1ee3560
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,11 @@ class ParquetOptionWriter(parameterCodec: ParquetParameterWriter) extends Parque
v match {
case Some(x) =>
parameterCodec.write(recordConsumer, x)
case _ => // None or null
// Skip writing Optional parameter
case null | None =>
// Skip writing None or null parameter
case _ =>
// Write other values (e.g., Map binary)
parameterCodec.write(recordConsumer, v)
}
}

Expand Down Expand Up @@ -256,7 +259,7 @@ case class ParquetObjectWriter(paramWriters: Seq[ParquetFieldWriter], params: Se
}
}

object ParquetObjectWriter {
object ParquetObjectWriter extends LogSupport {
def buildFromSurface(surface: Surface, schema: MessageType): ParquetObjectWriter = {
val paramCodecs = surface.params.zip(schema.getFields.asScala).map { case (param, tpe) =>
// Resolve the element type X of Option[X], Seq[X], etc.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package wvlet.airframe.parquet

import wvlet.airframe.surface.Surface
import wvlet.airspec.AirSpec
import wvlet.log.io.IOUtil

import scala.util.Using

/**
  * Regression tests for writing Map-typed columns to Parquet (issue #3315):
  * records containing Map / ListMap fields must survive a write/read round-trip.
  */
object MapColumnWriteTest extends AirSpec {

  case class MapRecord(id: Int, m: Map[Int, Long])

  test("write map column") {
    // Build the Parquet schema from the record's Surface and log it for inspection
    val schema = Parquet.toParquetSchema(Surface.of[MapRecord])
    debug(schema)

    IOUtil.withTempFile("target/tmp-map-record", ".parquet") { tmpFile =>
      val records = Seq(
        MapRecord(1, Map(1 -> 10L, 2 -> 20L)),
        MapRecord(2, Map(1 -> 10L, 2 -> 20L, 3 -> 30L))
      )
      // Write all records, closing the writer via Using.resource
      Using.resource(Parquet.newWriter[MapRecord](tmpFile.getPath)) { writer =>
        records.foreach(writer.write(_))
      }

      // Read the records back and confirm they equal what was written
      Using.resource(Parquet.newReader[MapRecord](tmpFile.getPath)) { reader =>
        val first = reader.read()
        debug(s"${first}")
        val second = reader.read()
        debug(s"${second}")
        first shouldBe records(0)
        second shouldBe records(1)
      }
    }
  }

  import scala.collection.immutable.ListMap
  case class ListMapRecord(id: Int, m: ListMap[Int, Long])

  test("write ListMap column") {
    // Same round-trip check, but with an insertion-ordered ListMap column
    val schema = Parquet.toParquetSchema(Surface.of[ListMapRecord])
    debug(schema)

    IOUtil.withTempFile("target/tmp-list-map-record", ".parquet") { tmpFile =>
      val records = Seq(
        ListMapRecord(1, ListMap(1 -> 10L, 2 -> 20L)),
        ListMapRecord(2, ListMap(1 -> 10L, 2 -> 20L, 3 -> 30L))
      )
      // Write all records, closing the writer via Using.resource
      Using.resource(Parquet.newWriter[ListMapRecord](tmpFile.getPath)) { writer =>
        records.foreach(writer.write(_))
      }

      // Read the records back and confirm they equal what was written
      Using.resource(Parquet.newReader[ListMapRecord](tmpFile.getPath)) { reader =>
        val first = reader.read()
        debug(s"${first}")
        val second = reader.read()
        debug(s"${second}")
        first shouldBe records(0)
        second shouldBe records(1)
      }
    }
  }
}

0 comments on commit 1ee3560

Please sign in to comment.