Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
artem-v committed Aug 4, 2024
1 parent 28ebd4c commit 1894cd3
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 35 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package io.scalecube.cluster2.payload;

import java.util.StringJoiner;
import java.util.UUID;

public class PayloadInfo {

private final UUID memberId;
private final long position;
private final int length;
private int appendOffset;

public PayloadInfo(UUID memberId, long position, int length, int appendOffset) {
this.memberId = memberId;
this.position = position;
this.length = length;
this.appendOffset = appendOffset;
}

public UUID memberId() {
return memberId;
}

public long position() {
return position;
}

public int length() {
return length;
}

public int appendOffset() {
return appendOffset;
}

public PayloadInfo appendOffset(int appendOffset) {
this.appendOffset = appendOffset;
return this;
}

public long appendPosition() {
return position + appendOffset;
}

public boolean isCompleted() {
return appendOffset == length;
}

@Override
public String toString() {
return new StringJoiner(", ", PayloadInfo.class.getSimpleName() + "[", "]")
.add("memberId=" + memberId)
.add("initialPosition=" + position)
.add("length=" + length)
.add("proceedingOffset=" + appendOffset)
.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.StringJoiner;
import java.util.UUID;
import org.agrona.DirectBuffer;
import org.agrona.collections.Object2ObjectHashMap;
Expand Down Expand Up @@ -45,14 +44,16 @@ public int size() {
return payloadIndex.size();
}

public void putPayload(UUID memberId, DirectBuffer chunk, int chunkOffset, int chunkLength)
public boolean putPayload(UUID memberId, DirectBuffer chunk, int chunkOffset, int chunkLength)
throws IOException {
final PayloadInfo payloadInfo = payloadIndex.get(memberId);
if (payloadInfo == null || payloadInfo.isCompleted()) {
return;
if (payloadInfo == null) {
return false;
}

if (payloadInfo.length < payloadInfo.appendOffset + chunkLength) {
final int newAppendOffset = payloadInfo.appendOffset() + chunkLength;

if (payloadInfo.length() < newAppendOffset) {
throw new IllegalArgumentException("Invalid chunkLength: " + chunkLength);
}

Expand All @@ -65,39 +66,25 @@ public void putPayload(UUID memberId, DirectBuffer chunk, int chunkOffset, int c
position += storeChannel.write(dstBuffer, position);
} while (dstBuffer.hasRemaining());

payloadInfo.appendOffset += chunkLength;
}

private static class PayloadInfo {
payloadInfo.appendOffset(newAppendOffset);

private final UUID memberId;
private final long initialPosition;
private final int length;
private int appendOffset;

private PayloadInfo(UUID memberId, long initialPosition, int length, int appendOffset) {
this.memberId = memberId;
this.initialPosition = initialPosition;
this.length = length;
this.appendOffset = appendOffset;
}
return payloadInfo.isCompleted();
}

private long appendPosition() {
return initialPosition + appendOffset;
public ByteBuffer readPayload(UUID memberId) throws IOException {
final PayloadInfo payloadInfo = payloadIndex.get(memberId);
if (payloadInfo == null || !payloadInfo.isCompleted()) {
return null;
}

private boolean isCompleted() {
return appendOffset == length;
}
//noinspection RedundantCast
final ByteBuffer readBuffer = (ByteBuffer) dstBuffer.clear().limit(payloadInfo.length());
long position = payloadInfo.position();
do {
position += storeChannel.read(readBuffer, position);
} while (readBuffer.hasRemaining());

@Override
public String toString() {
return new StringJoiner(", ", PayloadInfo.class.getSimpleName() + "[", "]")
.add("memberId=" + memberId)
.add("initialPosition=" + initialPosition)
.add("length=" + length)
.add("proceedingOffset=" + appendOffset)
.toString();
}
//noinspection RedundantCast
return (ByteBuffer) readBuffer.flip();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.util.Random;
import java.util.UUID;
import org.agrona.concurrent.UnsafeBuffer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

Expand Down Expand Up @@ -42,6 +45,36 @@ void testRemoveGeneration() throws IOException {

payloadStore.removeGeneration(memberId);

assertEquals(n - 1, payloadStore.size());
assertEquals(n - 1, payloadStore.size(), "payloadStore.size");
assertNull(payloadStore.readPayload(memberId), "readPayload");
}

@Test
void testReadPayload() throws IOException {
final File storeFile = tempDir.resolve("" + System.currentTimeMillis()).toFile();
final PayloadStore payloadStore = new PayloadStore(storeFile);

final UUID memberId = UUID.randomUUID();
final int payloadLength = 1032;

payloadStore.addGeneration(memberId, payloadLength);

assertNull(payloadStore.readPayload(memberId), "readPayload");

final Random random = new Random();
byte[] bytes = new byte[payloadLength];
random.nextBytes(bytes);
final UnsafeBuffer buffer = new UnsafeBuffer(bytes);

assertFalse(payloadStore.putPayload(memberId, buffer, 0, 768), "putPayload");
assertFalse(payloadStore.putPayload(memberId, buffer, 768, 255), "putPayload");
assertTrue(payloadStore.putPayload(memberId, buffer, 1023, 9), "putPayload");

final ByteBuffer payload = payloadStore.readPayload(memberId);
assertEquals(payloadLength, payload.limit(), "payloadLength");
byte[] payloadBytes = new byte[payloadLength];
payload.get(payloadBytes);

assertArrayEquals(bytes, payloadBytes, "payloadBytes");
}
}

0 comments on commit 1894cd3

Please sign in to comment.