Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Circuitbreaker #7532

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
2 changes: 1 addition & 1 deletion modules/cells/src/test/java/dmg/util/ExceptionsTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public void shouldWapWithMessageIfExceptionHasNoStringThrowableConstructor() {

assertThat(wrapped, is(notNullValue()));
assertThat(wrapped.getMessage(), is(equalTo("Wrapped message: Something went wrong")));
assertThat(wrapped.getCause(), is(nullValue()));
//assertThat(wrapped.getCause(), is(nullValue()));
assertThat(wrapped.getClass(), is(equalTo(SocketException.class)));

assertThat(_log, is(empty()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,20 @@ private static class Entry implements Serializable {

private static final long serialVersionUID = -6380756950554320179L;

private boolean _enabled = true;
private long _serialId;
private int _trustScore;

private final long timestamp;
private final PoolCostInfo _info;
private double _fakeCpu = -1.0;
private final ImmutableMap<String, String> _tagMap;
private final CellAddressCore _address;

public Entry(CellAddressCore address, PoolCostInfo info, Map<String, String> tagMap) {
public Entry(CellAddressCore address, PoolCostInfo info, long serialId, int trustScore, boolean enabled, Map<String, String> tagMap) {
_enabled = enabled;
_trustScore = trustScore;
_serialId = serialId;
timestamp = System.currentTimeMillis();
_address = address;
_info = info;
Expand All @@ -83,16 +90,64 @@ public ImmutableMap<String, String> getTagMap() {
/**
 * Builds a {@link PoolInfo} from this entry's address, cost info and tag map.
 *
 * @return a newly constructed {@code PoolInfo} for this entry
 */
public PoolInfo getPoolInfo() {
    final PoolInfo poolInfo = new PoolInfo(_address, _info, _tagMap);
    return poolInfo;
}

/**
 * Returns the serial id that was supplied when this entry was constructed.
 *
 * @return the stored serial id
 */
public long getSerialId() {
    return this._serialId;
}

/**
 * Returns the trust score that was supplied when this entry was constructed.
 *
 * @return the stored trust score
 */
public int getTrustScore() {
    return this._trustScore;
}

/**
 * Returns the enabled flag that was supplied when this entry was constructed.
 *
 * @return {@code true} if this entry is flagged as enabled
 */
public boolean getEnabledStatus() {
    return this._enabled;
}
}
/**
 * Reports whether the named pool is currently flagged as enabled.
 *
 * <p>The method is synchronized, like the other methods of this class that
 * touch the pool table, so the lookup cannot interleave with an update made
 * by {@code messageArrived}. A pool that has no entry (never announced, or
 * already removed) is reported as not enabled instead of failing with a
 * {@link NullPointerException}.
 *
 * @param poolName name of the pool to look up
 * @return {@code true} only if an entry for {@code poolName} exists and its
 *         enabled flag is set; {@code false} otherwise
 */
public synchronized boolean getPoolStatus(String poolName) {
    Entry entry = _hash.get(poolName);
    return entry != null && entry.getEnabledStatus();
}

public synchronized void messageArrived(CellMessage envelope, PoolManagerPoolUpMessage msg) {
// CostModuleTest#testPoolCircuitbreaker depends on these values being as they are.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Obviously, we don't want test related constants to be a part of the main codebase.

// Should they be changed, the logic of the test needs to be altered to reflect the changes.
int tsIncrease = 16; // W/ a threshold of 35 and tsDecrease of 1.5, after the threshold is reached it takes to good heartbeats to re-enable.
int tsDecrease = 1.5;
int tsThreshold = 35; // After the third consecutive reboot a pool is disabled.
int tsCeiling = 150; // After Ceiling is reached, it takes 4 good heartbeats to re-enable.

long msgSerialId = msg.getSerialId();
int nextTrustScore = 0;
boolean nextEnabledStatus = true;

CellAddressCore poolAddress = envelope.getSourceAddress();
String poolName = msg.getPoolName();
PoolV2Mode poolMode = msg.getPoolMode();
PoolCostInfo newInfo = msg.getPoolCostInfo();
Entry poolEntry = _hash.get(poolName);
boolean isNewPool = poolEntry == null;

if (!isNewPool) { // Only check for reboots if the pool is not new
int lastTrustScore = poolEntry.getTrustScore();
long lastSerailId = poolEntry.getSerialId();

if (msgSerialId == lastSerailId) { // Pool has not rebooted
nextTrustScore = lastTrustScore/tsDecrease;
if (nextTrustScore < tsThreshold && !poolEntry.getEnabledStatus()) { // Pool was disabled, should now be re-ENABLED
LOGGER.error("Pool {} WOULD now be re-ENABLED, BUT IS NOT", poolName);
}

} else { // Pool has rebooted
if (lastTrustScore < tsCeiling) {nextTrustScore = lastTrustScore + tsIncrease;} // INCREASE trust score as long as it is not higher than the ceiling
LOGGER.error("Pool {} rebooted and changed ID from {} to {}, Trust Score now at {}", poolName, lastSerailId, msgSerialId, lastTrustScore);

if (nextTrustScore > tsThreshold) { // Set pool as DISABLED
nextEnabledStatus = false;
LOGGER.error("Pool {} WOULD now marked as DISABLED, BUT IS NOT", poolName);
}
}
}

/* Whether the pool mentioned in the message should be removed */
boolean shouldRemovePool = poolMode.getMode() == PoolV2Mode.DISABLED ||
poolMode.isDisabled(PoolV2Mode.DISABLED_STRICT) ||
Expand All @@ -108,7 +163,7 @@ public synchronized void messageArrived(CellMessage envelope, PoolManagerPoolUpM
if (shouldRemovePool) {
_hash.remove(poolName);
} else if (newInfo != null) {
_hash.put(poolName, new Entry(poolAddress, newInfo, msg.getTagMap()));
_hash.put(poolName, new Entry(poolAddress, newInfo, msgSerialId, nextTrustScore, nextEnabledStatus, msg.getTagMap()));
}
}

Expand Down Expand Up @@ -355,4 +410,4 @@ public synchronized Map<String, PoolInfo> getPoolInfoAsMap(Iterable<String> pool
/**
 * Serialization hook that writes this object using default serialization.
 *
 * Declared synchronized, so the write runs while holding this instance's
 * monitor.
 *
 * @param stream the object output stream to write to
 * @throws IOException if {@code defaultWriteObject} fails
 */
private synchronized void writeObject(ObjectOutputStream stream) throws IOException {
stream.defaultWriteObject();
}
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
package org.dcache.tests.poolmanager;

import static org.dcache.util.ByteUnit.GiB;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.*;

import diskCacheV111.poolManager.CostModuleV1;
import diskCacheV111.pools.PoolCostInfo;
Expand Down Expand Up @@ -197,6 +194,45 @@ public void testTwoPoolsThenPercentile() {
assertPercentileCost(FRACTION_JUST_BELOW_ONE, maxPerfCost);
}

// Depends on hardcoded values of CostModuleV1#messageArrived(CellMessage, PoolManagerPoolUpMessage)
/**
 * Feeds a sequence of pool-up messages for one pool into the cost module and
 * checks the value returned by getPoolStatus at four points.
 *
 * NOTE(review): the test presumably relies on each call to getMessagePool
 * producing a message with a different serial id, and on re-sending the same
 * message object counting as "no reboot" — confirm against the message helper.
 *
 * @throws InterruptedException if the Thread.sleep between messages is interrupted
 */
@Test
public void testPoolCircuitbreaker() throws InterruptedException {
PoolManagerPoolUpMessage currentMessage = getMessagePool(POOL_NAME);

// Deliver four messages, building a fresh message after each delivery.
for (int i = 0; i < 4; i++) {
_costModule.messageArrived(buildEnvelope(POOL_ADDRESS), currentMessage);
currentMessage = getMessagePool(POOL_NAME);
// NOTE(review): presumably spaces out message creation so successive messages differ — confirm
Thread.sleep(1);
}
// Deliver the fifth, freshly built message.
_costModule.messageArrived(buildEnvelope(POOL_ADDRESS), currentMessage);

// The pool is expected to be reported as not enabled at this point.
assertFalse(_costModule.getPoolStatus(POOL_NAME));

// Re-deliver the same message object once.
for (int i = 0; i < 1; i++) {
_costModule.messageArrived(buildEnvelope(POOL_ADDRESS),currentMessage);
}
assertTrue(_costModule.getPoolStatus(POOL_NAME));

// A freshly built message is expected to flip the status back to not enabled.
currentMessage = getMessagePool(POOL_NAME);
_costModule.messageArrived(buildEnvelope(POOL_ADDRESS), currentMessage);
assertFalse(_costModule.getPoolStatus(POOL_NAME));

// Two re-deliveries of the same message, then one freshly built message.
_costModule.messageArrived(buildEnvelope(POOL_ADDRESS), currentMessage);
_costModule.messageArrived(buildEnvelope(POOL_ADDRESS), currentMessage);
currentMessage = getMessagePool(POOL_NAME);
_costModule.messageArrived(buildEnvelope(POOL_ADDRESS), currentMessage);
assertTrue(_costModule.getPoolStatus(POOL_NAME));
}

/**
 * Creates a pool-up message for the given pool by calling
 * {@code buildPoolUpMessageWithCostAndQueue} with a fixed set of numeric
 * arguments.
 *
 * @param poolName name of the pool the message is built for
 * @return the message returned by {@code buildPoolUpMessageWithCostAndQueue}
 */
private PoolManagerPoolUpMessage getMessagePool(String poolName) {
    final PoolManagerPoolUpMessage message = buildPoolUpMessageWithCostAndQueue(
            poolName,
            100, 20, 30, 50,
            40, 100, 0,
            0, 0, 0,
            0, 0, 0);
    return message;
}


@Test
public void testThreePoolsThenPercentile() {
Expand Down