diff --git a/metalus-core/src/main/scala/com/acxiom/pipeline/Constants.scala b/metalus-core/src/main/scala/com/acxiom/pipeline/Constants.scala index d52878c3..4026d589 100644 --- a/metalus-core/src/main/scala/com/acxiom/pipeline/Constants.scala +++ b/metalus-core/src/main/scala/com/acxiom/pipeline/Constants.scala @@ -9,6 +9,7 @@ object Constants { val FIVE: Int = 5 val SIX: Int = 6 val SEVEN: Int = 7 + val NINE = 9 val TEN: Int = 10 val ONE_HUNDRED: Int = 100 diff --git a/metalus-core/src/main/scala/com/acxiom/pipeline/PipelineExecutor.scala b/metalus-core/src/main/scala/com/acxiom/pipeline/PipelineExecutor.scala index 84b5df8b..4f804e2d 100644 --- a/metalus-core/src/main/scala/com/acxiom/pipeline/PipelineExecutor.scala +++ b/metalus-core/src/main/scala/com/acxiom/pipeline/PipelineExecutor.scala @@ -1,20 +1,12 @@ package com.acxiom.pipeline import com.acxiom.pipeline.audits.{AuditType, ExecutionAudit} -import com.acxiom.pipeline.utils.ReflectionUtils +import com.acxiom.pipeline.flow.{PipelineFlow, PipelineStepFlow} import org.apache.log4j.Logger -import scala.annotation.tailrec -import scala.concurrent.ExecutionContext.Implicits.global -import scala.concurrent.duration.Duration -import scala.concurrent.{Await, Future} -import scala.runtime.BoxedUnit - object PipelineExecutor { private val logger = Logger.getLogger(getClass) - private val NINE = 9 - def executePipelines(pipelines: List[Pipeline], initialPipelineId: Option[String], initialContext: PipelineContext): PipelineExecutionResult = { @@ -24,31 +16,15 @@ object PipelineExecutor { }), pipelines.length) } else { pipelines } val executionId = initialContext.getGlobalString("executionId").getOrElse("root") - val esContext = handleEvent(initialContext.setRootAudit(ExecutionAudit(executionId, AuditType.EXECUTION, Map[String, Any](), System.currentTimeMillis())), + val esContext = PipelineFlow + .handleEvent(initialContext.setRootAudit(ExecutionAudit(executionId, AuditType.EXECUTION, Map[String, Any](), System.currentTimeMillis())), "executionStarted", List(executingPipelines, initialContext)) try { val pipelineLookup = executingPipelines.map(p => p.id.getOrElse("") -> p.name.getOrElse("")).toMap - val ctx = executingPipelines.foldLeft(esContext)((accCtx, pipeline) => { - // Map the steps for easier lookup during execution - val stepLookup = PipelineExecutorValidations.validateAndCreateStepLookup(pipeline) - val auditCtx = accCtx.setPipelineAudit( - ExecutionAudit(pipeline.id.get, AuditType.PIPELINE, Map[String, Any](), System.currentTimeMillis(), None, None, Some(List[ExecutionAudit]( - ExecutionAudit(pipeline.steps.get.head.id.get, AuditType.STEP, Map[String, Any](), System.currentTimeMillis()))))) - val updatedCtx = handleEvent(auditCtx, "pipelineStarted", List(pipeline, auditCtx)) - .setGlobal("pipelineId", pipeline.id).setGlobal("stepId", pipeline.steps.get.head.id.get) - try { - val resultPipelineContext = executeStep(pipeline.steps.get.head, pipeline, stepLookup, updatedCtx) - val messages = resultPipelineContext.getStepMessages - processStepMessages(messages, pipelineLookup) - val auditCtx = resultPipelineContext.setPipelineAudit( - resultPipelineContext.getPipelineAudit(pipeline.id.get).get.setEnd(System.currentTimeMillis())) - handleEvent(auditCtx, "pipelineFinished", List(pipeline, auditCtx)) - } catch { - case t: Throwable => throw handleStepExecutionExceptions(t, pipeline, accCtx, Some(executingPipelines)) - } - }) + val ctx = executingPipelines.foldLeft(esContext)((accCtx, pipeline) => + PipelineStepFlow(pipeline, accCtx, 
pipelineLookup, executingPipelines).execute().pipelineContext) val exCtx = ctx.setRootAudit(ctx.rootAudit.setEnd(System.currentTimeMillis())) - PipelineExecutionResult(handleEvent(exCtx, "executionFinished", List(executingPipelines, exCtx)), + PipelineExecutionResult(PipelineFlow.handleEvent(exCtx, "executionFinished", List(executingPipelines, exCtx)), success = true, paused = false, None) } catch { case fe: ForkedPipelineStepException => @@ -64,654 +40,4 @@ object PipelineExecutor { case t: Throwable => throw t } } - - /** - * This function will process step messages and throw any appropriate exceptions - * - * @param messages A list of PipelineStepMessages that need to be processed. - * @param pipelineLookup A map of Pipelines keyed by the id. This is used to quickly retrieve additional Pipeline data. - */ - private def processStepMessages(messages: Option[List[PipelineStepMessage]], pipelineLookup: Map[String, String]): Unit = { - if (messages.isDefined && messages.get.nonEmpty) { - messages.get.foreach(m => m.messageType match { - case PipelineStepMessageType.error => - throw PipelineException(message = Some(m.message), pipelineProgress = Some(PipelineExecutionInfo(Some(m.stepId), Some(m.pipelineId)))) - case PipelineStepMessageType.pause => - throw PauseException(message = Some(m.message), pipelineProgress = Some(PipelineExecutionInfo(Some(m.stepId), Some(m.pipelineId)))) - case PipelineStepMessageType.warn => - logger.warn(s"Step ${m.stepId} in pipeline ${pipelineLookup(m.pipelineId)} issued a warning: ${m.message}") - case _ => - }) - } - } - - @tailrec - private def executeStep(step: PipelineStep, pipeline: Pipeline, steps: Map[String, PipelineStep], - pipelineContext: PipelineContext): PipelineContext = { - logger.debug(s"Executing Step (${step.id.getOrElse("")}) ${step.displayName.getOrElse("")}") - val ssContext = handleEvent(pipelineContext, "pipelineStepStarted", List(pipeline, step, pipelineContext)) - val (nextStepId, sfContext) = try { - val result = processPipelineStep(step, pipeline, steps, pipelineContext) - // setup the next step - val nextStepId = getNextStepId(step, result) - val newPipelineContext = updatePipelineContext(step, result, nextStepId, ssContext) - // run the step finished event - val sfContext = handleEvent(newPipelineContext, "pipelineStepFinished", List(pipeline, step, newPipelineContext)) - (nextStepId, sfContext) - } catch { - case e: Throwable if step.nextStepOnError.isDefined => - // handle exception - val ex = handleStepExecutionExceptions(e, pipeline, pipelineContext) - // put exception on the context as the "result" for this step. 
- val updateContext = updatePipelineContext(step, PipelineStepResponse(Some(ex), None), step.nextStepOnError, ssContext) - (step.nextStepOnError, updateContext) - case e => throw e - } - // Call the next step here - if (steps.contains(nextStepId.getOrElse("")) && steps(nextStepId.getOrElse("")).`type`.getOrElse("") == PipelineStepType.JOIN) { - sfContext - } else if (steps.contains(nextStepId.getOrElse(""))) { - executeStep(steps(nextStepId.get), pipeline, steps, sfContext) - } else if (nextStepId.isDefined && nextStepId.get.nonEmpty) { - val s = steps.find(_._2.nextStepId.getOrElse("") == nextStepId.getOrElse("FUNKYSTEPID")) - // See if this is a sub-fork - if (s.isDefined && s.get._2.`type`.getOrElse("") == PipelineStepType.JOIN) { - sfContext - } else { - throw PipelineException(message = Some(s"Step Id (${nextStepId.get}) does not exist in pipeline"), - pipelineProgress = Some(PipelineExecutionInfo(nextStepId, Some(sfContext.getGlobalString("pipelineId").getOrElse(""))))) - } - } else { - sfContext - } - } - - private def processPipelineStep(step: PipelineStep, pipeline: Pipeline, steps: Map[String, PipelineStep], - pipelineContext: PipelineContext): Any = { - // Create a map of values for each defined parameter - val parameterValues: Map[String, Any] = pipelineContext.parameterMapper.createStepParameterMap(step, pipelineContext) - (step.executeIfEmpty.getOrElse(""), step.`type`.getOrElse("").toLowerCase) match { - // process step normally if empty - case ("", PipelineStepType.FORK) => processForkStep(step, pipeline, steps, parameterValues, pipelineContext) - case ("", PipelineStepType.STEPGROUP) => processStepGroup(step, pipeline, parameterValues, pipelineContext) - case ("", _) => ReflectionUtils.processStep(step, pipeline, parameterValues, pipelineContext) - case (value, _) => - logger.debug(s"Evaluating execute if empty: $value") - // wrap the value in a parameter object - val param = Parameter(Some("text"), Some("dynamic"), Some(true), None, Some(value)) - val ret = pipelineContext.parameterMapper.mapParameter(param, pipelineContext) - ret match { - case some: Some[_] => - logger.debug("Returning existing value") - PipelineStepResponse(some, None) - case None => - logger.debug("Executing step normally") - ReflectionUtils.processStep(step, pipeline, parameterValues, pipelineContext) - case _ => - logger.debug("Returning existing value") - PipelineStepResponse(Some(ret), None) - } - } - } - - private def updatePipelineContext(step: PipelineStep, result: Any, nextStepId: Option[String], pipelineContext: PipelineContext): PipelineContext = { - val pipelineProgress = pipelineContext.getPipelineExecutionInfo - val pipelineId = pipelineProgress.pipelineId.getOrElse("") - val groupId = pipelineProgress.groupId - val ctx = result match { - case forkResult: ForkStepResult => forkResult.pipelineContext - case groupResult: StepGroupResult => - val updatedCtx = pipelineContext.setStepAudit(pipelineId, groupResult.audit) - .setParameterByPipelineId(pipelineId, step.id.getOrElse(""), groupResult.pipelineStepResponse) - .setGlobal("pipelineId", pipelineId) - .setGlobal("lastStepId", step.id.getOrElse("")) - .setGlobal("stepId", nextStepId) - if (groupResult.globalUpdates.nonEmpty) { - groupResult.globalUpdates.foldLeft(updatedCtx)((ctx, update) => { - updateGlobals(update.stepName, update.pipelineId, ctx, update.global, update.globalName) - }) - } else { - updatedCtx - } - case _ => - processResponseGlobals(step, result, pipelineId, pipelineContext) - .setParameterByPipelineId(pipelineId, 
step.id.getOrElse(""), result) - .setGlobal("pipelineId", pipelineId) - .setGlobal("lastStepId", step.id.getOrElse("")) - .setGlobal("stepId", nextStepId) - } - - val updateCtx = if (nextStepId.isDefined) { - val metrics = if (ctx.sparkSession.isDefined) { - val executorStatus = ctx.sparkSession.get.sparkContext.getExecutorMemoryStatus - Map[String, Any]("startExecutorCount" -> executorStatus.size) - } else { Map[String, Any]() } - ctx.setStepAudit(pipelineId, - ExecutionAudit(nextStepId.get, AuditType.STEP, metrics, System.currentTimeMillis(), None, groupId)) - } else { - ctx - } - val audit = if (updateCtx.sparkSession.isDefined) { - val executorStatus = updateCtx.sparkSession.get.sparkContext.getExecutorMemoryStatus - updateCtx.getStepAudit(pipelineId, step.id.get, groupId).get.setEnd(System.currentTimeMillis()) - .setMetric("endExecutorCount", executorStatus.size) - } else { - updateCtx.getStepAudit(pipelineId, step.id.get, groupId).get.setEnd(System.currentTimeMillis()) - } - updateCtx.setStepAudit(pipelineId, audit) - } - - private def processResponseGlobals(step: PipelineStep, result: Any, pipelineId: String, updatedCtx: PipelineContext) = { - result match { - case response: PipelineStepResponse if response.namedReturns.isDefined && response.namedReturns.get.nonEmpty => - response.namedReturns.get.foldLeft(updatedCtx)((context, entry) => { - entry._1 match { - case e if e.startsWith("$globals.") => - val keyName = entry._1.substring(NINE) - updateGlobals(step.displayName.get, pipelineId, context, entry._2, keyName) - case e if e.startsWith("$metrics.") => - val keyName = entry._1.substring(NINE) - context.setStepMetric(pipelineId, step.id.getOrElse(""), None, keyName, entry._2) - case _ => context - } - }) - case _ => updatedCtx - } - } - - private def updateGlobals(stepName: String, pipelineId: String, context: PipelineContext, global: Any, keyName: String) = { - if (context.globals.get.contains(keyName)) { - logger.warn(s"Overwriting global named $keyName with value provided by step $stepName in pipeline $pipelineId") - } else { - logger.info(s"Adding global named $keyName with value provided by step $stepName in pipeline $pipelineId") - } - context.setGlobal(keyName, global) - } - - private def getNextStepId(step: PipelineStep, result: Any): Option[String] = { - step.`type`.getOrElse("").toLowerCase match { - case PipelineStepType.BRANCH => - // match the result against the step parameter name until we find a match - val matchValue = result match { - case response: PipelineStepResponse => response.primaryReturn.getOrElse("").toString - case _ => result - } - val matchedParameter = step.params.get.find(p => p.name.get == matchValue.toString) - // Use the value of the matched parameter as the next step id - if (matchedParameter.isDefined) { - Some(matchedParameter.get.value.get.asInstanceOf[String]) - } else { - None - } - case PipelineStepType.FORK => result.asInstanceOf[ForkStepResult].nextStepId - case _ => step.nextStepId - } - } - - private def handleEvent(pipelineContext: PipelineContext, funcName: String, params: List[Any]): PipelineContext = { - if (pipelineContext.pipelineListener.isDefined) { - val rCtx = ReflectionUtils.executeFunctionByName(pipelineContext.pipelineListener.get, funcName, params).asInstanceOf[Option[PipelineContext]] - if (rCtx.isEmpty) pipelineContext else rCtx.get - } else { pipelineContext } - } - - private def handleStepExecutionExceptions(t: Throwable, pipeline: Pipeline, - pipelineContext: PipelineContext, - pipelines: Option[List[Pipeline]] = 
None): PipelineStepException = { - val ex = t match { - case se: PipelineStepException => se - case t: Throwable => PipelineException(message = Some("An unknown exception has occurred"), cause = t, - pipelineProgress = Some(PipelineExecutionInfo(Some("Unknown"), pipeline.id))) - } - if (pipelineContext.pipelineListener.isDefined) { - pipelineContext.pipelineListener.get.registerStepException(ex, pipelineContext) - if (pipelines.isDefined && pipelines.get.nonEmpty) { - pipelineContext.pipelineListener.get.executionStopped(pipelines.get.slice(0, pipelines.get.indexWhere(pipeline => { - pipeline.id.get == pipeline.id.getOrElse("") - }) + 1), pipelineContext) - } - } - ex - } - - private def processStepGroup(step: PipelineStep, pipeline: Pipeline, parameterValues: Map[String, Any], - pipelineContext: PipelineContext): StepGroupResult = { - val subPipeline =if (parameterValues.contains("pipelineId")) { - pipelineContext.pipelineManager.getPipeline(parameterValues("pipelineId").toString) - .getOrElse(throw PipelineException(message = Some(s"Unable to retrieve required step group id ${parameterValues("pipelineId")}"), - pipelineProgress = Some(PipelineExecutionInfo(step.id, pipeline.id)))) - } else { parameterValues("pipeline").asInstanceOf[Pipeline] } - val firstStep = subPipeline.steps.get.head - val stepLookup = PipelineExecutorValidations.validateAndCreateStepLookup(subPipeline) - val pipelineId = pipeline.id.getOrElse("") - val stepId = step.id.getOrElse("") - val groupId = pipelineContext.getGlobalString("groupId") - val stepAudit = ExecutionAudit(firstStep.id.getOrElse(""), AuditType.STEP, Map[String, Any](), - System.currentTimeMillis(), groupId = Some(s"$pipelineId::$stepId")) - val pipelineAudit = ExecutionAudit(subPipeline.id.getOrElse(""), AuditType.PIPELINE, Map[String, Any](), - System.currentTimeMillis(), None, None, Some(List(stepAudit))) - // Inject the mappings into the globals object of the PipelineContext - val ctx = (if (parameterValues.getOrElse("useParentGlobals", false).asInstanceOf[Boolean]) { - pipelineContext.copy(globals = - Some(pipelineContext.globals.get ++ - parameterValues.getOrElse("pipelineMappings", Map[String, Any]()).asInstanceOf[Map[String, Any]])) - } else { - pipelineContext.copy(globals = Some(parameterValues.getOrElse("pipelineMappings", Map[String, Any]()).asInstanceOf[Map[String, Any]])) - }).setGlobal("pipelineId", subPipeline.id.getOrElse("")) - .setGlobal("stepId", firstStep.id.getOrElse("")) - .setGlobal("groupId", s"$pipelineId::$stepId") - .setRootAudit(pipelineContext.getStepAudit(pipelineId, stepId, groupId).get.setChildAudit(pipelineAudit)) - .copy(parameters = PipelineParameters(List(PipelineParameter(subPipeline.id.getOrElse(""), Map[String, Any]())))) - val resultCtx = executeStep(firstStep, subPipeline, stepLookup, ctx) - val pipelineParams = resultCtx.parameters.getParametersByPipelineId(subPipeline.id.getOrElse("")) - val response = extractStepGroupResponse(step, subPipeline, pipelineParams, resultCtx) - val updates = subPipeline.steps.get - .filter { step => - pipelineParams.isDefined && pipelineParams.get.parameters.get(step.id.getOrElse("")) - .exists(r => r.isInstanceOf[PipelineStepResponse] && r.asInstanceOf[PipelineStepResponse].namedReturns.isDefined) - }.foldLeft(List[GlobalUpdates]())((updates, step) => { - val updateList = pipelineParams.get.parameters(step.id.getOrElse("")).asInstanceOf[PipelineStepResponse] - .namedReturns.get.foldLeft(List[GlobalUpdates]())((list, entry) => { - if (entry._1.startsWith("$globals.")) { - list 
:+ GlobalUpdates(step.displayName.get, subPipeline.id.get, entry._1.substring(NINE), entry._2) - } else { list } - }) - updates ++ updateList - }) - StepGroupResult(resultCtx.rootAudit, response, updates) - } - - /** - * This function is responsible for creating the PipelineStepResponse for a step group - * - * @param step The step group step - * @param stepGroup The step group pipeline - * @param pipelineParameter The pipeline parameter for the step group pipeline - * @param pipelineContext The PipelineContext result from the execution of the Step Group Pipeline - * @return A PipelineStepResponse - */ - private def extractStepGroupResponse(step: PipelineStep, - stepGroup: Pipeline, - pipelineParameter: Option[PipelineParameter], - pipelineContext: PipelineContext) = { - if (pipelineParameter.isDefined) { - val resultParam = step.params.get.find(_.`type`.getOrElse("text") == "result") - val resultMappingParam = if (resultParam.isDefined) { - resultParam - } else if (stepGroup.stepGroupResult.isDefined) { - Some(Parameter(Some("result"), Some("output"), None, None, stepGroup.stepGroupResult)) - } else { - None - } - val stepResponseMap = Some(stepGroup.steps.get.map { step => - step.id.getOrElse("") -> pipelineParameter.get.parameters.get(step.id.getOrElse("")).map(_.asInstanceOf[PipelineStepResponse]) - }.toMap.collect { case (k, v: Some[_]) => k -> v.get }) - if (resultMappingParam.isDefined) { - val mappedValue = pipelineContext.parameterMapper.mapParameter(resultMappingParam.get, pipelineContext) match { - case value: Option[_] => value - case _: BoxedUnit => None - case response => Some(response) - } - PipelineStepResponse(mappedValue, stepResponseMap) - } else { - PipelineStepResponse(stepResponseMap, None) - } - } else { - PipelineStepResponse(None, None) - } - } - - /** - * Special handling of fork steps. - * - * @param step The fork step - * @param pipeline The pipeline being executed - * @param steps The step lookup - * @param parameterValues The parameterValues for this step - * @param pipelineContext The current pipeline context - * @return The result of processing the forked steps. 
- */ - private def processForkStep(step: PipelineStep, pipeline: Pipeline, steps: Map[String, PipelineStep], - parameterValues: Map[String, Any], pipelineContext: PipelineContext): ForkStepResult = { - val firstStep = steps(step.nextStepId.getOrElse("")) - // Create the list of steps that need to be executed starting with the "nextStepId" - val forkFlow = getForkSteps(firstStep, pipeline, steps, - ForkStepFlow(List(), pipeline, List[ForkPair](ForkPair(step, None, root = true)))) - forkFlow.validate() - val newSteps = forkFlow.steps - // Identify the join steps and verify that only one is present - val joinSteps = newSteps.filter(_.`type`.getOrElse("").toLowerCase == PipelineStepType.JOIN) - val newStepLookup = newSteps.foldLeft(Map[String, PipelineStep]())((map, s) => map + (s.id.get -> s)) - // See if the forks should be executed in threads or a loop - val forkByValues = parameterValues("forkByValues").asInstanceOf[List[Any]] - val results = if (parameterValues("forkMethod").asInstanceOf[String] == "parallel") { - processForkStepsParallel(forkByValues, firstStep, step.id.get, pipeline, newStepLookup, pipelineContext) - } else { // "serial" - processForkStepsSerial(forkByValues, firstStep, step.id.get, pipeline, newStepLookup, pipelineContext) - } - // Gather the results and create a list - val finalResult = results.sortBy(_.index).foldLeft(ForkStepExecutionResult(-1, Some(pipelineContext), None))((combinedResult, result) => { - if (result.result.isDefined) { - val ctx = result.result.get - mergeMessages(combinedResult.result.get, ctx.getStepMessages.get, result.index) - combinedResult.copy(result = Some(mergeResponses(combinedResult.result.get, ctx, pipeline.id.getOrElse(""), newSteps, result.index))) - } else if (result.error.isDefined) { - if (combinedResult.error.isDefined) { - combinedResult.copy(error = Some(combinedResult.error.get.asInstanceOf[ForkedPipelineStepException].addException(result.error.get, result.index))) - } else { - combinedResult.copy(error = - Some(ForkedPipelineStepException(message = Some("One or more errors has occurred while processing fork step:\n"), - exceptions = Map(result.index -> result.error.get)))) - } - } else { combinedResult } - }) - if (finalResult.error.isDefined) { - throw finalResult.error.get - } else { - val pair = forkFlow.forkPairs.find(p => p.forkStep.id.getOrElse("N0R00TID") == step.id.getOrElse("N0ID")) - ForkStepResult(if (joinSteps.nonEmpty) { - if (pair.isDefined && pair.get.joinStep.isDefined) { - if (steps.contains(pair.get.joinStep.get.nextStepId.getOrElse("N0R00TID")) && - steps(pair.get.joinStep.get.nextStepId.getOrElse("N0R00TID")).`type`.getOrElse("") == PipelineStepType.JOIN) { - steps(pair.get.joinStep.get.nextStepId.getOrElse("N0R00TID")).nextStepId - } else { pair.get.joinStep.get.nextStepId } - } else { - joinSteps.head.nextStepId - } - } else { - None - }, finalResult.result.get) - } - } - - /** - * Merges any messages into the provided PipelineContext. Each message will be converted to a ForkedPipelineStepMessage - * to allow tracking of the execution id. 
- * - * @param pipelineContext The PipelineContext to merge the messages into - * @param messages A list of messages to merge - * @param executionId The execution id to attach to each message - */ - private def mergeMessages(pipelineContext: PipelineContext, messages: List[PipelineStepMessage], executionId: Int): Unit = { - messages.foreach(message => - pipelineContext.addStepMessage(ForkedPipelineStepMessage(message.message, message.stepId, message.pipelineId, message.messageType, Some(executionId))) - ) - } - - /** - * Iterates the list of fork steps merging the results into the provided PipelineContext. Results will be stored as - * Options in a list. If this execution does not have a result, then None will be stored in it's place. Secondary - * response maps fill have the values stored as a list as well. - * - * @param pipelineContext The context to write the results. - * @param source The source context to retrieve the execution results - * @param pipelineId The pipeline id that is used to run these steps. - * @param forkSteps A list of steps that were used during the fork porcessing - * @param executionId The execution id of this process. This will be used as a position for result storage in the list. - * @return A PipelineContext with the merged results. - */ - private def mergeResponses(pipelineContext: PipelineContext, source: PipelineContext, pipelineId: String, - forkSteps: List[PipelineStep], executionId: Int): PipelineContext = { - val sourceParameter = source.parameters.getParametersByPipelineId(pipelineId) - val sourceParameters = sourceParameter.get.parameters - val mergeAuditCtx = pipelineContext.copy(rootAudit = pipelineContext.rootAudit.merge(source.rootAudit)) - forkSteps.foldLeft(mergeAuditCtx)((ctx, step) => { - val rootParameter = ctx.parameters.getParametersByPipelineId(pipelineId) - val parameters = if (rootParameter.isEmpty) { - Map[String, Any]() - } else { - rootParameter.get.parameters - } - // Get the root step response - val response = if (parameters.contains(step.id.getOrElse(""))) { - val r = parameters(step.id.getOrElse("")).asInstanceOf[PipelineStepResponse] - if (r.primaryReturn.isDefined && r.primaryReturn.get.isInstanceOf[List[_]]) { - r - } else { - PipelineStepResponse(Some(List[Any]()), r.namedReturns) - } - } else { - PipelineStepResponse(Some(List[Any]()), Some(Map[String, Any]())) - } - // Get the source response - val updatedResponse = if (sourceParameters.contains(step.id.getOrElse(""))) { - val r = sourceParameters(step.id.getOrElse("")) - val stepResponse = r match { - case a: PipelineStepResponse => a - case option: Option[_] if option.isDefined && option.get.isInstanceOf[PipelineStepResponse] => option.get.asInstanceOf[PipelineStepResponse] - case option: Option[_] if option.isDefined => PipelineStepResponse(option, None) - case any => PipelineStepResponse(Some(any), None) - } - // Merge the primary response with the root - val primaryList = response.primaryReturn.get.asInstanceOf[List[Option[_]]] - // See if the list needs to be filled in - val responseList = appendForkedResponseToList(primaryList, stepResponse.primaryReturn, executionId) - val rootNamedReturns = response.namedReturns.getOrElse(Map[String, Any]()) - val sourceNamedReturns = stepResponse.namedReturns.getOrElse(Map[String, Any]()) - val mergedSecondaryReturns = mergeSecondaryReturns(rootNamedReturns, sourceNamedReturns, executionId) - // Append this response to the list and update the PipelineStepResponse - PipelineStepResponse(Some(responseList), 
Some(mergedSecondaryReturns)) - } else { - response - } - ctx.setParameterByPipelineId(pipelineId, step.id.getOrElse(""), updatedResponse) - }) - } - - /** - * Appends the provided value to the list at the correct index based on the executionId. - * @param list the list to append the value - * @param executionId The execution id about to be appended - * @return A list with any missing elements populated with None and the provided element appended. - */ - private def appendForkedResponseToList(list: List[Option[_]], value: Option[Any], executionId: Int): List[Option[_]] = { - val updateList = if (list.length < executionId) { - list ::: List.fill(executionId - list.length)(None) - } else { - list - } - updateList :+ value - } - - /** - * Merges the values in the sourceNamedReturns into the elements in the rootNamedReturns - * @param rootNamedReturns The base map to merge into - * @param sourceNamedReturns The source map containing the values - * @param executionId The executionId used for list positioning. - * @return A map containing the values of the source merged into the root. - */ - private def mergeSecondaryReturns(rootNamedReturns: Map[String, Any], - sourceNamedReturns: Map[String, Any], - executionId: Int): Map[String, Any] = { - val keys = rootNamedReturns.keySet ++ sourceNamedReturns.keySet - keys.foldLeft(rootNamedReturns)((map, key) => { - map + (key -> appendForkedResponseToList( - rootNamedReturns.getOrElse(key, List[Option[_]]()) match { - case list: List[Option[_]] => list - case option: Option[_] => List(option) - case any => List(Some(any)) - }, - sourceNamedReturns.getOrElse(key, None) match { - case option: Option[_] => option - case any: Any => Some(any) - }, executionId)) - }) - } - - /** - * Processes a set of forked steps in serial. All values will be processed regardless of individual failures. - * @param forkByValues The values to fork - * @param firstStep The first step to process - * @param forkStepId The id of the fork step used to store this value - * @param pipeline The pipeline being processed/ - * @param steps The step lookup for the forked steps. - * @param pipelineContext The pipeline context to clone while processing. - * @return A list of execution results. - */ - private def processForkStepsSerial(forkByValues: Seq[Any], - firstStep: PipelineStep, - forkStepId: String, - pipeline: Pipeline, - steps: Map[String, PipelineStep], - pipelineContext: PipelineContext): List[ForkStepExecutionResult] = { - forkByValues.zipWithIndex.map(value => { - startForkedStepExecution(firstStep, forkStepId, pipeline, steps, pipelineContext, value) - }).toList - } - - /** - * Processes a set of forked steps in parallel. All values will be processed regardless of individual failures. - * @param forkByValues The values to fork - * @param firstStep The first step to process - * @param forkStepId The id of the fork step used to store this value - * @param pipeline The pipeline being processed/ - * @param steps The step lookup for the forked steps. - * @param pipelineContext The pipeline context to clone while processing. - * @return A list of execution results. 
- */ - private def processForkStepsParallel(forkByValues: Seq[Any], - firstStep: PipelineStep, - forkStepId: String, - pipeline: Pipeline, - steps: Map[String, PipelineStep], - pipelineContext: PipelineContext): List[ForkStepExecutionResult] = { - val futures = forkByValues.zipWithIndex.map(value => { - Future { - startForkedStepExecution(firstStep, forkStepId, pipeline, steps, pipelineContext, value) - } - }) - // Wait for all futures to complete - Await.ready(Future.sequence(futures), Duration.Inf) - // Iterate the futures an extract the result - futures.map(_.value.get.get).toList - } - - private def startForkedStepExecution(firstStep: PipelineStep, - forkStepId: String, - pipeline: Pipeline, - steps: Map[String, PipelineStep], - pipelineContext: PipelineContext, value: (Any, Int)) = { - try { - ForkStepExecutionResult(value._2, - Some(executeStep(firstStep, pipeline, steps, - createForkPipelineContext(pipelineContext, value._2, firstStep) - .setParameterByPipelineId(pipeline.id.get, - forkStepId, PipelineStepResponse(Some(value._1), None)))), None) - } catch { - case t: Throwable => ForkStepExecutionResult(value._2, None, Some(t)) - } - } - - /** - * This function will create a new PipelineContext from the provided that includes new StepMessages - * - * @param pipelineContext The PipelineContext to be cloned. - * @param groupId The id of the fork process - * @return A cloned PipelineContext - */ - private def createForkPipelineContext(pipelineContext: PipelineContext, groupId: Int, firstStep: PipelineStep): PipelineContext = { - pipelineContext.copy(stepMessages = - Some(pipelineContext.sparkSession.get.sparkContext.collectionAccumulator[PipelineStepMessage]("stepMessages"))) - .setGlobal("groupId", groupId.toString) - .setGlobal("stepId", firstStep.id) - .setStepAudit(pipelineContext.getGlobalString("pipelineId").get, - ExecutionAudit(firstStep.id.get, AuditType.STEP, Map[String, Any](), System.currentTimeMillis(), None, Some(groupId.toString))) - } - - /** - * Returns a list of steps that should be executed as part of the fork step - * @param step The first step in the chain. - * @param steps The full pipeline stepLookup - * @param forkSteps The list used to store the steps - * @return A list of steps that may be executed as part of fork processing. 
- */ - private def getForkSteps(step: PipelineStep, - pipeline: Pipeline, - steps: Map[String, PipelineStep], - forkSteps: ForkStepFlow): ForkStepFlow = { - val list = step.`type`.getOrElse("").toLowerCase match { - case PipelineStepType.BRANCH => - step.params.get.foldLeft(forkSteps.conditionallyAddStepToList(step))((stepList, param) => { - if (param.`type`.getOrElse("") == "result") { - getForkSteps(steps(param.value.getOrElse("").asInstanceOf[String]), pipeline, steps, stepList) - } else { - stepList - } - }) - case PipelineStepType.JOIN => - val flow = forkSteps.conditionallyAddStepToList(step) - if (flow.remainingUnclosedForks() > 0) { - getForkSteps(steps(step.nextStepId.getOrElse("")), pipeline, steps, flow) - } else { - flow - } - case _ if !steps.contains(step.nextStepId.getOrElse("")) => forkSteps.conditionallyAddStepToList(step) - case _ => getForkSteps(steps(step.nextStepId.getOrElse("")), pipeline, steps, forkSteps.conditionallyAddStepToList(step)) - } - - if (step.nextStepOnError.isDefined) { - getForkSteps(steps(step.nextStepOnError.getOrElse("")), pipeline, steps, list) - } else { - list - } - } -} - -case class StepGroupResult(audit: ExecutionAudit, pipelineStepResponse: PipelineStepResponse, globalUpdates: List[GlobalUpdates]) -case class GlobalUpdates(stepName: String, pipelineId: String, globalName: String, global: Any) -case class ForkStepResult(nextStepId: Option[String], pipelineContext: PipelineContext) -case class ForkStepExecutionResult(index: Int, result: Option[PipelineContext], error: Option[Throwable]) - -case class ForkPair(forkStep: PipelineStep, joinStep: Option[PipelineStep], root: Boolean = false) - -case class ForkStepFlow(steps: List[PipelineStep], - pipeline: Pipeline, - forkPairs: List[ForkPair]) { - /** - * Prevents duplicate steps from being added to the list - * @param step The step to be added - * @return A new list containing the steps - */ - def conditionallyAddStepToList(step: PipelineStep): ForkStepFlow = { - if (this.steps.exists(_.id.getOrElse("") == step.id.getOrElse("NONE"))) { - this - } else { - step.`type`.getOrElse("").toLowerCase match { - case PipelineStepType.FORK => - this.copy(steps = steps :+ step, forkPairs = this.forkPairs :+ ForkPair(step, None)) - case PipelineStepType.JOIN => - val newPairs = this.forkPairs.reverse.map(p => { - if (p.joinStep.isEmpty) { - p.copy(joinStep = Some(step)) - } else { - p - } - }) - this.copy(steps = steps :+ step, forkPairs = newPairs.reverse) - case _ => this.copy(steps = steps :+ step) - } - } - } - - def remainingUnclosedForks(): Int = getUnclosedForkPairs.length - - def validate(): Unit = { - val unclosedPairs = getUnclosedForkPairs - if (this.forkPairs.length > 1 && unclosedPairs.length > 1) { - val msg = s"Fork step(s) (${unclosedPairs.map(_.forkStep.id).mkString(",")}) must be closed by join when embedding other forks!" 
- throw PipelineException(message = Some(msg), - pipelineProgress = Some(PipelineExecutionInfo(unclosedPairs.head.forkStep.id, pipeline.id))) - } - } - - private def getUnclosedForkPairs: List[ForkPair] = { - val unclosedPairs = this.forkPairs.foldLeft(List[ForkPair]())((list, p) => { - if (p.joinStep.isEmpty) { - list :+ p - } else { - list - } - }) - unclosedPairs - } } diff --git a/metalus-core/src/main/scala/com/acxiom/pipeline/flow/ForkStepFlow.scala b/metalus-core/src/main/scala/com/acxiom/pipeline/flow/ForkStepFlow.scala new file mode 100644 index 00000000..12faba4a --- /dev/null +++ b/metalus-core/src/main/scala/com/acxiom/pipeline/flow/ForkStepFlow.scala @@ -0,0 +1,60 @@ +package com.acxiom.pipeline.flow + +import com.acxiom.pipeline._ + +case class ForkStepFlow(steps: List[PipelineStep], + pipeline: Pipeline, + forkPairs: List[ForkPair]) { + /** + * Prevents duplicate steps from being added to the list + * @param step The step to be added + * @return A new list containing the steps + */ + def conditionallyAddStepToList(step: PipelineStep): ForkStepFlow = { + if (this.steps.exists(_.id.getOrElse("") == step.id.getOrElse("NONE"))) { + this + } else { + step.`type`.getOrElse("").toLowerCase match { + case PipelineStepType.FORK => + this.copy(steps = steps :+ step, forkPairs = this.forkPairs :+ ForkPair(step, None)) + case PipelineStepType.JOIN => + val newPairs = this.forkPairs.reverse.map(p => { + if (p.joinStep.isEmpty) { + p.copy(joinStep = Some(step)) + } else { + p + } + }) + this.copy(steps = steps :+ step, forkPairs = newPairs.reverse) + case _ => this.copy(steps = steps :+ step) + } + } + } + + def remainingUnclosedForks(): Int = getUnclosedForkPairs.length + + def validate(): Unit = { + val unclosedPairs = getUnclosedForkPairs + if (this.forkPairs.length > 1 && unclosedPairs.length > 1) { + val msg = s"Fork step(s) (${unclosedPairs.map(_.forkStep.id).mkString(",")}) must be closed by join when embedding other forks!" 
+ throw PipelineException(message = Some(msg), + pipelineProgress = Some(PipelineExecutionInfo(unclosedPairs.head.forkStep.id, pipeline.id))) + } + } + + private def getUnclosedForkPairs: List[ForkPair] = { + val unclosedPairs = this.forkPairs.foldLeft(List[ForkPair]())((list, p) => { + if (p.joinStep.isEmpty) { + list :+ p + } else { + list + } + }) + unclosedPairs + } +} + +case class ForkStepResult(nextStepId: Option[String], pipelineContext: PipelineContext) +case class ForkStepExecutionResult(index: Int, result: Option[PipelineContext], error: Option[Throwable]) + +case class ForkPair(forkStep: PipelineStep, joinStep: Option[PipelineStep], root: Boolean = false) diff --git a/metalus-core/src/main/scala/com/acxiom/pipeline/flow/PipelineFlow.scala b/metalus-core/src/main/scala/com/acxiom/pipeline/flow/PipelineFlow.scala new file mode 100644 index 00000000..c04f01af --- /dev/null +++ b/metalus-core/src/main/scala/com/acxiom/pipeline/flow/PipelineFlow.scala @@ -0,0 +1,571 @@ +package com.acxiom.pipeline.flow + +import com.acxiom.pipeline._ +import com.acxiom.pipeline.audits.{AuditType, ExecutionAudit} +import com.acxiom.pipeline.utils.ReflectionUtils +import org.apache.log4j.Logger + +import scala.annotation.tailrec +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.duration.Duration +import scala.concurrent.{Await, Future} + +object PipelineFlow { + private val logger = Logger.getLogger(getClass) + /** + * + * @param pipelineContext The current PipelineContext + * @param funcName The name of the function to invoke on the PipelineListener + * @param params List of parameters to pass to the function + * @return An updated PipelineContext + */ + def handleEvent(pipelineContext: PipelineContext, funcName: String, params: List[Any]): PipelineContext = { + if (pipelineContext.pipelineListener.isDefined) { + val rCtx = ReflectionUtils.executeFunctionByName(pipelineContext.pipelineListener.get, funcName, params).asInstanceOf[Option[PipelineContext]] + if (rCtx.isEmpty) pipelineContext else rCtx.get + } else { pipelineContext } + } + + /** + * + * @param t The exception to process + * @param pipeline The current executing pipeline + * @param pipelineContext The current PipelineContext + * @param pipelines List of pipelines + * @return A PipelineStepException + */ + def handleStepExecutionExceptions(t: Throwable, pipeline: Pipeline, + pipelineContext: PipelineContext, + pipelines: Option[List[Pipeline]] = None): PipelineStepException = { + val ex = t match { + case se: PipelineStepException => se + case t: Throwable => PipelineException(message = Some("An unknown exception has occurred"), cause = t, + pipelineProgress = Some(PipelineExecutionInfo(Some("Unknown"), pipeline.id))) + } + if (pipelineContext.pipelineListener.isDefined) { + pipelineContext.pipelineListener.get.registerStepException(ex, pipelineContext) + if (pipelines.isDefined && pipelines.get.nonEmpty) { + pipelineContext.pipelineListener.get.executionStopped(pipelines.get.slice(0, pipelines.get.indexWhere(pipeline => { + pipeline.id.get == pipeline.id.getOrElse("") + }) + 1), pipelineContext) + } + } + ex + } + + /** + * Updates the PipelineContext globals with the provided "global". 
+ * @param stepName The name of the step that updated the value
+ * @param pipelineId The id of the pipeline containing the step
+ * @param context The context to be updated
+ * @param global The global value to use
+ * @param keyName The name of the global value
+ * @return An updated PipelineContext
+ */
+ def updateGlobals(stepName: String, pipelineId: String, context: PipelineContext, global: Any, keyName: String): PipelineContext = {
+ if (context.globals.get.contains(keyName)) {
+ logger.warn(s"Overwriting global named $keyName with value provided by step $stepName in pipeline $pipelineId")
+ } else {
+ logger.info(s"Adding global named $keyName with value provided by step $stepName in pipeline $pipelineId")
+ }
+ context.setGlobal(keyName, global)
+ }
+}
+
+trait PipelineFlow {
+ private val logger = Logger.getLogger(getClass)
+
+ def pipeline: Pipeline
+ def initialContext: PipelineContext
+ def pipelineLookup: Map[String, String]
+ def executingPipelines: List[Pipeline]
+
+ protected val stepLookup: Map[String, PipelineStep] = PipelineExecutorValidations.validateAndCreateStepLookup(pipeline)
+ protected val context: PipelineContext = initialContext.setPipelineAudit(
+ ExecutionAudit(pipeline.id.get, AuditType.PIPELINE, Map[String, Any](), System.currentTimeMillis(), None, None, Some(List[ExecutionAudit](
+ ExecutionAudit(pipeline.steps.get.head.id.get, AuditType.STEP, Map[String, Any](), System.currentTimeMillis())))))
+
+ def execute(): FlowResult = {
+ val updatedCtx = PipelineFlow.handleEvent(context, "pipelineStarted", List(pipeline, context))
+ .setGlobal("pipelineId", pipeline.id)
+ .setGlobal("stepId", pipeline.steps.get.head.id.get)
+ try {
+ val resultPipelineContext = executeStep(pipeline.steps.get.head, pipeline, stepLookup, updatedCtx)
+ val messages = resultPipelineContext.getStepMessages
+ processStepMessages(messages, pipelineLookup)
+ val auditCtx = resultPipelineContext.setPipelineAudit(
+ resultPipelineContext.getPipelineAudit(pipeline.id.get).get.setEnd(System.currentTimeMillis()))
+ FlowResult(PipelineFlow.handleEvent(auditCtx, "pipelineFinished", List(pipeline, auditCtx)), None, None)
+ } catch {
+ case t: Throwable => throw PipelineFlow.handleStepExecutionExceptions(t, pipeline, context, Some(executingPipelines))
+ }
+ }
+
+ /**
+ * This function will process step messages and throw any appropriate exceptions
+ *
+ * @param messages A list of PipelineStepMessages that need to be processed.
+ * @param pipelineLookup A map of Pipelines keyed by the id. This is used to quickly retrieve additional Pipeline data.
+ */ + private def processStepMessages(messages: Option[List[PipelineStepMessage]], pipelineLookup: Map[String, String]): Unit = { + if (messages.isDefined && messages.get.nonEmpty) { + messages.get.foreach(m => m.messageType match { + case PipelineStepMessageType.error => + throw PipelineException(message = Some(m.message), pipelineProgress = Some(PipelineExecutionInfo(Some(m.stepId), Some(m.pipelineId)))) + case PipelineStepMessageType.pause => + throw PauseException(message = Some(m.message), pipelineProgress = Some(PipelineExecutionInfo(Some(m.stepId), Some(m.pipelineId)))) + case PipelineStepMessageType.warn => + logger.warn(s"Step ${m.stepId} in pipeline ${pipelineLookup(m.pipelineId)} issued a warning: ${m.message}") + case _ => + }) + } + } + + @tailrec + protected final def executeStep(step: PipelineStep, pipeline: Pipeline, steps: Map[String, PipelineStep], + pipelineContext: PipelineContext): PipelineContext = { + logger.debug(s"Executing Step (${step.id.getOrElse("")}) ${step.displayName.getOrElse("")}") + val ssContext = PipelineFlow.handleEvent(pipelineContext, "pipelineStepStarted", List(pipeline, step, pipelineContext)) + val (nextStepId, sfContext) = try { + val result = processPipelineStep(step, pipeline, steps, ssContext) + // setup the next step + val nextStepId = getNextStepId(step, result) + val newPipelineContext = result match { + case flowResult: FlowResult => updatePipelineContext(step, result, nextStepId, flowResult.pipelineContext) + case _ => updatePipelineContext(step, result, nextStepId, ssContext) + } + // run the step finished event + val sfContext = PipelineFlow.handleEvent(newPipelineContext, "pipelineStepFinished", List(pipeline, step, newPipelineContext)) + (nextStepId, sfContext) + } catch { + case e: Throwable if step.nextStepOnError.isDefined => + // handle exception + val ex = PipelineFlow.handleStepExecutionExceptions(e, pipeline, pipelineContext) + // put exception on the context as the "result" for this step. 
+ val updateContext = updatePipelineContext(step, PipelineStepResponse(Some(ex), None), step.nextStepOnError, ssContext) + (step.nextStepOnError, updateContext) + case e => throw e + } + // Call the next step here + if (steps.contains(nextStepId.getOrElse("")) && steps(nextStepId.getOrElse("")).`type`.getOrElse("") == PipelineStepType.JOIN) { + sfContext + } else if (steps.contains(nextStepId.getOrElse(""))) { + executeStep(steps(nextStepId.get), pipeline, steps, sfContext) + } else if (nextStepId.isDefined && nextStepId.get.nonEmpty) { + val s = steps.find(_._2.nextStepId.getOrElse("") == nextStepId.getOrElse("FUNKYSTEPID")) + // See if this is a sub-fork + if (s.isDefined && s.get._2.`type`.getOrElse("") == PipelineStepType.JOIN) { + sfContext + } else { + throw PipelineException(message = Some(s"Step Id (${nextStepId.get}) does not exist in pipeline"), + pipelineProgress = Some(PipelineExecutionInfo(nextStepId, Some(sfContext.getGlobalString("pipelineId").getOrElse(""))))) + } + } else { + sfContext + } + } + + private def processPipelineStep(step: PipelineStep, pipeline: Pipeline, steps: Map[String, PipelineStep], + pipelineContext: PipelineContext): Any = { + // Create a map of values for each defined parameter + val parameterValues: Map[String, Any] = pipelineContext.parameterMapper.createStepParameterMap(step, pipelineContext) + (step.executeIfEmpty.getOrElse(""), step.`type`.getOrElse("").toLowerCase) match { + // process step normally if empty + case ("", PipelineStepType.FORK) => processForkStep(step, pipeline, steps, parameterValues, pipelineContext) + case ("", PipelineStepType.STEPGROUP) => + StepGroupFlow(pipeline, pipelineContext, this.pipelineLookup, this.executingPipelines, step, parameterValues).execute() + case ("", _) => ReflectionUtils.processStep(step, pipeline, parameterValues, pipelineContext) + case (value, _) => + logger.debug(s"Evaluating execute if empty: $value") + // wrap the value in a parameter object + val param = Parameter(Some("text"), Some("dynamic"), Some(true), None, Some(value)) + val ret = pipelineContext.parameterMapper.mapParameter(param, pipelineContext) + ret match { + case some: Some[_] => + logger.debug("Returning existing value") + PipelineStepResponse(some, None) + case None => + logger.debug("Executing step normally") + ReflectionUtils.processStep(step, pipeline, parameterValues, pipelineContext) + case _ => + logger.debug("Returning existing value") + PipelineStepResponse(Some(ret), None) + } + } + } + + private def updatePipelineContext(step: PipelineStep, result: Any, nextStepId: Option[String], pipelineContext: PipelineContext): PipelineContext = { + val pipelineProgress = pipelineContext.getPipelineExecutionInfo + val pipelineId = pipelineProgress.pipelineId.getOrElse("") + val groupId = pipelineProgress.groupId + val ctx = result match { + case forkResult: ForkStepResult => forkResult.pipelineContext + case flowResult: FlowResult => flowResult.pipelineContext + case _ => + processResponseGlobals(step, result, pipelineId, pipelineContext) + .setParameterByPipelineId(pipelineId, step.id.getOrElse(""), result) + .setGlobal("pipelineId", pipelineId) + .setGlobal("lastStepId", step.id.getOrElse("")) + .setGlobal("stepId", nextStepId) + } + + val updateCtx = if (nextStepId.isDefined) { + val metrics = if (ctx.sparkSession.isDefined) { + val executorStatus = ctx.sparkSession.get.sparkContext.getExecutorMemoryStatus + Map[String, Any]("startExecutorCount" -> executorStatus.size) + } else { Map[String, Any]() } + ctx.setStepAudit(pipelineId, + 
ExecutionAudit(nextStepId.get, AuditType.STEP, metrics, System.currentTimeMillis(), None, groupId)) + } else { + ctx + } + val audit = if (updateCtx.sparkSession.isDefined) { + val executorStatus = updateCtx.sparkSession.get.sparkContext.getExecutorMemoryStatus + updateCtx.getStepAudit(pipelineId, step.id.get, groupId).get.setEnd(System.currentTimeMillis()) + .setMetric("endExecutorCount", executorStatus.size) + } else { + updateCtx.getStepAudit(pipelineId, step.id.get, groupId).get.setEnd(System.currentTimeMillis()) + } + updateCtx.setStepAudit(pipelineId, audit) + } + + private def getNextStepId(step: PipelineStep, result: Any): Option[String] = { + step.`type`.getOrElse("").toLowerCase match { + case PipelineStepType.BRANCH => + // match the result against the step parameter name until we find a match + val matchValue = result match { + case response: PipelineStepResponse => response.primaryReturn.getOrElse("").toString + case _ => result + } + val matchedParameter = step.params.get.find(p => p.name.get == matchValue.toString) + // Use the value of the matched parameter as the next step id + if (matchedParameter.isDefined) { + Some(matchedParameter.get.value.get.asInstanceOf[String]) + } else { + None + } + case PipelineStepType.FORK => result.asInstanceOf[ForkStepResult].nextStepId + case _ => step.nextStepId + } + } + + private def processResponseGlobals(step: PipelineStep, result: Any, pipelineId: String, updatedCtx: PipelineContext) = { + result match { + case response: PipelineStepResponse if response.namedReturns.isDefined && response.namedReturns.get.nonEmpty => + response.namedReturns.get.foldLeft(updatedCtx)((context, entry) => { + entry._1 match { + case e if e.startsWith("$globals.") => + val keyName = entry._1.substring(Constants.NINE) + PipelineFlow.updateGlobals(step.displayName.get, pipelineId, context, entry._2, keyName) + case e if e.startsWith("$metrics.") => + val keyName = entry._1.substring(Constants.NINE) + context.setStepMetric(pipelineId, step.id.getOrElse(""), None, keyName, entry._2) + case _ => context + } + }) + case _ => updatedCtx + } + } + + /** + * Special handling of fork steps. + * + * @param step The fork step + * @param pipeline The pipeline being executed + * @param steps The step lookup + * @param parameterValues The parameterValues for this step + * @param pipelineContext The current pipeline context + * @return The result of processing the forked steps. 
+ */ + private def processForkStep(step: PipelineStep, pipeline: Pipeline, steps: Map[String, PipelineStep], + parameterValues: Map[String, Any], pipelineContext: PipelineContext): ForkStepResult = { + val firstStep = steps(step.nextStepId.getOrElse("")) + // Create the list of steps that need to be executed starting with the "nextStepId" + val forkFlow = getForkSteps(firstStep, pipeline, steps, + ForkStepFlow(List(), pipeline, List[ForkPair](ForkPair(step, None, root = true)))) + forkFlow.validate() + val newSteps = forkFlow.steps + // Identify the join steps and verify that only one is present + val joinSteps = newSteps.filter(_.`type`.getOrElse("").toLowerCase == PipelineStepType.JOIN) + val newStepLookup = newSteps.foldLeft(Map[String, PipelineStep]())((map, s) => map + (s.id.get -> s)) + // See if the forks should be executed in threads or a loop + val forkByValues = parameterValues("forkByValues").asInstanceOf[List[Any]] + val results = if (parameterValues("forkMethod").asInstanceOf[String] == "parallel") { + processForkStepsParallel(forkByValues, firstStep, step.id.get, pipeline, newStepLookup, pipelineContext) + } else { // "serial" + processForkStepsSerial(forkByValues, firstStep, step.id.get, pipeline, newStepLookup, pipelineContext) + } + // Gather the results and create a list + val finalResult = results.sortBy(_.index).foldLeft(ForkStepExecutionResult(-1, Some(pipelineContext), None))((combinedResult, result) => { + if (result.result.isDefined) { + val ctx = result.result.get + mergeMessages(combinedResult.result.get, ctx.getStepMessages.get, result.index) + combinedResult.copy(result = Some(mergeResponses(combinedResult.result.get, ctx, pipeline.id.getOrElse(""), newSteps, result.index))) + } else if (result.error.isDefined) { + if (combinedResult.error.isDefined) { + combinedResult.copy(error = Some(combinedResult.error.get.asInstanceOf[ForkedPipelineStepException].addException(result.error.get, result.index))) + } else { + combinedResult.copy(error = + Some(ForkedPipelineStepException(message = Some("One or more errors has occurred while processing fork step:\n"), + exceptions = Map(result.index -> result.error.get)))) + } + } else { combinedResult } + }) + if (finalResult.error.isDefined) { + throw finalResult.error.get + } else { + val pair = forkFlow.forkPairs.find(p => p.forkStep.id.getOrElse("N0R00TID") == step.id.getOrElse("N0ID")) + ForkStepResult(if (joinSteps.nonEmpty) { + if (pair.isDefined && pair.get.joinStep.isDefined) { + if (steps.contains(pair.get.joinStep.get.nextStepId.getOrElse("N0R00TID")) && + steps(pair.get.joinStep.get.nextStepId.getOrElse("N0R00TID")).`type`.getOrElse("") == PipelineStepType.JOIN) { + steps(pair.get.joinStep.get.nextStepId.getOrElse("N0R00TID")).nextStepId + } else { pair.get.joinStep.get.nextStepId } + } else { + joinSteps.head.nextStepId + } + } else { + None + }, finalResult.result.get) + } + } + + /** + * Merges any messages into the provided PipelineContext. Each message will be converted to a ForkedPipelineStepMessage + * to allow tracking of the execution id. 
+ *
+ * @param pipelineContext The PipelineContext to merge the messages into
+ * @param messages A list of messages to merge
+ * @param executionId The execution id to attach to each message
+ */
+ private def mergeMessages(pipelineContext: PipelineContext, messages: List[PipelineStepMessage], executionId: Int): Unit = {
+ messages.foreach(message =>
+ pipelineContext.addStepMessage(ForkedPipelineStepMessage(message.message, message.stepId, message.pipelineId, message.messageType, Some(executionId)))
+ )
+ }
+
+ /**
+ * Iterates the list of fork steps merging the results into the provided PipelineContext. Results will be stored as
+ * Options in a list. If this execution does not have a result, then None will be stored in its place. Secondary
+ * response maps will have the values stored as a list as well.
+ *
+ * @param pipelineContext The context to write the results.
+ * @param source The source context to retrieve the execution results
+ * @param pipelineId The pipeline id that is used to run these steps.
+ * @param forkSteps A list of steps that were used during the fork processing
+ * @param executionId The execution id of this process. This will be used as a position for result storage in the list.
+ * @return A PipelineContext with the merged results.
+ */
+ private def mergeResponses(pipelineContext: PipelineContext, source: PipelineContext, pipelineId: String,
+ forkSteps: List[PipelineStep], executionId: Int): PipelineContext = {
+ val sourceParameter = source.parameters.getParametersByPipelineId(pipelineId)
+ val sourceParameters = sourceParameter.get.parameters
+ val mergeAuditCtx = pipelineContext.copy(rootAudit = pipelineContext.rootAudit.merge(source.rootAudit))
+ forkSteps.foldLeft(mergeAuditCtx)((ctx, step) => {
+ val rootParameter = ctx.parameters.getParametersByPipelineId(pipelineId)
+ val parameters = if (rootParameter.isEmpty) {
+ Map[String, Any]()
+ } else {
+ rootParameter.get.parameters
+ }
+ // Get the root step response
+ val response = if (parameters.contains(step.id.getOrElse(""))) {
+ val r = parameters(step.id.getOrElse("")).asInstanceOf[PipelineStepResponse]
+ if (r.primaryReturn.isDefined && r.primaryReturn.get.isInstanceOf[List[_]]) {
+ r
+ } else {
+ PipelineStepResponse(Some(List[Any]()), r.namedReturns)
+ }
+ } else {
+ PipelineStepResponse(Some(List[Any]()), Some(Map[String, Any]()))
+ }
+ // Get the source response
+ val updatedResponse = if (sourceParameters.contains(step.id.getOrElse(""))) {
+ val r = sourceParameters(step.id.getOrElse(""))
+ val stepResponse = r match {
+ case a: PipelineStepResponse => a
+ case option: Option[_] if option.isDefined && option.get.isInstanceOf[PipelineStepResponse] => option.get.asInstanceOf[PipelineStepResponse]
+ case option: Option[_] if option.isDefined => PipelineStepResponse(option, None)
+ case any => PipelineStepResponse(Some(any), None)
+ }
+ // Merge the primary response with the root
+ val primaryList = response.primaryReturn.get.asInstanceOf[List[Option[_]]]
+ // See if the list needs to be filled in
+ val responseList = appendForkedResponseToList(primaryList, stepResponse.primaryReturn, executionId)
+ val rootNamedReturns = response.namedReturns.getOrElse(Map[String, Any]())
+ val sourceNamedReturns = stepResponse.namedReturns.getOrElse(Map[String, Any]())
+ val mergedSecondaryReturns = mergeSecondaryReturns(rootNamedReturns, sourceNamedReturns, executionId)
+ // Append this response to the list and update the PipelineStepResponse
+ PipelineStepResponse(Some(responseList), Some(mergedSecondaryReturns))
+ } else {
+ response
+ }
+ ctx.setParameterByPipelineId(pipelineId, step.id.getOrElse(""), updatedResponse)
+ })
+ }
+
+ /**
+ * Merges the values in the sourceNamedReturns into the elements in the rootNamedReturns
+ * @param rootNamedReturns The base map to merge into
+ * @param sourceNamedReturns The source map containing the values
+ * @param executionId The executionId used for list positioning.
+ * @return A map containing the values of the source merged into the root.
+ */
+ private def mergeSecondaryReturns(rootNamedReturns: Map[String, Any],
+ sourceNamedReturns: Map[String, Any],
+ executionId: Int): Map[String, Any] = {
+ val keys = rootNamedReturns.keySet ++ sourceNamedReturns.keySet
+ keys.foldLeft(rootNamedReturns)((map, key) => {
+ map + (key -> appendForkedResponseToList(
+ rootNamedReturns.getOrElse(key, List[Option[_]]()) match {
+ case list: List[Option[_]] => list
+ case option: Option[_] => List(option)
+ case any => List(Some(any))
+ },
+ sourceNamedReturns.getOrElse(key, None) match {
+ case option: Option[_] => option
+ case any: Any => Some(any)
+ }, executionId))
+ })
+ }
+
+ /**
+ * Appends the provided value to the list at the correct index based on the executionId.
+ * @param list the list to append the value
+ * @param executionId The execution id about to be appended
+ * @return A list with any missing elements populated with None and the provided element appended.
+ */
+ private def appendForkedResponseToList(list: List[Option[_]], value: Option[Any], executionId: Int): List[Option[_]] = {
+ val updateList = if (list.length < executionId) {
+ list ::: List.fill(executionId - list.length)(None)
+ } else {
+ list
+ }
+ updateList :+ value
+ }
+
+ /**
+ * Processes a set of forked steps in serial. All values will be processed regardless of individual failures.
+ * @param forkByValues The values to fork
+ * @param firstStep The first step to process
+ * @param forkStepId The id of the fork step used to store this value
+ * @param pipeline The pipeline being processed.
+ * @param steps The step lookup for the forked steps.
+ * @param pipelineContext The pipeline context to clone while processing.
+ * @return A list of execution results.
+ */
+ private def processForkStepsSerial(forkByValues: Seq[Any],
+ firstStep: PipelineStep,
+ forkStepId: String,
+ pipeline: Pipeline,
+ steps: Map[String, PipelineStep],
+ pipelineContext: PipelineContext): List[ForkStepExecutionResult] = {
+ forkByValues.zipWithIndex.map(value => {
+ startForkedStepExecution(firstStep, forkStepId, pipeline, steps, pipelineContext, value)
+ }).toList
+ }
+
+ /**
+ * Processes a set of forked steps in parallel. All values will be processed regardless of individual failures.
+ * @param forkByValues The values to fork
+ * @param firstStep The first step to process
+ * @param forkStepId The id of the fork step used to store this value
+ * @param pipeline The pipeline being processed.
+ * @param steps The step lookup for the forked steps.
+ * @param pipelineContext The pipeline context to clone while processing.
+ * @return A list of execution results.
+
+  /**
+   * Processes a set of forked steps in serial. All values will be processed regardless of individual failures.
+   *
+   * @param forkByValues    The values to fork.
+   * @param firstStep       The first step to process.
+   * @param forkStepId      The id of the fork step used to store this value.
+   * @param pipeline        The pipeline being processed.
+   * @param steps           The step lookup for the forked steps.
+   * @param pipelineContext The pipeline context to clone while processing.
+   * @return A list of execution results.
+   */
+  private def processForkStepsSerial(forkByValues: Seq[Any],
+                                     firstStep: PipelineStep,
+                                     forkStepId: String,
+                                     pipeline: Pipeline,
+                                     steps: Map[String, PipelineStep],
+                                     pipelineContext: PipelineContext): List[ForkStepExecutionResult] = {
+    forkByValues.zipWithIndex.map(value => {
+      startForkedStepExecution(firstStep, forkStepId, pipeline, steps, pipelineContext, value)
+    }).toList
+  }
+
+  /**
+   * Processes a set of forked steps in parallel. All values will be processed regardless of individual failures.
+   *
+   * @param forkByValues    The values to fork.
+   * @param firstStep       The first step to process.
+   * @param forkStepId      The id of the fork step used to store this value.
+   * @param pipeline        The pipeline being processed.
+   * @param steps           The step lookup for the forked steps.
+   * @param pipelineContext The pipeline context to clone while processing.
+   * @return A list of execution results.
+   */
+  private def processForkStepsParallel(forkByValues: Seq[Any],
+                                       firstStep: PipelineStep,
+                                       forkStepId: String,
+                                       pipeline: Pipeline,
+                                       steps: Map[String, PipelineStep],
+                                       pipelineContext: PipelineContext): List[ForkStepExecutionResult] = {
+    val futures = forkByValues.zipWithIndex.map(value => {
+      Future {
+        startForkedStepExecution(firstStep, forkStepId, pipeline, steps, pipelineContext, value)
+      }
+    })
+    // Wait for all futures to complete
+    Await.ready(Future.sequence(futures), Duration.Inf)
+    // Iterate the futures and extract the results
+    futures.map(_.value.get.get).toList
+  }
+
+  private def startForkedStepExecution(firstStep: PipelineStep,
+                                       forkStepId: String,
+                                       pipeline: Pipeline,
+                                       steps: Map[String, PipelineStep],
+                                       pipelineContext: PipelineContext, value: (Any, Int)) = {
+    try {
+      ForkStepExecutionResult(value._2,
+        Some(executeStep(firstStep, pipeline, steps,
+          createForkPipelineContext(pipelineContext, value._2, firstStep)
+            .setParameterByPipelineId(pipeline.id.get,
+              forkStepId, PipelineStepResponse(Some(value._1), None)))), None)
+    } catch {
+      case t: Throwable => ForkStepExecutionResult(value._2, None, Some(t))
+    }
+  }
+
+  /**
+   * This function will create a new PipelineContext from the provided one that includes new StepMessages.
+   *
+   * @param pipelineContext The PipelineContext to be cloned.
+   * @param groupId         The id of the fork process.
+   * @param firstStep       The first step in the forked flow.
+   * @return A cloned PipelineContext.
+   */
+  private def createForkPipelineContext(pipelineContext: PipelineContext, groupId: Int, firstStep: PipelineStep): PipelineContext = {
+    pipelineContext.copy(stepMessages =
+      Some(pipelineContext.sparkSession.get.sparkContext.collectionAccumulator[PipelineStepMessage]("stepMessages")))
+      .setGlobal("groupId", groupId.toString)
+      .setGlobal("stepId", firstStep.id)
+      .setStepAudit(pipelineContext.getGlobalString("pipelineId").get,
+        ExecutionAudit(firstStep.id.get, AuditType.STEP, Map[String, Any](), System.currentTimeMillis(), None, Some(groupId.toString)))
+  }
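+
+  // Editor's note (assumption drawn from the code above, not part of the original change): each forked
+  // execution gets its own cloned context with a fresh stepMessages accumulator and its index stored as
+  // the "groupId" global. Because startForkedStepExecution captures any Throwable in the returned
+  // ForkStepExecutionResult, one failed fork value does not prevent the remaining values from executing.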
+
+  /**
+   * Returns the list of steps that should be executed as part of the fork step.
+   *
+   * @param step      The first step in the chain.
+   * @param pipeline  The pipeline being processed.
+   * @param steps     The full pipeline step lookup.
+   * @param forkSteps The flow object used to store the steps.
+   * @return A list of steps that may be executed as part of fork processing.
+   */
+  private def getForkSteps(step: PipelineStep,
+                           pipeline: Pipeline,
+                           steps: Map[String, PipelineStep],
+                           forkSteps: ForkStepFlow): ForkStepFlow = {
+    val list = step.`type`.getOrElse("").toLowerCase match {
+      case PipelineStepType.BRANCH =>
+        step.params.get.foldLeft(forkSteps.conditionallyAddStepToList(step))((stepList, param) => {
+          if (param.`type`.getOrElse("") == "result") {
+            getForkSteps(steps(param.value.getOrElse("").asInstanceOf[String]), pipeline, steps, stepList)
+          } else {
+            stepList
+          }
+        })
+      case PipelineStepType.JOIN =>
+        val flow = forkSteps.conditionallyAddStepToList(step)
+        if (flow.remainingUnclosedForks() > 0) {
+          getForkSteps(steps(step.nextStepId.getOrElse("")), pipeline, steps, flow)
+        } else {
+          flow
+        }
+      case _ if !steps.contains(step.nextStepId.getOrElse("")) => forkSteps.conditionallyAddStepToList(step)
+      case _ => getForkSteps(steps(step.nextStepId.getOrElse("")), pipeline, steps, forkSteps.conditionallyAddStepToList(step))
+    }
+
+    if (step.nextStepOnError.isDefined) {
+      getForkSteps(steps(step.nextStepOnError.getOrElse("")), pipeline, steps, list)
+    } else {
+      list
+    }
+  }
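+
+  // Editor's note (assumption drawn from the traversal above, not part of the original change): the
+  // recursion walks forward from the first forked step, following every "result" branch of a BRANCH
+  // step and any nextStepOnError path, and stops once a JOIN closes the last open fork or the next
+  // step id is not present in the step lookup.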
+}
+
+case class FlowResult(pipelineContext: PipelineContext, nextStepId: Option[String], result: Option[Any])
+
+case class PipelineStepFlow(pipeline: Pipeline,
+                            initialContext: PipelineContext,
+                            pipelineLookup: Map[String, String],
+                            executingPipelines: List[Pipeline]) extends PipelineFlow
diff --git a/metalus-core/src/main/scala/com/acxiom/pipeline/flow/StepGroupFlow.scala b/metalus-core/src/main/scala/com/acxiom/pipeline/flow/StepGroupFlow.scala
new file mode 100644
index 00000000..a4e97b04
--- /dev/null
+++ b/metalus-core/src/main/scala/com/acxiom/pipeline/flow/StepGroupFlow.scala
@@ -0,0 +1,123 @@
+package com.acxiom.pipeline.flow
+
+import com.acxiom.pipeline._
+import com.acxiom.pipeline.audits.{AuditType, ExecutionAudit}
+
+import scala.runtime.BoxedUnit
+
+case class StepGroupFlow(pipeline: Pipeline,
+                         initialContext: PipelineContext,
+                         pipelineLookup: Map[String, String],
+                         executingPipelines: List[Pipeline],
+                         step: PipelineStep,
+                         parameterValues: Map[String, Any]) extends PipelineFlow {
+  override def execute(): FlowResult = {
+    val groupResult = processStepGroup(step, pipeline, parameterValues, initialContext)
+    val pipelineProgress = initialContext.getPipelineExecutionInfo
+    val pipelineId = pipelineProgress.pipelineId.getOrElse("")
+    val updatedCtx = initialContext.setStepAudit(pipelineId, groupResult.audit)
+      .setParameterByPipelineId(pipelineId, step.id.getOrElse(""), groupResult.pipelineStepResponse)
+      .setGlobal("pipelineId", pipelineId)
+      .setGlobal("lastStepId", step.id.getOrElse(""))
+      .setGlobal("stepId", step.nextStepId)
+    val finalCtx = if (groupResult.globalUpdates.nonEmpty) {
+      groupResult.globalUpdates.foldLeft(updatedCtx)((ctx, update) => {
+        PipelineFlow.updateGlobals(update.stepName, update.pipelineId, ctx, update.global, update.globalName)
+      })
+    } else {
+      updatedCtx
+    }
+    FlowResult(finalCtx, step.nextStepId, Some(groupResult))
+  }
+
+  private def processStepGroup(step: PipelineStep, pipeline: Pipeline, parameterValues: Map[String, Any],
+                               pipelineContext: PipelineContext): StepGroupResult = {
+    val subPipeline = if (parameterValues.contains("pipelineId")) {
+      pipelineContext.pipelineManager.getPipeline(parameterValues("pipelineId").toString)
+        .getOrElse(throw PipelineException(message = Some(s"Unable to retrieve required step group id ${parameterValues("pipelineId")}"),
+          pipelineProgress = Some(PipelineExecutionInfo(step.id, pipeline.id))))
+    } else { parameterValues("pipeline").asInstanceOf[Pipeline] }
+    val firstStep = subPipeline.steps.get.head
+    val stepLookup = PipelineExecutorValidations.validateAndCreateStepLookup(subPipeline)
+    val pipelineId = pipeline.id.getOrElse("")
+    val stepId = step.id.getOrElse("")
+    val groupId = pipelineContext.getGlobalString("groupId")
+    val stepAudit = ExecutionAudit(firstStep.id.getOrElse(""), AuditType.STEP, Map[String, Any](),
+      System.currentTimeMillis(), groupId = Some(s"$pipelineId::$stepId"))
+    val pipelineAudit = ExecutionAudit(subPipeline.id.getOrElse(""), AuditType.PIPELINE, Map[String, Any](),
+      System.currentTimeMillis(), None, None, Some(List(stepAudit)))
+    // Inject the mappings into the globals object of the PipelineContext
+    val ctx = (if (parameterValues.getOrElse("useParentGlobals", false).asInstanceOf[Boolean]) {
+      pipelineContext.copy(globals =
+        Some(pipelineContext.globals.get ++
+          parameterValues.getOrElse("pipelineMappings", Map[String, Any]()).asInstanceOf[Map[String, Any]]))
+    } else {
+      pipelineContext.copy(globals = Some(parameterValues.getOrElse("pipelineMappings", Map[String, Any]()).asInstanceOf[Map[String, Any]]))
+    }).setGlobal("pipelineId", subPipeline.id.getOrElse(""))
+      .setGlobal("stepId", firstStep.id.getOrElse(""))
+      .setGlobal("groupId", s"$pipelineId::$stepId")
+      .setRootAudit(pipelineContext.getStepAudit(pipelineId, stepId, groupId).get.setChildAudit(pipelineAudit))
+      .copy(parameters = PipelineParameters(List(PipelineParameter(subPipeline.id.getOrElse(""), Map[String, Any]()))))
+    val resultCtx = executeStep(firstStep, subPipeline, stepLookup, ctx)
+    val pipelineParams = resultCtx.parameters.getParametersByPipelineId(subPipeline.id.getOrElse(""))
+    val response = extractStepGroupResponse(step, subPipeline, pipelineParams, resultCtx)
+    val updates = subPipeline.steps.get
+      .filter { step =>
+        pipelineParams.isDefined && pipelineParams.get.parameters.get(step.id.getOrElse(""))
+          .exists(r => r.isInstanceOf[PipelineStepResponse] && r.asInstanceOf[PipelineStepResponse].namedReturns.isDefined)
+      }.foldLeft(List[GlobalUpdates]())((updates, step) => {
+        val updateList = pipelineParams.get.parameters(step.id.getOrElse("")).asInstanceOf[PipelineStepResponse]
+          .namedReturns.get.foldLeft(List[GlobalUpdates]())((list, entry) => {
+          if (entry._1.startsWith("$globals.")) {
+            list :+ GlobalUpdates(step.displayName.get, subPipeline.id.get, entry._1.substring(Constants.NINE), entry._2)
+          } else { list }
+        })
+        updates ++ updateList
+      })
+    StepGroupResult(resultCtx.rootAudit, response, updates)
+  }
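+
+  // Editor's illustration (hypothetical values, not part of the original change): a step group is driven
+  // entirely by parameterValues, e.g.
+  //   Map("pipelineId" -> "sub_pipeline_id",
+  //       "useParentGlobals" -> true,
+  //       "pipelineMappings" -> Map("inputPath" -> "/tmp/data"))
+  // would load "sub_pipeline_id" from the PipelineManager and merge the mappings into the parent globals.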
+
+  /**
+   * This function is responsible for creating the PipelineStepResponse for a step group.
+   *
+   * @param step              The step group step.
+   * @param stepGroup         The step group pipeline.
+   * @param pipelineParameter The pipeline parameter for the step group pipeline.
+   * @param pipelineContext   The PipelineContext result from the execution of the step group pipeline.
+   * @return A PipelineStepResponse
+   */
+  private def extractStepGroupResponse(step: PipelineStep,
+                                       stepGroup: Pipeline,
+                                       pipelineParameter: Option[PipelineParameter],
+                                       pipelineContext: PipelineContext) = {
+    if (pipelineParameter.isDefined) {
+      val resultParam = step.params.get.find(_.`type`.getOrElse("text") == "result")
+      val resultMappingParam = if (resultParam.isDefined) {
+        resultParam
+      } else if (stepGroup.stepGroupResult.isDefined) {
+        Some(Parameter(Some("result"), Some("output"), None, None, stepGroup.stepGroupResult))
+      } else {
+        None
+      }
+      val stepResponseMap = Some(stepGroup.steps.get.map { step =>
+        step.id.getOrElse("") -> pipelineParameter.get.parameters.get(step.id.getOrElse("")).map(_.asInstanceOf[PipelineStepResponse])
+      }.toMap.collect { case (k, v: Some[_]) => k -> v.get })
+      if (resultMappingParam.isDefined) {
+        val mappedValue = pipelineContext.parameterMapper.mapParameter(resultMappingParam.get, pipelineContext) match {
+          case value: Option[_] => value
+          case _: BoxedUnit => None
+          case response => Some(response)
+        }
+        PipelineStepResponse(mappedValue, stepResponseMap)
+      } else {
+        PipelineStepResponse(stepResponseMap, None)
+      }
+    } else {
+      PipelineStepResponse(None, None)
+    }
+  }
+}
+
+case class StepGroupResult(audit: ExecutionAudit, pipelineStepResponse: PipelineStepResponse, globalUpdates: List[GlobalUpdates])
+case class GlobalUpdates(stepName: String, pipelineId: String, globalName: String, global: Any)
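+
+// Editor's note (assumption based on processStepGroup above, not part of the original change): a named
+// return whose key starts with "$globals." is promoted to a global after the step group completes; the
+// nine-character prefix (Constants.NINE) is stripped, so a namedReturns entry such as
+//   "$globals.outputPath" -> "/tmp/output"
+// becomes the global "outputPath" via GlobalUpdates. The example key and value are hypothetical.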