This repository has been archived by the owner on Jan 27, 2020. It is now read-only.

DistCp Transformations

Utz Westermann edited this page Aug 4, 2017 · 6 revisions

Summary

Schedoscope includes a way to leverage DistCp for view materialization. The DistCp transformation starts a DistCp job that copies files from diverse sources into a view's fullPath. As such, it is particularly suitable for the staging areas of a data warehouse.

Syntax

case class DistCpTransformation(v: View,
                            var sources: List[String],
                            var target: String,
                            deleteViewPath: Boolean = false,
                            config: Configuration = new Configuration())

Description

The DistCp transformation copies source files and folders matching a GLOB pattern to a target path.

  • v: The view using the transformation.

  • sources: A list of source files / folders.

  • target: Target folder.

  • deleteViewPath: Deletes the fullPath of the view before copying.

  • config: Configuration for the MapReduce job. Can be left at default for most cases.

For detailed information regarding DistCp, see the official Apache Hadoop DistCp guide.
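To make the roles of sources and target concrete, the following minimal sketch renders the equivalent hadoop distcp command line, which lists all sources first and the target last. DistCpCopySketch and DistCpCommandSketch are hypothetical stand-ins for illustration; they are not Schedoscope classes, and this does not describe how Schedoscope launches the job.

```scala
// Hypothetical stand-in for illustration only; not Schedoscope's DistCpTransformation.
final case class DistCpCopySketch(sources: List[String], target: String)

object DistCpCommandSketch {
  // Renders the equivalent `hadoop distcp` command line:
  // every source path first, the target path last.
  def commandLine(copy: DistCpCopySketch): List[String] =
    (List("hadoop", "distcp") ++ copy.sources) :+ copy.target
}
```

For example, DistCpCommandSketch.commandLine(DistCpCopySketch(List("/hdp/prod/stage/a", "/hdp/prod/stage/b"), "/hdp/prod/target")) yields the arguments hadoop, distcp, both source paths, and finally the target path.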

Helpers

DistCp's handling of source and target paths is somewhat unconventional, so the DistCp transformation provides the following helpers:

  • copyToView(sourceView: View, targetView: View): Will copy the content of the fullPath from the sourceView to the fullPath of the target view.

  • copyToDirToView(sourcePath: String, targetView: View): Will copy the content of the sourcePath folder to the fullPath of the target view.

  • copyToFileToView(sourceFile: String, targetView: View): Will copy the sourceFile to the fullPath of the target view.

Example:

val product = dependsOn(() => Product(shopCode, year, month, day))

transformVia(() => DistCpTransformation.copyToView(product(), this))
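The unconventional path handling that motivates these helpers can be illustrated with a small model. As described in the Hadoop DistCp guide, copying source directories to an existing target without -update or -overwrite creates one subdirectory per source under the target, whereas with either flag the directory contents land directly in the target. The sketch below only models that rule; DistCpPathSketch and contentRoots are hypothetical names, and it is not how the Schedoscope helpers are implemented.

```scala
object DistCpPathSketch {
  // Last path component, e.g. "/source/first" -> "first".
  private def baseName(path: String): String =
    path.stripSuffix("/").split('/').last

  // Directory under which the contents of each source directory end up,
  // assuming the target directory already exists.
  def contentRoots(sources: List[String],
                   target: String,
                   updateOrOverwrite: Boolean): List[String] = {
    val root = target.stripSuffix("/")
    if (updateOrOverwrite) sources.map(_ => root)   // contents go straight into the target
    else sources.map(s => s"$root/${baseName(s)}")  // one subdirectory per source
  }
}
```

With the sources /source/first and /source/second and the target /target, the default mode yields /target/first and /target/second, while the update or overwrite mode places both sets of contents in /target itself.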

Configuration

The behavior of DistCp is highly configurable. To expose all the available options, Schedoscope includes the DistCpConfiguration class.

val conf = DistCpConfiguration()
conf.maxMaps = 20
conf.atomicCommit = true
transformVia(() => DistCpTransformation.copyToView(product(), this)
  .configureWith(conf))

The class has the following options:

Option Description
sourcePaths List of source paths. Setting this will overwrite the sources parameter of DistCpTransformation.
targetPath Target path. Setting this will overwrite the target parameter of DistCpTransformation.
atomicCommit Enable atomic commit. Data will either be available at final target in a complete and consistent form, or not at all.
update Whether the contents of the source and target folders should be synced.
deleteMissing Delete files that exist in the target but not in the source.
ignoreFailures Whether failures during the copy should be ignored.
overwrite Overwrite folders/files at destination.
skipCRC Whether to skip CRC checks between source and target paths.
blocking Whether DistCp should run blocking or non-blocking.
useDiff

Use snapshot diff report between given two snapshots to identify the difference between source and target, and apply the diff to the target to make it in sync with source.

This option is valid only with the update option, and the following conditions must be satisfied:

  • Both the source and the target FileSystem must be DistributedFileSystem.
  • Two snapshots fromSnapshot and toSnapshot have been created on the source FS, and fromSnapshot is older than toSnapshot.
  • The target has the same snapshot fromSnapshot. No changes have been made on the target since fromSnapshot was created, so fromSnapshot has the same content as the current state of the target. All the files/directories in the target are the same as in the source's fromSnapshot.
useRDiff

Use snapshot diff report between given two snapshots to identify what has been changed on the target since the snapshot fromSnapshot was created on the target, and apply the diff reversely to the target, and copy modified files from the source’s fromSnapshot, to make the target the same as fromSnapshot.

This option is valid only with the update option, and the following conditions must be satisfied:

  • Both the source and the target FileSystem must be DistributedFileSystem. The source and the target can be two different clusters/paths, or they can be exactly the same cluster/path. In the latter case, modified files are copied from the target's fromSnapshot to the target's current state.
  • Two snapshots fromSnapshot and toSnapshot have been created on the target FS, and fromSnapshot is older than toSnapshot. No change has been made on the target since toSnapshot was created.
  • The source has the same snapshot fromSnapshot, which has the same content as fromSnapshot on the target. All the files/directories in the target's fromSnapshot are the same as in the source's fromSnapshot.
numListstatusThreads Number of threads to use for listStatus, at most 40. Setting it to zero means the value from the configuration properties is used.
maxMaps Set the max number of mappers to use for copy.
mapBandwidth Specify bandwidth per map, in MB/second.
sslConfigurationFile Set the SSL configuration file path to use with hftps:// (local path).
copyStrategy Set the copy strategy to use. Should map to a strategy implementation in distcp-default.xml.
preserveStatus A set of file attributes that need to be preserved.
preserveRawXattrs Indicate that raw.* xattrs should be preserved.
atomicWorkPath Set the tmp folder for atomic commit.
logPath Set the log path where distcp output logs are stored. Uses JobStagingDir/_logs by default.
sourceFileListing File containing list of source paths. This will overwrite sourcePaths.
filtersFile The path to a list of patterns to exclude from copy.
append Whether to append new data to target files. This is valid only with the update option and when CRC checks are not skipped.
fromSnapshot Set the old snapshot folder for useDiff/useRDiff.
toSnapshot Set the new snapshot folder for useDiff/useRDiff.
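Several options in the table are only valid in combination with others. The following minimal sketch collects those constraints in one place. OptionsSketch and OptionsCheckSketch are hypothetical stand-ins, not Schedoscope's DistCpConfiguration. The sketch also assumes that syncFolder refers to the update option and that useDiff and useRDiff both need fromSnapshot and toSnapshot, which the table implies but does not state outright.

```scala
// Hypothetical stand-in covering only the options involved in the checks below.
final case class OptionsSketch(
  update: Boolean = false,
  append: Boolean = false,
  skipCRC: Boolean = false,
  useDiff: Boolean = false,
  useRDiff: Boolean = false,
  fromSnapshot: Option[String] = None,
  toSnapshot: Option[String] = None
)

object OptionsCheckSketch {
  // Returns one message per violated constraint, in the order listed;
  // an empty list means the combination is consistent with the table.
  def violations(o: OptionsSketch): List[String] = {
    val snapshotsMissing = o.fromSnapshot.isEmpty || o.toSnapshot.isEmpty
    val checks = List(
      (o.useDiff && !o.update) -> "useDiff is valid only with update",
      (o.useRDiff && !o.update) -> "useRDiff is valid only with update",
      ((o.useDiff || o.useRDiff) && snapshotsMissing) ->
        "useDiff/useRDiff need fromSnapshot and toSnapshot",
      (o.append && !o.update) -> "append is valid only with update",
      (o.append && o.skipCRC) -> "append requires CRC checks"
    )
    checks.collect { case (true, message) => message }
  }
}
```

Running such a check before submitting a transformation would surface an inconsistent combination early instead of at job launch. Whether Schedoscope or DistCp itself reports these cases in the same way is not covered here.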

Examples

An example of using DistCp to copy a file into a view's fullPath:

transformVia(() => DistCpTransformation.copyToFileToView("/hdp/prod/stage/input.csv", this))

An example of using DistCp to copy the underlying data of a dependency into a view's fullPath:

val product = dependsOn(() => Product(shopCode, year, month, day))

transformVia(() => DistCpTransformation.copyToView(product(), this))

Packaging and Deployment

DistCp ships with Hadoop core, and the DistCp transformation is likewise part of Schedoscope core.

Change detection

Schedoscope tries to automatically detect changes to DistCp transformation-based views and to initiate rematerialization of views if the transformation logic has potentially changed. For DistCp transformations, the checksum used for this detection is based on the sources and target paths.
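A checksum over the sources and target paths could look like the following minimal sketch. The use of MD5, the newline separator, and the names ChangeDetectionSketch and checksum are assumptions made for illustration; Schedoscope's actual checksum computation may differ.

```scala
import java.nio.charset.StandardCharsets
import java.security.MessageDigest

object ChangeDetectionSketch {
  // Hex-encoded MD5 over the source paths followed by the target path.
  // MD5 and the newline separator are illustrative choices only.
  def checksum(sources: List[String], target: String): String = {
    val input = (sources :+ target).mkString("\n")
    MessageDigest.getInstance("MD5")
      .digest(input.getBytes(StandardCharsets.UTF_8))
      .map(b => "%02x".format(b & 0xff))
      .mkString
  }
}
```

Under this model, changing a source or target path changes the checksum, while a change that leaves the paths untouched (for example, a different maxMaps value) would not.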
