Skip to content

Commit

Permalink
Add alert & retry for delta/s3 initialization (#74)
Browse files Browse the repository at this point in the history
  • Loading branch information
oguzhanunlu authored Aug 1, 2024
1 parent 30e9e2a commit c2c088a
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ import software.amazon.awssdk.services.s3.model.{NoSuchBucketException, S3Except
import software.amazon.awssdk.services.sts.model.StsException
import software.amazon.awssdk.services.glue.model.{AccessDeniedException => GlueAccessDeniedException}

import org.apache.hadoop.fs.s3a.UnknownStoreException

import java.nio.file.AccessDeniedException

import com.snowplowanalytics.snowplow.sources.kinesis.{KinesisSource, KinesisSourceConfig}
import com.snowplowanalytics.snowplow.sinks.kinesis.{KinesisSink, KinesisSinkConfig}

Expand All @@ -36,6 +40,13 @@ object AwsApp extends LoaderApp[KinesisSourceConfig, KinesisSinkConfig](BuildInf
case _: StsException =>
// No permission to assume the role given to authenticate to S3/Glue
true
case _: UnknownStoreException =>
// no such bucket exist
true
case _: AccessDeniedException =>
// 1 - s3 bucket's permission policy denies all actions
// 2 - not authorized to assume the role
true
case t => TableFormatSetupError.check(t)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ package com.snowplowanalytics.snowplow.lakes

import org.apache.iceberg.exceptions.{ForbiddenException => IcebergForbiddenException, NotFoundException => IcebergNotFoundException}

import org.apache.spark.sql.delta.DeltaIOException

object TableFormatSetupError {

// Check if given exception is specific to iceberg format
Expand All @@ -23,6 +25,9 @@ object TableFormatSetupError {
case _: IcebergForbiddenException =>
// No permission to create a table in Glue catalog
true
case _: DeltaIOException =>
// no read/write permission in s3 bucket
true
case _ =>
false
}
Expand Down
6 changes: 3 additions & 3 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,9 @@ object Dependencies {

val icebergDeltaRuntimeDependencies = Seq(
iceberg,
delta % Runtime,
Spark.coreForIcebergDelta % Runtime,
Spark.sqlForIcebergDelta % Runtime
delta,
Spark.coreForIcebergDelta,
Spark.sqlForIcebergDelta % Runtime
)

val coreDependencies = Seq(
Expand Down

0 comments on commit c2c088a

Please sign in to comment.