diff --git a/CHANGELOG.md b/CHANGELOG.md index 40f1cd50fbd..2b4ef1d3342 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ * Fix #4891: address vertx not completely reading exec streams * Fix #4899: BuildConfigs.instantiateBinary().fromFile() does not time out * Fix #4908: using the response headers in the vertx response +* Fix #4931: using coarse grain locking for all mock server operations * Fix #4947: typo in HttpClient.Factory scoring system logic * Fix #4928: allows non-okhttp clients to handle invalid status diff --git a/junit/kubernetes-server-mock/src/main/java/io/fabric8/kubernetes/client/server/mock/KubernetesCrudDispatcher.java b/junit/kubernetes-server-mock/src/main/java/io/fabric8/kubernetes/client/server/mock/KubernetesCrudDispatcher.java index bd99db42e40..1dd30de96af 100644 --- a/junit/kubernetes-server-mock/src/main/java/io/fabric8/kubernetes/client/server/mock/KubernetesCrudDispatcher.java +++ b/junit/kubernetes-server-mock/src/main/java/io/fabric8/kubernetes/client/server/mock/KubernetesCrudDispatcher.java @@ -18,6 +18,7 @@ import io.fabric8.kubernetes.api.model.GenericKubernetesResource; import io.fabric8.kubernetes.client.Watcher.Action; import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext; +import io.fabric8.kubernetes.client.server.mock.crud.KubernetesCrudDispatcherException; import io.fabric8.kubernetes.client.server.mock.crud.KubernetesCrudDispatcherHandler; import io.fabric8.kubernetes.client.server.mock.crud.KubernetesCrudPersistence; import io.fabric8.kubernetes.client.server.mock.crud.PatchHandler; @@ -47,8 +48,8 @@ import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.atomic.AtomicLong; - -import static io.fabric8.kubernetes.client.server.mock.crud.KubernetesCrudDispatcherHandler.process; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; public class KubernetesCrudDispatcher extends CrudDispatcher implements KubernetesCrudPersistence, CustomResourceAware { @@ -61,6 +62,7 @@ public class KubernetesCrudDispatcher extends CrudDispatcher implements Kubernet private final KubernetesCrudDispatcherHandler postHandler; private final KubernetesCrudDispatcherHandler putHandler; private final KubernetesCrudDispatcherHandler patchHandler; + private final ReadWriteLock lock = new ReentrantReadWriteLock(); public KubernetesCrudDispatcher() { this(Collections.emptyList()); @@ -81,6 +83,17 @@ public KubernetesCrudDispatcher(List crdContext crdContexts.stream().forEach(this::expectCustomResource); } + MockResponse process(RecordedRequest request, KubernetesCrudDispatcherHandler handler) { + lock.writeLock().lock(); + try { + return handler.handle(request); + } catch (KubernetesCrudDispatcherException e) { + return new MockResponse().setResponseCode(e.getCode()).setBody(e.toStatusBody()); + } finally { + lock.writeLock().unlock(); + } + } + /** * Adds the specified object to the in-memory db. * @@ -111,10 +124,15 @@ public MockResponse handleUpdate(RecordedRequest request) { */ @Override public MockResponse handleGet(String path) { - if (detectWatchMode(path)) { - return handleWatch(path); + lock.readLock().lock(); + try { + if (detectWatchMode(path)) { + return handleWatch(path); + } + return handle(path, null); + } finally { + lock.readLock().unlock(); } - return handle(path, null); } private interface EventProcessor { @@ -126,17 +144,15 @@ private MockResponse handle(String path, EventProcessor eventProcessor) { List items = new ArrayList<>(); AttributeSet query = attributeExtractor.fromPath(path); - synchronized (map) { - new ArrayList<>(map.entrySet()).stream() - .filter(entry -> entry.getKey().matches(query)) - .forEach(entry -> { - LOGGER.debug("Entry found for query {} : {}", query, entry); - items.add(entry.getValue()); - if (eventProcessor != null) { - eventProcessor.processEvent(path, query, entry.getKey()); - } - }); - } + new ArrayList<>(map.entrySet()).stream() + .filter(entry -> entry.getKey().matches(query)) + .forEach(entry -> { + LOGGER.debug("Entry found for query {} : {}", query, entry); + items.add(entry.getValue()); + if (eventProcessor != null) { + eventProcessor.processEvent(path, query, entry.getKey()); + } + }); if (query.containsKey(KubernetesAttributesExtractor.NAME)) { if (!items.isEmpty()) { @@ -179,26 +195,30 @@ public MockResponse handlePatch(RecordedRequest request) { */ @Override public MockResponse handleDelete(String path) { - return handle(path, (p, pathAttributes, oldAttributes) -> { - String jsonStringOfResource = map.get(oldAttributes); - /* - * Potential performance improvement: The resource is unmarshalled and marshalled in other places (e.g., when creating a - * WatchEvent later). - * This could be avoided by storing the unmarshalled object (instead of a String) in the map. - */ - final GenericKubernetesResource resource = Serialization.unmarshal(jsonStringOfResource, GenericKubernetesResource.class); - if (resource.getFinalizers().isEmpty()) { - // No finalizers left, actually remove the resource. - processEvent(path, pathAttributes, oldAttributes, null); - return; - } else if (!resource.isMarkedForDeletion()) { - // Mark the resource as deleted, but don't remove it yet (wait for finalizer-removal). - resource.getMetadata().setDeletionTimestamp(LocalDateTime.now().toString()); - String updatedResource = Serialization.asJson(resource); - processEvent(path, pathAttributes, oldAttributes, updatedResource); - } - // else: if the resource is already marked for deletion and still has finalizers, do nothing. - }); + lock.writeLock().lock(); + try { + return handle(path, this::processDelete); + } finally { + lock.writeLock().unlock(); + } + } + + private void processDelete(String path, AttributeSet pathAttributes, AttributeSet oldAttributes) { + String jsonStringOfResource = map.get(oldAttributes); + final GenericKubernetesResource resource = Serialization.unmarshal(jsonStringOfResource, GenericKubernetesResource.class); + if (resource.getFinalizers().isEmpty()) { + // No finalizers left, actually remove the resource. + processEvent(path, pathAttributes, oldAttributes, null, null); + return; + } + if (!resource.isMarkedForDeletion()) { + // Mark the resource as deleted, but don't remove it yet (wait for finalizer-removal). + resource.getMetadata().setDeletionTimestamp(LocalDateTime.now().toString()); + String updatedResource = Serialization.asJson(resource); + processEvent(path, pathAttributes, oldAttributes, resource, updatedResource); + return; + } + // else: if the resource is already marked for deletion and still has finalizers, do nothing. } @Override @@ -213,11 +233,9 @@ public AttributeSet getKey(String path) { @Override public Map.Entry findResource(AttributeSet attributes) { - synchronized (map) { - return map.entrySet().stream() - .filter(entry -> entry.getKey().matches(attributes)) - .findFirst().orElse(null); - } + return map.entrySet().stream() + .filter(entry -> entry.getKey().matches(attributes)) + .findFirst().orElse(null); } @Override @@ -226,11 +244,16 @@ public boolean isStatusSubresourceEnabledForResource(String path) { } @Override - public void processEvent(String path, AttributeSet pathAttributes, AttributeSet oldAttributes, String newState) { + public void processEvent(String path, AttributeSet pathAttributes, AttributeSet oldAttributes, + GenericKubernetesResource resource, String newState) { String existing = map.remove(oldAttributes); AttributeSet newAttributes = null; if (newState != null) { - newAttributes = kubernetesAttributesExtractor.fromResource(newState); + if (resource != null) { + newAttributes = kubernetesAttributesExtractor.extract(resource); + } else { + newAttributes = kubernetesAttributesExtractor.fromResource(newState); + } // corner case - we need to get the plural from the path if (!newAttributes.containsKey(KubernetesAttributesExtractor.PLURAL)) { newAttributes = AttributeSet.merge(pathAttributes, newAttributes); @@ -269,13 +292,9 @@ public MockResponse handleWatch(String path) { query = query.add(new Attribute("name", resourceName)); } WatchEventsListener watchEventListener = new WatchEventsListener(context, query, watchEventListeners, LOGGER, - watch -> { - synchronized (map) { - map.entrySet().stream() - .filter(entry -> watch.attributeMatches(entry.getKey())) - .forEach(entry -> watch.sendWebSocketResponse(entry.getValue(), Action.ADDED)); - } - }); + watch -> map.entrySet().stream() + .filter(entry -> watch.attributeMatches(entry.getKey())) + .forEach(entry -> watch.sendWebSocketResponse(entry.getValue(), Action.ADDED))); watchEventListeners.add(watchEventListener); mockResponse.setSocketPolicy(SocketPolicy.KEEP_OPEN); return mockResponse.withWebSocketUpgrade(watchEventListener); diff --git a/junit/kubernetes-server-mock/src/main/java/io/fabric8/kubernetes/client/server/mock/crud/KubernetesCrudDispatcherHandler.java b/junit/kubernetes-server-mock/src/main/java/io/fabric8/kubernetes/client/server/mock/crud/KubernetesCrudDispatcherHandler.java index 5b40d6664da..bb1c41305b0 100644 --- a/junit/kubernetes-server-mock/src/main/java/io/fabric8/kubernetes/client/server/mock/crud/KubernetesCrudDispatcherHandler.java +++ b/junit/kubernetes-server-mock/src/main/java/io/fabric8/kubernetes/client/server/mock/crud/KubernetesCrudDispatcherHandler.java @@ -113,14 +113,6 @@ default GenericKubernetesResource validateRequestBody(String requestBody) throws return resource; } - static MockResponse process(RecordedRequest request, KubernetesCrudDispatcherHandler handler) { - try { - return handler.handle(request); - } catch (KubernetesCrudDispatcherException e) { - return new MockResponse().setResponseCode(e.getCode()).setBody(e.toStatusBody()); - } - } - static boolean isStatusPath(String path) { return path.endsWith("/" + STATUS); } diff --git a/junit/kubernetes-server-mock/src/main/java/io/fabric8/kubernetes/client/server/mock/crud/KubernetesCrudPersistence.java b/junit/kubernetes-server-mock/src/main/java/io/fabric8/kubernetes/client/server/mock/crud/KubernetesCrudPersistence.java index d493220f415..52b92f649e5 100644 --- a/junit/kubernetes-server-mock/src/main/java/io/fabric8/kubernetes/client/server/mock/crud/KubernetesCrudPersistence.java +++ b/junit/kubernetes-server-mock/src/main/java/io/fabric8/kubernetes/client/server/mock/crud/KubernetesCrudPersistence.java @@ -19,6 +19,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectReader; import com.fasterxml.jackson.databind.node.ObjectNode; +import io.fabric8.kubernetes.api.model.GenericKubernetesResource; import io.fabric8.kubernetes.client.server.mock.Resetable; import io.fabric8.kubernetes.client.utils.Serialization; import io.fabric8.mockwebserver.crud.AttributeSet; @@ -47,7 +48,8 @@ public interface KubernetesCrudPersistence extends Resetable { boolean isStatusSubresourceEnabledForResource(String path); - void processEvent(String path, AttributeSet pathAttributes, AttributeSet oldAttributes, String newState); + void processEvent(String path, AttributeSet pathAttributes, AttributeSet oldAttributes, GenericKubernetesResource resource, + String newState); default JsonNode asNode(Map.Entry resource) throws KubernetesCrudDispatcherException { return asNode(resource.getValue()); diff --git a/junit/kubernetes-server-mock/src/main/java/io/fabric8/kubernetes/client/server/mock/crud/PatchHandler.java b/junit/kubernetes-server-mock/src/main/java/io/fabric8/kubernetes/client/server/mock/crud/PatchHandler.java index cd98d22b70c..60ab3eda4c7 100644 --- a/junit/kubernetes-server-mock/src/main/java/io/fabric8/kubernetes/client/server/mock/crud/PatchHandler.java +++ b/junit/kubernetes-server-mock/src/main/java/io/fabric8/kubernetes/client/server/mock/crud/PatchHandler.java @@ -89,9 +89,10 @@ public MockResponse handle(String path, String contentType, String requestBody) if (deserializedResource.isMarkedForDeletion() && deserializedResource.getFinalizers().isEmpty()) { // Delete the resource. updatedAsString = null; + deserializedResource = null; } - persistence.processEvent(path, query, currentResourceEntry.getKey(), updatedAsString); + persistence.processEvent(path, query, currentResourceEntry.getKey(), deserializedResource, updatedAsString); return new MockResponse().setResponseCode(HTTP_ACCEPTED).setBody(Utils.getNonNullOrElse(updatedAsString, "")); } diff --git a/junit/kubernetes-server-mock/src/main/java/io/fabric8/kubernetes/client/server/mock/crud/PostHandler.java b/junit/kubernetes-server-mock/src/main/java/io/fabric8/kubernetes/client/server/mock/crud/PostHandler.java index 1573c66b3ec..f8831598f52 100644 --- a/junit/kubernetes-server-mock/src/main/java/io/fabric8/kubernetes/client/server/mock/crud/PostHandler.java +++ b/junit/kubernetes-server-mock/src/main/java/io/fabric8/kubernetes/client/server/mock/crud/PostHandler.java @@ -62,7 +62,7 @@ public MockResponse handle(String path, String contentType, String requestBody) resource.getAdditionalProperties().remove(STATUS); } final String response = Serialization.asJson(resource); - persistence.processEvent(path, attributes, null, response); + persistence.processEvent(path, attributes, null, resource, response); return new MockResponse().setResponseCode(HttpURLConnection.HTTP_CREATED).setBody(response); } diff --git a/junit/kubernetes-server-mock/src/main/java/io/fabric8/kubernetes/client/server/mock/crud/PutHandler.java b/junit/kubernetes-server-mock/src/main/java/io/fabric8/kubernetes/client/server/mock/crud/PutHandler.java index 359b8da7210..36b18a32c66 100644 --- a/junit/kubernetes-server-mock/src/main/java/io/fabric8/kubernetes/client/server/mock/crud/PutHandler.java +++ b/junit/kubernetes-server-mock/src/main/java/io/fabric8/kubernetes/client/server/mock/crud/PutHandler.java @@ -47,7 +47,7 @@ public MockResponse handle(String path, String contentType, String requestBody) // Delete the resource if it is marked for deletion and has no finalizers. if (resource.isMarkedForDeletion() && resource.getFinalizers().isEmpty()) { - persistence.processEvent(path, attributes, currentResourceEntry.getKey(), null); + persistence.processEvent(path, attributes, currentResourceEntry.getKey(), null, null); return new MockResponse().setResponseCode(HttpURLConnection.HTTP_OK); } @@ -72,7 +72,7 @@ public MockResponse handle(String path, String contentType, String requestBody) } persistence.touchResourceVersion(currentResource, updatedResource); final String response = Serialization.asJson(updatedResource); - persistence.processEvent(path, attributes, currentResourceEntry.getKey(), response); + persistence.processEvent(path, attributes, currentResourceEntry.getKey(), null, response); return new MockResponse().setResponseCode(HttpURLConnection.HTTP_OK).setBody(response); } } diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/utils/Serialization.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/utils/Serialization.java index 92499eb4eb7..3bdfbb03506 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/utils/Serialization.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/utils/Serialization.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import io.fabric8.kubernetes.api.model.KubernetesResource; +import io.fabric8.kubernetes.api.model.KubernetesResourceList; import io.fabric8.kubernetes.api.model.runtime.RawExtension; import io.fabric8.kubernetes.client.KubernetesClientException; import io.fabric8.kubernetes.model.jackson.UnmatchedFieldTypeModule; @@ -39,9 +40,6 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import java.util.stream.Collectors; public class Serialization { private Serialization() { @@ -57,8 +55,6 @@ private Serialization() { private static volatile ObjectMapper YAML_MAPPER; - private static final String DOCUMENT_DELIMITER = "---"; - /** * {@link ObjectMapper} singleton instance used internally by the Kubernetes client. * @@ -213,22 +209,7 @@ public static T unmarshal(InputStream is, ObjectMapper mapper) { */ @Deprecated public static T unmarshal(InputStream is, ObjectMapper mapper, Map parameters) { - // it's not well documented which Serialization methods are aware of input that can contain - // multiple docs - String specFile; - try { - specFile = IOHelpers.readFully(is); - } catch (IOException e1) { - throw new RuntimeException("Could not read stream"); - } - if (containsMultipleDocuments(specFile)) { - return (T) getKubernetesResourceList(Collections.emptyMap(), specFile); - } else if (specFile.contains(DOCUMENT_DELIMITER)) { - specFile = specFile.replaceAll("^---([ \\t].*?)?\\r?\\n", ""); - specFile = specFile.replaceAll("\\n---([ \\t].*?)?\\r?\\n?$", "\n"); - } - - return unmarshal(new ByteArrayInputStream(specFile.getBytes(StandardCharsets.UTF_8)), mapper, new TypeReference() { + return unmarshal(is, mapper, new TypeReference() { @Override public Type getType() { return KubernetesResource.class; @@ -249,13 +230,7 @@ private static T unmarshal(InputStream is, ObjectMapper mapper, TypeReferenc final T result; if (intch != '{' && intch != '[') { - final Load yaml = new Load(LoadSettings.builder().build()); - final Object obj = yaml.loadFromInputStream(bis); - if (obj instanceof Map) { - result = mapper.convertValue(obj, type); - } else { - result = mapper.convertValue(new RawExtension(obj), type); - } + result = parseYaml(bis, mapper, type); } else { result = mapper.readerFor(type).readValue(bis); } @@ -265,6 +240,48 @@ private static T unmarshal(InputStream is, ObjectMapper mapper, TypeReferenc } } + /** + * If multiple docs exist, only non-null resources will be kept. Results spanning multiple docs + * will be returned as a List of KubernetesResource + */ + private static T parseYaml(BufferedInputStream bis, ObjectMapper mapper, TypeReference type) { + T result = null; + List listResult = null; + final Load yaml = new Load(LoadSettings.builder().build()); + final Iterable objs = yaml.loadAllFromInputStream(bis); + for (Object obj : objs) { + Object value = null; + if (obj instanceof Map) { + value = mapper.convertValue(obj, type); + } else if (obj != null) { + value = mapper.convertValue(new RawExtension(obj), type); + } + if (value != null) { + if (result == null) { + result = (T) value; + } else { + if (listResult == null) { + listResult = new ArrayList<>(); + accumulateResult(result, listResult); + } + accumulateResult(value, listResult); + } + } + } + if (listResult != null) { + return (T) listResult; + } + return result; + } + + private static void accumulateResult(T result, List listResult) { + if (result instanceof KubernetesResourceList) { + listResult.addAll(((KubernetesResourceList) result).getItems()); + } else { + listResult.add((KubernetesResource) result); + } + } + /** * Unmarshals a {@link String} *

