Skip to content

Commit

Permalink
#306 Added a new pipeline for streaming applications.
Browse files Browse the repository at this point in the history
  • Loading branch information
dafreels committed May 11, 2022
1 parent 2bdb7b5 commit fe07ff5
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 0 deletions.
15 changes: 15 additions & 0 deletions metalus-common/docs/streamingmonitor.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[Documentation Home](../../docs/readme.md) | [Common Home](../readme.md)

# Streaming Monitor
This pipeline is designed to be chained with other pipelines that may produce a streaming DataFrame. The _STREAMING_DATAFRAME_
global is used to retrieve the DataFrame that will be monitored. This pipeline is useful as-is when the _streaming-job_
command line parameter is set to true.

## General Information
**Id**: _streaming-monitor_

**Name**: _Streaming Monitor_

## Required Parameters (all parameters should be part of the globals)
* **STREAMING_DATAFRAME** - The DataFrame to be monitored.
* **streamingMonitorClassName** - The **optional** fully qualified class name of the [streaming monitor class](./streamingquerymonitor.md) to use for the query.
1 change: 1 addition & 0 deletions metalus-common/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ using Spark.
## Pipelines/Step Groups
* [Copy File](docs/copyfile.md) * _Uses new connectors api_
* [Load To Bronze](docs/loadtobronze.md) * _Uses new connectors api_
* [Streaming Monitor](docs/streamingmonitor.md)

### Deprecated
* [SFTP to HDFS](docs/sftp2hdfs.md)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
{
"id": "streaming-monitor",
"name": "Streaming Monitor",
"category": "pipeline",
"description": "Provides a simple pipeline to monitor streaming queries",
"layout": {
"Monitor": {
"x": 493,
"y": 32
},
"LOG_STOP": {
"x": 428,
"y": 199
}
},
"steps": [
{
"id": "Monitor",
"category": "Streaming",
"creationDate": "2022-04-26T18:35:47.891Z",
"description": "Given a StreamingQuery, this step will invoke the monitor thread and wait while records are processed. The monitor class will be used to stop the query and determine if further processing should occur.",
"displayName": "Streaming Monitor",
"engineMeta": {
"spark": "FlowUtilsSteps.monitorStreamingQuery",
"pkg": "com.acxiom.pipeline.steps",
"results": {
"primaryType": "com.acxiom.pipeline.PipelineStepResponse"
}
},
"modifiedDate": "2022-04-26T18:35:47.891Z",
"params": [
{
"type": "object",
"name": "query",
"required": false,
"parameterType": "org.apache.spark.sql.streaming.StreamingQuery",
"description": "Pulls the DataFrame from the global STREAMING_DATAFRAME",
"value": "!STREAMING_DATAFRAME",
"className": "org.apache.spark.sql.streaming.StreamingQuery"
},
{
"type": "text",
"name": "streamingMonitorClassName",
"required": false,
"description": "Maps the value from the STREAMING_MONITOR_CLASS_NAME global. The default class used is com.acxiom.pipeline.streaming.BaseStreamingQueryMonitor which will continue running.",
"value": "!STREAMING_MONITOR_CLASS_NAME || com.acxiom.pipeline.streaming.BaseStreamingQueryMonitor"
},
{
"type": "result",
"name": "continue",
"required": false
},
{
"type": "result",
"name": "stop",
"required": false,
"value": "LOG_STOP"
}
],
"tags": [
"metalus-common_2.12-spark_3.1-1.8.5.jar"
],
"type": "branch",
"stepId": "64c983e2-5eac-4fb6-87b2-024b69aa0ded"
},
{
"id": "LOG_STOP",
"category": "Logging",
"creationDate": "2022-04-26T18:35:46.868Z",
"description": "Log a simple message",
"displayName": "Log Message",
"engineMeta": {
"spark": "LoggingSteps.logMessage",
"pkg": "com.acxiom.pipeline.steps"
},
"modifiedDate": "2022-04-26T18:35:46.868Z",
"params": [
{
"type": "text",
"name": "message",
"required": true,
"description": "The message to log",
"value": "End of streaming query"
},
{
"type": "text",
"name": "level",
"required": true,
"description": "Log level at which to log. Should be a valid log4j level",
"value": "INFO"
}
],
"tags": [
"metalus-common_2.12-spark_3.1-1.8.5.jar"
],
"type": "Pipeline",
"stepId": "931ad4e5-4501-4716-853a-30fbf8fb6090"
}
]
}

0 comments on commit fe07ff5

Please sign in to comment.