
Commit

Adding an API to determine installed version of the Spark connector (#27709)

* Adding a simple public API to determine the installed version of the Spark connector

* Updated change log

* Update sdk/cosmos/azure-cosmos-spark_3-1_2-12/CHANGELOG.md

Co-authored-by: Kushagra Thapar <[email protected]>

* Update sdk/cosmos/azure-cosmos-spark_3-2_2-12/CHANGELOG.md

Co-authored-by: Kushagra Thapar <[email protected]>

* Update sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosChangeFeedDataSource.scala

Co-authored-by: Kushagra Thapar <[email protected]>

Co-authored-by: Kushagra Thapar <[email protected]>
FabianMeiswinkel and kushagraThapar authored Mar 16, 2022
1 parent 86326b6 commit 4e287fa
Showing 6 changed files with 53 additions and 22 deletions.
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-1_2-12/CHANGELOG.md
@@ -3,6 +3,7 @@
### 4.8.0-beta.1 (Unreleased)

#### Features Added
* Added an API to determine the installed version of the Cosmos Spark connector (`CosmosItemsDataSource.version`/`CosmosChangeFeedDataSource.version`). See [PR 27709](https://github.com/Azure/azure-sdk-for-java/pull/27709)

#### Breaking Changes

1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-2_2-12/CHANGELOG.md
@@ -3,6 +3,7 @@
### 4.8.0-beta.1 (Unreleased)

#### Features Added
* Added an API to determine the installed version of the Cosmos Spark connector (`CosmosItemsDataSource.version`/`CosmosChangeFeedDataSource.version`). See [PR 27709](https://github.com/Azure/azure-sdk-for-java/pull/27709)

#### Breaking Changes

sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosChangeFeedDataSource.scala
@@ -76,3 +76,13 @@ class CosmosChangeFeedDataSource
*/
override def supportsExternalMetadata(): Boolean = true
}

object CosmosChangeFeedDataSource {
  /**
   * Easy way to validate the version of the Cosmos Data Source
   * @return the version of the Cosmos Data Source
   */
  def version: String = {
    CosmosConstants.currentVersion
  }
}
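
As a usage sketch (the `ConnectorVersionCheck` wrapper is illustrative, not part of this commit), an application can read the installed version like this:

import com.azure.cosmos.spark.CosmosChangeFeedDataSource

// Hypothetical caller (not part of this commit): logs the installed
// connector version via the new companion-object API.
object ConnectorVersionCheck {
  def main(args: Array[String]): Unit = {
    println(s"Cosmos Spark connector version: ${CosmosChangeFeedDataSource.version}")
  }
}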
sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosItemsDataSource.scala
@@ -27,31 +27,32 @@ class CosmosItemsDataSource extends DataSourceRegister with TableProvider with B
private lazy val sparkSession = SparkSession.active

  /**
   * Infer the schema of the table identified by the given options.
   *
   * @param options an immutable case-insensitive string-to-string map
   * @return StructType inferred schema
   */
override def inferSchema(options: CaseInsensitiveStringMap): StructType = {
new ItemsTable(sparkSession, Array.empty, None, None, options).schema()
}

  /**
   * Represents the format that this data source provider uses.
   */
override def shortName(): String = CosmosConstants.Names.ItemsDataSourceShortName

  /**
   * Return a `Table` instance with the specified table schema, partitioning and properties
   * to do read/write. The returned table should report the same schema and partitioning with the
   * specified ones, or Spark may fail the operation.
   *
   * @param schema       The specified table schema.
   * @param partitioning The specified table partitioning.
   * @param properties   The specified table properties. It's case preserving (contains exactly what
   *                     users specified) and implementations are free to use it case sensitively or
   *                     insensitively. It should be able to identify a table, e.g. file path, Kafka
   *                     topic name, etc.
   */
override def getTable(schema: StructType, partitioning: Array[Transform], properties: util.Map[String, String]): Table = {
val diagnostics = DiagnosticsConfig.parseDiagnosticsConfig(properties.asScala.toMap)
// getTable - This is used for loading table with user specified schema and other transformations.
@@ -65,9 +66,19 @@ class CosmosItemsDataSource extends DataSourceRegister with TableProvider with B
}

  /**
   * Returns true if the source has the ability of accepting external table metadata when getting
   * tables. The external table metadata includes user-specified schema from
   * `DataFrameReader`/`DataStreamReader` and schema/partitioning stored in Spark catalog.
   */
override def supportsExternalMetadata(): Boolean = true
}

object CosmosItemsDataSource {
  /**
   * Easy way to validate the version of the Cosmos Data Source
   * @return the version of the Cosmos Data Source
   */
  def version: String = {
    CosmosConstants.currentVersion
  }
}
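
Since both companion objects delegate to `CosmosConstants.currentVersion`, either entry point can be used; a minimal sketch (the `VersionParityCheck` wrapper is hypothetical):

import com.azure.cosmos.spark.{CosmosChangeFeedDataSource, CosmosItemsDataSource}

// Illustrative only: both entry points return CosmosConstants.currentVersion,
// so they always agree; the integration tests below check each one against it.
object VersionParityCheck {
  def main(args: Array[String]): Unit = {
    assert(CosmosItemsDataSource.version == CosmosChangeFeedDataSource.version)
    println(CosmosItemsDataSource.version)
  }
}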
@@ -21,6 +21,10 @@ class SparkE2EChangeFeedITest
this.reinitializeContainer()
}

"spark change feed DataSource version" can "be determined" in {
CosmosChangeFeedDataSource.version shouldEqual CosmosConstants.currentVersion
}

"spark change feed query (incremental)" can "use default schema" in {
val cosmosEndpoint = TestConfigurations.HOST
val cosmosMasterKey = TestConfigurations.MASTER_KEY
@@ -4,7 +4,7 @@ package com.azure.cosmos.spark

import java.util.UUID
import com.azure.cosmos.implementation.{TestConfigurations, Utils}
import com.azure.cosmos.models.PartitionKey
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.node.ObjectNode

@@ -33,6 +33,10 @@ class SparkE2EQueryITest
// to ensure we don't do sub-range feed-range
// once emulator fixed switch back to default partitioning.

"spark items DataSource version" can "be determined" in {
CosmosItemsDataSource.version shouldEqual CosmosConstants.currentVersion
}

"spark query" can "basic nested query" in {
val cosmosEndpoint = TestConfigurations.HOST
val cosmosMasterKey = TestConfigurations.MASTER_KEY
