From 632835e2948a2476b5c4837e3858da214c19ac05 Mon Sep 17 00:00:00 2001
From: Fabian Meiswinkel
Date: Wed, 25 May 2022 19:30:41 +0000
Subject: [PATCH] Adding optional fallback to encode java.sql.Date as EpochMs (#29081)

* Cosmos Spark: Allow disabling endpoint rediscovery

* Change log and config reference

* Adding optional fallback to encode java.sql.Date as EpochMs

* Added changelog

* Reacted to code review feedback
---
 .../azure-cosmos-spark_3-1_2-12/CHANGELOG.md  |   1 +
 .../azure-cosmos-spark_3-2_2-12/CHANGELOG.md  |   1 +
 .../docs/configuration-reference.md           |   4 +-
 .../com/azure/cosmos/spark/CosmosConfig.scala |  36 +++++-
 .../cosmos/spark/CosmosRowConverter.scala     | 111 ++++++++++++++++--
 .../cosmos/spark/CosmosRowConverterSpec.scala |  93 ++++++++++++++-
 ...ansientIOErrorsRetryingIteratorITest.scala |  11 +-
 ...ransientIOErrorsRetryingIteratorSpec.scala |   6 +-
 8 files changed, 240 insertions(+), 23 deletions(-)

diff --git a/sdk/cosmos/azure-cosmos-spark_3-1_2-12/CHANGELOG.md b/sdk/cosmos/azure-cosmos-spark_3-1_2-12/CHANGELOG.md
index bc804ff13e170..498a6cd43e8cc 100644
--- a/sdk/cosmos/azure-cosmos-spark_3-1_2-12/CHANGELOG.md
+++ b/sdk/cosmos/azure-cosmos-spark_3-1_2-12/CHANGELOG.md
@@ -4,6 +4,7 @@
#### Features Added
* Added ability to disable endpoint rediscovery when using custom domain names in combination with private endpoints from a custom (on-premise) Spark environment (neither Databricks nor Synapse). - See [PR 29027](https://github.com/Azure/azure-sdk-for-java/pull/29027)
+* Added a config option `spark.cosmos.serialization.dateTimeConversionMode` that allows date/time conversion to fall back to converting `java.sql.Date` and `java.sql.Timestamp` into epoch milliseconds, like the Cosmos DB connector for Spark 2.4 - See [PR 29081](https://github.com/Azure/azure-sdk-for-java/pull/29081)

#### Breaking Changes

diff --git a/sdk/cosmos/azure-cosmos-spark_3-2_2-12/CHANGELOG.md b/sdk/cosmos/azure-cosmos-spark_3-2_2-12/CHANGELOG.md
index 9bc71b012597e..1fc2c31031d2c 100644
--- a/sdk/cosmos/azure-cosmos-spark_3-2_2-12/CHANGELOG.md
+++ b/sdk/cosmos/azure-cosmos-spark_3-2_2-12/CHANGELOG.md
@@ -4,6 +4,7 @@
#### Features Added
* Added ability to disable endpoint rediscovery when using custom domain names in combination with private endpoints from a custom (on-premise) Spark environment (neither Databricks nor Synapse). - See [PR 29027](https://github.com/Azure/azure-sdk-for-java/pull/29027)
+* Added a config option `spark.cosmos.serialization.dateTimeConversionMode` that allows date/time conversion to fall back to converting `java.sql.Date` and `java.sql.Timestamp` into epoch milliseconds, like the Cosmos DB connector for Spark 2.4 - See [PR 29081](https://github.com/Azure/azure-sdk-for-java/pull/29081)

#### Breaking Changes

diff --git a/sdk/cosmos/azure-cosmos-spark_3_2-12/docs/configuration-reference.md b/sdk/cosmos/azure-cosmos-spark_3_2-12/docs/configuration-reference.md
index 45c5690d62039..5ac5ea3c46d62 100644
--- a/sdk/cosmos/azure-cosmos-spark_3_2-12/docs/configuration-reference.md
+++ b/sdk/cosmos/azure-cosmos-spark_3_2-12/docs/configuration-reference.md
@@ -18,7 +18,7 @@
| `spark.cosmos.applicationName` | None | Application name |
| `spark.cosmos.preferredRegionsList` | None | Preferred regions list to be used for a multi region Cosmos DB account. This is a comma separated value (e.g., `[East US, West US]` or `East US, West US`) provided preferred regions will be used as hint.
You should use a collocated spark cluster with your Cosmos DB account and pass the spark cluster region as preferred region. See list of azure regions [here](https://docs.microsoft.com/dotnet/api/microsoft.azure.documents.locationnames?view=azure-dotnet&preserve-view=true). Please note that you can also use `spark.cosmos.preferredRegions` as alias | | `spark.cosmos.diagnostics` | None | Can be used to enable more verbose diagnostics. Currently the only supported option is to set this property to `simple` - which will result in additional logs being emitted as `INFO` logs in the Driver and Executor logs.| -| `spark.cosmos.disableTcpConnectionEndpointRediscovery` | None | Can be used to disable TCP connection endpoint rediscovery. TCP connection endpoint rediscovery should only be disabled when using custom domain names with private endpoints when using a custom Spark environment. When using Azure Databricks or Azure Synapse as Spark runtime it should never be required to disable endpoint rediscovery.| +| `spark.cosmos.disableTcpConnectionEndpointRediscovery` | `false` | Can be used to disable TCP connection endpoint rediscovery. TCP connection endpoint rediscovery should only be disabled when using custom domain names with private endpoints when using a custom Spark environment. When using Azure Databricks or Azure Synapse as Spark runtime it should never be required to disable endpoint rediscovery.| ### Write Config | Config Property Name | Default | Description | @@ -60,7 +60,7 @@ Used to influence the json serialization/deserialization behavior | Config Property Name | Default | Description | | :--- | :---- | :--- | | `spark.cosmos.serialization.inclusionMode` | `Always` | Determines whether null/default values will be serialized to json or whether properties with null/default value will be skipped. The behavior follows the same ideas as [Jackson's JsonInclude.Include](https://github.com/FasterXML/jackson-annotations/blob/d0820002721c76adad2cc87fcd88bf60f56b64de/src/main/java/com/fasterxml/jackson/annotation/JsonInclude.java#L98-L227). `Always` means json properties are created even for null and default values. `NonNull` means no json properties will be created for explicit null values. `NonEmpty` means json properties will not be created for empty string values or empty arrays/mpas. `NonDefault` means json properties will be skipped not just for null/empty but also when the value is identical to the default value `0` for numeric properties for example. | - +| `spark.cosmos.serialization.dateTimeConversionMode` | `Default` | The date/time conversion mode (`Default`, `AlwaysEpochMilliseconds`). With `Default` the standard Spark 3.* behavior is used (`java.sql.Date`/`java.time.LocalDate` are converted to EpochDay, `java.sql.Timestamp`/`java.time.Instant` are converted to MicrosecondsFromEpoch). 
With `AlwaysEpochMilliseconds` the same behavior the Cosmos DB connector for Spark 2.4 used is applied - `java.sql.Date`, `java.time.LocalDate`, `java.sql.Timestamp` and `java.time.Instant` are converted to MillisecondsFromEpoch.| #### Change feed (only for Spark-Streaming using `cosmos.oltp.changeFeed` data source, which is read-only) configuration | Config Property Name | Default | Description | diff --git a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosConfig.scala b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosConfig.scala index bbe73e5bf8fd4..ae9852a7c4d11 100644 --- a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosConfig.scala +++ b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosConfig.scala @@ -13,6 +13,7 @@ import com.azure.cosmos.spark.CosmosPredicates.{assertNotNullOrEmpty, requireNot import com.azure.cosmos.spark.ItemWriteStrategy.{ItemWriteStrategy, values} import com.azure.cosmos.spark.PartitioningStrategies.PartitioningStrategy import com.azure.cosmos.spark.SchemaConversionModes.SchemaConversionMode +import com.azure.cosmos.spark.SerializationDateTimeConversionModes.SerializationDateTimeConversionMode import com.azure.cosmos.spark.SerializationInclusionModes.SerializationInclusionMode import com.azure.cosmos.spark.diagnostics.{DiagnosticsProvider, FeedDiagnosticsProvider, SimpleDiagnosticsProvider} import org.apache.spark.SparkConf @@ -89,6 +90,8 @@ private[spark] object CosmosConfigNames { "spark.cosmos.throughputControl.globalControl.expireIntervalInMS" val SerializationInclusionMode = "spark.cosmos.serialization.inclusionMode" + val SerializationDateTimeConversionMode = + "spark.cosmos.serialization.dateTimeConversionMode" private val cosmosPrefix = "spark.cosmos." @@ -140,7 +143,8 @@ private[spark] object CosmosConfigNames { ThroughputControlGlobalControlContainer, ThroughputControlGlobalControlRenewalIntervalInMS, ThroughputControlGlobalControlExpireIntervalInMS, - SerializationInclusionMode + SerializationInclusionMode, + SerializationDateTimeConversionMode ) def validateConfigName(name: String): Unit = { @@ -803,7 +807,18 @@ private object SerializationInclusionModes extends Enumeration { val NonDefault: SerializationInclusionModes.Value = Value("NonDefault") } -private case class CosmosSerializationConfig(serializationInclusionMode: SerializationInclusionMode) +private object SerializationDateTimeConversionModes extends Enumeration { + type SerializationDateTimeConversionMode = Value + + val Default: SerializationDateTimeConversionModes.Value = Value("Default") + val AlwaysEpochMilliseconds: SerializationDateTimeConversionModes.Value = Value("AlwaysEpochMilliseconds") +} + +private case class CosmosSerializationConfig +( + serializationInclusionMode: SerializationInclusionMode, + serializationDateTimeConversionMode: SerializationDateTimeConversionMode +) private object CosmosSerializationConfig { private val inclusionMode = CosmosConfigEntry[SerializationInclusionMode]( @@ -815,14 +830,29 @@ private object CosmosSerializationConfig { " When serializing json documents this setting determines whether json properties will be emitted" + " for columns in the RDD that are null/empty. 
The default value is `Always`.") + private val dateTimeConversionMode = CosmosConfigEntry[SerializationDateTimeConversionMode]( + key = CosmosConfigNames.SerializationDateTimeConversionMode, + mandatory = false, + defaultValue = Some(SerializationDateTimeConversionModes.Default), + parseFromStringFunction = value => CosmosConfigEntry.parseEnumeration(value, SerializationDateTimeConversionModes), + helpMessage = "The date/time conversion mode (`Default`, `AlwaysEpochMilliseconds`). " + + "With `Default` the standard Spark 3.* behavior is used (`java.sql.Date`/`java.time.LocalDate` are converted " + + "to EpochDay, `java.sql.Timestamp`/`java.time.Instant` are converted to MicrosecondsFromEpoch). With " + + "`AlwaysEpochMilliseconds` the same behavior the Cosmos DB connector for Spark 2.4 used is applied - " + + "`java.sql.Date`, `java.time.LocalDate`, `java.sql.Timestamp` and `java.time.Instant` are converted " + + "to MillisecondsFromEpoch.") + def parseSerializationConfig(cfg: Map[String, String]): CosmosSerializationConfig = { val inclusionModeOpt = CosmosConfigEntry.parse(cfg, inclusionMode) + val dateTimeConversionModeOpt = CosmosConfigEntry.parse(cfg, dateTimeConversionMode) // parsing above already validated this assert(inclusionModeOpt.isDefined) + assert(dateTimeConversionModeOpt.isDefined) CosmosSerializationConfig( - serializationInclusionMode = inclusionModeOpt.get + serializationInclusionMode = inclusionModeOpt.get, + serializationDateTimeConversionMode = dateTimeConversionModeOpt.get ) } } diff --git a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosRowConverter.scala b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosRowConverter.scala index 93b9b27e48bed..6a87a8c5d22e1 100644 --- a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosRowConverter.scala +++ b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosRowConverter.scala @@ -3,6 +3,7 @@ package com.azure.cosmos.spark import com.azure.cosmos.implementation.{Constants, Utils} +import com.azure.cosmos.spark.CosmosConfigNames.SerializationDateTimeConversionMode import com.azure.cosmos.spark.CosmosTableSchemaInferrer.LsnAttributeName import com.azure.cosmos.spark.SchemaConversionModes.SchemaConversionMode import com.azure.cosmos.spark.diagnostics.BasicLoggingTrait @@ -18,8 +19,9 @@ import org.apache.spark.sql.catalyst.expressions.{GenericRowWithSchema, UnsafeMa import org.apache.spark.sql.catalyst.util.ArrayData import java.io.IOException -import java.time.{OffsetDateTime, ZoneOffset} +import java.time.{Instant, LocalDate, OffsetDateTime, ZoneOffset} import java.time.format.DateTimeFormatter +import java.util.concurrent.TimeUnit import scala.collection.concurrent.TrieMap // scalastyle:off underscore.import @@ -293,14 +295,54 @@ private[cosmos] class CosmosRowConverter( case DecimalType() => convertToJsonNodeConditionally(rowData.asInstanceOf[java.math.BigDecimal]) case DateType if rowData.isInstanceOf[java.lang.Long] => - convertToJsonNodeConditionally(rowData.asInstanceOf[Long]) + serializationConfig.serializationDateTimeConversionMode match { + case SerializationDateTimeConversionModes.Default => + convertToJsonNodeConditionally(rowData.asInstanceOf[Long]) + case SerializationDateTimeConversionModes.AlwaysEpochMilliseconds => + convertToJsonNodeConditionally(LocalDate + .ofEpochDay(rowData.asInstanceOf[Long]) + .atStartOfDay() + .toInstant(ZoneOffset.UTC).toEpochMilli) + } case DateType if 
rowData.isInstanceOf[java.lang.Integer] => - convertToJsonNodeConditionally(rowData.asInstanceOf[Integer]) + serializationConfig.serializationDateTimeConversionMode match { + case SerializationDateTimeConversionModes.Default => + convertToJsonNodeConditionally(rowData.asInstanceOf[java.lang.Integer]) + case SerializationDateTimeConversionModes.AlwaysEpochMilliseconds => + convertToJsonNodeConditionally(LocalDate + .ofEpochDay(rowData.asInstanceOf[java.lang.Integer].longValue()) + .atStartOfDay() + .toInstant(ZoneOffset.UTC).toEpochMilli) + } case DateType => convertToJsonNodeConditionally(rowData.asInstanceOf[Date].getTime) case TimestampType if rowData.isInstanceOf[java.lang.Long] => - convertToJsonNodeConditionally(rowData.asInstanceOf[Long]) + serializationConfig.serializationDateTimeConversionMode match { + case SerializationDateTimeConversionModes.Default => + convertToJsonNodeConditionally(rowData.asInstanceOf[java.lang.Long]) + case SerializationDateTimeConversionModes.AlwaysEpochMilliseconds => + val microsSinceEpoch = rowData.asInstanceOf[java.lang.Long] + convertToJsonNodeConditionally( + Instant.ofEpochSecond( + TimeUnit.MICROSECONDS.toSeconds(microsSinceEpoch), + TimeUnit.MICROSECONDS.toNanos( + Math.floorMod(microsSinceEpoch, TimeUnit.SECONDS.toMicros(1)) + ) + ).toEpochMilli) + } case TimestampType if rowData.isInstanceOf[java.lang.Integer] => - convertToJsonNodeConditionally(rowData.asInstanceOf[Integer]) + serializationConfig.serializationDateTimeConversionMode match { + case SerializationDateTimeConversionModes.Default => + convertToJsonNodeConditionally(rowData.asInstanceOf[java.lang.Integer]) + case SerializationDateTimeConversionModes.AlwaysEpochMilliseconds => + val microsSinceEpoch = rowData.asInstanceOf[java.lang.Integer].longValue() + convertToJsonNodeConditionally( + Instant.ofEpochSecond( + TimeUnit.MICROSECONDS.toSeconds(microsSinceEpoch), + TimeUnit.MICROSECONDS.toNanos( + Math.floorMod(microsSinceEpoch, TimeUnit.SECONDS.toMicros(1)) + ) + ).toEpochMilli) + } case TimestampType => convertToJsonNodeConditionally(rowData.asInstanceOf[Timestamp].getTime) case arrayType: ArrayType if rowData.isInstanceOf[ArrayData] => val arrayDataValue = rowData.asInstanceOf[ArrayData] @@ -365,11 +407,62 @@ private[cosmos] class CosmosRowConverter( case DecimalType() if rowData.isInstanceOf[Decimal] => objectMapper.convertValue(rowData.asInstanceOf[Decimal].toJavaBigDecimal, classOf[JsonNode]) case DecimalType() if rowData.isInstanceOf[Long] => objectMapper.convertValue(new java.math.BigDecimal(rowData.asInstanceOf[java.lang.Long]), classOf[JsonNode]) case DecimalType() => objectMapper.convertValue(rowData.asInstanceOf[java.math.BigDecimal], classOf[JsonNode]) - case DateType if rowData.isInstanceOf[java.lang.Long] => objectMapper.convertValue(rowData.asInstanceOf[java.lang.Long], classOf[JsonNode]) - case DateType if rowData.isInstanceOf[java.lang.Integer] => objectMapper.convertValue(rowData.asInstanceOf[java.lang.Integer], classOf[JsonNode]) + case DateType if rowData.isInstanceOf[java.lang.Long] => + serializationConfig.serializationDateTimeConversionMode match { + case SerializationDateTimeConversionModes.Default => + objectMapper.convertValue(rowData.asInstanceOf[java.lang.Long], classOf[JsonNode]) + case SerializationDateTimeConversionModes.AlwaysEpochMilliseconds => + objectMapper.convertValue( + LocalDate + .ofEpochDay(rowData.asInstanceOf[java.lang.Long]) + .atStartOfDay() + .toInstant(ZoneOffset.UTC).toEpochMilli, + classOf[JsonNode]) + } + + case DateType if 
rowData.isInstanceOf[java.lang.Integer] => + serializationConfig.serializationDateTimeConversionMode match { + case SerializationDateTimeConversionModes.Default => + objectMapper.convertValue(rowData.asInstanceOf[java.lang.Integer], classOf[JsonNode]) + case SerializationDateTimeConversionModes.AlwaysEpochMilliseconds => + objectMapper.convertValue( + LocalDate + .ofEpochDay(rowData.asInstanceOf[java.lang.Integer].longValue()) + .atStartOfDay() + .toInstant(ZoneOffset.UTC).toEpochMilli, + classOf[JsonNode]) + } case DateType => objectMapper.convertValue(rowData.asInstanceOf[Date].getTime, classOf[JsonNode]) - case TimestampType if rowData.isInstanceOf[java.lang.Long] => objectMapper.convertValue(rowData.asInstanceOf[java.lang.Long], classOf[JsonNode]) - case TimestampType if rowData.isInstanceOf[java.lang.Integer] => objectMapper.convertValue(rowData.asInstanceOf[java.lang.Integer], classOf[JsonNode]) + case TimestampType if rowData.isInstanceOf[java.lang.Long] => + serializationConfig.serializationDateTimeConversionMode match { + case SerializationDateTimeConversionModes.Default => + objectMapper.convertValue(rowData.asInstanceOf[java.lang.Long], classOf[JsonNode]) + case SerializationDateTimeConversionModes.AlwaysEpochMilliseconds => + val microsSinceEpoch = rowData.asInstanceOf[java.lang.Long] + objectMapper.convertValue( + Instant.ofEpochSecond( + TimeUnit.MICROSECONDS.toSeconds(microsSinceEpoch), + TimeUnit.MICROSECONDS.toNanos( + Math.floorMod(microsSinceEpoch, TimeUnit.SECONDS.toMicros(1)) + ) + ).toEpochMilli, + classOf[JsonNode]) + } + case TimestampType if rowData.isInstanceOf[java.lang.Integer] => + serializationConfig.serializationDateTimeConversionMode match { + case SerializationDateTimeConversionModes.Default => + objectMapper.convertValue(rowData.asInstanceOf[java.lang.Integer], classOf[JsonNode]) + case SerializationDateTimeConversionModes.AlwaysEpochMilliseconds => + val microsSinceEpoch = rowData.asInstanceOf[java.lang.Integer].longValue() + objectMapper.convertValue( + Instant.ofEpochSecond( + TimeUnit.MICROSECONDS.toSeconds(microsSinceEpoch), + TimeUnit.MICROSECONDS.toNanos( + Math.floorMod(microsSinceEpoch, TimeUnit.SECONDS.toMicros(1)) + ) + ).toEpochMilli, + classOf[JsonNode]) + } case TimestampType => objectMapper.convertValue(rowData.asInstanceOf[Timestamp].getTime, classOf[JsonNode]) case arrayType: ArrayType if rowData.isInstanceOf[ArrayData] => convertSparkArrayToArrayNode(arrayType.elementType, arrayType.containsNull, rowData.asInstanceOf[ArrayData]) case arrayType: ArrayType => convertSparkArrayToArrayNode(arrayType.elementType, arrayType.containsNull, rowData.asInstanceOf[Seq[_]]) diff --git a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/CosmosRowConverterSpec.scala b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/CosmosRowConverterSpec.scala index 48ba9eb17ad69..22bba2ff8c449 100644 --- a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/CosmosRowConverterSpec.scala +++ b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/CosmosRowConverterSpec.scala @@ -6,14 +6,16 @@ import com.azure.cosmos.spark.diagnostics.BasicLoggingTrait import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.databind.node.{ArrayNode, BinaryNode, BooleanNode, ObjectNode} import org.apache.spark.sql.Row -import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.CatalystTypeConverters.convertToCatalyst +import 
org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} import org.apache.spark.sql.catalyst.util.ArrayData import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.expressions.{GenericRowWithSchema, Uuid} import java.sql.{Date, Timestamp} import java.time.format.DateTimeFormatter -import java.time.{LocalDateTime, OffsetDateTime, ZoneOffset} +import java.time.temporal.ChronoUnit +import java.time.{Instant, LocalDate, LocalDateTime, OffsetDateTime, ZoneOffset} import java.util.UUID import scala.util.Random @@ -28,13 +30,51 @@ class CosmosRowConverterSpec extends UnitSpec with BasicLoggingTrait { val objectMapper = new ObjectMapper() private[this] val defaultRowConverter = - CosmosRowConverter.get(new CosmosSerializationConfig(SerializationInclusionModes.Always)) + CosmosRowConverter.get( + new CosmosSerializationConfig( + SerializationInclusionModes.Always, + SerializationDateTimeConversionModes.Default + ) + ) + private[this] val alwaysEpochMsRowConverter = + CosmosRowConverter.get( + new CosmosSerializationConfig( + SerializationInclusionModes.Always, + SerializationDateTimeConversionModes.AlwaysEpochMilliseconds + ) + ) + + private[this] val alwaysEpochMsRowConverterNonNull = + CosmosRowConverter.get( + new CosmosSerializationConfig( + SerializationInclusionModes.NonNull, + SerializationDateTimeConversionModes.AlwaysEpochMilliseconds + ) + ) + private[this] val rowConverterInclusionNonNull = - CosmosRowConverter.get(new CosmosSerializationConfig(SerializationInclusionModes.NonNull)) + CosmosRowConverter.get( + new CosmosSerializationConfig( + SerializationInclusionModes.NonNull, + SerializationDateTimeConversionModes.Default + ) + ) + private[this] val rowConverterInclusionNonDefault = - CosmosRowConverter.get(new CosmosSerializationConfig(SerializationInclusionModes.NonDefault)) + CosmosRowConverter.get( + new CosmosSerializationConfig( + SerializationInclusionModes.NonDefault, + SerializationDateTimeConversionModes.Default + ) + ) + private[this] val rowConverterInclusionNonEmpty = - CosmosRowConverter.get(new CosmosSerializationConfig(SerializationInclusionModes.NonEmpty)) + CosmosRowConverter.get( + new CosmosSerializationConfig( + SerializationInclusionModes.NonEmpty, + SerializationDateTimeConversionModes.Default + ) + ) "basic spark row" should "translate to ObjectNode" in { @@ -423,6 +463,47 @@ class CosmosRowConverterSpec extends UnitSpec with BasicLoggingTrait { objectNode.get(colName4).asInt() shouldEqual colVal3 } + "date and time in spark row" should "should honor dateTimeConversionMode config" in { + val colName1 = "testCol1" + val colName2 = "testCol2" + + val testDate = LocalDate.of(1945, 12, 12) + val testTimestamp = new java.sql.Timestamp( + 46, 11, 12, 12, 12, 12, 0) + .toLocalDateTime.toInstant(ZoneOffset.UTC) + + // Catalyst optimizer will convert java.sql.Date into LocalDate.toEpochDay + val colVal1Raw = new Date(45, 11, 12) + convertToCatalyst(colVal1Raw).isInstanceOf[Int] shouldEqual true + val colVal1= convertToCatalyst(colVal1Raw).asInstanceOf[Int] + colVal1 shouldEqual -8786 + colVal1 shouldEqual testDate.toEpochDay + + // Catalyst optimizer will convert java.sql.Timestamp into epoch Microseconds + val colVal2Raw = Timestamp.from(testTimestamp) + convertToCatalyst(colVal2Raw).isInstanceOf[Long] shouldEqual true + val colVal2= convertToCatalyst(colVal2Raw).asInstanceOf[Long] + colVal2 shouldEqual -727530468000000L + colVal2 shouldEqual ChronoUnit.MICROS.between(Instant.EPOCH, testTimestamp) + + val row = 
new GenericRowWithSchema( + Array(colVal1, colVal2), + StructType(Seq(StructField(colName1, DateType), + StructField(colName2, TimestampType)))) + + var objectNode = defaultRowConverter.fromRowToObjectNode(row) + objectNode.get(colName1).asLong() shouldEqual colVal1 + objectNode.get(colName2).asLong() shouldEqual colVal2 + + objectNode = alwaysEpochMsRowConverter.fromRowToObjectNode(row) + objectNode.get(colName1).asLong() shouldEqual testDate.atStartOfDay().toInstant(ZoneOffset.UTC).toEpochMilli + objectNode.get(colName2).asLong() shouldEqual testTimestamp.toEpochMilli + + objectNode = alwaysEpochMsRowConverterNonNull.fromRowToObjectNode(row) + objectNode.get(colName1).asLong() shouldEqual testDate.atStartOfDay().toInstant(ZoneOffset.UTC).toEpochMilli + objectNode.get(colName2).asLong() shouldEqual testTimestamp.toEpochMilli + } + "numeric types in spark row" should "translate to ObjectNode" in { val colName1 = "testCol1" val colName2 = "testCol2" diff --git a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/TransientIOErrorsRetryingIteratorITest.scala b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/TransientIOErrorsRetryingIteratorITest.scala index ec15bd7ffb6f2..3b162d590ed24 100644 --- a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/TransientIOErrorsRetryingIteratorITest.scala +++ b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/TransientIOErrorsRetryingIteratorITest.scala @@ -47,7 +47,11 @@ class TransientIOErrorsRetryingIteratorITest } } - val cosmosSerializationConfig = CosmosSerializationConfig(SerializationInclusionModes.Always) + val cosmosSerializationConfig = CosmosSerializationConfig( + SerializationInclusionModes.Always, + SerializationDateTimeConversionModes.Default + ) + val cosmosRowConverter = CosmosRowConverter.get(cosmosSerializationConfig) val queryOptions = new CosmosQueryRequestOptions() ImplementationBridgeHelpers @@ -170,7 +174,10 @@ class TransientIOErrorsRetryingIteratorITest } } - val cosmosSerializationConfig = CosmosSerializationConfig(SerializationInclusionModes.Always) + val cosmosSerializationConfig = CosmosSerializationConfig( + SerializationInclusionModes.Always, + SerializationDateTimeConversionModes.Default + ) val cosmosRowConverter = CosmosRowConverter.get(cosmosSerializationConfig) val queryOptions = new CosmosQueryRequestOptions() ImplementationBridgeHelpers diff --git a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/TransientIOErrorsRetryingIteratorSpec.scala b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/TransientIOErrorsRetryingIteratorSpec.scala index e4dea392e9489..2047eef344e86 100644 --- a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/TransientIOErrorsRetryingIteratorSpec.scala +++ b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/TransientIOErrorsRetryingIteratorSpec.scala @@ -26,7 +26,11 @@ class TransientIOErrorsRetryingIteratorSpec extends UnitSpec with BasicLoggingTr private val rnd = scala.util.Random private val pageSize = 2 - private val cosmosSerializationConfig = CosmosSerializationConfig(SerializationInclusionModes.Always) + private val cosmosSerializationConfig = CosmosSerializationConfig( + SerializationInclusionModes.Always, + SerializationDateTimeConversionModes.Default + ) + private val cosmosRowConverter = CosmosRowConverter.get(cosmosSerializationConfig) "TransientIOErrors" should "be retried without 
duplicates or missing records" in {