diff --git a/sdk/cosmos/azure-cosmos-spark_3-1_2-12/CHANGELOG.md b/sdk/cosmos/azure-cosmos-spark_3-1_2-12/CHANGELOG.md
index cacc17dcc668b..173310fce1dd1 100644
--- a/sdk/cosmos/azure-cosmos-spark_3-1_2-12/CHANGELOG.md
+++ b/sdk/cosmos/azure-cosmos-spark_3-1_2-12/CHANGELOG.md
@@ -7,6 +7,7 @@
 #### Breaking Changes
 
 #### Bugs Fixed
+* Fixed a `SecurityException` with the message `java.lang.SecurityException: class "org.apache.spark.SparkInternalsBridge$"'s signer information does not match signer information of other classes in the same package`, which occurred when the Spark connector was deployed in Databricks by copying it directly to `/databricks/jars` instead of going through the usual deployment APIs or the UI. To fix this issue, the former bridge class in the `org.apache.spark` package was replaced with reflection-based access to the internal Spark API needed to publish custom metrics. See [PR 37934](https://github.com/Azure/azure-sdk-for-java/pull/37934)
 
 #### Other Changes
 
diff --git a/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/main/scala/com/azure/cosmos/spark/CosmosWriter.scala b/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/main/scala/com/azure/cosmos/spark/CosmosWriter.scala
index b3040bf8e4a88..93e2ff5a887d8 100644
--- a/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/main/scala/com/azure/cosmos/spark/CosmosWriter.scala
+++ b/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/main/scala/com/azure/cosmos/spark/CosmosWriter.scala
@@ -5,7 +5,6 @@ package com.azure.cosmos.spark
 
 import com.azure.cosmos.CosmosDiagnosticsContext
 import com.azure.cosmos.implementation.ImplementationBridgeHelpers
-import org.apache.spark.SparkInternalsBridge
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.connector.write.WriterCommitMessage
diff --git a/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/main/scala/com/azure/cosmos/spark/SparkInternalsBridge.scala b/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/main/scala/com/azure/cosmos/spark/SparkInternalsBridge.scala
new file mode 100644
index 0000000000000..10537c098aa2d
--- /dev/null
+++ b/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/main/scala/com/azure/cosmos/spark/SparkInternalsBridge.scala
@@ -0,0 +1,125 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
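+//
+// Replaces the former org.apache.spark.SparkInternalsBridge: this object lives in
+// com.azure.cosmos.spark and reaches Spark's task-metrics API via reflection, so the
+// connector jar no longer ships classes under org.apache.spark whose signer information
+// could conflict with Spark's own classes when the jar is copied into /databricks/jars.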
+package com.azure.cosmos.spark
+
+import com.azure.cosmos.implementation.guava25.base.MoreObjects.firstNonNull
+import com.azure.cosmos.implementation.guava25.base.Strings.emptyToNull
+import com.azure.cosmos.spark.diagnostics.BasicLoggingTrait
+import org.apache.spark.TaskContext
+
+import java.lang.reflect.Method
+import java.util.Locale
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
+
+//scalastyle:off multiple.string.literals
+object SparkInternalsBridge extends BasicLoggingTrait {
+  private val SPARK_REFLECTION_ACCESS_ALLOWED_PROPERTY = "COSMOS.SPARK_REFLECTION_ACCESS_ALLOWED"
+  private val SPARK_REFLECTION_ACCESS_ALLOWED_VARIABLE = "COSMOS_SPARK_REFLECTION_ACCESS_ALLOWED"
+
+  private val DEFAULT_SPARK_REFLECTION_ACCESS_ALLOWED = true
+
+  val NUM_ROWS_PER_UPDATE = 100
+  private val outputMetricsMethod: AtomicReference[Method] = new AtomicReference[Method]()
+  private val setBytesWrittenMethod: AtomicReference[Method] = new AtomicReference[Method]()
+  private val setRecordsWrittenMethod: AtomicReference[Method] = new AtomicReference[Method]()
+
+  private def getSparkReflectionAccessAllowed: Boolean = {
+    val allowedText = System.getProperty(
+      SPARK_REFLECTION_ACCESS_ALLOWED_PROPERTY,
+      firstNonNull(
+        emptyToNull(System.getenv.get(SPARK_REFLECTION_ACCESS_ALLOWED_VARIABLE)),
+        String.valueOf(DEFAULT_SPARK_REFLECTION_ACCESS_ALLOWED)))
+
+    try {
+      java.lang.Boolean.valueOf(allowedText.toUpperCase(Locale.ROOT))
+    }
+    catch {
+      case e: Exception =>
+        logError(s"Parsing spark reflection access allowed $allowedText failed. Using the default $DEFAULT_SPARK_REFLECTION_ACCESS_ALLOWED.", e)
+        DEFAULT_SPARK_REFLECTION_ACCESS_ALLOWED
+    }
+  }
+
+  private final lazy val reflectionAccessAllowed = new AtomicBoolean(getSparkReflectionAccessAllowed)
+
+  private def getOutputMetrics(taskCtx: TaskContext): Option[Object] = {
+    try {
+      val taskMetrics: Object = taskCtx.taskMetrics()
+
+      val method = Option(outputMetricsMethod.get) match {
+        case Some(existing) => existing
+        case None =>
+          val newMethod = taskMetrics.getClass.getMethod("outputMetrics")
+          newMethod.setAccessible(true)
+          outputMetricsMethod.set(newMethod)
+          newMethod
+      }
+
+      val outputMetrics = method.invoke(taskMetrics)
+      Option(outputMetrics)
+    } catch {
+      case e: Exception =>
+        logInfo(s"Could not invoke getOutputMetrics via reflection - Error ${e.getMessage}", e)
+
+        // reflection failed - disabling it for the future
+        reflectionAccessAllowed.set(false)
+        None
+    }
+  }
+
+  private def setBytesWritten(outputMetrics: Object, metricValue: Object): Unit = {
+    try {
+      val method = Option(setBytesWrittenMethod.get) match {
+        case Some(existing) => existing
+        case None =>
+          val newMethod = outputMetrics.getClass.getMethod("setBytesWritten", java.lang.Long.TYPE)
+          newMethod.setAccessible(true)
+          setBytesWrittenMethod.set(newMethod)
+          newMethod
+      }
+
+      method.invoke(outputMetrics, metricValue)
+    } catch {
+      case e: Exception =>
+        logInfo(s"Could not invoke setBytesWritten via reflection - Error ${e.getMessage}", e)
+
+        // reflection failed - disabling it for the future
+        reflectionAccessAllowed.set(false)
+    }
+  }
+
+  private def setRecordsWritten(outputMetrics: Object, metricValue: Object): Unit = {
+    try {
+      val method = Option(setRecordsWrittenMethod.get) match {
+        case Some(existing) => existing
+        case None =>
+          val newMethod = outputMetrics.getClass.getMethod("setRecordsWritten", java.lang.Long.TYPE)
+          newMethod.setAccessible(true)
+          setRecordsWrittenMethod.set(newMethod)
+          newMethod
+      }
+      method.invoke(outputMetrics, metricValue)
+    } catch {
+      case e: Exception =>
+        logInfo(s"Could not invoke setRecordsWritten via reflection - Error ${e.getMessage}", e)
+
+        // reflection failed - disabling it for the future
+        reflectionAccessAllowed.set(false)
+    }
+  }
+
+  def updateInternalTaskMetrics(recordsWrittenSnapshot: Long, bytesWrittenSnapshot: Long): Unit = {
+    if (reflectionAccessAllowed.get) {
+      Option(TaskContext.get()) match {
+        case Some(taskContext) =>
+          getOutputMetrics(taskContext) match {
+            case Some(outputMetrics) =>
+              setRecordsWritten(outputMetrics, recordsWrittenSnapshot.asInstanceOf[Object])
+              setBytesWritten(outputMetrics, bytesWrittenSnapshot.asInstanceOf[Object])
+            case None =>
+          }
+        case None =>
+      }
+    }
+  }
+}
+//scalastyle:on multiple.string.literals
diff --git a/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/main/scala/org/apache/spark/SparkInternalsBridge.scala b/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/main/scala/org/apache/spark/SparkInternalsBridge.scala
deleted file mode 100644
index 1a501859fa7c6..0000000000000
--- a/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/main/scala/org/apache/spark/SparkInternalsBridge.scala
+++ /dev/null
@@ -1,18 +0,0 @@
-// Copyright (c) Microsoft Corporation. All rights reserved.
-// Licensed under the MIT License.
-package org.apache.spark
-
-object SparkInternalsBridge {
-
-  val NUM_ROWS_PER_UPDATE = 100
-
-  def updateInternalTaskMetrics(recordsWrittenSnapshot: Long, bytesWrittenSnapshot: Long): Unit = {
-    Option(TaskContext.get()) match {
-      case Some(taskContext) =>
-        val outputMetrics = taskContext.taskMetrics.outputMetrics
-        outputMetrics.setRecordsWritten(recordsWrittenSnapshot)
-        outputMetrics.setBytesWritten(bytesWrittenSnapshot)
-      case None =>
-    }
-  }
-}
diff --git a/sdk/cosmos/azure-cosmos-spark_3-2_2-12/CHANGELOG.md b/sdk/cosmos/azure-cosmos-spark_3-2_2-12/CHANGELOG.md
index 9191f25d6649a..d8c87240a17e2 100644
--- a/sdk/cosmos/azure-cosmos-spark_3-2_2-12/CHANGELOG.md
+++ b/sdk/cosmos/azure-cosmos-spark_3-2_2-12/CHANGELOG.md
@@ -7,6 +7,7 @@
 #### Breaking Changes
 
 #### Bugs Fixed
+* Fixed a `SecurityException` with the message `java.lang.SecurityException: class "org.apache.spark.SparkInternalsBridge$"'s signer information does not match signer information of other classes in the same package`, which occurred when the Spark connector was deployed in Databricks by copying it directly to `/databricks/jars` instead of going through the usual deployment APIs or the UI. To fix this issue, the former bridge class in the `org.apache.spark` package was replaced with reflection-based access to the internal Spark API needed to publish custom metrics. See [PR 37934](https://github.com/Azure/azure-sdk-for-java/pull/37934)
 
 #### Other Changes
 
diff --git a/sdk/cosmos/azure-cosmos-spark_3-2_2-12/src/main/scala/com/azure/cosmos/spark/CosmosWriter.scala b/sdk/cosmos/azure-cosmos-spark_3-2_2-12/src/main/scala/com/azure/cosmos/spark/CosmosWriter.scala
index 614199fa6e86f..edf9451faad72 100644
--- a/sdk/cosmos/azure-cosmos-spark_3-2_2-12/src/main/scala/com/azure/cosmos/spark/CosmosWriter.scala
+++ b/sdk/cosmos/azure-cosmos-spark_3-2_2-12/src/main/scala/com/azure/cosmos/spark/CosmosWriter.scala
@@ -5,7 +5,6 @@ package com.azure.cosmos.spark
 
 import com.azure.cosmos.CosmosDiagnosticsContext
 import com.azure.cosmos.implementation.ImplementationBridgeHelpers
-import org.apache.spark.SparkInternalsBridge
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.connector.metric.CustomTaskMetric
diff --git a/sdk/cosmos/azure-cosmos-spark_3-2_2-12/src/main/scala/com/azure/cosmos/spark/SparkInternalsBridge.scala b/sdk/cosmos/azure-cosmos-spark_3-2_2-12/src/main/scala/com/azure/cosmos/spark/SparkInternalsBridge.scala
new file mode 100644
index 0000000000000..96835661e0abb
--- /dev/null
+++ b/sdk/cosmos/azure-cosmos-spark_3-2_2-12/src/main/scala/com/azure/cosmos/spark/SparkInternalsBridge.scala
@@ -0,0 +1,196 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+package com.azure.cosmos.spark
+
+import com.azure.cosmos.implementation.guava25.base.MoreObjects.firstNonNull
+import com.azure.cosmos.implementation.guava25.base.Strings.emptyToNull
+import com.azure.cosmos.spark.diagnostics.BasicLoggingTrait
+import org.apache.spark.TaskContext
+import org.apache.spark.sql.connector.metric.CustomTaskMetric
+import org.apache.spark.sql.execution.metric.SQLMetric
+import org.apache.spark.util.AccumulatorV2
+
+import java.lang.reflect.Method
+import java.util.Locale
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
+import scala.collection.mutable.ArrayBuffer
+
+//scalastyle:off multiple.string.literals
+object SparkInternalsBridge extends BasicLoggingTrait {
+  private val SPARK_REFLECTION_ACCESS_ALLOWED_PROPERTY = "COSMOS.SPARK_REFLECTION_ACCESS_ALLOWED"
+  private val SPARK_REFLECTION_ACCESS_ALLOWED_VARIABLE = "COSMOS_SPARK_REFLECTION_ACCESS_ALLOWED"
+
+  private val DEFAULT_SPARK_REFLECTION_ACCESS_ALLOWED = true
+  val NUM_ROWS_PER_UPDATE = 100
+
+  private val BUILTIN_OUTPUT_METRICS = Set("bytesWritten", "recordsWritten")
+
+  private val accumulatorsMethod : AtomicReference[Method] = new AtomicReference[Method]()
+  private val outputMetricsMethod : AtomicReference[Method] = new AtomicReference[Method]()
+  private val setBytesWrittenMethod : AtomicReference[Method] = new AtomicReference[Method]()
+  private val setRecordsWrittenMethod : AtomicReference[Method] = new AtomicReference[Method]()
+
+  private def getSparkReflectionAccessAllowed: Boolean = {
+    val allowedText = System.getProperty(
+      SPARK_REFLECTION_ACCESS_ALLOWED_PROPERTY,
+      firstNonNull(
+        emptyToNull(System.getenv.get(SPARK_REFLECTION_ACCESS_ALLOWED_VARIABLE)),
+        String.valueOf(DEFAULT_SPARK_REFLECTION_ACCESS_ALLOWED)))
+
+    try {
+      java.lang.Boolean.valueOf(allowedText.toUpperCase(Locale.ROOT))
+    }
+    catch {
+      case e: Exception =>
+        logError(s"Parsing spark reflection access allowed $allowedText failed. Using the default $DEFAULT_SPARK_REFLECTION_ACCESS_ALLOWED.", e)
+        DEFAULT_SPARK_REFLECTION_ACCESS_ALLOWED
+    }
+  }
+
+  private final lazy val reflectionAccessAllowed = new AtomicBoolean(getSparkReflectionAccessAllowed)
+
+  def getInternalCustomTaskMetricsAsSQLMetric(knownCosmosMetricNames: Set[String]): Map[String, SQLMetric] = {
+    if (!reflectionAccessAllowed.get) {
+      Map.empty[String, SQLMetric]
+    } else {
+      Option.apply(TaskContext.get()) match {
+        case Some(taskCtx) => getInternalCustomTaskMetricsAsSQLMetricInternal(knownCosmosMetricNames, taskCtx)
+        case None => Map.empty[String, SQLMetric]
+      }
+    }
+  }
+
+  private def getAccumulators(taskCtx: TaskContext): Option[ArrayBuffer[AccumulatorV2[_, _]]] = {
+    try {
+      val taskMetrics: Object = taskCtx.taskMetrics()
+      val method = Option(accumulatorsMethod.get) match {
+        case Some(existing) => existing
+        case None =>
+          val newMethod = taskMetrics.getClass.getMethod("externalAccums")
+          newMethod.setAccessible(true)
+          accumulatorsMethod.set(newMethod)
+          newMethod
+      }
+
+      val accums = method.invoke(taskMetrics).asInstanceOf[ArrayBuffer[AccumulatorV2[_, _]]]
+
+      Some(accums)
+    } catch {
+      case e: Exception =>
+        logInfo(s"Could not invoke getAccumulators via reflection - Error ${e.getMessage}", e)
+
+        // reflection failed - disabling it for the future
+        reflectionAccessAllowed.set(false)
+        None
+    }
+  }
+
+  private def getOutputMetrics(taskCtx: TaskContext): Option[Object] = {
+    try {
+      val taskMetrics: Object = taskCtx.taskMetrics()
+
+      val method = Option(outputMetricsMethod.get) match {
+        case Some(existing) => existing
+        case None =>
+          val newMethod = taskMetrics.getClass.getMethod("outputMetrics")
+          newMethod.setAccessible(true)
+          outputMetricsMethod.set(newMethod)
+          newMethod
+      }
+
+      val outputMetrics = method.invoke(taskMetrics)
+      Option(outputMetrics)
+    } catch {
+      case e: Exception =>
+        logInfo(s"Could not invoke getOutputMetrics via reflection - Error ${e.getMessage}", e)
+
+        // reflection failed - disabling it for the future
+        reflectionAccessAllowed.set(false)
+        None
+    }
+  }
+
+  private def setBytesWritten(outputMetrics: Object, metricValue: Object): Unit = {
+    try {
+      val method = Option(setBytesWrittenMethod.get) match {
+        case Some(existing) => existing
+        case None =>
+          val newMethod = outputMetrics.getClass.getMethod("setBytesWritten", java.lang.Long.TYPE)
+          newMethod.setAccessible(true)
+          setBytesWrittenMethod.set(newMethod)
+          newMethod
+      }
+
+      method.invoke(outputMetrics, metricValue)
+    } catch {
+      case e: Exception =>
+        logInfo(s"Could not invoke setBytesWritten via reflection - Error ${e.getMessage}", e)
+
+        // reflection failed - disabling it for the future
+        reflectionAccessAllowed.set(false)
+    }
+  }
+
+  private def setRecordsWritten(outputMetrics: Object, metricValue: Object): Unit = {
+    try {
+      val method = Option(setRecordsWrittenMethod.get) match {
+        case Some(existing) => existing
+        case None =>
+          val newMethod = outputMetrics.getClass.getMethod("setRecordsWritten", java.lang.Long.TYPE)
+          newMethod.setAccessible(true)
+          setRecordsWrittenMethod.set(newMethod)
+          newMethod
+      }
+      method.invoke(outputMetrics, metricValue)
+    } catch {
+      case e: Exception =>
+        logInfo(s"Could not invoke setRecordsWritten via reflection - Error ${e.getMessage}", e)
+
+        // reflection failed - disabling it for the future
+        reflectionAccessAllowed.set(false)
+    }
+  }
+
+  private def getInternalCustomTaskMetricsAsSQLMetricInternal(
+    knownCosmosMetricNames: Set[String],
+    taskCtx: TaskContext): Map[String, SQLMetric] = {
+    getAccumulators(taskCtx) match {
+      case Some(accumulators) => accumulators
+        .filter(accumulable => accumulable.isInstanceOf[SQLMetric]
+          && accumulable.name.isDefined
+          && knownCosmosMetricNames.contains(accumulable.name.get))
+        .map(accumulable => {
+          val sqlMetric = accumulable.asInstanceOf[SQLMetric]
+          sqlMetric.name.get -> sqlMetric
+        })
+        .toMap[String, SQLMetric]
+      case None => Map.empty[String, SQLMetric]
+    }
+  }
+
+  def updateInternalTaskMetrics(currentMetricsValues: Seq[CustomTaskMetric]): Unit = {
+    if (reflectionAccessAllowed.get) {
+      currentMetricsValues.foreach { metric =>
+        val metricName = metric.name()
+        val metricValue = metric.value()
+
+        if (BUILTIN_OUTPUT_METRICS.contains(metricName)) {
+          Option(TaskContext.get()).map(getOutputMetrics).foreach { outputMetricsOption =>
+
+            outputMetricsOption match {
+              case Some(outputMetrics) =>
+
+                metricName match {
+                  case "bytesWritten" => setBytesWritten(outputMetrics, metricValue.asInstanceOf[Object])
+                  case "recordsWritten" => setRecordsWritten(outputMetrics, metricValue.asInstanceOf[Object])
+                  case _ => // no-op
+                }
+              case None =>
+            }
+          }
+        }
+      }
+    }
+  }
+}
+//scalastyle:on multiple.string.literals
diff --git a/sdk/cosmos/azure-cosmos-spark_3-2_2-12/src/main/scala/org/apache/spark/SparkInternalsBridge.scala b/sdk/cosmos/azure-cosmos-spark_3-2_2-12/src/main/scala/org/apache/spark/SparkInternalsBridge.scala
deleted file mode 100644
index 3037021c52efe..0000000000000
--- a/sdk/cosmos/azure-cosmos-spark_3-2_2-12/src/main/scala/org/apache/spark/SparkInternalsBridge.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-// Copyright (c) Microsoft Corporation. All rights reserved.
-// Licensed under the MIT License.
-package org.apache.spark
-
-import org.apache.spark.sql.connector.metric.CustomTaskMetric
-import org.apache.spark.sql.execution.metric.SQLMetric
-
-object SparkInternalsBridge {
-
-  val NUM_ROWS_PER_UPDATE = 100
-
-  private val BUILTIN_OUTPUT_METRICS = Set("bytesWritten", "recordsWritten")
-
-  def getInternalCustomTaskMetricsAsSQLMetric(knownCosmosMetricNames: Set[String]) : Map[String, SQLMetric] = {
-    Option.apply(TaskContext.get()) match {
-      case Some(taskCtx) => taskCtx
-        .taskMetrics()
-        .externalAccums
-        .filter(accumulable => accumulable.isInstanceOf[SQLMetric]
-          && accumulable.name.isDefined
-          && knownCosmosMetricNames.contains(accumulable.name.get))
-        .map(accumulable => {
-          val sqlMetric = accumulable.asInstanceOf[SQLMetric]
-          sqlMetric.name.get -> sqlMetric
-        })
-        .toMap[String, SQLMetric]
-      case None => Map.empty[String, SQLMetric]
-    }
-  }
-
-  def updateInternalTaskMetrics(currentMetricsValues: Seq[CustomTaskMetric]): Unit = {
-    currentMetricsValues.foreach { metric =>
-      val metricName = metric.name()
-      val metricValue = metric.value()
-
-      if (BUILTIN_OUTPUT_METRICS.contains(metricName)) {
-        Option(TaskContext.get()).map(_.taskMetrics().outputMetrics).foreach { outputMetrics =>
-          metricName match {
-            case "bytesWritten" => outputMetrics.setBytesWritten(metricValue)
-            case "recordsWritten" => outputMetrics.setRecordsWritten(metricValue)
-            case _ => // no-op
-          }
-        }
-      }
-    }
-  }
-}
diff --git a/sdk/cosmos/azure-cosmos-spark_3-3_2-12/CHANGELOG.md b/sdk/cosmos/azure-cosmos-spark_3-3_2-12/CHANGELOG.md
index 7c782efad6b46..981103b6c8fcd 100644
--- a/sdk/cosmos/azure-cosmos-spark_3-3_2-12/CHANGELOG.md
+++ b/sdk/cosmos/azure-cosmos-spark_3-3_2-12/CHANGELOG.md
@@ -7,6 +7,7 @@
 #### Breaking Changes
 
 #### Bugs Fixed
+* Fixed a `SecurityException` with the message `java.lang.SecurityException: class "org.apache.spark.SparkInternalsBridge$"'s signer information does not match signer information of other classes in the same package`, which occurred when the Spark connector was deployed in Databricks by copying it directly to `/databricks/jars` instead of going through the usual deployment APIs or the UI. To fix this issue, the former bridge class in the `org.apache.spark` package was replaced with reflection-based access to the internal Spark API needed to publish custom metrics. See [PR 37934](https://github.com/Azure/azure-sdk-for-java/pull/37934)
 
 #### Other Changes
 
diff --git a/sdk/cosmos/azure-cosmos-spark_3-3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosWriter.scala b/sdk/cosmos/azure-cosmos-spark_3-3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosWriter.scala
index c915860a1a5bc..042c6ca5636ee 100644
--- a/sdk/cosmos/azure-cosmos-spark_3-3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosWriter.scala
+++ b/sdk/cosmos/azure-cosmos-spark_3-3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosWriter.scala
@@ -5,7 +5,6 @@ package com.azure.cosmos.spark
 
 import com.azure.cosmos.CosmosDiagnosticsContext
 import com.azure.cosmos.implementation.ImplementationBridgeHelpers
-import org.apache.spark.SparkInternalsBridge
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.sql.connector.metric.CustomTaskMetric
 import org.apache.spark.sql.connector.write.WriterCommitMessage
diff --git a/sdk/cosmos/azure-cosmos-spark_3-3_2-12/src/main/scala/com/azure/cosmos/spark/SparkInternalsBridge.scala b/sdk/cosmos/azure-cosmos-spark_3-3_2-12/src/main/scala/com/azure/cosmos/spark/SparkInternalsBridge.scala
new file mode 100644
index 0000000000000..8ee9ecb84edd6
--- /dev/null
+++ b/sdk/cosmos/azure-cosmos-spark_3-3_2-12/src/main/scala/com/azure/cosmos/spark/SparkInternalsBridge.scala
@@ -0,0 +1,96 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+package com.azure.cosmos.spark
+
+import com.azure.cosmos.implementation.guava25.base.MoreObjects.firstNonNull
+import com.azure.cosmos.implementation.guava25.base.Strings.emptyToNull
+import com.azure.cosmos.spark.diagnostics.BasicLoggingTrait
+import org.apache.spark.TaskContext
+import org.apache.spark.sql.execution.metric.SQLMetric
+import org.apache.spark.util.AccumulatorV2
+
+import java.lang.reflect.Method
+import java.util.Locale
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
+import scala.collection.mutable.ArrayBuffer
+
+object SparkInternalsBridge extends BasicLoggingTrait {
+  private val SPARK_REFLECTION_ACCESS_ALLOWED_PROPERTY = "COSMOS.SPARK_REFLECTION_ACCESS_ALLOWED"
+  private val SPARK_REFLECTION_ACCESS_ALLOWED_VARIABLE = "COSMOS_SPARK_REFLECTION_ACCESS_ALLOWED"
+
+  private val DEFAULT_SPARK_REFLECTION_ACCESS_ALLOWED = true
+  private val accumulatorsMethod : AtomicReference[Method] = new AtomicReference[Method]()
+
+  private def getSparkReflectionAccessAllowed: Boolean = {
+    val allowedText = System.getProperty(
+      SPARK_REFLECTION_ACCESS_ALLOWED_PROPERTY,
+      firstNonNull(
+        emptyToNull(System.getenv.get(SPARK_REFLECTION_ACCESS_ALLOWED_VARIABLE)),
+        String.valueOf(DEFAULT_SPARK_REFLECTION_ACCESS_ALLOWED)))
+
+    try {
+      java.lang.Boolean.valueOf(allowedText.toUpperCase(Locale.ROOT))
+    }
+    catch {
+      case e: Exception =>
+        logError(s"Parsing spark reflection access allowed $allowedText failed. Using the default $DEFAULT_SPARK_REFLECTION_ACCESS_ALLOWED.", e)
+        DEFAULT_SPARK_REFLECTION_ACCESS_ALLOWED
+    }
+  }
+
+  private final lazy val reflectionAccessAllowed = new AtomicBoolean(getSparkReflectionAccessAllowed)
+
+  def getInternalCustomTaskMetricsAsSQLMetric(knownCosmosMetricNames: Set[String]): Map[String, SQLMetric] = {
+
+    if (!reflectionAccessAllowed.get) {
+      Map.empty[String, SQLMetric]
+    } else {
+      Option.apply(TaskContext.get()) match {
+        case Some(taskCtx) => getInternalCustomTaskMetricsAsSQLMetricInternal(knownCosmosMetricNames, taskCtx)
+        case None => Map.empty[String, SQLMetric]
+      }
+    }
+  }
+
+  private def getAccumulators(taskCtx: TaskContext): Option[ArrayBuffer[AccumulatorV2[_, _]]] = {
+    try {
+      val taskMetrics: Object = taskCtx.taskMetrics()
+      val method = Option(accumulatorsMethod.get) match {
+        case Some(existing) => existing
+        case None =>
+          val newMethod = taskMetrics.getClass.getMethod("externalAccums")
+          newMethod.setAccessible(true)
+          accumulatorsMethod.set(newMethod)
+          newMethod
+      }
+
+      val accums = method.invoke(taskMetrics).asInstanceOf[ArrayBuffer[AccumulatorV2[_, _]]]
+
+      Some(accums)
+    } catch {
+      case e: Exception =>
+        logInfo(s"Could not invoke getAccumulators via reflection - Error ${e.getMessage}", e)
+
+        // reflection failed - disabling it for the future
+        reflectionAccessAllowed.set(false)
+        None
+    }
+  }
+
+  private def getInternalCustomTaskMetricsAsSQLMetricInternal(
+    knownCosmosMetricNames: Set[String],
+    taskCtx: TaskContext): Map[String, SQLMetric] = {
+    getAccumulators(taskCtx) match {
+      case Some(accumulators) => accumulators
+        .filter(accumulable => accumulable.isInstanceOf[SQLMetric]
+          && accumulable.name.isDefined
+          && knownCosmosMetricNames.contains(accumulable.name.get))
+        .map(accumulable => {
+          val sqlMetric = accumulable.asInstanceOf[SQLMetric]
+          sqlMetric.name.get -> sqlMetric
+        })
+        .toMap[String, SQLMetric]
+      case None => Map.empty[String, SQLMetric]
+    }
+  }
+}
diff --git a/sdk/cosmos/azure-cosmos-spark_3-3_2-12/src/main/scala/org/apache/spark/SparkInternalsBridge.scala b/sdk/cosmos/azure-cosmos-spark_3-3_2-12/src/main/scala/org/apache/spark/SparkInternalsBridge.scala
deleted file mode 100644
index 94bf410c9244f..0000000000000
--- a/sdk/cosmos/azure-cosmos-spark_3-3_2-12/src/main/scala/org/apache/spark/SparkInternalsBridge.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-// Copyright (c) Microsoft Corporation. All rights reserved.
-// Licensed under the MIT License.
-package org.apache.spark
-
-import org.apache.spark.sql.execution.metric.SQLMetric
-
-object SparkInternalsBridge {
-  def getInternalCustomTaskMetricsAsSQLMetric(knownCosmosMetricNames: Set[String]) : Map[String, SQLMetric] = {
-    Option.apply(TaskContext.get()) match {
-      case Some(taskCtx) => taskCtx
-        .taskMetrics()
-        .externalAccums
-        .filter(accumulable => accumulable.isInstanceOf[SQLMetric]
-          && accumulable.name.isDefined
-          && knownCosmosMetricNames.contains(accumulable.name.get))
-        .map(accumulable => {
-          val sqlMetric = accumulable.asInstanceOf[SQLMetric]
-          sqlMetric.name.get -> sqlMetric
-        })
-        .toMap[String, SQLMetric]
-      case None => Map.empty[String, SQLMetric]
-    }
-  }
-}
diff --git a/sdk/cosmos/azure-cosmos-spark_3-4_2-12/CHANGELOG.md b/sdk/cosmos/azure-cosmos-spark_3-4_2-12/CHANGELOG.md
index 8b7e5efecfd1f..fa11050831588 100644
--- a/sdk/cosmos/azure-cosmos-spark_3-4_2-12/CHANGELOG.md
+++ b/sdk/cosmos/azure-cosmos-spark_3-4_2-12/CHANGELOG.md
@@ -7,6 +7,7 @@
 #### Breaking Changes
 
 #### Bugs Fixed
+* Fixed a `SecurityException` with the message `java.lang.SecurityException: class "org.apache.spark.SparkInternalsBridge$"'s signer information does not match signer information of other classes in the same package`, which occurred when the Spark connector was deployed in Databricks by copying it directly to `/databricks/jars` instead of going through the usual deployment APIs or the UI. To fix this issue, the former bridge class in the `org.apache.spark` package was replaced with reflection-based access to the internal Spark API needed to publish custom metrics. See [PR 37934](https://github.com/Azure/azure-sdk-for-java/pull/37934)
 
 #### Other Changes
 
diff --git a/sdk/cosmos/azure-cosmos-spark_3-4_2-12/src/main/scala/com/azure/cosmos/spark/CosmosWriter.scala b/sdk/cosmos/azure-cosmos-spark_3-4_2-12/src/main/scala/com/azure/cosmos/spark/CosmosWriter.scala
index c915860a1a5bc..042c6ca5636ee 100644
--- a/sdk/cosmos/azure-cosmos-spark_3-4_2-12/src/main/scala/com/azure/cosmos/spark/CosmosWriter.scala
+++ b/sdk/cosmos/azure-cosmos-spark_3-4_2-12/src/main/scala/com/azure/cosmos/spark/CosmosWriter.scala
@@ -5,7 +5,6 @@ package com.azure.cosmos.spark
 
 import com.azure.cosmos.CosmosDiagnosticsContext
 import com.azure.cosmos.implementation.ImplementationBridgeHelpers
-import org.apache.spark.SparkInternalsBridge
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.sql.connector.metric.CustomTaskMetric
 import org.apache.spark.sql.connector.write.WriterCommitMessage
diff --git a/sdk/cosmos/azure-cosmos-spark_3-4_2-12/src/main/scala/com/azure/cosmos/spark/SparkInternalsBridge.scala b/sdk/cosmos/azure-cosmos-spark_3-4_2-12/src/main/scala/com/azure/cosmos/spark/SparkInternalsBridge.scala
new file mode 100644
index 0000000000000..3d4050c3ee595
--- /dev/null
+++ b/sdk/cosmos/azure-cosmos-spark_3-4_2-12/src/main/scala/com/azure/cosmos/spark/SparkInternalsBridge.scala
@@ -0,0 +1,95 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
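+//
+// Reflection-based replacement for the former org.apache.spark.SparkInternalsBridge:
+// the TaskMetrics.accumulators() lookup is resolved once, cached in an AtomicReference,
+// and reflection access is permanently disabled if a lookup or invocation ever fails.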
+package com.azure.cosmos.spark
+
+import com.azure.cosmos.implementation.guava25.base.MoreObjects.firstNonNull
+import com.azure.cosmos.implementation.guava25.base.Strings.emptyToNull
+import com.azure.cosmos.spark.diagnostics.BasicLoggingTrait
+import org.apache.spark.TaskContext
+import org.apache.spark.sql.execution.metric.SQLMetric
+import org.apache.spark.util.AccumulatorV2
+
+import java.lang.reflect.Method
+import java.util.Locale
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
+
+object SparkInternalsBridge extends BasicLoggingTrait {
+  private val SPARK_REFLECTION_ACCESS_ALLOWED_PROPERTY = "COSMOS.SPARK_REFLECTION_ACCESS_ALLOWED"
+  private val SPARK_REFLECTION_ACCESS_ALLOWED_VARIABLE = "COSMOS_SPARK_REFLECTION_ACCESS_ALLOWED"
+
+  private val DEFAULT_SPARK_REFLECTION_ACCESS_ALLOWED = true
+  private val accumulatorsMethod : AtomicReference[Method] = new AtomicReference[Method]()
+
+  private def getSparkReflectionAccessAllowed: Boolean = {
+    val allowedText = System.getProperty(
+      SPARK_REFLECTION_ACCESS_ALLOWED_PROPERTY,
+      firstNonNull(
+        emptyToNull(System.getenv.get(SPARK_REFLECTION_ACCESS_ALLOWED_VARIABLE)),
+        String.valueOf(DEFAULT_SPARK_REFLECTION_ACCESS_ALLOWED)))
+
+    try {
+      java.lang.Boolean.valueOf(allowedText.toUpperCase(Locale.ROOT))
+    }
+    catch {
+      case e: Exception =>
+        logError(s"Parsing spark reflection access allowed $allowedText failed. Using the default $DEFAULT_SPARK_REFLECTION_ACCESS_ALLOWED.", e)
+        DEFAULT_SPARK_REFLECTION_ACCESS_ALLOWED
+    }
+  }
+
+  private final lazy val reflectionAccessAllowed = new AtomicBoolean(getSparkReflectionAccessAllowed)
+
+  def getInternalCustomTaskMetricsAsSQLMetric(knownCosmosMetricNames: Set[String]) : Map[String, SQLMetric] = {
+
+    if (!reflectionAccessAllowed.get) {
+      Map.empty[String, SQLMetric]
+    } else {
+      Option.apply(TaskContext.get()) match {
+        case Some(taskCtx) => getInternalCustomTaskMetricsAsSQLMetricInternal(knownCosmosMetricNames, taskCtx)
+        case None => Map.empty[String, SQLMetric]
+      }
+    }
+  }
+
+  private def getAccumulators(taskCtx: TaskContext): Option[Seq[AccumulatorV2[_, _]]] = {
+    try {
+      val taskMetrics: Object = taskCtx.taskMetrics()
+      val method = Option(accumulatorsMethod.get) match {
+        case Some(existing) => existing
+        case None =>
+          val newMethod = taskMetrics.getClass.getMethod("accumulators")
+          newMethod.setAccessible(true)
+          accumulatorsMethod.set(newMethod)
+          newMethod
+      }
+
+      val accums = method.invoke(taskMetrics).asInstanceOf[Seq[AccumulatorV2[_, _]]]
+
+      Some(accums)
+    } catch {
+      case e: Exception =>
+        logInfo(s"Could not invoke getAccumulators via reflection - Error ${e.getMessage}", e)
+
+        // reflection failed - disabling it for the future
+        reflectionAccessAllowed.set(false)
+        None
+    }
+  }
+
+  private def getInternalCustomTaskMetricsAsSQLMetricInternal(
+    knownCosmosMetricNames: Set[String],
+    taskCtx: TaskContext): Map[String, SQLMetric] = {
+    getAccumulators(taskCtx) match {
+      case Some(accumulators) => accumulators
+        .filter(accumulable => accumulable.isInstanceOf[SQLMetric]
+          && accumulable.name.isDefined
+          && knownCosmosMetricNames.contains(accumulable.name.get))
+        .map(accumulable => {
+          val sqlMetric = accumulable.asInstanceOf[SQLMetric]
+          sqlMetric.name.get -> sqlMetric
+        })
+        .toMap[String, SQLMetric]
+      case None => Map.empty[String, SQLMetric]
+    }
+  }
+}
diff --git a/sdk/cosmos/azure-cosmos-spark_3-4_2-12/src/main/scala/org/apache/spark/SparkInternalsBridge.scala b/sdk/cosmos/azure-cosmos-spark_3-4_2-12/src/main/scala/org/apache/spark/SparkInternalsBridge.scala
deleted file mode 100644
index c9355439f03b2..0000000000000
--- a/sdk/cosmos/azure-cosmos-spark_3-4_2-12/src/main/scala/org/apache/spark/SparkInternalsBridge.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-// Copyright (c) Microsoft Corporation. All rights reserved.
-// Licensed under the MIT License.
-package org.apache.spark
-
-import org.apache.spark.sql.execution.metric.SQLMetric
-
-object SparkInternalsBridge {
-  def getInternalCustomTaskMetricsAsSQLMetric(knownCosmosMetricNames: Set[String]) : Map[String, SQLMetric] = {
-    Option.apply(TaskContext.get()) match {
-      case Some(taskCtx) => taskCtx
-        .taskMetrics()
-        .accumulators()
-        .filter(accumulable => accumulable.isInstanceOf[SQLMetric]
-          && accumulable.name.isDefined
-          && knownCosmosMetricNames.contains(accumulable.name.get))
-        .map(accumulable => {
-          val sqlMetric = accumulable.asInstanceOf[SQLMetric]
-          sqlMetric.name.get -> sqlMetric
-        })
-        .toMap[String, SQLMetric]
-      case None => Map.empty[String, SQLMetric]
-    }
-  }
-}
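A minimal usage sketch (not part of this PR): the reflection-based bridges above are gated by the `COSMOS.SPARK_REFLECTION_ACCESS_ALLOWED` JVM system property and the `COSMOS_SPARK_REFLECTION_ACCESS_ALLOWED` environment variable, both defaulting to true. The object name below is hypothetical; it only illustrates how a job could opt out, in which case the bridge methods become no-ops or return empty metric maps.

// Hypothetical opt-out sketch (Scala); only the property/variable names come from the PR.
object ReflectionBridgeOptOut {
  def main(args: Array[String]): Unit = {
    // Disable reflective access before the connector first touches the bridge in this JVM
    // (the flag is read once, lazily, into an AtomicBoolean):
    System.setProperty("COSMOS.SPARK_REFLECTION_ACCESS_ALLOWED", "false")
    // Alternatively, export COSMOS_SPARK_REFLECTION_ACCESS_ALLOWED=false on the executors,
    // where TaskContext-based metric updates actually run. With reflection disabled,
    // updateInternalTaskMetrics is a no-op and
    // getInternalCustomTaskMetricsAsSQLMetric returns Map.empty.
  }
}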