Collections API #1606

Merged 26 commits on Sep 1, 2016
Commits (26). The diff below shows changes from 22 commits.
c70a674
collections api init
pomadchin Jul 27, 2016
688d088
LayerCollection readers for all backends without optimisations; Hadoo…
pomadchin Jul 28, 2016
591ab02
fix hadoop collection reader
pomadchin Jul 29, 2016
0928cc5
add reading
pomadchin Jul 30, 2016
c67d646
Merge branch 'master' of github.com:pomadchin/geotrellis into feature…
pomadchin Aug 1, 2016
43405d5
+file multithread reads
pomadchin Aug 1, 2016
ee67aa6
fix collections api
pomadchin Aug 1, 2016
2abeb1f
hbase collection reader
pomadchin Aug 1, 2016
2b91182
parallelize reads in collections api
pomadchin Aug 2, 2016
50e0023
improve collection api reads
pomadchin Aug 2, 2016
e85fb2d
fixed thread pools in collection readers
pomadchin Aug 9, 2016
7158cf2
collections reading threads are configurable
pomadchin Aug 16, 2016
47352b7
Accumulo sim; removed partitions number, generic njoin func
pomadchin Aug 22, 2016
16b4117
Merge branch 'master' of github.com:pomadchin/geotrellis into feature…
pomadchin Aug 22, 2016
807134f
accumulo and hbase etl fix
pomadchin Aug 22, 2016
7367d66
hide thread pool creation / closing inside njoin function
pomadchin Aug 22, 2016
821b09d
improve thread pool size definition
pomadchin Aug 26, 2016
1e083f0
explicit return type in all collection readers
pomadchin Aug 26, 2016
bbf27c3
hbase connection control fix
pomadchin Aug 30, 2016
3eead88
hbase reads performance improvements
pomadchin Aug 30, 2016
20df234
rollback readers; they were operating normally; problems were caused …
pomadchin Aug 30, 2016
d08f30d
safer hbase scanners handle
pomadchin Aug 31, 2016
238f320
LayerCollection.njoin function usage
pomadchin Aug 31, 2016
7a499dd
LayerCollection.njoin function usage
pomadchin Aug 31, 2016
7b2f659
Merge branch 'feature/collections-api' of https://github.com/pomadchi…
echeipesh Sep 1, 2016
3c0134c
Merge pull request #15 from echeipesh/feature/collections-api-njoin
pomadchin Sep 1, 2016
7 changes: 5 additions & 2 deletions accumulo/src/main/resources/reference.conf
@@ -1,4 +1,7 @@
 geotrellis.accumulo {
   catalog = "metadata"
-  threads.rdd.write = 32
-}
+  threads {
+    collection.read = default
+    rdd.write = default
+  }
+}
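The configuration now accepts the marker value default instead of a hard-coded thread count, and the readers below resolve it through a getThreads helper on the Typesafe Config (brought into scope via import geotrellis.spark.io._). That helper is not part of this diff; the following is a minimal sketch of how such an extension could behave, assuming default maps to the number of available processors:

import com.typesafe.config.Config

object ConfigThreadsSketch {
  // Hypothetical sketch only: the real getThreads helper lives in geotrellis.spark.io
  // and its handling of "default" is an assumption here, not code from this PR.
  implicit class ConfigThreadsOps(val config: Config) extends AnyVal {
    def getThreads(path: String): Int =
      config.getString(path) match {
        case "default" => Runtime.getRuntime.availableProcessors
        case n         => n.toInt
      }
  }

  // Usage mirroring the readers in this PR:
  // com.typesafe.config.ConfigFactory.load().getThreads("geotrellis.accumulo.threads.collection.read")
}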
@@ -12,7 +12,6 @@ import org.apache.accumulo.core.security.Authorizations
import org.apache.accumulo.core.data._
import org.apache.hadoop.io.Text


import scala.collection.JavaConversions._

object AccumuloAttributeStore {
@@ -0,0 +1,68 @@
package geotrellis.spark.io.accumulo

import geotrellis.spark.io._
import geotrellis.spark.io.avro.codecs.KeyValueRecordCodec
import geotrellis.spark.io.avro.{AvroEncoder, AvroRecordCodec}
import geotrellis.spark.{Boundable, KeyBounds}

import scalaz.std.vector._
import scalaz.concurrent.{Strategy, Task}
import scalaz.stream.{Process, channel, nondeterminism, tee}
import org.apache.accumulo.core.data.{Range => AccumuloRange}
import org.apache.accumulo.core.security.Authorizations
import org.apache.avro.Schema
import org.apache.hadoop.io.Text
import com.typesafe.config.ConfigFactory

import scala.collection.JavaConversions._
import scala.reflect.ClassTag
import java.util.concurrent.Executors

object AccumuloCollectionReader {
def read[K: Boundable: AvroRecordCodec: ClassTag, V: AvroRecordCodec: ClassTag](
table: String,
columnFamily: Text,
queryKeyBounds: Seq[KeyBounds[K]],
decomposeBounds: KeyBounds[K] => Seq[AccumuloRange],
filterIndexOnly: Boolean,
writerSchema: Option[Schema] = None,
threads: Int = ConfigFactory.load().getThreads("geotrellis.accumulo.threads.collection.read")
)(implicit instance: AccumuloInstance): Seq[(K, V)] = {
if(queryKeyBounds.isEmpty) return Seq.empty[(K, V)]

val codec = KeyValueRecordCodec[K, V]
val includeKey = (key: K) => queryKeyBounds.includeKey(key)

val ranges = queryKeyBounds.flatMap(decomposeBounds).toIterator

val pool = Executors.newFixedThreadPool(threads)

val range: Process[Task, AccumuloRange] = Process.unfold(ranges) { iter =>
if (iter.hasNext) Some(iter.next(), iter)
else None
}

val readChannel = channel.lift { (range: AccumuloRange) => Task {
val scanner = instance.connector.createScanner(table, new Authorizations())
scanner.setRange(range)
scanner.fetchColumnFamily(columnFamily)
val result = scanner.iterator.map { case entry =>
AvroEncoder.fromBinary(writerSchema.getOrElse(codec.schema), entry.getValue.get)(codec)
}.flatMap { pairs: Vector[(K, V)] =>
if(filterIndexOnly) pairs
else pairs.filter { pair => includeKey(pair._1) }
}.toVector
scanner.close()
result
}(pool) }

val read = range.tee(readChannel)(tee.zipApply).map(Process.eval)

val result: Seq[(K, V)] =
nondeterminism
.njoin(maxOpen = threads, maxQueued = threads) { read }(Strategy.Executor(pool))
.runFoldMap(identity).unsafePerformSync

pool.shutdown(); result
}
}
@@ -0,0 +1,47 @@
package geotrellis.spark.io.accumulo

import geotrellis.spark._
import geotrellis.spark.io._
import geotrellis.spark.io.avro._
import geotrellis.util._

import org.apache.accumulo.core.data.{Range => AccumuloRange}
import org.apache.hadoop.io.Text
import spray.json._

import scala.reflect._

class AccumuloLayerCollectionReader(val attributeStore: AttributeStore)(implicit instance: AccumuloInstance) extends CollectionLayerReader[LayerId] {

def read[
K: AvroRecordCodec: Boundable: JsonFormat: ClassTag,
V: AvroRecordCodec: ClassTag,
M: JsonFormat: GetComponent[?, Bounds[K]]
](id: LayerId, rasterQuery: LayerQuery[K, M], filterIndexOnly: Boolean) = {
if (!attributeStore.layerExists(id)) throw new LayerNotFoundError(id)

val LayerAttributes(header, metadata, keyIndex, writerSchema) = try {
attributeStore.readLayerAttributes[AccumuloLayerHeader, M, K](id)
} catch {
case e: AttributeNotFoundError => throw new LayerReadError(id).initCause(e)
}

val queryKeyBounds = rasterQuery(metadata)

val decompose = (bounds: KeyBounds[K]) =>
keyIndex.indexRanges(bounds).map { case (min, max) =>
new AccumuloRange(new Text(AccumuloKeyEncoder.long2Bytes(min)), new Text(AccumuloKeyEncoder.long2Bytes(max)))
}

val seq = AccumuloCollectionReader.read[K, V](header.tileTable, columnFamily(id), queryKeyBounds, decompose, filterIndexOnly, Some(writerSchema))
new ContextCollection(seq, metadata)
}
}

object AccumuloLayerCollectionReader {
def apply(attributeStore: AccumuloAttributeStore)(implicit instance: AccumuloInstance): AccumuloLayerCollectionReader =
new AccumuloLayerCollectionReader(attributeStore)

def apply(implicit instance: AccumuloInstance): AccumuloLayerCollectionReader =
new AccumuloLayerCollectionReader(AccumuloAttributeStore(instance.connector))
}
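A hypothetical usage sketch of the new collection reader follows; the Accumulo connection values, layer name and zoom, key/value/metadata types, and the no-argument LayerQuery (read the full layer) are illustrative assumptions, not code from this PR:

import geotrellis.raster.Tile
import geotrellis.spark._
import geotrellis.spark.io._
import geotrellis.spark.io.accumulo._
import org.apache.accumulo.core.client.security.tokens.PasswordToken

// Connection details are deployment-specific placeholders.
implicit val instance: AccumuloInstance =
  AccumuloInstance("gis", "zookeeper-host", "root", new PasswordToken("secret"))

val reader = AccumuloLayerCollectionReader(instance)

// Unlike AccumuloLayerReader, this reads the queried tiles eagerly into an
// in-memory Seq with metadata attached; no SparkContext is involved.
val layer = reader.read[SpatialKey, Tile, TileLayerMetadata[SpatialKey]](
  LayerId("landsat", 13),
  new LayerQuery[SpatialKey, TileLayerMetadata[SpatialKey]],
  filterIndexOnly = false
)

layer.foreach { case (key, tile) => println(s"$key: ${tile.dimensions}") }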
@@ -1,6 +1,7 @@
package geotrellis.spark.io.accumulo

import geotrellis.spark.util._
import geotrellis.spark.io._
import geotrellis.spark.io.hadoop._

import org.apache.hadoop.mapreduce.Job
@@ -17,7 +18,7 @@ import java.util.UUID
import java.util.concurrent.Executors

object AccumuloWriteStrategy {
- val threads = ConfigFactory.load().getInt("geotrellis.accumulo.threads.rdd.write")
+ val threads = ConfigFactory.load().getThreads("geotrellis.accumulo.threads.rdd.write")

def DEFAULT = HdfsWriteStrategy("/geotrellis-ingest")
}
@@ -20,12 +20,13 @@ class AccumuloSpaceTimeSpec
implicit lazy val instance = MockAccumuloInstance()

lazy val reader = AccumuloLayerReader(instance)
lazy val writer = AccumuloLayerWriter(instance, "tiles", SocketWriteStrategy())
lazy val creader = AccumuloLayerCollectionReader(instance)
lazy val writer = AccumuloLayerWriter(instance, "tiles", SocketWriteStrategy())
lazy val deleter = AccumuloLayerDeleter(instance)
lazy val reindexer = AccumuloLayerReindexer(instance, SocketWriteStrategy())
lazy val updater = AccumuloLayerUpdater(instance, SocketWriteStrategy())
lazy val tiles = AccumuloValueReader(instance)
lazy val sample = CoordinateSpaceTime
lazy val copier = AccumuloLayerCopier(instance, reader, writer)
lazy val mover = AccumuloLayerMover(copier, deleter)
lazy val copier = AccumuloLayerCopier(instance, reader, writer)
lazy val mover = AccumuloLayerMover(copier, deleter)
}
@@ -16,14 +16,14 @@ class AccumuloSpatialSpec

implicit lazy val instance = MockAccumuloInstance()

lazy val reader = AccumuloLayerReader(instance)
lazy val writer = AccumuloLayerWriter(instance, "tiles", SocketWriteStrategy())
lazy val deleter = AccumuloLayerDeleter(instance)
lazy val reader = AccumuloLayerReader(instance)
lazy val creader = AccumuloLayerCollectionReader(instance)
lazy val writer = AccumuloLayerWriter(instance, "tiles", SocketWriteStrategy())
lazy val deleter = AccumuloLayerDeleter(instance)
lazy val reindexer = AccumuloLayerReindexer(instance, SocketWriteStrategy())
lazy val updater = AccumuloLayerUpdater(instance, SocketWriteStrategy())
lazy val tiles = AccumuloValueReader(instance)
lazy val sample = AllOnesTestFile

lazy val copier = AccumuloLayerCopier(instance, reader, writer)
lazy val mover = AccumuloLayerMover(copier, deleter)
lazy val tiles = AccumuloValueReader(instance)
lazy val sample = AllOnesTestFile
lazy val copier = AccumuloLayerCopier(instance, reader, writer)
lazy val mover = AccumuloLayerMover(copier, deleter)
}
@@ -21,12 +21,13 @@ class AccumuloTileFeatureSpaceTimeSpec
implicit lazy val instance = MockAccumuloInstance()

lazy val reader = AccumuloLayerReader(instance)
lazy val writer = AccumuloLayerWriter(instance, "tiles", SocketWriteStrategy())
lazy val creader = AccumuloLayerCollectionReader(instance)
lazy val writer = AccumuloLayerWriter(instance, "tiles", SocketWriteStrategy())
lazy val deleter = AccumuloLayerDeleter(instance)
lazy val reindexer = AccumuloLayerReindexer(instance, SocketWriteStrategy())
lazy val updater = AccumuloLayerUpdater(instance, SocketWriteStrategy())
lazy val tiles = AccumuloValueReader(instance)
lazy val sample = CoordinateSpaceTime
lazy val copier = AccumuloLayerCopier(instance, reader, writer)
lazy val mover = AccumuloLayerMover(copier, deleter)
lazy val copier = AccumuloLayerCopier(instance, reader, writer)
lazy val mover = AccumuloLayerMover(copier, deleter)
}
@@ -17,14 +17,14 @@ class AccumuloTileFeatureSpatialSpec

implicit lazy val instance = MockAccumuloInstance()

lazy val reader = AccumuloLayerReader(instance)
lazy val writer = AccumuloLayerWriter(instance, "tiles", SocketWriteStrategy())
lazy val deleter = AccumuloLayerDeleter(instance)
lazy val reader = AccumuloLayerReader(instance)
lazy val creader = AccumuloLayerCollectionReader(instance)
lazy val writer = AccumuloLayerWriter(instance, "tiles", SocketWriteStrategy())
lazy val deleter = AccumuloLayerDeleter(instance)
lazy val reindexer = AccumuloLayerReindexer(instance, SocketWriteStrategy())
lazy val updater = AccumuloLayerUpdater(instance, SocketWriteStrategy())
lazy val tiles = AccumuloValueReader(instance)
lazy val sample = AllOnesTestFile

lazy val copier = AccumuloLayerCopier(instance, reader, writer)
lazy val mover = AccumuloLayerMover(copier, deleter)
lazy val tiles = AccumuloValueReader(instance)
lazy val sample = AllOnesTestFile
lazy val copier = AccumuloLayerCopier(instance, reader, writer)
lazy val mover = AccumuloLayerMover(copier, deleter)
}
5 changes: 3 additions & 2 deletions cassandra/src/main/resources/reference.conf
@@ -7,9 +7,10 @@ geotrellis.cassandra {
   usedHostsPerRemoteDc = 0
   allowRemoteDCsForLocalConsistencyLevel = false
   threads {
+    collection.read = default
     rdd {
-      write = 32
-      read = 32
+      write = default
+      read = default
     }
   }
 }
@@ -0,0 +1,65 @@
package geotrellis.spark.io.cassandra

import geotrellis.spark.{Boundable, KeyBounds, LayerId}
import geotrellis.spark.io._
import geotrellis.spark.io.avro.codecs.KeyValueRecordCodec
import geotrellis.spark.io.avro.{AvroEncoder, AvroRecordCodec}
import geotrellis.spark.io.index.MergeQueue
import geotrellis.spark.util.KryoWrapper

import org.apache.avro.Schema
import com.datastax.driver.core.querybuilder.QueryBuilder
import com.datastax.driver.core.querybuilder.QueryBuilder.{eq => eqs}
import com.typesafe.config.ConfigFactory

import scala.collection.JavaConversions._
import scala.reflect.ClassTag

object CassandraCollectionReader {
def read[K: Boundable : AvroRecordCodec : ClassTag, V: AvroRecordCodec : ClassTag](
instance: CassandraInstance,
keyspace: String,
table: String,
layerId: LayerId,
queryKeyBounds: Seq[KeyBounds[K]],
decomposeBounds: KeyBounds[K] => Seq[(Long, Long)],
filterIndexOnly: Boolean,
writerSchema: Option[Schema] = None,
threads: Int = ConfigFactory.load().getThreads("geotrellis.cassandra.threads.collection.read")
): Seq[(K, V)] = {
if (queryKeyBounds.isEmpty) return Seq.empty[(K, V)]

val includeKey = (key: K) => queryKeyBounds.includeKey(key)
val _recordCodec = KeyValueRecordCodec[K, V]
val kwWriterSchema = KryoWrapper(writerSchema) //Avro Schema is not Serializable

val ranges = if (queryKeyBounds.length > 1)
MergeQueue(queryKeyBounds.flatMap(decomposeBounds))
else
queryKeyBounds.flatMap(decomposeBounds)

val query = QueryBuilder.select("value")
.from(keyspace, table)
.where(eqs("key", QueryBuilder.bindMarker()))
.and(eqs("name", layerId.name))
.and(eqs("zoom", layerId.zoom))
.toString

instance.withSessionDo { session =>
val statement = session.prepare(query)

CollectionLayerReader.njoin[K, V](ranges, { iter =>
if (iter.hasNext) {
val index = iter.next()
val row = session.execute(statement.bind(index.asInstanceOf[java.lang.Long]))
if (row.nonEmpty) {
val bytes = row.one().getBytes("value").array()
val recs = AvroEncoder.fromBinary(kwWriterSchema.value.getOrElse(_recordCodec.schema), bytes)(_recordCodec)
if (filterIndexOnly) Some(recs, iter)
else Some(recs.filter { row => includeKey(row._1) }, iter)
} else Some(Vector.empty, iter)
} else None
}, threads)
}: Seq[(K, V)]
}
}
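The reader above delegates the parallel range reads to CollectionLayerReader.njoin, whose definition is not shown in this diff. Its shape can be inferred from the call site here and from the inline scalaz-stream pattern in AccumuloCollectionReader; the sketch below is a guess under those assumptions (the exact signature and the flattening of (start, end) ranges into single indices are not confirmed by the PR):

import scalaz.std.vector._
import scalaz.concurrent.{Strategy, Task}
import scalaz.stream.{Process, nondeterminism}
import java.util.concurrent.Executors

object NjoinSketch {
  // Sketch only: unfold the caller-supplied step over a flat index iterator and
  // drain the resulting reads through nondeterminism.njoin on a private pool.
  def njoin[K, V](
    ranges: Seq[(Long, Long)],
    step: Iterator[Long] => Option[(Vector[(K, V)], Iterator[Long])],
    threads: Int
  ): Seq[(K, V)] = {
    val pool = Executors.newFixedThreadPool(threads)
    val indices: Iterator[Long] =
      ranges.iterator.flatMap { case (start, end) => (start to end).iterator }

    val reads: Process[Task, Process[Task, Vector[(K, V)]]] =
      Process.unfold(indices)(step).map(records => Process.eval(Task.now(records)))

    val result: Seq[(K, V)] =
      nondeterminism
        .njoin(maxOpen = threads, maxQueued = threads)(reads)(Strategy.Executor(pool))
        .runFoldMap(identity).unsafePerformSync

    pool.shutdown(); result
  }
}

This mirrors the pool handling in AccumuloCollectionReader: the executor is created per call and shut down once the joined stream has been folded into a single Seq.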
@@ -0,0 +1,42 @@
package geotrellis.spark.io.cassandra

import geotrellis.spark._
import geotrellis.spark.io._
import geotrellis.spark.io.avro._
import geotrellis.util._

import spray.json._

import scala.reflect._

class CassandraLayerCollectionReader(val attributeStore: AttributeStore, instance: CassandraInstance) extends CollectionLayerReader[LayerId] {

def read[
K: AvroRecordCodec: Boundable: JsonFormat: ClassTag,
V: AvroRecordCodec: ClassTag,
M: JsonFormat: GetComponent[?, Bounds[K]]
](id: LayerId, rasterQuery: LayerQuery[K, M], filterIndexOnly: Boolean) = {
if (!attributeStore.layerExists(id)) throw new LayerNotFoundError(id)

val LayerAttributes(header, metadata, keyIndex, writerSchema) = try {
attributeStore.readLayerAttributes[CassandraLayerHeader, M, K](id)
} catch {
case e: AttributeNotFoundError => throw new LayerReadError(id).initCause(e)
}

val queryKeyBounds = rasterQuery(metadata)

val decompose = (bounds: KeyBounds[K]) => keyIndex.indexRanges(bounds)

val seq = CassandraCollectionReader.read[K, V](instance, header.keyspace, header.tileTable, id, queryKeyBounds, decompose, filterIndexOnly, Some(writerSchema))
new ContextCollection(seq, metadata)
}
}

object CassandraLayerCollectionReader {
def apply(instance: CassandraInstance): CassandraLayerCollectionReader =
new CassandraLayerCollectionReader(CassandraAttributeStore(instance), instance)

def apply(attributeStore: CassandraAttributeStore): CassandraLayerCollectionReader =
new CassandraLayerCollectionReader(attributeStore, attributeStore.instance)
}
@@ -1,10 +1,11 @@
package geotrellis.spark.io.cassandra

import geotrellis.spark.io.avro.codecs.KeyValueRecordCodec
import geotrellis.spark.util.KryoWrapper
import geotrellis.spark.{Boundable, KeyBounds, LayerId}
import geotrellis.spark.io._
import geotrellis.spark.io.avro.codecs.KeyValueRecordCodec
import geotrellis.spark.io.avro.{AvroEncoder, AvroRecordCodec}
import geotrellis.spark.io.index.{IndexRanges, MergeQueue}
import geotrellis.spark.util.KryoWrapper

import scalaz.concurrent.{Strategy, Task}
import scalaz.std.vector._
@@ -32,7 +33,7 @@ object CassandraRDDReader {
filterIndexOnly: Boolean,
writerSchema: Option[Schema] = None,
numPartitions: Option[Int] = None,
- threads: Int = ConfigFactory.load().getInt("geotrellis.cassandra.threads.rdd.read")
+ threads: Int = ConfigFactory.load().getThreads("geotrellis.cassandra.threads.rdd.read")
)(implicit sc: SparkContext): RDD[(K, V)] = {
if (queryKeyBounds.isEmpty) return sc.emptyRDD[(K, V)]

@@ -1,5 +1,6 @@
package geotrellis.spark.io.cassandra

import geotrellis.spark.io._
import geotrellis.spark.io.avro._
import geotrellis.spark.io.avro.codecs._
import geotrellis.spark.LayerId
@@ -27,7 +28,7 @@ object CassandraRDDWriter {
decomposeKey: K => Long,
keyspace: String,
table: String,
- threads: Int = ConfigFactory.load().getInt("geotrellis.cassandra.threads.rdd.write")
+ threads: Int = ConfigFactory.load().getThreads("geotrellis.cassandra.threads.rdd.write")
): Unit = {
implicit val sc = raster.sparkContext
