Skip to content

Commit

Permalink
Catch correct exception from x-pack-less cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
davidkyle committed Jun 13, 2018
1 parent 2f8edfc commit 64fe766
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import java.util.Objects;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
* Datafeed configuration options. Describes where to proactively pull input
Expand Down Expand Up @@ -222,15 +221,6 @@ public List<String> getIndices() {
return indices;
}

/**
* Get any indices used for cross cluster search
* that are on a remote cluster.
* @return List of remote cluster indices
*/
public List<String> getRemoteIndices() {
return indices.stream().filter(index -> index.indexOf(':') != -1).collect(Collectors.toList());
}

public List<String> getTypes() {
return types;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -493,18 +493,6 @@ public void testDefaultFrequency_GivenAggregationsWithHistogramInterval_1_Hour()
assertEquals(TimeValue.timeValueHours(1), datafeed.defaultFrequency(TimeValue.timeValueHours(12)));
}

public void testGetRemoteIndices() {
DatafeedConfig.Builder builder = new DatafeedConfig.Builder("datafeed1", "job1");
builder.setIndices(Collections.singletonList("local-index"));

DatafeedConfig config = builder.build();
assertThat(config.getRemoteIndices(), is(empty()));

builder.setIndices(Arrays.asList("local-index", "remote-cluster:index1", "local-index2", "remote-cluster2:index1"));
config = builder.build();
assertThat(config.getRemoteIndices(), containsInAnyOrder("remote-cluster:index1", "remote-cluster2:index1"));
}

public static String randomValidDatafeedId() {
CodepointSetGenerator generator = new CodepointSetGenerator("abcdefghijklmnopqrstuvwxyz".toCharArray());
return generator.ofCodePointsLength(random(), 10, 10);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,8 @@ public void onFailure(Exception e) {
createDataExtractor(job, datafeed, params, waitForTaskListener);
}
},
e -> listener.onFailure(createUnknownLicenseError(datafeed.getId(), datafeed.getRemoteIndices(), e))
e -> listener.onFailure(createUnknownLicenseError(datafeed.getId(),
MlRemoteLicenseChecker.remoteIndices(datafeed.getIndices()), e))
));
} else {
createDataExtractor(job, datafeed, params, waitForTaskListener);
Expand Down Expand Up @@ -244,7 +245,7 @@ private ElasticsearchStatusException createUnknownLicenseError(String datafeedId
+ " indices on a remote cluster " + remoteIndices
+ " but the license type could not be verified";

return new ElasticsearchStatusException(message, RestStatus.BAD_REQUEST, cause);
return new ElasticsearchStatusException(message, RestStatus.BAD_REQUEST, new Exception(cause.getMessage()));
}

public static class StartDatafeedPersistentTasksExecutor extends PersistentTasksExecutor<StartDatafeedAction.DatafeedParams> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ private AssignmentFailure verifyIndicesActive(DatafeedConfig datafeed) {
List<String> indices = datafeed.getIndices();
for (String index : indices) {

if (isRemoteIndex(index)) {
if (MlRemoteLicenseChecker.isRemoteIndex(index)) {
// We cannot verify remote indices
continue;
}
Expand Down Expand Up @@ -122,10 +122,6 @@ private AssignmentFailure verifyIndicesActive(DatafeedConfig datafeed) {
return null;
}

private boolean isRemoteIndex(String index) {
return index.indexOf(':') != -1;
}

private static class AssignmentFailure {
private final String reason;
private final boolean isCriticalForTaskCreation;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.indices.InvalidIndexNameException;
import org.elasticsearch.license.License;
import org.elasticsearch.license.XPackInfoResponse;
import org.elasticsearch.transport.ActionNotFoundTransportException;
import org.elasticsearch.xpack.core.action.XPackInfoAction;
import org.elasticsearch.xpack.core.action.XPackInfoRequest;

Expand All @@ -27,15 +27,15 @@
/**
* ML datafeeds can use cross cluster search to access data in a remote cluster.
* The remote cluster should be licenced for ML this class performs that check
* using the _xpack endpoint.
* using the _xpack (info) endpoint.
*/
public class MlRemoteLicenseChecker {

private final Client client;

public static class RemoteClusterLicenseInfo {
private String clusterName;
private XPackInfoResponse.LicenseInfo licenseInfo;
private final String clusterName;
private final XPackInfoResponse.LicenseInfo licenseInfo;

RemoteClusterLicenseInfo(String clusterName, XPackInfoResponse.LicenseInfo licenseInfo) {
this.clusterName = clusterName;
Expand Down Expand Up @@ -73,8 +73,8 @@ public MlRemoteLicenseChecker(Client client) {

/**
* Check each cluster is licensed for ML.
* This function terminates early when the first cluster that is not licensed
* is found or an error occurs.
* This function evaluates lazily and will terminate when the first cluster
* that is not licensed is found or an error occurs.
*
* @param clusterNames List of remote cluster names
* @param listener Response listener
Expand Down Expand Up @@ -108,7 +108,7 @@ public void onResponse(XPackInfoResponse xPackInfoResponse) {
@Override
public void onFailure(Exception e) {
String message = "Could not determine the X-Pack licence type for cluster [" + clusterName.get() + "]";
if (e instanceof InvalidIndexNameException) {
if (e instanceof ActionNotFoundTransportException) {
// This is likely to be because x-pack is not installed in the target cluster
message += ". Is X-Pack installed on the target cluster?";
}
Expand Down Expand Up @@ -139,14 +139,23 @@ static boolean licenseSupportsML(XPackInfoResponse.LicenseInfo licenseInfo) {
(mode == License.OperationMode.PLATINUM || mode == License.OperationMode.TRIAL);
}

private static boolean isRemoteIndex(String index) {
public static boolean isRemoteIndex(String index) {
return index.indexOf(':') != -1;
}

public static boolean containsRemoteIndex(List<String> indices) {
return indices.stream().anyMatch(MlRemoteLicenseChecker::isRemoteIndex);
}

/**
* Get any remote indices used in cross cluster search.
* Remote indices are of the form {@code cluster_name:index_name}
* @return List of remote cluster indices
*/
public static List<String> remoteIndices(List<String> indices) {
return indices.stream().filter(index -> index.indexOf(':') != -1).collect(Collectors.toList());
}

/**
* Extract the list of remote cluster names from the list of indices.
* @param indices List of indices. Remote cluster indices are prefixed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,19 @@
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.action.XPackInfoAction;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.is;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.same;
Expand All @@ -43,6 +47,13 @@ public void testIsRemoteIndex() {
assertTrue(MlRemoteLicenseChecker.containsRemoteIndex(indices));
}

public void testRemoteIndices() {
List<String> indices = Collections.singletonList("local-index");
assertThat(MlRemoteLicenseChecker.remoteIndices(indices), is(empty()));
indices = Arrays.asList("local-index", "remote-cluster:index1", "local-index2", "remote-cluster2:index1");
assertThat(MlRemoteLicenseChecker.remoteIndices(indices), containsInAnyOrder("remote-cluster:index1", "remote-cluster2:index1"));
}

public void testRemoteClusterNames() {
List<String> indices = Arrays.asList("local-index1", "local-index2");
assertThat(MlRemoteLicenseChecker.remoteClusterNames(indices), empty());
Expand Down

0 comments on commit 64fe766

Please sign in to comment.