diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/codec/StringValueCodec.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/codec/StringValueCodec.java index f50a1b6cc..32937fbc4 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/codec/StringValueCodec.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/codec/StringValueCodec.java @@ -19,7 +19,7 @@ */ public class StringValueCodec { - public static List encode(short slot, BlockType blockType, IValueManifest valueManifest, List> values) throws IOException { + public static StringValueCodecResult encode(short slot, BlockType blockType, IValueManifest valueManifest, List> values) throws IOException { // //data to sub-block List subBlocks = new ArrayList<>(); @@ -80,6 +80,7 @@ public static List encode(short slot, BlockType blockType, IValueMani //sub-block to block List blockInfos = new ArrayList<>(); + List oldLocations = new ArrayList<>(); { //new block @@ -111,7 +112,10 @@ public static List encode(short slot, BlockType blockType, IValueMani buffer.putInt(block.compressLen); buffer.put(block.compressed); for (KeyInfo keyInfo : block.keyInfos) { - keyInfo.setValueLocation(new ValueLocation(location, offset)); + ValueLocation valueLocation = keyInfo.setValueLocation(new ValueLocation(location, offset)); + if (valueLocation != null) { + oldLocations.add(valueLocation); + } offset++; } subBlockCount++; @@ -122,7 +126,7 @@ public static List encode(short slot, BlockType blockType, IValueMani buffer.putShort(4, subBlockCount); blockInfos.add(new BlockInfo(blockType, location, buffer.array())); } - return blockInfos; + return new StringValueCodecResult(blockInfos, oldLocations); } public static List decode(byte[] data, BlockType blockType) { diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/key/KeyInfo.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/key/KeyInfo.java index 78b3afbdf..e55ea6d65 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/key/KeyInfo.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/key/KeyInfo.java @@ -107,13 +107,15 @@ public void setExtra(byte[] extra) { } } - public void setValueLocation(ValueLocation valueLocation) { + public ValueLocation setValueLocation(ValueLocation valueLocation) { + ValueLocation oldValueLocation = this.valueLocation; this.valueLocation = valueLocation; if (valueLocation != null) { setContainsValue(); } else { clearContainsValue(); } + return oldValueLocation; } public ValueLocation getValueLocation() { diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/value/block/IValueManifest.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/value/block/IValueManifest.java index d35f0716f..a703d670a 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/value/block/IValueManifest.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/value/block/IValueManifest.java @@ -41,11 +41,11 @@ public interface IValueManifest { /** - * clear + * recycle * @param slot slot * @param blockLocation location * @throws IOException exception */ - void clear(short slot, BlockLocation blockLocation) throws IOException; + void recycle(short slot, BlockLocation blockLocation) throws IOException; } diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/value/block/StringValueCodecResult.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/value/block/StringValueCodecResult.java new file mode 100644 index 000000000..cfcfe9798 --- /dev/null +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/value/block/StringValueCodecResult.java @@ -0,0 +1,10 @@ +package com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.value.block; + +import java.util.List; + +/** + * Created by caojiajun on 2025/1/8 + */ +public record StringValueCodecResult(List blockInfos, List oldLocations) { + +} diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/value/block/ValueManifest.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/value/block/ValueManifest.java index 5bfe9bcf3..e9fc3a3ae 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/value/block/ValueManifest.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/value/block/ValueManifest.java @@ -81,7 +81,7 @@ public void commit(short slot, BlockLocation blockLocation) throws IOException { // BitSet bitSet1 = bits1Map.get(fileId); if (!bitSet1.get(blockId)) { - throw new IOException("blockId not allocated"); + throw new IOException("fileId=" + fileId + ",blockId=" + blockId + " not allocated"); } //bits2 BitSet bitSet2 = bits2Map.get(fileId); @@ -98,21 +98,25 @@ public void commit(short slot, BlockLocation blockLocation) throws IOException { } @Override - public void clear(short slot, BlockLocation blockLocation) throws IOException { + public void recycle(short slot, BlockLocation blockLocation) throws IOException { long fileId = blockLocation.fileId(); int blockId = blockLocation.blockId(); //bits1 - bits1Map.get(fileId).set(blockId, false); + BitSet bitSet1 = bits1Map.get(fileId); + if (!bitSet1.get(blockId)) { + throw new IOException("fileId=" + fileId + ",blockId=" + blockId + " not allocated"); + } + bitSet1.set(blockId, false); Integer offset = allocateOffsetMap.get(fileId); if (offset > blockId) { allocateOffsetMap.put(fileId, blockId); } //bits2 - BitSet bitSet = bits2Map.get(fileId); - bitSet.set(blockId, false); + BitSet bitSet2 = bits2Map.get(fileId); + bitSet2.set(blockId, false); //update file int index = blockId / 64; - long[] longArray = bitSet.toLongArray(); + long[] longArray = bitSet2.toLongArray(); long changed; if (longArray.length > index) { changed = longArray[index]; diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/value/persist/ValueFlushExecutor.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/value/persist/ValueFlushExecutor.java index 11252d5b4..45b7dd681 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/value/persist/ValueFlushExecutor.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/value/persist/ValueFlushExecutor.java @@ -11,10 +11,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.CompletableFuture; /** @@ -66,9 +63,11 @@ private void execute(StringValueFlushTask task) throws Exception { buffers.add(new Pair<>(entry.getKey(), entry.getValue())); } List list = new ArrayList<>(); + List oldLocations = new ArrayList<>(); for (Map.Entry>> entry : blockMap.entrySet()) { - List blockInfos = StringValueCodec.encode(slot, entry.getKey(), valueManifest, entry.getValue()); - list.addAll(blockInfos); + StringValueCodecResult result = StringValueCodec.encode(slot, entry.getKey(), valueManifest, entry.getValue()); + list.addAll(result.blockInfos()); + oldLocations.addAll(result.oldLocations()); } for (BlockInfo blockInfo : list) { BlockLocation blockLocation = blockInfo.blockLocation(); @@ -76,7 +75,16 @@ private void execute(StringValueFlushTask task) throws Exception { long offset = (long) blockLocation.blockId() * blockInfo.blockType().getBlockSize(); fileReadWrite.write(fileId, offset, blockInfo.data()); blockCache.updateBlockCache(slot, fileId, offset, blockInfo.data()); + valueManifest.commit(slot, blockLocation); } + Set changedBlocks = new HashSet<>(); + for (ValueLocation oldLocation : oldLocations) { + changedBlocks.add(oldLocation.blockLocation()); + } + compact(changedBlocks); } + private void compact(Set blocks) { + //todo + } } diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/test/java/com/netease/nim/camellia/redis/proxy/test/StringValueCodecTest.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/test/java/com/netease/nim/camellia/redis/proxy/test/StringValueCodecTest.java index 753ee2031..316ae75d9 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/test/java/com/netease/nim/camellia/redis/proxy/test/StringValueCodecTest.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/test/java/com/netease/nim/camellia/redis/proxy/test/StringValueCodecTest.java @@ -39,7 +39,7 @@ public void test() throws IOException { values.add(new Pair<>(keyInfo("k6", 1000L, null, null), "1k12l12121".getBytes(StandardCharsets.UTF_8))); values.add(new Pair<>(keyInfo("k7", 1000L, null, null), "sasasas".getBytes(StandardCharsets.UTF_8))); - List blockInfos = StringValueCodec.encode(slot, blockType, valueManifest, values); + List blockInfos = StringValueCodec.encode(slot, blockType, valueManifest, values).blockInfos(); Assert.assertEquals(blockInfos.size(), 1); @@ -65,7 +65,7 @@ public void test2() throws IOException { values.add(new Pair<>(keyInfo("k" + i, 1000L, null, null), UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8))); } - List blockInfos = StringValueCodec.encode(slot, blockType, valueManifest, values); + List blockInfos = StringValueCodec.encode(slot, blockType, valueManifest, values).blockInfos(); for (int i=0;i