From 5f088c4e471b2e5c3eaff178cc0214b30acb8bca Mon Sep 17 00:00:00 2001 From: James Netherton Date: Mon, 8 Jan 2024 14:37:02 +0000 Subject: [PATCH] Workaround BigQuery & Apache Arrow Netty imcompatibilities #5641 --- .../java/io/netty/buffer/LargeBuffer.java | 34 ++ .../netty/buffer/MutableWrappedByteBuf.java | 447 ++++++++++++++++++ .../netty/buffer/PooledByteBufAllocatorL.java | 275 +++++++++++ .../buffer/UnsafeDirectLittleEndian.java | 261 ++++++++++ 4 files changed, 1017 insertions(+) create mode 100644 extensions/google-bigquery/runtime/src/main/java/io/netty/buffer/LargeBuffer.java create mode 100644 extensions/google-bigquery/runtime/src/main/java/io/netty/buffer/MutableWrappedByteBuf.java create mode 100644 extensions/google-bigquery/runtime/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java create mode 100644 extensions/google-bigquery/runtime/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java diff --git a/extensions/google-bigquery/runtime/src/main/java/io/netty/buffer/LargeBuffer.java b/extensions/google-bigquery/runtime/src/main/java/io/netty/buffer/LargeBuffer.java new file mode 100644 index 000000000000..1d81b725b4fb --- /dev/null +++ b/extensions/google-bigquery/runtime/src/main/java/io/netty/buffer/LargeBuffer.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.netty.buffer; + +/** + * A MutableWrappedByteBuf that also maintains a metric of the number of huge buffer bytes and + * counts. + */ +public class LargeBuffer extends MutableWrappedByteBuf { + + public LargeBuffer(ByteBuf buffer) { + super(buffer); + } + + @Override + public ByteBuf copy(int index, int length) { + return new LargeBuffer(buffer.copy(index, length)); + } +} diff --git a/extensions/google-bigquery/runtime/src/main/java/io/netty/buffer/MutableWrappedByteBuf.java b/extensions/google-bigquery/runtime/src/main/java/io/netty/buffer/MutableWrappedByteBuf.java new file mode 100644 index 000000000000..2bee1bc50990 --- /dev/null +++ b/extensions/google-bigquery/runtime/src/main/java/io/netty/buffer/MutableWrappedByteBuf.java @@ -0,0 +1,447 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.netty.buffer; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.channels.FileChannel; +import java.nio.channels.GatheringByteChannel; +import java.nio.channels.ScatteringByteChannel; + +import io.netty.util.ByteProcessor; + +/** + * This is basically a complete copy of netty's DuplicatedByteBuf. We copy because we want to override + * some behaviors and make buffer mutable. + */ +abstract class MutableWrappedByteBuf extends AbstractByteBuf { + + ByteBuf buffer; + + public MutableWrappedByteBuf(ByteBuf buffer) { + super(buffer.maxCapacity()); + + if (buffer instanceof MutableWrappedByteBuf) { + this.buffer = ((MutableWrappedByteBuf) buffer).buffer; + } else { + this.buffer = buffer; + } + + setIndex(buffer.readerIndex(), buffer.writerIndex()); + } + + @Override + public ByteBuffer nioBuffer(int index, int length) { + return unwrap().nioBuffer(index, length); + } + + @Override + public ByteBuf unwrap() { + return buffer; + } + + @Override + public ByteBufAllocator alloc() { + return buffer.alloc(); + } + + @Override + public ByteOrder order() { + return buffer.order(); + } + + @Override + public boolean isDirect() { + return buffer.isDirect(); + } + + @Override + public int capacity() { + return buffer.capacity(); + } + + @Override + public ByteBuf capacity(int newCapacity) { + buffer.capacity(newCapacity); + return this; + } + + @Override + public boolean hasArray() { + return buffer.hasArray(); + } + + @Override + public byte[] array() { + return buffer.array(); + } + + @Override + public int arrayOffset() { + return buffer.arrayOffset(); + } + + @Override + public boolean hasMemoryAddress() { + return buffer.hasMemoryAddress(); + } + + @Override + public long memoryAddress() { + return buffer.memoryAddress(); + } + + @Override + public byte getByte(int index) { + return _getByte(index); + } + + @Override + protected byte _getByte(int index) { + return buffer.getByte(index); + } + + @Override + public short getShort(int index) { + return _getShort(index); + } + + @Override + protected short _getShort(int index) { + return buffer.getShort(index); + } + + @Override + public short getShortLE(int index) { + return buffer.getShortLE(index); + } + + @Override + protected short _getShortLE(int index) { + return buffer.getShortLE(index); + } + + @Override + public int getUnsignedMedium(int index) { + return _getUnsignedMedium(index); + } + + @Override + protected int _getUnsignedMedium(int index) { + return buffer.getUnsignedMedium(index); + } + + @Override + public int getUnsignedMediumLE(int index) { + return buffer.getUnsignedMediumLE(index); + } + + @Override + protected int _getUnsignedMediumLE(int index) { + return buffer.getUnsignedMediumLE(index); + } + + @Override + public int getInt(int index) { + return _getInt(index); + } + + @Override + protected int _getInt(int index) { + return buffer.getInt(index); + } + + @Override + public int getIntLE(int index) { + return buffer.getIntLE(index); + } + + @Override + protected int _getIntLE(int index) { + return buffer.getIntLE(index); + } + + @Override + public long getLong(int index) { + return _getLong(index); + } + + @Override + protected long _getLong(int index) { + return buffer.getLong(index); + } + + @Override + public long getLongLE(int index) { + return buffer.getLongLE(index); + } + + @Override + protected long _getLongLE(int index) { + return buffer.getLongLE(index); + } + + @Override + public abstract ByteBuf copy(int index, int length); + + @Override + public ByteBuf slice(int index, int length) { + return new SlicedByteBuf(this, index, length); + } + + @Override + public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) { + buffer.getBytes(index, dst, dstIndex, length); + return this; + } + + @Override + public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) { + buffer.getBytes(index, dst, dstIndex, length); + return this; + } + + @Override + public ByteBuf getBytes(int index, ByteBuffer dst) { + buffer.getBytes(index, dst); + return this; + } + + @Override + public ByteBuf setByte(int index, int value) { + _setByte(index, value); + return this; + } + + @Override + protected void _setByte(int index, int value) { + buffer.setByte(index, value); + } + + @Override + public ByteBuf setShort(int index, int value) { + _setShort(index, value); + return this; + } + + @Override + protected void _setShort(int index, int value) { + buffer.setShort(index, value); + } + + @Override + public ByteBuf setShortLE(int index, int value) { + buffer.setShortLE(index, value); + return this; + } + + @Override + protected void _setShortLE(int index, int value) { + buffer.setShortLE(index, value); + } + + @Override + public ByteBuf setMedium(int index, int value) { + _setMedium(index, value); + return this; + } + + @Override + protected void _setMedium(int index, int value) { + buffer.setMedium(index, value); + } + + @Override + public ByteBuf setMediumLE(int index, int value) { + buffer.setMediumLE(index, value); + return this; + } + + @Override + protected void _setMediumLE(int index, int value) { + buffer.setMediumLE(index, value); + } + + @Override + public ByteBuf setInt(int index, int value) { + _setInt(index, value); + return this; + } + + @Override + protected void _setInt(int index, int value) { + buffer.setInt(index, value); + } + + @Override + public ByteBuf setIntLE(int index, int value) { + buffer.setIntLE(index, value); + return this; + } + + @Override + protected void _setIntLE(int index, int value) { + buffer.setIntLE(index, value); + } + + @Override + public ByteBuf setLong(int index, long value) { + _setLong(index, value); + return this; + } + + @Override + protected void _setLong(int index, long value) { + buffer.setLong(index, value); + } + + @Override + public ByteBuf setLongLE(int index, long value) { + buffer.setLongLE(index, value); + return this; + } + + @Override + protected void _setLongLE(int index, long value) { + buffer.setLongLE(index, value); + } + + @Override + public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) { + buffer.setBytes(index, src, srcIndex, length); + return this; + } + + @Override + public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) { + buffer.setBytes(index, src, srcIndex, length); + return this; + } + + @Override + public ByteBuf setBytes(int index, ByteBuffer src) { + buffer.setBytes(index, src); + return this; + } + + @Override + public int setBytes(int index, FileChannel in, long position, int length) + throws IOException { + return buffer.setBytes(index, in, position, length); + } + + @Override + public ByteBuf getBytes(int index, OutputStream out, int length) + throws IOException { + buffer.getBytes(index, out, length); + return this; + } + + @Override + public int getBytes(int index, GatheringByteChannel out, int length) + throws IOException { + return buffer.getBytes(index, out, length); + } + + @Override + public int setBytes(int index, InputStream in, int length) + throws IOException { + return buffer.setBytes(index, in, length); + } + + @Override + public int setBytes(int index, ScatteringByteChannel in, int length) + throws IOException { + return buffer.setBytes(index, in, length); + } + + @Override + public int getBytes(int index, FileChannel out, long position, int length) + throws IOException { + return buffer.getBytes(index, out, position, length); + } + + @Override + public int nioBufferCount() { + return buffer.nioBufferCount(); + } + + @Override + public ByteBuffer[] nioBuffers(int index, int length) { + return buffer.nioBuffers(index, length); + } + + @Override + public ByteBuffer internalNioBuffer(int index, int length) { + return nioBuffer(index, length); + } + + @Override + public int forEachByte(int index, int length, ByteProcessor processor) { + return buffer.forEachByte(index, length, processor); + } + + @Override + public int forEachByteDesc(int index, int length, ByteProcessor processor) { + return buffer.forEachByteDesc(index, length, processor); + } + + @Override + public final int refCnt() { + return unwrap().refCnt(); + } + + @Override + public final ByteBuf touch() { + unwrap().touch(); + return this; + } + + @Override + public final ByteBuf touch(Object hint) { + unwrap().touch(hint); + return this; + } + + @Override + public final ByteBuf retain() { + unwrap().retain(); + return this; + } + + @Override + public final ByteBuf retain(int increment) { + unwrap().retain(increment); + return this; + } + + @Override + public boolean release() { + return release(1); + } + + @Override + public boolean release(int decrement) { + boolean released = unwrap().release(decrement); + return released; + } + +} diff --git a/extensions/google-bigquery/runtime/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java b/extensions/google-bigquery/runtime/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java new file mode 100644 index 000000000000..16025d8655c4 --- /dev/null +++ b/extensions/google-bigquery/runtime/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java @@ -0,0 +1,275 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.netty.buffer; + +import java.lang.reflect.Field; +import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicLong; + +import io.netty.util.internal.OutOfDirectMemoryError; +import io.netty.util.internal.StringUtil; +import org.apache.arrow.memory.OutOfMemoryException; +import org.apache.arrow.memory.util.LargeMemoryUtil; + +import static org.apache.arrow.memory.util.AssertionUtil.ASSERT_ENABLED; + +/** + * The base allocator that we use for all of Arrow's memory management. Returns + * UnsafeDirectLittleEndian buffers. + */ +public class PooledByteBufAllocatorL { + + private static final org.slf4j.Logger memoryLogger = org.slf4j.LoggerFactory.getLogger("arrow.allocator"); + + private static final int MEMORY_LOGGER_FREQUENCY_SECONDS = 60; + public final UnsafeDirectLittleEndian empty; + private final AtomicLong hugeBufferSize = new AtomicLong(0); + private final AtomicLong hugeBufferCount = new AtomicLong(0); + private final AtomicLong normalBufferSize = new AtomicLong(0); + private final AtomicLong normalBufferCount = new AtomicLong(0); + private final InnerAllocator allocator; + + public PooledByteBufAllocatorL() { + allocator = new InnerAllocator(); + empty = new UnsafeDirectLittleEndian(new DuplicatedByteBuf(Unpooled.EMPTY_BUFFER)); + } + + /** + * Returns a {@linkplain io.netty.buffer.UnsafeDirectLittleEndian} of the given size. + */ + public UnsafeDirectLittleEndian allocate(long size) { + try { + return allocator.directBuffer(LargeMemoryUtil.checkedCastToInt(size), Integer.MAX_VALUE); + } catch (OutOfMemoryError e) { + /* + * OutOfDirectMemoryError is thrown by Netty when we exceed the direct memory limit defined by + * -XX:MaxDirectMemorySize. OutOfMemoryError with "Direct buffer memory" message is thrown by + * java.nio.Bits when we exceed the direct memory limit. This should never be hit in practice + * as Netty is expected to throw an OutOfDirectMemoryError first. + */ + if (e instanceof OutOfDirectMemoryError || "Direct buffer memory".equals(e.getMessage())) { + throw new OutOfMemoryException("Failure allocating buffer.", e); + } + throw e; + } + } + + public int getChunkSize() { + return allocator.chunkSize(); + } + + public long getHugeBufferSize() { + return hugeBufferSize.get(); + } + + public long getHugeBufferCount() { + return hugeBufferCount.get(); + } + + public long getNormalBufferSize() { + return normalBufferSize.get(); + } + + public long getNormalBufferCount() { + return normalBufferSize.get(); + } + + private static class AccountedUnsafeDirectLittleEndian extends UnsafeDirectLittleEndian { + + private final long initialCapacity; + private final AtomicLong count; + private final AtomicLong size; + + private AccountedUnsafeDirectLittleEndian(LargeBuffer buf, AtomicLong count, AtomicLong size) { + super(buf); + this.initialCapacity = buf.capacity(); + this.count = count; + this.size = size; + } + + private AccountedUnsafeDirectLittleEndian(PooledUnsafeDirectByteBuf buf, AtomicLong count, + AtomicLong size) { + super(buf); + this.initialCapacity = buf.capacity(); + this.count = count; + this.size = size; + } + + @Override + public ByteBuf copy() { + throw new UnsupportedOperationException("copy method is not supported"); + } + + @Override + public ByteBuf copy(int index, int length) { + throw new UnsupportedOperationException("copy method is not supported"); + } + + @Override + public boolean release(int decrement) { + boolean released = super.release(decrement); + if (released) { + count.decrementAndGet(); + size.addAndGet(-initialCapacity); + } + return released; + } + + } + + private class InnerAllocator extends PooledByteBufAllocator { + + private final PoolArena[] directArenas; + private final MemoryStatusThread statusThread; + + public InnerAllocator() { + super(true); + + try { + Field f = PooledByteBufAllocator.class.getDeclaredField("directArenas"); + f.setAccessible(true); + this.directArenas = (PoolArena[]) f.get(this); + } catch (Exception e) { + throw new RuntimeException("Failure while initializing allocator. Unable to retrieve direct arenas field.", e); + } + + if (memoryLogger.isTraceEnabled()) { + statusThread = new MemoryStatusThread(this); + statusThread.start(); + } else { + statusThread = null; + } + } + + private UnsafeDirectLittleEndian newDirectBufferL(int initialCapacity, int maxCapacity) { + PoolThreadCache cache = threadCache(); + PoolArena directArena = cache.directArena; + + if (directArena != null) { + + if (initialCapacity > chunkSize()) { + // This is beyond chunk size so we'll allocate separately. + ByteBuf buf = UnpooledByteBufAllocator.DEFAULT.directBuffer(initialCapacity, maxCapacity); + + hugeBufferSize.addAndGet(buf.capacity()); + hugeBufferCount.incrementAndGet(); + + // logger.debug("Allocating huge buffer of size {}", initialCapacity, new Exception()); + return new AccountedUnsafeDirectLittleEndian(new LargeBuffer(buf), hugeBufferCount, + hugeBufferSize); + } else { + // within chunk, use arena. + ByteBuf buf = directArena.allocate(cache, initialCapacity, maxCapacity); + if (!(buf instanceof PooledUnsafeDirectByteBuf)) { + fail(); + } + + if (!ASSERT_ENABLED) { + return new UnsafeDirectLittleEndian((PooledUnsafeDirectByteBuf) buf); + } + + normalBufferSize.addAndGet(buf.capacity()); + normalBufferCount.incrementAndGet(); + + return new AccountedUnsafeDirectLittleEndian((PooledUnsafeDirectByteBuf) buf, + normalBufferCount, normalBufferSize); + } + + } else { + throw fail(); + } + } + + private UnsupportedOperationException fail() { + return new UnsupportedOperationException( + "Arrow requires that the JVM used supports access sun.misc.Unsafe. This platform " + + "didn't provide that functionality."); + } + + @Override + public UnsafeDirectLittleEndian directBuffer(int initialCapacity, int maxCapacity) { + if (initialCapacity == 0 && maxCapacity == 0) { + newDirectBuffer(initialCapacity, maxCapacity); + } + validate(initialCapacity, maxCapacity); + return newDirectBufferL(initialCapacity, maxCapacity); + } + + @Override + public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) { + throw new UnsupportedOperationException("Arrow doesn't support using heap buffers."); + } + + private void validate(int initialCapacity, int maxCapacity) { + if (initialCapacity < 0) { + throw new IllegalArgumentException("initialCapacity: " + initialCapacity + " (expected: 0+)"); + } + if (initialCapacity > maxCapacity) { + throw new IllegalArgumentException(String.format( + "initialCapacity: %d (expected: not greater than maxCapacity(%d)", + initialCapacity, maxCapacity)); + } + } + + @Override + public String toString() { + StringBuilder buf = new StringBuilder(); + buf.append(directArenas.length); + buf.append(" direct arena(s):"); + buf.append(StringUtil.NEWLINE); + for (PoolArena a : directArenas) { + buf.append(a); + } + + buf.append("Large buffers outstanding: "); + buf.append(hugeBufferCount.get()); + buf.append(" totaling "); + buf.append(hugeBufferSize.get()); + buf.append(" bytes."); + buf.append('\n'); + buf.append("Normal buffers outstanding: "); + buf.append(normalBufferCount.get()); + buf.append(" totaling "); + buf.append(normalBufferSize.get()); + buf.append(" bytes."); + return buf.toString(); + } + + private class MemoryStatusThread extends Thread { + private final InnerAllocator allocator; + + public MemoryStatusThread(InnerAllocator allocator) { + super("allocation.logger"); + this.setDaemon(true); + this.allocator = allocator; + } + + @Override + public void run() { + while (true) { + memoryLogger.trace("Memory Usage: \n{}", allocator); + try { + Thread.sleep(MEMORY_LOGGER_FREQUENCY_SECONDS * 1000); + } catch (InterruptedException e) { + return; + } + } + } + } + + } +} diff --git a/extensions/google-bigquery/runtime/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java b/extensions/google-bigquery/runtime/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java new file mode 100644 index 000000000000..07f0685bc920 --- /dev/null +++ b/extensions/google-bigquery/runtime/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.netty.buffer; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteOrder; +import java.util.concurrent.atomic.AtomicLong; + +import io.netty.util.internal.PlatformDependent; + +/** + * The underlying class we use for little-endian access to memory. Is used underneath ArrowBufs + * to abstract away the + * Netty classes and underlying Netty memory management. + */ +public class UnsafeDirectLittleEndian extends WrappedByteBuf { + private static final AtomicLong ID_GENERATOR = new AtomicLong(0); + public final long id = ID_GENERATOR.incrementAndGet(); + private final AbstractByteBuf wrapped; + private final long memoryAddress; + + UnsafeDirectLittleEndian(DuplicatedByteBuf buf) { + this(buf, true); + } + + UnsafeDirectLittleEndian(LargeBuffer buf) { + this(buf, true); + } + + UnsafeDirectLittleEndian(PooledUnsafeDirectByteBuf buf) { + this(buf, true); + } + + private UnsafeDirectLittleEndian(AbstractByteBuf buf, boolean fake) { + super(buf); + + this.wrapped = buf; + this.memoryAddress = buf.memoryAddress(); + } + + private long addr(int index) { + return memoryAddress + index; + } + + @Override + public long getLong(int index) { + // wrapped.checkIndex(index, 8); + long v = PlatformDependent.getLong(addr(index)); + return v; + } + + @Override + public float getFloat(int index) { + return Float.intBitsToFloat(getInt(index)); + } + + @Override + public ByteBuf slice() { + return slice(this.readerIndex(), readableBytes()); + } + + @Override + public ByteBuf slice(int index, int length) { + return new SlicedByteBuf(this, index, length); + } + + @Override + public ByteBuf order(ByteOrder endianness) { + return this; + } + + @Override + public double getDouble(int index) { + return Double.longBitsToDouble(getLong(index)); + } + + @Override + public char getChar(int index) { + return (char) getShort(index); + } + + @Override + public long getUnsignedInt(int index) { + return getInt(index) & 0xFFFFFFFFL; + } + + @Override + public int getInt(int index) { + int v = PlatformDependent.getInt(addr(index)); + return v; + } + + @Override + public int getUnsignedShort(int index) { + return getShort(index) & 0xFFFF; + } + + @Override + public short getShort(int index) { + short v = PlatformDependent.getShort(addr(index)); + return v; + } + + @Override + public ByteBuf setShort(int index, int value) { + wrapped.checkIndex(index, 2); + setShort_(index, value); + return this; + } + + @Override + public ByteBuf setInt(int index, int value) { + wrapped.checkIndex(index, 4); + setInt_(index, value); + return this; + } + + @Override + public ByteBuf setLong(int index, long value) { + wrapped.checkIndex(index, 8); + setLong_(index, value); + return this; + } + + @Override + public ByteBuf setChar(int index, int value) { + setShort(index, value); + return this; + } + + @Override + public ByteBuf setFloat(int index, float value) { + setInt(index, Float.floatToRawIntBits(value)); + return this; + } + + @Override + public ByteBuf setDouble(int index, double value) { + setLong(index, Double.doubleToRawLongBits(value)); + return this; + } + + @Override + public ByteBuf writeShort(int value) { + wrapped.ensureWritable(2); + setShort_(wrapped.writerIndex, value); + wrapped.writerIndex += 2; + return this; + } + + @Override + public ByteBuf writeInt(int value) { + wrapped.ensureWritable(4); + setInt_(wrapped.writerIndex, value); + wrapped.writerIndex += 4; + return this; + } + + @Override + public ByteBuf writeLong(long value) { + wrapped.ensureWritable(8); + setLong_(wrapped.writerIndex, value); + wrapped.writerIndex += 8; + return this; + } + + @Override + public ByteBuf writeChar(int value) { + writeShort(value); + return this; + } + + @Override + public ByteBuf writeFloat(float value) { + writeInt(Float.floatToRawIntBits(value)); + return this; + } + + @Override + public ByteBuf writeDouble(double value) { + writeLong(Double.doubleToRawLongBits(value)); + return this; + } + + private void setShort_(int index, int value) { + PlatformDependent.putShort(addr(index), (short) value); + } + + private void setInt_(int index, int value) { + PlatformDependent.putInt(addr(index), value); + } + + private void setLong_(int index, long value) { + PlatformDependent.putLong(addr(index), value); + } + + @Override + public byte getByte(int index) { + return PlatformDependent.getByte(addr(index)); + } + + @Override + public ByteBuf setByte(int index, int value) { + PlatformDependent.putByte(addr(index), (byte) value); + return this; + } + + @Override + public boolean release() { + return release(1); + } + + @Override + public int setBytes(int index, InputStream in, int length) throws IOException { + wrapped.checkIndex(index, length); + byte[] tmp = new byte[length]; + int readBytes = in.read(tmp); + if (readBytes > 0) { + PlatformDependent.copyMemory(tmp, 0, addr(index), readBytes); + } + return readBytes; + } + + @Override + public ByteBuf getBytes(int index, OutputStream out, int length) throws IOException { + wrapped.checkIndex(index, length); + if (length != 0) { + byte[] tmp = new byte[length]; + PlatformDependent.copyMemory(addr(index), tmp, 0, length); + out.write(tmp); + } + return this; + } + + @Override + public int hashCode() { + return System.identityHashCode(this); + } + + @Override + public boolean equals(Object obj) { + return this == obj; + } +}