Adding optional fallback to encode java.sql.Date as EpochMs (#29081)
* Cosmos Spark: Allow disabling endpoint rediscovery

* Change log and config reference

* Adding optional fallback to encode java.sql.Date as EpochMs

* Added changelog

* Reacted to code review feedback
FabianMeiswinkel authored May 25, 2022
1 parent 8901e4c commit 632835e
Showing 8 changed files with 240 additions and 23 deletions.
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-1_2-12/CHANGELOG.md
@@ -4,6 +4,7 @@

#### Features Added
* Added ability to disable endpoint rediscovery when using custom domain names in combination with private endpoints from a custom (on-premise) Spark environment (neither Databricks nor Synapse). - See [PR 29027](https://github.com/Azure/azure-sdk-for-java/pull/29027)
* Added a config option `spark.cosmos.serialization.dateTimeConversionMode` to allow changing the date/time conversion to fall back to converting `java.sql.Date` and `java.sql.Timestamp` into epoch milliseconds, as in the Cosmos DB connector for Spark 2.4 - See [PR 29081](https://github.com/Azure/azure-sdk-for-java/pull/29081)

#### Breaking Changes

1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-2_2-12/CHANGELOG.md
@@ -4,6 +4,7 @@

#### Features Added
* Added ability to disable endpoint rediscovery when using custom domain names in combination with private endpoints from a custom (on-premise) Spark environment (neither Databricks nor Synapse). - See [PR 29027](https://github.com/Azure/azure-sdk-for-java/pull/29027)
* Added a config option `spark.cosmos.serialization.dateTimeConversionMode` to allow changing the date/time conversion to fall back to converting `java.sql.Date` and `java.sql.Timestamp` into epoch milliseconds, as in the Cosmos DB connector for Spark 2.4 - See [PR 29081](https://github.com/Azure/azure-sdk-for-java/pull/29081)

#### Breaking Changes

@@ -18,7 +18,7 @@
| `spark.cosmos.applicationName` | None | Application name |
| `spark.cosmos.preferredRegionsList` | None | Preferred regions list to be used for a multi-region Cosmos DB account. This is a comma-separated value (e.g., `[East US, West US]` or `East US, West US`); the provided preferred regions will be used as a hint. You should use a Spark cluster collocated with your Cosmos DB account and pass the Spark cluster region as the preferred region. See the list of Azure regions [here](https://docs.microsoft.com/dotnet/api/microsoft.azure.documents.locationnames?view=azure-dotnet&preserve-view=true). Please note that you can also use `spark.cosmos.preferredRegions` as an alias |
| `spark.cosmos.diagnostics` | None | Can be used to enable more verbose diagnostics. Currently the only supported option is to set this property to `simple` - which will result in additional logs being emitted as `INFO` logs in the Driver and Executor logs.|
| `spark.cosmos.disableTcpConnectionEndpointRediscovery` | None | Can be used to disable TCP connection endpoint rediscovery. TCP connection endpoint rediscovery should only be disabled when using custom domain names with private endpoints in a custom Spark environment. When using Azure Databricks or Azure Synapse as the Spark runtime, it should never be necessary to disable endpoint rediscovery.|
| `spark.cosmos.disableTcpConnectionEndpointRediscovery` | `false` | Can be used to disable TCP connection endpoint rediscovery. TCP connection endpoint rediscovery should only be disabled when using custom domain names with private endpoints in a custom Spark environment. When using Azure Databricks or Azure Synapse as the Spark runtime, it should never be necessary to disable endpoint rediscovery. See the example below.|
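
For illustration, a sketch of passing this flag together with the other connector options in a custom Spark environment (assuming `spark` is an existing `SparkSession`); the account endpoint/key and source database/container options are elided, and on Databricks or Synapse the default `false` should be kept:

```scala
// Only relevant for custom Spark environments using custom domain names with private endpoints.
val cosmosReadConfig = Map(
  // ... the usual account endpoint/key and source database/container options ...
  "spark.cosmos.disableTcpConnectionEndpointRediscovery" -> "true"
)

val df = spark.read.format("cosmos.oltp").options(cosmosReadConfig).load()
```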

### Write Config
| Config Property Name | Default | Description |
@@ -60,7 +60,7 @@ Used to influence the json serialization/deserialization behavior
| Config Property Name | Default | Description |
| :--- | :---- | :--- |
| `spark.cosmos.serialization.inclusionMode` | `Always` | Determines whether null/default values will be serialized to json or whether properties with null/default value will be skipped. The behavior follows the same ideas as [Jackson's JsonInclude.Include](https://github.com/FasterXML/jackson-annotations/blob/d0820002721c76adad2cc87fcd88bf60f56b64de/src/main/java/com/fasterxml/jackson/annotation/JsonInclude.java#L98-L227). `Always` means json properties are created even for null and default values. `NonNull` means no json properties will be created for explicit null values. `NonEmpty` means json properties will not be created for empty string values or empty arrays/maps. `NonDefault` means json properties will be skipped not just for null/empty values but also when the value is identical to the default value (for example, `0` for numeric properties). |

| `spark.cosmos.serialization.dateTimeConversionMode` | `Default` | The date/time conversion mode (`Default`, `AlwaysEpochMilliseconds`). With `Default` the standard Spark 3.* behavior is used (`java.sql.Date`/`java.time.LocalDate` are converted to EpochDay, `java.sql.Timestamp`/`java.time.Instant` are converted to MicrosecondsFromEpoch). With `AlwaysEpochMilliseconds` the behavior of the Cosmos DB connector for Spark 2.4 is applied - `java.sql.Date`, `java.time.LocalDate`, `java.sql.Timestamp` and `java.time.Instant` are converted to MillisecondsFromEpoch. See the example below.|
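
For example, a minimal sketch of opting into the Spark 2.4-compatible encoding when writing a DataFrame - illustrative only, assuming the `cosmos.oltp` data source, with the account endpoint/key and target database/container options elided:

```scala
import java.sql.Timestamp
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("cosmos-datetime-sample").getOrCreate()
import spark.implicits._

// With AlwaysEpochMilliseconds the `ts` column is written as milliseconds since the Unix epoch,
// matching the Cosmos DB connector for Spark 2.4.
val df = Seq(("id-1", Timestamp.valueOf("2022-05-25 00:00:00"))).toDF("id", "ts")

df.write
  .format("cosmos.oltp")
  .mode("Append")
  .option("spark.cosmos.serialization.dateTimeConversionMode", "AlwaysEpochMilliseconds")
  // ... plus the usual account endpoint/key and target database/container options ...
  .save()
```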

#### Change feed (only for Spark-Streaming using `cosmos.oltp.changeFeed` data source, which is read-only) configuration
| Config Property Name | Default | Description |
@@ -13,6 +13,7 @@ import com.azure.cosmos.spark.CosmosPredicates.{assertNotNullOrEmpty, requireNot
import com.azure.cosmos.spark.ItemWriteStrategy.{ItemWriteStrategy, values}
import com.azure.cosmos.spark.PartitioningStrategies.PartitioningStrategy
import com.azure.cosmos.spark.SchemaConversionModes.SchemaConversionMode
import com.azure.cosmos.spark.SerializationDateTimeConversionModes.SerializationDateTimeConversionMode
import com.azure.cosmos.spark.SerializationInclusionModes.SerializationInclusionMode
import com.azure.cosmos.spark.diagnostics.{DiagnosticsProvider, FeedDiagnosticsProvider, SimpleDiagnosticsProvider}
import org.apache.spark.SparkConf
@@ -89,6 +90,8 @@ private[spark] object CosmosConfigNames {
"spark.cosmos.throughputControl.globalControl.expireIntervalInMS"
val SerializationInclusionMode =
"spark.cosmos.serialization.inclusionMode"
val SerializationDateTimeConversionMode =
"spark.cosmos.serialization.dateTimeConversionMode"

private val cosmosPrefix = "spark.cosmos."

@@ -140,7 +143,8 @@ private[spark] object CosmosConfigNames {
ThroughputControlGlobalControlContainer,
ThroughputControlGlobalControlRenewalIntervalInMS,
ThroughputControlGlobalControlExpireIntervalInMS,
SerializationInclusionMode
SerializationInclusionMode,
SerializationDateTimeConversionMode
)

def validateConfigName(name: String): Unit = {
@@ -803,7 +807,18 @@ private object SerializationInclusionModes extends Enumeration {
val NonDefault: SerializationInclusionModes.Value = Value("NonDefault")
}

private case class CosmosSerializationConfig(serializationInclusionMode: SerializationInclusionMode)
private object SerializationDateTimeConversionModes extends Enumeration {
type SerializationDateTimeConversionMode = Value

val Default: SerializationDateTimeConversionModes.Value = Value("Default")
val AlwaysEpochMilliseconds: SerializationDateTimeConversionModes.Value = Value("AlwaysEpochMilliseconds")
}

private case class CosmosSerializationConfig
(
serializationInclusionMode: SerializationInclusionMode,
serializationDateTimeConversionMode: SerializationDateTimeConversionMode
)

private object CosmosSerializationConfig {
private val inclusionMode = CosmosConfigEntry[SerializationInclusionMode](
@@ -815,14 +830,29 @@ private object CosmosSerializationConfig {
" When serializing json documents this setting determines whether json properties will be emitted" +
" for columns in the RDD that are null/empty. The default value is `Always`.")

private val dateTimeConversionMode = CosmosConfigEntry[SerializationDateTimeConversionMode](
key = CosmosConfigNames.SerializationDateTimeConversionMode,
mandatory = false,
defaultValue = Some(SerializationDateTimeConversionModes.Default),
parseFromStringFunction = value => CosmosConfigEntry.parseEnumeration(value, SerializationDateTimeConversionModes),
helpMessage = "The date/time conversion mode (`Default`, `AlwaysEpochMilliseconds`). " +
"With `Default` the standard Spark 3.* behavior is used (`java.sql.Date`/`java.time.LocalDate` are converted " +
"to EpochDay, `java.sql.Timestamp`/`java.time.Instant` are converted to MicrosecondsFromEpoch). With " +
"`AlwaysEpochMilliseconds` the same behavior the Cosmos DB connector for Spark 2.4 used is applied - " +
"`java.sql.Date`, `java.time.LocalDate`, `java.sql.Timestamp` and `java.time.Instant` are converted " +
"to MillisecondsFromEpoch.")

def parseSerializationConfig(cfg: Map[String, String]): CosmosSerializationConfig = {
val inclusionModeOpt = CosmosConfigEntry.parse(cfg, inclusionMode)
val dateTimeConversionModeOpt = CosmosConfigEntry.parse(cfg, dateTimeConversionMode)

// parsing above already validated this
assert(inclusionModeOpt.isDefined)
assert(dateTimeConversionModeOpt.isDefined)

CosmosSerializationConfig(
serializationInclusionMode = inclusionModeOpt.get
serializationInclusionMode = inclusionModeOpt.get,
serializationDateTimeConversionMode = dateTimeConversionModeOpt.get
)
}
}
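
A hypothetical illustration of the parsing flow above (these types are `private[spark]`, so this is a sketch of the internal behavior rather than public API): both entries are optional, and missing keys fall back to the declared defaults (`Always` and `Default`).

```scala
val userOptions = Map(
  "spark.cosmos.serialization.inclusionMode" -> "NonNull",
  "spark.cosmos.serialization.dateTimeConversionMode" -> "AlwaysEpochMilliseconds"
)

val serializationConfig = CosmosSerializationConfig.parseSerializationConfig(userOptions)
// serializationConfig.serializationDateTimeConversionMode
//   == SerializationDateTimeConversionModes.AlwaysEpochMilliseconds
```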
@@ -3,6 +3,7 @@
package com.azure.cosmos.spark

import com.azure.cosmos.implementation.{Constants, Utils}
import com.azure.cosmos.spark.CosmosConfigNames.SerializationDateTimeConversionMode
import com.azure.cosmos.spark.CosmosTableSchemaInferrer.LsnAttributeName
import com.azure.cosmos.spark.SchemaConversionModes.SchemaConversionMode
import com.azure.cosmos.spark.diagnostics.BasicLoggingTrait
@@ -18,8 +19,9 @@ import org.apache.spark.sql.catalyst.expressions.{GenericRowWithSchema, UnsafeMa
import org.apache.spark.sql.catalyst.util.ArrayData

import java.io.IOException
import java.time.{OffsetDateTime, ZoneOffset}
import java.time.{Instant, LocalDate, OffsetDateTime, ZoneOffset}
import java.time.format.DateTimeFormatter
import java.util.concurrent.TimeUnit
import scala.collection.concurrent.TrieMap

// scalastyle:off underscore.import
@@ -293,14 +295,54 @@ private[cosmos] class CosmosRowConverter(
case DecimalType() =>
convertToJsonNodeConditionally(rowData.asInstanceOf[java.math.BigDecimal])
case DateType if rowData.isInstanceOf[java.lang.Long] =>
convertToJsonNodeConditionally(rowData.asInstanceOf[Long])
serializationConfig.serializationDateTimeConversionMode match {
case SerializationDateTimeConversionModes.Default =>
convertToJsonNodeConditionally(rowData.asInstanceOf[Long])
case SerializationDateTimeConversionModes.AlwaysEpochMilliseconds =>
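// Spark represents DateType values internally as days since the Unix epoch; convert the
// epoch day to midnight UTC and emit milliseconds since the epoch (the Spark 2.4 connector behavior).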
convertToJsonNodeConditionally(LocalDate
.ofEpochDay(rowData.asInstanceOf[Long])
.atStartOfDay()
.toInstant(ZoneOffset.UTC).toEpochMilli)
}
case DateType if rowData.isInstanceOf[java.lang.Integer] =>
convertToJsonNodeConditionally(rowData.asInstanceOf[Integer])
serializationConfig.serializationDateTimeConversionMode match {
case SerializationDateTimeConversionModes.Default =>
convertToJsonNodeConditionally(rowData.asInstanceOf[java.lang.Integer])
case SerializationDateTimeConversionModes.AlwaysEpochMilliseconds =>
convertToJsonNodeConditionally(LocalDate
.ofEpochDay(rowData.asInstanceOf[java.lang.Integer].longValue())
.atStartOfDay()
.toInstant(ZoneOffset.UTC).toEpochMilli)
}
case DateType => convertToJsonNodeConditionally(rowData.asInstanceOf[Date].getTime)
case TimestampType if rowData.isInstanceOf[java.lang.Long] =>
convertToJsonNodeConditionally(rowData.asInstanceOf[Long])
serializationConfig.serializationDateTimeConversionMode match {
case SerializationDateTimeConversionModes.Default =>
convertToJsonNodeConditionally(rowData.asInstanceOf[java.lang.Long])
case SerializationDateTimeConversionModes.AlwaysEpochMilliseconds =>
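// Spark represents TimestampType values internally as microseconds since the Unix epoch; split the
// value into whole seconds plus the nanosecond remainder, then emit milliseconds since the epoch.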
val microsSinceEpoch = rowData.asInstanceOf[java.lang.Long]
convertToJsonNodeConditionally(
Instant.ofEpochSecond(
TimeUnit.MICROSECONDS.toSeconds(microsSinceEpoch),
TimeUnit.MICROSECONDS.toNanos(
Math.floorMod(microsSinceEpoch, TimeUnit.SECONDS.toMicros(1))
)
).toEpochMilli)
}
case TimestampType if rowData.isInstanceOf[java.lang.Integer] =>
convertToJsonNodeConditionally(rowData.asInstanceOf[Integer])
serializationConfig.serializationDateTimeConversionMode match {
case SerializationDateTimeConversionModes.Default =>
convertToJsonNodeConditionally(rowData.asInstanceOf[java.lang.Integer])
case SerializationDateTimeConversionModes.AlwaysEpochMilliseconds =>
val microsSinceEpoch = rowData.asInstanceOf[java.lang.Integer].longValue()
convertToJsonNodeConditionally(
Instant.ofEpochSecond(
TimeUnit.MICROSECONDS.toSeconds(microsSinceEpoch),
TimeUnit.MICROSECONDS.toNanos(
Math.floorMod(microsSinceEpoch, TimeUnit.SECONDS.toMicros(1))
)
).toEpochMilli)
}
case TimestampType => convertToJsonNodeConditionally(rowData.asInstanceOf[Timestamp].getTime)
case arrayType: ArrayType if rowData.isInstanceOf[ArrayData] =>
val arrayDataValue = rowData.asInstanceOf[ArrayData]
@@ -365,11 +407,62 @@ private[cosmos] class CosmosRowConverter(
case DecimalType() if rowData.isInstanceOf[Decimal] => objectMapper.convertValue(rowData.asInstanceOf[Decimal].toJavaBigDecimal, classOf[JsonNode])
case DecimalType() if rowData.isInstanceOf[Long] => objectMapper.convertValue(new java.math.BigDecimal(rowData.asInstanceOf[java.lang.Long]), classOf[JsonNode])
case DecimalType() => objectMapper.convertValue(rowData.asInstanceOf[java.math.BigDecimal], classOf[JsonNode])
case DateType if rowData.isInstanceOf[java.lang.Long] => objectMapper.convertValue(rowData.asInstanceOf[java.lang.Long], classOf[JsonNode])
case DateType if rowData.isInstanceOf[java.lang.Integer] => objectMapper.convertValue(rowData.asInstanceOf[java.lang.Integer], classOf[JsonNode])
case DateType if rowData.isInstanceOf[java.lang.Long] =>
serializationConfig.serializationDateTimeConversionMode match {
case SerializationDateTimeConversionModes.Default =>
objectMapper.convertValue(rowData.asInstanceOf[java.lang.Long], classOf[JsonNode])
case SerializationDateTimeConversionModes.AlwaysEpochMilliseconds =>
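// Same epoch-day to UTC-midnight to epoch-millisecond conversion as in the conditional path above.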
objectMapper.convertValue(
LocalDate
.ofEpochDay(rowData.asInstanceOf[java.lang.Long])
.atStartOfDay()
.toInstant(ZoneOffset.UTC).toEpochMilli,
classOf[JsonNode])
}

case DateType if rowData.isInstanceOf[java.lang.Integer] =>
serializationConfig.serializationDateTimeConversionMode match {
case SerializationDateTimeConversionModes.Default =>
objectMapper.convertValue(rowData.asInstanceOf[java.lang.Integer], classOf[JsonNode])
case SerializationDateTimeConversionModes.AlwaysEpochMilliseconds =>
objectMapper.convertValue(
LocalDate
.ofEpochDay(rowData.asInstanceOf[java.lang.Integer].longValue())
.atStartOfDay()
.toInstant(ZoneOffset.UTC).toEpochMilli,
classOf[JsonNode])
}
case DateType => objectMapper.convertValue(rowData.asInstanceOf[Date].getTime, classOf[JsonNode])
case TimestampType if rowData.isInstanceOf[java.lang.Long] => objectMapper.convertValue(rowData.asInstanceOf[java.lang.Long], classOf[JsonNode])
case TimestampType if rowData.isInstanceOf[java.lang.Integer] => objectMapper.convertValue(rowData.asInstanceOf[java.lang.Integer], classOf[JsonNode])
case TimestampType if rowData.isInstanceOf[java.lang.Long] =>
serializationConfig.serializationDateTimeConversionMode match {
case SerializationDateTimeConversionModes.Default =>
objectMapper.convertValue(rowData.asInstanceOf[java.lang.Long], classOf[JsonNode])
case SerializationDateTimeConversionModes.AlwaysEpochMilliseconds =>
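// Same microsecond to epoch-millisecond conversion as in the conditional path above.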
val microsSinceEpoch = rowData.asInstanceOf[java.lang.Long]
objectMapper.convertValue(
Instant.ofEpochSecond(
TimeUnit.MICROSECONDS.toSeconds(microsSinceEpoch),
TimeUnit.MICROSECONDS.toNanos(
Math.floorMod(microsSinceEpoch, TimeUnit.SECONDS.toMicros(1))
)
).toEpochMilli,
classOf[JsonNode])
}
case TimestampType if rowData.isInstanceOf[java.lang.Integer] =>
serializationConfig.serializationDateTimeConversionMode match {
case SerializationDateTimeConversionModes.Default =>
objectMapper.convertValue(rowData.asInstanceOf[java.lang.Integer], classOf[JsonNode])
case SerializationDateTimeConversionModes.AlwaysEpochMilliseconds =>
val microsSinceEpoch = rowData.asInstanceOf[java.lang.Integer].longValue()
objectMapper.convertValue(
Instant.ofEpochSecond(
TimeUnit.MICROSECONDS.toSeconds(microsSinceEpoch),
TimeUnit.MICROSECONDS.toNanos(
Math.floorMod(microsSinceEpoch, TimeUnit.SECONDS.toMicros(1))
)
).toEpochMilli,
classOf[JsonNode])
}
case TimestampType => objectMapper.convertValue(rowData.asInstanceOf[Timestamp].getTime, classOf[JsonNode])
case arrayType: ArrayType if rowData.isInstanceOf[ArrayData] => convertSparkArrayToArrayNode(arrayType.elementType, arrayType.containsNull, rowData.asInstanceOf[ArrayData])
case arrayType: ArrayType => convertSparkArrayToArrayNode(arrayType.elementType, arrayType.containsNull, rowData.asInstanceOf[Seq[_]])