From 7a7d4792cd29bf2f9aa6e242f720a0053fe93604 Mon Sep 17 00:00:00 2001 From: liutianyou Date: Thu, 8 Aug 2024 20:41:08 +0800 Subject: [PATCH 1/2] [improve] remove demo plugin (#2492) Co-authored-by: tomsun28 --- .../hertzbeat/plugin/impl/DemoPluginImpl.java | 40 ------------------- .../org.apache.hertzbeat.plugin.Plugin | 1 - 2 files changed, 41 deletions(-) delete mode 100644 plugin/src/main/java/org/apache/hertzbeat/plugin/impl/DemoPluginImpl.java diff --git a/plugin/src/main/java/org/apache/hertzbeat/plugin/impl/DemoPluginImpl.java b/plugin/src/main/java/org/apache/hertzbeat/plugin/impl/DemoPluginImpl.java deleted file mode 100644 index 7fca0b75844..00000000000 --- a/plugin/src/main/java/org/apache/hertzbeat/plugin/impl/DemoPluginImpl.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hertzbeat.plugin.impl; - -import lombok.extern.slf4j.Slf4j; -import org.apache.hertzbeat.common.entity.alerter.Alert; -import org.apache.hertzbeat.plugin.Plugin; - -/** - * DemoPlugin - */ -@Slf4j -public class DemoPluginImpl implements Plugin { - - /** - * execute when alert - */ - @Override - public void alert(Alert alert) { - if (log.isDebugEnabled()) { - log.debug("DemoPluginImpl alert: {}", alert); - } - } - -} diff --git a/plugin/src/main/resources/META-INF/services/org.apache.hertzbeat.plugin.Plugin b/plugin/src/main/resources/META-INF/services/org.apache.hertzbeat.plugin.Plugin index 8da17400556..e69de29bb2d 100644 --- a/plugin/src/main/resources/META-INF/services/org.apache.hertzbeat.plugin.Plugin +++ b/plugin/src/main/resources/META-INF/services/org.apache.hertzbeat.plugin.Plugin @@ -1 +0,0 @@ -org.apache.hertzbeat.plugin.impl.DemoPluginImpl From 31134871921740f1ca6cfdc05d9b69f9aadd4849 Mon Sep 17 00:00:00 2001 From: tomsun28 Date: Thu, 8 Aug 2024 22:35:51 +0800 Subject: [PATCH 2/2] [refactor] refactor connect common cache (#2469) Signed-off-by: tomsun28 Co-authored-by: crossoverJie --- .../common/cache/ConnectionCommonCache.java | 72 ++++--------------- 1 file changed, 13 insertions(+), 59 deletions(-) diff --git a/collector/src/main/java/org/apache/hertzbeat/collector/collect/common/cache/ConnectionCommonCache.java b/collector/src/main/java/org/apache/hertzbeat/collector/collect/common/cache/ConnectionCommonCache.java index dd2a97010de..d36941d9fee 100644 --- a/collector/src/main/java/org/apache/hertzbeat/collector/collect/common/cache/ConnectionCommonCache.java +++ b/collector/src/main/java/org/apache/hertzbeat/collector/collect/common/cache/ConnectionCommonCache.java @@ -21,11 +21,9 @@ import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap; import java.util.Map; import java.util.Optional; -import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; @@ -34,16 +32,11 @@ */ @Slf4j public class ConnectionCommonCache> { - - /** - * default cache time 200s - */ - private static final long DEFAULT_CACHE_TIMEOUT = 200 * 1000L; - + /** - * default max cache num + * default cache time 600s */ - private static final int DEFAULT_MAX_CAPACITY = 10000; + private static final long DEFAULT_CACHE_TIMEOUT = 600 * 1000L; /** * cacheTime length @@ -60,19 +53,14 @@ public class ConnectionCommonCache> { */ private ConcurrentLinkedHashMap cacheMap; - /** - * the executor who clean cache when timeout - */ - private ThreadPoolExecutor timeoutCleanerExecutor; - public ConnectionCommonCache() { - init(); + initCache(); } - private void init() { + private void initCache() { cacheMap = new ConcurrentLinkedHashMap .Builder() - .maximumWeightedCapacity(DEFAULT_MAX_CAPACITY) + .maximumWeightedCapacity(Integer.MAX_VALUE) .listener((key, value) -> { timeoutMap.remove(key); try { @@ -82,70 +70,37 @@ private void init() { } log.info("connection common cache discard key: {}, value: {}.", key, value); }).build(); - timeoutMap = new ConcurrentHashMap<>(DEFAULT_MAX_CAPACITY >> 6); - // last-first-coverage algorithm, run the first and last thread, discard mid - timeoutCleanerExecutor = new ThreadPoolExecutor(1, 1, 1, TimeUnit.SECONDS, - new ArrayBlockingQueue<>(1), - r -> new Thread(r, "connection-cache-timeout-cleaner"), - new ThreadPoolExecutor.DiscardOldestPolicy()); + timeoutMap = new ConcurrentHashMap<>(16); // init monitor available detector cyc task ThreadFactory threadFactory = new ThreadFactoryBuilder() - .setNameFormat("connection-cache-ava-detector-%d") + .setNameFormat("connection-cache-timout-detector-%d") .setDaemon(true) .build(); ScheduledThreadPoolExecutor scheduledExecutor = new ScheduledThreadPoolExecutor(1, threadFactory); - scheduledExecutor.scheduleWithFixedDelay(this::detectCacheAvailable, 2, 20, TimeUnit.MINUTES); + scheduledExecutor.scheduleWithFixedDelay(this::cleanTimeoutOrUnHealthCache, 2, 100, TimeUnit.SECONDS); } /** - * detect all cache available, cleanup not ava connection + * clean and remove timeout cache */ - private void detectCacheAvailable() { + private void cleanTimeoutOrUnHealthCache() { try { cacheMap.forEach((key, value) -> { + // index 0 is startTime, 1 is timeDiff Long[] cacheTime = timeoutMap.get(key); long currentTime = System.currentTimeMillis(); if (cacheTime == null || cacheTime.length != CACHE_TIME_LENGTH || cacheTime[0] + cacheTime[1] < currentTime) { - cacheMap.remove(key); - timeoutMap.remove(key); - try { - value.close(); - } catch (Exception e) { - log.error("connection close error: {}.", e.getMessage(), e); - } - - } - }); - } catch (Exception e) { - log.error("connection common cache detect cache available error: {}.", e.getMessage(), e); - } - } - - /** - * clean timeout cache - */ - private void cleanTimeoutCache() { - try { - cacheMap.forEach((key, value) -> { - // index 0 is startTime, 1 is timeDiff - Long[] cacheTime = timeoutMap.get(key); - long currentTime = System.currentTimeMillis(); - if (cacheTime == null || cacheTime.length != CACHE_TIME_LENGTH) { - timeoutMap.put(key, new Long[]{currentTime, DEFAULT_CACHE_TIMEOUT}); - } else if (cacheTime[0] + cacheTime[1] < currentTime) { - // timeout, remove this object cache log.warn("[connection common cache] clean the timeout cache, key {}", key); timeoutMap.remove(key); cacheMap.remove(key); try { value.close(); } catch (Exception e) { - log.error("connection close error: {}.", e.getMessage(), e); + log.error("clean connection close error: {}.", e.getMessage(), e); } } }); - Thread.sleep(20 * 1000); } catch (Exception e) { log.error("[connection common cache] clean timeout cache error: {}.", e.getMessage(), e); } @@ -165,7 +120,6 @@ public void addCache(T key, C value, Long timeDiff) { } cacheMap.put(key, value); timeoutMap.put(key, new Long[]{System.currentTimeMillis(), timeDiff}); - timeoutCleanerExecutor.execute(this::cleanTimeoutCache); } /**