Skip to content

Commit

Permalink
[Improve] job type custom_code rename to flink_jar (#4176)
Browse files Browse the repository at this point in the history
  • Loading branch information
wolfboys authored Jan 20, 2025
1 parent 246fab6 commit 5b1a1b4
Show file tree
Hide file tree
Showing 24 changed files with 93 additions and 122 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ public enum FlinkJobType {
UNKNOWN("Unknown", -1),

/**
* custom code
* Flink Jar
*/
CUSTOM_CODE("Custom Code", 1),
FLINK_JAR("Flink JAR", 1),

/**
* Flink SQL
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,7 @@ drop table if exists `t_spark_app`;
create table `t_spark_app` (
`id` bigint not null auto_increment,
`team_id` bigint not null,
`job_type` tinyint default null comment '(1)custom code(2)spark SQL',
`job_type` tinyint default null comment '(1) spark jar (2) spark SQL',
`app_type` tinyint default null comment '(1)Apache Spark(2)StreamPark Spark',
`version_id` bigint default null comment 'spark version',
`app_name` varchar(255) collate utf8mb4_general_ci default null comment 'spark.app.name',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.baomidou.mybatisplus.core.toolkit.support.SFunction;
import com.fasterxml.jackson.annotation.JsonIgnore;
import lombok.Getter;
import lombok.Setter;
Expand All @@ -72,7 +71,7 @@ public class FlinkApplication extends BaseEntity {
private Long teamId;

/**
* 1) custom code 2) flink SQL
* 1) flink jar 2) flink SQL
*/
private Integer jobType;

Expand Down Expand Up @@ -130,6 +129,7 @@ public class FlinkApplication extends BaseEntity {

@Getter
private String ingressTemplate;

@Setter
private String defaultModeIngress;

Expand Down Expand Up @@ -261,7 +261,7 @@ public class FlinkApplication extends BaseEntity {
private Date modifyTime;

/**
* 1: cicd (build from csv) 2: upload (upload local jar job)
* 1: build (build from CVS) 2: upload (upload local jar job)
*/
private Integer resourceFrom;

Expand Down Expand Up @@ -420,15 +420,6 @@ public boolean cpFailedTrigger() {
&& this.cpFailureAction != null;
}

public boolean eqFlinkJob(FlinkApplication other) {
if (this.isFlinkSqlJobOrCDC()
&& other.isFlinkSqlJobOrCDC()
&& this.getFlinkSql().trim().equals(other.getFlinkSql().trim())) {
return this.getDependencyObject().equals(other.getDependencyObject());
}
return false;
}

/**
* Local compilation and packaging working directory
*/
Expand Down Expand Up @@ -475,18 +466,19 @@ public String getAppHome() {
}

public String getMainClass() {
FlinkJobType flinkJobType = FlinkJobType.of(jobType);
if (flinkJobType == FlinkJobType.FLINK_SQL) {
return Constants.STREAMPARK_FLINKSQL_CLIENT_CLASS;
} else if (flinkJobType == FlinkJobType.FLINK_CDC) {
return Constants.STREAMPARK_FLINKCDC_CLIENT_CLASS;
} else if (flinkJobType == FlinkJobType.PYFLINK) {
return Constants.PYTHON_FLINK_DRIVER_CLASS_NAME; // Assuming this is the default behavior for other enum
// values
} else if (flinkJobType == FlinkJobType.CUSTOM_CODE) {
return mainClass;
} else {
return null;
FlinkJobType flinkJobType = this.getJobTypeEnum();
switch (flinkJobType) {
case FLINK_SQL:
return Constants.STREAMPARK_FLINKSQL_CLIENT_CLASS;
case FLINK_CDC:
return Constants.STREAMPARK_FLINKCDC_CLIENT_CLASS;
case PYFLINK:
return Constants.PYTHON_FLINK_DRIVER_CLASS_NAME;
case FLINK_JAR:
return mainClass;
case UNKNOWN:
default:
return null;
}
}

Expand All @@ -513,42 +505,35 @@ public Map<String, Object> getOptionMap() {
}

@JsonIgnore
public boolean isFlinkSqlJobOrCDC() {
public boolean isJobTypeFlinkSqlOrCDC() {
return FlinkJobType.FLINK_SQL.getMode().equals(this.getJobType()) ||
FlinkJobType.FLINK_CDC.getMode().equals(this.getJobType());
}

@JsonIgnore
public boolean isFlinkSqlJobOrPyFlinkJobOrFlinkCDC() {
return FlinkJobType.FLINK_SQL.getMode().equals(this.getJobType())
|| FlinkJobType.PYFLINK.getMode().equals(this.getJobType())
|| FlinkJobType.FLINK_CDC.getMode().equals(this.getJobType());
public boolean isJobTypeFlinkJar() {
return FlinkJobType.FLINK_JAR.getMode().equals(this.getJobType());
}

@JsonIgnore
public boolean isCustomCodeJob() {
return FlinkJobType.CUSTOM_CODE.getMode().equals(this.getJobType());
}

@JsonIgnore
public boolean isCustomCodeOrPyFlinkJob() {
return FlinkJobType.CUSTOM_CODE.getMode().equals(this.getJobType())
public boolean isJobTypeFlinkJarOrPyFlink() {
return FlinkJobType.FLINK_JAR.getMode().equals(this.getJobType())
|| FlinkJobType.PYFLINK.getMode().equals(this.getJobType());
}

@JsonIgnore
public boolean isUploadJob() {
return isCustomCodeOrPyFlinkJob()
public boolean isResourceFromUpload() {
return isJobTypeFlinkJarOrPyFlink()
&& ResourceFromEnum.UPLOAD.getValue().equals(this.getResourceFrom());
}

@JsonIgnore
public boolean isCICDJob() {
return isCustomCodeOrPyFlinkJob()
&& ResourceFromEnum.CICD.getValue().equals(this.getResourceFrom());
public boolean isResourceFromBuild() {
return isJobTypeFlinkJarOrPyFlink()
&& ResourceFromEnum.BUILD.getValue().equals(this.getResourceFrom());
}

public boolean isStreamParkJob() {
public boolean isAppTypeStreamPark() {
return this.getAppType() == ApplicationType.STREAMPARK_FLINK.getType();
}

Expand Down Expand Up @@ -673,23 +658,4 @@ public boolean isKubernetesModeJob() {
return FlinkDeployMode.isKubernetesMode(this.getDeployModeEnum());
}

public static class SFunc {

public static final SFunction<FlinkApplication, Long> ID = FlinkApplication::getId;
public static final SFunction<FlinkApplication, String> JOB_ID = FlinkApplication::getJobId;
public static final SFunction<FlinkApplication, Date> START_TIME = FlinkApplication::getStartTime;
public static final SFunction<FlinkApplication, Date> END_TIME = FlinkApplication::getEndTime;
public static final SFunction<FlinkApplication, Long> DURATION = FlinkApplication::getDuration;
public static final SFunction<FlinkApplication, Integer> TOTAL_TASK = FlinkApplication::getTotalTask;
public static final SFunction<FlinkApplication, Integer> TOTAL_TM = FlinkApplication::getTotalTM;
public static final SFunction<FlinkApplication, Integer> TOTAL_SLOT = FlinkApplication::getTotalSlot;
public static final SFunction<FlinkApplication, Integer> JM_MEMORY = FlinkApplication::getJmMemory;
public static final SFunction<FlinkApplication, Integer> TM_MEMORY = FlinkApplication::getTmMemory;
public static final SFunction<FlinkApplication, Integer> STATE = FlinkApplication::getState;
public static final SFunction<FlinkApplication, String> OPTIONS = FlinkApplication::getOptions;
public static final SFunction<FlinkApplication, Integer> AVAILABLE_SLOT = FlinkApplication::getAvailableSlot;
public static final SFunction<FlinkApplication, Integer> EXECUTION_MODE = FlinkApplication::getDeployMode;
public static final SFunction<FlinkApplication, String> JOB_MANAGER_URL = FlinkApplication::getJobManagerUrl;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public class SparkApplication extends BaseEntity {

private Integer deployMode;

/** 1: cicd (build from csv) 2: upload (upload local jar job) */
/** 1: build (build from CVS) 2: upload (upload local jar job) */
private Integer resourceFrom;

private Long projectId;
Expand Down Expand Up @@ -426,15 +426,15 @@ public boolean isSparkJarOrPySparkJob() {
}

@JsonIgnore
public boolean isUploadJob() {
public boolean isFromUploadJob() {
return isSparkJarOrPySparkJob()
&& ResourceFromEnum.UPLOAD.getValue().equals(this.getResourceFrom());
}

@JsonIgnore
public boolean isCICDJob() {
public boolean isFromBuildJob() {
return isSparkJarOrPySparkJob()
&& ResourceFromEnum.CICD.getValue().equals(this.getResourceFrom());
&& ResourceFromEnum.BUILD.getValue().equals(this.getResourceFrom());
}

public boolean isStreamParkJob() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
@Getter
public enum ResourceFromEnum {

/** cicd(build from cvs) */
CICD(1),
/** build from CVS */
BUILD(1),

/** upload local jar */
UPLOAD(2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ public void revoke(Long appId) throws ApplicationException {
// 3) restore related status
LambdaUpdateWrapper<FlinkApplication> updateWrapper = Wrappers.lambdaUpdate();
updateWrapper.eq(FlinkApplication::getId, application.getId());
if (application.isFlinkSqlJobOrCDC()) {
if (application.isJobTypeFlinkSqlOrCDC()) {
updateWrapper.set(FlinkApplication::getRelease, ReleaseStateEnum.FAILED.get());
} else {
updateWrapper.set(FlinkApplication::getRelease, ReleaseStateEnum.NEED_RELEASE.get());
Expand Down Expand Up @@ -452,7 +452,7 @@ public void start(FlinkApplication appParam, boolean auto) throws Exception {
applicationManageService.toEffective(application);

Map<String, Object> extraParameter = new HashMap<>(0);
if (application.isFlinkSqlJobOrCDC()) {
if (application.isJobTypeFlinkSqlOrCDC()) {
FlinkSql flinkSql = flinkSqlService.getEffective(application.getId(), true);
// Get the sql of the replaced placeholder
String realSql = variableService.replaceVariable(application.getTeamId(), flinkSql.getSql());
Expand Down Expand Up @@ -725,8 +725,8 @@ private Tuple2<String, String> getUserJarAndAppConf(
flinkUserJar = resource.getFilePath();
break;

case CUSTOM_CODE:
if (application.isUploadJob()) {
case FLINK_JAR:
if (application.isResourceFromUpload()) {
appConf =
String.format(
"json://{\"%s\":\"%s\"}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public void rollback(FlinkApplicationBackup bakParam) {
// If necessary, perform the backup first
if (bakParam.isBackup()) {
application.setBackUpDescription(bakParam.getDescription());
if (application.isFlinkSqlJobOrCDC()) {
if (application.isJobTypeFlinkSqlOrCDC()) {
FlinkSql flinkSql = flinkSqlService.getEffective(application.getId(), false);
backup(application, flinkSql);
} else {
Expand All @@ -107,7 +107,7 @@ public void rollback(FlinkApplicationBackup bakParam) {
effectiveService.saveOrUpdate(
bakParam.getAppId(), EffectiveTypeEnum.CONFIG, bakParam.getId());
// if flink sql task, will be rollback sql and dependencies
if (application.isFlinkSqlJobOrCDC()) {
if (application.isJobTypeFlinkSqlOrCDC()) {
effectiveService.saveOrUpdate(
bakParam.getAppId(), EffectiveTypeEnum.FLINKSQL, bakParam.getSqlId());
}
Expand Down Expand Up @@ -190,7 +190,7 @@ public Boolean removeById(Long id) throws InternalException {
@Override
public void backup(FlinkApplication appParam, FlinkSql flinkSqlParam) {
// basic configuration file backup
String appHome = (appParam.isCustomCodeJob() && appParam.isCICDJob())
String appHome = (appParam.isJobTypeFlinkJar() && appParam.isResourceFromBuild())
? appParam.getDistHome()
: appParam.getAppHome();
FsOperator fsOperator = appParam.getFsOperator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,14 +192,15 @@ public boolean buildApplication(@Nonnull Long appId, boolean forceBuild) {
return true;
}
// rollback
if (app.isNeedRollback() && app.isFlinkSqlJobOrCDC()) {
if (app.isNeedRollback() && app.isJobTypeFlinkSqlOrCDC()) {
flinkSqlService.rollback(app);
}

// 1) flink sql setDependency
FlinkSql newFlinkSql = flinkSqlService.getCandidate(app.getId(), CandidateTypeEnum.NEW);
FlinkSql effectiveFlinkSql = flinkSqlService.getEffective(app.getId(), false);
if (app.isFlinkSqlJobOrPyFlinkJobOrFlinkCDC()) {
FlinkJobType jobType = app.getJobTypeEnum();
if (jobType == FlinkJobType.FLINK_SQL || jobType == FlinkJobType.PYFLINK || jobType == FlinkJobType.FLINK_CDC) {
FlinkSql flinkSql = newFlinkSql == null ? effectiveFlinkSql : newFlinkSql;
AssertUtils.notNull(flinkSql);
app.setDependency(flinkSql.getDependency());
Expand Down Expand Up @@ -235,12 +236,12 @@ public void onStart(PipelineSnapshot snapshot) {
// 2) some preparatory work
String appUploads = app.getWorkspace().APP_UPLOADS();

if (app.isCustomCodeOrPyFlinkJob()) {
// customCode upload jar to appHome...
if (app.isJobTypeFlinkJarOrPyFlink()) {
// flinkJar upload jar to appHome...
String appHome = app.getAppHome();
FsOperator fsOperator = app.getFsOperator();
fsOperator.delete(appHome);
if (app.isUploadJob()) {
if (app.isResourceFromUpload()) {
String uploadJar = appUploads.concat("/").concat(app.getJar());
File localJar = new File(
String.format(
Expand Down Expand Up @@ -274,7 +275,7 @@ public void onStart(PipelineSnapshot snapshot) {
break;
default:
throw new IllegalArgumentException(
"[StreamPark] unsupported ApplicationType of custom code: "
"[StreamPark] unsupported ApplicationType of FlinkJar: "
+ app.getApplicationType());
}
} else {
Expand Down Expand Up @@ -324,10 +325,10 @@ public void onFinish(PipelineSnapshot snapshot, BuildResult result) {
// If the current task is not running, or the task has just been added, directly
// set
// the candidate version to the official version
if (app.isFlinkSqlJobOrCDC()) {
if (app.isJobTypeFlinkSqlOrCDC()) {
applicationManageService.toEffective(app);
} else {
if (app.isStreamParkJob()) {
if (app.isAppTypeStreamPark()) {
FlinkApplicationConfig config =
applicationConfigService.getLatest(app.getId());
if (config != null) {
Expand All @@ -340,7 +341,7 @@ public void onFinish(PipelineSnapshot snapshot, BuildResult result) {
}
// backup.
if (!app.isNeedRollback()) {
if (app.isFlinkSqlJobOrCDC() && newFlinkSql != null) {
if (app.isJobTypeFlinkSqlOrCDC() && newFlinkSql != null) {
backUpService.backup(app, newFlinkSql);
} else {
backUpService.backup(app, null);
Expand Down Expand Up @@ -467,7 +468,7 @@ private BuildPipeline createPipelineInstance(@Nonnull FlinkApplication app) {
case YARN_APPLICATION:
String yarnProvidedPath = app.getAppLib();
String localWorkspace = app.getLocalAppHome().concat("/lib");
if (FlinkJobType.CUSTOM_CODE == app.getJobTypeEnum()
if (FlinkJobType.FLINK_JAR == app.getJobTypeEnum()
&& APACHE_FLINK == app.getApplicationType()) {
yarnProvidedPath = app.getAppHome();
localWorkspace = app.getLocalAppHome();
Expand Down Expand Up @@ -574,7 +575,7 @@ private FlinkRemotePerJobBuildRequest buildFlinkRemotePerJobBuildRequest(
app.getLocalAppHome(),
mainClass,
flinkUserJar,
app.isCustomCodeJob(),
app.isJobTypeFlinkJar(),
app.getDeployModeEnum(),
app.getJobTypeEnum(),
flinkEnv.getFlinkVersion(),
Expand All @@ -586,7 +587,7 @@ private FlinkRemotePerJobBuildRequest buildFlinkRemotePerJobBuildRequest(
*/
private String retrieveFlinkUserJar(FlinkEnv flinkEnv, FlinkApplication app) {
switch (app.getJobTypeEnum()) {
case CUSTOM_CODE:
case FLINK_JAR:
switch (app.getApplicationType()) {
case STREAMPARK_FLINK:
return String.format(
Expand All @@ -595,7 +596,7 @@ private String retrieveFlinkUserJar(FlinkEnv flinkEnv, FlinkApplication app) {
return String.format("%s/%s", app.getAppHome(), app.getJar());
default:
throw new IllegalArgumentException(
"[StreamPark] unsupported ApplicationType of custom code: "
"[StreamPark] unsupported ApplicationType of FlinkJar: "
+ app.getApplicationType());
}
case PYFLINK:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public void setLatest(Long appId, Long configId) {
public synchronized void update(FlinkApplication appParam, Boolean latest) {
// flink sql job
FlinkApplicationConfig latestConfig = getLatest(appParam.getId());
if (appParam.isFlinkSqlJobOrCDC()) {
if (appParam.isJobTypeFlinkSqlOrCDC()) {
updateForFlinkSqlJob(appParam, latest, latestConfig);
} else {
updateForNonFlinkSqlJob(appParam, latest, latestConfig);
Expand Down
Loading

0 comments on commit 5b1a1b4

Please sign in to comment.