# Patch summary (two files changed).
#
# 1. data-source/src/main/scala/tech/ytsaurus/spyt/serializers/YtLogicalType.scala
#    CompositeYtLogicalType.getName(isColumnType) used to return
#    ColumnValueType.ANY.getName. After this patch it throws
#    IllegalStateException("unable to serialize a complex type: use spark.yt.write.typeV3.enabled").
#    columnValueType is untouched context and still returns ColumnValueType.ANY.
#
# 2. data-source/src/test/scala/tech/ytsaurus/spyt/format/types/ComplexTypeTest.scala
#    - Adds `import com.google.common.base.Throwables`.
#    - Renames the test "write dataset with complex types" to
#      "not write dataset with complex types". The read-back via spark.read / schemaHint
#      and its assertions are removed. The write is wrapped in try/catch:
#      fail("was supposed to fail") runs if the write returns normally; in the handler,
#      Throwables.getRootCause(t) must be an IllegalStateException whose message equals
#      the string above, otherwise `t` is rethrown.
#    - In the hunk at old line 528 (presumably still inside "sort map data while
#      writing" - confirm), `df.write.yt(tmpPath)` gets the same try/catch and the
#      read-back comparison against `binaryData` is removed.
#
# NOTE(review): fail(...) is called inside the try block, so whatever it throws is also
#   matched by `case t: Throwable`; presumably it is rethrown because its root cause is
#   not the expected IllegalStateException - confirm.
# NOTE(review): both catch blocks are token-for-token identical; a shared helper or a
#   ScalaTest `intercept` would remove the duplication and the isInstanceOf/asInstanceOf
#   pair - suggestion only, not applied here.
# NOTE(review): the patch text below is collapsed onto two physical lines. The original
#   line breaks and indentation of the hunks cannot be recovered from this copy, so it
#   most likely has to be re-exported before `git apply` / `patch` can consume it - confirm.
diff --git a/data-source/src/main/scala/tech/ytsaurus/spyt/serializers/YtLogicalType.scala b/data-source/src/main/scala/tech/ytsaurus/spyt/serializers/YtLogicalType.scala index 84d6ed0..38f736c 100644 --- a/data-source/src/main/scala/tech/ytsaurus/spyt/serializers/YtLogicalType.scala +++ b/data-source/src/main/scala/tech/ytsaurus/spyt/serializers/YtLogicalType.scala @@ -75,7 +75,9 @@ sealed abstract class AtomicYtLogicalType(name: String, sealed trait CompositeYtLogicalType extends YtLogicalType { override def columnValueType: ColumnValueType = ColumnValueType.ANY - override def getName(isColumnType: Boolean): String = ColumnValueType.ANY.getName + override def getName(isColumnType: Boolean): String = throw new IllegalStateException( + "unable to serialize a complex type: use spark.yt.write.typeV3.enabled" + ) } sealed abstract class CompositeYtLogicalTypeAlias(name: String, diff --git a/data-source/src/test/scala/tech/ytsaurus/spyt/format/types/ComplexTypeTest.scala b/data-source/src/test/scala/tech/ytsaurus/spyt/format/types/ComplexTypeTest.scala index 8ecf52c..4c8a716 100644 --- a/data-source/src/test/scala/tech/ytsaurus/spyt/format/types/ComplexTypeTest.scala +++ b/data-source/src/test/scala/tech/ytsaurus/spyt/format/types/ComplexTypeTest.scala @@ -1,5 +1,6 @@ package tech.ytsaurus.spyt.format.types +import com.google.common.base.Throwables import org.apache.log4j.{Level, Logger} import org.apache.spark.SparkException import org.apache.spark.sql.types._ @@ -469,30 +470,28 @@ class ComplexTypeTest extends FlatSpec with Matchers with LocalSpark with TmpDir res.select($"value.*").as[TestSmall].collect() should contain theSameElementsAs Seq(testRowSmall) } - it should "write dataset with complex types" in { + it should "not write dataset with complex types" in { import spark.implicits._ - Seq( - (Seq(1, 2, 3), A(1, Some("a")), Map("1" -> 0.1)), - (Seq(4, 5, 6), A(2, None), Map("2" -> 0.3)) - ) - .toDF("a", "b", "c").coalesce(1) - .write.mode(SaveMode.Overwrite) - 
.yt(tmpPath) - - val res = spark.read - .schemaHint( - "a" -> ArrayType(LongType), - "b" -> StructType(Seq(StructField("field1", LongType), StructField("field2", StringType))), - "c" -> MapType(StringType, DoubleType) + try { + Seq( + (Seq(1, 2, 3), A(1, Some("a")), Map("1" -> 0.1)), + (Seq(4, 5, 6), A(2, None), Map("2" -> 0.3)) ) - .yt(tmpPath) - - res.columns should contain theSameElementsAs Seq("a", "b", "c") - res.select("a", "b", "c").collect() should contain theSameElementsAs Seq( - Row(Seq(1, 2, 3), Row(1, "a"), Map("1" -> 0.1)), - Row(Seq(4, 5, 6), Row(2, null), Map("2" -> 0.3)) - ) + .toDF("a", "b", "c").coalesce(1) + .write.mode(SaveMode.Overwrite) + .yt(tmpPath) + fail("was supposed to fail") + } catch { + case t: Throwable => + val cause = Throwables.getRootCause(t) + if ( + !cause.isInstanceOf[IllegalStateException] + || cause.asInstanceOf[IllegalStateException].getMessage != "unable to serialize a complex type: use spark.yt.write.typeV3.enabled" + ) { + throw t + } + } } it should "sort map data while writing" in { @@ -528,16 +527,19 @@ class ComplexTypeTest extends FlatSpec with Matchers with LocalSpark with TmpDir ) val df = data.toDF("map1", "map2") - df.write.yt(tmpPath) - - val res = spark.read - .yt(tmpPath) - .select('map1.cast(BinaryType), 'map2.cast(BinaryType)) - .as[(Option[Array[Byte]], Option[Array[Byte]])] - .collect() - .map{case (x,y) => x.map(_.toList) -> y.map(_.toList)} - - res should contain theSameElementsAs binaryData + try { + df.write.yt(tmpPath) + fail("was supposed to fail") + } catch { + case t: Throwable => + val cause = Throwables.getRootCause(t) + if ( + !cause.isInstanceOf[IllegalStateException] + || cause.asInstanceOf[IllegalStateException].getMessage != "unable to serialize a complex type: use spark.yt.write.typeV3.enabled" + ) { + throw t + } + } } }