diff --git a/src/main/java/com/amazon/redshift/RedshiftConnection.java b/src/main/java/com/amazon/redshift/RedshiftConnection.java
index 6f82139..a8fad6c 100644
--- a/src/main/java/com/amazon/redshift/RedshiftConnection.java
+++ b/src/main/java/com/amazon/redshift/RedshiftConnection.java
@@ -52,6 +52,12 @@ public interface RedshiftConnection {
   */
  RedshiftNotification[] getNotifications() throws SQLException;

+  /**
+   * Returns the total number of bytes this connection has read from the underlying socket.
+   * When wire-protocol compression is active, this counts compressed (on-the-wire) bytes.
+   */
+  Long getBytesReadFromStream();
+
  /**
   * This method returns any notifications that have been received since the last call to this
   * method. Returns null if there have been no notifications. A timeout can be specified so the
diff --git a/src/main/java/com/amazon/redshift/RedshiftProperty.java b/src/main/java/com/amazon/redshift/RedshiftProperty.java
index fa5e5cf..f2db58c 100644
--- a/src/main/java/com/amazon/redshift/RedshiftProperty.java
+++ b/src/main/java/com/amazon/redshift/RedshiftProperty.java
@@ -122,6 +122,14 @@ public enum RedshiftProperty {
      false,
      new String[] {"true", "false"}),

+  /**
+   *

+   * The compression method used.
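+   * Recognized values, as validated in RedshiftProperties.evaluateProperties, are "lz4:1"
+   * (the default), "lz4", and "off" (disables wire-protocol compression); matching is
+   * case-insensitive.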

+   */
+  COMPRESSION(
+      "compression",
+      "lz4:1",
+      "The compression method used for wire-protocol communication between the Redshift server and the client driver"),

  /**
   *

   * The timeout value used for socket connect operations. If connecting to the server takes longer
   * than this value, the connection is broken.

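A usage sketch, not part of the patch: it assumes the standard jdbc:redshift:// URL form, placeholder endpoint/credentials/table, and that the driver's Connection can be unwrapped to RedshiftConnection to reach the new counter.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Properties;

import com.amazon.redshift.RedshiftConnection;

public class CompressionDemo {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.setProperty("user", "awsuser");      // placeholder credentials
    props.setProperty("password", "secret");
    props.setProperty("compression", "lz4");   // or "lz4:1" (the default) / "off"

    // Placeholder endpoint and table; any reachable cluster behaves the same.
    String url = "jdbc:redshift://examplecluster.abc123.us-east-1.redshift.amazonaws.com:5439/dev";

    try (Connection conn = DriverManager.getConnection(url, props);
         Statement stmt = conn.createStatement();
         ResultSet rs = stmt.executeQuery("SELECT * FROM sales")) {
      while (rs.next()) {
        // drain the result set so data actually flows over the wire
      }
      // With compression active this reports compressed (on-the-wire) bytes, so
      // comparing against a run with compression=off shows the bandwidth saved.
      Long wireBytes = conn.unwrap(RedshiftConnection.class).getBytesReadFromStream();
      System.out.println("bytes read from stream: " + wireBytes);
    }
  }
}
```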
diff --git a/src/main/java/com/amazon/redshift/core/CompressedInputStream.java b/src/main/java/com/amazon/redshift/core/CompressedInputStream.java
new file mode 100644
index 0000000..5ff960b
--- /dev/null
+++ b/src/main/java/com/amazon/redshift/core/CompressedInputStream.java
@@ -0,0 +1,397 @@
+package com.amazon.redshift.core;
+
+import com.amazon.redshift.logger.LogLevel;
+import com.amazon.redshift.logger.RedshiftLogger;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+
+public class CompressedInputStream extends InputStream {
+  private final InputStream wrapped;
+  private static final int LZ4_MAX_MESSAGE_SIZE = 16 * 1024;
+  private static final int LZ4_RING_BUFFER_SIZE = 64 * 1024;
+  // 80KB as per buffer size on server
+  private static final int BUFFER_SIZE = 80 * 1024;
+  private byte[] buffer;
+  private final RedshiftLogger logger;
+  private byte[] decompress_buffer;
+  private int decompress_buffer_offset;
+
+  private int bytes_before_next_message = 0;
+  private int next_byte = 0;
+  private int next_empty_byte = 0;
+
+  private long compressedBytesReadFromStream = 0;
+
+  public CompressedInputStream(InputStream in, RedshiftLogger in_logger) {
+    wrapped = in;
+    logger = in_logger;
+    decompress_buffer = null;
+    decompress_buffer_offset = 0;
+    buffer = new byte[BUFFER_SIZE];
+  }
+
+  public int read() throws IOException {
+    int readResult;
+    do {
+      readResult = tryReadMessage();
+      if (readResult < 0)
+        return readResult;
+    } while (readResult == 1);
+    bytes_before_next_message--;
+    // Mask to 0-255 so a byte >= 0x80 is not misreported as end-of-stream,
+    // per the InputStream.read() contract.
+    return buffer[next_byte++] & 0xFF;
+  }
+
+  public long getBytesReadFromStream()
+  {
+    return compressedBytesReadFromStream;
+  }
+
+  static final int MIN_MATCH = 4; // minimum length of a match
+
+  /**
+   * Implementation of lz4 decompression. Currently I could not find any library that performs
+   * streaming decompression as required by LZ4_decompress_safe_continue(), so it is done here.
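+   * <p>
+   * The block format handled by the loop below: each sequence starts with a token byte whose
+   * high nibble is the literal length and whose low nibble is the match length minus MIN_MATCH;
+   * a nibble of 0x0F means extension bytes follow, each 0xFF adding 255 until a smaller byte
+   * ends the run. Literals are copied verbatim, then a two-byte little-endian offset selects
+   * the match source relative to the current output position.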
+   */
+  public static int lz4_decompress(byte[] compressed, int position, int compressedLen, byte[] dest, int dOff, RedshiftLogger logger) throws IOException {
+    final int destEnd = dest.length;
+    int startOff = dOff;
+    compressedLen += position;
+
+    do
+    {
+      // literals
+      final int token = compressed[position++] & 0xFF;
+      int literalLen = token >>> 4;
+
+      if (literalLen != 0)
+      {
+        if (literalLen == 0x0F)
+        {
+          byte len;
+          while ((len = compressed[position++]) == (byte) 0xFF)
+          {
+            literalLen += 0xFF;
+          }
+          literalLen += len & 0xFF;
+        }
+        for (int i = 0; i < literalLen; i++)
+          dest[dOff + i] = compressed[position++];
+        dOff += literalLen;
+      }
+
+      if (position >= compressedLen)
+      {
+        break;
+      }
+
+      // matches
+      int a = compressed[position++] & 0xFF;
+      int b = compressed[position++] & 0xFF;
+      final int matchDec = (a) | (b << 8);
+      assert matchDec > 0;
+
+      int matchLen = token & 0x0F;
+      if (matchLen == 0x0F)
+      {
+        int len;
+        while ((len = compressed[position++]) == (byte) 0xFF)
+        {
+          matchLen += 0xFF;
+        }
+        matchLen += len & 0xFF;
+      }
+      matchLen += MIN_MATCH;
+
+      // copying a multiple of 8 bytes can make decompression from 5% to 10% faster
+      final int fastLen = (matchLen + 7) & 0xFFFFFFF8;
+      if (matchDec < matchLen || dOff + fastLen > destEnd)
+      {
+        // overlap -> naive incremental copy
+        for (int ref = dOff - matchDec, end = dOff + matchLen; dOff < end; ++ref, ++dOff)
+        {
+          dest[dOff] = dest[ref];
+        }
+        // Note(xformmm): here we should use memcpy instead of byte loop as we do in
+        // https://github.com/postgres/postgres/commit/c60e520f
+      }
+      else
+      {
+        // no overlap -> arraycopy
+        try
+        {
+          System.arraycopy(dest, dOff - matchDec, dest, dOff, fastLen);
+        }
+        catch (Exception e)
+        {
+          if (RedshiftLogger.isEnable())
+          {
+            logger.logInfo("matchDec : " + matchDec);
+            logger.logInfo("matchLen : " + matchLen);
+
+            int initialSourcePosition = dOff - matchDec;
+            int initialDestinationPosition = dOff;
+            int length = fastLen;
+            int lastSourcePosition = initialSourcePosition + length - 1;
+            int lastDestinationPosition = initialDestinationPosition + length - 1;
+
+            logger.logInfo("initialSourcePosition : " + initialSourcePosition);
+            logger.logInfo("initialDestinationPosition : " + initialDestinationPosition);
+            logger.logInfo("length : " + length);
+            logger.logInfo("lastSourcePosition : " + lastSourcePosition);
+            logger.logInfo("lastDestinationPosition : " + lastDestinationPosition);
+            logger.logInfo("buffer length : " + dest.length);
+          }
+
+          throw e;
+        }
+        dOff += matchLen;
+      }
+    } while (position < compressedLen);
+
+    return dOff - startOff;
+  }
+
+  /**
+   * Ensures that at least one byte is available and checks whether it belongs to a compressed
+   * message. Returns 1 if the caller has to call it again, 0 on success, and a negative value
+   * on error.
+   */
+  private int tryReadMessage() throws IOException
+  {
+    if (bytes_before_next_message == 0)
+    {
+      if (!readFromNetwork(5))
+      {
+        if (RedshiftLogger.isEnable())
+        {
+          logger.logInfo("Not yet ready to read from network");
+        }
+
+        return -1;
+      }
+      byte msg_type = buffer[next_byte];
+      next_byte++; // Consume message type from stream
+      int msgSize = ntoh32();
+      if (msg_type == 'k' || msg_type == 'z')
+      {
+        if (RedshiftLogger.isEnable())
+        {
+          if (msg_type == 'z')
+            logger.log(LogLevel.DEBUG, "Compression-aware server, Compression acknowledged");
+          else if (msg_type == 'k')
+            logger.log(LogLevel.DEBUG, "Set Compression method");
+        }
+        /*
+         * SetCompressionMessageType or CompressionAckMessage
+         * We must restart the decompression codec and discard the rest of the message.
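+         * The decompression buffer is sized LZ4_MAX_MESSAGE_SIZE + 2 * LZ4_RING_BUFFER_SIZE so
+         * that the previous 64KB history window stays addressable for back-references; once the
+         * write offset passes 2 * LZ4_RING_BUFFER_SIZE, the newest window is shifted down and
+         * decompression continues against the preserved history (see the CompressedData branch
+         * below).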
+         */
+        if (!readFromNetwork(msgSize))
+        {
+          if (RedshiftLogger.isEnable())
+          {
+            logger.logInfo("Not yet ready to read from network");
+          }
+
+          return -1;
+        }
+        next_byte += msgSize;
+
+        if (decompress_buffer == null)
+          decompress_buffer = new byte[LZ4_MAX_MESSAGE_SIZE + 2 * LZ4_RING_BUFFER_SIZE];
+        decompress_buffer_offset = 0;
+        /* We still have bytes_before_next_message == 0 - next packet is coming */
+        return 1;
+      }
+      else if (msg_type == 'm')
+      {
+        /*
+         * CompressedData
+         * Decompress everything and add data to buffer
+         */
+        next_byte--; // return pointer to the beginning of message
+        msgSize++; // account message type byte with message
+
+        if (!readFromNetwork(msgSize))
+        {
+          if (RedshiftLogger.isEnable())
+          {
+            logger.logInfo("Not yet ready to read from network");
+          }
+
+          return -1;
+        }
+        ensureCapacity(LZ4_MAX_MESSAGE_SIZE);
+        int decompressSize = lz4_decompress(buffer, next_byte + 5, msgSize - 5,
+            decompress_buffer, decompress_buffer_offset, logger);
+
+        if (decompressSize < 0)
+        {
+          if (RedshiftLogger.isEnable())
+          {
+            logger.logError("Decompressed message has a negative size");
+          }
+
+          return decompressSize; // Error happened
+        }
+
+        /* Shift data after current compressed message */
+        try
+        {
+          if (decompressSize + next_empty_byte - msgSize > buffer.length)
+          {
+            // Reallocate the buffer to avoid overflowing. This is a fallback to prevent errors.
+            int bufferSizeMultiplier = ((decompressSize + next_empty_byte - msgSize) / buffer.length) + 1;
+            buffer = Arrays.copyOf(buffer, buffer.length * bufferSizeMultiplier);
+          }
+
+          System.arraycopy(buffer, next_byte + msgSize, buffer,
+              next_byte + decompressSize, next_empty_byte - next_byte - msgSize);
+        }
+        catch (Exception e)
+        {
+          if (RedshiftLogger.isEnable())
+          {
+            int bufferLength = buffer.length;
+            int initialSourcePosition = next_byte + msgSize;
+            int initialDestinationPosition = next_byte + decompressSize;
+            int length = next_empty_byte - next_byte - msgSize + 1;
+            int lastSourcePosition = initialSourcePosition + length - 1;
+            int lastDestinationPosition = initialDestinationPosition + length - 1;
+
+            logger.logDebug("next_byte : " + next_byte);
+            logger.logDebug("msgSize : " + msgSize);
+            logger.logDebug("decompressSize : " + decompressSize);
+            logger.logDebug("next_empty_byte : " + next_empty_byte);
+            logger.logDebug("initialSourcePosition : " + initialSourcePosition);
+            logger.logDebug("initialDestinationPosition : " + initialDestinationPosition);
+            logger.logDebug("length : " + length);
+            logger.logDebug("lastSourcePosition : " + lastSourcePosition);
+            logger.logDebug("lastDestinationPosition : " + lastDestinationPosition);
+            logger.logDebug("buffer length : " + bufferLength);
+          }
+
+          throw e;
+        }
+
+        /* Splice the decompressed data into the read buffer */
+        System.arraycopy(decompress_buffer, decompress_buffer_offset, buffer, next_byte, decompressSize);
+
+        /* Adjust all counters */
+        next_empty_byte = next_empty_byte - msgSize + decompressSize;
+        decompress_buffer_offset += decompressSize;
+        bytes_before_next_message = decompressSize;
+
+        /* shift decompression buffer if necessary */
+        if (decompress_buffer_offset >= 2 * LZ4_RING_BUFFER_SIZE)
+        {
+          System.arraycopy(decompress_buffer, LZ4_RING_BUFFER_SIZE, decompress_buffer, 0,
+              LZ4_RING_BUFFER_SIZE + LZ4_MAX_MESSAGE_SIZE);
+          decompress_buffer_offset -= LZ4_RING_BUFFER_SIZE;
+        }
+        return 0;
+      }
else + { + next_byte--; // Return message type byte to the stream + bytes_before_next_message += msgSize + 1; // Scroll through next message + } + } + /* Ensure at least one byte is ready for the client */ + if (!readFromNetwork(1)) + { + if (RedshiftLogger.isEnable()) + { + logger.logInfo("Not yet ready to read from network"); + } + + return -1; + } + return 0; + } + + /** + * Read 32-bit integer in network format. + * This function assumes 4 bytes in buffer. + */ + private int ntoh32() { + return ((buffer[next_byte] & 0xFF) << 24) + ((buffer[next_byte + 1] & 0xFF) << 16) + + ((buffer[next_byte + 2] & 0xFF) << 8) + (buffer[next_byte + 3] & 0xFF); + } + + /* Ensures at least min bytes fetched from network */ + private boolean readFromNetwork(int min) throws IOException { + while (next_empty_byte - next_byte < min) { + /* Make some room if we are out of empty space */ + ensureCapacity(min); + int read = wrapped.read(buffer, next_empty_byte, buffer.length - next_empty_byte); + if(read > 0) + { + compressedBytesReadFromStream += read; + } + if (read < 0) + return false; + next_empty_byte += read; + } + + return true; + } + + /* Prevents buffer overflow when reading on the edge of buffer */ + private void ensureCapacity(int min) { + if (next_empty_byte + min >= buffer.length) { + next_empty_byte = next_empty_byte - next_byte; + for (int i = 0; i < next_empty_byte; i++) { + buffer[i] = buffer[i + next_byte]; + } + next_byte = 0; + } + } + + public void close() throws IOException { + wrapped.close(); + } + + @Override + public int available() throws IOException { + return Math.min(next_empty_byte - next_byte, bytes_before_next_message); + } + + @Override + public long skip(long n) throws IOException { + int readResult; + do { + readResult = tryReadMessage(); + if (readResult < 0) + return readResult; + } while (readResult == 1); + long available = Math.min(available(), n); + next_byte += available; + bytes_before_next_message -= available; + return available; + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + int readResult; + do { + readResult = tryReadMessage(); + if (readResult < 0) + return readResult; + } while (readResult == 1); + int available = Math.min(available(), len); + System.arraycopy(buffer, next_byte, b, off, available); + next_byte += available; + bytes_before_next_message -= available; + return available; + } +} diff --git a/src/main/java/com/amazon/redshift/core/QueryExecutorBase.java b/src/main/java/com/amazon/redshift/core/QueryExecutorBase.java index 4897d8c..f26bbfc 100644 --- a/src/main/java/com/amazon/redshift/core/QueryExecutorBase.java +++ b/src/main/java/com/amazon/redshift/core/QueryExecutorBase.java @@ -68,7 +68,9 @@ public abstract class QueryExecutorBase implements QueryExecutor { protected int serverProtocolVersion; protected boolean datashareEnabled; protected boolean enableMultiSqlSupport; - + protected Properties properties; + + protected QueryExecutorBase(RedshiftStream pgStream, String user, String database, int cancelSignalTimeout, Properties info, RedshiftLogger logger) throws SQLException { @@ -76,6 +78,7 @@ protected QueryExecutorBase(RedshiftStream pgStream, String user, this.pgStream = pgStream; this.user = user; this.database = database; + this.properties = info; this.cancelSignalTimeout = cancelSignalTimeout; this.reWriteBatchedInserts = RedshiftProperty.REWRITE_BATCHED_INSERTS.getBoolean(info); this.columnSanitiserDisabled = RedshiftProperty.DISABLE_COLUMN_SANITISER.getBoolean(info); @@ -191,7 +194,7 @@ public 
void sendQueryCancel() throws SQLException { } cancelStream = - new RedshiftStream(pgStream.getSocketFactory(), pgStream.getHostSpec(), cancelSignalTimeout, logger); + new RedshiftStream(pgStream.getSocketFactory(), pgStream.getHostSpec(), cancelSignalTimeout, logger, false, this.properties); if (cancelSignalTimeout > 0) { cancelStream.setNetworkTimeout(cancelSignalTimeout); } diff --git a/src/main/java/com/amazon/redshift/core/RedshiftStream.java b/src/main/java/com/amazon/redshift/core/RedshiftStream.java index fa654e3..701dd68 100644 --- a/src/main/java/com/amazon/redshift/core/RedshiftStream.java +++ b/src/main/java/com/amazon/redshift/core/RedshiftStream.java @@ -28,9 +28,12 @@ import java.net.Socket; import java.net.SocketTimeoutException; import java.sql.SQLException; +import java.util.Properties; import javax.net.SocketFactory; +import static com.amazon.redshift.jdbc.RedshiftConnectionImpl.getOptionalSetting; + /** *

 * Wrapper around the raw connection to the server that implements some basic primitives
 * (reading/writing formatted data, doing string encoding, etc).

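For orientation, the stream layering that changeStream() sets up in the hunks below, condensed into a hypothetical helper; the class and method here are illustrative, not driver code.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.Socket;

import com.amazon.redshift.core.CompressedInputStream;
import com.amazon.redshift.core.VisibleBufferedInputStream;
import com.amazon.redshift.logger.RedshiftLogger;

final class StreamLayeringSketch {
  // Mirrors the choice changeStream() makes: plain buffered input during the
  // SSL handshake or when compression=off; otherwise the LZ4-aware stream is
  // spliced in underneath the protocol buffer.
  static InputStream layer(Socket socket, boolean disableCompressionForSSL,
                           String compressionMode, RedshiftLogger logger) throws IOException {
    InputStream raw = socket.getInputStream();
    if (disableCompressionForSSL || "off".equalsIgnoreCase(compressionMode)) {
      return new VisibleBufferedInputStream(raw, 8192);
    }
    // CompressedInputStream reassembles CompressedData ('m') messages into the
    // plain protocol byte stream; the usual 8KB buffering sits on top of it.
    return new VisibleBufferedInputStream(new CompressedInputStream(raw, logger), 8192);
  }
}
```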
@@ -47,6 +50,8 @@ public class RedshiftStream implements Closeable, Flushable { private Socket connection; private VisibleBufferedInputStream pgInput; + + private CompressedInputStream pgCompressedInput; private OutputStream pgOutput; private byte[] streamBuffer; @@ -71,7 +76,7 @@ public class RedshiftStream implements Closeable, Flushable { * @param logger the logger to log the entry for debugging. * @throws IOException if an IOException occurs below it. */ - public RedshiftStream(SocketFactory socketFactory, HostSpec hostSpec, int timeout, RedshiftLogger logger) throws IOException { + public RedshiftStream(SocketFactory socketFactory, HostSpec hostSpec, int timeout, RedshiftLogger logger, boolean disableCompressionForSSL, Properties info) throws IOException { this.logger = logger; this.socketFactory = socketFactory; this.hostSpec = hostSpec; @@ -96,10 +101,9 @@ public RedshiftStream(SocketFactory socketFactory, HostSpec hostSpec, int timeou logger.log(LogLevel.INFO, "port: " + address.getPort()); logger.log(LogLevel.INFO, "hostname: " + address.getHostName()); logger.log(LogLevel.INFO, "hoststring: " + address.getHostString()); - - } - changeSocket(socket); + + changeSocket(socket, disableCompressionForSSL, info); setEncoding(Encoding.getJVMEncoding("UTF-8", logger)); int2Buf = new byte[2]; @@ -116,11 +120,11 @@ public RedshiftStream(SocketFactory socketFactory, HostSpec hostSpec, int timeou * @param hostSpec the host and port to connect to * @param logger the logger to log the entry for debugging. * @throws IOException if an IOException occurs below it. - * @deprecated use {@link #RedshiftStream(SocketFactory, com.amazon.redshift.util.HostSpec, int, RedshiftLogger)} + * @deprecated use {@link #RedshiftStream(SocketFactory, com.amazon.redshift.util.HostSpec, int, RedshiftLogger, boolean, Properties)} */ @Deprecated - public RedshiftStream(SocketFactory socketFactory, HostSpec hostSpec, RedshiftLogger logger) throws IOException { - this(socketFactory, hostSpec, 0, logger); + public RedshiftStream(SocketFactory socketFactory, HostSpec hostSpec, RedshiftLogger logger, Properties info) throws IOException { + this(socketFactory, hostSpec, 0, logger, true, info); } public RedshiftLogger getLogger() { @@ -197,16 +201,45 @@ public void setMinStreamAvailableCheckDelay(int delay) { * @param socket the new socket to change to * @throws IOException if something goes wrong */ - public void changeSocket(Socket socket) throws IOException { + public void changeSocket(Socket socket, Boolean disableCompressionForSSL, Properties info) throws IOException { this.connection = socket; + changeStream(disableCompressionForSSL, info); + } + public void changeStream(Boolean disableCompressionForSSL, Properties info) throws IOException { // Submitted by Jason Venner . Disable Nagle // as we are selective about flushing output only when we // really need to. connection.setTcpNoDelay(true); // Buffer sizes submitted by Sverre H Huseby - pgInput = new VisibleBufferedInputStream(connection.getInputStream(), 8192); + InputStream connectionStream = connection.getInputStream(); + + String compressionMode = getOptionalSetting(RedshiftProperty.COMPRESSION.getName(), info); + compressionMode = null == compressionMode ? 
RedshiftProperty.COMPRESSION.getDefaultValue() : compressionMode; + + // If doing SSL handshake or if compression is set to off by user, use regular input stream + if(disableCompressionForSSL || compressionMode.equalsIgnoreCase("off")) + { + if(RedshiftLogger.isEnable()) + { + logger.logInfo("Compression is disabled. Creating regular input stream."); + } + + pgInput = new VisibleBufferedInputStream(connectionStream, 8192); + } + else + { + // Use a compressed input stream + if(RedshiftLogger.isEnable()) + { + logger.logInfo("Compression is enabled. Creating compressed input stream."); + } + + pgCompressedInput = new CompressedInputStream(connectionStream, logger); + pgInput = new VisibleBufferedInputStream(pgCompressedInput, 8192); + } + pgOutput = new BufferedOutputStream(connection.getOutputStream(), 8192); if (encoding != null) { @@ -214,6 +247,16 @@ public void changeSocket(Socket socket) throws IOException { } } + public long getBytesFromStream() + { + if(null != pgCompressedInput) + { + return pgCompressedInput.getBytesReadFromStream(); + } + + return pgInput.getBytesReadFromStream(); + } + public Encoding getEncoding() { return encoding; } diff --git a/src/main/java/com/amazon/redshift/core/VisibleBufferedInputStream.java b/src/main/java/com/amazon/redshift/core/VisibleBufferedInputStream.java index 9e6dbd1..bc840b4 100644 --- a/src/main/java/com/amazon/redshift/core/VisibleBufferedInputStream.java +++ b/src/main/java/com/amazon/redshift/core/VisibleBufferedInputStream.java @@ -55,6 +55,12 @@ public class VisibleBufferedInputStream extends InputStream { */ private boolean timeoutRequested = false; + /** + * number of bytes read from stream + */ + private long bytesReadFromStream = 0; + + /** * Creates a new buffer around the given stream. * @@ -159,6 +165,10 @@ private boolean readMore(int wanted, boolean block) throws IOException { int read = 0; try { read = wrapped.read(buffer, endIndex, canFit); + if(read > 0) + { + bytesReadFromStream += read; + } if (!block && read == 0) { return false; } @@ -246,6 +256,10 @@ public int read(byte[] to, int off, int len) throws IOException { int r; try { r = wrapped.read(to, off, len); + if(r > 0) + { + bytesReadFromStream += r; + } } catch (SocketTimeoutException e) { if (read == 0 && timeoutRequested) { throw e; @@ -312,6 +326,14 @@ public int getIndex() { return index; } + /** + * Returns the number of bytes read by the stream + */ + public long getBytesReadFromStream() + { + return bytesReadFromStream; + } + /** * Scans the length of the next null terminated string (C-style string) from the stream. * diff --git a/src/main/java/com/amazon/redshift/core/v3/ConnectionFactoryImpl.java b/src/main/java/com/amazon/redshift/core/v3/ConnectionFactoryImpl.java index 9e36a2a..d3b3a38 100644 --- a/src/main/java/com/amazon/redshift/core/v3/ConnectionFactoryImpl.java +++ b/src/main/java/com/amazon/redshift/core/v3/ConnectionFactoryImpl.java @@ -115,61 +115,15 @@ private RedshiftStream tryConnect(String user, String database, RedshiftStream newStream = null; - try { - newStream = new RedshiftStream(socketFactory, hostSpec, connectTimeout, logger); - - // Set the socket timeout if the "socketTimeout" property has been set. - int socketTimeout = RedshiftProperty.SOCKET_TIMEOUT.getInt(info); - if (socketTimeout > 0) { - newStream.getSocket().setSoTimeout(socketTimeout * 1000); - } - - String maxResultBuffer = RedshiftProperty.MAX_RESULT_BUFFER.get(info); - newStream.setMaxResultBuffer(maxResultBuffer); - - // Enable TCP keep-alive probe if required. 
- boolean requireTCPKeepAlive = RedshiftProperty.TCP_KEEP_ALIVE.getBoolean(info); - newStream.getSocket().setKeepAlive(requireTCPKeepAlive); - - // Try to set SO_SNDBUF and SO_RECVBUF socket options, if requested. - // If receiveBufferSize and send_buffer_size are set to a value greater - // than 0, adjust. -1 means use the system default, 0 is ignored since not - // supported. - - // Set SO_RECVBUF read buffer size - int receiveBufferSize = RedshiftProperty.RECEIVE_BUFFER_SIZE.getInt(info); - if (receiveBufferSize > -1) { - // value of 0 not a valid buffer size value - if (receiveBufferSize > 0) { - newStream.getSocket().setReceiveBufferSize(receiveBufferSize); - } else { - if(RedshiftLogger.isEnable()) - logger.log(LogLevel.INFO, "Ignore invalid value for receiveBufferSize: {0}", receiveBufferSize); - } - } - - // Set SO_SNDBUF write buffer size - int sendBufferSize = RedshiftProperty.SEND_BUFFER_SIZE.getInt(info); - if (sendBufferSize > -1) { - if (sendBufferSize > 0) { - newStream.getSocket().setSendBufferSize(sendBufferSize); - } else { - if(RedshiftLogger.isEnable()) - logger.log(LogLevel.INFO, "Ignore invalid value for sendBufferSize: {0}", sendBufferSize); - } - } - - if(RedshiftLogger.isEnable()) { - logger.log(LogLevel.DEBUG, "Receive Buffer Size is {0}", newStream.getSocket().getReceiveBufferSize()); - logger.log(LogLevel.DEBUG, "Send Buffer Size is {0}", newStream.getSocket().getSendBufferSize()); - } - + try + { + newStream = constructNewStream(socketFactory, hostSpec, connectTimeout, logger, true, info); + // Construct and send an ssl startup packet if requested. newStream = enableSSL(newStream, sslMode, info, connectTimeout); - List paramList = getParametersForStartup(user, database, info, true); sendStartupPacket(newStream, paramList); - + newStream.changeStream(false, info); // Do authentication (until AuthenticationOk). doAuthentication(newStream, hostSpec.getHost(), user, info); } @@ -181,6 +135,59 @@ private RedshiftStream tryConnect(String user, String database, return newStream; } + public RedshiftStream constructNewStream(SocketFactory socketFactory, HostSpec hostSpec, int connectTimeout, RedshiftLogger logger, Boolean disableCompressionForSSL, Properties info) throws SQLException, IOException + { + RedshiftStream newStream = new RedshiftStream(socketFactory, hostSpec, connectTimeout, logger, disableCompressionForSSL, info); + + // Set the socket timeout if the "socketTimeout" property has been set. + int socketTimeout = RedshiftProperty.SOCKET_TIMEOUT.getInt(info); + if (socketTimeout > 0) { + newStream.getSocket().setSoTimeout(socketTimeout * 1000); + } + + String maxResultBuffer = RedshiftProperty.MAX_RESULT_BUFFER.get(info); + newStream.setMaxResultBuffer(maxResultBuffer); + + // Enable TCP keep-alive probe if required. + boolean requireTCPKeepAlive = RedshiftProperty.TCP_KEEP_ALIVE.getBoolean(info); + newStream.getSocket().setKeepAlive(requireTCPKeepAlive); + + // Try to set SO_SNDBUF and SO_RECVBUF socket options, if requested. + // If receiveBufferSize and send_buffer_size are set to a value greater + // than 0, adjust. -1 means use the system default, 0 is ignored since not + // supported. 
+ + // Set SO_RECVBUF read buffer size + int receiveBufferSize = RedshiftProperty.RECEIVE_BUFFER_SIZE.getInt(info); + if (receiveBufferSize > -1) { + // value of 0 not a valid buffer size value + if (receiveBufferSize > 0) { + newStream.getSocket().setReceiveBufferSize(receiveBufferSize); + } else { + if(RedshiftLogger.isEnable()) + logger.log(LogLevel.INFO, "Ignore invalid value for receiveBufferSize: {0}", receiveBufferSize); + } + } + + // Set SO_SNDBUF write buffer size + int sendBufferSize = RedshiftProperty.SEND_BUFFER_SIZE.getInt(info); + if (sendBufferSize > -1) { + if (sendBufferSize > 0) { + newStream.getSocket().setSendBufferSize(sendBufferSize); + } else { + if(RedshiftLogger.isEnable()) + logger.log(LogLevel.INFO, "Ignore invalid value for sendBufferSize: {0}", sendBufferSize); + } + } + + if(RedshiftLogger.isEnable()) { + logger.log(LogLevel.DEBUG, "Receive Buffer Size is {0}", newStream.getSocket().getReceiveBufferSize()); + logger.log(LogLevel.DEBUG, "Send Buffer Size is {0}", newStream.getSocket().getSendBufferSize()); + } + + return newStream; + } + @Override public QueryExecutor openConnectionImpl(HostSpec[] hostSpecs, String user, String database, Properties info, RedshiftLogger logger) throws SQLException { @@ -406,6 +413,7 @@ else if(pluginName.equalsIgnoreCase(RedshiftConstants.IDC_BROWSER_PLUGIN)) paramList.add(new String[]{"database", database}); paramList.add(new String[]{"client_encoding", "UTF8"}); paramList.add(new String[]{"DateStyle", "ISO"}); + paramList.add(new String[]{"_pq_.compression", info.getProperty("compression", RedshiftProperty.COMPRESSION.get(info))}); Version assumeVersion = ServerVersion.from(RedshiftProperty.ASSUME_MIN_SERVER_VERSION.get(info)); @@ -580,7 +588,7 @@ private RedshiftStream enableSSL(RedshiftStream pgStream, SslMode sslMode, Prope // We have to reconnect to continue. pgStream.close(); - return new RedshiftStream(pgStream.getSocketFactory(), pgStream.getHostSpec(), connectTimeout, logger); + return new RedshiftStream(pgStream.getSocketFactory(), pgStream.getHostSpec(), connectTimeout, logger, true, info); case 'N': if(RedshiftLogger.isEnable()) diff --git a/src/main/java/com/amazon/redshift/core/v3/QueryExecutorImpl.java b/src/main/java/com/amazon/redshift/core/v3/QueryExecutorImpl.java index 57503b0..0fcae22 100644 --- a/src/main/java/com/amazon/redshift/core/v3/QueryExecutorImpl.java +++ b/src/main/java/com/amazon/redshift/core/v3/QueryExecutorImpl.java @@ -156,7 +156,7 @@ private static boolean looksLikePrepare(String sql) { private Object m_ringBufferThreadLock = new Object(); // Query or some execution on a socket in process - private final Lock m_executingLock = new ReentrantLock(); + private final Lock m_executingLock = new ReentrantLock(); /** * {@code CommandComplete(B)} messages are quite common, so we reuse instance to parse those @@ -188,6 +188,11 @@ public int getProtocolVersion() { return 3; } + public long getBytesReadFromStream() + { + return pgStream.getBytesFromStream(); + } + /** *

 * Supplement to synchronization of public methods on the current QueryExecutor.

   */
@@ -2264,7 +2269,7 @@ private void processResultsOnThread(ResultHandler handler,
         }
         logger.log(LogLevel.DEBUG, " <=BE DataRow(len={0})", length);
         if (skipRow) {
-          logger.log(LogLevel.DEBUG, " skipRow={0}, rowCount = {1}, maxRows = {2}"
+          logger.log(LogLevel.DEBUG, " skipRow={0}, rowCount = {1}, maxRows = {2}"
             , skipRow, (rowCount!= null) ? rowCount[0] : 0, maxRows);
         }
       }
diff --git a/src/main/java/com/amazon/redshift/jdbc/RedshiftConnectionImpl.java b/src/main/java/com/amazon/redshift/jdbc/RedshiftConnectionImpl.java
index b2cc972..36d004e 100644
--- a/src/main/java/com/amazon/redshift/jdbc/RedshiftConnectionImpl.java
+++ b/src/main/java/com/amazon/redshift/jdbc/RedshiftConnectionImpl.java
@@ -37,7 +37,7 @@ import com.amazon.redshift.replication.RedshiftReplicationConnection;
 import com.amazon.redshift.replication.RedshiftReplicationConnectionImpl;
 import com.amazon.redshift.ssl.NonValidatingFactory;
-
+import com.amazon.redshift.core.v3.QueryExecutorImpl;
 import com.amazon.redshift.util.QuerySanitizer;
 import com.amazon.redshift.util.ByteConverter;
 import com.amazon.redshift.util.GT;
@@ -242,6 +242,8 @@ public RedshiftConnectionImpl(HostSpec[] hostSpecs,
       logger.log(LogLevel.DEBUG, "JVM architecture is " + (RedshiftConnectionImpl.IS_64_BIT_JVM ? "64-bit" : "32-bit"));
     }
+    RedshiftProperties.evaluateProperties(info);
+
     m_settings = new RedshiftJDBCSettings();
     // IAM
@@ -623,6 +625,19 @@ public ResultSet execSQLQuery(String s) throws SQLException {
     return execSQLQuery(s, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
   }
+  @Override
+  public Long getBytesReadFromStream()
+  {
+    // "this" can never be null, so only the query executor needs a null check.
+    if (null != getQueryExecutor())
+    {
+      QueryExecutorImpl queryExecutorImpl = (QueryExecutorImpl) getQueryExecutor();
+      return queryExecutorImpl.getBytesReadFromStream();
+    }
+
+    return 0L;
+  }
+
 @Override
 public ResultSet execSQLQuery(String s, int resultSetType, int resultSetConcurrency)
     throws SQLException {
@@ -926,8 +942,8 @@ private void initObjectTypes(Properties info) throws SQLException {
 public void close() throws SQLException {
   if (RedshiftLogger.isEnable())
-    logger.logFunction(true);
-
+    logger.logFunction(true);
+
   if (queryExecutor == null) {
     // This might happen in case constructor throws an exception (e.g. host being not available).
// When that happens the connection is still registered in the finalizer queue, so it gets finalized diff --git a/src/main/java/com/amazon/redshift/jdbc/RedshiftDatabaseMetaData.java b/src/main/java/com/amazon/redshift/jdbc/RedshiftDatabaseMetaData.java index d388d5d..36ee3dc 100644 --- a/src/main/java/com/amazon/redshift/jdbc/RedshiftDatabaseMetaData.java +++ b/src/main/java/com/amazon/redshift/jdbc/RedshiftDatabaseMetaData.java @@ -5327,8 +5327,8 @@ private int getExtSchemaPatternMatch(String schemaPattern) } private boolean isSingleDatabaseMetaData() { - return (isDatabaseMetadataCurrentDbOnly() - || !isMultiDatabasesCatalogEnableInServer()); + return (isDatabaseMetadataCurrentDbOnly() + || !isMultiDatabasesCatalogEnableInServer()); } private boolean isDatabaseMetadataCurrentDbOnly() { return connection.isDatabaseMetadataCurrentDbOnly(); diff --git a/src/main/java/com/amazon/redshift/jdbc/RedshiftResultSet.java b/src/main/java/com/amazon/redshift/jdbc/RedshiftResultSet.java index 0aec7af..718488e 100644 --- a/src/main/java/com/amazon/redshift/jdbc/RedshiftResultSet.java +++ b/src/main/java/com/amazon/redshift/jdbc/RedshiftResultSet.java @@ -195,7 +195,7 @@ public long getRowCount() throws SQLException { } public java.net.URL getURL(int columnIndex) throws SQLException { - if (RedshiftLogger.isEnable()) + if (RedshiftLogger.isEnable()) connection.getLogger().log(LogLevel.DEBUG, " getURL columnIndex: {0}", columnIndex); checkClosed(); throw com.amazon.redshift.Driver.notImplemented(this.getClass(), "getURL(int)"); @@ -2249,7 +2249,7 @@ private boolean isCharType(int columnIndex) throws SQLException { @Override public String getString(int columnIndex) throws SQLException { - if (RedshiftLogger.isEnable()) + if (RedshiftLogger.isEnable()) connection.getLogger().log(LogLevel.DEBUG, " getString columnIndex: {0}", columnIndex); checkResultSet(columnIndex); @@ -2366,7 +2366,7 @@ public String getString(int columnIndex) throws SQLException { */ @Override public boolean getBoolean(int columnIndex) throws SQLException { - if (RedshiftLogger.isEnable()) + if (RedshiftLogger.isEnable()) connection.getLogger().log(LogLevel.DEBUG, " getBoolean columnIndex: {0}", columnIndex); checkResultSet(columnIndex); @@ -2407,7 +2407,7 @@ public boolean getBoolean(int columnIndex) throws SQLException { @Override public byte getByte(int columnIndex) throws SQLException { - if (RedshiftLogger.isEnable()) + if (RedshiftLogger.isEnable()) connection.getLogger().log(LogLevel.DEBUG, " getByte columnIndex: {0}", columnIndex); checkResultSet(columnIndex); @@ -2458,7 +2458,7 @@ public byte getByte(int columnIndex) throws SQLException { @Override public short getShort(int columnIndex) throws SQLException { - if (RedshiftLogger.isEnable()) + if (RedshiftLogger.isEnable()) connection.getLogger().log(LogLevel.DEBUG, " getShort columnIndex: {0}", columnIndex); checkResultSet(columnIndex); @@ -2480,7 +2480,7 @@ public short getShort(int columnIndex) throws SQLException { @Override public int getInt(int columnIndex) throws SQLException { - if (RedshiftLogger.isEnable()) + if (RedshiftLogger.isEnable()) connection.getLogger().log(LogLevel.DEBUG, " getInt columnIndex: {0}", columnIndex); checkResultSet(columnIndex); @@ -2509,7 +2509,7 @@ public int getInt(int columnIndex) throws SQLException { @Override public long getLong(int columnIndex) throws SQLException { - if (RedshiftLogger.isEnable()) + if (RedshiftLogger.isEnable()) connection.getLogger().log(LogLevel.DEBUG, " getLong columnIndex: {0}", columnIndex); 
checkResultSet(columnIndex); @@ -2724,7 +2724,7 @@ private BigDecimal getFastBigDecimal(int columnIndex) throws SQLException, Numbe @Override public float getFloat(int columnIndex) throws SQLException { - if (RedshiftLogger.isEnable()) + if (RedshiftLogger.isEnable()) connection.getLogger().log(LogLevel.DEBUG, " getFloat columnIndex: {0}", columnIndex); checkResultSet(columnIndex); @@ -2746,7 +2746,7 @@ public float getFloat(int columnIndex) throws SQLException { @Override public double getDouble(int columnIndex) throws SQLException { - if (RedshiftLogger.isEnable()) + if (RedshiftLogger.isEnable()) connection.getLogger().log(LogLevel.DEBUG, " getDouble columnIndex: {0}", columnIndex); checkResultSet(columnIndex); @@ -2767,7 +2767,7 @@ public double getDouble(int columnIndex) throws SQLException { } public BigDecimal getBigDecimal(int columnIndex, int scale) throws SQLException { - if (RedshiftLogger.isEnable()) + if (RedshiftLogger.isEnable()) connection.getLogger().log(LogLevel.DEBUG, " getBigDecimal columnIndex: {0}", columnIndex); return (BigDecimal) getNumeric(columnIndex, scale, false); @@ -2850,7 +2850,7 @@ private Number getNumeric(int columnIndex, int scale, boolean allowNaN) throws S */ @Override public byte[] getBytes(int columnIndex) throws SQLException { - if (RedshiftLogger.isEnable()) + if (RedshiftLogger.isEnable()) connection.getLogger().log(LogLevel.DEBUG, " getBytes columnIndex: {0}", columnIndex); checkResultSet(columnIndex); @@ -2879,19 +2879,19 @@ public byte[] getBytes(int columnIndex) throws SQLException { } public java.sql.Date getDate(int columnIndex) throws SQLException { - if (RedshiftLogger.isEnable()) + if (RedshiftLogger.isEnable()) connection.getLogger().log(LogLevel.DEBUG, " getDate columnIndex: {0}", columnIndex); return getDate(columnIndex, null); } public Time getTime(int columnIndex) throws SQLException { - if (RedshiftLogger.isEnable()) + if (RedshiftLogger.isEnable()) connection.getLogger().log(LogLevel.DEBUG, " getTime columnIndex: {0}", columnIndex); return getTime(columnIndex, null); } public Timestamp getTimestamp(int columnIndex) throws SQLException { - if (RedshiftLogger.isEnable()) + if (RedshiftLogger.isEnable()) connection.getLogger().log(LogLevel.DEBUG, " getTimestamp columnIndex: {0}", columnIndex); return getTimestamp(columnIndex, null); } @@ -2919,7 +2919,7 @@ public InputStream getAsciiStream(int columnIndex) throws SQLException { } public InputStream getUnicodeStream(int columnIndex) throws SQLException { - if (RedshiftLogger.isEnable()) + if (RedshiftLogger.isEnable()) connection.getLogger().log(LogLevel.DEBUG, " getUnicodeStream columnIndex: {0}", columnIndex); checkResultSet(columnIndex); if (wasNullFlag) { @@ -2941,7 +2941,7 @@ public InputStream getUnicodeStream(int columnIndex) throws SQLException { } public InputStream getBinaryStream(int columnIndex) throws SQLException { - if (RedshiftLogger.isEnable()) + if (RedshiftLogger.isEnable()) connection.getLogger().log(LogLevel.DEBUG, " getBinaryStream columnIndex: {0}", columnIndex); checkResultSet(columnIndex); if (wasNullFlag) { @@ -3052,7 +3052,7 @@ public String getCursorName() throws SQLException { @Override public Object getObject(int columnIndex) throws SQLException { - if (RedshiftLogger.isEnable()) + if (RedshiftLogger.isEnable()) connection.getLogger().log(LogLevel.DEBUG, " getObject columnIndex: {0}", columnIndex); Field field; @@ -4083,7 +4083,7 @@ public void updateNClob(String columnName, Reader reader, long length) throws SQ } public NClob getNClob(int columnIndex) 
throws SQLException { - if (RedshiftLogger.isEnable()) + if (RedshiftLogger.isEnable()) connection.getLogger().log(LogLevel.DEBUG, " getNClob columnIndex: {0}", columnIndex); throw com.amazon.redshift.Driver.notImplemented(this.getClass(), "getNClob(int)"); } @@ -4128,7 +4128,7 @@ public void updateClob(String columnName, Reader reader) throws SQLException { } public SQLXML getSQLXML(int columnIndex) throws SQLException { - if (RedshiftLogger.isEnable()) + if (RedshiftLogger.isEnable()) connection.getLogger().log(LogLevel.DEBUG, " getSQLXML columnIndex: {0}", columnIndex); String data = getString(columnIndex); if (data == null) { @@ -4151,7 +4151,7 @@ public void updateSQLXML(String columnName, SQLXML xmlObject) throws SQLExceptio } public String getNString(int columnIndex) throws SQLException { - if (RedshiftLogger.isEnable()) + if (RedshiftLogger.isEnable()) connection.getLogger().log(LogLevel.DEBUG, " getNString columnIndex: {0}", columnIndex); throw com.amazon.redshift.Driver.notImplemented(this.getClass(), "getNString(int)"); } @@ -4161,7 +4161,7 @@ public String getNString(String columnName) throws SQLException { } public Reader getNCharacterStream(int columnIndex) throws SQLException { - if (RedshiftLogger.isEnable()) + if (RedshiftLogger.isEnable()) connection.getLogger().log(LogLevel.DEBUG, " getNCharacterStream columnIndex: {0}", columnIndex); throw com.amazon.redshift.Driver.notImplemented(this.getClass(), "getNCharacterStream(int)"); } diff --git a/src/main/java/com/amazon/redshift/logger/LogFileHandler.java b/src/main/java/com/amazon/redshift/logger/LogFileHandler.java index c9cd28d..9706c60 100644 --- a/src/main/java/com/amazon/redshift/logger/LogFileHandler.java +++ b/src/main/java/com/amazon/redshift/logger/LogFileHandler.java @@ -18,7 +18,7 @@ public class LogFileHandler implements LogHandler { private static final int FILE_SIZE = 10 * 1024 * 1024; // 10 MB - private static final int FILE_COUNT = 10; + private static final int FILE_COUNT = 10; private static final String FILE_EXTENSION = ".log"; diff --git a/src/main/java/com/amazon/redshift/ssl/MakeSSL.java b/src/main/java/com/amazon/redshift/ssl/MakeSSL.java index cc7fef1..ba8b519 100644 --- a/src/main/java/com/amazon/redshift/ssl/MakeSSL.java +++ b/src/main/java/com/amazon/redshift/ssl/MakeSSL.java @@ -47,7 +47,7 @@ public static void convert(RedshiftStream stream, Properties info) verifyPeerName(stream, info, newConnection); } - stream.changeSocket(newConnection); + stream.changeSocket(newConnection, false, info); } private static void verifyPeerName(RedshiftStream stream, Properties info, SSLSocket newConnection) diff --git a/src/main/java/com/amazon/redshift/util/RedshiftProperties.java b/src/main/java/com/amazon/redshift/util/RedshiftProperties.java index 26b9727..0326522 100644 --- a/src/main/java/com/amazon/redshift/util/RedshiftProperties.java +++ b/src/main/java/com/amazon/redshift/util/RedshiftProperties.java @@ -1,5 +1,7 @@ package com.amazon.redshift.util; +import com.amazon.redshift.RedshiftProperty; + import java.util.Locale; import java.util.Properties; import java.util.Enumeration; @@ -71,4 +73,17 @@ public synchronized Object setProperty(String key, String value) { return super.setProperty(key.toLowerCase(Locale.ENGLISH), value); } + + public static void evaluateProperties(RedshiftProperties properties) throws RedshiftException + { + //evaluate compression algo + String compressionAlgo = RedshiftProperty.COMPRESSION.get(properties); + + if(!(compressionAlgo.equalsIgnoreCase("lz4:1") || + 
compressionAlgo.equalsIgnoreCase("lz4") || + compressionAlgo.equalsIgnoreCase("off"))) + { + throw new RedshiftException("Unsupported compression algorithm specified : " + compressionAlgo); + } + } }
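A minimal sketch of the validation added in RedshiftProperties.evaluateProperties. The harness is hypothetical and assumes RedshiftProperties has a public no-arg constructor; the accepted values and the exception message come from this patch.

```java
import com.amazon.redshift.util.RedshiftException;
import com.amazon.redshift.util.RedshiftProperties;

public class CompressionPropertyCheck {
  public static void main(String[] args) throws RedshiftException {
    RedshiftProperties props = new RedshiftProperties();

    // Accepted spellings pass through silently; matching is case-insensitive.
    for (String value : new String[] {"lz4:1", "lz4", "off", "LZ4"}) {
      props.setProperty("compression", value);
      RedshiftProperties.evaluateProperties(props);
    }

    // Anything else is rejected before a connection is even attempted.
    props.setProperty("compression", "zstd");
    try {
      RedshiftProperties.evaluateProperties(props);
    } catch (RedshiftException expected) {
      // "Unsupported compression algorithm specified : zstd"
      System.out.println(expected.getMessage());
    }
  }
}
```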