Skip to content

Commit

Permalink
[Feature] Backend modifications support the cdc yaml api (#4175)
Browse files Browse the repository at this point in the history
* [Feature] Backend modifications support the cdc yaml api

* fixd imports
  • Loading branch information
Mrart authored Jan 19, 2025
1 parent f512b6b commit 246fab6
Show file tree
Hide file tree
Showing 11 changed files with 200 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,35 @@
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

/** The flink deployment mode enum. */
/**
* The flink deployment mode enum.
*/
public enum FlinkJobType {

/** Unknown type replace null */
/**
* Unknown type replace null
*/
UNKNOWN("Unknown", -1),

/** custom code */
/**
* custom code
*/
CUSTOM_CODE("Custom Code", 1),

/** Flink SQL */
/**
* Flink SQL
*/
FLINK_SQL("Flink SQL", 2),

/** Py flink Mode */
PYFLINK("Python Flink", 3);
/**
* Py flink Mode
*/
PYFLINK("Python Flink", 3),

/**
* Flink CDC
*/
FLINK_CDC("Flink CDC", 4);

private final String name;

Expand All @@ -60,7 +75,9 @@ public static FlinkJobType of(@Nullable Integer value) {
return FlinkJobType.UNKNOWN;
}

/** Get the mode value of the current {@link FlinkJobType} enum. */
/**
* Get the mode value of the current {@link FlinkJobType} enum.
*/
@Nonnull
public Integer getMode() {
return mode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,38 +71,58 @@ public class FlinkApplication extends BaseEntity {

private Long teamId;

/** 1) custom code 2) flink SQL */
/**
* 1) custom code 2) flink SQL
*/
private Integer jobType;

private Long projectId;
/** creator */
/**
* creator
*/
private Long userId;

/** The name of the frontend and program displayed in yarn */
/**
* The name of the frontend and program displayed in yarn
*/
private String jobName;

@TableField(updateStrategy = FieldStrategy.IGNORED)
private String jobId;

/** The address of the jobmanager, that is, the direct access address of the Flink web UI */
/**
* The address of the jobmanager, that is, the direct access address of the Flink web UI
*/
@TableField(updateStrategy = FieldStrategy.IGNORED)
private String jobManagerUrl;

/** flink version */
/**
* flink version
*/
private Long versionId;

/** 1. yarn application id(on yarn) 2. k8s application id (on k8s application) */
/**
* 1. yarn application id(on yarn) 2. k8s application id (on k8s application)
*/
private String clusterId;

/** flink docker base image */
/**
* flink docker base image
*/
private String flinkImage;

/** k8s namespace */
/**
* k8s namespace
*/
private String k8sNamespace = Constants.DEFAULT;

/** The exposed type of the rest service of K8s(kubernetes.rest-service.exposed.type) */
/**
* The exposed type of the rest service of K8s(kubernetes.rest-service.exposed.type)
*/
private Integer k8sRestExposedType;
/** flink kubernetes pod template */
/**
* flink kubernetes pod template
*/
private String k8sPodTemplate;

private String k8sJmPodTemplate;
Expand All @@ -113,32 +133,46 @@ public class FlinkApplication extends BaseEntity {
@Setter
private String defaultModeIngress;

/** flink-hadoop integration on flink-k8s mode */
/**
* flink-hadoop integration on flink-k8s mode
*/
private Boolean k8sHadoopIntegration;

private Integer state;
/** task release status */
/**
* task release status
*/
@TableField("`release`")
private Integer release;

/** determine if a task needs to be built */
/**
* determine if a task needs to be built
*/
private Boolean build;

/** max restart retries after job failed */
/**
* max restart retries after job failed
*/
@TableField(updateStrategy = FieldStrategy.IGNORED)
private Integer restartSize;

/** has restart count */
/**
* has restart count
*/
private Integer restartCount;

private Integer optionState;

/** alert id */
/**
* alert id
*/
@TableField(updateStrategy = FieldStrategy.IGNORED)
private Long alertId;

private String args;
/** application module */
/**
* application module
*/
private String module;

private String options;
Expand All @@ -155,7 +189,9 @@ public class FlinkApplication extends BaseEntity {

private Integer appType;

/** determine if tracking status */
/**
* determine if tracking status
*/
private Integer tracking;

private String jar;
Expand All @@ -176,19 +212,27 @@ public class FlinkApplication extends BaseEntity {
@TableField(updateStrategy = FieldStrategy.IGNORED)
private Long duration;

/** checkpoint max failure interval */
/**
* checkpoint max failure interval
*/
@TableField(updateStrategy = FieldStrategy.IGNORED)
private Integer cpMaxFailureInterval;

/** checkpoint failure rate interval */
/**
* checkpoint failure rate interval
*/
@TableField(updateStrategy = FieldStrategy.IGNORED)
private Integer cpFailureRateInterval;

/** Actions triggered after X minutes failed Y times: 1: send alert 2: restart */
/**
* Actions triggered after X minutes failed Y times: 1: send alert 2: restart
*/
@TableField(updateStrategy = FieldStrategy.IGNORED)
private Integer cpFailureAction;

/** overview */
/**
* overview
*/
@TableField("TOTAL_TM")
private Integer totalTM;

Expand All @@ -201,7 +245,9 @@ public class FlinkApplication extends BaseEntity {
private Integer tmMemory;
private Integer totalTask;

/** the cluster id bound to the task in remote mode */
/**
* the cluster id bound to the task in remote mode
*/
@TableField(updateStrategy = FieldStrategy.IGNORED)
private Long flinkClusterId;

Expand All @@ -214,13 +260,17 @@ public class FlinkApplication extends BaseEntity {

private Date modifyTime;

/** 1: cicd (build from csv) 2: upload (upload local jar job) */
/**
* 1: cicd (build from csv) 2: upload (upload local jar job)
*/
private Integer resourceFrom;

@TableField(updateStrategy = FieldStrategy.IGNORED)
private String tags;

/** running job */
/**
* running job
*/
private transient JobsOverview.Task overview;

private transient String teamResource;
Expand Down Expand Up @@ -254,10 +304,14 @@ public class FlinkApplication extends BaseEntity {
private transient String yarnQueue;
private transient String serviceAccount;

/** Flink Web UI Url */
/**
* Flink Web UI Url
*/
private transient String flinkRestUrl;

/** refer to {@link org.apache.streampark.flink.packer.pipeline.BuildPipeline} */
/**
* refer to {@link org.apache.streampark.flink.packer.pipeline.BuildPipeline}
*/
private transient Integer buildStatus;

private transient AppControl appControl;
Expand Down Expand Up @@ -367,15 +421,17 @@ public boolean cpFailedTrigger() {
}

public boolean eqFlinkJob(FlinkApplication other) {
if (this.isFlinkSqlJob()
&& other.isFlinkSqlJob()
if (this.isFlinkSqlJobOrCDC()
&& other.isFlinkSqlJobOrCDC()
&& this.getFlinkSql().trim().equals(other.getFlinkSql().trim())) {
return this.getDependencyObject().equals(other.getDependencyObject());
}
return false;
}

/** Local compilation and packaging working directory */
/**
* Local compilation and packaging working directory
*/
@JsonIgnore
public String getDistHome() {
String path = String.format("%s/%s/%s", Workspace.APP_LOCAL_DIST(), projectId.toString(), getModule());
Expand All @@ -397,7 +453,9 @@ public String getRemoteAppHome() {
return path;
}

/** Automatically identify remoteAppHome or localAppHome based on app FlinkDeployMode */
/**
* Automatically identify remoteAppHome or localAppHome based on app FlinkDeployMode
*/
@JsonIgnore
public String getAppHome() {
switch (this.getDeployModeEnum()) {
Expand All @@ -416,6 +474,22 @@ public String getAppHome() {
}
}

public String getMainClass() {
FlinkJobType flinkJobType = FlinkJobType.of(jobType);
if (flinkJobType == FlinkJobType.FLINK_SQL) {
return Constants.STREAMPARK_FLINKSQL_CLIENT_CLASS;
} else if (flinkJobType == FlinkJobType.FLINK_CDC) {
return Constants.STREAMPARK_FLINKCDC_CLIENT_CLASS;
} else if (flinkJobType == FlinkJobType.PYFLINK) {
return Constants.PYTHON_FLINK_DRIVER_CLASS_NAME; // Assuming this is the default behavior for other enum
// values
} else if (flinkJobType == FlinkJobType.CUSTOM_CODE) {
return mainClass;
} else {
return null;
}
}

@JsonIgnore
public String getAppLib() {
return getAppHome().concat("/lib");
Expand All @@ -439,14 +513,16 @@ public Map<String, Object> getOptionMap() {
}

@JsonIgnore
public boolean isFlinkSqlJob() {
return FlinkJobType.FLINK_SQL.getMode().equals(this.getJobType());
public boolean isFlinkSqlJobOrCDC() {
return FlinkJobType.FLINK_SQL.getMode().equals(this.getJobType()) ||
FlinkJobType.FLINK_CDC.getMode().equals(this.getJobType());
}

@JsonIgnore
public boolean isFlinkSqlJobOrPyFlinkJob() {
public boolean isFlinkSqlJobOrPyFlinkJobOrFlinkCDC() {
return FlinkJobType.FLINK_SQL.getMode().equals(this.getJobType())
|| FlinkJobType.PYFLINK.getMode().equals(this.getJobType());
|| FlinkJobType.PYFLINK.getMode().equals(this.getJobType())
|| FlinkJobType.FLINK_CDC.getMode().equals(this.getJobType());
}

@JsonIgnore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ public void revoke(Long appId) throws ApplicationException {
// 3) restore related status
LambdaUpdateWrapper<FlinkApplication> updateWrapper = Wrappers.lambdaUpdate();
updateWrapper.eq(FlinkApplication::getId, application.getId());
if (application.isFlinkSqlJob()) {
if (application.isFlinkSqlJobOrCDC()) {
updateWrapper.set(FlinkApplication::getRelease, ReleaseStateEnum.FAILED.get());
} else {
updateWrapper.set(FlinkApplication::getRelease, ReleaseStateEnum.NEED_RELEASE.get());
Expand Down Expand Up @@ -452,7 +452,7 @@ public void start(FlinkApplication appParam, boolean auto) throws Exception {
applicationManageService.toEffective(application);

Map<String, Object> extraParameter = new HashMap<>(0);
if (application.isFlinkSqlJob()) {
if (application.isFlinkSqlJobOrCDC()) {
FlinkSql flinkSql = flinkSqlService.getEffective(application.getId(), true);
// Get the sql of the replaced placeholder
String realSql = variableService.replaceVariable(application.getTeamId(), flinkSql.getSql());
Expand Down Expand Up @@ -690,6 +690,24 @@ private Tuple2<String, String> getUserJarAndAppConf(
}
break;

case FLINK_CDC:
log.info("the current job id: {}", application.getId());
FlinkSql flinkCDC = flinkSqlService.getEffective(application.getId(), false);
AssertUtils.notNull(flinkCDC);
// 1) dist_userJar
String cdcDistJar = ServiceHelper.getFlinkCDCClientJar(flinkEnv);
// 2) appConfig
appConf =
applicationConfig == null
? null
: String.format("yaml://%s", applicationConfig.getContent());
// 3) client
if (FlinkDeployMode.YARN_APPLICATION == deployModeEnum) {
String clientPath = Workspace.remote().APP_CLIENT();
flinkUserJar = String.format("%s/%s", clientPath, cdcDistJar);
}
break;

case PYFLINK:
Resource resource =
resourceService.findByResourceName(application.getTeamId(), application.getJar());
Expand Down
Loading

0 comments on commit 246fab6

Please sign in to comment.