From ded5481d98f939507743586723dbf209f5e0e56f Mon Sep 17 00:00:00 2001 From: iture123 <1011699225@qq.com> Date: Fri, 4 Nov 2022 10:00:11 +0800 Subject: [PATCH] [Feature][Connector-V2][Elasticsearch] Support Elasticsearch source (#2821) --- docs/en/connector-v2/source/Elasticsearch.md | 89 +++++++ plugin-mapping.properties | 3 +- .../connector-elasticsearch/pom.xml | 19 +- .../elasticsearch/client/EsRestClient.java | 250 ++++++++++++++++-- .../config/EsClusterConnectionConfig.java | 28 ++ .../config/source/SourceConfig.java | 30 +++ .../source/SourceConfigDeaultConstant.java | 26 ++ .../constant/EsTypeMappingSeaTunnelType.java | 50 ++++ .../elasticsearch/dto/IndexInfo.java | 21 +- .../dto/source/IndexDocsCount.java | 43 +++ .../dto/source/ScrollResult.java | 30 +++ .../dto/source/SourceIndexInfo.java | 34 +++ .../exception/GetIndexDocsCountException.java | 30 +++ .../exception/ScrollRequestException.java | 30 +++ .../serialize/ElasticsearchRowSerializer.java | 14 +- .../DeaultSeaTunnelRowDeserializer.java | 175 ++++++++++++ .../serialize/source/ElasticsearchRecord.java | 33 +++ .../source/SeaTunnelRowDeserializer.java | 26 ++ .../elasticsearch/sink/ElasticsearchSink.java | 18 +- .../sink/ElasticsearchSinkWriter.java | 31 +-- .../source/ElasticsearchSource.java | 104 ++++++++ .../source/ElasticsearchSourceReader.java | 122 +++++++++ .../source/ElasticsearchSourceSplit.java | 43 +++ .../ElasticsearchSourceSplitEnumerator.java | 195 ++++++++++++++ .../source/ElasticsearchSourceState.java | 32 +++ .../connector-elasticsearch-flink-e2e/pom.xml | 59 +++++ .../v2/elasticsearch/ElasticsearchIT.java | 170 ++++++++++++ .../elasticsearch_source_and_sink.conf | 64 +++++ .../src/test/resources/log4j.properties | 22 ++ .../connector-elasticsearch-spark-e2e/pom.xml | 61 +++++ .../v2/elasticsearch/ElasticsearchIT.java | 171 ++++++++++++ .../elasticsearch_source_and_sink.conf | 67 +++++ .../src/test/resources/log4j.properties | 22 ++ 33 files changed, 2027 insertions(+), 85 
deletions(-) create mode 100644 docs/en/connector-v2/source/Elasticsearch.md create mode 100644 seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/EsClusterConnectionConfig.java create mode 100644 seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/source/SourceConfig.java create mode 100644 seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/source/SourceConfigDeaultConstant.java create mode 100644 seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/constant/EsTypeMappingSeaTunnelType.java create mode 100644 seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/source/IndexDocsCount.java create mode 100644 seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/source/ScrollResult.java create mode 100644 seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/source/SourceIndexInfo.java create mode 100644 seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/GetIndexDocsCountException.java create mode 100644 seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/ScrollRequestException.java create mode 100644 seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/DeaultSeaTunnelRowDeserializer.java create mode 100644 
seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/ElasticsearchRecord.java create mode 100644 seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/SeaTunnelRowDeserializer.java create mode 100644 seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java create mode 100644 seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceReader.java create mode 100644 seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplit.java create mode 100644 seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java create mode 100644 seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceState.java create mode 100644 seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-elasticsearch-flink-e2e/pom.xml create mode 100644 seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-elasticsearch-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/elasticsearch/ElasticsearchIT.java create mode 100644 seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-elasticsearch-flink-e2e/src/test/resources/elasticsearch/elasticsearch_source_and_sink.conf create mode 100644 seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-elasticsearch-flink-e2e/src/test/resources/log4j.properties create mode 100644 seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-elasticsearch-spark-e2e/pom.xml create mode 100644 
seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-elasticsearch-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/elasticsearch/ElasticsearchIT.java create mode 100644 seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-elasticsearch-spark-e2e/src/test/resources/elasticsearch/elasticsearch_source_and_sink.conf create mode 100644 seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-elasticsearch-spark-e2e/src/test/resources/log4j.properties diff --git a/docs/en/connector-v2/source/Elasticsearch.md b/docs/en/connector-v2/source/Elasticsearch.md new file mode 100644 index 00000000000..11606dda057 --- /dev/null +++ b/docs/en/connector-v2/source/Elasticsearch.md @@ -0,0 +1,89 @@ +# Elasticsearch + +> Elasticsearch source connector + +## Description + +Used to read data from Elasticsearch. + +support version >= 2.x and < 8.x. + +## Key features + +- [x] [batch](../../concept/connector-v2-features.md) +- [ ] [stream](../../concept/connector-v2-features.md) +- [ ] [exactly-once](../../concept/connector-v2-features.md) +- [x] [schema projection](../../concept/connector-v2-features.md) +- [ ] [parallelism](../../concept/connector-v2-features.md) +- [ ] [support user-defined split](../../concept/connector-v2-features.md) + +## Options + +| name | type | required | default value | +|-------------|--------| -------- |---------------| +| hosts | array | yes | - | +| username | string | no | - | +| password | string | no | - | +| index | string | yes | - | +| source | array | yes | - | +| scroll_time | string | no | 1m | +| scroll_size | int | no | 100 | + + + +### hosts [array] +Elasticsearch cluster http address, the format is `host:port`, allowing multiple hosts to be specified. Such as `["host1:9200", "host2:9200"]`. + +### username [string] +x-pack username. + +### password [string] +x-pack password. + +### index [string] +Elasticsearch index name, support * fuzzy matching. + +### source [array] +The fields of index. 
+You can get the document id by specifying the field `_id`. If you sink `_id` to another index, you need to specify an alias for `_id` due to an Elasticsearch limitation. + +### scroll_time [string] +Amount of time Elasticsearch will keep the search context alive for scroll requests. + +### scroll_size [int] +Maximum number of hits to be returned with each Elasticsearch scroll request. + +## Examples +simple +```hocon +Elasticsearch { + hosts = ["localhost:9200"] + index = "seatunnel-*" + source = ["_id","name","age"] +} +``` +complex +```hocon +Elasticsearch { + hosts = ["elasticsearch:9200"] + index = "st_index" + schema = { + fields { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_decimal = "decimal(2, 1)" + c_bytes = bytes + c_date = date + c_timestamp = timestamp + } + } +} +``` \ No newline at end of file diff --git a/plugin-mapping.properties b/plugin-mapping.properties index e022fa59b1b..8080a44f444 100644 --- a/plugin-mapping.properties +++ b/plugin-mapping.properties @@ -116,7 +116,8 @@ seatunnel.sink.OssFile = connector-file-oss seatunnel.source.Pulsar = connector-pulsar seatunnel.source.Hudi = connector-hudi seatunnel.sink.DingTalk = connector-dingtalk -seatunnel.sink.elasticsearch = connector-elasticsearch +seatunnel.source.Elasticsearch = connector-elasticsearch +seatunnel.sink.Elasticsearch = connector-elasticsearch seatunnel.source.IoTDB = connector-iotdb seatunnel.sink.IoTDB = connector-iotdb seatunnel.source.Neo4j = connector-neo4j diff --git a/seatunnel-connectors-v2/connector-elasticsearch/pom.xml b/seatunnel-connectors-v2/connector-elasticsearch/pom.xml index 8419003d7ae..ab8b1e9b85c 100644 --- a/seatunnel-connectors-v2/connector-elasticsearch/pom.xml +++ b/seatunnel-connectors-v2/connector-elasticsearch/pom.xml @@ -28,11 +28,8 @@ 4.0.0 connector-elasticsearch - 7.5.1 - 2.12.6 - 2.13.3 @@ -42,15 +39,15 @@ 
${elasticsearch-rest-client.version} - com.fasterxml.jackson.core - jackson-databind - ${jackson.databind.version} + org.apache.seatunnel + connector-common + ${project.version} + compile - com.fasterxml.jackson.datatype - jackson-datatype-jsr310 - ${jackson-datatype-jsr310.version} + org.apache.seatunnel + seatunnel-format-json + ${project.version} - - \ No newline at end of file + diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java index 661d5b7ca6c..809654b24dc 100644 --- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java @@ -17,12 +17,23 @@ package org.apache.seatunnel.connectors.seatunnel.elasticsearch.client; +import org.apache.seatunnel.common.utils.JsonUtils; +import org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig; import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.BulkResponse; +import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.IndexDocsCount; +import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.ScrollResult; import org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.BulkElasticsearchException; import org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.GetElasticsearchVersionException; +import org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.GetIndexDocsCountException; +import org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ScrollRequestException; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; 
import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.fasterxml.jackson.databind.node.TextNode; +import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.http.HttpHost; import org.apache.http.HttpStatus; @@ -30,6 +41,7 @@ import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.CredentialsProvider; import org.apache.http.impl.client.BasicCredentialsProvider; +import org.apache.http.util.Asserts; import org.apache.http.util.EntityUtils; import org.elasticsearch.client.Request; import org.elasticsearch.client.Response; @@ -37,18 +49,45 @@ import org.elasticsearch.client.RestClientBuilder; import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; import java.util.List; +import java.util.Map; +@Slf4j public class EsRestClient { - private static EsRestClient ES_REST_CLIENT; - private static RestClient REST_CLIENT; + private static final int CONNECTION_REQUEST_TIMEOUT = 10 * 1000; + + private static final int SOCKET_TIMEOUT = 5 * 60 * 1000; + + private final RestClient restClient; - private EsRestClient() { + private final ObjectMapper mapper = new ObjectMapper(); + + private EsRestClient(RestClient restClient) { + this.restClient = restClient; + } + + public static EsRestClient createInstance(Config pluginConfig) { + List hosts = pluginConfig.getStringList(EsClusterConnectionConfig.HOSTS); + String username = null; + String password = null; + if (pluginConfig.hasPath(EsClusterConnectionConfig.USERNAME)) { + username = pluginConfig.getString(EsClusterConnectionConfig.USERNAME); + if (pluginConfig.hasPath(EsClusterConnectionConfig.PASSWORD)) { + password = pluginConfig.getString(EsClusterConnectionConfig.PASSWORD); + } + } + return createInstance(hosts, username, password); + } + public static EsRestClient createInstance(List hosts, 
String username, String password) { + RestClientBuilder restClientBuilder = getRestClientBuilder(hosts, username, password); + return new EsRestClient(restClientBuilder.build()); } - @SuppressWarnings("checkstyle:MagicNumber") private static RestClientBuilder getRestClientBuilder(List hosts, String username, String password) { HttpHost[] httpHosts = new HttpHost[hosts.size()]; for (int i = 0; i < hosts.size(); i++) { @@ -58,8 +97,8 @@ private static RestClientBuilder getRestClientBuilder(List hosts, String RestClientBuilder builder = RestClient.builder(httpHosts) .setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder - .setConnectionRequestTimeout(10 * 1000) - .setSocketTimeout(5 * 60 * 1000)); + .setConnectionRequestTimeout(CONNECTION_REQUEST_TIMEOUT) + .setSocketTimeout(SOCKET_TIMEOUT)); if (StringUtils.isNotEmpty(username)) { CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); @@ -69,20 +108,11 @@ private static RestClientBuilder getRestClientBuilder(List hosts, String return builder; } - public static EsRestClient getInstance(List hosts, String username, String password) { - if (REST_CLIENT == null) { - RestClientBuilder restClientBuilder = getRestClientBuilder(hosts, username, password); - REST_CLIENT = restClientBuilder.build(); - ES_REST_CLIENT = new EsRestClient(); - } - return ES_REST_CLIENT; - } - public BulkResponse bulk(String requestBody) { Request request = new Request("POST", "_bulk"); request.setJsonEntity(requestBody); try { - Response response = REST_CLIENT.performRequest(request); + Response response = restClient.performRequest(request); if (response == null) { throw new BulkElasticsearchException("bulk es Response is null"); } @@ -104,10 +134,10 @@ public BulkResponse bulk(String requestBody) { /** * @return version.number, example:2.0.0 */ - public static String getClusterVersion() { + public String getClusterVersion() { Request request = new Request("GET", "/"); try { - Response response = 
REST_CLIENT.performRequest(request); + Response response = restClient.performRequest(request); String result = EntityUtils.toString(response.getEntity()); ObjectMapper objectMapper = new ObjectMapper(); JsonNode jsonNode = objectMapper.readTree(result); @@ -118,8 +148,188 @@ public static String getClusterVersion() { } } - public void close() throws IOException { - REST_CLIENT.close(); + public void close() { + try { + restClient.close(); + } catch (IOException e) { + log.warn("close elasticsearch connection error", e); + } + } + + /** + * first time to request search documents by scroll + * call /${index}/_search?scroll=${scroll} + * + * @param index index name + * @param source select fields + * @param scrollTime such as:1m + * @param scrollSize fetch documents count in one request + */ + public ScrollResult searchByScroll(String index, List source, String scrollTime, int scrollSize) { + Map param = new HashMap<>(); + Map query = new HashMap<>(); + query.put("match_all", new HashMap()); + param.put("query", query); + param.put("_source", source); + param.put("sort", new String[]{"_doc"}); + param.put("size", scrollSize); + String endpoint = index + "/_search?scroll=" + scrollTime; + ScrollResult scrollResult = getDocsFromScrollRequest(endpoint, JsonUtils.toJsonString(param)); + return scrollResult; + } + + /** + * scroll to get result + * call _search/scroll + * + * @param scrollId the scroll id of the last request + * @param scrollTime such as:1m + */ + public ScrollResult searchWithScrollId(String scrollId, String scrollTime) { + Map param = new HashMap<>(); + param.put("scroll_id", scrollId); + param.put("scroll", scrollTime); + ScrollResult scrollResult = getDocsFromScrollRequest("_search/scroll", JsonUtils.toJsonString(param)); + return scrollResult; + } + + private ScrollResult getDocsFromScrollRequest(String endpoint, String requestBody) { + Request request = new Request("POST", endpoint); + request.setJsonEntity(requestBody); + try { + Response response = 
restClient.performRequest(request); + if (response == null) { + throw new ScrollRequestException("POST " + endpoint + " response null"); + } + if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) { + String entity = EntityUtils.toString(response.getEntity()); + ObjectNode responseJson = JsonUtils.parseObject(entity); + + JsonNode shards = responseJson.get("_shards"); + int totalShards = shards.get("total").intValue(); + int successful = shards.get("successful").intValue(); + Asserts.check(totalShards == successful, String.format("POST %s,total shards(%d)!= successful shards(%d)", endpoint, totalShards, successful)); + + ScrollResult scrollResult = getDocsFromScrollResponse(responseJson); + return scrollResult; + } else { + throw new ScrollRequestException(String.format("POST %s response status code=%d,request boy=%s", endpoint, response.getStatusLine().getStatusCode(), requestBody)); + } + } catch (IOException e) { + throw new ScrollRequestException(String.format("POST %s error,request boy=%s", endpoint, requestBody), e); + + } + } + + private ScrollResult getDocsFromScrollResponse(ObjectNode responseJson) { + ScrollResult scrollResult = new ScrollResult(); + String scrollId = responseJson.get("_scroll_id").asText(); + scrollResult.setScrollId(scrollId); + + JsonNode hitsNode = responseJson.get("hits").get("hits"); + List> docs = new ArrayList<>(hitsNode.size()); + scrollResult.setDocs(docs); + + Iterator iter = hitsNode.iterator(); + while (iter.hasNext()) { + Map doc = new HashMap<>(); + JsonNode hitNode = iter.next(); + doc.put("_index", hitNode.get("_index").textValue()); + doc.put("_id", hitNode.get("_id").textValue()); + JsonNode source = hitNode.get("_source"); + for (Iterator> iterator = source.fields(); iterator.hasNext(); ) { + Map.Entry entry = iterator.next(); + String fieldName = entry.getKey(); + if (entry.getValue() instanceof TextNode){ + doc.put(fieldName, entry.getValue().textValue()); + } else { + doc.put(fieldName, 
entry.getValue()); + } + } + docs.add(doc); + } + return scrollResult; + } + + public List getIndexDocsCount(String index) { + String endpoint = String.format("_cat/indices/%s?h=index,docsCount&format=json", index); + Request request = new Request("GET", endpoint); + try { + Response response = restClient.performRequest(request); + if (response == null) { + throw new GetIndexDocsCountException("GET " + endpoint + " response null"); + } + if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) { + String entity = EntityUtils.toString(response.getEntity()); + List indexDocsCounts = JsonUtils.toList(entity, IndexDocsCount.class); + return indexDocsCounts; + } else { + throw new GetIndexDocsCountException(String.format("GET %s response status code=%d", endpoint, response.getStatusLine().getStatusCode())); + } + } catch (IOException ex) { + throw new GetIndexDocsCountException(ex); + } + } + + /** + * get es field name and type mapping relation + * + * @param index index name + * @return {key-> field name,value->es type} + */ + public Map getFieldTypeMapping(String index, List source) { + String endpoint = String.format("%s/_mappings", index); + Request request = new Request("GET", endpoint); + Map mapping = new HashMap<>(); + try { + Response response = restClient.performRequest(request); + if (response == null) { + throw new GetIndexDocsCountException("GET " + endpoint + " response null"); + } + if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) { + throw new GetIndexDocsCountException(String.format("GET %s response status code=%d", endpoint, response.getStatusLine().getStatusCode())); + } + String entity = EntityUtils.toString(response.getEntity()); + log.info(String.format("GET %s response=%s", endpoint, entity)); + ObjectNode responseJson = JsonUtils.parseObject(entity); + for (Iterator it = responseJson.elements(); it.hasNext(); ) { + JsonNode indexProperty = it.next(); + JsonNode mappingsProperty = indexProperty.get("mappings"); + if 
(mappingsProperty.has("properties")) { + JsonNode properties = mappingsProperty.get("properties"); + mapping = getFieldTypeMappingFromProperties(properties, source); + } else { + for (Iterator iter = mappingsProperty.iterator(); iter.hasNext(); ) { + JsonNode typeNode = iter.next(); + JsonNode properties = typeNode.get("properties"); + mapping.putAll(getFieldTypeMappingFromProperties(properties, source)); + } + } + + } + } catch (IOException ex) { + throw new GetIndexDocsCountException(ex); + } + return mapping; + } + + private static Map getFieldTypeMappingFromProperties(JsonNode properties, List source) { + Map mapping = new HashMap<>(); + for (String field : source) { + JsonNode fieldProperty = properties == null ? null : properties.get(field); + if (fieldProperty == null) { + mapping.put(field, "text"); + } else { + if (fieldProperty.has("type")) { + String type = fieldProperty.get("type").asText(); + mapping.put(field, type); + } else { + log.warn(String.format("fail to get elasticsearch field %s mapping type,so give a default type text", field)); + mapping.put(field, "text"); + } + } + } + return mapping; + } } diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/EsClusterConnectionConfig.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/EsClusterConnectionConfig.java new file mode 100644 index 00000000000..fd482db5efd --- /dev/null +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/EsClusterConnectionConfig.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.elasticsearch.config; + +public class EsClusterConnectionConfig { + + public static final String HOSTS = "hosts"; + + public static final String USERNAME = "username"; + + public static final String PASSWORD = "password"; + +} diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/source/SourceConfig.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/source/SourceConfig.java new file mode 100644 index 00000000000..8040803079b --- /dev/null +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/source/SourceConfig.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.source; + +public class SourceConfig { + + public static final String INDEX = "index"; + + public static final String SOURCE = "source"; + + public static final String SCROLL_TIME = "scroll_time"; + + public static final String SCROLL_SIZE = "scroll_size"; + +} diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/source/SourceConfigDeaultConstant.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/source/SourceConfigDeaultConstant.java new file mode 100644 index 00000000000..035b556b6ff --- /dev/null +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/source/SourceConfigDeaultConstant.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.source; + +public class SourceConfigDeaultConstant { + + public static final String SCROLLL_TIME = "1m"; + + public static final int SCROLLL_SIZE = 100; + +} diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/constant/EsTypeMappingSeaTunnelType.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/constant/EsTypeMappingSeaTunnelType.java new file mode 100644 index 00000000000..2a459b5fc27 --- /dev/null +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/constant/EsTypeMappingSeaTunnelType.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.elasticsearch.constant; + +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.LocalTimeType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; + +import java.util.HashMap; +import java.util.Map; + +public class EsTypeMappingSeaTunnelType { + + private static final Map MAPPING = new HashMap() { + { + put("string", BasicType.STRING_TYPE); + put("keyword", BasicType.STRING_TYPE); + put("text", BasicType.STRING_TYPE); + put("boolean", BasicType.BOOLEAN_TYPE); + put("byte", BasicType.BYTE_TYPE); + put("short", BasicType.SHORT_TYPE); + put("integer", BasicType.INT_TYPE); + put("long", BasicType.LONG_TYPE); + put("float", BasicType.FLOAT_TYPE); + put("half_float", BasicType.FLOAT_TYPE); + put("double", BasicType.DOUBLE_TYPE); + put("date", LocalTimeType.LOCAL_DATE_TIME_TYPE); + } + }; + + public static SeaTunnelDataType getSeaTunnelDataType(String esType) { + return MAPPING.get(esType); + } + +} diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/IndexInfo.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/IndexInfo.java index 0c6907a4b38..dad62cf74df 100644 --- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/IndexInfo.java +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/IndexInfo.java @@ -19,35 +19,28 @@ import org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig; +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import lombok.Data; + /** * index config by seatunnel */ +@Data public class IndexInfo { 
private String index; private String type; - public IndexInfo(org.apache.seatunnel.shade.com.typesafe.config.Config pluginConfig) { + public IndexInfo(Config pluginConfig) { index = pluginConfig.getString(SinkConfig.INDEX); if (pluginConfig.hasPath(SinkConfig.INDEX_TYPE)) { type = pluginConfig.getString(SinkConfig.INDEX_TYPE); } } - public String getIndex() { - return index; - } - - public void setIndex(String index) { + public IndexInfo(String index, String type) { this.index = index; - } - - public String getType() { - return type; - } - - public void setType(String type) { this.type = type; } - } diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/source/IndexDocsCount.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/source/IndexDocsCount.java new file mode 100644 index 00000000000..314c1ed4867 --- /dev/null +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/source/IndexDocsCount.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source; + +public class IndexDocsCount { + + private String index; + /** + * index docs count + */ + private Long docsCount; + + public String getIndex() { + return index; + } + + public void setIndex(String index) { + this.index = index; + } + + public Long getDocsCount() { + return docsCount; + } + + public void setDocsCount(Long docsCount) { + this.docsCount = docsCount; + } +} diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/source/ScrollResult.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/source/ScrollResult.java new file mode 100644 index 00000000000..1368775e52c --- /dev/null +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/source/ScrollResult.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source; + +import lombok.Data; + +import java.util.List; +import java.util.Map; + +@Data +public class ScrollResult { + + private String scrollId; + private List> docs; +} diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/source/SourceIndexInfo.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/source/SourceIndexInfo.java new file mode 100644 index 00000000000..1cead4b47e5 --- /dev/null +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/source/SourceIndexInfo.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source; + +import lombok.AllArgsConstructor; +import lombok.Data; + +import java.io.Serializable; +import java.util.List; + +@Data +@AllArgsConstructor +public class SourceIndexInfo implements Serializable { + private String index; + private List source; + private String scrollTime; + private int scrollSize; + +} diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/GetIndexDocsCountException.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/GetIndexDocsCountException.java new file mode 100644 index 00000000000..19925a966f4 --- /dev/null +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/GetIndexDocsCountException.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception; + +public class GetIndexDocsCountException extends RuntimeException { + + public GetIndexDocsCountException(String message) { + super(message); + } + + public GetIndexDocsCountException(Throwable cause) { + super(cause); + } + +} diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/ScrollRequestException.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/ScrollRequestException.java new file mode 100644 index 00000000000..f0948341790 --- /dev/null +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/ScrollRequestException.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception; + +public class ScrollRequestException extends RuntimeException { + + public ScrollRequestException(String message, Throwable cause) { + super(message, cause); + } + + public ScrollRequestException(String message) { + super(message); + } + +} diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java index 3c1ca1e00da..fd7fe4a1f46 100644 --- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java @@ -28,9 +28,8 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.SerializationFeature; -import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import java.time.temporal.Temporal; import java.util.HashMap; import java.util.Map; @@ -39,8 +38,7 @@ */ public class ElasticsearchRowSerializer implements SeaTunnelRowSerializer { private final SeaTunnelRowType seaTunnelRowType; - private final ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule()) - .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); + private final ObjectMapper objectMapper = new ObjectMapper(); private final IndexSerializer indexSerializer; @@ -58,7 +56,13 @@ public String serializeRow(SeaTunnelRow row){ Map doc = new HashMap<>(fieldNames.length); Object[] fields = row.getFields(); for (int i = 0; i < fieldNames.length; i++) { - doc.put(fieldNames[i], 
fields[i]); + Object value = fields[i]; + if (value instanceof Temporal){ + //jackson not support jdk8 new time api + doc.put(fieldNames[i], value.toString()); + } else { + doc.put(fieldNames[i], value); + } } StringBuilder sb = new StringBuilder(); diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/DeaultSeaTunnelRowDeserializer.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/DeaultSeaTunnelRowDeserializer.java new file mode 100644 index 00000000000..6a89dbd689d --- /dev/null +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/DeaultSeaTunnelRowDeserializer.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.source; + +import static org.apache.seatunnel.api.table.type.BasicType.BOOLEAN_TYPE; +import static org.apache.seatunnel.api.table.type.BasicType.BYTE_TYPE; +import static org.apache.seatunnel.api.table.type.BasicType.DOUBLE_TYPE; +import static org.apache.seatunnel.api.table.type.BasicType.FLOAT_TYPE; +import static org.apache.seatunnel.api.table.type.BasicType.INT_TYPE; +import static org.apache.seatunnel.api.table.type.BasicType.LONG_TYPE; +import static org.apache.seatunnel.api.table.type.BasicType.SHORT_TYPE; +import static org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE; +import static org.apache.seatunnel.api.table.type.BasicType.VOID_TYPE; + +import org.apache.seatunnel.api.table.type.ArrayType; +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.DecimalType; +import org.apache.seatunnel.api.table.type.LocalTimeType; +import org.apache.seatunnel.api.table.type.MapType; +import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.utils.JsonUtils; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; + +import java.lang.reflect.Array; +import java.math.BigDecimal; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Base64; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class DeaultSeaTunnelRowDeserializer implements SeaTunnelRowDeserializer { + + private final SeaTunnelRowType rowTypeInfo; + + private final ObjectMapper mapper = new ObjectMapper(); + + private final Map dateTimeFormatterMap = new HashMap(){ + 
{ + put("yyyy-MM-dd HH".length(), DateTimeFormatter.ofPattern("yyyy-MM-dd HH")); + put("yyyy-MM-dd HH:mm".length(), DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm")); + put("yyyyMMdd HH:mm:ss".length(), DateTimeFormatter.ofPattern("yyyyMMdd HH:mm:ss")); + put("yyyy-MM-dd HH:mm:ss".length(), DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); + put("yyyy-MM-dd HH:mm:ss.S".length(), DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.S")); + put("yyyy-MM-dd HH:mm:ss.SS".length(), DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SS")); + put("yyyy-MM-dd HH:mm:ss.SSS".length(), DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")); + } + }; + + public DeaultSeaTunnelRowDeserializer(SeaTunnelRowType rowTypeInfo) { + this.rowTypeInfo = rowTypeInfo; + } + + @Override + public SeaTunnelRow deserialize(ElasticsearchRecord rowRecord) { + return convert(rowRecord); + } + + SeaTunnelRow convert(ElasticsearchRecord rowRecord) { + Object[] seaTunnelFields = new Object[rowTypeInfo.getTotalFields()]; + String fieldName = null; + Object value = null; + SeaTunnelDataType seaTunnelDataType = null; + try { + for (int i = 0; i < rowTypeInfo.getTotalFields(); i++) { + fieldName = rowTypeInfo.getFieldName(i); + value = rowRecord.getDoc().get(fieldName); + if (value != null) { + seaTunnelDataType = rowTypeInfo.getFieldType(i); + seaTunnelFields[i] = convertValue(seaTunnelDataType, value.toString()); + } + } + } catch (Exception ex){ + throw new RuntimeException(String.format("error fieldName=%s,fieldValue=%s,seaTunnelDataType=%s,rowRecord=%s", fieldName, value, seaTunnelDataType, JsonUtils.toJsonString(rowRecord)), ex); + } + return new SeaTunnelRow(seaTunnelFields); + } + + Object convertValue(SeaTunnelDataType fieldType, String fieldValue) throws JsonProcessingException { + if (BOOLEAN_TYPE.equals(fieldType)) { + return Boolean.parseBoolean(fieldValue); + } else if (BYTE_TYPE.equals(fieldType)) { + return Byte.valueOf(fieldValue); + } else if (SHORT_TYPE.equals(fieldType)) { + return 
Short.parseShort(fieldValue); + } else if (INT_TYPE.equals(fieldType)) { + return Integer.parseInt(fieldValue); + } else if (LONG_TYPE.equals(fieldType)) { + return Long.parseLong(fieldValue); + } else if (FLOAT_TYPE.equals(fieldType)) { + return Float.parseFloat(fieldValue); + } else if (DOUBLE_TYPE.equals(fieldType)) { + return Double.parseDouble(fieldValue); + } else if (STRING_TYPE.equals(fieldType)) { + return fieldValue; + } else if (LocalTimeType.LOCAL_DATE_TYPE.equals(fieldType)) { + LocalDateTime localDateTime = parseDate(fieldValue); + return localDateTime.toLocalDate(); + } else if (LocalTimeType.LOCAL_TIME_TYPE.equals(fieldType)) { + LocalDateTime localDateTime = parseDate(fieldValue); + return localDateTime.toLocalTime(); + } else if (LocalTimeType.LOCAL_DATE_TIME_TYPE.equals(fieldType)) { + LocalDateTime localDateTime = parseDate(fieldValue); + return localDateTime; + } else if (fieldType instanceof DecimalType) { + return new BigDecimal(fieldValue); + } else if (fieldType instanceof ArrayType) { + ArrayType arrayType = (ArrayType) fieldType; + BasicType elementType = arrayType.getElementType(); + List stringList = JsonUtils.toList(fieldValue, String.class); + Object arr = Array.newInstance(elementType.getTypeClass(), stringList.size()); + for (int i = 0; i < stringList.size(); i++){ + Object convertValue = convertValue(elementType, stringList.get(i)); + Array.set(arr, i, convertValue); + } + return arr; + } else if (fieldType instanceof MapType) { + MapType mapType = (MapType) fieldType; + SeaTunnelDataType keyType = mapType.getKeyType(); + + SeaTunnelDataType valueType = mapType.getValueType(); + Map stringMap = mapper.readValue(fieldValue, new TypeReference>() {}); + Map convertMap = new HashMap(); + for (Map.Entry entry : stringMap.entrySet()){ + Object convertKey = convertValue(keyType, entry.getKey()); + Object convertValue = convertValue(valueType, entry.getValue()); + convertMap.put(convertKey, convertValue); + } + return convertMap; + } else
if (fieldType instanceof PrimitiveByteArrayType) { + return Base64.getDecoder().decode(fieldValue); + } else if (VOID_TYPE.equals(fieldType) || fieldType == null) { + return null; + } else { + throw new UnsupportedOperationException("Unexpected value: " + fieldType); + } + + } + + private LocalDateTime parseDate(String fieldValue){ + String formatDate = fieldValue.replace("T", " "); + if (fieldValue.length() == "yyyyMMdd".length() || fieldValue.length() == "yyyy-MM-dd".length()){ + formatDate = fieldValue + " 00:00:00"; + } + DateTimeFormatter dateTimeFormatter = dateTimeFormatterMap.get(formatDate.length()); + if (dateTimeFormatter == null){ + throw new UnsupportedOperationException("unsupported date format"); + } + LocalDateTime localDateTime = LocalDateTime.parse(formatDate, dateTimeFormatter); + return localDateTime; + } +} + diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/ElasticsearchRecord.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/ElasticsearchRecord.java new file mode 100644 index 00000000000..3e5eb10b582 --- /dev/null +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/ElasticsearchRecord.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.source; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.ToString; + +import java.util.List; +import java.util.Map; + +@Getter +@ToString +@AllArgsConstructor +public class ElasticsearchRecord { + private Map doc; + private List source; +} diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/SeaTunnelRowDeserializer.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/SeaTunnelRowDeserializer.java new file mode 100644 index 00000000000..ff64dbad559 --- /dev/null +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/SeaTunnelRowDeserializer.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.source; + +import org.apache.seatunnel.api.table.type.SeaTunnelRow; + +public interface SeaTunnelRowDeserializer { + + SeaTunnelRow deserialize(ElasticsearchRecord rowRecord); + +} diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java index f6b51191589..3924c7199a8 100644 --- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java @@ -17,23 +17,19 @@ package org.apache.seatunnel.connectors.seatunnel.elasticsearch.sink; -import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig.HOSTS; -import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig.INDEX; - import org.apache.seatunnel.api.common.PrepareFailException; import org.apache.seatunnel.api.sink.SeaTunnelSink; import org.apache.seatunnel.api.sink.SinkWriter; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; -import 
org.apache.seatunnel.common.config.CheckConfigUtil; -import org.apache.seatunnel.common.config.CheckResult; -import org.apache.seatunnel.common.constants.PluginType; import org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig; import org.apache.seatunnel.connectors.seatunnel.elasticsearch.state.ElasticsearchAggregatedCommitInfo; import org.apache.seatunnel.connectors.seatunnel.elasticsearch.state.ElasticsearchCommitInfo; import org.apache.seatunnel.connectors.seatunnel.elasticsearch.state.ElasticsearchSinkState; +import org.apache.seatunnel.shade.com.typesafe.config.Config; + import com.google.auto.service.AutoService; import java.util.Collections; @@ -42,7 +38,7 @@ public class ElasticsearchSink implements SeaTunnelSink { - private org.apache.seatunnel.shade.com.typesafe.config.Config pluginConfig; + private Config pluginConfig; private SeaTunnelRowType seaTunnelRowType; @Override @@ -51,12 +47,7 @@ public String getPluginName() { } @Override - public void prepare(org.apache.seatunnel.shade.com.typesafe.config.Config pluginConfig) throws - PrepareFailException { - CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, HOSTS, INDEX); - if (!result.isSuccess()) { - throw new PrepareFailException(getPluginName(), PluginType.SINK, result.getMsg()); - } + public void prepare(Config pluginConfig) throws PrepareFailException { this.pluginConfig = pluginConfig; SinkConfig.setValue(pluginConfig); } @@ -75,4 +66,5 @@ public SeaTunnelDataType getConsumedType() { public SinkWriter createWriter(SinkWriter.Context context) { return new ElasticsearchSinkWriter(context, seaTunnelRowType, pluginConfig, Collections.emptyList()); } + } diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java 
index 6a2c32cd666..1bc2ef55f73 100644 --- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java @@ -21,7 +21,6 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient; -import org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig; import org.apache.seatunnel.connectors.seatunnel.elasticsearch.constant.BulkConfig; import org.apache.seatunnel.connectors.seatunnel.elasticsearch.constant.ElasticsearchVersion; import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.BulkResponse; @@ -30,6 +29,7 @@ import org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.ElasticsearchRowSerializer; import org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.SeaTunnelRowSerializer; import org.apache.seatunnel.connectors.seatunnel.elasticsearch.state.ElasticsearchCommitInfo; +import org.apache.seatunnel.connectors.seatunnel.elasticsearch.state.ElasticsearchSinkState; import org.apache.seatunnel.shade.com.typesafe.config.Config; @@ -40,11 +40,11 @@ import java.util.List; import java.util.Optional; +@Slf4j /** * ElasticsearchSinkWriter is a sink writer that will write {@link SeaTunnelRow} to Elasticsearch. 
*/ -@Slf4j -public class ElasticsearchSinkWriter implements SinkWriter { +public class ElasticsearchSinkWriter implements SinkWriter { private final SinkWriter.Context context; @@ -56,30 +56,17 @@ public ElasticsearchSinkWriter( SinkWriter.Context context, SeaTunnelRowType seaTunnelRowType, Config pluginConfig, - List elasticsearchStates) { + List elasticsearchStates) { this.context = context; IndexInfo indexInfo = new IndexInfo(pluginConfig); - initRestClient(pluginConfig); - ElasticsearchVersion elasticsearchVersion = ElasticsearchVersion.get(EsRestClient.getClusterVersion()); + esRestClient = EsRestClient.createInstance(pluginConfig); + ElasticsearchVersion elasticsearchVersion = ElasticsearchVersion.get(esRestClient.getClusterVersion()); this.seaTunnelRowSerializer = new ElasticsearchRowSerializer(elasticsearchVersion, indexInfo, seaTunnelRowType); this.requestEsList = new ArrayList<>(BulkConfig.MAX_BATCH_SIZE); } - private void initRestClient(org.apache.seatunnel.shade.com.typesafe.config.Config pluginConfig) { - List hosts = pluginConfig.getStringList(SinkConfig.HOSTS); - String username = null; - String password = null; - if (pluginConfig.hasPath(SinkConfig.USERNAME)) { - username = pluginConfig.getString(SinkConfig.USERNAME); - if (pluginConfig.hasPath(SinkConfig.PASSWORD)) { - password = pluginConfig.getString(SinkConfig.PASSWORD); - } - } - esRestClient = EsRestClient.getInstance(hosts, username, password); - } - @Override public void write(SeaTunnelRow element) { String indexRequestRow = seaTunnelRowSerializer.serializeRow(element); @@ -107,12 +94,14 @@ public void bulkEsWithRetry(EsRestClient esRestClient, List requestEsLis BulkResponse bulkResponse = esRestClient.bulk(requestBody); if (!bulkResponse.isErrors()) { break; + } else { + throw new BulkElasticsearchException(bulkResponse.getResponse()); } } catch (Exception ex) { if (tryCnt == maxRetry) { - throw new BulkElasticsearchException("bulk es error,try count=%d", ex); + throw new 
BulkElasticsearchException(String.format("bulk elasticsearch error,try count=%d", tryCnt), ex); } - log.warn(String.format("bulk es error,try count=%d", tryCnt), ex); + log.warn(String.format("bulk elasticsearch error,try count=%d", tryCnt), ex); } } diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java new file mode 100644 index 00000000000..674bc2da495 --- /dev/null +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.source;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.source.SourceConfig;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.constant.EsTypeMappingSeaTunnelType;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.auto.service.AutoService;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Bounded SeaTunnel source that reads documents from Elasticsearch.
+ *
+ * <p>The produced row type is taken from the {@code schema} option when it is present;
+ * otherwise it is derived from the Elasticsearch field mapping of the fields listed in
+ * the {@code source} option.
+ */
+@AutoService(SeaTunnelSource.class)
+public class ElasticsearchSource implements SeaTunnelSource<SeaTunnelRow, ElasticsearchSourceSplit, ElasticsearchSourceState> {
+
+    private Config pluginConfig;
+
+    private SeaTunnelRowType rowTypeInfo;
+
+    /** Names of the document fields to read, in row order. */
+    private List<String> source;
+
+    @Override
+    public String getPluginName() {
+        return "Elasticsearch";
+    }
+
+    /**
+     * Resolves the field list and the row type from the plugin configuration.
+     *
+     * @param pluginConfig the connector configuration block
+     * @throws PrepareFailException declared by the interface contract
+     */
+    @Override
+    public void prepare(Config pluginConfig) throws PrepareFailException {
+        this.pluginConfig = pluginConfig;
+        if (pluginConfig.hasPath(SeaTunnelSchema.SCHEMA)) {
+            Config schemaConfig = pluginConfig.getConfig(SeaTunnelSchema.SCHEMA);
+            rowTypeInfo = SeaTunnelSchema.buildWithConfig(schemaConfig).getSeaTunnelRowType();
+            source = Arrays.asList(rowTypeInfo.getFieldNames());
+        } else {
+            source = pluginConfig.getStringList(SourceConfig.SOURCE);
+            Map<String, String> esFieldType;
+            EsRestClient esRestClient = EsRestClient.createInstance(this.pluginConfig);
+            try {
+                esFieldType = esRestClient.getFieldTypeMapping(pluginConfig.getString(SourceConfig.INDEX), source);
+            } finally {
+                // Release the client even when the mapping lookup fails.
+                esRestClient.close();
+            }
+            SeaTunnelDataType[] fieldTypes = new SeaTunnelDataType[source.size()];
+            for (int i = 0; i < source.size(); i++) {
+                String esType = esFieldType.get(source.get(i));
+                SeaTunnelDataType seaTunnelDataType = EsTypeMappingSeaTunnelType.getSeaTunnelDataType(esType);
+                fieldTypes[i] = seaTunnelDataType;
+            }
+            rowTypeInfo = new SeaTunnelRowType(source.toArray(new String[source.size()]), fieldTypes);
+        }
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        return Boundedness.BOUNDED;
+    }
+
+    @Override
+    public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
+        return this.rowTypeInfo;
+    }
+
+    @Override
+    public SourceReader<SeaTunnelRow, ElasticsearchSourceSplit> createReader(SourceReader.Context readerContext) {
+        return new ElasticsearchSourceReader(readerContext, pluginConfig, rowTypeInfo);
+    }
+
+    @Override
+    public SourceSplitEnumerator<ElasticsearchSourceSplit, ElasticsearchSourceState> createEnumerator(SourceSplitEnumerator.Context<ElasticsearchSourceSplit> enumeratorContext) {
+        return new ElasticsearchSourceSplitEnumerator(enumeratorContext, pluginConfig, source);
+    }
+
+    @Override
+    public SourceSplitEnumerator<ElasticsearchSourceSplit, ElasticsearchSourceState> restoreEnumerator(SourceSplitEnumerator.Context<ElasticsearchSourceSplit> enumeratorContext, ElasticsearchSourceState sourceState) {
+        return new ElasticsearchSourceSplitEnumerator(enumeratorContext, sourceState, pluginConfig, source);
+    }
+
+}
+
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceReader.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceReader.java
new file mode 100644
index 00000000000..6df646bb9ea
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceReader.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *
contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.source;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.ScrollResult;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.SourceIndexInfo;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.source.DeaultSeaTunnelRowDeserializer;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.source.ElasticsearchRecord;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.source.SeaTunnelRowDeserializer;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Reads the documents of each assigned split through the Elasticsearch scroll API and
+ * emits them as {@link SeaTunnelRow}s.
+ */
+@Slf4j
+public class ElasticsearchSourceReader implements SourceReader<SeaTunnelRow, ElasticsearchSourceSplit> {
+
+    SourceReader.Context context;
+
+    private Config pluginConfig;
+
+    private EsRestClient esRestClient;
+
+    private final SeaTunnelRowDeserializer deserializer;
+
+    /** Splits assigned to this reader that have not been read yet. */
+    Deque<ElasticsearchSourceSplit> splits = new LinkedList<>();
+    boolean noMoreSplit;
+
+    /** Milliseconds to wait when no split is available yet. */
+    private final long pollNextWaitTime = 1000L;
+
+    public ElasticsearchSourceReader(SourceReader.Context context, Config pluginConfig, SeaTunnelRowType rowTypeInfo) {
+        this.context = context;
+        this.pluginConfig = pluginConfig;
+        this.deserializer = new DeaultSeaTunnelRowDeserializer(rowTypeInfo);
+    }
+
+    @Override
+    public void open() {
+        esRestClient = EsRestClient.createInstance(this.pluginConfig);
+    }
+
+    @Override
+    public void close() throws IOException {
+        // open() may never have run if start-up failed; avoid a NullPointerException on shutdown.
+        if (esRestClient != null) {
+            esRestClient.close();
+        }
+    }
+
+    /**
+     * Takes one pending split and emits every document of its index, following the scroll
+     * cursor until a page comes back empty. When no split is pending it either signals the
+     * end of input (after the enumerator announced that no more splits will arrive) or waits.
+     *
+     * @param output collector receiving the deserialized rows
+     */
+    @Override
+    public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
+        synchronized (output.getCheckpointLock()) {
+            ElasticsearchSourceSplit split = splits.poll();
+            if (split != null) {
+                SourceIndexInfo sourceIndexInfo = split.getSourceIndexInfo();
+                ScrollResult scrollResult = esRestClient.searchByScroll(sourceIndexInfo.getIndex(), sourceIndexInfo.getSource(), sourceIndexInfo.getScrollTime(), sourceIndexInfo.getScrollSize());
+                outputFromScrollResult(scrollResult, sourceIndexInfo.getSource(), output);
+                while (scrollResult.getDocs() != null && scrollResult.getDocs().size() > 0) {
+                    scrollResult = esRestClient.searchWithScrollId(scrollResult.getScrollId(), sourceIndexInfo.getScrollTime());
+                    outputFromScrollResult(scrollResult, sourceIndexInfo.getSource(), output);
+                }
+            } else if (noMoreSplit) {
+                // signal to the source that we have reached the end of the data.
+                log.info("Closed the bounded Elasticsearch source");
+                context.signalNoMoreElement();
+            } else {
+                // NOTE(review): this sleeps while holding the checkpoint lock; consider waiting outside it.
+                Thread.sleep(pollNextWaitTime);
+            }
+        }
+    }
+
+    /** Deserializes every document of one scroll page and hands it to the collector. */
+    private void outputFromScrollResult(ScrollResult scrollResult, List<String> source, Collector<SeaTunnelRow> output) {
+        for (Map<String, Object> doc : scrollResult.getDocs()) {
+            SeaTunnelRow seaTunnelRow = deserializer.deserialize(new ElasticsearchRecord(doc, source));
+            output.collect(seaTunnelRow);
+        }
+    }
+
+    @Override
+    public List<ElasticsearchSourceSplit> snapshotState(long checkpointId) throws Exception {
+        return new ArrayList<>(splits);
+    }
+
+    @Override
+    public void addSplits(List<ElasticsearchSourceSplit> splits) {
+        this.splits.addAll(splits);
+    }
+
+    @Override
+    public void handleNoMoreSplits() {
+        noMoreSplit = true;
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplit.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplit.java
new file mode 100644
index 00000000000..9f1e4c3a8f4
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplit.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.source;
+
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.SourceIndexInfo;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.ToString;
+
+/**
+ * A source split: an identifier plus the {@link SourceIndexInfo} describing what to read.
+ * The constructor, {@code toString} and the {@code sourceIndexInfo} getter are generated by Lombok.
+ */
+@ToString
+@AllArgsConstructor
+public class ElasticsearchSourceSplit implements SourceSplit {
+
+    private static final long serialVersionUID = -1L;
+
+    /** Identifier returned by {@link #splitId()}. */
+    private String splitId;
+
+    /** Index, fields and scroll settings carried by this split. */
+    @Getter
+    private SourceIndexInfo sourceIndexInfo;
+
+    @Override
+    public String splitId() {
+        return splitId;
+    }
+}
+
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java
new file mode 100644
index 00000000000..2ba7c58ac7c
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.source;
+
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.source.SourceConfig;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.source.SourceConfigDeaultConstant;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.IndexDocsCount;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.SourceIndexInfo;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Creates one split per non-empty index matching the configured index expression and
+ * distributes the splits over the registered readers by hashing the split id.
+ */
+@Slf4j
+public class ElasticsearchSourceSplitEnumerator implements SourceSplitEnumerator<ElasticsearchSourceSplit, ElasticsearchSourceState> {
+
+    private SourceSplitEnumerator.Context<ElasticsearchSourceSplit> context;
+
+    private Config pluginConfig;
+
+    private EsRestClient esRestClient;
+
+    private final Object stateLock = new Object();
+
+    /** Splits waiting to be handed out, keyed by the id of the reader that owns them. */
+    private Map<Integer, List<ElasticsearchSourceSplit>> pendingSplit;
+
+    private List<String> source;
+
+    /** True until the indices have been enumerated once. */
+    private volatile boolean shouldEnumerate;
+
+    public ElasticsearchSourceSplitEnumerator(SourceSplitEnumerator.Context<ElasticsearchSourceSplit> context, Config pluginConfig, List<String> source) {
+        this(context, null, pluginConfig, source);
+    }
+
+    /**
+     * @param sourceState previously snapshotted state to resume from, or {@code null} for a fresh start
+     */
+    public ElasticsearchSourceSplitEnumerator(SourceSplitEnumerator.Context<ElasticsearchSourceSplit> context, ElasticsearchSourceState sourceState, Config pluginConfig, List<String> source) {
+        this.context = context;
+        this.pluginConfig = pluginConfig;
+        this.pendingSplit = new HashMap<>();
+        this.shouldEnumerate = sourceState == null;
+        if (sourceState != null) {
+            this.shouldEnumerate = sourceState.isShouldEnumerate();
+            this.pendingSplit.putAll(sourceState.getPendingSplit());
+        }
+        this.source = source;
+    }
+
+    @Override
+    public void open() {
+        esRestClient = EsRestClient.createInstance(pluginConfig);
+    }
+
+    @Override
+    public void run() {
+        Set<Integer> readers = context.registeredReaders();
+        if (shouldEnumerate) {
+            List<ElasticsearchSourceSplit> newSplits = getElasticsearchSplit();
+
+            synchronized (stateLock) {
+                addPendingSplit(newSplits);
+                shouldEnumerate = false;
+            }
+
+            assignSplit(readers);
+        }
+
+        log.debug("No more splits to assign." + " Sending NoMoreSplitsEvent to reader {}.", readers);
+        readers.forEach(context::signalNoMoreSplits);
+    }
+
+    /** Queues each split under the reader chosen by {@link #getSplitOwner(String, int)}. */
+    private void addPendingSplit(Collection<ElasticsearchSourceSplit> splits) {
+        int readerCount = context.currentParallelism();
+        for (ElasticsearchSourceSplit split : splits) {
+            int ownerReader = getSplitOwner(split.splitId(), readerCount);
+            log.info("Assigning {} to {} reader.", split, ownerReader);
+            pendingSplit.computeIfAbsent(ownerReader, r -> new ArrayList<>())
+                .add(split);
+        }
+    }
+
+    /** Maps a split id onto a reader index in {@code [0, numReaders)}; the mask keeps the hash non-negative. */
+    private static int getSplitOwner(String tp, int numReaders) {
+        return (tp.hashCode() & Integer.MAX_VALUE) % numReaders;
+    }
+
+    /** Hands the queued splits to the given readers; a failed assignment is put back into the queue. */
+    private void assignSplit(Collection<Integer> readers) {
+        log.debug("Assign pendingSplits to readers {}", readers);
+
+        for (int reader : readers) {
+            List<ElasticsearchSourceSplit> assignmentForReader = pendingSplit.remove(reader);
+            if (assignmentForReader != null && !assignmentForReader.isEmpty()) {
+                log.info("Assign splits {} to reader {}",
+                    assignmentForReader, reader);
+                try {
+                    context.assignSplit(reader, assignmentForReader);
+                } catch (Exception e) {
+                    log.error("Failed to assign splits {} to reader {}",
+                        assignmentForReader, reader, e);
+                    pendingSplit.put(reader, assignmentForReader);
+                }
+            }
+        }
+    }
+
+    /**
+     * Builds one split per index that reports a positive document count, ordered by
+     * ascending document count. Scroll time and size fall back to the defaults when
+     * they are not configured.
+     */
+    private List<ElasticsearchSourceSplit> getElasticsearchSplit() {
+        List<ElasticsearchSourceSplit> splits = new ArrayList<>();
+        String scrollTime = SourceConfigDeaultConstant.SCROLLL_TIME;
+        if (pluginConfig.hasPath(SourceConfig.SCROLL_TIME)) {
+            scrollTime = pluginConfig.getString(SourceConfig.SCROLL_TIME);
+        }
+        int scrollSize = SourceConfigDeaultConstant.SCROLLL_SIZE;
+        if (pluginConfig.hasPath(SourceConfig.SCROLL_SIZE)) {
+            scrollSize = pluginConfig.getInt(SourceConfig.SCROLL_SIZE);
+        }
+
+        List<IndexDocsCount> indexDocsCounts = esRestClient.getIndexDocsCount(pluginConfig.getString(SourceConfig.INDEX));
+        indexDocsCounts = indexDocsCounts.stream().filter(x -> x.getDocsCount() != null && x.getDocsCount() > 0)
+            .sorted(Comparator.comparingLong(IndexDocsCount::getDocsCount)).collect(Collectors.toList());
+        for (IndexDocsCount indexDocsCount : indexDocsCounts) {
+            splits.add(new ElasticsearchSourceSplit(String.valueOf(indexDocsCount.getIndex().hashCode()), new SourceIndexInfo(indexDocsCount.getIndex(), source, scrollTime, scrollSize)));
+        }
+        return splits;
+    }
+
+    @Override
+    public void close() throws IOException {
+        esRestClient.close();
+    }
+
+    @Override
+    public void addSplitsBack(List<ElasticsearchSourceSplit> splits, int subtaskId) {
+        if (!splits.isEmpty()) {
+            addPendingSplit(splits);
+            assignSplit(Collections.singletonList(subtaskId));
+        }
+    }
+
+    /** Counts the queued splits themselves, not the number of readers that have a queue. */
+    @Override
+    public int currentUnassignedSplitSize() {
+        return pendingSplit.values().stream().mapToInt(List::size).sum();
+    }
+
+    @Override
+    public void handleSplitRequest(int subtaskId) {
+        throw new UnsupportedOperationException("Unsupported handleSplitRequest: " + subtaskId);
+    }
+
+    @Override
+    public void registerReader(int subtaskId) {
+        log.debug("Register reader {} to ElasticsearchSourceSplitEnumerator.",
+            subtaskId);
+        if (!pendingSplit.isEmpty()) {
+            assignSplit(Collections.singletonList(subtaskId));
+        }
+    }
+
+    @Override
+    public ElasticsearchSourceState snapshotState(long checkpointId) throws Exception {
+        synchronized (stateLock) {
+            return new ElasticsearchSourceState(shouldEnumerate, pendingSplit);
+        }
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceState.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceState.java
new file mode 100644
index 00000000000..dda0a9eea8c
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceState.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.
See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.source;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Checkpoint state of the split enumerator: whether enumeration still has to run, and
+ * the splits that are queued per reader but not yet assigned.
+ */
+@AllArgsConstructor
+@Getter
+public class ElasticsearchSourceState implements Serializable {
+    /** True when the indices have not been enumerated yet. */
+    private boolean shouldEnumerate;
+    /** Unassigned splits keyed by the id of the reader that owns them. */
+    private Map<Integer, List<ElasticsearchSourceSplit>> pendingSplit;
+}
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-elasticsearch-flink-e2e/pom.xml b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-elasticsearch-flink-e2e/pom.xml
new file mode 100644
index 00000000000..e1c1e4841a8
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-elasticsearch-flink-e2e/pom.xml
@@ -0,0 +1,59 @@
+ + + + + org.apache.seatunnel + seatunnel-flink-connector-v2-e2e + ${revision} + + 4.0.0 + + connector-elasticsearch-flink-e2e + + + + org.apache.seatunnel + connector-flink-e2e-base + ${project.version} + tests + test-jar + test + + + + + org.apache.seatunnel + connector-fake + ${project.version} + test + + + org.apache.seatunnel + connector-elasticsearch + ${project.version} + test + + + + org.testcontainers + elasticsearch + 1.17.3 + test + + +
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-elasticsearch-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/elasticsearch/ElasticsearchIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-elasticsearch-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/elasticsearch/ElasticsearchIT.java new file mode 100644 index 00000000000..eded55f569b --- /dev/null +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-elasticsearch-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/elasticsearch/ElasticsearchIT.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.seatunnel.e2e.flink.v2.elasticsearch;
+
+import org.apache.seatunnel.common.utils.JsonUtils;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.ScrollResult;
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.net.UnknownHostException;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * End-to-end test on Flink: documents indexed into {@code st_index} are copied by the
+ * Elasticsearch source and sink into {@code st_index2} and compared with the input.
+ */
+@Slf4j
+public class ElasticsearchIT extends FlinkContainer {
+
+    /** JSON documents indexed before the job runs, in ascending {@code c_int} order. */
+    private List<String> testDataset;
+
+    private ElasticsearchContainer container;
+
+    private EsRestClient esRestClient;
+
+    @BeforeEach
+    public void startElasticsearchContainer() throws Exception {
+        container = new ElasticsearchContainer(DockerImageName.parse("elasticsearch:6.8.23").asCompatibleSubstituteFor("docker.elastic.co/elasticsearch/elasticsearch"))
+            .withNetwork(NETWORK)
+            .withNetworkAliases("elasticsearch")
+            .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger("elasticsearch:6.8.23")));
+        container.start();
+        log.info("Elasticsearch container started");
+        esRestClient = EsRestClient.createInstance(Lists.newArrayList(container.getHttpHostAddress()), "", "");
+        testDataset = generateTestDataSet();
+        createIndexDocs();
+    }
+
+    /**
+     * Bulk-indexes every generated document into {@code st_index}; the index is named
+     * by the action line that precedes each document.
+     */
+    private void createIndexDocs() {
+        StringBuilder requestBody = new StringBuilder();
+        String indexHeader = "{\"index\":{\"_index\":\"st_index\",\"_type\":\"st\"}\n";
+        for (int i = 0; i < testDataset.size(); i++) {
+            String row = testDataset.get(i);
+            requestBody.append(indexHeader);
+            requestBody.append(row);
+            requestBody.append("\n");
+        }
+        esRestClient.bulk(requestBody.toString());
+    }
+
+    @Test
+    public void testElasticsearch() throws IOException, InterruptedException {
+        Container.ExecResult execResult = executeSeaTunnelFlinkJob("/elasticsearch/elasticsearch_source_and_sink.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        List<String> sinkData = readSinkData();
+        Assertions.assertIterableEquals(
+            testDataset,
+            sinkData
+        );
+    }
+
+    /** Builds 100 JSON documents that differ in {@code c_map} and {@code c_int}. */
+    private List<String> generateTestDataSet() throws JsonProcessingException, UnknownHostException {
+        String[] fields = new String[]{
+            "c_map",
+            "c_array",
+            "c_string",
+            "c_boolean",
+            "c_tinyint",
+            "c_smallint",
+            "c_int",
+            "c_bigint",
+            "c_float",
+            "c_double",
+            "c_decimal",
+            "c_bytes",
+            "c_date",
+            "c_timestamp"
+        };
+
+        List<String> documents = new ArrayList<>();
+        ObjectMapper objectMapper = new ObjectMapper();
+        for (int i = 0; i < 100; i++) {
+            Map<String, Object> doc = new HashMap<>();
+            Object[] values = new Object[]{
+                Collections.singletonMap("key", Short.parseShort(String.valueOf(i))),
+                new Byte[]{Byte.parseByte("1")},
+                "string",
+                Boolean.FALSE,
+                Byte.parseByte("1"),
+                Short.parseShort("1"),
+                i,
+                Long.parseLong("1"),
+                Float.parseFloat("1.1"),
+                Double.parseDouble("1.1"),
+                BigDecimal.valueOf(11, 1),
+                "test".getBytes(),
+                LocalDate.now().toString(),
+                LocalDateTime.now().toString()
+            };
+            for (int j = 0; j < fields.length; j++) {
+                doc.put(fields[j], values[j]);
+            }
+            documents.add(objectMapper.writeValueAsString(doc));
+        }
+        return documents;
+    }
+
+    /** Reads the sink index back as JSON strings sorted by {@code c_int}, without the ES metadata fields. */
+    private List<String> readSinkData() throws InterruptedException {
+        //wait for index refresh
+        Thread.sleep(2000);
+        List<String> source = Lists.newArrayList("c_map", "c_array", "c_string", "c_boolean", "c_tinyint", "c_smallint", "c_int", "c_bigint", "c_float", "c_double", "c_decimal", "c_bytes", "c_date", "c_timestamp");
+        ScrollResult scrollResult = esRestClient.searchByScroll("st_index2", source, "1m", 1000);
+        scrollResult.getDocs().forEach(x -> {
+            x.remove("_index");
+            x.remove("_type");
+            x.remove("_id");
+        });
+        List<String> docs = scrollResult.getDocs().stream().sorted(Comparator.comparingInt(o -> Integer.valueOf(o.get("c_int").toString()))).map(JsonUtils::toJsonString).collect(Collectors.toList());
+        return docs;
+    }
+
+    @AfterEach
+    public void close() {
+        esRestClient.close();
+        container.close();
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-elasticsearch-flink-e2e/src/test/resources/elasticsearch/elasticsearch_source_and_sink.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-elasticsearch-flink-e2e/src/test/resources/elasticsearch/elasticsearch_source_and_sink.conf
new file mode 100644
index 00000000000..483fdeaeec3
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-elasticsearch-flink-e2e/src/test/resources/elasticsearch/elasticsearch_source_and_sink.conf
@@ -0,0 +1,64 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + # You can set flink configuration here + execution.parallelism = 1 + job.mode = "BATCH" + #execution.checkpoint.interval = 10000 + #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint" +} + +source { + Elasticsearch { + hosts = ["elasticsearch:9200"] + index = "st_index" + schema = { + fields { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_decimal = "decimal(2, 1)" + c_bytes = bytes + c_date = date + c_timestamp = timestamp + } + } + } +} + +transform { +} + +sink { + Elasticsearch{ + hosts = ["elasticsearch:9200"] + index = "st_index2" + index_type = "st" + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-elasticsearch-flink-e2e/src/test/resources/log4j.properties b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-elasticsearch-flink-e2e/src/test/resources/log4j.properties new file mode 100644 index 00000000000..db5d9e51220 --- /dev/null +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-elasticsearch-flink-e2e/src/test/resources/log4j.properties @@ -0,0 +1,22 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Set everything to be logged to the console +log4j.rootCategory=INFO, console +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.target=System.err +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-elasticsearch-spark-e2e/pom.xml b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-elasticsearch-spark-e2e/pom.xml new file mode 100644 index 00000000000..9f7a1a72ac3 --- /dev/null +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-elasticsearch-spark-e2e/pom.xml @@ -0,0 +1,61 @@ + + + + + org.apache.seatunnel + seatunnel-spark-connector-v2-e2e + ${revision} + + 4.0.0 + + connector-elasticsearch-spark-e2e + + + + org.apache.seatunnel + connector-spark-e2e-base + ${project.version} + tests + test-jar + test + + + + + org.apache.seatunnel + connector-fake + ${project.version} + test + + + org.apache.seatunnel + connector-elasticsearch + ${project.version} + test + + + + + org.testcontainers + elasticsearch + 1.17.3 + test + + + + \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-elasticsearch-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/elasticsearch/ElasticsearchIT.java 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-elasticsearch-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/elasticsearch/ElasticsearchIT.java new file mode 100644 index 00000000000..81ac664eb81 --- /dev/null +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-elasticsearch-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/elasticsearch/ElasticsearchIT.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.seatunnel.e2e.spark.v2.elasticsearch;
+
+import org.apache.seatunnel.common.utils.JsonUtils;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.ScrollResult;
+import org.apache.seatunnel.e2e.spark.SparkContainer;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.net.UnknownHostException;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * End-to-end test on Spark: documents indexed into {@code st_index} are copied by the
+ * Elasticsearch source and sink into {@code st_index2} and compared with the input.
+ */
+@Slf4j
+public class ElasticsearchIT extends SparkContainer {
+
+    /** JSON documents indexed before the job runs, in ascending {@code c_int} order. */
+    private List<String> testDataset;
+
+    private ElasticsearchContainer container;
+
+    private EsRestClient esRestClient;
+
+    @BeforeEach
+    public void startElasticsearchContainer() throws Exception {
+        container = new ElasticsearchContainer(DockerImageName.parse("elasticsearch:6.8.23").asCompatibleSubstituteFor("docker.elastic.co/elasticsearch/elasticsearch"))
+            .withNetwork(NETWORK)
+            .withNetworkAliases("elasticsearch")
+            .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger("elasticsearch:6.8.23")));
+        container.start();
+        log.info("Elasticsearch container started");
+        esRestClient = EsRestClient.createInstance(Lists.newArrayList(container.getHttpHostAddress()), "", "");
+        testDataset = generateTestDataSet();
+        createIndexDocs();
+
+    }
+
+    /**
+     * Bulk-indexes every generated document into {@code st_index}; the index is named
+     * by the action line that precedes each document.
+     */
+    private void createIndexDocs() {
+        StringBuilder requestBody = new StringBuilder();
+        String indexHeader = "{\"index\":{\"_index\":\"st_index\",\"_type\":\"st\"}\n";
+        for (int i = 0; i < testDataset.size(); i++) {
+            String row = testDataset.get(i);
+            requestBody.append(indexHeader);
+            requestBody.append(row);
+            requestBody.append("\n");
+        }
+        esRestClient.bulk(requestBody.toString());
+    }
+
+    @Test
+    public void testElasticsearch() throws IOException, InterruptedException {
+        Container.ExecResult execResult = executeSeaTunnelSparkJob("/elasticsearch/elasticsearch_source_and_sink.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        List<String> sinkData = readSinkData();
+        Assertions.assertIterableEquals(
+            testDataset,
+            sinkData
+        );
+    }
+
+    /** Builds 100 JSON documents that differ in {@code c_map} and {@code c_int}. */
+    private List<String> generateTestDataSet() throws JsonProcessingException, UnknownHostException {
+        String[] fields = new String[]{
+            "c_map",
+            "c_array",
+            "c_string",
+            "c_boolean",
+            "c_tinyint",
+            "c_smallint",
+            "c_int",
+            "c_bigint",
+            "c_float",
+            "c_double",
+            "c_decimal",
+            "c_bytes",
+            "c_date",
+            "c_timestamp"
+        };
+
+        List<String> documents = new ArrayList<>();
+        ObjectMapper objectMapper = new ObjectMapper();
+        for (int i = 0; i < 100; i++) {
+            Map<String, Object> doc = new HashMap<>();
+            Object[] values = new Object[]{
+                Collections.singletonMap("key", Short.parseShort(String.valueOf(i))),
+                new Byte[]{Byte.parseByte("1")},
+                "string",
+                Boolean.FALSE,
+                Byte.parseByte("1"),
+                Short.parseShort("1"),
+                i,
+                Long.parseLong("1"),
+                Float.parseFloat("1.1"),
+                Double.parseDouble("1.1"),
+                BigDecimal.valueOf(11, 1),
+                "test".getBytes(),
+                LocalDate.now().toString(),
+                LocalDateTime.now().toString()
+            };
+            for (int j = 0; j < fields.length; j++) {
+                doc.put(fields[j], values[j]);
+            }
+            documents.add(objectMapper.writeValueAsString(doc));
+        }
+        return documents;
+    }
+
+    /** Reads the sink index back as JSON strings sorted by {@code c_int}, without the ES metadata fields. */
+    private List<String> readSinkData() throws InterruptedException {
+        //wait for index refresh
+        Thread.sleep(2000);
+        List<String> source = Lists.newArrayList("c_map", "c_array", "c_string", "c_boolean", "c_tinyint", "c_smallint", "c_int", "c_bigint", "c_float", "c_double", "c_decimal", "c_bytes", "c_date", "c_timestamp");
+        ScrollResult scrollResult = esRestClient.searchByScroll("st_index2", source, "1m", 1000);
+        scrollResult.getDocs().forEach(x -> {
+            x.remove("_index");
+            x.remove("_type");
+            x.remove("_id");
+        });
+        List<String> docs = scrollResult.getDocs().stream().sorted(Comparator.comparingInt(o -> Integer.valueOf(o.get("c_int").toString()))).map(JsonUtils::toJsonString).collect(Collectors.toList());
+        return docs;
+    }
+
+    @AfterEach
+    public void close() {
+        esRestClient.close();
+        container.close();
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-elasticsearch-spark-e2e/src/test/resources/elasticsearch/elasticsearch_source_and_sink.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-elasticsearch-spark-e2e/src/test/resources/elasticsearch/elasticsearch_source_and_sink.conf
new file mode 100644
index 00000000000..9d4f7d8a8d2
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-elasticsearch-spark-e2e/src/test/resources/elasticsearch/elasticsearch_source_and_sink.conf
@@ -0,0 +1,67 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +###### +###### This config file is a demonstration of batch processing in SeaTunnel: it reads the Elasticsearch index st_index and writes it to st_index2 +###### + +env { + # You can set spark configuration here + # see available properties defined by spark: https://spark.apache.org/docs/latest/configuration.html#available-properties + job.mode = "BATCH" + spark.app.name = "SeaTunnel" + spark.executor.instances = 2 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local +} + +source { + Elasticsearch { + hosts = ["elasticsearch:9200"] + index = "st_index" + schema = { + fields { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_decimal = "decimal(2, 1)" + c_bytes = bytes + c_date = date + c_timestamp = timestamp + } + } + } +} + +transform { +} + +sink { + Elasticsearch{ + hosts = ["elasticsearch:9200"] + index = "st_index2" + index_type = "st" + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-elasticsearch-spark-e2e/src/test/resources/log4j.properties b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-elasticsearch-spark-e2e/src/test/resources/log4j.properties new file mode 100644 index 00000000000..db5d9e51220 --- /dev/null +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-elasticsearch-spark-e2e/src/test/resources/log4j.properties @@ -0,0 +1,22 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. 
See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Set everything to be logged to the console +log4j.rootCategory=INFO, console +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.target=System.err +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n