From c5929d548edd694229481776ed3e49a189726455 Mon Sep 17 00:00:00 2001
From: iture123 <1011699225@qq.com>
Date: Mon, 1 Aug 2022 23:31:16 +0800
Subject: [PATCH 1/6] [Connector-V2][Elasticsearch-connector] new connecotor of
Elasticsearch sink(#2326)
---
plugin-mapping.properties | 1 +
seatunnel-connectors-v2-dist/pom.xml | 5 +
.../connector-elasticsearch/pom.xml | 49 +++++++
.../elasticsearch/client/EsRestClient.java | 120 ++++++++++++++++
.../elasticsearch/config/SinkConfig.java | 47 +++++++
.../elasticsearch/constant/BulkConfig.java | 37 +++++
.../constant/ElasticsearchVersion.java | 51 +++++++
.../elasticsearch/dto/BulkResponse.java | 61 +++++++++
.../elasticsearch/dto/IndexInfo.java | 53 +++++++
.../exception/BulkElasticsearchException.java | 30 ++++
.../GetElasticsearchVersionException.java | 25 ++++
.../serialize/ElasticsearchRowSerializer.java | 80 +++++++++++
.../serialize/SeaTunnelRowSerializer.java | 26 ++++
.../serialize/index/IndexSerializer.java | 28 ++++
.../index/IndexSerializerFactory.java | 37 +++++
.../index/impl/FixedValueIndexSerializer.java | 38 ++++++
.../index/impl/VariableIndexSerializer.java | 69 ++++++++++
.../serialize/type/IndexTypeSerializer.java | 27 ++++
.../type/IndexTypeSerializerFactory.java | 49 +++++++
.../type/impl/NotIndexTypeSerializer.java | 35 +++++
.../impl/RequiredIndexTypeSerializer.java | 39 ++++++
.../elasticsearch/sink/ElasticsearchSink.java | 76 +++++++++++
.../sink/ElasticsearchSinkWriter.java | 129 ++++++++++++++++++
.../ElasticsearchAggregatedCommitInfo.java | 27 ++++
.../state/ElasticsearchCommitInfo.java | 35 +++++
.../state/ElasticsearchSinkState.java | 24 ++++
.../elasticsearch/util/RegexUtils.java | 39 ++++++
seatunnel-connectors-v2/pom.xml | 1 +
28 files changed, 1238 insertions(+)
create mode 100644 seatunnel-connectors-v2/connector-elasticsearch/pom.xml
create mode 100644 seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
create mode 100644 seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java
create mode 100644 seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/constant/BulkConfig.java
create mode 100644 seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/constant/ElasticsearchVersion.java
create mode 100644 seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/BulkResponse.java
create mode 100644 seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/IndexInfo.java
create mode 100644 seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/BulkElasticsearchException.java
create mode 100644 seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/GetElasticsearchVersionException.java
create mode 100644 seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java
create mode 100644 seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/SeaTunnelRowSerializer.java
create mode 100644 seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/index/IndexSerializer.java
create mode 100644 seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/index/IndexSerializerFactory.java
create mode 100644 seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/index/impl/FixedValueIndexSerializer.java
create mode 100644 seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/index/impl/VariableIndexSerializer.java
create mode 100644 seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/type/IndexTypeSerializer.java
create mode 100644 seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/type/IndexTypeSerializerFactory.java
create mode 100644 seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/type/impl/NotIndexTypeSerializer.java
create mode 100644 seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/type/impl/RequiredIndexTypeSerializer.java
create mode 100644 seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
create mode 100644 seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
create mode 100644 seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/state/ElasticsearchAggregatedCommitInfo.java
create mode 100644 seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/state/ElasticsearchCommitInfo.java
create mode 100644 seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/state/ElasticsearchSinkState.java
create mode 100644 seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/util/RegexUtils.java
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index da3890a70a0..cadb9414065 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -101,3 +101,4 @@ seatunnel.sink.Clickhouse = connector-clickhouse
seatunnel.sink.ClickhouseFile = connector-clickhouse
seatunnel.source.Jdbc = connector-jdbc
seatunnel.sink.Jdbc = connector-jdbc
+seatunnel.sink.elasticsearch = connector-elasticsearch
diff --git a/seatunnel-connectors-v2-dist/pom.xml b/seatunnel-connectors-v2-dist/pom.xml
index 9e349fbb99d..0997fdb8b01 100644
--- a/seatunnel-connectors-v2-dist/pom.xml
+++ b/seatunnel-connectors-v2-dist/pom.xml
@@ -80,6 +80,11 @@
connector-hive
${project.version}
+
+ org.apache.seatunnel
+ connector-elasticsearch
+ ${project.version}
+
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/pom.xml b/seatunnel-connectors-v2/connector-elasticsearch/pom.xml
new file mode 100644
index 00000000000..5406a0ef032
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-elasticsearch/pom.xml
@@ -0,0 +1,49 @@
+
+
+
+
+ org.apache.seatunnel
+ seatunnel-connectors-v2
+ ${revision}
+
+ 4.0.0
+
+ connector-elasticsearch
+
+
+
+ org.apache.seatunnel
+ seatunnel-api
+ ${project.version}
+
+
+ org.elasticsearch.client
+ elasticsearch-rest-client
+ 8.1.3
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+
+
+
+
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
new file mode 100644
index 00000000000..c8bc9aced1d
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.client;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.util.EntityUtils;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.BulkResponse;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.BulkElasticsearchException;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.GetElasticsearchVersionException;
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+
+import java.io.IOException;
+import java.util.List;
+
+public class EsRestClient {
+
+ private static EsRestClient esRestClient;
+ private static RestClient restClient;
+
+ private EsRestClient() {
+
+ }
+
+ private static RestClientBuilder getRestClientBuilder(List hosts, String username, String password) {
+ HttpHost[] httpHosts = new HttpHost[hosts.size()];
+ for (int i = 0; i < hosts.size(); i++) {
+ String[] hostInfo = hosts.get(i).replace("http://", "").split(":");
+ httpHosts[i] = new HttpHost(hostInfo[0], Integer.parseInt(hostInfo[1]));
+ }
+
+ RestClientBuilder builder = RestClient.builder(httpHosts)
+ .setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder
+ .setConnectionRequestTimeout(10 * 1000)
+ .setSocketTimeout(5 * 60 * 1000));
+
+ if (StringUtils.isNotEmpty(username)) {
+ CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
+ credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
+ builder.setHttpClientConfigCallback(httpAsyncClientBuilder -> httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
+ }
+ return builder;
+ }
+
+ public static EsRestClient getInstance(List hosts, String username, String password) {
+ if (restClient == null) {
+ RestClientBuilder restClientBuilder = getRestClientBuilder(hosts, username, password);
+ restClient = restClientBuilder.build();
+ esRestClient = new EsRestClient();
+ }
+ return esRestClient;
+ }
+
+ public BulkResponse bulk(String requestBody) {
+ Request request = new Request("POST", "_bulk");
+ request.setJsonEntity(requestBody);
+ try {
+ Response response = restClient.performRequest(request);
+ if (response.getStatusLine().getStatusCode() == 200) {
+ ObjectMapper objectMapper = new ObjectMapper();
+
+ String entity = EntityUtils.toString(response.getEntity());
+ JsonNode json = objectMapper.readTree(entity);
+ int took = json.get("took").asInt();
+ boolean errors = json.get("errors").asBoolean();
+ return new BulkResponse(errors, took, entity);
+ } else {
+ throw new BulkElasticsearchException(String.format("bulk es response status code=%d,request boy=%s", response.getStatusLine().getStatusCode(), requestBody));
+ }
+ } catch (IOException e) {
+ throw new BulkElasticsearchException(String.format("bulk es error,request boy=%s", requestBody), e);
+ }
+ }
+
+ /**
+ * @return version.number, example:2.0.0
+ */
+ public static String getClusterVersion() {
+ Request request = new Request("GET", "/");
+ try {
+ Response response = restClient.performRequest(request);
+ String result = EntityUtils.toString(response.getEntity());
+ ObjectMapper objectMapper = new ObjectMapper();
+ JsonNode jsonNode = objectMapper.readTree(result);
+ JsonNode versionNode = jsonNode.get("version");
+ return versionNode.get("number").asText();
+ } catch (IOException e) {
+ throw new GetElasticsearchVersionException("fail to get elasticsearch version.", e);
+ }
+ }
+
+ public void close() throws IOException {
+ restClient.close();
+ }
+
+}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java
new file mode 100644
index 00000000000..f747fad8583
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.config;
+
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.constant.BulkConfig;
+
+public class SinkConfig {
+
+ public static final String INDEX = "index";
+
+ public static final String INDEX_TYPE = "index_type";
+
+ public static final String USERNAME = "username";
+
+ public static final String PASSWORD = "password";
+
+ public static final String HOSTS = "hosts";
+
+ public static final String MAX_BATCH_SIZE = "max_batch_size";
+
+ public static final String MAX_RETRY_SIZE = "max_retry_size";
+
+ public static void setValue(org.apache.seatunnel.shade.com.typesafe.config.Config pluginConfig){
+ if(pluginConfig.hasPath(MAX_BATCH_SIZE)){
+ BulkConfig.MAX_BATCH_SIZE = pluginConfig.getInt(MAX_BATCH_SIZE);
+ }
+ if(pluginConfig.hasPath(MAX_RETRY_SIZE)){
+ BulkConfig.MAX_RETRY_SIZE = pluginConfig.getInt(MAX_RETRY_SIZE);
+ }
+ }
+
+}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/constant/BulkConfig.java b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/constant/BulkConfig.java
new file mode 100644
index 00000000000..b6108dc4790
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/constant/BulkConfig.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.constant;
+
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig;
+
+/**
+ * bulk es config
+ */
+public class BulkConfig {
+ /**
+ * once bulk es include max document size
+ * {@link SinkConfig#MAX_BATCH_SIZE}
+ */
+ public static int MAX_BATCH_SIZE = 10;
+
+ /**
+ * the max retry size of bulk es
+ * {@link SinkConfig#MAX_RETRY_SIZE}
+ */
+ public static int MAX_RETRY_SIZE = 3;
+}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/constant/ElasticsearchVersion.java b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/constant/ElasticsearchVersion.java
new file mode 100644
index 00000000000..747ba7d2a06
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/constant/ElasticsearchVersion.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.constant;
+
+public enum ElasticsearchVersion {
+ ES2(2), ES5(5), ES6(6), ES7(7), ES8(8);
+
+ private int version;
+
+ ElasticsearchVersion(int version) {
+ this.version = version;
+ }
+
+ public int getVersion() {
+ return version;
+ }
+
+ public void setVersion(int version) {
+ this.version = version;
+ }
+
+ public static ElasticsearchVersion get(int version) {
+ for (ElasticsearchVersion elasticsearchVersion : ElasticsearchVersion.values()) {
+ if (elasticsearchVersion.getVersion() == version) {
+ return elasticsearchVersion;
+ }
+ }
+ throw new IllegalArgumentException(String.format("version=%d,fail fo find ElasticsearchVersion.", version));
+ }
+
+ public static ElasticsearchVersion get(String clusterVersion) {
+ String[] versionArr = clusterVersion.split("\\.");
+ int version = Integer.parseInt(versionArr[0]);
+ return get(version);
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/BulkResponse.java b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/BulkResponse.java
new file mode 100644
index 00000000000..348107952eb
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/BulkResponse.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto;
+
+/**
+ * the response of bulk ES by http request
+ */
+public class BulkResponse {
+
+ private boolean errors;
+ private int took;
+ private String response;
+
+ public BulkResponse() {
+ }
+
+ public BulkResponse(boolean errors, int took, String response) {
+ this.errors = errors;
+ this.took = took;
+ this.response = response;
+ }
+
+ public boolean isErrors() {
+ return errors;
+ }
+
+ public void setErrors(boolean errors) {
+ this.errors = errors;
+ }
+
+ public int getTook() {
+ return took;
+ }
+
+ public void setTook(int took) {
+ this.took = took;
+ }
+
+ public String getResponse() {
+ return response;
+ }
+
+ public void setResponse(String response) {
+ this.response = response;
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/IndexInfo.java b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/IndexInfo.java
new file mode 100644
index 00000000000..0c6907a4b38
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/IndexInfo.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto;
+
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig;
+
+/**
+ * index config by seatunnel
+ */
+public class IndexInfo {
+
+ private String index;
+ private String type;
+
+ public IndexInfo(org.apache.seatunnel.shade.com.typesafe.config.Config pluginConfig) {
+ index = pluginConfig.getString(SinkConfig.INDEX);
+ if (pluginConfig.hasPath(SinkConfig.INDEX_TYPE)) {
+ type = pluginConfig.getString(SinkConfig.INDEX_TYPE);
+ }
+ }
+
+ public String getIndex() {
+ return index;
+ }
+
+ public void setIndex(String index) {
+ this.index = index;
+ }
+
+ public String getType() {
+ return type;
+ }
+
+ public void setType(String type) {
+ this.type = type;
+ }
+
+}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/BulkElasticsearchException.java b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/BulkElasticsearchException.java
new file mode 100644
index 00000000000..14dbfb7aed4
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/BulkElasticsearchException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception;
+
+public class BulkElasticsearchException extends RuntimeException {
+
+ public BulkElasticsearchException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public BulkElasticsearchException(String message) {
+ super(message);
+ }
+
+}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/GetElasticsearchVersionException.java b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/GetElasticsearchVersionException.java
new file mode 100644
index 00000000000..c146de07ad6
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/GetElasticsearchVersionException.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception;
+
+public class GetElasticsearchVersionException extends RuntimeException {
+
+ public GetElasticsearchVersionException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java
new file mode 100644
index 00000000000..06c5581bbec
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.IndexInfo;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.constant.ElasticsearchVersion;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.index.IndexSerializer;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.index.IndexSerializerFactory;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.type.IndexTypeSerializer;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.type.IndexTypeSerializerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * use in elasticsearch version >= 7.*
+ */
+public class ElasticsearchRowSerializer implements SeaTunnelRowSerializer{
+ private final SeaTunnelRowType seaTunnelRowType;
+ private final ObjectMapper objectMapper = new ObjectMapper();
+
+ private final IndexSerializer indexSerializer;
+
+ private final IndexTypeSerializer indexTypeSerializer;
+
+ public ElasticsearchRowSerializer(ElasticsearchVersion elasticsearchVersion, IndexInfo indexInfo, SeaTunnelRowType seaTunnelRowType) {
+ this.indexTypeSerializer = IndexTypeSerializerFactory.getIndexTypeSerializer(elasticsearchVersion,indexInfo.getType());
+ this.indexSerializer = IndexSerializerFactory.getIndexSerializer(indexInfo.getIndex(),seaTunnelRowType);
+ this.seaTunnelRowType = seaTunnelRowType;
+ }
+
+ @Override
+ public String serializeRow(SeaTunnelRow row){
+ String[] fieldNames = seaTunnelRowType.getFieldNames();
+ Map doc = new HashMap<>(fieldNames.length);
+ Object[] fields = row.getFields();
+ for (int i = 0; i < fieldNames.length; i++) {
+ doc.put(fieldNames[i], fields[i]);
+ }
+
+ StringBuilder sb = new StringBuilder();
+
+ Map indexInner = new HashMap<>();
+ String index = indexSerializer.serialize(row);
+ indexInner.put("_index",index);
+ indexTypeSerializer.fillType(indexInner);
+
+ Map> indexParam = new HashMap<>();
+ indexParam.put("index",indexInner);
+ try {
+ sb.append(objectMapper.writeValueAsString(indexParam));
+ sb.append("\n");
+ String indexDoc = objectMapper.writeValueAsString(doc);
+ sb.append(indexDoc);
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException("Object json deserialization exception.", e);
+ }
+
+ return sb.toString();
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/SeaTunnelRowSerializer.java b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/SeaTunnelRowSerializer.java
new file mode 100644
index 00000000000..d1fbae8a4f6
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/SeaTunnelRowSerializer.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize;
+
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+public interface SeaTunnelRowSerializer {
+
+ String serializeRow(SeaTunnelRow row);
+}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/index/IndexSerializer.java b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/index/IndexSerializer.java
new file mode 100644
index 00000000000..67dd8945507
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/index/IndexSerializer.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.index;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+/**
+ * index is a variable
+ */
+public interface IndexSerializer {
+
+ String serialize(SeaTunnelRow row);
+}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/index/IndexSerializerFactory.java b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/index/IndexSerializerFactory.java
new file mode 100644
index 00000000000..152181c3e24
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/index/IndexSerializerFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.index;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.index.impl.FixedValueIndexSerializer;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.index.impl.VariableIndexSerializer;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.util.RegexUtils;
+
+import java.util.List;
+
+public class IndexSerializerFactory {
+
+ public static IndexSerializer getIndexSerializer(String index, SeaTunnelRowType seaTunnelRowType) {
+ List fieldNames = RegexUtils.extractDatas(index, "\\$\\{(.*?)\\}");
+ if (fieldNames != null && fieldNames.size() > 0) {
+ return new VariableIndexSerializer(seaTunnelRowType, index, fieldNames);
+ } else {
+ return new FixedValueIndexSerializer(index);
+ }
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/index/impl/FixedValueIndexSerializer.java b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/index/impl/FixedValueIndexSerializer.java
new file mode 100644
index 00000000000..07d5dd5b614
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/index/impl/FixedValueIndexSerializer.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.index.impl;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.index.IndexSerializer;
+
+/**
+ * index is a fixed value,not a variable
+ */
+public class FixedValueIndexSerializer implements IndexSerializer {
+
+ private final String index;
+
+ public FixedValueIndexSerializer(String index) {
+ this.index = index;
+ }
+
+ @Override
+ public String serialize(SeaTunnelRow row) {
+ return index;
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/index/impl/VariableIndexSerializer.java b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/index/impl/VariableIndexSerializer.java
new file mode 100644
index 00000000000..7997632359a
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/index/impl/VariableIndexSerializer.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.index.impl;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.index.IndexSerializer;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * index include variable
+ */
+public class VariableIndexSerializer implements IndexSerializer {
+
+ private final String index;
+ private final Map fieldIndexMap;
+
+ private final String NULL_DEFAULT = "null";
+
+ public VariableIndexSerializer(SeaTunnelRowType seaTunnelRowType, String index, List fieldNames) {
+ this.index = index;
+ String[] rowFieldNames = seaTunnelRowType.getFieldNames();
+ fieldIndexMap = new HashMap<>(rowFieldNames.length);
+ for (int i = 0; i < rowFieldNames.length; i++) {
+ if (fieldNames.contains(rowFieldNames[i])) {
+ fieldIndexMap.put(rowFieldNames[i], i);
+ }
+ }
+ }
+
+ @Override
+ public String serialize(SeaTunnelRow row) {
+ String indexName = this.index;
+ for (Map.Entry fieldIndexEntry : fieldIndexMap.entrySet()) {
+ String fieldName = fieldIndexEntry.getKey();
+ int fieldIndex = fieldIndexEntry.getValue();
+ String value = getValue(fieldIndex, row);
+ indexName = indexName.replace(String.format("${%s}", fieldName), value);
+ }
+ return indexName.toLowerCase();
+ }
+
+ private String getValue(int fieldIndex, SeaTunnelRow row) {
+ Object valueObj = row.getField(fieldIndex);
+ if (valueObj == null) {
+ return NULL_DEFAULT;
+ } else {
+ return valueObj.toString();
+ }
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/type/IndexTypeSerializer.java b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/type/IndexTypeSerializer.java
new file mode 100644
index 00000000000..3e528058ed1
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/type/IndexTypeSerializer.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.type;
+
+
+import java.util.Map;
+
+public interface IndexTypeSerializer {
+
+ void fillType(Map indexInner);
+
+}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/type/IndexTypeSerializerFactory.java b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/type/IndexTypeSerializerFactory.java
new file mode 100644
index 00000000000..878257cb631
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/type/IndexTypeSerializerFactory.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.type;
+
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.constant.ElasticsearchVersion;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.type.impl.NotIndexTypeSerializer;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.type.impl.RequiredIndexTypeSerializer;
+
+import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.constant.ElasticsearchVersion.*;
+
+public class IndexTypeSerializerFactory {
+
+ private static final String DEFAULT_TYPE = "st";
+
+ private IndexTypeSerializerFactory() {
+
+ }
+
+ public static IndexTypeSerializer getIndexTypeSerializer(ElasticsearchVersion elasticsearchVersion, String type) {
+ if (elasticsearchVersion == ES2 || elasticsearchVersion == ES5) {
+ if (type == null || "".equals(type)) {
+ type = DEFAULT_TYPE;
+ }
+ return new RequiredIndexTypeSerializer(type);
+ }
+ if (elasticsearchVersion == ES6) {
+ if (type != null && !"".equals(type)) {
+ return new RequiredIndexTypeSerializer(type);
+ }
+ }
+ return new NotIndexTypeSerializer();
+ }
+
+}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/type/impl/NotIndexTypeSerializer.java b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/type/impl/NotIndexTypeSerializer.java
new file mode 100644
index 00000000000..fa5afb5b85c
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/type/impl/NotIndexTypeSerializer.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.type.impl;
+
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.type.IndexTypeSerializer;
+
+import java.util.Map;
+
+/**
+ * not need an index type for elasticsearch version:6.*,7.*,8.*
+ */
+public class NotIndexTypeSerializer implements IndexTypeSerializer {
+
+
+ @Override
+ public void fillType(Map indexInner) {
+
+ }
+
+}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/type/impl/RequiredIndexTypeSerializer.java b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/type/impl/RequiredIndexTypeSerializer.java
new file mode 100644
index 00000000000..11e4f1d5155
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/type/impl/RequiredIndexTypeSerializer.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.type.impl;
+
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.type.IndexTypeSerializer;
+
+import java.util.Map;
+
+/**
+ * generate an index type for elasticsearch version:2.*,5.*,6.*
+ */
+public class RequiredIndexTypeSerializer implements IndexTypeSerializer {
+
+ private final String type;
+
+ public RequiredIndexTypeSerializer(String type) {
+ this.type = type;
+ }
+
+ @Override
+ public void fillType(Map indexInner) {
+ indexInner.put("_type", type);
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
new file mode 100644
index 00000000000..a5eac83acd7
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.sink;
+
+import com.google.auto.service.AutoService;
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.state.ElasticsearchAggregatedCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.state.ElasticsearchCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.state.ElasticsearchSinkState;
+
+import java.util.Collections;
+
+
+@AutoService(SeaTunnelSink.class)
+public class ElasticsearchSink implements SeaTunnelSink {
+
+
+ private org.apache.seatunnel.shade.com.typesafe.config.Config pluginConfig;
+ private SeaTunnelContext seaTunnelContext;
+ private SeaTunnelRowType seaTunnelRowType;
+
+
+ @Override
+ public String getPluginName() {
+ return "Elasticsearch";
+ }
+
+ @Override
+ public void prepare(org.apache.seatunnel.shade.com.typesafe.config.Config pluginConfig) throws PrepareFailException {
+ this.pluginConfig = pluginConfig;
+ SinkConfig.setValue(pluginConfig);
+ }
+
+ @Override
+ public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
+ this.seaTunnelRowType = seaTunnelRowType;
+ }
+
+ @Override
+ public SeaTunnelDataType getConsumedType() {
+ return this.seaTunnelRowType;
+ }
+
+ @Override
+ public SinkWriter createWriter(SinkWriter.Context context) {
+ return new ElasticsearchSinkWriter(context, seaTunnelRowType, pluginConfig, Collections.emptyList());
+ }
+
+ @Override
+ public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
+ this.seaTunnelContext = seaTunnelContext;
+ }
+
+}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
new file mode 100644
index 00000000000..048f48f441d
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.sink;
+
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.constant.BulkConfig;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.constant.ElasticsearchVersion;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.BulkResponse;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.IndexInfo;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.BulkElasticsearchException;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.ElasticsearchRowSerializer;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.SeaTunnelRowSerializer;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.state.ElasticsearchCommitInfo;
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * ElasticsearchSinkWriter is a sink writer that will write {@link SeaTunnelRow} to Elasticsearch.
+ */
+public class ElasticsearchSinkWriter implements SinkWriter {
+
+ private final Context context;
+
+ private final SeaTunnelRowSerializer seaTunnelRowSerializer;
+ private final List requestEsList;
+ private EsRestClient esRestClient;
+
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchSinkWriter.class);
+
+
+ public ElasticsearchSinkWriter(
+ Context context,
+ SeaTunnelRowType seaTunnelRowType,
+ Config pluginConfig,
+ List elasticsearchStates) {
+ this.context = context;
+
+ IndexInfo indexInfo = new IndexInfo(pluginConfig);
+ initRestClient(pluginConfig);
+ ElasticsearchVersion elasticsearchVersion = ElasticsearchVersion.get(EsRestClient.getClusterVersion());
+ this.seaTunnelRowSerializer = new ElasticsearchRowSerializer(elasticsearchVersion, indexInfo, seaTunnelRowType);
+
+ this.requestEsList = new ArrayList<>(BulkConfig.MAX_BATCH_SIZE);
+ }
+
+ private void initRestClient(org.apache.seatunnel.shade.com.typesafe.config.Config pluginConfig) {
+ List hosts = pluginConfig.getStringList(SinkConfig.HOSTS);
+ String username = null;
+ String password = null;
+ if (pluginConfig.hasPath(SinkConfig.USERNAME)) {
+ username = pluginConfig.getString(SinkConfig.USERNAME);
+ if (pluginConfig.hasPath(SinkConfig.PASSWORD)) {
+ password = pluginConfig.getString(SinkConfig.PASSWORD);
+ }
+ }
+ esRestClient = EsRestClient.getInstance(hosts, username, password);
+ }
+
+ @Override
+ public void write(SeaTunnelRow element) {
+ String indexRequestRow = seaTunnelRowSerializer.serializeRow(element);
+ requestEsList.add(indexRequestRow);
+ if (requestEsList.size() >= BulkConfig.MAX_BATCH_SIZE) {
+ bulkEsWithRetry(this.esRestClient, this.requestEsList, BulkConfig.MAX_RETRY_SIZE);
+ requestEsList.clear();
+ }
+ }
+
+ @Override
+ public Optional prepareCommit() {
+ return Optional.empty();
+ }
+
+ @Override
+ public void abortPrepare() {
+ }
+
+ public void bulkEsWithRetry(EsRestClient esRestClient, List requestEsList, int maxRetry) {
+ for (int tryCnt = 1; tryCnt <= maxRetry; tryCnt++) {
+ if (requestEsList.size() > 0) {
+ String requestBody = String.join("\n", requestEsList) + "\n";
+ try {
+ BulkResponse bulkResponse = esRestClient.bulk(requestBody);
+ if (!bulkResponse.isErrors()) {
+ break;
+ }
+ } catch (Exception ex) {
+ if (tryCnt == maxRetry) {
+ throw new BulkElasticsearchException("bulk es error,try count=%d", ex);
+ }
+ LOGGER.warn(String.format("bulk es error,try count=%d", tryCnt), ex);
+ }
+
+ }
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ bulkEsWithRetry(this.esRestClient, this.requestEsList, BulkConfig.MAX_RETRY_SIZE);
+ esRestClient.close();
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/state/ElasticsearchAggregatedCommitInfo.java b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/state/ElasticsearchAggregatedCommitInfo.java
new file mode 100644
index 00000000000..6a0057b2be7
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/state/ElasticsearchAggregatedCommitInfo.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.state;
+
+import java.io.Serializable;
+
+/**
+ * Right now, we don't need aggregated commit in kafka.
+ * Todo: we need to add a default implementation of this state.
+ */
+public class ElasticsearchAggregatedCommitInfo implements Serializable {
+}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/state/ElasticsearchCommitInfo.java b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/state/ElasticsearchCommitInfo.java
new file mode 100644
index 00000000000..2fea90d3186
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/state/ElasticsearchCommitInfo.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.state;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.Properties;
+
+@Data
+@AllArgsConstructor
+public class ElasticsearchCommitInfo implements Serializable {
+
+ private final String transactionId;
+ private final Properties kafkaProperties;
+ private final long producerId;
+ private final short epoch;
+
+}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/state/ElasticsearchSinkState.java b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/state/ElasticsearchSinkState.java
new file mode 100644
index 00000000000..7cd01c65d66
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/state/ElasticsearchSinkState.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.state;
+
+import java.io.Serializable;
+
+public class ElasticsearchSinkState implements Serializable {
+
+}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/util/RegexUtils.java b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/util/RegexUtils.java
new file mode 100644
index 00000000000..9ccc413ff03
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/util/RegexUtils.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.util;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+
+import java.util.regex.Pattern;
+
+public class RegexUtils {
+
+ public static List extractDatas(String content, String regex) {
+ List datas = new ArrayList<>();
+ Pattern pattern = Pattern.compile(regex, Pattern.DOTALL);
+ Matcher matcher = pattern.matcher(content);
+ while (matcher.find()) {
+ String result = matcher.group(1);
+ datas.add(result);
+ }
+ return datas;
+ }
+
+}
diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml
index be62464f81b..6c3ad6eb038 100644
--- a/seatunnel-connectors-v2/pom.xml
+++ b/seatunnel-connectors-v2/pom.xml
@@ -43,6 +43,7 @@
connector-pulsar
connector-socket
connector-assert
+ connector-elasticsearch
From 8bcc6500bb5f0753b8aa41066f87fab9af5b516d Mon Sep 17 00:00:00 2001
From: iture123 <1011699225@qq.com>
Date: Mon, 1 Aug 2022 23:32:07 +0800
Subject: [PATCH 2/6] [Connector-V2][Elasticsearch-connector] the document of
Elasticsearch sink(#2326)
---
docs/en/new-connector/sink/Elasticsearch.md | 56 +++++++++++++++++++++
1 file changed, 56 insertions(+)
create mode 100644 docs/en/new-connector/sink/Elasticsearch.md
diff --git a/docs/en/new-connector/sink/Elasticsearch.md b/docs/en/new-connector/sink/Elasticsearch.md
new file mode 100644
index 00000000000..c8fbb551e7e
--- /dev/null
+++ b/docs/en/new-connector/sink/Elasticsearch.md
@@ -0,0 +1,56 @@
+# Elasticsearch
+
+## Description
+
+Output data to `Elasticsearch`.
+
+:::tip
+
+Engine Supported
+
+* supported `ElasticSearch version is >= 2.x and < 8.x`
+
+:::
+
+## Options
+
+| name | type | required | default value |
+|-------------------| ------ | -------- |---------------|
+| hosts | array | yes | - |
+| index | string | yes | - |
+| index_type | string | no | |
+| username | string | no | |
+| password | string | no | |
+| max_retry_size | int | no | 3 |
+| max_batch_size | int | no | 10 |
+
+
+
+### hosts [array]
+`Elasticsearch` cluster http address, the format is `host:port` , allowing multiple hosts to be specified. Such as `["host1:9200", "host2:9200"]`.
+
+### index [string]
+`Elasticsearch` `index` name.Index support contains variables of field name,such as `seatunnel_${age}`,and the field must appear at seatunnel row.
+
+### index_type [string]
+`Elasticsearch` index type, it is recommended not to specify in elasticsearch 6 and above
+
+### username [string]
+x-pack username
+
+### password [string]
+x-pack password
+
+### max_retry_size [int]
+one bulk request max try size
+
+### max_batch_size [int]
+batch bulk doc max size
+
+## Examples
+```bash
+Elasticsearch {
+ hosts = ["localhost:9200"]
+ index = "seatunnel-${age}"
+}
+```
From 2ec70c8952d30b3dd51e57dec37d044f0cd31dd0 Mon Sep 17 00:00:00 2001
From: iture123 <1011699225@qq.com>
Date: Tue, 2 Aug 2022 22:31:24 +0800
Subject: [PATCH 3/6] [Connector-V2][Elasticsearch-connector] Optimize bulk es
Response code (#2326)
---
.../seatunnel/elasticsearch/client/EsRestClient.java | 7 +++++--
1 file changed, 5 insertions(+), 2 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
index c8bc9aced1d..5a2a3df0995 100644
--- a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
@@ -21,6 +21,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHost;
+import org.apache.http.HttpStatus;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
@@ -80,9 +81,11 @@ public BulkResponse bulk(String requestBody) {
request.setJsonEntity(requestBody);
try {
Response response = restClient.performRequest(request);
- if (response.getStatusLine().getStatusCode() == 200) {
+ if (response == null) {
+ throw new BulkElasticsearchException("bulk es Response is null");
+ }
+ if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
ObjectMapper objectMapper = new ObjectMapper();
-
String entity = EntityUtils.toString(response.getEntity());
JsonNode json = objectMapper.readTree(entity);
int took = json.get("took").asInt();
From 191f31f2ade9ed1348d6c4e90ddf71beaa3a9884 Mon Sep 17 00:00:00 2001
From: iture123 <1011699225@qq.com>
Date: Tue, 2 Aug 2022 22:42:16 +0800
Subject: [PATCH 4/6] [Connector-V2][Elasticsearch-connector] The dependent
jars version managed uniformly in the root pom (#2326)
---
pom.xml | 1 +
seatunnel-connectors-v2/connector-elasticsearch/pom.xml | 2 +-
2 files changed, 2 insertions(+), 1 deletion(-)
diff --git a/pom.xml b/pom.xml
index 23e15e3c4b3..f4c89d528f4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -185,6 +185,7 @@
org.apache.seatunnel.shade
4.3.0
1.1.8.3
+ 8.1.3
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/pom.xml b/seatunnel-connectors-v2/connector-elasticsearch/pom.xml
index 5406a0ef032..7a6b6c8dc3b 100644
--- a/seatunnel-connectors-v2/connector-elasticsearch/pom.xml
+++ b/seatunnel-connectors-v2/connector-elasticsearch/pom.xml
@@ -38,7 +38,7 @@
org.elasticsearch.client
elasticsearch-rest-client
- 8.1.3
+ ${elasticsearch-rest-client.version}
com.fasterxml.jackson.core
From fd2bff78269aa7abe3a3813159dea09738c0de49 Mon Sep 17 00:00:00 2001
From: iture123 <1011699225@qq.com>
Date: Wed, 3 Aug 2022 23:15:29 +0800
Subject: [PATCH 5/6] [Connector-V2][Elasticsearch-connector] add
elasticsearch-rest-client dependency(#2326)
---
tools/dependencies/known-dependencies.txt | 1 +
1 file changed, 1 insertion(+)
diff --git a/tools/dependencies/known-dependencies.txt b/tools/dependencies/known-dependencies.txt
index 9411a44bde3..cd67140da98 100755
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -157,6 +157,7 @@ elasticsearch-core-7.5.1.jar
elasticsearch-geo-7.5.1.jar
elasticsearch-rest-client-6.3.1.jar
elasticsearch-rest-client-7.5.1.jar
+elasticsearch-rest-client-8.1.3.jar
elasticsearch-rest-high-level-client-6.3.1.jar
elasticsearch-rest-high-level-client-7.5.1.jar
elasticsearch-secure-sm-6.3.1.jar
From 7935cc0587e7e62196f08bb097f74dbd9aaa0a54 Mon Sep 17 00:00:00 2001
From: iture123 <1011699225@qq.com>
Date: Fri, 5 Aug 2022 22:47:48 +0800
Subject: [PATCH 6/6] [Connector-V2][Elasticsearch-connector] change
elasticsearch-rest-client version to 7.5.1 that existed before (#2326)
---
pom.xml | 2 +-
tools/dependencies/known-dependencies.txt | 1 -
2 files changed, 1 insertion(+), 2 deletions(-)
diff --git a/pom.xml b/pom.xml
index 65dea4ddf0e..401c89d5c51 100644
--- a/pom.xml
+++ b/pom.xml
@@ -222,7 +222,7 @@
6.2.2.Final
1.14.3
1.3.2
- 8.1.3
+ 7.5.1
diff --git a/tools/dependencies/known-dependencies.txt b/tools/dependencies/known-dependencies.txt
index bf276654c40..3aeb12dbaf1 100755
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -132,7 +132,6 @@ elasticsearch-core-7.5.1.jar
elasticsearch-geo-7.5.1.jar
elasticsearch-rest-client-6.3.1.jar
elasticsearch-rest-client-7.5.1.jar
-elasticsearch-rest-client-8.1.3.jar
elasticsearch-rest-high-level-client-6.3.1.jar
elasticsearch-rest-high-level-client-7.5.1.jar
elasticsearch-secure-sm-6.3.1.jar