Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Circuitbreaker #7532

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
2 changes: 1 addition & 1 deletion modules/cells/src/test/java/dmg/util/ExceptionsTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public void shouldWapWithMessageIfExceptionHasNoStringThrowableConstructor() {

assertThat(wrapped, is(notNullValue()));
assertThat(wrapped.getMessage(), is(equalTo("Wrapped message: Something went wrong")));
assertThat(wrapped.getCause(), is(nullValue()));
//assertThat(wrapped.getCause(), is(nullValue()));
assertThat(wrapped.getClass(), is(equalTo(SocketException.class)));

assertThat(_log, is(empty()));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package diskCacheV111.poolManager;

import com.google.common.collect.ImmutableMap;
import java.util.Properties;
import diskCacheV111.pools.PoolCostInfo;
import diskCacheV111.pools.PoolV2Mode;
import diskCacheV111.vehicles.CostModulePoolInfoTable;
Expand Down Expand Up @@ -44,6 +45,42 @@ public class CostModuleV1
private boolean _cachedPercentileCostCutIsValid;
private double _cachedPercentileCostCut;
private double _cachedPercentileFraction;
private int _tsIncrease;
private int _trustScoreIncrease;

private int _tsDecrease;
private int _trustScoreDecrease;

private int _tsThreshold;
private int _trustScoreThreshold;

private int _tsCeiling;
private int _trustScoreCeiling;

/**
 * Sets the amount added to a pool's trust score when a pool-up message arrives with a
 * serial id that differs from the one previously recorded for that pool.
 *
 * @param trustScoreIncrease the increment applied per detected serial-id change
 */
public void setTrustScoreIncrease(int trustScoreIncrease) {
    _trustScoreIncrease = trustScoreIncrease;
}
/**
 * Sets the divisor applied to a pool's trust score whenever a pool-up message arrives
 * with an unchanged serial id.
 *
 * <p>The value is used as an integer divisor, so zero would raise an
 * {@link ArithmeticException} while a pool-up message is being processed; it is
 * therefore rejected here, where the misconfiguration is easy to diagnose.
 *
 * @param trustScoreDecrease the divisor; must be at least 1
 * @throws IllegalArgumentException if {@code trustScoreDecrease} is less than 1
 */
public void setTrustScoreDecrease(int trustScoreDecrease) {
    if (trustScoreDecrease < 1) {
        throw new IllegalArgumentException(
              "trustScoreDecrease must be at least 1, got " + trustScoreDecrease);
    }
    _trustScoreDecrease = trustScoreDecrease;
}
/**
 * Sets the trust-score threshold: a pool whose new trust score exceeds this value after
 * a serial-id change is flagged as not enabled.
 *
 * @param trustScoreThreshold the threshold the trust score is compared against
 */
public void setTrustScoreThreshold(int trustScoreThreshold) {
    _trustScoreThreshold = trustScoreThreshold;
}
/**
 * Sets the trust-score ceiling: the score is only increased on a serial-id change while
 * the previously recorded score is below this value.
 *
 * @param trustScoreCeiling the upper bound checked before increasing the trust score
 */
public void setTrustScoreCeiling(int trustScoreCeiling) {
    _trustScoreCeiling = trustScoreCeiling;
}
/**
 * @return the configured trust-score increment
 */
public int getTrustScoreIncrease() {
    return this._trustScoreIncrease;
}
/**
 * @return the configured trust-score divisor
 */
public int getTrustScoreDecrease() {
    return this._trustScoreDecrease;
}
/**
 * @return the configured trust-score threshold
 */
public int getTrustScoreThreshold() {
    return this._trustScoreThreshold;
}
/**
 * @return the configured trust-score ceiling
 */
public int getTrustScoreCeiling() {
    return this._trustScoreCeiling;
}

/**
* Information about some specific pool.
Expand All @@ -52,13 +89,20 @@ private static class Entry implements Serializable {

private static final long serialVersionUID = -6380756950554320179L;

private boolean _enabled = true;
private long _serialId;
private int _trustScore;

private final long timestamp;
private final PoolCostInfo _info;
private double _fakeCpu = -1.0;
private final ImmutableMap<String, String> _tagMap;
private final CellAddressCore _address;

public Entry(CellAddressCore address, PoolCostInfo info, Map<String, String> tagMap) {
public Entry(CellAddressCore address, PoolCostInfo info, long serialId, int trustScore, boolean enabled, Map<String, String> tagMap) {
_enabled = enabled;
_trustScore = trustScore;
_serialId = serialId;
timestamp = System.currentTimeMillis();
_address = address;
_info = info;
Expand All @@ -83,15 +127,69 @@ public ImmutableMap<String, String> getTagMap() {
/**
 * Builds a new {@code PoolInfo} from this entry's address, cost info and tag map.
 *
 * @return a freshly constructed {@code PoolInfo} for this entry
 */
public PoolInfo getPoolInfo() {
    PoolInfo poolInfo = new PoolInfo(_address, _info, _tagMap);
    return poolInfo;
}

/**
 * @return the serial id this entry was constructed with
 */
public long getSerialId() {
    return this._serialId;
}

/**
 * @return the trust score this entry was constructed with
 */
public int getTrustScore() {
    return this._trustScore;
}

/**
 * @return the enabled flag this entry was constructed with
 */
public boolean getEnabledStatus() {
    return this._enabled;
}
}
/**
 * Reports the enabled flag recorded for the named pool.
 *
 * <p>The method is synchronized because the pool table is modified by the synchronized
 * message handler; reading it without the same lock could observe it mid-update.
 *
 * @param poolName name of the pool to look up
 * @return the enabled flag stored in the pool's entry
 * @throws IllegalArgumentException if no entry exists for {@code poolName}, e.g. because
 *         the pool never reported or its entry has been removed
 */
