diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java index 9dee1c1c77a0..b544ca065fe3 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java @@ -180,6 +180,13 @@ public class DiskStoreImpl implements DiskStore { public static final String RECOVER_VALUES_SYNC_PROPERTY_NAME = GeodeGlossary.GEMFIRE_PREFIX + "disk.recoverValuesSync"; + /** + * When configured threshold value is reached, then server will overflow to + * the new hashmap during the recovery of .drf files + */ + public static final String DRF_HASHMAP_OVERFLOW_THRESHOLD_NAME = + GeodeGlossary.GEMFIRE_PREFIX + "disk.drfHashMapOverflowThreshold"; + /** * Allows recovering values for LRU regions. By default values are not recovered for LRU regions * during recovery. @@ -187,6 +194,10 @@ public class DiskStoreImpl implements DiskStore { public static final String RECOVER_LRU_VALUES_PROPERTY_NAME = GeodeGlossary.GEMFIRE_PREFIX + "disk.recoverLruValues"; + static final long DRF_HASHMAP_OVERFLOW_THRESHOLD_DEFAULT = 805306368; + static final long DRF_HASHMAP_OVERFLOW_THRESHOLD = + Long.getLong(DRF_HASHMAP_OVERFLOW_THRESHOLD_NAME, DRF_HASHMAP_OVERFLOW_THRESHOLD_DEFAULT); + boolean RECOVER_VALUES = getBoolean(DiskStoreImpl.RECOVER_VALUE_PROPERTY_NAME, true); boolean RECOVER_VALUES_SYNC = getBoolean(DiskStoreImpl.RECOVER_VALUES_SYNC_PROPERTY_NAME, false); @@ -3525,31 +3536,49 @@ public void add(long id) { } try { - if (id > 0 && id <= 0x00000000FFFFFFFFL) { - currentInts.get().add((int) id); + if (shouldOverflow(id)) { + overflowToNewHashMap(id); } else { - currentLongs.get().add(id); + if (id > 0 && id <= 0x00000000FFFFFFFFL) { + this.currentInts.get().add((int) id); + } else { + this.currentLongs.get().add(id); + } } } catch (IllegalArgumentException illegalArgumentException) { // See GEODE-8029. - // Too many entries on the accumulated drf files, overflow and continue. + // Too many entries on the accumulated drf files, overflow next [Int|Long]OpenHashSet and + // continue. + overflowToNewHashMap(id); + } + } + + boolean shouldOverflow(final long id) { + if (id > 0 && id <= 0x00000000FFFFFFFFL) { + return currentInts.get().size() == DRF_HASHMAP_OVERFLOW_THRESHOLD; + } else { + return currentLongs.get().size() == DRF_HASHMAP_OVERFLOW_THRESHOLD; + } + } + + void overflowToNewHashMap(final long id) { + if (DRF_HASHMAP_OVERFLOW_THRESHOLD == DRF_HASHMAP_OVERFLOW_THRESHOLD_DEFAULT) { logger.warn( "There is a large number of deleted entries within the disk-store, please execute an offline compaction."); + } - // Overflow to the next [Int|Long]OpenHashSet and continue. - if (id > 0 && id <= 0x00000000FFFFFFFFL) { - IntOpenHashSet overflownHashSet = new IntOpenHashSet((int) INVALID_ID); - allInts.add(overflownHashSet); - currentInts.set(overflownHashSet); + if (id > 0 && id <= 0x00000000FFFFFFFFL) { + IntOpenHashSet overflownHashSet = new IntOpenHashSet((int) INVALID_ID); + allInts.add(overflownHashSet); + currentInts.set(overflownHashSet); - currentInts.get().add((int) id); - } else { - LongOpenHashSet overflownHashSet = new LongOpenHashSet((int) INVALID_ID); - allLongs.add(overflownHashSet); - currentLongs.set(overflownHashSet); + currentInts.get().add((int) id); + } else { + LongOpenHashSet overflownHashSet = new LongOpenHashSet((int) INVALID_ID); + allLongs.add(overflownHashSet); + currentLongs.set(overflownHashSet); - currentLongs.get().add(id); - } + currentLongs.get().add(id); } } diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/OplogEntryIdSetDrfHashSetThresholdTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/OplogEntryIdSetDrfHashSetThresholdTest.java new file mode 100644 index 000000000000..ff7e43e7b541 --- /dev/null +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/OplogEntryIdSetDrfHashSetThresholdTest.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.stream.IntStream; +import java.util.stream.LongStream; + +import it.unimi.dsi.fastutil.ints.IntOpenHashSet; +import it.unimi.dsi.fastutil.longs.LongOpenHashSet; +import org.junit.jupiter.api.Test; +import org.junitpioneer.jupiter.SetSystemProperty; + +import org.apache.geode.internal.cache.DiskStoreImpl.OplogEntryIdSet; + +/** + * Tests DiskStoreImpl.OplogEntryIdSet + */ +public class OplogEntryIdSetDrfHashSetThresholdTest { + @Test + @SetSystemProperty(key = "gemfire.disk.drfHashMapOverflowThreshold", value = "10") + public void addMethodOverflowBasedOnDrfOverflowThresholdParameters() { + + int testEntries = 41; + IntOpenHashSet intOpenHashSet = new IntOpenHashSet(); + LongOpenHashSet longOpenHashSet = new LongOpenHashSet(); + + List intOpenHashSets = + new ArrayList<>(Collections.singletonList(intOpenHashSet)); + List longOpenHashSets = + new ArrayList<>(Collections.singletonList(longOpenHashSet)); + + OplogEntryIdSet oplogEntryIdSet = new OplogEntryIdSet(intOpenHashSets, longOpenHashSets); + IntStream.range(1, testEntries).forEach(oplogEntryIdSet::add); + LongStream.range(0x00000000FFFFFFFFL + 1, 0x00000000FFFFFFFFL + testEntries) + .forEach(oplogEntryIdSet::add); + + assertThat(intOpenHashSets).hasSize(4); + assertThat(longOpenHashSets).hasSize(4); + + IntStream.range(1, testEntries).forEach(i -> assertThat(oplogEntryIdSet.contains(i)).isTrue()); + LongStream.range(0x00000000FFFFFFFFL + 1, 0x00000000FFFFFFFFL + testEntries) + .forEach(i -> assertThat(oplogEntryIdSet.contains(i)).isTrue()); + + } +}