Skip to content

Commit

Permalink
[Improvement][K8S] Remove ResourceQuota
Browse files Browse the repository at this point in the history
Signed-off-by: Gallardot <[email protected]>
  • Loading branch information
Gallardot committed Oct 13, 2023
1 parent 629fced commit 5d03771
Show file tree
Hide file tree
Showing 19 changed files with 120 additions and 432 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import static org.apache.dolphinscheduler.api.enums.Status.QUERY_CAN_USE_K8S_NAMESPACE_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.QUERY_K8S_NAMESPACE_LIST_PAGING_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.QUERY_UNAUTHORIZED_NAMESPACE_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.UPDATE_K8S_NAMESPACE_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.VERIFY_K8S_NAMESPACE_ERROR;

import org.apache.dolphinscheduler.api.exceptions.ApiException;
Expand All @@ -40,9 +39,7 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestAttribute;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
Expand Down Expand Up @@ -104,56 +101,21 @@ public Result queryNamespaceListPaging(@Parameter(hidden = true) @RequestAttribu
* @param loginUser
* @param namespace k8s namespace
* @param clusterCode clusterCode
* @param limitsCpu max cpu
* @param limitsMemory max memory
* @return
*/
@Operation(summary = "createK8sNamespace", description = "CREATE_NAMESPACE_NOTES")
@Parameters({
@Parameter(name = "namespace", description = "NAMESPACE", required = true, schema = @Schema(implementation = String.class)),
@Parameter(name = "clusterCode", description = "CLUSTER_CODE", required = true, schema = @Schema(implementation = long.class)),
@Parameter(name = "limits_cpu", description = "LIMITS_CPU", required = false, schema = @Schema(implementation = double.class)),
@Parameter(name = "limits_memory", description = "LIMITS_MEMORY", required = false, schema = @Schema(implementation = int.class))
@Parameter(name = "clusterCode", description = "CLUSTER_CODE", required = true, schema = @Schema(implementation = long.class))
})
@PostMapping()
@ResponseStatus(HttpStatus.CREATED)
@ApiException(CREATE_K8S_NAMESPACE_ERROR)
public Result createNamespace(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value = "namespace") String namespace,
@RequestParam(value = "clusterCode") Long clusterCode,
@RequestParam(value = "limitsCpu", required = false) Double limitsCpu,
@RequestParam(value = "limitsMemory", required = false) Integer limitsMemory) {
Map<String, Object> result =
k8sNamespaceService.createK8sNamespace(loginUser, namespace, clusterCode, limitsCpu, limitsMemory);
return returnDataList(result);
}

