address PR comments and add advance and clean up dag tasks
Meeth Gala committed Sep 18, 2023
1 parent 0026724 commit cffd894
Showing 22 changed files with 649 additions and 264 deletions.


@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.service.modules.orchestration;

import java.io.IOException;

import org.apache.gobblin.annotation.Alpha;


/**
* An implementation of {@link DagTask} that is responsible for advancing the dag to the next node based
* on its current flow and job status. It is added to the {@link DagTaskStream} by the
* {@link org.apache.gobblin.service.monitoring.KafkaJobStatusMonitor} after it consumes the appropriate
* {@link org.apache.gobblin.metrics.GobblinTrackingEvent} for the {@link org.apache.gobblin.service.modules.flowgraph.Dag}.
*/

@Alpha
public class AdvanceDagTask extends DagTask {

protected DagManager.DagId advanceDagId;

@Override
void initialize(Object state, long triggerTimeStamp) {

}

@Override
AdvanceDagProc host(DagTaskVisitor visitor) throws IOException, InstantiationException, IllegalAccessException {
return (AdvanceDagProc) visitor.meet(this);
}
}
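
For readers unfamiliar with the visitor wiring used by the task classes in this change, below is a minimal, self-contained sketch of the same double-dispatch shape; all Sketch* names are illustrative stand-ins, not Gobblin classes. Each concrete task's host(...) hands itself to the visitor, whose meet(...) overload returns the matching proc.

// Hypothetical, simplified mirror of the DagTask / DagTaskVisitor / DagProc dispatch.
abstract class SketchProc {
  abstract void process();
}

class SketchAdvanceProc extends SketchProc {
  @Override
  void process() {
    System.out.println("advance the dag to its next node");
  }
}

interface SketchVisitor {
  // one meet(...) overload per concrete task type in the real visitor
  SketchProc meet(SketchAdvanceTask task);
}

abstract class SketchTask {
  // double dispatch: the task knows its concrete type, the visitor maps it to the matching proc
  abstract SketchProc host(SketchVisitor visitor);
}

class SketchAdvanceTask extends SketchTask {
  @Override
  SketchAdvanceProc host(SketchVisitor visitor) {
    return (SketchAdvanceProc) visitor.meet(this);  // narrowed to the proc this task expects
  }
}

public class VisitorDispatchSketch {
  public static void main(String[] args) {
    SketchVisitor visitor = task -> new SketchAdvanceProc();  // trivial task-to-proc mapping
    new SketchAdvanceTask().host(visitor).process();
  }
}
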
@@ -0,0 +1,162 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.service.modules.orchestration;

import java.io.IOException;
import java.util.LinkedList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import com.google.common.base.Optional;

import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
import org.apache.gobblin.service.FlowId;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.orchestration.exception.MaybeRetryableException;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
import org.apache.gobblin.service.monitoring.JobStatus;


/**
* An implementation of {@link DagProc} that is responsible for cleaning up a {@link Dag} that has completed
* or reached an end state, i.e. FAILED, COMPLETE, or CANCELED.
*
*/
@Slf4j
@Alpha
public class CleanUpDagProc extends DagProc {

private MultiActiveLeaseArbiter multiActiveLeaseArbiter;

private DagManagementStateStore dagManagementStateStore;

private MetricContext metricContext;

private Optional<EventSubmitter> eventSubmitter;

private DagManagerMetrics dagManagerMetrics;

private DagStateStore dagStateStore;

private DagStateStore failedDagStateStore;

private DagTaskStream dagTaskStream;

private static final long DAG_FLOW_STATUS_TOLERANCE_TIME_MILLIS = TimeUnit.MINUTES.toMillis(5);

//TODO: instantiate an object of this class

@Override
protected Object initialize() throws MaybeRetryableException, IOException {
String dagIdToClean = ""; //TODO: obtain the id of the dag to clean up
if (!this.dagManagementStateStore.hasRunningJobs(dagIdToClean)) {
return this.dagManagementStateStore.getDagIdToDags().get(dagIdToClean);
}
return null;
}

@Override
protected Object act(Object state) throws ExecutionException, InterruptedException, IOException {
Dag<JobExecutionPlan> dag = (Dag<JobExecutionPlan>) state;
DagManager.DagId dagId = DagManagerUtils.generateDagId(dag);
LinkedList<Dag.DagNode<JobExecutionPlan>> dagNodeList = this.dagManagementStateStore.getDagToJobs().get(dagId);
while (!dagNodeList.isEmpty()) {
Dag.DagNode<JobExecutionPlan> dagNode = dagNodeList.poll();
this.dagManagementStateStore.deleteJobState(dagId.toString(), dagNode);
}
if (dag.getFlowEvent() == null) {
// If the dag flow event is not set, then it is successful
dag.setFlowEvent(TimingEvent.FlowTimings.FLOW_SUCCEEDED);
} else {
addFailedDag(dagId.toString(), dag);
}
JobStatus flowStatus = dagTaskStream.pollFlowStatus(dag);
if (flowStatus != null && FlowStatusGenerator.FINISHED_STATUSES.contains(flowStatus.getEventName())) {
FlowId flowId = DagManagerUtils.getFlowId(dag);

switch (dag.getFlowEvent()) {
case TimingEvent.FlowTimings.FLOW_SUCCEEDED:
this.dagManagerMetrics.emitFlowSuccessMetrics(flowId);
this.dagManagerMetrics.conditionallyMarkFlowAsState(flowId, DagManager.FlowState.SUCCESSFUL);
break;
case TimingEvent.FlowTimings.FLOW_FAILED:
this.dagManagerMetrics.emitFlowFailedMetrics(flowId);
this.dagManagerMetrics.conditionallyMarkFlowAsState(flowId, DagManager.FlowState.FAILED);
break;
case TimingEvent.FlowTimings.FLOW_CANCELLED:
this.dagManagerMetrics.emitFlowSlaExceededMetrics(flowId);
this.dagManagerMetrics.conditionallyMarkFlowAsState(flowId, DagManager.FlowState.FAILED);
break;
default:
log.warn("Unexpected flow event {} for dag {}", dag.getFlowEvent(), dagId);
}
log.info("Dag {} has finished with status {}; Cleaning up dag from the state store.", dagId, dag.getFlowEvent());
cleanUpDag(dagId.toString());
}
return null;
}

@Override
protected void sendNotification(Object result) throws MaybeRetryableException {
long cleanUpProcessingTime = System.currentTimeMillis();
Dag<JobExecutionPlan> dag = (Dag<JobExecutionPlan>) result;
String dagId = DagManagerUtils.generateDagId(dag).toString();
DagManagerUtils.emitFlowEvent(this.eventSubmitter, this.dagManagementStateStore.getDagIdToDags().get(dagId), dag.getFlowEvent());
dag.setEventEmittedTimeMillis(cleanUpProcessingTime);
}

/**
* Add a dag to failed dag state store
*/
private synchronized void addFailedDag(String dagId, Dag<JobExecutionPlan> dag) {
try {
log.info("Adding dag " + dagId + " to failed dag state store");
this.failedDagStateStore.writeCheckpoint(this.dagManagementStateStore.getDagIdToDags().get(dagId));
} catch (IOException e) {
log.error("Failed to add dag " + dagId + " to failed dag state store", e);
}
this.dagManagementStateStore.getFailedDagIds().add(dagId);
}

/**
* Note that removal of a {@link Dag} entry from the {@code dagIdToDags} map in {@link DagManagementStateStore}
* needs to happen after {@link DagStateStore#cleanUp}, since the real {@link Dag} object is required for the
* clean up, and the clearing of all relevant state needs to be atomic.
* @param dagId
*/
private synchronized void cleanUpDag(String dagId) {
log.info("Cleaning up dagId {}", dagId);
// clears flow event after cancelled job to allow resume event status to be set
this.dagManagementStateStore.getDagIdToDags().get(dagId).setFlowEvent(null);
try {
this.dagStateStore.cleanUp(this.dagManagementStateStore.getDagIdToDags().get(dagId));
} catch (IOException ioe) {
log.error(String.format("Failed to clean %s from backStore due to:", dagId), ioe);
}
this.dagManagementStateStore.getDagIdToDags().remove(dagId);
this.dagManagementStateStore.getDagToJobs().remove(dagId);
}
}
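
To make the three-phase contract above concrete, here is a hedged sketch of how a driver might run a proc end to end (initialize -> act -> sendNotification); the ProcLifecycle interface and DagProcDriverSketch class are illustrative assumptions, not the actual Gobblin base class or scheduler.

// Hypothetical driver for the lifecycle implemented by CleanUpDagProc above.
public class DagProcDriverSketch {

  /** Simplified stand-in for the proc contract; not the Gobblin base class. */
  interface ProcLifecycle {
    Object initialize() throws Exception;                   // load the state the proc will act on
    Object act(Object state) throws Exception;               // mutate dag state stores, emit metrics
    void sendNotification(Object result) throws Exception;   // emit flow events for listeners
  }

  static void drive(ProcLifecycle proc) {
    try {
      Object state = proc.initialize();
      if (state == null) {
        return;  // nothing to do, e.g. the dag still has running jobs
      }
      Object result = proc.act(state);
      // falling back to the initialized state when act(...) returns nothing is an assumption of
      // this sketch, not necessarily what the real framework does
      proc.sendNotification(result != null ? result : state);
    } catch (Exception e) {
      // a real driver would treat MaybeRetryableException differently from fatal failures
      throw new RuntimeException("dag proc failed", e);
    }
  }

  public static void main(String[] args) {
    drive(new ProcLifecycle() {
      public Object initialize() { return "dag-to-clean"; }
      public Object act(Object state) { System.out.println("acting on " + state); return null; }
      public void sendNotification(Object result) { System.out.println("notified for " + result); }
    });
  }
}
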
@@ -14,21 +14,32 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.service.modules.orchestration;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.service.modules.flowgraph.Dag;


/**
* Custom Annotation for classes that are under development.
* It will make the classes available only during compilation phase, and not in the build.
* An implementation of {@link DagTask} that is responsible for cleaning up a {@link Dag} that has completed
* or reached an end state, i.e. FAILED, COMPLETE, or CANCELED. It is added to the {@link DagTaskStream}
* by the {@link org.apache.gobblin.service.monitoring.KafkaJobStatusMonitor} after it consumes the appropriate
* {@link org.apache.gobblin.metrics.GobblinTrackingEvent}.
*
*/
@Retention(RetentionPolicy.SOURCE)
@Target(ElementType.TYPE)
public @interface WorkInProgress {
String value() default "This class/interface is a work in progress.";
}
@Alpha
public class CleanUpDagTask extends DagTask {

protected DagManager.DagId cleanUpDagId;

@Override
void initialize(Object state, long triggerTimeStamp) {

}

@Override
CleanUpDagProc host(DagTaskVisitor visitor) {
return (CleanUpDagProc) visitor.meet(this);
}
}
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.concurrent.ExecutionException;

import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.service.monitoring.JobStatus;
@@ -30,23 +31,37 @@
* and flow completion deadlines
*
*/
@WorkInProgress
@Alpha
public interface DagManagement {

/**
* Currently, this handles only the launch of a {@link Dag} requested via the REST client for adhoc flows.
* @param launchDagTask
* @param flowGroup
* @param flowName
* @param triggerTimeStamp
*/
void launchFlow(LaunchDagTask launchDagTask);
void launchFlow(String flowGroup, String flowName, long triggerTimeStamp);

/**
* Currently, this handles only the resume of a {@link Dag} requested via the REST client for adhoc flows.
* @param resumeDagTask
* @param flowGroup
* @param flowName
* @param flowExecutionId
* @param triggerTimeStamp
* @throws IOException
*/
void resumeFlow(ResumeDagTask resumeDagTask) throws IOException;
void resumeFlow(String flowGroup, String flowName, String flowExecutionId, long triggerTimeStamp)
throws IOException, InterruptedException;

/**
* Currently, this handles only the kill/cancel of a {@link Dag} requested via the REST client for adhoc flows.
* @param killDagTask
* @param flowGroup
* @param flowName
* @param flowExecutionId
* @param triggerTimeStamp
*/
void killFlow(KillDagTask killDagTask);
void killFlow(String flowGroup, String flowName, String flowExecutionId, long triggerTimeStamp)
throws InterruptedException;

boolean enforceFlowCompletionDeadline(Dag.DagNode<JobExecutionPlan> node) throws ExecutionException, InterruptedException;

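As a usage sketch of the revised DagManagement signatures above, the call shapes look roughly like the following; the flow group, flow name, and execution id values, and the way the DagManagement instance is obtained, are all illustrative.

import java.io.IOException;

import org.apache.gobblin.service.modules.orchestration.DagManagement;

public class DagManagementCallSketch {
  // The DagManagement instance would come from the service's orchestration wiring; it is passed in here.
  static void example(DagManagement dagManagement) throws IOException, InterruptedException {
    long triggerTimeStamp = System.currentTimeMillis();
    // launch identifies the adhoc flow by group and name only
    dagManagement.launchFlow("myFlowGroup", "myFlowName", triggerTimeStamp);
    // resume and kill additionally need the execution id of the flow instance
    dagManagement.resumeFlow("myFlowGroup", "myFlowName", "1695000000000", triggerTimeStamp);
    dagManagement.killFlow("myFlowGroup", "myFlowName", "1695000000000", triggerTimeStamp);
  }
}
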
@@ -22,13 +22,15 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import com.google.common.base.Optional;
import com.google.common.collect.Lists;

import lombok.Getter;
import lombok.Synchronized;

import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.runtime.api.DagActionStore;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
@@ -42,27 +44,27 @@
* Going forward, each of these in-memory references will be read/write from MySQL store.
* Thus, the {@link DagManager} would then be stateless and operate independently.
*/
@Getter
@WorkInProgress
@Getter(onMethod_={@Synchronized})
@Alpha
public class DagManagementStateStore {
private final Map<Dag.DagNode<JobExecutionPlan>, Dag<JobExecutionPlan>> jobToDag = new HashMap<>();
private final Map<String, Dag<JobExecutionPlan>> dags = new HashMap<>();
private final Map<String, Dag<JobExecutionPlan>> dagIdToDags = new HashMap<>();
private final Set<String> failedDagIds = new HashSet<>();
private final Map<String, Dag<JobExecutionPlan>> resumingDags = new HashMap<>();
private final Map<String, Dag<JobExecutionPlan>> dagIdToResumingDags = new HashMap<>();
// dagToJobs holds a map of dagId to running jobs of that dag
final Map<String, LinkedList<Dag.DagNode<JobExecutionPlan>>> dagToJobs = new HashMap<>();
final Map<String, Long> dagToSLA = new HashMap<>();
private final Set<String> dagIdstoClean = new HashSet<>();
private Optional<DagActionStore> dagActionStore;

protected void deleteJobState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode) {
protected synchronized void deleteJobState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode) {
this.jobToDag.remove(dagNode);
this.dagToJobs.get(dagId).remove(dagNode);
this.dagToSLA.remove(dagId);
}

protected void addJobState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode) {
Dag<JobExecutionPlan> dag = this.dags.get(dagId);
protected synchronized void addJobState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode) {
Dag<JobExecutionPlan> dag = this.dagIdToDags.get(dagId);
this.jobToDag.put(dagNode, dag);
if (this.dagToJobs.containsKey(dagId)) {
this.dagToJobs.get(dagId).add(dagNode);
@@ -73,11 +75,11 @@ protected void addJobState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode)
}
}

protected boolean hasRunningJobs(String dagId) {
protected synchronized boolean hasRunningJobs(String dagId) {
List<Dag.DagNode<JobExecutionPlan>> dagNodes = this.dagToJobs.get(dagId);
return dagNodes != null && !dagNodes.isEmpty();
}
protected void removeDagActionFromStore(DagManager.DagId dagId, DagActionStore.FlowActionType flowActionType) throws IOException {
protected synchronized void removeDagActionFromStore(DagManager.DagId dagId, DagActionStore.FlowActionType flowActionType) throws IOException {
if (this.dagActionStore.isPresent()) {
this.dagActionStore.get().deleteDagAction(
new DagActionStore.DagAction(dagId.flowGroup, dagId.flowName, dagId.flowExecutionId, flowActionType));
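
The dagToJobs bookkeeping above follows a simple add/check/delete cycle; the following simplified, hypothetical mirror (String job ids instead of Dag.DagNode<JobExecutionPlan>, not Gobblin code) shows the intended lifecycle: a dag becomes eligible for clean up once hasRunningJobs returns false.

import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;

public class StateStoreSketch {
  // dagId -> running job ids of that dag, mirroring the dagToJobs map above
  private final Map<String, LinkedList<String>> dagToJobs = new HashMap<>();

  synchronized void addJobState(String dagId, String jobId) {
    dagToJobs.computeIfAbsent(dagId, k -> new LinkedList<>()).add(jobId);
  }

  synchronized boolean hasRunningJobs(String dagId) {
    LinkedList<String> jobs = dagToJobs.get(dagId);
    return jobs != null && !jobs.isEmpty();
  }

  synchronized void deleteJobState(String dagId, String jobId) {
    LinkedList<String> jobs = dagToJobs.get(dagId);
    if (jobs != null) {
      jobs.remove(jobId);
    }
  }

  public static void main(String[] args) {
    StateStoreSketch store = new StateStoreSketch();
    store.addJobState("dag-1", "job-a");
    System.out.println(store.hasRunningJobs("dag-1"));  // true
    store.deleteJobState("dag-1", "job-a");
    System.out.println(store.hasRunningJobs("dag-1"));  // false -> the dag is eligible for clean up
  }
}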
