Skip to content

Commit

Permalink
fix: local storage (#364)
Browse files Browse the repository at this point in the history
  • Loading branch information
caojiajun committed Jan 13, 2025
1 parent 67e07b4 commit 3a799b7
Show file tree
Hide file tree
Showing 15 changed files with 149 additions and 125 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import static com.netease.nim.camellia.redis.proxy.upstream.local.storage.constants.LocalStorageConstants.key_max_len;
Expand Down Expand Up @@ -107,19 +104,20 @@ private void flush() {
try {
int seconds = ProxyDynamicConf.getInt("local.storage.flush.max.interval.seconds", 10);
for (short slot=0; slot<RedisClusterCRC16Utils.SLOT_SIZE; slot++) {
Long lastWriteCommandTime = timeMap.get(slot);
boolean flush;
Long lastWriteCommandTime = timeMap.get(slot);
if (lastWriteCommandTime == null) {
flush = true;
} else {
flush = TimeCache.currentMillis - lastWriteCommandTime >= seconds*1000L;
flush = TimeCache.currentMillis - lastWriteCommandTime >= seconds * 1000L;
}
if (!flush) continue;
final short flushSlot = slot;
executor.submit(slot, () -> {
try {
ICommand invoker = commands.getCommandInvoker(RedisCommand.MEMFLUSH);
commands.execute(invoker, flushSlot, MemFlushCommand.command);
timeMap.put(flushSlot, TimeCache.currentMillis);
} catch (Exception e) {
ErrorLogCollector.collect(RedisLocalStorageClient.class, "send memflush command error, slot = " + flushSlot, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.netease.nim.camellia.codec.Pack;
import com.netease.nim.camellia.codec.Unpack;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.key.Key;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.key.KeyInfo;

import java.util.HashMap;
Expand All @@ -27,10 +28,10 @@ public static StringValue decode(byte[] data) {
return new StringValue(key, value);
}

public static Map<KeyInfo, byte[]> encodeMap(Map<KeyInfo, byte[]> map) {
Map<KeyInfo, byte[]> result = new HashMap<>();
public static Map<Key, byte[]> encodeMap(Map<KeyInfo, byte[]> map) {
Map<Key, byte[]> result = new HashMap<>();
for (Map.Entry<KeyInfo, byte[]> entry : map.entrySet()) {
result.put(entry.getKey(), StringValue.encode(entry.getKey().getKey(), entry.getValue()));
result.put(new Key(entry.getKey().getKey()), StringValue.encode(entry.getKey().getKey(), entry.getValue()));
}
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,36 @@
import com.netease.nim.camellia.redis.proxy.upstream.kv.exception.KvException;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.command.db.*;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.command.string.*;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.compact.CompactExecutor;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.enums.FlushResult;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.key.Key;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.key.KeyInfo;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.key.KeyReadWrite;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.value.string.StringReadWrite;
import com.netease.nim.camellia.redis.proxy.util.ErrorLogCollector;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/**
* Created by caojiajun on 2025/1/10
*/
public class Commands {

private final Map<RedisCommand, ICommand> map = new HashMap<>();

protected CompactExecutor compactExecutor;

protected KeyReadWrite keyReadWrite;
protected StringReadWrite stringReadWrite;

public Commands(CommandConfig commandConfig) {
compactExecutor = commandConfig.getCompactExecutor();
keyReadWrite = commandConfig.getReadWrite().getKeyReadWrite();
stringReadWrite = commandConfig.getReadWrite().getStringReadWrite();

//db
initCommand(new MemFlushCommand(commandConfig));
initCommand(new PExpireCommand(commandConfig));
Expand Down Expand Up @@ -62,14 +80,28 @@ public Reply execute(ICommand invoker, short slot, Command command) {
return invoker.execute(slot, command);
} finally {
if (invoker.redisCommand().getType() == RedisCommand.Type.WRITE) {
invoker.afterWrite(slot);
afterWrite(slot);
}
}
} catch (Throwable e) {
return onException(command.getRedisCommand(), e);
}
}

/**
 * Post-write hook: compacts the slot, then flushes it when either the key store
 * or the string-value store reports that a flush is needed.
 *
 * @param slot slot that was just written
 * @throws IOException declared for the compact/flush calls made here
 */
private void afterWrite(short slot) throws IOException {
    // compaction is attempted on every call, independent of the flush decision
    compactExecutor.compact(slot);

    boolean flushNeeded = keyReadWrite.needFlush(slot) || stringReadWrite.needFlush(slot);
    if (!flushNeeded) {
        return;
    }

    // prepare the key side first; the returned map is handed to the string-value flush
    Map<Key, KeyInfo> preparedKeys = keyReadWrite.flushPrepare(slot);

    // the key flush is chained, so it is only triggered once the string-value flush future completes normally
    stringReadWrite.flush(slot, preparedKeys)
            .thenAccept(ignored -> keyReadWrite.flush(slot));
}

private Reply onException(RedisCommand redisCommand, Throwable e) {
if (e instanceof KvException || e instanceof IllegalArgumentException) {
ErrorLogCollector.collect(Commanders.class, redisCommand + " execute error", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,8 @@
import com.netease.nim.camellia.redis.proxy.enums.RedisCommand;
import com.netease.nim.camellia.redis.proxy.reply.Reply;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.compact.CompactExecutor;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.enums.FlushResult;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.key.KeyReadWrite;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.value.string.StringReadWrite;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;

/**
* Created by caojiajun on 2025/1/3
Expand Down Expand Up @@ -56,23 +53,4 @@ public Reply runToCompletion(short slot, Command command) {
* @return reply
*/
protected abstract Reply execute(short slot, Command command) throws Exception;

/**
 * Runs after a write: compacts the slot and, if either store asks for it, flushes the slot.
 *
 * @param slot slot
 * @throws IOException exception
 */
protected void afterWrite(short slot) throws IOException {
    // compaction is attempted on every call
    compactExecutor.compact(slot);

    boolean flushNeeded = keyReadWrite.needFlush(slot) || stringReadWrite.needFlush(slot);
    if (!flushNeeded) {
        return;
    }

    // prepare the key side before the string values are flushed
    keyReadWrite.flushPrepare(slot);

    // chain the key flush behind the string-value flush future
    stringReadWrite.flush(slot)
            .thenAccept(ignored -> keyReadWrite.flush(slot));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.value.string.StringReadWrite;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.codec.StringValue;
import com.netease.nim.camellia.redis.proxy.util.TimeCache;
import com.netease.nim.camellia.redis.proxy.util.Utils;
import com.netease.nim.camellia.tools.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -55,27 +56,27 @@ public void compact(short slot) {
}
BlockType blockType = nextBlockType(slot);
int offset = nextOffset(blockType, slot);
int limit = blockLimit.getOrDefault(blockType, 1);
int limit = blockLimit.getOrDefault(blockType, 2);
try {
List<BlockLocation> blocks = valueManifest.getBlocks(slot, blockType, offset, limit);
if (blocks.isEmpty()) {
if (blocks.isEmpty() || blocks.size() == 1) {
updateNextOffset(blockType, slot, 0);
return;
}
List<Pair<KeyInfo, byte[]>> values = new ArrayList<>();
List<BlockLocation> recycleBlocks = new ArrayList<>();

int remaining = 0;
List<Pair<KeyInfo, byte[]>> surviving = new ArrayList<>();

for (BlockLocation blockLocation : blocks) {
long fileId = blockLocation.fileId();
int blockId = blockLocation.blockId();
byte[] block = stringBlockReadWrite.getBlock(blockType, fileId, (long) blockId * blockType.getBlockSize());

StringValueDecodeResult decodeResult = StringValueCodec.decode(block, blockType);
List<byte[]> list = decodeResult.values();

List<Pair<KeyInfo, byte[]>> surviving = new ArrayList<>();
List<byte[]> list = decodeResult.values();

boolean recycle = false;
int survivingCount = 0;
for (byte[] data : list) {
StringValue stringValue = StringValue.decode(data);
KeyInfo keyInfo = keyReadWrite.getForCompact(slot, new Key(stringValue.key()));
Expand All @@ -85,42 +86,39 @@ public void compact(short slot) {
if (keyInfo.getValueLocation() != null) {
if (keyInfo.getValueLocation().blockLocation().equals(blockLocation)) {
surviving.add(new Pair<>(keyInfo, stringValue.value()));
survivingCount ++;
}
}
}
if (surviving.size() < list.size()) {
recycle = true;
}
if (!recycle) {
if (blockType == BlockType._4k) {
recycle = decodeResult.remaining() > 256;
} else if (blockType == BlockType._32k) {
recycle = decodeResult.remaining() > BlockType._4k.getBlockSize();
} else if (blockType == BlockType._256k) {
recycle = decodeResult.remaining() > BlockType._32k.getBlockSize();
} else if (blockType == BlockType._1024k) {
recycle = decodeResult.remaining() > BlockType._256k.getBlockSize();
} else if (blockType == BlockType._8m) {
recycle = decodeResult.remaining() > BlockType._1024k.getBlockSize();
}
}
if (recycle) {
values.addAll(surviving);
recycleBlocks.add(blockLocation);

long used = blockType.getBlockSize() - decodeResult.remaining();
if (list.isEmpty()) {
remaining = blockType.getBlockSize();
} else {
remaining += (int) (used * survivingCount / list.size());
remaining += decodeResult.remaining();
}
}
if (!values.isEmpty()) {
for (Pair<KeyInfo, byte[]> pair : values) {
keyReadWrite.put(slot, pair.getFirst());

if (remaining < blockType.getBlockSize()) {
return;
}

if (!surviving.isEmpty()) {
for (Pair<KeyInfo, byte[]> pair : surviving) {
stringReadWrite.put(slot, pair.getFirst(), pair.getSecond());
keyReadWrite.put(slot, pair.getFirst());
if (Utils.bytesToString(pair.getFirst().getKey()).startsWith("k")) {
ValueLocation valueLocation = pair.getFirst().getValueLocation();
logger.info("compact put, key = {}, fileId = {}, blockId = {}, offset = {}",
Utils.bytesToString(pair.getFirst().getKey()), valueLocation.blockLocation().fileId(), valueLocation.blockLocation().blockId(), valueLocation.offset());
}
}
}
for (BlockLocation block : recycleBlocks) {
for (BlockLocation block : blocks) {
valueManifest.recycle(slot, block);
}
if (recycleBlocks.isEmpty()) {
updateNextOffset(blockType, slot, offset + limit);
}
updateNextOffset(blockType, slot, offset + limit);
} catch (Exception e) {
logger.error("compact error, slot = {}, blockType = {}, offset = {}, limit = {}", slot, blockType, offset, limit, e);
} finally {
Expand All @@ -134,9 +132,9 @@ private void updateConf() {
for (BlockType type : BlockType.values()) {
String key = "local.storage.compact.block.type." + type.getType() + ".limit";
if (type == BlockType._4k) {
blockLimit.put(type, ProxyDynamicConf.getInt(key, 4));
blockLimit.put(type, ProxyDynamicConf.getInt(key, 6));
} else {
blockLimit.put(type, ProxyDynamicConf.getInt(key, 1));
blockLimit.put(type, ProxyDynamicConf.getInt(key, 3));
}
}
this.blockLimit = blockLimit;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,17 @@ public long estimateSize() {
return size;
}

/**
 * Creates a new KeyInfo carrying the same field values as this instance.
 * Every field is assigned directly, so reference-typed fields are shared
 * with this instance rather than deep-copied.
 *
 * @return the new KeyInfo
 */
public KeyInfo duplicate() {
    KeyInfo copy = new KeyInfo();
    copy.key = this.key;
    copy.dataType = this.dataType;
    copy.flag = this.flag;
    copy.expireTime = this.expireTime;
    copy.valueLocation = this.valueLocation;
    copy.extra = this.extra;
    return copy;
}

public static enum FlagType {
DEFAULT((byte) 0),
CONTAINS_EXPIRE_TIME((byte) 1),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import com.netease.nim.camellia.tools.utils.CamelliaMapUtils;

import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

Expand Down Expand Up @@ -111,12 +113,12 @@ public void delete(short slot, Key key) {
get(slot).delete(key);
}

public void flushPrepare(short slot) {
public Map<Key, KeyInfo> flushPrepare(short slot) {
SlotKeyReadWrite slotKeyReadWrite = map.get(slot);
if (slotKeyReadWrite == null) {
return;
return Collections.emptyMap();
}
slotKeyReadWrite.flushPrepare();
return slotKeyReadWrite.flushPrepare();
}

public CompletableFuture<FlushResult> flush(short slot) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ public SlotInfo expand(short slot) throws IOException {
//use a free region in the same file, preferring regions recycled from other slots
if (LocalStorageConstants.key_manifest_bit_size - bits.cardinality() >= bitsStep*2) {
for (int i = 0; i< LocalStorageConstants.key_manifest_bit_size -bitsStep*2; i++) {
if (bits.get(i, bitsStep * 2).cardinality() == 0) {
if (bits.get(i, i + bitsStep * 2).cardinality() == 0) {
//clear old
for (int j=bitsStart; j<bitsEnd; j++) {
bits.set(j, false);
Expand Down
Loading

0 comments on commit 3a799b7

Please sign in to comment.