Skip to content

Commit

Permalink
Fixes deserialization of offsets generated with early versions of the…
Browse files Browse the repository at this point in the history
… connector (#27455)
  • Loading branch information
FabianMeiswinkel authored Mar 4, 2022
1 parent c4060a2 commit 236dd53
Showing 1 changed file with 3 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ private[spark] object ChangeFeedOffset {
private val StatePropertyName: String = "state"
private val InputPartitionsPropertyName: String = "partitions"
val V1Identifier: String = "spark.cosmos.changeFeed.offset.v1"
val V1AlternateIdentifier: String = "azure_cosmos_spark.com.azure.cosmos.spark.changeFeed.offset.v1"
private val objectMapper = new ObjectMapper()

def fromJson(json: String): ChangeFeedOffset = {
Expand Down Expand Up @@ -71,7 +72,8 @@ private[spark] object ChangeFeedOffset {
parsedNode != null &&
parsedNode.isObject &&
parsedNode.get(IdPropertyName) != null &&
parsedNode.get(IdPropertyName).asText("") == V1Identifier &&
(parsedNode.get(IdPropertyName).asText("") == V1Identifier ||
parsedNode.get(IdPropertyName).asText("") == V1AlternateIdentifier) &&
parsedNode.get(StatePropertyName) != null &&
parsedNode.get(StatePropertyName).isTextual &&
parsedNode.get(StatePropertyName).asText("") != ""
Expand Down

0 comments on commit 236dd53

Please sign in to comment.