From a51a166a3658eefc5699b6edc8375772405d3e34 Mon Sep 17 00:00:00 2001 From: Jialei Date: Wed, 26 Jul 2023 14:43:33 +0800 Subject: [PATCH 1/2] fix(controller): online log with the correct pod --- .../k8s/log/CancellableJobLogK8sCollector.java | 15 ++++++++++++++- .../log/CancellableJobLogK8sCollectorTest.java | 11 ++++++++--- 2 files changed, 22 insertions(+), 4 deletions(-) diff --git a/server/controller/src/main/java/ai/starwhale/mlops/schedule/k8s/log/CancellableJobLogK8sCollector.java b/server/controller/src/main/java/ai/starwhale/mlops/schedule/k8s/log/CancellableJobLogK8sCollector.java index abc374c4a7..c3e70484e5 100644 --- a/server/controller/src/main/java/ai/starwhale/mlops/schedule/k8s/log/CancellableJobLogK8sCollector.java +++ b/server/controller/src/main/java/ai/starwhale/mlops/schedule/k8s/log/CancellableJobLogK8sCollector.java @@ -45,7 +45,20 @@ private String getPodName(String taskId) throws ApiException { if (podList.getItems().isEmpty()) { throw new ApiException("get empty pod list by job name " + taskId); } - return podList.getItems().get(0).getMetadata().getName(); + // returns the running pod + var thePod = podList.getItems().stream().filter(pod -> { + if (pod.getStatus() == null) { + return false; + } + if (pod.getStatus().getPhase() == null) { + return false; + } + return pod.getStatus().getPhase().equals("Running"); + }).findFirst(); + if (thePod.isEmpty()) { + throw new ApiException("get empty running pod by job name " + taskId); + } + return thePod.get().getMetadata().getName(); } @Override diff --git a/server/controller/src/test/java/ai/starwhale/mlops/schedule/k8s/log/CancellableJobLogK8sCollectorTest.java b/server/controller/src/test/java/ai/starwhale/mlops/schedule/k8s/log/CancellableJobLogK8sCollectorTest.java index 3f309b5a99..b5c9094ebc 100644 --- a/server/controller/src/test/java/ai/starwhale/mlops/schedule/k8s/log/CancellableJobLogK8sCollectorTest.java +++ 
b/server/controller/src/test/java/ai/starwhale/mlops/schedule/k8s/log/CancellableJobLogK8sCollectorTest.java @@ -20,6 +20,7 @@ import static org.hamcrest.Matchers.is; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -29,6 +30,7 @@ import io.kubernetes.client.openapi.models.V1ObjectMeta; import io.kubernetes.client.openapi.models.V1Pod; import io.kubernetes.client.openapi.models.V1PodList; +import io.kubernetes.client.openapi.models.V1PodStatus; import java.io.ByteArrayInputStream; import java.io.IOException; import java.util.List; @@ -48,8 +50,11 @@ public void setup() { @Test public void testInitAndRead() throws IOException, ApiException { - var podMeta = new V1ObjectMeta().name("1-xxx"); - when(k8sClient.getPodsByJobName("1")).thenReturn(new V1PodList().items(List.of(new V1Pod().metadata(podMeta)))); + var runningPodMeta = new V1ObjectMeta().name("running-pod"); + var runningPod = new V1Pod().metadata(runningPodMeta).status(new V1PodStatus().phase("Running")); + var failedPodMeta = new V1ObjectMeta().name("failed-pod"); + var failedPod = new V1Pod().metadata(failedPodMeta).status(new V1PodStatus().phase("Failed")); + when(k8sClient.getPodsByJobName("1")).thenReturn(new V1PodList().items(List.of(failedPod, runningPod))); var line = "abc"; @@ -60,7 +65,7 @@ public void testInitAndRead() throws IOException, ApiException { when(resp.body()).thenReturn(respBody); when(call.execute()).thenReturn(resp); - when(k8sClient.readLog(anyString(), anyString(), anyBoolean())).thenReturn(call); + when(k8sClient.readLog(eq("running-pod"), anyString(), anyBoolean())).thenReturn(call); var ins = new CancellableJobLogK8sCollector(k8sClient, "1"); assertThat(ins.readLine(), is(line)); From de55cc8a3712dfa3daf781499586cd4c25607736 Mon Sep 17 00:00:00 2001 
From: Jialei Date: Wed, 26 Jul 2023 17:04:26 +0800 Subject: [PATCH 2/2] fix ut --- .../k8s/log/CancellableJobLogK8sCollectorFactoryTest.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/server/controller/src/test/java/ai/starwhale/mlops/schedule/k8s/log/CancellableJobLogK8sCollectorFactoryTest.java b/server/controller/src/test/java/ai/starwhale/mlops/schedule/k8s/log/CancellableJobLogK8sCollectorFactoryTest.java index a896824934..b9d225bb15 100644 --- a/server/controller/src/test/java/ai/starwhale/mlops/schedule/k8s/log/CancellableJobLogK8sCollectorFactoryTest.java +++ b/server/controller/src/test/java/ai/starwhale/mlops/schedule/k8s/log/CancellableJobLogK8sCollectorFactoryTest.java @@ -26,6 +26,7 @@ import io.kubernetes.client.openapi.models.V1ObjectMeta; import io.kubernetes.client.openapi.models.V1Pod; import io.kubernetes.client.openapi.models.V1PodList; +import io.kubernetes.client.openapi.models.V1PodStatus; import java.io.ByteArrayInputStream; import java.io.IOException; import java.util.List; @@ -39,8 +40,9 @@ public class CancellableJobLogK8sCollectorFactoryTest { @Test public void testMake() throws IOException, ApiException { var k8sClient = mock(K8sClient.class); + var pod = new V1Pod().metadata(new V1ObjectMeta().name("1-xx")).status(new V1PodStatus().phase("Running")); when(k8sClient.getPodsByJobName("1")) - .thenReturn(new V1PodList().items(List.of(new V1Pod().metadata(new V1ObjectMeta().name("1-xx"))))); + .thenReturn(new V1PodList().items(List.of(pod))); var call = mock(Call.class); var resp = mock(Response.class); var respBody = mock(ResponseBody.class);