Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][Connector-V2] new connecotor of Elasticsearch sink(#2326) #2330

Merged
merged 7 commits into from
Aug 5, 2022
56 changes: 56 additions & 0 deletions docs/en/new-connector/sink/Elasticsearch.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# Elasticsearch

## Description

Output data to `Elasticsearch`.

:::tip

Engine Supported

* supported `ElasticSearch version is >= 2.x and < 8.x`

:::

## Options

| name | type | required | default value |
|-------------------| ------ | -------- |---------------|
| hosts | array | yes | - |
| index | string | yes | - |
| index_type | string | no | |
| username | string | no | |
| password | string | no | |
| max_retry_size | int | no | 3 |
| max_batch_size | int | no | 10 |



### hosts [array]
`Elasticsearch` cluster http address, the format is `host:port` , allowing multiple hosts to be specified. Such as `["host1:9200", "host2:9200"]`.

### index [string]
`Elasticsearch` `index` name.Index support contains variables of field name,such as `seatunnel_${age}`,and the field must appear at seatunnel row.

### index_type [string]
`Elasticsearch` index type, it is recommended not to specify in elasticsearch 6 and above

### username [string]
x-pack username

### password [string]
x-pack password

### max_retry_size [int]
one bulk request max try size

### max_batch_size [int]
batch bulk doc max size

## Examples
```bash
Elasticsearch {
hosts = ["localhost:9200"]
index = "seatunnel-${age}"
}
```
1 change: 1 addition & 0 deletions plugin-mapping.properties
Original file line number Diff line number Diff line change
Expand Up @@ -109,3 +109,4 @@ seatunnel.sink.LocalFile = connector-file-local
seatunnel.source.Pulsar = connector-pulsar
seatunnel.source.Hudi = connector-hudi
seatunnel.sink.DingTalk = connector-dingtalk
seatunnel.sink.elasticsearch = connector-elasticsearch
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@
<hibernate.validator.version>6.2.2.Final</hibernate.validator.version>
<jsoup.version>1.14.3</jsoup.version>
<javax.annotation-api.version>1.3.2</javax.annotation-api.version>
<elasticsearch-rest-client.version>7.5.1</elasticsearch-rest-client.version>
</properties>

<dependencyManagement>
Expand Down
5 changes: 5 additions & 0 deletions seatunnel-connectors-v2-dist/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,11 @@
<artifactId>connector-email</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-elasticsearch</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>

<build>
Expand Down
49 changes: 49 additions & 0 deletions seatunnel-connectors-v2/connector-elasticsearch/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--

Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-connectors-v2</artifactId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>connector-elasticsearch</artifactId>

<dependencies>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
<version>${elasticsearch-rest-client.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.elasticsearch.client;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHost;
import org.apache.http.HttpStatus;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.util.EntityUtils;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.BulkResponse;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.BulkElasticsearchException;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.GetElasticsearchVersionException;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;

import java.io.IOException;
import java.util.List;

public class EsRestClient {

private static EsRestClient esRestClient;
private static RestClient restClient;

private EsRestClient() {

}

private static RestClientBuilder getRestClientBuilder(List<String> hosts, String username, String password) {
HttpHost[] httpHosts = new HttpHost[hosts.size()];
for (int i = 0; i < hosts.size(); i++) {
String[] hostInfo = hosts.get(i).replace("http://", "").split(":");
httpHosts[i] = new HttpHost(hostInfo[0], Integer.parseInt(hostInfo[1]));
}

RestClientBuilder builder = RestClient.builder(httpHosts)
.setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder
.setConnectionRequestTimeout(10 * 1000)
.setSocketTimeout(5 * 60 * 1000));

if (StringUtils.isNotEmpty(username)) {
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
builder.setHttpClientConfigCallback(httpAsyncClientBuilder -> httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
}
return builder;
}

public static EsRestClient getInstance(List<String> hosts, String username, String password) {
if (restClient == null) {
RestClientBuilder restClientBuilder = getRestClientBuilder(hosts, username, password);
restClient = restClientBuilder.build();
esRestClient = new EsRestClient();
}
return esRestClient;
}

public BulkResponse bulk(String requestBody) {
Request request = new Request("POST", "_bulk");
request.setJsonEntity(requestBody);
try {
Response response = restClient.performRequest(request);
if (response == null) {
throw new BulkElasticsearchException("bulk es Response is null");
}
if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
ObjectMapper objectMapper = new ObjectMapper();
String entity = EntityUtils.toString(response.getEntity());
JsonNode json = objectMapper.readTree(entity);
int took = json.get("took").asInt();
boolean errors = json.get("errors").asBoolean();
return new BulkResponse(errors, took, entity);
} else {
throw new BulkElasticsearchException(String.format("bulk es response status code=%d,request boy=%s", response.getStatusLine().getStatusCode(), requestBody));
}
} catch (IOException e) {
throw new BulkElasticsearchException(String.format("bulk es error,request boy=%s", requestBody), e);
}
}

/**
* @return version.number, example:2.0.0
*/
public static String getClusterVersion() {
Request request = new Request("GET", "/");
try {
Response response = restClient.performRequest(request);
String result = EntityUtils.toString(response.getEntity());
ObjectMapper objectMapper = new ObjectMapper();
JsonNode jsonNode = objectMapper.readTree(result);
JsonNode versionNode = jsonNode.get("version");
return versionNode.get("number").asText();
} catch (IOException e) {
throw new GetElasticsearchVersionException("fail to get elasticsearch version.", e);
}
}

public void close() throws IOException {
restClient.close();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.elasticsearch.config;

import org.apache.seatunnel.connectors.seatunnel.elasticsearch.constant.BulkConfig;

public class SinkConfig {

public static final String INDEX = "index";

public static final String INDEX_TYPE = "index_type";

public static final String USERNAME = "username";

public static final String PASSWORD = "password";

public static final String HOSTS = "hosts";

public static final String MAX_BATCH_SIZE = "max_batch_size";

public static final String MAX_RETRY_SIZE = "max_retry_size";

public static void setValue(org.apache.seatunnel.shade.com.typesafe.config.Config pluginConfig){
if(pluginConfig.hasPath(MAX_BATCH_SIZE)){
BulkConfig.MAX_BATCH_SIZE = pluginConfig.getInt(MAX_BATCH_SIZE);
}
if(pluginConfig.hasPath(MAX_RETRY_SIZE)){
BulkConfig.MAX_RETRY_SIZE = pluginConfig.getInt(MAX_RETRY_SIZE);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.elasticsearch.constant;

import org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig;

/**
* bulk es config
*/
public class BulkConfig {
/**
* once bulk es include max document size
* {@link SinkConfig#MAX_BATCH_SIZE}
*/
public static int MAX_BATCH_SIZE = 10;

/**
* the max retry size of bulk es
* {@link SinkConfig#MAX_RETRY_SIZE}
*/
public static int MAX_RETRY_SIZE = 3;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.elasticsearch.constant;

public enum ElasticsearchVersion {
ES2(2), ES5(5), ES6(6), ES7(7), ES8(8);

private int version;

ElasticsearchVersion(int version) {
this.version = version;
}

public int getVersion() {
return version;
}

public void setVersion(int version) {
this.version = version;
}

public static ElasticsearchVersion get(int version) {
for (ElasticsearchVersion elasticsearchVersion : ElasticsearchVersion.values()) {
if (elasticsearchVersion.getVersion() == version) {
return elasticsearchVersion;
}
}
throw new IllegalArgumentException(String.format("version=%d,fail fo find ElasticsearchVersion.", version));
}

public static ElasticsearchVersion get(String clusterVersion) {
String[] versionArr = clusterVersion.split("\\.");
int version = Integer.parseInt(versionArr[0]);
return get(version);
}
}
Loading