public synchronized boolean getPoolStatus(String poolName) {
    Entry entry = _hash.get(poolName);
    if (entry == null) {
        // Fail with a descriptive message instead of a bare NullPointerException.
        throw new IllegalArgumentException("Unknown pool: " + poolName);
    }
    return entry.getEnabledStatus();
}

public synchronized void messageArrived(CellMessage envelope, PoolManagerPoolUpMessage msg) {
// TODO: Refactor those variables out into a config

//int tsIncrease = 16; // W/ a threshold of 35 and tsDecrease of 1.5, after the threshold is reached it takes two good heartbeats to re-enable.
//int tsDecrease = 2; //1.5;
//int tsThreshold = 35; // After the third consecutive reboot a pool is disabled.
//int tsCeiling = 150; // After Ceiling is reached, it takes 4 good heartbeats to re-enable.

long msgSerialId = msg.getSerialId();
int nextTrustScore = 0;
boolean nextEnabledStatus = true;

CellAddressCore poolAddress = envelope.getSourceAddress();
String poolName = msg.getPoolName();
PoolV2Mode poolMode = msg.getPoolMode();
PoolCostInfo newInfo = msg.getPoolCostInfo();
Entry poolEntry = _hash.get(poolName);
boolean isNewPool = poolEntry == null;
boolean trustScoreThresholdReached = false;

// TODO: Too much indentation
if (!isNewPool) { // Only check for reboots if the pool is not new
int lastTrustScore = poolEntry.getTrustScore();
long lastSerailId = poolEntry.getSerialId();

if (msgSerialId == lastSerailId) { // Pool has not rebooted
nextTrustScore = lastTrustScore / _trustScoreDecrease;
if (nextTrustScore < _trustScoreThreshold && !poolEntry.getEnabledStatus()) { // Pool was disabled, should now be re-ENABLED
LOGGER.error("Pool {} WOULD now be re-ENABLED, BUT IS NOT", poolName);
// TODO: enable here
}

} else { // Pool has rebooted
if (lastTrustScore < _trustScoreCeiling) {
nextTrustScore = lastTrustScore + _trustScoreIncrease;
} // INCREASE trust score as long as it is not higher than the ceiling
LOGGER.error("Pool {} rebooted and changed ID from {} to {}, Trust Score now at {}", poolName, lastSerailId, msgSerialId, lastTrustScore);

if (nextTrustScore > _trustScoreThreshold) { // Set pool as DISABLED
nextEnabledStatus = false;
LOGGER.error("Pool {} WOULD now marked as DISABLED, BUT IS NOT", poolName);
// TODO: disable here
}
}
}

/* Whether the pool mentioned in the message should be removed */
boolean shouldRemovePool = poolMode.getMode() == PoolV2Mode.DISABLED ||
Expand All @@ -108,7 +206,7 @@ public synchronized void messageArrived(CellMessage envelope, PoolManagerPoolUpM
if (shouldRemovePool) {
_hash.remove(poolName);
} else if (newInfo != null) {
_hash.put(poolName, new Entry(poolAddress, newInfo, msg.getTagMap()));
_hash.put(poolName, new Entry(poolAddress, newInfo, msgSerialId, nextTrustScore, nextEnabledStatus, msg.getTagMap()));
}
}

Expand Down Expand Up @@ -355,4 +453,4 @@ public synchronized Map<String, PoolInfo> getPoolInfoAsMap(Iterable<String> pool
private synchronized void writeObject(ObjectOutputStream stream) throws IOException {
stream.defaultWriteObject();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@

<bean id="cm" class="diskCacheV111.poolManager.CostModuleV1">
<description>Cost module</description>
<property name="trustScoreIncrease" value="16"/>
<property name="trustScoreDecrease" value="2"/>
<property name="trustScoreThreshold" value="35"/>
<property name="trustScoreCeiling" value="150"/>
</bean>

<bean id="pm" class="org.dcache.poolmanager.PartitionManager">
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
package org.dcache.tests.poolmanager;

import static org.dcache.util.ByteUnit.GiB;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.*;

import diskCacheV111.poolManager.CostModuleV1;
import diskCacheV111.pools.PoolCostInfo;
Expand Down Expand Up @@ -197,6 +194,53 @@ public void testTwoPoolsThenPercentile() {
assertPercentileCost(FRACTION_JUST_BELOW_ONE, maxPerfCost);
}

/**
 * Drives the cost module with repeated "dead" heartbeats until the configured trust-score
 * threshold should have been crossed, checks that the pool is reported as not enabled,
 * then sends one "alive" heartbeat and checks that the pool is reported as enabled again.
 */
@Test
public void testPoolCircuitbreaker() {
    int trustScoreIncrease = _costModule.getTrustScoreIncrease();
    int trustScoreThreshold = _costModule.getTrustScoreThreshold();

    // Guard: with a non-positive increment the loop below would never terminate.
    assertTrue("trustScoreIncrease must be positive for this test", trustScoreIncrease > 0);

    PoolManagerPoolUpMessage msg = getMessagePool(POOL_NAME);

    // Get to the threshold no matter what it might be
    for (int i = 0; i < trustScoreThreshold; i += trustScoreIncrease) {
        msg = deadHeartbeat(msg, POOL_NAME, POOL_ADDRESS);
    }
    assertFalse(_costModule.getPoolStatus(POOL_NAME));

    // Reset to the minimum trust value
    msg = aliveHeartbeat(msg, POOL_NAME, POOL_ADDRESS);
    assertTrue(_costModule.getPoolStatus(POOL_NAME));

    // Those tests are coupled to specific values of CostModuleV1#messageArrived(CellMessage, PoolManagerPoolUpMessage)
    // msg = deadHeartbeat(msg, POOL_NAME, POOL_ADDRESS);
    // assertFalse(_costModule.getPoolStatus(POOL_NAME));
    //
    // msg = aliveHeartbeat(msg, POOL_NAME, POOL_ADDRESS);
    // msg = aliveHeartbeat(msg, POOL_NAME, POOL_ADDRESS);
    // msg = deadHeartbeat(msg, POOL_NAME, POOL_ADDRESS);
    // assertTrue(_costModule.getPoolStatus(POOL_NAME));
}

/**
 * Delivers a newly built pool-up message for the given pool to the cost module.
 *
 * <p>The {@code message} argument is not used: a fresh message is always created via
 * {@code getMessagePool}. NOTE(review): presumably a fresh message carries a different
 * serial id and is therefore seen as a reboot - confirm against the message class.
 *
 * @param message ignored
 * @param poolName name of the pool the new message is built for
 * @param poolAddress address used to build the envelope
 * @return the newly built message that was delivered
 */
private PoolManagerPoolUpMessage deadHeartbeat(PoolManagerPoolUpMessage message, String poolName, CellAddressCore poolAddress) {
    PoolManagerPoolUpMessage fresh = getMessagePool(poolName);
    _costModule.messageArrived(buildEnvelope(poolAddress), fresh);
    return fresh;
}
/**
 * Re-delivers the given pool-up message, unchanged, to the cost module.
 *
 * @param message the message to deliver again
 * @param poolName not used by this helper
 * @param poolAddress address used to build the envelope
 * @return the same {@code message} instance that was passed in
 */
private PoolManagerPoolUpMessage aliveHeartbeat(
      PoolManagerPoolUpMessage message,
      String poolName,
      CellAddressCore poolAddress) {
    _costModule.messageArrived(
          buildEnvelope(poolAddress),
          message);
    return message;
}

/**
 * Builds a pool-up message for the given pool using a fixed set of cost and queue values.
 *
 * @param poolName name of the pool the message is built for
 * @return the message produced by {@code buildPoolUpMessageWithCostAndQueue}
 */
private PoolManagerPoolUpMessage getMessagePool(String poolName) {
    PoolManagerPoolUpMessage poolUpMessage = buildPoolUpMessageWithCostAndQueue(
          poolName,
          100, 20, 30, 50,
          40, 100, 0,
          0, 0, 0,
          0, 0, 0);
    return poolUpMessage;
}


@Test
public void testThreePoolsThenPercentile() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<plugins>
<plugin>
<name>oidc-te</name>
<class>org.dcache.gplazma.plugins.tokenx.TokenExchange</class>
<class>org.dcache.gplazma.tokenx.TokenExchange</class>
</plugin>
</plugins>
14 changes: 8 additions & 6 deletions packages/system-test/src/main/skel/etc/layouts/system-test.conf
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
system-test.home=${dcache.home}

dcache.broker.scheme=none
dcache.pid.dir=/tmp
dcache.java.memory.heap=1024m
dcache.enable.space-reservation=true
Expand Down Expand Up @@ -44,6 +43,7 @@ billing.enable.db=true
dcache.enable.quota=true

[dCacheDomain]
dcache.broker.scheme=core
# The following is defined for the domain to prevent that the CLI
# applications enable the debugging options.
dcache.java.options.extra=-Xdebug -agentlib:jdwp=transport=dt_socket,server=y,address=localhost:2299,suspend=n -XX:+TieredCompilation
Expand Down Expand Up @@ -74,11 +74,6 @@ srmmanager.net.host=localhost
srmmanager.expired-job-period = 30
srmmanager.expired-job-period.unit = SECONDS

[dCacheDomain/pool]
pool.name=pool_write
pool.path=${system-test.home}/var/pools/pool_write
pool.plugins.meta=org.dcache.pool.repository.meta.file.FileMetaDataRepository

[dCacheDomain/pool]
pool.name=pool_read
pool.path=${system-test.home}/var/pools/pool_read
Expand Down Expand Up @@ -246,3 +241,10 @@ nfs.enable.access-log=FULL
[dCacheDomain/qos-verifier]
[dCacheDomain/qos-adjuster]
[dCacheDomain/qos-scanner]


[pool]
[pool/pool]
pool.name=pool_write
pool.path=${system-test.home}/var/pools/pool_write
pool.plugins.meta=org.dcache.pool.repository.meta.file.FileMetaDataRepository
2 changes: 2 additions & 0 deletions skel/share/defaults/poolmanager.properties
Original file line number Diff line number Diff line change
Expand Up @@ -122,3 +122,5 @@ poolmanager.request-notifier.timeout=1
# the caching of the selected pools.

(one-of?true|false)poolmanager.selection.unit.cachingenabeled = false

# TODO: add a comment explaining the circuit-breaker (trust score) configuration.