Release 1.8.3 #262

Merged · 26 commits · Oct 16, 2021
Commits
c678acc
#253 Fixed an issue where the default credential parser for the secre…
dafreels Aug 30, 2021
bb37150
Updated build file to fix issue with release step failing and added d…
dafreels Sep 1, 2021
c603dc3
#252 Implemented DataConnectorSteps to allow generic read/write using…
dafreels Sep 7, 2021
490420c
#252 Fixed compilation issue with new credential
dafreels Sep 7, 2021
9c78f25
#256 Added ability to specify the credential name to use for connecti…
dafreels Sep 9, 2021
60044b0
#252 Implemented streaming to batch and batch to streaming within the…
dafreels Sep 13, 2021
7cc7f5c
#252 Fixed an issue with the KinesisWriter
dafreels Sep 13, 2021
b4c27dc
#252 Added BigQueryDataConnector
dafreels Sep 14, 2021
ed297ee
#252 Added KafkaDataConnector
dafreels Sep 14, 2021
2dd8d65
#252 Added MongoDataConnector
dafreels Sep 15, 2021
6fdf579
#252 Optimized the DataFrame to Pub/Sub publish logic to allow batching.
dafreels Sep 15, 2021
189947b
#252 Implemented the FileConnector implementations
dafreels Sep 16, 2021
dcc11ae
#252 Added tests for FileConnectors
dafreels Sep 16, 2021
2e51fc0
#252 Moved the read and write options from the constructor to the fun…
dafreels Sep 17, 2021
e656fb5
#254 Added retries at the step level
dafreels Oct 5, 2021
523fb59
#254 #258 Implemented a simple retry step to allow multiple steps to …
dafreels Oct 7, 2021
3abcdc3
#258 Updated copy pipeline copy step to make 5 attempts when failed a…
dafreels Oct 7, 2021
f6ba26f
#252 Created a load to bronze pipeline that uses connectors for reusa…
dafreels Oct 8, 2021
9f931da
#252 updated documentation
dafreels Oct 8, 2021
122ffc0
#252 updated documentation
dafreels Oct 11, 2021
278bbbe
Merge pull request #255 from dafreels/develop
dafreels Oct 11, 2021
ab0b4da
Adding SparkConfigurationSteps
djfreels Oct 12, 2021
4ed79ba
tweaking config steps, fixing a bug in S3Utilities.
djfreels Oct 14, 2021
3981366
* adding support to limit fork step threads
djfreels Oct 14, 2021
eeab70f
Added support to map from rootGlobals to application globals
djfreels Oct 15, 2021
efd0354
Merge pull request #259 from djfreels/develop
dafreels Oct 15, 2021
36 changes: 5 additions & 31 deletions .github/workflows/build.yml
@@ -142,6 +142,10 @@ jobs:
echo 'METALUS_VERSION<<EOF' >> $GITHUB_ENV
mvn -q -Dexec.executable="echo" -Dexec.args='${project.version}' --non-recursive exec:exec >> $GITHUB_ENV
echo 'EOF' >> $GITHUB_ENV
- name: Display Metalus Application Target Directory
run: |
pwd
ls -l metalus-application/target
- name: Upload Application Jar
uses: actions/upload-artifact@v2
with:
@@ -177,38 +181,8 @@ jobs:
echo 'METALUS_VERSION<<EOF' >> $GITHUB_ENV
mvn -q -Dexec.executable="echo" -Dexec.args='${project.version}' --non-recursive exec:exec >> $GITHUB_ENV
echo 'EOF' >> $GITHUB_ENV
- name: Download Application Jar Spark 2.4
uses: actions/download-artifact@v2
with:
name: metalus-application_2.11-spark_2.4-${{ env.METALUS_VERSION }}.jar
- name: Download Utils Spark 2.4
uses: actions/download-artifact@v2
with:
name: metalus-utils_2.11-spark_2.4-${{ env.METALUS_VERSION }}.tar.gz
- name: Download Application Jar Spark 2.4 Scala 2.12
uses: actions/download-artifact@v2
with:
name: metalus-application_2.12-spark_2.4-${{ env.METALUS_VERSION }}.jar
- name: Download Utils Spark 2.4 Scala 2.12
uses: actions/download-artifact@v2
with:
name: metalus-utils_2.12-spark_2.4-${{ env.METALUS_VERSION }}.tar.gz
- name: Download Application Jar Spark 3.0
uses: actions/download-artifact@v2
with:
name: metalus-application_2.12-spark_3.0-${{ env.METALUS_VERSION }}.jar
- name: Download Utils Spark 3.0
uses: actions/download-artifact@v2
with:
name: metalus-utils_2.12-spark_3.0-${{ env.METALUS_VERSION }}.tar.gz
- name: Download Application Jar Spark 3.1
uses: actions/download-artifact@v2
with:
name: metalus-application_2.12-spark_3.1-${{ env.METALUS_VERSION }}.jar
- name: Download Utils Spark 3.1
- name: Download Artifacts
uses: actions/download-artifact@v2
with:
name: metalus-utils_2.12-spark_3.1-${{ env.METALUS_VERSION }}.tar.gz
- name: Create Github Release
uses: "marvinpinto/action-automatic-releases@latest"
with:
15 changes: 15 additions & 0 deletions docs/connectors.md
@@ -0,0 +1,15 @@
[Documentation Home](readme.md)

# Connectors
Connectors provide an abstraction for working with data. Connectors are designed to be modeled as JSON to easily include
in [applications](applications.md), [pipelines](pipelines.md) and [steps](steps.md). Each implementation implements
functions that will be used by steps to work with data in a generic manner.

![Connectors](images/Connectors.png)

## File Connectors
File Connectors are designed to operate with a single file. These connectors provide access to a [FileManager](filemanager.md)
implementation for the specified file system. More information can be found [here](fileconnectors.md).
## Data Connectors
Data Connectors provide an abstraction for loading and writing data. The provided functions work specifically on DataFrames.
More information can be found [here](dataconnectors.md).
184 changes: 184 additions & 0 deletions docs/dataconnectors.md
@@ -0,0 +1,184 @@
[Documentation Home](readme.md) | [Connectors](connectors.md)

# Data Connectors
Data Connectors provide an abstraction for loading and writing data using the [DataConnectorSteps](../metalus-common/docs/dataconnectorsteps.md).
This is useful for creating generic pipelines that can be used across providers without source/destination knowledge prior
to runtime. Each connector has the responsibility to load and write a DataFrame based on the underlying system.

**Parameters**
The following parameters are available to all data connectors:

* **name** - The name of the connector
* **credentialName** - The optional credential name to use to authenticate
* **credential** - The optional credential to use to authenticate

## Batch
Connectors that are designed to load and write data for batch processing will extend the _BatchDataConnector_. These
are very straightforward and offer the most reusable components.

### HDFSDataConnector
This connector provides access to HDFS. This implementation does not use the _credentialName_ and _credential_ parameters;
it relies on the permissions of the cluster instead. Below is an example setup:

#### Scala
```scala
val connector = HDFSDataConnector("my-connector", None, None)
```
#### Globals JSON
```json
{
"myConnector": {
"className": "com.acxiom.pipeline.connectors.HDFSDataConnector",
"object": {
"name": "my-connector"
}
}
}
```
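A rough usage sketch is shown below; the exact _load_/_write_ signatures can vary between Metalus versions, so treat the
method calls and package names here as assumptions rather than the definitive API:
```scala
// Illustrative sketch only: method signatures and imports are assumed, not authoritative.
import com.acxiom.pipeline.PipelineContext
import com.acxiom.pipeline.connectors.HDFSDataConnector

def copyOrders(pipelineContext: PipelineContext): Unit = {
  val connector = HDFSDataConnector("my-connector", None, None)
  // Load a DataFrame from the source path on the underlying file system
  val df = connector.load(Some("/landing/orders.csv"), pipelineContext)
  // Write the same DataFrame to a new destination path
  connector.write(df, Some("/bronze/orders"), pipelineContext)
}
```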
### S3DataConnector
This connector provides access to S3. Below is an example setup that expects a secrets manager credential provider:
#### Scala
```scala
val connector = S3DataConnector("my-connector", Some("my-credential-name-for-secrets-manager"), None)
```
#### Globals JSON
```json
{
"myS3Connector": {
"className": "com.acxiom.aws.pipeline.connectors.S3DataConnector",
"object": {
"name": "my-connector",
"credentialName": "my-credential-name-for-secrets-manager"
}
}
}
```
### GCSDataConnector
This connector provides access to GCS. Below is an example setup that expects a secrets manager credential provider:
#### Scala
```scala
val connector = GCSDataConnector("my-connector", Some("my-credential-name-for-secrets-manager"), None)
```
#### Globals JSON
```json
{
"myGCSConnector": {
"className": "com.acxiom.gcp.pipeline.connectors.GCSDataConnector",
"object": {
"name": "my-connector",
"credentialName": "my-credential-name-for-secrets-manager"
}
}
}
```
### BigQueryDataConnector
This connector provides access to BigQuery. Below is an example setup that expects a secrets manager credential provider:
#### Scala
```scala
val connector = BigQueryDataConnector("temp-bucket-name", "my-connector", Some("my-credential-name-for-secrets-manager"), None)
```
#### Globals JSON
```json
{
"bigQueryConnector": {
"className": "com.acxiom.gcp.pipeline.connectors.BigQueryDataConnector",
"object": {
"name": "my-connector",
"credentialName": "my-credential-name-for-secrets-manager",
"tempWriteBucket": "temp-bucket-name"
}
}
}
```
### MongoDataConnector
This connector provides access to Mongo. Security is handled using the uri or a _UserNameCredential_. In addition to
the standard parameters, the following parameters are available:

* **uri** - The connection URI
* **collectionName** - The name of the collection

#### Scala
```scala
val connector = MongoDataConnector("mongodb://127.0.0.1/test", "myCollectionName", "my-connector", Some("my-credential-name-for-secrets-manager"), None)
```
#### Globals JSON
```json
{
"customMongoConnector": {
"className": "com.acxiom.metalus.pipeline.connectors.MongoDataConnector",
"object": {
"name": "my-connector",
"credentialName": "my-credential-name-for-secrets-manager",
"uri": "mongodb://127.0.0.1/test",
"collectionName": "myCollectionName"
}
}
}
```
## Streaming
Streaming connectors offer a way to use pipelines with [Spark Structured Streaming](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html) without
the need to write new [drivers](pipeline-drivers.md). When designing pipelines for streaming, care must be taken to not
inject steps that are more batch oriented such as doing a file copy.

### KinesisDataConnector
This connector provides access to Kinesis. In addition to the standard parameters, the following parameters are
available:

* **streamName** - The name of the Kinesis stream.
* **region** - The region containing the Kinesis stream
* **partitionKey** - The optional static partition key to use
* **partitionKeyIndex** - The optional field index in the DataFrame row containing the value to use as the partition key
* **separator** - The field separator to use when formatting the row data

Below is an example setup that expects a secrets manager credential provider:
#### Scala
```scala
val connector = KinesisDataConnector("stream-name", "us-east-1", None, Some(15), ",", "my-connector",
Some("my-credential-name-for-secrets-manager"))
```
#### Globals JSON
```json
{
"kinesisConnector": {
"className": "com.acxiom.aws.pipeline.connectors.KinesisDataConnector",
"object": {
"name": "my-connector",
"credentialName": "my-credential-name-for-secrets-manager",
"streamName": "stream-name",
"region": "us-east-1",
"separator": ","
}
}
}
```
### KafkaDataConnector
This connector provides access to Kafka. In addition to the standard parameters, the following parameters are
available:

* **topics** - A comma separated list of Kafka topics
* **kafkaNodes** - A comma separated list of Kafka brokers in host:port format
* **key** - The optional static key to use
* **keyField** - The optional field name in the DataFrame row containing the value to use as the key
* **separator** - The field separator to use when formatting the row data

Below is an example setup that expects a secrets manager credential provider:
#### Scala
```scala
val connector = KafkaDataConnector("topic-name1,topic-name2", "host1:port1,host2:port2", "message-key", None,
"my-connector", Some("my-credential-name-for-secrets-manager"))
```
#### Globals JSON
```json
{
"kafkaConnector": {
"className": "com.acxiom.kafka.pipeline.connectors.KafkaDataConnector",
"object": {
"name": "my-connector",
"credentialName": "my-credential-name-for-secrets-manager",
"topics": "topic-name1,topic-name2",
"kafkaNodes": "host1:port1,host2:port2",
"key": "message-key"
}
}
}
```
5 changes: 5 additions & 0 deletions docs/error-handling.md
@@ -24,6 +24,11 @@ both execute normally and invoke _Error Step_. The _Error Step_ will throw an e
_Error Handler_ step and then complete the pipeline. _Step 4_ will never be executed when there is an exception. Metalus
will consider this as a successful execution.

## Step Retry
A step that defines the _retryLimit_ attribute will automatically be retried when an exception is thrown. Once the
limit has been reached, the [nextStepOnError](#next-step-error-handling) will be invoked if it has been defined, or the exception will
be thrown to stop the pipeline.

![Next Step On Error Flow](images/next_step_on_error_flow.png)
## Pipeline Exceptions
Metalus uses the _PipelineStepException_ trait as the base for application exceptions. Any exception that extends
113 changes: 113 additions & 0 deletions docs/fileconnectors.md
@@ -0,0 +1,113 @@
[Documentation Home](readme.md) | [Connectors](connectors.md)

# FileConnectors
File connectors are used to invoke [FileManager](filemanager.md) implementations that can be used with the
[FileManagerSteps](../metalus-common/docs/filemanagersteps.md) object. FileConnectors are used when a pipeline/step
needs to work on a single file without a DataFrame. The most common operations are expected to be the [Copy step](../metalus-common/docs/filemanagersteps.md#copy)
and the [Create FileManager step](../metalus-common/docs/filemanagersteps.md#create-a-filemanager).

**Parameters**
The following parameters are available to all file connectors:

* **name** - The name of the connector
* **credentialName** - The optional credential name to use to authenticate
* **credential** - The optional credential to use to authenticate

## HDFSFileConnector
This connector provides access to the HDFS file system. This implementation does not use the _credentialName_ and
_credential_ parameters; it relies on the permissions of the cluster instead. Below is an example setup:
#### Scala
```scala
val connector = HDFSFileConnector("my-hdfs-connector", None, None)
```
#### Globals JSON
```json
{
"myHdfsConnector": {
"className": "com.acxiom.pipeline.connectors.HDFSFileConnector",
"object": {
"name": "my-connector"
}
}
}
```
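For illustration, a hypothetical sketch of how a step might obtain a [FileManager](filemanager.md) from this connector;
the accessor and existence check below are assumed names, not the documented API:
```scala
// Illustrative sketch only: getFileManager and exists are assumed method names.
import com.acxiom.pipeline.PipelineContext
import com.acxiom.pipeline.connectors.HDFSFileConnector

def checkLandingFile(pipelineContext: PipelineContext): Boolean = {
  val connector = HDFSFileConnector("my-hdfs-connector", None, None)
  // Ask the connector for a FileManager bound to the cluster file system (accessor name assumed)
  val fileManager = connector.getFileManager(pipelineContext)
  // Use the FileManager to verify the file exists before invoking a copy step
  fileManager.exists("/landing/zone/orders.csv")
}
```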
## SFTPFileConnector
This connector provides access to an SFTP server. In addition to the standard parameters, the following parameters are
available:

* **hostName** - The host name of the SFTP resource
* **port** - The optional SFTP port
* **knownHosts** - The optional file path to the known_hosts file
* **bulkRequests** - The optional number of requests that may be sent at one time
* **config** - Optional config options
* **timeout** - Optional connection timeout

Below is an example setup:
#### Scala
```scala
val connector = SFTPFileConnector("sftp.myhost.com", "my-sftp-connector", None, None)
```
#### Globals JSON
```json
{
"sftpConnector": {
"className": "com.acxiom.pipeline.connectors.SFTPFileConnector",
"object": {
"name": "my-connector",
"hostName": "sftp.myhost.com"
}
}
}
```
## S3FileConnector
This connector provides access to the S3 file system. In addition to the standard parameters, the following parameters are
available:

* **region** - The AWS region
* **bucket** - The S3 bucket

Below is an example setup:
#### Scala
```scala
val connector = S3FileConnector("us-east-1", "my-bucket", "my-connector", Some("my-credential-name-for-secrets-manager"), None)
```
#### Globals JSON
```json
{
"connector": {
"className": "com.acxiom.aws.pipeline.connectors.S3FileConnector",
"object": {
"name": "my-connector",
"region": "us-east-1",
"bucket": "my-bucket",
"credentialName": "my-credential-name-for-secrets-manager"
}
}
}
```
## GCSFileConnector
This connector provides access to the GCS file system. In addition to the standard parameters, the following parameters are
available:

* **projectId** - The project id of the GCS project
* **bucket** - The name of the GCS bucket

Below is an example setup:
#### Scala
```scala
val connector = GCSFileConnector("my-dev-project", "my-bucket", "my-connector", Some("my-credential-name-for-secrets-manager"), None)
```
#### Globals JSON
```json
{
"connector": {
"className": "com.acxiom.gcp.pipeline.connectors.GCSFileConnector",
"object": {
"name": "my-connector",
"projectId": "my-dev-project",
"bucket": "my-bucket",
"credentialName": "my-credential-name-for-secrets-manager"
}
}
}
```
Binary file added docs/images/Connectors.png
5 changes: 5 additions & 0 deletions docs/pipeline-steps.md
@@ -46,6 +46,11 @@ The pipeline step will include a new attribute named **nextStepId** which indica
that should be executed. Branch steps will not have this attribute since the next step id is determined by the **params**
array **result** type parameters.

### retryLimit
The pipeline step has an optional attribute named **retryLimit** that retries the step when an exception is thrown.
Once the limit has been reached, the **nextStepOnError** will be called if it has been defined, or the exception will
be thrown to stop the pipeline.
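For illustration, a hypothetical pipeline step fragment that combines **retryLimit** with **nextStepOnError**; the ids
and values below are placeholders, not taken from a real pipeline:
```json
{
  "id": "LOADDATASTEP",
  "displayName": "Load Source Data",
  "stepId": "placeholder-step-template-id",
  "retryLimit": 5,
  "nextStepId": "TRANSFORMSTEP",
  "nextStepOnError": "HANDLEERRORSTEP",
  "params": []
}
```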

### params
At a minimum, the parameter from the step template should be replicated here. Required parameters will have to additionally
set the **value** attribute unless the **defaultValue** attribute is provided. Additional attributes will be required