An application is a container that provides everything required to execute a Spark job. The configuration is stored as JSON, which will be parsed when the spark-submit command invokes the main driver class.
When invoking the spark-submit command, two things are required in order to run a custom application: a jar file and
a class. Metalus provides a self-contained jar that may be used for the jar file, and a default
class is provided: com.acxiom.pipeline.drivers.DefaultPipelineDriver
The DefaultPipelineDriver class requires an additional parameter named driverSetupClass.
This is an extension point where developers may customize how applications are loaded.
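A minimal sketch of a submission, assuming the default driver setup class described below (the jar name and configuration path are placeholders):

spark-submit --class com.acxiom.pipeline.drivers.DefaultPipelineDriver \
  metalus-application.jar \
  --driverSetupClass com.acxiom.pipeline.applications.DefaultApplicationDriverSetup \
  --applicationConfigPath /tmp/my-application.json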
Sometimes an application will need to be restarted without running all of the executions again. The root-executions parameter takes a comma separated list of one or more execution ids at which to start processing. This overrides the default behavior of identifying the root executions. Note: Application designers should ensure that any required state is saved off and that executions can intelligently read this state when restarts happen.
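For example, appending the following application parameter to the submit command would restart processing at two specific executions (the ids are placeholders):

--root-executions EXECUTION_ID_1,EXECUTION_ID_2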
The Metalus application framework provides a default implementation that may be used:
com.acxiom.pipeline.applications.DefaultApplicationDriverSetup
Developers wishing to provide custom application driver setup may extend this trait:
com.acxiom.pipeline.applications.ApplicationDriverSetup
This driver setup class will load an application JSON configuration and create the execution plan used by the DefaultPipelineDriver class to execute the Spark job. One of the first three parameters below is required to supply the configuration; the remaining parameters are optional:
- applicationId - This is the name of a JSON file stored in the metadata/applications folder on the classpath resources. The '.json' extension is assumed and should be left off of the parameter.
- applicationJson - This parameter is useful when passing the contents of the configuration as a string. This option is not recommended, since large configurations can be problematic to pass this way.
- applicationConfigPath - This is the path to the configuration JSON. This may be a path on a file system or a URL to a REST API.
- applicationConfigurationLoader - This optional parameter is used when the applicationConfigPath is not a URL. A FileManager implementation will be used to load files for the given path. By default the local file system will be checked. An HDFS implementation is also provided in the core project, and an S3 implementation is provided by the metalus-aws project. The value must be a fully qualified class name. All command line parameters will be used to try and initialize the file manager.
- authorization.class - This is used to specify an authorization implementation when the applicationConfigPath is a URL. Any parameters required to instantiate the Authorization instance need to be prefixed with authorization. so they are identified and passed in. To use basic authorization, consider a class signature like this sketch:
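class BasicAuthorization(username: String, password: String) extends Authorization

With a signature like this, the command line would include parameters such as --authorization.class com.acxiom.pipeline.api.BasicAuthorization --authorization.username myuser --authorization.password mypassword (the package and values here are illustrative).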
This implementation overrides the default implementation provided by DriverSetup to use the application globals object. This allows developers the opportunity to override the default implementation using the credential-provider global. The credential-provider may still be passed on the command line. The main purpose of this override is to allow credentials to be embedded in the application JSON instead of being passed on the command line.
There are a number of sections that may be used to customize an application.
A special structure is used when overriding default classes such as:
- pipelineListener - The main pipeline listener that will be notified during processing. If the PipelineListener also implements the SparkListener interface, it will be registered with Spark.
- sparkListeners - A list of classes that implement the SparkListener interface to register with the Spark session.
- securityManager - The PipelineSecurityManager that is used when mapping parameters.
- stepMapper - The PipelineStepMapper to perform mapping prior to execution of pipeline steps.
- pipelineManager - Performs lookups based on pipeline ids. Parameters for the PipelineManager will attempt to parse objects and lists of objects.
- sparkUdfs - A list of UDF classes to register with the Spark session.
- json4sSerializers - Contains lists of custom serializer and enum serializer classes used by json4s.
Using any of the options listed requires a JSON object that contains two parameters (an example follows the list):
- className - The fully qualified class name to load
- parameters - This is an object that contains attributes where the name matches the parameters of the constructor of the class. The parameters may also embed objects/list of objects using this same pattern.
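A minimal sketch of this structure overriding the pipelineListener, assuming a hypothetical listener class with a single constructor parameter:

"pipelineListener": {
  "className": "com.mycompany.listeners.AuditPipelineListener",
  "parameters": {
    "auditName": "nightly-run"
  }
}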
The application properties section provides a location to place data that may need to be used when setting up the execution plan. These properties will not be available during execution. The default implementation does not use these properties.
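A sketch of this section, assuming it is named applicationProperties (the property names and values are illustrative):

"applicationProperties": {
  "environment": "dev",
  "notificationEmail": "admin@example.com"
}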
The globals object will be seeded with any parameters that have been provided after the jar file. These parameters need to
begin with two dashes (--) followed by a name, a space, and the value. The library does not support parameters without
values. Example: --myparam someValue --anotherparam anotherValue
Any globals added in the application configuration will be added to the seed globals and made available to the executions. Globals may be strings, numbers, arrays or objects.
Globals attributes may contain JSON objects which will be converted into a Scala map unless the object contains two attributes named className and object. The two attributes are only required for the top level object. Any embedded objects will automatically get picked up as long as they are defined in the main object case class. By default, the library will attempt to instantiate case classes with the values stored in the object attribute. To serialize more complicated objects with traits and Enumerations, custom serializers can be provided using the json4sSerializers object. More on custom serializers can be found here.
In addition to className and object, the mapEmbeddedVariables attribute in the object will indicate that the map should be parsed for embedded values using the command line parameters. These values should start with the ! character. A sketch follows the example below.
Values of a global entry may be an array of any of the supported types except array. When embedding objects, refer to the object section above.
{
"globals": {
"number": 1,
"float": 1.5,
"string": "some string",
"mappedObject": {
"className": "com.acxiom.pipeline.applications.TestGlobalObject",
"object": {
"name": "Global Mapped Object",
"subObjects": [
{
"name": "Sub Object 1"
},
{
"name": "Sub Object 2"
}
]
}
}
}
}
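A sketch of the mapEmbeddedVariables behavior, assuming a command line parameter named --secretName was provided:

{
  "globals": {
    "mappedObject": {
      "className": "com.acxiom.pipeline.applications.TestGlobalObject",
      "mapEmbeddedVariables": true,
      "object": {
        "name": "!secretName"
      }
    }
  }
}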
The step packages array provides a list of strings that are used when searching for step functions. Any function that is used and not in one of the listed packages will cause an error.
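A sketch of the step packages array (the first package ships with Metalus; the second is an illustrative custom package):

"stepPackages": [
  "com.acxiom.pipeline.steps",
  "com.mycompany.steps"
]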
The required parameters array is a list of strings containing the names of all application parameters that are expected to be provided on the command line. Any parameters not present will cause an exception to be thrown and the submission to fail.
{
"requiredParameters": [
"param1",
"param2"
]
}
The sparkConf section is a JSON object that allows developers to specify values that can be set on the SparkConf. Two attributes are supported:
The first, kryoClasses, is an array of strings representing classes that should be serialized. Leaving this empty will use the default classes:
- "org.apache.hadoop.io.LongWritable",
- "org.apache.http.client.entity.UrlEncodedFormEntity"
The second, setOptions, is an array that allows the developer to specify options to be passed to the SparkConf when it is created. Each object in the array needs a name attribute identifying the SparkConf option and a value attribute to set for that option. Both values must be strings.
"sparkConf": {
"kryoClasses": [
"org.apache.hadoop.io.LongWritable",
"org.apache.http.client.entity.UrlEncodedFormEntity"
],
"setOptions": [
{
"name": "spark.hadoop.io.compression.codecs",
"value": "org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.DeflateCodec,org.apache.hadoop.io.compress.SnappyCodec,org.apache.hadoop.io.compress.GzipCodec"
}
]
}
This section allows the developer to optionally establish runtime parameters for each pipeline.
"pipelineParameters": {
"parameters":[
{
"pipelineId": "Pipeline2",
"parameters": {
"howard": "johnson"
}
}
]
}
The pipelines section should contain all pipelines that may be used by the executions. Executions may override pipelines directly, but it is advised to place all pipelines in this section and use the pipelineIds array in the execution, which will invoke the PipelineManager to fetch the pipeline.
Note: When overriding the PipelineManager, it is advised that executions use pipelineIds and not rely on the pipelines array.
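A skeletal sketch of the pipelines section, with the steps array left empty for brevity (the id and name are illustrative):

"pipelines": [
  {
    "id": "Pipeline1",
    "name": "My First Pipeline",
    "steps": []
  }
]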
The executions array contains definitions for how pipelines should be executed as well as dependencies. In addition to the execution instructions, each execution has the ability to override the classes and settings defined in the application configuration as well as override default settings.
Below is a list of configuration items (an example follows the list):
- id - An id that is unique within this application configuration file.
- pipelineIds - A list of pipeline ids to execute in the provided order. If a pipeline is not part of the execution pipelines array, then the application pipelines will be checked, and finally the classpath.
- initialPipelineId - The pipeline that processing should begin with.
- parents - An array containing the _id_s of executions that this execution is dependent upon.
- evaluationPipelines - An optional array of pipelines to run to determine if the execution should run, stop, or skip. These pipelines will be executed before the main pipelines. When the evaluation pipelines result in a SKIP, the main pipelines will be skipped, but children will be executed. A new exception, SkipExecutionPipelineStepException, and a step, throwSkipExecutionException, have been created to make it easier to control this behavior.
- evaluationPipelineIds - An optional array of pipeline ids to run prior to executing the main pipelines. This is an alternative to the evaluationPipelines array.
- executionType - An optional execution type. The default is pipeline; fork and join are also options.
- forkByValues - An optional mapping that will be applied to globals to identify a list of values that should be processed in parallel. This attribute is only used when executionType is set to fork.
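A sketch of a simple executions array with a parent/child dependency, assuming both pipelines are defined in the pipelines section (the ids are illustrative):

"executions": [
  {
    "id": "ROOT",
    "pipelineIds": ["Pipeline1"]
  },
  {
    "id": "CHILD",
    "pipelineIds": ["Pipeline2"],
    "parents": ["ROOT"]
  }
]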
Below is a list of overrides (an example follows the list):
- globals - The developer may choose to define custom globals for an execution that differ from the application level or, using the mergeGlobals boolean attribute, merge these globals with the application globals.
- pipelines - Define which pipelines to execute. This will override the application level pipelines with the same id.
- pipelineListener - Override the application defined or default PipelineListener.
- sparkListeners - Add additional _SparkListener_s for this execution.
- securityManager - Override the application defined or default PipelineSecurityManager.
- stepMapper - Override the application defined or default PipelineStepMapper.
- pipelineParameters - Override the application defined or default PipelineParameters.
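A sketch of an execution overriding globals and merging them with the application globals (the attribute values are illustrative):

{
  "id": "CHILD",
  "parents": ["ROOT"],
  "mergeGlobals": true,
  "globals": {
    "outputPath": "/tmp/child-output"
  }
}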