diff --git a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-elasticsearch/src/main/java/org/apache/seatunnel/datasource/plugin/elasticsearch/client/EsRestClient.java b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-elasticsearch/src/main/java/org/apache/seatunnel/datasource/plugin/elasticsearch/client/EsRestClient.java index 4de06bf4a..01d20bb28 100644 --- a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-elasticsearch/src/main/java/org/apache/seatunnel/datasource/plugin/elasticsearch/client/EsRestClient.java +++ b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-elasticsearch/src/main/java/org/apache/seatunnel/datasource/plugin/elasticsearch/client/EsRestClient.java @@ -18,7 +18,6 @@ package org.apache.seatunnel.datasource.plugin.elasticsearch.client; import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode; -import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper; import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.seatunnel.shade.com.typesafe.config.Config; @@ -62,8 +61,6 @@ public class EsRestClient implements AutoCloseable { private final RestClient restClient; - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - private EsRestClient(RestClient restClient) { this.restClient = restClient; } @@ -71,9 +68,9 @@ private EsRestClient(RestClient restClient) { public static EsRestClient createInstance(Config pluginConfig) { try { List hosts = - OBJECT_MAPPER.readValue( + JsonUtils.toList( pluginConfig.getString(ElasticSearchOptionRule.HOSTS.key()), - List.class); + String.class); Optional username = Optional.empty(); Optional password = Optional.empty(); if (pluginConfig.hasPath(ElasticSearchOptionRule.USERNAME.key())) { @@ -209,7 +206,7 @@ private static RestClientBuilder getRestClientBuilder( keystorePassword, truststorePath, truststorePassword); - sslContext.ifPresent(e -> httpClientBuilder.setSSLContext(e)); + sslContext.ifPresent(httpClientBuilder::setSSLContext); } else { SSLContext sslContext = SSLContexts.custom() @@ -233,15 +230,13 @@ public ElasticsearchClusterInfo getClusterInfo() { try { Response response = restClient.performRequest(request); String result = EntityUtils.toString(response.getEntity()); - ObjectMapper objectMapper = new ObjectMapper(); - JsonNode jsonNode = objectMapper.readTree(result); - JsonNode versionNode = jsonNode.get("version"); + String version = JsonUtils.findValue(JsonUtils.stringToJsonNode(result), "version"); + String number = JsonUtils.findValue(JsonUtils.stringToJsonNode(version), "number"); + String distribution = + JsonUtils.findValue(JsonUtils.stringToJsonNode(version), "distribution"); return ElasticsearchClusterInfo.builder() - .clusterVersion(versionNode.get("number").asText()) - .distribution( - Optional.ofNullable(versionNode.get("distribution")) - .map(JsonNode::asText) - .orElse(null)) + .clusterVersion(number) + .distribution(Optional.ofNullable(distribution).orElse(null)) .build(); } catch (IOException e) { throw new ResponseException("fail to get elasticsearch version.", e); diff --git a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-starrocks/src/main/java/org/apache/seatunnel/datasource/plugin/starrocks/StarRocksDataSourceChannel.java b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-starrocks/src/main/java/org/apache/seatunnel/datasource/plugin/starrocks/StarRocksDataSourceChannel.java index 9ae74fff5..af385bbc1 100644 --- a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-starrocks/src/main/java/org/apache/seatunnel/datasource/plugin/starrocks/StarRocksDataSourceChannel.java +++ b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-starrocks/src/main/java/org/apache/seatunnel/datasource/plugin/starrocks/StarRocksDataSourceChannel.java @@ -17,10 +17,9 @@ package org.apache.seatunnel.datasource.plugin.starrocks; -import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper; - import org.apache.seatunnel.api.configuration.util.OptionRule; import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.common.utils.JsonUtils; import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel; import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginException; import org.apache.seatunnel.datasource.plugin.api.model.TableField; @@ -41,8 +40,6 @@ public class StarRocksDataSourceChannel implements DataSourceChannel { private static final Logger LOGGER = LoggerFactory.getLogger(StarRocksDataSourceChannel.class); - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - @Override public boolean canAbleGetSchema() { return true; @@ -81,7 +78,7 @@ public boolean checkDataSourceConnectivity( try { StarRocksCatalog catalog = getCatalog(requestParams); String nodeUrls = requestParams.get(StarRocksOptionRule.NODE_URLS.key()); - List nodeList = OBJECT_MAPPER.readValue(nodeUrls, List.class); + List nodeList = JsonUtils.toList(nodeUrls, String.class); if (!telnet(nodeList.get(0))) { return false; } diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/SeatunnelDatasourceController.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/SeatunnelDatasourceController.java index bb6823125..37746b968 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/SeatunnelDatasourceController.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/SeatunnelDatasourceController.java @@ -31,8 +31,8 @@ import org.apache.seatunnel.app.domain.response.datasource.DatasourceRes; import org.apache.seatunnel.app.service.IDatasourceService; import org.apache.seatunnel.app.utils.CartesianProductUtils; -import org.apache.seatunnel.app.utils.JSONUtils; import org.apache.seatunnel.app.utils.PropertyUtils; +import org.apache.seatunnel.common.utils.JsonUtils; import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginInfo; import org.apache.seatunnel.datasource.plugin.api.model.TableField; import org.apache.seatunnel.server.common.SeatunnelErrorEnum; @@ -116,7 +116,7 @@ Result createDatasource( @ApiIgnore @RequestAttribute(value = SESSION_USER) User loginUser, @RequestBody DatasourceReq req) { String datasourceConfig = req.getDatasourceConfig(); - Map stringStringMap = JSONUtils.toMap(datasourceConfig); + Map stringStringMap = JsonUtils.toMap(datasourceConfig); return Result.success( datasourceService.createDatasource( loginUser.getId(), @@ -146,8 +146,6 @@ Result createDatasource( Result testConnect( @ApiIgnore @RequestAttribute(value = SESSION_USER) User loginUser, @RequestBody DatasourceCheckReq req) { - - // Map stringStringMap = JSONUtils.toMap(req.getDatasourceConfig()); return Result.success( datasourceService.testDatasourceConnectionAble( loginUser.getId(), @@ -182,7 +180,7 @@ Result updateDatasource( @ApiIgnore @RequestAttribute(value = SESSION_USER) User loginUser, @PathVariable("id") String id, @RequestBody DatasourceReq req) { - Map stringStringMap = JSONUtils.toMap(req.getDatasourceConfig()); + Map stringStringMap = JsonUtils.toMap(req.getDatasourceConfig()); Long datasourceId = Long.parseLong(id); return Result.success( datasourceService.updateDatasource( diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/entity/ProcessTaskRelation.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/entity/ProcessTaskRelation.java index be19c9b0d..19c6e115d 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/entity/ProcessTaskRelation.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/entity/ProcessTaskRelation.java @@ -17,14 +17,15 @@ package org.apache.seatunnel.app.dal.entity; +import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.annotation.JsonSerialize; + import org.apache.seatunnel.app.common.ConditionType; -import org.apache.seatunnel.app.utils.JSONUtils; +import org.apache.seatunnel.common.utils.JsonUtils; import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableName; -import com.fasterxml.jackson.databind.annotation.JsonDeserialize; -import com.fasterxml.jackson.databind.annotation.JsonSerialize; import lombok.Data; import java.util.Date; @@ -66,8 +67,8 @@ public class ProcessTaskRelation { private ConditionType conditionType; /** condition parameters */ - @JsonDeserialize(using = JSONUtils.JsonDataDeserializer.class) - @JsonSerialize(using = JSONUtils.JsonDataSerializer.class) + @JsonDeserialize(using = JsonUtils.JsonDataDeserializer.class) + @JsonSerialize(using = JsonUtils.JsonDataSerializer.class) private String conditionParams; /** create time */ diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/entity/TaskDefinition.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/entity/TaskDefinition.java index a56b6c6c4..41c268309 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/entity/TaskDefinition.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/entity/TaskDefinition.java @@ -17,13 +17,18 @@ package org.apache.seatunnel.app.dal.entity; +import org.apache.seatunnel.shade.com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode; +import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.annotation.JsonSerialize; + import org.apache.seatunnel.app.common.Constants; import org.apache.seatunnel.app.common.Flag; import org.apache.seatunnel.app.common.Priority; import org.apache.seatunnel.app.common.TaskTimeoutStrategy; import org.apache.seatunnel.app.common.TimeoutFlag; import org.apache.seatunnel.app.domain.model.Property; -import org.apache.seatunnel.app.utils.JSONUtils; +import org.apache.seatunnel.common.utils.JsonUtils; import org.apache.commons.collections4.CollectionUtils; @@ -32,9 +37,6 @@ import com.baomidou.mybatisplus.annotation.TableField; import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableName; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.annotation.JsonDeserialize; -import com.fasterxml.jackson.databind.annotation.JsonSerialize; import com.google.common.base.Strings; import java.util.Date; @@ -73,8 +75,8 @@ public class TaskDefinition { private String taskType; /** user defined parameters */ - @JsonDeserialize(using = JSONUtils.JsonDataDeserializer.class) - @JsonSerialize(using = JSONUtils.JsonDataSerializer.class) + @JsonDeserialize(using = JsonUtils.JsonDataDeserializer.class) + @JsonSerialize(using = JsonUtils.JsonDataSerializer.class) private String taskParams; /** user defined parameter list */ @@ -230,9 +232,9 @@ public void setTaskParams(String taskParams) { } public List getTaskParamList() { - JsonNode localParams = JSONUtils.parseObject(taskParams).findValue("localParams"); + JsonNode localParams = JsonUtils.parseObject(taskParams).findValue("localParams"); if (localParams != null) { - taskParamList = JSONUtils.toList(localParams.toString(), Property.class); + taskParamList = JsonUtils.toList(localParams.toString(), Property.class); } return taskParamList; @@ -248,12 +250,12 @@ public void setTaskParamMap(Map taskParamMap) { public Map getTaskParamMap() { if (taskParamMap == null && !Strings.isNullOrEmpty(taskParams)) { - JsonNode localParams = JSONUtils.parseObject(taskParams).findValue("localParams"); + JsonNode localParams = JsonUtils.parseObject(taskParams).findValue("localParams"); // If a jsonNode is null, not only use !=null, but also it should use the isNull method // to be estimated. if (localParams != null && !localParams.isNull()) { - List propList = JSONUtils.toList(localParams.toString(), Property.class); + List propList = JsonUtils.toList(localParams.toString(), Property.class); if (CollectionUtils.isNotEmpty(propList)) { taskParamMap = new HashMap<>(); @@ -379,7 +381,12 @@ public void setDelayTime(int delayTime) { } public String getDependence() { - return JSONUtils.getNodeString(this.taskParams, Constants.DEPENDENCE); + try { + JsonNode jsonNode = JsonUtils.stringToJsonNode(this.taskParams); + return JsonUtils.findValue(jsonNode, Constants.DEPENDENCE); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } } public String getModifyBy() { diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/entity/TaskDefinitionExpand.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/entity/TaskDefinitionExpand.java index a7041df17..278ea4168 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/entity/TaskDefinitionExpand.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/entity/TaskDefinitionExpand.java @@ -17,11 +17,11 @@ package org.apache.seatunnel.app.dal.entity; +import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode; + import org.apache.seatunnel.app.parameters.DependentParameters; import org.apache.seatunnel.app.parameters.SubProcessParameters; -import org.apache.seatunnel.app.utils.JSONUtils; - -import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.seatunnel.common.utils.JsonUtils; import static org.apache.seatunnel.app.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE; @@ -29,14 +29,14 @@ public class TaskDefinitionExpand extends TaskDefinition { public SubProcessParameters getSubProcessParameters() { String parameter = super.getTaskParams(); - ObjectNode parameterJson = JSONUtils.parseObject(parameter); + ObjectNode parameterJson = JsonUtils.parseObject(parameter); if (parameterJson.get(CMD_PARAM_SUB_PROCESS_DEFINE_CODE) != null) { - return JSONUtils.parseObject(parameter, SubProcessParameters.class); + return JsonUtils.parseObject(parameter, SubProcessParameters.class); } return null; } public DependentParameters getDependentParameters() { - return JSONUtils.parseObject(super.getDependence(), DependentParameters.class); + return JsonUtils.parseObject(super.getDependence(), DependentParameters.class); } } diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/parameters/AbstractParameters.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/parameters/AbstractParameters.java index a76e06ba1..1f4d4e645 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/parameters/AbstractParameters.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/parameters/AbstractParameters.java @@ -17,18 +17,18 @@ package org.apache.seatunnel.app.parameters; +import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode; +import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ArrayNode; + import org.apache.seatunnel.app.common.Direct; import org.apache.seatunnel.app.domain.model.Property; import org.apache.seatunnel.app.domain.model.ResourceInfo; import org.apache.seatunnel.app.parameters.resource.ResourceParametersHelper; -import org.apache.seatunnel.app.utils.JSONUtils; +import org.apache.seatunnel.common.utils.JsonUtils; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ArrayNode; - import java.util.ArrayList; import java.util.HashMap; import java.util.LinkedHashMap; @@ -121,7 +121,7 @@ public void setVarPool(String varPool) { if (StringUtils.isEmpty(varPool)) { this.varPool = new ArrayList<>(); } else { - this.varPool = JSONUtils.toList(varPool, Property.class); + this.varPool = JsonUtils.toList(varPool, Property.class); } } @@ -165,9 +165,9 @@ public List getOutProperty(List params) { public List> getListMapByString(String json) { List> allParams = new ArrayList<>(); - ArrayNode paramsByJson = JSONUtils.parseArray(json); + ArrayNode paramsByJson = JsonUtils.parseArray(json); for (JsonNode jsonNode : paramsByJson) { - Map param = JSONUtils.toMap(jsonNode.toString()); + Map param = JsonUtils.toMap(jsonNode.toString()); allParams.add(param); } return allParams; diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/parameters/resource/UdfFuncParameters.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/parameters/resource/UdfFuncParameters.java index 638d5a323..c51c9ce77 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/parameters/resource/UdfFuncParameters.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/parameters/resource/UdfFuncParameters.java @@ -18,7 +18,7 @@ package org.apache.seatunnel.app.parameters.resource; import org.apache.seatunnel.app.common.UdfType; -import org.apache.seatunnel.app.utils.JSONUtils; +import org.apache.seatunnel.common.utils.JsonUtils; import com.fasterxml.jackson.annotation.JsonProperty; @@ -218,6 +218,6 @@ public int hashCode() { @Override public String toString() { - return JSONUtils.toJsonString(this); + return JsonUtils.toJsonString(this); } } diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobConfigServiceImpl.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobConfigServiceImpl.java index 46b414370..4e7cde972 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobConfigServiceImpl.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobConfigServiceImpl.java @@ -25,6 +25,7 @@ import org.apache.seatunnel.app.permission.constants.SeatunnelFuncPermissionKeyConstant; import org.apache.seatunnel.app.service.IJobConfigService; import org.apache.seatunnel.common.constants.JobMode; +import org.apache.seatunnel.common.utils.JsonUtils; import org.apache.seatunnel.server.common.SeatunnelErrorEnum; import org.apache.seatunnel.server.common.SeatunnelException; @@ -34,18 +35,11 @@ import org.springframework.transaction.annotation.Transactional; import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; import javax.annotation.Resource; -import java.io.IOException; -import java.util.Map; - @Service public class JobConfigServiceImpl extends SeatunnelBaseServiceImpl implements IJobConfigService { - - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - private static final String JOB_MODE = "job.mode"; @Resource private IJobVersionDao jobVersionDao; @@ -65,14 +59,10 @@ public JobConfigRes getJobConfig(long jobVersionId) throws JsonProcessingExcepti jobConfigRes.setName(jobDefinition.getName()); jobConfigRes.setId(jobVersion.getId()); jobConfigRes.setDescription(jobDefinition.getDescription()); - try { - jobConfigRes.setEnv( - StringUtils.isEmpty(jobVersion.getEnv()) - ? null - : OBJECT_MAPPER.readValue(jobVersion.getEnv(), Map.class)); - } catch (IOException e) { - throw new RuntimeException(e); - } + jobConfigRes.setEnv( + StringUtils.isEmpty(jobVersion.getEnv()) + ? null + : JsonUtils.toMap(jobVersion.getEnv(), String.class, Object.class)); jobConfigRes.setEngine(jobVersion.getEngineName()); return jobConfigRes; } @@ -102,7 +92,7 @@ public void updateJobConfig(int userId, long jobVersionId, JobConfig jobConfig) .jobMode(jobMode) .engineName(jobConfig.getEngine()) .updateUserId(userId) - .env(OBJECT_MAPPER.writeValueAsString(jobConfig.getEnv())) + .env(JsonUtils.toJsonString(jobConfig.getEnv())) .build()); } else { throw new SeatunnelException(SeatunnelErrorEnum.ILLEGAL_STATE, "job mode is not set"); diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobDefinitionServiceImpl.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobDefinitionServiceImpl.java index 32b2aa6b2..e74f41283 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobDefinitionServiceImpl.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobDefinitionServiceImpl.java @@ -32,6 +32,7 @@ import org.apache.seatunnel.app.permission.constants.SeatunnelFuncPermissionKeyConstant; import org.apache.seatunnel.app.service.IJobDefinitionService; import org.apache.seatunnel.common.constants.JobMode; +import org.apache.seatunnel.common.utils.JsonUtils; import org.apache.seatunnel.server.common.CodeGenerateUtils; import org.apache.seatunnel.server.common.SeatunnelErrorEnum; import org.apache.seatunnel.server.common.SeatunnelException; @@ -42,12 +43,10 @@ import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; -import com.fasterxml.jackson.databind.ObjectMapper; import lombok.NonNull; import javax.annotation.Resource; -import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -61,8 +60,6 @@ public class JobDefinitionServiceImpl extends SeatunnelBaseServiceImpl private static final String DEFAULT_VERSION = "1.0"; - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - @Resource(name = "jobDefinitionDaoImpl") private IJobDefinitionDao jobDefinitionDao; @@ -198,16 +195,11 @@ public boolean getUsedByDataSourceIdAndVirtualTable(long datasourceId, String ta .map(JobTask::getDataSourceOption) .distinct() .map( - option -> { - try { - return StringUtils.isEmpty(option) + option -> + StringUtils.isEmpty(option) ? null - : OBJECT_MAPPER.readValue( - option, DataSourceOption.class); - } catch (IOException e) { - throw new RuntimeException(e); - } - }) + : JsonUtils.parseObject( + option, DataSourceOption.class)) .filter(Objects::nonNull) .collect(Collectors.toList()); return options.stream().anyMatch(option -> option.getTables().contains(tableName)); diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java index 5b6147a29..8d0b79579 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java @@ -64,6 +64,7 @@ import org.apache.seatunnel.app.utils.SeaTunnelConfigUtil; import org.apache.seatunnel.common.constants.PluginType; import org.apache.seatunnel.common.utils.ExceptionUtils; +import org.apache.seatunnel.common.utils.JsonUtils; import org.apache.seatunnel.engine.core.job.JobResult; import org.apache.seatunnel.server.common.CodeGenerateUtils; import org.apache.seatunnel.server.common.SeatunnelErrorEnum; @@ -75,7 +76,6 @@ import org.springframework.stereotype.Service; import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; @@ -123,8 +123,6 @@ public class JobInstanceServiceImpl extends SeatunnelBaseServiceImpl @Resource private IJobMetricsService jobMetricsService; - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - @Override public JobExecutorRes createExecuteResource( @NonNull Integer userId, @NonNull Long jobDefineId, JobExecParam executeParam) { @@ -391,25 +389,18 @@ private List findInputSchemas( }); checkArgument(outputSchemas.size() == 1, "input schema size must be 1"); - try { - List databaseTableSchemaReqs = - OBJECT_MAPPER.readValue( - outputSchemas.get(0), - new com.fasterxml.jackson.core.type.TypeReference< - List>() {}); - return databaseTableSchemaReqs.stream() - .map( - databaseTableSchemaReq -> { - TableSchemaReq tableSchemaReq = new TableSchemaReq(); - tableSchemaReq.setTableName(databaseTableSchemaReq.getTableName()); - tableSchemaReq.setFields(databaseTableSchemaReq.getFields()); - return tableSchemaReq; - }) - .collect(Collectors.toList()); - - } catch (JsonProcessingException e) { - throw new SeatunnelException(SeatunnelErrorEnum.ILLEGAL_STATE, e.getMessage()); - } + List databaseTableSchemaReqs = + JsonUtils.parseObject( + outputSchemas.get(0), new TypeReference>() {}); + return databaseTableSchemaReqs.stream() + .map( + databaseTableSchemaReq -> { + TableSchemaReq tableSchemaReq = new TableSchemaReq(); + tableSchemaReq.setTableName(databaseTableSchemaReq.getTableName()); + tableSchemaReq.setFields(databaseTableSchemaReq.getFields()); + return tableSchemaReq; + }) + .collect(Collectors.toList()); } private Config mergeTaskConfig( @@ -432,28 +423,15 @@ private Config mergeTaskConfig( datasourceDetailRes.getDatasourceConfig(), optionRule); - DataSourceOption dataSourceOption = null; - try { - dataSourceOption = - task.getDataSourceOption() == null - ? null - : new ObjectMapper() - .readValue(task.getDataSourceOption(), DataSourceOption.class); - } catch (IOException e) { - throw new RuntimeException(e); - } - SelectTableFields selectTableFields = null; - try { - selectTableFields = - task.getSelectTableFields() == null - ? null - : new ObjectMapper() - .readValue( - task.getSelectTableFields(), SelectTableFields.class); - } catch (IOException e) { - throw new RuntimeException(e); - } - + DataSourceOption dataSourceOption = + task.getDataSourceOption() == null + ? null + : JsonUtils.parseObject(task.getDataSourceOption(), DataSourceOption.class); + SelectTableFields selectTableFields = + task.getSelectTableFields() == null + ? null + : JsonUtils.parseObject( + task.getSelectTableFields(), SelectTableFields.class); SceneMode sceneMode = task.getSceneMode() == null ? null : SceneMode.valueOf(task.getSceneMode()); VirtualTableDetailRes virtualTableDetailRes = null; diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobMetricsServiceImpl.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobMetricsServiceImpl.java index 947bd07e9..edf49f7fd 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobMetricsServiceImpl.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobMetricsServiceImpl.java @@ -35,6 +35,7 @@ import org.apache.seatunnel.app.thirdparty.metrics.IEngineMetricsExtractor; import org.apache.seatunnel.app.utils.JobUtils; import org.apache.seatunnel.common.constants.JobMode; +import org.apache.seatunnel.common.utils.JsonUtils; import org.apache.seatunnel.engine.core.job.JobStatus; import org.apache.seatunnel.server.common.CodeGenerateUtils; import org.apache.seatunnel.server.common.Constants; @@ -47,13 +48,11 @@ import org.springframework.stereotype.Service; -import com.fasterxml.jackson.databind.ObjectMapper; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; import javax.annotation.Resource; -import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Date; @@ -65,9 +64,6 @@ @Service @Slf4j public class JobMetricsServiceImpl extends SeatunnelBaseServiceImpl implements IJobMetricsService { - - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - @Resource private IJobMetricsDao jobMetricsDao; @Resource private IJobInstanceHistoryDao jobInstanceHistoryDao; @@ -500,11 +496,7 @@ public JobDAG getJobDAG(@NonNull Integer userId, @NonNull Long jobInstanceId) { JobInstanceHistory history = getJobHistoryFromDb(jobInstance, userId, jobEngineId); if (history != null) { String dag = history.getDag(); - try { - return OBJECT_MAPPER.readValue(dag, JobDAG.class); - } catch (IOException e) { - throw new RuntimeException(e); - } + return JsonUtils.parseObject(dag, JobDAG.class); } Engine engine = new Engine(jobInstance.getEngineName(), jobInstance.getEngineVersion()); IEngineMetricsExtractor engineMetricsExtractor = @@ -518,11 +510,7 @@ public JobDAG getJobDAG(@NonNull Integer userId, @NonNull Long jobInstanceId) { } if (history != null) { String dag = history.getDag(); - try { - return OBJECT_MAPPER.readValue(dag, JobDAG.class); - } catch (IOException e) { - throw new RuntimeException(e); - } + return JsonUtils.parseObject(dag, JobDAG.class); } return null; } diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobTaskServiceImpl.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobTaskServiceImpl.java index abc4a7e40..107f7fb9e 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobTaskServiceImpl.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobTaskServiceImpl.java @@ -16,6 +16,7 @@ */ package org.apache.seatunnel.app.service.impl; +import org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference; import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory; import org.apache.seatunnel.app.config.ConnectorDataSourceMapperConfig; @@ -51,6 +52,7 @@ import org.apache.seatunnel.app.service.IJobTaskService; import org.apache.seatunnel.common.constants.PluginType; import org.apache.seatunnel.common.utils.ExceptionUtils; +import org.apache.seatunnel.common.utils.JsonUtils; import org.apache.seatunnel.common.utils.SeaTunnelException; import org.apache.seatunnel.datasource.plugin.api.model.TableField; import org.apache.seatunnel.server.common.CodeGenerateUtils; @@ -62,8 +64,6 @@ import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j; import javax.annotation.Resource; @@ -102,8 +102,6 @@ public class JobTaskServiceImpl extends SeatunnelBaseServiceImpl implements IJob @Resource private ConnectorDataSourceMapperConfig connectorDataSourceMapperConfig; - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - private void checkConfigIntegrity(JobVersion version, JobTaskInfo jobTaskInfo) { if (StringUtils.isEmpty(version.getEnv())) { throw new SeatunnelException( @@ -314,7 +312,7 @@ private JobTaskCheckRes checkNextTaskSchema( Map options = nextConfig.getTransformOptions(); if (options != null && !options.isEmpty()) { Transform transform = Transform.valueOf(nextConfig.getConnectorType().toUpperCase()); - String transformOptionsStr = OBJECT_MAPPER.writeValueAsString(options); + String transformOptionsStr = JsonUtils.toJsonString(options); List transformOptions = new ArrayList<>(); @@ -425,7 +423,7 @@ public void saveSingleTask(long jobVersionId, PluginConfig pluginConfig) { connectorType = pluginConfig.getConnectorType(); if (pluginConfig.getTransformOptions() != null) { transformOptionsStr = - OBJECT_MAPPER.writeValueAsString(pluginConfig.getTransformOptions()); + JsonUtils.toJsonString(pluginConfig.getTransformOptions()); } transformOptionCheck(connectorType, transformOptionsStr); } else { @@ -448,17 +446,16 @@ public void saveSingleTask(long jobVersionId, PluginConfig pluginConfig) { .dataSourceOption( pluginConfig.getTableOption() == null ? null - : OBJECT_MAPPER.writeValueAsString( - pluginConfig.getTableOption())) + : JsonUtils.toJsonString(pluginConfig.getTableOption())) .selectTableFields( pluginConfig.getSelectTableFields() == null ? null - : OBJECT_MAPPER.writeValueAsString( + : JsonUtils.toJsonString( pluginConfig.getSelectTableFields())) .outputSchema( pluginConfig.getOutputSchema() == null ? null - : OBJECT_MAPPER.writeValueAsString( + : JsonUtils.toJsonString( pluginConfig.getOutputSchema())) .transformOptions(transformOptionsStr) .build(); @@ -596,24 +593,24 @@ private static PluginConfig getPluginConfigFromJobTask(JobTask jobTask) { .tableOption( StringUtils.isEmpty(jobTask.getDataSourceOption()) ? null - : OBJECT_MAPPER.readValue( + : JsonUtils.parseObject( jobTask.getDataSourceOption(), DataSourceOption.class)) .selectTableFields( StringUtils.isEmpty(jobTask.getSelectTableFields()) ? null - : OBJECT_MAPPER.readValue( + : JsonUtils.parseObject( jobTask.getSelectTableFields(), SelectTableFields.class)) .outputSchema( StringUtils.isEmpty(jobTask.getOutputSchema()) ? null - : OBJECT_MAPPER.readValue( + : JsonUtils.parseObject( jobTask.getOutputSchema(), new TypeReference>() {})) .transformOptions( StringUtils.isEmpty(jobTask.getTransformOptions()) ? null - : OBJECT_MAPPER.readValue( + : JsonUtils.parseObject( jobTask.getTransformOptions(), new TypeReference>() {})) .config(jobTask.getConfig()) diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/engine/SeaTunnelEngineMetricsExtractor.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/engine/SeaTunnelEngineMetricsExtractor.java index b9003d828..cf6b9db15 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/engine/SeaTunnelEngineMetricsExtractor.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/engine/SeaTunnelEngineMetricsExtractor.java @@ -18,7 +18,6 @@ import org.apache.seatunnel.shade.com.fasterxml.jackson.core.JsonProcessingException; import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode; -import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper; import org.apache.seatunnel.app.dal.entity.JobInstanceHistory; import org.apache.seatunnel.app.dal.entity.JobMetrics; @@ -51,8 +50,6 @@ public class SeaTunnelEngineMetricsExtractor implements IEngineMetricsExtractor { @Getter @Setter private SeaTunnelEngineProxy seaTunnelEngineProxy; - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - public static final String[] clusterHealthMetricsKeys = new String[] { "processors", @@ -152,11 +149,7 @@ public LinkedHashMap getJobPipelineStatus(@NonNull String jobEn public JobInstanceHistory getJobHistoryById(String jobEngineId) { JobDAGInfo jobInfo = seaTunnelEngineProxy.getJobInfo(jobEngineId); JobInstanceHistory jobInstanceHistory = new JobInstanceHistory(); - try { - jobInstanceHistory.setDag(OBJECT_MAPPER.writeValueAsString(jobInfo)); - } catch (JsonProcessingException e) { - throw new org.apache.seatunnel.common.utils.SeaTunnelException(e); - } + jobInstanceHistory.setDag(JsonUtils.toJsonString(jobInfo)); return jobInstanceHistory; } diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/JSONUtils.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/JSONUtils.java deleted file mode 100644 index 83fa9dfb6..000000000 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/JSONUtils.java +++ /dev/null @@ -1,396 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.app.utils; - -import org.apache.seatunnel.app.common.Constants; - -import org.apache.commons.lang3.StringUtils; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.fasterxml.jackson.core.JsonGenerator; -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.core.util.DefaultPrettyPrinter; -import com.fasterxml.jackson.databind.DeserializationContext; -import com.fasterxml.jackson.databind.JsonDeserializer; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.JsonSerializer; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.ObjectWriter; -import com.fasterxml.jackson.databind.SerializationFeature; -import com.fasterxml.jackson.databind.SerializerProvider; -import com.fasterxml.jackson.databind.json.JsonMapper; -import com.fasterxml.jackson.databind.node.ArrayNode; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.fasterxml.jackson.databind.node.TextNode; -import com.fasterxml.jackson.databind.type.CollectionType; -import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; -import lombok.experimental.UtilityClass; - -import javax.annotation.Nullable; - -import java.io.IOException; -import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.TimeZone; - -import static com.fasterxml.jackson.databind.DeserializationFeature.ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT; -import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES; -import static com.fasterxml.jackson.databind.DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL; -import static com.fasterxml.jackson.databind.MapperFeature.REQUIRE_SETTERS_FOR_GETTERS; -import static java.nio.charset.StandardCharsets.UTF_8; - -@UtilityClass -public class JSONUtils { - - private static final Logger logger = LoggerFactory.getLogger(JSONUtils.class); - - static { - logger.info("init timezone: {}", TimeZone.getDefault()); - } - - /** can use static singleton, inject: just make sure to reuse! */ - private static final ObjectMapper objectMapper = - JsonMapper.builder() - .configure(FAIL_ON_UNKNOWN_PROPERTIES, false) - .configure(ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT, true) - .configure(READ_UNKNOWN_ENUM_VALUES_AS_NULL, true) - .configure(REQUIRE_SETTERS_FOR_GETTERS, true) - .disable(SerializationFeature.FAIL_ON_EMPTY_BEANS) - .addModule(new JavaTimeModule()) - .defaultTimeZone(TimeZone.getDefault()) - .defaultDateFormat(new SimpleDateFormat(Constants.YYYY_MM_DD_HH_MM_SS)) - .defaultPrettyPrinter(new DefaultPrettyPrinter()) - .build(); - - public static ArrayNode createArrayNode() { - return objectMapper.createArrayNode(); - } - - public static ObjectNode createObjectNode() { - return objectMapper.createObjectNode(); - } - - public static JsonNode toJsonNode(Object obj) { - return objectMapper.valueToTree(obj); - } - - /** - * json representation of object - * - * @param object object - * @param feature feature - * @return object to json string - */ - public static @Nullable String toJsonString(Object object, SerializationFeature feature) { - try { - ObjectWriter writer = objectMapper.writer(feature); - return writer.writeValueAsString(object); - } catch (Exception e) { - logger.error("object to json exception!, obj: {}", object, e); - } - - return null; - } - - public static String toPrettyJsonString(Object object) { - try { - return objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(object); - } catch (Exception e) { - throw new RuntimeException("Object json deserialization exception.", e); - } - } - - /** - * This method deserializes the specified Json into an object of the specified class. It is not - * suitable to use if the specified class is a generic type since it will not have the generic - * type information because of the Type Erasure feature of Java. Therefore, this method should - * not be used if the desired type is a generic type. Note that this method works fine if the - * any of the fields of the specified object are generics, just the object itself should not be - * a generic type. - * - * @param json the string from which the object is to be deserialized - * @param clazz the class of T - * @param T - * @return an object of type T from the string classOfT - */ - public static @Nullable T parseObject(String json, Class clazz) { - if (StringUtils.isEmpty(json)) { - return null; - } - - try { - return objectMapper.readValue(json, clazz); - } catch (Exception e) { - logger.error("parse object exception! json: {}", json, e); - } - return null; - } - - /** - * deserialize - * - * @param src byte array - * @param clazz class - * @param deserialize type - * @return deserialize type - */ - public static @Nullable T parseObject(byte[] src, Class clazz) { - if (src == null) { - return null; - } - String json = new String(src, UTF_8); - return parseObject(json, clazz); - } - - /** - * json to list - * - * @param json json string - * @param clazz class - * @param T - * @return list - */ - public static List toList(String json, Class clazz) { - if (StringUtils.isEmpty(json)) { - return Collections.emptyList(); - } - - try { - CollectionType listType = - objectMapper.getTypeFactory().constructCollectionType(ArrayList.class, clazz); - return objectMapper.readValue(json, listType); - } catch (Exception e) { - logger.error("parse list exception! json: {}", json, e); - } - - return Collections.emptyList(); - } - - /** - * check json object valid - * - * @param json json - * @return true if valid - */ - public static boolean checkJsonValid(String json) { - - if (StringUtils.isEmpty(json)) { - return false; - } - - try { - objectMapper.readTree(json); - return true; - } catch (IOException e) { - logger.error("check json object valid exception! json: {}", json, e); - } - - return false; - } - - /** - * Method for finding a JSON Object field with specified name in this node or its child nodes, - * and returning value it has. If no matching field is found in this node or its descendants, - * returns null. - * - * @param jsonNode json node - * @param fieldName Name of field to look for - * @return Value of first matching node found, if any; null if none - */ - public static String findValue(JsonNode jsonNode, String fieldName) { - JsonNode node = jsonNode.findValue(fieldName); - - if (node == null) { - return null; - } - - return node.asText(); - } - - /** - * json to map {@link #toMap(String, Class, Class)} - * - * @param json json - * @return json to map - */ - public static Map toMap(String json) { - return parseObject(json, new TypeReference>() {}); - } - - /** - * json to map - * - * @param json json - * @param classK classK - * @param classV classV - * @param K - * @param V - * @return to map - */ - public static Map toMap(String json, Class classK, Class classV) { - if (StringUtils.isEmpty(json)) { - return Collections.emptyMap(); - } - - try { - return objectMapper.readValue(json, new TypeReference>() {}); - } catch (Exception e) { - logger.error("json to map exception! json: {}", json, e); - } - - return Collections.emptyMap(); - } - - /** - * from the key-value generated json to get the str value no matter the real type of value - * - * @param json the json str - * @param nodeName key - * @return the str value of key - */ - public static String getNodeString(String json, String nodeName) { - try { - JsonNode rootNode = objectMapper.readTree(json); - JsonNode jsonNode = rootNode.findValue(nodeName); - if (Objects.isNull(jsonNode)) { - return ""; - } - return jsonNode.isTextual() ? jsonNode.asText() : jsonNode.toString(); - } catch (JsonProcessingException e) { - logger.info("Json deserialize error, json: {}", json, e); - return ""; - } - } - - /** - * json to object - * - * @param json json string - * @param type type reference - * @param - * @return return parse object - */ - public static @Nullable T parseObject(String json, TypeReference type) { - if (StringUtils.isEmpty(json)) { - return null; - } - - try { - return objectMapper.readValue(json, type); - } catch (Exception e) { - logger.error("json to map exception!, json: {}", json, e); - } - - return null; - } - - /** - * object to json string - * - * @param object object - * @return json string - */ - public static String toJsonString(Object object) { - try { - return objectMapper.writeValueAsString(object); - } catch (Exception e) { - throw new RuntimeException("Object json deserialization exception. obj: " + object, e); - } - } - - public static String writeAsPrettyString(Object object) { - try { - return objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(object); - } catch (Exception ex) { - throw new RuntimeException("Object json write as pretty string error, obj: " + object); - } - } - - /** - * serialize to json byte - * - * @param obj object - * @param object type - * @return byte array - */ - public static @Nullable byte[] toJsonByteArray(T obj) { - if (obj == null) { - return null; - } - String json = ""; - try { - json = toJsonString(obj); - } catch (Exception e) { - logger.error("json serialize exception. obj: {}", obj, e); - } - - return json.getBytes(UTF_8); - } - - public static ObjectNode parseObject(String text) { - try { - if (text.isEmpty()) { - return parseObject(text, ObjectNode.class); - } else { - return (ObjectNode) objectMapper.readTree(text); - } - } catch (Exception e) { - throw new RuntimeException("String json deserialization exception. text: " + text, e); - } - } - - public static ArrayNode parseArray(String text) { - try { - return (ArrayNode) objectMapper.readTree(text); - } catch (Exception e) { - throw new RuntimeException("Json deserialization exception. text: " + text, e); - } - } - - /** json serializer */ - public static class JsonDataSerializer extends JsonSerializer { - - @Override - public void serialize(String value, JsonGenerator gen, SerializerProvider provider) - throws IOException { - gen.writeRawValue(value); - } - } - - /** json data deserializer */ - public static class JsonDataDeserializer extends JsonDeserializer { - - @Override - public String deserialize(JsonParser p, DeserializationContext ctxt) throws IOException { - JsonNode node = p.getCodec().readTree(p); - if (node instanceof TextNode) { - return node.asText(); - } else { - return node.toString(); - } - } - } -} diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/TaskOptionUtils.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/TaskOptionUtils.java index 1f727b6dd..02cc62ad5 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/TaskOptionUtils.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/TaskOptionUtils.java @@ -23,19 +23,16 @@ import org.apache.seatunnel.app.domain.request.job.transform.SplitTransformOptions; import org.apache.seatunnel.app.domain.request.job.transform.Transform; import org.apache.seatunnel.app.domain.request.job.transform.TransformOptions; +import org.apache.seatunnel.common.utils.JsonUtils; import org.apache.seatunnel.server.common.SeatunnelErrorEnum; import org.apache.seatunnel.server.common.SeatunnelException; import org.apache.commons.lang3.StringUtils; -import com.fasterxml.jackson.databind.ObjectMapper; - import java.io.IOException; public class TaskOptionUtils { - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - public static T getTransformOption( Transform transform, String transformOptionsStr) throws IOException { switch (transform) { @@ -58,13 +55,12 @@ public static T getTransformOption( } public static T convertTransformStrToOptions( - String transformOptionsStr, Class optionClass) - throws IOException { + String transformOptionsStr, Class optionClass) { if (StringUtils.isEmpty(transformOptionsStr)) { throw new SeatunnelException( SeatunnelErrorEnum.ILLEGAL_STATE, optionClass.getName() + " transformOptions can not be empty"); } - return (T) OBJECT_MAPPER.readValue(transformOptionsStr, optionClass); + return (T) JsonUtils.parseObject(transformOptionsStr, optionClass); } } diff --git a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/common/SeatunnelWebTestingBase.java b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/common/SeatunnelWebTestingBase.java index 19bd07467..fd17fbb54 100644 --- a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/common/SeatunnelWebTestingBase.java +++ b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/common/SeatunnelWebTestingBase.java @@ -19,7 +19,7 @@ import org.apache.seatunnel.app.domain.request.user.UserLoginReq; import org.apache.seatunnel.app.domain.response.user.UserSimpleInfoRes; import org.apache.seatunnel.app.utils.JSONTestUtils; -import org.apache.seatunnel.app.utils.JSONUtils; +import org.apache.seatunnel.common.utils.JsonUtils; import com.fasterxml.jackson.core.type.TypeReference; @@ -30,12 +30,13 @@ import java.lang.reflect.Field; import java.net.HttpURLConnection; import java.net.URL; +import java.nio.charset.StandardCharsets; public class SeatunnelWebTestingBase { protected final String baseUrl = "http://localhost:8802/seatunnel/api/v1"; protected Result login(UserLoginReq userLoginReq) { - String requestBody = JSONUtils.toPrettyJsonString(userLoginReq); + String requestBody = JsonUtils.toJsonString(userLoginReq); String response = sendRequest(url("user/login"), requestBody, "POST"); return JSONTestUtils.parseObject( response, new TypeReference>() {}); @@ -71,7 +72,7 @@ protected String sendRequest(String url, String requestBody, String httpMethod) connection.setDoOutput(true); if (requestBody != null) { try (OutputStream os = connection.getOutputStream()) { - byte[] input = requestBody.getBytes("utf-8"); + byte[] input = requestBody.getBytes(StandardCharsets.UTF_8); os.write(input, 0, input.length); } } diff --git a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/ConnectorControllerWrapper.java b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/ConnectorControllerWrapper.java index 8a431febf..960a740cc 100644 --- a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/ConnectorControllerWrapper.java +++ b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/ConnectorControllerWrapper.java @@ -16,13 +16,13 @@ */ package org.apache.seatunnel.app.controller; +import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode; + import org.apache.seatunnel.app.common.Result; import org.apache.seatunnel.app.common.SeatunnelWebTestingBase; import org.apache.seatunnel.app.domain.response.connector.ConnectorInfo; import org.apache.seatunnel.app.utils.JSONTestUtils; -import org.apache.seatunnel.app.utils.JSONUtils; - -import com.fasterxml.jackson.databind.JsonNode; +import org.apache.seatunnel.common.utils.JsonUtils; import java.util.List; @@ -30,19 +30,19 @@ public class ConnectorControllerWrapper extends SeatunnelWebTestingBase { public List listAllTransform() { String response = sendRequest(url("connector/transforms")); - JsonNode data = JSONUtils.parseObject(response).findValue("data"); + JsonNode data = JsonUtils.parseObject(response).findValue("data"); return JSONTestUtils.toList(data.toString(), ConnectorInfo.class); } public List listSource(String status) { String response = sendRequest(urlWithParam("connector/sources?status=" + status)); - JsonNode data = JSONUtils.parseObject(response).findValue("data"); + JsonNode data = JsonUtils.parseObject(response).findValue("data"); return JSONTestUtils.toList(data.toString(), ConnectorInfo.class); } public List listSink(String status) { String response = sendRequest(urlWithParam("connector/sinks?status=" + status)); - JsonNode data = JSONUtils.parseObject(response).findValue("data"); + JsonNode data = JsonUtils.parseObject(response).findValue("data"); return JSONTestUtils.toList(data.toString(), ConnectorInfo.class); } diff --git a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobConfigControllerWrapper.java b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobConfigControllerWrapper.java index e1835a1d3..82cd7b4d3 100644 --- a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobConfigControllerWrapper.java +++ b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobConfigControllerWrapper.java @@ -22,8 +22,8 @@ import org.apache.seatunnel.app.domain.request.job.JobConfig; import org.apache.seatunnel.app.domain.response.job.JobConfigRes; import org.apache.seatunnel.app.utils.JSONTestUtils; -import org.apache.seatunnel.app.utils.JSONUtils; import org.apache.seatunnel.common.constants.JobMode; +import org.apache.seatunnel.common.utils.JsonUtils; import com.fasterxml.jackson.core.type.TypeReference; @@ -33,7 +33,7 @@ public class JobConfigControllerWrapper extends SeatunnelWebTestingBase { public Result updateJobConfig(long jobVersionId, JobConfig jobConfig) { - String requestBody = JSONUtils.toPrettyJsonString(jobConfig); + String requestBody = JsonUtils.toJsonString(jobConfig); String response = sendRequest(url("job/config/" + jobVersionId), requestBody, "PUT"); return JSONTestUtils.parseObject(response, Result.class); } diff --git a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobControllerWrapper.java b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobControllerWrapper.java index be654fbae..9a5b64bf9 100644 --- a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobControllerWrapper.java +++ b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobControllerWrapper.java @@ -21,20 +21,20 @@ import org.apache.seatunnel.app.domain.request.job.JobCreateReq; import org.apache.seatunnel.app.domain.response.job.JobRes; import org.apache.seatunnel.app.utils.JSONTestUtils; -import org.apache.seatunnel.app.utils.JSONUtils; +import org.apache.seatunnel.common.utils.JsonUtils; import com.fasterxml.jackson.core.type.TypeReference; public class JobControllerWrapper extends SeatunnelWebTestingBase { public Result createJob(JobCreateReq jobCreateRequest) { - String requestBody = JSONUtils.toPrettyJsonString(jobCreateRequest); + String requestBody = JsonUtils.toJsonString(jobCreateRequest); String response = sendRequest(url("job/create"), requestBody, "POST"); return JSONTestUtils.parseObject(response, new TypeReference>() {}); } public Result updateJob(long jobVersionId, JobCreateReq jobCreateReq) { - String requestBody = JSONUtils.toPrettyJsonString(jobCreateReq); + String requestBody = JsonUtils.toJsonString(jobCreateReq); String response = sendRequest(urlWithParam("job/update/" + jobVersionId + "?"), requestBody, "PUT"); return JSONTestUtils.parseObject(response, new TypeReference>() {}); diff --git a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobDefinitionControllerWrapper.java b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobDefinitionControllerWrapper.java index b303bb79b..ccff9c3cd 100644 --- a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobDefinitionControllerWrapper.java +++ b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobDefinitionControllerWrapper.java @@ -23,8 +23,8 @@ import org.apache.seatunnel.app.domain.response.PageInfo; import org.apache.seatunnel.app.domain.response.job.JobDefinitionRes; import org.apache.seatunnel.app.utils.JSONTestUtils; -import org.apache.seatunnel.app.utils.JSONUtils; import org.apache.seatunnel.common.constants.JobMode; +import org.apache.seatunnel.common.utils.JsonUtils; import com.fasterxml.jackson.core.type.TypeReference; @@ -33,7 +33,7 @@ public class JobDefinitionControllerWrapper extends SeatunnelWebTestingBase { public Result createJobDefinition(JobReq jobReq) { - String requestBody = JSONUtils.toPrettyJsonString(jobReq); + String requestBody = JsonUtils.toJsonString(jobReq); String response = sendRequest(url("job/definition"), requestBody, "POST"); return JSONTestUtils.parseObject(response, new TypeReference>() {}); } diff --git a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobExecutorControllerWrapper.java b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobExecutorControllerWrapper.java index a2aaefb9a..268b25602 100644 --- a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobExecutorControllerWrapper.java +++ b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobExecutorControllerWrapper.java @@ -23,7 +23,7 @@ import org.apache.seatunnel.app.domain.response.executor.JobExecutionStatus; import org.apache.seatunnel.app.domain.response.executor.JobExecutorRes; import org.apache.seatunnel.app.utils.JSONTestUtils; -import org.apache.seatunnel.app.utils.JSONUtils; +import org.apache.seatunnel.common.utils.JsonUtils; import com.fasterxml.jackson.core.type.TypeReference; @@ -39,7 +39,7 @@ public Result jobExecutor(Long jobDefineId) { } public Result jobExecutor(Long jobDefineId, JobExecParam jobExecParam) { - String requestBody = JSONUtils.toPrettyJsonString(jobExecParam); + String requestBody = JsonUtils.toJsonString(jobExecParam); String response = sendRequest( urlWithParam("job/executor/execute?jobDefineId=" + jobDefineId), diff --git a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobTaskControllerWrapper.java b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobTaskControllerWrapper.java index 28cb6ba97..4d093e001 100644 --- a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobTaskControllerWrapper.java +++ b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobTaskControllerWrapper.java @@ -27,14 +27,15 @@ import org.apache.seatunnel.app.domain.request.job.SelectTableFields; import org.apache.seatunnel.app.domain.response.job.JobTaskCheckRes; import org.apache.seatunnel.app.utils.JSONTestUtils; -import org.apache.seatunnel.app.utils.JSONUtils; import org.apache.seatunnel.common.constants.PluginType; +import org.apache.seatunnel.common.utils.JsonUtils; import org.apache.seatunnel.datasource.plugin.api.model.TableField; import com.fasterxml.jackson.core.type.TypeReference; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; @@ -43,7 +44,7 @@ public class JobTaskControllerWrapper extends SeatunnelWebTestingBase { public Result saveJobDAG(long jobVersionId, JobDAG jobDAG) { - String requestBody = JSONUtils.toPrettyJsonString(jobDAG); + String requestBody = JsonUtils.toJsonString(jobDAG); String response = sendRequest(url("job/dag/" + jobVersionId), requestBody, "POST"); return JSONTestUtils.parseObject(response, new TypeReference>() {}); } @@ -54,7 +55,7 @@ public Result getJob(long jobVersionId) { } public Result saveSingleTask(long jobVersionId, PluginConfig pluginConfig) { - String requestBody = JSONUtils.toPrettyJsonString(pluginConfig); + String requestBody = JsonUtils.toJsonString(pluginConfig); String response = sendRequest(url("job/task/" + jobVersionId), requestBody, "POST"); return JSONTestUtils.parseObject(response, Result.class); } @@ -73,8 +74,8 @@ public Result deleteSingleTask(long jobVersionId, String pluginId) { public String createFakeSourcePlugin(String datasourceId, long jobVersionId, String rows) { DataSourceOption tableOption = new DataSourceOption(); - tableOption.setDatabases(Arrays.asList("fake_database")); - tableOption.setTables(Arrays.asList("fake_table")); + tableOption.setDatabases(Collections.singletonList("fake_database")); + tableOption.setTables(Collections.singletonList("fake_table")); String sourcePluginId = "src_" + System.currentTimeMillis(); PluginConfig sourcePluginConfig = PluginConfig.builder() @@ -110,8 +111,8 @@ public String createFakeSourcePluginThatFails(String datasourceId, long jobVersi public String createConsoleSinkPlugin(String datasourceId, long jobVersionId) { DataSourceOption sinkTableOption = new DataSourceOption(); - sinkTableOption.setDatabases(Arrays.asList("console_fake_database")); - sinkTableOption.setTables(Arrays.asList("console_fake_table")); + sinkTableOption.setDatabases(Collections.singletonList("console_fake_database")); + sinkTableOption.setTables(Collections.singletonList("console_fake_table")); String sinkPluginId = "sink_" + System.currentTimeMillis(); PluginConfig sinkPluginConfig = @@ -157,7 +158,7 @@ private List getOutputSchema() { databaseTableSchemaReq.setDatabase("fake_database"); databaseTableSchemaReq.setTableName("fake_table"); databaseTableSchemaReq.setFields(createFields()); - return Arrays.asList(databaseTableSchemaReq); + return Collections.singletonList(databaseTableSchemaReq); } private List createFields() { diff --git a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/SeatunnelDatasourceControllerWrapper.java b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/SeatunnelDatasourceControllerWrapper.java index 4fe17e544..3f2af1f20 100644 --- a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/SeatunnelDatasourceControllerWrapper.java +++ b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/SeatunnelDatasourceControllerWrapper.java @@ -24,7 +24,7 @@ import org.apache.seatunnel.app.domain.response.datasource.DatasourceDetailRes; import org.apache.seatunnel.app.domain.response.datasource.DatasourceRes; import org.apache.seatunnel.app.utils.JSONTestUtils; -import org.apache.seatunnel.app.utils.JSONUtils; +import org.apache.seatunnel.common.utils.JsonUtils; import com.fasterxml.jackson.core.type.TypeReference; @@ -73,19 +73,19 @@ private DatasourceReq getConsoleDatasourceReq(String datasourceName) { } public Result createDatasource(DatasourceReq req) { - String requestBody = JSONUtils.toPrettyJsonString(req); + String requestBody = JsonUtils.toJsonString(req); String response = sendRequest(url("datasource/create"), requestBody, "POST"); return JSONTestUtils.parseObject(response, new TypeReference>() {}); } public Result testConnect(DatasourceCheckReq req) { - String requestBody = JSONUtils.toPrettyJsonString(req); + String requestBody = JsonUtils.toJsonString(req); String response = sendRequest(url("datasource/check/connect"), requestBody, "POST"); return JSONTestUtils.parseObject(response, new TypeReference>() {}); } public Result updateDatasource(String id, DatasourceReq req) { - String requestBody = JSONUtils.toPrettyJsonString(req); + String requestBody = JsonUtils.toJsonString(req); String response = sendRequest(url("datasource/" + id), requestBody, "PUT"); return JSONTestUtils.parseObject(response, new TypeReference>() {}); } diff --git a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/TaskInstanceControllerWrapper.java b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/TaskInstanceControllerWrapper.java index 6558bff98..e45a89935 100644 --- a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/TaskInstanceControllerWrapper.java +++ b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/TaskInstanceControllerWrapper.java @@ -35,7 +35,7 @@ public class TaskInstanceControllerWrapper extends SeatunnelWebTestingBase { - private static SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + private static final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); public Result> getTaskInstanceList( String taskName, diff --git a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/UserControllerWrapper.java b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/UserControllerWrapper.java index af0cb5563..711ef6702 100644 --- a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/UserControllerWrapper.java +++ b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/UserControllerWrapper.java @@ -22,20 +22,20 @@ import org.apache.seatunnel.app.domain.request.user.UpdateUserReq; import org.apache.seatunnel.app.domain.response.user.AddUserRes; import org.apache.seatunnel.app.utils.JSONTestUtils; -import org.apache.seatunnel.app.utils.JSONUtils; +import org.apache.seatunnel.common.utils.JsonUtils; import com.fasterxml.jackson.core.type.TypeReference; public class UserControllerWrapper extends SeatunnelWebTestingBase { public Result addUser(AddUserReq addUserReq) { - String requestBody = JSONUtils.toPrettyJsonString(addUserReq); + String requestBody = JsonUtils.toJsonString(addUserReq); String response = sendRequest(url("user"), requestBody, "POST"); return JSONTestUtils.parseObject(response, new TypeReference>() {}); } public Result updateUser(String userId, UpdateUserReq updateUserReq) { - String requestBody = JSONUtils.toPrettyJsonString(updateUserReq); + String requestBody = JsonUtils.toJsonString(updateUserReq); String response = sendRequest(url("user/" + userId), requestBody, "PUT"); return JSONTestUtils.parseObject(response, Result.class); } diff --git a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/utils/JobTestingUtils.java b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/utils/JobTestingUtils.java index e9dc309f3..6ca28f2d0 100644 --- a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/utils/JobTestingUtils.java +++ b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/utils/JobTestingUtils.java @@ -30,6 +30,7 @@ import org.apache.seatunnel.app.domain.request.job.PluginConfig; import org.apache.seatunnel.app.domain.response.job.JobTaskCheckRes; import org.apache.seatunnel.app.domain.response.metrics.JobPipelineDetailMetricsRes; +import org.apache.seatunnel.common.utils.JsonUtils; import java.io.IOException; import java.nio.file.Files; @@ -167,7 +168,7 @@ public static JobCreateReq populateMySQLJobCreateReqFromFile() { } catch (IOException e) { throw new RuntimeException(e); } - return JSONTestUtils.parseObject(jsonContent, JobCreateReq.class); + return JsonUtils.parseObject(jsonContent, JobCreateReq.class); } public static JobCreateReq populateJobCreateReqFromFile( @@ -179,7 +180,7 @@ public static JobCreateReq populateJobCreateReqFromFile( } catch (IOException e) { throw new RuntimeException(e); } - JobCreateReq jobCreateReq = JSONTestUtils.parseObject(jsonContent, JobCreateReq.class); + JobCreateReq jobCreateReq = JsonUtils.parseObject(jsonContent, JobCreateReq.class); jobCreateReq.getJobConfig().setName(jobName); jobCreateReq.getJobConfig().setDescription(jobName + " description"); setSourceIds(jobCreateReq, fsdSourceName, csSourceName);