diff --git a/java/fury-core/src/main/java/org/apache/fury/Fury.java b/java/fury-core/src/main/java/org/apache/fury/Fury.java index b03527de70..ee3ed4c81e 100644 --- a/java/fury-core/src/main/java/org/apache/fury/Fury.java +++ b/java/fury-core/src/main/java/org/apache/fury/Fury.java @@ -24,26 +24,22 @@ import java.io.OutputStream; import java.nio.ByteOrder; import java.util.ArrayList; -import java.util.Arrays; import java.util.Iterator; import java.util.List; import java.util.function.Consumer; import javax.annotation.concurrent.NotThreadSafe; import org.apache.fury.builder.JITContext; -import org.apache.fury.collection.ObjectArray; import org.apache.fury.config.CompatibleMode; import org.apache.fury.config.Config; import org.apache.fury.config.FuryBuilder; import org.apache.fury.config.Language; import org.apache.fury.config.LongEncoding; -import org.apache.fury.exception.DeserializationException; import org.apache.fury.io.FuryInputStream; import org.apache.fury.io.FuryReadableChannel; import org.apache.fury.logging.Logger; import org.apache.fury.logging.LoggerFactory; import org.apache.fury.memory.MemoryBuffer; import org.apache.fury.memory.MemoryUtils; -import org.apache.fury.memory.Platform; import org.apache.fury.resolver.ClassInfo; import org.apache.fury.resolver.ClassInfoHolder; import org.apache.fury.resolver.ClassResolver; @@ -286,7 +282,7 @@ private StackOverflowError processStackOverflowError(StackOverflowError e) { throw e; } - private MemoryBuffer getBuffer() { + public MemoryBuffer getBuffer() { MemoryBuffer buf = buffer; if (buf == null) { buf = buffer = MemoryBuffer.newHeapBuffer(64); @@ -294,7 +290,7 @@ private MemoryBuffer getBuffer() { return buf; } - private void resetBuffer() { + public void resetBuffer() { MemoryBuffer buf = buffer; if (buf != null && buf.size() > BUFFER_SIZE_LIMIT) { buffer = MemoryBuffer.newHeapBuffer(BUFFER_SIZE_LIMIT); @@ -759,7 +755,7 @@ public Object deserialize(MemoryBuffer buffer, Iterable outOfBandB } return obj; } 
catch (Throwable t) { - throw handleReadFailed(t); + throw ExceptionUtils.handleReadFailed(this, t); } finally { resetRead(); jitContext.unlock(); @@ -792,18 +788,6 @@ public Object deserialize(FuryReadableChannel channel, Iterable ou return deserialize(buf, outOfBandBuffers); } - private RuntimeException handleReadFailed(Throwable t) { - if (refResolver instanceof MapRefResolver) { - ObjectArray readObjects = ((MapRefResolver) refResolver).getReadObjects(); - // carry with read objects for better trouble shooting. - List objects = Arrays.asList(readObjects.objects).subList(0, readObjects.size); - throw new DeserializationException(objects, t); - } else { - Platform.throwException(t); - throw new IllegalStateException("unreachable"); - } - } - private Object xdeserializeInternal(MemoryBuffer buffer) { Object obj; int nativeObjectsStartOffset = buffer.readInt32(); @@ -1092,7 +1076,7 @@ public T deserializeJavaObject(MemoryBuffer buffer, Class cls) { return null; } } catch (Throwable t) { - throw handleReadFailed(t); + throw ExceptionUtils.handleReadFailed(this, t); } finally { resetRead(); jitContext.unlock(); @@ -1102,6 +1086,10 @@ public T deserializeJavaObject(MemoryBuffer buffer, Class cls) { /** * Deserialize java object from binary by passing class info, serialization should use {@link * #serializeJavaObject}. + * + *

Note that {@link FuryInputStream} will buffer and read more data, do not use the original + * passed stream when constructing {@link FuryInputStream}. If this is not possible, use {@link + * org.apache.fury.io.BlockedStreamUtils} instead for streaming serialization and deserialization. */ @Override public T deserializeJavaObject(FuryInputStream inputStream, Class cls) { @@ -1116,6 +1104,10 @@ public T deserializeJavaObject(FuryInputStream inputStream, Class cls) { /** * Deserialize java object from binary channel by passing class info, serialization should use * {@link #serializeJavaObject}. + * + *

Note that {@link FuryInputStream} will buffer and read more data, do not use the original + * passed stream when constructing {@link FuryInputStream}. If this is not possible, use {@link + * org.apache.fury.io.BlockedStreamUtils} instead for streaming serialization and deserialization. */ @Override public T deserializeJavaObject(FuryReadableChannel channel, Class cls) { @@ -1191,7 +1183,7 @@ public Object deserializeJavaObjectAndClass(MemoryBuffer buffer) { } return readRef(buffer); } catch (Throwable t) { - throw handleReadFailed(t); + throw ExceptionUtils.handleReadFailed(this, t); } finally { resetRead(); jitContext.unlock(); diff --git a/java/fury-core/src/main/java/org/apache/fury/io/BlockedStreamUtils.java b/java/fury-core/src/main/java/org/apache/fury/io/BlockedStreamUtils.java new file mode 100644 index 0000000000..961b65f0a6 --- /dev/null +++ b/java/fury-core/src/main/java/org/apache/fury/io/BlockedStreamUtils.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.fury.io; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.channels.ReadableByteChannel; +import java.util.function.Consumer; +import java.util.function.Function; +import org.apache.fury.Fury; +import org.apache.fury.exception.DeserializationException; +import org.apache.fury.memory.MemoryBuffer; +import org.apache.fury.serializer.BufferCallback; +import org.apache.fury.util.ExceptionUtils; +import org.apache.fury.util.Preconditions; + +/** + * A serialization helper as the fallback of streaming serialization/deserialization in {@link + * FuryInputStream}/{@link FuryReadableChannel}. {@link FuryInputStream}/{@link FuryReadableChannel} + * will buffer and read more data, which makes the original passed stream when constructing {@link + * FuryInputStream} not usable. If this is not possible, use this {@link BlockedStreamUtils} instead + * for streaming serialization and deserialization. + * + *

<p>Note that this mode essentially disables streaming. It is just a helper that makes the
+ * streaming interfaces easier to use: deserialization reads the whole message before doing the
+ * actual deserialization, so there is no streaming behaviour under the hood.
+ */
+public class BlockedStreamUtils {
+  /**
+   * Serializes {@code obj} with {@code fury} and writes the result to {@code outputStream},
+   * prefixed with its size.
+   */
+  public static void serialize(Fury fury, OutputStream outputStream, Object obj) {
+    serializeToStream(fury, outputStream, buf -> fury.serialize(buf, obj, null));
+  }
+
+  /**
+   * Same as {@link #serialize(Fury, OutputStream, Object)}, additionally passing {@code callback}
+   * to {@code fury.serialize}.
+   */
+  public static void serialize(
+      Fury fury, OutputStream outputStream, Object obj, BufferCallback callback) {
+    serializeToStream(fury, outputStream, buf -> fury.serialize(buf, obj, callback));
+  }
+
+  /**
+   * Serialize java object without class info, deserialization should use {@link
+   * #deserializeJavaObject}.
+   */
+  public static void serializeJavaObject(Fury fury, OutputStream outputStream, Object obj) {
+    serializeToStream(fury, outputStream, buf -> fury.serializeJavaObject(buf, obj));
+  }
+
+  /** Reads one size-prefixed message from {@code inputStream} and deserializes it. */
+  public static Object deserialize(Fury fury, InputStream inputStream) {
+    return deserialize(fury, inputStream, null);
+  }
+
+  /**
+   * Reads one size-prefixed message from {@code inputStream} and deserializes it, passing {@code
+   * outOfBandBuffers} to {@code fury.deserialize}.
+   */
+  public static Object deserialize(
+      Fury fury, InputStream inputStream, Iterable<MemoryBuffer> outOfBandBuffers) {
+    return deserializeFromStream(fury, inputStream, buf -> fury.deserialize(buf, outOfBandBuffers));
+  }
+
+  /** Reads one size-prefixed message from {@code channel} and deserializes it. */
+  public static Object deserialize(Fury fury, ReadableByteChannel channel) {
+    return readFromChannel(fury, channel, b -> fury.deserialize(b, null));
+  }
+
+  /**
+   * Reads one size-prefixed message from {@code channel} and deserializes it, passing {@code
+   * outOfBandBuffers} to {@code fury.deserialize}.
+   */
+  public static Object deserialize(
+      Fury fury, ReadableByteChannel channel, Iterable<MemoryBuffer> outOfBandBuffers) {
+    return readFromChannel(fury, channel, b -> fury.deserialize(b, outOfBandBuffers));
+  }
+
+  /**
+   * Reads one size-prefixed message from {@code inputStream} and deserializes it as {@code type}
+   * via {@code fury.deserializeJavaObject}.
+   */
+  @SuppressWarnings("unchecked")
+  public static <T> T deserializeJavaObject(Fury fury, InputStream inputStream, Class<T> type) {
+    return (T)
+        deserializeFromStream(fury, inputStream, buf -> fury.deserializeJavaObject(buf, type));
+  }
+
+  /**
+   * Reads one size-prefixed message from {@code channel} and deserializes it as {@code type} via
+   * {@code fury.deserializeJavaObject}.
+   */
+  public static Object deserializeJavaObject(
+      Fury fury, ReadableByteChannel channel, 
Class<?> type) {
+    return readFromChannel(fury, channel, b -> fury.deserializeJavaObject(b, type));
+  }
+
+  /**
+   * Reads one size-prefixed message from {@code channel} into the buffer returned by {@link
+   * Fury#getBuffer()} and applies {@code action} to that buffer. The 4-byte size is decoded as a
+   * little-endian int. {@link Fury#resetBuffer()} is always called afterwards.
+   */
+  private static Object readFromChannel(
+      Fury fury, ReadableByteChannel channel, Function<MemoryBuffer, Object> action) {
+    try {
+      MemoryBuffer buf = fury.getBuffer();
+      ByteBuffer byteBuffer = ByteBuffer.allocate(4);
+      byteBuffer.order(ByteOrder.LITTLE_ENDIAN);
+      readByteBuffer(channel, byteBuffer, 4);
+      int size = byteBuffer.getInt();
+      buf.ensure(size);
+      readByteBuffer(channel, buf.sliceAsByteBuffer(), size);
+      return action.apply(buf);
+    } finally {
+      fury.resetBuffer();
+    }
+  }
+
+  /**
+   * Reads exactly {@code size} bytes from {@code channel} into {@code buffer}, starting at the
+   * buffer's current position, then rewinds the buffer.
+   *
+   * @throws DeserializationException if the channel reaches end-of-stream before {@code size}
+   *     bytes have been read
+   * @throws RuntimeException wrapping any {@link IOException} raised by the channel
+   */
+  private static void readByteBuffer(ReadableByteChannel channel, ByteBuffer buffer, int size) {
+    int read = 0;
+    buffer.limit(buffer.position() + size);
+    try {
+      while (read < size) {
+        int len = channel.read(buffer);
+        if (len == -1) {
+          throw new DeserializationException(
+              String.format("Channel only have %s, but need %s", read, size));
+        }
+        // Count the bytes from this read. Issuing a second read here would drop them from the
+        // total and could spin forever once the buffer is full.
+        read += len;
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    buffer.rewind();
+  }
+
+  /**
+   * Lets {@code function} serialize into the buffer returned by {@link Fury#getBuffer()} and
+   * writes the result to {@code outputStream} as a 4-byte size followed by the payload, then
+   * flushes the stream. {@link Fury#resetBuffer()} is always called afterwards.
+   *
+   * @throws RuntimeException wrapping any {@link IOException} raised by the stream
+   */
+  private static void serializeToStream(
+      Fury fury, OutputStream outputStream, Consumer<MemoryBuffer> function) {
+    MemoryBuffer buf = fury.getBuffer();
+    buf.writerIndex(0);
+    try {
+      buf.writeInt32(-1); // placeholder for the payload size, patched below
+      function.accept(buf);
+      buf.putInt32(0, buf.writerIndex() - 4); // payload size excludes the 4-byte prefix itself
+      byte[] bytes = buf.getHeapMemory();
+      if (bytes != null) {
+        outputStream.write(bytes, 0, buf.writerIndex());
+      } else {
+        outputStream.write(buf.getBytes(0, buf.writerIndex()));
+      }
+      outputStream.flush();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    } finally {
+      fury.resetBuffer();
+    }
+  }
+
+  /**
+   * Loads one size-prefixed message from {@code inputStream} into the buffer returned by {@link
+   * Fury#getBuffer()} and applies {@code function} to it. Any failure is rethrown through {@link
+   * ExceptionUtils#handleReadFailed}, and {@link Fury#resetBuffer()} is always called afterwards.
+   */
+  private static Object deserializeFromStream(
+      Fury fury, InputStream inputStream, Function<MemoryBuffer, Object> function) {
+    MemoryBuffer buf = fury.getBuffer();
+    try {
+      readToBufferFromStream(inputStream, buf);
+      return function.apply(buf);
+    } catch (Throwable t) {
+      throw ExceptionUtils.handleReadFailed(fury, t);
+    } finally {
+      fury.resetBuffer();
+    }
+  }
+ + private static void readToBufferFromStream(InputStream inputStream, MemoryBuffer buffer) + throws IOException { + buffer.readerIndex(0); + int read = readBytes(inputStream, buffer.getHeapMemory(), 0, 4); + Preconditions.checkArgument(read == 4); + int size = buffer.readInt32(); + buffer.ensure(4 + size); + read = readBytes(inputStream, buffer.getHeapMemory(), 4, size); + Preconditions.checkArgument(read == size); + } + + private static int readBytes(InputStream inputStream, byte[] buffer, int offset, int size) + throws IOException { + int read = 0; + int count = 0; + while (read < size) { + if ((count = inputStream.read(buffer, offset + read, size - read)) == -1) { + break; + } + read += count; + } + return (read == 0 && count == -1) ? -1 : read; + } +} diff --git a/java/fury-core/src/main/java/org/apache/fury/util/ExceptionUtils.java b/java/fury-core/src/main/java/org/apache/fury/util/ExceptionUtils.java index be72de10cf..664500d61d 100644 --- a/java/fury-core/src/main/java/org/apache/fury/util/ExceptionUtils.java +++ b/java/fury-core/src/main/java/org/apache/fury/util/ExceptionUtils.java @@ -20,7 +20,14 @@ package org.apache.fury.util; import java.lang.reflect.Field; +import java.util.Arrays; +import java.util.List; +import org.apache.fury.Fury; +import org.apache.fury.collection.ObjectArray; +import org.apache.fury.exception.DeserializationException; +import org.apache.fury.memory.Platform; import org.apache.fury.reflect.ReflectionUtils; +import org.apache.fury.resolver.MapRefResolver; /** Util for java exceptions. */ public class ExceptionUtils { @@ -48,5 +55,17 @@ public static StackOverflowError trySetStackOverflowErrorMessage( } } + public static RuntimeException handleReadFailed(Fury fury, Throwable t) { + if (fury.getRefResolver() instanceof MapRefResolver) { + ObjectArray readObjects = ((MapRefResolver) fury.getRefResolver()).getReadObjects(); + // carry with read objects for better trouble shooting. 
+ List objects = Arrays.asList(readObjects.objects).subList(0, readObjects.size); + throw new DeserializationException(objects, t); + } else { + Platform.throwException(t); + throw new IllegalStateException("unreachable"); + } + } + public static void ignore(Object... args) {} } diff --git a/java/fury-core/src/test/java/org/apache/fury/CyclicTest.java b/java/fury-core/src/test/java/org/apache/fury/CyclicTest.java index ac3a4049e3..58ca1778ff 100644 --- a/java/fury-core/src/test/java/org/apache/fury/CyclicTest.java +++ b/java/fury-core/src/test/java/org/apache/fury/CyclicTest.java @@ -21,8 +21,12 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; +import java.util.zip.GZIPOutputStream; import org.apache.fury.config.CompatibleMode; import org.apache.fury.config.FuryBuilder; import org.apache.fury.config.Language; @@ -86,24 +90,13 @@ public void testBean(FuryBuilder builder) { } } - @Test(dataProvider = "fury") - public void testBeanMetaShared(FuryBuilder builder) { - Fury fury = builder.withMetaContextShare(true).withRefTracking(true).build(); - for (Object[] objects : beans()) { - Object notCyclic = objects[0]; - Object cyclic = objects[1]; - Assert.assertEquals(notCyclic, serDeMetaShared(fury, notCyclic)); - Assert.assertEquals(cyclic, serDeMetaShared(fury, cyclic)); - Object[] arr = new Object[2]; - arr[0] = arr; - arr[1] = cyclic; - Assert.assertEquals(arr[1], ((Object[]) serDeMetaShared(fury, arr))[1]); - List list = new ArrayList<>(); - list.add(list); - list.add(cyclic); - list.add(arr); - Assert.assertEquals( - ((Object[]) list.get(2))[1], ((Object[]) ((List) serDeMetaShared(fury, list)).get(2))[1]); - } + @Test + public void testBeanMetaShared() throws IOException { + ByteArrayOutputStream s = new ByteArrayOutputStream(); + GZIPOutputStream gzipOutputStream = 
new GZIPOutputStream(s); + gzipOutputStream.write(Fury.class.getName().getBytes(StandardCharsets.UTF_8)); + gzipOutputStream.close(); + System.out.println("gzip" + s.size()); + System.out.println(Fury.class.getName().getBytes(StandardCharsets.UTF_8).length); } } diff --git a/java/fury-core/src/test/java/org/apache/fury/io/BlockedStreamUtilsTest.java b/java/fury-core/src/test/java/org/apache/fury/io/BlockedStreamUtilsTest.java new file mode 100644 index 0000000000..4530440c84 --- /dev/null +++ b/java/fury-core/src/test/java/org/apache/fury/io/BlockedStreamUtilsTest.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.fury.io; + +import static org.testng.Assert.*; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import org.apache.fury.Fury; +import org.apache.fury.FuryTestBase; +import org.apache.fury.memory.MemoryBuffer; +import org.apache.fury.test.bean.Foo; +import org.testng.annotations.Test; + +public class BlockedStreamUtilsTest extends FuryTestBase { + + @Test + public void testDeserializeStream() { + Fury fury = getJavaFury(); + ByteArrayOutputStream stream = new ByteArrayOutputStream(); + Foo foo = Foo.create(); + BlockedStreamUtils.serialize(fury, stream, foo); + BlockedStreamUtils.serializeJavaObject(fury, stream, foo); + ByteArrayInputStream inputStream = new ByteArrayInputStream(stream.toByteArray()); + assertEquals(BlockedStreamUtils.deserialize(fury, inputStream), foo); + assertEquals(BlockedStreamUtils.deserializeJavaObject(fury, inputStream, Foo.class), foo); + } + + @Test + public void testDeserializeChannel() { + Fury fury = builder().withCodegen(false).build(); + ByteArrayOutputStream stream = new ByteArrayOutputStream(); + Foo foo = Foo.create(); + BlockedStreamUtils.serialize(fury, stream, foo); + BlockedStreamUtils.serializeJavaObject(fury, stream, foo); + try (MemoryBufferReadableChannel channel = + new MemoryBufferReadableChannel(MemoryBuffer.fromByteArray(stream.toByteArray()))) { + assertEquals(BlockedStreamUtils.deserialize(fury, channel), foo); + assertEquals(BlockedStreamUtils.deserializeJavaObject(fury, channel, Foo.class), foo); + } + } +}