From d351655082153e0a22d5688b9d16e7303bc29451 Mon Sep 17 00:00:00 2001 From: James Netherton Date: Mon, 19 Feb 2024 10:27:37 +0000 Subject: [PATCH 1/2] Remove BigQuery workaround for Arrow Netty incompatibility Fixes #5641 --- .../java/io/netty/buffer/LargeBuffer.java | 34 -- .../netty/buffer/MutableWrappedByteBuf.java | 447 ------------------ .../netty/buffer/PooledByteBufAllocatorL.java | 275 ----------- .../buffer/UnsafeDirectLittleEndian.java | 261 ---------- 4 files changed, 1017 deletions(-) delete mode 100644 extensions/google-bigquery/runtime/src/main/java/io/netty/buffer/LargeBuffer.java delete mode 100644 extensions/google-bigquery/runtime/src/main/java/io/netty/buffer/MutableWrappedByteBuf.java delete mode 100644 extensions/google-bigquery/runtime/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java delete mode 100644 extensions/google-bigquery/runtime/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java diff --git a/extensions/google-bigquery/runtime/src/main/java/io/netty/buffer/LargeBuffer.java b/extensions/google-bigquery/runtime/src/main/java/io/netty/buffer/LargeBuffer.java deleted file mode 100644 index 306ce494330a..000000000000 --- a/extensions/google-bigquery/runtime/src/main/java/io/netty/buffer/LargeBuffer.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.netty.buffer; - -/** - * A MutableWrappedByteBuf that also maintains a metric of the number of huge buffer bytes and - * counts. - */ -public class LargeBuffer extends MutableWrappedByteBuf { - - public LargeBuffer(ByteBuf buffer) { - super(buffer); - } - - @Override - public ByteBuf copy(int index, int length) { - return new LargeBuffer(buffer.copy(index, length)); - } -} diff --git a/extensions/google-bigquery/runtime/src/main/java/io/netty/buffer/MutableWrappedByteBuf.java b/extensions/google-bigquery/runtime/src/main/java/io/netty/buffer/MutableWrappedByteBuf.java deleted file mode 100644 index 005a049f86d2..000000000000 --- a/extensions/google-bigquery/runtime/src/main/java/io/netty/buffer/MutableWrappedByteBuf.java +++ /dev/null @@ -1,447 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.netty.buffer; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.nio.ByteBuffer; -import java.nio.ByteOrder; -import java.nio.channels.FileChannel; -import java.nio.channels.GatheringByteChannel; -import java.nio.channels.ScatteringByteChannel; - -import io.netty.util.ByteProcessor; - -/** - * This is basically a complete copy of netty's DuplicatedByteBuf. We copy because we want to override - * some behaviors and make buffer mutable. - */ -abstract class MutableWrappedByteBuf extends AbstractByteBuf { - - ByteBuf buffer; - - public MutableWrappedByteBuf(ByteBuf buffer) { - super(buffer.maxCapacity()); - - if (buffer instanceof MutableWrappedByteBuf) { - this.buffer = ((MutableWrappedByteBuf) buffer).buffer; - } else { - this.buffer = buffer; - } - - setIndex(buffer.readerIndex(), buffer.writerIndex()); - } - - @Override - public ByteBuffer nioBuffer(int index, int length) { - return unwrap().nioBuffer(index, length); - } - - @Override - public ByteBuf unwrap() { - return buffer; - } - - @Override - public ByteBufAllocator alloc() { - return buffer.alloc(); - } - - @Override - public ByteOrder order() { - return buffer.order(); - } - - @Override - public boolean isDirect() { - return buffer.isDirect(); - } - - @Override - public int capacity() { - return buffer.capacity(); - } - - @Override - public ByteBuf capacity(int newCapacity) { - buffer.capacity(newCapacity); - return this; - } - - @Override - public boolean hasArray() { - return buffer.hasArray(); - } - - @Override - public byte[] array() { - return buffer.array(); - } - - @Override - public int arrayOffset() { - return buffer.arrayOffset(); - } - - @Override - public boolean hasMemoryAddress() { - return buffer.hasMemoryAddress(); - } - - @Override - public long memoryAddress() { - return buffer.memoryAddress(); - } - - @Override - public byte getByte(int index) { - return _getByte(index); - } - - @Override - protected byte _getByte(int index) { - return buffer.getByte(index); - } - - @Override - public short getShort(int index) { - return _getShort(index); - } - - @Override - protected short _getShort(int index) { - return buffer.getShort(index); - } - - @Override - public short getShortLE(int index) { - return buffer.getShortLE(index); - } - - @Override - protected short _getShortLE(int index) { - return buffer.getShortLE(index); - } - - @Override - public int getUnsignedMedium(int index) { - return _getUnsignedMedium(index); - } - - @Override - protected int _getUnsignedMedium(int index) { - return buffer.getUnsignedMedium(index); - } - - @Override - public int getUnsignedMediumLE(int index) { - return buffer.getUnsignedMediumLE(index); - } - - @Override - protected int _getUnsignedMediumLE(int index) { - return buffer.getUnsignedMediumLE(index); - } - - @Override - public int getInt(int index) { - return _getInt(index); - } - - @Override - protected int _getInt(int index) { - return buffer.getInt(index); - } - - @Override - public int getIntLE(int index) { - return buffer.getIntLE(index); - } - - @Override - protected int _getIntLE(int index) { - return buffer.getIntLE(index); - } - - @Override - public long getLong(int index) { - return _getLong(index); - } - - @Override - protected long _getLong(int index) { - return buffer.getLong(index); - } - - @Override - public long getLongLE(int index) { - return buffer.getLongLE(index); - } - - @Override - protected long _getLongLE(int index) { - return buffer.getLongLE(index); - } - - @Override - public abstract ByteBuf copy(int index, int length); - - @Override - public ByteBuf slice(int index, int length) { - return new SlicedByteBuf(this, index, length); - } - - @Override - public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) { - buffer.getBytes(index, dst, dstIndex, length); - return this; - } - - @Override - public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) { - buffer.getBytes(index, dst, dstIndex, length); - return this; - } - - @Override - public ByteBuf getBytes(int index, ByteBuffer dst) { - buffer.getBytes(index, dst); - return this; - } - - @Override - public ByteBuf setByte(int index, int value) { - _setByte(index, value); - return this; - } - - @Override - protected void _setByte(int index, int value) { - buffer.setByte(index, value); - } - - @Override - public ByteBuf setShort(int index, int value) { - _setShort(index, value); - return this; - } - - @Override - protected void _setShort(int index, int value) { - buffer.setShort(index, value); - } - - @Override - public ByteBuf setShortLE(int index, int value) { - buffer.setShortLE(index, value); - return this; - } - - @Override - protected void _setShortLE(int index, int value) { - buffer.setShortLE(index, value); - } - - @Override - public ByteBuf setMedium(int index, int value) { - _setMedium(index, value); - return this; - } - - @Override - protected void _setMedium(int index, int value) { - buffer.setMedium(index, value); - } - - @Override - public ByteBuf setMediumLE(int index, int value) { - buffer.setMediumLE(index, value); - return this; - } - - @Override - protected void _setMediumLE(int index, int value) { - buffer.setMediumLE(index, value); - } - - @Override - public ByteBuf setInt(int index, int value) { - _setInt(index, value); - return this; - } - - @Override - protected void _setInt(int index, int value) { - buffer.setInt(index, value); - } - - @Override - public ByteBuf setIntLE(int index, int value) { - buffer.setIntLE(index, value); - return this; - } - - @Override - protected void _setIntLE(int index, int value) { - buffer.setIntLE(index, value); - } - - @Override - public ByteBuf setLong(int index, long value) { - _setLong(index, value); - return this; - } - - @Override - protected void _setLong(int index, long value) { - buffer.setLong(index, value); - } - - @Override - public ByteBuf setLongLE(int index, long value) { - buffer.setLongLE(index, value); - return this; - } - - @Override - protected void _setLongLE(int index, long value) { - buffer.setLongLE(index, value); - } - - @Override - public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) { - buffer.setBytes(index, src, srcIndex, length); - return this; - } - - @Override - public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) { - buffer.setBytes(index, src, srcIndex, length); - return this; - } - - @Override - public ByteBuf setBytes(int index, ByteBuffer src) { - buffer.setBytes(index, src); - return this; - } - - @Override - public int setBytes(int index, FileChannel in, long position, int length) - throws IOException { - return buffer.setBytes(index, in, position, length); - } - - @Override - public ByteBuf getBytes(int index, OutputStream out, int length) - throws IOException { - buffer.getBytes(index, out, length); - return this; - } - - @Override - public int getBytes(int index, GatheringByteChannel out, int length) - throws IOException { - return buffer.getBytes(index, out, length); - } - - @Override - public int setBytes(int index, InputStream in, int length) - throws IOException { - return buffer.setBytes(index, in, length); - } - - @Override - public int setBytes(int index, ScatteringByteChannel in, int length) - throws IOException { - return buffer.setBytes(index, in, length); - } - - @Override - public int getBytes(int index, FileChannel out, long position, int length) - throws IOException { - return buffer.getBytes(index, out, position, length); - } - - @Override - public int nioBufferCount() { - return buffer.nioBufferCount(); - } - - @Override - public ByteBuffer[] nioBuffers(int index, int length) { - return buffer.nioBuffers(index, length); - } - - @Override - public ByteBuffer internalNioBuffer(int index, int length) { - return nioBuffer(index, length); - } - - @Override - public int forEachByte(int index, int length, ByteProcessor processor) { - return buffer.forEachByte(index, length, processor); - } - - @Override - public int forEachByteDesc(int index, int length, ByteProcessor processor) { - return buffer.forEachByteDesc(index, length, processor); - } - - @Override - public final int refCnt() { - return unwrap().refCnt(); - } - - @Override - public final ByteBuf touch() { - unwrap().touch(); - return this; - } - - @Override - public final ByteBuf touch(Object hint) { - unwrap().touch(hint); - return this; - } - - @Override - public final ByteBuf retain() { - unwrap().retain(); - return this; - } - - @Override - public final ByteBuf retain(int increment) { - unwrap().retain(increment); - return this; - } - - @Override - public boolean release() { - return release(1); - } - - @Override - public boolean release(int decrement) { - boolean released = unwrap().release(decrement); - return released; - } - -} diff --git a/extensions/google-bigquery/runtime/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java b/extensions/google-bigquery/runtime/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java deleted file mode 100644 index 1202f003db9f..000000000000 --- a/extensions/google-bigquery/runtime/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java +++ /dev/null @@ -1,275 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.netty.buffer; - -import java.lang.reflect.Field; -import java.nio.ByteBuffer; -import java.util.concurrent.atomic.AtomicLong; - -import io.netty.util.internal.OutOfDirectMemoryError; -import io.netty.util.internal.StringUtil; -import org.apache.arrow.memory.OutOfMemoryException; -import org.apache.arrow.memory.util.LargeMemoryUtil; - -import static org.apache.arrow.memory.util.AssertionUtil.ASSERT_ENABLED; - -/** - * The base allocator that we use for all of Arrow's memory management. Returns - * UnsafeDirectLittleEndian buffers. - */ -public class PooledByteBufAllocatorL { - - private static final org.slf4j.Logger memoryLogger = org.slf4j.LoggerFactory.getLogger("arrow.allocator"); - - private static final int MEMORY_LOGGER_FREQUENCY_SECONDS = 60; - public final UnsafeDirectLittleEndian empty; - private final AtomicLong hugeBufferSize = new AtomicLong(0); - private final AtomicLong hugeBufferCount = new AtomicLong(0); - private final AtomicLong normalBufferSize = new AtomicLong(0); - private final AtomicLong normalBufferCount = new AtomicLong(0); - private final InnerAllocator allocator; - - public PooledByteBufAllocatorL() { - allocator = new InnerAllocator(); - empty = new UnsafeDirectLittleEndian(new DuplicatedByteBuf(Unpooled.EMPTY_BUFFER)); - } - - /** - * Returns a {@linkplain io.netty.buffer.UnsafeDirectLittleEndian} of the given size. - */ - public UnsafeDirectLittleEndian allocate(long size) { - try { - return allocator.directBuffer(LargeMemoryUtil.checkedCastToInt(size), Integer.MAX_VALUE); - } catch (OutOfMemoryError e) { - /* - * OutOfDirectMemoryError is thrown by Netty when we exceed the direct memory limit defined by - * -XX:MaxDirectMemorySize. OutOfMemoryError with "Direct buffer memory" message is thrown by - * java.nio.Bits when we exceed the direct memory limit. This should never be hit in practice - * as Netty is expected to throw an OutOfDirectMemoryError first. - */ - if (e instanceof OutOfDirectMemoryError || "Direct buffer memory".equals(e.getMessage())) { - throw new OutOfMemoryException("Failure allocating buffer.", e); - } - throw e; - } - } - - public int getChunkSize() { - return allocator.chunkSize(); - } - - public long getHugeBufferSize() { - return hugeBufferSize.get(); - } - - public long getHugeBufferCount() { - return hugeBufferCount.get(); - } - - public long getNormalBufferSize() { - return normalBufferSize.get(); - } - - public long getNormalBufferCount() { - return normalBufferSize.get(); - } - - private static class AccountedUnsafeDirectLittleEndian extends UnsafeDirectLittleEndian { - - private final long initialCapacity; - private final AtomicLong count; - private final AtomicLong size; - - private AccountedUnsafeDirectLittleEndian(LargeBuffer buf, AtomicLong count, AtomicLong size) { - super(buf); - this.initialCapacity = buf.capacity(); - this.count = count; - this.size = size; - } - - private AccountedUnsafeDirectLittleEndian(PooledUnsafeDirectByteBuf buf, AtomicLong count, - AtomicLong size) { - super(buf); - this.initialCapacity = buf.capacity(); - this.count = count; - this.size = size; - } - - @Override - public ByteBuf copy() { - throw new UnsupportedOperationException("copy method is not supported"); - } - - @Override - public ByteBuf copy(int index, int length) { - throw new UnsupportedOperationException("copy method is not supported"); - } - - @Override - public boolean release(int decrement) { - boolean released = super.release(decrement); - if (released) { - count.decrementAndGet(); - size.addAndGet(-initialCapacity); - } - return released; - } - - } - - private class InnerAllocator extends PooledByteBufAllocator { - - private final PoolArena[] directArenas; - private final MemoryStatusThread statusThread; - - public InnerAllocator() { - super(true); - - try { - Field f = PooledByteBufAllocator.class.getDeclaredField("directArenas"); - f.setAccessible(true); - this.directArenas = (PoolArena[]) f.get(this); - } catch (Exception e) { - throw new RuntimeException("Failure while initializing allocator. Unable to retrieve direct arenas field.", e); - } - - if (memoryLogger.isTraceEnabled()) { - statusThread = new MemoryStatusThread(this); - statusThread.start(); - } else { - statusThread = null; - } - } - - private UnsafeDirectLittleEndian newDirectBufferL(int initialCapacity, int maxCapacity) { - PoolThreadCache cache = threadCache(); - PoolArena directArena = cache.directArena; - - if (directArena != null) { - - if (initialCapacity > chunkSize()) { - // This is beyond chunk size so we'll allocate separately. - ByteBuf buf = UnpooledByteBufAllocator.DEFAULT.directBuffer(initialCapacity, maxCapacity); - - hugeBufferSize.addAndGet(buf.capacity()); - hugeBufferCount.incrementAndGet(); - - // logger.debug("Allocating huge buffer of size {}", initialCapacity, new Exception()); - return new AccountedUnsafeDirectLittleEndian(new LargeBuffer(buf), hugeBufferCount, - hugeBufferSize); - } else { - // within chunk, use arena. - ByteBuf buf = directArena.allocate(cache, initialCapacity, maxCapacity); - if (!(buf instanceof PooledUnsafeDirectByteBuf)) { - fail(); - } - - if (!ASSERT_ENABLED) { - return new UnsafeDirectLittleEndian((PooledUnsafeDirectByteBuf) buf); - } - - normalBufferSize.addAndGet(buf.capacity()); - normalBufferCount.incrementAndGet(); - - return new AccountedUnsafeDirectLittleEndian((PooledUnsafeDirectByteBuf) buf, - normalBufferCount, normalBufferSize); - } - - } else { - throw fail(); - } - } - - private UnsupportedOperationException fail() { - return new UnsupportedOperationException( - "Arrow requires that the JVM used supports access sun.misc.Unsafe. This platform " + - "didn't provide that functionality."); - } - - @Override - public UnsafeDirectLittleEndian directBuffer(int initialCapacity, int maxCapacity) { - if (initialCapacity == 0 && maxCapacity == 0) { - newDirectBuffer(initialCapacity, maxCapacity); - } - validate(initialCapacity, maxCapacity); - return newDirectBufferL(initialCapacity, maxCapacity); - } - - @Override - public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) { - throw new UnsupportedOperationException("Arrow doesn't support using heap buffers."); - } - - private void validate(int initialCapacity, int maxCapacity) { - if (initialCapacity < 0) { - throw new IllegalArgumentException("initialCapacity: " + initialCapacity + " (expected: 0+)"); - } - if (initialCapacity > maxCapacity) { - throw new IllegalArgumentException(String.format( - "initialCapacity: %d (expected: not greater than maxCapacity(%d)", - initialCapacity, maxCapacity)); - } - } - - @Override - public String toString() { - StringBuilder buf = new StringBuilder(); - buf.append(directArenas.length); - buf.append(" direct arena(s):"); - buf.append(StringUtil.NEWLINE); - for (PoolArena a : directArenas) { - buf.append(a); - } - - buf.append("Large buffers outstanding: "); - buf.append(hugeBufferCount.get()); - buf.append(" totaling "); - buf.append(hugeBufferSize.get()); - buf.append(" bytes."); - buf.append('\n'); - buf.append("Normal buffers outstanding: "); - buf.append(normalBufferCount.get()); - buf.append(" totaling "); - buf.append(normalBufferSize.get()); - buf.append(" bytes."); - return buf.toString(); - } - - private class MemoryStatusThread extends Thread { - private final InnerAllocator allocator; - - public MemoryStatusThread(InnerAllocator allocator) { - super("allocation.logger"); - this.setDaemon(true); - this.allocator = allocator; - } - - @Override - public void run() { - while (true) { - memoryLogger.trace("Memory Usage: \n{}", allocator); - try { - Thread.sleep(MEMORY_LOGGER_FREQUENCY_SECONDS * 1000); - } catch (InterruptedException e) { - return; - } - } - } - } - - } -} diff --git a/extensions/google-bigquery/runtime/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java b/extensions/google-bigquery/runtime/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java deleted file mode 100644 index cad271c49b6b..000000000000 --- a/extensions/google-bigquery/runtime/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java +++ /dev/null @@ -1,261 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.netty.buffer; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.nio.ByteOrder; -import java.util.concurrent.atomic.AtomicLong; - -import io.netty.util.internal.PlatformDependent; - -/** - * The underlying class we use for little-endian access to memory. Is used underneath ArrowBufs - * to abstract away the - * Netty classes and underlying Netty memory management. - */ -public class UnsafeDirectLittleEndian extends WrappedByteBuf { - private static final AtomicLong ID_GENERATOR = new AtomicLong(0); - public final long id = ID_GENERATOR.incrementAndGet(); - private final AbstractByteBuf wrapped; - private final long memoryAddress; - - UnsafeDirectLittleEndian(DuplicatedByteBuf buf) { - this(buf, true); - } - - UnsafeDirectLittleEndian(LargeBuffer buf) { - this(buf, true); - } - - UnsafeDirectLittleEndian(PooledUnsafeDirectByteBuf buf) { - this(buf, true); - } - - private UnsafeDirectLittleEndian(AbstractByteBuf buf, boolean fake) { - super(buf); - - this.wrapped = buf; - this.memoryAddress = buf.memoryAddress(); - } - - private long addr(int index) { - return memoryAddress + index; - } - - @Override - public long getLong(int index) { - // wrapped.checkIndex(index, 8); - long v = PlatformDependent.getLong(addr(index)); - return v; - } - - @Override - public float getFloat(int index) { - return Float.intBitsToFloat(getInt(index)); - } - - @Override - public ByteBuf slice() { - return slice(this.readerIndex(), readableBytes()); - } - - @Override - public ByteBuf slice(int index, int length) { - return new SlicedByteBuf(this, index, length); - } - - @Override - public ByteBuf order(ByteOrder endianness) { - return this; - } - - @Override - public double getDouble(int index) { - return Double.longBitsToDouble(getLong(index)); - } - - @Override - public char getChar(int index) { - return (char) getShort(index); - } - - @Override - public long getUnsignedInt(int index) { - return getInt(index) & 0xFFFFFFFFL; - } - - @Override - public int getInt(int index) { - int v = PlatformDependent.getInt(addr(index)); - return v; - } - - @Override - public int getUnsignedShort(int index) { - return getShort(index) & 0xFFFF; - } - - @Override - public short getShort(int index) { - short v = PlatformDependent.getShort(addr(index)); - return v; - } - - @Override - public ByteBuf setShort(int index, int value) { - wrapped.checkIndex(index, 2); - setShort_(index, value); - return this; - } - - @Override - public ByteBuf setInt(int index, int value) { - wrapped.checkIndex(index, 4); - setInt_(index, value); - return this; - } - - @Override - public ByteBuf setLong(int index, long value) { - wrapped.checkIndex(index, 8); - setLong_(index, value); - return this; - } - - @Override - public ByteBuf setChar(int index, int value) { - setShort(index, value); - return this; - } - - @Override - public ByteBuf setFloat(int index, float value) { - setInt(index, Float.floatToRawIntBits(value)); - return this; - } - - @Override - public ByteBuf setDouble(int index, double value) { - setLong(index, Double.doubleToRawLongBits(value)); - return this; - } - - @Override - public ByteBuf writeShort(int value) { - wrapped.ensureWritable(2); - setShort_(wrapped.writerIndex, value); - wrapped.writerIndex += 2; - return this; - } - - @Override - public ByteBuf writeInt(int value) { - wrapped.ensureWritable(4); - setInt_(wrapped.writerIndex, value); - wrapped.writerIndex += 4; - return this; - } - - @Override - public ByteBuf writeLong(long value) { - wrapped.ensureWritable(8); - setLong_(wrapped.writerIndex, value); - wrapped.writerIndex += 8; - return this; - } - - @Override - public ByteBuf writeChar(int value) { - writeShort(value); - return this; - } - - @Override - public ByteBuf writeFloat(float value) { - writeInt(Float.floatToRawIntBits(value)); - return this; - } - - @Override - public ByteBuf writeDouble(double value) { - writeLong(Double.doubleToRawLongBits(value)); - return this; - } - - private void setShort_(int index, int value) { - PlatformDependent.putShort(addr(index), (short) value); - } - - private void setInt_(int index, int value) { - PlatformDependent.putInt(addr(index), value); - } - - private void setLong_(int index, long value) { - PlatformDependent.putLong(addr(index), value); - } - - @Override - public byte getByte(int index) { - return PlatformDependent.getByte(addr(index)); - } - - @Override - public ByteBuf setByte(int index, int value) { - PlatformDependent.putByte(addr(index), (byte) value); - return this; - } - - @Override - public boolean release() { - return release(1); - } - - @Override - public int setBytes(int index, InputStream in, int length) throws IOException { - wrapped.checkIndex(index, length); - byte[] tmp = new byte[length]; - int readBytes = in.read(tmp); - if (readBytes > 0) { - PlatformDependent.copyMemory(tmp, 0, addr(index), readBytes); - } - return readBytes; - } - - @Override - public ByteBuf getBytes(int index, OutputStream out, int length) throws IOException { - wrapped.checkIndex(index, length); - if (length != 0) { - byte[] tmp = new byte[length]; - PlatformDependent.copyMemory(addr(index), tmp, 0, length); - out.write(tmp); - } - return this; - } - - @Override - public int hashCode() { - return System.identityHashCode(this); - } - - @Override - public boolean equals(Object obj) { - return this == obj; - } -} From c1752f316a22168e3f36f39449b90f43a5aa60c0 Mon Sep 17 00:00:00 2001 From: James Netherton Date: Mon, 19 Feb 2024 10:46:56 +0000 Subject: [PATCH 2/2] Work around test issues with setting a custom BigQuery host #5734 --- integration-tests/google-bigquery/pom.xml | 2 - .../v2/CustomHostCapableHttpBigQueryRpc.java | 905 ++++++++++++++++++ .../bigquery/it/GoogleBigqueryResource.java | 12 +- .../src/main/resources/application.properties | 19 + .../bigquery/it/GoogleBigqueryTest.java | 2 - 5 files changed, 935 insertions(+), 5 deletions(-) create mode 100644 integration-tests/google-bigquery/src/main/java/com/google/cloud/bigquery/spi/v2/CustomHostCapableHttpBigQueryRpc.java create mode 100644 integration-tests/google-bigquery/src/main/resources/application.properties diff --git a/integration-tests/google-bigquery/pom.xml b/integration-tests/google-bigquery/pom.xml index 55bb995c8195..21fcc27e79dc 100644 --- a/integration-tests/google-bigquery/pom.xml +++ b/integration-tests/google-bigquery/pom.xml @@ -72,7 +72,6 @@ - virtualDependencies diff --git a/integration-tests/google-bigquery/src/main/java/com/google/cloud/bigquery/spi/v2/CustomHostCapableHttpBigQueryRpc.java b/integration-tests/google-bigquery/src/main/java/com/google/cloud/bigquery/spi/v2/CustomHostCapableHttpBigQueryRpc.java new file mode 100644 index 000000000000..bad93d4945d4 --- /dev/null +++ b/integration-tests/google-bigquery/src/main/java/com/google/cloud/bigquery/spi/v2/CustomHostCapableHttpBigQueryRpc.java @@ -0,0 +1,905 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/* + * Copyright 2015 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigquery.spi.v2; + +import java.io.IOException; +import java.math.BigInteger; +import java.util.List; +import java.util.Map; + +import com.google.api.client.http.ByteArrayContent; +import com.google.api.client.http.GenericUrl; +import com.google.api.client.http.HttpRequest; +import com.google.api.client.http.HttpRequestFactory; +import com.google.api.client.http.HttpRequestInitializer; +import com.google.api.client.http.HttpResponse; +import com.google.api.client.http.HttpResponseException; +import com.google.api.client.http.HttpTransport; +import com.google.api.client.http.json.JsonHttpContent; +import com.google.api.client.json.JsonFactory; +import com.google.api.client.json.gson.GsonFactory; +import com.google.api.core.InternalApi; +import com.google.api.core.InternalExtensionOnly; +import com.google.api.services.bigquery.Bigquery; +import com.google.api.services.bigquery.model.Dataset; +import com.google.api.services.bigquery.model.DatasetList; +import com.google.api.services.bigquery.model.DatasetReference; +import com.google.api.services.bigquery.model.GetIamPolicyRequest; +import com.google.api.services.bigquery.model.GetPolicyOptions; +import com.google.api.services.bigquery.model.GetQueryResultsResponse; +import com.google.api.services.bigquery.model.Job; +import com.google.api.services.bigquery.model.JobList; +import com.google.api.services.bigquery.model.JobStatus; +import com.google.api.services.bigquery.model.ListModelsResponse; +import com.google.api.services.bigquery.model.ListRoutinesResponse; +import com.google.api.services.bigquery.model.Model; +import com.google.api.services.bigquery.model.ModelReference; +import com.google.api.services.bigquery.model.Policy; +import com.google.api.services.bigquery.model.QueryRequest; +import com.google.api.services.bigquery.model.QueryResponse; +import com.google.api.services.bigquery.model.Routine; +import com.google.api.services.bigquery.model.RoutineReference; +import com.google.api.services.bigquery.model.SetIamPolicyRequest; +import com.google.api.services.bigquery.model.Table; +import com.google.api.services.bigquery.model.TableDataInsertAllRequest; +import com.google.api.services.bigquery.model.TableDataInsertAllResponse; +import com.google.api.services.bigquery.model.TableDataList; +import com.google.api.services.bigquery.model.TableList; +import com.google.api.services.bigquery.model.TableReference; +import com.google.api.services.bigquery.model.TestIamPermissionsRequest; +import com.google.api.services.bigquery.model.TestIamPermissionsResponse; +import com.google.cloud.Tuple; +import com.google.cloud.bigquery.BigQueryException; +import com.google.cloud.bigquery.BigQueryOptions; +import com.google.cloud.http.HttpTransportOptions; +import com.google.common.base.Function; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; + +import static java.net.HttpURLConnection.HTTP_CREATED; +import static java.net.HttpURLConnection.HTTP_NOT_FOUND; +import static java.net.HttpURLConnection.HTTP_OK; + +/** + * TODO: Remove this https://github.com/apache/camel-quarkus/issues/5734 + * + * Used only for testing where WireMock is used. Works around issues setting a custom BigQuery service host: + * + * https://github.com/googleapis/java-bigquery/issues/3125 + */ +@InternalExtensionOnly +public class CustomHostCapableHttpBigQueryRpc implements BigQueryRpc { + + public static final String DEFAULT_PROJECTION = "full"; + private static final String BASE_RESUMABLE_URI = "upload/bigquery/v2/projects/"; + // see: + // https://cloud.google.com/bigquery/loading-data-post-request#resume-upload + private static final int HTTP_RESUME_INCOMPLETE = 308; + private final BigQueryOptions options; + private final Bigquery bigquery; + + @InternalApi("Visible for testing") + static final Function LIST_TO_DATASET = new Function() { + @Override + public Dataset apply(DatasetList.Datasets datasetPb) { + return new Dataset() + .setDatasetReference(datasetPb.getDatasetReference()) + .setFriendlyName(datasetPb.getFriendlyName()) + .setId(datasetPb.getId()) + .setKind(datasetPb.getKind()) + .setLabels(datasetPb.getLabels()); + } + }; + + public CustomHostCapableHttpBigQueryRpc(BigQueryOptions options) { + HttpTransportOptions transportOptions = (HttpTransportOptions) options.getTransportOptions(); + HttpTransport transport = transportOptions.getHttpTransportFactory().create(); + HttpRequestInitializer initializer = transportOptions.getHttpRequestInitializer(options); + this.options = options; + bigquery = new Bigquery.Builder(transport, new GsonFactory(), initializer) + .setRootUrl(options.getHost()) + .setApplicationName(options.getApplicationName()) + .build(); + } + + private static BigQueryException translate(IOException exception) { + return new BigQueryException(exception); + } + + private void validateRPC() throws BigQueryException, IOException { + if (!this.options.hasValidUniverseDomain()) { + throw new BigQueryException(BigQueryException.UNKNOWN_CODE, "Invalid universe domain"); + } + } + + @Override + public Dataset getDataset(String projectId, String datasetId, Map options) { + try { + validateRPC(); + return bigquery + .datasets() + .get(projectId, datasetId) + .setFields(Option.FIELDS.getString(options)) + .setPrettyPrint(false) + .execute(); + } catch (IOException ex) { + BigQueryException serviceException = translate(ex); + if (serviceException.getCode() == HTTP_NOT_FOUND) { + return null; + } + throw serviceException; + } + } + + @Override + public Tuple> listDatasets(String projectId, Map options) { + try { + validateRPC(); + DatasetList datasetsList = bigquery + .datasets() + .list(projectId) + .setPrettyPrint(false) + .setAll(Option.ALL_DATASETS.getBoolean(options)) + .setFilter(Option.LABEL_FILTER.getString(options)) + .setMaxResults(Option.MAX_RESULTS.getLong(options)) + .setPageToken(Option.PAGE_TOKEN.getString(options)) + .execute(); + Iterable datasets = datasetsList.getDatasets(); + return Tuple.of( + datasetsList.getNextPageToken(), + Iterables.transform( + datasets != null ? datasets : ImmutableList. of(), + LIST_TO_DATASET)); + } catch (IOException ex) { + throw translate(ex); + } + } + + @Override + public Dataset create(Dataset dataset, Map options) { + try { + validateRPC(); + return bigquery + .datasets() + .insert(dataset.getDatasetReference().getProjectId(), dataset) + .setPrettyPrint(false) + .setFields(Option.FIELDS.getString(options)) + .execute(); + } catch (IOException ex) { + throw translate(ex); + } + } + + @Override + public Table create(Table table, Map options) { + try { + validateRPC(); + // unset the type, as it is output only + table.setType(null); + TableReference reference = table.getTableReference(); + return bigquery + .tables() + .insert(reference.getProjectId(), reference.getDatasetId(), table) + .setPrettyPrint(false) + .setFields(Option.FIELDS.getString(options)) + .execute(); + } catch (IOException ex) { + throw translate(ex); + } + } + + @Override + public Routine create(Routine routine, Map options) { + try { + validateRPC(); + RoutineReference reference = routine.getRoutineReference(); + return bigquery + .routines() + .insert(reference.getProjectId(), reference.getDatasetId(), routine) + .setPrettyPrint(false) + .setFields(Option.FIELDS.getString(options)) + .execute(); + } catch (IOException ex) { + throw translate(ex); + } + } + + @Override + public Job create(Job job, Map options) { + try { + validateRPC(); + String projectId = job.getJobReference() != null + ? job.getJobReference().getProjectId() + : this.options.getProjectId(); + return bigquery + .jobs() + .insert(projectId, job) + .setPrettyPrint(false) + .setFields(Option.FIELDS.getString(options)) + .execute(); + } catch (IOException ex) { + throw translate(ex); + } + } + + @Override + public Job createJobForQuery(Job job) { + try { + validateRPC(); + String projectId = job.getJobReference() != null + ? job.getJobReference().getProjectId() + : this.options.getProjectId(); + return bigquery.jobs().insert(projectId, job).setPrettyPrint(false).execute(); + } catch (IOException ex) { + throw translate(ex); + } + } + + @Override + public boolean deleteDataset(String projectId, String datasetId, Map options) { + try { + validateRPC(); + bigquery + .datasets() + .delete(projectId, datasetId) + .setPrettyPrint(false) + .setDeleteContents(Option.DELETE_CONTENTS.getBoolean(options)) + .execute(); + return true; + } catch (IOException ex) { + BigQueryException serviceException = translate(ex); + if (serviceException.getCode() == HTTP_NOT_FOUND) { + return false; + } + throw serviceException; + } + } + + @Override + public Dataset patch(Dataset dataset, Map options) { + try { + validateRPC(); + DatasetReference reference = dataset.getDatasetReference(); + return bigquery + .datasets() + .patch(reference.getProjectId(), reference.getDatasetId(), dataset) + .setPrettyPrint(false) + .setFields(Option.FIELDS.getString(options)) + .execute(); + } catch (IOException ex) { + throw translate(ex); + } + } + + @Override + public Table patch(Table table, Map options) { + try { + validateRPC(); + // unset the type, as it is output only + table.setType(null); + TableReference reference = table.getTableReference(); + return bigquery + .tables() + .patch(reference.getProjectId(), reference.getDatasetId(), reference.getTableId(), table) + .setPrettyPrint(false) + .setFields(Option.FIELDS.getString(options)) + .setAutodetectSchema(BigQueryRpc.Option.AUTODETECT_SCHEMA.getBoolean(options)) + .execute(); + } catch (IOException ex) { + throw translate(ex); + } + } + + @Override + public Table getTable( + String projectId, String datasetId, String tableId, Map options) { + try { + validateRPC(); + return bigquery + .tables() + .get(projectId, datasetId, tableId) + .setPrettyPrint(false) + .setFields(Option.FIELDS.getString(options)) + .setView(getTableMetadataOption(options)) + .execute(); + } catch (IOException ex) { + BigQueryException serviceException = translate(ex); + if (serviceException.getCode() == HTTP_NOT_FOUND) { + return null; + } + throw serviceException; + } + } + + private String getTableMetadataOption(Map options) { + if (options.containsKey(Option.TABLE_METADATA_VIEW)) { + return options.get(Option.TABLE_METADATA_VIEW).toString(); + } + return "STORAGE_STATS"; + } + + @Override + public Tuple> listTables( + String projectId, String datasetId, Map options) { + try { + validateRPC(); + TableList tableList = bigquery + .tables() + .list(projectId, datasetId) + .setPrettyPrint(false) + .setMaxResults(Option.MAX_RESULTS.getLong(options)) + .setPageToken(Option.PAGE_TOKEN.getString(options)) + .execute(); + Iterable tables = tableList.getTables(); + return Tuple.of( + tableList.getNextPageToken(), + Iterables.transform( + tables != null ? tables : ImmutableList. of(), + new Function() { + @Override + public Table apply(TableList.Tables tablePb) { + return new Table() + .setFriendlyName(tablePb.getFriendlyName()) + .setId(tablePb.getId()) + .setKind(tablePb.getKind()) + .setTableReference(tablePb.getTableReference()) + .setType(tablePb.getType()) + .setCreationTime(tablePb.getCreationTime()) + .setTimePartitioning(tablePb.getTimePartitioning()) + .setRangePartitioning(tablePb.getRangePartitioning()); + } + })); + } catch (IOException ex) { + throw translate(ex); + } + } + + @Override + public boolean deleteTable(String projectId, String datasetId, String tableId) { + try { + validateRPC(); + bigquery.tables().delete(projectId, datasetId, tableId).execute(); + return true; + } catch (IOException ex) { + BigQueryException serviceException = translate(ex); + if (serviceException.getCode() == HTTP_NOT_FOUND) { + return false; + } + throw serviceException; + } + } + + @Override + public Model patch(Model model, Map options) { + try { + validateRPC(); + // unset the type, as it is output only + ModelReference reference = model.getModelReference(); + return bigquery + .models() + .patch(reference.getProjectId(), reference.getDatasetId(), reference.getModelId(), model) + .setPrettyPrint(false) + .setFields(Option.FIELDS.getString(options)) + .execute(); + } catch (IOException ex) { + throw translate(ex); + } + } + + @Override + public Model getModel( + String projectId, String datasetId, String modelId, Map options) { + try { + validateRPC(); + return bigquery + .models() + .get(projectId, datasetId, modelId) + .setPrettyPrint(false) + .setFields(Option.FIELDS.getString(options)) + .execute(); + } catch (IOException ex) { + BigQueryException serviceException = translate(ex); + if (serviceException.getCode() == HTTP_NOT_FOUND) { + return null; + } + throw serviceException; + } + } + + @Override + public Tuple> listModels( + String projectId, String datasetId, Map options) { + try { + validateRPC(); + ListModelsResponse modelList = bigquery + .models() + .list(projectId, datasetId) + .setPrettyPrint(false) + .setMaxResults(Option.MAX_RESULTS.getLong(options)) + .setPageToken(Option.PAGE_TOKEN.getString(options)) + .execute(); + Iterable models = modelList.getModels() != null ? modelList.getModels() : ImmutableList. of(); + return Tuple.of(modelList.getNextPageToken(), models); + } catch (IOException ex) { + throw translate(ex); + } + } + + @Override + public boolean deleteModel(String projectId, String datasetId, String modelId) { + try { + validateRPC(); + bigquery.models().delete(projectId, datasetId, modelId).execute(); + return true; + } catch (IOException ex) { + BigQueryException serviceException = translate(ex); + if (serviceException.getCode() == HTTP_NOT_FOUND) { + return false; + } + throw serviceException; + } + } + + @Override + public Routine update(Routine routine, Map options) { + try { + validateRPC(); + RoutineReference reference = routine.getRoutineReference(); + return bigquery + .routines() + .update( + reference.getProjectId(), reference.getDatasetId(), reference.getRoutineId(), routine) + .setPrettyPrint(false) + .setFields(Option.FIELDS.getString(options)) + .execute(); + } catch (IOException ex) { + throw translate(ex); + } + } + + @Override + public Routine getRoutine( + String projectId, String datasetId, String routineId, Map options) { + try { + validateRPC(); + return bigquery + .routines() + .get(projectId, datasetId, routineId) + .setPrettyPrint(false) + .setFields(Option.FIELDS.getString(options)) + .execute(); + } catch (IOException ex) { + BigQueryException serviceException = translate(ex); + if (serviceException.getCode() == HTTP_NOT_FOUND) { + return null; + } + throw serviceException; + } + } + + @Override + public Tuple> listRoutines( + String projectId, String datasetId, Map options) { + try { + validateRPC(); + ListRoutinesResponse routineList = bigquery + .routines() + .list(projectId, datasetId) + .setPrettyPrint(false) + .setMaxResults(Option.MAX_RESULTS.getLong(options)) + .setPageToken(Option.PAGE_TOKEN.getString(options)) + .execute(); + Iterable routines = routineList.getRoutines() != null + ? routineList.getRoutines() + : ImmutableList. of(); + return Tuple.of(routineList.getNextPageToken(), routines); + } catch (IOException ex) { + throw translate(ex); + } + } + + @Override + public boolean deleteRoutine(String projectId, String datasetId, String routineId) { + try { + validateRPC(); + bigquery.routines().delete(projectId, datasetId, routineId).execute(); + return true; + } catch (IOException ex) { + BigQueryException serviceException = translate(ex); + if (serviceException.getCode() == HTTP_NOT_FOUND) { + return false; + } + throw serviceException; + } + } + + @Override + public TableDataInsertAllResponse insertAll( + String projectId, String datasetId, String tableId, TableDataInsertAllRequest request) { + try { + validateRPC(); + return bigquery + .tabledata() + .insertAll(projectId, datasetId, tableId, request) + .setPrettyPrint(false) + .execute(); + } catch (IOException ex) { + throw translate(ex); + } + } + + @Override + public TableDataList listTableData( + String projectId, String datasetId, String tableId, Map options) { + try { + validateRPC(); + return bigquery + .tabledata() + .list(projectId, datasetId, tableId) + .setPrettyPrint(false) + .setMaxResults(Option.MAX_RESULTS.getLong(options)) + .setPageToken(Option.PAGE_TOKEN.getString(options)) + .setStartIndex( + Option.START_INDEX.getLong(options) != null + ? BigInteger.valueOf(Option.START_INDEX.getLong(options)) + : null) + .execute(); + } catch (IOException ex) { + throw translate(ex); + } + } + + @Override + public TableDataList listTableDataWithRowLimit( + String projectId, + String datasetId, + String tableId, + Integer maxResultPerPage, + String pageToken) { + try { + validateRPC(); + return bigquery + .tabledata() + .list(projectId, datasetId, tableId) + .setPrettyPrint(false) + .setMaxResults(Long.valueOf(maxResultPerPage)) + .setPageToken(pageToken) + .execute(); + } catch (IOException ex) { + throw translate(ex); + } + } + + @Override + public Job getJob(String projectId, String jobId, String location, Map options) { + try { + validateRPC(); + return bigquery + .jobs() + .get(projectId, jobId) + .setPrettyPrint(false) + .setLocation(location) + .setFields(Option.FIELDS.getString(options)) + .execute(); + } catch (IOException ex) { + BigQueryException serviceException = translate(ex); + if (serviceException.getCode() == HTTP_NOT_FOUND) { + return null; + } + throw serviceException; + } + } + + @Override + public Job getQueryJob(String projectId, String jobId, String location) { + try { + validateRPC(); + return bigquery + .jobs() + .get(projectId, jobId) + .setPrettyPrint(false) + .setLocation(location) + .execute(); + } catch (IOException ex) { + BigQueryException serviceException = translate(ex); + if (serviceException.getCode() == HTTP_NOT_FOUND) { + return null; + } + throw serviceException; + } + } + + @Override + public Tuple> listJobs(String projectId, Map options) { + try { + validateRPC(); + Bigquery.Jobs.List request = bigquery + .jobs() + .list(projectId) + .setPrettyPrint(false) + .setAllUsers(Option.ALL_USERS.getBoolean(options)) + .setFields(Option.FIELDS.getString(options)) + .setStateFilter(Option.STATE_FILTER.> get(options)) + .setMaxResults(Option.MAX_RESULTS.getLong(options)) + .setPageToken(Option.PAGE_TOKEN.getString(options)) + .setProjection(DEFAULT_PROJECTION) + .setParentJobId(Option.PARENT_JOB_ID.getString(options)); + if (Option.MIN_CREATION_TIME.getLong(options) != null) { + request.setMinCreationTime(BigInteger.valueOf(Option.MIN_CREATION_TIME.getLong(options))); + } + if (Option.MAX_CREATION_TIME.getLong(options) != null) { + request.setMaxCreationTime(BigInteger.valueOf(Option.MAX_CREATION_TIME.getLong(options))); + } + JobList jobsList = request.execute(); + + Iterable jobs = jobsList.getJobs(); + return Tuple.of( + jobsList.getNextPageToken(), + Iterables.transform( + jobs != null ? jobs : ImmutableList. of(), + new Function() { + @Override + public Job apply(JobList.Jobs jobPb) { + JobStatus statusPb = jobPb.getStatus() != null ? jobPb.getStatus() : new JobStatus(); + if (statusPb.getState() == null) { + statusPb.setState(jobPb.getState()); + } + if (statusPb.getErrorResult() == null) { + statusPb.setErrorResult(jobPb.getErrorResult()); + } + return new Job() + .setConfiguration(jobPb.getConfiguration()) + .setId(jobPb.getId()) + .setJobReference(jobPb.getJobReference()) + .setKind(jobPb.getKind()) + .setStatistics(jobPb.getStatistics()) + .setStatus(statusPb) + .setUserEmail(jobPb.getUserEmail()); + } + })); + } catch (IOException ex) { + throw translate(ex); + } + } + + @Override + public boolean cancel(String projectId, String jobId, String location) { + try { + validateRPC(); + bigquery + .jobs() + .cancel(projectId, jobId) + .setLocation(location) + .setPrettyPrint(false) + .execute(); + return true; + } catch (IOException ex) { + BigQueryException serviceException = translate(ex); + if (serviceException.getCode() == HTTP_NOT_FOUND) { + return false; + } + throw serviceException; + } + } + + @Override + public boolean deleteJob(String projectId, String jobName, String location) { + try { + validateRPC(); + bigquery + .jobs() + .delete(projectId, jobName) + .setLocation(location) + .setPrettyPrint(false) + .execute(); + return true; + } catch (IOException ex) { + throw translate(ex); + } + } + + @Override + public GetQueryResultsResponse getQueryResults( + String projectId, String jobId, String location, Map options) { + try { + validateRPC(); + return bigquery + .jobs() + .getQueryResults(projectId, jobId) + .setPrettyPrint(false) + .setLocation(location) + .setMaxResults(Option.MAX_RESULTS.getLong(options)) + .setPageToken(Option.PAGE_TOKEN.getString(options)) + .setStartIndex( + Option.START_INDEX.getLong(options) != null + ? BigInteger.valueOf(Option.START_INDEX.getLong(options)) + : null) + .setTimeoutMs(Option.TIMEOUT.getLong(options)) + .execute(); + } catch (IOException ex) { + throw translate(ex); + } + } + + @Override + public GetQueryResultsResponse getQueryResultsWithRowLimit( + String projectId, String jobId, String location, Integer maxResultPerPage, Long timeoutMs) { + try { + validateRPC(); + return bigquery + .jobs() + .getQueryResults(projectId, jobId) + .setPrettyPrint(false) + .setLocation(location) + .setMaxResults(Long.valueOf(maxResultPerPage)) + .setTimeoutMs(timeoutMs) + .execute(); + } catch (IOException ex) { + throw translate(ex); + } + } + + @Override + public QueryResponse queryRpc(String projectId, QueryRequest content) { + try { + validateRPC(); + return bigquery.jobs().query(projectId, content).execute(); + } catch (IOException ex) { + throw translate(ex); + } + } + + @Override + public String open(Job loadJob) { + try { + String builder = options.getResolvedApiaryHost("bigquery"); + if (!builder.endsWith("/")) { + builder += "/"; + } + builder += BASE_RESUMABLE_URI + options.getProjectId() + "/jobs"; + GenericUrl url = new GenericUrl(builder); + url.set("uploadType", "resumable"); + JsonFactory jsonFactory = bigquery.getJsonFactory(); + HttpRequestFactory requestFactory = bigquery.getRequestFactory(); + HttpRequest httpRequest = requestFactory.buildPostRequest(url, new JsonHttpContent(jsonFactory, loadJob)); + httpRequest.getHeaders().set("X-Upload-Content-Value", "application/octet-stream"); + HttpResponse response = httpRequest.execute(); + return response.getHeaders().getLocation(); + } catch (IOException ex) { + throw translate(ex); + } + } + + @Override + public Job write( + String uploadId, + byte[] toWrite, + int toWriteOffset, + long destOffset, + int length, + boolean last) { + try { + if (length == 0) { + return null; + } + GenericUrl url = new GenericUrl(uploadId); + HttpRequest httpRequest = bigquery + .getRequestFactory() + .buildPutRequest(url, new ByteArrayContent(null, toWrite, toWriteOffset, length)); + httpRequest.setParser(bigquery.getObjectParser()); + long limit = destOffset + length; + StringBuilder range = new StringBuilder("bytes "); + range.append(destOffset).append('-').append(limit - 1).append('/'); + if (last) { + range.append(limit); + } else { + range.append('*'); + } + httpRequest.getHeaders().setContentRange(range.toString()); + int code; + String message; + IOException exception = null; + HttpResponse response = null; + try { + response = httpRequest.execute(); + code = response.getStatusCode(); + message = response.getStatusMessage(); + } catch (HttpResponseException ex) { + exception = ex; + code = ex.getStatusCode(); + message = ex.getStatusMessage(); + } + if (!last && code != HTTP_RESUME_INCOMPLETE + || last && !(code == HTTP_OK || code == HTTP_CREATED)) { + if (exception != null) { + throw exception; + } + throw new BigQueryException(code, message); + } + return last && response != null ? response.parseAs(Job.class) : null; + } catch (IOException ex) { + throw translate(ex); + } + } + + @Override + public Policy getIamPolicy(String resourceId, Map options) { + try { + validateRPC(); + GetIamPolicyRequest policyRequest = new GetIamPolicyRequest(); + if (null != Option.REQUESTED_POLICY_VERSION.getLong(options)) { + policyRequest = policyRequest.setOptions( + new GetPolicyOptions() + .setRequestedPolicyVersion( + Option.REQUESTED_POLICY_VERSION.getLong(options).intValue())); + } + return bigquery + .tables() + .getIamPolicy(resourceId, policyRequest) + .setPrettyPrint(false) + .execute(); + } catch (IOException ex) { + throw translate(ex); + } + } + + @Override + public Policy setIamPolicy(String resourceId, Policy policy, Map options) { + try { + validateRPC(); + SetIamPolicyRequest policyRequest = new SetIamPolicyRequest().setPolicy(policy); + return bigquery + .tables() + .setIamPolicy(resourceId, policyRequest) + .setPrettyPrint(false) + .execute(); + } catch (IOException ex) { + throw translate(ex); + } + } + + @Override + public TestIamPermissionsResponse testIamPermissions( + String resourceId, List permissions, Map options) { + try { + validateRPC(); + TestIamPermissionsRequest permissionsRequest = new TestIamPermissionsRequest().setPermissions(permissions); + return bigquery + .tables() + .testIamPermissions(resourceId, permissionsRequest) + .setPrettyPrint(false) + .execute(); + } catch (IOException ex) { + throw translate(ex); + } + } +} diff --git a/integration-tests/google-bigquery/src/main/java/org/apache/camel/quarkus/component/google/bigquery/it/GoogleBigqueryResource.java b/integration-tests/google-bigquery/src/main/java/org/apache/camel/quarkus/component/google/bigquery/it/GoogleBigqueryResource.java index a86993e814c9..94220df9ac45 100644 --- a/integration-tests/google-bigquery/src/main/java/org/apache/camel/quarkus/component/google/bigquery/it/GoogleBigqueryResource.java +++ b/integration-tests/google-bigquery/src/main/java/org/apache/camel/quarkus/component/google/bigquery/it/GoogleBigqueryResource.java @@ -30,9 +30,12 @@ import com.google.auth.oauth2.GoogleCredentials; import com.google.auth.oauth2.ServiceAccountCredentials; import com.google.cloud.NoCredentials; +import com.google.cloud.ServiceRpc; import com.google.cloud.bigquery.BigQuery; import com.google.cloud.bigquery.BigQueryOptions; import com.google.cloud.bigquery.JobId; +import com.google.cloud.bigquery.spi.BigQueryRpcFactory; +import com.google.cloud.bigquery.spi.v2.CustomHostCapableHttpBigQueryRpc; import jakarta.inject.Inject; import jakarta.inject.Named; import jakarta.inject.Singleton; @@ -159,7 +162,14 @@ public synchronized BigQuery getDefaultClient() throws Exception { if (host != null) { builder.setHost(host) - .setLocation(host); + .setLocation(host) + // TODO: Remove this https://github.com/apache/camel-quarkus/issues/5734 + .setServiceRpcFactory(new BigQueryRpcFactory() { + @Override + public ServiceRpc create(BigQueryOptions options) { + return new CustomHostCapableHttpBigQueryRpc(options); + } + }); } if (credentialsPath == null) { diff --git a/integration-tests/google-bigquery/src/main/resources/application.properties b/integration-tests/google-bigquery/src/main/resources/application.properties new file mode 100644 index 000000000000..910ab6e6193b --- /dev/null +++ b/integration-tests/google-bigquery/src/main/resources/application.properties @@ -0,0 +1,19 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- + +# TODO: Remove this https://github.com/apache/camel-quarkus/issues/5734 +quarkus.test.flat-class-path = true \ No newline at end of file diff --git a/integration-tests/google-bigquery/src/test/java/org/apache/camel/quarkus/component/google/bigquery/it/GoogleBigqueryTest.java b/integration-tests/google-bigquery/src/test/java/org/apache/camel/quarkus/component/google/bigquery/it/GoogleBigqueryTest.java index 721ad6aece8c..8d0da04932f9 100644 --- a/integration-tests/google-bigquery/src/test/java/org/apache/camel/quarkus/component/google/bigquery/it/GoogleBigqueryTest.java +++ b/integration-tests/google-bigquery/src/test/java/org/apache/camel/quarkus/component/google/bigquery/it/GoogleBigqueryTest.java @@ -44,7 +44,6 @@ import org.apache.camel.quarkus.test.support.google.GoogleProperty; import org.apache.camel.util.CollectionHelper; import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import static org.apache.camel.util.CollectionHelper.mapOf; @@ -55,7 +54,6 @@ @TestHTTPEndpoint(GoogleBigqueryResource.class) @QuarkusTestResource(GoogleBigqueryWiremockTestResource.class) @QuarkusTestResource(GoogleCloudTestResource.class) -@Disabled //https://github.com/apache/camel-quarkus/issues/5734 class GoogleBigqueryTest { @GoogleProperty(name = "project.id")