Skip to content

Commit

Permalink
Do not flush the cache on non-master node.
Browse files Browse the repository at this point in the history
  • Loading branch information
afoucret committed Apr 8, 2024
1 parent 0576b86 commit 83775bb
Showing 1 changed file with 22 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.ClusterStateTaskExecutor.TaskContext;
import org.elasticsearch.cluster.ClusterStateTaskListener;
Expand All @@ -26,19 +28,28 @@

import java.util.HashMap;

public class TrainedModelCacheMetadataService {
public class TrainedModelCacheMetadataService implements ClusterStateListener {
private static final Logger LOGGER = LogManager.getLogger(TrainedModelCacheMetadataService.class);
private final MasterServiceTaskQueue<ModelCacheMetadataManagementTask> modelCacheMetadataManagementTaskQueue;

private volatile boolean isMasterNode = false;

public TrainedModelCacheMetadataService(ClusterService clusterService) {
this.modelCacheMetadataManagementTaskQueue = clusterService.createTaskQueue(
"trained-models-cache-metadata",
Priority.IMMEDIATE,
new ModelCacheMetadataManagementTaskExecutor()
);
clusterService.addListener(this);
}

public void deleteCacheMetadataEntry(String modelId, ActionListener<AcknowledgedResponse> listener) {
if (this.isMasterNode == false) {
listener.onResponse(AcknowledgedResponse.FALSE);
return;
}


ModelCacheMetadataManagementTask deleteModelCacheMetadataTask = new DeleteModelCacheMetadataTask(modelId, listener);
this.modelCacheMetadataManagementTaskQueue.submitTask(
deleteModelCacheMetadataTask.getDescription(),
Expand All @@ -48,10 +59,20 @@ public void deleteCacheMetadataEntry(String modelId, ActionListener<Acknowledged
}

public void saveCacheMetadataEntry(TrainedModelConfig modelConfig, ActionListener<AcknowledgedResponse> listener) {
if (this.isMasterNode == false) {
listener.onResponse(AcknowledgedResponse.FALSE);
return;
}

ModelCacheMetadataManagementTask putModelCacheMetadataTask = new PutModelCacheMetadataTask(modelConfig.getModelId(), listener);
this.modelCacheMetadataManagementTaskQueue.submitTask(putModelCacheMetadataTask.getDescription(), putModelCacheMetadataTask, null);
}

@Override
public void clusterChanged(ClusterChangedEvent event) {
this.isMasterNode = event.localNodeMaster();
}

private abstract static class ModelCacheMetadataManagementTask implements ClusterStateTaskListener {
protected final ActionListener<AcknowledgedResponse> listener;

Expand Down

0 comments on commit 83775bb

Please sign in to comment.