From a98197b5d0a3a2547e0581e475dcabaa82e6e92f Mon Sep 17 00:00:00 2001 From: Joris Melchior Date: Fri, 11 Feb 2022 18:38:04 -0500 Subject: [PATCH] GEODE-9990: turn DiskAccessException into CacheClosedException (#7334) * GEODE-9990: turn DiskAccessException into CacheClosedException - when DiskInitFile is in closed state and DiskStoreImpl is closed or closing - catch DiskAccessException in PRHARedundancyProvider and turn into CacheClosedException if cache closing is in progress - change CreateBucketMessage to handle DiskAccessException as cause of ReplyException --- .../internal/cache/DiskInitFileJUnitTest.java | 72 +++++++++++++++++++ .../geode/internal/cache/DiskInitFile.java | 25 +++++-- .../cache/PRHARedundancyProvider.java | 10 +++ .../partitioned/CreateBucketMessage.java | 5 +- .../cache/PRHARedundancyProviderTest.java | 72 +++++++++++++++++++ 5 files changed, 176 insertions(+), 8 deletions(-) diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/DiskInitFileJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/DiskInitFileJUnitTest.java index 927616dbace9..56b3e552064e 100644 --- a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/DiskInitFileJUnitTest.java +++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/DiskInitFileJUnitTest.java @@ -15,10 +15,15 @@ package org.apache.geode.internal.cache; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.io.File; @@ -31,8 +36,11 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.apache.geode.CancelCriterion; import org.apache.geode.Statistics; import org.apache.geode.StatisticsFactory; +import org.apache.geode.cache.CacheClosedException; +import org.apache.geode.cache.DiskAccessException; import org.apache.geode.internal.cache.persistence.DiskRegionView; import org.apache.geode.internal.cache.persistence.DiskStoreID; @@ -134,4 +142,68 @@ public void testKrfIds() { assertThat(dif.hasKrf(2)).isFalse(); dif.destroy(); } + + @Test + public void markInitializedThrowsDiskAccessExceptionWhenInitFileClosedAndParentAndCacheNotClosing() { + markInitializedTestSetup(); + + DiskInitFile diskInitFile = + new DiskInitFile("testThrows", mockedDiskStoreImpl, false, Collections.emptySet()); + diskInitFile.close(); + + assertThatThrownBy(() -> diskInitFile.markInitialized(mockDiskRegionView)).isInstanceOf( + DiskAccessException.class); + } + + @Test + public void markInitializedThrowsCacheClosedExceptionWhenInitFileClosedAndParentIsClosedOrClosing() { + markInitializedTestSetup(); + when(mockedDiskStoreImpl.isClosed()).thenReturn(Boolean.TRUE); + + DiskInitFile diskInitFile = + new DiskInitFile("testThrows", mockedDiskStoreImpl, false, Collections.emptySet()); + diskInitFile.close(); + + assertThatThrownBy(() -> diskInitFile.markInitialized(mockDiskRegionView)).isInstanceOf( + CacheClosedException.class); + } + + @Test + public void markInitializedThrowsCacheClosedExceptionWhenCacheIsClosing() { + CancelCriterion cancelCriterion = markInitializedTestSetup(); + CacheClosedException cacheClosedException = new CacheClosedException("boom"); + doThrow(cacheClosedException).when(cancelCriterion).checkCancelInProgress(); + + DiskInitFile diskInitFile = + new DiskInitFile("testThrows", mockedDiskStoreImpl, false, Collections.emptySet()); + diskInitFile.close(); + + assertThatThrownBy(() -> diskInitFile.markInitialized(mockDiskRegionView)).isEqualTo( + cacheClosedException); + } + + @Test + public void markInitializedCacheCloseIsCalledWhenParentHandlesDiskAccessException() { + markInitializedTestSetup(); + + DiskInitFile diskInitFile = + new DiskInitFile("testThrows", mockedDiskStoreImpl, false, Collections.emptySet()); + diskInitFile.close(); + + assertThatThrownBy(() -> diskInitFile.markInitialized(mockDiskRegionView)) + .isInstanceOf(DiskAccessException.class); + verify(mockedDiskStoreImpl, times(1)).handleDiskAccessException(any(DiskAccessException.class)); + } + + private CancelCriterion markInitializedTestSetup() { + InternalCache internalCache = mock(InternalCache.class); + CancelCriterion cancelCriterion = mock(CancelCriterion.class); + DiskRegion diskRegion = mock(DiskRegion.class); + + when(mockedDiskStoreImpl.getCache()).thenReturn(internalCache); + when(mockedDiskStoreImpl.getById(anyLong())).thenReturn(diskRegion); + when(internalCache.getCancelCriterion()).thenReturn(cancelCriterion); + + return cancelCriterion; + } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskInitFile.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskInitFile.java index a9651e58750a..c6baad2ce398 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskInitFile.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskInitFile.java @@ -51,6 +51,7 @@ import org.apache.geode.CancelException; import org.apache.geode.DataSerializer; import org.apache.geode.Instantiator; +import org.apache.geode.cache.CacheClosedException; import org.apache.geode.cache.DiskAccessException; import org.apache.geode.cache.EvictionAction; import org.apache.geode.cache.EvictionAlgorithm; @@ -1307,11 +1308,24 @@ private void writeIFRecord(ByteBuffer bb) throws IOException { writeIFRecord(bb, true); } - private void writeIFRecord(ByteBuffer bb, boolean doStats) throws IOException { - assert lock.isHeldByCurrentThread(); + private void checkClosed() { if (closed) { - throw new DiskAccessException("The disk store is closed", parent); + parent.getCache().getCancelCriterion().checkCancelInProgress(); + + if (parent.isClosed() || parent.isClosing()) { + throw new CacheClosedException("The disk store is closed or closing"); + } + + DiskAccessException dae = new DiskAccessException("The disk init file is closed", parent); + parent.handleDiskAccessException(dae); + + throw dae; } + } + + private void writeIFRecord(ByteBuffer bb, boolean doStats) throws IOException { + assert lock.isHeldByCurrentThread(); + checkClosed(); ifRAF.write(bb.array(), 0, bb.position()); if (logger.isTraceEnabled(LogMarker.PERSIST_WRITES_VERBOSE)) { @@ -1327,9 +1341,8 @@ private void writeIFRecord(ByteBuffer bb, boolean doStats) throws IOException { private void writeIFRecord(HeapDataOutputStream hdos, boolean doStats) throws IOException { assert lock.isHeldByCurrentThread(); - if (closed) { - throw new DiskAccessException("The disk store is closed", parent); - } + checkClosed(); + hdos.sendTo(ifRAF); if (logger.isTraceEnabled(LogMarker.PERSIST_WRITES_VERBOSE)) { logger.trace(LogMarker.PERSIST_WRITES_VERBOSE, "DiskInitFile writeIFRecord HDOS"); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PRHARedundancyProvider.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PRHARedundancyProvider.java index e07ab55a395b..0e24e0f3b1a2 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/PRHARedundancyProvider.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PRHARedundancyProvider.java @@ -43,11 +43,13 @@ import org.apache.logging.log4j.Logger; +import org.apache.geode.CancelCriterion; import org.apache.geode.CancelException; import org.apache.geode.SystemFailure; import org.apache.geode.annotations.Immutable; import org.apache.geode.annotations.VisibleForTesting; import org.apache.geode.annotations.internal.MakeNotStatic; +import org.apache.geode.cache.DiskAccessException; import org.apache.geode.cache.PartitionedRegionStorageException; import org.apache.geode.cache.Region; import org.apache.geode.cache.RegionDestroyedException; @@ -726,6 +728,14 @@ public InternalDistributedMember createBucketAtomically(int bucketId, int newBuc return bucketPrimary; } } + } catch (DiskAccessException dae) { + CancelCriterion cancelCriterion = partitionedRegion.getCancelCriterion(); + if (cancelCriterion.isCancelInProgress()) { + needToElectPrimary = false; + cancelCriterion.checkCancelInProgress(dae); + } + + throw dae; } catch (CancelException | RegionDestroyedException e) { // We don't need to elect a primary if the cache was closed. The other members will // take care of it. This ensures we don't compromise redundancy. diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/CreateBucketMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/CreateBucketMessage.java index ad9565e94716..9fcb5b24195e 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/CreateBucketMessage.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/CreateBucketMessage.java @@ -23,6 +23,7 @@ import org.apache.geode.CancelException; import org.apache.geode.DataSerializer; +import org.apache.geode.cache.DiskAccessException; import org.apache.geode.cache.PartitionedRegionStorageException; import org.apache.geode.distributed.internal.ClusterDistributionManager; import org.apache.geode.distributed.internal.DistributionManager; @@ -338,9 +339,9 @@ public InternalDistributedMember waitForResponse() throws ForceReattemptExceptio waitForRepliesUninterruptibly(); } catch (ReplyException e) { Throwable t = e.getCause(); - if (t instanceof CancelException) { + if (t instanceof DiskAccessException || t instanceof CancelException) { logger.debug( - "NodeResponse got remote cancellation, throwing PartitionedRegionCommunication Exception {}", + "NodeResponse got remote exception, throwing PartitionedRegionCommunication Exception {}", t.getMessage(), t); return null; } diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/PRHARedundancyProviderTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/PRHARedundancyProviderTest.java index 6cd69b7b6f5b..7cc089e9cb80 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/PRHARedundancyProviderTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/PRHARedundancyProviderTest.java @@ -16,16 +16,21 @@ import static org.apache.geode.cache.Region.SEPARATOR; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.same; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.util.Collections; import java.util.HashSet; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; @@ -44,10 +49,13 @@ import org.apache.geode.CancelCriterion; import org.apache.geode.cache.CacheClosedException; import org.apache.geode.cache.DataPolicy; +import org.apache.geode.cache.DiskAccessException; import org.apache.geode.cache.PartitionAttributes; import org.apache.geode.cache.RegionDestroyedException; import org.apache.geode.distributed.DistributedSystem; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; import org.apache.geode.internal.cache.control.InternalResourceManager; +import org.apache.geode.internal.cache.partitioned.Bucket; import org.apache.geode.internal.cache.partitioned.InternalPRInfo; import org.apache.geode.internal.cache.partitioned.LoadProbe; import org.apache.geode.internal.cache.partitioned.PartitionedRegionRebalanceOp; @@ -247,6 +255,70 @@ public void completesStartupTaskWhenRedundancyRecovered() { verify(providerStartupTask).complete(any()); } + @Test + public void createBucketAtomicallyConvertsDiskAccessExceptionWhenCacheCloseInProgress() { + String partitionName = "partitionName"; + DiskAccessException diskAccessException = new DiskAccessException("boom"); + CacheClosedException cacheClosedException = new CacheClosedException(diskAccessException); + InternalDistributedMember internalDistributedMember = mock(InternalDistributedMember.class); + Set memberSet = Collections.singleton(internalDistributedMember); + InternalCache internalCache = mock(InternalCache.class); + RegionAdvisor regionAdvisor = mock(RegionAdvisor.class); + Bucket bucket = mock(Bucket.class); + BucketAdvisor bucketAdvisor = mock(BucketAdvisor.class); + CancelCriterion cancelCriterion = mock(CancelCriterion.class); + + prHaRedundancyProvider = new PRHARedundancyProvider(partitionedRegion, resourceManager); + + when(partitionedRegion.getRegionAdvisor()).thenReturn(regionAdvisor); + when(partitionedRegion.getCache()).thenReturn(internalCache); + when(partitionedRegion.getCancelCriterion()).thenReturn(cancelCriterion); + when(regionAdvisor.getBucket(anyInt())).thenReturn(bucket); + when(bucket.getBucketAdvisor()).thenReturn(bucketAdvisor); + when(internalCache.isCacheAtShutdownAll()) + .thenReturn(Boolean.FALSE) + .thenThrow(diskAccessException); + when(cancelCriterion.isCancelInProgress()).thenReturn(Boolean.TRUE); + doThrow(cacheClosedException).when(cancelCriterion).checkCancelInProgress(diskAccessException); + when(partitionedRegion.getRegionAdvisor().adviseFixedPartitionDataStores(partitionName)) + .thenReturn(memberSet); + + assertThatThrownBy( + () -> prHaRedundancyProvider.createBucketAtomically(1, 5000, false, partitionName)) + .isEqualTo(cacheClosedException); + } + + @Test + public void createBucketAtomicallyPropagatesDiskAccessExceptionWhenCacheCloseNotInProgress() { + String partitionName = "partitionName"; + DiskAccessException diskAccessException = new DiskAccessException("boom"); + InternalDistributedMember internalDistributedMember = mock(InternalDistributedMember.class); + Set memberSet = Collections.singleton(internalDistributedMember); + InternalCache internalCache = mock(InternalCache.class); + RegionAdvisor regionAdvisor = mock(RegionAdvisor.class); + Bucket bucket = mock(Bucket.class); + BucketAdvisor bucketAdvisor = mock(BucketAdvisor.class); + CancelCriterion cancelCriterion = mock(CancelCriterion.class); + + prHaRedundancyProvider = new PRHARedundancyProvider(partitionedRegion, resourceManager); + + when(partitionedRegion.getRegionAdvisor()).thenReturn(regionAdvisor); + when(partitionedRegion.getCache()).thenReturn(internalCache); + when(partitionedRegion.getCancelCriterion()).thenReturn(cancelCriterion); + when(regionAdvisor.getBucket(anyInt())).thenReturn(bucket); + when(bucket.getBucketAdvisor()).thenReturn(bucketAdvisor); + when(internalCache.isCacheAtShutdownAll()) + .thenReturn(Boolean.FALSE) + .thenThrow(diskAccessException); + when(cancelCriterion.isCancelInProgress()).thenReturn(Boolean.FALSE); + when(partitionedRegion.getRegionAdvisor().adviseFixedPartitionDataStores(partitionName)) + .thenReturn(memberSet); + + assertThatThrownBy( + () -> prHaRedundancyProvider.createBucketAtomically(1, 5000, false, partitionName)) + .isEqualTo(diskAccessException); + } + @Test @Parameters({"RUNTIME", "CANCEL", "REGION_DESTROYED"}) @TestCaseName("{method}[{index}]: {params}")