From ac7ad1039c6884ddb0fd6493d3ad0010e70d32a8 Mon Sep 17 00:00:00 2001 From: shawkins Date: Wed, 9 Jun 2021 08:39:24 -0400 Subject: [PATCH] making the event logic more uniform --- .../server/mock/KubernetesCrudDispatcher.java | 130 +++++++----------- .../kubernetes/client/mock/PodCrudTest.java | 40 ++++-- 2 files changed, 76 insertions(+), 94 deletions(-) diff --git a/kubernetes-server-mock/src/main/java/io/fabric8/kubernetes/client/server/mock/KubernetesCrudDispatcher.java b/kubernetes-server-mock/src/main/java/io/fabric8/kubernetes/client/server/mock/KubernetesCrudDispatcher.java index a194dce6a74..49d712d7c65 100644 --- a/kubernetes-server-mock/src/main/java/io/fabric8/kubernetes/client/server/mock/KubernetesCrudDispatcher.java +++ b/kubernetes-server-mock/src/main/java/io/fabric8/kubernetes/client/server/mock/KubernetesCrudDispatcher.java @@ -50,10 +50,11 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import java.util.stream.Collectors; @@ -125,7 +126,7 @@ public synchronized MockResponse dispatch(RecordedRequest request) { */ @Override public MockResponse handleCreate(String path, String s) { - return validateRequestBodyAndHandleRequest(s, (g) -> doCreateOrModify(path, g, ADDED)); + return validateRequestBodyAndHandleRequest(s, g -> doCreateOrModify(path, g, ADDED)); } /** @@ -135,7 +136,7 @@ public MockResponse handleCreate(String path, String s) { * @return The {@link MockResponse} */ public MockResponse handleReplace(String path, String s) { - return validateRequestBodyAndHandleRequest(s, (g) -> doCreateOrModify(path, g, MODIFIED)); + return validateRequestBodyAndHandleRequest(s, g -> doCreateOrModify(path, g, MODIFIED)); } /** @@ -180,10 +181,17 @@ public MockResponse handleGet(String path) { */ public MockResponse handlePatch(String path, String s, String contentType) { MockResponse response = new MockResponse(); - String body = fetchResource(path); - if (body == null) { + + AttributeSet query = attributeExtractor.fromPath(path); + + Optional> bodyEntry = map.entrySet().stream() + .filter(entry -> entry.getKey().matches(query)) + .findFirst(); + + if (!bodyEntry.isPresent()) { response.setResponseCode(HttpURLConnection.HTTP_NOT_FOUND); } else { + String body = bodyEntry.get().getValue(); try { JsonNode patch = context.getMapper().readTree(s); JsonNode source = context.getMapper().readTree(body); @@ -221,39 +229,11 @@ public MockResponse handlePatch(String path, String s, String contentType) { } String updatedAsString = context.getMapper().writeValueAsString(updated); + GenericKubernetesResource resource = toKubernetesResource(updatedAsString); + GenericKubernetesResource existingResource = toKubernetesResource(body); + setDefaultMetadata(resource, pathValues, existingResource.getMetadata()); - AttributeSet attributeSet; - - AttributeSet query = attributeExtractor.fromPath(path); - - attributeSet = map.entrySet().stream() - .filter(entry -> entry.getKey().matches(query)) - .findFirst().orElseThrow(IllegalStateException::new).getKey(); - - if (body.equals(updatedAsString)) { - response.setResponseCode(HttpURLConnection.HTTP_ACCEPTED); - response.setBody(updatedAsString); - return response; - } - map.remove(attributeSet); - AttributeSet newAttributeSet = AttributeSet.merge(attributeSet, attributeExtractor.fromResource(updatedAsString)); - map.put(newAttributeSet, updatedAsString); - crdProcessor.process(path, updatedAsString, false); - - final AtomicBoolean flag = new AtomicBoolean(false); - AttributeSet finalAttributeSet = attributeSet; - watchEventListeners.stream() - .filter(watchEventsListener -> watchEventsListener.attributeMatches(finalAttributeSet)) - .forEach(watchEventsListener -> { - flag.set(true); - watchEventsListener.sendWebSocketResponse(updatedAsString, MODIFIED); - }); - - if (!flag.get()) { - watchEventListeners.stream() - .filter(watchEventsListener -> watchEventsListener.attributeMatches(newAttributeSet)) - .forEach(watchEventsListener -> watchEventsListener.sendWebSocketResponse(updatedAsString, ADDED)); - } + processEvent(path, bodyEntry.get().getKey(), Serialization.asJson(resource)); response.setResponseCode(HttpURLConnection.HTTP_ACCEPTED); response.setBody(updatedAsString); @@ -272,7 +252,7 @@ public MockResponse handlePatch(String path, String s, String contentType) { */ @Override public MockResponse handleDelete(String path) { - return new MockResponse().setResponseCode(doDelete(path, "DELETED")); + return new MockResponse().setResponseCode(doDelete(path)); } /** @@ -336,41 +316,41 @@ private String fetchResourceNameFromWatchRequestPath(String path) { return name.isEmpty()? null: name; } - private String fetchResource(String path) { - List items = new ArrayList<>(); - AttributeSet query = attributeExtractor.fromPath(path); - - map.entrySet().stream() - .filter(entry -> entry.getKey().matches(query)) - .forEach(entry -> items.add(entry.getValue())); - - if (items.isEmpty()) { - return null; - } else if (items.size() == 1) { - return items.get(0); - } else { - return responseComposer.compose(items); - } - } - - private int doDelete(String path, String event) { + private int doDelete(String path) { List items = findItems(attributeExtractor.fromPath(path)); if (items.isEmpty()) return HttpURLConnection.HTTP_NOT_FOUND; - - items.forEach(item -> { - if (event != null && !event.isEmpty()) { - watchEventListeners.stream() - .filter(listener -> listener.attributeMatches(item)) - .forEach(listener -> listener.sendWebSocketResponse(map.get(item), event)); - } - String existing = map.remove(item); - crdProcessor.process(path, existing, true); - }); + items.forEach(item -> processEvent(path, item, null)); return HttpURLConnection.HTTP_OK; } + private void processEvent(String path, AttributeSet oldAttributes, String newState) { + String existing = map.remove(oldAttributes); + AttributeSet newAttributes = null; + if (newState != null) { + newAttributes = kubernetesAttributesExtractor.fromResource(newState); + map.put(newAttributes, newState); + } + if (!Objects.equals(existing, newState)) { + AttributeSet finalAttributeSet = newAttributes; + watchEventListeners.stream() + .forEach(listener -> { + boolean matchesOld = oldAttributes != null && listener.attributeMatches(oldAttributes); + boolean matchesNew = finalAttributeSet != null && listener.attributeMatches(finalAttributeSet); + if (matchesOld && matchesNew) { + listener.sendWebSocketResponse(newState, MODIFIED); + } else if (matchesOld) { + listener.sendWebSocketResponse(existing, "DELETED"); + } else if (matchesNew) { + listener.sendWebSocketResponse(newState, ADDED); + } + }); + + crdProcessor.process(path, Utils.getNonNullOrElse(newState, existing), newState == null); + } + } + private List findItems(AttributeSet query) { return map.keySet().stream() .filter(entry -> entry.matches(query)) @@ -393,6 +373,7 @@ private MockResponse doCreateOrModify(String path, GenericKubernetesResource val boolean statusSubresource = crdProcessor.isStatusSubresource(pathValues.get(KubernetesAttributesExtractor.KIND)); GenericKubernetesResource updated = Serialization.clone(value); + AttributeSet existingAttributes = null; List items = findItems(attributes); if (items.isEmpty()) { @@ -407,7 +388,8 @@ private MockResponse doCreateOrModify(String path, GenericKubernetesResource val } else if (ADDED.equals(event)) { responseCode = HttpURLConnection.HTTP_CONFLICT; } else if (MODIFIED.equals(event)) { - String existing = map.remove(items.get(0)); + existingAttributes = items.get(0); + String existing = map.get(existingAttributes); GenericKubernetesResource existingResource = toKubernetesResource(existing); Object status = null; if (isStatusPath(path)) { @@ -428,23 +410,11 @@ private MockResponse doCreateOrModify(String path, GenericKubernetesResource val updated.getAdditionalProperties().remove(STATUS); } } - if (existingResource.equals(updated)) { - event = null; // no change - } } if (responseCode == HttpURLConnection.HTTP_OK) { String s = context.getMapper().writeValueAsString(updated); - AttributeSet features = AttributeSet.merge(attributes, kubernetesAttributesExtractor.extract(updated)); - map.put(features, s); // always add back as it was proactively removed - if (event != null && !event.isEmpty()) { - crdProcessor.process(path, s, false); - final String response = s; - final String finalEvent = event; - watchEventListeners.stream() - .filter(listener -> listener.attributeMatches(features)) - .forEach(listener -> listener.sendWebSocketResponse(response, finalEvent)); - } + processEvent(path, existingAttributes, s); mockResponse.setBody(s); } mockResponse.setResponseCode(responseCode); diff --git a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/PodCrudTest.java b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/PodCrudTest.java index 3efc336e566..1f8dd329911 100644 --- a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/PodCrudTest.java +++ b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/PodCrudTest.java @@ -30,7 +30,6 @@ import org.junit.jupiter.api.Test; import java.util.Collections; -import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -158,29 +157,33 @@ void testPodWatchOnNamespace() throws InterruptedException { void testPodWatchOnLabels() throws InterruptedException { Pod pod1 = new PodBuilder().withNewMetadata().withName("pod1").addToLabels("test", "watch").endMetadata().build(); - //there are two adds - one when the watch is registered, another later - final LatchedWatcher lw = new LatchedWatcher(3, 1, 1, 1, 1); + //there are three adds - one when the watch is registered, another later + final LatchedWatcher lw = new LatchedWatcher(3, 1, 2, 1, 1); + // create 1 client.pods().inNamespace("ns1").create(pod1); Watch watch = client.pods().inNamespace("ns1") .withLabels(Collections.singletonMap("test", "watch")) .watch(lw); - Map m = pod1.getMetadata().getLabels(); - m.put("foo", "bar"); - client.pods().inNamespace("ns1").withName(pod1.getMetadata().getName()) - .patch(new PodBuilder().withNewMetadataLike(pod1.getMetadata()).endMetadata().build()); + // edit 1 + client.pods() + .inNamespace("ns1") + .withName(pod1.getMetadata().getName()) + .accept(p -> p.getMetadata().getLabels().put("foo", "bar")); + // delete 1 client.pods().inNamespace("ns1").withName(pod1.getMetadata().getName()).delete(); + // create 2 client.pods().inNamespace("ns1").create(new PodBuilder() .withNewMetadata().withName("pod-new").addToLabels("test", "watch").endMetadata() .build()); assertEquals(1, client.pods().inNamespace("ns1").list().getItems().size()); - assertTrue(lw.deleteLatch.await(1, TimeUnit.MINUTES)); assertTrue(lw.editLatch.await(1, TimeUnit.MINUTES)); + // not seen by watch Pod pod2 = client.pods().inNamespace("ns1").create(new PodBuilder() .withNewMetadata().withName("pod2").addToLabels("foo", "bar").endMetadata() .build()); @@ -188,15 +191,24 @@ void testPodWatchOnLabels() throws InterruptedException { assertEquals(2, client.pods().inNamespace("ns1").list().getItems().size()); assertEquals(1, client.pods().inNamespace("ns1").withLabel("test", "watch").list().getItems().size()); - Map m1 = pod2.getMetadata().getLabels(); - m1.put("test", "watch"); + // "create" 3 + client.pods() + .inNamespace("ns1") + .withName(pod2.getMetadata().getName()) + .accept(p -> p.getMetadata().getLabels().put("test", "watch")); - client.pods().inNamespace("ns1").withName(pod2.getMetadata().getName()) - .patch(new PodBuilder().withNewMetadataLike(pod2.getMetadata()).endMetadata().build()); + assertTrue(lw.addLatch.await(1, TimeUnit.MINUTES)); assertEquals(2, client.pods().inNamespace("ns1").list().getItems().size()); assertEquals(2, client.pods().inNamespace("ns1").withLabel("test", "watch").list().getItems().size()); - assertTrue(lw.addLatch.await(1, TimeUnit.MINUTES)); + + // "delete" 2 + client.pods() + .inNamespace("ns1") + .withName(pod2.getMetadata().getName()) + .accept(p -> p.getMetadata().getLabels().clear()); + + assertTrue(lw.deleteLatch.await(1, TimeUnit.MINUTES)); watch.close(); assertTrue(lw.closeLatch.await(1, TimeUnit.MINUTES)); @@ -214,7 +226,7 @@ void testPodWatchTryWithResources() throws InterruptedException { Watch watch = client.pods().inNamespace("ns1").withName(pod1.getMetadata().getName()).watch(lw) ) { client.pods().inNamespace("ns1").withName(pod1.getMetadata().getName()) - .patch(new PodBuilder().withNewMetadataLike(pod1.getMetadata()).endMetadata().build()); + .edit(p -> new PodBuilder(p).editMetadata().withLabels(Collections.emptyMap()).endMetadata().build()); client.pods().inNamespace("ns1").withName(pod1.getMetadata().getName()).delete();