From 9c2db3bee043c71755a35ace70fde10c8fffa6a9 Mon Sep 17 00:00:00 2001 From: rusher Date: Tue, 5 Nov 2024 14:29:08 +0100 Subject: [PATCH] [CONJ-1207] new High Availability option : load-balance-read in order to avoid galera deadlocks, driver will always connect primary servers according to connection string order and replica using round-robin, permitting balancing connections --- .../java/org/mariadb/jdbc/Configuration.java | 2 +- .../java/org/mariadb/jdbc/HostAddress.java | 6 +- .../java/org/mariadb/jdbc/export/HaMode.java | 20 ++++ .../jdbc/integration/RedirectionTest.java | 18 +--- .../mariadb/jdbc/unit/export/HaModeTest.java | 96 ++++++++++++++----- .../jdbc/unit/util/ConfigurationTest.java | 65 ++++++++++--- 6 files changed, 157 insertions(+), 50 deletions(-) diff --git a/src/main/java/org/mariadb/jdbc/Configuration.java b/src/main/java/org/mariadb/jdbc/Configuration.java index c02f95df4..338c88b21 100644 --- a/src/main/java/org/mariadb/jdbc/Configuration.java +++ b/src/main/java/org/mariadb/jdbc/Configuration.java @@ -1148,7 +1148,7 @@ protected static String buildUrl(Configuration conf) { StringBuilder sb = new StringBuilder(); sb.append("jdbc:mariadb:"); if (conf.haMode != HaMode.NONE) { - sb.append(conf.haMode.toString().toLowerCase(Locale.ROOT)).append(":"); + sb.append(conf.haMode.toString().toLowerCase(Locale.ROOT).replace("_", "-")).append(":"); } sb.append("//"); for (int i = 0; i < conf.addresses.size(); i++) { diff --git a/src/main/java/org/mariadb/jdbc/HostAddress.java b/src/main/java/org/mariadb/jdbc/HostAddress.java index 683838950..276b445be 100644 --- a/src/main/java/org/mariadb/jdbc/HostAddress.java +++ b/src/main/java/org/mariadb/jdbc/HostAddress.java @@ -260,10 +260,12 @@ private static HostAddress parseParameterHostAddress(String str, HaMode haMode, public String toString() { if (pipe != null) return String.format("address=(pipe=%s)", pipe); if (localSocket != null) return String.format("address=(localSocket=%s)", localSocket); + if (sslMode == null && sslMode == null && primary != Boolean.FALSE) + return port != 3306 ? host + ":" + port : host; return String.format( - "address=(host=%s)(port=%s)%s%s", + "address=(host=%s)%s%s%s", host, - port, + (port != 3306) ? "(port=" + port + ")" : "", (sslMode != null) ? "(sslMode=" + sslMode.getValue() + ")" : "", ((primary != null) ? ("(type=" + (primary ? "primary)" : "replica)")) : "")); } diff --git a/src/main/java/org/mariadb/jdbc/export/HaMode.java b/src/main/java/org/mariadb/jdbc/export/HaMode.java index 81fbdb6e8..53766d13c 100644 --- a/src/main/java/org/mariadb/jdbc/export/HaMode.java +++ b/src/main/java/org/mariadb/jdbc/export/HaMode.java @@ -21,6 +21,7 @@ public Optional getAvailableHost( return HaMode.getAvailableRoundRobinHost(this, hostAddresses, denyList, primary); } }, + /** sequential: driver will always connect according to connection string order */ SEQUENTIAL("sequential") { public Optional getAvailableHost( @@ -30,6 +31,24 @@ public Optional getAvailableHost( return getAvailableHostInOrder(hostAddresses, denyList, primary); } }, + + /** + * load-balance-read: driver will always connect primary servers according to connection string + * order and replica using round-robin, permitting balancing connections + */ + LOAD_BALANCE_READ("load-balance-read") { + public Optional getAvailableHost( + List hostAddresses, + ConcurrentMap denyList, + boolean primary) { + if (primary) { + return SEQUENTIAL.getAvailableHost(hostAddresses, denyList, true); + } else { + return LOADBALANCE.getAvailableHost(hostAddresses, denyList, false); + } + } + }, + /** * load-balance: driver will connect to any host using round-robin, permitting balancing * connections @@ -45,6 +64,7 @@ public Optional getAvailableHost( return HaMode.getAvailableRoundRobinHost(this, hostAddresses, denyList, primary); } }, + /** no ha-mode. Connect to first host only */ NONE("") { public Optional getAvailableHost( diff --git a/src/test/java/org/mariadb/jdbc/integration/RedirectionTest.java b/src/test/java/org/mariadb/jdbc/integration/RedirectionTest.java index 8ddc3be9f..83257461a 100644 --- a/src/test/java/org/mariadb/jdbc/integration/RedirectionTest.java +++ b/src/test/java/org/mariadb/jdbc/integration/RedirectionTest.java @@ -19,9 +19,7 @@ public class RedirectionTest extends Common { void basicRedirection() throws Exception { Connection connection = createProxyCon(HaMode.NONE, ""); - Assertions.assertEquals( - String.format("address=(host=localhost)(port=%s)(type=primary)", proxy.getLocalPort()), - connection.__test_host()); + Assertions.assertEquals("localhost:" + proxy.getLocalPort(), connection.__test_host()); boolean permitRedirection = true; Statement stmt = connection.createStatement(); try { @@ -36,8 +34,7 @@ void basicRedirection() throws Exception { if (permitRedirection) { Assertions.assertEquals( - String.format("address=(host=%s)(port=%s)(type=primary)", hostname, port), - connection.__test_host()); + port == 3306 ? hostname : hostname + ":" + port, connection.__test_host()); } connection.close(); proxy.stop(); @@ -47,9 +44,7 @@ void basicRedirection() throws Exception { void redirectionDuringTransaction() throws Exception { Connection connection = createProxyCon(HaMode.NONE, ""); - Assertions.assertEquals( - String.format("address=(host=localhost)(port=%s)(type=primary)", proxy.getLocalPort()), - connection.__test_host()); + Assertions.assertEquals("localhost:" + proxy.getLocalPort(), connection.__test_host()); boolean permitRedirection = true; Statement stmt = connection.createStatement(); @@ -63,14 +58,11 @@ void redirectionDuringTransaction() throws Exception { ResultSet rs = stmt.executeQuery("SELECT 1"); Assertions.assertTrue(rs.next()); Assertions.assertEquals(1, rs.getInt(1)); - Assertions.assertEquals( - String.format("address=(host=localhost)(port=%s)(type=primary)", proxy.getLocalPort()), - connection.__test_host()); + Assertions.assertEquals("localhost:" + proxy.getLocalPort(), connection.__test_host()); connection.commit(); if (permitRedirection) { Assertions.assertEquals( - String.format("address=(host=%s)(port=%s)(type=primary)", hostname, port), - connection.__test_host()); + port == 3306 ? hostname : hostname + ":" + port, connection.__test_host()); } rs = stmt.executeQuery("SELECT 1"); Assertions.assertTrue(rs.next()); diff --git a/src/test/java/org/mariadb/jdbc/unit/export/HaModeTest.java b/src/test/java/org/mariadb/jdbc/unit/export/HaModeTest.java index 08f6ace5d..c1a588eea 100644 --- a/src/test/java/org/mariadb/jdbc/unit/export/HaModeTest.java +++ b/src/test/java/org/mariadb/jdbc/unit/export/HaModeTest.java @@ -24,9 +24,9 @@ private void getAvailableHostWithoutConnectionNumber(HaMode haMode) { hostAddresses.add(HostAddress.from("prim1", 3306, true)); hostAddresses.add(HostAddress.from("prim2", 3306, true)); hostAddresses.add(HostAddress.from("prim3", 3306, true)); - hostAddresses.add(HostAddress.from("slave1", 3306, false)); - hostAddresses.add(HostAddress.from("slave2", 3306, false)); - hostAddresses.add(HostAddress.from("slave3", 3306, false)); + hostAddresses.add(HostAddress.from("replica1", 3306, false)); + hostAddresses.add(HostAddress.from("replica2", 3306, false)); + hostAddresses.add(HostAddress.from("replica3", 3306, false)); haMode.resetLast(); ConcurrentMap denyList = new ConcurrentHashMap<>(); @@ -43,7 +43,7 @@ private void getAvailableHostWithoutConnectionNumber(HaMode haMode) { Optional availHost = haMode.getAvailableHost(hostAddresses, denyList, false); if (availHost.isPresent()) hostCounter.add(availHost.get(), false); } - assertEquals("slave1:34,slave2:33,slave3:33", hostCounter.results()); + assertEquals("replica1:34,replica2:33,replica3:33", hostCounter.results()); haMode.resetLast(); denyList.put(hostAddresses.get(0), System.currentTimeMillis() - 100); @@ -65,7 +65,59 @@ private void getAvailableHostWithoutConnectionNumber(HaMode haMode) { Optional availHost = haMode.getAvailableHost(hostAddresses, denyList, false); if (availHost.isPresent()) hostCounter.add(availHost.get(), false); } - assertEquals("slave1:50,slave3:50", hostCounter.results()); + assertEquals("replica1:50,replica3:50", hostCounter.results()); + } + + @Test + public void loadBalanceRead() { + + HaMode haMode = HaMode.LOAD_BALANCE_READ; + List hostAddresses = new ArrayList<>(); + hostAddresses.add(HostAddress.from("prim1", 3306, true)); + hostAddresses.add(HostAddress.from("prim2", 3306, true)); + hostAddresses.add(HostAddress.from("prim3", 3306, true)); + hostAddresses.add(HostAddress.from("replica1", 3306, false)); + hostAddresses.add(HostAddress.from("replica2", 3306, false)); + hostAddresses.add(HostAddress.from("replica3", 3306, false)); + + haMode.resetLast(); + ConcurrentMap denyList = new ConcurrentHashMap<>(); + HostCounter hostCounter = new HostCounter(); + for (int i = 0; i < 100; i++) { + Optional availHost = haMode.getAvailableHost(hostAddresses, denyList, true); + if (availHost.isPresent()) hostCounter.add(availHost.get(), false); + } + assertEquals("prim1:100", hostCounter.results()); + + haMode.resetLast(); + hostCounter = new HostCounter(); + for (int i = 0; i < 100; i++) { + Optional availHost = haMode.getAvailableHost(hostAddresses, denyList, false); + if (availHost.isPresent()) hostCounter.add(availHost.get(), false); + } + assertEquals("replica1:34,replica2:33,replica3:33", hostCounter.results()); + + haMode.resetLast(); + denyList.put(hostAddresses.get(1), System.currentTimeMillis() + 1000); + denyList.put(hostAddresses.get(0), System.currentTimeMillis() - 100); + + hostCounter = new HostCounter(); + for (int i = 0; i < 100; i++) { + Optional availHost = haMode.getAvailableHost(hostAddresses, denyList, true); + if (availHost.isPresent()) hostCounter.add(availHost.get(), false); + } + assertEquals("prim1:100", hostCounter.results()); + + haMode.resetLast(); + denyList.clear(); + denyList.put(hostAddresses.get(3), System.currentTimeMillis() - 100); + denyList.put(hostAddresses.get(4), System.currentTimeMillis() + 1000); + hostCounter = new HostCounter(); + for (int i = 0; i < 100; i++) { + Optional availHost = haMode.getAvailableHost(hostAddresses, denyList, false); + if (availHost.isPresent()) hostCounter.add(availHost.get(), false); + } + assertEquals("replica1:50,replica3:50", hostCounter.results()); } @Test @@ -86,15 +138,15 @@ private void getAvailableHostWithConnectionNumber(HaMode haMode) { hostAddresses.add(host1); hostAddresses.add(host2); hostAddresses.add(host3); - HostAddress slave1 = HostAddress.from("slave1", 3306, false); - HostAddress slave2 = HostAddress.from("slave2", 3306, false); - HostAddress slave3 = HostAddress.from("slave3", 3306, false); - slave1.setThreadsConnected(200); - slave2.setThreadsConnected(150); - slave3.setThreadsConnected(100); - hostAddresses.add(slave1); - hostAddresses.add(slave2); - hostAddresses.add(slave3); + HostAddress replica1 = HostAddress.from("replica1", 3306, false); + HostAddress replica2 = HostAddress.from("replica2", 3306, false); + HostAddress replica3 = HostAddress.from("replica3", 3306, false); + replica1.setThreadsConnected(200); + replica2.setThreadsConnected(150); + replica3.setThreadsConnected(100); + hostAddresses.add(replica1); + hostAddresses.add(replica2); + hostAddresses.add(replica3); ConcurrentMap denyList = new ConcurrentHashMap<>(); HostCounter hostCounter = new HostCounter(); @@ -114,15 +166,15 @@ private void getAvailableHostWithConnectionNumber(HaMode haMode) { } assertEquals("prim1:34,prim2:33,prim3:33", hostCounter.results()); - slave1.setThreadsConnected(200); - slave2.setThreadsConnected(150); - slave3.setThreadsConnected(100); + replica1.setThreadsConnected(200); + replica2.setThreadsConnected(150); + replica3.setThreadsConnected(100); hostCounter = new HostCounter(); for (int i = 0; i < 100; i++) { Optional availHost = haMode.getAvailableHost(hostAddresses, denyList, false); if (availHost.isPresent()) hostCounter.add(availHost.get(), true); } - assertEquals("slave2:25,slave3:75", hostCounter.results()); + assertEquals("replica2:25,replica3:75", hostCounter.results()); denyList.put(hostAddresses.get(0), System.currentTimeMillis() - 100); denyList.put(hostAddresses.get(1), System.currentTimeMillis() + 1000); @@ -139,15 +191,15 @@ private void getAvailableHostWithConnectionNumber(HaMode haMode) { denyList.clear(); denyList.put(hostAddresses.get(3), System.currentTimeMillis() - 100); denyList.put(hostAddresses.get(4), System.currentTimeMillis() + 1000); - slave1.setThreadsConnected(150); - slave2.setThreadsConnected(150); - slave3.setThreadsConnected(100); + replica1.setThreadsConnected(150); + replica2.setThreadsConnected(150); + replica3.setThreadsConnected(100); hostCounter = new HostCounter(); for (int i = 0; i < 100; i++) { Optional availHost = haMode.getAvailableHost(hostAddresses, denyList, false); if (availHost.isPresent()) hostCounter.add(availHost.get(), true); } - assertEquals("slave1:25,slave3:75", hostCounter.results()); + assertEquals("replica1:25,replica3:75", hostCounter.results()); } private static class HostCounter { diff --git a/src/test/java/org/mariadb/jdbc/unit/util/ConfigurationTest.java b/src/test/java/org/mariadb/jdbc/unit/util/ConfigurationTest.java index fa2c9cf57..44b263640 100644 --- a/src/test/java/org/mariadb/jdbc/unit/util/ConfigurationTest.java +++ b/src/test/java/org/mariadb/jdbc/unit/util/ConfigurationTest.java @@ -154,7 +154,22 @@ public void testUrl() throws SQLException { .socketTimeout(50) .build(); assertEquals( - "jdbc:mariadb:loadbalance://address=(host=local)(port=3306)(type=primary),address=(host=local)(port=3307)(type=primary),address=(host=local)(port=3308)(type=primary)/DB?socketTimeout=50", + "jdbc:mariadb:loadbalance://local,local:3307,local:3308/DB?socketTimeout=50", + conf.initialUrl()); + + conf = + new Configuration.Builder() + .database("DB") + .addHost("local1", 3306) + .addHost("local2", 3306) + .addHost("local3", 3306) + .addHost("local4", 3306, false) + .addHost("local5", 3306, false) + .haMode(HaMode.LOADBALANCE) + .socketTimeout(50) + .build(); + assertEquals( + "jdbc:mariadb:loadbalance://local1,local2,local3,address=(host=local4)(type=replica),address=(host=local5)(type=replica)/DB?socketTimeout=50", conf.initialUrl()); conf = @@ -170,7 +185,7 @@ public void testUrl() throws SQLException { @Test public void testPipeSocket() throws SQLException { String url = - "jdbc:mariadb:sequential://address=(pipe=Mariadb106),address=(localSocket=/socket),address=(host=local)(port=3306)(type=primary)/DB?socketTimeout=50"; + "jdbc:mariadb:sequential://address=(pipe=Mariadb106),address=(localSocket=/socket),local/DB?socketTimeout=50"; Configuration conf = new Configuration.Builder() .database("DB") @@ -185,10 +200,36 @@ public void testPipeSocket() throws SQLException { assertEquals(url, conf.initialUrl()); } + @Test + public void testSequentialWrite() throws SQLException { + String url = + "jdbc:mariadb:load-balance-read://127.0.0.5,127.0.0.6,address=(host=127.0.0.7)(type=replica),address=(host=127.0.0.8)(type=replica)/DB?socketTimeout=50"; + Configuration conf = + new Configuration.Builder() + .database("DB") + .addHost("127.0.0.5", 3306) + .addHost("127.0.0.6", 3306) + .addHost("127.0.0.7", 3306, false) + .addHost("127.0.0.8", 3306, false) + .haMode(HaMode.LOAD_BALANCE_READ) + .socketTimeout(50) + .build(); + assertEquals(url, conf.initialUrl()); + conf = Configuration.parse(url); + assertEquals(url, conf.initialUrl()); + + // alias + assertEquals( + url, + Configuration.parse( + "jdbc:mariadb:load-balance-read://127.0.0.5,127.0.0.6,address=(host=127.0.0.7)(type=replica),address=(host=127.0.0.8)(type=replica)/DB?socketTimeout=50") + .initialUrl()); + } + @Test public void testPipeSocketSsl() throws SQLException { String url = - "jdbc:mariadb:sequential://address=(pipe=Mariadb106),address=(localSocket=/socket),address=(host=local)(port=3306)(sslMode=verify-full)(type=primary)/DB?socketTimeout=50"; + "jdbc:mariadb:sequential://address=(pipe=Mariadb106),address=(localSocket=/socket),address=(host=local)(sslMode=verify-full)(type=primary)/DB?socketTimeout=50"; Configuration conf = new Configuration.Builder() .database("DB") @@ -617,13 +658,14 @@ public void testJdbcParserParameter() throws SQLException { @Test public void address() { - assertEquals("address=(host=test)(port=3306)", HostAddress.from("test", 3306).toString()); + assertEquals("test", HostAddress.from("test", 3306).toString()); + assertEquals("test:3304", HostAddress.from("test", 3304).toString()); assertEquals( - "address=(host=test)(port=3306)(type=replica)", - HostAddress.from("test", 3306, false).toString()); + "address=(host=test)(type=replica)", HostAddress.from("test", 3306, false).toString()); assertEquals( - "address=(host=test)(port=3306)(type=primary)", - HostAddress.from("test", 3306, true).toString()); + "address=(host=test)(port=3307)(type=replica)", + HostAddress.from("test", 3307, false).toString()); + assertEquals("test", HostAddress.from("test", 3306, true).toString()); } @Test @@ -989,14 +1031,13 @@ public void toConf() throws SQLException { .startsWith( "Configuration:\n" + " * resulting Url :" - + " jdbc:mariadb:loadbalance://address=(host=host1)(port=3305)(type=primary),address=(host=host2)(port=3307)(type=replica)/db?user=me&password=***&nonExisting=&nonExistingWithValue=tt&autocommit=false&createDatabaseIfNotExist=true\n" + + " jdbc:mariadb:loadbalance://host1:3305,address=(host=host2)(port=3307)(type=replica)/db?user=me&password=***&nonExisting=&nonExistingWithValue=tt&autocommit=false&createDatabaseIfNotExist=true\n" + "Unknown options : \n" + " * nonExisting : \n" + " * nonExistingWithValue : tt\n" + "\n" + "Non default options : \n" - + " * addresses : [address=(host=host1)(port=3305)(type=primary)," - + " address=(host=host2)(port=3307)(type=replica)]\n" + + " * addresses : [host1:3305, address=(host=host2)(port=3307)(type=replica)]\n" + " * autocommit : false\n" + " * createDatabaseIfNotExist : true\n" + " * database : db\n" @@ -1028,7 +1069,7 @@ public void toConf() throws SQLException { + " * user : root\n" + "\n" + "default options :\n" - + " * addresses : [address=(host=localhost)(port=3306)(type=primary)]\n" + + " * addresses : [localhost]\n" + " * allowLocalInfile : true")); } }