Skip to content

Commit

Permalink
making the event logic more uniform
Browse files Browse the repository at this point in the history
  • Loading branch information
shawkins authored and manusa committed Jun 11, 2021
1 parent 031ca7e commit ac7ad10
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,11 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -125,7 +126,7 @@ public synchronized MockResponse dispatch(RecordedRequest request) {
*/
@Override
public MockResponse handleCreate(String path, String s) {
return validateRequestBodyAndHandleRequest(s, (g) -> doCreateOrModify(path, g, ADDED));
return validateRequestBodyAndHandleRequest(s, g -> doCreateOrModify(path, g, ADDED));
}

/**
Expand All @@ -135,7 +136,7 @@ public MockResponse handleCreate(String path, String s) {
* @return The {@link MockResponse}
*/
public MockResponse handleReplace(String path, String s) {
return validateRequestBodyAndHandleRequest(s, (g) -> doCreateOrModify(path, g, MODIFIED));
return validateRequestBodyAndHandleRequest(s, g -> doCreateOrModify(path, g, MODIFIED));
}

/**
Expand Down Expand Up @@ -180,10 +181,17 @@ public MockResponse handleGet(String path) {
*/
public MockResponse handlePatch(String path, String s, String contentType) {
MockResponse response = new MockResponse();
String body = fetchResource(path);
if (body == null) {

AttributeSet query = attributeExtractor.fromPath(path);

Optional<Map.Entry<AttributeSet, String>> bodyEntry = map.entrySet().stream()
.filter(entry -> entry.getKey().matches(query))
.findFirst();

if (!bodyEntry.isPresent()) {
response.setResponseCode(HttpURLConnection.HTTP_NOT_FOUND);
} else {
String body = bodyEntry.get().getValue();
try {
JsonNode patch = context.getMapper().readTree(s);
JsonNode source = context.getMapper().readTree(body);
Expand Down Expand Up @@ -221,39 +229,11 @@ public MockResponse handlePatch(String path, String s, String contentType) {
}

String updatedAsString = context.getMapper().writeValueAsString(updated);
GenericKubernetesResource resource = toKubernetesResource(updatedAsString);
GenericKubernetesResource existingResource = toKubernetesResource(body);
setDefaultMetadata(resource, pathValues, existingResource.getMetadata());

AttributeSet attributeSet;

AttributeSet query = attributeExtractor.fromPath(path);

attributeSet = map.entrySet().stream()
.filter(entry -> entry.getKey().matches(query))
.findFirst().orElseThrow(IllegalStateException::new).getKey();

if (body.equals(updatedAsString)) {
response.setResponseCode(HttpURLConnection.HTTP_ACCEPTED);
response.setBody(updatedAsString);
return response;
}
map.remove(attributeSet);
AttributeSet newAttributeSet = AttributeSet.merge(attributeSet, attributeExtractor.fromResource(updatedAsString));
map.put(newAttributeSet, updatedAsString);
crdProcessor.process(path, updatedAsString, false);

final AtomicBoolean flag = new AtomicBoolean(false);
AttributeSet finalAttributeSet = attributeSet;
watchEventListeners.stream()
.filter(watchEventsListener -> watchEventsListener.attributeMatches(finalAttributeSet))
.forEach(watchEventsListener -> {
flag.set(true);
watchEventsListener.sendWebSocketResponse(updatedAsString, MODIFIED);
});

if (!flag.get()) {
watchEventListeners.stream()
.filter(watchEventsListener -> watchEventsListener.attributeMatches(newAttributeSet))
.forEach(watchEventsListener -> watchEventsListener.sendWebSocketResponse(updatedAsString, ADDED));
}
processEvent(path, bodyEntry.get().getKey(), Serialization.asJson(resource));

response.setResponseCode(HttpURLConnection.HTTP_ACCEPTED);
response.setBody(updatedAsString);
Expand All @@ -272,7 +252,7 @@ public MockResponse handlePatch(String path, String s, String contentType) {
*/
@Override
public MockResponse handleDelete(String path) {
return new MockResponse().setResponseCode(doDelete(path, "DELETED"));
return new MockResponse().setResponseCode(doDelete(path));
}

/**
Expand Down Expand Up @@ -336,41 +316,41 @@ private String fetchResourceNameFromWatchRequestPath(String path) {
return name.isEmpty()? null: name;
}

private String fetchResource(String path) {
List<String> items = new ArrayList<>();
AttributeSet query = attributeExtractor.fromPath(path);

map.entrySet().stream()
.filter(entry -> entry.getKey().matches(query))
.forEach(entry -> items.add(entry.getValue()));

if (items.isEmpty()) {
return null;
} else if (items.size() == 1) {
return items.get(0);
} else {
return responseComposer.compose(items);
}
}

private int doDelete(String path, String event) {
private int doDelete(String path) {
List<AttributeSet> items = findItems(attributeExtractor.fromPath(path));

if (items.isEmpty()) return HttpURLConnection.HTTP_NOT_FOUND;


items.forEach(item -> {
if (event != null && !event.isEmpty()) {
watchEventListeners.stream()
.filter(listener -> listener.attributeMatches(item))
.forEach(listener -> listener.sendWebSocketResponse(map.get(item), event));
}
String existing = map.remove(item);
crdProcessor.process(path, existing, true);
});
items.forEach(item -> processEvent(path, item, null));
return HttpURLConnection.HTTP_OK;
}

private void processEvent(String path, AttributeSet oldAttributes, String newState) {
String existing = map.remove(oldAttributes);
AttributeSet newAttributes = null;
if (newState != null) {
newAttributes = kubernetesAttributesExtractor.fromResource(newState);
map.put(newAttributes, newState);
}
if (!Objects.equals(existing, newState)) {
AttributeSet finalAttributeSet = newAttributes;
watchEventListeners.stream()
.forEach(listener -> {
boolean matchesOld = oldAttributes != null && listener.attributeMatches(oldAttributes);
boolean matchesNew = finalAttributeSet != null && listener.attributeMatches(finalAttributeSet);
if (matchesOld && matchesNew) {
listener.sendWebSocketResponse(newState, MODIFIED);
} else if (matchesOld) {
listener.sendWebSocketResponse(existing, "DELETED");
} else if (matchesNew) {
listener.sendWebSocketResponse(newState, ADDED);
}
});

crdProcessor.process(path, Utils.getNonNullOrElse(newState, existing), newState == null);
}
}

private List<AttributeSet> findItems(AttributeSet query) {
return map.keySet().stream()
.filter(entry -> entry.matches(query))
Expand All @@ -393,6 +373,7 @@ private MockResponse doCreateOrModify(String path, GenericKubernetesResource val
boolean statusSubresource = crdProcessor.isStatusSubresource(pathValues.get(KubernetesAttributesExtractor.KIND));

GenericKubernetesResource updated = Serialization.clone(value);
AttributeSet existingAttributes = null;

List<AttributeSet> items = findItems(attributes);
if (items.isEmpty()) {
Expand All @@ -407,7 +388,8 @@ private MockResponse doCreateOrModify(String path, GenericKubernetesResource val
} else if (ADDED.equals(event)) {
responseCode = HttpURLConnection.HTTP_CONFLICT;
} else if (MODIFIED.equals(event)) {
String existing = map.remove(items.get(0));
existingAttributes = items.get(0);
String existing = map.get(existingAttributes);
GenericKubernetesResource existingResource = toKubernetesResource(existing);
Object status = null;
if (isStatusPath(path)) {
Expand All @@ -428,23 +410,11 @@ private MockResponse doCreateOrModify(String path, GenericKubernetesResource val
updated.getAdditionalProperties().remove(STATUS);
}
}
if (existingResource.equals(updated)) {
event = null; // no change
}
}

if (responseCode == HttpURLConnection.HTTP_OK) {
String s = context.getMapper().writeValueAsString(updated);
AttributeSet features = AttributeSet.merge(attributes, kubernetesAttributesExtractor.extract(updated));
map.put(features, s); // always add back as it was proactively removed
if (event != null && !event.isEmpty()) {
crdProcessor.process(path, s, false);
final String response = s;
final String finalEvent = event;
watchEventListeners.stream()
.filter(listener -> listener.attributeMatches(features))
.forEach(listener -> listener.sendWebSocketResponse(response, finalEvent));
}
processEvent(path, existingAttributes, s);
mockResponse.setBody(s);
}
mockResponse.setResponseCode(responseCode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.junit.jupiter.api.Test;

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -158,45 +157,58 @@ void testPodWatchOnNamespace() throws InterruptedException {
void testPodWatchOnLabels() throws InterruptedException {
Pod pod1 = new PodBuilder().withNewMetadata().withName("pod1").addToLabels("test", "watch").endMetadata().build();

//there are two adds - one when the watch is registered, another later
final LatchedWatcher lw = new LatchedWatcher(3, 1, 1, 1, 1);
//there are three adds - one when the watch is registered, another later
final LatchedWatcher lw = new LatchedWatcher(3, 1, 2, 1, 1);

// create 1
client.pods().inNamespace("ns1").create(pod1);
Watch watch = client.pods().inNamespace("ns1")
.withLabels(Collections.singletonMap("test", "watch"))
.watch(lw);

Map<String, String> m = pod1.getMetadata().getLabels();
m.put("foo", "bar");
client.pods().inNamespace("ns1").withName(pod1.getMetadata().getName())
.patch(new PodBuilder().withNewMetadataLike(pod1.getMetadata()).endMetadata().build());
// edit 1
client.pods()
.inNamespace("ns1")
.withName(pod1.getMetadata().getName())
.accept(p -> p.getMetadata().getLabels().put("foo", "bar"));

// delete 1
client.pods().inNamespace("ns1").withName(pod1.getMetadata().getName()).delete();

// create 2
client.pods().inNamespace("ns1").create(new PodBuilder()
.withNewMetadata().withName("pod-new").addToLabels("test", "watch").endMetadata()
.build());

assertEquals(1, client.pods().inNamespace("ns1").list().getItems().size());
assertTrue(lw.deleteLatch.await(1, TimeUnit.MINUTES));
assertTrue(lw.editLatch.await(1, TimeUnit.MINUTES));

// not seen by watch
Pod pod2 = client.pods().inNamespace("ns1").create(new PodBuilder()
.withNewMetadata().withName("pod2").addToLabels("foo", "bar").endMetadata()
.build());

assertEquals(2, client.pods().inNamespace("ns1").list().getItems().size());
assertEquals(1, client.pods().inNamespace("ns1").withLabel("test", "watch").list().getItems().size());

Map<String, String> m1 = pod2.getMetadata().getLabels();
m1.put("test", "watch");
// "create" 3
client.pods()
.inNamespace("ns1")
.withName(pod2.getMetadata().getName())
.accept(p -> p.getMetadata().getLabels().put("test", "watch"));

client.pods().inNamespace("ns1").withName(pod2.getMetadata().getName())
.patch(new PodBuilder().withNewMetadataLike(pod2.getMetadata()).endMetadata().build());
assertTrue(lw.addLatch.await(1, TimeUnit.MINUTES));

assertEquals(2, client.pods().inNamespace("ns1").list().getItems().size());
assertEquals(2, client.pods().inNamespace("ns1").withLabel("test", "watch").list().getItems().size());
assertTrue(lw.addLatch.await(1, TimeUnit.MINUTES));

// "delete" 2
client.pods()
.inNamespace("ns1")
.withName(pod2.getMetadata().getName())
.accept(p -> p.getMetadata().getLabels().clear());

assertTrue(lw.deleteLatch.await(1, TimeUnit.MINUTES));

watch.close();
assertTrue(lw.closeLatch.await(1, TimeUnit.MINUTES));
Expand All @@ -214,7 +226,7 @@ void testPodWatchTryWithResources() throws InterruptedException {
Watch watch = client.pods().inNamespace("ns1").withName(pod1.getMetadata().getName()).watch(lw)
) {
client.pods().inNamespace("ns1").withName(pod1.getMetadata().getName())
.patch(new PodBuilder().withNewMetadataLike(pod1.getMetadata()).endMetadata().build());
.edit(p -> new PodBuilder(p).editMetadata().withLabels(Collections.emptyMap()).endMetadata().build());

client.pods().inNamespace("ns1").withName(pod1.getMetadata().getName()).delete();

Expand Down

0 comments on commit ac7ad10

Please sign in to comment.