From 83775bbc615669165c9646894dccba3f2211a8cc Mon Sep 17 00:00:00 2001 From: Aurelien FOUCRET Date: Mon, 8 Apr 2024 16:12:06 +0200 Subject: [PATCH] Do not flush the cache on non-master node. --- .../TrainedModelCacheMetadataService.java | 23 ++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/persistence/TrainedModelCacheMetadataService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/persistence/TrainedModelCacheMetadataService.java index 48dc1fc8a6738..e59a17d789dec 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/persistence/TrainedModelCacheMetadataService.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/persistence/TrainedModelCacheMetadataService.java @@ -9,7 +9,9 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.ClusterStateTaskExecutor; import org.elasticsearch.cluster.ClusterStateTaskExecutor.TaskContext; import org.elasticsearch.cluster.ClusterStateTaskListener; @@ -26,19 +28,28 @@ import java.util.HashMap; -public class TrainedModelCacheMetadataService { +public class TrainedModelCacheMetadataService implements ClusterStateListener { private static final Logger LOGGER = LogManager.getLogger(TrainedModelCacheMetadataService.class); private final MasterServiceTaskQueue modelCacheMetadataManagementTaskQueue; + private volatile boolean isMasterNode = false; + public TrainedModelCacheMetadataService(ClusterService clusterService) { this.modelCacheMetadataManagementTaskQueue = clusterService.createTaskQueue( "trained-models-cache-metadata", Priority.IMMEDIATE, new ModelCacheMetadataManagementTaskExecutor() ); + clusterService.addListener(this); } public void deleteCacheMetadataEntry(String modelId, ActionListener listener) { + if (this.isMasterNode == false) { + listener.onResponse(AcknowledgedResponse.FALSE); + return; + } + + ModelCacheMetadataManagementTask deleteModelCacheMetadataTask = new DeleteModelCacheMetadataTask(modelId, listener); this.modelCacheMetadataManagementTaskQueue.submitTask( deleteModelCacheMetadataTask.getDescription(), @@ -48,10 +59,20 @@ public void deleteCacheMetadataEntry(String modelId, ActionListener listener) { + if (this.isMasterNode == false) { + listener.onResponse(AcknowledgedResponse.FALSE); + return; + } + ModelCacheMetadataManagementTask putModelCacheMetadataTask = new PutModelCacheMetadataTask(modelConfig.getModelId(), listener); this.modelCacheMetadataManagementTaskQueue.submitTask(putModelCacheMetadataTask.getDescription(), putModelCacheMetadataTask, null); } + @Override + public void clusterChanged(ClusterChangedEvent event) { + this.isMasterNode = event.localNodeMaster(); + } + private abstract static class ModelCacheMetadataManagementTask implements ClusterStateTaskListener { protected final ActionListener listener;