Skip to content

Commit

Permalink
feat(java): configurable buffer size limit (#1963)
Browse files Browse the repository at this point in the history
## What does this PR do?

This PR introduces a new configuration option `bufferSizeLimitBytes`
that replaces the hard-coded default of 128kb.

## Related issues

#1950

## Does this PR introduce any user-facing change?

The PR introduces a new configuration option `bufferSizeLimitBytes`.

- [x] Does this PR introduce any public API change?
- [ ] Does this PR introduce any binary protocol compatibility change?

## Discussion

This PR solves my problem, but I'm not sure if it is the right way to
move forward. This is quite a low-level configuration option, but a
potentially very important one. Every user whose average payload size is
>=128kb, will need to increase this value for maximum performance. Maybe
the default limit should be increased to something less conservative
like 1MB, so fewer users will need to adjust this setting?
  • Loading branch information
theigl authored Dec 5, 2024
1 parent 54b62fb commit b3f531c
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 3 deletions.
5 changes: 2 additions & 3 deletions java/fury-core/src/main/java/org/apache/fury/Fury.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ public final class Fury implements BaseFury {
private static final byte isOutOfBandFlag = 1 << 3;
private static final boolean isLittleEndian = ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN;
private static final byte BITMAP = isLittleEndian ? isLittleEndianFlag : 0;
private static final int BUFFER_SIZE_LIMIT = 128 * 1024;
private static final short MAGIC_NUMBER = 0x62D4;

private final Config config;
Expand Down Expand Up @@ -330,8 +329,8 @@ public MemoryBuffer getBuffer() {

public void resetBuffer() {
MemoryBuffer buf = buffer;
if (buf != null && buf.size() > BUFFER_SIZE_LIMIT) {
buffer = MemoryBuffer.newHeapBuffer(BUFFER_SIZE_LIMIT);
if (buf != null && buf.size() > config.bufferSizeLimitBytes()) {
buffer = MemoryBuffer.newHeapBuffer(config.bufferSizeLimitBytes());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public class Config implements Serializable {
private transient int configHash;
private final boolean deserializeNonexistentEnumValueAsNull;
private final boolean serializeEnumByName;
private final int bufferSizeLimitBytes;

public Config(FuryBuilder builder) {
name = builder.name;
Expand Down Expand Up @@ -95,6 +96,7 @@ public Config(FuryBuilder builder) {
scalaOptimizationEnabled = builder.scalaOptimizationEnabled;
deserializeNonexistentEnumValueAsNull = builder.deserializeNonexistentEnumValueAsNull;
serializeEnumByName = builder.serializeEnumByName;
bufferSizeLimitBytes = builder.bufferSizeLimitBytes;
}

/** Returns the name for Fury serialization. */
Expand Down Expand Up @@ -187,6 +189,10 @@ public LongEncoding longEncoding() {
return longEncoding;
}

public int bufferSizeLimitBytes() {
return bufferSizeLimitBytes;
}

public boolean requireClassRegistration() {
return requireClassRegistration;
}
Expand Down Expand Up @@ -283,6 +289,7 @@ public boolean equals(Object o) {
&& compressString == config.compressString
&& compressInt == config.compressInt
&& compressLong == config.compressLong
&& bufferSizeLimitBytes == config.bufferSizeLimitBytes
&& requireClassRegistration == config.requireClassRegistration
&& suppressClassRegistrationWarnings == config.suppressClassRegistrationWarnings
&& registerGuavaTypes == config.registerGuavaTypes
Expand Down Expand Up @@ -317,6 +324,7 @@ public int hashCode() {
compressInt,
compressLong,
longEncoding,
bufferSizeLimitBytes,
requireClassRegistration,
suppressClassRegistrationWarnings,
registerGuavaTypes,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public final class FuryBuilder {
boolean suppressClassRegistrationWarnings = true;
boolean deserializeNonexistentEnumValueAsNull = false;
boolean serializeEnumByName = false;
int bufferSizeLimitBytes = 128 * 1024;
MetaCompressor metaCompressor = new DeflaterMetaCompressor();

public FuryBuilder() {}
Expand Down Expand Up @@ -184,6 +185,17 @@ public FuryBuilder withStringCompressed(boolean stringCompressed) {
return this;
}

/**
* Sets the limit for Fury's internal buffer. If the buffer size exceeds this limit, it will be
* reset to this limit after every serialization and deserialization.
*
* <p>The default is 128k.
*/
public FuryBuilder withBufferSizeLimitBytes(int bufferSizeLimitBytes) {
this.bufferSizeLimitBytes = bufferSizeLimitBytes;
return this;
}

/**
* Set classloader for fury to load classes, this classloader can't up updated. Fury will cache
* the class meta data, if classloader can be updated, there may be class meta collision if
Expand Down
21 changes: 21 additions & 0 deletions java/fury-core/src/test/java/org/apache/fury/FuryTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -615,4 +615,25 @@ public void testNullObjSerAndDe() {
Object obj = fury.deserializeJavaObjectAndClass(bytes);
assertNull(obj);
}

@Test
public void testResetBufferToSizeLimit() {
final int minBufferBytes = 64;
final int limitInBytes = 1024;
Fury fury = Fury.builder().withBufferSizeLimitBytes(limitInBytes).build();

final byte[] smallPayload = new byte[0];
final byte[] serializedSmall = fury.serialize(smallPayload);
assertEquals(fury.getBuffer().size(), minBufferBytes);

fury.deserialize(serializedSmall);
assertEquals(fury.getBuffer().size(), minBufferBytes);

final byte[] largePayload = new byte[limitInBytes * 2];
final byte[] serializedLarge = fury.serialize(largePayload);
assertEquals(fury.getBuffer().size(), limitInBytes);

fury.deserialize(serializedLarge);
assertEquals(fury.getBuffer().size(), limitInBytes);
}
}

0 comments on commit b3f531c

Please sign in to comment.