diff --git a/CHANGELOG.md b/CHANGELOG.md index 1b707021e46..d8f023cc53b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ * Fix #5000: Remove clashing `v1alpha` apigroup packages in `istio-model-v1beta1` * fix #5002: Jetty response completion accounts for header processing * Fix #5009: addressing issue with serialization of wrapped polymorphic types +* Fix #5015: executing resync as a locking operation to ensure resync event ordering * Fix #5020: updating the resourceVersion on a delete with finalizers * Fix #5033: port forwarding for clients other than okhttp needs to specify the subprotocol * Fix #5044: disable Vert.x instance file caching diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/CacheImpl.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/CacheImpl.java index 94fd7b959e4..a9404dd1c54 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/CacheImpl.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/CacheImpl.java @@ -396,4 +396,8 @@ public boolean isFullState() { return items.isFullState(); } + public Object getLockObject() { + return this; + } + } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/ProcessorStore.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/ProcessorStore.java index d9be93bebc4..563b8a75c3a 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/ProcessorStore.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/ProcessorStore.java @@ -130,8 +130,11 @@ public String getKey(T obj) { @Override public void resync() { - this.cache.list() - .forEach(i -> this.processor.distribute(new ProcessorListener.UpdateNotification<>(i, i), true)); + // lock to ensure the ordering wrt other events + synchronized (cache.getLockObject()) { + this.cache.list() + .forEach(i -> this.processor.distribute(new ProcessorListener.UpdateNotification<>(i, i), true)); + } } } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/SharedProcessor.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/SharedProcessor.java index 86e334bb93a..9b8828adb65 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/SharedProcessor.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/SharedProcessor.java @@ -56,6 +56,10 @@ public SharedProcessor() { } public SharedProcessor(Executor executor, String informerDescription) { + // serialexecutors are by default unbounded, we expect this behavior here + // because resync may flood the executor with events for large caches + // if we ever need to limit the queue size, we have to revisit the + // resync locking behavior this.executor = new SerialExecutor(executor); this.informerDescription = informerDescription; } diff --git a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/impl/cache/ProcessorStoreTest.java b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/impl/cache/ProcessorStoreTest.java index 3da19668ef7..032623f06d8 100644 --- a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/impl/cache/ProcessorStoreTest.java +++ b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/impl/cache/ProcessorStoreTest.java @@ -118,4 +118,25 @@ void testSyncEvents() { assertTrue(syncCaptor.getAllValues().subList(4, 6).stream().allMatch(s -> !s.booleanValue())); } + @Test + void testResyncLock() throws InterruptedException { + CacheImpl podCache = new CacheImpl<>(); + SharedProcessor processor = Mockito.mock(SharedProcessor.class); + + ProcessorStore processorStore = new ProcessorStore<>(podCache, processor); + + Pod pod = new PodBuilder().withNewMetadata().withName("pod1").withResourceVersion("1").endMetadata().build(); + + List pods = Arrays.asList(pod); + processorStore.update(pods); + + Mockito.doAnswer(invocation -> { + assertTrue(Thread.holdsLock(podCache.getLockObject())); + return null; + }).when(processor).distribute(Mockito.any(ProcessorListener.Notification.class), Mockito.anyBoolean()); + processorStore.resync(); + + Mockito.verify(processor).distribute(Mockito.any(ProcessorListener.Notification.class), Mockito.anyBoolean()); + } + }