Skip to content

Commit

Permalink
GEODE-9990: turn DiskAccessException into CacheClosedException (apach…
Browse files Browse the repository at this point in the history
…e#7334) (apache#7374)

* GEODE-9990: turn DiskAccessException into CacheClosedException

- when DiskInitFile is in closed state and DiskStoreImpl is closed or
  closing
- catch DiskAccessException in PRHARedundancyProvider and turn into
  CacheClosedException if cache closing is in progress
- change CreateBucketMessage to handle DiskAccessException as cause of
  ReplyException

(cherry picked from commit a98197b)
  • Loading branch information
jmelchio authored Feb 17, 2022
1 parent 8b9c03b commit dee8335
Show file tree
Hide file tree
Showing 5 changed files with 176 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,15 @@
package org.apache.geode.internal.cache;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.io.File;
Expand All @@ -31,8 +36,11 @@
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import org.apache.geode.CancelCriterion;
import org.apache.geode.Statistics;
import org.apache.geode.StatisticsFactory;
import org.apache.geode.cache.CacheClosedException;
import org.apache.geode.cache.DiskAccessException;
import org.apache.geode.internal.cache.persistence.DiskRegionView;
import org.apache.geode.internal.cache.persistence.DiskStoreID;

Expand Down Expand Up @@ -134,4 +142,68 @@ public void testKrfIds() {
assertThat(dif.hasKrf(2)).isFalse();
dif.destroy();
}

@Test
public void markInitializedThrowsDiskAccessExceptionWhenInitFileClosedAndParentAndCacheNotClosing() {
markInitializedTestSetup();

DiskInitFile diskInitFile =
new DiskInitFile("testThrows", mockedDiskStoreImpl, false, Collections.emptySet());
diskInitFile.close();

assertThatThrownBy(() -> diskInitFile.markInitialized(mockDiskRegionView)).isInstanceOf(
DiskAccessException.class);
}

@Test
public void markInitializedThrowsCacheClosedExceptionWhenInitFileClosedAndParentIsClosedOrClosing() {
markInitializedTestSetup();
when(mockedDiskStoreImpl.isClosed()).thenReturn(Boolean.TRUE);

DiskInitFile diskInitFile =
new DiskInitFile("testThrows", mockedDiskStoreImpl, false, Collections.emptySet());
diskInitFile.close();

assertThatThrownBy(() -> diskInitFile.markInitialized(mockDiskRegionView)).isInstanceOf(
CacheClosedException.class);
}

@Test
public void markInitializedThrowsCacheClosedExceptionWhenCacheIsClosing() {
CancelCriterion cancelCriterion = markInitializedTestSetup();
CacheClosedException cacheClosedException = new CacheClosedException("boom");
doThrow(cacheClosedException).when(cancelCriterion).checkCancelInProgress();

DiskInitFile diskInitFile =
new DiskInitFile("testThrows", mockedDiskStoreImpl, false, Collections.emptySet());
diskInitFile.close();

assertThatThrownBy(() -> diskInitFile.markInitialized(mockDiskRegionView)).isEqualTo(
cacheClosedException);
}

@Test
public void markInitializedCacheCloseIsCalledWhenParentHandlesDiskAccessException() {
markInitializedTestSetup();

DiskInitFile diskInitFile =
new DiskInitFile("testThrows", mockedDiskStoreImpl, false, Collections.emptySet());
diskInitFile.close();

assertThatThrownBy(() -> diskInitFile.markInitialized(mockDiskRegionView))
.isInstanceOf(DiskAccessException.class);
verify(mockedDiskStoreImpl, times(1)).handleDiskAccessException(any(DiskAccessException.class));
}

private CancelCriterion markInitializedTestSetup() {
InternalCache internalCache = mock(InternalCache.class);
CancelCriterion cancelCriterion = mock(CancelCriterion.class);
DiskRegion diskRegion = mock(DiskRegion.class);

when(mockedDiskStoreImpl.getCache()).thenReturn(internalCache);
when(mockedDiskStoreImpl.getById(anyLong())).thenReturn(diskRegion);
when(internalCache.getCancelCriterion()).thenReturn(cancelCriterion);

return cancelCriterion;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.geode.CancelException;
import org.apache.geode.DataSerializer;
import org.apache.geode.Instantiator;
import org.apache.geode.cache.CacheClosedException;
import org.apache.geode.cache.DiskAccessException;
import org.apache.geode.cache.EvictionAction;
import org.apache.geode.cache.EvictionAlgorithm;
Expand Down Expand Up @@ -1307,11 +1308,24 @@ private void writeIFRecord(ByteBuffer bb) throws IOException {
writeIFRecord(bb, true);
}

private void writeIFRecord(ByteBuffer bb, boolean doStats) throws IOException {
assert lock.isHeldByCurrentThread();
private void checkClosed() {
if (closed) {
throw new DiskAccessException("The disk store is closed", parent);
parent.getCache().getCancelCriterion().checkCancelInProgress();

if (parent.isClosed() || parent.isClosing()) {
throw new CacheClosedException("The disk store is closed or closing");
}

DiskAccessException dae = new DiskAccessException("The disk init file is closed", parent);
parent.handleDiskAccessException(dae);

throw dae;
}
}

private void writeIFRecord(ByteBuffer bb, boolean doStats) throws IOException {
assert lock.isHeldByCurrentThread();
checkClosed();

ifRAF.write(bb.array(), 0, bb.position());
if (logger.isTraceEnabled(LogMarker.PERSIST_WRITES_VERBOSE)) {
Expand All @@ -1327,9 +1341,8 @@ private void writeIFRecord(ByteBuffer bb, boolean doStats) throws IOException {

private void writeIFRecord(HeapDataOutputStream hdos, boolean doStats) throws IOException {
assert lock.isHeldByCurrentThread();
if (closed) {
throw new DiskAccessException("The disk store is closed", parent);
}
checkClosed();

hdos.sendTo(ifRAF);
if (logger.isTraceEnabled(LogMarker.PERSIST_WRITES_VERBOSE)) {
logger.trace(LogMarker.PERSIST_WRITES_VERBOSE, "DiskInitFile writeIFRecord HDOS");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,13 @@

import org.apache.logging.log4j.Logger;

import org.apache.geode.CancelCriterion;
import org.apache.geode.CancelException;
import org.apache.geode.SystemFailure;
import org.apache.geode.annotations.Immutable;
import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.annotations.internal.MakeNotStatic;
import org.apache.geode.cache.DiskAccessException;
import org.apache.geode.cache.PartitionedRegionStorageException;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionDestroyedException;
Expand Down Expand Up @@ -726,6 +728,14 @@ public InternalDistributedMember createBucketAtomically(int bucketId, int newBuc
return bucketPrimary;
}
}
} catch (DiskAccessException dae) {
CancelCriterion cancelCriterion = partitionedRegion.getCancelCriterion();
if (cancelCriterion.isCancelInProgress()) {
needToElectPrimary = false;
cancelCriterion.checkCancelInProgress(dae);
}

throw dae;
} catch (CancelException | RegionDestroyedException e) {
// We don't need to elect a primary if the cache was closed. The other members will
// take care of it. This ensures we don't compromise redundancy.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import org.apache.geode.CancelException;
import org.apache.geode.DataSerializer;
import org.apache.geode.cache.DiskAccessException;
import org.apache.geode.cache.PartitionedRegionStorageException;
import org.apache.geode.distributed.internal.ClusterDistributionManager;
import org.apache.geode.distributed.internal.DistributionManager;
Expand Down Expand Up @@ -339,9 +340,9 @@ public InternalDistributedMember waitForResponse() throws ForceReattemptExceptio
waitForRepliesUninterruptibly();
} catch (ReplyException e) {
Throwable t = e.getCause();
if (t instanceof CancelException) {
if (t instanceof DiskAccessException || t instanceof CancelException) {
logger.debug(
"NodeResponse got remote cancellation, throwing PartitionedRegionCommunication Exception {}",
"NodeResponse got remote exception, throwing PartitionedRegionCommunication Exception {}",
t.getMessage(), t);
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,21 @@

import static org.apache.geode.cache.Region.SEPARATOR;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.same;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
Expand All @@ -44,10 +49,13 @@
import org.apache.geode.CancelCriterion;
import org.apache.geode.cache.CacheClosedException;
import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.DiskAccessException;
import org.apache.geode.cache.PartitionAttributes;
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.control.InternalResourceManager;
import org.apache.geode.internal.cache.partitioned.Bucket;
import org.apache.geode.internal.cache.partitioned.InternalPRInfo;
import org.apache.geode.internal.cache.partitioned.LoadProbe;
import org.apache.geode.internal.cache.partitioned.PartitionedRegionRebalanceOp;
Expand Down Expand Up @@ -247,6 +255,70 @@ public void completesStartupTaskWhenRedundancyRecovered() {
verify(providerStartupTask).complete(any());
}

@Test
public void createBucketAtomicallyConvertsDiskAccessExceptionWhenCacheCloseInProgress() {
String partitionName = "partitionName";
DiskAccessException diskAccessException = new DiskAccessException("boom");
CacheClosedException cacheClosedException = new CacheClosedException(diskAccessException);
InternalDistributedMember internalDistributedMember = mock(InternalDistributedMember.class);
Set<InternalDistributedMember> memberSet = Collections.singleton(internalDistributedMember);
InternalCache internalCache = mock(InternalCache.class);
RegionAdvisor regionAdvisor = mock(RegionAdvisor.class);
Bucket bucket = mock(Bucket.class);
BucketAdvisor bucketAdvisor = mock(BucketAdvisor.class);
CancelCriterion cancelCriterion = mock(CancelCriterion.class);

prHaRedundancyProvider = new PRHARedundancyProvider(partitionedRegion, resourceManager);

when(partitionedRegion.getRegionAdvisor()).thenReturn(regionAdvisor);
when(partitionedRegion.getCache()).thenReturn(internalCache);
when(partitionedRegion.getCancelCriterion()).thenReturn(cancelCriterion);
when(regionAdvisor.getBucket(anyInt())).thenReturn(bucket);
when(bucket.getBucketAdvisor()).thenReturn(bucketAdvisor);
when(internalCache.isCacheAtShutdownAll())
.thenReturn(Boolean.FALSE)
.thenThrow(diskAccessException);
when(cancelCriterion.isCancelInProgress()).thenReturn(Boolean.TRUE);
doThrow(cacheClosedException).when(cancelCriterion).checkCancelInProgress(diskAccessException);
when(partitionedRegion.getRegionAdvisor().adviseFixedPartitionDataStores(partitionName))
.thenReturn(memberSet);

assertThatThrownBy(
() -> prHaRedundancyProvider.createBucketAtomically(1, 5000, false, partitionName))
.isEqualTo(cacheClosedException);
}

@Test
public void createBucketAtomicallyPropagatesDiskAccessExceptionWhenCacheCloseNotInProgress() {
String partitionName = "partitionName";
DiskAccessException diskAccessException = new DiskAccessException("boom");
InternalDistributedMember internalDistributedMember = mock(InternalDistributedMember.class);
Set<InternalDistributedMember> memberSet = Collections.singleton(internalDistributedMember);
InternalCache internalCache = mock(InternalCache.class);
RegionAdvisor regionAdvisor = mock(RegionAdvisor.class);
Bucket bucket = mock(Bucket.class);
BucketAdvisor bucketAdvisor = mock(BucketAdvisor.class);
CancelCriterion cancelCriterion = mock(CancelCriterion.class);

prHaRedundancyProvider = new PRHARedundancyProvider(partitionedRegion, resourceManager);

when(partitionedRegion.getRegionAdvisor()).thenReturn(regionAdvisor);
when(partitionedRegion.getCache()).thenReturn(internalCache);
when(partitionedRegion.getCancelCriterion()).thenReturn(cancelCriterion);
when(regionAdvisor.getBucket(anyInt())).thenReturn(bucket);
when(bucket.getBucketAdvisor()).thenReturn(bucketAdvisor);
when(internalCache.isCacheAtShutdownAll())
.thenReturn(Boolean.FALSE)
.thenThrow(diskAccessException);
when(cancelCriterion.isCancelInProgress()).thenReturn(Boolean.FALSE);
when(partitionedRegion.getRegionAdvisor().adviseFixedPartitionDataStores(partitionName))
.thenReturn(memberSet);

assertThatThrownBy(
() -> prHaRedundancyProvider.createBucketAtomically(1, 5000, false, partitionName))
.isEqualTo(diskAccessException);
}

@Test
@Parameters({"RUNTIME", "CANCEL", "REGION_DESTROYED"})
@TestCaseName("{method}[{index}]: {params}")
Expand Down

0 comments on commit dee8335

Please sign in to comment.