Skip to content

Commit

Permalink
Merge branch 'master' into 0808-yuluo/test-2
Browse files Browse the repository at this point in the history
  • Loading branch information
tomsun28 authored Aug 8, 2024
2 parents 7fbdfcd + 3113487 commit 723dd81
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,9 @@
import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;

Expand All @@ -34,16 +32,11 @@
*/
@Slf4j
public class ConnectionCommonCache<T, C extends AbstractConnection<?>> {

/**
* default cache time 200s
*/
private static final long DEFAULT_CACHE_TIMEOUT = 200 * 1000L;


/**
* default max cache num
* default cache time 600s
*/
private static final int DEFAULT_MAX_CAPACITY = 10000;
private static final long DEFAULT_CACHE_TIMEOUT = 600 * 1000L;

/**
* cacheTime length
Expand All @@ -60,19 +53,14 @@ public class ConnectionCommonCache<T, C extends AbstractConnection<?>> {
*/
private ConcurrentLinkedHashMap<T, C> cacheMap;

/**
* the executor who clean cache when timeout
*/
private ThreadPoolExecutor timeoutCleanerExecutor;

public ConnectionCommonCache() {
init();
initCache();
}

private void init() {
private void initCache() {
cacheMap = new ConcurrentLinkedHashMap
.Builder<T, C>()
.maximumWeightedCapacity(DEFAULT_MAX_CAPACITY)
.maximumWeightedCapacity(Integer.MAX_VALUE)
.listener((key, value) -> {
timeoutMap.remove(key);
try {
Expand All @@ -82,70 +70,37 @@ private void init() {
}
log.info("connection common cache discard key: {}, value: {}.", key, value);
}).build();
timeoutMap = new ConcurrentHashMap<>(DEFAULT_MAX_CAPACITY >> 6);
// last-first-coverage algorithm, run the first and last thread, discard mid
timeoutCleanerExecutor = new ThreadPoolExecutor(1, 1, 1, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1),
r -> new Thread(r, "connection-cache-timeout-cleaner"),
new ThreadPoolExecutor.DiscardOldestPolicy());
timeoutMap = new ConcurrentHashMap<>(16);
// init monitor available detector cyc task
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("connection-cache-ava-detector-%d")
.setNameFormat("connection-cache-timout-detector-%d")
.setDaemon(true)
.build();
ScheduledThreadPoolExecutor scheduledExecutor = new ScheduledThreadPoolExecutor(1, threadFactory);
scheduledExecutor.scheduleWithFixedDelay(this::detectCacheAvailable, 2, 20, TimeUnit.MINUTES);
scheduledExecutor.scheduleWithFixedDelay(this::cleanTimeoutOrUnHealthCache, 2, 100, TimeUnit.SECONDS);
}

/**
* detect all cache available, cleanup not ava connection
* clean and remove timeout cache
*/
private void detectCacheAvailable() {
private void cleanTimeoutOrUnHealthCache() {
try {
cacheMap.forEach((key, value) -> {
// index 0 is startTime, 1 is timeDiff
Long[] cacheTime = timeoutMap.get(key);
long currentTime = System.currentTimeMillis();
if (cacheTime == null || cacheTime.length != CACHE_TIME_LENGTH
|| cacheTime[0] + cacheTime[1] < currentTime) {
cacheMap.remove(key);
timeoutMap.remove(key);
try {
value.close();
} catch (Exception e) {
log.error("connection close error: {}.", e.getMessage(), e);
}

}
});
} catch (Exception e) {
log.error("connection common cache detect cache available error: {}.", e.getMessage(), e);
}
}

/**
* clean timeout cache
*/
private void cleanTimeoutCache() {
try {
cacheMap.forEach((key, value) -> {
// index 0 is startTime, 1 is timeDiff
Long[] cacheTime = timeoutMap.get(key);
long currentTime = System.currentTimeMillis();
if (cacheTime == null || cacheTime.length != CACHE_TIME_LENGTH) {
timeoutMap.put(key, new Long[]{currentTime, DEFAULT_CACHE_TIMEOUT});
} else if (cacheTime[0] + cacheTime[1] < currentTime) {
// timeout, remove this object cache
log.warn("[connection common cache] clean the timeout cache, key {}", key);
timeoutMap.remove(key);
cacheMap.remove(key);
try {
value.close();
} catch (Exception e) {
log.error("connection close error: {}.", e.getMessage(), e);
log.error("clean connection close error: {}.", e.getMessage(), e);
}
}
});
Thread.sleep(20 * 1000);
} catch (Exception e) {
log.error("[connection common cache] clean timeout cache error: {}.", e.getMessage(), e);
}
Expand All @@ -165,7 +120,6 @@ public void addCache(T key, C value, Long timeDiff) {
}
cacheMap.put(key, value);
timeoutMap.put(key, new Long[]{System.currentTimeMillis(), timeDiff});
timeoutCleanerExecutor.execute(this::cleanTimeoutCache);
}

/**
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1 +0,0 @@
org.apache.hertzbeat.plugin.impl.DemoPluginImpl

0 comments on commit 723dd81

Please sign in to comment.