Skip to content

Commit

Permalink
[CONJ-1084] load-balance distribution ensuring repartition after fail…
Browse files Browse the repository at this point in the history
…over
  • Loading branch information
rusher committed Jul 31, 2023
1 parent 78d868d commit 65d73e7
Show file tree
Hide file tree
Showing 4 changed files with 241 additions and 3 deletions.
22 changes: 20 additions & 2 deletions src/main/java/org/mariadb/jdbc/HostAddress.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

/** Host entry */
public class HostAddress {

private long CONNECTED_VALUE_TIMEOUT = 3*60*1000; // 3 minutes
/** host address */
public final String host;

Expand All @@ -22,7 +22,8 @@ public class HostAddress {
/** primary node */
public Boolean primary;

public Long threadsConnected;
private Long threadsConnected;
private Long threadConnectedTimeout;

/**
* Constructor.
Expand Down Expand Up @@ -192,4 +193,21 @@ public boolean equals(Object o) {
public int hashCode() {
return Objects.hash(host, port, primary);
}

public void setThreadsConnected(long threadsConnected) {
this.threadsConnected = threadsConnected;
this.threadConnectedTimeout = System.currentTimeMillis() + CONNECTED_VALUE_TIMEOUT;
}

public Long getThreadsConnected() {
return threadsConnected;
}
public void forceThreadsConnected(long threadsConnected, long threadConnectedTimeout) {
this.threadsConnected = threadsConnected;
this.threadConnectedTimeout = threadConnectedTimeout;
}

public Long getThreadConnectedTimeout() {
return threadConnectedTimeout;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ public void setThreadId(long connectionId) {
}

public void setTreadsConnected(long threadsConnected) {
if (hostAddress != null) hostAddress.threadsConnected = threadsConnected;
if (hostAddress != null) hostAddress.setThreadsConnected(threadsConnected);
}

public String getCharset() {
Expand Down
46 changes: 46 additions & 0 deletions src/main/java/org/mariadb/jdbc/export/HaMode.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ public Optional<HostAddress> getAvailableHost(
List<HostAddress> hostAddresses,
ConcurrentMap<HostAddress, Long> denyList,
boolean primary) {
HostAddress hostWithLessConnection = getHostWithLessConnections(hostAddresses, denyList, primary);
if (hostWithLessConnection != null) return Optional.of(hostWithLessConnection);
return HaMode.getAvailableRoundRobinHost(this, hostAddresses, denyList, primary);
}
},
Expand All @@ -37,6 +39,8 @@ public Optional<HostAddress> getAvailableHost(
List<HostAddress> hostAddresses,
ConcurrentMap<HostAddress, Long> denyList,
boolean primary) {
HostAddress hostWithLessConnection = getHostWithLessConnections(hostAddresses, denyList, primary);
if (hostWithLessConnection != null) return Optional.of(hostWithLessConnection);
return HaMode.getAvailableRoundRobinHost(this, hostAddresses, denyList, primary);
}
},
Expand All @@ -58,6 +62,14 @@ public Optional<HostAddress> getAvailableHost(
this.value = value;
}

/**
* For testing purpose only
*/
public void resetLast() {
lastRoundRobinPrimaryHost = null;
lastRoundRobinSecondaryHost = null;
}

/**
* Get HAMode from values or aliases
*
Expand Down Expand Up @@ -100,6 +112,40 @@ public static Optional<HostAddress> getAvailableHostInOrder(
return Optional.empty();
}

/**
* If all hosts not blacklisted connection number are known, choose the host with the less connections.
* @param hostAddresses host addresses
* @param denyList blacklist
* @param primary requires primary host
* @return the host with less connection, or null if unknown.
*/
public static HostAddress getHostWithLessConnections(List<HostAddress> hostAddresses, ConcurrentMap<HostAddress, Long> denyList, boolean primary) {
long currentTime = System.currentTimeMillis();
HostAddress hostAddressWithLessConnections = null;

for (HostAddress hostAddress : hostAddresses) {
if (hostAddress.primary == primary) {
if (denyList.containsKey(hostAddress)) {
// take in account denied server that have reached denied timeout
if (denyList.get(hostAddress) > System.currentTimeMillis()) {
continue;
} else {
denyList.remove(hostAddress);
}
}

// All host must have recently been connected
if (hostAddress.getThreadConnectedTimeout() == null || hostAddress.getThreadConnectedTimeout() < currentTime) {
return null;
}
if (hostAddressWithLessConnections == null || hostAddressWithLessConnections.getThreadsConnected() > hostAddress.getThreadsConnected()) {
hostAddressWithLessConnections = hostAddress;
}
}
}
return hostAddressWithLessConnections;
}

/**
* return hosts of corresponding type (primary or not) without blacklisted hosts. hosts in
* blacklist reaching blacklist timeout will be present, RoundRobin Order.
Expand Down
174 changes: 174 additions & 0 deletions src/test/java/org/mariadb/jdbc/unit/export/HaModeTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
// SPDX-License-Identifier: LGPL-2.1-or-later
// Copyright (c) 2012-2014 Monty Program Ab
// Copyright (c) 2023 MariaDB Corporation Ab

package org.mariadb.jdbc.unit.export;

import org.junit.jupiter.api.Test;
import org.mariadb.jdbc.HostAddress;
import org.mariadb.jdbc.export.HaMode;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import static org.junit.jupiter.api.Assertions.assertEquals;

public class HaModeTest {
@Test
public void getAvailableHostWithoutConnectionNumber() {
getAvailableHostWithoutConnectionNumber(HaMode.REPLICATION);
getAvailableHostWithoutConnectionNumber(HaMode.LOADBALANCE);
}

private void getAvailableHostWithoutConnectionNumber(HaMode haMode) {
List<HostAddress> hostAddresses = new ArrayList<>();
hostAddresses.add(HostAddress.from("prim1", 3306, true));
hostAddresses.add(HostAddress.from("prim2", 3306, true));
hostAddresses.add(HostAddress.from("prim3", 3306, true));
hostAddresses.add(HostAddress.from("slave1", 3306, false));
hostAddresses.add(HostAddress.from("slave2", 3306, false));
hostAddresses.add(HostAddress.from("slave3", 3306, false));

haMode.resetLast();
ConcurrentMap<HostAddress, Long> denyList = new ConcurrentHashMap<>();
HostCounter hostCounter = new HostCounter();
for (int i = 0; i < 100; i++) {
hostCounter.add(haMode.getAvailableHost(hostAddresses, denyList, true).get(), false);
}
assertEquals("prim1:34,prim2:33,prim3:33", hostCounter.results());

haMode.resetLast();
hostCounter = new HostCounter();
for (int i = 0; i < 100; i++) {
hostCounter.add(haMode.getAvailableHost(hostAddresses, denyList, false).get(), false);
}
assertEquals("slave1:34,slave2:33,slave3:33", hostCounter.results());

haMode.resetLast();
denyList.put(hostAddresses.get(0), System.currentTimeMillis() - 100);
denyList.put(hostAddresses.get(1), System.currentTimeMillis() + 1000);

hostCounter = new HostCounter();
for (int i = 0; i < 100; i++) {
hostCounter.add(haMode.getAvailableHost(hostAddresses, denyList, true).get(), false);
}
assertEquals("prim1:50,prim3:50", hostCounter.results());

haMode.resetLast();
denyList.clear();
denyList.put(hostAddresses.get(3), System.currentTimeMillis() - 100);
denyList.put(hostAddresses.get(4), System.currentTimeMillis() + 1000);
hostCounter = new HostCounter();
for (int i = 0; i < 100; i++) {
hostCounter.add(haMode.getAvailableHost(hostAddresses, denyList, false).get(), false);
}
assertEquals("slave1:50,slave3:50", hostCounter.results());
}

@Test
public void getAvailableHostWithConnectionNumber() {
getAvailableHostWithConnectionNumber(HaMode.LOADBALANCE);
getAvailableHostWithConnectionNumber(HaMode.REPLICATION);
}
private void getAvailableHostWithConnectionNumber(HaMode haMode) {
List<HostAddress> hostAddresses = new ArrayList<>();

HostAddress host1 = HostAddress.from("prim1", 3306, true);
HostAddress host2 = HostAddress.from("prim2", 3306, true);
HostAddress host3 = HostAddress.from("prim3", 3306, true);
host1.setThreadsConnected(200);
host2.setThreadsConnected(150);
host3.setThreadsConnected(100);
hostAddresses.add(host1);
hostAddresses.add(host2);
hostAddresses.add(host3);
HostAddress slave1 = HostAddress.from("slave1", 3306, false);
HostAddress slave2 = HostAddress.from("slave2", 3306, false);
HostAddress slave3 = HostAddress.from("slave3", 3306, false);
slave1.setThreadsConnected(200);
slave2.setThreadsConnected(150);
slave3.setThreadsConnected(100);
hostAddresses.add(slave1);
hostAddresses.add(slave2);
hostAddresses.add(slave3);

ConcurrentMap<HostAddress, Long> denyList = new ConcurrentHashMap<>();
HostCounter hostCounter = new HostCounter();
for (int i = 0; i < 100; i++) {
hostCounter.add(haMode.getAvailableHost(hostAddresses, denyList, true).get(), true);
}
assertEquals("prim2:25,prim3:75", hostCounter.results());

host1.forceThreadsConnected(200, System.currentTimeMillis() - 50000);
host2.setThreadsConnected(150);
host3.setThreadsConnected(100);
hostCounter = new HostCounter();
for (int i = 0; i < 100; i++) {
hostCounter.add(haMode.getAvailableHost(hostAddresses, denyList, true).get(), true);
}
assertEquals("prim1:34,prim2:33,prim3:33", hostCounter.results());

slave1.setThreadsConnected(200);
slave2.setThreadsConnected(150);
slave3.setThreadsConnected(100);
hostCounter = new HostCounter();
for (int i = 0; i < 100; i++) {
hostCounter.add(haMode.getAvailableHost(hostAddresses, denyList, false).get(), true);
}
assertEquals("slave2:25,slave3:75", hostCounter.results());

denyList.put(hostAddresses.get(0), System.currentTimeMillis() - 100);
denyList.put(hostAddresses.get(1), System.currentTimeMillis() + 1000);
host1.setThreadsConnected(150);
host2.setThreadsConnected(150);
host3.setThreadsConnected(100);
hostCounter = new HostCounter();
for (int i = 0; i < 100; i++) {
hostCounter.add(haMode.getAvailableHost(hostAddresses, denyList, true).get(), true);
}
assertEquals("prim1:25,prim3:75", hostCounter.results());

denyList.clear();
denyList.put(hostAddresses.get(3), System.currentTimeMillis() - 100);
denyList.put(hostAddresses.get(4), System.currentTimeMillis() + 1000);
slave1.setThreadsConnected(150);
slave2.setThreadsConnected(150);
slave3.setThreadsConnected(100);
hostCounter = new HostCounter();
for (int i = 0; i < 100; i++) {
hostCounter.add(haMode.getAvailableHost(hostAddresses, denyList, false).get(), true);
}
assertEquals("slave1:25,slave3:75", hostCounter.results());
}


private class HostCounter {
Map<HostAddress, Integer> hosts = new HashMap<>();

public void add(HostAddress hostAddress, boolean increment) {
Integer counter = hosts.get(hostAddress);
if (counter == null) {
hosts.put(hostAddress, 1);
} else {
hosts.replace(hostAddress, counter + 1);
}
if (increment) {
if (hostAddress.getThreadsConnected() != null) {
hostAddress.forceThreadsConnected(hostAddress.getThreadsConnected() + 1, hostAddress.getThreadConnectedTimeout());
} else {
hostAddress.forceThreadsConnected(1, System.currentTimeMillis() + 1000);
}
}
}

public String results() {
List<String> res = new ArrayList<>();
for (Map.Entry<HostAddress, Integer> hostEntry : hosts.entrySet()) {
res.add(hostEntry.getKey().host + ':' +hostEntry.getValue());
}
Collections.sort(res);
return String.join(",", res);
}
}
}

0 comments on commit 65d73e7

Please sign in to comment.