From 100b5edb37c93d93dd374b9c4aa9cef35dfa2978 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Thu, 6 Sep 2018 17:27:11 +0200 Subject: [PATCH 1/4] rewrite feature index builder to use AsyncTwoPhaseIndexer --- .../job/FeatureIndexBuilderIndexer.java | 200 ++++++++++-------- .../job/FeatureIndexBuilderJob.java | 6 + .../job/FeatureIndexBuilderJobStats.java | 67 ++++++ .../job/FeatureIndexBuilderJobTask.java | 12 +- 4 files changed, 195 insertions(+), 90 deletions(-) create mode 100644 x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJobStats.java diff --git a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderIndexer.java b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderIndexer.java index e5f33c03fb8b..7611bda93bc5 100644 --- a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderIndexer.java +++ b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderIndexer.java @@ -7,10 +7,14 @@ package org.elasticsearch.xpack.ml.featureindexbuilder.job; import org.apache.log4j.Logger; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; +import org.elasticsearch.action.bulk.BulkAction; import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.Client; @@ -23,23 +27,28 @@ import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation; -import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation.Bucket; import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.composite.CompositeValuesSourceBuilder; import org.elasticsearch.search.aggregations.bucket.composite.TermsValuesSourceBuilder; import org.elasticsearch.search.aggregations.metrics.avg.InternalAvg; import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.xpack.core.ClientHelper; +import org.elasticsearch.xpack.core.indexing.AsyncTwoPhaseIndexer; +import org.elasticsearch.xpack.core.indexing.IndexerState; +import org.elasticsearch.xpack.core.indexing.IterationResult; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; -import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings.DOC_TYPE; +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; -public class FeatureIndexBuilderIndexer { +public class FeatureIndexBuilderIndexer extends AsyncTwoPhaseIndexer, FeatureIndexBuilderJobStats> { private static final String PIVOT_INDEX = "pivot-reviews"; private static final String SOURCE_INDEX = "anonreviews"; @@ -47,77 +56,115 @@ public class FeatureIndexBuilderIndexer { private FeatureIndexBuilderJob job; private Client client; - public FeatureIndexBuilderIndexer(FeatureIndexBuilderJob job, Client client) { + public FeatureIndexBuilderIndexer(Executor executor, FeatureIndexBuilderJob job, AtomicReference initialState, + Map initialPosition, Client client) { + super(executor, initialState, initialPosition, new FeatureIndexBuilderJobStats()); this.job = job; this.client = client; - logger.info("delete pivot-reviews"); - } - public synchronized void start() { - deleteIndex(client); + @Override + protected String getJobId() { + return job.getConfig().getId(); + } - createIndex(client); - - int runs = 0; + @Override + protected void onStartJob(long now) { + } - Map after = null; - logger.info("start feature indexing"); - SearchResponse response; - - try { - response = runQuery(client, after); + @Override + protected IterationResult> doProcess(SearchResponse searchResponse) { + final CompositeAggregation agg = searchResponse.getAggregations().get("feature"); + return new IterationResult<>(processBuckets(agg), agg.afterKey(), agg.getBuckets().isEmpty()); + } - CompositeAggregation compositeAggregation = response.getAggregations().get("feature"); - after = compositeAggregation.afterKey(); + private List processBuckets(CompositeAggregation agg) { + return agg.getBuckets().stream().map(b -> { + InternalAvg avgAgg = b.getAggregations().get("avg_rating"); + XContentBuilder builder; + try { + builder = jsonBuilder(); + + builder.startObject(); + builder.field("reviewerId", b.getKey().get("reviewerId")); + builder.field("avg_rating", avgAgg.getValue()); + builder.endObject(); + } catch (IOException e) { + throw new RuntimeException(e); + } + IndexRequest request = new IndexRequest(PIVOT_INDEX, DOC_TYPE).source(builder); + return request; + }).collect(Collectors.toList()); + } - while (after != null) { - indexBuckets(compositeAggregation); + @Override + protected SearchRequest buildSearchRequest() { - ++runs; - response = runQuery(client, after); + final Map position = getPosition(); - compositeAggregation = response.getAggregations().get("feature"); - after = compositeAggregation.afterKey(); - - //after = null; - } - - indexBuckets(compositeAggregation); - } catch (InterruptedException | ExecutionException e) { - logger.error("Failed to build feature index", e); + if (position == null) { + deleteIndex(client); + createIndex(client); } - logger.info("Finished feature indexing"); + SearchRequest request = buildFeatureQuery(position); + + return request; } - private void indexBuckets(CompositeAggregation compositeAggregation) { - BulkRequest bulkIndexRequest = new BulkRequest(); - try { - for (Bucket b : compositeAggregation.getBuckets()) { + @Override + protected void doNextSearch(SearchRequest request, ActionListener nextPhase) { + ClientHelper.executeWithHeadersAsync(job.getHeaders(), ClientHelper.ML_ORIGIN, client, SearchAction.INSTANCE, request, nextPhase); + } - InternalAvg avgAgg = b.getAggregations().get("avg_rating"); + @Override + protected void doNextBulk(BulkRequest request, ActionListener nextPhase) { + ClientHelper.executeWithHeadersAsync(job.getHeaders(), ClientHelper.ML_ORIGIN, client, BulkAction.INSTANCE, request, nextPhase); + } - XContentBuilder builder; - builder = jsonBuilder(); - builder.startObject(); - builder.field("reviewerId", b.getKey().get("reviewerId")); - builder.field("avg_rating", avgAgg.getValue()); - builder.endObject(); - bulkIndexRequest.add(new IndexRequest(PIVOT_INDEX, DOC_TYPE).source(builder)); + @Override + protected void doSaveState(IndexerState indexerState, Map position, Runnable next) { + if (indexerState.equals(IndexerState.ABORTING)) { + // If we're aborting, just invoke `next` (which is likely an onFailure handler) + next.run(); + } else { + // to be implemented + + final FeatureIndexBuilderJobState state = new FeatureIndexBuilderJobState(indexerState); + logger.info("Updating persistent state of job [" + job.getConfig().getId() + "] to [" + state.toString() + "]"); + + // TODO: we can not persist the state right now, need to be called from the task + next.run(); + // updatePersistentTaskState(state, ActionListener.wrap(task -> next.run(), exc + // -> { + // We failed to update the persistent task for some reason, + // set our flag back to what it was before + // next.run(); + // })); - } - client.bulk(bulkIndexRequest); - } catch (IOException e) { - logger.error("Failed to index", e); } } - + + @Override + protected void onFailure(Exception exc) { + logger.warn("FeatureIndexBuilder job [" + job.getConfig().getId() + "] failed with an exception: ", exc); + } + + @Override + protected void onFinish() { + logger.info("Finished indexing for job [" + job.getConfig().getId() + "]"); + } + + @Override + protected void onAbort() { + logger.info("FeatureIndexBuilder job [" + job.getConfig().getId() + "] received abort request, stopping indexer"); + } + /* * Hardcoded demo case for pivoting */ - + private static void deleteIndex(Client client) { DeleteIndexRequest deleteIndex = new DeleteIndexRequest(PIVOT_INDEX); @@ -127,46 +174,35 @@ private static void deleteIndex(Client client) { } catch (IndexNotFoundException e) { } } - + private static void createIndex(Client client) { - + CreateIndexRequest request = new CreateIndexRequest(PIVOT_INDEX); request.settings(Settings.builder() // <1> - .put("index.number_of_shards", 1) - .put("index.number_of_replicas", 0) - ); + .put("index.number_of_shards", 1).put("index.number_of_replicas", 0)); request.mapping(DOC_TYPE, // <1> - "{\n" + - " \"" + DOC_TYPE + "\": {\n" + - " \"properties\": {\n" + - " \"reviewerId\": {\n" + - " \"type\": \"keyword\"\n" + - " },\n" + - " \"avg_rating\": {\n" + - " \"type\": \"integer\"\n" + - " }\n" + - " }\n" + - " }\n" + - "}", // <2> + "{\n" + " \"" + DOC_TYPE + "\": {\n" + " \"properties\": {\n" + " \"reviewerId\": {\n" + + " \"type\": \"keyword\"\n" + " },\n" + " \"avg_rating\": {\n" + " \"type\": \"integer\"\n" + + " }\n" + " }\n" + " }\n" + "}", // <2> XContentType.JSON); IndicesAdminClient adminClient = client.admin().indices(); adminClient.create(request).actionGet(); } - + private static SearchRequest buildFeatureQuery(Map after) { QueryBuilder queryBuilder = new MatchAllQueryBuilder(); SearchRequest searchRequest = new SearchRequest(SOURCE_INDEX); - + List> sources = new ArrayList<>(); sources.add(new TermsValuesSourceBuilder("reviewerId").field("reviewerId")); - + CompositeAggregationBuilder compositeAggregation = new CompositeAggregationBuilder("feature", sources); compositeAggregation.size(1000); - + if (after != null) { compositeAggregation.aggregateAfter(after); } - + compositeAggregation.subAggregation(AggregationBuilders.avg("avg_rating").field("rating")); compositeAggregation.subAggregation(AggregationBuilders.cardinality("dc_vendors").field("vendorId")); SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); @@ -174,21 +210,7 @@ private static SearchRequest buildFeatureQuery(Map after) { sourceBuilder.size(0); sourceBuilder.query(queryBuilder); searchRequest.source(sourceBuilder); - + return searchRequest; - } - - private static SearchResponse runQuery(Client client, Map after) throws InterruptedException, ExecutionException { - - SearchRequest request = buildFeatureQuery(after); - SearchResponse response = client.search(request).get(); - - return response; - } - - private static void indexResult() { - - - } } diff --git a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJob.java b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJob.java index 16a4163e8135..a1edfca2684a 100644 --- a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJob.java +++ b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJob.java @@ -16,6 +16,8 @@ import org.elasticsearch.xpack.core.XPackPlugin; import java.io.IOException; +import java.util.Collections; +import java.util.Map; import java.util.Objects; public class FeatureIndexBuilderJob implements XPackPlugin.XPackPersistentTaskParams { @@ -92,4 +94,8 @@ public boolean equals(Object other) { public int hashCode() { return Objects.hash(config); } + + public Map getHeaders() { + return Collections.emptyMap(); + } } diff --git a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJobStats.java b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJobStats.java new file mode 100644 index 000000000000..a7c9392800f0 --- /dev/null +++ b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJobStats.java @@ -0,0 +1,67 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ml.featureindexbuilder.job; + +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.xpack.core.indexing.IndexerJobStats; + +import java.io.IOException; + +import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; + +public class FeatureIndexBuilderJobStats extends IndexerJobStats { + private static ParseField NUM_PAGES = new ParseField("pages_processed"); + private static ParseField NUM_INPUT_DOCUMENTS = new ParseField("documents_processed"); + private static ParseField NUM_OUTPUT_DOCUMENTS = new ParseField("documents_indexed"); + private static ParseField NUM_INVOCATIONS = new ParseField("trigger_count"); + + public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + NAME.getPreferredName(), + args -> new FeatureIndexBuilderJobStats((long) args[0], (long) args[1], (long) args[2], (long) args[3])); + + static { + PARSER.declareLong(constructorArg(), NUM_PAGES); + PARSER.declareLong(constructorArg(), NUM_INPUT_DOCUMENTS); + PARSER.declareLong(constructorArg(), NUM_OUTPUT_DOCUMENTS); + PARSER.declareLong(constructorArg(), NUM_INVOCATIONS); + } + + public FeatureIndexBuilderJobStats() { + super(); + } + + public FeatureIndexBuilderJobStats(long numPages, long numInputDocuments, long numOuputDocuments, long numInvocations) { + super(numPages, numInputDocuments, numOuputDocuments, numInvocations); + } + + public FeatureIndexBuilderJobStats(StreamInput in) throws IOException { + super(in); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(NUM_PAGES.getPreferredName(), numPages); + builder.field(NUM_INPUT_DOCUMENTS.getPreferredName(), numInputDocuments); + builder.field(NUM_OUTPUT_DOCUMENTS.getPreferredName(), numOuputDocuments); + builder.field(NUM_INVOCATIONS.getPreferredName(), numInvocations); + builder.endObject(); + return builder; + } + + public static FeatureIndexBuilderJobStats fromXContent(XContentParser parser) { + try { + return PARSER.parse(parser, null); + } catch (IOException e) { + throw new RuntimeException(e); + } + } +} diff --git a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJobTask.java b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJobTask.java index a4de3927e5bc..aaf49d716cbc 100644 --- a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJobTask.java +++ b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJobTask.java @@ -17,6 +17,7 @@ import org.elasticsearch.persistent.PersistentTasksExecutor; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; import org.elasticsearch.xpack.core.scheduler.SchedulerEngine.Event; import org.elasticsearch.xpack.ml.featureindexbuilder.FeatureIndexBuilder; @@ -24,6 +25,7 @@ import org.elasticsearch.xpack.ml.featureindexbuilder.action.StartFeatureIndexBuilderJobAction.Response; import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; public class FeatureIndexBuilderJobTask extends AllocatedPersistentTask implements SchedulerEngine.Listener { @@ -82,7 +84,10 @@ public FeatureIndexBuilderJobTask(long id, String type, String action, TaskId pa this.job = job; logger.info("construct job task"); // todo: simplistic implementation for now - this.indexer = new FeatureIndexBuilderIndexer(job, client); + IndexerState initialState = IndexerState.STOPPED; + Map initialPosition = null; + this.indexer = new FeatureIndexBuilderIndexer(threadPool.executor(ThreadPool.Names.GENERIC), job, + new AtomicReference<>(initialState), initialPosition, client); } public FeatureIndexBuilderJobConfig getConfig() { @@ -96,6 +101,11 @@ public synchronized void start(ActionListener listener) { @Override public void triggered(Event event) { + if (event.getJobName().equals(SCHEDULE_NAME + "_" + job.getConfig().getId())) { + logger.debug( + "FeatureIndexBuilder indexer [" + event.getJobName() + "] schedule has triggered, state: [" + indexer.getState() + "]"); + indexer.maybeTriggerAsyncJob(System.currentTimeMillis()); + } } } From 4ad11f34b1f163b2ff206e6bff235a385b1ab7de Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Fri, 7 Sep 2018 15:10:43 +0200 Subject: [PATCH 2/4] move index creation into put job action --- .../FeatureIndexBuilder.java | 4 +- ...nsportPutFeatureIndexBuilderJobAction.java | 41 +++++- .../job/FeatureIndexBuilderIndexer.java | 107 ++-------------- ...ndexBuilderJobPersistentTasksExecutor.java | 62 +++++++++ .../job/FeatureIndexBuilderJobTask.java | 118 ++++++++++-------- 5 files changed, 179 insertions(+), 153 deletions(-) create mode 100644 x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJobPersistentTasksExecutor.java diff --git a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/FeatureIndexBuilder.java b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/FeatureIndexBuilder.java index bd9bcd751be6..01537d493c3e 100644 --- a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/FeatureIndexBuilder.java +++ b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/FeatureIndexBuilder.java @@ -36,7 +36,7 @@ import org.elasticsearch.xpack.ml.featureindexbuilder.action.TransportPutFeatureIndexBuilderJobAction; import org.elasticsearch.xpack.ml.featureindexbuilder.action.TransportStartFeatureIndexBuilderJobAction; import org.elasticsearch.xpack.ml.featureindexbuilder.job.FeatureIndexBuilderJob; -import org.elasticsearch.xpack.ml.featureindexbuilder.job.FeatureIndexBuilderJobTask; +import org.elasticsearch.xpack.ml.featureindexbuilder.job.FeatureIndexBuilderJobPersistentTasksExecutor; import org.elasticsearch.xpack.ml.featureindexbuilder.rest.action.RestPutFeatureIndexBuilderJobAction; import org.elasticsearch.xpack.ml.featureindexbuilder.rest.action.RestStartFeatureIndexBuilderJobAction; @@ -127,7 +127,7 @@ public List> getPersistentTasksExecutor(ClusterServic } SchedulerEngine schedulerEngine = new SchedulerEngine(settings, Clock.systemUTC()); - return Collections.singletonList(new FeatureIndexBuilderJobTask.FeatureIndexBuilderJobPersistentTasksExecutor(settings, client, + return Collections.singletonList(new FeatureIndexBuilderJobPersistentTasksExecutor(settings, client, schedulerEngine, threadPool)); } @Override diff --git a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/TransportPutFeatureIndexBuilderJobAction.java b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/TransportPutFeatureIndexBuilderJobAction.java index 16c541db7a01..37252bf82a41 100644 --- a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/TransportPutFeatureIndexBuilderJobAction.java +++ b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/TransportPutFeatureIndexBuilderJobAction.java @@ -7,9 +7,11 @@ package org.elasticsearch.xpack.ml.featureindexbuilder.action; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.client.Client; +import org.elasticsearch.client.IndicesAdminClient; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; @@ -17,6 +19,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.license.LicenseUtils; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.persistent.PersistentTasksService; @@ -29,8 +32,14 @@ import org.elasticsearch.xpack.ml.featureindexbuilder.job.FeatureIndexBuilderJob; import org.elasticsearch.xpack.ml.featureindexbuilder.job.FeatureIndexBuilderJobConfig; +import static org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings.DOC_TYPE; + public class TransportPutFeatureIndexBuilderJobAction extends TransportMasterNodeAction { + + // TODO: hack, to be replaced + private static final String PIVOT_INDEX = "pivot-reviews"; + private final XPackLicenseState licenseState; private final PersistentTasksService persistentTasksService; private final Client client; @@ -67,7 +76,7 @@ protected void masterOperation(Request request, ClusterState clusterState, Actio XPackPlugin.checkReadyForXPackCustomMetadata(clusterState); FeatureIndexBuilderJob job = createFeatureIndexBuilderJob(request.getConfig(), threadPool); - + createIndex(client, job.getConfig().getId()); startPersistentTask(job, listener, persistentTasksService); } @@ -90,4 +99,34 @@ static void startPersistentTask(FeatureIndexBuilderJob job, ActionListener + .put("index.number_of_shards", 1).put("index.number_of_replicas", 0)); + request.mapping(DOC_TYPE, // <1> + "{\n" + + " \"" + DOC_TYPE + "\": {\n" + + " \"properties\": {\n" + + " \"reviewerId\": {\n" + + " \"type\": \"keyword\"\n" + + " },\n" + + " \"avg_rating\": {\n" + + " \"type\": \"integer\"\n" + + " }\n" + + " }\n" + + " }\n" + + "}", // <2> + XContentType.JSON); + IndicesAdminClient adminClient = client.admin().indices(); + adminClient.create(request).actionGet(); + } } diff --git a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderIndexer.java b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderIndexer.java index 7611bda93bc5..2dfc1daa8e1d 100644 --- a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderIndexer.java +++ b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderIndexer.java @@ -7,22 +7,10 @@ package org.elasticsearch.xpack.ml.featureindexbuilder.job; import org.apache.log4j.Logger; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; -import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; -import org.elasticsearch.action.bulk.BulkAction; -import org.elasticsearch.action.bulk.BulkRequest; -import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.client.Client; -import org.elasticsearch.client.IndicesAdminClient; -import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.query.MatchAllQueryBuilder; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.search.aggregations.AggregationBuilders; @@ -32,7 +20,6 @@ import org.elasticsearch.search.aggregations.bucket.composite.TermsValuesSourceBuilder; import org.elasticsearch.search.aggregations.metrics.avg.InternalAvg; import org.elasticsearch.search.builder.SearchSourceBuilder; -import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.indexing.AsyncTwoPhaseIndexer; import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.core.indexing.IterationResult; @@ -48,20 +35,18 @@ import static org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings.DOC_TYPE; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; -public class FeatureIndexBuilderIndexer extends AsyncTwoPhaseIndexer, FeatureIndexBuilderJobStats> { +public abstract class FeatureIndexBuilderIndexer extends AsyncTwoPhaseIndexer, FeatureIndexBuilderJobStats> { private static final String PIVOT_INDEX = "pivot-reviews"; private static final String SOURCE_INDEX = "anonreviews"; private static final Logger logger = Logger.getLogger(FeatureIndexBuilderIndexer.class.getName()); private FeatureIndexBuilderJob job; - private Client client; public FeatureIndexBuilderIndexer(Executor executor, FeatureIndexBuilderJob job, AtomicReference initialState, - Map initialPosition, Client client) { + Map initialPosition) { super(executor, initialState, initialPosition, new FeatureIndexBuilderJobStats()); this.job = job; - this.client = client; } @Override @@ -93,7 +78,9 @@ private List processBuckets(CompositeAggregation agg) { } catch (IOException e) { throw new RuntimeException(e); } - IndexRequest request = new IndexRequest(PIVOT_INDEX, DOC_TYPE).source(builder); + + String indexName = PIVOT_INDEX + "_" + job.getConfig().getId(); + IndexRequest request = new IndexRequest(indexName, DOC_TYPE).source(builder); return request; }).collect(Collectors.toList()); } @@ -102,93 +89,15 @@ private List processBuckets(CompositeAggregation agg) { protected SearchRequest buildSearchRequest() { final Map position = getPosition(); - - if (position == null) { - deleteIndex(client); - createIndex(client); - } - SearchRequest request = buildFeatureQuery(position); - return request; } - @Override - protected void doNextSearch(SearchRequest request, ActionListener nextPhase) { - ClientHelper.executeWithHeadersAsync(job.getHeaders(), ClientHelper.ML_ORIGIN, client, SearchAction.INSTANCE, request, nextPhase); - } - - @Override - protected void doNextBulk(BulkRequest request, ActionListener nextPhase) { - ClientHelper.executeWithHeadersAsync(job.getHeaders(), ClientHelper.ML_ORIGIN, client, BulkAction.INSTANCE, request, nextPhase); - } - - @Override - protected void doSaveState(IndexerState indexerState, Map position, Runnable next) { - if (indexerState.equals(IndexerState.ABORTING)) { - // If we're aborting, just invoke `next` (which is likely an onFailure handler) - next.run(); - } else { - // to be implemented - - final FeatureIndexBuilderJobState state = new FeatureIndexBuilderJobState(indexerState); - logger.info("Updating persistent state of job [" + job.getConfig().getId() + "] to [" + state.toString() + "]"); - - // TODO: we can not persist the state right now, need to be called from the task - next.run(); - // updatePersistentTaskState(state, ActionListener.wrap(task -> next.run(), exc - // -> { - // We failed to update the persistent task for some reason, - // set our flag back to what it was before - // next.run(); - // })); - - } - } - - @Override - protected void onFailure(Exception exc) { - logger.warn("FeatureIndexBuilder job [" + job.getConfig().getId() + "] failed with an exception: ", exc); - } - - @Override - protected void onFinish() { - logger.info("Finished indexing for job [" + job.getConfig().getId() + "]"); - } - - @Override - protected void onAbort() { - logger.info("FeatureIndexBuilder job [" + job.getConfig().getId() + "] received abort request, stopping indexer"); - } - /* - * Hardcoded demo case for pivoting + * Mocked demo case + * + * TODO: everything below will be replaced with proper implementation read from job configuration */ - - private static void deleteIndex(Client client) { - DeleteIndexRequest deleteIndex = new DeleteIndexRequest(PIVOT_INDEX); - - IndicesAdminClient adminClient = client.admin().indices(); - try { - adminClient.delete(deleteIndex).actionGet(); - } catch (IndexNotFoundException e) { - } - } - - private static void createIndex(Client client) { - - CreateIndexRequest request = new CreateIndexRequest(PIVOT_INDEX); - request.settings(Settings.builder() // <1> - .put("index.number_of_shards", 1).put("index.number_of_replicas", 0)); - request.mapping(DOC_TYPE, // <1> - "{\n" + " \"" + DOC_TYPE + "\": {\n" + " \"properties\": {\n" + " \"reviewerId\": {\n" - + " \"type\": \"keyword\"\n" + " },\n" + " \"avg_rating\": {\n" + " \"type\": \"integer\"\n" - + " }\n" + " }\n" + " }\n" + "}", // <2> - XContentType.JSON); - IndicesAdminClient adminClient = client.admin().indices(); - adminClient.create(request).actionGet(); - } - private static SearchRequest buildFeatureQuery(Map after) { QueryBuilder queryBuilder = new MatchAllQueryBuilder(); SearchRequest searchRequest = new SearchRequest(SOURCE_INDEX); diff --git a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJobPersistentTasksExecutor.java b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJobPersistentTasksExecutor.java new file mode 100644 index 000000000000..f090bda906fe --- /dev/null +++ b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJobPersistentTasksExecutor.java @@ -0,0 +1,62 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ml.featureindexbuilder.job; + +import org.elasticsearch.client.Client; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.persistent.AllocatedPersistentTask; +import org.elasticsearch.persistent.PersistentTaskState; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; +import org.elasticsearch.persistent.PersistentTasksExecutor; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; +import org.elasticsearch.xpack.ml.featureindexbuilder.FeatureIndexBuilder; + +import java.util.Map; + +public class FeatureIndexBuilderJobPersistentTasksExecutor extends PersistentTasksExecutor { + private final Client client; + private final SchedulerEngine schedulerEngine; + private final ThreadPool threadPool; + + public FeatureIndexBuilderJobPersistentTasksExecutor(Settings settings, Client client, SchedulerEngine schedulerEngine, + ThreadPool threadPool) { + super(settings, "xpack/feature_index_builder/job", FeatureIndexBuilder.TASK_THREAD_POOL_NAME); + this.client = client; + this.schedulerEngine = schedulerEngine; + this.threadPool = threadPool; + } + + @Override + protected void nodeOperation(AllocatedPersistentTask task, @Nullable FeatureIndexBuilderJob params, PersistentTaskState state) { + FeatureIndexBuilderJobTask buildTask = (FeatureIndexBuilderJobTask) task; + SchedulerEngine.Job schedulerJob = new SchedulerEngine.Job(FeatureIndexBuilderJobTask.SCHEDULE_NAME + "_" + params.getConfig().getId(), next()); + + // Note that while the task is added to the scheduler here, the internal state + // will prevent + // it from doing any work until the task is "started" via the StartJob api + schedulerEngine.register(buildTask); + schedulerEngine.add(schedulerJob); + + logger.info("FeatureIndexBuilder job [" + params.getConfig().getId() + "] created."); + } + + static SchedulerEngine.Schedule next() { + return (startTime, now) -> { + return now + 1000; // to be fixed, hardcode something + }; + } + + @Override + protected AllocatedPersistentTask createTask(long id, String type, String action, TaskId parentTaskId, + PersistentTasksCustomMetaData.PersistentTask persistentTask, Map headers) { + return new FeatureIndexBuilderJobTask(id, type, action, parentTaskId, persistentTask.getParams(), + (FeatureIndexBuilderJobState) persistentTask.getState(), client, schedulerEngine, threadPool, headers); + } +} \ No newline at end of file diff --git a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJobTask.java b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJobTask.java index aaf49d716cbc..381e57e9027c 100644 --- a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJobTask.java +++ b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJobTask.java @@ -8,19 +8,20 @@ import org.apache.log4j.Logger; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.bulk.BulkAction; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.search.SearchAction; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.Client; -import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.settings.Settings; import org.elasticsearch.persistent.AllocatedPersistentTask; -import org.elasticsearch.persistent.PersistentTaskState; -import org.elasticsearch.persistent.PersistentTasksCustomMetaData; -import org.elasticsearch.persistent.PersistentTasksExecutor; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; import org.elasticsearch.xpack.core.scheduler.SchedulerEngine.Event; -import org.elasticsearch.xpack.ml.featureindexbuilder.FeatureIndexBuilder; import org.elasticsearch.xpack.ml.featureindexbuilder.action.StartFeatureIndexBuilderJobAction; import org.elasticsearch.xpack.ml.featureindexbuilder.action.StartFeatureIndexBuilderJobAction.Response; @@ -30,64 +31,24 @@ public class FeatureIndexBuilderJobTask extends AllocatedPersistentTask implements SchedulerEngine.Listener { private static final Logger logger = Logger.getLogger(FeatureIndexBuilderJobTask.class.getName()); + + private final FeatureIndexBuilderJob job; + private final ThreadPool threadPool; private final FeatureIndexBuilderIndexer indexer; static final String SCHEDULE_NAME = "xpack/feature_index_builder/job" + "/schedule"; - public static class FeatureIndexBuilderJobPersistentTasksExecutor extends PersistentTasksExecutor { - private final Client client; - private final SchedulerEngine schedulerEngine; - private final ThreadPool threadPool; - - public FeatureIndexBuilderJobPersistentTasksExecutor(Settings settings, Client client, SchedulerEngine schedulerEngine, - ThreadPool threadPool) { - super(settings, "xpack/feature_index_builder/job", FeatureIndexBuilder.TASK_THREAD_POOL_NAME); - this.client = client; - this.schedulerEngine = schedulerEngine; - this.threadPool = threadPool; - } - - @Override - protected void nodeOperation(AllocatedPersistentTask task, @Nullable FeatureIndexBuilderJob params, PersistentTaskState state) { - FeatureIndexBuilderJobTask buildTask = (FeatureIndexBuilderJobTask) task; - SchedulerEngine.Job schedulerJob = new SchedulerEngine.Job(SCHEDULE_NAME + "_" + params.getConfig().getId(), next()); - - // Note that while the task is added to the scheduler here, the internal state - // will prevent - // it from doing any work until the task is "started" via the StartJob api - schedulerEngine.register(buildTask); - schedulerEngine.add(schedulerJob); - - logger.info("FeatureIndexBuilder job [" + params.getConfig().getId() + "] created."); - } - - static SchedulerEngine.Schedule next() { - return (startTime, now) -> { - return now + 1000; // to be fixed, hardcode something - }; - } - - @Override - protected AllocatedPersistentTask createTask(long id, String type, String action, TaskId parentTaskId, - PersistentTasksCustomMetaData.PersistentTask persistentTask, Map headers) { - return new FeatureIndexBuilderJobTask(id, type, action, parentTaskId, persistentTask.getParams(), - (FeatureIndexBuilderJobState) persistentTask.getState(), client, schedulerEngine, threadPool, headers); - } - } - - private final FeatureIndexBuilderJob job; - public FeatureIndexBuilderJobTask(long id, String type, String action, TaskId parentTask, FeatureIndexBuilderJob job, FeatureIndexBuilderJobState state, Client client, SchedulerEngine schedulerEngine, ThreadPool threadPool, Map headers) { super(id, type, action, "" + "_" + job.getConfig().getId(), parentTask, headers); this.job = job; + this.threadPool = threadPool; logger.info("construct job task"); // todo: simplistic implementation for now IndexerState initialState = IndexerState.STOPPED; Map initialPosition = null; - this.indexer = new FeatureIndexBuilderIndexer(threadPool.executor(ThreadPool.Names.GENERIC), job, - new AtomicReference<>(initialState), initialPosition, client); + this.indexer = new ClientFeatureIndexBuilderIndexer(job, new AtomicReference<>(initialState), initialPosition, client); } public FeatureIndexBuilderJobConfig getConfig() { @@ -108,4 +69,59 @@ public void triggered(Event event) { } } + protected class ClientFeatureIndexBuilderIndexer extends FeatureIndexBuilderIndexer { + private final Client client; + + public ClientFeatureIndexBuilderIndexer(FeatureIndexBuilderJob job, AtomicReference initialState, + Map initialPosition, Client client) { + super(threadPool.executor(ThreadPool.Names.GENERIC), job, initialState, initialPosition); + this.client = client; + } + + @Override + protected void doNextSearch(SearchRequest request, ActionListener nextPhase) { + ClientHelper.executeWithHeadersAsync(job.getHeaders(), ClientHelper.ML_ORIGIN, client, SearchAction.INSTANCE, request, + nextPhase); + } + + @Override + protected void doNextBulk(BulkRequest request, ActionListener nextPhase) { + ClientHelper.executeWithHeadersAsync(job.getHeaders(), ClientHelper.ML_ORIGIN, client, BulkAction.INSTANCE, request, nextPhase); + } + + @Override + protected void doSaveState(IndexerState indexerState, Map position, Runnable next) { + if (indexerState.equals(IndexerState.ABORTING)) { + // If we're aborting, just invoke `next` (which is likely an onFailure handler) + next.run(); + } else { + // to be implemented + + final FeatureIndexBuilderJobState state = new FeatureIndexBuilderJobState(indexerState); + logger.info("Updating persistent state of job [" + job.getConfig().getId() + "] to [" + state.toString() + "]"); + + // TODO: we can not persist the state right now, need to be called from the task + updatePersistentTaskState(state, ActionListener.wrap(task -> next.run(), exc -> { + // We failed to update the persistent task for some reason, + // set our flag back to what it was before + next.run(); + })); + } + } + + @Override + protected void onFailure(Exception exc) { + logger.warn("FeatureIndexBuilder job [" + job.getConfig().getId() + "] failed with an exception: ", exc); + } + + @Override + protected void onFinish() { + logger.info("Finished indexing for job [" + job.getConfig().getId() + "]"); + } + + @Override + protected void onAbort() { + logger.info("FeatureIndexBuilder job [" + job.getConfig().getId() + "] received abort request, stopping indexer"); + } + } } From be9f478d5554c9286d02a316e7dd410b72ae17eb Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Fri, 7 Sep 2018 16:45:54 +0200 Subject: [PATCH 3/4] change exception type --- .../job/FeatureIndexBuilderIndexer.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderIndexer.java b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderIndexer.java index 2dfc1daa8e1d..8ff748ad265d 100644 --- a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderIndexer.java +++ b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderIndexer.java @@ -25,6 +25,7 @@ import org.elasticsearch.xpack.core.indexing.IterationResult; import java.io.IOException; +import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -64,6 +65,11 @@ protected IterationResult> doProcess(SearchResponse searchRe return new IterationResult<>(processBuckets(agg), agg.afterKey(), agg.getBuckets().isEmpty()); } + /* + * Mocked demo case + * + * TODO: replace with proper implementation + */ private List processBuckets(CompositeAggregation agg) { return agg.getBuckets().stream().map(b -> { InternalAvg avgAgg = b.getAggregations().get("avg_rating"); @@ -76,7 +82,7 @@ private List processBuckets(CompositeAggregation agg) { builder.field("avg_rating", avgAgg.getValue()); builder.endObject(); } catch (IOException e) { - throw new RuntimeException(e); + throw new UncheckedIOException(e); } String indexName = PIVOT_INDEX + "_" + job.getConfig().getId(); From 460fe452ae23f1323467befaaa32cde6980098a9 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Mon, 10 Sep 2018 09:47:51 +0200 Subject: [PATCH 4/4] fix style violation --- .../job/FeatureIndexBuilderJobPersistentTasksExecutor.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJobPersistentTasksExecutor.java b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJobPersistentTasksExecutor.java index f090bda906fe..fefb383f94b0 100644 --- a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJobPersistentTasksExecutor.java +++ b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJobPersistentTasksExecutor.java @@ -36,7 +36,8 @@ public FeatureIndexBuilderJobPersistentTasksExecutor(Settings settings, Client c @Override protected void nodeOperation(AllocatedPersistentTask task, @Nullable FeatureIndexBuilderJob params, PersistentTaskState state) { FeatureIndexBuilderJobTask buildTask = (FeatureIndexBuilderJobTask) task; - SchedulerEngine.Job schedulerJob = new SchedulerEngine.Job(FeatureIndexBuilderJobTask.SCHEDULE_NAME + "_" + params.getConfig().getId(), next()); + SchedulerEngine.Job schedulerJob = new SchedulerEngine.Job( + FeatureIndexBuilderJobTask.SCHEDULE_NAME + "_" + params.getConfig().getId(), next()); // Note that while the task is added to the scheduler here, the internal state // will prevent