-
Notifications
You must be signed in to change notification settings - Fork 24.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix geoip index deletion race condition #105367
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,14 +18,12 @@ | |
import org.elasticsearch.cluster.ClusterChangedEvent; | ||
import org.elasticsearch.cluster.ClusterState; | ||
import org.elasticsearch.cluster.ClusterStateListener; | ||
import org.elasticsearch.cluster.metadata.IndexAbstraction; | ||
import org.elasticsearch.cluster.node.DiscoveryNode; | ||
import org.elasticsearch.cluster.service.ClusterService; | ||
import org.elasticsearch.common.settings.Setting; | ||
import org.elasticsearch.common.settings.Settings; | ||
import org.elasticsearch.core.TimeValue; | ||
import org.elasticsearch.gateway.GatewayService; | ||
import org.elasticsearch.index.Index; | ||
import org.elasticsearch.index.IndexSettings; | ||
import org.elasticsearch.ingest.IngestMetadata; | ||
import org.elasticsearch.ingest.IngestService; | ||
|
@@ -48,7 +46,6 @@ | |
import java.util.concurrent.atomic.AtomicReference; | ||
import java.util.stream.Collectors; | ||
|
||
import static org.elasticsearch.ingest.geoip.GeoIpDownloader.DATABASES_INDEX; | ||
import static org.elasticsearch.ingest.geoip.GeoIpDownloader.GEOIP_DOWNLOADER; | ||
import static org.elasticsearch.ingest.geoip.GeoIpProcessor.Factory.downloadDatabaseOnPipelineCreation; | ||
|
||
|
@@ -366,17 +363,10 @@ private void stopTask(Runnable onFailure) { | |
} | ||
} | ||
); | ||
final GeoIpDownloader geoIpDownloader = this.getCurrentTask(); | ||
persistentTasksService.sendRemoveRequest(GEOIP_DOWNLOADER, ActionListener.runAfter(listener, () -> { | ||
IndexAbstraction databasesAbstraction = clusterService.state().metadata().getIndicesLookup().get(DATABASES_INDEX); | ||
if (databasesAbstraction != null) { | ||
// regardless of whether DATABASES_INDEX is an alias, resolve it to a concrete index | ||
Index databasesIndex = databasesAbstraction.getWriteIndex(); | ||
client.admin().indices().prepareDelete(databasesIndex.getName()).execute(ActionListener.wrap(rr -> {}, e -> { | ||
Throwable t = e instanceof RemoteTransportException ? e.getCause() : e; | ||
if (t instanceof ResourceNotFoundException == false) { | ||
logger.warn("failed to remove " + databasesIndex, e); | ||
} | ||
})); | ||
if (geoIpDownloader != null) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wonder if this needs to be here because the code is set up to be able to stop the task from any node, but the current task is only set on the node that is running it. Check out the clusterChanged method. We're not ensuring that the stopTask method is only run on the master node like we are in the |
||
threadPool.generic().execute(geoIpDownloader::deleteIndex); | ||
} | ||
})); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right beneath this method is the
onCancelled()
method, which runs on the node executing the task (on a GENERIC thread, so we should be fine). The task should not be scheduled again until aftermarkAsCompleted()
is called, so as long as we have a way to avoid running the delete at the same time as the update (synchronized is probably fine?) then the delete will happen always happen on the right node.