Skip to content

Commit

Permalink
[Bug-7292][ApiServer] fix cache error when standalone (apache#7293)
Browse files Browse the repository at this point in the history
  • Loading branch information
caishunfeng authored and Lucaszlei committed Dec 26, 2021
1 parent 1a5538d commit 6112ca5
Show file tree
Hide file tree
Showing 8 changed files with 95 additions and 110 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,9 @@

import org.apache.commons.lang3.StringUtils;

import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
Expand All @@ -39,7 +36,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.bind.Name;
import org.springframework.cache.annotation.CacheConfig;
import org.springframework.cache.annotation.CacheEvict;
import org.springframework.expression.EvaluationContext;
Expand All @@ -61,6 +57,11 @@ public class CacheEvictAspect {
*/
private static final String EL_SYMBOL = "#";

/**
* prefix of spring el
*/
private static final String P = "p";

@Autowired
private CacheKeyGenerator cacheKeyGenerator;

Expand Down Expand Up @@ -91,9 +92,8 @@ public Object doAround(ProceedingJoinPoint proceedingJoinPoint) throws Throwable
cacheKey = (String) cacheKeyGenerator.generate(target, method, args);
} else {
cacheKey = cacheEvict.key();
List<Name> paramsList = getParamAnnotationsByType(method, Name.class);
if (cacheEvict.key().contains(EL_SYMBOL)) {
cacheKey = parseKey(cacheEvict.key(), paramsList.stream().map(o -> o.value()).collect(Collectors.toList()), Arrays.asList(args));
cacheKey = parseKey(cacheEvict.key(), Arrays.asList(args));
}
}
if (StringUtils.isNotEmpty(cacheKey)) {
Expand Down Expand Up @@ -123,30 +123,16 @@ private CacheType getCacheType(CacheConfig cacheConfig, CacheEvict cacheEvict) {
return null;
}

private String parseKey(String key, List<String> paramNameList, List<Object> paramList) {
private String parseKey(String key, List<Object> paramList) {
SpelExpressionParser spelParser = new SpelExpressionParser();
EvaluationContext ctx = new StandardEvaluationContext();
for (int i = 0; i < paramNameList.size(); i++) {
ctx.setVariable("p" + i, paramList.get(i));
for (int i = 0; i < paramList.size(); i++) {
ctx.setVariable(P + i, paramList.get(i));
}
Object obj = spelParser.parseExpression(key).getValue(ctx);
if (null == obj) {
throw new RuntimeException("parseKey error");
}
return obj.toString();
}

private <T extends Annotation> List<T> getParamAnnotationsByType(Method method, Class<T> annotationClass) {
List<T> annotationsList = new ArrayList<>();
Annotation[][] annotations = method.getParameterAnnotations();
for (int i = 0; i < annotations.length; i++) {
Annotation[] annotationsI = annotations[i];
for (Annotation annotation : annotationsI) {
if (annotation.annotationType().equals(annotationClass)) {
annotationsList.add((T) annotation);
}
}
}
return annotationsList;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,19 @@
/**
* process definition log mapper interface
*/
@CacheConfig(cacheNames = "processDefinition")
@CacheConfig(cacheNames = "processDefinition", keyGenerator = "cacheKeyGenerator")
public interface ProcessDefinitionLogMapper extends BaseMapper<ProcessDefinitionLog> {

/**
* query the certain process definition version info by process definition code and version number
*
* @param code process definition code
* @param version version number
* @return the process definition version info
*/
@Cacheable(sync = true)
ProcessDefinitionLog queryByDefinitionCodeAndVersion(@Param("code") long code, @Param("version") int version);

/**
* query process definition log by name
*
Expand Down Expand Up @@ -63,16 +73,6 @@ public interface ProcessDefinitionLogMapper extends BaseMapper<ProcessDefinition
*/
ProcessDefinitionLog queryMaxVersionDefinitionLog(@Param("code") long code);

/**
* query the certain process definition version info by process definition code and version number
*
* @param code process definition code
* @param version version number
* @return the process definition version info
*/
@Cacheable(sync = true, key = "#processDefinitionCode + '_' + #version")
ProcessDefinitionLog queryByDefinitionCodeAndVersion(@Param("code") long code, @Param("version") int version);

/**
* query the paging process definition version list by pagination info
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import java.util.List;
import java.util.Map;

import org.springframework.boot.context.properties.bind.Name;
import org.springframework.cache.annotation.CacheConfig;
import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.Cacheable;
Expand All @@ -54,30 +53,30 @@ public interface ProcessDefinitionMapper extends BaseMapper<ProcessDefinition> {
* update
*/
@CacheEvict(key = "#p0.code")
int updateById(@Name("processDefinition") @Param("et") ProcessDefinition processDefinition);
int updateById(@Param("et") ProcessDefinition processDefinition);

/**
* query process definition by code list
* delete process definition by code
*
* @param codes codes
* @return process definition list
* @param code code
* @return delete result
*/
List<ProcessDefinition> queryByCodes(@Param("codes") Collection<Long> codes);
@CacheEvict
int deleteByCode(@Param("code") long code);

/**
* delete process definition by code
* query process definition by code list
*
* @param code code
* @return delete result
* @param codes codes
* @return process definition list
*/
@CacheEvict(key = "#code")
int deleteByCode(@Name("code") @Param("code") long code);
List<ProcessDefinition> queryByCodes(@Param("codes") Collection<Long> codes);

/**
* verify process definition by name
*
* @param projectCode projectCode
* @param name name
* @param name name
* @return process definition
*/
ProcessDefinition verifyByDefineName(@Param("projectCode") long projectCode,
Expand All @@ -87,7 +86,7 @@ ProcessDefinition verifyByDefineName(@Param("projectCode") long projectCode,
* query process definition by name
*
* @param projectCode projectCode
* @param name name
* @param name name
* @return process definition
*/
ProcessDefinition queryByDefineName(@Param("projectCode") long projectCode,
Expand All @@ -104,11 +103,11 @@ ProcessDefinition queryByDefineName(@Param("projectCode") long projectCode,
/**
* process definition page
*
* @param page page
* @param searchVal searchVal
* @param userId userId
* @param page page
* @param searchVal searchVal
* @param userId userId
* @param projectCode projectCode
* @param isAdmin isAdmin
* @param isAdmin isAdmin
* @return process definition IPage
*/
IPage<ProcessDefinition> queryDefineListPaging(IPage<ProcessDefinition> page,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.util.List;
import java.util.Map;

import org.springframework.boot.context.properties.bind.Name;
import org.springframework.cache.annotation.CacheConfig;
import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.Cacheable;
Expand All @@ -45,15 +44,25 @@ public interface ProcessTaskRelationMapper extends BaseMapper<ProcessTaskRelatio
* @param processCode processCode
* @return ProcessTaskRelation list
*/
@Cacheable(sync = true)
@Cacheable(unless = "#result == null || #result.size() == 0")
List<ProcessTaskRelation> queryByProcessCode(@Param("projectCode") long projectCode,
@Param("processCode") long processCode);

/**
* update
*/
@CacheEvict(key = "#processTaskRelation.projectCode + '_' + #processTaskRelation.processDefinitionCode")
int updateById(@Name("processTaskRelation") @Param("et") ProcessTaskRelation processTaskRelation);
@CacheEvict(key = "#p0.projectCode + '_' + #p0.processDefinitionCode")
int updateById(@Param("et") ProcessTaskRelation processTaskRelation);

/**
* delete process task relation by processCode
*
* @param projectCode projectCode
* @param processCode processCode
* @return int
*/
@CacheEvict
int deleteByCode(@Param("projectCode") long projectCode, @Param("processCode") long processCode);

/**
* process task relation by taskCode
Expand All @@ -71,17 +80,6 @@ List<ProcessTaskRelation> queryByProcessCode(@Param("projectCode") long projectC
*/
List<ProcessTaskRelation> queryByTaskCode(@Param("taskCode") long taskCode);

/**
* delete process task relation by processCode
*
* @param projectCode projectCode
* @param processCode processCode
* @return int
*/
@CacheEvict(key = "#projectCode + '_' + #processCode")
int deleteByCode(@Name("projectCode") @Param("projectCode") long projectCode,
@Name("processCode") @Param("processCode") long processCode);

/**
* batch insert process task relation
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.dao.mapper;

import org.apache.dolphinscheduler.dao.entity.Schedule;
Expand All @@ -22,7 +23,6 @@

import java.util.List;

import org.springframework.boot.context.properties.bind.Name;
import org.springframework.cache.annotation.CacheConfig;
import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.Cacheable;
Expand All @@ -36,8 +36,24 @@
@CacheConfig(cacheNames = "schedule", keyGenerator = "cacheKeyGenerator")
public interface ScheduleMapper extends BaseMapper<Schedule> {

@CacheEvict(key = "#p0.processDefinitionCode")
int insert(Schedule entity);

@CacheEvict(key = "#p0.processDefinitionCode")
int updateById(@Param("et") Schedule entity);

/**
* query schedule list by process definition code
*
* @param processDefinitionCode processDefinitionCode
* @return schedule list
*/
@Cacheable(sync = true)
List<Schedule> queryReleaseSchedulerListByProcessDefinitionCode(@Param("processDefinitionCode") long processDefinitionCode);

/**
* scheduler page
*
* @param page page
* @param processDefinitionCode processDefinitionCode
* @param searchVal searchVal
Expand All @@ -49,36 +65,25 @@ IPage<Schedule> queryByProcessDefineCodePaging(IPage<Schedule> page,

/**
* query schedule list by project name
*
* @param projectName projectName
* @return schedule list
*/
List<Schedule> querySchedulerListByProjectName(@Param("projectName") String projectName);

/**
* query schedule list by process definition codes
*
* @param processDefineCodes processDefineCodes
* @return schedule list
*/
List<Schedule> selectAllByProcessDefineArray(@Param("processDefineCodes") long[] processDefineCodes);

/**
* query schedule list by process definition code
*
* @param processDefinitionCode processDefinitionCode
* @return schedule
*/
Schedule queryByProcessDefinitionCode(@Param("processDefinitionCode") long processDefinitionCode);

/**
* query schedule list by process definition code
* @param processDefinitionCode processDefinitionCode
* @return schedule list
*/
@Cacheable(sync = true)
List<Schedule> queryReleaseSchedulerListByProcessDefinitionCode(@Param("processDefinitionCode") long processDefinitionCode);

@CacheEvict(key = "#entity.processDefinitionCode")
int insert(@Name("entity") Schedule entity);

@CacheEvict(key = "#entity.processDefinitionCode")
int updateById(@Name("entity") @Param("et")Schedule entity);
}
Loading

0 comments on commit 6112ca5

Please sign in to comment.