diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java index 16adbeca7a34..6a521becbe1e 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java @@ -340,8 +340,9 @@ public EntryEventImpl( op = other.op; distributedMember = other.distributedMember; filterInfo = other.filterInfo; - keyInfo = other.keyInfo.isDistKeyInfo() ? new DistTxKeyInfo((DistTxKeyInfo) other.keyInfo) - : new KeyInfo(other.keyInfo); + keyInfo = + other.getKeyInfo().isDistKeyInfo() ? new DistTxKeyInfo((DistTxKeyInfo) other.getKeyInfo()) + : new KeyInfo(other.getKeyInfo()); if (other.getRawCallbackArgument() instanceof GatewaySenderEventCallbackArgument) { keyInfo.setCallbackArg((new GatewaySenderEventCallbackArgument( (GatewaySenderEventCallbackArgument) other.getRawCallbackArgument()))); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java index 47fa99e4a04b..73d85dd5aca2 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java @@ -1039,6 +1039,7 @@ public void distribute(EnumListenerEvent operation, EntryEventImpl event, List allRemoteDSIds, boolean isLastEventInTransaction) { final boolean isDebugEnabled = logger.isDebugEnabled(); + boolean wasInterrupted = false; // released by this method or transfers ownership to TmpQueueEvent @Released @@ -1153,15 +1154,17 @@ public void distribute(EnumListenerEvent operation, EntryEventImpl event, } } if (enqueuedAllTempQueueEvents) { - try { - while (!getLifeCycleLock().readLock().tryLock(10, TimeUnit.MILLISECONDS)) { - if (!getIsRunningAndDropEventIfNotRunning(event, isDebugEnabled, clonedEvent)) { - return; + while (true) { + try { + while (!getLifeCycleLock().readLock().tryLock(10, TimeUnit.MILLISECONDS)) { + if (!getIsRunningAndDropEventIfNotRunning(event, isDebugEnabled, clonedEvent)) { + return; + } } + break; + } catch (InterruptedException e) { + wasInterrupted = true; } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return; } } } @@ -1210,6 +1213,9 @@ this, getId(), operation, clonedEvent), if (freeClonedEvent) { clonedEvent.release(); // fix for bug 48035 } + if (wasInterrupted) { + Thread.currentThread().interrupt(); + } } } diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderTest.java index aac5f0d3c050..d57ba5f99961 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderTest.java @@ -18,15 +18,28 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashSet; +import java.util.List; import java.util.Set; +import java.util.concurrent.CountDownLatch; import org.junit.Test; +import org.apache.geode.cache.CacheException; +import org.apache.geode.cache.DataPolicy; +import org.apache.geode.cache.EntryEvent; +import org.apache.geode.cache.Operation; import org.apache.geode.cache.Region; import org.apache.geode.cache.wan.GatewayQueueEvent; +import org.apache.geode.distributed.internal.DistributionAdvisor; +import org.apache.geode.internal.cache.EntryEventImpl; +import org.apache.geode.internal.cache.EnumListenerEvent; +import org.apache.geode.internal.cache.InternalRegion; +import org.apache.geode.internal.cache.KeyInfo; import org.apache.geode.internal.cache.RegionQueue; public class AbstractGatewaySenderTest { @@ -58,4 +71,161 @@ public void getSynchronizationEventCanHandleRegionIsNullCase() { assertThat(event).isSameAs(gatewaySenderEvent); } + + @Test + public void distributeFinishesWorkWhenInterrupted() throws InterruptedException { + DummyGatewaySenderEventProcessor processor = new DummyGatewaySenderEventProcessor(); + TestableGatewaySender gatewaySender = new TestableGatewaySender(processor); + EnumListenerEvent operationType = EnumListenerEvent.AFTER_CREATE; + EntryEventImpl event = mock(EntryEventImpl.class); + when(event.getKeyInfo()).thenReturn(mock(KeyInfo.class)); + Operation operation = mock(Operation.class); + when(operation.isLocal()).thenReturn(false); + when(operation.isExpiration()).thenReturn(false); + when(event.getOperation()).thenReturn(operation); + InternalRegion region = mock(InternalRegion.class); + when(region.getDataPolicy()).thenReturn(DataPolicy.PARTITION); + when(event.getRegion()).thenReturn(region); + List allRemoteDSIds = Collections.singletonList(1); + + CountDownLatch lockAcquiredLatch = new CountDownLatch(1); + CountDownLatch unlockLatch = new CountDownLatch(1); + + // Get lifeCycleLock in write mode in new thread so that + // the thread calling distribute will not be able + // to acquire it + Thread thread = new Thread(() -> { + gatewaySender.getLifeCycleLock().writeLock().lock(); + lockAcquiredLatch.countDown(); + try { + unlockLatch.await(); + } catch (InterruptedException ignore) { + } + gatewaySender.getLifeCycleLock().writeLock().unlock(); + }); + thread.start(); + lockAcquiredLatch.await(); + + // Send interrupted and then call distribute + Thread.currentThread().interrupt(); + gatewaySender.distribute(operationType, event, allRemoteDSIds, true); + + unlockLatch.countDown(); + + // Check that the interrupted exception has been reset + assertThat(Thread.currentThread().isInterrupted()).isTrue(); + // Check that the work was finished even if the interrupt signal was set + assertThat(processor.getTimesRegisterEventDroppedInPrimaryQueueCalled()).isEqualTo(1); + } + + public static class TestableGatewaySender extends AbstractGatewaySender { + private int isRunningTimesCalled = 0; + + public TestableGatewaySender(AbstractGatewaySenderEventProcessor eventProcessor) { + this.eventProcessor = eventProcessor; + enqueuedAllTempQueueEvents = true; + } + + @Override + public void fillInProfile(DistributionAdvisor.Profile profile) {} + + @Override + public void start() {} + + @Override + public boolean isPrimary() { + return true; + } + + @Override + public void startWithCleanQueue() {} + + @Override + public void stop() {} + + @Override + public void setModifiedEventId(EntryEventImpl clonedEvent) {} + + @Override + public GatewaySenderStats getStatistics() { + return mock(GatewaySenderStats.class); + } + + @Override + public GatewaySenderAdvisor getSenderAdvisor() { + return mock(GatewaySenderAdvisor.class); + } + + @Override + public boolean isRunning() { + if (isRunningTimesCalled++ == 0) { + return true; + } + return false; + } + + @Override + public String getId() { + return "test"; + } + } + + public static class DummyGatewaySenderEventProcessor extends AbstractGatewaySenderEventProcessor { + + private int timesEnqueueEventCalled = 0; + private int timesRegisterEventDroppedInPrimaryQueueCalled = 0; + + public DummyGatewaySenderEventProcessor() { + super("", new DummyGatewaySender(), null); + } + + @Override + public void enqueueEvent(EnumListenerEvent operation, EntryEvent event, Object substituteValue, + boolean isLastEventInTransaction) throws IOException, CacheException { + timesEnqueueEventCalled++; + } + + public int getTimesEnqueueEventCalled() { + return timesEnqueueEventCalled; + } + + @Override + protected void initializeMessageQueue(String id, boolean cleanQueues) {} + + @Override + protected void rebalance() {} + + public int getTimesRegisterEventDroppedInPrimaryQueueCalled() { + return timesRegisterEventDroppedInPrimaryQueueCalled; + } + + @Override + protected void registerEventDroppedInPrimaryQueue(EntryEventImpl droppedEvent) { + timesRegisterEventDroppedInPrimaryQueueCalled++; + } + + @Override + public void initializeEventDispatcher() {} + + @Override + protected void enqueueEvent(GatewayQueueEvent event) {} + } + + public static class DummyGatewaySender extends AbstractGatewaySender { + @Override + public void fillInProfile(DistributionAdvisor.Profile profile) {} + + @Override + public void start() {} + + @Override + public void startWithCleanQueue() {} + + @Override + public void stop() {} + + @Override + public void setModifiedEventId(EntryEventImpl clonedEvent) {} + + } }