Skip to content

Commit

Permalink
AbsaCommons' (snapshot) S3Location update reflected
Browse files Browse the repository at this point in the history
  • Loading branch information
dk1844 committed Apr 6, 2021
1 parent aeb3cab commit 98a09f1
Show file tree
Hide file tree
Showing 6 changed files with 10 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,6 @@ class ControlFrameworkStateSdkS3(sparkSession: SparkSession) extends ControlFram
val storer = ControlMeasuresSdkS3StorerJsonFile(s3Location, s3KmsSettings)

storer.store(accumulator.getControlMeasure)
AtumSdkS3.log.info(s"Control measurements saved to ${s3Location.s3String}")
AtumSdkS3.log.info(s"Control measurements saved to ${s3Location.asSimpleS3LocationString}")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class SparkQueryExecutionListenerSdkS3(cf: ControlFrameworkStateSdkS3) extends S
// adding s3 processing
cf.accumulator.getStorer match {
case Some(s3storer: S3ControlMeasuresStorer) =>
AtumSdkS3.log.debug(s"SparkQueryExecutionListener.onSuccess for S3ControlMeasuresStorer: writing to ${s3storer.outputLocation.s3String}")
AtumSdkS3.log.debug(s"SparkQueryExecutionListener.onSuccess for S3ControlMeasuresStorer: writing to ${s3storer.outputLocation.asSimpleS3LocationString}")
writeInfoFileForQueryForSdkS3(qe, s3storer.outputLocation.region, s3storer.kmsSettings)(s3storer.credentialsProvider)

// Notify listeners
Expand All @@ -56,9 +56,9 @@ class SparkQueryExecutionListenerSdkS3(cf: ControlFrameworkStateSdkS3) extends S
infoFilePath.foreach(path => {

import za.co.absa.atum.persistence.s3.S3LocationRegionImplicits.SimpleS3LocationRegionExt
import za.co.absa.commons.s3.S3Location._
import za.co.absa.commons.s3.SimpleS3Location._

val location = path.toS3Location match {
val location = path.toSimpleS3Location match {
case Some(loc) => loc.withRegion(region)
case _ => throw new IllegalArgumentException(s"Could not parse S3 Location from $path!")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ case class ControlMeasuresSdkS3LoaderJsonFile(inputLocation: SimpleS3LocationWit
}

override def getInfo: String = {
s"JSON deserializer from ${inputLocation.s3String}"
s"JSON deserializer from ${inputLocation.asSimpleS3LocationString}"
}

private[s3] def getS3Client: S3Client = SdkS3ClientUtils.getS3Client(inputLocation.region, credentialsProvider)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ case class ControlMeasuresSdkS3StorerJsonFile(outputLocation: SimpleS3LocationWi
}

override def getInfo: String = {
s"JSON serializer for Storer to ${outputLocation.s3String}"
s"JSON serializer for Storer to ${outputLocation.asSimpleS3LocationString}"
}

private[s3] def getS3Client: S3Client = SdkS3ClientUtils.getS3Client(outputLocation.region, credentialsProvider)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ trait Regional {
case class SimpleS3LocationWithRegion(protocol: String, bucketName: String, path: String, region: Region) extends S3Location with Regional {
def withRegion(region: Region): SimpleS3LocationWithRegion = this.copy(region = region)

override def s3String: String = SimpleS3Location(protocol, bucketName, path).s3String
override def asSimpleS3LocationString: String = SimpleS3Location(protocol, bucketName, path).asSimpleS3LocationString
}

case class S3KmsSettings(kmsKeyId: String, serverSideEncryption: ServerSideEncryption = ServerSideEncryption.AWS_KMS)
Expand Down
6 changes: 3 additions & 3 deletions atum/src/main/scala/za/co/absa/atum/utils/InfoFile.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import za.co.absa.atum.AtumImplicits.{DefaultControlInfoLoader, DefaultControlInfoStorer, StringPathExt}
import za.co.absa.commons.s3.S3Location.StringS3LocationExt
import za.co.absa.commons.s3.SimpleS3Location.SimpleS3LocationExt

object InfoFile {
/**
Expand All @@ -37,11 +37,11 @@ object InfoFile {
def convertFullPathToFsAndRelativePath(fullPath: String)(implicit hadoopConfiguration: Configuration): (FileSystem, Path) = {
val sanitizedFullPath = fullPath.replaceAll("[\\*\\?]", "")

sanitizedFullPath.toS3Location match {
sanitizedFullPath.toSimpleS3Location match {

case Some(s3Location) =>
// this is S3 over hadoop FS API, not SDK S3 approach
val s3Uri = new URI(s3Location.s3String) // s3://<bucket>
val s3Uri = new URI(s3Location.asSimpleS3LocationString) // s3://<bucket>
val s3Path = new Path(s"/${s3Location.path}") // /<text-file-object-path>

val fs = FileSystem.get(s3Uri, hadoopConfiguration)
Expand Down

0 comments on commit 98a09f1

Please sign in to comment.