Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Binary Allocator #14201

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.iotdb.commons.schema.SchemaConstant;
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.commons.utils.NodeUrlUtils;
import org.apache.iotdb.commons.utils.binaryallocator.BinaryAllocator;
import org.apache.iotdb.confignode.rpc.thrift.TCQConfig;
import org.apache.iotdb.confignode.rpc.thrift.TGlobalConfig;
import org.apache.iotdb.confignode.rpc.thrift.TRatisConfig;
Expand Down Expand Up @@ -2845,6 +2846,24 @@ public synchronized void loadHotModifiedProps(Properties properties)

// update retry config
commonDescriptor.loadRetryProperties(properties);

// update binary allocator
commonDescriptor
.getConfig()
.setEnableBinaryAllocator(
Boolean.parseBoolean(
Optional.ofNullable(
properties.getProperty(
"enable_binary_allocator",
ConfigurationFileUtils.getConfigurationDefaultValue(
"enable_binary_allocator")))
.map(String::trim)
.orElse(
ConfigurationFileUtils.getConfigurationDefaultValue(
"enable_binary_allocator"))));
if (!commonDescriptor.getConfig().isEnableBinaryAllocator()) {
BinaryAllocator.DEFAULT.close(false);
}
} catch (Exception e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.utils;

import org.apache.iotdb.commons.utils.binaryallocator.AllocatorConfig;
import org.apache.iotdb.commons.utils.binaryallocator.BinaryAllocator;
import org.apache.iotdb.commons.utils.binaryallocator.SizeClasses;

import org.apache.tsfile.utils.PooledBinary;
import org.junit.Test;

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

public class BinaryAllocatorTest {
@Test
public void testAllocateBinary() {
AllocatorConfig config = new AllocatorConfig();
config.arenaNum = 1;
BinaryAllocator binaryAllocator = new BinaryAllocator(config);
binaryAllocator.resetArenaBinding();

PooledBinary binary = binaryAllocator.allocateBinary(255);
assertNotNull(binary);
assertEquals(binary.getArenaIndex(), -1);
assertEquals(binary.getLength(), 255);
binaryAllocator.deallocateBinary(binary);

binary = binaryAllocator.allocateBinary(65536);
assertNotNull(binary);
assertEquals(binary.getArenaIndex(), 0);
assertEquals(binary.getLength(), 65536);
binaryAllocator.deallocateBinary(binary);

binary = binaryAllocator.allocateBinary(65535);
assertNotNull(binary);
assertEquals(binary.getArenaIndex(), 0);
assertEquals(binary.getLength(), 65535);
assertEquals(binary.getValues().length, 65536);
binaryAllocator.deallocateBinary(binary);
}

@Test
public void testStrategy() throws InterruptedException {
BinaryAllocator binaryAllocator = new BinaryAllocator(AllocatorConfig.DEFAULT_CONFIG);
binaryAllocator.resetArenaBinding();

PooledBinary binary1 = binaryAllocator.allocateBinary(4096);
PooledBinary binary2 = binaryAllocator.allocateBinary(4096);
assertEquals(binary1.getArenaIndex(), binary2.getArenaIndex());
binaryAllocator.deallocateBinary(binary1);
binaryAllocator.deallocateBinary(binary2);

int threadCount = 4;
CountDownLatch latch = new CountDownLatch(threadCount);
Map<Integer, Integer> arenaUsageCount = new ConcurrentHashMap<>();
for (int i = 0; i < threadCount; i++) {
Thread thread =
new Thread(
() -> {
try {
PooledBinary firstBinary = binaryAllocator.allocateBinary(2048);
int arenaId = firstBinary.getArenaIndex();
arenaUsageCount.merge(arenaId, 1, Integer::sum);
binaryAllocator.deallocateBinary(firstBinary);
} finally {
latch.countDown();
}
});
thread.start();
}

latch.await();
int maxUsage = Collections.max(arenaUsageCount.values());
int minUsage = Collections.min(arenaUsageCount.values());
assertEquals(maxUsage, minUsage);
}

@Test
public void testEviction() throws InterruptedException {
AllocatorConfig config = new AllocatorConfig();
config.arenaNum = 1;
config.minAllocateSize = config.maxAllocateSize = 4096;
config.setTimeBetweenEvictorRunsMillis(1);
BinaryAllocator binaryAllocator = new BinaryAllocator(config);
binaryAllocator.resetArenaBinding();

PooledBinary binary = binaryAllocator.allocateBinary(4096);
binaryAllocator.deallocateBinary(binary);
assertEquals(binaryAllocator.getTotalUsedMemory(), 4096);
Thread.sleep(200);
assertEquals(binaryAllocator.getTotalUsedMemory(), 0);
}

@Test
public void testSizeMapping() {
AllocatorConfig config = new AllocatorConfig();
config.minAllocateSize = 4096;
config.maxAllocateSize = 65536;
SizeClasses sizeClasses = new SizeClasses(config);

assertEquals(sizeClasses.getSizeClassNum(), 33);
int[] testSizes = {4607, 8191, 16383, 32767, 65535};

for (int size : testSizes) {
int sizeIdx = sizeClasses.size2SizeIdx(size);
int mappedSize = sizeClasses.sizeIdx2size(sizeIdx);

assertEquals("Mapped size should be >= original size", mappedSize, size + 1);

if (sizeIdx > 0) {
int previousSize = sizeClasses.sizeIdx2size(sizeIdx - 1);
assertTrue("Previous size should be < original size", previousSize < size);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1493,6 +1493,40 @@ data_region_iot_max_memory_ratio_for_queue = 0.6
# Datatype: long
region_migration_speed_limit_bytes_per_second = 33554432

####################
### Blob Allocator Configuration
####################
# Whether to enable binary allocator.
# For scenarios where large binary streams cause severe GC, enabling this parameter significantly improves performance.
# effectiveMode: hot_reload
enable_binary_allocator=true

# The size boundaries that allocator is responsible for
# lower boundary for allocation size
# unit: bytes
# Datatype: int
# effectiveMode: restart
small_binary_object=4096

# The size boundaries that allocator is responsible for
# upper boundary for allocation size
# unit: bytes
# Datatype: int
# effectiveMode: restart
huge_binary_object=1048576

# Number of arena regions in blob allocator, used to control concurrent performance
# Datatype: int
# effectiveMode: restart
arena_num=4

# Control the number of slabs in allocator
# The number of different sizes in each power-of-2 interval is 2^LOG2_SIZE_CLASS_GROUP
# For example: if LOG2_SIZE_CLASS_GROUP=3, between 1024-2048 there will be 8 different sizes
# Datatype: int
# effectiveMode: restart
log2_size_class_group=3

####################
### TsFile Configurations
####################
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,16 @@ public class CommonConfig {
private long seriesLimitThreshold = -1;
private long deviceLimitThreshold = -1;

private boolean enableBinaryAllocator = true;

private int arenaNum = 4;

private int minAllocateSize = 4096;

private int maxAllocateSize = 1024 * 1024;

private int log2SizeClassGroup = 3;

// time in nanosecond precision when starting up
private final long startUpNanosecond = System.nanoTime();

Expand Down Expand Up @@ -1468,4 +1478,44 @@ public long getRemoteWriteMaxRetryDurationInMs() {
public void setRemoteWriteMaxRetryDurationInMs(long remoteWriteMaxRetryDurationInMs) {
this.remoteWriteMaxRetryDurationInMs = remoteWriteMaxRetryDurationInMs;
}

public int getArenaNum() {
return arenaNum;
}

public void setArenaNum(int arenaNum) {
this.arenaNum = arenaNum;
}

public int getMinAllocateSize() {
return minAllocateSize;
}

public void setMinAllocateSize(int minAllocateSize) {
this.minAllocateSize = minAllocateSize;
}

public int getMaxAllocateSize() {
return maxAllocateSize;
}

public void setMaxAllocateSize(int maxAllocateSize) {
this.maxAllocateSize = maxAllocateSize;
}

public boolean isEnableBinaryAllocator() {
return enableBinaryAllocator;
}

public void setEnableBinaryAllocator(boolean enableBinaryAllocator) {
this.enableBinaryAllocator = enableBinaryAllocator;
}

public int getLog2SizeClassGroup() {
return log2SizeClassGroup;
}

public void setLog2SizeClassGroup(int log2SizeClassGroup) {
this.log2SizeClassGroup = log2SizeClassGroup;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.iotdb.commons.enums.HandleSystemErrorStrategy;
import org.apache.iotdb.commons.enums.PipeRemainingTimeRateAverageTime;
import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
import org.apache.iotdb.commons.utils.binaryallocator.BinaryAllocator;
import org.apache.iotdb.confignode.rpc.thrift.TGlobalConfig;

import java.io.File;
Expand Down Expand Up @@ -245,6 +246,7 @@ public void loadCommonProps(Properties properties) throws IOException {
String.valueOf(config.getDeviceLimitThreshold()))));

loadRetryProperties(properties);
loadBinaryAllocatorProps(properties);
}

private void loadPipeProps(Properties properties) {
Expand Down Expand Up @@ -737,6 +739,30 @@ public void loadRetryProperties(Properties properties) throws IOException {
"enable_retry_for_unknown_error"))));
}

public void loadBinaryAllocatorProps(Properties properties) {
config.setEnableBinaryAllocator(
Boolean.parseBoolean(
properties.getProperty(
"enable_binary_allocator", Boolean.toString(config.isEnableBinaryAllocator()))));
config.setMinAllocateSize(
Integer.parseInt(
properties.getProperty(
"small_blob_object", String.valueOf(config.getMinAllocateSize()))));
config.setMaxAllocateSize(
Integer.parseInt(
properties.getProperty(
"huge_blob_object", String.valueOf(config.getMaxAllocateSize()))));
config.setArenaNum(
Integer.parseInt(
properties.getProperty("arena_num", String.valueOf(config.getArenaNum()))));
config.setLog2SizeClassGroup(
Integer.parseInt(
properties.getProperty(
"log2_size_class_group", String.valueOf(config.getLog2SizeClassGroup()))));

BinaryAllocator.DEFAULT.allocateBinary(100);
}

public void loadGlobalConfig(TGlobalConfig globalConfig) {
config.setTimestampPrecision(globalConfig.timestampPrecision);
config.setTimePartitionOrigin(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ public enum Metric {
LOAD_TIME_COST("load_time_cost"),
LOAD_POINT_COUNT("load_point_count"),
MEMTABLE_POINT_COUNT("memtable_point_count"),
BINARY_ALLOCATOR("binary_allocator"),
;

final String value;
Expand Down
Loading
Loading