From fe07ff5e4e8940cb779e32dc65a46c56adac02fc Mon Sep 17 00:00:00 2001 From: dafreels Date: Wed, 11 May 2022 10:07:58 -0400 Subject: [PATCH] #306 Added a new pipeline for streaming applications. --- metalus-common/docs/streamingmonitor.md | 15 +++ metalus-common/readme.md | 1 + .../metadata/pipelines/streaming-monitor.json | 100 ++++++++++++++++++ 3 files changed, 116 insertions(+) create mode 100644 metalus-common/docs/streamingmonitor.md create mode 100644 metalus-common/src/main/resources/metadata/pipelines/streaming-monitor.json diff --git a/metalus-common/docs/streamingmonitor.md b/metalus-common/docs/streamingmonitor.md new file mode 100644 index 00000000..c7b655f4 --- /dev/null +++ b/metalus-common/docs/streamingmonitor.md @@ -0,0 +1,15 @@ +[Documentation Home](../../docs/readme.md) | [Common Home](../readme.md) + +# Streaming Monitor +This pipeline is designed to be chained with other pipelines that may produce a streaming DataFrame. The _STREAMING_DATAFRAME_ +global is used to retrieve the _StreamingQuery_ that will be monitored. This pipeline is useful as-is when the _streaming-job_ +command line parameter is set to true. + +## General Information +**Id**: _streaming-monitor_ + +**Name**: _Streaming Monitor_ + +## Required Parameters (all parameters should be part of the globals) +* **STREAMING_DATAFRAME** - The _StreamingQuery_ to be monitored. +* **STREAMING_MONITOR_CLASS_NAME** - The **optional** fully qualified class name of the [streaming monitor class](./streamingquerymonitor.md) to use for the query. Defaults to _com.acxiom.pipeline.streaming.BaseStreamingQueryMonitor_. diff --git a/metalus-common/readme.md b/metalus-common/readme.md index 24433620..17ebdb0a 100644 --- a/metalus-common/readme.md +++ b/metalus-common/readme.md @@ -25,6 +25,7 @@ using Spark. 
## Pipelines/Step Groups * [Copy File](docs/copyfile.md) * _Uses new connectors api_ * [Load To Bronze](docs/loadtobronze.md) * _Uses new connectors api_ +* [Streaming Monitor](docs/streamingmonitor.md) ### Deprecated * [SFTP to HDFS](docs/sftp2hdfs.md) diff --git a/metalus-common/src/main/resources/metadata/pipelines/streaming-monitor.json b/metalus-common/src/main/resources/metadata/pipelines/streaming-monitor.json new file mode 100644 index 00000000..8e57b77c --- /dev/null +++ b/metalus-common/src/main/resources/metadata/pipelines/streaming-monitor.json @@ -0,0 +1,100 @@ +{ + "id": "streaming-monitor", + "name": "Streaming Monitor", + "category": "pipeline", + "description": "Provides a simple pipeline to monitor streaming queries", + "layout": { + "Monitor": { + "x": 493, + "y": 32 + }, + "LOG_STOP": { + "x": 428, + "y": 199 + } + }, + "steps": [ + { + "id": "Monitor", + "category": "Streaming", + "creationDate": "2022-04-26T18:35:47.891Z", + "description": "Given a StreamingQuery, this step will invoke the monitor thread and wait while records are processed. The monitor class will be used to stop the query and determine if further processing should occur.", + "displayName": "Streaming Monitor", + "engineMeta": { + "spark": "FlowUtilsSteps.monitorStreamingQuery", + "pkg": "com.acxiom.pipeline.steps", + "results": { + "primaryType": "com.acxiom.pipeline.PipelineStepResponse" + } + }, + "modifiedDate": "2022-04-26T18:35:47.891Z", + "params": [ + { + "type": "object", + "name": "query", + "required": false, + "parameterType": "org.apache.spark.sql.streaming.StreamingQuery", + "description": "Pulls the StreamingQuery from the global STREAMING_DATAFRAME", + "value": "!STREAMING_DATAFRAME", + "className": "org.apache.spark.sql.streaming.StreamingQuery" + }, + { + "type": "text", + "name": "streamingMonitorClassName", + "required": false, + "description": "Maps the value from the STREAMING_MONITOR_CLASS_NAME global. 
The default class used is com.acxiom.pipeline.streaming.BaseStreamingQueryMonitor which will continue running.", + "value": "!STREAMING_MONITOR_CLASS_NAME || com.acxiom.pipeline.streaming.BaseStreamingQueryMonitor" + }, + { + "type": "result", + "name": "continue", + "required": false + }, + { + "type": "result", + "name": "stop", + "required": false, + "value": "LOG_STOP" + } + ], + "tags": [ + "metalus-common_2.12-spark_3.1-1.8.5.jar" + ], + "type": "branch", + "stepId": "64c983e2-5eac-4fb6-87b2-024b69aa0ded" + }, + { + "id": "LOG_STOP", + "category": "Logging", + "creationDate": "2022-04-26T18:35:46.868Z", + "description": "Log a simple message", + "displayName": "Log Message", + "engineMeta": { + "spark": "LoggingSteps.logMessage", + "pkg": "com.acxiom.pipeline.steps" + }, + "modifiedDate": "2022-04-26T18:35:46.868Z", + "params": [ + { + "type": "text", + "name": "message", + "required": true, + "description": "The message to log", + "value": "End of streaming query" + }, + { + "type": "text", + "name": "level", + "required": true, + "description": "Log level at which to log. Should be a valid log4j level", + "value": "INFO" + } + ], + "tags": [ + "metalus-common_2.12-spark_3.1-1.8.5.jar" + ], + "type": "Pipeline", + "stepId": "931ad4e5-4501-4716-853a-30fbf8fb6090" + } + ] +}