Skip to content

Commit

Permalink
[CONJ-1207] new High Availability option : load-balance-read
Browse files Browse the repository at this point in the history
in order to avoid galera deadlocks, driver will always connect primary servers according to connection string order and replica using round-robin, permitting balancing connections
  • Loading branch information
rusher committed Nov 5, 2024
1 parent 65b4903 commit 9c2db3b
Show file tree
Hide file tree
Showing 6 changed files with 157 additions and 50 deletions.
2 changes: 1 addition & 1 deletion src/main/java/org/mariadb/jdbc/Configuration.java
Original file line number Diff line number Diff line change
Expand Up @@ -1148,7 +1148,7 @@ protected static String buildUrl(Configuration conf) {
StringBuilder sb = new StringBuilder();
sb.append("jdbc:mariadb:");
if (conf.haMode != HaMode.NONE) {
sb.append(conf.haMode.toString().toLowerCase(Locale.ROOT)).append(":");
sb.append(conf.haMode.toString().toLowerCase(Locale.ROOT).replace("_", "-")).append(":");
}
sb.append("//");
for (int i = 0; i < conf.addresses.size(); i++) {
Expand Down
6 changes: 4 additions & 2 deletions src/main/java/org/mariadb/jdbc/HostAddress.java
Original file line number Diff line number Diff line change
Expand Up @@ -260,10 +260,12 @@ private static HostAddress parseParameterHostAddress(String str, HaMode haMode,
public String toString() {
if (pipe != null) return String.format("address=(pipe=%s)", pipe);
if (localSocket != null) return String.format("address=(localSocket=%s)", localSocket);
if (sslMode == null && sslMode == null && primary != Boolean.FALSE)
return port != 3306 ? host + ":" + port : host;
return String.format(
"address=(host=%s)(port=%s)%s%s",
"address=(host=%s)%s%s%s",
host,
port,
(port != 3306) ? "(port=" + port + ")" : "",
(sslMode != null) ? "(sslMode=" + sslMode.getValue() + ")" : "",
((primary != null) ? ("(type=" + (primary ? "primary)" : "replica)")) : ""));
}
Expand Down
20 changes: 20 additions & 0 deletions src/main/java/org/mariadb/jdbc/export/HaMode.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public Optional<HostAddress> getAvailableHost(
return HaMode.getAvailableRoundRobinHost(this, hostAddresses, denyList, primary);
}
},

/** sequential: driver will always connect according to connection string order */
SEQUENTIAL("sequential") {
public Optional<HostAddress> getAvailableHost(
Expand All @@ -30,6 +31,24 @@ public Optional<HostAddress> getAvailableHost(
return getAvailableHostInOrder(hostAddresses, denyList, primary);
}
},

/**
* load-balance-read: driver will always connect primary servers according to connection string
* order and replica using round-robin, permitting balancing connections
*/
LOAD_BALANCE_READ("load-balance-read") {
public Optional<HostAddress> getAvailableHost(
List<HostAddress> hostAddresses,
ConcurrentMap<HostAddress, Long> denyList,
boolean primary) {
if (primary) {
return SEQUENTIAL.getAvailableHost(hostAddresses, denyList, true);
} else {
return LOADBALANCE.getAvailableHost(hostAddresses, denyList, false);
}
}
},

/**
* load-balance: driver will connect to any host using round-robin, permitting balancing
* connections
Expand All @@ -45,6 +64,7 @@ public Optional<HostAddress> getAvailableHost(
return HaMode.getAvailableRoundRobinHost(this, hostAddresses, denyList, primary);
}
},

/** no ha-mode. Connect to first host only */
NONE("") {
public Optional<HostAddress> getAvailableHost(
Expand Down
18 changes: 5 additions & 13 deletions src/test/java/org/mariadb/jdbc/integration/RedirectionTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@ public class RedirectionTest extends Common {
void basicRedirection() throws Exception {

Connection connection = createProxyCon(HaMode.NONE, "");
Assertions.assertEquals(
String.format("address=(host=localhost)(port=%s)(type=primary)", proxy.getLocalPort()),
connection.__test_host());
Assertions.assertEquals("localhost:" + proxy.getLocalPort(), connection.__test_host());
boolean permitRedirection = true;
Statement stmt = connection.createStatement();
try {
Expand All @@ -36,8 +34,7 @@ void basicRedirection() throws Exception {

if (permitRedirection) {
Assertions.assertEquals(
String.format("address=(host=%s)(port=%s)(type=primary)", hostname, port),
connection.__test_host());
port == 3306 ? hostname : hostname + ":" + port, connection.__test_host());
}
connection.close();
proxy.stop();
Expand All @@ -47,9 +44,7 @@ void basicRedirection() throws Exception {
void redirectionDuringTransaction() throws Exception {

Connection connection = createProxyCon(HaMode.NONE, "");
Assertions.assertEquals(
String.format("address=(host=localhost)(port=%s)(type=primary)", proxy.getLocalPort()),
connection.__test_host());
Assertions.assertEquals("localhost:" + proxy.getLocalPort(), connection.__test_host());
boolean permitRedirection = true;
Statement stmt = connection.createStatement();

Expand All @@ -63,14 +58,11 @@ void redirectionDuringTransaction() throws Exception {
ResultSet rs = stmt.executeQuery("SELECT 1");
Assertions.assertTrue(rs.next());
Assertions.assertEquals(1, rs.getInt(1));
Assertions.assertEquals(
String.format("address=(host=localhost)(port=%s)(type=primary)", proxy.getLocalPort()),
connection.__test_host());
Assertions.assertEquals("localhost:" + proxy.getLocalPort(), connection.__test_host());
connection.commit();
if (permitRedirection) {
Assertions.assertEquals(
String.format("address=(host=%s)(port=%s)(type=primary)", hostname, port),
connection.__test_host());
port == 3306 ? hostname : hostname + ":" + port, connection.__test_host());
}
rs = stmt.executeQuery("SELECT 1");
Assertions.assertTrue(rs.next());
Expand Down
96 changes: 74 additions & 22 deletions src/test/java/org/mariadb/jdbc/unit/export/HaModeTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ private void getAvailableHostWithoutConnectionNumber(HaMode haMode) {
hostAddresses.add(HostAddress.from("prim1", 3306, true));
hostAddresses.add(HostAddress.from("prim2", 3306, true));
hostAddresses.add(HostAddress.from("prim3", 3306, true));
hostAddresses.add(HostAddress.from("slave1", 3306, false));
hostAddresses.add(HostAddress.from("slave2", 3306, false));
hostAddresses.add(HostAddress.from("slave3", 3306, false));
hostAddresses.add(HostAddress.from("replica1", 3306, false));
hostAddresses.add(HostAddress.from("replica2", 3306, false));
hostAddresses.add(HostAddress.from("replica3", 3306, false));

haMode.resetLast();
ConcurrentMap<HostAddress, Long> denyList = new ConcurrentHashMap<>();
Expand All @@ -43,7 +43,7 @@ private void getAvailableHostWithoutConnectionNumber(HaMode haMode) {
Optional<HostAddress> availHost = haMode.getAvailableHost(hostAddresses, denyList, false);
if (availHost.isPresent()) hostCounter.add(availHost.get(), false);
}
assertEquals("slave1:34,slave2:33,slave3:33", hostCounter.results());
assertEquals("replica1:34,replica2:33,replica3:33", hostCounter.results());

haMode.resetLast();
denyList.put(hostAddresses.get(0), System.currentTimeMillis() - 100);
Expand All @@ -65,7 +65,59 @@ private void getAvailableHostWithoutConnectionNumber(HaMode haMode) {
Optional<HostAddress> availHost = haMode.getAvailableHost(hostAddresses, denyList, false);
if (availHost.isPresent()) hostCounter.add(availHost.get(), false);
}
assertEquals("slave1:50,slave3:50", hostCounter.results());
assertEquals("replica1:50,replica3:50", hostCounter.results());
}

@Test
public void loadBalanceRead() {

HaMode haMode = HaMode.LOAD_BALANCE_READ;
List<HostAddress> hostAddresses = new ArrayList<>();
hostAddresses.add(HostAddress.from("prim1", 3306, true));
hostAddresses.add(HostAddress.from("prim2", 3306, true));
hostAddresses.add(HostAddress.from("prim3", 3306, true));
hostAddresses.add(HostAddress.from("replica1", 3306, false));
hostAddresses.add(HostAddress.from("replica2", 3306, false));
hostAddresses.add(HostAddress.from("replica3", 3306, false));

haMode.resetLast();
ConcurrentMap<HostAddress, Long> denyList = new ConcurrentHashMap<>();
HostCounter hostCounter = new HostCounter();
for (int i = 0; i < 100; i++) {
Optional<HostAddress> availHost = haMode.getAvailableHost(hostAddresses, denyList, true);
if (availHost.isPresent()) hostCounter.add(availHost.get(), false);
}
assertEquals("prim1:100", hostCounter.results());

haMode.resetLast();
hostCounter = new HostCounter();
for (int i = 0; i < 100; i++) {
Optional<HostAddress> availHost = haMode.getAvailableHost(hostAddresses, denyList, false);
if (availHost.isPresent()) hostCounter.add(availHost.get(), false);
}
assertEquals("replica1:34,replica2:33,replica3:33", hostCounter.results());

haMode.resetLast();
denyList.put(hostAddresses.get(1), System.currentTimeMillis() + 1000);
denyList.put(hostAddresses.get(0), System.currentTimeMillis() - 100);

hostCounter = new HostCounter();
for (int i = 0; i < 100; i++) {
Optional<HostAddress> availHost = haMode.getAvailableHost(hostAddresses, denyList, true);
if (availHost.isPresent()) hostCounter.add(availHost.get(), false);
}
assertEquals("prim1:100", hostCounter.results());

haMode.resetLast();
denyList.clear();
denyList.put(hostAddresses.get(3), System.currentTimeMillis() - 100);
denyList.put(hostAddresses.get(4), System.currentTimeMillis() + 1000);
hostCounter = new HostCounter();
for (int i = 0; i < 100; i++) {
Optional<HostAddress> availHost = haMode.getAvailableHost(hostAddresses, denyList, false);
if (availHost.isPresent()) hostCounter.add(availHost.get(), false);
}
assertEquals("replica1:50,replica3:50", hostCounter.results());
}

@Test
Expand All @@ -86,15 +138,15 @@ private void getAvailableHostWithConnectionNumber(HaMode haMode) {
hostAddresses.add(host1);
hostAddresses.add(host2);
hostAddresses.add(host3);
HostAddress slave1 = HostAddress.from("slave1", 3306, false);
HostAddress slave2 = HostAddress.from("slave2", 3306, false);
HostAddress slave3 = HostAddress.from("slave3", 3306, false);
slave1.setThreadsConnected(200);
slave2.setThreadsConnected(150);
slave3.setThreadsConnected(100);
hostAddresses.add(slave1);
hostAddresses.add(slave2);
hostAddresses.add(slave3);
HostAddress replica1 = HostAddress.from("replica1", 3306, false);
HostAddress replica2 = HostAddress.from("replica2", 3306, false);
HostAddress replica3 = HostAddress.from("replica3", 3306, false);
replica1.setThreadsConnected(200);
replica2.setThreadsConnected(150);
replica3.setThreadsConnected(100);
hostAddresses.add(replica1);
hostAddresses.add(replica2);
hostAddresses.add(replica3);

ConcurrentMap<HostAddress, Long> denyList = new ConcurrentHashMap<>();
HostCounter hostCounter = new HostCounter();
Expand All @@ -114,15 +166,15 @@ private void getAvailableHostWithConnectionNumber(HaMode haMode) {
}
assertEquals("prim1:34,prim2:33,prim3:33", hostCounter.results());

slave1.setThreadsConnected(200);
slave2.setThreadsConnected(150);
slave3.setThreadsConnected(100);
replica1.setThreadsConnected(200);
replica2.setThreadsConnected(150);
replica3.setThreadsConnected(100);
hostCounter = new HostCounter();
for (int i = 0; i < 100; i++) {
Optional<HostAddress> availHost = haMode.getAvailableHost(hostAddresses, denyList, false);
if (availHost.isPresent()) hostCounter.add(availHost.get(), true);
}
assertEquals("slave2:25,slave3:75", hostCounter.results());
assertEquals("replica2:25,replica3:75", hostCounter.results());

denyList.put(hostAddresses.get(0), System.currentTimeMillis() - 100);
denyList.put(hostAddresses.get(1), System.currentTimeMillis() + 1000);
Expand All @@ -139,15 +191,15 @@ private void getAvailableHostWithConnectionNumber(HaMode haMode) {
denyList.clear();
denyList.put(hostAddresses.get(3), System.currentTimeMillis() - 100);
denyList.put(hostAddresses.get(4), System.currentTimeMillis() + 1000);
slave1.setThreadsConnected(150);
slave2.setThreadsConnected(150);
slave3.setThreadsConnected(100);
replica1.setThreadsConnected(150);
replica2.setThreadsConnected(150);
replica3.setThreadsConnected(100);
hostCounter = new HostCounter();
for (int i = 0; i < 100; i++) {
Optional<HostAddress> availHost = haMode.getAvailableHost(hostAddresses, denyList, false);
if (availHost.isPresent()) hostCounter.add(availHost.get(), true);
}
assertEquals("slave1:25,slave3:75", hostCounter.results());
assertEquals("replica1:25,replica3:75", hostCounter.results());
}

private static class HostCounter {
Expand Down
65 changes: 53 additions & 12 deletions src/test/java/org/mariadb/jdbc/unit/util/ConfigurationTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,22 @@ public void testUrl() throws SQLException {
.socketTimeout(50)
.build();
assertEquals(
"jdbc:mariadb:loadbalance://address=(host=local)(port=3306)(type=primary),address=(host=local)(port=3307)(type=primary),address=(host=local)(port=3308)(type=primary)/DB?socketTimeout=50",
"jdbc:mariadb:loadbalance://local,local:3307,local:3308/DB?socketTimeout=50",
conf.initialUrl());

conf =
new Configuration.Builder()
.database("DB")
.addHost("local1", 3306)
.addHost("local2", 3306)
.addHost("local3", 3306)
.addHost("local4", 3306, false)
.addHost("local5", 3306, false)
.haMode(HaMode.LOADBALANCE)
.socketTimeout(50)
.build();
assertEquals(
"jdbc:mariadb:loadbalance://local1,local2,local3,address=(host=local4)(type=replica),address=(host=local5)(type=replica)/DB?socketTimeout=50",
conf.initialUrl());

conf =
Expand All @@ -170,7 +185,7 @@ public void testUrl() throws SQLException {
@Test
public void testPipeSocket() throws SQLException {
String url =
"jdbc:mariadb:sequential://address=(pipe=Mariadb106),address=(localSocket=/socket),address=(host=local)(port=3306)(type=primary)/DB?socketTimeout=50";
"jdbc:mariadb:sequential://address=(pipe=Mariadb106),address=(localSocket=/socket),local/DB?socketTimeout=50";
Configuration conf =
new Configuration.Builder()
.database("DB")
Expand All @@ -185,10 +200,36 @@ public void testPipeSocket() throws SQLException {
assertEquals(url, conf.initialUrl());
}

@Test
public void testSequentialWrite() throws SQLException {
String url =
"jdbc:mariadb:load-balance-read://127.0.0.5,127.0.0.6,address=(host=127.0.0.7)(type=replica),address=(host=127.0.0.8)(type=replica)/DB?socketTimeout=50";
Configuration conf =
new Configuration.Builder()
.database("DB")
.addHost("127.0.0.5", 3306)
.addHost("127.0.0.6", 3306)
.addHost("127.0.0.7", 3306, false)
.addHost("127.0.0.8", 3306, false)
.haMode(HaMode.LOAD_BALANCE_READ)
.socketTimeout(50)
.build();
assertEquals(url, conf.initialUrl());
conf = Configuration.parse(url);
assertEquals(url, conf.initialUrl());

// alias
assertEquals(
url,
Configuration.parse(
"jdbc:mariadb:load-balance-read://127.0.0.5,127.0.0.6,address=(host=127.0.0.7)(type=replica),address=(host=127.0.0.8)(type=replica)/DB?socketTimeout=50")
.initialUrl());
}

@Test
public void testPipeSocketSsl() throws SQLException {
String url =
"jdbc:mariadb:sequential://address=(pipe=Mariadb106),address=(localSocket=/socket),address=(host=local)(port=3306)(sslMode=verify-full)(type=primary)/DB?socketTimeout=50";
"jdbc:mariadb:sequential://address=(pipe=Mariadb106),address=(localSocket=/socket),address=(host=local)(sslMode=verify-full)(type=primary)/DB?socketTimeout=50";
Configuration conf =
new Configuration.Builder()
.database("DB")
Expand Down Expand Up @@ -617,13 +658,14 @@ public void testJdbcParserParameter() throws SQLException {

@Test
public void address() {
assertEquals("address=(host=test)(port=3306)", HostAddress.from("test", 3306).toString());
assertEquals("test", HostAddress.from("test", 3306).toString());
assertEquals("test:3304", HostAddress.from("test", 3304).toString());
assertEquals(
"address=(host=test)(port=3306)(type=replica)",
HostAddress.from("test", 3306, false).toString());
"address=(host=test)(type=replica)", HostAddress.from("test", 3306, false).toString());
assertEquals(
"address=(host=test)(port=3306)(type=primary)",
HostAddress.from("test", 3306, true).toString());
"address=(host=test)(port=3307)(type=replica)",
HostAddress.from("test", 3307, false).toString());
assertEquals("test", HostAddress.from("test", 3306, true).toString());
}

@Test
Expand Down Expand Up @@ -989,14 +1031,13 @@ public void toConf() throws SQLException {
.startsWith(
"Configuration:\n"
+ " * resulting Url :"
+ " jdbc:mariadb:loadbalance://address=(host=host1)(port=3305)(type=primary),address=(host=host2)(port=3307)(type=replica)/db?user=me&password=***&nonExisting=&nonExistingWithValue=tt&autocommit=false&createDatabaseIfNotExist=true\n"
+ " jdbc:mariadb:loadbalance://host1:3305,address=(host=host2)(port=3307)(type=replica)/db?user=me&password=***&nonExisting=&nonExistingWithValue=tt&autocommit=false&createDatabaseIfNotExist=true\n"
+ "Unknown options : \n"
+ " * nonExisting : \n"
+ " * nonExistingWithValue : tt\n"
+ "\n"
+ "Non default options : \n"
+ " * addresses : [address=(host=host1)(port=3305)(type=primary),"
+ " address=(host=host2)(port=3307)(type=replica)]\n"
+ " * addresses : [host1:3305, address=(host=host2)(port=3307)(type=replica)]\n"
+ " * autocommit : false\n"
+ " * createDatabaseIfNotExist : true\n"
+ " * database : db\n"
Expand Down Expand Up @@ -1028,7 +1069,7 @@ public void toConf() throws SQLException {
+ " * user : root\n"
+ "\n"
+ "default options :\n"
+ " * addresses : [address=(host=localhost)(port=3306)(type=primary)]\n"
+ " * addresses : [localhost]\n"
+ " * allowLocalInfile : true"));
}
}

0 comments on commit 9c2db3b

Please sign in to comment.