
S3Location moved to absa commons #71

Merged: 5 commits, Apr 7, 2021
Changes from 4 commits
6 changes: 6 additions & 0 deletions atum-s3-sdk-extension/pom.xml
@@ -47,6 +47,12 @@
<version>${project.version}</version>
</dependency>

+ <dependency>
+     <groupId>za.co.absa.commons</groupId>
+     <artifactId>commons_${scala.binary.version}</artifactId>
+     <version>${commons.version}</version>
Collaborator commented:
Last question: does this need to be here if commons is already in atum?

Collaborator (Author) replied:
Regional.scala in this package needs the S3Location classes from Absa commons.

It would also work via the transitive dependency (so it would not need to be listed here to work), but since we are using it explicitly in this package, we should also list it explicitly, IMHO.
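To make the point concrete, here is a minimal sketch of the kind of direct usage this module has (bucket and path values are hypothetical; the constructor and the asSimpleS3LocationString call mirror the Regional.scala change further down in this diff):

```scala
import za.co.absa.commons.s3.SimpleS3Location

object CommonsUsageSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical values; same 3-argument constructor shape as in Regional.scala below.
    val loc = SimpleS3Location("s3", "my-bucket", "path/to/_INFO")
    // Assumed rendering: something like "s3://my-bucket/path/to/_INFO".
    println(loc.asSimpleS3LocationString)
  }
}
```

Because the module compiles against the commons types directly, listing the dependency explicitly keeps the build honest instead of leaning on atum's transitive dependency.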

+ </dependency>
+
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3</artifactId>
@@ -28,6 +28,6 @@ class ControlFrameworkStateSdkS3(sparkSession: SparkSession) extends ControlFram
val storer = ControlMeasuresSdkS3StorerJsonFile(s3Location, s3KmsSettings)

storer.store(accumulator.getControlMeasure)
- AtumSdkS3.log.info(s"Control measurements saved to ${s3Location.s3String}")
+ AtumSdkS3.log.info(s"Control measurements saved to ${s3Location.asSimpleS3LocationString}")
}
}
@@ -21,6 +21,7 @@ import software.amazon.awssdk.regions.Region
import za.co.absa.atum.persistence.S3ControlMeasuresStorer
import za.co.absa.atum.persistence.s3.S3KmsSettings
import za.co.absa.atum.utils.ExecutionPlanUtils
+ import za.co.absa.commons.s3.SimpleS3Location

/**
* The class is responsible for listening to DataSet save events and outputting corresponding control measurements.
@@ -33,7 +34,7 @@ class SparkQueryExecutionListenerSdkS3(cf: ControlFrameworkStateSdkS3) extends S
// adding s3 processing
cf.accumulator.getStorer match {
case Some(s3storer: S3ControlMeasuresStorer) =>
- AtumSdkS3.log.debug(s"SparkQueryExecutionListener.onSuccess for S3ControlMeasuresStorer: writing to ${s3storer.outputLocation.s3String}")
+ AtumSdkS3.log.debug(s"SparkQueryExecutionListener.onSuccess for S3ControlMeasuresStorer: writing to ${s3storer.outputLocation.asSimpleS3LocationString}")
writeInfoFileForQueryForSdkS3(qe, s3storer.outputLocation.region, s3storer.kmsSettings)(s3storer.credentialsProvider)

// Notify listeners
@@ -48,17 +49,17 @@
}
}

- /** Write _INFO file with control measurements to the output directory based on the query plan */
+ /** Write _INFO file with control measurements to the output directory based on the query plan */
private def writeInfoFileForQueryForSdkS3(qe: QueryExecution, region: Region, kmsSettings: S3KmsSettings)(implicit credentialsProvider: AwsCredentialsProvider): Unit = {
val infoFilePath = ExecutionPlanUtils.inferOutputInfoFileNameOnS3(qe, cf.outputInfoFileName)

// Write _INFO file to the output directory
infoFilePath.foreach(path => {

import za.co.absa.atum.persistence.s3.S3LocationRegionImplicits.SimpleS3LocationRegionExt
- import za.co.absa.atum.location.S3Location.StringS3LocationExt

- val location = path.toS3LocationOrFail.withRegion(region)
+ val location = SimpleS3Location(path) // would throw IAE on apply (parse error)
+   .withRegion(region)

AtumSdkS3.log.debug(s"Inferred _INFO Location = $location")
cf.storeCurrentInfoFileOnSdkS3(location, kmsSettings)
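A side note on the two construction styles this PR uses, as a hedged sketch (assuming the commons apply and the retained implicits behave as the inline comment above and InfoFile.scala below indicate; the bucket names are hypothetical):

```scala
import software.amazon.awssdk.regions.Region
import za.co.absa.atum.persistence.s3.S3LocationRegionImplicits.SimpleS3LocationRegionExt
import za.co.absa.commons.s3.SimpleS3Location
import za.co.absa.commons.s3.SimpleS3Location.SimpleS3LocationExt

object S3LocationParsingSketch {
  def main(args: Array[String]): Unit = {
    // Parsing apply, as used above: throws IllegalArgumentException on a malformed path.
    val withRegion = SimpleS3Location("s3://my-bucket/output/_INFO").withRegion(Region.EU_WEST_1)
    println(withRegion)

    // Option-returning string extension, as used in InfoFile.scala below: None instead of an exception.
    val maybeLocation = "not-an-s3-uri".toSimpleS3Location
    println(maybeLocation) // expected: None
  }
}
```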
@@ -42,7 +42,7 @@ case class ControlMeasuresSdkS3LoaderJsonFile(inputLocation: SimpleS3LocationWit
}

override def getInfo: String = {
- s"JSON deserializer from ${inputLocation.s3String}"
+ s"JSON deserializer from ${inputLocation.asSimpleS3LocationString}"
}

private[s3] def getS3Client: S3Client = SdkS3ClientUtils.getS3Client(inputLocation.region, credentialsProvider)
@@ -58,7 +58,7 @@ case class ControlMeasuresSdkS3StorerJsonFile(outputLocation: SimpleS3LocationWi
}

override def getInfo: String = {
- s"JSON serializer for Storer to ${outputLocation.s3String}"
+ s"JSON serializer for Storer to ${outputLocation.asSimpleS3LocationString}"
}

private[s3] def getS3Client: S3Client = SdkS3ClientUtils.getS3Client(outputLocation.region, credentialsProvider)
@@ -17,14 +17,16 @@ package za.co.absa.atum.persistence.s3

import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.s3.model.ServerSideEncryption
- import za.co.absa.atum.location.S3Location
+ import za.co.absa.commons.s3.{S3Location, SimpleS3Location}

trait Regional {
def region: Region
}

case class SimpleS3LocationWithRegion(protocol: String, bucketName: String, path: String, region: Region) extends S3Location with Regional {
def withRegion(region: Region): SimpleS3LocationWithRegion = this.copy(region = region)
+
+ override def asSimpleS3LocationString: String = SimpleS3Location(protocol, bucketName, path).asSimpleS3LocationString
}

case class S3KmsSettings(kmsKeyId: String, serverSideEncryption: ServerSideEncryption = ServerSideEncryption.AWS_KMS)
7 changes: 7 additions & 0 deletions atum/pom.xml
@@ -35,6 +35,13 @@
<version>${project.version}</version>
</dependency>

+ <dependency>
+     <groupId>za.co.absa.commons</groupId>
+     <artifactId>commons_${scala.binary.version}</artifactId>
+     <version>${commons.version}</version>
+     <scope>provided</scope>
Collaborator commented:
Why is this provided?

Collaborator (Author) replied:
Thanks for noticing, my mistake, that was a remnant of an exploratory build. Removed.

+ </dependency>
+
<dependency>
<groupId>commons-configuration</groupId>
<artifactId>commons-configuration</artifactId>
6 changes: 3 additions & 3 deletions atum/src/main/scala/za/co/absa/atum/utils/InfoFile.scala
@@ -20,7 +20,7 @@ import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import za.co.absa.atum.AtumImplicits.{DefaultControlInfoLoader, DefaultControlInfoStorer, StringPathExt}
- import za.co.absa.atum.location.S3Location.StringS3LocationExt
+ import za.co.absa.commons.s3.SimpleS3Location.SimpleS3LocationExt

object InfoFile {
/**
@@ -37,11 +37,11 @@
def convertFullPathToFsAndRelativePath(fullPath: String)(implicit hadoopConfiguration: Configuration): (FileSystem, Path) = {
val sanitizedFullPath = fullPath.replaceAll("[\\*\\?]", "")

- sanitizedFullPath.toS3Location match {
+ sanitizedFullPath.toSimpleS3Location match {

case Some(s3Location) =>
// this is S3 over hadoop FS API, not SDK S3 approach
- val s3Uri = new URI(s3Location.s3String) // s3://<bucket>
+ val s3Uri = new URI(s3Location.asSimpleS3LocationString) // s3://<bucket>
val s3Path = new Path(s"/${s3Location.path}") // /<text-file-object-path>

val fs = FileSystem.get(s3Uri, hadoopConfiguration)
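For completeness, a usage sketch of the converter above (the path is hypothetical, and exercising the S3 branch requires a Hadoop FileSystem implementation registered for the s3 scheme, e.g. on EMR):

```scala
import org.apache.hadoop.conf.Configuration
import za.co.absa.atum.utils.InfoFile

object InfoFileUsageSketch {
  def main(args: Array[String]): Unit = {
    implicit val hadoopConf: Configuration = new Configuration()

    // Globbing characters (* and ?) are stripped before parsing; an s3:// path is resolved
    // through the Hadoop FS API rather than the AWS SDK, as the method above shows.
    val (fs, relativePath) =
      InfoFile.convertFullPathToFsAndRelativePath("s3://my-bucket/publish/2021/04/07/_INFO")
    println(s"fs = ${fs.getUri}, relative path = $relativePath")
  }
}
```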
8 changes: 8 additions & 0 deletions examples/pom.xml
@@ -43,6 +43,14 @@
<version>${project.version}</version>
</dependency>

+ <!-- Commons -->
+ <dependency>
+     <groupId>za.co.absa.commons</groupId>
+     <artifactId>commons_${scala.binary.version}</artifactId>
+     <version>${commons.version}</version>
+     <scope>test</scope>
+ </dependency>
+
<!-- Scalatest is added as a provided scope so it won't be included to the uber jar -->
<dependency>
<groupId>org.scalatest</groupId>
63 changes: 0 additions & 63 deletions model/src/main/scala/za/co/absa/atum/location/S3Location.scala

This file was deleted.

64 changes: 0 additions & 64 deletions model/src/test/scala/za/co/absa/atum/location/S3LocationSpec.scala

This file was deleted.

1 change: 1 addition & 0 deletions pom.xml
@@ -126,6 +126,7 @@
<specs.version>2.5</specs.version>
<aws.java.sdk.version>2.13.65</aws.java.sdk.version>
<mockito.scala.version>1.15.0</mockito.scala.version>
+ <commons.version>0.0.27</commons.version>

<!-- Spark versions -->
<spark-24.version>2.4.6</spark-24.version>