From 555b0fc45daa4e8a2b51d08ea5c4ee6138da7152 Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Fri, 9 Feb 2024 15:18:44 -0500 Subject: [PATCH 1/3] Re-enable some tests --- .../java/org/elasticsearch/ingest/geoip/GeoIpDownloaderIT.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderIT.java b/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderIT.java index 54d465aecda52..99845784f3b02 100644 --- a/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderIT.java +++ b/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderIT.java @@ -212,7 +212,6 @@ public void testInvalidTimestamp() throws Exception { }); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/92888") public void testUpdatedTimestamp() throws Exception { assumeTrue("only test with fixture to have stable results", getEndpoint() != null); testGeoIpDatabasesDownload(); @@ -224,7 +223,6 @@ public void testUpdatedTimestamp() throws Exception { testGeoIpDatabasesDownload(); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/92888") public void testGeoIpDatabasesDownload() throws Exception { putGeoIpPipeline(); updateClusterSettings(Settings.builder().put(GeoIpDownloaderTaskExecutor.ENABLED_SETTING.getKey(), true)); From d686e1db6941785a6b87c3b320d28f3ef424835e Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Fri, 9 Feb 2024 15:21:48 -0500 Subject: [PATCH 2/3] Extract index deletion into a method --- .../ingest/geoip/GeoIpDownloader.java | 18 ++++++++++++++++++ .../geoip/GeoIpDownloaderTaskExecutor.java | 16 +++------------- 2 files changed, 21 insertions(+), 13 deletions(-) diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloader.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloader.java index 713e5111853a7..b29fff6329813 100644 --- a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloader.java +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloader.java @@ -11,6 +11,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; @@ -18,12 +19,14 @@ import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexAbstraction; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.hash.MessageDigests; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.index.Index; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.MatchQueryBuilder; import org.elasticsearch.index.query.RangeQueryBuilder; @@ -36,6 +39,7 @@ import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.RemoteTransportException; import org.elasticsearch.xcontent.XContentParser; import org.elasticsearch.xcontent.XContentParserConfiguration; import org.elasticsearch.xcontent.XContentType; @@ -323,6 +327,20 @@ private void cleanDatabases() { stats = stats.expiredDatabases((int) expiredDatabases); } + public void deleteIndex() { + IndexAbstraction databasesAbstraction = clusterService.state().metadata().getIndicesLookup().get(DATABASES_INDEX); + if (databasesAbstraction != null) { + // regardless of whether DATABASES_INDEX is an alias, resolve it to a concrete index + Index databasesIndex = databasesAbstraction.getWriteIndex(); + client.admin().indices().prepareDelete(databasesIndex.getName()).execute(ActionListener.wrap(rr -> {}, e -> { + Throwable t = e instanceof RemoteTransportException ? e.getCause() : e; + if (t instanceof ResourceNotFoundException == false) { + logger.warn("failed to remove " + databasesIndex, e); + } + })); + } + } + @Override protected void onCancelled() { if (scheduled != null) { diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java index 322eb0666db07..e5dae30bdd1b9 100644 --- a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java @@ -18,14 +18,12 @@ import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; -import org.elasticsearch.cluster.metadata.IndexAbstraction; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.TimeValue; import org.elasticsearch.gateway.GatewayService; -import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.ingest.IngestMetadata; import org.elasticsearch.ingest.IngestService; @@ -48,7 +46,6 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; -import static org.elasticsearch.ingest.geoip.GeoIpDownloader.DATABASES_INDEX; import static org.elasticsearch.ingest.geoip.GeoIpDownloader.GEOIP_DOWNLOADER; import static org.elasticsearch.ingest.geoip.GeoIpProcessor.Factory.downloadDatabaseOnPipelineCreation; @@ -366,17 +363,10 @@ private void stopTask(Runnable onFailure) { } } ); + final GeoIpDownloader geoIpDownloader = this.getCurrentTask(); persistentTasksService.sendRemoveRequest(GEOIP_DOWNLOADER, ActionListener.runAfter(listener, () -> { - IndexAbstraction databasesAbstraction = clusterService.state().metadata().getIndicesLookup().get(DATABASES_INDEX); - if (databasesAbstraction != null) { - // regardless of whether DATABASES_INDEX is an alias, resolve it to a concrete index - Index databasesIndex = databasesAbstraction.getWriteIndex(); - client.admin().indices().prepareDelete(databasesIndex.getName()).execute(ActionListener.wrap(rr -> {}, e -> { - Throwable t = e instanceof RemoteTransportException ? e.getCause() : e; - if (t instanceof ResourceNotFoundException == false) { - logger.warn("failed to remove " + databasesIndex, e); - } - })); + if (geoIpDownloader != null) { + geoIpDownloader.deleteIndex(); } })); } From 8afcae4887c78e5e1e577f36e908a68390506933 Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Fri, 9 Feb 2024 15:25:12 -0500 Subject: [PATCH 3/3] Use a threadpool and (super simple) mutex We can probably do better than just 'synchronized' but this is a good WIP version for thinking aloud. --- .../java/org/elasticsearch/ingest/geoip/GeoIpDownloader.java | 4 ++-- .../ingest/geoip/GeoIpDownloaderTaskExecutor.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloader.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloader.java index b29fff6329813..0f310f7764047 100644 --- a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloader.java +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloader.java @@ -126,7 +126,7 @@ public class GeoIpDownloader extends AllocatedPersistentTask { } // visible for testing - void updateDatabases() throws IOException { + synchronized void updateDatabases() throws IOException { var clusterState = clusterService.state(); var geoipIndex = clusterState.getMetadata().getIndicesLookup().get(GeoIpDownloader.DATABASES_INDEX); if (geoipIndex != null) { @@ -327,7 +327,7 @@ private void cleanDatabases() { stats = stats.expiredDatabases((int) expiredDatabases); } - public void deleteIndex() { + synchronized void deleteIndex() { IndexAbstraction databasesAbstraction = clusterService.state().metadata().getIndicesLookup().get(DATABASES_INDEX); if (databasesAbstraction != null) { // regardless of whether DATABASES_INDEX is an alias, resolve it to a concrete index diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java index e5dae30bdd1b9..735e8fb1600b2 100644 --- a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java @@ -366,7 +366,7 @@ private void stopTask(Runnable onFailure) { final GeoIpDownloader geoIpDownloader = this.getCurrentTask(); persistentTasksService.sendRemoveRequest(GEOIP_DOWNLOADER, ActionListener.runAfter(listener, () -> { if (geoIpDownloader != null) { - geoIpDownloader.deleteIndex(); + threadPool.generic().execute(geoIpDownloader::deleteIndex); } })); }