diff --git a/cluster2/src/main/java/io/scalecube/cluster2/payload/PayloadCodec.java b/cluster2/src/main/java/io/scalecube/cluster2/payload/PayloadCodec.java
index 93b29d7a..0a8e0262 100644
--- a/cluster2/src/main/java/io/scalecube/cluster2/payload/PayloadCodec.java
+++ b/cluster2/src/main/java/io/scalecube/cluster2/payload/PayloadCodec.java
@@ -35,7 +35,7 @@ public MutableDirectBuffer encodePayloadGenerationEvent(UUID memberId, int paylo
return encodedBuffer;
}
- public MutableDirectBuffer encodePayloadChunkRequest(long payloadOffset, Member from) {
+ public MutableDirectBuffer encodePayloadChunkRequest(Member from, int payloadOffset) {
encodedLength = 0;
payloadChunkRequestEncoder.wrapAndApplyHeader(encodedBuffer, 0, headerEncoder);
@@ -47,10 +47,11 @@ public MutableDirectBuffer encodePayloadChunkRequest(long payloadOffset, Member
}
public MutableDirectBuffer encodePayloadChunkResponse(
- Member from, DirectBuffer chunk, int chunkOffset, int chunkLength) {
+ Member from, int payloadOffset, DirectBuffer chunk, int chunkOffset, int chunkLength) {
encodedLength = 0;
payloadChunkResponseEncoder.wrapAndApplyHeader(encodedBuffer, 0, headerEncoder);
+ payloadChunkResponseEncoder.payloadOffset(payloadOffset);
payloadChunkResponseEncoder.putFrom(memberCodec.encode(from), 0, memberCodec.encodedLength());
payloadChunkResponseEncoder.putChunk(chunk, chunkOffset, chunkLength);
diff --git a/cluster2/src/main/java/io/scalecube/cluster2/payload/PayloadStore.java b/cluster2/src/main/java/io/scalecube/cluster2/payload/PayloadStore.java
index 3dea1673..50c7b4ad 100644
--- a/cluster2/src/main/java/io/scalecube/cluster2/payload/PayloadStore.java
+++ b/cluster2/src/main/java/io/scalecube/cluster2/payload/PayloadStore.java
@@ -44,16 +44,21 @@ public int size() {
return payloadIndex.size();
}
- public boolean putPayload(UUID memberId, DirectBuffer chunk, int chunkOffset, int chunkLength)
+ public boolean putPayload(
+ UUID memberId, int payloadOffset, DirectBuffer chunk, int chunkOffset, int chunkLength)
throws IOException {
final PayloadInfo payloadInfo = payloadIndex.get(memberId);
if (payloadInfo == null) {
- return false;
+ throw new IllegalArgumentException("Payload not found, memberId: " + memberId);
+ }
+
+ if (payloadOffset != payloadInfo.appendOffset()) {
+ throw new IllegalArgumentException("Invalid payloadOffset: " + payloadOffset);
}
final int newAppendOffset = payloadInfo.appendOffset() + chunkLength;
- if (payloadInfo.length() < newAppendOffset) {
+ if (newAppendOffset > payloadInfo.length()) {
throw new IllegalArgumentException("Invalid chunkLength: " + chunkLength);
}
diff --git a/cluster2/src/main/resources/scalecube-cluster-codecs.xml b/cluster2/src/main/resources/scalecube-cluster-codecs.xml
index 34140412..74cac764 100644
--- a/cluster2/src/main/resources/scalecube-cluster-codecs.xml
+++ b/cluster2/src/main/resources/scalecube-cluster-codecs.xml
@@ -156,11 +156,12 @@
-
+
+
diff --git a/cluster2/src/test/java/io/scalecube/cluster2/payload/PayloadStoreTest.java b/cluster2/src/test/java/io/scalecube/cluster2/payload/PayloadStoreTest.java
index 50dc47d6..c9dd3126 100644
--- a/cluster2/src/test/java/io/scalecube/cluster2/payload/PayloadStoreTest.java
+++ b/cluster2/src/test/java/io/scalecube/cluster2/payload/PayloadStoreTest.java
@@ -1,6 +1,7 @@
package io.scalecube.cluster2.payload;
import static org.junit.jupiter.api.Assertions.*;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import java.io.File;
import java.io.IOException;
@@ -66,9 +67,9 @@ void testReadPayload() throws IOException {
random.nextBytes(bytes);
final UnsafeBuffer buffer = new UnsafeBuffer(bytes);
- assertFalse(payloadStore.putPayload(memberId, buffer, 0, 768), "putPayload");
- assertFalse(payloadStore.putPayload(memberId, buffer, 768, 255), "putPayload");
- assertTrue(payloadStore.putPayload(memberId, buffer, 1023, 9), "putPayload");
+ assertFalse(payloadStore.putPayload(memberId, 0, buffer, 0, 768), "putPayload");
+ assertFalse(payloadStore.putPayload(memberId, 768, buffer, 768, 255), "putPayload");
+ assertTrue(payloadStore.putPayload(memberId, 1023, buffer, 1023, 9), "putPayload");
final ByteBuffer payload = payloadStore.readPayload(memberId);
assertEquals(payloadLength, payload.limit(), "payloadLength");
@@ -77,4 +78,70 @@ void testReadPayload() throws IOException {
assertArrayEquals(bytes, payloadBytes, "payloadBytes");
}
+
+ @Test
+ void testPutPayloadPayloadNotFound() throws IOException {
+ final File storeFile = tempDir.resolve("" + System.currentTimeMillis()).toFile();
+ final PayloadStore payloadStore = new PayloadStore(storeFile);
+
+ final UUID memberId = UUID.randomUUID();
+ final int payloadLength = 1032;
+
+ final Random random = new Random();
+ byte[] bytes = new byte[payloadLength];
+ random.nextBytes(bytes);
+ final UnsafeBuffer buffer = new UnsafeBuffer(bytes);
+
+ payloadStore.addGeneration(memberId, payloadLength);
+
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> payloadStore.putPayload(UUID.randomUUID(), 0, buffer, 0, 768));
+ }
+
+ @Test
+ void testPutPayloadInvalidPayloadOffset() throws IOException {
+ final File storeFile = tempDir.resolve("" + System.currentTimeMillis()).toFile();
+ final PayloadStore payloadStore = new PayloadStore(storeFile);
+
+ final UUID memberId = UUID.randomUUID();
+ final int payloadLength = 1032;
+
+ final Random random = new Random();
+ byte[] bytes = new byte[payloadLength];
+ random.nextBytes(bytes);
+ final UnsafeBuffer buffer = new UnsafeBuffer(bytes);
+
+ payloadStore.addGeneration(memberId, payloadLength);
+
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> payloadStore.putPayload(memberId, 768, buffer, 0, 768));
+ }
+
+ @Test
+ void testPutPayloadInvalidChunk() throws IOException {
+ final File storeFile = tempDir.resolve("" + System.currentTimeMillis()).toFile();
+ final PayloadStore payloadStore = new PayloadStore(storeFile);
+
+ final UUID memberId = UUID.randomUUID();
+ final int payloadLength = 1032;
+
+ payloadStore.addGeneration(memberId, payloadLength);
+
+ assertNull(payloadStore.readPayload(memberId), "readPayload");
+
+ final Random random = new Random();
+ byte[] bytes = new byte[payloadLength];
+ random.nextBytes(bytes);
+ final UnsafeBuffer buffer = new UnsafeBuffer(bytes);
+
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> {
+ assertFalse(payloadStore.putPayload(memberId, 0, buffer, 0, 768), "putPayload");
+ assertFalse(payloadStore.putPayload(memberId, 768, buffer, 768, 255), "putPayload");
+ payloadStore.putPayload(memberId, 1023, buffer, 1023, 10);
+ });
+ }
}