Fixing NPE in CosmosPagedFlux and iterating on change to limit prefetch memory usage in CosmosPagedIterable (#27261)

* Fixing NPE in CosmosPagedFlux when trying to access non-existing diagnostics

* Iterating on the change to limit prefetch memory consumption in CosmosPagedIterable in Spark query/change feed scenarios

* Fixing tests

* Fixing typo in comment

* Updating changelogs

Co-authored-by: Matias Quaranta <[email protected]>
FabianMeiswinkel and ealsur authored Feb 24, 2022
1 parent c169f76 commit 73b4974
Showing 15 changed files with 166 additions and 24 deletions.
2 changes: 1 addition & 1 deletion sdk/cosmos/azure-cosmos-spark_3-1_2-12/CHANGELOG.md
@@ -5,7 +5,7 @@
* Fixed an issue preventing preferred regions configured in `spark.cosmos.preferredRegionsList` from being used - See [PR 27084](https://github.com/Azure/azure-sdk-for-java/pull/27084)
* Fixed `spark.cosmos.changeFeed.itemCountPerTriggerHint` handling when using structured streaming - there was an issue that would reduce the throughput in subsequent micro batches too aggressively. - See [PR 27101](https://github.com/Azure/azure-sdk-for-java/pull/27101)
* Fixed an issue preventing driver programs from shutting down cleanly due to active Cosmos Clients being cached. - See [PR 27137](https://github.com/Azure/azure-sdk-for-java/pull/27137)
* Fixed an issue resulting in excessive memory consumption due to unbounded prefetch of data in Spark queries (batch or structured streaming) using the `cosmos.oltp` or `cosmos.oltp.changeFeed` connector when the data gets consumed slower than retrieved from the Cosmos DB backend. - See [PR 27237](https://github.com/Azure/azure-sdk-for-java/pull/27237)
* Fixed an issue resulting in excessive memory consumption due to unbounded prefetch of data in Spark queries (batch or structured streaming) using the `cosmos.oltp` or `cosmos.oltp.changeFeed` connector when the data gets consumed slower than retrieved from the Cosmos DB backend. - See [PR 27237](https://github.com/Azure/azure-sdk-for-java/pull/27237) and [PR 27261](https://github.com/Azure/azure-sdk-for-java/pull/27261)

### 4.6.1 (2022-02-11)
#### Key Bug Fixes
2 changes: 1 addition & 1 deletion sdk/cosmos/azure-cosmos-spark_3-2_2-12/CHANGELOG.md
@@ -5,7 +5,7 @@
* Fixed an issue preventing preferred regions configured in `spark.cosmos.preferredRegionsList` from being used - See [PR 27084](https://github.com/Azure/azure-sdk-for-java/pull/27084)
* Fixed `spark.cosmos.changeFeed.itemCountPerTriggerHint` handling when using structured streaming - there was an issue that would reduce the throughput in subsequent micro batches too aggressively. - See [PR 27101](https://github.com/Azure/azure-sdk-for-java/pull/27101)
* Fixed an issue preventing driver programs from shutting down cleanly due to active Cosmos Clients being cached. - See [PR 27137](https://github.com/Azure/azure-sdk-for-java/pull/27137)
* Fixed an issue resulting in excessive memory consumption due to unbounded prefetch of data in Spark queries (batch or structured streaming) using the `cosmos.oltp` or `cosmos.oltp.changeFeed` connector when the data gets consumed slower than retrieved from the Cosmos DB backend. - See [PR 27237](https://github.com/Azure/azure-sdk-for-java/pull/27237)
* Fixed an issue resulting in excessive memory consumption due to unbounded prefetch of data in Spark queries (batch or structured streaming) using the `cosmos.oltp` or `cosmos.oltp.changeFeed` connector when the data gets consumed slower than retrieved from the Cosmos DB backend. - See [PR 27237](https://github.com/Azure/azure-sdk-for-java/pull/27237) and [PR 27261](https://github.com/Azure/azure-sdk-for-java/pull/27261)

### 4.6.1 (2022-02-11)
#### Key Bug Fixes
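The changelog entries above describe the underlying problem: when the consumer drains pages more slowly than the backend returns them, Reactor's default operator prefetch keeps requesting ahead and buffered pages (up to 5 MB each) pile up. Below is a minimal sketch of the remedy using plain Reactor types; the page stream, sizes, and sleep are illustrative stand-ins, not the connector's actual internals.

```scala
import reactor.core.publisher.Flux
import reactor.core.scheduler.Schedulers

// Stand-in for a stream of feed-response pages (each real page can be up to ~5 MB).
val pages: Flux[String] = Flux.range(1, 1000).map(i => s"page-$i")

// With an explicit prefetch of 1, publishOn buffers at most one element ahead of
// the slow consumer; Reactor's defaults (e.g. 256 for publishOn) buffer far more.
pages
  .publishOn(Schedulers.boundedElastic(), 1) // prefetch = 1
  .toIterable(1)                             // request one page at a time
  .forEach(page => Thread.sleep(10))         // simulated slow consumer
```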
@@ -115,6 +115,7 @@ private case class ChangeFeedPartitionReader
cosmosAsyncContainer.queryChangeFeed(changeFeedRequestOptions, classOf[ObjectNode])
},
readConfig.maxItemCount,
readConfig.prefetchBufferSize,
operationContextAndListenerTuple
)

@@ -17,6 +17,7 @@ import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.connector.read.streaming.ReadLimit
import reactor.util.concurrent.Queues

import java.net.{URI, URISyntaxException, URL}
import java.time.format.DateTimeFormatter
@@ -44,6 +45,7 @@ private[spark] object CosmosConfigNames {
val UseGatewayMode = "spark.cosmos.useGatewayMode"
val ReadCustomQuery = "spark.cosmos.read.customQuery"
val ReadMaxItemCount = "spark.cosmos.read.maxItemCount"
val ReadPrefetchBufferSize = "spark.cosmos.read.prefetchBufferSize"
val ReadForceEventualConsistency = "spark.cosmos.read.forceEventualConsistency"
val ReadSchemaConversionMode = "spark.cosmos.read.schemaConversionMode"
val ReadInferSchemaSamplingSize = "spark.cosmos.read.inferSchema.samplingSize"
@@ -96,6 +98,7 @@ private[spark] object CosmosConfigNames {
ReadForceEventualConsistency,
ReadSchemaConversionMode,
ReadMaxItemCount,
ReadPrefetchBufferSize,
ReadInferSchemaSamplingSize,
ReadInferSchemaEnabled,
ReadInferSchemaIncludeSystemProperties,
@@ -336,6 +339,7 @@ private object CosmosAccountConfig {
private case class CosmosReadConfig(forceEventualConsistency: Boolean,
schemaConversionMode: SchemaConversionMode,
maxItemCount: Int,
prefetchBufferSize: Int,
customQuery: Option[CosmosParameterizedQuery])

private object SchemaConversionModes extends Enumeration {
@@ -379,17 +383,46 @@ private object CosmosReadConfig {
private val MaxItemCount = CosmosConfigEntry[Int](
key = CosmosConfigNames.ReadMaxItemCount,
mandatory = false,
defaultValue = Some(DefaultMaxItemCount),
defaultValue = None,
parseFromStringFunction = queryText => queryText.toInt,
helpMessage = "The maximum number of documents returned in a single request. The default is 1000.")

private val PrefetchBufferSize = CosmosConfigEntry[Int](
key = CosmosConfigNames.ReadPrefetchBufferSize,
mandatory = false,
defaultValue = None,
parseFromStringFunction = queryText => queryText.toInt,
helpMessage = "The prefetch buffer size - this limits the number of pages (max. 5 MB per page) that are " +
s"prefetched from the Cosmos DB Service. The default is `1` if the '${CosmosConfigNames.ReadMaxItemCount}' " +
"parameter is specified and larger than `1000`, or `8` otherwise. If the provided value is not `1` internally " +
"`reactor.util.concurrent.Queues` will round it to the maximum of 8 and the next power of two. " +
"Examples: (1 -> 1), (2 -> 8), (3 -> 8), (8 -> 8), (9 -> 16), (31 -> 32), (33 -> 64) - " +
"See `reactor.util.concurrent.Queues.get(int)` for more details. This means by the max. memory used for " +
"buffering is 5 MB multiplied by the effective prefetch buffer size for each Executor/CPU-Core.")

def parseCosmosReadConfig(cfg: Map[String, String]): CosmosReadConfig = {
val forceEventualConsistency = CosmosConfigEntry.parse(cfg, ForceEventualConsistency)
val jsonSchemaConversionMode = CosmosConfigEntry.parse(cfg, JsonSchemaConversion)
val customQuery = CosmosConfigEntry.parse(cfg, CustomQuery)
val maxItemCount = CosmosConfigEntry.parse(cfg, MaxItemCount)

CosmosReadConfig(forceEventualConsistency.get, jsonSchemaConversionMode.get, maxItemCount.get, customQuery)
val prefetchBufferSize = CosmosConfigEntry.parse(cfg, PrefetchBufferSize)

CosmosReadConfig(
forceEventualConsistency.get,
jsonSchemaConversionMode.get,
maxItemCount.getOrElse(DefaultMaxItemCount),
prefetchBufferSize.getOrElse(
maxItemCount match {
case Some(itemCountProvidedByUser) => if (itemCountProvidedByUser > DefaultMaxItemCount) {
1
} else {
// Smallest possible number > 1 in Queues.get (2-7 will be rounded to 8)
CosmosConstants.smallestPossibleReactorQueueSizeLargerThanOne
}
case None => 8
}
),
customQuery)
}
}

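The rounding rule spelled out in the help message above can be reproduced in a few lines. The helper below is hypothetical and purely illustrative; the real rounding happens inside `reactor.util.concurrent.Queues.get(int)` when the buffer queue is created.

```scala
// Illustrative mirror of Reactor's queue-size rounding for requested >= 1:
// 1 stays 1; any other size is rounded up to max(8, next power of two).
def effectivePrefetchBufferSize(requested: Int): Int =
  if (requested == 1) 1
  else math.max(8, Integer.highestOneBit(requested - 1) << 1)

// Reproduces the examples from the help message:
// (1 -> 1), (2 -> 8), (3 -> 8), (8 -> 8), (9 -> 16), (31 -> 32), (33 -> 64)
assert(Seq(1, 2, 3, 8, 9, 31, 33).map(effectivePrefetchBufferSize) ==
  Seq(1, 8, 8, 8, 16, 32, 64))
```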
@@ -21,7 +21,7 @@ private object CosmosConstants {
val defaultDirectRequestTimeoutInSeconds = 10L
val feedRangesCacheIntervalInMinutes = 1
val defaultIoThreadCountFactorPerCore = 4
val smallestPossibleQueueSizeLargerThanOne = math.min(8, Queues.XS_BUFFER_SIZE)
val smallestPossibleReactorQueueSizeLargerThanOne = math.min(8, Queues.XS_BUFFER_SIZE)

object Names {
val ItemsDataSourceShortName = "cosmos.oltp"
@@ -109,6 +109,7 @@ private case class ItemsPartitionReader
cosmosAsyncContainer.queryItems(cosmosQuery.toSqlQuerySpec, queryOptions, classOf[ObjectNode])
},
readConfig.maxItemCount,
readConfig.prefetchBufferSize,
operationContextAndListenerTuple
)

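Both reader paths (the change feed reader earlier and the query reader above) now forward `readConfig.prefetchBufferSize` into the retrying iterator. From the user's side these are ordinary Spark options. A hedged usage sketch, assuming an existing `SparkSession` named `spark` and omitting the account/auth options a real job needs:

```scala
// Only the options touched by this PR are shown; values are examples.
val df = spark.read
  .format("cosmos.oltp")
  .option("spark.cosmos.read.maxItemCount", "5000")    // larger pages from the backend
  .option("spark.cosmos.read.prefetchBufferSize", "1") // buffer at most one page ahead
  .load()
```

Per the parsing logic above, leaving `spark.cosmos.read.prefetchBufferSize` unset would have the same effect here, since an explicit `maxItemCount` above 1000 already drops the default prefetch to 1.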
@@ -38,6 +38,7 @@ private class TransientIOErrorsRetryingIterator
(
val cosmosPagedFluxFactory: String => CosmosPagedFlux[ObjectNode],
val pageSize: Int,
val pagePrefetchBufferSize: Int,
val operationContextAndListener: Option[OperationContextAndListenerTuple]
) extends BufferedIterator[ObjectNode] with BasicLoggingTrait with AutoCloseable {

@@ -87,10 +88,11 @@ private class TransientIOErrorsRetryingIterator
case Some(oldPagedFlux) => oldPagedFlux.cancelOn(Schedulers.boundedElastic())
case None =>
}
currentFeedResponseIterator = Some(new CosmosPagedIterable[ObjectNode](
currentFeedResponseIterator = Some(
new CosmosPagedIterable[ObjectNode](
newPagedFlux.get,
pageSize,
CosmosConstants.smallestPossibleQueueSizeLargerThanOne
pagePrefetchBufferSize
)
.iterableByPage()
.iterator
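The iterator now passes the configured buffer size straight into the `CosmosPagedIterable` overload taking both a page size and a page-prefetch count. A sketch of that overload in isolation, assuming an existing `container: CosmosAsyncContainer`; the query text and sizes are illustrative:

```scala
import com.azure.cosmos.models.CosmosQueryRequestOptions
import com.azure.cosmos.util.CosmosPagedIterable
import com.fasterxml.jackson.databind.node.ObjectNode

val pagedIterable = new CosmosPagedIterable[ObjectNode](
  container.queryItems("SELECT * FROM c", new CosmosQueryRequestOptions(), classOf[ObjectNode]),
  100, // pageSize: maximum items per service round trip
  1    // pagePrefetchCount: hold at most one page ahead of the consumer
)

// Pages are fetched lazily as the iterator advances, so at most one extra
// page sits in memory while the consumer processes the current one.
pagedIterable.iterableByPage().forEach(page => println(page.getResults.size))
```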
@@ -226,16 +226,91 @@ class CosmosConfigSpec extends UnitSpec {
}

it should "parse read configuration" in {
val userConfig = Map(
var userConfig = Map(
"spark.cosmos.read.forceEventualConsistency" -> "false",
"spark.cosmos.read.schemaConversionMode" -> "Strict"
)

val config = CosmosReadConfig.parseCosmosReadConfig(userConfig)
var config = CosmosReadConfig.parseCosmosReadConfig(userConfig)

config.forceEventualConsistency shouldBe false
config.schemaConversionMode shouldBe SchemaConversionModes.Strict
config.customQuery shouldBe empty
config.maxItemCount shouldBe 1000
config.prefetchBufferSize shouldBe 8

userConfig = Map(
"spark.cosmos.read.forceEventualConsistency" -> "false",
"spark.cosmos.read.schemaConversionMode" -> "Strict",
"spark.cosmos.read.maxItemCount" -> "1000"
)

config = CosmosReadConfig.parseCosmosReadConfig(userConfig)

config.forceEventualConsistency shouldBe false
config.schemaConversionMode shouldBe SchemaConversionModes.Strict
config.customQuery shouldBe empty
config.maxItemCount shouldBe 1000
config.prefetchBufferSize shouldBe 8

userConfig = Map(
"spark.cosmos.read.forceEventualConsistency" -> "false",
"spark.cosmos.read.schemaConversionMode" -> "Strict",
"spark.cosmos.read.maxItemCount" -> "1001",
"spark.cosmos.read.prefetchBufferSize" -> "16"
)

config = CosmosReadConfig.parseCosmosReadConfig(userConfig)

config.forceEventualConsistency shouldBe false
config.schemaConversionMode shouldBe SchemaConversionModes.Strict
config.customQuery shouldBe empty
config.maxItemCount shouldBe 1001
config.prefetchBufferSize shouldBe 16

userConfig = Map(
"spark.cosmos.read.forceEventualConsistency" -> "false",
"spark.cosmos.read.schemaConversionMode" -> "Strict",
"spark.cosmos.read.maxItemCount" -> "1001",
"spark.cosmos.read.prefetchBufferSize" -> "2"
)

config = CosmosReadConfig.parseCosmosReadConfig(userConfig)

config.forceEventualConsistency shouldBe false
config.schemaConversionMode shouldBe SchemaConversionModes.Strict
config.customQuery shouldBe empty
config.maxItemCount shouldBe 1001
config.prefetchBufferSize shouldBe 2 // will be converted/rounded to an effective 8 later at runtime, not during config parsing

userConfig = Map(
"spark.cosmos.read.forceEventualConsistency" -> "false",
"spark.cosmos.read.schemaConversionMode" -> "Strict",
"spark.cosmos.read.maxItemCount" -> "1001",
"spark.cosmos.read.prefetchBufferSize" -> "1"
)

config = CosmosReadConfig.parseCosmosReadConfig(userConfig)

config.forceEventualConsistency shouldBe false
config.schemaConversionMode shouldBe SchemaConversionModes.Strict
config.customQuery shouldBe empty
config.maxItemCount shouldBe 1001
config.prefetchBufferSize shouldBe 1

userConfig = Map(
"spark.cosmos.read.forceEventualConsistency" -> "false",
"spark.cosmos.read.schemaConversionMode" -> "Strict",
"spark.cosmos.read.maxItemCount" -> "1001"
)

config = CosmosReadConfig.parseCosmosReadConfig(userConfig)

config.forceEventualConsistency shouldBe false
config.schemaConversionMode shouldBe SchemaConversionModes.Strict
config.customQuery shouldBe empty
config.maxItemCount shouldBe 1001
config.prefetchBufferSize shouldBe 1
}

it should "parse custom query option of read configuration" in {
@@ -5,6 +5,7 @@ package com.azure.cosmos.spark
import com.azure.cosmos.models.CosmosParameterizedQuery
import org.apache.spark.sql.sources.{AlwaysFalse, AlwaysTrue, EqualTo, Filter, In, IsNotNull, IsNull, StringContains, StringEndsWith, StringStartsWith}
import org.assertj.core.api.Assertions.assertThat
import reactor.util.concurrent.Queues
// scalastyle:off underscore.import
import scala.collection.JavaConverters._
// scalastyle:on underscore.import
@@ -14,7 +15,8 @@ class FilterAnalyzerSpec extends UnitSpec {
//scalastyle:off magic.number

private[this] val readConfigWithoutCustomQuery =
new CosmosReadConfig(true, SchemaConversionModes.Relaxed, 100, None)
new CosmosReadConfig(
true, SchemaConversionModes.Relaxed, 100, Queues.XS_BUFFER_SIZE, None)
private[this] val queryText = "SELECT * FROM c WHERE c.abc='Hello World'"
private[this] val query = Some(CosmosParameterizedQuery(
queryText,
@@ -24,6 +26,7 @@ class FilterAnalyzerSpec extends UnitSpec {
true,
SchemaConversionModes.Relaxed,
100,
Queues.XS_BUFFER_SIZE,
query)

"many filters" should "be translated to cosmos predicates with AND" in {
@@ -8,6 +8,7 @@ import com.azure.cosmos.spark.TransientIOErrorsRetryingIteratorITest.maxRetryCount
import com.azure.cosmos.spark.diagnostics.BasicLoggingTrait
import com.azure.cosmos.util.CosmosPagedIterable
import com.fasterxml.jackson.databind.node.ObjectNode
import reactor.util.concurrent.Queues

import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
@@ -71,6 +72,7 @@ class TransientIOErrorsRetryingIteratorITest
})
},
2,
Queues.XS_BUFFER_SIZE,
None
)
retryingIterator.maxRetryIntervalInMs = 5
@@ -174,6 +176,7 @@ class TransientIOErrorsRetryingIteratorITest
})
},
2,
Queues.XS_BUFFER_SIZE,
None
)
retryingIterator.maxRetryIntervalInMs = 5
@@ -10,6 +10,7 @@ import com.fasterxml.jackson.core.JsonProcessingException
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.node.ObjectNode
import reactor.core.publisher.Flux
import reactor.util.concurrent.Queues

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong
@@ -34,6 +35,7 @@ class TransientIOErrorsRetryingIteratorSpec extends UnitSpec with BasicLoggingTrait
continuationToken => generateMockedCosmosPagedFlux(
continuationToken, pageCount, transientErrorCount, injectEmptyPages = false),
pageSize,
1,
None
)
iterator.maxRetryIntervalInMs = 5
@@ -52,6 +54,7 @@ class TransientIOErrorsRetryingIteratorSpec extends UnitSpec with BasicLoggingTrait
continuationToken => generateMockedCosmosPagedFlux(
continuationToken, pageCount, transientErrorCount, injectEmptyPages = true),
pageSize,
1,
None
)
iterator.maxRetryIntervalInMs = 5
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos/CHANGELOG.md
@@ -8,6 +8,7 @@

#### Bugs Fixed
* Fixed an issue in `CosmosPagedIterable` resulting in excessive memory consumption due to unbounded prefetch of pages when converting the `CosmosPagedIterable` into an `Iterator<FeedResponse<T>>`. - See [PR 27237](https://github.com/Azure/azure-sdk-for-java/pull/27237)
* Fixed a `NullPointerException` in `CosmosDiagnostics isDiagnosticsCapturedInPagedFlux` - See [PR 27261](https://github.com/Azure/azure-sdk-for-java/pull/27261)

#### Other Changes

@@ -153,15 +153,19 @@ private AtomicBoolean isDiagnosticsCapturedInPagedFlux(){
new ImplementationBridgeHelpers.CosmosDiagnosticsHelper.CosmosDiagnosticsAccessor() {
@Override
public FeedResponseDiagnostics getFeedResponseDiagnostics(CosmosDiagnostics cosmosDiagnostics) {
if (cosmosDiagnostics != null) {
return cosmosDiagnostics.getFeedResponseDiagnostics();
if (cosmosDiagnostics == null) {
return null;
}

return null;
return cosmosDiagnostics.getFeedResponseDiagnostics();
}

@Override
public AtomicBoolean isDiagnosticsCapturedInPagedFlux(CosmosDiagnostics cosmosDiagnostics) {
if (cosmosDiagnostics == null) {
return null;
}

return cosmosDiagnostics.isDiagnosticsCapturedInPagedFlux();
}
});
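Both accessors above now apply the same guard: return `null` when no `CosmosDiagnostics` is present rather than dereferencing it, which is what raised the `NullPointerException`. A generalized Scala sketch of the pattern (hypothetical helper, not part of the SDK), using `Option` to make the absent case explicit:

```scala
// Wraps a possibly-null value and applies the accessor only when present,
// mirroring the null checks added in the Java fix above.
def accessSafely[A, B](value: A)(access: A => B): Option[B] =
  Option(value).map(access)

// e.g. accessSafely(cosmosDiagnostics)(_.getFeedResponseDiagnostics) yields
// None instead of throwing when cosmosDiagnostics is null.
```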