Merge pull request #2252 from lossyrob/backport/1.1/2233
[BACKPORT 1.1] Fixes to #2231 and #2232
lossyrob authored Jun 22, 2017
2 parents 814332d + dc0ab4a commit 3709d33
Showing 2 changed files with 35 additions and 15 deletions.
21 changes: 17 additions & 4 deletions s3/src/main/scala/geotrellis/spark/io/s3/S3GeoTiffRDD.scala
@@ -22,7 +22,7 @@ import geotrellis.raster.io.geotiff.tags.TiffTags
 import geotrellis.spark._
 import geotrellis.spark.io.RasterReader
 import geotrellis.spark.io.s3.util.S3RangeReader
-import geotrellis.util.StreamingByteReader
+import geotrellis.util.{StreamingByteReader, LazyLogging}
 import geotrellis.vector._

 import org.apache.hadoop.conf.Configuration
@@ -36,7 +36,7 @@ import java.nio.ByteBuffer
 /**
   * The S3GeoTiffRDD object allows for the creation of whole or windowed RDD[(K, V)]s from files on S3.
   */
-object S3GeoTiffRDD {
+object S3GeoTiffRDD extends LazyLogging {
   final val GEOTIFF_TIME_TAG_DEFAULT = "TIFFTAG_DATETIME"
   final val GEOTIFF_TIME_FORMAT_DEFAULT = "yyyy:MM:dd HH:mm:ss"

@@ -51,7 +51,8 @@ object S3GeoTiffRDD {
    *   May result in a single input GeoTiff being split amongst multiple records if it exceeds this size.
    *   If no maximum tile size is specified, then each file is read fully.
    * @param numPartitions How many partitions Spark should create when it repartitions the data.
-   * @param partitionBytes Desired partition size in bytes; at least one item per partition will be assigned
+   * @param partitionBytes Desired partition size in bytes; at least one item per partition will be assigned.
+   *                       This option is incompatible with the maxTileSize option.
    * @param chunkSize How many bytes should be read in at a time.
    * @param delimiter Delimiter to use for S3 object listings. See
    * @param getS3Client A function to instantiate an S3Client. Must be serializable.
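For orientation, a minimal usage sketch of how these options fit together in a read call. This is hedged: it assumes the Options case class exposes the Option-typed fields documented above and that S3GeoTiffRDD.spatial(bucket, prefix, options) is available with an implicit SparkContext in scope; the bucket and prefix names are hypothetical.

  import geotrellis.spark.io.s3.S3GeoTiffRDD
  import org.apache.spark.SparkContext

  implicit val sc: SparkContext = ???  // supplied by the enclosing Spark application

  // Sketch only, not taken from this commit: window large GeoTiffs into
  // 256x256 chunks and request an explicit partition count. Per the doc
  // comment above, pair maxTileSize with numPartitions, not partitionBytes.
  val options = S3GeoTiffRDD.Options(
    maxTileSize = Some(256),
    numPartitions = Some(100)
  )

  // "geotiff-bucket" and "scenes/" are hypothetical names.
  val tiles = S3GeoTiffRDD.spatial("geotiff-bucket", "scenes/", options)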
@@ -163,7 +164,19 @@ object S3GeoTiffRDD {
     val repartitioned =
       options.numPartitions match {
         case Some(p) => windows.repartition(p)
-        case None => windows
+        case None =>
+          options.partitionBytes match {
+            case Some(byteCount) =>
+              // Because we do not have cell type information, we cannot
+              // perform the necessary estimates for the partition bytes.
+              logger.warn(
+                s"${classOf[Options].getName}.partitionBytes set with maxTileSize, " +
+                "cannot perform partitioning based on byte count. Option ignored. " +
+                "Use numPartitions instead.")
+              windows
+            case None =>
+              windows
+          }
       }

     repartitioned.map { case (objectRequest: GetObjectRequest, pixelWindow: GridBounds) =>
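Conversely, a hedged sketch of the combination this new branch warns about: with maxTileSize windowing in effect and numPartitions unset, a partitionBytes value cannot be honored, because without cell type information the byte size of each window cannot be estimated, so the option is ignored and the warning above is logged. Names are assumed as in the sketch above.

  // Sketch only; bucket and prefix are hypothetical. partitionBytes is
  // ignored here (with a warning) because maxTileSize windowing is active
  // and numPartitions is unset; the windows RDD is left unrepartitioned.
  val ignoredBytes = S3GeoTiffRDD.Options(
    maxTileSize = Some(512),
    partitionBytes = Some(128L * 1024 * 1024)  // 128 MB target, ignored
  )
  val windows = S3GeoTiffRDD.spatial("geotiff-bucket", "scenes/", ignoredBytes)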
29 changes: 18 additions & 11 deletions s3/src/main/scala/geotrellis/spark/io/s3/S3InputFormat.scala
@@ -20,9 +20,9 @@ import geotrellis.proj4.CRS
 import geotrellis.spark.io.hadoop._
 import geotrellis.util.LazyLogging

-import com.amazonaws.services.s3.model.{ListObjectsRequest, ObjectListing}
 import com.amazonaws.auth._
 import com.amazonaws.regions._
+import com.amazonaws.services.s3.model.{ListObjectsRequest, ObjectListing, S3ObjectSummary}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapreduce.{InputFormat, Job, JobContext}

@@ -103,6 +103,21 @@ abstract class S3InputFormat[K, V] extends InputFormat[K,V] with LazyLogging {

   var splits: Vector[S3InputSplit] = Vector(makeNewSplit)

+  val s3ObjectFilter: S3ObjectSummary => Boolean =
+    { obj =>
+      val key = obj.getKey
+
+      val isDir = key.endsWith("/")
+      val isTiff =
+        if (extensions.isEmpty)
+          true
+        else {
+          extensions.map(key.endsWith).reduce(_ || _)
+        }
+
+      !isDir && isTiff
+    }
+
   if (null == partitionCountConf) {
     // By default attempt to make partitions the same size
     val maxSplitBytes = if (null == partitionSizeConf) S3InputFormat.DEFAULT_PARTITION_BYTES else partitionSizeConf.toLong
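To make the new predicate's semantics concrete, here is a self-contained sketch of the same logic; it uses exists in place of map(...).reduce(_ || _), which is equivalent for a non-empty extension list, and the keys and extensions shown are illustrative only.

  def keeps(key: String, extensions: Seq[String]): Boolean = {
    val isDir = key.endsWith("/")                            // S3 "directory" marker object
    val matchesExtension =
      extensions.isEmpty || extensions.exists(key.endsWith)  // empty list disables the check
    !isDir && matchesExtension
  }

  keeps("scenes/a.tif", Seq(".tif", ".tiff"))   // true
  keeps("scenes/", Seq(".tif", ".tiff"))        // false: directory marker
  keeps("scenes/notes.txt", Seq(".tif"))        // false: extension not listed
  keeps("scenes/blob.bin", Seq.empty)           // true: no extension filter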
@@ -125,15 +140,7 @@ abstract class S3InputFormat[K, V] extends InputFormat[K,V] with LazyLogging {

       s3client
         .listObjectsIterator(request)
-        .filter(!_.getKey.endsWith("/"))
-        .filter { obj =>
-          if (extensions.isEmpty)
-            true
-          else {
-            val key = obj.getKey
-            extensions.map(key.endsWith).reduce(_ || _)
-          }
-        }
+        .filter(s3ObjectFilter)
         .foreach { obj =>
           val objSize = obj.getSize
           val curSplit =
@@ -159,7 +166,7 @@ abstract class S3InputFormat[K, V] extends InputFormat[K,V] with LazyLogging {
     val keys =
       s3client
         .listObjectsIterator(request)
-        .filter(! _.getKey.endsWith("/"))
+        .filter(s3ObjectFilter)
         .toVector

     val groupCount = math.max(1, keys.length / partitionCount)
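Design note: hoisting the predicate into the shared s3ObjectFilter value means both listing passes, the byte-size-based splitting above and this partition-count-based key grouping, now apply identical filtering. Before this change, the second call site excluded only directory markers, so objects with non-matching extensions could slip into the key groups.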
