From 62e60b5062f4c164f3a8af0b52fbc1d5e39fb6aa Mon Sep 17 00:00:00 2001 From: Alberto Gomez Date: Mon, 12 Sep 2022 15:12:14 +0200 Subject: [PATCH] GEODE-10420: Finish distribute() work if interrupted (#7854) An event of which a gateway sender is to be notified can be lost if the thread is interrupted while the event is being processed. The reason is that the distribute() method in AbstractGatewaySender, when it catches the InterruptedException at some point, just returns: it neither puts the event in the queue nor drops it. The fix consists of handling the event correctly (putting it in the queue or dropping it) when the InterruptedException is caught, and setting the interrupt flag again before the method returns so that the caller is aware of the interruption. --- .../geode/internal/cache/EntryEventImpl.java | 5 +- .../cache/wan/AbstractGatewaySender.java | 20 ++- .../cache/wan/AbstractGatewaySenderTest.java | 170 ++++++++++++++++++ 3 files changed, 186 insertions(+), 9 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java index 16adbeca7a34..6a521becbe1e 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java @@ -340,8 +340,9 @@ public EntryEventImpl( op = other.op; distributedMember = other.distributedMember; filterInfo = other.filterInfo; - keyInfo = other.keyInfo.isDistKeyInfo() ? new DistTxKeyInfo((DistTxKeyInfo) other.keyInfo) - : new KeyInfo(other.keyInfo); + keyInfo = + other.getKeyInfo().isDistKeyInfo() ? 
new DistTxKeyInfo((DistTxKeyInfo) other.getKeyInfo()) + : new KeyInfo(other.getKeyInfo()); if (other.getRawCallbackArgument() instanceof GatewaySenderEventCallbackArgument) { keyInfo.setCallbackArg((new GatewaySenderEventCallbackArgument( (GatewaySenderEventCallbackArgument) other.getRawCallbackArgument()))); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java index 47fa99e4a04b..73d85dd5aca2 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java @@ -1039,6 +1039,7 @@ public void distribute(EnumListenerEvent operation, EntryEventImpl event, List allRemoteDSIds, boolean isLastEventInTransaction) { final boolean isDebugEnabled = logger.isDebugEnabled(); + boolean wasInterrupted = false; // released by this method or transfers ownership to TmpQueueEvent @Released @@ -1153,15 +1154,17 @@ public void distribute(EnumListenerEvent operation, EntryEventImpl event, } } if (enqueuedAllTempQueueEvents) { - try { - while (!getLifeCycleLock().readLock().tryLock(10, TimeUnit.MILLISECONDS)) { - if (!getIsRunningAndDropEventIfNotRunning(event, isDebugEnabled, clonedEvent)) { - return; + while (true) { + try { + while (!getLifeCycleLock().readLock().tryLock(10, TimeUnit.MILLISECONDS)) { + if (!getIsRunningAndDropEventIfNotRunning(event, isDebugEnabled, clonedEvent)) { + return; + } } + break; + } catch (InterruptedException e) { + wasInterrupted = true; } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return; } } } @@ -1210,6 +1213,9 @@ this, getId(), operation, clonedEvent), if (freeClonedEvent) { clonedEvent.release(); // fix for bug 48035 } + if (wasInterrupted) { + Thread.currentThread().interrupt(); + } } } diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderTest.java index aac5f0d3c050..d57ba5f99961 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderTest.java @@ -18,15 +18,28 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashSet; +import java.util.List; import java.util.Set; +import java.util.concurrent.CountDownLatch; import org.junit.Test; +import org.apache.geode.cache.CacheException; +import org.apache.geode.cache.DataPolicy; +import org.apache.geode.cache.EntryEvent; +import org.apache.geode.cache.Operation; import org.apache.geode.cache.Region; import org.apache.geode.cache.wan.GatewayQueueEvent; +import org.apache.geode.distributed.internal.DistributionAdvisor; +import org.apache.geode.internal.cache.EntryEventImpl; +import org.apache.geode.internal.cache.EnumListenerEvent; +import org.apache.geode.internal.cache.InternalRegion; +import org.apache.geode.internal.cache.KeyInfo; import org.apache.geode.internal.cache.RegionQueue; public class AbstractGatewaySenderTest { @@ -58,4 +71,161 @@ public void getSynchronizationEventCanHandleRegionIsNullCase() { assertThat(event).isSameAs(gatewaySenderEvent); } + + @Test + public void distributeFinishesWorkWhenInterrupted() throws InterruptedException { + DummyGatewaySenderEventProcessor processor = new DummyGatewaySenderEventProcessor(); + TestableGatewaySender gatewaySender = new TestableGatewaySender(processor); + EnumListenerEvent operationType = EnumListenerEvent.AFTER_CREATE; + EntryEventImpl event = mock(EntryEventImpl.class); + 
when(event.getKeyInfo()).thenReturn(mock(KeyInfo.class)); + Operation operation = mock(Operation.class); + when(operation.isLocal()).thenReturn(false); + when(operation.isExpiration()).thenReturn(false); + when(event.getOperation()).thenReturn(operation); + InternalRegion region = mock(InternalRegion.class); + when(region.getDataPolicy()).thenReturn(DataPolicy.PARTITION); + when(event.getRegion()).thenReturn(region); + List allRemoteDSIds = Collections.singletonList(1); + + CountDownLatch lockAcquiredLatch = new CountDownLatch(1); + CountDownLatch unlockLatch = new CountDownLatch(1); + + // Get lifeCycleLock in write mode in new thread so that + // the thread calling distribute will not be able + // to acquire it + Thread thread = new Thread(() -> { + gatewaySender.getLifeCycleLock().writeLock().lock(); + lockAcquiredLatch.countDown(); + try { + unlockLatch.await(); + } catch (InterruptedException ignore) { + } + gatewaySender.getLifeCycleLock().writeLock().unlock(); + }); + thread.start(); + lockAcquiredLatch.await(); + + // Send interrupted and then call distribute + Thread.currentThread().interrupt(); + gatewaySender.distribute(operationType, event, allRemoteDSIds, true); + + unlockLatch.countDown(); + + // Check that the interrupted exception has been reset + assertThat(Thread.currentThread().isInterrupted()).isTrue(); + // Check that the work was finished even if the interrupt signal was set + assertThat(processor.getTimesRegisterEventDroppedInPrimaryQueueCalled()).isEqualTo(1); + } + + public static class TestableGatewaySender extends AbstractGatewaySender { + private int isRunningTimesCalled = 0; + + public TestableGatewaySender(AbstractGatewaySenderEventProcessor eventProcessor) { + this.eventProcessor = eventProcessor; + enqueuedAllTempQueueEvents = true; + } + + @Override + public void fillInProfile(DistributionAdvisor.Profile profile) {} + + @Override + public void start() {} + + @Override + public boolean isPrimary() { + return true; + } + + @Override + 
public void startWithCleanQueue() {} + + @Override + public void stop() {} + + @Override + public void setModifiedEventId(EntryEventImpl clonedEvent) {} + + @Override + public GatewaySenderStats getStatistics() { + return mock(GatewaySenderStats.class); + } + + @Override + public GatewaySenderAdvisor getSenderAdvisor() { + return mock(GatewaySenderAdvisor.class); + } + + @Override + public boolean isRunning() { + if (isRunningTimesCalled++ == 0) { + return true; + } + return false; + } + + @Override + public String getId() { + return "test"; + } + } + + public static class DummyGatewaySenderEventProcessor extends AbstractGatewaySenderEventProcessor { + + private int timesEnqueueEventCalled = 0; + private int timesRegisterEventDroppedInPrimaryQueueCalled = 0; + + public DummyGatewaySenderEventProcessor() { + super("", new DummyGatewaySender(), null); + } + + @Override + public void enqueueEvent(EnumListenerEvent operation, EntryEvent event, Object substituteValue, + boolean isLastEventInTransaction) throws IOException, CacheException { + timesEnqueueEventCalled++; + } + + public int getTimesEnqueueEventCalled() { + return timesEnqueueEventCalled; + } + + @Override + protected void initializeMessageQueue(String id, boolean cleanQueues) {} + + @Override + protected void rebalance() {} + + public int getTimesRegisterEventDroppedInPrimaryQueueCalled() { + return timesRegisterEventDroppedInPrimaryQueueCalled; + } + + @Override + protected void registerEventDroppedInPrimaryQueue(EntryEventImpl droppedEvent) { + timesRegisterEventDroppedInPrimaryQueueCalled++; + } + + @Override + public void initializeEventDispatcher() {} + + @Override + protected void enqueueEvent(GatewayQueueEvent event) {} + } + + public static class DummyGatewaySender extends AbstractGatewaySender { + @Override + public void fillInProfile(DistributionAdvisor.Profile profile) {} + + @Override + public void start() {} + + @Override + public void startWithCleanQueue() {} + + @Override + public void stop() 
{} + + @Override + public void setModifiedEventId(EntryEventImpl clonedEvent) {} + + } }