Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML] automatically update deprecated datafeed aggregations #55678

Merged
merged 4 commits into from
Apr 28, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
Expand Down Expand Up @@ -34,14 +35,16 @@ class AggProvider implements Writeable, ToXContentObject {
private Exception parsingException;
private AggregatorFactories.Builder parsedAggs;
private Map<String, Object> aggs;
private boolean rewroteAggs;

static AggProvider fromXContent(XContentParser parser, boolean lenient) throws IOException {
Map<String, Object> aggs = parser.mapOrdered();
// NOTE: Always rewrite potentially old date histogram intervals.
// This should occur in 8.x+ but not 7.x.
// 7.x is BWC with versions that do not support the new date_histogram fields
boolean rewroteAggs = false;
if (lenient) {
rewriteDateHistogramInterval(aggs, false);
rewroteAggs = rewriteDateHistogramInterval(aggs, false);
}
AggregatorFactories.Builder parsedAggs = null;
Exception exception = null;
Expand All @@ -61,7 +64,7 @@ static AggProvider fromXContent(XContentParser parser, boolean lenient) throws I
throw ExceptionsHelper.badRequestException(Messages.DATAFEED_CONFIG_AGG_BAD_FORMAT, ex);
}
}
return new AggProvider(aggs, parsedAggs, exception);
return new AggProvider(aggs, parsedAggs, exception, rewroteAggs);
}

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -99,30 +102,40 @@ static AggProvider fromParsedAggs(AggregatorFactories.Builder parsedAggs) throws
new AggProvider(
XContentObjectTransformer.aggregatorTransformer(NamedXContentRegistry.EMPTY).toMap(parsedAggs),
parsedAggs,
null);
null,
false);
}

static AggProvider fromStream(StreamInput in) throws IOException {
return new AggProvider(in.readMap(), in.readOptionalWriteable(AggregatorFactories.Builder::new), in.readException());
return new AggProvider(
in.readMap(),
in.readOptionalWriteable(AggregatorFactories.Builder::new),
in.readException(),
in.getVersion().onOrAfter(Version.V_8_0_0) ? in.readBoolean() : false);
}

AggProvider(Map<String, Object> aggs, AggregatorFactories.Builder parsedAggs, Exception parsingException) {
AggProvider(Map<String, Object> aggs, AggregatorFactories.Builder parsedAggs, Exception parsingException, boolean rewroteAggs) {
this.aggs = Collections.unmodifiableMap(new LinkedHashMap<>(Objects.requireNonNull(aggs, "[aggs] must not be null")));
this.parsedAggs = parsedAggs;
this.parsingException = parsingException;
this.rewroteAggs = rewroteAggs;
}

AggProvider(AggProvider other) {
this.aggs = new LinkedHashMap<>(other.aggs);
this.parsedAggs = other.parsedAggs;
this.parsingException = other.parsingException;
this.rewroteAggs = other.rewroteAggs;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeMap(aggs);
out.writeOptionalWriteable(parsedAggs);
out.writeException(parsingException);
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
out.writeBoolean(rewroteAggs);
}
}

