Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
artem-v committed Aug 5, 2024
1 parent 1894cd3 commit a96c239
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public MutableDirectBuffer encodePayloadGenerationEvent(UUID memberId, int paylo
return encodedBuffer;
}

public MutableDirectBuffer encodePayloadChunkRequest(long payloadOffset, Member from) {
public MutableDirectBuffer encodePayloadChunkRequest(Member from, int payloadOffset) {
encodedLength = 0;

payloadChunkRequestEncoder.wrapAndApplyHeader(encodedBuffer, 0, headerEncoder);
Expand All @@ -47,10 +47,11 @@ public MutableDirectBuffer encodePayloadChunkRequest(long payloadOffset, Member
}

public MutableDirectBuffer encodePayloadChunkResponse(
Member from, DirectBuffer chunk, int chunkOffset, int chunkLength) {
Member from, int payloadOffset, DirectBuffer chunk, int chunkOffset, int chunkLength) {
encodedLength = 0;

payloadChunkResponseEncoder.wrapAndApplyHeader(encodedBuffer, 0, headerEncoder);
payloadChunkResponseEncoder.payloadOffset(payloadOffset);
payloadChunkResponseEncoder.putFrom(memberCodec.encode(from), 0, memberCodec.encodedLength());
payloadChunkResponseEncoder.putChunk(chunk, chunkOffset, chunkLength);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,21 @@ public int size() {
return payloadIndex.size();
}

public boolean putPayload(UUID memberId, DirectBuffer chunk, int chunkOffset, int chunkLength)
public boolean putPayload(
UUID memberId, int payloadOffset, DirectBuffer chunk, int chunkOffset, int chunkLength)
throws IOException {
final PayloadInfo payloadInfo = payloadIndex.get(memberId);
if (payloadInfo == null) {
return false;
throw new IllegalArgumentException("Payload not found, memberId: " + memberId);
}

if (payloadOffset != payloadInfo.appendOffset()) {
throw new IllegalArgumentException("Invalid payloadOffset: " + payloadOffset);
}

final int newAppendOffset = payloadInfo.appendOffset() + chunkLength;

if (payloadInfo.length() < newAppendOffset) {
if (newAppendOffset > payloadInfo.length()) {
throw new IllegalArgumentException("Invalid chunkLength: " + chunkLength);
}

Expand Down
3 changes: 2 additions & 1 deletion cluster2/src/main/resources/scalecube-cluster-codecs.xml
Original file line number Diff line number Diff line change
Expand Up @@ -156,11 +156,12 @@
</sbe:message>

<sbe:message name="PayloadChunkRequest" id="402">
<field name="payloadOffset" id="1" type="int64"/>
<field name="payloadOffset" id="1" type="int32"/>
<data name="from" id="100" type="VarData"/>
</sbe:message>

<sbe:message name="PayloadChunkResponse" id="403">
<field name="payloadOffset" id="1" type="int32"/>
<data name="from" id="100" type="VarData"/>
<data name="chunk" id="101" type="VarData"/>
</sbe:message>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.scalecube.cluster2.payload;

import static org.junit.jupiter.api.Assertions.*;
import static org.junit.jupiter.api.Assertions.assertThrows;

import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -66,9 +67,9 @@ void testReadPayload() throws IOException {
random.nextBytes(bytes);
final UnsafeBuffer buffer = new UnsafeBuffer(bytes);

assertFalse(payloadStore.putPayload(memberId, buffer, 0, 768), "putPayload");
assertFalse(payloadStore.putPayload(memberId, buffer, 768, 255), "putPayload");
assertTrue(payloadStore.putPayload(memberId, buffer, 1023, 9), "putPayload");
assertFalse(payloadStore.putPayload(memberId, 0, buffer, 0, 768), "putPayload");
assertFalse(payloadStore.putPayload(memberId, 768, buffer, 768, 255), "putPayload");
assertTrue(payloadStore.putPayload(memberId, 1023, buffer, 1023, 9), "putPayload");

final ByteBuffer payload = payloadStore.readPayload(memberId);
assertEquals(payloadLength, payload.limit(), "payloadLength");
Expand All @@ -77,4 +78,70 @@ void testReadPayload() throws IOException {

assertArrayEquals(bytes, payloadBytes, "payloadBytes");
}

@Test
void testPutPayloadPayloadNotFound() throws IOException {
final File storeFile = tempDir.resolve("" + System.currentTimeMillis()).toFile();
final PayloadStore payloadStore = new PayloadStore(storeFile);

final UUID memberId = UUID.randomUUID();
final int payloadLength = 1032;

final Random random = new Random();
byte[] bytes = new byte[payloadLength];
random.nextBytes(bytes);
final UnsafeBuffer buffer = new UnsafeBuffer(bytes);

payloadStore.addGeneration(memberId, payloadLength);

assertThrows(
IllegalArgumentException.class,
() -> payloadStore.putPayload(UUID.randomUUID(), 0, buffer, 0, 768));
}

@Test
void testPutPayloadInvalidPayloadOffset() throws IOException {
final File storeFile = tempDir.resolve("" + System.currentTimeMillis()).toFile();
final PayloadStore payloadStore = new PayloadStore(storeFile);

final UUID memberId = UUID.randomUUID();
final int payloadLength = 1032;

final Random random = new Random();
byte[] bytes = new byte[payloadLength];
random.nextBytes(bytes);
final UnsafeBuffer buffer = new UnsafeBuffer(bytes);

payloadStore.addGeneration(memberId, payloadLength);

assertThrows(
IllegalArgumentException.class,
() -> payloadStore.putPayload(memberId, 768, buffer, 0, 768));
}

@Test
void testPutPayloadInvalidChunk() throws IOException {
final File storeFile = tempDir.resolve("" + System.currentTimeMillis()).toFile();
final PayloadStore payloadStore = new PayloadStore(storeFile);

final UUID memberId = UUID.randomUUID();
final int payloadLength = 1032;

payloadStore.addGeneration(memberId, payloadLength);

assertNull(payloadStore.readPayload(memberId), "readPayload");

final Random random = new Random();
byte[] bytes = new byte[payloadLength];
random.nextBytes(bytes);
final UnsafeBuffer buffer = new UnsafeBuffer(bytes);

assertThrows(
IllegalArgumentException.class,
() -> {
assertFalse(payloadStore.putPayload(memberId, 0, buffer, 0, 768), "putPayload");
assertFalse(payloadStore.putPayload(memberId, 768, buffer, 768, 255), "putPayload");
payloadStore.putPayload(memberId, 1023, buffer, 1023, 10);
});
}
}

0 comments on commit a96c239

Please sign in to comment.