Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improvement-#14995] get namespace in real time #15106

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,6 @@ protected TaskExecutionContext getTaskExecutionContext(TaskInstance taskInstance

TaskChannel taskChannel = taskPluginManager.getTaskChannel(taskInstance.getTaskType());
ResourceParametersHelper resources = taskChannel.getResources(taskInstance.getTaskParams());

AbstractParameters baseParam = taskPluginManager.getParameters(
ParametersNode.builder()
.taskType(taskInstance.getTaskType())
Expand All @@ -348,7 +347,7 @@ protected TaskExecutionContext getTaskExecutionContext(TaskInstance taskInstance
// process instance id default 0
taskExecutionContext.setProcessInstanceId(0);
taskExecutionContextFactory.setDataQualityTaskExecutionContext(taskExecutionContext, taskInstance, tenantCode);
taskExecutionContextFactory.setK8sTaskRelatedInfo(taskExecutionContext, taskInstance);
taskExecutionContextFactory.setK8sTaskRelation(taskExecutionContext, taskInstance);
return taskExecutionContext;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,22 @@
import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TARGET_CONNECTOR_TYPE;
import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TARGET_DATASOURCE_ID;

import org.apache.commons.lang3.ObjectUtils;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.dao.entity.Cluster;
import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.dao.entity.DqComparisonType;
import org.apache.dolphinscheduler.dao.entity.DqRule;
import org.apache.dolphinscheduler.dao.entity.DqRuleExecuteSql;
import org.apache.dolphinscheduler.dao.entity.DqRuleInputEntry;
import org.apache.dolphinscheduler.dao.entity.K8sNamespace;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.UdfFunc;
import org.apache.dolphinscheduler.dao.mapper.ClusterMapper;
import org.apache.dolphinscheduler.dao.mapper.K8sNamespaceMapper;
import org.apache.dolphinscheduler.plugin.task.api.DataQualityTaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.K8sTaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
Expand All @@ -59,10 +64,7 @@
import org.apache.dolphinscheduler.plugin.task.api.parameters.K8sTaskParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode;
import org.apache.dolphinscheduler.plugin.task.api.parameters.dataquality.DataQualityParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.AbstractResourceParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.DataSourceParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper;
import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.UdfFuncParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.*;
import org.apache.dolphinscheduler.plugin.task.api.utils.JdbcUrlParser;
import org.apache.dolphinscheduler.plugin.task.api.utils.MapUtils;
import org.apache.dolphinscheduler.plugin.task.spark.SparkParameters;
Expand All @@ -84,6 +86,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.stream.Collectors;

import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -111,6 +114,12 @@ public class TaskExecutionContextFactory {
@Autowired
private HikariDataSource hikariDataSource;

@Autowired
private K8sNamespaceMapper k8sNamespaceMapper;

@Autowired
private ClusterMapper clusterMapper;

public TaskExecutionContext createTaskExecutionContext(TaskInstance taskInstance) throws TaskExecutionContextCreateException {
ProcessInstance workflowInstance = taskInstance.getProcessInstance();
taskInstance.setResources(getResourceFullNames(taskInstance));
Expand Down Expand Up @@ -139,7 +148,7 @@ public TaskExecutionContext createTaskExecutionContext(TaskInstance taskInstance
.create();

setDataQualityTaskExecutionContext(taskExecutionContext, taskInstance, workflowInstance.getTenantCode());
setK8sTaskRelatedInfo(taskExecutionContext, taskInstance);
setK8sTaskRelation(taskExecutionContext, taskInstance);
return taskExecutionContext;
}

Expand All @@ -154,11 +163,6 @@ public void setDataQualityTaskExecutionContext(TaskExecutionContext taskExecutio
taskExecutionContext.setDataQualityTaskExecutionContext(dataQualityTaskExecutionContext);
}

public void setK8sTaskRelatedInfo(TaskExecutionContext taskExecutionContext, TaskInstance taskInstance) {
K8sTaskExecutionContext k8sTaskExecutionContext = setK8sTaskRelation(taskInstance);
taskExecutionContext.setK8sTaskExecutionContext(k8sTaskExecutionContext);
}

private Map<String, String> getResourceFullNames(TaskInstance taskInstance) {
Map<String, String> resourcesMap = new HashMap<>();
AbstractParameters baseParam = taskPluginManager.getParameters(ParametersNode.builder()
Expand Down Expand Up @@ -187,6 +191,9 @@ private void setTaskResourceInfo(ResourceParametersHelper resourceParametersHelp
case UDF:
setTaskUdfFuncResourceInfo(map);
break;
case K8S_NAMESPACE:
setTaskK8SNamespaceResourceInfo(map);
break;
default:
break;
}
Expand Down Expand Up @@ -226,6 +233,26 @@ private void setTaskUdfFuncResourceInfo(Map<Integer, AbstractResourceParameters>
});
}

private void setTaskK8SNamespaceResourceInfo(Map<Integer, AbstractResourceParameters> map) {
if (MapUtils.isEmpty(map)) {
return;
}
List<K8sNamespace> k8sNamespaceUsers = k8sNamespaceMapper.selectBatchIds(map.keySet().stream().collect(Collectors.toList()));
k8sNamespaceUsers.forEach(k8sNamespace -> {
Cluster cluster = clusterMapper.selectById(k8sNamespace.getClusterCode());
if(ObjectUtils.isEmpty(cluster)){
return;
}
K8sNamespaceParameters k8sNamespaceParameters = new K8sNamespaceParameters();
k8sNamespaceParameters.setId(k8sNamespace.getId());
k8sNamespaceParameters.setNamespace(k8sNamespace.getNamespace());
k8sNamespaceParameters.setClusterName(cluster.getName());
k8sNamespaceParameters.setConfigYaml(cluster.getConfig());
map.put(k8sNamespace.getId(), k8sNamespaceParameters);
});

}

private void setDataQualityTaskRelation(DataQualityTaskExecutionContext dataQualityTaskExecutionContext,
TaskInstance taskInstance, String tenantCode) {
DataQualityParameters dataQualityParameters =
Expand Down Expand Up @@ -270,36 +297,28 @@ private void setDataQualityTaskRelation(DataQualityTaskExecutionContext dataQual
setStatisticsValueWriterConfig(dataQualityTaskExecutionContext);
}

private K8sTaskExecutionContext setK8sTaskRelation(TaskInstance taskInstance) {
public void setK8sTaskRelation(TaskExecutionContext taskExecutionContext, TaskInstance taskInstance) {
K8sTaskExecutionContext k8sTaskExecutionContext = null;
String namespace = "";
String namespaceId = "";
switch (taskInstance.getTaskType()) {
case "K8S":
case "KUBEFLOW":
K8sTaskParameters k8sTaskParameters =
JSONUtils.parseObject(taskInstance.getTaskParams(), K8sTaskParameters.class);
namespace = k8sTaskParameters.getNamespace();
break;
case "SPARK":
SparkParameters sparkParameters =
JSONUtils.parseObject(taskInstance.getTaskParams(), SparkParameters.class);
if (StringUtils.isNotEmpty(sparkParameters.getNamespace())) {
namespace = sparkParameters.getNamespace();
}
Map <String,Object> map = JSONUtils.parseObject(taskInstance.getTaskParams(), Map.class);
namespaceId = null == map.get("namespaceId")? null:map.get("namespaceId").toString();
break;
default:
break;
}

if (StringUtils.isNotEmpty(namespace)) {
String clusterName = JSONUtils.toMap(namespace).get(CLUSTER);
String configYaml = processService.findConfigYamlByName(clusterName);
if (configYaml != null) {
if (StringUtils.isNotEmpty(namespaceId)) {
K8sNamespaceParameters k8sNamespaceParameters = (K8sNamespaceParameters) taskExecutionContext.getResourceParametersHelper().getResourceMap(org.apache.dolphinscheduler.plugin.task.api.enums.ResourceType.K8S_NAMESPACE).get(namespaceId);
if (StringUtils.isNotEmpty(k8sNamespaceParameters.getConfigYaml())) {
k8sTaskExecutionContext =
new K8sTaskExecutionContext(configYaml, JSONUtils.toMap(namespace).get(NAMESPACE_NAME));
new K8sTaskExecutionContext(k8sNamespaceParameters.getConfigYaml(), k8sNamespaceParameters.getNamespace());
}
}
return k8sTaskExecutionContext;
taskExecutionContext.setK8sTaskExecutionContext(k8sTaskExecutionContext);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,6 @@
public enum ResourceType {

DATASOURCE,
UDF;
UDF,
K8S_NAMESPACE;
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.dolphinscheduler.plugin.task.api.parameters;

import org.apache.dolphinscheduler.plugin.task.api.enums.ResourceType;
import org.apache.dolphinscheduler.plugin.task.api.model.Label;
import org.apache.dolphinscheduler.plugin.task.api.model.NodeSelectorExpression;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
Expand All @@ -27,6 +28,8 @@
import java.util.List;

import lombok.Data;
import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper;
import org.apache.dolphinscheduler.spi.enums.Flag;

/**
* k8s task parameters
Expand All @@ -35,7 +38,7 @@
public class K8sTaskParameters extends AbstractParameters {

private String image;
private String namespace;
private int namespaceId;
private String command;
private String args;
private String pullSecret;
Expand All @@ -47,11 +50,22 @@ public class K8sTaskParameters extends AbstractParameters {

@Override
public boolean checkParameters() {
return StringUtils.isNotEmpty(image) && StringUtils.isNotEmpty(namespace);
return StringUtils.isNotEmpty(image) && 0!= namespaceId;
}

@Override
public List<ResourceInfo> getResourceFilesList() {
return new ArrayList<>();
}

@Override
public ResourceParametersHelper getResources() {
ResourceParametersHelper resources = super.getResources();

if (0 == namespaceId) {
return resources;
}
resources.put(ResourceType.K8S_NAMESPACE, namespaceId);
return resources;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, visible = true, property = "resourceType")
@JsonSubTypes({
@Type(value = DataSourceParameters.class, name = "DATASOURCE"),
@Type(value = UdfFuncParameters.class, name = "UDF")
@Type(value = UdfFuncParameters.class, name = "UDF"),
@Type(value = K8sNamespaceParameters.class, name = "K8S_NAMESPACE")
})
public abstract class AbstractResourceParameters {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.plugin.task.api.parameters.resource;

import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.UdfType;

import java.util.Date;

@Data
public class K8sNamespaceParameters extends AbstractResourceParameters {

/**
* id
*/
private int id;

@JsonProperty(value = "K8S_NAMESPACE")
private String resourceType;

/**
* namespace
*/
private String namespace;

/**
* cluster name
*/
private String clusterName;

/**
* cluster config yaml
*/
private String configYaml;

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.ResourceType;
import org.apache.dolphinscheduler.plugin.task.api.k8s.AbstractK8sTask;
import org.apache.dolphinscheduler.plugin.task.api.k8s.K8sTaskMainParameters;
import org.apache.dolphinscheduler.plugin.task.api.model.Label;
import org.apache.dolphinscheduler.plugin.task.api.model.NodeSelectorExpression;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.K8sTaskParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.K8sNamespaceParameters;
import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;

import org.apache.commons.collections4.CollectionUtils;
Expand Down Expand Up @@ -83,9 +85,9 @@ public AbstractParameters getParameters() {
protected String buildCommand() {
K8sTaskMainParameters k8sTaskMainParameters = new K8sTaskMainParameters();
Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
Map<String, String> namespace = JSONUtils.toMap(k8sTaskParameters.getNamespace());
String namespaceName = namespace.get(NAMESPACE_NAME);
String clusterName = namespace.get(CLUSTER);
K8sNamespaceParameters k8sNamespaceParameters = (K8sNamespaceParameters) k8sTaskParameters.getResources().getResourceMap(ResourceType.K8S_NAMESPACE).get(k8sTaskParameters.getNamespaceId());
String namespaceName = k8sNamespaceParameters.getNamespace();
String clusterName = k8sNamespaceParameters.getClusterName();
k8sTaskMainParameters.setImage(k8sTaskParameters.getImage());
k8sTaskMainParameters.setPullSecret(k8sTaskParameters.getPullSecret());
k8sTaskMainParameters.setNamespaceName(namespaceName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public AbstractParameters parseParameters(ParametersNode parametersNode) {

@Override
public ResourceParametersHelper getResources(String parameters) {
return null;
return JSONUtils.parseObject(parameters, K8sTaskParameters.class).getResources();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class K8sParametersTest {
private K8sTaskParameters k8sTaskParameters = null;
private final String image = "ds-dev";
private final String imagePullPolicy = "IfNotPresent";
private final String namespace = "{\"name\":\"default\",\"cluster\":\"lab\"}";
private final int namespaceId = 1;
private final double minCpuCores = 2;
private final double minMemorySpace = 10;
private final String command = "[\"/bin/bash\", \"-c\"]";
Expand All @@ -47,7 +47,7 @@ public void before() {
k8sTaskParameters = new K8sTaskParameters();
k8sTaskParameters.setImage(image);
k8sTaskParameters.setImagePullPolicy(imagePullPolicy);
k8sTaskParameters.setNamespace(namespace);
k8sTaskParameters.setNamespaceId(namespaceId);
k8sTaskParameters.setMinCpuCores(minCpuCores);
k8sTaskParameters.setMinMemorySpace(minMemorySpace);
k8sTaskParameters.setCommand(command);
Expand All @@ -71,7 +71,7 @@ public void testGetResourceFilesListNormal() {
public void testK8sParameters() {
Assertions.assertEquals(image, k8sTaskParameters.getImage());
Assertions.assertEquals(imagePullPolicy, k8sTaskParameters.getImagePullPolicy());
Assertions.assertEquals(namespace, k8sTaskParameters.getNamespace());
Assertions.assertEquals(namespaceId, k8sTaskParameters.getNamespaceId());
Assertions.assertEquals(0, Double.compare(minCpuCores, k8sTaskParameters.getMinCpuCores()));
Assertions.assertEquals(0, Double.compare(minMemorySpace, k8sTaskParameters.getMinMemorySpace()));
Assertions.assertEquals(command, k8sTaskParameters.getCommand());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@ public class KubeflowParameters extends AbstractParameters {

private String clusterYAML;

private int namespaceId;

public boolean checkParameters() {
return StringUtils.isNotEmpty(yamlContent);
return StringUtils.isNotEmpty(yamlContent) && 0 != namespaceId;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,6 @@ public AbstractParameters parseParameters(ParametersNode parametersNode) {

@Override
public ResourceParametersHelper getResources(String parameters) {
return null;
return JSONUtils.parseObject(parameters, KubeflowParameters.class).getResources();
}
}
Loading
Loading