public Exception getParsingException() {
Expand All @@ -137,6 +150,10 @@ public Map<String, Object> getAggs() {
return aggs;
}

public boolean isRewroteAggs() {
return rewroteAggs;
}

@Override
public boolean equals(Object other) {
if (this == other) {
Expand All @@ -151,17 +168,38 @@ public boolean equals(Object other) {

return Objects.equals(this.aggs, that.aggs)
&& Objects.equals(this.parsedAggs, that.parsedAggs)
&& Objects.equals(this.parsingException, that.parsingException);
&& equalExceptionMessages(this.parsingException, that.parsingException)
&& Objects.equals(this.rewroteAggs, that.rewroteAggs);
}

private static boolean equalExceptionMessages(Exception lft, Exception rgt) {
if (lft == rgt) {
return true;
}
if (lft == null || rgt == null) {
return false;
}
return Objects.equals(lft.getMessage(), rgt.getMessage());
}

@Override
public int hashCode() {
return Objects.hash(aggs, parsedAggs, parsingException);
return Objects.hash(aggs, parsedAggs, parsingException == null ? null : parsingException.getMessage(), rewroteAggs);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.map(aggs);
return builder;
}

@Override
public String toString() {
return "AggProvider{" +
"parsingException=" + parsingException +
", parsedAggs=" + parsedAggs +
", aggs=" + aggs +
", rewroteAggs=" + rewroteAggs +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,14 @@ public boolean hasAggregations() {
return aggProvider != null && aggProvider.getAggs() != null && aggProvider.getAggs().size() > 0;
}

public boolean aggsRewritten() {
return aggProvider != null && aggProvider.isRewroteAggs();
}

public AggProvider getAggProvider() {
return aggProvider;
}

public List<SearchSourceBuilder.ScriptField> getScriptFields() {
return scriptFields == null ? Collections.emptyList() : scriptFields;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public static AggProvider createRandomValidAggProvider(String name, String field
AggregatorFactories.Builder aggs =
XContentObjectTransformer.aggregatorTransformer(new NamedXContentRegistry(searchModule.getNamedXContents()))
.fromMap(agg);
return new AggProvider(agg, aggs, null);
return new AggProvider(agg, aggs, null, false);
} catch (IOException ex) {
fail(ex.getMessage());
}
Expand Down Expand Up @@ -182,6 +182,6 @@ protected AggProvider mutateInstance(AggProvider instance) throws IOException {
default:
throw new AssertionError("Illegal randomisation branch");
}
return new AggProvider(instance.getAggs(), parsedAggs, parsingException);
return new AggProvider(instance.getAggs(), parsedAggs, parsingException, false);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.ml.datafeed;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.xpack.core.ml.AbstractBWCWireSerializationTestCase;
import org.elasticsearch.xpack.core.ml.utils.XContentObjectTransformer;

import java.io.IOException;
import java.util.Collections;
import java.util.Map;

public class AggProviderWireSerializationTests extends AbstractBWCWireSerializationTestCase<AggProvider> {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

toXContent for the agg provider should only provide the agg map. This test is to make sure that we appropriate write between nodes of various versions.


@Override
protected NamedWriteableRegistry writableRegistry() {
SearchModule searchModule = new SearchModule(Settings.EMPTY, Collections.emptyList());
return new NamedWriteableRegistry(searchModule.getNamedWriteables());
}

@Override
protected NamedWriteableRegistry getNamedWriteableRegistry() {
return writableRegistry();
}

@Override
protected AggProvider createTestInstance() {
return createRandomValidAggProvider();
}

@Override
protected Writeable.Reader<AggProvider> instanceReader() {
return AggProvider::fromStream;
}

public static AggProvider createRandomValidAggProvider() {
Map<String, Object> agg = Collections.singletonMap(randomAlphaOfLengthBetween(1, 10),
Collections.singletonMap("avg", Collections.singletonMap("field", randomAlphaOfLengthBetween(1, 10))));
try {
SearchModule searchModule = new SearchModule(Settings.EMPTY, Collections.emptyList());
AggregatorFactories.Builder aggs =
XContentObjectTransformer.aggregatorTransformer(new NamedXContentRegistry(searchModule.getNamedXContents()))
.fromMap(agg);
Exception parsingException = null;
if (randomBoolean()) {
aggs = null;
parsingException = new ElasticsearchException("bad configs");
}
return new AggProvider(agg, aggs, parsingException, randomBoolean());
} catch (IOException ex) {
fail(ex.getMessage());
}
return null;
}

@Override
protected AggProvider mutateInstanceForVersion(AggProvider instance, Version version) {
if (version.onOrBefore(Version.V_8_0_0)) {
return new AggProvider(instance.getAggs(), instance.getParsedAggs(), instance.getParsingException(), false);
}
return instance;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,8 @@ public void testSerializationOfComplexAggsBetweenVersions() throws IOException {
datafeedUpdateBuilder.setAggregations(new AggProvider(
XContentObjectTransformer.aggregatorTransformer(xContentRegistry()).toMap(aggs),
aggs,
null));
null,
false));
// So equality check between the streamed and current passes
// Streamed DatafeedConfigs when they are before 6.6.0 require a parsed object for aggs and queries, consequently all the default
// values are added between them
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@
import org.elasticsearch.xpack.ml.action.TransportUpdateProcessAction;
import org.elasticsearch.xpack.ml.action.TransportValidateDetectorAction;
import org.elasticsearch.xpack.ml.action.TransportValidateJobConfigAction;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfigAutoUpdater;
import org.elasticsearch.xpack.ml.datafeed.DatafeedJobBuilder;
import org.elasticsearch.xpack.ml.datafeed.DatafeedManager;
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
Expand Down Expand Up @@ -680,6 +681,9 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
MlAssignmentNotifier mlAssignmentNotifier = new MlAssignmentNotifier(anomalyDetectionAuditor, dataFrameAnalyticsAuditor, threadPool,
new MlConfigMigrator(settings, client, clusterService, indexNameExpressionResolver), clusterService);

MlAutoUpdateService mlAutoUpdateService = new MlAutoUpdateService(threadPool,
clusterService,
List.of(new DatafeedConfigAutoUpdater(datafeedConfigProvider)));
// this object registers as a license state listener, and is never removed, so there's no need to retain another reference to it
final InvalidLicenseEnforcer enforcer =
new InvalidLicenseEnforcer(getLicenseState(), threadPool, datafeedManager, autodetectProcessManager);
Expand All @@ -705,6 +709,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
dataFrameAnalyticsAuditor,
inferenceAuditor,
mlAssignmentNotifier,
mlAutoUpdateService,
memoryTracker,
analyticsProcessManager,
memoryEstimationProcessManager,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.ml;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

public class MlAutoUpdateService implements ClusterStateListener {
private static final Logger logger = LogManager.getLogger(MlAutoUpdateService.class);

public interface UpdateAction {
boolean minNodeSupported(Version minNodeVersion);
String getName();
void runUpdate();
}

private final List<UpdateAction> updateActions;
private final Set<String> currentlyUpdating;
private final Set<String> completedUpdates;
private final ThreadPool threadPool;

public MlAutoUpdateService(ThreadPool threadPool, ClusterService clusterService, List<UpdateAction> updateActions) {
clusterService.addListener(this);
benwtrent marked this conversation as resolved.
Show resolved Hide resolved
this.updateActions = updateActions;
this.completedUpdates = ConcurrentHashMap.newKeySet();
this.currentlyUpdating = ConcurrentHashMap.newKeySet();
this.threadPool = threadPool;
}

@Override
public void clusterChanged(ClusterChangedEvent event) {
if (event.localNodeMaster() == false) {
return;
}

Version minNodeVersion = event.state().getNodes().getMinNodeVersion();
final List<UpdateAction> toRun = updateActions.stream()
.filter(action -> action.minNodeSupported(minNodeVersion))
.filter(action -> completedUpdates.contains(action.getName()) == false)
.filter(action -> currentlyUpdating.add(action.getName()))
.collect(Collectors.toList());
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(
() -> toRun.forEach(this::runUpdate)
);
}

private void runUpdate(UpdateAction action) {
try {
logger.debug(() -> new ParameterizedMessage("[{}] starting executing update action", action.getName()));
action.runUpdate();
this.completedUpdates.add(action.getName());
logger.debug(() -> new ParameterizedMessage("[{}] succeeded executing update action", action.getName()));
} catch (Exception ex) {
logger.warn(new ParameterizedMessage("[{}] failure executing update action", action.getName()), ex);
} finally {
this.currentlyUpdating.remove(action.getName());
logger.debug(() -> new ParameterizedMessage("[{}] no longer executing update action", action.getName()));
}
}

}
Loading