From 59149bf65f6e94252e63c3a1cc92bb1bc818f4e0 Mon Sep 17 00:00:00 2001 From: Dan Hermann Date: Mon, 27 Sep 2021 08:38:10 -0500 Subject: [PATCH] Consolidate logic for creating ingest pipelines (#78289) --- .../ingest/PutPipelineTransportAction.java | 53 ++----- .../elasticsearch/ingest/IngestService.java | 75 ++++++++-- .../PutPipelineTransportActionTests.java | 136 ------------------ .../ingest/IngestServiceTests.java | 127 +++++++++++++++- 4 files changed, 190 insertions(+), 201 deletions(-) delete mode 100644 server/src/test/java/org/elasticsearch/action/ingest/PutPipelineTransportActionTests.java diff --git a/server/src/main/java/org/elasticsearch/action/ingest/PutPipelineTransportAction.java b/server/src/main/java/org/elasticsearch/action/ingest/PutPipelineTransportAction.java index 9d084151fa4ea..119b0db82f644 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/PutPipelineTransportAction.java +++ b/server/src/main/java/org/elasticsearch/action/ingest/PutPipelineTransportAction.java @@ -8,9 +8,7 @@ package org.elasticsearch.action.ingest; -import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.admin.cluster.node.info.NodeInfo; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.AcknowledgedResponse; @@ -21,20 +19,12 @@ import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.xcontent.XContentHelper; -import org.elasticsearch.ingest.IngestInfo; -import org.elasticsearch.ingest.IngestMetadata; import org.elasticsearch.ingest.IngestService; -import org.elasticsearch.ingest.Pipeline; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import java.util.HashMap; -import java.util.Map; - import static org.elasticsearch.ingest.IngestService.INGEST_ORIGIN; public class PutPipelineTransportAction extends AcknowledgedTransportMasterNodeAction { @@ -59,42 +49,15 @@ public PutPipelineTransportAction(ThreadPool threadPool, TransportService transp @Override protected void masterOperation(Task task, PutPipelineRequest request, ClusterState state, ActionListener listener) throws Exception { - - Map pipelineConfig = null; - IngestMetadata currentIngestMetadata = state.metadata().custom(IngestMetadata.TYPE); - if (currentIngestMetadata != null && currentIngestMetadata.getPipelines().containsKey(request.getId())) { - pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2(); - var currentPipeline = currentIngestMetadata.getPipelines().get(request.getId()); - if (currentPipeline.getConfigAsMap().equals(pipelineConfig)) { - // existing pipeline matches request pipeline -- no need to update - listener.onResponse(AcknowledgedResponse.TRUE); - return; - } - } - - if (state.getNodes().getMinNodeVersion().before(Version.V_7_15_0)) { - pipelineConfig = pipelineConfig == null - ? XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2() - : pipelineConfig; - if (pipelineConfig.containsKey(Pipeline.META_KEY)) { - throw new IllegalStateException("pipelines with _meta field require minimum node version of " + Version.V_7_15_0); + ingestService.putPipeline( + request, + listener, + (nodeListener) -> { + NodesInfoRequest nodesInfoRequest = new NodesInfoRequest(); + nodesInfoRequest.clear(); + nodesInfoRequest.addMetric(NodesInfoRequest.Metric.INGEST.metricName()); + client.admin().cluster().nodesInfo(nodesInfoRequest, nodeListener); } - } - NodesInfoRequest nodesInfoRequest = new NodesInfoRequest(); - nodesInfoRequest.clear(); - nodesInfoRequest.addMetric(NodesInfoRequest.Metric.INGEST.metricName()); - client.admin().cluster().nodesInfo( - nodesInfoRequest, - ActionListener.wrap( - nodeInfos -> { - Map ingestInfos = new HashMap<>(); - for (NodeInfo nodeInfo : nodeInfos.getNodes()) { - ingestInfos.put(nodeInfo.getNode(), nodeInfo.getInfo(IngestInfo.class)); - } - ingestService.putPipeline(ingestInfos, request, listener); - }, - listener::onFailure - ) ); } diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 66f7ac8007f64..6a08f972f85ce 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -14,8 +14,11 @@ import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.admin.cluster.node.info.NodeInfo; +import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; import org.elasticsearch.action.bulk.TransportBulkAction; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.ingest.DeletePipelineRequest; @@ -34,12 +37,12 @@ import org.elasticsearch.cluster.metadata.MetadataIndexTemplateService; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.core.Tuple; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.core.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.core.Tuple; import org.elasticsearch.env.Environment; import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.index.IndexSettings; @@ -331,17 +334,56 @@ static List innerGetPipelines(IngestMetadata ingestMetada /** * Stores the specified pipeline definition in the request. */ - public void putPipeline(Map ingestInfos, PutPipelineRequest request, - ActionListener listener) throws Exception { - // validates the pipeline and processor configuration before submitting a cluster update task: - validatePipeline(ingestInfos, request); - clusterService.submitStateUpdateTask("put-pipeline-" + request.getId(), - new AckedClusterStateUpdateTask(request, listener) { - @Override - public ClusterState execute(ClusterState currentState) { - return innerPut(request, currentState); + public void putPipeline( + PutPipelineRequest request, + ActionListener listener, + Consumer> nodeInfoListener + ) throws Exception { + + Map pipelineConfig = null; + IngestMetadata currentIngestMetadata = state.metadata().custom(IngestMetadata.TYPE); + if (currentIngestMetadata != null && currentIngestMetadata.getPipelines().containsKey(request.getId())) { + pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2(); + var currentPipeline = currentIngestMetadata.getPipelines().get(request.getId()); + if (currentPipeline.getConfigAsMap().equals(pipelineConfig)) { + // existing pipeline matches request pipeline -- no need to update + listener.onResponse(AcknowledgedResponse.TRUE); + return; + } + } + + if (state.getNodes().getMinNodeVersion().before(Version.V_7_15_0)) { + pipelineConfig = pipelineConfig == null + ? XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2() + : pipelineConfig; + if (pipelineConfig.containsKey(Pipeline.META_KEY)) { + throw new IllegalStateException("pipelines with _meta field require minimum node version of " + Version.V_7_15_0); + } + } + + final Map config = pipelineConfig == null + ? XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2() + : pipelineConfig; + nodeInfoListener.accept(ActionListener.wrap( + nodeInfos -> { + Map ingestInfos = new HashMap<>(); + for (NodeInfo nodeInfo : nodeInfos.getNodes()) { + ingestInfos.put(nodeInfo.getNode(), nodeInfo.getInfo(IngestInfo.class)); } - }); + + validatePipeline(ingestInfos, request.getId(), config); + clusterService.submitStateUpdateTask( + "put-pipeline-" + request.getId(), + new AckedClusterStateUpdateTask(request, listener) { + @Override + public ClusterState execute(ClusterState currentState) { + return innerPut(request, currentState); + } + } + ); + }, + listener::onFailure) + ); } /** @@ -417,13 +459,16 @@ static ClusterState innerPut(PutPipelineRequest request, ClusterState currentSta return newState.build(); } - void validatePipeline(Map ingestInfos, PutPipelineRequest request) throws Exception { + void validatePipeline( + Map ingestInfos, + String pipelineId, + Map pipelineConfig + ) throws Exception { if (ingestInfos.isEmpty()) { throw new IllegalStateException("Ingest info is empty"); } - Map pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2(); - Pipeline pipeline = Pipeline.create(request.getId(), pipelineConfig, processorFactories, scriptService); + Pipeline pipeline = Pipeline.create(pipelineId, pipelineConfig, processorFactories, scriptService); List exceptions = new ArrayList<>(); for (Processor processor : pipeline.flattenAllProcessors()) { for (Map.Entry entry : ingestInfos.entrySet()) { diff --git a/server/src/test/java/org/elasticsearch/action/ingest/PutPipelineTransportActionTests.java b/server/src/test/java/org/elasticsearch/action/ingest/PutPipelineTransportActionTests.java deleted file mode 100644 index 22d7c6521f7d6..0000000000000 --- a/server/src/test/java/org/elasticsearch/action/ingest/PutPipelineTransportActionTests.java +++ /dev/null @@ -1,136 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0 and the Server Side Public License, v 1; you may not use this file except - * in compliance with, at your election, the Elastic License 2.0 or the Server - * Side Public License, v 1. - */ - -package org.elasticsearch.action.ingest; - -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.master.AcknowledgedResponse; -import org.elasticsearch.cluster.ClusterName; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.metadata.Metadata; -import org.elasticsearch.common.bytes.BytesArray; -import org.elasticsearch.common.util.concurrent.EsExecutors; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.core.Tuple; -import org.elasticsearch.ingest.IngestMetadata; -import org.elasticsearch.ingest.IngestService; -import org.elasticsearch.ingest.PipelineConfiguration; -import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.test.client.NoOpNodeClient; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.TransportService; - -import java.io.OutputStream; -import java.util.Map; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicLong; - -import static org.elasticsearch.core.Tuple.tuple; -import static org.hamcrest.Matchers.equalTo; -import static org.mockito.Matchers.anyString; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -public class PutPipelineTransportActionTests extends ESTestCase { - - public void testUpdatingRandomPipelineWithoutChangesIsNoOp() throws Exception { - var randomMap = randomMap(10, 50, PutPipelineTransportActionTests::randomMapEntry); - - XContentBuilder x = XContentBuilder.builder(XContentType.JSON.xContent()) - .startObject() - .field("processors", randomMap) - .endObject(); - - OutputStream os = x.getOutputStream(); - x.generator().close(); - testUpdatingPipeline(os.toString()); - } - - public void testUpdatingPipelineWithoutChangesIsNoOp() throws Exception { - var value = randomAlphaOfLength(5); - var pipelineString = "{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"" + value + "\"}}]}"; - testUpdatingPipeline(pipelineString); - } - - private void testUpdatingPipeline(String pipelineString) throws Exception { - var threadPool = mock(ThreadPool.class); - when(threadPool.generic()).thenReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE); - when(threadPool.executor(anyString())).thenReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE); - var client = new NoOpNodeClient(threadPool); - var action = new PutPipelineTransportAction( - threadPool, - mock(TransportService.class), - mock(ActionFilters.class), - null, - mock(IngestService.class), - client - ); - - var pipelineId = randomAlphaOfLength(5); - var value = randomAlphaOfLength(5); - var existingPipeline = new PipelineConfiguration(pipelineId, new BytesArray(pipelineString), XContentType.JSON); - var clusterState = ClusterState.builder(new ClusterName("test")) - .metadata(Metadata.builder().putCustom( - IngestMetadata.TYPE, - new IngestMetadata(Map.of(pipelineId, existingPipeline)) - ).build() - ).build(); - - CountDownLatch latch = new CountDownLatch(1); - var listener = new ActionListener() { - final AtomicLong successCount = new AtomicLong(0); - final AtomicLong failureCount = new AtomicLong(0); - - @Override - public void onResponse(AcknowledgedResponse acknowledgedResponse) { - successCount.incrementAndGet(); - latch.countDown(); - } - - @Override - public void onFailure(Exception e) { - failureCount.incrementAndGet(); - latch.countDown(); - } - - public long getSuccessCount() { - return successCount.get(); - } - - public long getFailureCount() { - return failureCount.get(); - } - }; - - var request = new PutPipelineRequest(pipelineId, new BytesArray(pipelineString), XContentType.JSON); - action.masterOperation(null, request, clusterState, listener); - latch.await(); - - assertThat(client.getExecutionCount(), equalTo(0L)); - assertThat(listener.getSuccessCount(), equalTo(1L)); - assertThat(listener.getFailureCount(), equalTo(0L)); - } - - private static Tuple randomMapEntry() { - return tuple(randomAlphaOfLength(5), randomObject()); - } - - private static Object randomObject() { - return randomFrom( - random(), - ESTestCase::randomLong, - () -> generateRandomStringArray(10, 5, true), - () -> randomMap(3, 5, PutPipelineTransportActionTests::randomMapEntry), - () -> randomAlphaOfLength(5), - ESTestCase::randomTimeValue, - ESTestCase::randomDouble - ); - } -} diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java index 6458cfc76cf18..631efa75ae36d 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java @@ -15,13 +15,16 @@ import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.TransportBulkAction; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.ingest.DeletePipelineRequest; import org.elasticsearch.action.ingest.PutPipelineRequest; +import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.client.Client; import org.elasticsearch.client.Requests; @@ -40,8 +43,10 @@ import org.elasticsearch.common.time.DateFormatter; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.cbor.CborXContent; +import org.elasticsearch.core.Tuple; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.VersionType; import org.elasticsearch.plugins.IngestPlugin; @@ -59,6 +64,7 @@ import org.mockito.ArgumentMatcher; import org.mockito.invocation.InvocationOnMock; +import java.io.OutputStream; import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.Collections; @@ -69,6 +75,7 @@ import java.util.Objects; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -78,6 +85,7 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; +import static org.elasticsearch.core.Tuple.tuple; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.instanceOf; @@ -274,13 +282,18 @@ public void testValidateNoIngestInfo() throws Exception { IngestService ingestService = createWithProcessors(); PutPipelineRequest putRequest = new PutPipelineRequest("_id", new BytesArray( "{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\"}}]}"), XContentType.JSON); - Exception e = expectThrows(IllegalStateException.class, () -> ingestService.validatePipeline(emptyMap(), putRequest)); + + var pipelineConfig = XContentHelper.convertToMap(putRequest.getSource(), false, putRequest.getXContentType()).v2(); + Exception e = expectThrows( + IllegalStateException.class, + () -> ingestService.validatePipeline(emptyMap(), putRequest.getId(), pipelineConfig) + ); assertEquals("Ingest info is empty", e.getMessage()); DiscoveryNode discoveryNode = new DiscoveryNode("_node_id", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); IngestInfo ingestInfo = new IngestInfo(Collections.singletonList(new ProcessorInfo("set"))); - ingestService.validatePipeline(Collections.singletonMap(discoveryNode, ingestInfo), putRequest); + ingestService.validatePipeline(Collections.singletonMap(discoveryNode, ingestInfo), putRequest.getId(), pipelineConfig); } public void testGetProcessorsInPipeline() throws Exception { @@ -586,6 +599,7 @@ public void testValidate() throws Exception { "{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\", \"tag\": \"tag1\"}}," + "{\"remove\" : {\"field\": \"_field\", \"tag\": \"tag2\"}}]}"), XContentType.JSON); + var pipelineConfig = XContentHelper.convertToMap(putRequest.getSource(), false, putRequest.getXContentType()).v2(); DiscoveryNode node1 = new DiscoveryNode("_node_id1", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); @@ -595,14 +609,17 @@ public void testValidate() throws Exception { ingestInfos.put(node1, new IngestInfo(Arrays.asList(new ProcessorInfo("set"), new ProcessorInfo("remove")))); ingestInfos.put(node2, new IngestInfo(Arrays.asList(new ProcessorInfo("set")))); - ElasticsearchParseException e = - expectThrows(ElasticsearchParseException.class, () -> ingestService.validatePipeline(ingestInfos, putRequest)); + ElasticsearchParseException e = expectThrows( + ElasticsearchParseException.class, + () -> ingestService.validatePipeline(ingestInfos, putRequest.getId(), pipelineConfig) + ); assertEquals("Processor type [remove] is not installed on node [" + node2 + "]", e.getMessage()); assertEquals("remove", e.getMetadata("es.processor_type").get(0)); assertEquals("tag2", e.getMetadata("es.processor_tag").get(0)); + var pipelineConfig2 = XContentHelper.convertToMap(putRequest.getSource(), false, putRequest.getXContentType()).v2(); ingestInfos.put(node2, new IngestInfo(Arrays.asList(new ProcessorInfo("set"), new ProcessorInfo("remove")))); - ingestService.validatePipeline(ingestInfos, putRequest); + ingestService.validatePipeline(ingestInfos, putRequest.getId(), pipelineConfig2); } public void testExecuteIndexPipelineExistsButFailedParsing() { @@ -1446,6 +1463,106 @@ public void testResolveRequestOrDefaultPipelineAndFinalPipeline() { } } + public void testUpdatingRandomPipelineWithoutChangesIsNoOp() throws Exception { + var randomMap = randomMap(10, 50, IngestServiceTests::randomMapEntry); + + XContentBuilder x = XContentBuilder.builder(XContentType.JSON.xContent()) + .startObject() + .field("processors", randomMap) + .endObject(); + + OutputStream os = x.getOutputStream(); + x.generator().close(); + testUpdatingPipeline(os.toString()); + } + + public void testUpdatingPipelineWithoutChangesIsNoOp() throws Exception { + var value = randomAlphaOfLength(5); + var pipelineString = "{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"" + value + "\"}}]}"; + testUpdatingPipeline(pipelineString); + } + + private void testUpdatingPipeline(String pipelineString) throws Exception { + var pipelineId = randomAlphaOfLength(5); + var existingPipeline = new PipelineConfiguration(pipelineId, new BytesArray(pipelineString), XContentType.JSON); + var clusterState = ClusterState.builder(new ClusterName("test")) + .metadata(Metadata.builder().putCustom( + IngestMetadata.TYPE, + new IngestMetadata(Map.of(pipelineId, existingPipeline)) + ).build() + ).build(); + + Client client = mock(Client.class); + ClusterService clusterService = mock(ClusterService.class); + when(clusterService.state()).thenReturn(clusterState); + IngestService ingestService = new IngestService(clusterService, threadPool, null, null, + null, Collections.singletonList(DUMMY_PLUGIN), client); + ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, clusterState)); + + CountDownLatch latch = new CountDownLatch(1); + var listener = new ActionListener() { + final AtomicLong successCount = new AtomicLong(0); + final AtomicLong failureCount = new AtomicLong(0); + + @Override + public void onResponse(AcknowledgedResponse acknowledgedResponse) { + successCount.incrementAndGet(); + latch.countDown(); + } + + @Override + public void onFailure(Exception e) { + failureCount.incrementAndGet(); + latch.countDown(); + } + + public long getSuccessCount() { + return successCount.get(); + } + + public long getFailureCount() { + return failureCount.get(); + } + }; + + var consumer = new Consumer>() { + final AtomicLong executionCount = new AtomicLong(0); + + @Override + public void accept(ActionListener nodesInfoResponseActionListener) { + executionCount.incrementAndGet(); + } + + public long getExecutionCount() { + return executionCount.get(); + } + }; + + var request = new PutPipelineRequest(pipelineId, new BytesArray(pipelineString), XContentType.JSON); + ingestService.putPipeline(request, listener, consumer); + latch.await(); + + assertThat(consumer.getExecutionCount(), equalTo(0L)); + assertThat(listener.getSuccessCount(), equalTo(1L)); + assertThat(listener.getFailureCount(), equalTo(0L)); + } + + private static Tuple randomMapEntry() { + return tuple(randomAlphaOfLength(5), randomObject()); + } + + private static Object randomObject() { + return randomFrom( + random(), + ESTestCase::randomLong, + () -> generateRandomStringArray(10, 5, true), + () -> randomMap(3, 5, IngestServiceTests::randomMapEntry), + () -> randomAlphaOfLength(5), + ESTestCase::randomTimeValue, + ESTestCase::randomDouble + ); + } + private IngestDocument eqIndexTypeId(final Map source) { return argThat(new IngestDocumentMatcher("_index", "_type", "_id", -3L, VersionType.INTERNAL, source)); }