[Feature][Connector-V2][Elasticsearch] Support Elasticsearch source #2821

Merged
merged 26 commits into from
Nov 4, 2022

iture123
Contributor

close #2553

indexDocsCounts = indexDocsCounts.stream().filter(x -> x.getDocsCount() != null && x.getDocsCount() > 0)
.sorted(Comparator.comparingLong(IndexDocsCount::getDocsCount)).collect(Collectors.toList());
List<ElasticsearchSourceSplit> splits = new ArrayList<>();
int parallelism = enumeratorContext.currentParallelism();
Member

EnumeratorContext.currentParallelism() is a runtime method; call it in SplitEnumerator#run().
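
The point about runtime-only methods could be sketched roughly as below. This is a minimal illustration under stated assumptions, not the SeaTunnel API: `ParallelismContext` and `DeferredSplitEnumerator` are hypothetical stand-ins, and the round-robin distribution is chosen only to keep the example short. The idea is that the constructor merely stores references, while everything that depends on the parallelism happens in `run()`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the enumerator context (not the real SeaTunnel interface).
interface ParallelismContext {
    int currentParallelism();
}

// Hypothetical enumerator that defers all parallelism-dependent work to run().
class DeferredSplitEnumerator {
    private final ParallelismContext context;
    private final List<String> indices;
    private List<List<String>> assignment = new ArrayList<>();

    DeferredSplitEnumerator(ParallelismContext context, List<String> indices) {
        // Construction time: only keep references, do not ask for the parallelism yet.
        this.context = context;
        this.indices = new ArrayList<>(indices);
    }

    void run() {
        // Runtime: the parallelism is queried here, so splits are computed here.
        int parallelism = context.currentParallelism();
        List<List<String>> buckets = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            buckets.add(new ArrayList<>());
        }
        // Round-robin distribution, an assumption made for illustration only.
        for (int i = 0; i < indices.size(); i++) {
            buckets.get(i % parallelism).add(indices.get(i));
        }
        assignment = buckets;
    }

    List<List<String>> getAssignment() {
        return assignment;
    }
}
```

With this shape, nothing is assigned until `run()` has been called, which is the behavior the comment asks for.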


private ElasticsearchContainer container;

@SuppressWarnings({"checkstyle:MagicNumber", "checkstyle:Indentation"})
Member

Suggested change
@SuppressWarnings({"checkstyle:MagicNumber", "checkstyle:Indentation"})

The IT (integration test) code already ignores MagicNumber automatically; please do not suppress the Indentation check.

Comment on lines 75 to 80
ScrollResult scrollResult = esRestClient.searchByScroll(sourceIndexInfo.getIndex(), sourceIndexInfo.getSource(), sourceIndexInfo.getScrollTime(), sourceIndexInfo.getScrollSize());
outputFromScrollResult(scrollResult, sourceIndexInfo.getSource(), output);
while (scrollResult.getDocs() != null && scrollResult.getDocs().size() > 0) {
scrollResult = esRestClient.searchWithScrollId(scrollResult.getScrollId(), sourceIndexInfo.getScrollTime());
outputFromScrollResult(scrollResult, sourceIndexInfo.getSource(), output);
}
Member

Suggested change
ScrollResult scrollResult = esRestClient.searchByScroll(sourceIndexInfo.getIndex(), sourceIndexInfo.getSource(), sourceIndexInfo.getScrollTime(), sourceIndexInfo.getScrollSize());
outputFromScrollResult(scrollResult, sourceIndexInfo.getSource(), output);
while (scrollResult.getDocs() != null && scrollResult.getDocs().size() > 0) {
scrollResult = esRestClient.searchWithScrollId(scrollResult.getScrollId(), sourceIndexInfo.getScrollTime());
outputFromScrollResult(scrollResult, sourceIndexInfo.getSource(), output);
}
synchronized (output.getCheckpointLock()) {
ScrollResult scrollResult = esRestClient.searchByScroll(sourceIndexInfo.getIndex(), sourceIndexInfo.getSource(), sourceIndexInfo.getScrollTime(), sourceIndexInfo.getScrollSize());
outputFromScrollResult(scrollResult, sourceIndexInfo.getSource(), output);
while (scrollResult.getDocs() != null && scrollResult.getDocs().size() > 0) {
scrollResult = esRestClient.searchWithScrollId(scrollResult.getScrollId(), sourceIndexInfo.getScrollTime());
outputFromScrollResult(scrollResult, sourceIndexInfo.getSource(), output);
}
}
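
The scroll loop in this suggestion can be exercised in isolation with a sketch like the one below. Everything here is an assumption made for illustration: `ScrollPage`, `FakeScrollClient`, and `ScrollReader` are hypothetical in-memory stand-ins, not the connector's `ScrollResult` or `esRestClient`, and a plain `Object` plays the role of the checkpoint lock. The loop keeps the same shape as the suggested code: fetch the first page, emit it, then keep fetching by scroll ID until an empty page comes back.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical page of results: a scroll ID plus the documents of that page.
class ScrollPage {
    final String scrollId;
    final List<String> docs;

    ScrollPage(String scrollId, List<String> docs) {
        this.scrollId = scrollId;
        this.docs = docs;
    }
}

// Hypothetical in-memory client that serves a fixed list in pages.
class FakeScrollClient {
    private final List<String> all;
    private final int pageSize;
    private int cursor = 0;

    FakeScrollClient(List<String> all, int pageSize) {
        this.all = all;
        this.pageSize = pageSize;
    }

    ScrollPage searchByScroll() {
        cursor = 0;
        return nextPage();
    }

    ScrollPage searchWithScrollId(String scrollId) {
        return nextPage();
    }

    private ScrollPage nextPage() {
        int end = Math.min(cursor + pageSize, all.size());
        List<String> docs = new ArrayList<>(all.subList(cursor, end));
        cursor = end;
        return new ScrollPage("scroll-1", docs);
    }
}

class ScrollReader {
    // Reads every page while holding the lock, mirroring the suggested structure.
    static List<String> readAll(FakeScrollClient client, Object checkpointLock) {
        List<String> out = new ArrayList<>();
        synchronized (checkpointLock) {
            ScrollPage page = client.searchByScroll();
            out.addAll(page.docs);
            // An empty page signals that the scroll is exhausted.
            while (page.docs != null && !page.docs.isEmpty()) {
                page = client.searchWithScrollId(page.scrollId);
                out.addAll(page.docs);
            }
        }
        return out;
    }
}
```

Note that holding the lock for the whole scroll means a checkpoint cannot start until the index has been read completely; whether that trade-off is acceptable depends on index size and is not settled by this sketch.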

Contributor Author

ok

import java.io.Serializable;
import java.util.List;

public class SourceIndexInfo implements Serializable {
Member

Suggested change
public class SourceIndexInfo implements Serializable {
@Data
@AllArgsConstructor
public class SourceIndexInfo implements Serializable {

Please use Lombok.

Maximum number of hits to be returned with each Elasticsearch scroll request.

## Examples
simple
Member

Please add a more complex example.

@@ -45,8 +45,7 @@ public String getPluginName() {
}

@Override
public void prepare(org.apache.seatunnel.shade.com.typesafe.config.Config pluginConfig) throws
PrepareFailException {
public void prepare(org.apache.seatunnel.shade.com.typesafe.config.Config pluginConfig) throws PrepareFailException {
Member

Replace the fully qualified name org.apache.seatunnel.shade.com.typesafe.config.Config with Config (import the class instead).

SeaTunnelRow seaTunnelRow = new SeaTunnelRow(sourceSize);
for (int i = 0; i < sourceSize; i++) {
Object value = doc.get(source.get(i));
seaTunnelRow.setField(i, String.valueOf(value));
Member

String.valueOf(value) converts every field to a string?

Could you map Elasticsearch data types to SeaTunnel data types instead?

For reference:
https://github.com/apache/incubator-seatunnel/tree/dev/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/serialize
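
To make the request concrete, a type-aware conversion might look roughly like the sketch below. This is only an illustration under assumptions: `EsValueConverter` is a hypothetical helper, the handful of Elasticsearch field type names covered here (`long`, `integer`, `double`, `boolean`, `keyword`, `text`) is deliberately incomplete, and the Java target types are a guess at a sensible mapping rather than the mapping the connector ended up using.

```java
// Hypothetical helper: converts a raw document value according to its
// Elasticsearch field type name instead of calling String.valueOf on everything.
class EsValueConverter {
    static Object convert(String esType, Object raw) {
        if (raw == null) {
            return null;
        }
        switch (esType) {
            case "long":
                return raw instanceof Number
                        ? ((Number) raw).longValue()
                        : Long.parseLong(raw.toString());
            case "integer":
                return raw instanceof Number
                        ? ((Number) raw).intValue()
                        : Integer.parseInt(raw.toString());
            case "double":
                return raw instanceof Number
                        ? ((Number) raw).doubleValue()
                        : Double.parseDouble(raw.toString());
            case "boolean":
                return raw instanceof Boolean
                        ? (Boolean) raw
                        : Boolean.parseBoolean(raw.toString());
            case "keyword":
            case "text":
            default:
                // Unknown or textual types fall back to the string form.
                return raw.toString();
        }
    }
}
```

A row builder could then call `EsValueConverter.convert(type, doc.get(field))` per field, so that numeric and boolean columns keep their types downstream.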

Contributor Author

ok

@Test
public void testElasticsearchSourceToConsoleSink() throws IOException, InterruptedException {
Container.ExecResult execResult = executeSeaTunnelFlinkJob("/elasticsearch/elasticsearch_to_console.conf");
Assertions.assertEquals(0, execResult.getExitCode());

@ashulin changed the title from "[Feature][Connector-V2]new connecotor of Elasticsearch source" to "[Feature][Connector-V2][Elasticsearch] Support Elasticsearch source" on Sep 28, 2022

## Key features

- [x] [batch](../../concept/connector-v2-features.md)
- [x] [stream](../../concept/connector-v2-features.md)
Member

Does this support stream reads?

Contributor Author

No, it does not. I will revise this.


- [x] [batch](../../concept/connector-v2-features.md)
- [x] [stream](../../concept/connector-v2-features.md)
- [x] [exactly-once](../../concept/connector-v2-features.md)
Member

How are splits restored (exactly-once)?

For a reference on restoring splits, see #2917.

Comment on lines 47 to 52
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
<version>${jackson-datatype-jsr310.version}</version>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-api</artifactId>
<version>${project.version}</version>
<scope>compile</scope>
</dependency>

@@ -43,43 +44,31 @@
/**
* ElasticsearchSinkWriter is a sink writer that will write {@link SeaTunnelRow} to Elasticsearch.
*/
@Slf4j
Member

Why was this deleted?

Comment on lines 78 to 88
ElasticsearchSourceSplit split = splits.poll();
if (null != split) {
SourceIndexInfo sourceIndexInfo = split.getSourceIndexInfo();
synchronized (output.getCheckpointLock()) {
ScrollResult scrollResult = esRestClient.searchByScroll(sourceIndexInfo.getIndex(), sourceIndexInfo.getSource(), sourceIndexInfo.getScrollTime(), sourceIndexInfo.getScrollSize());
outputFromScrollResult(scrollResult, sourceIndexInfo.getSource(), output);
while (scrollResult.getDocs() != null && scrollResult.getDocs().size() > 0) {
scrollResult = esRestClient.searchWithScrollId(scrollResult.getScrollId(), sourceIndexInfo.getScrollTime());
outputFromScrollResult(scrollResult, sourceIndexInfo.getSource(), output);
}
}
Member

@hailin0 Oct 28, 2022

Move queue.poll() into the synchronized code block.

Suggested change
ElasticsearchSourceSplit split = splits.poll();
if (null != split) {
SourceIndexInfo sourceIndexInfo = split.getSourceIndexInfo();
synchronized (output.getCheckpointLock()) {
ScrollResult scrollResult = esRestClient.searchByScroll(sourceIndexInfo.getIndex(), sourceIndexInfo.getSource(), sourceIndexInfo.getScrollTime(), sourceIndexInfo.getScrollSize());
outputFromScrollResult(scrollResult, sourceIndexInfo.getSource(), output);
while (scrollResult.getDocs() != null && scrollResult.getDocs().size() > 0) {
scrollResult = esRestClient.searchWithScrollId(scrollResult.getScrollId(), sourceIndexInfo.getScrollTime());
outputFromScrollResult(scrollResult, sourceIndexInfo.getSource(), output);
}
}
if (!splits.isEmpty()) {
synchronized (output.getCheckpointLock()) {
ElasticsearchSourceSplit split = splits.poll();
SourceIndexInfo sourceIndexInfo = split.getSourceIndexInfo();
ScrollResult scrollResult = esRestClient.searchByScroll(sourceIndexInfo.getIndex(), sourceIndexInfo.getSource(), sourceIndexInfo.getScrollTime(), sourceIndexInfo.getScrollSize());
outputFromScrollResult(scrollResult, sourceIndexInfo.getSource(), output);
while (scrollResult.getDocs() != null && scrollResult.getDocs().size() > 0) {
scrollResult = esRestClient.searchWithScrollId(scrollResult.getScrollId(), sourceIndexInfo.getScrollTime());
outputFromScrollResult(scrollResult, sourceIndexInfo.getSource(), output);
}
}
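
The review does not spell out why the poll should move inside the lock. One plausible reading is that taking a split and emitting its rows should appear atomic to a state snapshot, so that a snapshot never observes a split that has left the queue but whose rows have not been emitted yet. The sketch below illustrates that reading only; `CheckpointedSplitReader` and its methods are hypothetical and do not mirror the connector's reader class.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical reader: polling a split and emitting its rows happen under one lock.
class CheckpointedSplitReader {
    private final Deque<String> splits = new ArrayDeque<>();
    private final List<String> emitted = new ArrayList<>();
    private final Object checkpointLock = new Object();

    void addSplits(List<String> newSplits) {
        synchronized (checkpointLock) {
            splits.addAll(newSplits);
        }
    }

    void pollNext() {
        synchronized (checkpointLock) {
            // The poll is inside the lock, so a concurrent snapshot sees the split
            // either still pending or fully processed, never in between.
            String split = splits.poll();
            if (split == null) {
                return;
            }
            emitted.add("row-from-" + split);
        }
    }

    List<String> snapshotState() {
        synchronized (checkpointLock) {
            return new ArrayList<>(splits);
        }
    }

    List<String> getEmitted() {
        synchronized (checkpointLock) {
            return new ArrayList<>(emitted);
        }
    }
}
```

After one `pollNext()` on a reader holding two splits, the snapshot contains only the second split and exactly one row has been emitted.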

Comment on lines 34 to 46
public ElasticsearchSourceSplit(String splitId, SourceIndexInfo sourceIndexInfo) {
this.splitId = splitId;
this.sourceIndexInfo = sourceIndexInfo;
}

public SourceIndexInfo getSourceIndexInfo() {
return sourceIndexInfo;
}

@Override
public String splitId() {
return splitId;
}
Member

Use Lombok annotations.

Comment on lines 158 to 160
public void addSplitsBack(List<ElasticsearchSourceSplit> splits, int subtaskId) {

}
Member

Suggested change
public void addSplitsBack(List<ElasticsearchSourceSplit> splits, int subtaskId) {
}
public void addSplitsBack(List<ElasticsearchSourceSplit> splits, int subtaskId) {
if (!splits.isEmpty()) {
addPendingSplit(splits);
assignSplit(Collections.singletonList(subtaskId));
}
}
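
How the suggested `addSplitsBack` interacts with the pending-split bookkeeping might be sketched as follows. This is a self-contained illustration under assumptions: `PendingSplitEnumerator`, `ownerOf`, `assignedTo`, and `pendingCount` are hypothetical names, splits are plain strings, and ownership by hash of the split ID is an assumed strategy rather than something confirmed by this PR.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical enumerator that tracks pending and assigned splits per reader.
class PendingSplitEnumerator {
    private final int parallelism;
    private final Map<Integer, List<String>> pending = new HashMap<>();
    private final Map<Integer, List<String>> assigned = new HashMap<>();

    PendingSplitEnumerator(int parallelism) {
        this.parallelism = parallelism;
    }

    // Assumed ownership rule: hash of the split ID modulo the parallelism.
    int ownerOf(String splitId) {
        return Math.floorMod(splitId.hashCode(), parallelism);
    }

    void addPendingSplit(Collection<String> splits) {
        for (String split : splits) {
            pending.computeIfAbsent(ownerOf(split), k -> new ArrayList<>()).add(split);
        }
    }

    void assignSplit(Collection<Integer> readers) {
        for (int reader : readers) {
            List<String> toAssign = pending.remove(reader);
            if (toAssign != null) {
                assigned.computeIfAbsent(reader, k -> new ArrayList<>()).addAll(toAssign);
            }
        }
    }

    // Same shape as the suggested change: re-queue the splits, then reassign.
    void addSplitsBack(List<String> splits, int subtaskId) {
        if (!splits.isEmpty()) {
            addPendingSplit(splits);
            assignSplit(Collections.singletonList(subtaskId));
        }
    }

    List<String> assignedTo(int reader) {
        return assigned.getOrDefault(reader, Collections.<String>emptyList());
    }

    int pendingCount() {
        int count = 0;
        for (List<String> splits : pending.values()) {
            count += splits.size();
        }
        return count;
    }
}
```

Under the assumed hash-based ownership, a split handed back by the subtask that originally owned it is queued for that same subtask and reassigned immediately; with a different ownership rule, returned splits could stay pending until their owner registers.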

private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchIT.class);

@BeforeEach
public void startMongoContainer() throws Exception {
Member

Suggested change
public void startMongoContainer() throws Exception {
public void startElasticsearchContainer() throws Exception {

}

private List<String> generateTestDataSet() throws JsonProcessingException, UnknownHostException {
String[] fiels = new String[]{
Member

Suggested change
String[] fiels = new String[]{
String[] fields = new String[]{

container = new ElasticsearchContainer(DockerImageName.parse("elasticsearch:6.8.23").asCompatibleSubstituteFor("docker.elastic.co/elasticsearch/elasticsearch"))
.withNetwork(NETWORK)
.withNetworkAliases("elasticsearch")
.withLogConsumer(new Slf4jLogConsumer(LOGGER));
Member

Suggested change
.withLogConsumer(new Slf4jLogConsumer(LOGGER));
.withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger("elasticsearch:6.8.23")));

Contributor Author

All of your suggestions have been applied. Thanks.

Member

@hailin0 left a comment

LGTM

Member

@Hisoka-X left a comment

LGTM

}
}
}
```

@EricJoy2048 merged commit ded5481 into apache:dev on Nov 4, 2022

@EricJoy2048
Member

I will merge this PR; please add a follow-up PR for the change log. Reference: https://github.com/apache/incubator-seatunnel/blob/dev/docs/en/connector-v2/source/SftpFile.md

Development

Successfully merging this pull request may close these issues.

[Connector-V2][Elasticsearch-connector] new connecotor of Elasticsearch source
5 participants