diff --git a/common/src/main/java/org/apache/hertzbeat/common/constants/PluginType.java b/common/src/main/java/org/apache/hertzbeat/common/constants/PluginType.java index d586889cec6..3b94797050d 100644 --- a/common/src/main/java/org/apache/hertzbeat/common/constants/PluginType.java +++ b/common/src/main/java/org/apache/hertzbeat/common/constants/PluginType.java @@ -25,5 +25,9 @@ public enum PluginType { /** * do something after alter */ - POST_ALERT + POST_ALERT, + /** + * do something after collect + */ + POST_COLLECT } diff --git a/common/src/main/java/org/apache/hertzbeat/common/entity/plugin/PluginConfig.java b/common/src/main/java/org/apache/hertzbeat/common/entity/plugin/PluginConfig.java new file mode 100644 index 00000000000..f0377f50d5d --- /dev/null +++ b/common/src/main/java/org/apache/hertzbeat/common/entity/plugin/PluginConfig.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +// Generated by the protocol buffer compiler. DO NOT EDIT! +// source: cluster_msg.proto + +package org.apache.hertzbeat.common.entity.plugin; + +import java.util.List; +import lombok.Data; +import org.apache.hertzbeat.common.entity.manager.ParamDefine; + +/** + * The configuration file of the plugin, including parameters and other information + */ +@Data +public class PluginConfig { + + private List params; +} diff --git a/common/src/main/java/org/apache/hertzbeat/common/entity/plugin/PluginContext.java b/common/src/main/java/org/apache/hertzbeat/common/entity/plugin/PluginContext.java new file mode 100644 index 00000000000..e8433f9abc4 --- /dev/null +++ b/common/src/main/java/org/apache/hertzbeat/common/entity/plugin/PluginContext.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hertzbeat.common.entity.plugin; + + +import java.util.List; +import lombok.Builder; +import lombok.Data; +import org.apache.hertzbeat.common.entity.job.Configmap; + +/** + * plugin context + */ +@Builder +@Data +public class PluginContext { + + /** + * params + */ + List params; + +} diff --git a/manager/src/main/java/org/apache/hertzbeat/manager/component/alerter/DispatcherAlarm.java b/manager/src/main/java/org/apache/hertzbeat/manager/component/alerter/DispatcherAlarm.java index c543c08a1ca..faa483e22f8 100644 --- a/manager/src/main/java/org/apache/hertzbeat/manager/component/alerter/DispatcherAlarm.java +++ b/manager/src/main/java/org/apache/hertzbeat/manager/component/alerter/DispatcherAlarm.java @@ -29,10 +29,11 @@ import org.apache.hertzbeat.common.entity.manager.NoticeTemplate; import org.apache.hertzbeat.common.queue.CommonDataQueue; import org.apache.hertzbeat.manager.service.NoticeConfigService; -import org.apache.hertzbeat.manager.service.PluginService; import org.apache.hertzbeat.manager.support.exception.AlertNoticeException; import org.apache.hertzbeat.manager.support.exception.IgnoreException; +import org.apache.hertzbeat.plugin.PostAlertPlugin; import org.apache.hertzbeat.plugin.Plugin; +import org.apache.hertzbeat.plugin.runner.PluginRunner; import org.springframework.beans.factory.InitializingBean; import org.springframework.stereotype.Component; @@ -50,18 +51,18 @@ public class DispatcherAlarm implements InitializingBean { private final NoticeConfigService noticeConfigService; private final AlertStoreHandler alertStoreHandler; private final Map alertNotifyHandlerMap; - private final PluginService pluginService; + private final PluginRunner pluginRunner; public DispatcherAlarm(AlerterWorkerPool workerPool, CommonDataQueue dataQueue, NoticeConfigService noticeConfigService, AlertStoreHandler alertStoreHandler, - List alertNotifyHandlerList, PluginService pluginService) { + List alertNotifyHandlerList, PluginRunner pluginRunner) { this.workerPool = workerPool; this.dataQueue = dataQueue; this.noticeConfigService = noticeConfigService; this.alertStoreHandler = alertStoreHandler; - this.pluginService = pluginService; + this.pluginRunner = pluginRunner; alertNotifyHandlerMap = Maps.newHashMapWithExpectedSize(alertNotifyHandlerList.size()); alertNotifyHandlerList.forEach(r -> alertNotifyHandlerMap.put(r.type(), r)); } @@ -130,8 +131,11 @@ public void run() { alertStoreHandler.store(alert); // Notice distribution sendNotify(alert); - // Execute the plugin if enable - pluginService.pluginExecute(Plugin.class, plugin -> plugin.alert(alert), (plugin, configMapList) -> plugin.alert(alert, configMapList)); + // Execute the plugin if enable (Compatible with old version plugins, will be removed in later versions) + pluginRunner.pluginExecute(Plugin.class, plugin -> plugin.alert(alert)); + // Execute the plugin if enable with params + pluginRunner.pluginExecute(PostAlertPlugin.class, (afterAlertPlugin, pluginContext) -> afterAlertPlugin.execute(alert, pluginContext)); + } } catch (IgnoreException ignored) { } catch (InterruptedException e) { diff --git a/manager/src/main/java/org/apache/hertzbeat/manager/service/PluginService.java b/manager/src/main/java/org/apache/hertzbeat/manager/service/PluginService.java index a7f0c706e4f..8b747450940 100644 --- a/manager/src/main/java/org/apache/hertzbeat/manager/service/PluginService.java +++ b/manager/src/main/java/org/apache/hertzbeat/manager/service/PluginService.java @@ -22,8 +22,8 @@ import java.util.function.BiConsumer; import java.util.function.Consumer; import org.apache.hertzbeat.common.entity.dto.PluginUpload; -import org.apache.hertzbeat.common.entity.job.Configmap; import org.apache.hertzbeat.common.entity.manager.PluginMetadata; +import org.apache.hertzbeat.common.entity.plugin.PluginContext; import org.apache.hertzbeat.manager.pojo.dto.PluginParam; import org.apache.hertzbeat.manager.pojo.dto.PluginParametersVO; import org.springframework.data.domain.Page; @@ -63,7 +63,18 @@ public interface PluginService { * @param execute run plugin logic * @param plugin type */ - void pluginExecute(Class clazz, Consumer execute, BiConsumer> biConsumer); + void pluginExecute(Class clazz, Consumer execute); + + + /** + * execute plugin + * + * @param clazz plugin interface + * @param execute run plugin logic + * @param plugin type + */ + void pluginExecute(Class clazz, BiConsumer execute); + /** * delete plugin diff --git a/manager/src/main/java/org/apache/hertzbeat/manager/service/impl/DefaultPluginRunner.java b/manager/src/main/java/org/apache/hertzbeat/manager/service/impl/DefaultPluginRunner.java new file mode 100644 index 00000000000..7add67d5668 --- /dev/null +++ b/manager/src/main/java/org/apache/hertzbeat/manager/service/impl/DefaultPluginRunner.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hertzbeat.manager.service.impl; + +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.hertzbeat.common.entity.plugin.PluginContext; +import org.apache.hertzbeat.manager.service.PluginService; +import org.apache.hertzbeat.plugin.runner.PluginRunner; +import org.springframework.stereotype.Service; + + +/** + * default plugin runner + */ +@Service +@RequiredArgsConstructor +@Slf4j +public class DefaultPluginRunner implements PluginRunner { + + private final PluginService pluginService; + + @Override + public void pluginExecute(Class clazz, Consumer execute) { + try { + pluginService.pluginExecute(clazz, execute); + } catch (Exception e) { + log.error("plugin execute failed", e); + } + } + + @Override + public void pluginExecute(Class clazz, BiConsumer execute) { + try { + pluginService.pluginExecute(clazz, execute); + } catch (Exception e) { + log.error("plugin execute failed", e); + } + } +} diff --git a/manager/src/main/java/org/apache/hertzbeat/manager/service/impl/PluginServiceImpl.java b/manager/src/main/java/org/apache/hertzbeat/manager/service/impl/PluginServiceImpl.java index 4ccae0fd0ed..0b789452f31 100644 --- a/manager/src/main/java/org/apache/hertzbeat/manager/service/impl/PluginServiceImpl.java +++ b/manager/src/main/java/org/apache/hertzbeat/manager/service/impl/PluginServiceImpl.java @@ -52,9 +52,10 @@ import org.apache.hertzbeat.common.constants.PluginType; import org.apache.hertzbeat.common.entity.dto.PluginUpload; import org.apache.hertzbeat.common.entity.job.Configmap; -import org.apache.hertzbeat.common.entity.manager.ParamDefine; import org.apache.hertzbeat.common.entity.manager.PluginItem; import org.apache.hertzbeat.common.entity.manager.PluginMetadata; +import org.apache.hertzbeat.common.entity.plugin.PluginConfig; +import org.apache.hertzbeat.common.entity.plugin.PluginContext; import org.apache.hertzbeat.common.support.exception.CommonException; import org.apache.hertzbeat.manager.dao.PluginItemDao; import org.apache.hertzbeat.manager.dao.PluginMetadataDao; @@ -62,7 +63,9 @@ import org.apache.hertzbeat.manager.pojo.dto.PluginParam; import org.apache.hertzbeat.manager.pojo.dto.PluginParametersVO; import org.apache.hertzbeat.manager.service.PluginService; +import org.apache.hertzbeat.plugin.PostAlertPlugin; import org.apache.hertzbeat.plugin.Plugin; +import org.apache.hertzbeat.plugin.PostCollectPlugin; import org.springframework.data.domain.Page; import org.springframework.data.domain.PageRequest; import org.springframework.data.jpa.domain.Specification; @@ -96,12 +99,12 @@ public class PluginServiceImpl implements PluginService { /** * plugin param define */ - private static final Map> PARAMS_DEFINE_MAP = new ConcurrentHashMap<>(); + private static final Map PARAMS_CONFIG_MAP = new ConcurrentHashMap<>(); /** * plugin params */ - private static final Map> PARAMS_MAP = new ConcurrentHashMap<>(); + private static final Map> PARAMS_MAP = new ConcurrentHashMap<>(); /** * pluginItem Mapping pluginId @@ -173,10 +176,10 @@ public void updateStatus(PluginMetadata plugin) { public PluginParametersVO getParamDefine(Long pluginMetadataId) { PluginParametersVO pluginParametersVO = new PluginParametersVO(); - if (PARAMS_DEFINE_MAP.containsKey(pluginMetadataId)) { - List paramDefines = PARAMS_DEFINE_MAP.get(pluginMetadataId); + if (PARAMS_CONFIG_MAP.containsKey(pluginMetadataId)) { + PluginConfig config = PARAMS_CONFIG_MAP.get(pluginMetadataId); List paramsByPluginMetadataId = pluginParamDao.findParamsByPluginMetadataId(pluginMetadataId); - pluginParametersVO.setParamDefines(paramDefines); + pluginParametersVO.setParamDefines(Optional.ofNullable(config).map(PluginConfig::getParams).orElse(new ArrayList<>())); pluginParametersVO.setPluginParams(paramsByPluginMetadataId); return pluginParametersVO; } @@ -205,6 +208,8 @@ private void syncPluginParamMap(Long pluginMetadataId, List params, static { PLUGIN_TYPE_MAPPING.put(Plugin.class, PluginType.POST_ALERT); + PLUGIN_TYPE_MAPPING.put(PostAlertPlugin.class, PluginType.POST_ALERT); + PLUGIN_TYPE_MAPPING.put(PostCollectPlugin.class, PluginType.POST_COLLECT); } /** @@ -218,7 +223,7 @@ public List validateJarFile(File jarFile) { try { URL jarUrl = new URL("file:" + jarFile.getAbsolutePath()); try (URLClassLoader classLoader = new URLClassLoader(new URL[]{jarUrl}, this.getClass().getClassLoader()); - JarFile jar = new JarFile(jarFile)) { + JarFile jar = new JarFile(jarFile)) { Enumeration entries = jar.entries(); Yaml yaml = new Yaml(); while (entries.hasMoreElements()) { @@ -355,11 +360,11 @@ private void syncPluginStatus() { } @PostConstruct - private void initParams(){ + private void initParams() { try { List params = pluginParamDao.findAll(); Map> content = params.stream() - .collect(Collectors.groupingBy(PluginParam::getPluginMetadataId)); + .collect(Collectors.groupingBy(PluginParam::getPluginMetadataId)); for (Map.Entry> entry : content.entrySet()) { syncPluginParamMap(entry.getKey(), entry.getValue(), false); @@ -381,36 +386,39 @@ private void loadJarToClassLoader() { pluginClassLoader.close(); } } + } catch (IOException e) { + throw new RuntimeException(e); + } - if (!pluginClassLoaders.isEmpty()) { - pluginClassLoaders.clear(); - System.gc(); - } - PARAMS_DEFINE_MAP.clear(); - List plugins = metadataDao.findPluginMetadataByEnableStatusTrue(); - for (PluginMetadata metadata : plugins) { + if (!pluginClassLoaders.isEmpty()) { + pluginClassLoaders.clear(); + System.gc(); + } + PARAMS_CONFIG_MAP.clear(); + List plugins = metadataDao.findPluginMetadataByEnableStatusTrue(); + for (PluginMetadata metadata : plugins) { + try { List urls = loadLibInPlugin(metadata.getJarFilePath(), metadata.getId()); urls.add(new File(metadata.getJarFilePath()).toURI().toURL()); pluginClassLoaders.add(new URLClassLoader(urls.toArray(new URL[0]), Plugin.class.getClassLoader())); + } catch (MalformedURLException e) { + log.error("Failed to load plugin:{}", e.getMessage()); + throw new CommonException("Failed to load plugin:" + e.getMessage()); + } catch (IOException exception) { + log.error("{} plugin file is missing, please delete the plugin and upload it again", metadata.getName()); } - - } catch (MalformedURLException e) { - log.error("Failed to load plugin:{}", e.getMessage()); - throw new CommonException("Failed to load plugin:" + e.getMessage()); - } catch (IOException e) { - throw new RuntimeException(e); } } /** * loading other JAR files that are dependencies for the plugin * - * @param pluginJarPath jar file path + * @param pluginJarPath jar file path * @param pluginMetadataId plugin id * @return urls */ - @SneakyThrows - private List loadLibInPlugin(String pluginJarPath, Long pluginMetadataId) { + + private List loadLibInPlugin(String pluginJarPath, Long pluginMetadataId) throws IOException { File libDir = new File(getOtherLibDir(pluginJarPath)); FileUtils.forceMkdir(libDir); List libUrls = new ArrayList<>(); @@ -440,8 +448,8 @@ private List loadLibInPlugin(String pluginJarPath, Long pluginMetadataId) { } if ((entry.getName().contains("define")) && (entry.getName().endsWith(".yml") || entry.getName().endsWith(".yaml"))) { try (InputStream ymlInputStream = jarFile.getInputStream(entry)) { - List params = yaml.loadAs(ymlInputStream, List.class); - PARAMS_DEFINE_MAP.put(pluginMetadataId, params); + PluginConfig config = yaml.loadAs(ymlInputStream, PluginConfig.class); + PARAMS_CONFIG_MAP.put(pluginMetadataId, config); } } } @@ -450,19 +458,29 @@ private List loadLibInPlugin(String pluginJarPath, Long pluginMetadataId) { } @Override - public void pluginExecute(Class clazz, Consumer execute, BiConsumer> biConsumer) { + public void pluginExecute(Class clazz, Consumer execute) { for (URLClassLoader pluginClassLoader : pluginClassLoaders) { ServiceLoader load = ServiceLoader.load(clazz, pluginClassLoader); for (T t : load) { if (pluginIsEnable(t.getClass())) { - Long pluginId = ITEM_TO_PLUGINMETADATAID_MAP.get(t.getClass().getName()); - List configmapList = PARAMS_MAP.get(pluginId); - if (CollectionUtils.isEmpty(configmapList)) { - execute.accept(t); - } else { - biConsumer.accept(t, configmapList); - } + execute.accept(t); + } + } + } + } + + @Override + public void pluginExecute(Class clazz, BiConsumer execute) { + for (URLClassLoader pluginClassLoader : pluginClassLoaders) { + ServiceLoader load = ServiceLoader.load(clazz, pluginClassLoader); + for (T t : load) { + if (!pluginIsEnable(t.getClass())) { + continue; } + Long pluginId = ITEM_TO_PLUGINMETADATAID_MAP.get(t.getClass().getName()); + List configmapList = PARAMS_MAP.get(pluginId); + PluginContext context = PluginContext.builder().params(configmapList).build(); + execute.accept(t, context); } } } diff --git a/manager/src/test/java/org/apache/hertzbeat/manager/component/alerter/DispatcherAlarmTest.java b/manager/src/test/java/org/apache/hertzbeat/manager/component/alerter/DispatcherAlarmTest.java index a0394cfba08..9347a6c6493 100644 --- a/manager/src/test/java/org/apache/hertzbeat/manager/component/alerter/DispatcherAlarmTest.java +++ b/manager/src/test/java/org/apache/hertzbeat/manager/component/alerter/DispatcherAlarmTest.java @@ -31,7 +31,7 @@ import org.apache.hertzbeat.common.entity.manager.NoticeTemplate; import org.apache.hertzbeat.common.queue.CommonDataQueue; import org.apache.hertzbeat.manager.service.NoticeConfigService; -import org.apache.hertzbeat.manager.service.PluginService; +import org.apache.hertzbeat.plugin.runner.PluginRunner; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -58,7 +58,7 @@ class DispatcherAlarmTest { private AlertStoreHandler alertStoreHandler; @Mock - private PluginService pluginService; + private PluginRunner pluginRunner; @Mock private AlertNotifyHandler alertNotifyHandler; @@ -77,7 +77,7 @@ void setUp() { noticeConfigService, alertStoreHandler, alertNotifyHandlerList, - pluginService + pluginRunner ); } diff --git a/plugin/src/main/java/org/apache/hertzbeat/plugin/Plugin.java b/plugin/src/main/java/org/apache/hertzbeat/plugin/Plugin.java index efeed504603..d72632925b5 100644 --- a/plugin/src/main/java/org/apache/hertzbeat/plugin/Plugin.java +++ b/plugin/src/main/java/org/apache/hertzbeat/plugin/Plugin.java @@ -17,9 +17,7 @@ package org.apache.hertzbeat.plugin; -import java.util.List; import org.apache.hertzbeat.common.entity.alerter.Alert; -import org.apache.hertzbeat.common.entity.job.Configmap; /** * Plugin @@ -30,10 +28,4 @@ public interface Plugin { * execute when alert */ void alert(Alert alert); - - /** - * Supports user-defined parameters - */ - void alert(Alert alert, List params); - } diff --git a/plugin/src/main/java/org/apache/hertzbeat/plugin/PostAlertPlugin.java b/plugin/src/main/java/org/apache/hertzbeat/plugin/PostAlertPlugin.java new file mode 100644 index 00000000000..feb4e177a67 --- /dev/null +++ b/plugin/src/main/java/org/apache/hertzbeat/plugin/PostAlertPlugin.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hertzbeat.plugin; + +import org.apache.hertzbeat.common.entity.alerter.Alert; +import org.apache.hertzbeat.common.entity.plugin.PluginContext; + +/** + * Post-alarm plug-in + */ +public interface PostAlertPlugin { + + /** + * Supports user-defined parameters + */ + void execute(Alert alert, PluginContext pluginContext); + +} diff --git a/plugin/src/main/java/org/apache/hertzbeat/plugin/PostCollectPlugin.java b/plugin/src/main/java/org/apache/hertzbeat/plugin/PostCollectPlugin.java new file mode 100644 index 00000000000..c4364ca151f --- /dev/null +++ b/plugin/src/main/java/org/apache/hertzbeat/plugin/PostCollectPlugin.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hertzbeat.plugin; + +import org.apache.hertzbeat.common.entity.message.CollectRep; +import org.apache.hertzbeat.common.entity.plugin.PluginContext; + +/** + * Post-collect plug-in + */ +public interface PostCollectPlugin { + + /** + * do something after collect + */ + void execute(CollectRep.MetricsData metricsData, PluginContext pluginContext); + +} diff --git a/plugin/src/main/java/org/apache/hertzbeat/plugin/runner/PluginRunner.java b/plugin/src/main/java/org/apache/hertzbeat/plugin/runner/PluginRunner.java new file mode 100644 index 00000000000..7f3e00930c4 --- /dev/null +++ b/plugin/src/main/java/org/apache/hertzbeat/plugin/runner/PluginRunner.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hertzbeat.plugin.runner; + + +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import org.apache.hertzbeat.common.entity.plugin.PluginContext; + +/** + * plugin runner + */ +public interface PluginRunner { + + /** + * execute plugin + * @param clazz plugin class + * @param execute plugin execution logic + * @param plugin type + */ + void pluginExecute(Class clazz, Consumer execute); + + /** + * execute plugin with params + * @param clazz plugin class + * @param execute plugin execution logic + * @param plugin type + */ + void pluginExecute(Class clazz, BiConsumer execute); + +} diff --git a/plugin/src/main/resources/META-INF/services/org.apache.hertzbeat.plugin.PostAlertPlugin b/plugin/src/main/resources/META-INF/services/org.apache.hertzbeat.plugin.PostAlertPlugin new file mode 100644 index 00000000000..b1312a0905c --- /dev/null +++ b/plugin/src/main/resources/META-INF/services/org.apache.hertzbeat.plugin.PostAlertPlugin @@ -0,0 +1,16 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# diff --git a/plugin/src/main/resources/META-INF/services/org.apache.hertzbeat.plugin.PostCollectPlugin b/plugin/src/main/resources/META-INF/services/org.apache.hertzbeat.plugin.PostCollectPlugin new file mode 100644 index 00000000000..b1312a0905c --- /dev/null +++ b/plugin/src/main/resources/META-INF/services/org.apache.hertzbeat.plugin.PostCollectPlugin @@ -0,0 +1,16 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# diff --git a/warehouse/pom.xml b/warehouse/pom.xml index c0303aa7732..491b6349a1c 100644 --- a/warehouse/pom.xml +++ b/warehouse/pom.xml @@ -35,6 +35,12 @@ hertzbeat-common provided + + + org.apache.hertzbeat + hertzbeat-plugin + provided + commons-net diff --git a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/DataStorageDispatch.java b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/DataStorageDispatch.java index fc1304ebefc..5d633df3b2d 100644 --- a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/DataStorageDispatch.java +++ b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/DataStorageDispatch.java @@ -21,6 +21,8 @@ import lombok.extern.slf4j.Slf4j; import org.apache.hertzbeat.common.entity.message.CollectRep; import org.apache.hertzbeat.common.queue.CommonDataQueue; +import org.apache.hertzbeat.plugin.PostCollectPlugin; +import org.apache.hertzbeat.plugin.runner.PluginRunner; import org.apache.hertzbeat.warehouse.WarehouseWorkerPool; import org.apache.hertzbeat.warehouse.store.history.HistoryDataWriter; import org.apache.hertzbeat.warehouse.store.realtime.RealTimeDataWriter; @@ -39,14 +41,18 @@ public class DataStorageDispatch { private final RealTimeDataWriter realTimeDataWriter; private final Optional historyDataWriter; + private final PluginRunner pluginRunner; + public DataStorageDispatch(CommonDataQueue commonDataQueue, WarehouseWorkerPool workerPool, Optional historyDataWriter, - RealTimeDataWriter realTimeDataWriter) { + RealTimeDataWriter realTimeDataWriter, + PluginRunner pluginRunner) { this.commonDataQueue = commonDataQueue; this.workerPool = workerPool; this.realTimeDataWriter = realTimeDataWriter; this.historyDataWriter = historyDataWriter; + this.pluginRunner = pluginRunner; startPersistentDataStorage(); startRealTimeDataStorage(); } @@ -61,6 +67,7 @@ private void startRealTimeDataStorage() { continue; } realTimeDataWriter.saveData(metricsData); + pluginRunner.pluginExecute(PostCollectPlugin.class, ((postCollectPlugin, pluginContext) -> postCollectPlugin.execute(metricsData, pluginContext))); } catch (InterruptedException interruptedException) { Thread.currentThread().interrupt(); } catch (Exception e) { diff --git a/web-app/src/assets/i18n/en-US.json b/web-app/src/assets/i18n/en-US.json index db13194e846..48d9529cdb2 100644 --- a/web-app/src/assets/i18n/en-US.json +++ b/web-app/src/assets/i18n/en-US.json @@ -580,6 +580,7 @@ "plugin.jar.file": "Jar File", "plugin.delete": "Delete Plugin", "plugin.type.POST_ALERT": "POST ALERT", + "plugin.type.POST_COLLECT": "POST COLLECT", "plugin.search": "Search plugins", "plugin.edit": "Edit plugin", "define.help": "The monitor templates define each monitoring type, parameter variable, metrics info, collection protocol, etc. You can select an existing monitoring template from the drop-down menu then make modifications according to your own needs. The bottom-left area is the compare area and the bottom-right area is the editing place.
You can also click \"New Monitor Type\" to custom define an new type. Currently supported protocols include HTTP, JDBC, SSH, JMX, SNMP. Monitor Templates.", diff --git a/web-app/src/assets/i18n/zh-CN.json b/web-app/src/assets/i18n/zh-CN.json index 1603c6af17e..303fab11bd7 100644 --- a/web-app/src/assets/i18n/zh-CN.json +++ b/web-app/src/assets/i18n/zh-CN.json @@ -563,6 +563,7 @@ "plugin.delete": "刪除插件", "plugin.type": "插件类型", "plugin.type.POST_ALERT": "告警后", + "plugin.type.POST_COLLECT": "采集后", "plugin.search": "搜索插件", "plugin.edit": "编辑插件", "define.help": "监控模版定义每一个监控类型,类型的参数变量,指标信息,采集协议等。您可根据需求在下拉菜单中选择已有监控模板修改。左下区域为对照区,右下区域为编辑区。
您也可以点击“新增监控类型”来自定义新的的监控类型,目前支持 HTTP 协议JDBC协议SSH协议 JMX 协议 SNMP 协议点击查看监控模板。\n", diff --git a/web-app/src/assets/i18n/zh-TW.json b/web-app/src/assets/i18n/zh-TW.json index 6f974a620fe..9783be18aec 100644 --- a/web-app/src/assets/i18n/zh-TW.json +++ b/web-app/src/assets/i18n/zh-TW.json @@ -576,6 +576,7 @@ "plugin.jar.file": "Jar包", "plugin.delete": "刪除插件", "plugin.type.POST_ALERT": "告警後", + "plugin.type.POST_COLLECT": "採集後", "plugin.search": "搜尋插件", "plugin.edit": "編輯插件", "define.help": "監控模版定義每一個監控類型,類型的參數變量,指標信息,採集協議等。您可根據需求在下拉功能表中選擇已有監控模版進行修改。右下區域為編輯區,左下區域為對照區。
您也可以點擊“新增監控類型”來自定義新的的監控類型,現支持 HTTP協議JDBC協定SSH協定 JMX協定 SNMP協定點擊查看監控範本。",