Merge pull request #262 from Acxiom/develop
Release 1.8.3
dafreels authored Oct 16, 2021
2 parents f9d59f0 + efd0354 commit cac3d2a
Showing 96 changed files with 3,513 additions and 394 deletions.
36 changes: 5 additions & 31 deletions .github/workflows/build.yml
@@ -142,6 +142,10 @@ jobs:
echo 'METALUS_VERSION<<EOF' >> $GITHUB_ENV
mvn -q -Dexec.executable="echo" -Dexec.args='${project.version}' --non-recursive exec:exec >> $GITHUB_ENV
echo 'EOF' >> $GITHUB_ENV
- name: Display Metalus Application Target Directory
run: |
pwd
ls -l metalus-application/target
- name: Upload Application Jar
uses: actions/upload-artifact@v2
with:
@@ -177,38 +181,8 @@ jobs:
echo 'METALUS_VERSION<<EOF' >> $GITHUB_ENV
mvn -q -Dexec.executable="echo" -Dexec.args='${project.version}' --non-recursive exec:exec >> $GITHUB_ENV
echo 'EOF' >> $GITHUB_ENV
- name: Download Application Jar Spark 2.4
uses: actions/download-artifact@v2
with:
name: metalus-application_2.11-spark_2.4-${{ env.METALUS_VERSION }}.jar
- name: Download Utils Spark 2.4
uses: actions/download-artifact@v2
with:
name: metalus-utils_2.11-spark_2.4-${{ env.METALUS_VERSION }}.tar.gz
- name: Download Application Jar Spark 2.4 Scala 2.12
uses: actions/download-artifact@v2
with:
name: metalus-application_2.12-spark_2.4-${{ env.METALUS_VERSION }}.jar
- name: Download Utils Spark 2.4 Scala 2.12
uses: actions/download-artifact@v2
with:
name: metalus-utils_2.12-spark_2.4-${{ env.METALUS_VERSION }}.tar.gz
- name: Download Application Jar Spark 3.0
uses: actions/download-artifact@v2
with:
name: metalus-application_2.12-spark_3.0-${{ env.METALUS_VERSION }}.jar
- name: Download Utils Spark 3.0
uses: actions/download-artifact@v2
with:
name: metalus-utils_2.12-spark_3.0-${{ env.METALUS_VERSION }}.tar.gz
- name: Download Application Jar Spark 3.1
uses: actions/download-artifact@v2
with:
name: metalus-application_2.12-spark_3.1-${{ env.METALUS_VERSION }}.jar
- name: Download Utils Spark 3.1
- name: Download Artifacts
uses: actions/download-artifact@v2
with:
name: metalus-utils_2.12-spark_3.1-${{ env.METALUS_VERSION }}.tar.gz
- name: Create Github Release
uses: "marvinpinto/action-automatic-releases@latest"
with:
15 changes: 15 additions & 0 deletions docs/connectors.md
@@ -0,0 +1,15 @@
[Documentation Home](readme.md)

# Connectors
Connectors provide an abstraction for working with data. Connectors are designed to be modeled as JSON so they can easily be included
in [applications](applications.md), [pipelines](pipelines.md) and [steps](steps.md). Each implementation provides
functions that steps use to work with data in a generic manner.

![Connectors](images/Connectors.png)

## File Connectors
File Connectors are designed to operate on a single file. These connectors provide access to a [FileManager](filemanager.md)
implementation for the specified file system. More information can be found [here](fileconnectors.md).
## Data Connectors
Data Connectors provide an abstraction for loading and writing data. The provided functions work specifically on DataFrames.
More information can be found [here](dataconnectors.md).
184 changes: 184 additions & 0 deletions docs/dataconnectors.md
@@ -0,0 +1,184 @@
[Documentation Home](readme.md) | [Connectors](connectors.md)

# Data Connectors
Data Connectors provide an abstraction for loading and writing data using the [DataConnectorSteps](../metalus-common/docs/dataconnectorsteps.md).
This is useful for creating generic pipelines that can be used across providers without source/destination knowledge prior
to runtime. Each connector has the responsibility to load and write a DataFrame based on the underlying system.
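Given any connector (for example, the HDFSDataConnector described below), the interaction from a step generally amounts to a load followed by a write. The sketch below is illustrative only; the exact load/write signatures and the availability of pipelineContext are assumptions and may differ from the published API.

```scala
// Illustrative sketch only: the load/write signatures are assumed, and pipelineContext
// is assumed to be supplied to the step by the Metalus framework.
val connector = HDFSDataConnector("my-connector", None, None)
val orders = connector.load(Some("/landing/orders"), pipelineContext) // read a DataFrame
val valid = orders.filter("order_total > 0")                          // normal Spark transformations
connector.write(valid, Some("/curated/orders"), pipelineContext)      // write the DataFrame back out
```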

**Parameters**
The following parameters are available to all data connectors:

* **name** - The name of the connector
* **credentialName** - The optional credential name to use to authenticate
* **credential** - The optional credential to use to authenticate

## Batch
Connectors that are designed to load and write data for batch processing will extend the _BatchDataConnector_. These
are very straightforward and offer the most reusable components.

### HDFSDataConnector
This connector provides access to HDFS. The _credentialName_ and _credential_ parameters are not used in this implementation;
instead, it relies on the permissions of the cluster. Below is an example setup:

#### Scala
```scala
val connector = HDFSDataConnector("my-connector", None, None)
```
#### Globals JSON
```json
{
"myConnector": {
"className": "com.acxiom.pipeline.connectors.HDFSDataConnector",
"object": {
"name": "my-connector"
}
}
}
```
### S3DataConnector
This connector provides access to S3. Below is an example setup that expects a secrets manager credential provider:
#### Scala
```scala
val connector = S3DataConnector("my-connector", Some("my-credential-name-for-secrets-manager"), None)
```
#### Globals JSON
```json
{
"myS3Connector": {
"className": "com.acxiom.aws.pipeline.connectors.S3DataConnector",
"object": {
"name": "my-connector",
"credentialName": "my-credential-name-for-secrets-manager"
}
}
}
```
### GCSDataConnector
This connector provides access to GCS. Below is an example setup that expects a secrets manager credential provider:
#### Scala
```scala
val connector = GCSDataConnector("my-connector", Some("my-credential-name-for-secrets-manager"), None)
```
#### Globals JSON
```json
{
"myGCSConnector": {
"className": "com.acxiom.gcp.pipeline.connectors.GCSDataConnector",
"object": {
"name": "my-connector",
"credentialName": "my-credential-name-for-secrets-manager"
}
}
}
```
### BigQueryDataConnector
This connector provides access to BigQuery. In addition to the standard parameters, the _tempWriteBucket_ parameter names
the GCS bucket used as temporary storage when writing data. Below is an example setup that expects a secrets manager credential provider:
#### Scala
```scala
val connector = BigQueryDataConnector("temp-bucket-name", "my-connector", Some("my-credential-name-for-secrets-manager"), None)
```
#### Globals JSON
```json
{
"bigQueryConnector": {
"className": "com.acxiom.gcp.pipeline.connectors.BigQueryDataConnector",
"object": {
"name": "my-connector",
"credentialName": "my-credential-name-for-secrets-manager",
"tempWriteBucket": "temp-bucket-name"
}
}
}
```
### MongoDataConnector
This connector provides access to Mongo. Security is handled using the uri or a _UserNameCredential_. In addition to
the standard parameters, the following parameters are available:

* **uri** - The connection URI
* **collectionName** - The name of the collection

#### Scala
```scala
val connector = MongoDataConnector("mongodb://127.0.0.1/test", "myCollectionName", "my-connector", Some("my-credential-name-for-secrets-manager"), None)
```
#### Globals JSON
```json
{
"customMongoConnector": {
"className": "com.acxiom.metalus.pipeline.connectors.MongoDataConnector",
"object": {
"name": "my-connector",
"credentialName": "my-credential-name-for-secrets-manager",
"uri": "mongodb://127.0.0.1/test",
"collectionName": "myCollectionName"
}
}
}
```
## Streaming
Streaming connectors offer a way to use pipelines with [Spark Structured Streaming](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html) without
the need to write new [drivers](pipeline-drivers.md). When designing pipelines for streaming, care must be taken to not
inject steps that are more batch oriented such as doing a file copy.

### KinesisDataConnector
This connector provides access to Kinesis. In addition to the standard parameters, the following parameters are
available:

* **streamName** - The name of the Kinesis stream.
* **region** - The region containing the Kinesis stream
* **partitionKey** - The optional static partition key to use
* **partitionKeyIndex** - The optional field index in the DataFrame row containing the value to use as the partition key
* **separator** - The field separator to use when formatting the row data

Below is an example setup that expects a secrets manager credential provider:
#### Scala
```scala
val connector = KinesisDataConnector("stream-name", "us-east-1", None, Some(15), ",", "my-connector",
Some("my-credential-name-for-secrets-manager"))
```
#### Globals JSON
```json
{
"kinesisConnector": {
"className": "com.acxiom.aws.pipeline.connectors.KinesisDataConnector",
"object": {
"name": "my-connector",
"credentialName": "my-credential-name-for-secrets-manager",
"streamName": "stream-name",
"region": "us-east-1",
"separator": ","
}
}
}
```
### KafkaDataConnector
This connector provides access to Kafka. In addition to the standard parameters, the following parameters are
available:

* **topics** - A comma separated list of Kafka topics
* **kafkaNodes** - A comma separated list of Kafka brokers (host:port) to connect to
* **key** - The optional static key to use
* **keyField** - The optional field name in the DataFrame row containing the value to use as the key
* **separator** - The field separator to use when formatting the row data

Below is an example setup that expects a secrets manager credential provider:
#### Scala
```scala
val connector = KafkaDataConnector("topic-name1,topic-name2", "host1:port1,host2:port2", "message-key", None,
"my-connector", Some("my-credential-name-for-secrets-manager"))
```
#### Globals JSON
```json
{
"kafkaConnector": {
"className": "com.acxiom.kafka.pipeline.connectors.KafkaDataConnector",
"object": {
"name": "my-connector",
"credentialName": "my-credential-name-for-secrets-manager",
"topics": "topic-name1,topic-name2",
"kafkaNodes": "host1:port1,host2:port2",
"key": "message-key"
}
}
}
```
5 changes: 5 additions & 0 deletions docs/error-handling.md
@@ -24,6 +24,11 @@ both execute normally and invoke _Error Step_. The _Error Step_ will throw an e
_Error Handler_ step and then complete the pipeline. _Step 4_ will never be executed when there is an exception. Metalus
will consider this as a successful execution.

## Step Retry
A step which defines the _retryLimit_ attribute will be automatically restarted when an exception is thrown. Once the
limit has been reached, the [nextStepOnError](#next-step-error-handling) will be invoked or the exception will
be thrown to stop the pipeline.

![Next Step On Error Flow](images/next_step_on_error_flow.png)
## Pipeline Exceptions
Metalus uses the _PipelineStepException_ trait as the base for application exceptions. Any exception that extends
113 changes: 113 additions & 0 deletions docs/fileconnectors.md
@@ -0,0 +1,113 @@
[Documentation Home](readme.md) | [Connectors](connectors.md)

# FileConnectors
File connectors provide [FileManager](filemanager.md) implementations that can be used with the
[FileManagerSteps](../metalus-common/docs/filemanagersteps.md) object. FileConnectors are used when a pipeline/step
needs to work on a single file without a DataFrame. The most common operations are expected to be the [Copy step](../metalus-common/docs/filemanagersteps.md#copy)
and the [Create FileManager step](../metalus-common/docs/filemanagersteps.md#create-a-filemanager).
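The sketch below shows the general shape of that interaction using the HDFSFileConnector described below; the getFileManager call, its signature, and the exists check are assumptions for illustration and may not match the published API exactly.

```scala
// Illustrative sketch only: getFileManager/exists are assumed, and pipelineContext
// is assumed to be supplied to the step by the Metalus framework.
val connector = HDFSFileConnector("my-hdfs-connector", None, None)
val fileManager = connector.getFileManager(pipelineContext)
if (fileManager.exists("/inbound/orders.csv")) {
  // hand the FileManager to the FileManagerSteps Copy step (or other file operations)
}
```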

**Parameters**
The following parameters are available to all file connectors:

* **name** - The name of the connector
* **credentialName** - The optional credential name to use to authenticate
* **credential** - The optional credential to use to authenticate

## HDFSFileConnector
This connector provides access to the HDFS file system. The _credentialName_ and _credential_ parameters are not used in
this implementation; instead, it relies on the permissions of the cluster. Below is an example setup:
#### Scala
```scala
val connector = HDFSFileConnector("my-hdfs-connector", None, None)
```
#### Globals JSON
```json
{
"myHdfsConnector": {
"className": "com.acxiom.pipeline.connectors.HDFSFileConnector",
"object": {
"name": "my-connector"
}
}
}
```
## SFTPFileConnector
This connector provides access to an SFTP server. In addition to the standard parameters, the following parameters are
available:

* **hostName** - The host name of the SFTP resource
* **port** - The optional SFTP port
* **knownHosts** - The optional file path to the known_hosts file
* **bulkRequests** - The optional number of requests that may be sent at one time
* **config** - Optional config options
* **timeout** - Optional connection timeout

Below is an example setup:
#### Scala
```scala
val connector = SFTPFileConnector("sftp.myhost.com", "my-sftp-connector", None, None)
```
#### Globals JSON
```json
{
"sftpConnector": {
"className": "com.acxiom.pipeline.connectors.SFTPFileConnector",
"object": {
"name": "my-connector",
"hostName": "sftp.myhost.com"
}
}
}
```
## S3FileConnector
This connector provides access to the S3 file system. In addition to the standard parameters, the following parameters are
available:

* **region** - The AWS region
* **bucket** - The S3 bucket

Below is an example setup:
#### Scala
```scala
val connector = S3FileConnector("us-east-1", "my-bucket", "my-connector", Some("my-credential-name-for-secrets-manager"), None)
```
#### Globals JSON
```json
{
"connector": {
"className": "com.acxiom.aws.pipeline.connectors.S3FileConnector",
"object": {
"name": "my-connector",
"region": "us-east-1",
"bucket": "my-bucket",
"credentialName": "my-credential-name-for-secrets-manager"
}
}
}
```
## GCSFileConnector
This connector provides access to the GCS file system. In addition to the standard parameters, the following parameters are
available:

* **projectId** - The project id of the GCS project
* **bucket** - The name of the GCS bucket

Below is an example setup:
#### Scala
```scala
val connector = GCSFileConnector("my-dev-project", "my-bucket", "my-connector", Some("my-credential-name-for-secrets-manager"), None)
```
#### Globals JSON
```json
{
"connector": {
"className": "com.acxiom.gcp.pipeline.connectors.GCSFileConnector",
"object": {
"name": "my-connector",
"projectId": "my-dev-project",
"bucket": "my-bucket",
"credentialName": "my-credential-name-for-secrets-manager"
}
}
}
```
Binary file added docs/images/Connectors.png
5 changes: 5 additions & 0 deletions docs/pipeline-steps.md
@@ -46,6 +46,11 @@ The pipeline step will include a new attribute named **nextStepId** which indica
that should be executed. Branch steps will not have this attribute since the next step id is determined by the **params**
array **result** type parameters.

### retryLimit
The pipeline step has an optional attribute named **retryLimit** which will retry the step if an exception is thrown.
Once the limit has been reached, the **nextStepOnError** will be called if it has been defined or the exception will
be thrown to stop the pipeline.
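As a minimal sketch (with hypothetical step ids and most other required attributes omitted), a pipeline step combining both attributes might look like this:

```json
{
  "id": "LOAD_DATA",
  "displayName": "Load Data",
  "retryLimit": 3,
  "nextStepId": "PROCESS_DATA",
  "nextStepOnError": "HANDLE_ERROR",
  "params": []
}
```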

### params
At a minimum, the parameter from the step template should be replicated here. Required parameters will have to additionally
set the **value** attribute unless the **defaultValue** attribute is provided. Additional attributes will be required