Skip to content

Commit

Permalink
feat: rename ValueManifest method (#364)
Browse files Browse the repository at this point in the history
  • Loading branch information
caojiajun committed Jan 8, 2025
1 parent 338157a commit dab82fc
Show file tree
Hide file tree
Showing 7 changed files with 49 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
*/
public class StringValueCodec {

public static List<BlockInfo> encode(short slot, BlockType blockType, IValueManifest valueManifest, List<Pair<KeyInfo, byte[]>> values) throws IOException {
public static StringValueCodecResult encode(short slot, BlockType blockType, IValueManifest valueManifest, List<Pair<KeyInfo, byte[]>> values) throws IOException {
//
//data to sub-block
List<SubBlock> subBlocks = new ArrayList<>();
Expand Down Expand Up @@ -80,6 +80,7 @@ public static List<BlockInfo> encode(short slot, BlockType blockType, IValueMani

//sub-block to block
List<BlockInfo> blockInfos = new ArrayList<>();
List<ValueLocation> oldLocations = new ArrayList<>();

{
//new block
Expand Down Expand Up @@ -111,7 +112,10 @@ public static List<BlockInfo> encode(short slot, BlockType blockType, IValueMani
buffer.putInt(block.compressLen);
buffer.put(block.compressed);
for (KeyInfo keyInfo : block.keyInfos) {
keyInfo.setValueLocation(new ValueLocation(location, offset));
ValueLocation valueLocation = keyInfo.setValueLocation(new ValueLocation(location, offset));
if (valueLocation != null) {
oldLocations.add(valueLocation);
}
offset++;
}
subBlockCount++;
Expand All @@ -122,7 +126,7 @@ public static List<BlockInfo> encode(short slot, BlockType blockType, IValueMani
buffer.putShort(4, subBlockCount);
blockInfos.add(new BlockInfo(blockType, location, buffer.array()));
}
return blockInfos;
return new StringValueCodecResult(blockInfos, oldLocations);
}

public static List<byte[]> decode(byte[] data, BlockType blockType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,15 @@ public void setExtra(byte[] extra) {
}
}

public void setValueLocation(ValueLocation valueLocation) {
public ValueLocation setValueLocation(ValueLocation valueLocation) {
ValueLocation oldValueLocation = this.valueLocation;
this.valueLocation = valueLocation;
if (valueLocation != null) {
setContainsValue();
} else {
clearContainsValue();
}
return oldValueLocation;
}

public ValueLocation getValueLocation() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,11 @@ public interface IValueManifest {


/**
* clear
* recycle
* @param slot slot
* @param blockLocation location
* @throws IOException exception
*/
void clear(short slot, BlockLocation blockLocation) throws IOException;
void recycle(short slot, BlockLocation blockLocation) throws IOException;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.value.block;

import java.util.List;

/**
* Created by caojiajun on 2025/1/8
*/
public record StringValueCodecResult(List<BlockInfo> blockInfos, List<ValueLocation> oldLocations) {

}
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public void commit(short slot, BlockLocation blockLocation) throws IOException {
//
BitSet bitSet1 = bits1Map.get(fileId);
if (!bitSet1.get(blockId)) {
throw new IOException("blockId not allocated");
throw new IOException("fileId=" + fileId + ",blockId=" + blockId + " not allocated");
}
//bits2
BitSet bitSet2 = bits2Map.get(fileId);
Expand All @@ -98,21 +98,25 @@ public void commit(short slot, BlockLocation blockLocation) throws IOException {
}

@Override
public void clear(short slot, BlockLocation blockLocation) throws IOException {
public void recycle(short slot, BlockLocation blockLocation) throws IOException {
long fileId = blockLocation.fileId();
int blockId = blockLocation.blockId();
//bits1
bits1Map.get(fileId).set(blockId, false);
BitSet bitSet1 = bits1Map.get(fileId);
if (!bitSet1.get(blockId)) {
throw new IOException("fileId=" + fileId + ",blockId=" + blockId + " not allocated");
}
bitSet1.set(blockId, false);
Integer offset = allocateOffsetMap.get(fileId);
if (offset > blockId) {
allocateOffsetMap.put(fileId, blockId);
}
//bits2
BitSet bitSet = bits2Map.get(fileId);
bitSet.set(blockId, false);
BitSet bitSet2 = bits2Map.get(fileId);
bitSet2.set(blockId, false);
//update file
int index = blockId / 64;
long[] longArray = bitSet.toLongArray();
long[] longArray = bitSet2.toLongArray();
long changed;
if (longArray.length > index) {
changed = longArray[index];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.concurrent.CompletableFuture;

/**
Expand Down Expand Up @@ -66,17 +63,28 @@ private void execute(StringValueFlushTask task) throws Exception {
buffers.add(new Pair<>(entry.getKey(), entry.getValue()));
}
List<BlockInfo> list = new ArrayList<>();
List<ValueLocation> oldLocations = new ArrayList<>();
for (Map.Entry<BlockType, List<Pair<KeyInfo, byte[]>>> entry : blockMap.entrySet()) {
List<BlockInfo> blockInfos = StringValueCodec.encode(slot, entry.getKey(), valueManifest, entry.getValue());
list.addAll(blockInfos);
StringValueCodecResult result = StringValueCodec.encode(slot, entry.getKey(), valueManifest, entry.getValue());
list.addAll(result.blockInfos());
oldLocations.addAll(result.oldLocations());
}
for (BlockInfo blockInfo : list) {
BlockLocation blockLocation = blockInfo.blockLocation();
long fileId = blockLocation.fileId();
long offset = (long) blockLocation.blockId() * blockInfo.blockType().getBlockSize();
fileReadWrite.write(fileId, offset, blockInfo.data());
blockCache.updateBlockCache(slot, fileId, offset, blockInfo.data());
valueManifest.commit(slot, blockLocation);
}
Set<BlockLocation> changedBlocks = new HashSet<>();
for (ValueLocation oldLocation : oldLocations) {
changedBlocks.add(oldLocation.blockLocation());
}
compact(changedBlocks);
}

private void compact(Set<BlockLocation> blocks) {
//todo
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public void test() throws IOException {
values.add(new Pair<>(keyInfo("k6", 1000L, null, null), "1k12l12121".getBytes(StandardCharsets.UTF_8)));
values.add(new Pair<>(keyInfo("k7", 1000L, null, null), "sasasas".getBytes(StandardCharsets.UTF_8)));

List<BlockInfo> blockInfos = StringValueCodec.encode(slot, blockType, valueManifest, values);
List<BlockInfo> blockInfos = StringValueCodec.encode(slot, blockType, valueManifest, values).blockInfos();

Assert.assertEquals(blockInfos.size(), 1);

Expand All @@ -65,7 +65,7 @@ public void test2() throws IOException {
values.add(new Pair<>(keyInfo("k" + i, 1000L, null, null), UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8)));
}

List<BlockInfo> blockInfos = StringValueCodec.encode(slot, blockType, valueManifest, values);
List<BlockInfo> blockInfos = StringValueCodec.encode(slot, blockType, valueManifest, values).blockInfos();

for (int i=0;i<blockInfos.size(); i++) {
BlockInfo blockInfo = blockInfos.get(i);
Expand Down Expand Up @@ -125,7 +125,7 @@ public void commit(short slot, BlockLocation blockLocation) throws IOException {
}

@Override
public void clear(short slot, BlockLocation blockLocation) throws IOException {
public void recycle(short slot, BlockLocation blockLocation) throws IOException {

}
}
Expand Down

0 comments on commit dab82fc

Please sign in to comment.