From ab0328821aab0f130398cd522d59e9696ce2ae3b Mon Sep 17 00:00:00 2001 From: Oguzhan Unlu Date: Wed, 31 Jul 2024 14:33:42 +0300 Subject: [PATCH] Add alert & retry for delta/s3 initialization --- .../AwsApp.scala | 16 ++++++++++++++++ project/Dependencies.scala | 6 +++--- 2 files changed, 19 insertions(+), 3 deletions(-) diff --git a/modules/aws/src/main/scala/com.snowplowanalytics.snowplow.lakes/AwsApp.scala b/modules/aws/src/main/scala/com.snowplowanalytics.snowplow.lakes/AwsApp.scala index c2f0cc5..efcbf14 100644 --- a/modules/aws/src/main/scala/com.snowplowanalytics.snowplow.lakes/AwsApp.scala +++ b/modules/aws/src/main/scala/com.snowplowanalytics.snowplow.lakes/AwsApp.scala @@ -14,6 +14,12 @@ import software.amazon.awssdk.services.s3.model.{NoSuchBucketException, S3Except import software.amazon.awssdk.services.sts.model.StsException import software.amazon.awssdk.services.glue.model.{AccessDeniedException => GlueAccessDeniedException} +import org.apache.hadoop.fs.s3a.UnknownStoreException + +import java.nio.file.AccessDeniedException + +import org.apache.spark.sql.delta.DeltaIOException + import com.snowplowanalytics.snowplow.sources.kinesis.{KinesisSource, KinesisSourceConfig} import com.snowplowanalytics.snowplow.sinks.kinesis.{KinesisSink, KinesisSinkConfig} @@ -36,6 +42,16 @@ object AwsApp extends LoaderApp[KinesisSourceConfig, KinesisSinkConfig](BuildInf case _: StsException => // No permission to assume the role given to authenticate to S3/Glue true + case _: UnknownStoreException => + // no such bucket exist + true + case _: AccessDeniedException => + // 1 - no permission to see s3 bucket + // 2 - not authorized to assume the role + true + case _: DeltaIOException => + // no read/write permission in s3 bucket + true case t => TableFormatSetupError.check(t) } } diff --git a/project/Dependencies.scala b/project/Dependencies.scala index a8d4cb0..88fec70 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -141,9 +141,9 @@ object Dependencies { val icebergDeltaRuntimeDependencies = Seq( iceberg, - delta % Runtime, - Spark.coreForIcebergDelta % Runtime, - Spark.sqlForIcebergDelta % Runtime + delta, + Spark.coreForIcebergDelta, + Spark.sqlForIcebergDelta % Runtime ) val coreDependencies = Seq(