Collections API #1606

Merged: 26 commits on Sep 1, 2016
Changes from 12 commits
Commits (26):
c70a674
collections api init
pomadchin Jul 27, 2016
688d088
LayerCollection readers for all backends without optimisations; Hadoo…
pomadchin Jul 28, 2016
591ab02
fix hadoop collection reader
pomadchin Jul 29, 2016
0928cc5
add reading
pomadchin Jul 30, 2016
c67d646
Merge branch 'master' of github.com:pomadchin/geotrellis into feature…
pomadchin Aug 1, 2016
43405d5
+file multithread reads
pomadchin Aug 1, 2016
ee67aa6
fix collections api
pomadchin Aug 1, 2016
2abeb1f
hbase collection reader
pomadchin Aug 1, 2016
2b91182
parallelize reads in collections api
pomadchin Aug 2, 2016
50e0023
improve collection api reads
pomadchin Aug 2, 2016
e85fb2d
fixed thread pools in collection readers
pomadchin Aug 9, 2016
7158cf2
collections reading threads are configurable
pomadchin Aug 16, 2016
47352b7
Accumulo sim; removed partitions number, generic njoin func
pomadchin Aug 22, 2016
16b4117
Merge branch 'master' of github.com:pomadchin/geotrellis into feature…
pomadchin Aug 22, 2016
807134f
accumulo and hbase etl fix
pomadchin Aug 22, 2016
7367d66
hide thread pool creation / closing inside njoin function
pomadchin Aug 22, 2016
821b09d
improve thread pool size definition
pomadchin Aug 26, 2016
1e083f0
explicit return type in all collection readers
pomadchin Aug 26, 2016
bbf27c3
hbase connection control fix
pomadchin Aug 30, 2016
3eead88
hbase reads performance improvements
pomadchin Aug 30, 2016
20df234
rollback readers; they were operating normally; problems were caused …
pomadchin Aug 30, 2016
d08f30d
safer hbase scanners handle
pomadchin Aug 31, 2016
238f320
LayerCollection.njoin function usage
pomadchin Aug 31, 2016
7a499dd
LayerCollection.njoin function usage
pomadchin Aug 31, 2016
7b2f659
Merge branch 'feature/collections-api' of https://github.com/pomadchi…
echeipesh Sep 1, 2016
3c0134c
Merge pull request #15 from echeipesh/feature/collections-api-njoin
pomadchin Sep 1, 2016
@@ -12,7 +12,6 @@ import org.apache.accumulo.core.security.Authorizations
import org.apache.accumulo.core.data._
import org.apache.hadoop.io.Text


import scala.collection.JavaConversions._

object AccumuloAttributeStore {
@@ -0,0 +1,43 @@
package geotrellis.spark.io.accumulo

import geotrellis.spark.io.avro.codecs.KeyValueRecordCodec
import geotrellis.spark.io.avro.{AvroEncoder, AvroRecordCodec}
import geotrellis.spark.{Boundable, KeyBounds}

import org.apache.accumulo.core.data.{Range => AccumuloRange}
import org.apache.accumulo.core.security.Authorizations
import org.apache.avro.Schema
import org.apache.hadoop.io.Text

import scala.collection.JavaConversions._
import scala.reflect.ClassTag

object AccumuloCollectionReader {
def read[K: Boundable: AvroRecordCodec: ClassTag, V: AvroRecordCodec: ClassTag](
table: String,
columnFamily: Text,
queryKeyBounds: Seq[KeyBounds[K]],
decomposeBounds: KeyBounds[K] => Seq[AccumuloRange],
filterIndexOnly: Boolean,
writerSchema: Option[Schema] = None
)(implicit instance: AccumuloInstance): Seq[(K, V)] = {
if(queryKeyBounds.isEmpty) return Seq.empty[(K, V)]

val codec = KeyValueRecordCodec[K, V]
val includeKey = (key: K) => queryKeyBounds.includeKey(key)

val ranges = queryKeyBounds.flatMap(decomposeBounds)

ranges flatMap { range: AccumuloRange =>
val scanner = instance.connector.createScanner(table, new Authorizations())
scanner.setRange(range)
scanner.fetchColumnFamily(columnFamily)
scanner.iterator.map { case entry =>
Contributor:
I know the scanner has a thread pool available to run multiple requests. Does it do the thing that would be very useful here and process multiple ranges asynchronously, or are the ranges essentially sequential, with only the reads within each range async?

@pomadchin (Member, Author), Aug 18, 2016:
Async for each range; I'll fix that when I bring this PR up to date with master. I thought it was completely async, but forgot that this only holds for HBase, where we can set up a multi-range scanner. (A sketch of a concurrent per-range read follows this file's diff.)

AvroEncoder.fromBinary(writerSchema.getOrElse(codec.schema), entry.getValue.get)(codec)
}.flatMap { pairs: Vector[(K, V)] =>
if(filterIndexOnly) pairs
else pairs.filter { pair => includeKey(pair._1) }
}
}
}
}
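
To the point raised in the review thread above: at this stage of the PR the reader walks the decomposed ranges sequentially, and only the iteration within each range is driven by the scanner. Below is a minimal sketch of how the per-range scans could be made concurrent, in the spirit of the later commits that put reads behind a configurable thread pool and nondeterminism.njoin. It reuses names from the diff above (instance, table, columnFamily, writerSchema, codec, ranges, includeKey, filterIndexOnly); the pool size of 8, the scanner.close() handling, and the exact njoin wiring are illustrative assumptions, not what this commit ships.

import java.util.concurrent.Executors
import scalaz.concurrent.{Strategy, Task}
import scalaz.std.vector._
import scalaz.stream.{Process, nondeterminism}

val pool = Executors.newFixedThreadPool(8) // illustrative pool size

// One stream element per decomposed Accumulo range.
val rangeProcess: Process[Task, AccumuloRange] = Process.emitAll(ranges)

// Wrap a single-range scan in a Task so njoin can schedule it on the pool.
val readRange: AccumuloRange => Process[Task, Vector[(K, V)]] = { range =>
  Process.eval(Task {
    val scanner = instance.connector.createScanner(table, new Authorizations())
    try {
      scanner.setRange(range)
      scanner.fetchColumnFamily(columnFamily)
      scanner.iterator.map { entry =>
        AvroEncoder.fromBinary(writerSchema.getOrElse(codec.schema), entry.getValue.get)(codec)
      }.flatMap { pairs: Vector[(K, V)] =>
        if (filterIndexOnly) pairs else pairs.filter { pair => includeKey(pair._1) }
      }.toVector
    } finally scanner.close()
  })
}

// Run up to 8 range scans concurrently and concatenate their results.
val result: Vector[(K, V)] =
  nondeterminism.njoin(maxOpen = 8, maxQueued = 8)(rangeProcess map readRange)(Strategy.Executor(pool))
    .runFoldMap(identity)
    .unsafePerformSync

pool.shutdown()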
@@ -0,0 +1,47 @@
package geotrellis.spark.io.accumulo

import geotrellis.spark._
import geotrellis.spark.io._
import geotrellis.spark.io.avro._
import geotrellis.util._

import org.apache.accumulo.core.data.{Range => AccumuloRange}
import org.apache.hadoop.io.Text
import spray.json._

import scala.reflect._

class AccumuloLayerCollectionReader(val attributeStore: AttributeStore)(implicit instance: AccumuloInstance) extends CollectionLayerReader[LayerId] {

def read[
K: AvroRecordCodec: Boundable: JsonFormat: ClassTag,
V: AvroRecordCodec: ClassTag,
M: JsonFormat: GetComponent[?, Bounds[K]]
](id: LayerId, rasterQuery: LayerQuery[K, M], numPartitions: Int, filterIndexOnly: Boolean) = {
if (!attributeStore.layerExists(id)) throw new LayerNotFoundError(id)

val LayerAttributes(header, metadata, keyIndex, writerSchema) = try {
attributeStore.readLayerAttributes[AccumuloLayerHeader, M, K](id)
} catch {
case e: AttributeNotFoundError => throw new LayerReadError(id).initCause(e)
}

val queryKeyBounds = rasterQuery(metadata)

val decompose = (bounds: KeyBounds[K]) =>
keyIndex.indexRanges(bounds).map { case (min, max) =>
new AccumuloRange(new Text(AccumuloKeyEncoder.long2Bytes(min)), new Text(AccumuloKeyEncoder.long2Bytes(max)))
}

val seq = AccumuloCollectionReader.read[K, V](header.tileTable, columnFamily(id), queryKeyBounds, decompose, filterIndexOnly, Some(writerSchema))
new ContextCollection(seq, metadata)
}
}

object AccumuloLayerCollectionReader {
def apply(attributeStore: AccumuloAttributeStore)(implicit instance: AccumuloInstance): AccumuloLayerCollectionReader =
new AccumuloLayerCollectionReader(attributeStore)

def apply(implicit instance: AccumuloInstance): AccumuloLayerCollectionReader =
new AccumuloLayerCollectionReader(AccumuloAttributeStore(instance.connector))
}
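
For orientation, a hypothetical call site for the new collection reader. The layer name, zoom level, the key/value/metadata types, and the empty LayerQuery are illustrative assumptions (they presume a SpatialKey/Tile layer of that shape was written earlier); the shape of the read call itself follows the signature above.

import geotrellis.raster.Tile
import geotrellis.spark._
import geotrellis.spark.io._
import geotrellis.spark.io.accumulo._

implicit val instance: AccumuloInstance = ??? // a connected AccumuloInstance

val reader = AccumuloLayerCollectionReader(instance)

// Reads the queried tiles into an in-memory collection (plus metadata) rather than an RDD.
val layer = reader.read[SpatialKey, Tile, TileLayerMetadata[SpatialKey]](
  LayerId("example-layer", 10),                              // hypothetical layer id
  new LayerQuery[SpatialKey, TileLayerMetadata[SpatialKey]], // unfiltered query: whole layer
  numPartitions = 1,
  filterIndexOnly = false
)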
@@ -20,12 +20,13 @@ class AccumuloSpaceTimeSpec
implicit lazy val instance = MockAccumuloInstance()

lazy val reader = AccumuloLayerReader(instance)
lazy val writer = AccumuloLayerWriter(instance, "tiles", SocketWriteStrategy())
lazy val creader = AccumuloLayerCollectionReader(instance)
lazy val writer = AccumuloLayerWriter(instance, "tiles", SocketWriteStrategy())
lazy val deleter = AccumuloLayerDeleter(instance)
lazy val reindexer = AccumuloLayerReindexer(instance, SocketWriteStrategy())
lazy val updater = AccumuloLayerUpdater(instance, SocketWriteStrategy())
lazy val tiles = AccumuloValueReader(instance)
lazy val sample = CoordinateSpaceTime
lazy val copier = AccumuloLayerCopier(instance, reader, writer)
lazy val mover = AccumuloLayerMover(copier, deleter)
lazy val copier = AccumuloLayerCopier(instance, reader, writer)
lazy val mover = AccumuloLayerMover(copier, deleter)
}
@@ -16,14 +16,14 @@ class AccumuloSpatialSpec

implicit lazy val instance = MockAccumuloInstance()

lazy val reader = AccumuloLayerReader(instance)
lazy val writer = AccumuloLayerWriter(instance, "tiles", SocketWriteStrategy())
lazy val deleter = AccumuloLayerDeleter(instance)
lazy val reader = AccumuloLayerReader(instance)
lazy val creader = AccumuloLayerCollectionReader(instance)
lazy val writer = AccumuloLayerWriter(instance, "tiles", SocketWriteStrategy())
lazy val deleter = AccumuloLayerDeleter(instance)
lazy val reindexer = AccumuloLayerReindexer(instance, SocketWriteStrategy())
lazy val updater = AccumuloLayerUpdater(instance, SocketWriteStrategy())
lazy val tiles = AccumuloValueReader(instance)
lazy val sample = AllOnesTestFile

lazy val copier = AccumuloLayerCopier(instance, reader, writer)
lazy val mover = AccumuloLayerMover(copier, deleter)
lazy val tiles = AccumuloValueReader(instance)
lazy val sample = AllOnesTestFile
lazy val copier = AccumuloLayerCopier(instance, reader, writer)
lazy val mover = AccumuloLayerMover(copier, deleter)
}
@@ -21,12 +21,13 @@ class AccumuloTileFeatureSpaceTimeSpec
implicit lazy val instance = MockAccumuloInstance()

lazy val reader = AccumuloLayerReader(instance)
lazy val writer = AccumuloLayerWriter(instance, "tiles", SocketWriteStrategy())
lazy val creader = AccumuloLayerCollectionReader(instance)
lazy val writer = AccumuloLayerWriter(instance, "tiles", SocketWriteStrategy())
lazy val deleter = AccumuloLayerDeleter(instance)
lazy val reindexer = AccumuloLayerReindexer(instance, SocketWriteStrategy())
lazy val updater = AccumuloLayerUpdater(instance, SocketWriteStrategy())
lazy val tiles = AccumuloValueReader(instance)
lazy val sample = CoordinateSpaceTime
lazy val copier = AccumuloLayerCopier(instance, reader, writer)
lazy val mover = AccumuloLayerMover(copier, deleter)
lazy val copier = AccumuloLayerCopier(instance, reader, writer)
lazy val mover = AccumuloLayerMover(copier, deleter)
}
@@ -17,14 +17,14 @@ class AccumuloTileFeatureSpatialSpec

implicit lazy val instance = MockAccumuloInstance()

lazy val reader = AccumuloLayerReader(instance)
lazy val writer = AccumuloLayerWriter(instance, "tiles", SocketWriteStrategy())
lazy val deleter = AccumuloLayerDeleter(instance)
lazy val reader = AccumuloLayerReader(instance)
lazy val creader = AccumuloLayerCollectionReader(instance)
lazy val writer = AccumuloLayerWriter(instance, "tiles", SocketWriteStrategy())
lazy val deleter = AccumuloLayerDeleter(instance)
lazy val reindexer = AccumuloLayerReindexer(instance, SocketWriteStrategy())
lazy val updater = AccumuloLayerUpdater(instance, SocketWriteStrategy())
lazy val tiles = AccumuloValueReader(instance)
lazy val sample = AllOnesTestFile

lazy val copier = AccumuloLayerCopier(instance, reader, writer)
lazy val mover = AccumuloLayerMover(copier, deleter)
lazy val tiles = AccumuloValueReader(instance)
lazy val sample = AllOnesTestFile
lazy val copier = AccumuloLayerCopier(instance, reader, writer)
lazy val mover = AccumuloLayerMover(copier, deleter)
}
cassandra/src/main/resources/reference.conf (3 additions, 0 deletions)
@@ -6,4 +6,7 @@ geotrellis.cassandra {
localDc = "datacenter1"
usedHostsPerRemoteDc = 0
allowRemoteDCsForLocalConsistencyLevel = false
threads = {
collection.read = 32
}
}
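
The new threads.collection.read key is read through Typesafe Config, so applications can override it in their own configuration. A minimal sketch of layering an override programmatically; the value 64 and the parseString/withFallback wiring are illustrative only.

import com.typesafe.config.ConfigFactory

// Application-supplied override layered over the reference.conf default of 32.
val config = ConfigFactory
  .parseString("geotrellis.cassandra.threads.collection.read = 64")
  .withFallback(ConfigFactory.load())

val collectionReadThreads = config.getInt("geotrellis.cassandra.threads.collection.read") // 64

In normal use the same key can simply be set in an application.conf, which ConfigFactory.load() already merges over these defaults; the Cassandra collection reader below picks it up through its threads default parameter.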
@@ -0,0 +1,92 @@
package geotrellis.spark.io.cassandra

import geotrellis.spark.{Boundable, KeyBounds, LayerId}
import geotrellis.spark.io.CollectionLayerReader
import geotrellis.spark.io.avro.codecs.KeyValueRecordCodec
import geotrellis.spark.io.avro.{AvroEncoder, AvroRecordCodec}
import geotrellis.spark.io.index.{IndexRanges, MergeQueue}
import geotrellis.spark.util.KryoWrapper

import org.apache.avro.Schema
import com.datastax.driver.core.querybuilder.QueryBuilder
import com.datastax.driver.core.querybuilder.QueryBuilder.{eq => eqs}
import com.typesafe.config.ConfigFactory
import scalaz.std.vector._
import scalaz.concurrent.{Strategy, Task}
import scalaz.stream.{Process, nondeterminism}

import java.util.concurrent.Executors
import scala.collection.JavaConversions._
import scala.reflect.ClassTag

object CassandraCollectionReader {
def read[K: Boundable : AvroRecordCodec : ClassTag, V: AvroRecordCodec : ClassTag](
instance: CassandraInstance,
keyspace: String,
table: String,
layerId: LayerId,
queryKeyBounds: Seq[KeyBounds[K]],
decomposeBounds: KeyBounds[K] => Seq[(Long, Long)],
filterIndexOnly: Boolean,
writerSchema: Option[Schema] = None,
numPartitions: Option[Int] = None,
threads: Int = ConfigFactory.load().getInt("geotrellis.cassandra.threads.collection.read")
): Seq[(K, V)] = {
if (queryKeyBounds.isEmpty) return Seq.empty[(K, V)]

val includeKey = (key: K) => queryKeyBounds.includeKey(key)
val _recordCodec = KeyValueRecordCodec[K, V]
val kwWriterSchema = KryoWrapper(writerSchema) //Avro Schema is not Serializable

val ranges = if (queryKeyBounds.length > 1)
MergeQueue(queryKeyBounds.flatMap(decomposeBounds))
else
queryKeyBounds.flatMap(decomposeBounds)

val bins = IndexRanges.bin(ranges, numPartitions.getOrElse(CollectionLayerReader.defaultNumPartitions)).toVector.map(_.toIterator)

val query = QueryBuilder.select("value")
.from(keyspace, table)
.where(eqs("key", QueryBuilder.bindMarker()))
.and(eqs("name", layerId.name))
.and(eqs("zoom", layerId.zoom))
.toString

val pool = Executors.newFixedThreadPool(threads)

val result = instance.withSessionDo { session =>
val statement = session.prepare(query)

bins flatMap { partition =>
val range: Process[Task, Iterator[Long]] = Process.unfold(partition) { iter =>
if (iter.hasNext) {
val (start, end) = iter.next()
Some((start to end).toIterator, iter)
}
else None
}

val read: Iterator[Long] => Process[Task, Vector[(K, V)]] = { iterator =>
Process.unfold(iterator) { iter =>
if (iter.hasNext) {
val index = iter.next()
val row = session.execute(statement.bind(index.asInstanceOf[java.lang.Long]))
if (row.nonEmpty) {
val bytes = row.one().getBytes("value").array()
val recs = AvroEncoder.fromBinary(kwWriterSchema.value.getOrElse(_recordCodec.schema), bytes)(_recordCodec)
if (filterIndexOnly) Some(recs, iter)
else Some(recs.filter { row => includeKey(row._1) }, iter)
} else Some(Vector.empty, iter)
} else {
None
}
}
}

nondeterminism.njoin(maxOpen = threads, maxQueued = threads) { range map read }(Strategy.Executor(pool)).runFoldMap(identity).unsafePerformSync
}
}

pool.shutdown(); result
}
}
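
Because threads is an ordinary default parameter, callers can also bypass configuration entirely. A hypothetical direct call with an explicit thread count; the other arguments are as assembled in CassandraLayerCollectionReader.read below, and the value 16 is arbitrary.

val seq = CassandraCollectionReader.read[K, V](
  instance, header.keyspace, header.tileTable, id,
  queryKeyBounds, decompose, filterIndexOnly, Some(writerSchema),
  threads = 16 // overrides geotrellis.cassandra.threads.collection.read
)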
@@ -0,0 +1,42 @@
package geotrellis.spark.io.cassandra

import geotrellis.spark._
import geotrellis.spark.io._
import geotrellis.spark.io.avro._
import geotrellis.util._

import spray.json._

import scala.reflect._

class CassandraLayerCollectionReader(val attributeStore: AttributeStore, instance: CassandraInstance) extends CollectionLayerReader[LayerId] {

def read[
K: AvroRecordCodec: Boundable: JsonFormat: ClassTag,
V: AvroRecordCodec: ClassTag,
M: JsonFormat: GetComponent[?, Bounds[K]]
](id: LayerId, rasterQuery: LayerQuery[K, M], numPartitions: Int, filterIndexOnly: Boolean) = {
if (!attributeStore.layerExists(id)) throw new LayerNotFoundError(id)

val LayerAttributes(header, metadata, keyIndex, writerSchema) = try {
attributeStore.readLayerAttributes[CassandraLayerHeader, M, K](id)
} catch {
case e: AttributeNotFoundError => throw new LayerReadError(id).initCause(e)
}

val queryKeyBounds = rasterQuery(metadata)

val decompose = (bounds: KeyBounds[K]) => keyIndex.indexRanges(bounds)

val seq = CassandraCollectionReader.read[K, V](instance, header.keyspace, header.tileTable, id, queryKeyBounds, decompose, filterIndexOnly, Some(writerSchema))
new ContextCollection(seq, metadata)
}
}

object CassandraLayerCollectionReader {
def apply(instance: CassandraInstance): CassandraLayerCollectionReader =
new CassandraLayerCollectionReader(CassandraAttributeStore(instance), instance)

def apply(attributeStore: CassandraAttributeStore): CassandraLayerCollectionReader =
new CassandraLayerCollectionReader(attributeStore, attributeStore.instance)
}
@@ -18,6 +18,7 @@ class CassandraSpaceTimeSpec
lazy val attributeStore = CassandraAttributeStore(instance)

lazy val reader = CassandraLayerReader(attributeStore)
lazy val creader = CassandraLayerCollectionReader(attributeStore)
lazy val writer = CassandraLayerWriter(attributeStore, "geotrellis", "tiles")
lazy val deleter = CassandraLayerDeleter(attributeStore)
lazy val updater = CassandraLayerUpdater(attributeStore)
@@ -17,6 +17,7 @@ class CassandraSpatialSpec
lazy val attributeStore = CassandraAttributeStore(instance)

lazy val reader = CassandraLayerReader(attributeStore)
lazy val creader = CassandraLayerCollectionReader(attributeStore)
lazy val writer = CassandraLayerWriter(attributeStore, "geotrellis", "tiles")
lazy val deleter = CassandraLayerDeleter(attributeStore)
lazy val updater = CassandraLayerUpdater(attributeStore)
@@ -18,6 +18,7 @@ class CassandraTileFeatureSpaceTimeSpec
lazy val attributeStore = CassandraAttributeStore(instance, "geotrellis_tf", "metadata")

lazy val reader = CassandraLayerReader(attributeStore)
lazy val creader = CassandraLayerCollectionReader(attributeStore)
lazy val writer = CassandraLayerWriter(attributeStore, "geotrellis_tf", "tiles")
lazy val deleter = CassandraLayerDeleter(attributeStore)
lazy val updater = CassandraLayerUpdater(attributeStore)
@@ -17,6 +17,7 @@ class CassandraTileFeatureSpatialSpec
lazy val attributeStore = CassandraAttributeStore(instance, "geotrellis_tf", "metadata")

lazy val reader = CassandraLayerReader(attributeStore)
lazy val creader = CassandraLayerCollectionReader(attributeStore)
lazy val writer = CassandraLayerWriter(attributeStore, "geotrellis_tf", "tiles")
lazy val deleter = CassandraLayerDeleter(attributeStore)
lazy val updater = CassandraLayerUpdater(attributeStore)