diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index 5411556ca643..14deb8402bf6 100755 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -26,6 +26,7 @@ import org.apache.iotdb.commons.schema.SchemaConstant; import org.apache.iotdb.commons.service.metric.MetricService; import org.apache.iotdb.commons.utils.NodeUrlUtils; +import org.apache.iotdb.commons.utils.binaryallocator.BinaryAllocator; import org.apache.iotdb.confignode.rpc.thrift.TCQConfig; import org.apache.iotdb.confignode.rpc.thrift.TGlobalConfig; import org.apache.iotdb.confignode.rpc.thrift.TRatisConfig; @@ -2845,6 +2846,24 @@ public synchronized void loadHotModifiedProps(Properties properties) // update retry config commonDescriptor.loadRetryProperties(properties); + + // update binary allocator + commonDescriptor + .getConfig() + .setEnableBinaryAllocator( + Boolean.parseBoolean( + Optional.ofNullable( + properties.getProperty( + "enable_binary_allocator", + ConfigurationFileUtils.getConfigurationDefaultValue( + "enable_binary_allocator"))) + .map(String::trim) + .orElse( + ConfigurationFileUtils.getConfigurationDefaultValue( + "enable_binary_allocator")))); + if (!commonDescriptor.getConfig().isEnableBinaryAllocator()) { + BinaryAllocator.DEFAULT.close(false); + } } catch (Exception e) { if (e instanceof InterruptedException) { Thread.currentThread().interrupt(); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/BinaryAllocatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/BinaryAllocatorTest.java new file mode 100644 index 000000000000..b61c4f790f56 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/BinaryAllocatorTest.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.utils; + +import org.apache.iotdb.commons.utils.binaryallocator.AllocatorConfig; +import org.apache.iotdb.commons.utils.binaryallocator.BinaryAllocator; +import org.apache.iotdb.commons.utils.binaryallocator.SizeClasses; + +import org.apache.tsfile.utils.PooledBinary; +import org.junit.Test; + +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +public class BinaryAllocatorTest { + @Test + public void testAllocateBinary() { + AllocatorConfig config = new AllocatorConfig(); + config.arenaNum = 1; + BinaryAllocator binaryAllocator = new BinaryAllocator(config); + binaryAllocator.resetArenaBinding(); + + PooledBinary binary = binaryAllocator.allocateBinary(255); + assertNotNull(binary); + assertEquals(binary.getArenaIndex(), -1); + assertEquals(binary.getLength(), 255); + binaryAllocator.deallocateBinary(binary); + + binary = binaryAllocator.allocateBinary(65536); + assertNotNull(binary); + assertEquals(binary.getArenaIndex(), 0); + assertEquals(binary.getLength(), 65536); + binaryAllocator.deallocateBinary(binary); + + binary = binaryAllocator.allocateBinary(65535); + assertNotNull(binary); + assertEquals(binary.getArenaIndex(), 0); + assertEquals(binary.getLength(), 65535); + assertEquals(binary.getValues().length, 65536); + binaryAllocator.deallocateBinary(binary); + } + + @Test + public void testStrategy() throws InterruptedException { + BinaryAllocator binaryAllocator = new BinaryAllocator(AllocatorConfig.DEFAULT_CONFIG); + binaryAllocator.resetArenaBinding(); + + PooledBinary binary1 = binaryAllocator.allocateBinary(4096); + PooledBinary binary2 = binaryAllocator.allocateBinary(4096); + assertEquals(binary1.getArenaIndex(), binary2.getArenaIndex()); + binaryAllocator.deallocateBinary(binary1); + binaryAllocator.deallocateBinary(binary2); + + int threadCount = 4; + CountDownLatch latch = new CountDownLatch(threadCount); + Map arenaUsageCount = new ConcurrentHashMap<>(); + for (int i = 0; i < threadCount; i++) { + Thread thread = + new Thread( + () -> { + try { + PooledBinary firstBinary = binaryAllocator.allocateBinary(2048); + int arenaId = firstBinary.getArenaIndex(); + arenaUsageCount.merge(arenaId, 1, Integer::sum); + binaryAllocator.deallocateBinary(firstBinary); + } finally { + latch.countDown(); + } + }); + thread.start(); + } + + latch.await(); + int maxUsage = Collections.max(arenaUsageCount.values()); + int minUsage = Collections.min(arenaUsageCount.values()); + assertEquals(maxUsage, minUsage); + } + + @Test + public void testEviction() throws InterruptedException { + AllocatorConfig config = new AllocatorConfig(); + config.arenaNum = 1; + config.minAllocateSize = config.maxAllocateSize = 4096; + config.setTimeBetweenEvictorRunsMillis(1); + BinaryAllocator binaryAllocator = new BinaryAllocator(config); + binaryAllocator.resetArenaBinding(); + + PooledBinary binary = binaryAllocator.allocateBinary(4096); + binaryAllocator.deallocateBinary(binary); + assertEquals(binaryAllocator.getTotalUsedMemory(), 4096); + Thread.sleep(200); + assertEquals(binaryAllocator.getTotalUsedMemory(), 0); + } + + @Test + public void testSizeMapping() { + AllocatorConfig config = new AllocatorConfig(); + config.minAllocateSize = 4096; + config.maxAllocateSize = 65536; + SizeClasses sizeClasses = new SizeClasses(config); + + assertEquals(sizeClasses.getSizeClassNum(), 33); + int[] testSizes = {4607, 8191, 16383, 32767, 65535}; + + for (int size : testSizes) { + int sizeIdx = sizeClasses.size2SizeIdx(size); + int mappedSize = sizeClasses.sizeIdx2size(sizeIdx); + + assertEquals("Mapped size should be >= original size", mappedSize, size + 1); + + if (sizeIdx > 0) { + int previousSize = sizeClasses.sizeIdx2size(sizeIdx - 1); + assertTrue("Previous size should be < original size", previousSize < size); + } + } + } +} diff --git a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template index cdcd5a8571a4..103f26df3cef 100644 --- a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template +++ b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template @@ -1493,6 +1493,40 @@ data_region_iot_max_memory_ratio_for_queue = 0.6 # Datatype: long region_migration_speed_limit_bytes_per_second = 33554432 +#################### +### Blob Allocator Configuration +#################### +# Whether to enable binary allocator. +# For scenarios where large binary streams cause severe GC, enabling this parameter significantly improves performance. +# effectiveMode: hot_reload +enable_binary_allocator=true + +# The size boundaries that allocator is responsible for +# lower boundary for allocation size +# unit: bytes +# Datatype: int +# effectiveMode: restart +small_binary_object=4096 + +# The size boundaries that allocator is responsible for +# upper boundary for allocation size +# unit: bytes +# Datatype: int +# effectiveMode: restart +huge_binary_object=1048576 + +# Number of arena regions in blob allocator, used to control concurrent performance +# Datatype: int +# effectiveMode: restart +arena_num=4 + +# Control the number of slabs in allocator +# The number of different sizes in each power-of-2 interval is 2^LOG2_SIZE_CLASS_GROUP +# For example: if LOG2_SIZE_CLASS_GROUP=3, between 1024-2048 there will be 8 different sizes +# Datatype: int +# effectiveMode: restart +log2_size_class_group=3 + #################### ### TsFile Configurations #################### diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index 0a7dae166dd6..16127d167bf6 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -317,6 +317,16 @@ public class CommonConfig { private long seriesLimitThreshold = -1; private long deviceLimitThreshold = -1; + private boolean enableBinaryAllocator = true; + + private int arenaNum = 4; + + private int minAllocateSize = 4096; + + private int maxAllocateSize = 1024 * 1024; + + private int log2SizeClassGroup = 3; + // time in nanosecond precision when starting up private final long startUpNanosecond = System.nanoTime(); @@ -1468,4 +1478,44 @@ public long getRemoteWriteMaxRetryDurationInMs() { public void setRemoteWriteMaxRetryDurationInMs(long remoteWriteMaxRetryDurationInMs) { this.remoteWriteMaxRetryDurationInMs = remoteWriteMaxRetryDurationInMs; } + + public int getArenaNum() { + return arenaNum; + } + + public void setArenaNum(int arenaNum) { + this.arenaNum = arenaNum; + } + + public int getMinAllocateSize() { + return minAllocateSize; + } + + public void setMinAllocateSize(int minAllocateSize) { + this.minAllocateSize = minAllocateSize; + } + + public int getMaxAllocateSize() { + return maxAllocateSize; + } + + public void setMaxAllocateSize(int maxAllocateSize) { + this.maxAllocateSize = maxAllocateSize; + } + + public boolean isEnableBinaryAllocator() { + return enableBinaryAllocator; + } + + public void setEnableBinaryAllocator(boolean enableBinaryAllocator) { + this.enableBinaryAllocator = enableBinaryAllocator; + } + + public int getLog2SizeClassGroup() { + return log2SizeClassGroup; + } + + public void setLog2SizeClassGroup(int log2SizeClassGroup) { + this.log2SizeClassGroup = log2SizeClassGroup; + } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java index 0b175dca2db9..92ed685b3c76 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java @@ -22,6 +22,7 @@ import org.apache.iotdb.commons.enums.HandleSystemErrorStrategy; import org.apache.iotdb.commons.enums.PipeRemainingTimeRateAverageTime; import org.apache.iotdb.commons.utils.CommonDateTimeUtils; +import org.apache.iotdb.commons.utils.binaryallocator.BinaryAllocator; import org.apache.iotdb.confignode.rpc.thrift.TGlobalConfig; import java.io.File; @@ -245,6 +246,7 @@ public void loadCommonProps(Properties properties) throws IOException { String.valueOf(config.getDeviceLimitThreshold())))); loadRetryProperties(properties); + loadBinaryAllocatorProps(properties); } private void loadPipeProps(Properties properties) { @@ -737,6 +739,30 @@ public void loadRetryProperties(Properties properties) throws IOException { "enable_retry_for_unknown_error")))); } + public void loadBinaryAllocatorProps(Properties properties) { + config.setEnableBinaryAllocator( + Boolean.parseBoolean( + properties.getProperty( + "enable_binary_allocator", Boolean.toString(config.isEnableBinaryAllocator())))); + config.setMinAllocateSize( + Integer.parseInt( + properties.getProperty( + "small_blob_object", String.valueOf(config.getMinAllocateSize())))); + config.setMaxAllocateSize( + Integer.parseInt( + properties.getProperty( + "huge_blob_object", String.valueOf(config.getMaxAllocateSize())))); + config.setArenaNum( + Integer.parseInt( + properties.getProperty("arena_num", String.valueOf(config.getArenaNum())))); + config.setLog2SizeClassGroup( + Integer.parseInt( + properties.getProperty( + "log2_size_class_group", String.valueOf(config.getLog2SizeClassGroup())))); + + BinaryAllocator.DEFAULT.allocateBinary(100); + } + public void loadGlobalConfig(TGlobalConfig globalConfig) { config.setTimestampPrecision(globalConfig.timestampPrecision); config.setTimePartitionOrigin( diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java index 5812fac36f23..04757610064a 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java @@ -184,6 +184,7 @@ public enum Metric { LOAD_TIME_COST("load_time_cost"), LOAD_POINT_COUNT("load_point_count"), MEMTABLE_POINT_COUNT("memtable_point_count"), + BINARY_ALLOCATOR("binary_allocator"), ; final String value; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/binaryallocator/AdaptiveWeightedAverage.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/binaryallocator/AdaptiveWeightedAverage.java new file mode 100644 index 000000000000..3c347f1e9993 --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/binaryallocator/AdaptiveWeightedAverage.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.utils.binaryallocator; + +import static java.lang.Math.max; + +/** + * This file is modified from JDK17 src/hotspot/share/gc/shared/gcUtil.hpp. But some necessary + * modifications are made to adapt to the usage of binary allocator: + * + *

