[Feature] Support Spark standalone deploy mode
monrg committed Dec 1, 2024
Commit 954f63b (1 parent: 4af158e)
Showing 3 changed files with 10 additions and 23 deletions.
@@ -155,35 +155,22 @@ public Boolean allowShutdownCluster(SparkCluster sparkCluster) {
 
     @Override
     public Boolean existsByClusterId(String clusterId, Long id) {
-        LambdaQueryWrapper<SparkCluster> lambdaQueryWrapper = new LambdaQueryWrapper<>();
-        lambdaQueryWrapper.ne(SparkCluster::getClusterId, id);
-        if (id != null) {
-            lambdaQueryWrapper.ne(SparkCluster::getId, id);
-        }
-        lambdaQueryWrapper.last("limit 1");
-        return this.getOne(lambdaQueryWrapper) != null;
+        return this.lambdaQuery().ne(SparkCluster::getId, id).ne(id != null, SparkCluster::getClusterId, id).exists();
     }
 
     @Override
     public Boolean existsByClusterName(String clusterName) {
-        LambdaQueryWrapper<SparkCluster> lambdaQueryWrapper = new LambdaQueryWrapper<>();
-        lambdaQueryWrapper.eq(SparkCluster::getClusterName, clusterName);
-        lambdaQueryWrapper.last("limit 1");
-        return this.getOne(lambdaQueryWrapper) != null;
+        return this.lambdaQuery().eq(SparkCluster::getClusterName, clusterName).exists();
     }
 
     @Override
     public Boolean existsBySparkEnvId(Long id) {
-        LambdaQueryWrapper<SparkCluster> lambdaQueryWrapper = new LambdaQueryWrapper<>();
-        lambdaQueryWrapper.eq(SparkCluster::getVersionId, id);
-        return this.getOne(lambdaQueryWrapper) != null;
+        return this.lambdaQuery().eq(SparkCluster::getVersionId, id).exists();
    }
 
     @Override
     public List<SparkCluster> listByDeployModes(Collection<SparkDeployMode> deployModeEnums) {
-        LambdaQueryWrapper<SparkCluster> lambdaQueryWrapper = new LambdaQueryWrapper<>();
-        lambdaQueryWrapper.in(SparkCluster::getDeployModeEnum, deployModeEnums);
-        return this.list(lambdaQueryWrapper);
+        return this.lambdaQuery().in(SparkCluster::getDeployModeEnum, deployModeEnums).list();
     }
 
     @Override
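For context on the hunk above: the refactor drops hand-built LambdaQueryWrapper instances in favor of MyBatis-Plus chain queries. lambdaQuery() builds the wrapper inline, the boolean-first overloads (e.g. ne(condition, column, value)) add a predicate to the SQL only when the condition is true (replacing the old if-block), and exists() replaces the last("limit 1") plus getOne(...) != null idiom. A minimal sketch of the same pattern against a hypothetical User entity; all names below are illustrative, not from this commit:

    import com.baomidou.mybatisplus.core.mapper.BaseMapper;
    import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;

    // Hypothetical entity and mapper, for illustration only.
    class User {
        private Long id;
        private String name;
        public Long getId() { return id; }
        public String getName() { return name; }
    }

    interface UserMapper extends BaseMapper<User> {}

    public class UserServiceImpl extends ServiceImpl<UserMapper, User> {

        public boolean existsByName(String name, Long excludeId) {
            return this.lambdaQuery()
                // always match on the name column
                .eq(User::getName, name)
                // boolean-first overload: "id <> excludeId" is added to the
                // generated SQL only when excludeId != null
                .ne(excludeId != null, User::getId, excludeId)
                // exists() issues the existence check directly, so no manual
                // last("limit 1") or null check on getOne() is needed
                .exists();
        }
    }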
@@ -689,9 +689,9 @@ create table if not exists t_spark_cluster (
     `resolve_order` INT,
     `exception` VARCHAR(255),
     `cluster_state` INT,
-    `create_time` TIMESTAMP,
-    `start_time` TIMESTAMP,
-    `end_time` TIMESTAMP,
+    `create_time` datetime,
+    `start_time` datetime,
+    `end_time` datetime,
     `alert_id` BIGINT,
     primary key(`id`)
 );
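On the schema change just above: in MySQL, a TIMESTAMP column is stored in UTC and converted through the session time zone, is limited to the 1970 to 2038 range, and (depending on explicit_defaults_for_timestamp) can pick up implicit default and on-update behavior, while DATETIME stores the wall-clock value exactly as written. Those differences are a common reason for switching audit columns such as create_time, start_time, and end_time to DATETIME; the commit does not state its motivation. A minimal sketch of reading one of these columns over plain JDBC, assuming a JDBC 4.2+ driver and the t_spark_cluster table from the DDL above (class and method names are illustrative):

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.time.LocalDateTime;

    public final class ClusterTimes {

        // Reads `start_time` as a LocalDateTime: a DATETIME column round-trips
        // the stored wall-clock value as-is, with no session-time-zone conversion.
        public static LocalDateTime startTimeOf(Connection conn, long clusterId) throws SQLException {
            String sql = "select `start_time` from t_spark_cluster where `id` = ?";
            try (PreparedStatement ps = conn.prepareStatement(sql)) {
                ps.setLong(1, clusterId);
                try (ResultSet rs = ps.executeQuery()) {
                    return rs.next() ? rs.getObject("start_time", LocalDateTime.class) : null;
                }
            }
        }
    }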
@@ -25,7 +25,7 @@
 import org.apache.streampark.console.core.enums.DistributedTaskEnum;
 import org.apache.streampark.console.core.service.impl.DistributedTaskServiceImpl;
 
-import com.fasterxml.jackson.core.JacksonException;
+import com.fasterxml.jackson.core.JsonProcessingException;
 import lombok.extern.slf4j.Slf4j;
 import org.junit.jupiter.api.Test;
 
@@ -68,7 +68,7 @@ void testFlinkTaskAndApp() {
             FlinkTaskItem flinkTaskItem = distributionTaskService.getFlinkTaskItem(distributedTask);
             FlinkApplication newApplication = distributionTaskService.getAppByFlinkTaskItem(flinkTaskItem);
             assert (application.equals(newApplication));
-        } catch (JacksonException e) {
+        } catch (JsonProcessingException e) {
             log.error("testFlinkTaskAndApp failed:", e);
         }
     }
@@ -83,7 +83,7 @@ void testSparkTaskAndApp() {
             SparkTaskItem sparkTaskItem = distributionTaskService.getSparkTaskItem(distributedTask);
             SparkApplication newApplication = distributionTaskService.getAppBySparkTaskItem(sparkTaskItem);
             assert (application.equals(newApplication));
-        } catch (JacksonException e) {
+        } catch (JsonProcessingException e) {
             log.error("testSparkTaskAndApp failed:", e);
         }
     }
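The test hunks above narrow the catch from com.fasterxml.jackson.core.JacksonException to JsonProcessingException. In jackson-core 2.12+, JacksonException is the broader base type and JsonProcessingException extends it; since ObjectMapper's serialization methods declare throws JsonProcessingException, the narrower catch matches exactly what the call can throw, and it also compiles against older jackson-core versions that predate JacksonException. A minimal sketch of the pattern, using a hypothetical helper class:

    import com.fasterxml.jackson.core.JsonProcessingException;
    import com.fasterxml.jackson.databind.ObjectMapper;

    // Hypothetical helper, for illustration only.
    public final class JsonSupport {

        private static final ObjectMapper MAPPER = new ObjectMapper();

        public static String toJson(Object value) {
            try {
                // writeValueAsString declares `throws JsonProcessingException`,
                // so nothing broader needs to be caught here.
                return MAPPER.writeValueAsString(value);
            } catch (JsonProcessingException e) {
                throw new IllegalStateException("JSON serialization failed", e);
            }
        }
    }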
