Skip to content

Commit

Permalink
Merge branch 'release/0.8.4'
Browse files Browse the repository at this point in the history
  • Loading branch information
rusher committed Sep 29, 2020
2 parents eb00d2e + 7fe7d61 commit 784aa8b
Show file tree
Hide file tree
Showing 21 changed files with 304 additions and 82 deletions.
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,15 @@
# Change Log
## [0.8.4](https://github.com/mariadb-corporation/mariadb-connector-r2dbc/tree/0.8.4) (29 Sep 2020)
[Full Changelog](https://github.com/mariadb-corporation/mariadb-connector-r2dbc/compare/0.8.3...0.8.4)

First Release Candidate release.

Changes compared to 0.8.3.beta1:
[R2DBC-9] Non pipelining prepare is close 2 time when race condition cache
[R2DBC-8] synchronous close
[R2DBC-7] authentication error when using multiple classloader
[R2DBC-6] socket option configuration addition for socket Timeout, keep alive and force RST

## [0.8.3](https://github.com/mariadb-corporation/mariadb-connector-r2dbc/tree/0.8.3) (23 Jul 2020)
[Full Changelog](https://github.com/mariadb-corporation/mariadb-connector-r2dbc/compare/0.8.2...0.8.3)

Expand Down
7 changes: 5 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ The MariaDB Connector is available through maven using :
<dependency>
<groupId>org.mariadb</groupId>
<artifactId>r2dbc-mariadb</artifactId>
<version>0.8.3-beta1</version>
<version>0.8.4-rc</version>
</dependency>
```

Expand Down Expand Up @@ -82,6 +82,9 @@ Basic example:
| **`port`** | Database server port number. *Not used when using option `socketPath`*|*integer*| 3306|
| **`database`** | Default database to use when establishing the connection. | *string* |
| **`connectTimeout`** | Sets the connection timeout | *Duration* | 10s|
| **`socketTimeout`** | Sets the socket timeout | *Duration* | |
| **`tcpKeepAlive`** | Sets socket keep alive | *Boolean* | false|
| **`tcpAbortiveClose`** | This option can be used in environments where connections are created and closed in rapid succession. Often, it is not possible to create a socket in such an environment after a while, since all local “ephemeral” ports are used up by TCP connections in TCP_WAIT state. Using tcpAbortiveClose works around this problem by resetting TCP connections (abortive or hard close) rather than doing an orderly close. | *boolean* | false|
| **`socket`** | Permits connections to the database through the Unix domain socket for faster connection whe server is local. | *string* |
| **`allowMultiQueries`** | Allows you to issue several SQL statements in a single call. (That is, `INSERT INTO a VALUES('b'); INSERT INTO c VALUES('d');`). <br/><br/>This may be a **security risk** as it allows for SQL Injection attacks.| *boolean* | false|
| **`connectionAttributes`** | When performance_schema is active, permit to send server some client information. <br>Those information can be retrieved on server within tables performance_schema.session_connect_attrs and performance_schema.session_account_connect_attrs. This can permit from server an identification of client/application per connection|*Map<String,String>* |
Expand All @@ -99,7 +102,7 @@ Basic example:
| **`useServerPrepStmts`** | Permit to indicate to use text or binary protocol for query with parameter |*boolean* | false |
| **`prepareCacheSize`** | if useServerPrepStmts = true, cache the prepared informations in a LRU cache to avoid re-preparation of command. Next use of that command, only prepared identifier and parameters (if any) will be sent to server. This mainly permit for server to avoid reparsing query. |*int* |256 |
| **`pamOtherPwd`** | Permit to provide additional password for PAM authentication with multiple authentication step. If multiple passwords, value must be URL encoded.|*string* | |

## Roadmap

* Performance !
Expand Down
14 changes: 7 additions & 7 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>org.mariadb</groupId>
<artifactId>r2dbc-mariadb</artifactId>
<version>0.8.3-beta1</version>
<version>0.8.4-rc</version>
<packaging>jar</packaging>
<url>https://github.com/mariadb-corporation/mariadb-connector-r2dbc</url>

Expand All @@ -41,7 +41,7 @@
<r2dbc-spi.version>0.8.2.RELEASE</r2dbc-spi.version>
<reactor.version>Dysprosium-SR9</reactor.version>
<mariadb-jdbc.version>2.6.1</mariadb-jdbc.version>
<r2dbc-mysql.version>0.8.1.RELEASE</r2dbc-mysql.version>
<r2dbc-mysql.version>0.8.2.RELEASE</r2dbc-mysql.version>
<uberjar.name>benchmarks</uberjar.name>
</properties>

Expand All @@ -53,15 +53,15 @@
</licenses>

<organization>
<name>mariadb.com</name>
<url>https://mariadb.com</url>
<name>MariaDB Corporation Ab</name>
<url>https://www.mariadb.com</url>
</organization>

<developers>
<developer>
<id>mariadbJdbcDevelopers</id>
<name>mariadb jdbc developers</name>
<url>http://mariadb.org/</url>
<id>mariadbDevelopers</id>
<name>mariadb developers</name>
<url>http://www.mariadb.com</url>
</developer>
</developers>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,11 @@ public final class MariadbConnectionConfiguration {
public static final int DEFAULT_PORT = 3306;
private final String database;
private final String host;

private final Duration connectTimeout;
private final Duration socketTimeout;
private final boolean tcpKeepAlive;
private final boolean tcpAbortiveClose;
private final CharSequence password;
private final CharSequence[] pamOtherPwd;
private final int port;
Expand All @@ -54,6 +58,9 @@ public final class MariadbConnectionConfiguration {

private MariadbConnectionConfiguration(
@Nullable Duration connectTimeout,
@Nullable Duration socketTimeout,
@Nullable Boolean tcpKeepAlive,
@Nullable Boolean tcpAbortiveClose,
@Nullable String database,
@Nullable String host,
@Nullable Map<String, String> connectionAttributes,
Expand All @@ -77,6 +84,9 @@ private MariadbConnectionConfiguration(
@Nullable Integer prepareCacheSize,
@Nullable CharSequence[] pamOtherPwd) {
this.connectTimeout = connectTimeout == null ? Duration.ofSeconds(10) : connectTimeout;
this.socketTimeout = socketTimeout;
this.tcpKeepAlive = tcpKeepAlive == null ? Boolean.FALSE : tcpKeepAlive;
this.tcpAbortiveClose = tcpAbortiveClose == null ? Boolean.FALSE : tcpAbortiveClose;
this.database = database;
this.host = host;
this.connectionAttributes = connectionAttributes;
Expand Down Expand Up @@ -112,6 +122,16 @@ static boolean boolValue(Object value) {
throw new IllegalArgumentException(String.format("Option %s wrong boolean format", value));
}

static Duration durationValue(Object value) {
if (value instanceof Duration) {
return ((Duration) value);
}
if (value instanceof String) {
return Duration.parse(value.toString());
}
throw new IllegalArgumentException(String.format("Option %s wrong duration format", value));
}

static int intValue(Object value) {
if (value instanceof Number) {
return ((Number) value).intValue();
Expand All @@ -124,7 +144,6 @@ static int intValue(Object value) {

public static Builder fromOptions(ConnectionFactoryOptions connectionFactoryOptions) {
Builder builder = new Builder();
builder.connectTimeout(connectionFactoryOptions.getValue(CONNECT_TIMEOUT));
builder.database(connectionFactoryOptions.getValue(DATABASE));

if (connectionFactoryOptions.hasOption(MariadbConnectionFactoryProvider.SOCKET)) {
Expand All @@ -141,6 +160,31 @@ public static Builder fromOptions(ConnectionFactoryOptions connectionFactoryOpti
MariadbConnectionFactoryProvider.ALLOW_MULTI_QUERIES)));
}

if (connectionFactoryOptions.hasOption(ConnectionFactoryOptions.CONNECT_TIMEOUT)) {
builder.connectTimeout(
durationValue(
connectionFactoryOptions.getValue(ConnectionFactoryOptions.CONNECT_TIMEOUT)));
}

if (connectionFactoryOptions.hasOption(MariadbConnectionFactoryProvider.SOCKET_TIMEOUT)) {
builder.socketTimeout(
durationValue(
connectionFactoryOptions.getValue(MariadbConnectionFactoryProvider.SOCKET_TIMEOUT)));
}

if (connectionFactoryOptions.hasOption(MariadbConnectionFactoryProvider.TCP_KEEP_ALIVE)) {
builder.tcpKeepAlive(
boolValue(
connectionFactoryOptions.getValue(MariadbConnectionFactoryProvider.TCP_KEEP_ALIVE)));
}

if (connectionFactoryOptions.hasOption(MariadbConnectionFactoryProvider.TCP_ABORTIVE_CLOSE)) {
builder.tcpAbortiveClose(
boolValue(
connectionFactoryOptions.getValue(
MariadbConnectionFactoryProvider.TCP_ABORTIVE_CLOSE)));
}

if (connectionFactoryOptions.hasOption(MariadbConnectionFactoryProvider.ALLOW_PIPELINING)) {
builder.allowPipelining(
boolValue(
Expand Down Expand Up @@ -307,6 +351,18 @@ public int getPrepareCacheSize() {
return prepareCacheSize;
}

public Duration getSocketTimeout() {
return socketTimeout;
}

public boolean isTcpKeepAlive() {
return tcpKeepAlive;
}

public boolean isTcpAbortiveClose() {
return tcpAbortiveClose;
}

@Override
public String toString() {
StringBuilder hiddenPwd = new StringBuilder();
Expand Down Expand Up @@ -335,6 +391,12 @@ public String toString() {
+ '\''
+ ", connectTimeout="
+ connectTimeout
+ ", socketTimeout="
+ socketTimeout
+ ", tcpKeepAlive="
+ tcpKeepAlive
+ ", tcpAbortiveClose="
+ tcpAbortiveClose
+ ", password="
+ hiddenPwd
+ ", port="
Expand Down Expand Up @@ -386,6 +448,9 @@ public static final class Builder implements Cloneable {
private boolean allowPublicKeyRetrieval;
@Nullable private String username;
@Nullable private Duration connectTimeout;
@Nullable private Duration socketTimeout;
@Nullable private Boolean tcpKeepAlive;
@Nullable private Boolean tcpAbortiveClose;
@Nullable private String database;
@Nullable private String host;
@Nullable private Map<String, String> sessionVariables;
Expand Down Expand Up @@ -429,6 +494,9 @@ public MariadbConnectionConfiguration build() {

return new MariadbConnectionConfiguration(
this.connectTimeout,
this.socketTimeout,
this.tcpKeepAlive,
this.tcpAbortiveClose,
this.database,
this.host,
this.connectionAttributes,
Expand Down Expand Up @@ -464,6 +532,21 @@ public Builder connectTimeout(@Nullable Duration connectTimeout) {
return this;
}

public Builder socketTimeout(@Nullable Duration socketTimeout) {
this.socketTimeout = socketTimeout;
return this;
}

public Builder tcpKeepAlive(@Nullable Boolean tcpKeepAlive) {
this.tcpKeepAlive = tcpKeepAlive;
return this;
}

public Builder tcpAbortiveClose(@Nullable Boolean tcpAbortiveClose) {
this.tcpAbortiveClose = tcpAbortiveClose;
return this;
}

public Builder connectionAttributes(@Nullable Map<String, String> connectionAttributes) {
this.connectionAttributes = connectionAttributes;
return this;
Expand Down Expand Up @@ -747,6 +830,12 @@ public String toString() {
+ '\''
+ ", connectTimeout="
+ connectTimeout
+ ", socketTimeout="
+ socketTimeout
+ ", tcpKeepAlive="
+ tcpKeepAlive
+ ", tcpAbortiveClose="
+ tcpAbortiveClose
+ ", database='"
+ database
+ '\''
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.r2dbc.spi.ConnectionFactoryOptions;
import io.r2dbc.spi.ConnectionFactoryProvider;
import io.r2dbc.spi.Option;
import java.time.Duration;
import org.mariadb.r2dbc.util.Assert;

public final class MariadbConnectionFactoryProvider implements ConnectionFactoryProvider {
Expand All @@ -36,6 +37,9 @@ public final class MariadbConnectionFactoryProvider implements ConnectionFactory
public static final Option<String> SSL_MODE = Option.valueOf("sslMode");
public static final Option<String> CONNECTION_ATTRIBUTES = Option.valueOf("connectionAttributes");
public static final Option<String> PAM_OTHER_PASSWORD = Option.valueOf("pamOtherPwd");
public static final Option<Duration> SOCKET_TIMEOUT = Option.valueOf("socketTimeout");
public static final Option<Boolean> TCP_KEEP_ALIVE = Option.valueOf("tcpKeepAlive");
public static final Option<Boolean> TCP_ABORTIVE_CLOSE = Option.valueOf("tcpAbortiveClose");

static MariadbConnectionConfiguration createConfiguration(
ConnectionFactoryOptions connectionFactoryOptions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,11 @@ private Flux<org.mariadb.r2dbc.api.MariadbResult> execute(
} else {
flux =
sendPrepare(sql)
.flatMapMany(prepareResult1 -> sendExecuteCmd(factory, parameters, generatedColumns));
.flatMapMany(
prepareResult1 -> {
prepareResult = prepareResult1;
return sendExecuteCmd(factory, parameters, generatedColumns);
});
}
return flux.concatWith(
Flux.create(
Expand Down Expand Up @@ -341,13 +345,6 @@ private Mono<ServerPrepareResult> sendPrepare(String sql) {
prepareResult =
new ServerPrepareResult(
packet.getStatementId(), packet.getNumColumns(), packet.getNumParams());
if (client.getPrepareCache() != null) {
ServerPrepareResult res = client.getPrepareCache().get(sql);
if (res != null && !res.equals(prepareResult)) {
prepareResult.close(client);
prepareResult = res;
}
}
sink.next(prepareResult);
}
if (it.ending()) sink.complete();
Expand Down
30 changes: 28 additions & 2 deletions src/main/java/org/mariadb/r2dbc/client/ClientBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.mariadb.r2dbc.client;

import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.SslHandler;
Expand All @@ -43,6 +44,7 @@
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.netty.tcp.TcpClient;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.concurrent.Queues;
Expand Down Expand Up @@ -87,6 +89,32 @@ protected ClientBase(Connection connection, MariadbConnectionConfiguration confi
.subscribe();
}

public static TcpClient setSocketOption(
MariadbConnectionConfiguration configuration, TcpClient tcpClient) {
if (configuration.getConnectTimeout() != null) {
tcpClient =
tcpClient.option(
ChannelOption.CONNECT_TIMEOUT_MILLIS,
Math.toIntExact(configuration.getConnectTimeout().toMillis()));
}

if (configuration.getSocketTimeout() != null) {
tcpClient =
tcpClient.option(
ChannelOption.SO_TIMEOUT,
Math.toIntExact(configuration.getSocketTimeout().toMillis()));
}

if (configuration.isTcpKeepAlive()) {
tcpClient = tcpClient.option(ChannelOption.SO_KEEPALIVE, configuration.isTcpKeepAlive());
}

if (configuration.isTcpAbortiveClose()) {
tcpClient = tcpClient.option(ChannelOption.SO_LINGER, 0);
}
return tcpClient;
}

private void handleConnectionError(Throwable throwable) {
R2dbcNonTransientResourceException err;
if (this.isClosed.compareAndSet(false, true)) {
Expand All @@ -104,8 +132,6 @@ private void handleConnectionError(Throwable throwable) {
public Mono<Void> close() {
return Mono.defer(
() -> {
clearWaitingListWithError(
new R2dbcNonTransientResourceException("Connection is closing"));
if (this.isClosed.compareAndSet(false, true)) {

Channel channel = this.connection.channel();
Expand Down
8 changes: 1 addition & 7 deletions src/main/java/org/mariadb/r2dbc/client/ClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package org.mariadb.r2dbc.client;

import io.netty.channel.ChannelOption;
import io.r2dbc.spi.R2dbcNonTransientResourceException;
import java.net.SocketAddress;
import java.util.Queue;
Expand Down Expand Up @@ -47,12 +46,7 @@ public static Mono<Client> connect(
MariadbConnectionConfiguration configuration) {

TcpClient tcpClient = TcpClient.create(connectionProvider).remoteAddress(() -> socketAddress);
if (configuration.getConnectTimeout() != null) {
tcpClient =
tcpClient.option(
ChannelOption.CONNECT_TIMEOUT_MILLIS,
Math.toIntExact(configuration.getConnectTimeout().toMillis()));
}
tcpClient = setSocketOption(configuration, tcpClient);
return tcpClient.connect().flatMap(it -> Mono.just(new ClientImpl(it, configuration)));
}

Expand Down
Loading

0 comments on commit 784aa8b

Please sign in to comment.