diff --git a/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderIT.java b/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderIT.java index 54d465aecda52..99845784f3b02 100644 --- a/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderIT.java +++ b/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderIT.java @@ -212,7 +212,6 @@ public void testInvalidTimestamp() throws Exception { }); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/92888") public void testUpdatedTimestamp() throws Exception { assumeTrue("only test with fixture to have stable results", getEndpoint() != null); testGeoIpDatabasesDownload(); @@ -224,7 +223,6 @@ public void testUpdatedTimestamp() throws Exception { testGeoIpDatabasesDownload(); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/92888") public void testGeoIpDatabasesDownload() throws Exception { putGeoIpPipeline(); updateClusterSettings(Settings.builder().put(GeoIpDownloaderTaskExecutor.ENABLED_SETTING.getKey(), true)); diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloader.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloader.java index 713e5111853a7..0f310f7764047 100644 --- a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloader.java +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloader.java @@ -11,6 +11,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; @@ -18,12 +19,14 @@ import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexAbstraction; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.hash.MessageDigests; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.index.Index; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.MatchQueryBuilder; import org.elasticsearch.index.query.RangeQueryBuilder; @@ -36,6 +39,7 @@ import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.RemoteTransportException; import org.elasticsearch.xcontent.XContentParser; import org.elasticsearch.xcontent.XContentParserConfiguration; import org.elasticsearch.xcontent.XContentType; @@ -122,7 +126,7 @@ public class GeoIpDownloader extends AllocatedPersistentTask { } // visible for testing - void updateDatabases() throws IOException { + synchronized void updateDatabases() throws IOException { var clusterState = clusterService.state(); var geoipIndex = clusterState.getMetadata().getIndicesLookup().get(GeoIpDownloader.DATABASES_INDEX); if (geoipIndex != null) { @@ -323,6 +327,20 @@ private void cleanDatabases() { stats = stats.expiredDatabases((int) expiredDatabases); } + synchronized void deleteIndex() { + IndexAbstraction databasesAbstraction = clusterService.state().metadata().getIndicesLookup().get(DATABASES_INDEX); + if (databasesAbstraction != null) { + // regardless of whether DATABASES_INDEX is an alias, resolve it to a concrete index + Index databasesIndex = databasesAbstraction.getWriteIndex(); + client.admin().indices().prepareDelete(databasesIndex.getName()).execute(ActionListener.wrap(rr -> {}, e -> { + Throwable t = e instanceof RemoteTransportException ? e.getCause() : e; + if (t instanceof ResourceNotFoundException == false) { + logger.warn("failed to remove " + databasesIndex, e); + } + })); + } + } + @Override protected void onCancelled() { if (scheduled != null) { diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java index 322eb0666db07..735e8fb1600b2 100644 --- a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java @@ -18,14 +18,12 @@ import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; -import org.elasticsearch.cluster.metadata.IndexAbstraction; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.TimeValue; import org.elasticsearch.gateway.GatewayService; -import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.ingest.IngestMetadata; import org.elasticsearch.ingest.IngestService; @@ -48,7 +46,6 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; -import static org.elasticsearch.ingest.geoip.GeoIpDownloader.DATABASES_INDEX; import static org.elasticsearch.ingest.geoip.GeoIpDownloader.GEOIP_DOWNLOADER; import static org.elasticsearch.ingest.geoip.GeoIpProcessor.Factory.downloadDatabaseOnPipelineCreation; @@ -366,17 +363,10 @@ private void stopTask(Runnable onFailure) { } } ); + final GeoIpDownloader geoIpDownloader = this.getCurrentTask(); persistentTasksService.sendRemoveRequest(GEOIP_DOWNLOADER, ActionListener.runAfter(listener, () -> { - IndexAbstraction databasesAbstraction = clusterService.state().metadata().getIndicesLookup().get(DATABASES_INDEX); - if (databasesAbstraction != null) { - // regardless of whether DATABASES_INDEX is an alias, resolve it to a concrete index - Index databasesIndex = databasesAbstraction.getWriteIndex(); - client.admin().indices().prepareDelete(databasesIndex.getName()).execute(ActionListener.wrap(rr -> {}, e -> { - Throwable t = e instanceof RemoteTransportException ? e.getCause() : e; - if (t instanceof ResourceNotFoundException == false) { - logger.warn("failed to remove " + databasesIndex, e); - } - })); + if (geoIpDownloader != null) { + threadPool.generic().execute(geoIpDownloader::deleteIndex); } })); }