ETL Refactor #1553

Merged · 22 commits · Aug 19, 2016

Changes from all commits to `docs/spark-etl/spark-etl-intro.md` (179 additions, 54 deletions):

```scala
import geotrellis.raster.Tile
import geotrellis.spark._
import geotrellis.spark.etl.Etl
import geotrellis.spark.etl.config.EtlConf
import geotrellis.spark.util.SparkUtils
import geotrellis.vector.ProjectedExtent
import org.apache.spark.SparkConf

object GeoTrellisETL {
  type I = ProjectedExtent // input key type; TemporalProjectedExtent for temporal ingest
  type K = SpatialKey      // tiled key type; SpaceTimeKey for temporal ingest
  type V = Tile            // value type; MultibandTile for multiband ingest

  val modules = Etl.defaultModules // the built-in input/output modules

  def main(args: Array[String]): Unit = {
    implicit val sc = SparkUtils.createSparkContext("GeoTrellis ETL", new SparkConf(true))

    try {
      EtlConf(args) foreach { conf =>
        /* parse command line arguments */
        val etl = Etl(conf, modules)
        /* load source tiles using input module specified */
        val sourceTiles = etl.load[I, V]
        /* perform the reprojection and mosaicing step to fit tiles to LayoutScheme specified */
        val (zoom, tiled) = etl.tile(sourceTiles)
        /* save and optionally pyramid the mosaiced layer */
        etl.save[K, V](LayerId(etl.input.name, zoom), tiled)
      }
    } finally {
      sc.stop()
    }
  }
}
```

### User defined ETL configs

The above sample application can be placed in a new SBT project that
has a dependency on `"com.azavea.geotrellis" %% "geotrellis-spark-etl" % s"$VERSION"`
and is built into an assembly with the `sbt-assembly` plugin. You should be
careful to include a build configuration like the
[spark-etl build file](build.sbt).
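
For reference, a minimal `build.sbt` for such a project might look like the following sketch (version numbers are illustrative; see the linked build file for the full assembly settings):

```scala
name := "my-etl-project"
scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "com.azavea.geotrellis" %% "geotrellis-spark-etl" % "1.0.0-SNAPSHOT",
  "org.apache.spark" %% "spark-core" % "2.0.1" % "provided"
)
```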

At this point you would create a separate `App` object for each one
of your ETL configs.

### Built-in ETL configs

For convenience, and as an example, the `spark-etl` project provides two
`App` objects that perform vanilla ETL:

- `geotrellis.spark.etl.SinglebandIngest`
- `geotrellis.spark.etl.MultibandIngest`

Their inputs can be set up through command line arguments like so:

```bash
#!/bin/sh
export JAR="geotrellis-etl-assembly-0.10-SNAPSHOT.jar"
export JAR="geotrellis-etl-assembly-1.0.0-SNAPSHOT.jar"

spark-submit \
--class geotrellis.spark.etl.SinglebandIngest \
--master local[*] \
--driver-memory 2G \
$JAR \
--backend-profiles "file://backend-profiles.json" \
--input "file://input.json" \
--output "file://output.json"
```
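
For a multiband ingest, only the `--class` argument changes (a sketch mirroring the command above):

```bash
spark-submit \
--class geotrellis.spark.etl.MultibandIngest \
--master local[*] \
--driver-memory 2G \
$JAR \
--backend-profiles "file://backend-profiles.json" \
--input "file://input.json" \
--output "file://output.json"
```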

Note that the arguments before the `$JAR` configure `SparkContext`
and the arguments after it configure GeoTrellis ETL inputs and outputs.

### Command Line Arguments

Option | Description
-----------------| -------------
backend-profiles | Path to a JSON file (local FS / HDFS) with credentials for the backends used during ingest (required)
input | Path to a JSON file (local FS / HDFS) describing the datasets to ingest, with optional credentials
output | Path to a JSON file (local FS / HDFS) with output backend parameters, with optional credentials

#### Backend profiles JSON description

```json
{
"backend-profiles": [{
"name": "accumulo-name",
"type": "accumulo",
"zookeepers": "zookeepers",
"instance": "instance",
"user": "user",
"password": "password"
},
{
"name": "cassandra-name",
"type": "cassandra",
"allowRemoteDCsForLocalConsistencyLevel": false,
"localDc": "datacenter1",
"usedHostsPerRemoteDc": 0,
"hosts": "hosts",
"replicationStrategy": "SimpleStrategy",
"replicationFactor": 1,
"user": "user",
"password": "password"
}]
}
```

The file defines sets of *named* profiles for each backend; input and output configurations reference them via the `profile` field of their backend description.

#### Output JSON description

```json
{
"backend":{
"type":"accumulo",
"path":"output",
"profile":"accumulo-name"
},
"breaks":"0:ffffe5ff;0.1:f7fcb9ff;0.2:d9f0a3ff;0.3:addd8eff;0.4:78c679ff;0.5:41ab5dff;0.6:238443ff;0.7:006837ff;1:004529ff",
"reprojectMethod":"buffered",
"cellSize":{
"width":256.0,
"height":256.0
},
"encoding":"geotiff",
"tileSize":256,
"layoutExtent":{
"xmin":1.0,
"ymin":2.0,
"xmax":3.0,
"ymax":4.0
},
"resolutionThreshold":0.1,
"pyramid":true,
"resampleMethod":"nearest-neighbor",
"keyIndexMethod":{
"type":"zorder"
},
"layoutScheme":"zoomed",
"cellType":"int8",
"crs":"EPSG:3857"
}
```

Key descriptions:

Key | Value
--------------------|----------------
backend | Backend description (see below)
breaks | Color breaks string for the `render` output (optional)
partitions | Number of partitions to use during the pyramid build
reprojectMethod | `buffered` \| `per-tile`
cellSize | Cell size (width and height of a pixel)
encoding | `png` \| `geotiff`, for the `render` output
tileSize | Pixel height and width of each tile (optional)
layoutExtent | Layout extent (format: xmin,ymin,xmax,ymax) (optional)
resolutionThreshold | Resolution threshold for a user-defined layout scheme (optional)
pyramid | Whether to build a pyramid on ingest (`true` \| `false`)
resampleMethod | `nearest-neighbor` \| `bilinear` \| `cubic-convolution` \| `cubic-spline` \| `lanczos`
keyIndexMethod | Key index method (`zorder` \| `row-major` \| `hilbert`)
layoutScheme | `zoomed` \| `floating` (optional)
cellType | Cell type, e.g. `int8` \| `int16` (optional)
crs | Destination CRS name (example: EPSG:3857) (optional)


##### Backend JSON description

Key | Value
-----------------|----------------
type | Backend type (file / hadoop / s3 / accumulo / cassandra)
path | Path (local path / HDFS), or an s3:// URL
profile | Name of the backend profile to use

###### Supported Layout Schemes

Layout Scheme | Options
-----------------|----------------
zoomed | Zoomed layout scheme (TMS-style zoom levels in the target CRS)
floating | Floating layout scheme in the native projection

###### KeyIndexMethod JSON description

Key | Options
-------------------|----------------
type | `zorder` \| `row-major` \| `hilbert`
temporalResolution | Temporal resolution for temporal indexing (optional)
timeTag | Name of the GeoTIFF tag that holds the timestamps of input tiles (optional)
timeFormat | Format used to parse the timestamps stored in the time tag (optional)
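
For example, a temporal ingest might combine a Hilbert index with a temporal resolution; a sketch (the value is illustrative and assumes milliseconds):

```json
"keyIndexMethod": {
  "type": "hilbert",
  "temporalResolution": 86400000
}
```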

#### Input JSON description

```json
[{
"format": "geotiff",
"name": "test",
"cache": "NONE",
"noData": 0.0,
"backend": {
"type": "hadoop",
"path": "input"
}
}]
```

Key | Value
-----------------|----------------
format | Format of the tile files to be read (ex: geotiff)
name | Input dataset name
cache | Spark RDD storage level to use for caching (e.g. `NONE`, `MEMORY_AND_DISK_SER`)
noData | NoData value

###### Supported Formats

Format | Options
-----------------|----------------
geotiff | Spatial ingest
temporal-geotiff | Temporal ingest

###### Supported Inputs

Input | Options
----------|----------------
hadoop | path (local path / hdfs)
s3 | s3:// url

###### Supported Outputs

Output | Options
----------|----------------
hadoop | path
accumulo | table name
cassandra | table name with keyspace (keyspace.tablename)
s3 | s3:// url
render | path
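
For example, a Cassandra output backend referencing the `cassandra-name` profile above might look like this (keyspace and table names are hypothetical):

```json
"backend": {
  "type": "cassandra",
  "path": "geotrellis.nlcd",
  "profile": "cassandra-name"
}
```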

##### Accumulo Output

##### Render Output

The `path` module argument is actually a path template that allows the following variables:

- `{z}` zoom level
- `{x}` tile x coordinate
- `{y}` tile y coordinate
- `{name}` layer name

A sample render output configuration template could be:
```json
"path": "s3://tms-bucket/layers/{name}/{z}-{x}-{y}.png",
"ingestType":{
"format":"geotiff",
"output":"render"
}
```
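
With this template, zoom 12, column 2044, row 1374 of a layer named `nlcd` would be written to `s3://tms-bucket/layers/nlcd/12-2044-1374.png` (the coordinates here are illustrative).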

## Extension

In order to provide your own input or output modules you must extend
[`InputPlugin`](src/main/scala/geotrellis/spark/etl/InputPlugin) and
[`OutputPlugin`](src/main/scala/geotrellis/spark/etl/OutputPlugin)
and register them in the `Etl` constructor via a `TypedModule`.
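
A rough sketch of the wiring, assuming the `TypedModule.register` API of this release (`MyOutputPlugin`, `MyModule`, and `CustomIngest` are hypothetical names):

```scala
import geotrellis.spark.etl.{Etl, TypedModule}
import geotrellis.spark.etl.config.EtlConf
import geotrellis.spark.util.SparkUtils
import org.apache.spark.SparkConf

// Hypothetical module wrapping a custom plugin; MyOutputPlugin would
// extend OutputPlugin for your backend elsewhere in the project.
object MyModule extends TypedModule {
  register(new MyOutputPlugin)
}

object CustomIngest {
  def main(args: Array[String]): Unit = {
    implicit val sc = SparkUtils.createSparkContext("Custom ETL", new SparkConf(true))
    try {
      EtlConf(args) foreach { conf =>
        // append the custom module to the built-in ones
        val etl = Etl(conf, Etl.defaultModules :+ MyModule)
        // ... load / tile / save as in the sample application above
      }
    } finally sc.stop()
  }
}
```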

## Input JSON validation schema

* [--backend-profiles](/spark-etl/src/main/resources/backend-profiles-schema.json)
* [--input](/spark-etl/src/main/resources/input-schema.json)
* [--output](/spark-etl/src/main/resources/output-schema.json)