From a5ee735a42c939ebf6f966ecad91f726e8a807af Mon Sep 17 00:00:00 2001 From: Daniel Kavan Date: Wed, 31 Mar 2021 14:18:18 +0200 Subject: [PATCH 1/5] S3Location moved to absa commons // todo replace snapshot version when commons are released --- atum-s3-sdk-extension/pom.xml | 6 ++ .../SparkQueryExecutionListenerSdkS3.scala | 9 ++- .../s3/{S3Location.scala => Regional.scala} | 2 +- atum/pom.xml | 7 ++ .../za/co/absa/atum/utils/InfoFile.scala | 2 +- examples/pom.xml | 8 +++ .../za/co/absa/atum/location/S3Location.scala | 63 ------------------ .../absa/atum/location/S3LocationSpec.scala | 64 ------------------- pom.xml | 1 + 9 files changed, 30 insertions(+), 132 deletions(-) rename atum-s3-sdk-extension/src/main/scala/za/co/absa/atum/persistence/s3/{S3Location.scala => Regional.scala} (97%) delete mode 100644 model/src/main/scala/za/co/absa/atum/location/S3Location.scala delete mode 100644 model/src/test/scala/za/co/absa/atum/location/S3LocationSpec.scala diff --git a/atum-s3-sdk-extension/pom.xml b/atum-s3-sdk-extension/pom.xml index af36f090..516ec5eb 100644 --- a/atum-s3-sdk-extension/pom.xml +++ b/atum-s3-sdk-extension/pom.xml @@ -47,6 +47,12 @@ ${project.version} + + za.co.absa.commons + commons_${scala.binary.version} + ${commons.version} + + software.amazon.awssdk s3 diff --git a/atum-s3-sdk-extension/src/main/scala/za/co/absa/atum/core/SparkQueryExecutionListenerSdkS3.scala b/atum-s3-sdk-extension/src/main/scala/za/co/absa/atum/core/SparkQueryExecutionListenerSdkS3.scala index 2b88971e..16915dbd 100644 --- a/atum-s3-sdk-extension/src/main/scala/za/co/absa/atum/core/SparkQueryExecutionListenerSdkS3.scala +++ b/atum-s3-sdk-extension/src/main/scala/za/co/absa/atum/core/SparkQueryExecutionListenerSdkS3.scala @@ -48,7 +48,7 @@ class SparkQueryExecutionListenerSdkS3(cf: ControlFrameworkStateSdkS3) extends S } } - /** Write _INFO file with control measurements to the output directory based on the query plan */ + /** Write _INFO file with control measurements to the output directory based on the query plan */ private def writeInfoFileForQueryForSdkS3(qe: QueryExecution, region: Region, kmsSettings: S3KmsSettings)(implicit credentialsProvider: AwsCredentialsProvider): Unit = { val infoFilePath = ExecutionPlanUtils.inferOutputInfoFileNameOnS3(qe, cf.outputInfoFileName) @@ -56,9 +56,12 @@ class SparkQueryExecutionListenerSdkS3(cf: ControlFrameworkStateSdkS3) extends S infoFilePath.foreach(path => { import za.co.absa.atum.persistence.s3.S3LocationRegionImplicits.SimpleS3LocationRegionExt - import za.co.absa.atum.location.S3Location.StringS3LocationExt + import za.co.absa.commons.s3.S3Location._ - val location = path.toS3LocationOrFail.withRegion(region) + val location = path.toS3Location match { + case Some(loc) => loc.withRegion(region) + case _ => throw new IllegalArgumentException(s"Could not parse S3 Location from $path!") + } AtumSdkS3.log.debug(s"Inferred _INFO Location = $location") cf.storeCurrentInfoFileOnSdkS3(location, kmsSettings) diff --git a/atum-s3-sdk-extension/src/main/scala/za/co/absa/atum/persistence/s3/S3Location.scala b/atum-s3-sdk-extension/src/main/scala/za/co/absa/atum/persistence/s3/Regional.scala similarity index 97% rename from atum-s3-sdk-extension/src/main/scala/za/co/absa/atum/persistence/s3/S3Location.scala rename to atum-s3-sdk-extension/src/main/scala/za/co/absa/atum/persistence/s3/Regional.scala index cbd10c7c..63f5d241 100644 --- a/atum-s3-sdk-extension/src/main/scala/za/co/absa/atum/persistence/s3/S3Location.scala +++ b/atum-s3-sdk-extension/src/main/scala/za/co/absa/atum/persistence/s3/Regional.scala @@ -17,7 +17,7 @@ package za.co.absa.atum.persistence.s3 import software.amazon.awssdk.regions.Region import software.amazon.awssdk.services.s3.model.ServerSideEncryption -import za.co.absa.atum.location.S3Location +import za.co.absa.commons.s3.S3Location trait Regional { def region: Region diff --git a/atum/pom.xml b/atum/pom.xml index 45d07ad8..08c6add1 100644 --- a/atum/pom.xml +++ b/atum/pom.xml @@ -35,6 +35,13 @@ ${project.version} + + za.co.absa.commons + commons_${scala.binary.version} + ${commons.version} + provided + + commons-configuration commons-configuration diff --git a/atum/src/main/scala/za/co/absa/atum/utils/InfoFile.scala b/atum/src/main/scala/za/co/absa/atum/utils/InfoFile.scala index 3752ca75..c3c8ccc4 100644 --- a/atum/src/main/scala/za/co/absa/atum/utils/InfoFile.scala +++ b/atum/src/main/scala/za/co/absa/atum/utils/InfoFile.scala @@ -20,7 +20,7 @@ import java.net.URI import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import za.co.absa.atum.AtumImplicits.{DefaultControlInfoLoader, DefaultControlInfoStorer, StringPathExt} -import za.co.absa.atum.location.S3Location.StringS3LocationExt +import za.co.absa.commons.s3.S3Location.StringS3LocationExt object InfoFile { /** diff --git a/examples/pom.xml b/examples/pom.xml index 7cba709f..feccc59a 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -43,6 +43,14 @@ ${project.version} + + + za.co.absa.commons + commons_${scala.binary.version} + ${commons.version} + test + + org.scalatest diff --git a/model/src/main/scala/za/co/absa/atum/location/S3Location.scala b/model/src/main/scala/za/co/absa/atum/location/S3Location.scala deleted file mode 100644 index 648f528c..00000000 --- a/model/src/main/scala/za/co/absa/atum/location/S3Location.scala +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Copyright 2018 ABSA Group Limited - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package za.co.absa.atum.location - -import scala.util.matching.Regex - -object S3Location { - - /** - * Generally usable regex for validating S3 path, e.g. `s3://my-cool-bucket1/path/to/file/on/s3.txt` - * Protocols `s3`, `s3n`, and `s3a` are allowed. - * Bucket naming rules defined at [[https://docs.aws.amazon.com/AmazonS3/latest/dev/BucketRestrictions.html#bucketnamingrules]] are instilled. - */ - val S3LocationRx: Regex = "(s3[an]?)://([-a-z0-9.]{3,63})/(.*)".r - - implicit class StringS3LocationExt(path: String) { - - def toS3Location: Option[SimpleS3Location] = { - path match { - case S3LocationRx(protocol, bucketName, relativePath) => Some(SimpleS3Location(protocol, bucketName, relativePath)) - case _ => None - } - } - - def toS3LocationOrFail: SimpleS3Location = { - this.toS3Location.getOrElse{ - throw new IllegalArgumentException(s"Could not parse S3 Location from $path using rx $S3LocationRx.") - } - } - - def isValidS3Path: Boolean = path match { - case S3LocationRx(_, _, _) => true - case _ => false - } - } -} - -trait S3Location { - def protocol: String - def bucketName: String - def path: String - - /** - * Returns formatted S3 string, e.g. `s3://myBucket/path/to/somewhere` - * @return formatted s3 string - */ - def s3String: String = s"$protocol://$bucketName/$path" -} - -case class SimpleS3Location(protocol: String, bucketName: String, path: String) extends S3Location diff --git a/model/src/test/scala/za/co/absa/atum/location/S3LocationSpec.scala b/model/src/test/scala/za/co/absa/atum/location/S3LocationSpec.scala deleted file mode 100644 index 2fdf6cc6..00000000 --- a/model/src/test/scala/za/co/absa/atum/location/S3LocationSpec.scala +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Copyright 2018 ABSA Group Limited - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package za.co.absa.atum.location - -import org.scalatest.flatspec.AnyFlatSpec -import org.scalatest.matchers.should.Matchers -import za.co.absa.atum.location.S3Location.StringS3LocationExt - -class S3LocationSpec extends AnyFlatSpec with Matchers { - - val validPathsWithExpectedLocations = Seq( - // (path, expected parsed value) - ("s3://mybucket-123/path/to/file.ext", SimpleS3Location("s3", "mybucket-123", "path/to/file.ext")), - ("s3n://mybucket-123/path/to/ends/with/slash/", SimpleS3Location("s3n","mybucket-123", "path/to/ends/with/slash/")), - ("s3a://mybucket-123.asdf.cz/path-to-$_file!@#$.ext", SimpleS3Location("s3a", "mybucket-123.asdf.cz", "path-to-$_file!@#$.ext")) - ) - - val invalidPaths = Seq( - "s3x://mybucket-123/path/to/file/on/invalid/prefix", - "s3://bb/some/path/but/bucketname/too/short" - ) - - "S3Utils.StringS3LocationExt" should "parse S3 path from String using toS3Location" in { - validPathsWithExpectedLocations.foreach { case (path, expectedLocation) => - path.toS3Location shouldBe Some(expectedLocation) - } - } - - it should "find no valid S3 path when parsing invalid S3 path from String using toS3Location" in { - invalidPaths.foreach { - _.toS3Location shouldBe None - } - } - - it should "fail parsing invalid S3 path from String using toS3LocationOrFail" in { - invalidPaths.foreach { path => - assertThrows[IllegalArgumentException] { - path.toS3LocationOrFail - } - } - } - - it should "check path using isValidS3Path" in { - validPathsWithExpectedLocations.map(_._1).foreach { path => - path.isValidS3Path shouldBe true - } - - invalidPaths.foreach(_.isValidS3Path shouldBe false) - } - -} diff --git a/pom.xml b/pom.xml index d18858ff..68480485 100644 --- a/pom.xml +++ b/pom.xml @@ -126,6 +126,7 @@ 2.5 2.13.65 1.15.0 + 0.0.27-SNAPSHOT 2.4.6 From aeb3cab85ac450c40b4912d2ea4c976dc9bd8ab5 Mon Sep 17 00:00:00 2001 From: Daniel Kavan Date: Thu, 1 Apr 2021 12:45:44 +0200 Subject: [PATCH 2/5] S3Location moved to absa commons - update: s3String shared from SimpleS3Location --- .../main/scala/za/co/absa/atum/persistence/s3/Regional.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/atum-s3-sdk-extension/src/main/scala/za/co/absa/atum/persistence/s3/Regional.scala b/atum-s3-sdk-extension/src/main/scala/za/co/absa/atum/persistence/s3/Regional.scala index 63f5d241..d694d0da 100644 --- a/atum-s3-sdk-extension/src/main/scala/za/co/absa/atum/persistence/s3/Regional.scala +++ b/atum-s3-sdk-extension/src/main/scala/za/co/absa/atum/persistence/s3/Regional.scala @@ -17,7 +17,7 @@ package za.co.absa.atum.persistence.s3 import software.amazon.awssdk.regions.Region import software.amazon.awssdk.services.s3.model.ServerSideEncryption -import za.co.absa.commons.s3.S3Location +import za.co.absa.commons.s3.{S3Location, SimpleS3Location} trait Regional { def region: Region @@ -25,6 +25,8 @@ trait Regional { case class SimpleS3LocationWithRegion(protocol: String, bucketName: String, path: String, region: Region) extends S3Location with Regional { def withRegion(region: Region): SimpleS3LocationWithRegion = this.copy(region = region) + + override def s3String: String = SimpleS3Location(protocol, bucketName, path).s3String } case class S3KmsSettings(kmsKeyId: String, serverSideEncryption: ServerSideEncryption = ServerSideEncryption.AWS_KMS) From 02d7c5b96771c8f67e171a3e4f0595bc4752db0e Mon Sep 17 00:00:00 2001 From: Daniel Kavan Date: Tue, 6 Apr 2021 14:35:39 +0200 Subject: [PATCH 3/5] AbsaCommons' (snapshot) S3Location update reflected, PR review update --- .../co/absa/atum/core/ControlFrameworkStateSdkS3.scala | 2 +- .../atum/core/SparkQueryExecutionListenerSdkS3.scala | 10 ++++------ .../s3/ControlMeasuresSdkS3LoaderJsonFile.scala | 2 +- .../s3/ControlMeasuresSdkS3StorerJsonFile.scala | 2 +- .../za/co/absa/atum/persistence/s3/Regional.scala | 2 +- .../main/scala/za/co/absa/atum/utils/InfoFile.scala | 6 +++--- 6 files changed, 11 insertions(+), 13 deletions(-) diff --git a/atum-s3-sdk-extension/src/main/scala/za/co/absa/atum/core/ControlFrameworkStateSdkS3.scala b/atum-s3-sdk-extension/src/main/scala/za/co/absa/atum/core/ControlFrameworkStateSdkS3.scala index 0337d0ae..51b1d493 100644 --- a/atum-s3-sdk-extension/src/main/scala/za/co/absa/atum/core/ControlFrameworkStateSdkS3.scala +++ b/atum-s3-sdk-extension/src/main/scala/za/co/absa/atum/core/ControlFrameworkStateSdkS3.scala @@ -28,6 +28,6 @@ class ControlFrameworkStateSdkS3(sparkSession: SparkSession) extends ControlFram val storer = ControlMeasuresSdkS3StorerJsonFile(s3Location, s3KmsSettings) storer.store(accumulator.getControlMeasure) - AtumSdkS3.log.info(s"Control measurements saved to ${s3Location.s3String}") + AtumSdkS3.log.info(s"Control measurements saved to ${s3Location.asSimpleS3LocationString}") } } diff --git a/atum-s3-sdk-extension/src/main/scala/za/co/absa/atum/core/SparkQueryExecutionListenerSdkS3.scala b/atum-s3-sdk-extension/src/main/scala/za/co/absa/atum/core/SparkQueryExecutionListenerSdkS3.scala index 16915dbd..4b8533d9 100644 --- a/atum-s3-sdk-extension/src/main/scala/za/co/absa/atum/core/SparkQueryExecutionListenerSdkS3.scala +++ b/atum-s3-sdk-extension/src/main/scala/za/co/absa/atum/core/SparkQueryExecutionListenerSdkS3.scala @@ -21,6 +21,7 @@ import software.amazon.awssdk.regions.Region import za.co.absa.atum.persistence.S3ControlMeasuresStorer import za.co.absa.atum.persistence.s3.S3KmsSettings import za.co.absa.atum.utils.ExecutionPlanUtils +import za.co.absa.commons.s3.SimpleS3Location /** * The class is responsible for listening to DataSet save events and outputting corresponding control measurements. @@ -33,7 +34,7 @@ class SparkQueryExecutionListenerSdkS3(cf: ControlFrameworkStateSdkS3) extends S // adding s3 processing cf.accumulator.getStorer match { case Some(s3storer: S3ControlMeasuresStorer) => - AtumSdkS3.log.debug(s"SparkQueryExecutionListener.onSuccess for S3ControlMeasuresStorer: writing to ${s3storer.outputLocation.s3String}") + AtumSdkS3.log.debug(s"SparkQueryExecutionListener.onSuccess for S3ControlMeasuresStorer: writing to ${s3storer.outputLocation.asSimpleS3LocationString}") writeInfoFileForQueryForSdkS3(qe, s3storer.outputLocation.region, s3storer.kmsSettings)(s3storer.credentialsProvider) // Notify listeners @@ -56,12 +57,9 @@ class SparkQueryExecutionListenerSdkS3(cf: ControlFrameworkStateSdkS3) extends S infoFilePath.foreach(path => { import za.co.absa.atum.persistence.s3.S3LocationRegionImplicits.SimpleS3LocationRegionExt - import za.co.absa.commons.s3.S3Location._ - val location = path.toS3Location match { - case Some(loc) => loc.withRegion(region) - case _ => throw new IllegalArgumentException(s"Could not parse S3 Location from $path!") - } + val location = SimpleS3Location(path) // would throw IAE on apply (parse error) + .withRegion(region) AtumSdkS3.log.debug(s"Inferred _INFO Location = $location") cf.storeCurrentInfoFileOnSdkS3(location, kmsSettings) diff --git a/atum-s3-sdk-extension/src/main/scala/za/co/absa/atum/persistence/s3/ControlMeasuresSdkS3LoaderJsonFile.scala b/atum-s3-sdk-extension/src/main/scala/za/co/absa/atum/persistence/s3/ControlMeasuresSdkS3LoaderJsonFile.scala index b9d50910..f149a39e 100644 --- a/atum-s3-sdk-extension/src/main/scala/za/co/absa/atum/persistence/s3/ControlMeasuresSdkS3LoaderJsonFile.scala +++ b/atum-s3-sdk-extension/src/main/scala/za/co/absa/atum/persistence/s3/ControlMeasuresSdkS3LoaderJsonFile.scala @@ -42,7 +42,7 @@ case class ControlMeasuresSdkS3LoaderJsonFile(inputLocation: SimpleS3LocationWit } override def getInfo: String = { - s"JSON deserializer from ${inputLocation.s3String}" + s"JSON deserializer from ${inputLocation.asSimpleS3LocationString}" } private[s3] def getS3Client: S3Client = SdkS3ClientUtils.getS3Client(inputLocation.region, credentialsProvider) diff --git a/atum-s3-sdk-extension/src/main/scala/za/co/absa/atum/persistence/s3/ControlMeasuresSdkS3StorerJsonFile.scala b/atum-s3-sdk-extension/src/main/scala/za/co/absa/atum/persistence/s3/ControlMeasuresSdkS3StorerJsonFile.scala index 57ecc903..cc14a24f 100644 --- a/atum-s3-sdk-extension/src/main/scala/za/co/absa/atum/persistence/s3/ControlMeasuresSdkS3StorerJsonFile.scala +++ b/atum-s3-sdk-extension/src/main/scala/za/co/absa/atum/persistence/s3/ControlMeasuresSdkS3StorerJsonFile.scala @@ -58,7 +58,7 @@ case class ControlMeasuresSdkS3StorerJsonFile(outputLocation: SimpleS3LocationWi } override def getInfo: String = { - s"JSON serializer for Storer to ${outputLocation.s3String}" + s"JSON serializer for Storer to ${outputLocation.asSimpleS3LocationString}" } private[s3] def getS3Client: S3Client = SdkS3ClientUtils.getS3Client(outputLocation.region, credentialsProvider) diff --git a/atum-s3-sdk-extension/src/main/scala/za/co/absa/atum/persistence/s3/Regional.scala b/atum-s3-sdk-extension/src/main/scala/za/co/absa/atum/persistence/s3/Regional.scala index d694d0da..45bc61e5 100644 --- a/atum-s3-sdk-extension/src/main/scala/za/co/absa/atum/persistence/s3/Regional.scala +++ b/atum-s3-sdk-extension/src/main/scala/za/co/absa/atum/persistence/s3/Regional.scala @@ -26,7 +26,7 @@ trait Regional { case class SimpleS3LocationWithRegion(protocol: String, bucketName: String, path: String, region: Region) extends S3Location with Regional { def withRegion(region: Region): SimpleS3LocationWithRegion = this.copy(region = region) - override def s3String: String = SimpleS3Location(protocol, bucketName, path).s3String + override def asSimpleS3LocationString: String = SimpleS3Location(protocol, bucketName, path).asSimpleS3LocationString } case class S3KmsSettings(kmsKeyId: String, serverSideEncryption: ServerSideEncryption = ServerSideEncryption.AWS_KMS) diff --git a/atum/src/main/scala/za/co/absa/atum/utils/InfoFile.scala b/atum/src/main/scala/za/co/absa/atum/utils/InfoFile.scala index c3c8ccc4..a741fc41 100644 --- a/atum/src/main/scala/za/co/absa/atum/utils/InfoFile.scala +++ b/atum/src/main/scala/za/co/absa/atum/utils/InfoFile.scala @@ -20,7 +20,7 @@ import java.net.URI import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import za.co.absa.atum.AtumImplicits.{DefaultControlInfoLoader, DefaultControlInfoStorer, StringPathExt} -import za.co.absa.commons.s3.S3Location.StringS3LocationExt +import za.co.absa.commons.s3.SimpleS3Location.SimpleS3LocationExt object InfoFile { /** @@ -37,11 +37,11 @@ object InfoFile { def convertFullPathToFsAndRelativePath(fullPath: String)(implicit hadoopConfiguration: Configuration): (FileSystem, Path) = { val sanitizedFullPath = fullPath.replaceAll("[\\*\\?]", "") - sanitizedFullPath.toS3Location match { + sanitizedFullPath.toSimpleS3Location match { case Some(s3Location) => // this is S3 over hadoop FS API, not SDK S3 approach - val s3Uri = new URI(s3Location.s3String) // s3:// + val s3Uri = new URI(s3Location.asSimpleS3LocationString) // s3:// val s3Path = new Path(s"/${s3Location.path}") // / val fs = FileSystem.get(s3Uri, hadoopConfiguration) From bd380ac578aab4738a01f7f2f372d6d4a7c5ceca Mon Sep 17 00:00:00 2001 From: Daniel Kavan Date: Wed, 7 Apr 2021 11:28:42 +0200 Subject: [PATCH 4/5] absa commons 0.0.27 final version used --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 68480485..13ffb097 100644 --- a/pom.xml +++ b/pom.xml @@ -126,7 +126,7 @@ 2.5 2.13.65 1.15.0 - 0.0.27-SNAPSHOT + 0.0.27 2.4.6 From fb7892c39c509bcc87d4c2bc279535c6ac56f9ce Mon Sep 17 00:00:00 2001 From: Daniel Kavan Date: Wed, 7 Apr 2021 11:50:23 +0200 Subject: [PATCH 5/5] absa commons scope = compile. --- atum/pom.xml | 1 - 1 file changed, 1 deletion(-) diff --git a/atum/pom.xml b/atum/pom.xml index 08c6add1..8269964c 100644 --- a/atum/pom.xml +++ b/atum/pom.xml @@ -39,7 +39,6 @@ za.co.absa.commons commons_${scala.binary.version} ${commons.version} - provided