Skip to content

Commit

Permalink
[BUG] issues-4134 bug fixed
Browse files Browse the repository at this point in the history
  • Loading branch information
wolfboys committed Nov 30, 2024
1 parent aaef7b4 commit 8dab676
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,6 @@

import org.apache.streampark.console.core.entity.Savepoint;

import org.apache.ibatis.annotations.Param;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;

public interface SavepointMapper extends BaseMapper<Savepoint> {
Savepoint findLatestByTime(@Param("appId") Long appId);

void cleanLatest(@Param("appId") Long appId);
}
public interface SavepointMapper extends BaseMapper<Savepoint> {}
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,10 @@ public class SavepointServiceImpl extends ServiceImpl<SavepointMapper, Savepoint

@Override
public void expire(Long appId) {
savepointMapper.cleanLatest(appId);
this.cleanLatest(appId);
}

private void expire(Savepoint entity) {
private void clearExpire(Savepoint entity) {
FlinkEnv flinkEnv = flinkEnvService.getByAppId(entity.getAppId());
Application application = applicationService.getById(entity.getAppId());
Utils.notNull(flinkEnv);
Expand Down Expand Up @@ -222,16 +222,20 @@ private void expire(Savepoint entity) {

@Override
public Savepoint getLatest(Long id) {
LambdaQueryWrapper<Savepoint> queryWrapper =
new LambdaQueryWrapper<Savepoint>()
List<Savepoint> savepointList =
this.lambdaQuery()
.eq(Savepoint::getAppId, id)
.eq(Savepoint::getLatest, true)
.orderByDesc(Savepoint::getCreateTime);
List<Savepoint> savepointList = this.baseMapper.selectList(queryWrapper);
.orderByDesc(Savepoint::getCreateTime)
.list();

if (!savepointList.isEmpty()) {
return savepointList.get(0);
}
return this.baseMapper.findLatestByTime(id);
return this.lambdaQuery()
.eq(Savepoint::getAppId, id)
.orderByDesc(Savepoint::getTriggerTime)
.one();
}

@Override
Expand Down Expand Up @@ -314,13 +318,13 @@ public String processPath(String path, String jobName, Long jobId) {

@Override
public void saveSavePoint(Savepoint savepoint) {
this.expire(savepoint);
this.clearExpire(savepoint);
this.cleanLatest(savepoint.getAppId());
super.save(savepoint);
}

private void cleanLatest(Long appId) {
savepointMapper.cleanLatest(appId);
this.lambdaUpdate().eq(Savepoint::getAppId, appId).set(Savepoint::getLatest, false).update();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,5 @@
-->
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="org.apache.streampark.console.core.mapper.SavepointMapper">
<select id="findLatestByTime" resultType="org.apache.streampark.console.core.entity.Savepoint">
select * from t_flink_savepoint
where app_id = #{appId}
order by trigger_time desc
limit 1;
</select>

<update id="cleanLatest">
update t_flink_savepoint
set latest = 0
where app_id = #{appId}
</update>

</mapper>

0 comments on commit 8dab676

Please sign in to comment.