/**
* update namespace,namespace and k8s not allowed update, because may create on k8s,can delete and create new instead
*
* @param loginUser
* @param userName owner
* @param limitsCpu max cpu
* @param limitsMemory max memory
* @return
*/
@Operation(summary = "updateK8sNamespace", description = "UPDATE_NAMESPACE_NOTES")
@Parameters({
@Parameter(name = "id", description = "K8S_NAMESPACE_ID", required = true, schema = @Schema(implementation = int.class, example = "100")),
@Parameter(name = "userName", description = "OWNER", required = false, schema = @Schema(implementation = String.class)),
@Parameter(name = "limitsCpu", description = "LIMITS_CPU", required = false, schema = @Schema(implementation = double.class)),
@Parameter(name = "limitsMemory", description = "LIMITS_MEMORY", required = false, schema = @Schema(implementation = int.class))})
@PutMapping(value = "/{id}")
@ResponseStatus(HttpStatus.CREATED)
@ApiException(UPDATE_K8S_NAMESPACE_ERROR)
public Result updateNamespace(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@PathVariable(value = "id") int id,
@RequestParam(value = "userName", required = false) String userName,
@RequestParam(value = "tag", required = false) String tag,
@RequestParam(value = "limitsCpu", required = false) Double limitsCpu,
@RequestParam(value = "limitsMemory", required = false) Integer limitsMemory) {
@RequestParam(value = "clusterCode") Long clusterCode) {
Map<String, Object> result =
k8sNamespaceService.updateK8sNamespace(loginUser, id, userName, limitsCpu, limitsMemory);
k8sNamespaceService.createK8sNamespace(loginUser, namespace, clusterCode);
return returnDataList(result);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import io.fabric8.kubernetes.api.model.Namespace;
import io.fabric8.kubernetes.api.model.NamespaceList;
import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.api.model.ResourceQuota;
import io.fabric8.kubernetes.client.KubernetesClient;

/**
Expand All @@ -41,14 +40,12 @@ public class K8sClientService {
@Autowired
private K8sManager k8sManager;

public ResourceQuota upsertNamespaceAndResourceToK8s(K8sNamespace k8sNamespace,
String yamlStr) {
public void upsertNamespaceAndResourceToK8s(K8sNamespace k8sNamespace) {
if (!checkNamespaceToK8s(k8sNamespace.getNamespace(), k8sNamespace.getClusterCode())) {
throw new RuntimeException(String.format(
"namespace %s does not exist in k8s cluster, please create namespace in k8s cluster first",
k8sNamespace.getNamespace()));
}
return upsertNamespacedResourceToK8s(k8sNamespace, yamlStr);
}

public Optional<Namespace> deleteNamespaceToK8s(String name, Long clusterCode) {
Expand All @@ -65,33 +62,6 @@ public Optional<Namespace> deleteNamespaceToK8s(String name, Long clusterCode) {
return getNamespaceFromK8s(name, clusterCode);
}

private ResourceQuota upsertNamespacedResourceToK8s(K8sNamespace k8sNamespace,
String yamlStr) {

KubernetesClient client = k8sManager.getK8sClient(k8sNamespace.getClusterCode());

// 创建资源
ResourceQuota queryExist = client.resourceQuotas()
.inNamespace(k8sNamespace.getNamespace())
.withName(k8sNamespace.getNamespace())
.get();

ResourceQuota body = yaml.loadAs(yamlStr, ResourceQuota.class);

if (queryExist != null) {
if (k8sNamespace.getLimitsCpu() == null && k8sNamespace.getLimitsMemory() == null) {
client.resourceQuotas().inNamespace(k8sNamespace.getNamespace())
.withName(k8sNamespace.getNamespace())
.delete();
return null;
}
}

return client.resourceQuotas().inNamespace(k8sNamespace.getNamespace())
.withName(k8sNamespace.getNamespace())
.createOrReplace(body);
}

private Optional<Namespace> getNamespaceFromK8s(String name, Long clusterCode) {
NamespaceList listNamespace =
k8sManager.getK8sClient(clusterCode).namespaces().list();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,24 +46,9 @@ public interface K8sNamespaceService {
* @param loginUser login user
* @param namespace namespace
* @param clusterCode k8s not null
* @param limitsCpu limits cpu, can null means not limit
* @param limitsMemory limits memory, can null means not limit
* @return
*/
Map<String, Object> createK8sNamespace(User loginUser, String namespace, Long clusterCode, Double limitsCpu,
Integer limitsMemory);

/**
* update K8s Namespace tag and resource limit
*
* @param loginUser login user
* @param userName owner
* @param limitsCpu max cpu
* @param limitsMemory max memory
* @return
*/
Map<String, Object> updateK8sNamespace(User loginUser, int id, String userName, Double limitsCpu,
Integer limitsMemory);
Map<String, Object> createK8sNamespace(User loginUser, String namespace, Long clusterCode);

/**
* verify namespace and k8s
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,6 @@
@Slf4j
public class K8SNamespaceServiceImpl extends BaseServiceImpl implements K8sNamespaceService {

private static String resourceYaml = "apiVersion: v1\n"
+ "kind: ResourceQuota\n"
+ "metadata:\n"
+ " name: ${name}\n"
+ " namespace: ${namespace}\n"
+ "spec:\n"
+ " hard:\n"
+ " ${limitCpu}\n"
+ " ${limitMemory}\n";

@Autowired
private K8sNamespaceMapper k8sNamespaceMapper;

Expand Down Expand Up @@ -114,13 +104,10 @@ public Result queryListPaging(User loginUser, String searchVal, Integer pageNo,
* @param loginUser login user
* @param namespace namespace
* @param clusterCode k8s not null
* @param limitsCpu limits cpu, can null means not limit
* @param limitsMemory limits memory, can null means not limit
* @return
*/
@Override
public Map<String, Object> createK8sNamespace(User loginUser, String namespace, Long clusterCode, Double limitsCpu,
Integer limitsMemory) {
public Map<String, Object> createK8sNamespace(User loginUser, String namespace, Long clusterCode) {
Map<String, Object> result = new HashMap<>();
if (isNotAdmin(loginUser, result)) {
log.warn("Only admin can create K8s namespace, current login user name:{}.", loginUser.getUserName());
Expand All @@ -139,18 +126,6 @@ public Map<String, Object> createK8sNamespace(User loginUser, String namespace,
return result;
}

if (limitsCpu != null && limitsCpu < 0.0) {
log.warn("Parameter limitsCpu is invalid.");
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, Constants.LIMITS_CPU);
return result;
}

if (limitsMemory != null && limitsMemory < 0) {
log.warn("Parameter limitsMemory is invalid.");
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, Constants.LIMITS_MEMORY);
return result;
}

if (checkNamespaceExistInDb(namespace, clusterCode)) {
log.warn("K8S namespace already exists.");
putMsg(result, Status.K8S_NAMESPACE_EXIST, namespace, clusterCode);
Expand Down Expand Up @@ -183,18 +158,12 @@ public Map<String, Object> createK8sNamespace(User loginUser, String namespace,
k8sNamespaceObj.setNamespace(namespace);
k8sNamespaceObj.setClusterCode(clusterCode);
k8sNamespaceObj.setUserId(loginUser.getId());
k8sNamespaceObj.setLimitsCpu(limitsCpu);
k8sNamespaceObj.setLimitsMemory(limitsMemory);
k8sNamespaceObj.setPodReplicas(0);
k8sNamespaceObj.setPodRequestCpu(0.0);
k8sNamespaceObj.setPodRequestMemory(0);
k8sNamespaceObj.setCreateTime(now);
k8sNamespaceObj.setUpdateTime(now);

if (!Constants.K8S_LOCAL_TEST_CLUSTER_CODE.equals(k8sNamespaceObj.getClusterCode())) {
try {
String yamlStr = genDefaultResourceYaml(k8sNamespaceObj);
k8sClientService.upsertNamespaceAndResourceToK8s(k8sNamespaceObj, yamlStr);
k8sClientService.upsertNamespaceAndResourceToK8s(k8sNamespaceObj);
} catch (Exception e) {
log.error("Namespace create to k8s error", e);
putMsg(result, Status.K8S_CLIENT_OPS_ERROR, e.getMessage());
Expand All @@ -209,65 +178,6 @@ public Map<String, Object> createK8sNamespace(User loginUser, String namespace,
return result;
}

/**
* update K8s Namespace tag and resource limit
*
* @param loginUser login user
* @param userName owner
* @param limitsCpu max cpu
* @param limitsMemory max memory
* @return
*/
@Override
public Map<String, Object> updateK8sNamespace(User loginUser, int id, String userName, Double limitsCpu,
Integer limitsMemory) {
Map<String, Object> result = new HashMap<>();
if (isNotAdmin(loginUser, result)) {
log.warn("Only admin can update K8s namespace, current login user name:{}.", loginUser.getUserName());
return result;
}

if (limitsCpu != null && limitsCpu < 0.0) {
log.warn("Parameter limitsCpu is invalid.");
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, Constants.LIMITS_CPU);
return result;
}

if (limitsMemory != null && limitsMemory < 0) {
log.warn("Parameter limitsMemory is invalid.");
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, Constants.LIMITS_MEMORY);
return result;
}

K8sNamespace k8sNamespaceObj = k8sNamespaceMapper.selectById(id);
if (k8sNamespaceObj == null) {
log.error("K8s namespace does not exist, namespaceId:{}.", id);
putMsg(result, Status.K8S_NAMESPACE_NOT_EXIST, id);
return result;
}

Date now = new Date();
k8sNamespaceObj.setLimitsCpu(limitsCpu);
k8sNamespaceObj.setLimitsMemory(limitsMemory);
k8sNamespaceObj.setUpdateTime(now);

if (!Constants.K8S_LOCAL_TEST_CLUSTER_CODE.equals(k8sNamespaceObj.getClusterCode())) {
try {
String yamlStr = genDefaultResourceYaml(k8sNamespaceObj);
k8sClientService.upsertNamespaceAndResourceToK8s(k8sNamespaceObj, yamlStr);
} catch (Exception e) {
log.error("Namespace update to k8s error", e);
putMsg(result, Status.K8S_CLIENT_OPS_ERROR, e.getMessage());
return result;
}
}
// update to db
k8sNamespaceMapper.updateById(k8sNamespaceObj);
log.info("K8s namespace update complete, namespace:{}.", k8sNamespaceObj.getNamespace());
putMsg(result, Status.SUCCESS);
return result;
}

/**
* verify namespace and k8s
*
Expand Down Expand Up @@ -338,42 +248,6 @@ private boolean checkNamespaceExistInDb(String namespace, Long clusterCode) {
return k8sNamespaceMapper.existNamespace(namespace, clusterCode) == Boolean.TRUE;
}

/**
* use cpu memory create yaml
*
* @param k8sNamespace
* @return yaml file
*/
private String genDefaultResourceYaml(K8sNamespace k8sNamespace) {
// resource use same name with namespace
String name = k8sNamespace.getNamespace();
String namespace = k8sNamespace.getNamespace();
String cpuStr = null;
if (k8sNamespace.getLimitsCpu() != null) {
cpuStr = k8sNamespace.getLimitsCpu() + "";
}

String memoryStr = null;
if (k8sNamespace.getLimitsMemory() != null) {
memoryStr = k8sNamespace.getLimitsMemory() + "Gi";
}

String result = resourceYaml.replace("${name}", name)
.replace("${namespace}", namespace);
if (cpuStr == null) {
result = result.replace("${limitCpu}", "");
} else {
result = result.replace("${limitCpu}", "limits.cpu: '" + cpuStr + "'");
}

if (memoryStr == null) {
result = result.replace("${limitMemory}", "");
} else {
result = result.replace("${limitMemory}", "limits.memory: " + memoryStr);
}
return result;
}

/**
* query unauthorized namespace
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.put;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.content;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;

Expand Down Expand Up @@ -83,24 +82,6 @@ public void createNamespace() throws Exception {
logger.info("create queue return result:{}", mvcResult.getResponse().getContentAsString());
}

@Test
public void updateNamespace() throws Exception {
MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<>();
paramsMap.add("id", "1");
paramsMap.add("owner", "owmer1");
paramsMap.add("tag", "flink");

MvcResult mvcResult = mockMvc.perform(put("/k8s-namespace/{id}", 1)
.header(SESSION_ID, sessionId)
.params(paramsMap))
.andExpect(status().isCreated())
.andExpect(content().contentType(MediaType.APPLICATION_JSON))
.andReturn();
Result result = JSONUtils.parseObject(mvcResult.getResponse().getContentAsString(), Result.class);
Assertions.assertEquals(Status.SUCCESS.getCode(), result.getCode().intValue());
logger.info("update queue return result:{}", mvcResult.getResponse().getContentAsString());
}

@Test
public void verifyNamespace() throws Exception {
// queue value exist
Expand Down
Loading

0 comments on commit 5d03771

Please sign in to comment.