Skip to content

Commit

Permalink
[R2DBC-67] set SPEC version support tp 1.0.0-release version
Browse files Browse the repository at this point in the history
  • Loading branch information
rusher committed Nov 21, 2022
1 parent f5e24d4 commit 7a89c0d
Show file tree
Hide file tree
Showing 12 changed files with 35 additions and 44 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
<logback.version>1.2.10</logback.version>
<netty.version>4.1.85.Final</netty.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<r2dbc-spi.version>0.9.1.RELEASE</r2dbc-spi.version>
<r2dbc-spi.version>1.0.0.RELEASE</r2dbc-spi.version>
<reactor.version>2022.0.0</reactor.version>
<mariadb-jdbc.version>3.0.9</mariadb-jdbc.version>
<uberjar.name>benchmarks</uberjar.name>
Expand Down
6 changes: 3 additions & 3 deletions src/benchmark/java/org/mariadb/r2dbc/Do_1.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,16 @@
public class Do_1 extends Common {

@Benchmark
public Integer testR2dbc(MyState state) throws Throwable {
public Long testR2dbc(MyState state) throws Throwable {
return consume(state.r2dbc);
}

@Benchmark
public Integer testR2dbcPrepare(MyState state) throws Throwable {
public Long testR2dbcPrepare(MyState state) throws Throwable {
return consume(state.r2dbcPrepare);
}

private Integer consume(io.r2dbc.spi.Connection connection) {
private Long consume(io.r2dbc.spi.Connection connection) {
io.r2dbc.spi.Statement statement = connection.createStatement("DO 1");
return
Flux.from(statement.execute())
Expand Down
6 changes: 3 additions & 3 deletions src/benchmark/java/org/mariadb/r2dbc/Do_1000_param.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,16 @@ public class Do_1000_param extends Common {
}

@Benchmark
public Integer testR2dbc(MyState state) throws Throwable {
public Long testR2dbc(MyState state) throws Throwable {
return consume(state.r2dbc);
}

@Benchmark
public Integer testR2dbcPrepare(MyState state) throws Throwable {
public Long testR2dbcPrepare(MyState state) throws Throwable {
return consume(state.r2dbcPrepare);
}

private Integer consume(io.r2dbc.spi.Connection connection) {
private Long consume(io.r2dbc.spi.Connection connection) {
io.r2dbc.spi.Statement statement = connection.createStatement(sql);
for (int i = 0; i < 1000; i++)
statement.bind(i,i);
Expand Down
6 changes: 3 additions & 3 deletions src/benchmark/java/org/mariadb/r2dbc/Insert_batch.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,16 @@ static public String randomString(int length) {
}

@Benchmark
public Integer testR2dbc(MyState state, Blackhole blackhole) throws Throwable {
public Long testR2dbc(MyState state, Blackhole blackhole) throws Throwable {
return consume(state.r2dbc, blackhole);
}

@Benchmark
public Integer testR2dbcPrepare(MyState state, Blackhole blackhole) throws Throwable {
public Long testR2dbcPrepare(MyState state, Blackhole blackhole) throws Throwable {
return consume(state.r2dbcPrepare, blackhole);
}

private Integer consume(io.r2dbc.spi.Connection connection, Blackhole blackhole) {
private Long consume(io.r2dbc.spi.Connection connection, Blackhole blackhole) {
String s = randomString(100);

io.r2dbc.spi.Statement statement = connection.createStatement("INSERT INTO perfTestTextBatch(t0) VALUES (?)");
Expand Down
16 changes: 2 additions & 14 deletions src/main/java/org/mariadb/r2dbc/api/MariadbResult.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,15 @@
package org.mariadb.r2dbc.api;

import io.r2dbc.spi.*;
import io.r2dbc.spi.Readable;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public interface MariadbResult extends Result {

@Override
Flux<Integer> getRowsUpdated();
Mono<Long> getRowsUpdated();

@Override
<T> Flux<T> map(BiFunction<Row, RowMetadata, ? extends T> mappingFunction);

@Override
<T> Flux<T> map(Function<? super Readable, ? extends T> mappingFunction);

@Override
Result filter(Predicate<Segment> filter);

@Override
<T> Flux<T> flatMap(Function<Segment, ? extends Publisher<? extends T>> mappingFunction);
}
2 changes: 1 addition & 1 deletion src/main/java/org/mariadb/r2dbc/client/SimpleClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ protected SimpleClient(
this.byteBufAllocator = connection.outbound().alloc();
this.messageSubscriber =
new ServerMessageSubscriber(this.lock, this.isClosed, exchangeQueue, receiverQueue);
connection.addHandler(new MariadbFrameDecoder());
connection.addHandlerFirst(new MariadbFrameDecoder());

if (logger.isTraceEnabled()) {
connection.addHandlerFirst(
Expand Down
4 changes: 2 additions & 2 deletions src/test/java/org/mariadb/r2dbc/integration/BatchTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ void basicBatch() {
.execute()
.flatMap(it -> it.getRowsUpdated())
.as(StepVerifier::create)
.expectNext(1, 1, 1, 1, 1)
.expectNext(1L, 1L, 1L, 1L, 1L)
.expectNextCount(15)
.then(
() -> {
Expand Down Expand Up @@ -84,7 +84,7 @@ private void batchTest(MariadbConnectionConfiguration conf) throws Exception {
.execute()
.flatMap(it -> it.getRowsUpdated())
.as(StepVerifier::create)
.expectNext(1, 1, 1, 1, 1)
.expectNext(1L, 1L, 1L, 1L, 1L)
.expectNextCount(15)
.then(
() -> {
Expand Down
4 changes: 2 additions & 2 deletions src/test/java/org/mariadb/r2dbc/integration/ErrorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import org.mariadb.r2dbc.TestConfiguration;
import org.mariadb.r2dbc.api.MariadbConnection;
import org.mariadb.r2dbc.api.MariadbConnectionMetadata;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

public class ErrorTest extends BaseConnectionTest {
Expand Down Expand Up @@ -124,7 +124,7 @@ void rollbackException() {
.createStatement("SET SESSION innodb_lock_wait_timeout=1")
.execute()
.map(res -> res.getRowsUpdated())
.onErrorReturn(Flux.empty())
.onErrorReturn(Mono.empty())
.blockLast();
connection.beginTransaction().block();
connection
Expand Down
10 changes: 6 additions & 4 deletions src/test/java/org/mariadb/r2dbc/integration/RowMetadataTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,15 @@ void rowMeta() {
(row, metadata) -> {
List<String> expected =
Arrays.asList("t1Alias", "t2", "t3", "t4", "t5", "t6");
assertEquals(expected.size(), metadata.getColumnNames().size());
assertEquals(expected.size(), metadata.getColumnMetadatas().size());
assertTrue(metadata.contains("t1Alias"));
assertTrue(metadata.contains("T1ALIAS"));
assertTrue(metadata.contains("t1alias"));
assertFalse(metadata.contains("t1Aliass"));

assertArrayEquals(expected.toArray(), metadata.getColumnNames().toArray());
for (int i = 0; i < expected.size(); i++) {
assertEquals(
expected.get(i), metadata.getColumnMetadatas().get(i).getName());
}
this.assertThrows(
IndexOutOfBoundsException.class,
() -> metadata.getColumnMetadata(-1),
Expand Down Expand Up @@ -111,7 +113,7 @@ void rowMeta() {
this.assertThrows(
NoSuchElementException.class,
() -> metadata.getColumnMetadata("wrongName"),
"Column name 'wrongName' does not exist in column names [t1Alias, t2, t3, t4, t5, t6]");
"Column name 'wrongName' does not exist in column names ");

colMeta = metadata.getColumnMetadata(1);
assertEquals(Long.class, colMeta.getJavaType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,23 +221,23 @@ public void dupplicate() {
.execute()
.flatMap(r -> r.getRowsUpdated())
.as(StepVerifier::create)
.expectNext(2)
.expectNext(2L)
.verifyComplete();
if (isMariaDBServer() && minVersion(10, 5, 1)) {
sharedConn
.createStatement("INSERT INTO dupplicate(test) VALUES ('test3'), ('test4') RETURNING *")
.execute()
.flatMap(r -> r.getRowsUpdated())
.as(StepVerifier::create)
.expectNext(2)
.expectNext(2L)
.verifyComplete();

sharedConn
.createStatement("INSERT INTO dupplicate(test) VALUES ('test5') RETURNING *")
.execute()
.flatMap(r -> r.getRowsUpdated())
.as(StepVerifier::create)
.expectNext(1)
.expectNext(1L)
.verifyComplete();
}

Expand Down Expand Up @@ -267,7 +267,7 @@ public void getPosition() {
.execute()
.flatMap(r -> r.getRowsUpdated())
.as(StepVerifier::create)
.expectNext(1)
.expectNext(1L)
.verifyComplete();

sharedConn
Expand Down
6 changes: 3 additions & 3 deletions src/test/java/org/mariadb/r2dbc/integration/TlsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import org.mariadb.r2dbc.*;
import org.mariadb.r2dbc.api.MariadbConnection;
import org.mariadb.r2dbc.api.MariadbConnectionMetadata;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

public class TlsTest extends BaseConnectionTest {
Expand Down Expand Up @@ -63,7 +63,7 @@ public static void before2() {
.createStatement("DROP USER 'MUTUAL_AUTH'")
.execute()
.map(res -> res.getRowsUpdated())
.onErrorReturn(Flux.empty())
.onErrorReturn(Mono.empty())
.subscribe();
String create_sql;
String grant_sql;
Expand Down Expand Up @@ -109,7 +109,7 @@ public void testWithoutPassword() throws Throwable {
.createStatement("DROP USER IF EXISTS userWithoutPassword")
.execute()
.map(res -> res.getRowsUpdated())
.onErrorReturn(Flux.empty())
.onErrorReturn(Mono.empty())
.blockLast();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.mariadb.r2dbc.api.MariadbConnection;
import org.mariadb.r2dbc.api.MariadbConnectionMetadata;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class Ed25519PluginTest extends BaseConnectionTest {
static AtomicBoolean ed25519PluginEnabled = new AtomicBoolean(true);
Expand All @@ -32,7 +33,7 @@ public static void before2() {
.onErrorResume(
e -> {
ed25519PluginEnabled.set(false);
return Flux.just(1);
return Flux.just(1L);
})
.blockLast();

Expand All @@ -46,7 +47,7 @@ public static void before2() {
.onErrorResume(
e -> {
ed25519PluginEnabled.set(false);
return Flux.just(1);
return Flux.just(1L);
})
.blockLast();
}
Expand All @@ -60,7 +61,7 @@ public static void before2() {
.onErrorResume(
e -> {
ed25519PluginEnabled.set(false);
return Flux.just(1);
return Flux.just(1L);
})
.blockLast();
sharedConn.createStatement("FLUSH PRIVILEGES").execute().blockLast();
Expand All @@ -73,7 +74,7 @@ public static void after2() {
.createStatement("DROP USER IF EXISTS verificationEd25519AuthPlugin")
.execute()
.map(res -> res.getRowsUpdated())
.onErrorReturn(Flux.empty())
.onErrorReturn(Mono.empty())
.blockLast();
}

Expand Down

0 comments on commit 7a89c0d

Please sign in to comment.