From 195c781974bc588a66489f15a1309928dde50586 Mon Sep 17 00:00:00 2001 From: Benjamin Trent <4357155+benwtrent@users.noreply.github.com> Date: Mon, 9 Aug 2021 15:11:41 -0400 Subject: [PATCH 1/5] Only start re-assigning persistent tasks if they are not already being reassigned --- .../persistent/PersistentTasksClusterService.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java index 275bc7ba430d9..ebc989366c343 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java @@ -37,6 +37,7 @@ import java.io.Closeable; import java.util.List; import java.util.Objects; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; /** @@ -55,6 +56,7 @@ public class PersistentTasksClusterService implements ClusterStateListener, Clos private final EnableAssignmentDecider enableDecider; private final ThreadPool threadPool; private final PeriodicRechecker periodicRechecker; + private final AtomicBoolean reassigningTasks = new AtomicBoolean(false); public PersistentTasksClusterService(Settings settings, PersistentTasksExecutorRegistry registry, ClusterService clusterService, ThreadPool threadPool) { @@ -354,6 +356,9 @@ public void clusterChanged(ClusterChangedEvent event) { * Submit a cluster state update to reassign any persistent tasks that need reassigning */ private void reassignPersistentTasks() { + if (this.reassigningTasks.compareAndSet(false, true) == false) { + return; + } clusterService.submitStateUpdateTask("reassign persistent tasks", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { @@ -362,6 +367,7 @@ public ClusterState execute(ClusterState currentState) { @Override public void onFailure(String source, Exception e) { + reassigningTasks.set(false); logger.warn("failed to reassign persistent tasks", e); if (e instanceof NotMasterException == false) { // There must be a task that's worth rechecking because there was one @@ -373,6 +379,7 @@ public void onFailure(String source, Exception e) { @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + reassigningTasks.set(false); if (isAnyTaskUnassigned(newState.getMetadata().custom(PersistentTasksCustomMetadata.TYPE))) { periodicRechecker.rescheduleIfNecessary(); } From 298fe8e192fc799bb43b9ec5e60a120b1751fec5 Mon Sep 17 00:00:00 2001 From: Benjamin Trent <4357155+benwtrent@users.noreply.github.com> Date: Tue, 10 Aug 2021 08:39:59 -0400 Subject: [PATCH 2/5] adding tests addressing PR comments --- .../PersistentTasksClusterService.java | 2 +- .../PersistentTasksClusterServiceTests.java | 57 ++++- .../xpack/ml/job/JobNodeSelector.java | 22 +- .../xpack/ml/job/JobNodeSelectorTests.java | 204 ++++++++++++++++-- 4 files changed, 249 insertions(+), 36 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java index ebc989366c343..8660cd104cae1 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java @@ -355,7 +355,7 @@ public void clusterChanged(ClusterChangedEvent event) { /** * Submit a cluster state update to reassign any persistent tasks that need reassigning */ - private void reassignPersistentTasks() { + void reassignPersistentTasks() { if (this.reassigningTasks.compareAndSet(false, true) == false) { return; } diff --git a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java index 8c5fa95e224ce..a24e9fc605068 100644 --- a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java @@ -50,6 +50,7 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -69,8 +70,13 @@ import static org.hamcrest.core.Is.is; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; public class PersistentTasksClusterServiceTests extends ESTestCase { @@ -429,7 +435,7 @@ public void testPeriodicRecheck() throws Exception { nonClusterStateCondition = false; boolean shouldSimulateFailure = randomBoolean(); - ClusterService recheckTestClusterService = createRecheckTestClusterService(clusterState, shouldSimulateFailure); + ClusterService recheckTestClusterService = createStateUpdateClusterState(clusterState, shouldSimulateFailure); PersistentTasksClusterService service = createService(recheckTestClusterService, (params, candidateNodes, currentState) -> assignBasedOnNonClusterStateCondition(candidateNodes)); @@ -477,7 +483,7 @@ public void testPeriodicRecheckOffMaster() { ClusterState clusterState = builder.metadata(metadata).nodes(nodes).build(); nonClusterStateCondition = false; - ClusterService recheckTestClusterService = createRecheckTestClusterService(clusterState, false); + ClusterService recheckTestClusterService = createStateUpdateClusterState(clusterState, false); PersistentTasksClusterService service = createService(recheckTestClusterService, (params, candidateNodes, currentState) -> assignBasedOnNonClusterStateCondition(candidateNodes)); @@ -639,7 +645,49 @@ public void testTasksNotAssignedToShuttingDownNodes() { Sets.haveEmptyIntersection(shutdownNodes, nodesWithTasks)); } - private ClusterService createRecheckTestClusterService(ClusterState initialState, boolean shouldSimulateFailure) { + public void testReassignOnlyOnce() throws Exception { + CountDownLatch latch = new CountDownLatch(1); + ClusterState initialState = initialState(); + ClusterState.Builder builder = ClusterState.builder(initialState); + PersistentTasksCustomMetadata.Builder tasks = PersistentTasksCustomMetadata.builder( + initialState.metadata().custom(PersistentTasksCustomMetadata.TYPE) + ); + DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(initialState.nodes()); + addTestNodes(nodes, randomIntBetween(1, 3)); + addTask(tasks, "assign_based_on_non_cluster_state_condition", null); + Metadata.Builder metadata = Metadata.builder(initialState.metadata()).putCustom(PersistentTasksCustomMetadata.TYPE, tasks.build()); + ClusterState clusterState = builder.metadata(metadata).nodes(nodes).build(); + + boolean shouldSimulateFailure = randomBoolean(); + ClusterService recheckTestClusterService = createStateUpdateClusterState(clusterState, shouldSimulateFailure, latch); + PersistentTasksClusterService service = createService( + recheckTestClusterService, + (params, candidateNodes, currentState) -> assignBasedOnNonClusterStateCondition(candidateNodes) + ); + verify(recheckTestClusterService, atLeastOnce()).getClusterSettings(); + verify(recheckTestClusterService, atLeastOnce()).addListener(any()); + Thread t1 = new Thread(service::reassignPersistentTasks); + Thread t2 = new Thread(service::reassignPersistentTasks); + try { + t1.start(); + t2.start(); + assertBusy(() -> verify(recheckTestClusterService, times(1)).submitStateUpdateTask(eq("reassign persistent tasks"), any())); + latch.countDown(); + t1.join(); + t2.join(); + verifyNoMoreInteractions(recheckTestClusterService); + } finally { + latch.countDown(); + t1.join(); + t2.join(); + } + } + + private ClusterService createStateUpdateClusterState(ClusterState initialState, boolean shouldSimulateFailure) { + return createStateUpdateClusterState(initialState, shouldSimulateFailure, null); + } + + private ClusterService createStateUpdateClusterState(ClusterState initialState, boolean shouldSimulateFailure, CountDownLatch await) { AtomicBoolean testFailureNextTime = new AtomicBoolean(shouldSimulateFailure); AtomicReference state = new AtomicReference<>(initialState); ClusterService recheckTestClusterService = mock(ClusterService.class); @@ -651,6 +699,9 @@ private ClusterService createRecheckTestClusterService(ClusterState initialState ClusterStateUpdateTask task = (ClusterStateUpdateTask) invocationOnMock.getArguments()[1]; ClusterState before = state.get(); ClusterState after = task.execute(before); + if (await != null) { + await.await(); + } if (testFailureNextTime.compareAndSet(true, false)) { task.onFailure("testing failure", new RuntimeException("foo")); } else { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobNodeSelector.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobNodeSelector.java index 8342aa1d77e95..70ee5762f4b03 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobNodeSelector.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobNodeSelector.java @@ -23,11 +23,11 @@ import java.util.ArrayList; import java.util.Collection; -import java.util.LinkedList; import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Objects; +import java.util.TreeMap; import java.util.function.Function; import java.util.stream.Collectors; @@ -156,7 +156,7 @@ public PersistentTasksCustomMetadata.Assignment selectNode(Long estimatedMemoryF logger.debug(reason); return new PersistentTasksCustomMetadata.Assignment(null, reason); } - List reasons = new LinkedList<>(); + Map reasons = new TreeMap<>(); long maxAvailableMemory = Long.MIN_VALUE; DiscoveryNode minLoadedNodeByMemory = null; long requiredMemoryForJob = estimatedMemoryFootprint; @@ -166,7 +166,7 @@ public PersistentTasksCustomMetadata.Assignment selectNode(Long estimatedMemoryF String reason = nodeFilter.apply(node); if (reason != null) { logger.trace(reason); - reasons.add(reason); + reasons.put(node.getName(), reason); continue; } NodeLoad currentLoad = nodeLoadDetector.detectNodeLoad( @@ -180,7 +180,7 @@ public PersistentTasksCustomMetadata.Assignment selectNode(Long estimatedMemoryF if (currentLoad.getError() != null) { reason = createReason(jobId, nodeNameAndMlAttributes(node), currentLoad.getError()); logger.trace(reason); - reasons.add(reason); + reasons.put(node.getName(), reason); continue; } // Assuming the node is eligible at all, check loading @@ -194,7 +194,7 @@ public PersistentTasksCustomMetadata.Assignment selectNode(Long estimatedMemoryF currentLoad.getNumAllocatingJobs(), maxConcurrentJobAllocations); logger.trace(reason); - reasons.add(reason); + reasons.put(node.getName(), reason); continue; } @@ -206,7 +206,7 @@ public PersistentTasksCustomMetadata.Assignment selectNode(Long estimatedMemoryF MAX_OPEN_JOBS_PER_NODE.getKey(), maxNumberOfOpenJobs); logger.trace(reason); - reasons.add(reason); + reasons.put(node.getName(), reason); continue; } @@ -215,7 +215,7 @@ public PersistentTasksCustomMetadata.Assignment selectNode(Long estimatedMemoryF nodeNameAndMlAttributes(node), "This node is not providing accurate information to determine is load by memory."); logger.trace(reason); - reasons.add(reason); + reasons.put(node.getName(),reason); continue; } @@ -224,7 +224,7 @@ public PersistentTasksCustomMetadata.Assignment selectNode(Long estimatedMemoryF nodeNameAndMlAttributes(node), "This node is indicating that it has no native memory for machine learning."); logger.trace(reason); - reasons.add(reason); + reasons.put(node.getName(),reason); continue; } @@ -247,7 +247,7 @@ public PersistentTasksCustomMetadata.Assignment selectNode(Long estimatedMemoryF requiredMemoryForJob, ByteSizeValue.ofBytes(requiredMemoryForJob).toString()); logger.trace(reason); - reasons.add(reason); + reasons.put(node.getName(),reason); continue; } @@ -260,7 +260,7 @@ public PersistentTasksCustomMetadata.Assignment selectNode(Long estimatedMemoryF return createAssignment( estimatedMemoryFootprint, minLoadedNodeByMemory, - reasons, + reasons.values(), maxNodeSize > 0L ? NativeMemoryCalculator.allowedBytesForMl(maxNodeSize, maxMachineMemoryPercent, useAutoMemoryPercentage) : Long.MAX_VALUE); @@ -268,7 +268,7 @@ public PersistentTasksCustomMetadata.Assignment selectNode(Long estimatedMemoryF PersistentTasksCustomMetadata.Assignment createAssignment(long estimatedMemoryUsage, DiscoveryNode minLoadedNode, - List reasons, + Collection reasons, long biggestPossibleJob) { if (minLoadedNode == null) { String explanation = String.join("|", reasons); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobNodeSelectorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobNodeSelectorTests.java index 04ff19ed6a97a..bde11c77a8019 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobNodeSelectorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobNodeSelectorTests.java @@ -12,6 +12,7 @@ import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.Randomness; import org.elasticsearch.common.collect.MapBuilder; import org.elasticsearch.core.Tuple; import org.elasticsearch.common.transport.TransportAddress; @@ -34,9 +35,12 @@ import org.junit.Before; import java.net.InetAddress; +import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.Date; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.SortedMap; import java.util.TreeMap; @@ -107,7 +111,11 @@ public void testSelectLeastLoadedMlNodeForAnomalyDetectorJob_maxCapacityCountLim Job job = BaseMlIntegTestCase.createFareQuoteJob("job_id1000", JOB_MEMORY_REQUIREMENT).build(new Date()); - JobNodeSelector jobNodeSelector = new JobNodeSelector(cs.build(), cs.nodes().getAllNodes(), job.getId(), MlTasks.JOB_TASK_NAME, + JobNodeSelector jobNodeSelector = new JobNodeSelector( + cs.build(), + shuffled(cs.nodes().getAllNodes()), + job.getId(), + MlTasks.JOB_TASK_NAME, memoryTracker, 0, node -> nodeFilter(node, job)); PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode(maxRunningJobsPerNode, 2, @@ -133,7 +141,10 @@ public void testSelectLeastLoadedMlNodeForDataFrameAnalyticsJob_maxCapacityCount String dataFrameAnalyticsId = "data_frame_analytics_id1000"; - JobNodeSelector jobNodeSelector = new JobNodeSelector(cs.build(), cs.nodes().getAllNodes(), dataFrameAnalyticsId, + JobNodeSelector jobNodeSelector = new JobNodeSelector( + cs.build(), + shuffled(cs.nodes().getAllNodes()), + dataFrameAnalyticsId, MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, memoryTracker, 0, node -> TransportStartDataFrameAnalyticsAction.TaskExecutor.nodeFilter(node, createTaskParams(dataFrameAnalyticsId))); PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode(maxRunningJobsPerNode, @@ -166,7 +177,11 @@ public void testSelectLeastLoadedMlNodeForAnomalyDetectorJob_maxCapacityMemoryLi Job job = BaseMlIntegTestCase.createFareQuoteJob("job_id1000", JOB_MEMORY_REQUIREMENT).build(new Date()); - JobNodeSelector jobNodeSelector = new JobNodeSelector(cs.build(), cs.nodes().getAllNodes(), job.getId(), MlTasks.JOB_TASK_NAME, + JobNodeSelector jobNodeSelector = new JobNodeSelector( + cs.build(), + shuffled(cs.nodes().getAllNodes()), + job.getId(), + MlTasks.JOB_TASK_NAME, memoryTracker, 0, node -> nodeFilter(node, job)); PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode(maxRunningJobsPerNode, 2, @@ -195,7 +210,10 @@ public void testSelectLeastLoadedMlNodeForDataFrameAnalyticsJob_givenTaskHasNull String dataFrameAnalyticsId = "data_frame_analytics_id_new"; - JobNodeSelector jobNodeSelector = new JobNodeSelector(cs.build(), cs.nodes().getAllNodes(), dataFrameAnalyticsId, + JobNodeSelector jobNodeSelector = new JobNodeSelector( + cs.build(), + shuffled(cs.nodes().getAllNodes()), + dataFrameAnalyticsId, MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, memoryTracker, 0, node -> TransportStartDataFrameAnalyticsAction.TaskExecutor.nodeFilter(node, createTaskParams(dataFrameAnalyticsId))); PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode(maxRunningJobsPerNode, @@ -221,7 +239,11 @@ public void testSelectLeastLoadedMlNodeForAnomalyDetectorJob_firstJobTooBigMemor Job job = BaseMlIntegTestCase.createFareQuoteJob("job_id1000", JOB_MEMORY_REQUIREMENT).build(new Date()); - JobNodeSelector jobNodeSelector = new JobNodeSelector(cs.build(), cs.nodes().getAllNodes(), job.getId(), MlTasks.JOB_TASK_NAME, + JobNodeSelector jobNodeSelector = new JobNodeSelector( + cs.build(), + shuffled(cs.nodes().getAllNodes()), + job.getId(), + MlTasks.JOB_TASK_NAME, memoryTracker, 0, node -> nodeFilter(node, job)); PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode(maxRunningJobsPerNode, 2, @@ -255,7 +277,10 @@ public void testSelectLeastLoadedMlNodeForDataFrameAnalyticsJob_maxCapacityMemor String dataFrameAnalyticsId = "data_frame_analytics_id1000"; - JobNodeSelector jobNodeSelector = new JobNodeSelector(cs.build(), cs.nodes().getAllNodes(), dataFrameAnalyticsId, + JobNodeSelector jobNodeSelector = new JobNodeSelector( + cs.build(), + shuffled(cs.nodes().getAllNodes()), + dataFrameAnalyticsId, MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, memoryTracker, 0, node -> TransportStartDataFrameAnalyticsAction.TaskExecutor.nodeFilter(node, createTaskParams(dataFrameAnalyticsId))); PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode( @@ -287,7 +312,10 @@ public void testSelectLeastLoadedMlNodeForDataFrameAnalyticsJob_firstJobTooBigMe String dataFrameAnalyticsId = "data_frame_analytics_id1000"; - JobNodeSelector jobNodeSelector = new JobNodeSelector(cs.build(), cs.nodes().getAllNodes(), dataFrameAnalyticsId, + JobNodeSelector jobNodeSelector = new JobNodeSelector( + cs.build(), + shuffled(cs.nodes().getAllNodes()), + dataFrameAnalyticsId, MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, memoryTracker, 0, node -> TransportStartDataFrameAnalyticsAction.TaskExecutor.nodeFilter(node, createTaskParams(dataFrameAnalyticsId))); PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode( @@ -323,7 +351,11 @@ public void testSelectLeastLoadedMlNode_noMlNodes() { Job job = BaseMlIntegTestCase.createFareQuoteJob("job_id2", JOB_MEMORY_REQUIREMENT).build(new Date()); - JobNodeSelector jobNodeSelector = new JobNodeSelector(cs.build(), cs.nodes().getAllNodes(), job.getId(), MlTasks.JOB_TASK_NAME, + JobNodeSelector jobNodeSelector = new JobNodeSelector( + cs.build(), + shuffled(cs.nodes().getAllNodes()), + job.getId(), + MlTasks.JOB_TASK_NAME, memoryTracker, 0, node -> nodeFilter(node, job)); PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode( 20, @@ -365,7 +397,11 @@ public void testSelectLeastLoadedMlNode_maxConcurrentOpeningJobs() { Job job6 = BaseMlIntegTestCase.createFareQuoteJob("job_id6", JOB_MEMORY_REQUIREMENT).build(new Date()); ClusterState cs = csBuilder.build(); - JobNodeSelector jobNodeSelector = new JobNodeSelector(cs, cs.nodes().getAllNodes(), job6.getId(), MlTasks.JOB_TASK_NAME, + JobNodeSelector jobNodeSelector = new JobNodeSelector( + cs, + shuffled(cs.nodes().getAllNodes()), + job6.getId(), + MlTasks.JOB_TASK_NAME, memoryTracker, 0, node -> nodeFilter(node, job6)); PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode( 10, @@ -384,7 +420,13 @@ public void testSelectLeastLoadedMlNode_maxConcurrentOpeningJobs() { cs = csBuilder.build(); Job job7 = BaseMlIntegTestCase.createFareQuoteJob("job_id7", JOB_MEMORY_REQUIREMENT).build(new Date()); - jobNodeSelector = new JobNodeSelector(cs, cs.nodes().getAllNodes(), job7.getId(), MlTasks.JOB_TASK_NAME, memoryTracker, 0, + jobNodeSelector = new JobNodeSelector( + cs, + shuffled(cs.nodes().getAllNodes()), + job7.getId(), + MlTasks.JOB_TASK_NAME, + memoryTracker, + 0, node -> nodeFilter(node, job7)); result = jobNodeSelector.selectNode(10, 2, @@ -402,7 +444,13 @@ public void testSelectLeastLoadedMlNode_maxConcurrentOpeningJobs() { csBuilder = ClusterState.builder(cs); csBuilder.metadata(Metadata.builder(cs.metadata()).putCustom(PersistentTasksCustomMetadata.TYPE, tasks)); cs = csBuilder.build(); - jobNodeSelector = new JobNodeSelector(cs, cs.nodes().getAllNodes(), job7.getId(), MlTasks.JOB_TASK_NAME, memoryTracker, 0, + jobNodeSelector = new JobNodeSelector( + cs, + shuffled(cs.nodes().getAllNodes()), + job7.getId(), + MlTasks.JOB_TASK_NAME, + memoryTracker, + 0, node -> nodeFilter(node, job7)); result = jobNodeSelector.selectNode(10, 2, 30, MAX_JOB_BYTES, false); assertNull("no node selected, because stale task", result.getExecutorNode()); @@ -415,7 +463,13 @@ public void testSelectLeastLoadedMlNode_maxConcurrentOpeningJobs() { csBuilder = ClusterState.builder(cs); csBuilder.metadata(Metadata.builder(cs.metadata()).putCustom(PersistentTasksCustomMetadata.TYPE, tasks)); cs = csBuilder.build(); - jobNodeSelector = new JobNodeSelector(cs, cs.nodes().getAllNodes(), job7.getId(), MlTasks.JOB_TASK_NAME, memoryTracker, 0, + jobNodeSelector = new JobNodeSelector( + cs, + shuffled(cs.nodes().getAllNodes()), + job7.getId(), + MlTasks.JOB_TASK_NAME, + memoryTracker, + 0, node -> nodeFilter(node, job7)); result = jobNodeSelector.selectNode(10, 2, 30, MAX_JOB_BYTES, false); assertNull("no node selected, because null state", result.getExecutorNode()); @@ -457,7 +511,11 @@ public void testSelectLeastLoadedMlNode_concurrentOpeningJobsAndStaleFailedJob() Job job7 = BaseMlIntegTestCase.createFareQuoteJob("job_id7", JOB_MEMORY_REQUIREMENT).build(new Date()); // Allocation won't be possible if the stale failed job is treated as opening - JobNodeSelector jobNodeSelector = new JobNodeSelector(cs, cs.nodes().getAllNodes(), job7.getId(), MlTasks.JOB_TASK_NAME, + JobNodeSelector jobNodeSelector = new JobNodeSelector( + cs, + shuffled(cs.nodes().getAllNodes()), + job7.getId(), + MlTasks.JOB_TASK_NAME, memoryTracker, 0, node -> nodeFilter(node, job7)); PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode(10, 2, @@ -474,7 +532,13 @@ public void testSelectLeastLoadedMlNode_concurrentOpeningJobsAndStaleFailedJob() csBuilder.metadata(Metadata.builder(cs.metadata()).putCustom(PersistentTasksCustomMetadata.TYPE, tasks)); cs = csBuilder.build(); Job job8 = BaseMlIntegTestCase.createFareQuoteJob("job_id8", JOB_MEMORY_REQUIREMENT).build(new Date()); - jobNodeSelector = new JobNodeSelector(cs, cs.nodes().getAllNodes(), job8.getId(), MlTasks.JOB_TASK_NAME, memoryTracker, 0, + jobNodeSelector = new JobNodeSelector( + cs, + shuffled(cs.nodes().getAllNodes()), + job8.getId(), + MlTasks.JOB_TASK_NAME, + memoryTracker, + 0, node -> nodeFilter(node, job8)); result = jobNodeSelector.selectNode(10, 2, 30, MAX_JOB_BYTES, false); assertNull("no node selected, because OPENING state", result.getExecutorNode()); @@ -508,7 +572,11 @@ public void testSelectLeastLoadedMlNode_noCompatibleJobTypeNodes() { cs.nodes(nodes); metadata.putCustom(PersistentTasksCustomMetadata.TYPE, tasks); cs.metadata(metadata); - JobNodeSelector jobNodeSelector = new JobNodeSelector(cs.build(), cs.nodes().getAllNodes(), job.getId(), MlTasks.JOB_TASK_NAME, + JobNodeSelector jobNodeSelector = new JobNodeSelector( + cs.build(), + shuffled(cs.nodes().getAllNodes()), + job.getId(), + MlTasks.JOB_TASK_NAME, memoryTracker, 0, node -> nodeFilter(node, job)); PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode(10, 2, @@ -519,6 +587,71 @@ public void testSelectLeastLoadedMlNode_noCompatibleJobTypeNodes() { assertNull(result.getExecutorNode()); } + public void testSelectLeastLoadedMlNode_reasonsAreInDeterministicOrder() { + Map nodeAttr = new HashMap<>(); + nodeAttr.put(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, "10"); + nodeAttr.put(MachineLearning.MACHINE_MEMORY_NODE_ATTR, "1000000000"); + DiscoveryNodes nodes = DiscoveryNodes.builder() + .add( + new DiscoveryNode( + "_node_name1", + "_node_id1", + new TransportAddress(InetAddress.getLoopbackAddress(), 9300), + nodeAttr, + Collections.emptySet(), + Version.CURRENT + ) + ) + .add( + new DiscoveryNode( + "_node_name2", + "_node_id2", + new TransportAddress(InetAddress.getLoopbackAddress(), 9301), + nodeAttr, + Collections.emptySet(), + Version.CURRENT + ) + ) + .build(); + + PersistentTasksCustomMetadata.Builder tasksBuilder = PersistentTasksCustomMetadata.builder(); + OpenJobPersistentTasksExecutorTests.addJobTask("incompatible_type_job", "_node_id1", null, tasksBuilder); + PersistentTasksCustomMetadata tasks = tasksBuilder.build(); + + ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name")); + Metadata.Builder metadata = Metadata.builder(); + + Job job = mock(Job.class); + when(job.getId()).thenReturn("incompatible_type_job"); + when(job.getJobVersion()).thenReturn(Version.CURRENT); + when(job.getJobType()).thenReturn("incompatible_type"); + when(job.getInitialResultsIndexName()).thenReturn("shared"); + + cs.nodes(nodes); + metadata.putCustom(PersistentTasksCustomMetadata.TYPE, tasks); + cs.metadata(metadata); + JobNodeSelector jobNodeSelector = new JobNodeSelector( + cs.build(), + shuffled(cs.nodes().getAllNodes()), + job.getId(), + MlTasks.JOB_TASK_NAME, + memoryTracker, + 0, + node -> nodeFilter(node, job) + ); + PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode(10, 2, 30, MAX_JOB_BYTES, false); + assertThat( + result.getExplanation(), + equalTo( + "Not opening job [incompatible_type_job] on node [{_node_name1}{version=8.0.0}], " + + "because this node does not support jobs of type [incompatible_type]|" + + "Not opening job [incompatible_type_job] on node [{_node_name2}{version=8.0.0}], " + + "because this node does not support jobs of type [incompatible_type]" + ) + ); + assertNull(result.getExecutorNode()); + } + public void testSelectLeastLoadedMlNode_noNodesMatchingModelSnapshotMinVersion() { Map nodeAttr = new HashMap<>(); nodeAttr.put(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, "10"); @@ -544,7 +677,10 @@ public void testSelectLeastLoadedMlNode_noNodesMatchingModelSnapshotMinVersion() cs.nodes(nodes); metadata.putCustom(PersistentTasksCustomMetadata.TYPE, tasks); cs.metadata(metadata); - JobNodeSelector jobNodeSelector = new JobNodeSelector(cs.build(), cs.nodes().getAllNodes(), job.getId(), + JobNodeSelector jobNodeSelector = new JobNodeSelector( + cs.build(), + shuffled(cs.nodes().getAllNodes()), + job.getId(), MlTasks.JOB_TASK_NAME, memoryTracker, 0, node -> nodeFilter(node, job)); PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode(10, 2, @@ -578,7 +714,11 @@ public void testSelectLeastLoadedMlNode_jobWithRules() { cs.metadata(metadata); Job job = jobWithRules("job_with_rules"); - JobNodeSelector jobNodeSelector = new JobNodeSelector(cs.build(), cs.nodes().getAllNodes(), job.getId(), MlTasks.JOB_TASK_NAME, + JobNodeSelector jobNodeSelector = new JobNodeSelector( + cs.build(), + shuffled(cs.nodes().getAllNodes()), + job.getId(), + MlTasks.JOB_TASK_NAME, memoryTracker, 0, node -> nodeFilter(node, job)); PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode(10, 2, @@ -637,7 +777,11 @@ public void testConsiderLazyAssignmentWithNoLazyNodes() { cs.nodes(nodes); Job job = BaseMlIntegTestCase.createFareQuoteJob("job_id1000", JOB_MEMORY_REQUIREMENT).build(new Date()); - JobNodeSelector jobNodeSelector = new JobNodeSelector(cs.build(), cs.nodes().getAllNodes(), job.getId(), MlTasks.JOB_TASK_NAME, + JobNodeSelector jobNodeSelector = new JobNodeSelector( + cs.build(), + shuffled(cs.nodes().getAllNodes()), + job.getId(), + MlTasks.JOB_TASK_NAME, memoryTracker, 0, node -> nodeFilter(node, job)); PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.considerLazyAssignment(new PersistentTasksCustomMetadata.Assignment(null, "foo")); @@ -657,7 +801,11 @@ public void testConsiderLazyAssignmentWithLazyNodes() { cs.nodes(nodes); Job job = BaseMlIntegTestCase.createFareQuoteJob("job_id1000", JOB_MEMORY_REQUIREMENT).build(new Date()); - JobNodeSelector jobNodeSelector = new JobNodeSelector(cs.build(), cs.nodes().getAllNodes(), job.getId(), MlTasks.JOB_TASK_NAME, + JobNodeSelector jobNodeSelector = new JobNodeSelector( + cs.build(), + shuffled(cs.nodes().getAllNodes()), + job.getId(), + MlTasks.JOB_TASK_NAME, memoryTracker, randomIntBetween(1, 3), node -> nodeFilter(node, job)); PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.considerLazyAssignment(new PersistentTasksCustomMetadata.Assignment(null, "foo")); @@ -680,7 +828,11 @@ public void testMaximumPossibleNodeMemoryTooSmall() { Job job = BaseMlIntegTestCase.createFareQuoteJob("job_id1000", ByteSizeValue.ofMb(10)).build(new Date()); when(memoryTracker.getJobMemoryRequirement(anyString(), eq("job_id1000"))).thenReturn(1000L); - JobNodeSelector jobNodeSelector = new JobNodeSelector(cs.build(), cs.nodes().getAllNodes(), job.getId(), MlTasks.JOB_TASK_NAME, + JobNodeSelector jobNodeSelector = new JobNodeSelector( + cs.build(), + shuffled(cs.nodes().getAllNodes()), + job.getId(), + MlTasks.JOB_TASK_NAME, memoryTracker, randomIntBetween(1, 3), node -> nodeFilter(node, job)); PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode(maxRunningJobsPerNode, 2, @@ -741,7 +893,11 @@ public void testPerceivedCapacityAndMaxFreeMemory() { Job job = BaseMlIntegTestCase.createFareQuoteJob("job_id2", JOB_MEMORY_REQUIREMENT).build(new Date()); - JobNodeSelector jobNodeSelector = new JobNodeSelector(cs.build(), cs.nodes().getAllNodes(), job.getId(), MlTasks.JOB_TASK_NAME, + JobNodeSelector jobNodeSelector = new JobNodeSelector( + cs.build(), + shuffled(cs.nodes().getAllNodes()), + job.getId(), + MlTasks.JOB_TASK_NAME, memoryTracker, 0, node -> nodeFilter(node, job)); Tuple capacityAndFreeMemory = jobNodeSelector.perceivedCapacityAndMaxFreeMemory( 10, @@ -790,6 +946,12 @@ private ClusterState.Builder fillNodesWithRunningJobs(Map nodeAt return cs; } + static Collection shuffled(Collection nodes) { + List toShuffle = new ArrayList<>(nodes); + Randomness.shuffle(toShuffle); + return toShuffle; + } + static void addDataFrameAnalyticsJobTask(String id, String nodeId, DataFrameAnalyticsState state, PersistentTasksCustomMetadata.Builder builder) { addDataFrameAnalyticsJobTask(id, nodeId, state, builder, false, false); From 246022e06068e7e31363c3ef3c952f9770debf6c Mon Sep 17 00:00:00 2001 From: Benjamin Trent <4357155+benwtrent@users.noreply.github.com> Date: Tue, 10 Aug 2021 08:57:03 -0400 Subject: [PATCH 3/5] addressing Pr COmments --- .../PersistentTasksClusterServiceTests.java | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java index a24e9fc605068..79e393bcd5c1f 100644 --- a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java @@ -671,15 +671,17 @@ public void testReassignOnlyOnce() throws Exception { try { t1.start(); t2.start(); - assertBusy(() -> verify(recheckTestClusterService, times(1)).submitStateUpdateTask(eq("reassign persistent tasks"), any())); - latch.countDown(); - t1.join(); - t2.join(); - verifyNoMoreInteractions(recheckTestClusterService); + // Make sure we have at least one reassign check before we count down the latch + assertBusy(() -> verify(recheckTestClusterService, atLeastOnce()).submitStateUpdateTask(eq("reassign persistent tasks"), any())); } finally { latch.countDown(); t1.join(); t2.join(); + service.reassignPersistentTasks(); + // verify that our reassignment is possible again, here we have once from the previous reassignment in the `try` block + // And one from the line above once the other threads have joined + assertBusy(() -> verify(recheckTestClusterService, times(2)).submitStateUpdateTask(eq("reassign persistent tasks"), any())); + verifyNoMoreInteractions(recheckTestClusterService); } } From 57297a81244fd65d11eaf00986c523186077e504 Mon Sep 17 00:00:00 2001 From: Benjamin Trent <4357155+benwtrent@users.noreply.github.com> Date: Tue, 10 Aug 2021 09:20:34 -0400 Subject: [PATCH 4/5] addressing PR comments + style" --- .../PersistentTasksClusterServiceTests.java | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java index 79e393bcd5c1f..5fb842f88b471 100644 --- a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java @@ -672,17 +672,19 @@ public void testReassignOnlyOnce() throws Exception { t1.start(); t2.start(); // Make sure we have at least one reassign check before we count down the latch - assertBusy(() -> verify(recheckTestClusterService, atLeastOnce()).submitStateUpdateTask(eq("reassign persistent tasks"), any())); + assertBusy( + () -> verify(recheckTestClusterService, atLeastOnce()).submitStateUpdateTask(eq("reassign persistent tasks"), any()) + ); } finally { latch.countDown(); t1.join(); t2.join(); service.reassignPersistentTasks(); - // verify that our reassignment is possible again, here we have once from the previous reassignment in the `try` block - // And one from the line above once the other threads have joined - assertBusy(() -> verify(recheckTestClusterService, times(2)).submitStateUpdateTask(eq("reassign persistent tasks"), any())); - verifyNoMoreInteractions(recheckTestClusterService); } + // verify that our reassignment is possible again, here we have once from the previous reassignment in the `try` block + // And one from the line above once the other threads have joined + assertBusy(() -> verify(recheckTestClusterService, times(2)).submitStateUpdateTask(eq("reassign persistent tasks"), any())); + verifyNoMoreInteractions(recheckTestClusterService); } private ClusterService createStateUpdateClusterState(ClusterState initialState, boolean shouldSimulateFailure) { From 6f1909a62d4a50acbb034f3ea7c8824f02f4c832 Mon Sep 17 00:00:00 2001 From: Benjamin Trent <4357155+benwtrent@users.noreply.github.com> Date: Tue, 10 Aug 2021 13:18:59 -0400 Subject: [PATCH 5/5] improving test rigor --- .../elasticsearch/xpack/ml/job/JobNodeSelectorTests.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobNodeSelectorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobNodeSelectorTests.java index bde11c77a8019..a670ad4018606 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobNodeSelectorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobNodeSelectorTests.java @@ -643,9 +643,13 @@ public void testSelectLeastLoadedMlNode_reasonsAreInDeterministicOrder() { assertThat( result.getExplanation(), equalTo( - "Not opening job [incompatible_type_job] on node [{_node_name1}{version=8.0.0}], " + "Not opening job [incompatible_type_job] on node [{_node_name1}{version=" + + Version.CURRENT + + "}], " + "because this node does not support jobs of type [incompatible_type]|" - + "Not opening job [incompatible_type_job] on node [{_node_name2}{version=8.0.0}], " + + "Not opening job [incompatible_type_job] on node [{_node_name2}{version=" + + Version.CURRENT + + "}], " + "because this node does not support jobs of type [incompatible_type]" ) );