Adaptive weighted average implementation for memory allocation tracking. During each eviction + * cycle, records the peak memory allocation size via sampling, then uses this peak to calculate a + * weighted moving average. + */ +public class AdaptiveWeightedAverage { + private float average; + private int sampleCount; + private int tmpMaxSample; + private final int weight; + private boolean isOld; // Enable to have enough historical data + private static final int OLD_THRESHOLD = 100; + + public AdaptiveWeightedAverage(int weight) { + this.weight = weight; + average = 0f; + sampleCount = 0; + tmpMaxSample = 0; + } + + public void sample(int newSample) { + tmpMaxSample = max(tmpMaxSample, newSample); + } + + // called at the end of each eviction cycle + public void update() { + incrementCount(); + + // Compute the new weighted average + int newSample = tmpMaxSample; + tmpMaxSample = 0; + average = computeAdaptiveAverage(newSample, average); + } + + public float average() { + return average; + } + + public void clear() { + average = 0f; + sampleCount = 0; + tmpMaxSample = 0; + } + + void incrementCount() { + sampleCount++; + + if (!isOld && sampleCount > OLD_THRESHOLD) { + isOld = true; + } + } + + float computeAdaptiveAverage(int newSample, float average) { + // We smooth the samples by not using weight() directly until we've + // had enough data to make it meaningful. We'd like the first weight + // used to be 1, the second to be 1/2, etc until we have + // OLD_THRESHOLD/weight samples. + int countWeight = 0; + + // Avoid division by zero if the counter wraps + if (!isOld) { + countWeight = OLD_THRESHOLD / sampleCount; + } + + int adaptiveWeight = max(weight, countWeight); + + return (100.0f - adaptiveWeight) * average / 100.0f + adaptiveWeight * newSample / 100.0f; + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/binaryallocator/AllocatorConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/binaryallocator/AllocatorConfig.java new file mode 100644 index 000000000000..0eb1cd4f652b --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/binaryallocator/AllocatorConfig.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.utils.binaryallocator; + +import org.apache.iotdb.commons.conf.CommonDescriptor; + +import java.time.Duration; + +public class AllocatorConfig { + public int minAllocateSize = CommonDescriptor.getInstance().getConfig().getMinAllocateSize(); + + public int maxAllocateSize = CommonDescriptor.getInstance().getConfig().getMaxAllocateSize(); + + public int arenaNum = CommonDescriptor.getInstance().getConfig().getArenaNum(); + + public int log2ClassSizeGroup = + CommonDescriptor.getInstance().getConfig().getLog2SizeClassGroup(); + + public boolean enableBinaryAllocator = + CommonDescriptor.getInstance().getConfig().isEnableBinaryAllocator(); + + /** Maximum wait time in milliseconds when shutting down the evictor */ + public Duration durationEvictorShutdownTimeout = Duration.ofMillis(10000L); + + /** Time interval in milliseconds between two consecutive evictor runs */ + public Duration durationBetweenEvictorRuns = Duration.ofMillis(10000L); + + public int arenaPredictionWeight = 35; + + public static final AllocatorConfig DEFAULT_CONFIG = new AllocatorConfig(); + + public void setEvictionShutdownTimeoutMillis(long timeout) { + this.durationEvictorShutdownTimeout = Duration.ofMillis(timeout); + } + + public void setTimeBetweenEvictorRunsMillis(long time) { + this.durationBetweenEvictorRuns = Duration.ofMillis(time); + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/binaryallocator/Arena.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/binaryallocator/Arena.java new file mode 100644 index 000000000000..d3e0ae39b4a4 --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/binaryallocator/Arena.java @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.utils.binaryallocator; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; + +public class Arena { + private static final Logger LOGGER = LoggerFactory.getLogger(Arena.class); + private final int arenaID; + private SlabRegion[] regions; + private final SizeClasses sizeClasses; + private Evictor sampleEvictor; + private final BinaryAllocator binaryAllocator; + public AtomicInteger numRegisterThread = new AtomicInteger(0); + + private int sampleCount; + private static final int EVICT_SAMPLE_COUNT = 100; + + private final Duration evictorShutdownTimeoutDuration; + private final Duration durationBetweenEvictionRuns; + + public Arena( + BinaryAllocator allocator, SizeClasses sizeClasses, int id, AllocatorConfig allocatorConfig) { + this.sizeClasses = sizeClasses; + this.arenaID = id; + this.evictorShutdownTimeoutDuration = allocatorConfig.durationEvictorShutdownTimeout; + this.durationBetweenEvictionRuns = allocatorConfig.durationBetweenEvictorRuns; + this.binaryAllocator = allocator; + regions = new SlabRegion[sizeClasses.getSizeClassNum()]; + + for (int i = 0; i < regions.length; i++) { + regions[i] = new SlabRegion(sizeClasses.sizeIdx2size(i), allocatorConfig); + } + + sampleCount = 0; + + restart(); + } + + public int getArenaID() { + return arenaID; + } + + public byte[] allocate(int reqCapacity) { + final int sizeIdx = sizeClasses.size2SizeIdx(reqCapacity); + + SlabRegion region = regions[sizeIdx]; + return region.allocate(); + } + + public void deallocate(byte[] bytes) { + final int sizeIdx = sizeClasses.size2SizeIdx(bytes.length); + + SlabRegion region = regions[sizeIdx]; + region.deallocate(bytes); + } + + public void evict(double ratio) { + for (SlabRegion region : regions) { + region.evict(ratio); + } + } + + public void close() { + evict(1.0); + sampleEvictor.stopEvictor(); + } + + public void restart() { + sampleEvictor = + new SampleEvictor("arena-" + arenaID + "-sample-evictor", evictorShutdownTimeoutDuration); + sampleEvictor.startEvictor(durationBetweenEvictionRuns); + } + + public long getTotalUsedMemory() { + long totalUsedMemory = 0; + for (SlabRegion region : regions) { + totalUsedMemory += region.getTotalUsedMemory(); + } + return totalUsedMemory; + } + + public long getActiveMemory() { + long totalActiveMemory = 0; + for (SlabRegion region : regions) { + totalActiveMemory += region.size * (region.allocations.get() - region.deallocations.get()); + } + return totalActiveMemory; + } + + public class SampleEvictor extends Evictor { + + public SampleEvictor(String name, Duration evictorShutdownTimeoutDuration) { + super(name, evictorShutdownTimeoutDuration); + } + + @Override + public void run() { + LOGGER.debug("Arena-{} running evictor", arenaID); + + // update metric + int allocateFromSlabDelta = 0, allocateFromJVMDelta = 0; + for (SlabRegion region : regions) { + allocateFromSlabDelta += region.size * (region.allocations.get() - region.prevAllocations); + region.prevAllocations = region.allocations.get(); + allocateFromJVMDelta += + region.size * (region.allocationsFromJVM.get() - region.prevAllocationsFromJVM); + region.prevAllocationsFromJVM = region.allocationsFromJVM.get(); + } + binaryAllocator.getMetrics().updateCounter(allocateFromSlabDelta, allocateFromJVMDelta); + + // Start sampling + for (SlabRegion region : regions) { + region.updateSample(); + } + + sampleCount++; + if (sampleCount == EVICT_SAMPLE_COUNT) { + // Evict + for (SlabRegion region : regions) { + region.resize(); + } + sampleCount = 0; + } + } + } + + private static class SlabRegion { + private final int size; + private final Queue queue; + + private final AtomicInteger allocations; + private final AtomicInteger allocationsFromJVM; + private final AtomicInteger deallocations; + private final AtomicInteger evictions; + + public int prevAllocations; + public int prevAllocationsFromJVM; + AdaptiveWeightedAverage average; + + SlabRegion(int size, AllocatorConfig allocatorConfig) { + this.size = size; + this.average = new AdaptiveWeightedAverage(allocatorConfig.arenaPredictionWeight); + queue = new ConcurrentLinkedQueue<>(); + allocations = new AtomicInteger(0); + allocationsFromJVM = new AtomicInteger(0); + deallocations = new AtomicInteger(0); + evictions = new AtomicInteger(0); + prevAllocations = 0; + prevAllocationsFromJVM = 0; + } + + public final byte[] allocate() { + byte[] bytes = queue.poll(); + if (bytes == null) { + allocationsFromJVM.incrementAndGet(); + return new byte[this.size]; + } + allocations.incrementAndGet(); + return bytes; + } + + public void deallocate(byte[] bytes) { + deallocations.incrementAndGet(); + queue.add(bytes); + } + + public void updateSample() { + average.sample(getActiveSize()); + } + + public void resize() { + average.update(); + int needRemain = (int) Math.ceil(average.average()) - getActiveSize(); + int evictNum = getQueueSize() - needRemain; + while (evictNum > 0 && !queue.isEmpty()) { + queue.poll(); + evictions.incrementAndGet(); + evictNum--; + } + } + + public void evict(double ratio) { + int remain = getQueueSize(); + remain = (int) (remain * ratio); + while (remain > 0 && !queue.isEmpty()) { + queue.poll(); + evictions.incrementAndGet(); + remain--; + } + } + + public long getTotalUsedMemory() { + return (long) size * getQueueSize(); + } + + // ConcurrentLinkedQueue::size() is O(n) + private int getQueueSize() { + return deallocations.get() - allocations.get() - evictions.get(); + } + + private int getActiveSize() { + return allocations.get() + allocationsFromJVM.get() - deallocations.get(); + } + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/binaryallocator/ArenaStrategy.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/binaryallocator/ArenaStrategy.java new file mode 100644 index 000000000000..206670bc5d63 --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/binaryallocator/ArenaStrategy.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.utils.binaryallocator; + +/** + * This interface defines a strategy for choosing a {@link Arena} from an array of {@link Arena}s. + * Implementations of this interface can provide various strategies for selection based on specific + * criteria. + */ +public interface ArenaStrategy { + /** + * Chooses a {@link Arena} from the given array of {@link Arena}s. + * + * @param arenas an array of {@link Arena}s to choose from + * @return the selected {@link Arena} + */ + Arena choose(Arena[] arenas); +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/binaryallocator/BinaryAllocator.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/binaryallocator/BinaryAllocator.java new file mode 100644 index 000000000000..e70e81be10c4 --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/binaryallocator/BinaryAllocator.java @@ -0,0 +1,264 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.utils.binaryallocator; + +import org.apache.iotdb.commons.service.metric.JvmGcMonitorMetrics; +import org.apache.iotdb.commons.service.metric.MetricService; +import org.apache.iotdb.commons.utils.TestOnly; + +import org.apache.tsfile.utils.PooledBinary; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.concurrent.atomic.AtomicReference; + +public class BinaryAllocator { + + private static final Logger LOGGER = LoggerFactory.getLogger(BinaryAllocator.class); + + private final Arena[] heapArenas; + private final AllocatorConfig allocatorConfig; + + public static final BinaryAllocator DEFAULT = new BinaryAllocator(AllocatorConfig.DEFAULT_CONFIG); + private ArenaStrategy arenaStrategy = new LeastUsedArenaStrategy(); + private AtomicReference state = + new AtomicReference<>(BinaryAllocatorState.UNINITIALIZED); + + private BinaryAllocatorMetrics metrics; + private static ThreadLocal arenaRegistry = + ThreadLocal.withInitial(() -> new ThreadArenaRegistry()); + + private Evictor evictor; + private static final String GC_EVICTOR_NAME = "binary-allocator-gc-evictor"; + + private static final int WARNING_GC_TIME_PERCENTAGE = 10; + private static final int HALF_GC_TIME_PERCENTAGE = 20; + private static final int SHUTDOWN_GC_TIME_PERCENTAGE = 30; + private static final int RESTART_GC_TIME_PERCENTAGE = 20; + + public BinaryAllocator(AllocatorConfig allocatorConfig) { + this.allocatorConfig = allocatorConfig; + + heapArenas = newArenaArray(allocatorConfig.arenaNum); + SizeClasses sizeClasses = new SizeClasses(allocatorConfig); + + for (int i = 0; i < heapArenas.length; i++) { + Arena arena = new Arena(this, sizeClasses, i, allocatorConfig); + heapArenas[i] = arena; + } + + this.metrics = new BinaryAllocatorMetrics(this); + MetricService.getInstance().addMetricSet(this.metrics); + + if (allocatorConfig.enableBinaryAllocator) { + state.set(BinaryAllocatorState.OPEN); + evictor = new GCEvictor(GC_EVICTOR_NAME, allocatorConfig.durationEvictorShutdownTimeout); + evictor.startEvictor(allocatorConfig.durationBetweenEvictorRuns); + } else { + state.set(BinaryAllocatorState.CLOSE); + this.close(false); + } + } + + public PooledBinary allocateBinary(int reqCapacity) { + if (reqCapacity < allocatorConfig.minAllocateSize + || reqCapacity > allocatorConfig.maxAllocateSize) { + return new PooledBinary(new byte[reqCapacity]); + } + + Arena arena = arenaStrategy.choose(heapArenas); + + return new PooledBinary(arena.allocate(reqCapacity), reqCapacity, arena.getArenaID()); + } + + public void deallocateBinary(PooledBinary binary) { + if (binary != null + && binary.getLength() >= allocatorConfig.minAllocateSize + && binary.getLength() <= allocatorConfig.maxAllocateSize) { + int arenaIndex = binary.getArenaIndex(); + if (arenaIndex != -1) { + Arena arena = heapArenas[arenaIndex]; + arena.deallocate(binary.getValues()); + } + } + } + + public void deallocateBatch(PooledBinary[] blobs) { + for (PooledBinary blob : blobs) { + deallocateBinary(blob); + } + } + + public long getTotalUsedMemory() { + long totalUsedMemory = 0; + for (Arena arena : heapArenas) { + totalUsedMemory += arena.getTotalUsedMemory(); + } + return totalUsedMemory; + } + + public long getTotalActiveMemory() { + long totalActiveMemory = 0; + for (Arena arena : heapArenas) { + totalActiveMemory += arena.getActiveMemory(); + } + return totalActiveMemory; + } + + public void evict(double ratio) { + for (Arena arena : heapArenas) { + arena.evict(ratio); + } + } + + public boolean isOpen() { + return state.get() == BinaryAllocatorState.OPEN; + } + + public void close(boolean needReopen) { + if (needReopen) { + state.set(BinaryAllocatorState.TMP_CLOSE); + } else { + state.set(BinaryAllocatorState.CLOSE); + if (evictor != null) { + evictor.stopEvictor(); + } + } + for (Arena arena : heapArenas) { + arena.close(); + } + } + + private void restart() { + state.set(BinaryAllocatorState.OPEN); + for (Arena arena : heapArenas) { + arena.restart(); + } + } + + @TestOnly + public void resetArenaBinding() { + arenaRegistry.get().unbindArena(); + } + + public BinaryAllocatorMetrics getMetrics() { + return metrics; + } + + @SuppressWarnings("unchecked") + private static Arena[] newArenaArray(int size) { + return new Arena[size]; + } + + private static class ThreadArenaRegistry { + private Arena threadArenaBinding = null; + + public Arena getArena() { + return threadArenaBinding; + } + + public void bindArena(Arena arena) { + threadArenaBinding = arena; + arena.numRegisterThread.incrementAndGet(); + } + + public void unbindArena() { + Arena arena = threadArenaBinding; + if (arena != null) { + arena.numRegisterThread.decrementAndGet(); + threadArenaBinding = null; + } + } + + @Override + protected void finalize() { + unbindArena(); + } + } + + private class LeastUsedArenaStrategy implements ArenaStrategy { + @Override + public Arena choose(Arena[] arenas) { + Arena boundArena = arenaRegistry.get().getArena(); + if (boundArena != null) { + return boundArena; + } + + if (arenas == null || arenas.length == 0) { + return null; + } + + Arena minArena = arenas[0]; + + for (int i = 1; i < arenas.length; i++) { + Arena arena = arenas[i]; + if (arena.numRegisterThread.get() < minArena.numRegisterThread.get()) { + minArena = arena; + } + } + + arenaRegistry.get().bindArena(minArena); + return minArena; + } + } + + public class GCEvictor extends Evictor { + public GCEvictor(String name, Duration evictorShutdownTimeoutDuration) { + super(name, evictorShutdownTimeoutDuration); + } + + @Override + public void run() { + LOGGER.debug("Binary allocator running evictor"); + long GcTimePercent = JvmGcMonitorMetrics.getInstance().getGcData().getGcTimePercentage(); + if (state.get() == BinaryAllocatorState.TMP_CLOSE) { + if (GcTimePercent > RESTART_GC_TIME_PERCENTAGE) { + restart(); + } + return; + } + + if (GcTimePercent > SHUTDOWN_GC_TIME_PERCENTAGE) { + LOGGER.warn( + "Binary allocator is shutting down because of high GC time percentage{}", + GcTimePercent); + for (Arena arena : heapArenas) { + arena.evict(1.0); + } + close(true); + } else if (GcTimePercent > HALF_GC_TIME_PERCENTAGE) { + LOGGER.warn( + "Binary allocator is half evicting because of high GC time percentage{}", + GcTimePercent); + for (Arena arena : heapArenas) { + arena.evict(0.5); + } + } else if (GcTimePercent > WARNING_GC_TIME_PERCENTAGE) { + LOGGER.warn( + "Binary allocator is running evictor because of high GC time percentage{}", + GcTimePercent); + for (Arena arena : heapArenas) { + arena.evict(0.2); + } + } + } + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/binaryallocator/BinaryAllocatorMetrics.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/binaryallocator/BinaryAllocatorMetrics.java new file mode 100644 index 000000000000..9be978977345 --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/binaryallocator/BinaryAllocatorMetrics.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.utils.binaryallocator; + +import org.apache.iotdb.commons.service.metric.enums.Metric; +import org.apache.iotdb.commons.service.metric.enums.Tag; +import org.apache.iotdb.metrics.AbstractMetricService; +import org.apache.iotdb.metrics.metricsets.IMetricSet; +import org.apache.iotdb.metrics.type.Counter; +import org.apache.iotdb.metrics.utils.MetricLevel; +import org.apache.iotdb.metrics.utils.MetricType; + +public class BinaryAllocatorMetrics implements IMetricSet { + + private static final String TOTAL_MEMORY = "total-memory"; + private static final String ALLOCATE_FROM_SLAB = "allocate-from-slab"; + private static final String ALLOCATE_FROM_JVM = "allocate-from-jvm"; + private static final String ACTIVE_MEMORY = "active-memory"; + + private final BinaryAllocator binaryAllocator; + private Counter allocateFromSlab; + private Counter allocateFromJVM; + + public BinaryAllocatorMetrics(final BinaryAllocator binaryAllocator) { + this.binaryAllocator = binaryAllocator; + } + + @Override + public void bindTo(AbstractMetricService metricService) { + metricService.createAutoGauge( + Metric.BINARY_ALLOCATOR.toString(), + MetricLevel.IMPORTANT, + binaryAllocator, + BinaryAllocator::getTotalUsedMemory, + Tag.NAME.toString(), + TOTAL_MEMORY); + metricService.createAutoGauge( + Metric.BINARY_ALLOCATOR.toString(), + MetricLevel.IMPORTANT, + binaryAllocator, + BinaryAllocator::getTotalActiveMemory, + Tag.NAME.toString(), + ACTIVE_MEMORY); + allocateFromSlab = + metricService.getOrCreateCounter( + Metric.BINARY_ALLOCATOR.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + ALLOCATE_FROM_SLAB); + allocateFromJVM = + metricService.getOrCreateCounter( + Metric.BINARY_ALLOCATOR.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + ALLOCATE_FROM_JVM); + } + + @Override + public void unbindFrom(AbstractMetricService metricService) { + metricService.remove( + MetricType.AUTO_GAUGE, + Metric.BINARY_ALLOCATOR.toString(), + Tag.NAME.toString(), + TOTAL_MEMORY); + metricService.remove( + MetricType.AUTO_GAUGE, + Metric.BINARY_ALLOCATOR.toString(), + Tag.NAME.toString(), + ACTIVE_MEMORY); + metricService.remove( + MetricType.COUNTER, + Metric.BINARY_ALLOCATOR.toString(), + Tag.NAME.toString(), + ALLOCATE_FROM_SLAB); + metricService.remove( + MetricType.COUNTER, + Metric.BINARY_ALLOCATOR.toString(), + Tag.NAME.toString(), + ALLOCATE_FROM_JVM); + } + + public void updateCounter(int allocateFromSlabDelta, int allocateFromJVMDelta) { + allocateFromSlab.inc(allocateFromSlabDelta); + allocateFromJVM.inc(allocateFromJVMDelta); + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/binaryallocator/BinaryAllocatorState.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/binaryallocator/BinaryAllocatorState.java new file mode 100644 index 000000000000..a3dd737e4709 --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/binaryallocator/BinaryAllocatorState.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.utils.binaryallocator; + +/** + * The state transmission of a binary allocator. + * + *

+ *     ----------------------------------------
+ *     |                                      |
+ *     |              ----------              |
+ *     |              |        |              |
+ *     |              v        |              v
+ * UNINITIALIZED --> OPEN ---> TMP_CLOSE --> CLOSE
+ *                    |                       ^
+ *                    |                       |
+ *                    -------------------------
+ * 
+ */ +public enum BinaryAllocatorState { + /** + * Binary allocator is open for allocation. + * + *

1.When configuration 'enableBinaryAllocator' is set to true, binary allocator state becomes + * OPEN. + * + *

2.When current state is TMP_CLOSE and severe GC overhead is detected, the state will be set + * to OPEN. + */ + OPEN, + + /** + * Binary allocator is close. When configuration 'enableBinaryAllocator' is set to false, binary + * allocator state becomes CLOSE. + */ + CLOSE, + + /** Binary allocator is temporarily closed by GC evictor. */ + TMP_CLOSE, + + /** The initial state of a binary allocator. */ + UNINITIALIZED; + + @Override + public String toString() { + return name(); + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/binaryallocator/EvictionTimer.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/binaryallocator/EvictionTimer.java new file mode 100644 index 000000000000..e6a6f03b69af --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/binaryallocator/EvictionTimer.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.utils.binaryallocator; + +import org.apache.iotdb.commons.concurrent.IoTThreadFactory; +import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil; + +import java.lang.ref.WeakReference; +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * This file is modified from org.apache.commons.pool2.impl.EvictTimer to make some adaptations for + * IoTDB. + */ +public class EvictionTimer { + private static ScheduledThreadPoolExecutor executor; + + /** + * Task that removes references to abandoned tasks and shuts down the executor if there are no + * live tasks left. + */ + private static class Reaper implements Runnable { + @Override + public void run() { + synchronized (EvictionTimer.class) { + for (final Map.Entry, WeakRunner> entry : + TASK_MAP.entrySet()) { + if (entry.getKey().get() == null) { + executor.remove(entry.getValue()); + TASK_MAP.remove(entry.getKey()); + } + } + if (TASK_MAP.isEmpty() && executor != null) { + executor.shutdown(); + executor.setCorePoolSize(0); + executor = null; + } + } + } + } + + /** + * Runnable that runs the referent of a weak reference. When the referent is no no longer + * reachable, run is no-op. + */ + private static class WeakRunner implements Runnable { + + private final WeakReference ref; + + /** Constructs a new instance to track the given reference. */ + private WeakRunner(final WeakReference ref) { + this.ref = ref; + } + + @Override + public void run() { + final Runnable task = ref.get(); + if (task != null) { + task.run(); + } else { + executor.remove(this); + TASK_MAP.remove(ref); + } + } + } + + /** Keys are weak references to tasks, values are runners managed by executor. */ + private static final HashMap, WeakRunner> TASK_MAP = + new HashMap<>(); + + /** + * Removes the specified eviction task from the timer. + * + * @param evictor Task to be cancelled. + * @param timeout If the associated executor is no longer required, how long should this thread + * wait for the executor to terminate. + * @param restarting The state of the evictor. + */ + public static synchronized void cancel( + final Evictor evictor, final Duration timeout, final boolean restarting) { + if (evictor != null) { + evictor.cancel(); + remove(evictor); + } + if (!restarting && executor != null && TASK_MAP.isEmpty()) { + executor.shutdown(); + try { + executor.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS); + } catch (final InterruptedException e) { + } + executor.setCorePoolSize(0); + executor = null; + } + } + + /** Removes evictor from the task set and executor. Only called when holding the class lock. */ + private static void remove(final Evictor evictor) { + for (final Map.Entry, WeakRunner> entry : TASK_MAP.entrySet()) { + if (entry.getKey().get() == evictor) { + executor.remove(entry.getValue()); + TASK_MAP.remove(entry.getKey()); + break; + } + } + } + + /** + * Adds the specified eviction task to the timer. Tasks that are added with a call to this method + * *must* call {@link #cancel(Evictor, Duration, boolean)} to cancel the task to prevent memory + * and/or thread leaks in application server environments. + * + * @param task Task to be scheduled. + * @param delay Delay in milliseconds before task is executed. + * @param period Time in milliseconds between executions. + * @param name Name of the thread. + */ + public static synchronized void schedule( + final Evictor task, final Duration delay, final Duration period, final String name) { + if (null == executor) { + executor = new ScheduledThreadPoolExecutor(1, new IoTThreadFactory(name)); + executor.setRemoveOnCancelPolicy(true); + ScheduledExecutorUtil.safelyScheduleAtFixedRate( + executor, new Reaper(), delay.toMillis(), period.toMillis(), TimeUnit.MILLISECONDS); + } + final WeakReference ref = new WeakReference<>(task); + final WeakRunner runner = new WeakRunner<>(ref); + final ScheduledFuture scheduledFuture = + ScheduledExecutorUtil.safelyScheduleAtFixedRate( + executor, runner, delay.toMillis(), period.toMillis(), TimeUnit.MILLISECONDS); + task.setScheduledFuture(scheduledFuture); + TASK_MAP.put(ref, runner); + } + + private EvictionTimer() { + // Hide the default constructor + } + + @Override + public String toString() { + final StringBuilder builder = new StringBuilder(); + builder.append("EvictionTimer []"); + return builder.toString(); + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/binaryallocator/Evictor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/binaryallocator/Evictor.java new file mode 100644 index 000000000000..acb056f03233 --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/binaryallocator/Evictor.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.utils.binaryallocator; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.concurrent.ScheduledFuture; + +public abstract class Evictor implements Runnable { + private static final Logger LOGGER = LoggerFactory.getLogger(Evictor.class); + + private ScheduledFuture scheduledFuture; + private String name; + private final Duration evictorShutdownTimeoutDuration; + + public Evictor(String name, Duration evictorShutdownTimeoutDuration) { + this.name = name; + this.evictorShutdownTimeoutDuration = evictorShutdownTimeoutDuration; + } + + /** Cancels the scheduled future. */ + void cancel() { + scheduledFuture.cancel(false); + } + + @Override + public abstract void run(); + + void setScheduledFuture(final ScheduledFuture scheduledFuture) { + this.scheduledFuture = scheduledFuture; + } + + @Override + public String toString() { + return getClass().getName() + " [scheduledFuture=" + scheduledFuture + "]"; + } + + void startEvictor(final Duration delay) { + LOGGER.info("Starting evictor with delay {}", delay); + EvictionTimer.schedule(this, delay, delay, name); + } + + void stopEvictor() { + EvictionTimer.cancel(this, evictorShutdownTimeoutDuration, false); + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/binaryallocator/SizeClasses.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/binaryallocator/SizeClasses.java new file mode 100644 index 000000000000..6c39885d13de --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/binaryallocator/SizeClasses.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.utils.binaryallocator; + +public final class SizeClasses { + + // loookup table + private int[] sizeIdx2sizeTab; + + private final int log2MinSize; + private int log2SizeClassGroup; + private static final int INTEGER_SIZE_MINUS_ONE = Integer.SIZE - 1; + + public SizeClasses(AllocatorConfig allocatorConfig) { + int lookupMaxSize = allocatorConfig.maxAllocateSize; + this.log2SizeClassGroup = allocatorConfig.log2ClassSizeGroup; + this.log2MinSize = log2(allocatorConfig.minAllocateSize); + int group = log2(lookupMaxSize) - log2MinSize; + + sizeIdx2sizeTab = new int[(group << log2SizeClassGroup) + 1]; + + int ndeltaLimit = 1 << log2SizeClassGroup; + int log2Group = log2MinSize; + int log2Delta = log2MinSize - log2SizeClassGroup; + + int nSizes = 0; + int size = calculateSize(log2Group, 0, log2Delta); + sizeIdx2sizeTab[nSizes++] = size; + // All remaining groups, nDelta start at 1. + for (; size < lookupMaxSize; log2Group++, log2Delta++) { + for (int nDelta = 1; nDelta <= ndeltaLimit && size <= lookupMaxSize; nDelta++) { + size = calculateSize(log2Group, nDelta, log2Delta); + sizeIdx2sizeTab[nSizes++] = size; + } + } + } + + public int sizeIdx2size(int sizeIdx) { + return sizeIdx2sizeTab[sizeIdx]; + } + + public int size2SizeIdx(int size) { + int x = log2((size << 1) - 1); + + int shift = x - log2MinSize - 1; + + int group = shift << log2SizeClassGroup; + + int log2Delta = x - 1 - log2SizeClassGroup; + + int mod = size - 1 >> log2Delta & (1 << log2SizeClassGroup) - 1; + + return group + mod + 1; + } + + public int getSizeClassNum() { + return sizeIdx2sizeTab.length; + } + + private static int calculateSize(int log2Group, int nDelta, int log2Delta) { + return (1 << log2Group) + (nDelta << log2Delta); + } + + private static int log2(int val) { + return INTEGER_SIZE_MINUS_ONE - Integer.numberOfLeadingZeros(val); + } +}