[BUG] Random cast error while writing to CosmosDB with pyspark #42329

Open
Uranium2 opened this issue Oct 14, 2024 · 2 comments
Labels
Client, Cosmos, customer-reported, needs-team-attention, question, Service Attention

Comments


Uranium2 commented Oct 14, 2024

Describe the bug
I have some PySpark code that loads a CSV file and appends the data to a Cosmos DB container. Sometimes the write fails with an error; rerunning the same code can succeed even though neither the data nor the cluster changed.

Previously I was using Databricks Runtime 10.4 LTS with an older version of the Cosmos DB library on the cluster, and I never had this issue. It only started occurring after I upgraded the Databricks runtime to 12.2 LTS with azure-cosmos-spark_3-3_2-12:4.30.0.

Exception or Stack Trace

An error occurred while calling o76.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 87 in stage 9.0 failed 4 times, most recent failure: Lost task 87.3 in stage 9.0 (TID 197) (10.34.208.176 executor 14): java.lang.ClassCastException: com.azure.cosmos.spark.CosmosClientMetadataCachesSnapshots cannot be cast to com.azure.cosmos.spark.CosmosClientMetadataCachesSnapshots
        at com.azure.cosmos.spark.CosmosWriterBase.<init>(CosmosWriterBase.scala:39)
        at com.azure.cosmos.spark.CosmosWriter.<init>(CosmosWriter.scala:33)
        at com.azure.cosmos.spark.ItemsDataWriteFactory.createWriter(ItemsDataWriteFactory.scala:51)
        at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:477)
        at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:432)
        at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
        at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
        at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
        at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)
        at org.apache.spark.scheduler.Task.doRunTask(Task.scala:179)
        at org.apache.spark.scheduler.Task.$anonfun$run$5(Task.scala:142)
        at com.databricks.unity.EmptyHandle$.runWithAndClose(UCSHandle.scala:126)
        at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:142)
        at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
        at org.apache.spark.scheduler.Task.run(Task.scala:97)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:904)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1740)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:907)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:761)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:3440)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:3362)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:3351)
        at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:3351)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1460)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1460)
        at scala.Option.foreach(Option.scala:407)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1460)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3651)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3589)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3577)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:51)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$runJob$1(DAGScheduler.scala:1209)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1197)
        at org.apache.spark.SparkContext.runJobInternal(SparkContext.scala:2758)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2741)
        at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:428)
        at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:404)
        at org.apache.spark.sql.execution.datasources.v2.AppendDataExec.writeWithV2(WriteToDataSourceV2Exec.scala:290)
        at org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run(WriteToDataSourceV2Exec.scala:383)
        at org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run$(WriteToDataSourceV2Exec.scala:382)
        at org.apache.spark.sql.execution.datasources.v2.AppendDataExec.run(WriteToDataSourceV2Exec.scala:290)
        at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.$anonfun$result$1(V2CommandExec.scala:47)
        at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)
        at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:47)
        at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:45)
        at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:54)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.$anonfun$applyOrElse$3(QueryExecution.scala:256)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:165)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.$anonfun$applyOrElse$2(QueryExecution.scala:256)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$9(SQLExecution.scala:258)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:448)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:203)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1073)
        at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:131)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:398)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.$anonfun$applyOrElse$1(QueryExecution.scala:255)
        at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$withMVTagsIfNecessary(QueryExecution.scala:238)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.applyOrElse(QueryExecution.scala:251)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.applyOrElse(QueryExecution.scala:244)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:519)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:106)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:519)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:339)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:335)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:495)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$eagerlyExecuteCommands$1(QueryExecution.scala:244)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:395)
        at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:244)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:198)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:189)
        at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:305)
        at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:964)
        at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:346)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:258)
        at com.databricks.service.DataFrameWriteCommand.run(DataFrameWriteCommand.scala:70)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.$anonfun$sideEffectResult$1(commands.scala:82)
        at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:80)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:79)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:91)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.$anonfun$applyOrElse$3(QueryExecution.scala:256)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:165)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.$anonfun$applyOrElse$2(QueryExecution.scala:256)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$9(SQLExecution.scala:258)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:448)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:203)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1073)
        at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:131)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:398)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.$anonfun$applyOrElse$1(QueryExecution.scala:255)
        at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$withMVTagsIfNecessary(QueryExecution.scala:238)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.applyOrElse(QueryExecution.scala:251)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.applyOrElse(QueryExecution.scala:244)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:519)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:106)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:519)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:339)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:335)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:495)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$eagerlyExecuteCommands$1(QueryExecution.scala:244)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:395)
        at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:244)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:198)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:189)
        at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:305)
        at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:310)
        at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:307)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$writePlans$4(QueryExecution.scala:543)
        at org.apache.spark.sql.catalyst.plans.QueryPlan$.append(QueryPlan.scala:751)
        at org.apache.spark.sql.execution.QueryExecution.writePlans(QueryExecution.scala:543)
        at org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:560)
        at org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:553)
        at java.lang.String.valueOf(String.java:2994)
        at java.lang.StringBuilder.append(StringBuilder.java:136)
        at com.databricks.service.SparkServiceImpl$.$anonfun$executePlan$2(SparkServiceImpl.scala:121)
        at org.apache.spark.internal.Logging.logInfo(Logging.scala:61)
        at org.apache.spark.internal.Logging.logInfo$(Logging.scala:60)
        at com.databricks.service.SparkServiceImpl$.logInfo(SparkServiceImpl.scala:58)
        at com.databricks.service.SparkServiceImpl$.$anonfun$executePlan$1(SparkServiceImpl.scala:121)
        at com.databricks.logging.UsageLogging.$anonfun$recordOperation$1(UsageLogging.scala:560)
        at com.databricks.logging.UsageLogging.executeThunkAndCaptureResultTags$1(UsageLogging.scala:657)
        at com.databricks.logging.UsageLogging.$anonfun$recordOperationWithResultTags$4(UsageLogging.scala:678)
        at com.databricks.logging.UsageLogging.$anonfun$withAttributionContext$1(UsageLogging.scala:414)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
        at com.databricks.logging.AttributionContext$.withValue(AttributionContext.scala:158)
        at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:412)
        at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:409)
        at com.databricks.spark.util.PublicDBLogging.withAttributionContext(DatabricksSparkUsageLogger.scala:27)
        at com.databricks.logging.UsageLogging.withAttributionTags(UsageLogging.scala:457)
        at com.databricks.logging.UsageLogging.withAttributionTags$(UsageLogging.scala:442)
        at com.databricks.spark.util.PublicDBLogging.withAttributionTags(DatabricksSparkUsageLogger.scala:27)
        at com.databricks.logging.UsageLogging.recordOperationWithResultTags(UsageLogging.scala:652)
        at com.databricks.logging.UsageLogging.recordOperationWithResultTags$(UsageLogging.scala:569)
        at com.databricks.spark.util.PublicDBLogging.recordOperationWithResultTags(DatabricksSparkUsageLogger.scala:27)
        at com.databricks.logging.UsageLogging.recordOperation(UsageLogging.scala:560)
        at com.databricks.logging.UsageLogging.recordOperation$(UsageLogging.scala:528)
        at com.databricks.spark.util.PublicDBLogging.recordOperation(DatabricksSparkUsageLogger.scala:27)
        at com.databricks.spark.util.PublicDBLogging.recordOperation0(DatabricksSparkUsageLogger.scala:68)
        at com.databricks.spark.util.DatabricksSparkUsageLogger.recordOperation(DatabricksSparkUsageLogger.scala:150)
        at com.databricks.spark.util.UsageLogger.recordOperation(UsageLogger.scala:72)
        at com.databricks.spark.util.UsageLogger.recordOperation$(UsageLogger.scala:59)
        at com.databricks.spark.util.DatabricksSparkUsageLogger.recordOperation(DatabricksSparkUsageLogger.scala:109)
        at com.databricks.spark.util.UsageLogging.recordOperation(UsageLogger.scala:433)
        at com.databricks.spark.util.UsageLogging.recordOperation$(UsageLogger.scala:412)
        at com.databricks.service.SparkServiceImpl$.super$recordOperation(SparkServiceImpl.scala:89)
        at com.databricks.service.SparkServiceImpl$.$anonfun$recordOperation$4(SparkServiceImpl.scala:99)
        at com.databricks.logging.UsageLogging.$anonfun$withAttributionContext$1(UsageLogging.scala:414)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
        at com.databricks.logging.AttributionContext$.withValue(AttributionContext.scala:158)
        at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:412)
        at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:409)
        at com.databricks.spark.util.PublicDBLogging.withAttributionContext(DatabricksSparkUsageLogger.scala:27)
        at com.databricks.logging.UsageLogging.withAttributionTags(UsageLogging.scala:457)
        at com.databricks.logging.UsageLogging.withAttributionTags$(UsageLogging.scala:442)
        at com.databricks.spark.util.PublicDBLogging.withAttributionTags(DatabricksSparkUsageLogger.scala:27)
        at com.databricks.spark.util.PublicDBLogging.withAttributionTags0(DatabricksSparkUsageLogger.scala:72)
        at com.databricks.spark.util.DatabricksSparkUsageLogger.withAttributionTags(DatabricksSparkUsageLogger.scala:172)
        at com.databricks.spark.util.UsageLogging.$anonfun$withAttributionTags$1(UsageLogger.scala:495)
        at com.databricks.spark.util.UsageLogging$.withAttributionTags(UsageLogger.scala:607)
        at com.databricks.spark.util.UsageLogging$.withAttributionTags(UsageLogger.scala:616)
        at com.databricks.spark.util.UsageLogging.withAttributionTags(UsageLogger.scala:495)
        at com.databricks.spark.util.UsageLogging.withAttributionTags$(UsageLogger.scala:493)
        at com.databricks.service.SparkServiceImpl$.withAttributionTags(SparkServiceImpl.scala:58)
        at com.databricks.service.SparkServiceImpl$.recordOperation(SparkServiceImpl.scala:99)
        at com.databricks.service.SparkServiceImpl$.executePlan(SparkServiceImpl.scala:119)
        at com.databricks.service.SparkServiceRPCHandler.execute0(SparkServiceRPCHandler.scala:697)
        at com.databricks.service.SparkServiceRPCHandler.$anonfun$executeRPC0$1(SparkServiceRPCHandler.scala:500)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
        at com.databricks.service.SparkServiceRPCHandler.executeRPC0(SparkServiceRPCHandler.scala:372)
        at com.databricks.service.SparkServiceRPCHandler$$anon$2.call(SparkServiceRPCHandler.scala:323)
        at com.databricks.service.SparkServiceRPCHandler$$anon$2.call(SparkServiceRPCHandler.scala:309)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at com.databricks.service.SparkServiceRPCHandler.$anonfun$executeRPC$1(SparkServiceRPCHandler.scala:359)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
        at com.databricks.service.SparkServiceRPCHandler.executeRPC(SparkServiceRPCHandler.scala:336)
        at com.databricks.service.SparkServiceRPCServlet.doPost(SparkServiceRPCServer.scala:167)
        at javax.servlet.http.HttpServlet.service(HttpServlet.java:523)
        at javax.servlet.http.HttpServlet.service(HttpServlet.java:590)
        at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:799)
        at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:550)
        at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:190)
        at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:501)
        at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
        at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127)
        at org.eclipse.jetty.server.Server.handle(Server.java:516)
        at org.eclipse.jetty.server.HttpChannel.lambda$handle$1(HttpChannel.java:487)
        at org.eclipse.jetty.server.HttpChannel.dispatch(HttpChannel.java:732)
        at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:479)
        at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:277)
        at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311)
        at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:105)
        at org.eclipse.jetty.io.ChannelEndPoint$1.run(ChannelEndPoint.java:104)
        at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:338)
        at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:315)
        at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:173)
        at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:131)
        at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:409)
        at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:883)
        at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:1034)
        at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.ClassCastException: com.azure.cosmos.spark.CosmosClientMetadataCachesSnapshots cannot be cast to com.azure.cosmos.spark.CosmosClientMetadataCachesSnapshots
        at com.azure.cosmos.spark.CosmosWriterBase.<init>(CosmosWriterBase.scala:39)
        at com.azure.cosmos.spark.CosmosWriter.<init>(CosmosWriter.scala:33)
        at com.azure.cosmos.spark.ItemsDataWriteFactory.createWriter(ItemsDataWriteFactory.scala:51)
        at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:477)
        at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:432)
        at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
        at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
        at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
        at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)
        at org.apache.spark.scheduler.Task.doRunTask(Task.scala:179)
        at org.apache.spark.scheduler.Task.$anonfun$run$5(Task.scala:142)
        at com.databricks.unity.EmptyHandle$.runWithAndClose(UCSHandle.scala:126)
        at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:142)
        at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
        at org.apache.spark.scheduler.Task.run(Task.scala:97)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:904)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1740)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:907)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:761)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        ... 1 more

Code Snippet

# Cosmos DB connector options for the write
cfg = {
    "spark.cosmos.accountEndpoint": f"https://{cosmosdb_name}.documents.azure.com:443/",
    "spark.cosmos.accountKey": cosmosdb.primary_master_key,
    "spark.cosmos.database": database_name,
    "spark.cosmos.container": table_name,
}

# Throughput control, set session-wide on the Spark conf
spark.conf.set(
    "spark.cosmos.throughputControl.globalControl.database", database_name
)
spark.conf.set("spark.cosmos.throughputControl.enabled", "true")
spark.conf.set(
    "spark.cosmos.throughputControl.name", "SourceContainerThroughputControl"
)
spark.conf.set("spark.cosmos.throughputControl.targetThroughputThreshold", "0.95")

# Load the CSV, rename the key column to "id", and drop rows without an id
data = (
    (
        spark.read.format("csv")
        .options(header="True", inferSchema="True", delimiter=";")
        .load(spark_file_path)
    )
    .withColumnRenamed("my_col", "id")
    .na.drop(subset=["id"])
)
# Cosmos DB requires "id" to be a string; deduplicate on it before writing
data = data.withColumn("id", data["id"].cast("string"))

data = data.dropDuplicates(["id"])

# Append the rows into the Cosmos DB container
data.write.format("cosmos.oltp").options(**cfg).mode("APPEND").save()
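
As an aside, the throughput-control settings above are applied session-wide via spark.conf; the connector's configuration reference also lists them as options that can be passed per write. A hedged sketch of the equivalent write with everything in the options dict (same keys as above, untested here; cfg_with_tc is an illustrative name):

# Assumption: the Cosmos Spark connector accepts the throughput-control
# keys as per-write options (per its configuration reference) instead of
# session-wide spark.conf settings.
cfg_with_tc = {
    **cfg,
    "spark.cosmos.throughputControl.enabled": "true",
    "spark.cosmos.throughputControl.name": "SourceContainerThroughputControl",
    "spark.cosmos.throughputControl.targetThroughputThreshold": "0.95",
    "spark.cosmos.throughputControl.globalControl.database": database_name,
}

data.write.format("cosmos.oltp").options(**cfg_with_tc).mode("APPEND").save()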

Expected behavior
No errors, or an automatic retry, since the write does not fail every time. The data consists of strings, doubles, and NaNs.
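
Since the failure is intermittent and a plain rerun usually succeeds, a crude caller-side workaround is to retry the whole write. A minimal sketch, assuming data and cfg from the snippet above; the attempt count and backoff are arbitrary:

import time

from py4j.protocol import Py4JJavaError

# Hypothetical workaround: retry the full write a few times, since the
# ClassCastException is transient and a plain rerun usually succeeds.
MAX_ATTEMPTS = 3

for attempt in range(1, MAX_ATTEMPTS + 1):
    try:
        data.write.format("cosmos.oltp").options(**cfg).mode("APPEND").save()
        break  # write succeeded, stop retrying
    except Py4JJavaError:  # PySpark surfaces JVM-side failures as this type
        if attempt == MAX_ATTEMPTS:
            raise  # give up after the last attempt
        time.sleep(10 * attempt)  # simple linear backoff before retrying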

Setup:

  • OS: WSL 1 or Azure Machine Learning Compute Instances/Clusters (any size), Windows 10.
  • Library/Libraries: azure-cosmos-spark_3-3_2-12:4.30.0
  • Java version:
    openjdk 19.0.1 2022-10-18
    OpenJDK Runtime Environment (build 19.0.1+10-21)
    OpenJDK 64-Bit Server VM (build 19.0.1+10-21, mixed mode, sharing)
  • App Server/Environment: Databricks cluster 12.2 LTS
  • Spark Config:
    spark.sql.catalog.cosmosCatalog com.azure.cosmos.spark.CosmosCatalog
    spark.jars.packages com.azure.cosmos.spark:azure-cosmos-spark_3-3_2-12:4.30.0
github-actions bot added the Client, Cosmos, customer-reported, needs-team-attention, question, and Service Attention labels on Oct 14, 2024
Thanks for the feedback! We are routing this to the appropriate team for follow-up. cc @kushagraThapar @pjohari-ms @TheovanKraay.

kushagraThapar (Member) commented

@Uranium2 thanks for creating this issue, @tvaron3 please take a look at this when you get a chance.
