From 5da843e4709becd1cf34eabb2b8494eb0f7b8f20 Mon Sep 17 00:00:00 2001 From: Dat Tran Date: Mon, 3 Sep 2018 21:14:13 -0700 Subject: [PATCH] [BEAM-5107] Add support for ES-6.x to ElasticsearchIO --- .../io/elasticsearch/ElasticsearchIOIT.java | 28 +-- .../io/elasticsearch/ElasticsearchIOTest.java | 42 +--- .../src/test/contrib/create_elk_container.sh | 4 +- .../io/elasticsearch/ElasticsearchIOIT.java | 26 +-- .../io/elasticsearch/ElasticsearchIOTest.java | 40 +--- .../elasticsearch-tests-6/build.gradle | 66 ++++++ .../src/test/contrib/create_elk_container.sh | 24 +++ .../io/elasticsearch/ElasticsearchIOIT.java | 144 +++++++++++++ .../io/elasticsearch/ElasticsearchIOTest.java | 199 ++++++++++++++++++ .../org/elasticsearch/bootstrap/JarHell.java | 36 ++++ .../elasticsearch-tests-common/build.gradle | 2 +- .../ElasticsearchIOTestCommon.java | 49 ++++- sdks/java/io/elasticsearch/build.gradle | 2 +- .../sdk/io/elasticsearch/ElasticsearchIO.java | 47 ++++- sdks/java/javadoc/build.gradle | 1 + settings.gradle | 2 + 16 files changed, 565 insertions(+), 147 deletions(-) create mode 100644 sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/build.gradle create mode 100755 sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/contrib/create_elk_container.sh create mode 100644 sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java create mode 100644 sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java create mode 100644 sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/elasticsearch/bootstrap/JarHell.java diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java index 258121874d3..9eccb7f0982 100644 --- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java +++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java @@ -17,17 +17,10 @@ */ package org.apache.beam.sdk.io.elasticsearch; -import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.BoundedElasticsearchSource; import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.ConnectionConfiguration; -import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.Read; -import static org.apache.beam.sdk.testing.SourceTestUtils.readFromSource; -import static org.junit.Assert.assertEquals; -import java.util.List; -import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOITCommon.ElasticsearchPipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.testing.SourceTestUtils; import org.apache.beam.sdk.testing.TestPipeline; import org.elasticsearch.client.RestClient; import org.junit.AfterClass; @@ -93,26 +86,7 @@ public static void afterClass() throws Exception { @Test public void testSplitsVolume() throws Exception { - Read read = ElasticsearchIO.read().withConnectionConfiguration(readConnectionConfiguration); - BoundedElasticsearchSource initialSource = - new BoundedElasticsearchSource(read, null, null, null); - // desiredBundleSize is ignored because in ES 2.x there is no way to split shards. 
So we get - // as many bundles as ES shards and bundle size is shard size - long desiredBundleSizeBytes = 0; - List> splits = - initialSource.split(desiredBundleSizeBytes, options); - SourceTestUtils.assertSourcesEqualReferenceSource(initialSource, splits, options); - // this is the number of ES shards - // (By default, each index in Elasticsearch is allocated 5 primary shards) - long expectedNumSplits = 5; - assertEquals(expectedNumSplits, splits.size()); - int nonEmptySplits = 0; - for (BoundedSource subSource : splits) { - if (readFromSource(subSource, options).size() > 0) { - nonEmptySplits += 1; - } - } - assertEquals(expectedNumSplits, nonEmptySplits); + elasticsearchIOTestCommon.testSplit(0); } @Test diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java index 9920dde53ba..52609e54d40 100644 --- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java +++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java @@ -17,26 +17,13 @@ */ package org.apache.beam.sdk.io.elasticsearch; -import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.BoundedElasticsearchSource; import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.ConnectionConfiguration; -import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.Read; -import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.ACCEPTABLE_EMPTY_SPLITS_PERCENTAGE; import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.ES_TYPE; -import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.NUM_DOCS_UTESTS; import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.getEsIndex; -import static org.apache.beam.sdk.testing.SourceTestUtils.readFromSource; -import static org.hamcrest.Matchers.lessThan; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThat; import java.io.IOException; import java.io.Serializable; import java.net.ServerSocket; -import java.util.List; -import org.apache.beam.sdk.io.BoundedSource; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.testing.SourceTestUtils; import org.apache.beam.sdk.testing.TestPipeline; import org.elasticsearch.client.RestClient; import org.elasticsearch.common.settings.Settings; @@ -173,32 +160,7 @@ public void testWriteWithMaxBatchSizeBytes() throws Exception { @Test public void testSplit() throws Exception { - ElasticSearchIOTestUtils.insertTestDocuments( - connectionConfiguration, NUM_DOCS_UTESTS, restClient); - PipelineOptions options = PipelineOptionsFactory.create(); - Read read = ElasticsearchIO.read().withConnectionConfiguration(connectionConfiguration); - BoundedElasticsearchSource initialSource = - new BoundedElasticsearchSource(read, null, null, null); - //desiredBundleSize is ignored because in ES 2.x there is no way to split shards. 
So we get - // as many bundles as ES shards and bundle size is shard size - int desiredBundleSizeBytes = 0; - List> splits = - initialSource.split(desiredBundleSizeBytes, options); - SourceTestUtils.assertSourcesEqualReferenceSource(initialSource, splits, options); - //this is the number of ES shards - // (By default, each index in Elasticsearch is allocated 5 primary shards) - int expectedNumSources = 5; - assertEquals("Wrong number of splits", expectedNumSources, splits.size()); - int emptySplits = 0; - for (BoundedSource subSource : splits) { - if (readFromSource(subSource, options).isEmpty()) { - emptySplits += 1; - } - } - assertThat( - "There are too many empty splits, parallelism is sub-optimal", - emptySplits, - lessThan((int) (ACCEPTABLE_EMPTY_SPLITS_PERCENTAGE * splits.size()))); + elasticsearchIOTestCommon.testSplit(0); } @Test @@ -216,7 +178,7 @@ public void testWriteWithIndexFn() throws Exception { @Test public void testWriteWithTypeFn() throws Exception { elasticsearchIOTestCommon.setPipeline(pipeline); - elasticsearchIOTestCommon.testWriteWithTypeFn(); + elasticsearchIOTestCommon.testWriteWithTypeFn2x5x(); } @Test diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/contrib/create_elk_container.sh b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/contrib/create_elk_container.sh index 48f6064cd07..715cb6d42a2 100755 --- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/contrib/create_elk_container.sh +++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/contrib/create_elk_container.sh @@ -18,7 +18,7 @@ # ################################################################################ -#Create an ELK (Elasticsearch Logstash Kibana) container for ES v2.4 and compatible Logstash and Kibana versions, +#Create an ELK (Elasticsearch Logstash Kibana) container for ES v5.4.3 and compatible Logstash and Kibana versions, #bind then on host ports, allow shell access to container and mount current directory on /home/$USER inside the container -docker create -p 5601:5601 -p 9200:9200 -p 5044:5044 -p 5000:5000 -p 9300:9300 -it -v $(pwd):/home/$USER/ --name elk-2.4 sebp/elk:es240_l240_k460 +docker create -p 5601:5601 -p 9200:9200 -p 5044:5044 -p 5000:5000 -p 9300:9300 -it -v $(pwd):/home/$USER/ --name elk-5.4.3 sebp/elk:543 diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java index b38b1696141..c3eb9d11ad4 100644 --- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java +++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java @@ -17,17 +17,10 @@ */ package org.apache.beam.sdk.io.elasticsearch; -import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.BoundedElasticsearchSource; import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.ConnectionConfiguration; -import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.Read; -import static org.apache.beam.sdk.testing.SourceTestUtils.readFromSource; -import static org.junit.Assert.assertEquals; -import java.util.List; -import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOITCommon.ElasticsearchPipelineOptions; import 
org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.testing.SourceTestUtils; import org.apache.beam.sdk.testing.TestPipeline; import org.elasticsearch.client.RestClient; import org.junit.AfterClass; @@ -93,24 +86,7 @@ public static void afterClass() throws Exception { @Test public void testSplitsVolume() throws Exception { - Read read = ElasticsearchIO.read().withConnectionConfiguration(readConnectionConfiguration); - BoundedElasticsearchSource initialSource = - new BoundedElasticsearchSource(read, null, null, null); - int desiredBundleSizeBytes = 10000; - List> splits = - initialSource.split(desiredBundleSizeBytes, options); - SourceTestUtils.assertSourcesEqualReferenceSource(initialSource, splits, options); - long indexSize = BoundedElasticsearchSource.estimateIndexSize(readConnectionConfiguration); - float expectedNumSourcesFloat = (float) indexSize / desiredBundleSizeBytes; - int expectedNumSources = (int) Math.ceil(expectedNumSourcesFloat); - assertEquals(expectedNumSources, splits.size()); - int nonEmptySplits = 0; - for (BoundedSource subSource : splits) { - if (readFromSource(subSource, options).size() > 0) { - nonEmptySplits += 1; - } - } - assertEquals("Wrong number of empty splits", expectedNumSources, nonEmptySplits); + elasticsearchIOTestCommon.testSplit(10_000); } @Test diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java index b453b9f4740..2414f8f44da 100644 --- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java +++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java @@ -17,15 +17,9 @@ */ package org.apache.beam.sdk.io.elasticsearch; -import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.BoundedElasticsearchSource; import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.ConnectionConfiguration; -import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.Read; -import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.ACCEPTABLE_EMPTY_SPLITS_PERCENTAGE; import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.ES_TYPE; -import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.NUM_DOCS_UTESTS; import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.getEsIndex; -import static org.apache.beam.sdk.testing.SourceTestUtils.readFromSource; -import static org.hamcrest.Matchers.lessThan; import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope; import java.io.IOException; @@ -33,11 +27,6 @@ import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collection; -import java.util.List; -import org.apache.beam.sdk.io.BoundedSource; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.testing.SourceTestUtils; import org.apache.beam.sdk.testing.TestPipeline; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.plugins.Plugin; @@ -160,33 +149,10 @@ public void testWriteWithMaxBatchSizeBytes() throws Exception { @Test public void testSplit() throws Exception { - //need to create the index using the helper method (not create it at 
first insertion) + // need to create the index using the helper method (not create it at first insertion) // for the indexSettings() to be run createIndex(getEsIndex()); - ElasticSearchIOTestUtils.insertTestDocuments( - connectionConfiguration, NUM_DOCS_UTESTS, getRestClient()); - PipelineOptions options = PipelineOptionsFactory.create(); - Read read = ElasticsearchIO.read().withConnectionConfiguration(connectionConfiguration); - BoundedElasticsearchSource initialSource = - new BoundedElasticsearchSource(read, null, null, null); - int desiredBundleSizeBytes = 2000; - List> splits = - initialSource.split(desiredBundleSizeBytes, options); - SourceTestUtils.assertSourcesEqualReferenceSource(initialSource, splits, options); - long indexSize = BoundedElasticsearchSource.estimateIndexSize(connectionConfiguration); - float expectedNumSourcesFloat = (float) indexSize / desiredBundleSizeBytes; - int expectedNumSources = (int) Math.ceil(expectedNumSourcesFloat); - assertEquals("Wrong number of splits", expectedNumSources, splits.size()); - int emptySplits = 0; - for (BoundedSource subSource : splits) { - if (readFromSource(subSource, options).isEmpty()) { - emptySplits += 1; - } - } - assertThat( - "There are too many empty splits, parallelism is sub-optimal", - emptySplits, - lessThan((int) (ACCEPTABLE_EMPTY_SPLITS_PERCENTAGE * splits.size()))); + elasticsearchIOTestCommon.testSplit(2_000); } @Test @@ -204,7 +170,7 @@ public void testWriteWithIndexFn() throws Exception { @Test public void testWriteWithTypeFn() throws Exception { elasticsearchIOTestCommon.setPipeline(pipeline); - elasticsearchIOTestCommon.testWriteWithTypeFn(); + elasticsearchIOTestCommon.testWriteWithTypeFn2x5x(); } @Test diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/build.gradle b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/build.gradle new file mode 100644 index 00000000000..9675e4bf458 --- /dev/null +++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/build.gradle @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +apply plugin: org.apache.beam.gradle.BeamModulePlugin +applyJavaNature() +provideIntegrationTestingDependencies() +enableJavaPerformanceTesting() + +description = "Apache Beam :: SDKs :: Java :: IO :: Elasticsearch-Tests :: 6.x" +ext.summary = "Tests of ElasticsearchIO on Elasticsearch 6.x" + +test { + // needed for ESIntegTestCase + systemProperty "tests.security.manager", "false" +} + +def jna_version = "4.1.0" +def log4j_version = "2.6.2" +def elastic_search_version = "6.4.0" + +configurations.all { + resolutionStrategy { + // Make sure the log4j versions for api and core match instead of taking the default + // Gradle rule of using the latest. 
+ force "org.apache.logging.log4j:log4j-api:$log4j_version" + force "org.apache.logging.log4j:log4j-core:$log4j_version" + } +} + +dependencies { + testCompile project(path: ":beam-sdks-java-io-elasticsearch-tests-common", configuration: "shadowTest") + testCompile "org.elasticsearch.test:framework:$elastic_search_version" + testCompile "org.elasticsearch.plugin:transport-netty4-client:$elastic_search_version" + testCompile "com.carrotsearch.randomizedtesting:randomizedtesting-runner:2.5.2" + testCompile "org.elasticsearch:elasticsearch:$elastic_search_version" + + testCompile project(path: ":beam-sdks-java-core", configuration: "shadow") + testCompile project(path: ":beam-sdks-java-io-elasticsearch", configuration: "shadow") + testCompile project(path: ":beam-sdks-java-io-common", configuration: "shadowTest") + testCompile project(path: ":beam-runners-direct-java", configuration: "shadow") + testCompile "org.apache.logging.log4j:log4j-core:$log4j_version" + testCompile "org.apache.logging.log4j:log4j-api:$log4j_version" + testCompile library.java.slf4j_api + testCompile "net.java.dev.jna:jna:$jna_version" + testCompile library.java.hamcrest_core + testCompile library.java.hamcrest_library + testCompile library.java.slf4j_jdk14 + testCompile library.java.commons_io_1x + testCompile library.java.junit + testCompile "org.elasticsearch.client:elasticsearch-rest-client:$elastic_search_version" +} diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/contrib/create_elk_container.sh b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/contrib/create_elk_container.sh new file mode 100755 index 00000000000..f29ad7060b5 --- /dev/null +++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/contrib/create_elk_container.sh @@ -0,0 +1,24 @@ +#!/bin/sh +################################################################################ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +################################################################################ + +#Create an ELK (Elasticsearch Logstash Kibana) container for ES v6.4.0 and compatible Logstash and Kibana versions, +#bind then on host ports, allow shell access to container and mount current directory on /home/$USER inside the container + +docker create -p 5601:5601 -p 9200:9200 -p 5044:5044 -p 5000:5000 -p 9300:9300 -it -v $(pwd):/home/$USER/ --name elk-6.4.0 sebp/elk:640 diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java new file mode 100644 index 00000000000..6621d7be89e --- /dev/null +++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.elasticsearch; + +import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.ConnectionConfiguration; + +import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOITCommon.ElasticsearchPipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.TestPipeline; +import org.elasticsearch.client.RestClient; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; + +/** + * A test of {@link ElasticsearchIO} on an independent Elasticsearch v5.x instance. + * + *

This test requires a running instance of Elasticsearch, and the test dataset must exist in the + * database. See {@link ElasticsearchIOITCommon} for instructions to achieve this. + * + *

You can run this test by doing the following from the beam parent module directory with the + * correct server IP: + * + *

+ *  ./gradlew integrationTest -p sdks/java/io/elasticsearch-tests/elasticsearch-tests-6
+ *  -DintegrationTestPipelineOptions='[
+ *  "--elasticsearchServer=1.2.3.4",
+ *  "--elasticsearchHttpPort=9200"]'
+ *  --tests org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOIT
+ *  -DintegrationTestRunner=direct
+ * 
+ * + *

It is likely that you will need to configure thread_pool.bulk.queue_size: 250 (or + * higher) in the backend Elasticsearch server for this test to run. + */ +public class ElasticsearchIOIT { + private static RestClient restClient; + private static ElasticsearchPipelineOptions options; + private static ConnectionConfiguration readConnectionConfiguration; + private static ConnectionConfiguration writeConnectionConfiguration; + private static ConnectionConfiguration updateConnectionConfiguration; + private static ElasticsearchIOTestCommon elasticsearchIOTestCommon; + + @Rule public TestPipeline pipeline = TestPipeline.create(); + + @BeforeClass + public static void beforeClass() throws Exception { + PipelineOptionsFactory.register(ElasticsearchPipelineOptions.class); + options = TestPipeline.testingPipelineOptions().as(ElasticsearchPipelineOptions.class); + readConnectionConfiguration = + ElasticsearchIOITCommon.getConnectionConfiguration( + options, ElasticsearchIOITCommon.IndexMode.READ); + writeConnectionConfiguration = + ElasticsearchIOITCommon.getConnectionConfiguration( + options, ElasticsearchIOITCommon.IndexMode.WRITE); + updateConnectionConfiguration = + ElasticsearchIOITCommon.getConnectionConfiguration( + options, ElasticsearchIOITCommon.IndexMode.WRITE_PARTIAL); + restClient = readConnectionConfiguration.createClient(); + elasticsearchIOTestCommon = + new ElasticsearchIOTestCommon(readConnectionConfiguration, restClient, true); + } + + @AfterClass + public static void afterClass() throws Exception { + ElasticSearchIOTestUtils.deleteIndex(writeConnectionConfiguration, restClient); + ElasticSearchIOTestUtils.deleteIndex(updateConnectionConfiguration, restClient); + restClient.close(); + } + + @Test + public void testSplitsVolume() throws Exception { + elasticsearchIOTestCommon.testSplit(10_000); + } + + @Test + public void testReadVolume() throws Exception { + elasticsearchIOTestCommon.setPipeline(pipeline); + elasticsearchIOTestCommon.testRead(); + } + + @Test + public void testWriteVolume() throws Exception { + // cannot share elasticsearchIOTestCommon because tests run in parallel. + ElasticsearchIOTestCommon elasticsearchIOTestCommonWrite = + new ElasticsearchIOTestCommon(writeConnectionConfiguration, restClient, true); + elasticsearchIOTestCommonWrite.setPipeline(pipeline); + elasticsearchIOTestCommonWrite.testWrite(); + } + + @Test + public void testSizesVolume() throws Exception { + elasticsearchIOTestCommon.testSizes(); + } + + /** + * This test verifies volume loading of Elasticsearch using explicit document IDs and routed to an + * index named the same as the scientist, and type which is based on the modulo 2 of the scientist + * name. The goal of this IT is to help observe and verify that the overhead of adding the + * functions to parse the document and extract the ID is acceptable. + */ + @Test + public void testWriteWithFullAddressingVolume() throws Exception { + // cannot share elasticsearchIOTestCommon because tests run in parallel. + ElasticsearchIOTestCommon elasticsearchIOTestCommonWrite = + new ElasticsearchIOTestCommon(writeConnectionConfiguration, restClient, true); + elasticsearchIOTestCommonWrite.setPipeline(pipeline); + elasticsearchIOTestCommonWrite.testWriteWithFullAddressing(); + } + + /** + * This test verifies volume partial updates of Elasticsearch. The test dataset index is cloned + * and then a new field is added to each document using a partial update. The test then asserts + * the updates where appied. 
+ */ + @Test + public void testWritePartialUpdate() throws Exception { + ElasticSearchIOTestUtils.copyIndex( + restClient, + readConnectionConfiguration.getIndex(), + updateConnectionConfiguration.getIndex()); + // cannot share elasticsearchIOTestCommon because tests run in parallel. + ElasticsearchIOTestCommon elasticsearchIOTestCommonUpdate = + new ElasticsearchIOTestCommon(updateConnectionConfiguration, restClient, true); + elasticsearchIOTestCommonUpdate.setPipeline(pipeline); + elasticsearchIOTestCommonUpdate.testWritePartialUpdate(); + } +} diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java new file mode 100644 index 00000000000..12b2c5af3c7 --- /dev/null +++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.elasticsearch; + +import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.ConnectionConfiguration; +import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.ES_TYPE; +import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.getEsIndex; + +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope; +import java.io.IOException; +import java.io.Serializable; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Collection; +import org.apache.beam.sdk.testing.TestPipeline; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.transport.Netty4Plugin; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +/* +Cannot use @RunWith(JUnit4.class) with ESIntegTestCase +Cannot have @BeforeClass @AfterClass with ESIntegTestCase +*/ + +/** Tests for {@link ElasticsearchIO} version 6. 
*/ +@ThreadLeakScope(ThreadLeakScope.Scope.NONE) +public class ElasticsearchIOTest extends ESIntegTestCase implements Serializable { + + private ElasticsearchIOTestCommon elasticsearchIOTestCommon; + private ConnectionConfiguration connectionConfiguration; + + private String[] fillAddresses() { + ArrayList result = new ArrayList<>(); + for (InetSocketAddress address : cluster().httpAddresses()) { + result.add(String.format("http://%s:%s", address.getHostString(), address.getPort())); + } + return result.toArray(new String[result.size()]); + } + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + System.setProperty("es.set.netty.runtime.available.processors", "false"); + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) + .put("http.enabled", "true") + // had problems with some jdk, embedded ES was too slow for bulk insertion, + // and queue of 50 was full. No pb with real ES instance (cf testWrite integration test) + .put("thread_pool.bulk.queue_size", 400) + .build(); + } + + @Override + public Settings indexSettings() { + return Settings.builder() + .put(super.indexSettings()) + //useful to have updated sizes for getEstimatedSize + .put("index.store.stats_refresh_interval", 0) + .build(); + } + + @Override + protected Collection> nodePlugins() { + ArrayList> plugins = new ArrayList<>(); + plugins.add(Netty4Plugin.class); + return plugins; + } + + @Before + public void setup() { + if (connectionConfiguration == null) { + connectionConfiguration = + ConnectionConfiguration.create(fillAddresses(), getEsIndex(), ES_TYPE); + elasticsearchIOTestCommon = + new ElasticsearchIOTestCommon(connectionConfiguration, getRestClient(), false); + } + } + + @Rule public TestPipeline pipeline = TestPipeline.create(); + + @Test + public void testSizes() throws Exception { + // need to create the index using the helper method (not create it at first insertion) + // for the indexSettings() to be run + createIndex(getEsIndex()); + elasticsearchIOTestCommon.testSizes(); + } + + @Test + public void testRead() throws Exception { + // need to create the index using the helper method (not create it at first insertion) + // for the indexSettings() to be run + createIndex(getEsIndex()); + elasticsearchIOTestCommon.setPipeline(pipeline); + elasticsearchIOTestCommon.testRead(); + } + + @Test + public void testReadWithQuery() throws Exception { + // need to create the index using the helper method (not create it at first insertion) + // for the indexSettings() to be run + createIndex(getEsIndex()); + elasticsearchIOTestCommon.setPipeline(pipeline); + elasticsearchIOTestCommon.testReadWithQuery(); + } + + @Test + public void testWrite() throws Exception { + elasticsearchIOTestCommon.setPipeline(pipeline); + elasticsearchIOTestCommon.testWrite(); + } + + @Rule public ExpectedException expectedException = ExpectedException.none(); + + @Test + public void testWriteWithErrors() throws Exception { + elasticsearchIOTestCommon.setExpectedException(expectedException); + elasticsearchIOTestCommon.testWriteWithErrors(); + } + + @Test + public void testWriteWithMaxBatchSize() throws Exception { + elasticsearchIOTestCommon.testWriteWithMaxBatchSize(); + } + + @Test + public void testWriteWithMaxBatchSizeBytes() throws Exception { + elasticsearchIOTestCommon.testWriteWithMaxBatchSizeBytes(); + } + + @Test + public void testSplit() throws Exception { + // need to create the index using the helper method (not create it at first insertion) + // for the indexSettings() to be run + createIndex(getEsIndex()); 
+ elasticsearchIOTestCommon.testSplit(2_000); + } + + @Test + public void testWriteWithIdFn() throws Exception { + elasticsearchIOTestCommon.setPipeline(pipeline); + elasticsearchIOTestCommon.testWriteWithIdFn(); + } + + @Test + public void testWriteWithIndexFn() throws Exception { + elasticsearchIOTestCommon.setPipeline(pipeline); + elasticsearchIOTestCommon.testWriteWithIndexFn(); + } + + @Test + public void testWriteFullAddressing() throws Exception { + elasticsearchIOTestCommon.setPipeline(pipeline); + elasticsearchIOTestCommon.testWriteWithFullAddressing(); + } + + @Test + public void testWritePartialUpdate() throws Exception { + elasticsearchIOTestCommon.setPipeline(pipeline); + elasticsearchIOTestCommon.testWritePartialUpdate(); + } + + @Test + public void testReadWithMetadata() throws Exception { + elasticsearchIOTestCommon.setPipeline(pipeline); + elasticsearchIOTestCommon.testReadWithMetadata(); + } + + @Test + public void testDefaultRetryPredicate() throws IOException { + elasticsearchIOTestCommon.testDefaultRetryPredicate(getRestClient()); + } + + @Test + public void testWriteRetry() throws Throwable { + elasticsearchIOTestCommon.setExpectedException(expectedException); + elasticsearchIOTestCommon.setPipeline(pipeline); + elasticsearchIOTestCommon.testWriteRetry(); + } +} diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/elasticsearch/bootstrap/JarHell.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/elasticsearch/bootstrap/JarHell.java new file mode 100644 index 00000000000..be74371dc2c --- /dev/null +++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/elasticsearch/bootstrap/JarHell.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.elasticsearch.bootstrap; + +import java.util.function.Consumer; + +/** + * We need a real Elasticsearch instance to properly test the IO (split, slice API, scroll API, + * ...). Starting at ES 5, to have Elasticsearch embedded, we are forced to use Elasticsearch test + * framework. But this framework checks for class duplicates in classpath and it cannot be + * deactivated. When the class duplication come from a dependency, then it cannot be avoided. + * Elasticsearch community does not provide a way of deactivating the jar hell test, so skip it by + * making this hack. 
In this case duplicate class is class: + * org.apache.maven.surefire.report.SafeThrowable jar1: surefire-api-2.20.jar jar2: + * surefire-junit47-2.20.jar + */ +class JarHell { + + @SuppressWarnings("EmptyMethod") + public static void checkJarHell(Consumer output) {} +} diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/build.gradle b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/build.gradle index bf4fac8d5ac..941c526296d 100644 --- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/build.gradle +++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/build.gradle @@ -45,5 +45,5 @@ dependencies { testCompile library.java.slf4j_jdk14 testCompile library.java.commons_io_1x testCompile library.java.junit - testCompile "org.elasticsearch.client:elasticsearch-rest-client:5.6.3" + testCompile "org.elasticsearch.client:elasticsearch-rest-client:6.4.0" } diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java index 5a8ad788226..345805e22a0 100644 --- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java +++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java @@ -27,7 +27,9 @@ import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.Read; import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.RetryConfiguration.DEFAULT_RETRY_PREDICATE; import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.Write; +import static org.apache.beam.sdk.testing.SourceTestUtils.readFromSource; import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.core.Is.isA; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -43,11 +45,13 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.RetryConfiguration.DefaultRetryPredicate; import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.RetryConfiguration.RetryPredicate; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.SourceTestUtils; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.Create; @@ -120,6 +124,43 @@ void setExpectedException(ExpectedException expectedException) { this.expectedException = expectedException; } + void testSplit(final int desiredBundleSizeBytes) throws Exception { + if (!useAsITests) { + ElasticSearchIOTestUtils.insertTestDocuments(connectionConfiguration, numDocs, restClient); + } + PipelineOptions options = PipelineOptionsFactory.create(); + Read read = ElasticsearchIO.read().withConnectionConfiguration(connectionConfiguration); + BoundedElasticsearchSource initialSource = + new BoundedElasticsearchSource(read, null, null, null); + List> splits = + initialSource.split(desiredBundleSizeBytes, options); + SourceTestUtils.assertSourcesEqualReferenceSource(initialSource, splits, options); + long 
indexSize = BoundedElasticsearchSource.estimateIndexSize(connectionConfiguration); + + int expectedNumSources; + if (desiredBundleSizeBytes == 0) { + // desiredBundleSize is ignored because in ES 2.x there is no way to split shards. + // 5 is the number of ES shards + // (By default, each index in Elasticsearch is allocated 5 primary shards) + expectedNumSources = 5; + } else { + float expectedNumSourcesFloat = (float) indexSize / desiredBundleSizeBytes; + expectedNumSources = (int) Math.ceil(expectedNumSourcesFloat); + } + assertEquals("Wrong number of splits", expectedNumSources, splits.size()); + + int emptySplits = 0; + for (BoundedSource subSource : splits) { + if (readFromSource(subSource, options).isEmpty()) { + emptySplits += 1; + } + } + assertThat( + "There are too many empty splits, parallelism is sub-optimal", + emptySplits, + lessThan((int) (ACCEPTABLE_EMPTY_SPLITS_PERCENTAGE * splits.size()))); + } + void testSizes() throws Exception { if (!useAsITests) { ElasticSearchIOTestUtils.insertTestDocuments(connectionConfiguration, numDocs, restClient); @@ -162,8 +203,7 @@ void testReadWithQuery() throws Exception { + " \"query\": {\n" + " \"match\" : {\n" + " \"scientist\" : {\n" - + " \"query\" : \"Einstein\",\n" - + " \"type\" : \"boolean\"\n" + + " \"query\" : \"Einstein\"\n" + " }\n" + " }\n" + " }\n" @@ -418,8 +458,11 @@ public String apply(JsonNode input) { * Tests that documents are dynamically routed to different types and not the type that is given * in the configuration. Documents should be routed to the a type of type_0 or type_1 using a * modulo approach of the explicit id. + * + *

This test does not work with ES 6 because ES 6 does not allow one mapping has more than 1 + * type */ - void testWriteWithTypeFn() throws Exception { + void testWriteWithTypeFn2x5x() throws Exception { // defensive coding: this test requires an even number of docs long adjustedNumDocs = (numDocs & 1) == 0 ? numDocs : numDocs + 1; diff --git a/sdks/java/io/elasticsearch/build.gradle b/sdks/java/io/elasticsearch/build.gradle index b6a06c882a8..8654dbdedf7 100644 --- a/sdks/java/io/elasticsearch/build.gradle +++ b/sdks/java/io/elasticsearch/build.gradle @@ -27,7 +27,7 @@ dependencies { shadow project(path: ":beam-sdks-java-core", configuration: "shadow") shadow library.java.jackson_databind shadow library.java.jackson_annotations - shadow "org.elasticsearch.client:elasticsearch-rest-client:5.6.3" + shadow "org.elasticsearch.client:elasticsearch-rest-client:6.4.0" shadow "org.apache.httpcomponents:httpasyncclient:4.1.4" shadow "org.apache.httpcomponents:httpcore-nio:4.4.10" shadow "org.apache.httpcomponents:httpcore:4.4.10" diff --git a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java index 30c93d1782a..9d8eaf268ab 100644 --- a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java +++ b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java @@ -133,7 +133,9 @@ * *

Optionally, you can provide {@link ElasticsearchIO.Write.FieldValueExtractFn} using {@code * withIndexFn()} or {@code withTypeFn()} to enable per-document routing to the target Elasticsearch - * index and type. + * index (all versions) and type (version < 6). Support for type routing was removed in + * Elasticsearch 6 (see + * https://www.elastic.co/blog/index-type-parent-child-join-now-future-in-elasticsearch). * *

When {withUsePartialUpdate()} is enabled, the input document must contain an id field and * {@code withIdFn()} must be used to allow its extraction by the ElasticsearchIO. @@ -186,7 +188,7 @@ static void checkForErrors(Response response, int backendVersion) throws IOExcep String errorRootName = ""; if (backendVersion == 2) { errorRootName = "create"; - } else if (backendVersion == 5) { + } else if (backendVersion == 5 || backendVersion == 6) { errorRootName = "index"; } JsonNode errorRoot = item.path(errorRootName); @@ -277,6 +279,8 @@ public static ConnectionConfiguration create(String[] addresses, String index, S * If Elasticsearch authentication is enabled, provide the username. * * @param username the username used to authenticate to Elasticsearch + * @return a {@link ConnectionConfiguration} describes a connection configuration to + * Elasticsearch. */ public ConnectionConfiguration withUsername(String username) { checkArgument(username != null, "username can not be null"); @@ -288,6 +292,8 @@ public ConnectionConfiguration withUsername(String username) { * If Elasticsearch authentication is enabled, provide the password. * * @param password the password used to authenticate to Elasticsearch + * @return a {@link ConnectionConfiguration} describes a connection configuration to + * Elasticsearch. */ public ConnectionConfiguration withPassword(String password) { checkArgument(password != null, "password can not be null"); @@ -300,6 +306,8 @@ public ConnectionConfiguration withPassword(String password) { * containing the client key. * * @param keystorePath the location of the keystore containing the client key. + * @return a {@link ConnectionConfiguration} describes a connection configuration to + * Elasticsearch. */ public ConnectionConfiguration withKeystorePath(String keystorePath) { checkArgument(keystorePath != null, "keystorePath can not be null"); @@ -312,6 +320,8 @@ public ConnectionConfiguration withKeystorePath(String keystorePath) { * to open the client keystore. * * @param keystorePassword the password of the client keystore. + * @return a {@link ConnectionConfiguration} describes a connection configuration to + * Elasticsearch. */ public ConnectionConfiguration withKeystorePassword(String keystorePassword) { checkArgument(keystorePassword != null, "keystorePassword can not be null"); @@ -402,7 +412,13 @@ abstract static class Builder { abstract Read build(); } - /** Provide the Elasticsearch connection configuration object. */ + /** + * Provide the Elasticsearch connection configuration object. + * + * @param connectionConfiguration a {@link ConnectionConfiguration} describes a connection + * configuration to Elasticsearch. + * @return a {@link PTransform} reading data from Elasticsearch. + */ public Read withConnectionConfiguration(ConnectionConfiguration connectionConfiguration) { checkArgument(connectionConfiguration != null, "connectionConfiguration can not be null"); return builder().setConnectionConfiguration(connectionConfiguration).build(); @@ -414,6 +430,7 @@ public Read withConnectionConfiguration(ConnectionConfiguration connectionConfig * @param query the query. See Query * DSL + * @return a {@link PTransform} reading data from Elasticsearch. */ public Read withQuery(String query) { checkArgument(query != null, "query can not be null"); @@ -423,6 +440,8 @@ public Read withQuery(String query) { /** * Include metadata in result json documents. Document source will be under json node _source. + * + * @return a {@link PTransform} reading data from Elasticsearch. 
*/ public Read withMetadata() { return builder().setWithMetadata(true).build(); @@ -432,6 +451,9 @@ public Read withMetadata() { * Provide a scroll keepalive. See scroll * API Default is "5m". Change this only if you get "No search context found" errors. + * + * @param scrollKeepalive keepalive duration of the scroll + * @return a {@link PTransform} reading data from Elasticsearch. */ public Read withScrollKeepalive(String scrollKeepalive) { checkArgument(scrollKeepalive != null, "scrollKeepalive can not be null"); @@ -447,6 +469,7 @@ public Read withScrollKeepalive(String scrollKeepalive) { * batchSize * * @param batchSize number of documents read in each scroll read + * @return a {@link PTransform} reading data from Elasticsearch. */ public Read withBatchSize(long batchSize) { checkArgument( @@ -522,9 +545,9 @@ public List> split( List sources = new ArrayList<>(); if (backendVersion == 2) { // 1. We split per shard : - // unfortunately, Elasticsearch 2. x doesn 't provide a way to do parallel reads on a single + // unfortunately, Elasticsearch 2.x doesn't provide a way to do parallel reads on a single // shard.So we do not use desiredBundleSize because we cannot split shards. - // With the slice API in ES 5.0 we will be able to use desiredBundleSize. + // With the slice API in ES 5.x+ we will be able to use desiredBundleSize. // Basically we will just ask the slice API to return data // in nbBundles = estimatedSize / desiredBundleSize chuncks. // So each beam source will read around desiredBundleSize volume of data. @@ -540,11 +563,11 @@ public List> split( sources.add(new BoundedElasticsearchSource(spec, shardId, null, null, backendVersion)); } checkArgument(!sources.isEmpty(), "No shard found"); - } else if (backendVersion == 5) { + } else if (backendVersion == 5 || backendVersion == 6) { long indexSize = BoundedElasticsearchSource.estimateIndexSize(connectionConfiguration); float nbBundlesFloat = (float) indexSize / desiredBundleSizeBytes; int nbBundles = (int) Math.ceil(nbBundlesFloat); - //ES slice api imposes that the number of slices is <= 1024 even if it can be overloaded + // ES slice api imposes that the number of slices is <= 1024 even if it can be overloaded if (nbBundles > 1024) { nbBundles = 1024; } @@ -573,7 +596,7 @@ static long estimateIndexSize(ConnectionConfiguration connectionConfiguration) // as Elasticsearch 2.x doesn't not support any way to do parallel read inside a shard // the estimated size bytes is not really used in the split into bundles. // However, we implement this method anyway as the runners can use it. - // NB: Elasticsearch 5.x now provides the slice API. + // NB: Elasticsearch 5.x+ now provides the slice API. 
// (https://www.elastic.co/guide/en/elasticsearch/reference/5.0/search-request-scroll.html // #sliced-scroll) JsonNode statsJson = getStats(connectionConfiguration, false); @@ -640,7 +663,9 @@ public boolean start() throws IOException { if (query == null) { query = "{\"query\": { \"match_all\": {} }}"; } - if (source.backendVersion == 5 && source.numSlices != null && source.numSlices > 1) { + if ((source.backendVersion == 5 || source.backendVersion == 6) + && source.numSlices != null + && source.numSlices > 1) { //if there is more than one slice, add the slice to the user query String sliceQuery = String.format("\"slice\": {\"id\": %s,\"max\": %s}", source.sliceId, source.numSlices); @@ -1240,10 +1265,10 @@ static int getBackendVersion(ConnectionConfiguration connectionConfiguration) { int backendVersion = Integer.parseInt(jsonNode.path("version").path("number").asText().substring(0, 1)); checkArgument( - (backendVersion == 2 || backendVersion == 5), + (backendVersion == 2 || backendVersion == 5 || backendVersion == 6), "The Elasticsearch version to connect to is %s.x. " + "This version of the ElasticsearchIO is only compatible with " - + "Elasticsearch v5.x and v2.x", + + "Elasticsearch v6.x, v5.x and v2.x", backendVersion); return backendVersion; diff --git a/sdks/java/javadoc/build.gradle b/sdks/java/javadoc/build.gradle index 4f73e23ee13..61565b0a5fe 100644 --- a/sdks/java/javadoc/build.gradle +++ b/sdks/java/javadoc/build.gradle @@ -54,6 +54,7 @@ def exportedJavadocProjects = [ ':beam-sdks-java-io-elasticsearch', ':beam-sdks-java-io-elasticsearch-tests-2', ':beam-sdks-java-io-elasticsearch-tests-5', + ':beam-sdks-java-io-elasticsearch-tests-6', ':beam-sdks-java-io-elasticsearch-tests-common', ':beam-sdks-java-io-google-cloud-platform', ':beam-sdks-java-io-hadoop-common', diff --git a/settings.gradle b/settings.gradle index 89e68951303..3ccdd4418c7 100644 --- a/settings.gradle +++ b/settings.gradle @@ -116,6 +116,8 @@ include "beam-sdks-java-io-elasticsearch-tests-2" project(":beam-sdks-java-io-elasticsearch-tests-2").dir = file("sdks/java/io/elasticsearch-tests/elasticsearch-tests-2") include "beam-sdks-java-io-elasticsearch-tests-5" project(":beam-sdks-java-io-elasticsearch-tests-5").dir = file("sdks/java/io/elasticsearch-tests/elasticsearch-tests-5") +include "beam-sdks-java-io-elasticsearch-tests-6" +project(":beam-sdks-java-io-elasticsearch-tests-6").dir = file("sdks/java/io/elasticsearch-tests/elasticsearch-tests-6") include "beam-sdks-java-io-elasticsearch-tests-common" project(":beam-sdks-java-io-elasticsearch-tests-common").dir = file("sdks/java/io/elasticsearch-tests/elasticsearch-tests-common") include "beam-sdks-java-io-file-based-io-tests"
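
For context, here is a minimal sketch of how the read path touched by this patch is driven against a 6.x cluster. It uses only the entry points shown in the diff (ElasticsearchIO.read(), withConnectionConfiguration(), ConnectionConfiguration.create()); the host address, the "scientists" index, the "_doc" type, and the class name are illustrative placeholders, not part of this patch:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;

public class EsSixReadExample {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Against a 6.x backend, getBackendVersion() returns 6 and split() takes
    // the same slice-API path as 5.x (see the split() changes in this patch),
    // so desiredBundleSizeBytes is honored rather than splitting per shard.
    PCollection<String> docs =
        pipeline.apply(
            ElasticsearchIO.read()
                .withConnectionConfiguration(
                    ElasticsearchIO.ConnectionConfiguration.create(
                        new String[] {"http://127.0.0.1:9200"}, "scientists", "_doc")));

    pipeline.run().waitUntilFinish();
  }
}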