Skip to content

Commit

Permalink
fix #5015: executing resync as a locked operation
Browse files Browse the repository at this point in the history
  • Loading branch information
shawkins authored and manusa committed Apr 20, 2023
1 parent 7d41264 commit c03eccf
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
* Fix #5000: Remove clashing `v1alpha` apigroup packages in `istio-model-v1beta1`
* fix #5002: Jetty response completion accounts for header processing
* Fix #5009: addressing issue with serialization of wrapped polymorphic types
* Fix #5015: executing resync as a locking operation to ensure resync event ordering
* Fix #5020: updating the resourceVersion on a delete with finalizers
* Fix #5033: port forwarding for clients other than okhttp needs to specify the subprotocol
* Fix #5044: disable Vert.x instance file caching
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -396,4 +396,8 @@ public boolean isFullState() {
return items.isFullState();
}

public Object getLockObject() {
return this;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,11 @@ public String getKey(T obj) {

@Override
public void resync() {
this.cache.list()
.forEach(i -> this.processor.distribute(new ProcessorListener.UpdateNotification<>(i, i), true));
// lock to ensure the ordering wrt other events
synchronized (cache.getLockObject()) {
this.cache.list()
.forEach(i -> this.processor.distribute(new ProcessorListener.UpdateNotification<>(i, i), true));
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ public SharedProcessor() {
}

public SharedProcessor(Executor executor, String informerDescription) {
// serialexecutors are by default unbounded, we expect this behavior here
// because resync may flood the executor with events for large caches
// if we ever need to limit the queue size, we have to revisit the
// resync locking behavior
this.executor = new SerialExecutor(executor);
this.informerDescription = informerDescription;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,4 +118,25 @@ void testSyncEvents() {
assertTrue(syncCaptor.getAllValues().subList(4, 6).stream().allMatch(s -> !s.booleanValue()));
}

@Test
void testResyncLock() throws InterruptedException {
CacheImpl<Pod> podCache = new CacheImpl<>();
SharedProcessor<Pod> processor = Mockito.mock(SharedProcessor.class);

ProcessorStore<Pod> processorStore = new ProcessorStore<>(podCache, processor);

Pod pod = new PodBuilder().withNewMetadata().withName("pod1").withResourceVersion("1").endMetadata().build();

List<Pod> pods = Arrays.asList(pod);
processorStore.update(pods);

Mockito.doAnswer(invocation -> {
assertTrue(Thread.holdsLock(podCache.getLockObject()));
return null;
}).when(processor).distribute(Mockito.any(ProcessorListener.Notification.class), Mockito.anyBoolean());
processorStore.resync();

Mockito.verify(processor).distribute(Mockito.any(ProcessorListener.Notification.class), Mockito.anyBoolean());
}

}

0 comments on commit c03eccf

Please sign in to comment.