diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/monitoring/action/MonitoringMigrateAlertsAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/monitoring/action/MonitoringMigrateAlertsAction.java new file mode 100644 index 0000000000000..72851840a1167 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/monitoring/action/MonitoringMigrateAlertsAction.java @@ -0,0 +1,18 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.monitoring.action; + +import org.elasticsearch.action.ActionType; + +public class MonitoringMigrateAlertsAction extends ActionType { + + public static final MonitoringMigrateAlertsAction INSTANCE = new MonitoringMigrateAlertsAction(); + public static final String NAME = "cluster:admin/xpack/monitoring/migrate/alerts"; + + public MonitoringMigrateAlertsAction() { + super(NAME, MonitoringMigrateAlertsResponse::new); + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/monitoring/action/MonitoringMigrateAlertsRequest.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/monitoring/action/MonitoringMigrateAlertsRequest.java new file mode 100644 index 0000000000000..62b3596ffbbcd --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/monitoring/action/MonitoringMigrateAlertsRequest.java @@ -0,0 +1,27 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.core.monitoring.action; + +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.support.master.MasterNodeRequest; +import org.elasticsearch.common.io.stream.StreamInput; + +import java.io.IOException; + +public class MonitoringMigrateAlertsRequest extends MasterNodeRequest { + + public MonitoringMigrateAlertsRequest() {} + + public MonitoringMigrateAlertsRequest(StreamInput in) throws IOException { + super(in); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/monitoring/action/MonitoringMigrateAlertsResponse.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/monitoring/action/MonitoringMigrateAlertsResponse.java new file mode 100644 index 0000000000000..0a85edc631eb4 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/monitoring/action/MonitoringMigrateAlertsResponse.java @@ -0,0 +1,159 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.core.monitoring.action; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.List; +import java.util.Objects; + +public class MonitoringMigrateAlertsResponse extends ActionResponse implements ToXContentObject { + + private final List exporters; + + public MonitoringMigrateAlertsResponse(List exporters) { + this.exporters = exporters; + } + + public MonitoringMigrateAlertsResponse(StreamInput in) throws IOException { + super(in); + this.exporters = in.readList(ExporterMigrationResult::new); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeList(exporters); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { + return builder.startObject() + .array("exporters", exporters) + .endObject(); + } + + public List getExporters() { + return exporters; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + MonitoringMigrateAlertsResponse response = (MonitoringMigrateAlertsResponse) o; + return Objects.equals(exporters, response.exporters); + } + + @Override + public int hashCode() { + return Objects.hash(exporters); + } + + @Override + public String toString() { + return "MonitoringMigrateAlertsResponse{" + + "exporters=" + exporters + + '}'; + } + + public static class ExporterMigrationResult implements Writeable, ToXContentObject { + + private final String name; + private final String type; + private final boolean migrationComplete; + private final Exception reason; + + public ExporterMigrationResult(String name, String type, boolean migrationComplete, Exception reason) { + this.name = name; + this.type = type; + this.migrationComplete = migrationComplete; + this.reason = reason; + } + + public ExporterMigrationResult(StreamInput in) throws IOException { + this.name = in.readString(); + this.type = in.readString(); + this.migrationComplete = in.readBoolean(); + this.reason = in.readException(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(name); + out.writeString(type); + out.writeBoolean(migrationComplete); + out.writeException(reason); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + { + builder.field("name", name); + builder.field("type", type); + builder.field("migration_complete", migrationComplete); + if (reason != null) { + builder.startObject("reason"); + ElasticsearchException.generateThrowableXContent(builder, params, reason); + builder.endObject(); + } + } + return builder.endObject(); + } + + public String getName() { + return name; + } + + public String getType() { + return type; + } + + public boolean isMigrationComplete() { + return migrationComplete; + } + + @Nullable + public Exception getReason() { + return reason; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ExporterMigrationResult that = (ExporterMigrationResult) o; + return migrationComplete == that.migrationComplete && + Objects.equals(name, that.name) && + Objects.equals(type, that.type); + } + + @Override + public int hashCode() { + return Objects.hash(name, type, migrationComplete); + } + + @Override + public String toString() { + return "ExporterMigrationResult{" + + "name='" + name + '\'' + + ", type='" + type + '\'' + + ", migrationComplete=" + migrationComplete + + ", reason=" + reason + + '}'; + } + } +} diff --git a/x-pack/plugin/monitoring/src/internalClusterTest/java/org/elasticsearch/xpack/monitoring/exporter/http/HttpExporterIT.java b/x-pack/plugin/monitoring/src/internalClusterTest/java/org/elasticsearch/xpack/monitoring/exporter/http/HttpExporterIT.java index 9ed1e90e3c67a..523f3de315b68 100644 --- a/x-pack/plugin/monitoring/src/internalClusterTest/java/org/elasticsearch/xpack/monitoring/exporter/http/HttpExporterIT.java +++ b/x-pack/plugin/monitoring/src/internalClusterTest/java/org/elasticsearch/xpack/monitoring/exporter/http/HttpExporterIT.java @@ -52,6 +52,7 @@ import org.elasticsearch.xpack.monitoring.exporter.ClusterAlertsUtil; import org.elasticsearch.xpack.monitoring.exporter.ExportBulk; import org.elasticsearch.xpack.monitoring.exporter.Exporter; +import org.elasticsearch.xpack.monitoring.exporter.MonitoringMigrationCoordinator; import org.elasticsearch.xpack.monitoring.test.MonitoringIntegTestCase; import org.junit.After; import org.junit.Before; @@ -99,6 +100,7 @@ public class HttpExporterIT extends MonitoringIntegTestCase { private final boolean watcherAlreadyExists = randomBoolean(); private final Environment environment = TestEnvironment.newEnvironment(Settings.builder().put("path.home", createTempDir()).build()); private final String userName = "elasticuser"; + private final MonitoringMigrationCoordinator coordinator = new MonitoringMigrationCoordinator(); private MockWebServer webServer; @@ -651,7 +653,7 @@ private HttpExporter createHttpExporter(final Settings settings) { final Exporter.Config config = new Exporter.Config("_http", "http", settings, clusterService(), TestUtils.newTestLicenseState()); - return new HttpExporter(config, new SSLService(settings, environment), new ThreadContext(settings)); + return new HttpExporter(config, new SSLService(settings, environment), new ThreadContext(settings), coordinator); } private void export(final Settings settings, final Collection docs) throws Exception { diff --git a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/Monitoring.java b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/Monitoring.java index bdcef93e71fa9..ee67283db62ed 100644 --- a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/Monitoring.java +++ b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/Monitoring.java @@ -36,8 +36,10 @@ import org.elasticsearch.xpack.core.XPackPlugin; import org.elasticsearch.xpack.core.monitoring.MonitoringField; import org.elasticsearch.xpack.core.monitoring.action.MonitoringBulkAction; +import org.elasticsearch.xpack.core.monitoring.action.MonitoringMigrateAlertsAction; import org.elasticsearch.xpack.core.ssl.SSLService; import org.elasticsearch.xpack.monitoring.action.TransportMonitoringBulkAction; +import org.elasticsearch.xpack.monitoring.action.TransportMonitoringMigrateAlertsAction; import org.elasticsearch.xpack.monitoring.cleaner.CleanerService; import org.elasticsearch.xpack.monitoring.collector.Collector; import org.elasticsearch.xpack.monitoring.collector.ccr.StatsCollector; @@ -50,9 +52,11 @@ import org.elasticsearch.xpack.monitoring.collector.shards.ShardsCollector; import org.elasticsearch.xpack.monitoring.exporter.Exporter; import org.elasticsearch.xpack.monitoring.exporter.Exporters; +import org.elasticsearch.xpack.monitoring.exporter.MonitoringMigrationCoordinator; import org.elasticsearch.xpack.monitoring.exporter.http.HttpExporter; import org.elasticsearch.xpack.monitoring.exporter.local.LocalExporter; import org.elasticsearch.xpack.monitoring.rest.action.RestMonitoringBulkAction; +import org.elasticsearch.xpack.monitoring.rest.action.RestMonitoringMigrateAlertsAction; import java.util.ArrayList; import java.util.Arrays; @@ -65,7 +69,6 @@ import java.util.Set; import java.util.function.Supplier; -import static java.util.Collections.singletonList; import static org.elasticsearch.common.settings.Setting.boolSetting; /** @@ -127,10 +130,12 @@ public Collection createComponents(Client client, ClusterService cluster final ClusterSettings clusterSettings = clusterService.getClusterSettings(); final CleanerService cleanerService = new CleanerService(settings, clusterSettings, threadPool, getLicenseState()); final SSLService dynamicSSLService = getSslService().createDynamicSSLService(); + final MonitoringMigrationCoordinator migrationCoordinator = new MonitoringMigrationCoordinator(); Map exporterFactories = new HashMap<>(); - exporterFactories.put(HttpExporter.TYPE, config -> new HttpExporter(config, dynamicSSLService, threadPool.getThreadContext())); - exporterFactories.put(LocalExporter.TYPE, config -> new LocalExporter(config, client, cleanerService)); + exporterFactories.put(HttpExporter.TYPE, config -> new HttpExporter(config, dynamicSSLService, threadPool.getThreadContext(), + migrationCoordinator)); + exporterFactories.put(LocalExporter.TYPE, config -> new LocalExporter(config, client, migrationCoordinator, cleanerService)); exporters = new Exporters(settings, exporterFactories, clusterService, getLicenseState(), threadPool.getThreadContext(), dynamicSSLService); @@ -147,19 +152,22 @@ public Collection createComponents(Client client, ClusterService cluster final MonitoringService monitoringService = new MonitoringService(settings, clusterService, threadPool, collectors, exporters); - return Arrays.asList(monitoringService, exporters, cleanerService); + return Arrays.asList(monitoringService, exporters, migrationCoordinator, cleanerService); } @Override public List> getActions() { - return singletonList(new ActionHandler<>(MonitoringBulkAction.INSTANCE, TransportMonitoringBulkAction.class)); + return Arrays.asList( + new ActionHandler<>(MonitoringBulkAction.INSTANCE, TransportMonitoringBulkAction.class), + new ActionHandler<>(MonitoringMigrateAlertsAction.INSTANCE, TransportMonitoringMigrateAlertsAction.class) + ); } @Override public List getRestHandlers(Settings settings, RestController restController, ClusterSettings clusterSettings, IndexScopedSettings indexScopedSettings, SettingsFilter settingsFilter, IndexNameExpressionResolver indexNameExpressionResolver, Supplier nodesInCluster) { - return singletonList(new RestMonitoringBulkAction()); + return Arrays.asList(new RestMonitoringBulkAction(), new RestMonitoringMigrateAlertsAction()); } @Override @@ -167,7 +175,6 @@ public List> getSettings() { List> settings = new ArrayList<>(); settings.add(MonitoringField.HISTORY_DURATION); settings.add(CLEAN_WATCHER_HISTORY); - settings.add(MIGRATION_DECOMMISSION_ALERTS); settings.add(MonitoringService.ENABLED); settings.add(MonitoringService.ELASTICSEARCH_COLLECTION_ENABLED); settings.add(MonitoringService.INTERVAL); @@ -181,6 +188,7 @@ public List> getSettings() { settings.add(NodeStatsCollector.NODE_STATS_TIMEOUT); settings.add(EnrichStatsCollector.STATS_TIMEOUT); settings.addAll(Exporters.getSettings()); + settings.add(Monitoring.MIGRATION_DECOMMISSION_ALERTS); return Collections.unmodifiableList(settings); } diff --git a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/action/TransportMonitoringMigrateAlertsAction.java b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/action/TransportMonitoringMigrateAlertsAction.java new file mode 100644 index 0000000000000..ad3c84715a289 --- /dev/null +++ b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/action/TransportMonitoringMigrateAlertsAction.java @@ -0,0 +1,234 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.monitoring.action; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRunnable; +import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.monitoring.action.MonitoringMigrateAlertsAction; +import org.elasticsearch.xpack.core.monitoring.action.MonitoringMigrateAlertsRequest; +import org.elasticsearch.xpack.core.monitoring.action.MonitoringMigrateAlertsResponse; +import org.elasticsearch.xpack.core.monitoring.action.MonitoringMigrateAlertsResponse.ExporterMigrationResult; +import org.elasticsearch.xpack.monitoring.Monitoring; +import org.elasticsearch.xpack.monitoring.exporter.Exporter; +import org.elasticsearch.xpack.monitoring.exporter.Exporter.ExporterResourceStatus; +import org.elasticsearch.xpack.monitoring.exporter.Exporters; +import org.elasticsearch.xpack.monitoring.exporter.MonitoringMigrationCoordinator; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +public class TransportMonitoringMigrateAlertsAction extends TransportMasterNodeAction { + + private static final Logger logger = LogManager.getLogger(TransportMonitoringMigrateAlertsAction.class); + + private final Client client; + private final MonitoringMigrationCoordinator migrationCoordinator; + private final Exporters exporters; + + @Inject + public TransportMonitoringMigrateAlertsAction(Client client, Exporters exporters, MonitoringMigrationCoordinator migrationCoordinator, + TransportService transportService, ClusterService clusterService, ThreadPool threadPool, + ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { + super(MonitoringMigrateAlertsAction.NAME, transportService, clusterService, threadPool, actionFilters, + MonitoringMigrateAlertsRequest::new, indexNameExpressionResolver, MonitoringMigrateAlertsResponse::new, + ThreadPool.Names.MANAGEMENT); + this.client = client; + this.migrationCoordinator = migrationCoordinator; + this.exporters = exporters; + } + + @Override + protected void masterOperation(MonitoringMigrateAlertsRequest request, ClusterState state, + ActionListener listener) throws Exception { + // First, set the migration coordinator as currently running + if (migrationCoordinator.tryBlockInstallationTasks() == false) { + throw new EsRejectedExecutionException("Could not migrate cluster alerts. Migration already in progress."); + } + try { + // Wrap the listener to unblock resource installation before completing + listener = ActionListener.runBefore(listener, migrationCoordinator::unblockInstallationTasks); + Settings.Builder decommissionAlertSetting = Settings.builder().put(Monitoring.MIGRATION_DECOMMISSION_ALERTS.getKey(), true); + client.admin().cluster().prepareUpdateSettings().setPersistentSettings(decommissionAlertSetting) + .execute(completeOnManagementThread(listener)); + } catch (Exception e) { + // unblock resource installation if something fails here + migrationCoordinator.unblockInstallationTasks(); + throw e; + } + } + + private ActionListener completeOnManagementThread( + ActionListener delegate) { + // Send failures to the final listener directly, and on success, fork to management thread and execute best effort alert removal + return ActionListener.wrap( + (response) -> threadPool.executor(ThreadPool.Names.MANAGEMENT).execute( + ActionRunnable.wrap(delegate, (listener) -> afterSettingUpdate(listener, response))), + delegate::onFailure + ); + } + + /** + * Executed after the settings update has been accepted, this collects the enabled and disabled exporters, requesting each of them + * to explicitly remove their installed alerts if possible. This makes sure that alerts are removed in a timely fashion instead of + * waiting for metrics to be bulked into the monitoring cluster. + */ + private void afterSettingUpdate(ActionListener listener, + ClusterUpdateSettingsResponse clusterUpdateSettingsResponse) { + logger.info("THREAD NAME: {}" + Thread.currentThread().getName()); + + // Ensure positive result + if (!clusterUpdateSettingsResponse.isAcknowledged()) { + listener.onFailure(new ElasticsearchException("Failed to update monitoring migration settings")); + } + + // iterate over all the exporters and refresh the alerts + Collection enabledExporters = exporters.getEnabledExporters(); + Collection disabledExporterConfigs = exporters.getDisabledExporterConfigs(); + + List refreshTasks = new ArrayList<>(); + AtomicInteger remaining = new AtomicInteger(enabledExporters.size() + disabledExporterConfigs.size()); + List results = Collections.synchronizedList(new ArrayList<>(remaining.get())); + logger.debug("Exporters in need of refreshing [{}]; enabled [{}], disabled [{}]", remaining.get(), enabledExporters.size(), + disabledExporterConfigs.size()); + + for (Exporter enabledExporter : enabledExporters) { + refreshTasks.add(ActionRunnable.wrap( + resultCollector(enabledExporter.config(), listener, remaining, results), + (resultCollector) -> deleteAlertsFromOpenExporter(enabledExporter, resultCollector))); + } + for (Exporter.Config disabledExporter : disabledExporterConfigs) { + refreshTasks.add(ActionRunnable.wrap( + resultCollector(disabledExporter, listener, remaining, results), + (resultCollector) -> deleteAlertsFromDisabledExporter(disabledExporter, resultCollector))); + } + for (Runnable refreshTask : refreshTasks) { + threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(refreshTask); + } + } + + /** + * Create an action listener that will collect results and finish the request once all results are in. + * @param exporterConfig The exporter being refreshed in this operation (in case of failure) + * @param listener The listener to call after all refresh operations are complete + * @param remaining The counter used to determine if any other operations are in flight + * @param results A thread-safe collection to hold results + */ + private ActionListener resultCollector(final Exporter.Config exporterConfig, + final ActionListener listener, + final AtomicInteger remaining, + final List results) { + return new ActionListener() { + @Override + public void onResponse(ExporterResourceStatus exporterResourceStatus) { + addStatus(exporterResourceStatus); + } + + @Override + public void onFailure(Exception e) { + // Need the exporter name and type here for reporting purposes. Maybe we need to make multiple of these listeners + addStatus(ExporterResourceStatus.notReady(exporterConfig.name(), exporterConfig.type(), e)); + } + + private void addStatus(ExporterResourceStatus exporterResourceStatus) { + results.add(exporterResourceStatus); + int tasksRemaining = remaining.decrementAndGet(); + if (tasksRemaining == 0) { + finalResult(); + } + } + + private void finalResult() { + try { + List collectedResults = results.stream().map(status -> + new ExporterMigrationResult( + status.getExporterName(), + status.getExporterType(), + status.isComplete(), + compileReason(status)) + ).collect(Collectors.toList()); + MonitoringMigrateAlertsResponse response = new MonitoringMigrateAlertsResponse(collectedResults); + listener.onResponse(response); + } catch (Exception e) { + // Make this self contained, we don't want to bubble up exceptions in a way where this listener's + // onFailure method could be called multiple times. + listener.onFailure(e); + } + } + + private Exception compileReason(ExporterResourceStatus status) { + // The reason for unsuccessful setup could be multiple exceptions: one or more watches + // may fail to be removed for any reason. + List exceptions = status.getExceptions(); + if (exceptions == null || exceptions.size() == 0) { + return null; + } else if (exceptions.size() == 1) { + return exceptions.get(0); + } else { + // Set first exception as the cause, and the rest as suppressed under it. + Exception head = new ElasticsearchException("multiple errors occurred during migration", exceptions.get(0)); + List tail = exceptions.subList(1, exceptions.size()); + return tail.stream().reduce(head, ExceptionsHelper::useOrSuppress); + } + } + }; + } + + /** + * Attempts to migrate a given exporter's alerts + * @param exporter The exporter to migrate + * @param listener Notified of success or failure + */ + private void deleteAlertsFromOpenExporter(Exporter exporter, ActionListener listener) { + assert exporter.isOpen(); + try { + exporter.removeAlerts(status -> { + logger.debug("exporter [{}]: completed setup with status [{}]", exporter.config().name(), status.isComplete()); + // Exporter completed its setup (teardown) successfully or unsuccessfully. + listener.onResponse(status); + }); + } catch (Exception e) { + logger.debug("exporter [" + exporter.config().name() + "]: exception encountered during refresh", e); + listener.onFailure(e); + } + } + + /** + * Opens a disabled exporter in order to migrate it (best-effort), then makes sure it is closed at completion. + */ + private void deleteAlertsFromDisabledExporter(Exporter.Config exporterConf, ActionListener listener) { + Exporter disabledExporter = exporters.openExporter(exporterConf); + deleteAlertsFromOpenExporter(disabledExporter, ActionListener.runBefore(listener, disabledExporter::close)); + } + + @Override + protected ClusterBlockException checkBlock(MonitoringMigrateAlertsRequest request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + } +} diff --git a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/Exporter.java b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/Exporter.java index ac39be4a94827..6a35772fb2b94 100644 --- a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/Exporter.java +++ b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/Exporter.java @@ -5,8 +5,10 @@ */ package org.elasticsearch.xpack.monitoring.exporter; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; @@ -21,6 +23,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; import java.util.function.Function; public abstract class Exporter implements AutoCloseable { @@ -137,6 +140,14 @@ public boolean isSingleton() { return false; } + /** + * Forces an exporter to remove cluster alerts immediately instead of waiting to do it + * lazily as part of the normal exporter setup. + * + * @param listener the listener to call with the result of the watch removal + */ + public abstract void removeAlerts(Consumer listener); + /** * Opens up a new export bulk. * @@ -144,6 +155,10 @@ public boolean isSingleton() { */ public abstract void openBulk(ActionListener listener); + public final boolean isOpen() { + return closed.get() == false; + } + protected final boolean isClosed() { return closed.get(); } @@ -218,4 +233,51 @@ public interface Factory { /** Create an exporter with the given configuration. */ Exporter create(Config config); } + + public static class ExporterResourceStatus { + private final String exporterName; + private final String exporterType; + private final boolean complete; + private final List exceptions; + + public ExporterResourceStatus(String exporterName, String exporterType, boolean complete, List exceptions) { + this.exporterName = exporterName; + this.exporterType = exporterType; + this.complete = complete; + this.exceptions = exceptions; + } + + public static ExporterResourceStatus ready(String exporterName, String exporterType) { + return new ExporterResourceStatus(exporterName, exporterType, true, null); + } + + public static ExporterResourceStatus notReady(String exporterName, String exporterType, String reason, Object... args) { + return notReady(exporterName, exporterType, new ElasticsearchException(reason, args)); + } + + public static ExporterResourceStatus notReady(String exporterName, String exporterType, Exception reason) { + return new ExporterResourceStatus(exporterName, exporterType, false, Collections.singletonList(reason)); + } + + public static ExporterResourceStatus determineReadiness(String exporterName, String exporterType, List exceptions) { + return new ExporterResourceStatus(exporterName, exporterType, exceptions.size() <= 0, exceptions); + } + + public String getExporterName() { + return exporterName; + } + + public String getExporterType() { + return exporterType; + } + + public boolean isComplete() { + return complete; + } + + @Nullable + public List getExceptions() { + return exceptions; + } + } } diff --git a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/Exporters.java b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/Exporters.java index 7e1df36b4ba0b..08263a6f5d4a0 100644 --- a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/Exporters.java +++ b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/Exporters.java @@ -47,6 +47,7 @@ public class Exporters extends AbstractLifecycleComponent { private final Settings settings; private final Map factories; private final AtomicReference> exporters; + private final AtomicReference> disabledExporterConfigs; private final ClusterService clusterService; private final XPackLicenseState licenseState; private final ThreadContext threadContext; @@ -57,6 +58,7 @@ public Exporters(Settings settings, Map factories, this.settings = settings; this.factories = factories; this.exporters = new AtomicReference<>(emptyMap()); + this.disabledExporterConfigs = new AtomicReference<>(emptyMap()); this.threadContext = Objects.requireNonNull(threadContext); this.clusterService = Objects.requireNonNull(clusterService); this.licenseState = Objects.requireNonNull(licenseState); @@ -73,16 +75,30 @@ public Exporters(Settings settings, Map factories, } } + static class InitializedExporters { + final Map enabledExporters; + final Map disabledExporters; + + InitializedExporters(Map enabledExporters, Map disabledExporters) { + this.enabledExporters = enabledExporters; + this.disabledExporters = disabledExporters; + } + } + public void setExportersSetting(Settings exportersSetting) { if (this.lifecycle.started()) { - Map updated = initExporters(exportersSetting); + InitializedExporters exporters = initExporters(exportersSetting); + Map updated = exporters.enabledExporters; closeExporters(logger, this.exporters.getAndSet(updated)); + this.disabledExporterConfigs.getAndSet(exporters.disabledExporters); } } @Override protected void doStart() { - exporters.set(initExporters(settings)); + InitializedExporters exporters = initExporters(settings); + this.exporters.set(exporters.enabledExporters); + this.disabledExporterConfigs.set(exporters.disabledExporters); } @Override @@ -107,6 +123,28 @@ public Collection getEnabledExporters() { return exporters.get().values(); } + /** + * Get all disabled {@linkplain Exporter.Config}s. + * + * @return Never {@code null}. Can be empty if none are disabled. + */ + public Collection getDisabledExporterConfigs() { + return disabledExporterConfigs.get().values(); + } + + /** + * Attempt to construct a one-off exporter, separate from the list of enabled exporters. + */ + public Exporter openExporter(Exporter.Config config) { + String name = config.name(); + String type = config.type(); + Exporter.Factory factory = factories.get(type); + if (factory == null) { + throw new SettingsException("unknown exporter type [" + type + "] set for exporter [" + name + "]"); + } + return factory.create(config); + } + static void closeExporters(Logger logger, Map exporters) { for (Exporter exporter : exporters.values()) { try { @@ -117,9 +155,10 @@ static void closeExporters(Logger logger, Map exporters) { } } - Map initExporters(Settings settings) { + InitializedExporters initExporters(Settings settings) { Set singletons = new HashSet<>(); Map exporters = new HashMap<>(); + Map disabled = new HashMap<>(); boolean hasDisabled = false; Settings exportersSettings = settings.getByPrefix("xpack.monitoring.exporters."); for (String name : exportersSettings.names()) { @@ -138,6 +177,7 @@ Map initExporters(Settings settings) { if (logger.isDebugEnabled()) { logger.debug("exporter [{}/{}] is disabled", type, name); } + disabled.put(config.name(), config); continue; } Exporter exporter = factory.create(config); @@ -164,7 +204,7 @@ Map initExporters(Settings settings) { exporters.put(config.name(), factories.get(LocalExporter.TYPE).create(config)); } - return exporters; + return new InitializedExporters(exporters, disabled); } /** diff --git a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/MonitoringMigrationCoordinator.java b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/MonitoringMigrationCoordinator.java new file mode 100644 index 0000000000000..bac0cb87ae946 --- /dev/null +++ b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/MonitoringMigrationCoordinator.java @@ -0,0 +1,34 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.monitoring.exporter; + +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * A shared coordination object for pausing execution of exporter installation tasks when a migration that involves them is in progress + */ +public class MonitoringMigrationCoordinator { + + // True value signals a migration is in progress + private final AtomicBoolean migrationBlock; + + public MonitoringMigrationCoordinator() { + this.migrationBlock = new AtomicBoolean(false); + } + + public boolean tryBlockInstallationTasks() throws InterruptedException { + return migrationBlock.compareAndSet(false, true); + } + + public void unblockInstallationTasks() { + migrationBlock.set(false); + } + + public boolean canInstall() { + return migrationBlock.get() == false; + } +} diff --git a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/http/ClusterAlertHttpResource.java b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/http/ClusterAlertHttpResource.java index b4d28bbc09988..30949bf817290 100644 --- a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/http/ClusterAlertHttpResource.java +++ b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/http/ClusterAlertHttpResource.java @@ -103,7 +103,7 @@ protected void doCheck(final RestClient client, final ActionListener li * Publish the missing {@linkplain #watchId Watch}. */ @Override - protected void doPublish(final RestClient client, final ActionListener listener) { + protected void doPublish(final RestClient client, final ActionListener listener) { putResource(client, listener, logger, "/_watcher/watch", watchId.get(), Collections.emptyMap(), this::watchToHttpEntity, "monitoring cluster alert", resourceOwnerName, "monitoring cluster"); diff --git a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/http/HttpExporter.java b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/http/HttpExporter.java index 2fdf80e5d0484..0e4963ca19c18 100644 --- a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/http/HttpExporter.java +++ b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/http/HttpExporter.java @@ -15,6 +15,7 @@ import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.RestClient; @@ -47,6 +48,7 @@ import org.elasticsearch.xpack.monitoring.exporter.ClusterAlertsUtil; import org.elasticsearch.xpack.monitoring.exporter.ExportBulk; import org.elasticsearch.xpack.monitoring.exporter.Exporter; +import org.elasticsearch.xpack.monitoring.exporter.MonitoringMigrationCoordinator; import javax.net.ssl.SSLContext; import java.util.ArrayList; @@ -59,6 +61,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -411,24 +414,51 @@ public Iterator> settings() { */ private final HttpResource resource; + /** + * {@link HttpResource} for setting up or tearing down cluster alerts specifically. + */ + private final HttpResource alertingResource; + /** * Track whether cluster alerts are allowed or not between requests. This allows us to avoid wiring a listener and to lazily change it. */ private final AtomicBoolean clusterAlertsAllowed = new AtomicBoolean(false); + /** + * A barrier object to keep the exporter from installing or operating during a migration operation. + */ + private final MonitoringMigrationCoordinator migrationCoordinator; + private static final ConcurrentHashMap SECURE_AUTH_PASSWORDS = new ConcurrentHashMap<>(); private final ThreadContext threadContext; private final DateFormatter dateTimeFormatter; private final ClusterStateListener onLocalMasterListener; + + /** + * Helper class to separate all resources from just watcher resources + */ + static class Resources { + MultiHttpResource allResources; + HttpResource alertingResource; + + Resources(MultiHttpResource allResources, HttpResource alertingResource) { + this.allResources = allResources; + this.alertingResource = alertingResource; + } + } + /** * Create an {@link HttpExporter}. * * @param config The HTTP Exporter's configuration * @param sslService The SSL Service used to create the SSL Context necessary for TLS / SSL communication + * @param threadContext The thread context that should be used for async operations + * @param migrationCoordinator The shared coordinator for determining monitoring migrations in progress * @throws SettingsException if any setting is malformed */ - public HttpExporter(final Config config, final SSLService sslService, final ThreadContext threadContext) { - this(config, sslService, threadContext, new NodeFailureListener(), createResources(config)); + public HttpExporter(final Config config, final SSLService sslService, final ThreadContext threadContext, + MonitoringMigrationCoordinator migrationCoordinator) { + this(config, sslService, threadContext, migrationCoordinator, new NodeFailureListener(), createResources(config)); } /** @@ -436,12 +466,35 @@ public HttpExporter(final Config config, final SSLService sslService, final Thre * * @param config The HTTP Exporter's configuration * @param sslService The SSL Service used to create the SSL Context necessary for TLS / SSL communication + * @param threadContext The thread context that should be used for async operations + * @param migrationCoordinator The shared coordinator for determining monitoring migrations in progress * @param listener The node failure listener used to notify an optional sniffer and resources + * @param resource Both the resource for all things required for bulk operations and those for just cluster alerts * @throws SettingsException if any setting is malformed */ - HttpExporter(final Config config, final SSLService sslService, final ThreadContext threadContext, final NodeFailureListener listener, - final HttpResource resource) { - this(config, createRestClient(config, sslService, listener), threadContext, listener, resource); + private HttpExporter(final Config config, final SSLService sslService, final ThreadContext threadContext, + final MonitoringMigrationCoordinator migrationCoordinator, final NodeFailureListener listener, + final Resources resource) { + this(config, sslService, threadContext, migrationCoordinator, listener, resource.allResources, resource.alertingResource); + } + + /** + * Create an {@link HttpExporter}. + * + * @param config The HTTP Exporter's configuration + * @param sslService The SSL Service used to create the SSL Context necessary for TLS / SSL communication + * @param threadContext The thread context that should be used for async operations + * @param migrationCoordinator The shared coordinator for determining monitoring migrations in progress + * @param listener The node failure listener used to notify an optional sniffer and resources + * @param resource Blocking HTTP resource to prevent bulks until all requirements are met + * @param alertingResource The HTTP resource used to configure cluster alerts + * @throws SettingsException if any setting is malformed + */ + HttpExporter(final Config config, final SSLService sslService, final ThreadContext threadContext, + final MonitoringMigrationCoordinator migrationCoordinator, final NodeFailureListener listener, + final HttpResource resource, final HttpResource alertingResource) { + this(config, createRestClient(config, sslService, listener), threadContext, migrationCoordinator, listener, resource, + alertingResource); } /** @@ -449,12 +502,18 @@ public HttpExporter(final Config config, final SSLService sslService, final Thre * * @param config The HTTP Exporter's configuration * @param client The REST Client used to make all requests to the remote Elasticsearch cluster + * @param threadContext The thread context that should be used for async operations + * @param migrationCoordinator The shared coordinator for determining monitoring migrations in progress * @param listener The node failure listener used to notify an optional sniffer and resources + * @param resource Blocking HTTP resource to prevent bulks until all requirements are met + * @param alertingResource The HTTP resource used to configure cluster alerts * @throws SettingsException if any setting is malformed */ - HttpExporter(final Config config, final RestClient client, final ThreadContext threadContext, final NodeFailureListener listener, - final HttpResource resource) { - this(config, client, createSniffer(config, client, listener), threadContext, listener, resource); + HttpExporter(final Config config, final RestClient client, final ThreadContext threadContext, + final MonitoringMigrationCoordinator migrationCoordinator, final NodeFailureListener listener, + final HttpResource resource, final HttpResource alertingResource) { + this(config, client, createSniffer(config, client, listener), threadContext, migrationCoordinator, listener, resource, + alertingResource); } /** @@ -463,19 +522,25 @@ public HttpExporter(final Config config, final SSLService sslService, final Thre * @param config The HTTP Exporter's configuration * @param client The REST Client used to make all requests to the remote Elasticsearch cluster * @param sniffer The optional sniffer, which has already been associated with the {@code listener} + * @param threadContext The thread context that should be used for async operations + * @param migrationCoordinator The shared coordinator for determining monitoring migrations in progress * @param listener The node failure listener used to notify resources * @param resource Blocking HTTP resource to prevent bulks until all requirements are met + * @param alertingResource The HTTP resource used to configure cluster alerts * @throws SettingsException if any setting is malformed */ HttpExporter(final Config config, final RestClient client, @Nullable final Sniffer sniffer, final ThreadContext threadContext, - final NodeFailureListener listener, final HttpResource resource) { + final MonitoringMigrationCoordinator migrationCoordinator, final NodeFailureListener listener, + final HttpResource resource, final HttpResource alertingResource) { super(config); this.client = Objects.requireNonNull(client); this.sniffer = sniffer; this.resource = resource; + this.alertingResource = alertingResource; this.defaultParams = createDefaultParams(config); this.threadContext = threadContext; + this.migrationCoordinator = migrationCoordinator; this.dateTimeFormatter = dateTimeFormatter(config); // mark resources as dirty after any node failure or license change @@ -598,7 +663,7 @@ static Sniffer createSniffer(final Config config, final RestClient client, final * @param config The HTTP Exporter's configuration * @return Never {@code null}. */ - static MultiHttpResource createResources(final Config config) { + static Resources createResources(final Config config) { final String resourceOwnerName = "xpack.monitoring.exporters." + config.name(); // order controls the order that each is checked; more direct checks should always happen first (e.g., version checks) final List resources = new ArrayList<>(); @@ -611,9 +676,12 @@ static MultiHttpResource createResources(final Config config) { configurePipelineResources(config, resourceOwnerName, resources); // load the watches for cluster alerts if Watcher is available - configureClusterAlertsResources(config, resourceOwnerName, resources); + final HttpResource alertingResource = configureClusterAlertsResources(config, resourceOwnerName); + if (alertingResource != null) { + resources.add(alertingResource); + } - return new MultiHttpResource(resourceOwnerName, resources); + return new Resources(new MultiHttpResource(resourceOwnerName, resources), alertingResource); } /** @@ -897,10 +965,8 @@ private static void configurePipelineResources(final Config config, final String * * @param config The HTTP Exporter's configuration * @param resourceOwnerName The resource owner name to display for any logging messages. - * @param resources The resources to add too. */ - private static void configureClusterAlertsResources(final Config config, final String resourceOwnerName, - final List resources) { + private static HttpResource configureClusterAlertsResources(final Config config, final String resourceOwnerName) { // don't create watches if we're not using them if (CLUSTER_ALERTS_MANAGEMENT_SETTING.getConcreteSettingForNamespace(config.name()).get(config.settings())) { final ClusterService clusterService = config.clusterService(); @@ -918,9 +984,37 @@ private static void configureClusterAlertsResources(final Config config, final S } // wrap the watches in a conditional resource check to ensure the remote cluster has watcher available / enabled - resources.add(new WatcherExistsHttpResource(resourceOwnerName, clusterService, - new MultiHttpResource(resourceOwnerName, watchResources))); + return new WatcherExistsHttpResource(resourceOwnerName, clusterService, + new MultiHttpResource(resourceOwnerName, watchResources)); } + return null; + } + + @Override + public void removeAlerts(Consumer listener) { + alertingResource.checkAndPublish(client, ActionListener.wrap( + (result) -> { + ExporterResourceStatus status; + if (result.isSuccess()) { + status = ExporterResourceStatus.ready(name(), TYPE); + } else { + switch (result.getResourceState()) { + case CLEAN: + status = ExporterResourceStatus.ready(name(), TYPE); + break; + case CHECKING: + case DIRTY: + // CHECKING should be unlikely, but in case of that, we mark it as not ready + status = ExporterResourceStatus.notReady(name(), TYPE, result.getReason()); + break; + default: + throw new ElasticsearchException("Illegal exporter resource status state [{}]", result.getResourceState()); + } + } + listener.accept(status); + }, + (exception) -> listener.accept(ExporterResourceStatus.notReady(name(), TYPE, exception)) + )); } @Override @@ -932,16 +1026,21 @@ public void openBulk(final ActionListener listener) { resource.markDirty(); } - resource.checkAndPublishIfDirty(client, ActionListener.wrap((success) -> { - if (success) { - final String name = "xpack.monitoring.exporters." + config.name(); - - listener.onResponse(new HttpExportBulk(name, client, defaultParams, dateTimeFormatter, threadContext)); - } else { - // we're not ready yet, so keep waiting - listener.onResponse(null); - } - }, listener::onFailure)); + if (migrationCoordinator.canInstall()) { + resource.checkAndPublishIfDirty(client, ActionListener.wrap((success) -> { + if (success) { + final String name = "xpack.monitoring.exporters." + config.name(); + + listener.onResponse(new HttpExportBulk(name, client, defaultParams, dateTimeFormatter, threadContext)); + } else { + // we're not ready yet, so keep waiting + listener.onResponse(null); + } + }, listener::onFailure)); + } else { + // we're migrating right now, so keep waiting + listener.onResponse(null); + } } @Override diff --git a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/http/HttpResource.java b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/http/HttpResource.java index 557ed9603996d..57e2dc60bf21c 100644 --- a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/http/HttpResource.java +++ b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/http/HttpResource.java @@ -42,6 +42,86 @@ enum State { DIRTY } + /** + * Encapsulates the results of a check-and-publish operation, listing the operational state of the resource. + */ + public static class ResourcePublishResult { + private final boolean success; + private final String reason; + private final State resourceState; + + public ResourcePublishResult(boolean success) { + this(success, null, success ? State.CLEAN : State.DIRTY); + } + + public ResourcePublishResult(boolean success, String reason, State resourceState) { + this.success = success; + this.reason = reason; + this.resourceState = resourceState; + } + + /** + * The publish operation succeeded without any problems. + */ + public static ResourcePublishResult ready() { + return new ResourcePublishResult(true, null, State.CLEAN); + } + + /** + * The publish operation succeeded without any problems. + */ + public static ResourcePublishResult notReady(String reason) { + return new ResourcePublishResult(false, reason, State.DIRTY); + } + + /** + * The publish operation was not attempted, since another publishing operation is already in flight. + */ + public static ResourcePublishResult inProgress() { + return new ResourcePublishResult(false, null, State.CHECKING); + } + + public boolean isSuccess() { + return success; + } + + public String getReason() { + return reason; + } + + public State getResourceState() { + return resourceState; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ResourcePublishResult that = (ResourcePublishResult) o; + return success == that.success && + Objects.equals(reason, that.reason) && + resourceState == that.resourceState; + } + + @Override + public int hashCode() { + return Objects.hash(success, reason, resourceState); + } + + @Override + public String toString() { + return "ResourcePublishResult{" + + "success=" + success + + ", reason='" + reason + '\'' + + ", resourceState=" + resourceState + + '}'; + } + } + /** * The user-recognizable name for whatever owns this {@link HttpResource}. */ @@ -116,7 +196,7 @@ public final void checkAndPublishIfDirty(final RestClient client, final ActionLi if (state.get() == State.CLEAN) { listener.onResponse(true); } else { - checkAndPublish(client, listener); + checkAndPublish(client, listener.map(ResourcePublishResult::isSuccess)); } } @@ -134,18 +214,18 @@ public final void checkAndPublishIfDirty(final RestClient client, final ActionLi * @param listener Returns {@code true} if the resource is available for use. {@code false} to stop. * @see #isDirty() */ - public final void checkAndPublish(final RestClient client, final ActionListener listener) { + public final void checkAndPublish(final RestClient client, final ActionListener listener) { // we always check when asked, regardless of clean or dirty, but we do not run parallel checks if (state.getAndSet(State.CHECKING) != State.CHECKING) { - doCheckAndPublish(client, ActionListener.wrap(success -> { - state.compareAndSet(State.CHECKING, success ? State.CLEAN : State.DIRTY); - listener.onResponse(success); + doCheckAndPublish(client, ActionListener.wrap(publishResult -> { + state.compareAndSet(State.CHECKING, publishResult.success ? State.CLEAN : State.DIRTY); + listener.onResponse(publishResult); }, e -> { state.compareAndSet(State.CHECKING, State.DIRTY); listener.onFailure(e); })); } else { - listener.onResponse(false); + listener.onResponse(ResourcePublishResult.inProgress()); } } @@ -155,6 +235,6 @@ public final void checkAndPublish(final RestClient client, final ActionListener< * @param client The REST client to make the request(s). * @param listener Returns {@code true} if the resource is available for use. {@code false} to stop. */ - protected abstract void doCheckAndPublish(RestClient client, ActionListener listener); + protected abstract void doCheckAndPublish(RestClient client, ActionListener listener); } diff --git a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/http/MultiHttpResource.java b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/http/MultiHttpResource.java index 44bda96a82d1d..0bbc5fb3d034c 100644 --- a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/http/MultiHttpResource.java +++ b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/http/MultiHttpResource.java @@ -60,23 +60,22 @@ public List getResources() { * Check and publish all {@linkplain #resources sub-resources}. */ @Override - protected void doCheckAndPublish(final RestClient client, final ActionListener listener) { + protected void doCheckAndPublish(final RestClient client, final ActionListener listener) { logger.trace("checking sub-resources existence and publishing on the [{}]", resourceOwnerName); final Iterator iterator = resources.iterator(); // short-circuits on the first failure, thus marking the whole thing dirty - iterator.next().checkAndPublish(client, new ActionListener() { + iterator.next().checkAndPublish(client, new ActionListener() { @Override - public void onResponse(final Boolean success) { + public void onResponse(final ResourcePublishResult publishResult) { // short-circuit on the first failure - if (success && iterator.hasNext()) { + if (publishResult.isSuccess() && iterator.hasNext()) { iterator.next().checkAndPublish(client, this); } else { - logger.trace("all sub-resources exist [{}] on the [{}]", success, resourceOwnerName); - - listener.onResponse(success); + logger.trace("all sub-resources exist [{}] on the [{}]", publishResult.isSuccess(), resourceOwnerName); + listener.onResponse(publishResult); } } diff --git a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/http/PipelineHttpResource.java b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/http/PipelineHttpResource.java index 8dd48c2876f42..cd7c775a5500b 100644 --- a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/http/PipelineHttpResource.java +++ b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/http/PipelineHttpResource.java @@ -71,7 +71,7 @@ protected void doCheck(final RestClient client, final ActionListener li * Publish the current {@linkplain #pipelineName pipeline}. */ @Override - protected void doPublish(final RestClient client, final ActionListener listener) { + protected void doPublish(final RestClient client, final ActionListener listener) { putResource(client, listener, logger, "/_ingest/pipeline", pipelineName, Collections.emptyMap(), this::pipelineToHttpEntity, "monitoring pipeline", resourceOwnerName, "monitoring cluster"); diff --git a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/http/PublishableHttpResource.java b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/http/PublishableHttpResource.java index 4b73a71604599..ea6722bd8145a 100644 --- a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/http/PublishableHttpResource.java +++ b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/http/PublishableHttpResource.java @@ -124,11 +124,11 @@ public Map getDefaultParameters() { * @param listener Returns {@code true} if the resource is available for use. {@code false} to stop. */ @Override - protected final void doCheckAndPublish(final RestClient client, final ActionListener listener) { + protected final void doCheckAndPublish(final RestClient client, final ActionListener listener) { doCheck(client, ActionListener.wrap(exists -> { if (exists) { // it already exists, so we can skip publishing it - listener.onResponse(true); + listener.onResponse(ResourcePublishResult.ready()); } else { doPublish(client, listener); } @@ -290,7 +290,7 @@ public void onFailure(final Exception exception) { * @param client The REST client to make the request(s). * @param listener Returns {@code true} if the resource is available to use. Otherwise {@code false}. */ - protected abstract void doPublish(RestClient client, ActionListener listener); + protected abstract void doPublish(RestClient client, ActionListener listener); /** * Upload the {@code resourceName} to the {@code resourceBasePath} endpoint. @@ -307,7 +307,7 @@ public void onFailure(final Exception exception) { * @param resourceOwnerType The type of resource owner being dealt with (e.g., "monitoring cluster"). */ protected void putResource(final RestClient client, - final ActionListener listener, + final ActionListener listener, final Logger logger, final String resourceBasePath, final String resourceName, @@ -334,7 +334,7 @@ public void onSuccess(final Response response) { if (statusCode == RestStatus.OK.getStatus() || statusCode == RestStatus.CREATED.getStatus()) { logger.debug("{} [{}] uploaded to the [{}] {}", resourceType, resourceName, resourceOwnerName, resourceOwnerType); - listener.onResponse(true); + listener.onResponse(ResourcePublishResult.ready()); } else { onFailure(new RuntimeException("[" + resourceBasePath + "/" + resourceName + "] responded with [" + statusCode + "]")); } diff --git a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/http/TemplateHttpResource.java b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/http/TemplateHttpResource.java index c8edf780c168c..724d7b39a757c 100644 --- a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/http/TemplateHttpResource.java +++ b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/http/TemplateHttpResource.java @@ -93,7 +93,7 @@ protected void doCheck(final RestClient client, final ActionListener li * Publish the missing {@linkplain #templateName template}. */ @Override - protected void doPublish(final RestClient client, final ActionListener listener) { + protected void doPublish(final RestClient client, final ActionListener listener) { putResource(client, listener, logger, "/_template", templateName, Collections.emptyMap(), this::templateToHttpEntity, "monitoring template", resourceOwnerName, "monitoring cluster"); diff --git a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/http/VersionHttpResource.java b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/http/VersionHttpResource.java index ef89f8d62e65b..75ad9cc15cd43 100644 --- a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/http/VersionHttpResource.java +++ b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/http/VersionHttpResource.java @@ -52,7 +52,7 @@ public VersionHttpResource(final String resourceOwnerName, final Version minimum * If it does not, then there is nothing that can be done except wait until it does. There is no publishing aspect to this operation. */ @Override - protected void doCheckAndPublish(final RestClient client, final ActionListener listener) { + protected void doCheckAndPublish(final RestClient client, final ActionListener listener) { logger.trace("checking [{}] to ensure that it supports the minimum version [{}]", resourceOwnerName, minimumVersion); final Request request = new Request("GET", "/"); @@ -86,12 +86,12 @@ public void onFailure(final Exception exception) { * {@link #minimumVersion}. * * @param response The response to parse. - * @return {@code true} if the remote cluster is running a supported version. + * @return A ready result if the remote cluster is running a supported version. * @throws NullPointerException if the response is malformed. * @throws ClassCastException if the response is malformed. * @throws IOException if any parsing issue occurs. */ - private boolean validateVersion(final Response response) throws IOException { + private ResourcePublishResult validateVersion(final Response response) throws IOException { Map map = XContentHelper.convertToMap(JsonXContent.jsonXContent, response.getEntity().getContent(), false); // the response should be filtered to just '{"version":{"number":"xyz"}}', so this is cheap and guaranteed @SuppressWarnings("unchecked") @@ -104,10 +104,11 @@ private boolean validateVersion(final Response response) throws IOException { if (version.onOrAfter(minimumVersion)) { logger.debug("version [{}] >= [{}] and supported for [{}]", version, minimumVersion, resourceOwnerName); - return true; + return ResourcePublishResult.ready(); } else { logger.error("version [{}] < [{}] and NOT supported for [{}]", version, minimumVersion, resourceOwnerName); - return false; + return ResourcePublishResult.notReady("version [" + version + "] < [" + minimumVersion + "] and NOT supported for [" + + resourceOwnerName + "]"); } } diff --git a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/http/WatcherExistsHttpResource.java b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/http/WatcherExistsHttpResource.java index 87aae0367a5d2..e1e6380a54d53 100644 --- a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/http/WatcherExistsHttpResource.java +++ b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/http/WatcherExistsHttpResource.java @@ -141,7 +141,7 @@ private boolean canUseWatcher(final Response response, final XContent xContent) * Add Watches to the remote cluster. */ @Override - protected void doPublish(final RestClient client, final ActionListener listener) { + protected void doPublish(final RestClient client, final ActionListener listener) { watches.checkAndPublish(client, listener); } } diff --git a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/local/LocalExporter.java b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/local/LocalExporter.java index 7e159d8179f41..2e7ba0dfc58d4 100644 --- a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/local/LocalExporter.java +++ b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/local/LocalExporter.java @@ -10,6 +10,7 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest; @@ -56,6 +57,7 @@ import org.elasticsearch.xpack.monitoring.exporter.ClusterAlertsUtil; import org.elasticsearch.xpack.monitoring.exporter.ExportBulk; import org.elasticsearch.xpack.monitoring.exporter.Exporter; +import org.elasticsearch.xpack.monitoring.exporter.MonitoringMigrationCoordinator; import java.time.Instant; import java.time.ZoneOffset; @@ -63,6 +65,7 @@ import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Objects; @@ -70,6 +73,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import java.util.stream.Collectors; import static org.elasticsearch.common.Strings.collectionToCommaDelimitedString; @@ -107,6 +111,7 @@ public class LocalExporter extends Exporter implements ClusterStateListener, Cle private final DateFormatter dateTimeFormatter; private final List clusterAlertBlacklist; private final boolean decommissionClusterAlerts; + private final MonitoringMigrationCoordinator migrationCoordinator; private final AtomicReference state = new AtomicReference<>(State.INITIALIZED); private final AtomicBoolean installingSomething = new AtomicBoolean(false); @@ -115,7 +120,8 @@ public class LocalExporter extends Exporter implements ClusterStateListener, Cle private long stateInitializedTime; - public LocalExporter(Exporter.Config config, Client client, CleanerService cleanerService) { + public LocalExporter(Exporter.Config config, Client client, MonitoringMigrationCoordinator migrationCoordinator, + CleanerService cleanerService) { super(config); this.client = client; this.clusterService = config.clusterService(); @@ -123,6 +129,7 @@ public LocalExporter(Exporter.Config config, Client client, CleanerService clean this.useIngest = USE_INGEST_PIPELINE_SETTING.getConcreteSettingForNamespace(config.name()).get(config.settings()); this.clusterAlertBlacklist = ClusterAlertsUtil.getClusterAlertsBlacklist(config); this.decommissionClusterAlerts = Monitoring.MIGRATION_DECOMMISSION_ALERTS.get(config.settings()); + this.migrationCoordinator = migrationCoordinator; this.cleanerService = cleanerService; this.dateTimeFormatter = dateTimeFormatter(config); // if additional listeners are added here, adjust LocalExporterTests#testLocalExporterRemovesListenersOnClose accordingly @@ -140,7 +147,7 @@ public void clusterChanged(ClusterChangedEvent event) { stateInitializedTime = client.threadPool().relativeTimeInMillis(); } } - if (state.get() == State.INITIALIZED) { + if (state.get() == State.INITIALIZED && migrationCoordinator.canInstall()) { resolveBulk(event.state(), true); } } @@ -167,6 +174,53 @@ boolean isExporterReady() { return running && installingSomething.get() == false && alertsProcessed; } + @Override + public void removeAlerts(Consumer listener) { + if (state.get() == State.TERMINATED) { + throw new IllegalStateException("Cannot refresh alerts on terminated exporter"); + } + + ClusterState clusterState = clusterService.state(); + if (clusterState.nodes().isLocalNodeElectedMaster()) { + // we are on the elected master + // Check that there is nothing that could block metadata updates + if (clusterState.blocks().hasGlobalBlockWithLevel(ClusterBlockLevel.METADATA_WRITE)) { + throw new ElasticsearchException("waiting until metadata writes are unblocked"); + } + + // We haven't blocked off other resource installation from happening. That must be done first. + assert migrationCoordinator.canInstall() == false : "migration attempted while resources could be erroneously installed"; + + final List asyncActions = new ArrayList<>(); + final AtomicInteger pendingResponses = new AtomicInteger(0); + final List errors = Collections.synchronizedList(new ArrayList<>()); + + removeClusterAlertsTasks(clusterState, listener, asyncActions, pendingResponses, errors); + if (asyncActions.size() > 0) { + if (installingSomething.compareAndSet(false, true)) { + pendingResponses.set(asyncActions.size()); + try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashWithOrigin(MONITORING_ORIGIN)) { + asyncActions.forEach(Runnable::run); + } + } else { + // This shouldn't be changed by now since resource installation should be blocked, but throw an exception for sanity + throw new ElasticsearchException("exporter is busy installing resources"); + } + } else { + // Nothing to setup. Check status flags to see if anything was missed, or if anything is in flight. + if (errors.size() > 0) { + // in case we run into scenarios where resource tasks were not created for some reason (like watcher is disabled). + listener.accept(ExporterResourceStatus.determineReadiness(name(), TYPE, errors)); + } else { + // no errors reported, no tasks to run, nothing currently installing. + listener.accept(ExporterResourceStatus.ready(name(), TYPE)); + } + } + } else { + throw new ElasticsearchException("Cannot refresh alerts from nodes other than currently elected master."); + } + } + @Override public void openBulk(final ActionListener listener) { if (state.get() != State.RUNNING) { @@ -207,14 +261,8 @@ LocalBulk resolveBulk(ClusterState clusterState, boolean clusterStateChange) { return null; } - boolean setup = true; - - // elected master node needs to setup templates; non-master nodes need to wait for it to be setup - if (clusterService.state().nodes().isLocalNodeElectedMaster()) { - setup = setupIfElectedMaster(clusterState, clusterStateChange); - } else { - setup = setupIfNotElectedMaster(clusterState); - } + // When running normally we don't care so much about the final setup result, only if we need to run it again. + boolean setup = performSetup(clusterState, clusterStateChange); // any failure/delay to setup the local exporter stops it until the next pass (10s by default) if (setup == false) { @@ -231,6 +279,23 @@ LocalBulk resolveBulk(ClusterState clusterState, boolean clusterStateChange) { return new LocalBulk(name(), logger, client, dateTimeFormatter, useIngest); } + /** + * Kickstarts the set up process for the local exporter. On non leader nodes, this method is completely synchronous. On + * the leader node, this returns immediately with a boolean stating whether the setup tasks have started. Setup tasks are + * asynchronous. To determine exactly the outcome of setup tasks, an action listener can be passed in to be called after + * any asynchronous operations. + * @return true if local resources are up to date, false if they are still in progress, true on master nodes if setup has started. + */ + private boolean performSetup(ClusterState clusterState, boolean clusterStateChange) { + boolean setup;// elected master node needs to setup templates; non-master nodes need to wait for it to be setup + if (clusterService.state().nodes().isLocalNodeElectedMaster()) { + setup = setupIfElectedMaster(clusterState, clusterStateChange); + } else { + setup = setupIfNotElectedMaster(clusterState); + } + return setup; + } + /** * When not on the elected master, we require all resources (mapping types, templates, and pipelines) to be available before we * attempt to run the exporter. If those resources do not exist, then it means the elected master's exporter has not yet run, so the @@ -282,6 +347,11 @@ private boolean setupIfElectedMaster(final ClusterState clusterState, final bool return false; } + if (migrationCoordinator.canInstall() == false) { + logger.debug("already installing something, waiting for migration to complete"); + return false; + } + if (installingSomething.get()) { logger.trace("already installing something, waiting for install to complete"); return false; @@ -316,9 +386,7 @@ private boolean setupIfElectedMaster(final ClusterState clusterState, final bool final String pipelineName = pipelineName(pipelineId); logger.debug("pipeline [{}] not found", pipelineName); asyncActions.add(() -> putIngestPipeline(pipelineId, - new ResponseActionListener<>("pipeline", - pipelineName, - pendingResponses))); + new ResponseActionListener<>("pipeline", pipelineName, pendingResponses))); } } else { logger.trace("all pipelines found"); @@ -326,18 +394,8 @@ private boolean setupIfElectedMaster(final ClusterState clusterState, final bool } // avoid constantly trying to setup Watcher, which requires a lot of overhead and avoid attempting to setup during a cluster state - // change - if (state.get() == State.RUNNING && clusterStateChange == false && canUseWatcher()) { - final IndexRoutingTable watches = clusterState.routingTable().index(Watch.INDEX); - final boolean indexExists = watches != null && watches.allPrimaryShardsActive(); - - // we cannot do anything with watches until the index is allocated, so we wait until it's ready - if (watches != null && watches.allPrimaryShardsActive() == false) { - logger.trace("cannot manage cluster alerts because [.watches] index is not allocated"); - } else if ((watches == null || indexExists) && watcherSetup.compareAndSet(false, true)) { - getClusterAlertsInstallationAsyncActions(indexExists, asyncActions, pendingResponses); - } - } + // change. Provide a way to force it to initialize though. + setupClusterAlertsTasks(clusterState, clusterStateChange, asyncActions, pendingResponses); if (asyncActions.size() > 0) { if (installingSomething.compareAndSet(false, true)) { @@ -347,6 +405,7 @@ private boolean setupIfElectedMaster(final ClusterState clusterState, final bool } } else { // let the cluster catch up since requested installations may be ongoing + logger.trace("already installing something, waiting for install to complete"); return false; } } else { @@ -357,7 +416,48 @@ private boolean setupIfElectedMaster(final ClusterState clusterState, final bool return true; } - private void responseReceived(final AtomicInteger pendingResponses, final boolean success, final @Nullable AtomicBoolean setup) { + private void setupClusterAlertsTasks(ClusterState clusterState, boolean clusterStateChange, List asyncActions, + AtomicInteger pendingResponses) { + boolean shouldSetUpWatcher = state.get() == State.RUNNING && clusterStateChange == false; + if (canUseWatcher()) { + if (shouldSetUpWatcher) { + final IndexRoutingTable watches = clusterState.routingTable().index(Watch.INDEX); + final boolean indexExists = watches != null && watches.allPrimaryShardsActive(); + + // we cannot do anything with watches until the index is allocated, so we wait until it's ready + if (watches != null && watches.allPrimaryShardsActive() == false) { + logger.trace("cannot manage cluster alerts because [.watches] index is not allocated"); + } else if ((watches == null || indexExists) && watcherSetup.compareAndSet(false, true)) { + getClusterAlertsInstallationAsyncActions(indexExists, asyncActions, pendingResponses); + } + } + } + } + + private void removeClusterAlertsTasks(ClusterState clusterState, Consumer setupListener, + List asyncActions, AtomicInteger pendingResponses, List errors) { + if (canUseWatcher()) { + if (state.get() != State.TERMINATED) { + final IndexRoutingTable watches = clusterState.routingTable().index(Watch.INDEX); + final boolean indexExists = watches != null && watches.allPrimaryShardsActive(); + + // we cannot do anything with watches until the index is allocated, so we wait until it's ready + if (watches != null && watches.allPrimaryShardsActive() == false) { + errors.add(new ElasticsearchException("cannot manage cluster alerts because [.watches] index is not allocated")); + logger.trace("cannot manage cluster alerts because [.watches] index is not allocated"); + } else if ((watches == null || indexExists) && watcherSetup.compareAndSet(false, true)) { + addClusterAlertsRemovalAsyncActions(indexExists, asyncActions, pendingResponses, setupListener, errors); + } + } else { + errors.add(new ElasticsearchException("cannot manage cluster alerts because exporter is terminated")); + } + } else { + errors.add(new ElasticsearchException("cannot manage cluster alerts because alerting is disabled")); + } + } + + private void responseReceived(final AtomicInteger pendingResponses, final boolean success, final Runnable onComplete, + final @Nullable AtomicBoolean setup) { if (setup != null && success == false) { setup.set(false); } @@ -367,6 +467,7 @@ private void responseReceived(final AtomicInteger pendingResponses, final boolea if (installingSomething.compareAndSet(true, false) == false) { throw new IllegalStateException("could not reset installing flag to false"); } + onComplete.run(); } } @@ -483,6 +584,32 @@ private void getClusterAlertsInstallationAsyncActions(final boolean indexExists, } } + /** + * Creates actions that remove cluster alerts (watches) from the cluster + * + * @param indexExists True for watch index existing, false otherwise. + * @param asyncActions Asynchronous actions are added to for each Watch. + * @param pendingResponses Pending response countdown we use to track completion. + * @param setupListener The listener to call with the status of the watch if there are watches to remove. + * @param errors A list to collect errors during the watch removal process. + */ + private void addClusterAlertsRemovalAsyncActions(final boolean indexExists, final List asyncActions, + final AtomicInteger pendingResponses, + Consumer setupListener, final List errors) { + final XPackClient xpackClient = new XPackClient(client); + final WatcherClient watcher = xpackClient.watcher(); + + for (final String watchId : ClusterAlertsUtil.WATCH_IDS) { + final String uniqueWatchId = ClusterAlertsUtil.createUniqueWatchId(clusterService, watchId); + if (indexExists) { + logger.trace("pruning monitoring watch [{}]", uniqueWatchId); + + asyncActions.add(() -> watcher.deleteWatch(new DeleteWatchRequest(uniqueWatchId), + new ErrorCapturingResponseListener<>("watch", uniqueWatchId, pendingResponses, setupListener, errors))); + } + } + } + private void putWatch(final WatcherClient watcher, final String watchId, final String uniqueWatchId, final AtomicInteger pendingResponses) { final String watch = ClusterAlertsUtil.loadWatch(clusterService, watchId); @@ -601,25 +728,35 @@ enum State { */ private class ResponseActionListener implements ActionListener { - private final String type; - private final String name; + protected final String type; + protected final String name; private final AtomicInteger countDown; + private final Runnable onComplete; private final AtomicBoolean setup; private ResponseActionListener(String type, String name, AtomicInteger countDown) { - this(type, name, countDown, null); + this(type, name, countDown, () -> {}, null); + } + + private ResponseActionListener(String type, String name, AtomicInteger countDown, Runnable onComplete) { + this(type, name, countDown, onComplete, null); } private ResponseActionListener(String type, String name, AtomicInteger countDown, @Nullable AtomicBoolean setup) { + this(type, name, countDown, () -> {}, setup); + } + + private ResponseActionListener(String type, String name, AtomicInteger countDown, Runnable onComplete, + @Nullable AtomicBoolean setup) { this.type = Objects.requireNonNull(type); this.name = Objects.requireNonNull(name); this.countDown = Objects.requireNonNull(countDown); + this.onComplete = Objects.requireNonNull(onComplete); this.setup = setup; } @Override public void onResponse(Response response) { - responseReceived(countDown, true, setup); if (response instanceof AcknowledgedResponse) { if (((AcknowledgedResponse)response).isAcknowledged()) { logger.trace("successfully set monitoring {} [{}]", type, name); @@ -629,15 +766,44 @@ public void onResponse(Response response) { } else { logger.trace("successfully handled monitoring {} [{}]", type, name); } + responseReceived(countDown, true, onComplete, setup); } @Override public void onFailure(Exception e) { - responseReceived(countDown, false, setup); + responseReceived(countDown, false, onComplete, setup); logger.error((Supplier) () -> new ParameterizedMessage("failed to set monitoring {} [{}]", type, name), e); } } + private class ErrorCapturingResponseListener extends ResponseActionListener { + private final List errors; + + ErrorCapturingResponseListener(String type, String name, AtomicInteger countDown, + Consumer setupListener, List errors) { + super(type, name, countDown, () -> { + // Called on completion of all removal tasks + ExporterResourceStatus status = ExporterResourceStatus.determineReadiness(LocalExporter.this.name(), TYPE, errors); + setupListener.accept(status); + }); + this.errors = errors; + } + + @Override + public void onResponse(Response response) { + if (response instanceof AcknowledgedResponse && ((AcknowledgedResponse)response).isAcknowledged() == false) { + errors.add(new ElasticsearchException("failed to set monitoring {} [{}]", type, name)); + } + super.onResponse(response); + } + + @Override + public void onFailure(Exception e) { + errors.add(new ElasticsearchException("failed to set monitoring {} [{}]", e, type, name)); + super.onFailure(e); + } + } + private class GetAndPutWatchResponseActionListener implements ActionListener { private final WatcherClient watcher; @@ -659,8 +825,7 @@ public void onResponse(GetWatchResponse response) { if (response.isFound() && hasValidVersion(response.getSource().getValue("metadata.xpack.version_created"), ClusterAlertsUtil.LAST_UPDATED_VERSION)) { logger.trace("found monitoring watch [{}]", uniqueWatchId); - - responseReceived(countDown, true, watcherSetup); + responseReceived(countDown, true, () -> {}, watcherSetup); } else { putWatch(watcher, watchId, uniqueWatchId, countDown); } @@ -668,7 +833,7 @@ public void onResponse(GetWatchResponse response) { @Override public void onFailure(Exception e) { - responseReceived(countDown, false, watcherSetup); + responseReceived(countDown, false, () -> {}, watcherSetup); if ((e instanceof IndexNotFoundException) == false) { logger.error((Supplier) () -> diff --git a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/rest/action/RestMonitoringMigrateAlertsAction.java b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/rest/action/RestMonitoringMigrateAlertsAction.java new file mode 100644 index 0000000000000..afcec22db6b82 --- /dev/null +++ b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/rest/action/RestMonitoringMigrateAlertsAction.java @@ -0,0 +1,57 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.monitoring.rest.action; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; + +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.BytesRestResponse; +import org.elasticsearch.rest.RestChannel; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.RestResponse; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.rest.action.RestBuilderListener; +import org.elasticsearch.xpack.core.monitoring.action.MonitoringMigrateAlertsAction; +import org.elasticsearch.xpack.core.monitoring.action.MonitoringMigrateAlertsRequest; +import org.elasticsearch.xpack.core.monitoring.action.MonitoringMigrateAlertsResponse; + +import static org.elasticsearch.rest.RestRequest.Method.POST; + +public class RestMonitoringMigrateAlertsAction extends BaseRestHandler { + + @Override + public List routes() { + return Collections.singletonList( + new Route(POST, "/_monitoring/migrate/alerts") + ); + } + + @Override + public String getName() { + return "monitoring_migrate_alerts"; + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { + MonitoringMigrateAlertsRequest migrateRequest = new MonitoringMigrateAlertsRequest(); + return channel -> client.execute(MonitoringMigrateAlertsAction.INSTANCE, migrateRequest, getRestBuilderListener(channel)); + } + + static RestBuilderListener getRestBuilderListener(RestChannel channel) { + return new RestBuilderListener(channel) { + @Override + public RestResponse buildResponse(MonitoringMigrateAlertsResponse response, XContentBuilder builder) throws Exception { + return new BytesRestResponse(RestStatus.OK, response.toXContent(builder, ToXContent.EMPTY_PARAMS)); + } + }; + } +} diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/LocalStateMonitoring.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/LocalStateMonitoring.java index e9f53c2048d54..f978d05759146 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/LocalStateMonitoring.java +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/LocalStateMonitoring.java @@ -5,15 +5,19 @@ */ package org.elasticsearch.xpack.monitoring; +import org.elasticsearch.common.inject.Module; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.license.LicenseService; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin; +import org.elasticsearch.xpack.core.XPackPlugin; import org.elasticsearch.xpack.core.ssl.SSLService; import org.elasticsearch.xpack.ilm.IndexLifecycle; import org.elasticsearch.xpack.watcher.Watcher; import java.nio.file.Path; +import java.util.Collection; +import java.util.Collections; public class LocalStateMonitoring extends LocalStateCompositeXPackPlugin { @@ -50,6 +54,13 @@ protected SSLService getSslService() { protected XPackLicenseState getLicenseState() { return thisVar.getLicenseState(); } + + @Override + public Collection createGuiceModules() { + return XPackPlugin.transportClientMode(settings) ? + Collections.emptyList() : + super.createGuiceModules(); + } }); plugins.add(new IndexLifecycle(settings)); } diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/action/MonitoringMigrateAlertsResponseTests.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/action/MonitoringMigrateAlertsResponseTests.java new file mode 100644 index 0000000000000..129e8dc3f31b5 --- /dev/null +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/action/MonitoringMigrateAlertsResponseTests.java @@ -0,0 +1,65 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.monitoring.action; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ElasticsearchTimeoutException; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.test.AbstractWireSerializingTestCase; +import org.elasticsearch.xpack.core.monitoring.action.MonitoringMigrateAlertsResponse; +import org.elasticsearch.xpack.core.monitoring.action.MonitoringMigrateAlertsResponse.ExporterMigrationResult; +import org.elasticsearch.xpack.monitoring.exporter.http.HttpExporter; +import org.elasticsearch.xpack.monitoring.exporter.local.LocalExporter; +import org.junit.Before; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +public class MonitoringMigrateAlertsResponseTests extends AbstractWireSerializingTestCase { + + private static final class TestException extends IOException { + TestException(String message) { + super(message); + } + } + + private final List exceptionSamples = new ArrayList<>(); + + @Before + public void populateExceptionSamples() { + // A lineup of exceptions to make sure a number of errors are able to be serialized, even if they are not equal to themselves on + // the other side of the wire. + exceptionSamples.add(new ElasticsearchException(randomAlphaOfLength(15))); // Eminently serializable + exceptionSamples.add(new ElasticsearchTimeoutException(randomAlphaOfLength(15))); // Inherited serialization + exceptionSamples.add(new IOException(randomAlphaOfLength(15))); // Well known, special handling + exceptionSamples.add(new TestException(randomAlphaOfLength(15))); // Class resolution loss during serialization + exceptionSamples.add(new RuntimeException(randomAlphaOfLength(15))); // Unserializable + } + + @Override + protected Writeable.Reader instanceReader() { + return MonitoringMigrateAlertsResponse::new; + } + + @Override + protected MonitoringMigrateAlertsResponse createTestInstance() { + List results = new ArrayList<>(); + for (int i = 0; i < randomInt(10); i++) { + String name = randomAlphaOfLength(10); + String type = randomFrom(LocalExporter.TYPE, HttpExporter.TYPE); + boolean migrationComplete = randomBoolean(); + Exception reason = null; + if (migrationComplete == false) { + reason = randomFrom(exceptionSamples); + } + ExporterMigrationResult result = new ExporterMigrationResult(name, type, migrationComplete, reason); + results.add(result); + } + return new MonitoringMigrateAlertsResponse(results); + } +} diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/action/TransportMonitoringMigrateAlertsActionTests.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/action/TransportMonitoringMigrateAlertsActionTests.java new file mode 100644 index 0000000000000..e451a1baab903 --- /dev/null +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/action/TransportMonitoringMigrateAlertsActionTests.java @@ -0,0 +1,598 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.monitoring.action; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse; +import org.elasticsearch.cluster.metadata.IndexTemplateMetadata; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.http.MockRequest; +import org.elasticsearch.test.http.MockResponse; +import org.elasticsearch.test.http.MockWebServer; +import org.elasticsearch.xpack.core.XPackSettings; +import org.elasticsearch.xpack.core.monitoring.action.MonitoringMigrateAlertsAction; +import org.elasticsearch.xpack.core.monitoring.action.MonitoringMigrateAlertsRequest; +import org.elasticsearch.xpack.core.monitoring.action.MonitoringMigrateAlertsResponse; +import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringTemplateUtils; +import org.elasticsearch.xpack.core.watcher.transport.actions.get.GetWatchAction; +import org.elasticsearch.xpack.core.watcher.transport.actions.get.GetWatchRequest; +import org.elasticsearch.xpack.monitoring.LocalStateMonitoring; +import org.elasticsearch.xpack.monitoring.Monitoring; +import org.elasticsearch.xpack.monitoring.MonitoringService; +import org.elasticsearch.xpack.monitoring.exporter.ClusterAlertsUtil; +import org.elasticsearch.xpack.monitoring.exporter.http.HttpExporter; +import org.elasticsearch.xpack.monitoring.exporter.local.LocalExporter; +import org.elasticsearch.xpack.monitoring.test.MonitoringIntegTestCase; +import org.junit.After; +import org.junit.Before; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.xpack.monitoring.exporter.http.ClusterAlertHttpResource.CLUSTER_ALERT_VERSION_PARAMETERS; +import static org.elasticsearch.xpack.monitoring.exporter.http.WatcherExistsHttpResource.WATCHER_CHECK_PARAMETERS; +import static org.hamcrest.Matchers.endsWith; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.Matchers.startsWith; + +@ESIntegTestCase.ClusterScope(numDataNodes = 3) +public class TransportMonitoringMigrateAlertsActionTests extends MonitoringIntegTestCase { + + private MockWebServer webServer; + + private MockWebServer createMockWebServer() throws IOException { + MockWebServer server = new MockWebServer(); + server.start(); + return server; + } + + @Override + protected Collection> transportClientPlugins() { + return Arrays.asList(LocalStateMonitoring.class); + } + + @Override + protected Settings transportClientSettings() { + return Settings.builder().put(super.transportClientSettings()) + .put(XPackSettings.SECURITY_ENABLED.getKey(), false) + .put(XPackSettings.WATCHER_ENABLED.getKey(), false) + .build(); + } + + @Before + public void startWebServer() throws IOException { + webServer = createMockWebServer(); + } + + @After + public void stopWebServer() { + if (webServer != null) { + webServer.close(); + } + } + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return Settings.builder() + // Parent conf + .put(super.nodeSettings(nodeOrdinal)) + + // Disable monitoring + .put("xpack.monitoring.collection.enabled", false) + .put("xpack.monitoring.collection.interval", "1s") + + // X-Pack configuration + .put("xpack.license.self_generated.type", "trial") + .put(XPackSettings.MACHINE_LEARNING_ENABLED.getKey(), false) + .build(); + } + + private void stopMonitoring() { + // Clean up any transient settings we have added + assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder() + .putNull(MonitoringService.ENABLED.getKey()) + .putNull("xpack.monitoring.elasticsearch.collection.enabled") + .putNull("xpack.monitoring.exporters._local.type") + .putNull("xpack.monitoring.exporters._local.enabled") + .putNull("xpack.monitoring.exporters._local.cluster_alerts.management.enabled") + .putNull("xpack.monitoring.exporters.remoteCluster.type") + .putNull("xpack.monitoring.exporters.remoteCluster.enabled") + .putNull("xpack.monitoring.exporters.remoteCluster.host") + .putNull("xpack.monitoring.exporters.remoteCluster.cluster_alerts.management.enabled") + )); + // Make sure to clean up the migration setting if it is set + assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(Settings.builder() + .putNull(Monitoring.MIGRATION_DECOMMISSION_ALERTS.getKey()) + )); + } + + public void testLocalAlertsRemoval() throws Exception { + try { + // start monitoring service + final Settings.Builder exporterSettings = Settings.builder() + .put(MonitoringService.ENABLED.getKey(), true) + .put("xpack.monitoring.exporters._local.type", LocalExporter.TYPE) + .put("xpack.monitoring.exporters._local.enabled", true) + .put("xpack.monitoring.exporters._local.cluster_alerts.management.enabled", true); + + // enable local exporter + assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(exporterSettings)); + + // ensure resources exist + assertBusy(() -> { + assertThat(indexExists(".monitoring-*"), is(true)); + ensureYellowAndNoInitializingShards(".monitoring-*"); + checkMonitoringTemplates(); + assertWatchesExist(true); + }); + + // call migration api + MonitoringMigrateAlertsResponse response = client().execute(MonitoringMigrateAlertsAction.INSTANCE, + new MonitoringMigrateAlertsRequest()).actionGet(); + + // check response + assertThat(response.getExporters().size(), is(1)); + MonitoringMigrateAlertsResponse.ExporterMigrationResult localExporterResult = response.getExporters().get(0); + assertThat(localExporterResult.getName(), is("_local")); + assertThat(localExporterResult.getType(), is(LocalExporter.TYPE)); + assertThat(localExporterResult.isMigrationComplete(), is(true)); + assertThat(localExporterResult.getReason(), nullValue()); + + // ensure no watches + assertWatchesExist(false); + } finally { + stopMonitoring(); + } + } + + public void testRepeatedLocalAlertsRemoval() throws Exception { + try { + // start monitoring service + final Settings.Builder exporterSettings = Settings.builder() + .put(MonitoringService.ENABLED.getKey(), true) + .put("xpack.monitoring.exporters._local.type", LocalExporter.TYPE) + .put("xpack.monitoring.exporters._local.enabled", true) + .put("xpack.monitoring.exporters._local.cluster_alerts.management.enabled", true); + + // enable local exporter + assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(exporterSettings)); + + // ensure resources exist + assertBusy(() -> { + assertThat(indexExists(".monitoring-*"), is(true)); + ensureYellowAndNoInitializingShards(".monitoring-*"); + checkMonitoringTemplates(); + assertWatchesExist(true); + }); + + // call migration api + MonitoringMigrateAlertsResponse response = client().execute(MonitoringMigrateAlertsAction.INSTANCE, + new MonitoringMigrateAlertsRequest()).actionGet(); + + // check response + assertThat(response.getExporters().size(), is(1)); + MonitoringMigrateAlertsResponse.ExporterMigrationResult localExporterResult = response.getExporters().get(0); + assertThat(localExporterResult.getName(), is("_local")); + assertThat(localExporterResult.getType(), is(LocalExporter.TYPE)); + assertThat(localExporterResult.isMigrationComplete(), is(true)); + assertThat(localExporterResult.getReason(), nullValue()); + + // ensure no watches + assertWatchesExist(false); + + // call migration api again + response = client().execute(MonitoringMigrateAlertsAction.INSTANCE, new MonitoringMigrateAlertsRequest()).actionGet(); + + // check second response + assertThat(response.getExporters().size(), is(1)); + localExporterResult = response.getExporters().get(0); + assertThat(localExporterResult.getName(), is("_local")); + assertThat(localExporterResult.getType(), is(LocalExporter.TYPE)); + assertThat(localExporterResult.isMigrationComplete(), is(true)); + assertThat(localExporterResult.getReason(), nullValue()); + } finally { + stopMonitoring(); + } + } + + public void testDisabledLocalExporterAlertsRemoval() throws Exception { + try { + // start monitoring service + final Settings.Builder exporterSettings = Settings.builder() + .put(MonitoringService.ENABLED.getKey(), true) + .put("xpack.monitoring.exporters._local.type", LocalExporter.TYPE) + .put("xpack.monitoring.exporters._local.enabled", true) + .put("xpack.monitoring.exporters._local.cluster_alerts.management.enabled", true); + + // enable local exporter + assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(exporterSettings)); + + // ensure resources exist + ensureInitialLocalResources(); + + // new disable local exporter + final Settings.Builder disableSettings = Settings.builder() + .put(MonitoringService.ENABLED.getKey(), true) + .put("xpack.monitoring.exporters._local.type", LocalExporter.TYPE) + .put("xpack.monitoring.exporters._local.enabled", false) + .put("xpack.monitoring.exporters._local.cluster_alerts.management.enabled", true); + assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(disableSettings)); + + // call migration api + MonitoringMigrateAlertsResponse response = client().execute(MonitoringMigrateAlertsAction.INSTANCE, + new MonitoringMigrateAlertsRequest()).actionGet(); + + // check response + assertThat(response.getExporters().size(), is(1)); + MonitoringMigrateAlertsResponse.ExporterMigrationResult localExporterResult = response.getExporters().get(0); + assertThat(localExporterResult.getName(), is("_local")); + assertThat(localExporterResult.getType(), is(LocalExporter.TYPE)); + assertThat(localExporterResult.isMigrationComplete(), is(true)); + assertThat(localExporterResult.getReason(), nullValue()); + + // ensure no watches + assertWatchesExist(false); + } finally { + stopMonitoring(); + } + } + + public void testLocalExporterWithAlertingDisabled() throws Exception { + try { + // start monitoring service + final Settings.Builder exporterSettings = Settings.builder() + .put(MonitoringService.ENABLED.getKey(), true) + .put("xpack.monitoring.exporters._local.type", LocalExporter.TYPE) + .put("xpack.monitoring.exporters._local.enabled", true) + .put("xpack.monitoring.exporters._local.cluster_alerts.management.enabled", true); + + // enable local exporter + assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(exporterSettings)); + + // ensure resources exist + ensureInitialLocalResources(); + + // new disable local exporter's cluster alerts + final Settings.Builder disableSettings = Settings.builder() + .put(MonitoringService.ENABLED.getKey(), true) + .put("xpack.monitoring.exporters._local.type", LocalExporter.TYPE) + .put("xpack.monitoring.exporters._local.enabled", true) + .put("xpack.monitoring.exporters._local.cluster_alerts.management.enabled", false); + assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(disableSettings)); + + // call migration api + MonitoringMigrateAlertsResponse response = client().execute(MonitoringMigrateAlertsAction.INSTANCE, + new MonitoringMigrateAlertsRequest()).actionGet(); + + // check response + assertThat(response.getExporters().size(), is(1)); + MonitoringMigrateAlertsResponse.ExporterMigrationResult localExporterResult = response.getExporters().get(0); + assertThat(localExporterResult.getName(), is("_local")); + assertThat(localExporterResult.getType(), is(LocalExporter.TYPE)); + assertThat(localExporterResult.isMigrationComplete(), is(false)); + assertThat(localExporterResult.getReason(), notNullValue()); + assertThat(localExporterResult.getReason().getMessage(), is("cannot manage cluster alerts because alerting is disabled")); + } finally { + stopMonitoring(); + } + } + + public void testRemoteAlertsRemoval() throws Exception { + try { + // start monitoring service + final Settings.Builder exporterSettings = Settings.builder() + .put(MonitoringService.ENABLED.getKey(), true) + // Make sure to not collect ES stats in background. Our web server expects requests in a particular order. + .put("xpack.monitoring.elasticsearch.collection.enabled", false) + .put("xpack.monitoring.exporters.remoteCluster.type", HttpExporter.TYPE) + .put("xpack.monitoring.exporters.remoteCluster.enabled", true) + .put("xpack.monitoring.exporters.remoteCluster.host", webServer.getHostName() + ":" + webServer.getPort()) + .put("xpack.monitoring.exporters.remoteCluster.cluster_alerts.management.enabled", true); + + // enable http exporter + assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(exporterSettings)); + + // enqueue delete request expectations for alerts + enqueueWatcherResponses(webServer, true); + + // call migration api + MonitoringMigrateAlertsResponse response = client().execute(MonitoringMigrateAlertsAction.INSTANCE, + new MonitoringMigrateAlertsRequest()).actionGet(); + + // check that all "remote watches" were deleted by the exporter + assertThat(response.getExporters().size(), is(1)); + MonitoringMigrateAlertsResponse.ExporterMigrationResult localExporterResult = response.getExporters().get(0); + assertThat(localExporterResult.getName(), is("remoteCluster")); + assertThat(localExporterResult.getType(), is(HttpExporter.TYPE)); + assertThat(localExporterResult.isMigrationComplete(), is(true)); + assertThat(localExporterResult.getReason(), nullValue()); + + // ensure no watches + assertMonitorWatches(webServer, true); + } finally { + stopMonitoring(); + webServer.clearRequests(); + } + } + + public void testDisabledRemoteAlertsRemoval() throws Exception { + try { + // start monitoring service + final Settings.Builder exporterSettings = Settings.builder() + .put(MonitoringService.ENABLED.getKey(), true) + // Make sure to not collect ES stats in background. Our web server expects requests in a particular order. + .put("xpack.monitoring.elasticsearch.collection.enabled", false) + .put("xpack.monitoring.exporters.remoteCluster.type", HttpExporter.TYPE) + .put("xpack.monitoring.exporters.remoteCluster.enabled", false) + .put("xpack.monitoring.exporters.remoteCluster.host", webServer.getHostName() + ":" + webServer.getPort()) + .put("xpack.monitoring.exporters.remoteCluster.cluster_alerts.management.enabled", true); + + // configure disabled http exporter + assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(exporterSettings)); + + // enqueue delete request expectations for alerts + enqueueWatcherResponses(webServer, true); + + // call migration api + MonitoringMigrateAlertsResponse response = client().execute(MonitoringMigrateAlertsAction.INSTANCE, + new MonitoringMigrateAlertsRequest()).actionGet(); + + // check that the disabled http exporter was enabled this one time in order to remove watches + assertThat(response.getExporters().size(), is(1)); + MonitoringMigrateAlertsResponse.ExporterMigrationResult localExporterResult = response.getExporters().get(0); + assertThat(localExporterResult.getName(), is("remoteCluster")); + assertThat(localExporterResult.getType(), is(HttpExporter.TYPE)); + assertThat(localExporterResult.isMigrationComplete(), is(true)); + assertThat(localExporterResult.getReason(), nullValue()); + + // ensure no watches + assertMonitorWatches(webServer, true); + } finally { + stopMonitoring(); + webServer.clearRequests(); + } + } + + public void testRemoteAlertsRemovalWhenOriginalMonitoringClusterIsGone() throws Exception { + try { + // start monitoring service + final Settings.Builder exporterSettings = Settings.builder() + .put(MonitoringService.ENABLED.getKey(), true) + // Make sure to not collect ES stats in background. Our web server expects requests in a particular order. + .put("xpack.monitoring.elasticsearch.collection.enabled", false) + .put("xpack.monitoring.exporters.remoteCluster.type", HttpExporter.TYPE) + .put("xpack.monitoring.exporters.remoteCluster.enabled", false) + .put("xpack.monitoring.exporters.remoteCluster.host", webServer.getHostName() + ":" + webServer.getPort()) + .put("xpack.monitoring.exporters.remoteCluster.cluster_alerts.management.enabled", true); + + // create a disabled http exporter + assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(exporterSettings)); + + // call migration api + MonitoringMigrateAlertsResponse response = client().execute(MonitoringMigrateAlertsAction.INSTANCE, + new MonitoringMigrateAlertsRequest()).actionGet(); + + // check that migration failed due to monitoring cluster not responding + assertThat(response.getExporters().size(), is(1)); + MonitoringMigrateAlertsResponse.ExporterMigrationResult localExporterResult = response.getExporters().get(0); + assertThat(localExporterResult.getName(), is("remoteCluster")); + assertThat(localExporterResult.getType(), is(HttpExporter.TYPE)); + assertThat(localExporterResult.isMigrationComplete(), is(false)); + // this might be a messier exception in practice like connection refused, but hey, testability + assertThat(localExporterResult.getReason().getMessage(), is("Connection is closed")); + } finally { + stopMonitoring(); + webServer.clearRequests(); + } + } + + public void testRemoteAlertsRemovalFailure() throws Exception { + try { + // start monitoring service + final Settings.Builder exporterSettings = Settings.builder() + .put(MonitoringService.ENABLED.getKey(), true) + // Make sure to not collect ES stats in background. Our web server expects requests in a particular order. + .put("xpack.monitoring.elasticsearch.collection.enabled", false) + .put("xpack.monitoring.exporters.remoteCluster.type", HttpExporter.TYPE) + .put("xpack.monitoring.exporters.remoteCluster.enabled", true) + .put("xpack.monitoring.exporters.remoteCluster.host", webServer.getHostName() + ":" + webServer.getPort()) + .put("xpack.monitoring.exporters.remoteCluster.cluster_alerts.management.enabled", true); + + // enable http exporter + assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(exporterSettings)); + + // enqueue a "watcher available" response, but then a "failure to delete watch" response + enqueueResponse(webServer, 200, "{\"features\":{\"watcher\":{\"available\":true,\"enabled\":true}}}"); + enqueueResponse(webServer, 500, "{\"error\":{}}"); + + // call migration api + MonitoringMigrateAlertsResponse response = client().execute(MonitoringMigrateAlertsAction.INSTANCE, + new MonitoringMigrateAlertsRequest()).actionGet(); + + // check that an error is reported while trying to remove a remote watch + assertThat(response.getExporters().size(), is(1)); + MonitoringMigrateAlertsResponse.ExporterMigrationResult localExporterResult = response.getExporters().get(0); + assertThat(localExporterResult.getName(), is("remoteCluster")); + assertThat(localExporterResult.getType(), is(HttpExporter.TYPE)); + assertThat(localExporterResult.isMigrationComplete(), is(false)); + assertThat(localExporterResult.getReason().getMessage(), startsWith("method [DELETE], host [")); + assertThat(localExporterResult.getReason().getMessage(), + endsWith("status line [HTTP/1.1 500 Internal Server Error]\n{\"error\":{}}")); + + } finally { + stopMonitoring(); + webServer.clearRequests(); + } + } + + public void testRemoteAlertsRemoteDisallowsWatcher() throws Exception { + try { + // start monitoring service + final Settings.Builder exporterSettings = Settings.builder() + .put(MonitoringService.ENABLED.getKey(), true) + // Make sure to not collect ES stats in background. Our web server expects requests in a particular order. + .put("xpack.monitoring.elasticsearch.collection.enabled", false) + .put("xpack.monitoring.exporters.remoteCluster.type", HttpExporter.TYPE) + .put("xpack.monitoring.exporters.remoteCluster.enabled", true) + .put("xpack.monitoring.exporters.remoteCluster.host", webServer.getHostName() + ":" + webServer.getPort()) + .put("xpack.monitoring.exporters.remoteCluster.cluster_alerts.management.enabled", true); + + // enable http exporter + assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(exporterSettings)); + + // enqueue a "watcher available" response, but then a "failure to delete watch" response + enqueueWatcherResponses(webServer, false); + + // call migration api + MonitoringMigrateAlertsResponse response = client().execute(MonitoringMigrateAlertsAction.INSTANCE, + new MonitoringMigrateAlertsRequest()).actionGet(); + + // Migration is marked as complete since watcher is disabled on remote cluster. + assertThat(response.getExporters().size(), is(1)); + MonitoringMigrateAlertsResponse.ExporterMigrationResult localExporterResult = response.getExporters().get(0); + assertThat(localExporterResult.getName(), is("remoteCluster")); + assertThat(localExporterResult.getType(), is(HttpExporter.TYPE)); + assertThat(localExporterResult.isMigrationComplete(), is(true)); + + // ensure responses + assertMonitorWatches(webServer, false); + } finally { + stopMonitoring(); + webServer.clearRequests(); + } + } + + private void ensureInitialLocalResources() throws Exception { + assertBusy(() -> { + assertThat(indexExists(".monitoring-*"), is(true)); + ensureYellowAndNoInitializingShards(".monitoring-*"); + checkMonitoringTemplates(); + assertWatchesExist(true); + }, 20, TimeUnit.SECONDS); // Watcher can be slow to allocate all watches required + } + + /** + * Checks that the monitoring templates have been created by the local exporter + */ + private void checkMonitoringTemplates() { + final Set templates = new HashSet<>(); + templates.add(".monitoring-alerts-7"); + templates.add(".monitoring-es"); + templates.add(".monitoring-kibana"); + templates.add(".monitoring-logstash"); + templates.add(".monitoring-beats"); + + GetIndexTemplatesResponse response = client().admin().indices().prepareGetTemplates(".monitoring-*").get(); + Set actualTemplates = response.getIndexTemplates().stream().map(IndexTemplateMetadata::getName).collect(Collectors.toSet()); + assertEquals(templates, actualTemplates); + } + + private void assertWatchesExist(boolean exist) { + // Check if watches index exists + if (client().admin().indices().prepareGetIndex().addIndices(".watches").get().getIndices().length == 0) { + fail("Expected [.watches] index with cluster alerts present, but no [.watches] index was found"); + } + + Arrays.stream(ClusterAlertsUtil.WATCH_IDS) + .map(n -> ClusterAlertsUtil.createUniqueWatchId(clusterService(), n)) + .map(watch -> client().execute(GetWatchAction.INSTANCE, new GetWatchRequest(watch)).actionGet()) + .filter(r -> r.isFound() != exist) + .findAny() + .ifPresent(r -> fail((exist ? "missing" : "found") + " watch [" + r.getId() + "]")); + } + + protected List monitoringTemplateNames() { + return Arrays.stream(MonitoringTemplateUtils.TEMPLATE_IDS) + .map(MonitoringTemplateUtils::templateName) + .collect(Collectors.toList()); + } + + private void enqueueWatcherResponses(final MockWebServer webServer, final boolean remoteClusterAllowsWatcher) throws IOException { + // if the remote cluster doesn't allow watcher, then we only check for it and we're done + if (remoteClusterAllowsWatcher) { + // X-Pack exists and Watcher can be used + enqueueResponse(webServer, 200, "{\"features\":{\"watcher\":{\"available\":true,\"enabled\":true}}}"); + + // add delete responses + enqueueDeleteClusterAlertResponses(webServer); + } else { + // X-Pack exists but Watcher just cannot be used + if (randomBoolean()) { + final String responseBody = randomFrom( + "{\"features\":{\"watcher\":{\"available\":false,\"enabled\":true}}}", + "{\"features\":{\"watcher\":{\"available\":true,\"enabled\":false}}}", + "{}" + ); + + enqueueResponse(webServer, 200, responseBody); + } else { + // X-Pack is not installed + enqueueResponse(webServer, 404, "{}"); + } + } + } + + private void enqueueDeleteClusterAlertResponses(final MockWebServer webServer) throws IOException { + for (final String watchId : ClusterAlertsUtil.WATCH_IDS) { + enqueueDeleteClusterAlertResponse(webServer, watchId); + } + } + + private void enqueueDeleteClusterAlertResponse(final MockWebServer webServer, final String watchId) throws IOException { + if (randomBoolean()) { + enqueueResponse(webServer, 404, "watch [" + watchId + "] did not exist"); + } else { + enqueueResponse(webServer, 200, "watch [" + watchId + "] deleted"); + } + } + + private void enqueueResponse(MockWebServer mockWebServer, int responseCode, String body) throws IOException { + mockWebServer.enqueue(new MockResponse().setResponseCode(responseCode).setBody(body)); + } + + private String watcherCheckQueryString() { + return "filter_path=" + WATCHER_CHECK_PARAMETERS.get("filter_path"); + } + + private String resourceClusterAlertQueryString() { + return "filter_path=" + CLUSTER_ALERT_VERSION_PARAMETERS.get("filter_path"); + } + + private void assertMonitorWatches(final MockWebServer webServer, final boolean remoteClusterAllowsWatcher) { + MockRequest request = webServer.takeRequest(); + + // GET /_xpack + assertThat(request.getMethod(), equalTo("GET")); + assertThat(request.getUri().getPath(), equalTo("/_xpack")); + assertThat(request.getUri().getQuery(), equalTo(watcherCheckQueryString())); + + if (remoteClusterAllowsWatcher) { + for (final Tuple watch : monitoringWatches()) { + final String uniqueWatchId = ClusterAlertsUtil.createUniqueWatchId(clusterService(), watch.v1()); + + request = webServer.takeRequest(); + + // GET / PUT if we are allowed to use it + assertThat(request.getMethod(), equalTo("DELETE")); + assertThat(request.getUri().getPath(), equalTo("/_watcher/watch/" + uniqueWatchId)); + assertThat(request.getUri().getQuery(), equalTo(resourceClusterAlertQueryString())); + } + } + } +} diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/ExportersTests.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/ExportersTests.java index c12c05bcedfbf..2ef79313417a2 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/ExportersTests.java +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/ExportersTests.java @@ -49,6 +49,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import java.util.stream.Collectors; import static org.hamcrest.Matchers.contains; @@ -100,7 +101,8 @@ public void init() { sslService = mock(SSLService.class); // we always need to have the local exporter as it serves as the default one - factories.put(LocalExporter.TYPE, config -> new LocalExporter(config, client, mock(CleanerService.class))); + factories.put(LocalExporter.TYPE, config -> new LocalExporter(config, client, new MonitoringMigrationCoordinator(), + mock(CleanerService.class))); exporters = new Exporters(Settings.EMPTY, factories, clusterService, licenseState, threadContext, sslService); } @@ -146,7 +148,7 @@ public void testExporterIndexPattern() { public void testInitExportersDefault() throws Exception { factories.put("_type", TestExporter::new); - Map internalExporters = exporters.initExporters(Settings.builder().build()); + Map internalExporters = exporters.initExporters(Settings.builder().build()).enabledExporters; assertThat(internalExporters, notNullValue()); assertThat(internalExporters.size(), is(1)); @@ -158,7 +160,7 @@ public void testInitExportersSingle() throws Exception { factories.put("local", TestExporter::new); Map internalExporters = exporters.initExporters(Settings.builder() .put("xpack.monitoring.exporters._name.type", "local") - .build()); + .build()).enabledExporters; assertThat(internalExporters, notNullValue()); assertThat(internalExporters.size(), is(1)); @@ -172,7 +174,7 @@ public void testInitExportersSingleDisabled() throws Exception { Map internalExporters = exporters.initExporters(Settings.builder() .put("xpack.monitoring.exporters._name.type", "local") .put("xpack.monitoring.exporters._name.enabled", false) - .build()); + .build()).enabledExporters; assertThat(internalExporters, notNullValue()); @@ -198,7 +200,7 @@ public void testInitExportersMultipleSameType() throws Exception { Map internalExporters = exporters.initExporters(Settings.builder() .put("xpack.monitoring.exporters._name0.type", "_type") .put("xpack.monitoring.exporters._name1.type", "_type") - .build()); + .build()).enabledExporters; assertThat(internalExporters, notNullValue()); assertThat(internalExporters.size(), is(2)); @@ -236,7 +238,7 @@ public void testSettingsUpdate() throws Exception { exporters = new Exporters(nodeSettings, factories, clusterService, licenseState, threadContext, sslService) { @Override - Map initExporters(Settings settings) { + InitializedExporters initExporters(Settings settings) { settingsHolder.set(settings); return super.initExporters(settings); } @@ -447,6 +449,10 @@ static class TestExporter extends Exporter { super(config); } + @Override + public void removeAlerts(Consumer listener) { + } + @Override public void openBulk(final ActionListener listener) { listener.onResponse(mock(ExportBulk.class)); @@ -479,6 +485,10 @@ static class CountingExporter extends Exporter { this.threadContext = threadContext; } + @Override + public void removeAlerts(Consumer listener) { + } + @Override public void openBulk(final ActionListener listener) { CountingBulk bulk = new CountingBulk(config.type() + "#" + count.getAndIncrement(), threadContext); diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/http/AbstractPublishableHttpResourceTestCase.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/http/AbstractPublishableHttpResourceTestCase.java index 8eebd195b4938..8c2c8b3a9833c 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/http/AbstractPublishableHttpResourceTestCase.java +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/http/AbstractPublishableHttpResourceTestCase.java @@ -22,6 +22,7 @@ import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.monitoring.exporter.http.HttpResource.ResourcePublishResult; import org.mockito.ArgumentCaptor; import java.io.IOException; @@ -33,7 +34,9 @@ import static org.elasticsearch.rest.BaseRestHandler.INCLUDE_TYPE_NAME_PARAMETER; import static org.elasticsearch.xpack.monitoring.exporter.http.AsyncHttpResourceHelper.mockBooleanActionListener; +import static org.elasticsearch.xpack.monitoring.exporter.http.AsyncHttpResourceHelper.mockPublishResultActionListener; import static org.elasticsearch.xpack.monitoring.exporter.http.AsyncHttpResourceHelper.whenPerformRequestAsyncWith; +import static org.elasticsearch.xpack.monitoring.exporter.http.AsyncHttpResourceHelper.wrapMockListener; import static org.elasticsearch.xpack.monitoring.exporter.http.PublishableHttpResource.GET_DOES_NOT_EXIST; import static org.elasticsearch.xpack.monitoring.exporter.http.PublishableHttpResource.GET_EXISTS; import static org.hamcrest.Matchers.is; @@ -56,7 +59,8 @@ public abstract class AbstractPublishableHttpResourceTestCase extends ESTestCase protected final TimeValue masterTimeout = randomFrom(TimeValue.timeValueMinutes(5), TimeValue.MINUS_ONE, null); protected final RestClient client = mock(RestClient.class); - protected final ActionListener listener = mockBooleanActionListener(); + protected final ActionListener checkListener = mockBooleanActionListener(); + protected final ActionListener publishListener = mockPublishResultActionListener(); /** * Perform {@link PublishableHttpResource#doCheck(RestClient, ActionListener) doCheck} against the {@code resource} and assert that it @@ -105,9 +109,9 @@ protected void assertCheckWithException(final PublishableHttpResource resource, addParameters(request, expectedParameters); whenPerformRequestAsyncWith(client, request, e); - resource.doCheck(client, listener); + resource.doCheck(client, wrapMockListener(checkListener)); - verifyListener(null); + verifyCheckListener(null); } /** @@ -144,9 +148,9 @@ protected void assertCheckAsDeleteWithException(final PublishableHttpResource re addParameters(request, deleteParameters(resource.getDefaultParameters())); whenPerformRequestAsyncWith(client, request, e); - resource.doCheck(client, listener); + resource.doCheck(client, wrapMockListener(checkListener)); - verifyListener(null); + verifyCheckListener(null); } /** @@ -183,9 +187,9 @@ protected void assertPublishWithException(final PublishableHttpResource resource whenPerformRequestAsyncWith(client, e); - resource.doPublish(client, listener); + resource.doPublish(client, wrapMockListener(publishListener)); - verifyListener(null); + verifyPublishListener(null); Map allParameters = new HashMap<>(); allParameters.putAll(resource.getDefaultParameters()); @@ -264,10 +268,10 @@ protected void doCheckWithStatusCode(final PublishableHttpResource resource, fin whenPerformRequestAsyncWith(client, request, response); - resource.doCheck(client, listener); + resource.doCheck(client, wrapMockListener(checkListener)); verify(client).performRequestAsync(eq(request), any(ResponseListener.class)); - verifyListener(expected); + verifyCheckListener(expected); } private void doPublishWithStatusCode(final PublishableHttpResource resource, final String resourceBasePath, final String resourceName, @@ -280,9 +284,9 @@ private void doPublishWithStatusCode(final PublishableHttpResource resource, fin whenPerformRequestAsyncWith(client, response); - resource.doPublish(client, listener); + resource.doPublish(client, wrapMockListener(publishListener)); - verifyListener(errorFree ? true : null); + verifyPublishListener(errorFree ? ResourcePublishResult.ready() : null); final ArgumentCaptor request = ArgumentCaptor.forClass(Request.class); verify(client).performRequestAsync(request.capture(), any(ResponseListener.class)); @@ -314,9 +318,9 @@ protected void doCheckAsDeleteWithStatusCode(final PublishableHttpResource resou addParameters(request, deleteParameters(resource.getDefaultParameters())); whenPerformRequestAsyncWith(client, request, response); - resource.doCheck(client, listener); + resource.doCheck(client, wrapMockListener(checkListener)); - verifyListener(expected); + verifyCheckListener(expected); } protected RestStatus successfulCheckStatus() { @@ -450,13 +454,23 @@ protected void addParameters(final Request request, final Map pa } } - protected void verifyListener(final Boolean expected) { + protected void verifyPublishListener(final ResourcePublishResult expected) { if (expected == null) { - verify(listener, never()).onResponse(anyBoolean()); - verify(listener).onFailure(any(Exception.class)); + verify(publishListener, never()).onResponse(any()); + verify(publishListener).onFailure(any(Exception.class)); } else { - verify(listener).onResponse(expected); - verify(listener, never()).onFailure(any(Exception.class)); + verify(publishListener).onResponse(expected); + verify(publishListener, never()).onFailure(any(Exception.class)); + } + } + + protected void verifyCheckListener(final Boolean expected) { + if (expected == null) { + verify(checkListener, never()).onResponse(anyBoolean()); + verify(checkListener).onFailure(any(Exception.class)); + } else { + verify(checkListener).onResponse(expected); + verify(checkListener, never()).onFailure(any(Exception.class)); } } diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/http/AsyncHttpResourceHelper.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/http/AsyncHttpResourceHelper.java index d973bc196863a..780bb2b87ea55 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/http/AsyncHttpResourceHelper.java +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/http/AsyncHttpResourceHelper.java @@ -28,6 +28,16 @@ static ActionListener mockBooleanActionListener() { return mock(ActionListener.class); } + @SuppressWarnings("unchecked") + static ActionListener mockPublishResultActionListener() { + return mock(ActionListener.class); + } + + static ActionListener wrapMockListener(ActionListener mock) { + // wraps the mock listener so that default functions on the ActionListener interface can be used + return ActionListener.wrap(mock::onResponse, mock::onFailure); + } + static void whenPerformRequestAsyncWith(final RestClient client, final Response response) { doAnswer(invocation -> { ((ResponseListener)invocation.getArguments()[1]).onSuccess(response); diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/http/HttpExporterResourceTests.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/http/HttpExporterResourceTests.java index c9b8fb197d8e6..02dbfcf1fa11a 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/http/HttpExporterResourceTests.java +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/http/HttpExporterResourceTests.java @@ -25,6 +25,7 @@ import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringTemplateUtils; import org.elasticsearch.xpack.monitoring.exporter.ClusterAlertsUtil; import org.elasticsearch.xpack.monitoring.exporter.Exporter; +import org.elasticsearch.xpack.monitoring.exporter.http.HttpResource.ResourcePublishResult; import org.hamcrest.Description; import org.hamcrest.Matcher; import org.hamcrest.TypeSafeMatcher; @@ -40,6 +41,7 @@ import static org.elasticsearch.xpack.core.monitoring.exporter.MonitoringTemplateUtils.PIPELINE_IDS; import static org.elasticsearch.xpack.core.monitoring.exporter.MonitoringTemplateUtils.TEMPLATE_IDS; import static org.elasticsearch.xpack.monitoring.exporter.http.AsyncHttpResourceHelper.whenPerformRequestAsyncWith; +import static org.elasticsearch.xpack.monitoring.exporter.http.AsyncHttpResourceHelper.wrapMockListener; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.startsWith; @@ -82,7 +84,7 @@ public class HttpExporterResourceTests extends AbstractPublishableHttpResourceTe private final MultiHttpResource resources = HttpExporter.createResources( - new Exporter.Config("_http", "http", exporterSettings, clusterService, licenseState)); + new Exporter.Config("_http", "http", exporterSettings, clusterService, licenseState)).allResources; @Before public void setupResources() { @@ -106,9 +108,12 @@ public void awaitCheckAndPublish(final Boolean expected) { } public void awaitCheckAndPublish(HttpResource resource, final Boolean expected) { - resource.checkAndPublish(client, listener); + awaitCheckAndPublish(resource, expected != null ? new ResourcePublishResult(expected) : null); + } - verifyListener(expected); + public void awaitCheckAndPublish(HttpResource resource, final ResourcePublishResult expected) { + resource.checkAndPublish(client, wrapMockListener(publishListener)); + verifyPublishListener(expected); } public void testInvalidVersionBlocks() { @@ -118,7 +123,8 @@ public void testInvalidVersionBlocks() { whenPerformRequestAsyncWith(client, new RequestMatcher(is("GET"), is("/")), versionResponse); assertTrue(resources.isDirty()); - awaitCheckAndPublish(false); + awaitCheckAndPublish(resources, new ResourcePublishResult(false, + "version [3.0.0] < [7.0.0] and NOT supported for [xpack.monitoring.exporters._http]", HttpResource.State.DIRTY)); // ensure it didn't magically become clean assertTrue(resources.isDirty()); @@ -519,7 +525,7 @@ public void testDeployClusterAlerts() { .put("xpack.monitoring.migration.decommission_alerts", true) .build(); MultiHttpResource overrideResource = HttpExporter.createResources( - new Exporter.Config("_http", "http", removalExporterSettings, clusterService, licenseState)); + new Exporter.Config("_http", "http", removalExporterSettings, clusterService, licenseState)).allResources; assertTrue(overrideResource.isDirty()); awaitCheckAndPublish(overrideResource, true); @@ -595,7 +601,7 @@ public void testSuccessfulChecksIfNotElectedMasterNode() { final MultiHttpResource resources = HttpExporter.createResources( - new Exporter.Config("_http", "http", exporterSettings, clusterService, licenseState)); + new Exporter.Config("_http", "http", exporterSettings, clusterService, licenseState)).allResources; final int successfulGetTemplates = randomIntBetween(0, EXPECTED_TEMPLATES); final int unsuccessfulGetTemplates = EXPECTED_TEMPLATES - successfulGetTemplates; @@ -611,9 +617,9 @@ public void testSuccessfulChecksIfNotElectedMasterNode() { assertTrue(resources.isDirty()); // it should be able to proceed! (note: we are not using the instance "resources" here) - resources.checkAndPublish(client, listener); + resources.checkAndPublish(client, wrapMockListener(publishListener)); - verifyListener(true); + verifyPublishListener(ResourcePublishResult.ready()); assertFalse(resources.isDirty()); verifyVersionCheck(); diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/http/HttpExporterTests.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/http/HttpExporterTests.java index dc597f4eb05bd..7549b63ecb6d0 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/http/HttpExporterTests.java +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/http/HttpExporterTests.java @@ -32,6 +32,7 @@ import org.elasticsearch.xpack.monitoring.exporter.ExportBulk; import org.elasticsearch.xpack.monitoring.exporter.Exporter; import org.elasticsearch.xpack.monitoring.exporter.Exporter.Config; +import org.elasticsearch.xpack.monitoring.exporter.MonitoringMigrationCoordinator; import org.junit.Before; import org.mockito.InOrder; @@ -216,9 +217,10 @@ public void testExporterWithBlacklistedHeaders() { } final Config config = createConfig(builder.build()); + final MonitoringMigrationCoordinator coordinator = new MonitoringMigrationCoordinator(); final SettingsException exception = - expectThrows(SettingsException.class, () -> new HttpExporter(config, sslService, threadContext)); + expectThrows(SettingsException.class, () -> new HttpExporter(config, sslService, threadContext, coordinator)); assertThat(exception.getMessage(), equalTo(expected)); } @@ -236,9 +238,10 @@ public void testExporterWithEmptyHeaders() { } final Config config = createConfig(builder.build()); + final MonitoringMigrationCoordinator coordinator = new MonitoringMigrationCoordinator(); final SettingsException exception = - expectThrows(SettingsException.class, () -> new HttpExporter(config, sslService, threadContext)); + expectThrows(SettingsException.class, () -> new HttpExporter(config, sslService, threadContext, coordinator)); assertThat(exception.getMessage(), equalTo(expected)); } @@ -280,9 +283,10 @@ public void testExporterWithUnknownBlacklistedClusterAlerts() { .putList("xpack.monitoring.exporters._http.cluster_alerts.management.blacklist", blacklist); final Config config = createConfig(builder.build()); + final MonitoringMigrationCoordinator coordinator = new MonitoringMigrationCoordinator(); final SettingsException exception = - expectThrows(SettingsException.class, () -> new HttpExporter(config, sslService, threadContext)); + expectThrows(SettingsException.class, () -> new HttpExporter(config, sslService, threadContext, coordinator)); assertThat(exception.getMessage(), equalTo("[xpack.monitoring.exporters._http.cluster_alerts.management.blacklist] contains unrecognized Cluster " + @@ -298,8 +302,9 @@ public void testExporterWithHostOnly() throws Exception { .put("xpack.monitoring.exporters._http.host", "http://localhost:9200"); final Config config = createConfig(builder.build()); + final MonitoringMigrationCoordinator coordinator = new MonitoringMigrationCoordinator(); - new HttpExporter(config, sslService, threadContext).close(); + new HttpExporter(config, sslService, threadContext, coordinator).close(); } public void testExporterWithInvalidProxyBasePath() throws Exception { @@ -439,7 +444,7 @@ public void testCreateResources() { final Config config = createConfig(builder.build()); - final MultiHttpResource multiResource = HttpExporter.createResources(config); + final MultiHttpResource multiResource = HttpExporter.createResources(config).allResources; final List resources = multiResource.getResources(); final int version = (int)resources.stream().filter((resource) -> resource instanceof VersionHttpResource).count(); @@ -522,6 +527,37 @@ public void testCreateDefaultParams() { assertThat(parameters.size(), equalTo(0)); } + public void testHttpExporterMigrationInProgressBlock() throws Exception { + final Config config = createConfig(Settings.EMPTY); + final RestClient client = mock(RestClient.class); + final Sniffer sniffer = randomFrom(mock(Sniffer.class), null); + final NodeFailureListener listener = mock(NodeFailureListener.class); + // this is configured to throw an error when the resource is checked + final HttpResource resource = new MockHttpResource(exporterName(), true, null, false); + final HttpResource alertsResource = new MockHttpResource(exporterName(), false, null, false); + final MonitoringMigrationCoordinator migrationCoordinator = new MonitoringMigrationCoordinator(); + assertTrue(migrationCoordinator.tryBlockInstallationTasks()); + + try (HttpExporter exporter = new HttpExporter(config, client, sniffer, threadContext, migrationCoordinator, listener, resource, + alertsResource)) { + verify(listener).setResource(resource); + + final CountDownLatch awaitResponseAndClose = new CountDownLatch(1); + final ActionListener bulkListener = ActionListener.wrap( + bulk -> { + assertNull("should have been invoked with null value to denote migration in progress", bulk); + awaitResponseAndClose.countDown(); + }, + e -> fail("[onResponse] should have been invoked with null value to denote migration in progress") + ); + + exporter.openBulk(bulkListener); + + // wait for it to actually respond + assertTrue(awaitResponseAndClose.await(15, TimeUnit.SECONDS)); + } + } + public void testHttpExporterDirtyResourcesBlock() throws Exception { final Config config = createConfig(Settings.EMPTY); final RestClient client = mock(RestClient.class); @@ -529,8 +565,11 @@ public void testHttpExporterDirtyResourcesBlock() throws Exception { final NodeFailureListener listener = mock(NodeFailureListener.class); // this is configured to throw an error when the resource is checked final HttpResource resource = new MockHttpResource(exporterName(), true, null, false); + final HttpResource alertsResource = new MockHttpResource(exporterName(), false, null, false); + final MonitoringMigrationCoordinator migrationCoordinator = new MonitoringMigrationCoordinator(); - try (HttpExporter exporter = new HttpExporter(config, client, sniffer, threadContext, listener, resource)) { + try (HttpExporter exporter = new HttpExporter(config, client, sniffer, threadContext, migrationCoordinator, listener, resource, + alertsResource)) { verify(listener).setResource(resource); final CountDownLatch awaitResponseAndClose = new CountDownLatch(1); @@ -553,8 +592,11 @@ public void testHttpExporterReturnsNullForOpenBulkIfNotReady() throws Exception final NodeFailureListener listener = mock(NodeFailureListener.class); // always has to check, and never succeeds checks but it does not throw an exception (e.g., version check fails) final HttpResource resource = new MockHttpResource(exporterName(), true, false, false); + final HttpResource alertsResource = new MockHttpResource(exporterName(), false, null, false); + final MonitoringMigrationCoordinator migrationCoordinator = new MonitoringMigrationCoordinator(); - try (HttpExporter exporter = new HttpExporter(config, client, sniffer, threadContext, listener, resource)) { + try (HttpExporter exporter = new HttpExporter(config, client, sniffer, threadContext, migrationCoordinator, listener, resource, + alertsResource)) { verify(listener).setResource(resource); final CountDownLatch awaitResponseAndClose = new CountDownLatch(1); @@ -581,8 +623,11 @@ public void testHttpExporter() throws Exception { final NodeFailureListener listener = mock(NodeFailureListener.class); // sometimes dirty to start with and sometimes not; but always succeeds on checkAndPublish final HttpResource resource = new MockHttpResource(exporterName(), randomBoolean()); + final HttpResource alertsResource = new MockHttpResource(exporterName(), false, null, false); + final MonitoringMigrationCoordinator migrationCoordinator = new MonitoringMigrationCoordinator(); - try (HttpExporter exporter = new HttpExporter(config, client, sniffer, threadContext, listener, resource)) { + try (HttpExporter exporter = new HttpExporter(config, client, sniffer, threadContext, migrationCoordinator, listener, resource, + alertsResource)) { verify(listener).setResource(resource); final CountDownLatch awaitResponseAndClose = new CountDownLatch(1); @@ -608,6 +653,8 @@ public void testHttpExporterShutdown() throws Exception { final Sniffer sniffer = randomFrom(mock(Sniffer.class), null); final NodeFailureListener listener = mock(NodeFailureListener.class); final MultiHttpResource resource = mock(MultiHttpResource.class); + final HttpResource alertsResource = mock(MultiHttpResource.class); + final MonitoringMigrationCoordinator migrationCoordinator = new MonitoringMigrationCoordinator(); if (sniffer != null && rarely()) { doThrow(new RuntimeException("expected")).when(sniffer).close(); @@ -617,7 +664,7 @@ public void testHttpExporterShutdown() throws Exception { doThrow(randomFrom(new IOException("expected"), new RuntimeException("expected"))).when(client).close(); } - new HttpExporter(config, client, sniffer, threadContext, listener, resource).close(); + new HttpExporter(config, client, sniffer, threadContext, migrationCoordinator, listener, resource, alertsResource).close(); // order matters; sniffer must close first if (sniffer != null) { diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/http/HttpResourceTests.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/http/HttpResourceTests.java index dfb71e4e0488b..52139e8534c3a 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/http/HttpResourceTests.java +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/http/HttpResourceTests.java @@ -10,10 +10,13 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.RestClient; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.monitoring.exporter.http.HttpResource.ResourcePublishResult; import java.util.function.Supplier; import static org.elasticsearch.xpack.monitoring.exporter.http.AsyncHttpResourceHelper.mockBooleanActionListener; +import static org.elasticsearch.xpack.monitoring.exporter.http.AsyncHttpResourceHelper.mockPublishResultActionListener; +import static org.elasticsearch.xpack.monitoring.exporter.http.AsyncHttpResourceHelper.wrapMockListener; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -30,8 +33,8 @@ public class HttpResourceTests extends ESTestCase { public void testConstructorRequiresOwner() { expectThrows(NullPointerException.class, () -> new HttpResource(null) { @Override - protected void doCheckAndPublish(RestClient client, ActionListener listener) { - listener.onResponse(false); + protected void doCheckAndPublish(RestClient client, ActionListener listener) { + listener.onResponse(ResourcePublishResult.notReady("always false")); } }); } @@ -39,8 +42,8 @@ protected void doCheckAndPublish(RestClient client, ActionListener list public void testConstructor() { final HttpResource resource = new HttpResource(owner) { @Override - protected void doCheckAndPublish(RestClient client, ActionListener listener) { - listener.onResponse(false); + protected void doCheckAndPublish(RestClient client, ActionListener listener) { + listener.onResponse(ResourcePublishResult.notReady("always false")); } }; @@ -52,8 +55,8 @@ public void testConstructorDirtiness() { final boolean dirty = randomBoolean(); final HttpResource resource = new HttpResource(owner, dirty) { @Override - protected void doCheckAndPublish(RestClient client, ActionListener listener) { - listener.onResponse(false); + protected void doCheckAndPublish(RestClient client, ActionListener listener) { + listener.onResponse(ResourcePublishResult.notReady("always false")); } }; @@ -62,7 +65,7 @@ protected void doCheckAndPublish(RestClient client, ActionListener list } public void testDirtiness() { - final ActionListener listener = mockBooleanActionListener(); + final ActionListener listener = mockPublishResultActionListener(); // MockHttpResponse always succeeds for checkAndPublish final HttpResource resource = new MockHttpResource(owner); @@ -75,17 +78,18 @@ public void testDirtiness() { // if this fails, then the mocked resource needs to be fixed resource.checkAndPublish(client, listener); - verify(listener).onResponse(true); + verify(listener).onResponse(ResourcePublishResult.ready()); assertFalse(resource.isDirty()); } public void testCheckAndPublish() { - final ActionListener listener = mockBooleanActionListener(); - final boolean expected = randomBoolean(); + final ActionListener listener = mockPublishResultActionListener(); + final ResourcePublishResult expected = randomBoolean() ? ResourcePublishResult.ready() : ResourcePublishResult + .notReady("test unready"); // the default dirtiness should be irrelevant; it should always be run! final HttpResource resource = new HttpResource(owner) { @Override - protected void doCheckAndPublish(RestClient client, ActionListener listener) { + protected void doCheckAndPublish(RestClient client, ActionListener listener) { listener.onResponse(expected); } }; @@ -96,25 +100,25 @@ protected void doCheckAndPublish(RestClient client, ActionListener list } public void testCheckAndPublishEvenWhenDirty() { - final ActionListener listener1 = mockBooleanActionListener(); - final ActionListener listener2 = mockBooleanActionListener(); + final ActionListener listener1 = mockPublishResultActionListener(); + final ActionListener listener2 = mockPublishResultActionListener(); @SuppressWarnings("unchecked") - final Supplier supplier = mock(Supplier.class); - when(supplier.get()).thenReturn(true, false); + final Supplier supplier = mock(Supplier.class); + when(supplier.get()).thenReturn(ResourcePublishResult.ready(), ResourcePublishResult.notReady("test unready")); final HttpResource resource = new HttpResource(owner) { @Override - protected void doCheckAndPublish(RestClient client, ActionListener listener) { + protected void doCheckAndPublish(RestClient client, ActionListener listener) { listener.onResponse(supplier.get()); } }; assertTrue(resource.isDirty()); resource.checkAndPublish(client, listener1); - verify(listener1).onResponse(true); + verify(listener1).onResponse(ResourcePublishResult.ready()); assertFalse(resource.isDirty()); resource.checkAndPublish(client, listener2); - verify(listener2).onResponse(false); + verify(listener2).onResponse(ResourcePublishResult.notReady("test unready")); verify(supplier, times(2)).get(); } @@ -141,13 +145,13 @@ public void testCheckAndPublishIfDirtyFalseWhileChecking() throws InterruptedExc // the default dirtiness should be irrelevant; it should always be run! final HttpResource resource = new HttpResource(owner) { @Override - protected void doCheckAndPublish(RestClient client, ActionListener listener) { + protected void doCheckAndPublish(RestClient client, ActionListener listener) { // wait until the second check has had a chance to run to completion, // then respond here final Thread thread = new Thread(() -> { try { assertTrue(secondCheck.await(15, TimeUnit.SECONDS)); - listener.onResponse(response); + listener.onResponse(response ? ResourcePublishResult.ready() : ResourcePublishResult.notReady("test unready")); } catch (InterruptedException e) { listener.onFailure(e); } @@ -158,7 +162,7 @@ protected void doCheckAndPublish(RestClient client, ActionListener list } }; - resource.checkAndPublishIfDirty(client, listener); + resource.checkAndPublishIfDirty(client, wrapMockListener(listener)); resource.checkAndPublishIfDirty(client, checkingListener); assertTrue(firstCheck.await(15, TimeUnit.SECONDS)); @@ -171,21 +175,21 @@ public void testCheckAndPublishIfDirty() { final ActionListener listener1 = mockBooleanActionListener(); final ActionListener listener2 = mockBooleanActionListener(); @SuppressWarnings("unchecked") - final Supplier supplier = mock(Supplier.class); - when(supplier.get()).thenReturn(true, false); + final Supplier supplier = mock(Supplier.class); + when(supplier.get()).thenReturn(ResourcePublishResult.ready(), ResourcePublishResult.notReady("test unready")); final HttpResource resource = new HttpResource(owner) { @Override - protected void doCheckAndPublish(RestClient client, ActionListener listener) { + protected void doCheckAndPublish(RestClient client, ActionListener listener) { listener.onResponse(supplier.get()); } }; assertTrue(resource.isDirty()); - resource.checkAndPublishIfDirty(client, listener1); + resource.checkAndPublishIfDirty(client, wrapMockListener(listener1)); verify(listener1).onResponse(true); assertFalse(resource.isDirty()); - resource.checkAndPublishIfDirty(client, listener2); + resource.checkAndPublishIfDirty(client, wrapMockListener(listener2)); verify(listener2).onResponse(true); // once is the default! diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/http/MockHttpResource.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/http/MockHttpResource.java index 57f6f96ca83be..0ca90f6dde4ef 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/http/MockHttpResource.java +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/http/MockHttpResource.java @@ -112,7 +112,7 @@ protected void doCheck(final RestClient client, final ActionListener li } @Override - protected void doPublish(final RestClient client, final ActionListener listener) { + protected void doPublish(final RestClient client, final ActionListener listener) { assert client != null; ++published; @@ -121,7 +121,7 @@ protected void doPublish(final RestClient client, final ActionListener if (publish == null) { listener.onFailure(new RuntimeException("TEST - expected")); } else { - listener.onResponse(publish); + listener.onResponse(new ResourcePublishResult(publish)); } } diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/http/MultiHttpResourceTests.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/http/MultiHttpResourceTests.java index 5a20733a9e9d5..ceb848e1192d7 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/http/MultiHttpResourceTests.java +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/http/MultiHttpResourceTests.java @@ -8,12 +8,13 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.RestClient; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.monitoring.exporter.http.HttpResource.ResourcePublishResult; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import static org.elasticsearch.xpack.monitoring.exporter.http.AsyncHttpResourceHelper.mockBooleanActionListener; +import static org.elasticsearch.xpack.monitoring.exporter.http.AsyncHttpResourceHelper.mockPublishResultActionListener; import static org.hamcrest.Matchers.equalTo; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; @@ -26,15 +27,15 @@ public class MultiHttpResourceTests extends ESTestCase { private final String owner = getClass().getSimpleName(); private final RestClient client = mock(RestClient.class); - private final ActionListener listener = mockBooleanActionListener(); + private final ActionListener publishListener = mockPublishResultActionListener(); public void testDoCheckAndPublish() { final List allResources = successfulResources(); final MultiHttpResource multiResource = new MultiHttpResource(owner, allResources); - multiResource.doCheckAndPublish(client, listener); + multiResource.doCheckAndPublish(client, publishListener); - verify(listener).onResponse(true); + verify(publishListener).onResponse(ResourcePublishResult.ready()); for (final MockHttpResource resource : allResources) { assertSuccessfulResource(resource); @@ -54,12 +55,12 @@ public void testDoCheckAndPublishShortCircuits() { final MultiHttpResource multiResource = new MultiHttpResource(owner, allResources); - multiResource.doCheckAndPublish(client, listener); + multiResource.doCheckAndPublish(client, publishListener); if (check == null) { - verify(listener).onFailure(any(Exception.class)); + verify(publishListener).onFailure(any(Exception.class)); } else { - verify(listener).onResponse(false); + verify(publishListener).onResponse(new ResourcePublishResult(false)); } boolean found = false; diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/http/PublishableHttpResourceTests.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/http/PublishableHttpResourceTests.java index 4f511fd130f8f..466d2dbbea193 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/http/PublishableHttpResourceTests.java +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/http/PublishableHttpResourceTests.java @@ -22,12 +22,14 @@ import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.xpack.monitoring.exporter.http.HttpResource.ResourcePublishResult; import org.mockito.ArgumentCaptor; import java.io.IOException; import java.util.function.Supplier; import static org.elasticsearch.xpack.monitoring.exporter.http.AsyncHttpResourceHelper.whenPerformRequestAsyncWith; +import static org.elasticsearch.xpack.monitoring.exporter.http.AsyncHttpResourceHelper.wrapMockListener; import static org.hamcrest.Matchers.is; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; @@ -107,11 +109,11 @@ public void testVersionCheckForResourceUnexpectedResponse() { whenPerformRequestAsyncWith(client, request, response); - resource.versionCheckForResource(client, listener, logger, + resource.versionCheckForResource(client, wrapMockListener(checkListener), logger, resourceBasePath, resourceName, resourceType, owner, ownerType, xContent, minimumVersion); - verifyListener(null); + verifyCheckListener(null); verify(logger).trace("checking if {} [{}] exists on the [{}] {}", resourceType, resourceName, owner, ownerType); verify(client).performRequestAsync(eq(request), any(ResponseListener.class)); verify(logger).error(any(org.apache.logging.log4j.util.Supplier.class), any(ResponseException.class)); @@ -131,11 +133,11 @@ public void testVersionCheckForResourceMalformedResponse() { whenPerformRequestAsyncWith(client, request, response); - resource.versionCheckForResource(client, listener, logger, + resource.versionCheckForResource(client, wrapMockListener(checkListener), logger, resourceBasePath, resourceName, resourceType, owner, ownerType, xContent, minimumVersion); - verifyListener(null); + verifyCheckListener(null); verify(logger).trace("checking if {} [{}] exists on the [{}] {}", resourceType, resourceName, owner, ownerType); verify(logger).debug("{} [{}] found on the [{}] {}", resourceType, resourceName, owner, ownerType); verify(client).performRequestAsync(eq(request), any(ResponseListener.class)); @@ -183,9 +185,10 @@ public void testPutResourceFalseWithException() { whenPerformRequestAsyncWith(client, request, e); final Map parameters = Collections.emptyMap(); - resource.putResource(client, listener, logger, resourceBasePath, resourceName, parameters, body, resourceType, owner, ownerType); + resource.putResource(client, wrapMockListener(publishListener), logger, resourceBasePath, resourceName, parameters, body, + resourceType, owner, ownerType); - verifyListener(null); + verifyPublishListener(null); verify(logger).trace("uploading {} [{}] to the [{}] {}", resourceType, resourceName, owner, ownerType); verify(client).performRequestAsync(eq(request), any(ResponseListener.class)); @@ -215,9 +218,10 @@ public void testDeleteResourceErrors() { whenPerformRequestAsyncWith(client, request, e); - resource.deleteResource(client, listener, logger, resourceBasePath, resourceName, resourceType, owner, ownerType); + resource.deleteResource(client, wrapMockListener(checkListener), logger, resourceBasePath, resourceName, resourceType, owner, + ownerType); - verifyListener(null); + verifyCheckListener(null); verify(logger).trace("deleting {} [{}] from the [{}] {}", resourceType, resourceName, owner, ownerType); verify(client).performRequestAsync(eq(request), any(ResponseListener.class)); @@ -234,9 +238,9 @@ public void testDoCheckAndPublishIgnoresPublishWhenCheckErrors() { final PublishableHttpResource resource = new MockHttpResource(owner, masterTimeout, PublishableHttpResource.NO_BODY_PARAMETERS, null, true); - resource.doCheckAndPublish(client, listener); + resource.doCheckAndPublish(client, wrapMockListener(publishListener)); - verifyListener(null); + verifyPublishListener(null); } public void testDoCheckAndPublish() { @@ -247,9 +251,9 @@ public void testDoCheckAndPublish() { final PublishableHttpResource resource = new MockHttpResource(owner, masterTimeout, PublishableHttpResource.NO_BODY_PARAMETERS, exists, publish); - resource.doCheckAndPublish(client, listener); + resource.doCheckAndPublish(client, wrapMockListener(publishListener)); - verifyListener(exists || publish); + verifyPublishListener(new ResourcePublishResult(exists || publish)); } public void testShouldReplaceResourceRethrowsIOException() throws IOException { @@ -343,7 +347,7 @@ private void assertVersionCheckForResource(final RestStatus status, final Boolea whenPerformRequestAsyncWith(client, request, response); - resource.versionCheckForResource(client, listener, logger, + resource.versionCheckForResource(client, wrapMockListener(checkListener), logger, resourceBasePath, resourceName, resourceType, owner, ownerType, xContent, minimumVersion); @@ -362,7 +366,7 @@ private void assertVersionCheckForResource(final RestStatus status, final Boolea verify(response).getEntity(); } - verifyListener(expected); + verifyCheckListener(expected); verify(logger).debug(debugLogMessage, resourceType, resourceName, owner, ownerType); verifyNoMoreInteractions(client, response, logger); @@ -378,9 +382,10 @@ private void assertPutResource(final RestStatus status, final boolean errorFree) whenPerformRequestAsyncWith(client, request, response); final Map parameters = Collections.emptyMap(); - resource.putResource(client, listener, logger, resourceBasePath, resourceName, parameters, body, resourceType, owner, ownerType); + resource.putResource(client, wrapMockListener(publishListener), logger, resourceBasePath, resourceName, parameters, body, + resourceType, owner, ownerType); - verifyListener(errorFree ? true : null); + verifyPublishListener(errorFree ? ResourcePublishResult.ready() : null); verify(client).performRequestAsync(eq(request), any(ResponseListener.class)); verify(response).getStatusLine(); @@ -414,9 +419,9 @@ private void assertCheckForResource(final RestClient client, final Logger logger when(dneResponseChecker.apply(response)).thenReturn(false == expected); } - resource.checkForResource(client, listener, logger, resourceBasePath, resourceName, resourceType, owner, ownerType, - PublishableHttpResource.GET_EXISTS, PublishableHttpResource.GET_DOES_NOT_EXIST, - responseChecker, dneResponseChecker); + resource.checkForResource(client, wrapMockListener(checkListener), logger, resourceBasePath, resourceName, resourceType, owner, + ownerType, PublishableHttpResource.GET_EXISTS, PublishableHttpResource.GET_DOES_NOT_EXIST, + responseChecker, dneResponseChecker); if (expected == Boolean.TRUE) { verify(responseChecker).apply(response); @@ -428,7 +433,7 @@ private void assertCheckForResource(final RestClient client, final Logger logger verifyZeroInteractions(responseChecker, dneResponseChecker); } - verifyListener(expected); + verifyCheckListener(expected); } private void assertDeleteResource(final RestStatus status, final boolean expected) { @@ -440,7 +445,8 @@ private void assertDeleteResource(final RestStatus status, final boolean expecte whenPerformRequestAsyncWith(client, request, response); - resource.deleteResource(client, listener, logger, resourceBasePath, resourceName, resourceType, owner, ownerType); + resource.deleteResource(client, wrapMockListener(checkListener), logger, resourceBasePath, resourceName, resourceType, owner, + ownerType); verify(client).performRequestAsync(eq(request), any(ResponseListener.class)); verify(response).getStatusLine(); @@ -449,7 +455,7 @@ private void assertDeleteResource(final RestStatus status, final boolean expecte if (expected) { verify(logger).debug("{} [{}] deleted from the [{}] {}", resourceType, resourceName, owner, ownerType); - verifyListener(true); + verifyCheckListener(true); } else { ArgumentCaptor e = ArgumentCaptor.forClass(RuntimeException.class); @@ -457,7 +463,7 @@ private void assertDeleteResource(final RestStatus status, final boolean expecte assertThat(e.getValue().getMessage(), is("[" + resourceBasePath + "/" + resourceName + "] responded with [" + status.getStatus() + "]")); - verifyListener(null); + verifyCheckListener(null); } verifyNoMoreInteractions(client, response, logger, entity); diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/http/VersionHttpResourceTests.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/http/VersionHttpResourceTests.java index 9fab23c7c4040..2092fef80267e 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/http/VersionHttpResourceTests.java +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/http/VersionHttpResourceTests.java @@ -14,10 +14,11 @@ import org.elasticsearch.client.RestClient; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.VersionUtils; +import org.elasticsearch.xpack.monitoring.exporter.http.HttpResource.ResourcePublishResult; import java.io.IOException; -import static org.elasticsearch.xpack.monitoring.exporter.http.AsyncHttpResourceHelper.mockBooleanActionListener; +import static org.elasticsearch.xpack.monitoring.exporter.http.AsyncHttpResourceHelper.mockPublishResultActionListener; import static org.elasticsearch.xpack.monitoring.exporter.http.AsyncHttpResourceHelper.whenPerformRequestAsyncWith; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; @@ -31,7 +32,7 @@ public class VersionHttpResourceTests extends ESTestCase { private final String owner = getClass().getSimpleName(); private final RestClient client = mock(RestClient.class); - private final ActionListener listener = mockBooleanActionListener(); + private final ActionListener publishListener = mockPublishResultActionListener(); public void testDoCheckAndPublishSuccess() { final Version minimumVersion = VersionUtils.randomVersion(random()); @@ -40,9 +41,9 @@ public void testDoCheckAndPublishSuccess() { final VersionHttpResource resource = new VersionHttpResource(owner, minimumVersion); - resource.doCheckAndPublish(client, listener); + resource.doCheckAndPublish(client, publishListener); - verify(listener).onResponse(true); + verify(publishListener).onResponse(ResourcePublishResult.ready()); verify(response).getEntity(); } @@ -52,9 +53,9 @@ public void testDoCheckAndPublishFailedParsing() { final VersionHttpResource resource = new VersionHttpResource(owner, Version.CURRENT); - resource.doCheckAndPublish(client, listener); + resource.doCheckAndPublish(client, publishListener); - verify(listener).onFailure(any(Exception.class)); + verify(publishListener).onFailure(any(Exception.class)); verify(response).getEntity(); } @@ -64,9 +65,9 @@ public void testDoCheckAndPublishFailedFieldMissing() { final VersionHttpResource resource = new VersionHttpResource(owner, Version.CURRENT); - resource.doCheckAndPublish(client, listener); + resource.doCheckAndPublish(client, publishListener); - verify(listener).onFailure(any(Exception.class)); + verify(publishListener).onFailure(any(Exception.class)); verify(response).getEntity(); } @@ -76,9 +77,9 @@ public void testDoCheckAndPublishFailedFieldWrongType() { final VersionHttpResource resource = new VersionHttpResource(owner, Version.CURRENT); - resource.doCheckAndPublish(client, listener); + resource.doCheckAndPublish(client, publishListener); - verify(listener).onFailure(any(Exception.class)); + verify(publishListener).onFailure(any(Exception.class)); verify(response).getEntity(); } @@ -90,9 +91,9 @@ public void testDoCheckAndPublishFailedWithIOException() { final VersionHttpResource resource = new VersionHttpResource(owner, Version.CURRENT); - resource.doCheckAndPublish(client, listener); + resource.doCheckAndPublish(client, publishListener); - verify(listener).onFailure(any(Exception.class)); + verify(publishListener).onFailure(any(Exception.class)); } private Response responseForJSON(final String json) { diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/http/WatcherExistsHttpResourceTests.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/http/WatcherExistsHttpResourceTests.java index 522fc7368a207..465b3eb7c0774 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/http/WatcherExistsHttpResourceTests.java +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/http/WatcherExistsHttpResourceTests.java @@ -14,9 +14,11 @@ import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.xpack.monitoring.exporter.http.HttpResource.ResourcePublishResult; import java.util.Map; +import static org.elasticsearch.xpack.monitoring.exporter.http.AsyncHttpResourceHelper.wrapMockListener; import static org.elasticsearch.xpack.monitoring.exporter.http.PublishableHttpResource.GET_EXISTS; import static org.elasticsearch.xpack.monitoring.exporter.http.WatcherExistsHttpResource.XPACK_DOES_NOT_EXIST; import static org.hamcrest.Matchers.is; @@ -40,9 +42,9 @@ public class WatcherExistsHttpResourceTests extends AbstractPublishableHttpResou public void testDoCheckIgnoresClientWhenNotElectedMaster() { whenNotElectedMaster(); - resource.doCheck(client, listener); + resource.doCheck(client, wrapMockListener(checkListener)); - verify(listener).onResponse(true); + verify(checkListener).onResponse(true); verifyZeroInteractions(client); } @@ -140,9 +142,9 @@ public void testDoPublishTrue() { final MultiHttpResource watches = new MultiHttpResource(owner, Collections.singletonList(mockWatch)); final WatcherExistsHttpResource resource = new WatcherExistsHttpResource(owner, clusterService, watches); - resource.doPublish(client, listener); + resource.doPublish(client, wrapMockListener(publishListener)); - verifyListener(true); + verifyPublishListener(ResourcePublishResult.ready()); assertThat(mockWatch.checked, is(1)); assertThat(mockWatch.published, is(publish ? 1 : 0)); @@ -153,9 +155,9 @@ public void testDoPublishFalse() { final MultiHttpResource watches = new MultiHttpResource(owner, Collections.singletonList(mockWatch)); final WatcherExistsHttpResource resource = new WatcherExistsHttpResource(owner, clusterService, watches); - resource.doPublish(client, listener); + resource.doPublish(client, wrapMockListener(publishListener)); - verifyListener(false); + verifyPublishListener(new ResourcePublishResult(false)); assertThat(mockWatch.checked, is(1)); assertThat(mockWatch.published, is(1)); @@ -166,9 +168,9 @@ public void testDoPublishException() { final MultiHttpResource watches = new MultiHttpResource(owner, Collections.singletonList(mockWatch)); final WatcherExistsHttpResource resource = new WatcherExistsHttpResource(owner, clusterService, watches); - resource.doPublish(client, listener); + resource.doPublish(client, wrapMockListener(publishListener)); - verifyListener(null); + verifyPublishListener(null); assertThat(mockWatch.checked, is(1)); assertThat(mockWatch.published, is(1)); diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/local/LocalExporterIntegTestCase.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/local/LocalExporterIntegTestCase.java index af5c86ab0b085..6473f7395eb49 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/local/LocalExporterIntegTestCase.java +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/local/LocalExporterIntegTestCase.java @@ -13,6 +13,7 @@ import org.elasticsearch.xpack.core.XPackSettings; import org.elasticsearch.xpack.monitoring.cleaner.CleanerService; import org.elasticsearch.xpack.monitoring.exporter.Exporter; +import org.elasticsearch.xpack.monitoring.exporter.MonitoringMigrationCoordinator; import org.elasticsearch.xpack.monitoring.test.MonitoringIntegTestCase; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -63,7 +64,7 @@ protected Settings nodeSettings(int nodeOrdinal) { * @return Never {@code null}. */ protected LocalExporter createLocalExporter() { - return createLocalExporter(exporterName, null); + return createLocalExporter(exporterName, null, new MonitoringMigrationCoordinator()); } /** @@ -78,12 +79,16 @@ protected LocalExporter createLocalExporter() { * @return Never {@code null}. */ protected LocalExporter createLocalExporter(String exporterName, Settings exporterSettings) { + return createLocalExporter(exporterName, exporterSettings, new MonitoringMigrationCoordinator()); + } + + protected LocalExporter createLocalExporter(String exporterName, Settings exporterSettings, + MonitoringMigrationCoordinator coordinator) { final XPackLicenseState licenseState = TestUtils.newTestLicenseState(); final Exporter.Config config = new Exporter.Config(exporterName, "local", exporterSettings, clusterService(), licenseState); final CleanerService cleanerService = new CleanerService(exporterSettings, clusterService().getClusterSettings(), THREADPOOL, licenseState); - - return new LocalExporter(config, client(), cleanerService); + return new LocalExporter(config, client(), coordinator, cleanerService); } } diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/local/LocalExporterResourceIntegTests.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/local/LocalExporterResourceIntegTests.java index 44c74c7cbfcb2..2b7e00b042850 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/local/LocalExporterResourceIntegTests.java +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/local/LocalExporterResourceIntegTests.java @@ -24,6 +24,7 @@ import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringTemplateUtils; import org.elasticsearch.xpack.core.watcher.transport.actions.put.PutWatchAction; import org.elasticsearch.xpack.monitoring.exporter.ClusterAlertsUtil; +import org.elasticsearch.xpack.monitoring.exporter.MonitoringMigrationCoordinator; import java.io.IOException; import java.util.ArrayList; @@ -94,7 +95,25 @@ public void testRemoveWhenResourcesShouldBeRemoved() throws Exception { assertPipelinesExist(); assertNoWatchesExist(); }); + } + + public void testResourcesBlockedDuringMigration() throws Exception { + putResources(newEnoughVersion()); + + assertResourcesExist(); + waitNoPendingTasksOnAll(); + Settings exporterSettings = Settings.builder().put(localExporterSettings()) + .put("xpack.monitoring.migration.decommission_alerts", true).build(); + + MonitoringMigrationCoordinator coordinator = new MonitoringMigrationCoordinator(); + assertTrue(coordinator.tryBlockInstallationTasks()); + assertFalse(coordinator.canInstall()); + + assertThat(clusterService().state().version(), not(ClusterState.UNKNOWN_VERSION)); + try (LocalExporter exporter = createLocalExporter("decommission_local", exporterSettings, coordinator)) { + assertThat(exporter.isExporterReady(), is(false)); + } } @Override diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/local/LocalExporterTests.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/local/LocalExporterTests.java index 84d143a2c5b18..bb8a8a69b5721 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/local/LocalExporterTests.java +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/local/LocalExporterTests.java @@ -13,6 +13,7 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.monitoring.cleaner.CleanerService; import org.elasticsearch.xpack.monitoring.exporter.Exporter; +import org.elasticsearch.xpack.monitoring.exporter.MonitoringMigrationCoordinator; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -24,7 +25,8 @@ public void testLocalExporterRemovesListenersOnClose() { final XPackLicenseState licenseState = mock(XPackLicenseState.class); final Exporter.Config config = new Exporter.Config("name", "type", Settings.EMPTY, clusterService, licenseState); final CleanerService cleanerService = mock(CleanerService.class); - final LocalExporter exporter = new LocalExporter(config, mock(Client.class), cleanerService); + final MonitoringMigrationCoordinator migrationCoordinator = new MonitoringMigrationCoordinator(); + final LocalExporter exporter = new LocalExporter(config, mock(Client.class), migrationCoordinator, cleanerService); verify(clusterService).addListener(exporter); verify(cleanerService).add(exporter); verify(licenseState).addListener(exporter); diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/rest/action/RestMonitoringMigrateAlertsActionTests.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/rest/action/RestMonitoringMigrateAlertsActionTests.java new file mode 100644 index 0000000000000..1499b7491961c --- /dev/null +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/rest/action/RestMonitoringMigrateAlertsActionTests.java @@ -0,0 +1,60 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.monitoring.rest.action; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.rest.RestChannel; +import org.elasticsearch.rest.RestResponse; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.core.monitoring.action.MonitoringMigrateAlertsResponse; +import org.elasticsearch.xpack.core.monitoring.action.MonitoringMigrateAlertsResponse.ExporterMigrationResult; +import org.elasticsearch.xpack.monitoring.exporter.http.HttpExporter; +import org.elasticsearch.xpack.monitoring.exporter.local.LocalExporter; + +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.startsWith; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class RestMonitoringMigrateAlertsActionTests extends ESTestCase { + + private final RestMonitoringMigrateAlertsAction action = new RestMonitoringMigrateAlertsAction(); + + public void testGetName() { + assertThat(action.getName(), is("monitoring_migrate_alerts")); + } + + public void testSupportsContentStream() { + assertThat(action.supportsContentStream(), is(false)); + } + + public void testRestActionCompletion() throws Exception { + List migrationResults = new ArrayList<>(); + for (int i = 0; i < randomInt(5); i++) { + boolean success = randomBoolean(); + migrationResults.add(new ExporterMigrationResult( + randomAlphaOfLength(10), + randomFrom(LocalExporter.TYPE, HttpExporter.TYPE), + success, + success ? null : new IOException("mock failure") + )); + } + MonitoringMigrateAlertsResponse restResponse = new MonitoringMigrateAlertsResponse(migrationResults); + + final RestChannel channel = mock(RestChannel.class); + when(channel.newBuilder()).thenReturn(JsonXContent.contentBuilder()); + RestResponse response = RestMonitoringMigrateAlertsAction.getRestBuilderListener(channel).buildResponse(restResponse); + + assertThat(response.status(), is(RestStatus.OK)); + assertThat(response.content().utf8ToString(), startsWith("{\"exporters\":[")); + } +} diff --git a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java index fc38b624d9653..d3e13bcacb721 100644 --- a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java +++ b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java @@ -150,6 +150,7 @@ public class Constants { "cluster:admin/xpack/ml/job/validate/detector", "cluster:admin/xpack/ml/upgrade_mode", "cluster:admin/xpack/monitoring/bulk", + "cluster:admin/xpack/monitoring/migrate/alerts", "cluster:admin/xpack/rollup/delete", "cluster:admin/xpack/rollup/put", "cluster:admin/xpack/rollup/start",