@@ -382,42 +399,6 @@ public static T unmarshal(InputStream is, TypeReference type, Map getKubernetesResourceList(Map parameters, String specFile) { - return splitSpecFile(specFile).stream().filter(Serialization::validate) - .map( - document -> (KubernetesResource) Serialization.unmarshal(new ByteArrayInputStream(document.getBytes()), parameters)) - .filter(o -> o != null) - .collect(Collectors.toList()); - } - - static boolean containsMultipleDocuments(String specFile) { - final long validDocumentCount = splitSpecFile(specFile).stream().filter(Serialization::validate) - .count(); - return validDocumentCount > 1; - } - - private static List splitSpecFile(String aSpecFile) { - final List documents = new ArrayList<>(); - final StringBuilder documentBuilder = new StringBuilder(); - for (String line : aSpecFile.split("\r?\n")) { - if (line.startsWith(DOCUMENT_DELIMITER)) { - documents.add(documentBuilder.toString()); - documentBuilder.setLength(0); - } else { - documentBuilder.append(line).append(System.lineSeparator()); - } - } - if (documentBuilder.length() > 0) { - documents.add(documentBuilder.toString()); - } - return documents; - } - - private static boolean validate(String document) { - Matcher keyValueMatcher = Pattern.compile("(\\S+):\\s(\\S*)(?:\\b(?!:)|$)").matcher(document); - return !document.isEmpty() && keyValueMatcher.find(); - } - /** * Create a copy of the resource via serialization. * diff --git a/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/utils/SerializationTest.java b/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/utils/SerializationTest.java index e604755000e..1a8c11d18df 100644 --- a/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/utils/SerializationTest.java +++ b/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/utils/SerializationTest.java @@ -132,50 +132,6 @@ private String readYamlToString(String path) throws IOException { .collect(Collectors.joining("\n")); } - @Test - @DisplayName("containsMultipleDocuments, multiple documents with windows line ends, should return true") - void containsMultipleDocumentsWithMultipleDocumentsAndWindowsLineEnds() { - // Given - final String multiDocument = "---\r\napiVersion: v1\r\nKind: Something\r\n\r\n---\r\napiVersion: v2\r\nKind: Other"; - // When - final boolean result = Serialization.containsMultipleDocuments(multiDocument); - // Then - assertThat(result).isTrue(); - } - - @Test - @DisplayName("containsMultipleDocuments, single document with windows line ends, should return false") - void containsMultipleDocumentsWithSingleDocumentAndWindowsLineEnds() { - // Given - final String multiDocument = "---\r\napiVersion: v1\r\nKind: Something\r\n\r\n"; - // When - final boolean result = Serialization.containsMultipleDocuments(multiDocument); - // Then - assertThat(result).isFalse(); - } - - @Test - @DisplayName("containsMultipleDocuments, multiple documents with linux line ends, should return true") - void containsMultipleDocumentsWithMultipleDocumentsAndLinuxLineEnds() { - // Given - final String multiDocument = "---\napiVersion: v1\nKind: Something\n\n---\napiVersion: v2\nKind: Other"; - // When - final boolean result = Serialization.containsMultipleDocuments(multiDocument); - // Then - assertThat(result).isTrue(); - } - - @Test - @DisplayName("containsMultipleDocuments, single document with linux line ends, should return false") - void containsMultipleDocumentsWithSingleDocumentAndLinuxLineEnds() { - // Given - final String multiDocument = "---\napiVersion: v1\nKind: Something\n\n"; - // When - final boolean result = Serialization.containsMultipleDocuments(multiDocument); - // Then - assertThat(result).isFalse(); - } - @Test void testSerializeYamlWithAlias() { // Given