Skip to content

Commit

Permalink
[ISSUE apache#367] Optimize plugin load (apache#519)
Browse files Browse the repository at this point in the history
* 1. Optimize plugin load, support load plugin by plugin instance name
2. Optimize install plugin, suppory install plugin by ./gradlew jar dist
3. Make UrlClassLoader siglenton

* add license header

* optimize path

* resolve confilct
  • Loading branch information
ruanwenjun authored and xwm1992 committed Dec 27, 2021
1 parent 623f007 commit 154f4d7
Show file tree
Hide file tree
Showing 21 changed files with 200 additions and 115 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,5 @@ dist
classes
package-lock.json
node_modules
.DS_Store
.DS_Store
.run
39 changes: 39 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,22 @@ subprojects {
}
}

// pluginType:
// pluginInstanceName:
// moduleName
Map<String, Map<String, String>> pluginTypeMap = [
"connector": [
"rocketmq" : "eventmesh-connector-rocketmq",
"standalone": "eventmesh-connector-standalone",
],
"security" : [
"acl": "eventmesh-security-acl",
],
"registry" : [
"namesrv": "eventmesh-registry-rocketmq-namesrv",
]
]

task dist(dependsOn: ['jar']) {
doFirst {
new File(projectDir, '../dist/bin').mkdirs()
Expand All @@ -196,6 +212,29 @@ subprojects {
}

doLast {
pluginTypeMap.forEach((pluginType, pluginInstanceMap) -> {
pluginInstanceMap.forEach((pluginInstanceName, moduleName) -> {
if (moduleName == project.name) {
println String.format("install plugin, pluginType: %s, pluginInstanceName: %s, module: %s",
pluginType, pluginInstanceName, moduleName)
new File("${rootDir}/dist/plugin/${pluginType}/${pluginInstanceName}").mkdirs()
copy {
into "${rootDir}/dist/plugin/${pluginType}/${pluginInstanceName}"
from project.jar.getArchivePath()
}
copy {
into "${rootDir}/dist/plugin/${pluginType}/${pluginInstanceName}"
from project.configurations.runtimeClasspath
}
copy {
into "${rootDir}/dist/conf"
from sourceSets.main.resources.srcDirs
exclude 'META-INF'
}
}
})
})

copy {
into('../dist/apps/')
from project.jar.getArchivePath()
Expand Down
24 changes: 1 addition & 23 deletions eventmesh-connector-plugin/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,26 +13,4 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

task copyConnectorPlugin(dependsOn: ['jar']) {
doFirst {
new File(projectDir, '../eventmesh-connector-plugin/dist/apps').mkdir()
new File(projectDir, '../dist/plugin/connector').mkdirs()
}
doLast {
copy {
into('../eventmesh-connector-plugin/dist/apps/')
from project.jar.getArchivePath()
exclude {
"eventmesh-connector-plugin-${version}.jar"
"eventmesh-connector-api-${version}.jar"
}
}
copy {
into '../dist/plugin/connector'
from "../eventmesh-connector-plugin/dist/apps/eventmesh-connector-standalone-${version}.jar"
from "../eventmesh-connector-plugin/dist/apps/eventmesh-connector-rocketmq-${version}.jar"
}
}
}
*/
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@
*/
package org.apache.eventmesh.api.connector;

import org.apache.eventmesh.spi.EventMeshExtensionType;
import org.apache.eventmesh.spi.EventMeshSPI;

@EventMeshSPI(isSingleton = true)
@EventMeshSPI(isSingleton = true, eventMeshExtensionType = EventMeshExtensionType.CONNECTOR)
public interface ConnectorResourceService {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@
import io.openmessaging.api.Message;

import org.apache.eventmesh.api.AbstractContext;
import org.apache.eventmesh.spi.EventMeshExtensionType;
import org.apache.eventmesh.spi.EventMeshSPI;

@EventMeshSPI(isSingleton = false)
@EventMeshSPI(isSingleton = false, eventMeshExtensionType = EventMeshExtensionType.CONNECTOR)
public interface MeshMQPushConsumer extends Consumer {

void init(Properties keyValue) throws Exception;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@
import io.openmessaging.api.SendCallback;

import org.apache.eventmesh.api.RRCallback;
import org.apache.eventmesh.spi.EventMeshExtensionType;
import org.apache.eventmesh.spi.EventMeshSPI;

@EventMeshSPI(isSingleton = false)
@EventMeshSPI(isSingleton = false, eventMeshExtensionType = EventMeshExtensionType.CONNECTOR)
public interface MeshMQProducer extends Producer {

void init(Properties properties) throws Exception;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,10 @@
import io.openmessaging.api.MessageListener;
import io.openmessaging.api.MessageSelector;
import io.openmessaging.api.MessagingAccessPoint;
import io.openmessaging.api.OMS;
import io.openmessaging.api.OMSBuiltinKeys;

import org.apache.eventmesh.api.AbstractContext;
import org.apache.eventmesh.api.consumer.MeshMQPushConsumer;
import org.apache.eventmesh.connector.rocketmq.MessagingAccessPointImpl;
import org.apache.eventmesh.connector.rocketmq.common.Constants;
import org.apache.eventmesh.connector.rocketmq.common.EventMeshConstants;
import org.apache.eventmesh.connector.rocketmq.config.ClientConfiguration;
Expand All @@ -53,8 +52,6 @@ public class RocketMQConsumerImpl implements MeshMQPushConsumer {

public Logger messageLogger = LoggerFactory.getLogger("message");

public final String DEFAULT_ACCESS_DRIVER = "org.apache.eventmesh.connector.rocketmq.MessagingAccessPointImpl";

private PushConsumerImpl pushConsumer;

@Override
Expand All @@ -75,9 +72,7 @@ public synchronized void init(Properties keyValue) throws Exception {
}

String omsNamesrv = clientConfiguration.namesrvAddr;
// KeyValue properties = OMS.newKeyValue().put(OMSBuiltinKeys.DRIVER_IMPL, DEFAULT_ACCESS_DRIVER);
Properties properties = new Properties();
properties.put(OMSBuiltinKeys.DRIVER_IMPL, DEFAULT_ACCESS_DRIVER);
properties.put("ACCESS_POINTS", omsNamesrv);
properties.put("REGION", "namespace");
properties.put("instanceName", instanceName);
Expand All @@ -87,7 +82,7 @@ public synchronized void init(Properties keyValue) throws Exception {
} else {
properties.put("MESSAGE_MODEL", MessageModel.CLUSTERING.name());
}
MessagingAccessPoint messagingAccessPoint = OMS.builder().build(properties);
MessagingAccessPoint messagingAccessPoint = new MessagingAccessPointImpl(properties);
pushConsumer = (PushConsumerImpl) messagingAccessPoint.createConsumer(properties);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

import org.apache.eventmesh.api.RRCallback;
import org.apache.eventmesh.api.producer.MeshMQProducer;
import org.apache.eventmesh.connector.rocketmq.MessagingAccessPointImpl;
import org.apache.eventmesh.connector.rocketmq.common.EventMeshConstants;
import org.apache.eventmesh.connector.rocketmq.config.ClientConfiguration;
import org.apache.eventmesh.connector.rocketmq.config.ConfigurationWrapper;
Expand All @@ -48,8 +49,6 @@ public class RocketMQProducerImpl implements MeshMQProducer {

private ProducerImpl producer;

public final String DEFAULT_ACCESS_DRIVER = "org.apache.eventmesh.connector.rocketmq.MessagingAccessPointImpl";

@Override
public synchronized void init(Properties keyValue) {
ConfigurationWrapper configurationWrapper =
Expand All @@ -62,14 +61,13 @@ public synchronized void init(Properties keyValue) {

String omsNamesrv = clientConfiguration.namesrvAddr;
Properties properties = new Properties();
properties.put(OMSBuiltinKeys.DRIVER_IMPL, DEFAULT_ACCESS_DRIVER);
properties.put("ACCESS_POINTS", omsNamesrv);
properties.put("REGION", "namespace");
properties.put("RMQ_PRODUCER_GROUP", producerGroup);
properties.put("OPERATION_TIMEOUT", 3000);
properties.put("PRODUCER_ID", producerGroup);

MessagingAccessPoint messagingAccessPoint = OMS.builder().build(properties);
MessagingAccessPoint messagingAccessPoint = new MessagingAccessPointImpl(properties);
producer = (ProducerImpl) messagingAccessPoint.createProducer(properties);

}
Expand Down
21 changes: 0 additions & 21 deletions eventmesh-registry-plugin/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,3 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

task copyRegistryPlugin(dependsOn: ['jar']) {
doFirst {
new File(projectDir, '../eventmesh-registry-plugin/dist/apps').mkdir()
new File(projectDir, '../dist/plugin/registry').mkdirs()
}
doLast {
copy {
into('../eventmesh-registry-plugin/dist/apps/')
from project.jar.getArchivePath()
exclude {
"eventmesh-registry-plugin-${version}.jar"
"eventmesh-registry-api-${version}.jar"
}
}
copy {
into '../dist/plugin/registry'
from "../eventmesh-registry-plugin/dist/apps/eventmesh-registry-rocketmq-namesrv-${version}.jar"
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@
import org.apache.eventmesh.api.registry.dto.EventMeshDataInfo;
import org.apache.eventmesh.api.registry.dto.EventMeshRegisterInfo;
import org.apache.eventmesh.api.registry.dto.EventMeshUnRegisterInfo;
import org.apache.eventmesh.spi.EventMeshExtensionType;
import org.apache.eventmesh.spi.EventMeshSPI;

import java.util.List;
import java.util.Map;

@EventMeshSPI(isSingleton = true)
@EventMeshSPI(isSingleton = true, eventMeshExtensionType = EventMeshExtensionType.REGISTRY)
public interface RegistryService {
void init() throws RegistryException;

Expand Down
23 changes: 1 addition & 22 deletions eventmesh-security-plugin/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,4 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

task copyAclPlugin(dependsOn: ['jar']) {
doFirst {
new File(projectDir, '../eventmesh-security-plugin/dist/apps').mkdir()
new File(projectDir, '../dist/plugin/security').mkdirs()
}
doLast {
copy {
into('../eventmesh-security-plugin/dist/apps/')
from project.jar.getArchivePath()
exclude {
"eventmesh-security-plugin-${version}.jar"
"eventmesh-security-api-${version}.jar"
}
}
copy {
into '../dist/plugin/security'
from "../eventmesh-security-plugin/dist/apps/eventmesh-security-acl-${version}.jar"
}
}
}
*/
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@
package org.apache.eventmesh.api.acl;

import org.apache.eventmesh.api.exception.AclException;
import org.apache.eventmesh.spi.EventMeshExtensionType;
import org.apache.eventmesh.spi.EventMeshSPI;

import java.util.Properties;

@EventMeshSPI(isSingleton = true)
@EventMeshSPI(isSingleton = true, eventMeshExtensionType = EventMeshExtensionType.SECURITY)
public interface AclService {
void init() throws AclException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,43 +72,43 @@ public static <T> T getExtension(Class<T> extensionType, String extensionName) {
}

@SuppressWarnings("unchecked")
private static <T> T getSingletonExtension(Class<T> extensionType, String extensionName) {
return (T) EXTENSION_INSTANCE_CACHE.computeIfAbsent(extensionName, name -> {
Class<T> extensionInstanceClass = getExtensionClass(extensionType, extensionName);
private static <T> T getSingletonExtension(Class<T> extensionType, String extensionInstanceName) {
return (T) EXTENSION_INSTANCE_CACHE.computeIfAbsent(extensionInstanceName, name -> {
Class<T> extensionInstanceClass = getExtensionInstanceClass(extensionType, extensionInstanceName);
try {
if (extensionInstanceClass == null) {
return null;
}
T extensionInstance = extensionInstanceClass.newInstance();
logger.info("initialize extension instance success, extensionType: {}, extensionName: {}",
extensionType, extensionName);
logger.info("initialize extension instance success, extensionType: {}, extensionInstanceName: {}",
extensionType, extensionInstanceName);
return extensionInstance;
} catch (InstantiationException | IllegalAccessException e) {
throw new ExtensionException("Extension initialize error", e);
}
});
}

private static <T> T getPrototypeExtension(Class<T> extensionType, String extensionName) {
Class<T> extensionInstanceClass = getExtensionClass(extensionType, extensionName);
private static <T> T getPrototypeExtension(Class<T> extensionType, String extensionInstanceName) {
Class<T> extensionInstanceClass = getExtensionInstanceClass(extensionType, extensionInstanceName);
try {
if (extensionInstanceClass == null) {
return null;
}
T extensionInstance = extensionInstanceClass.newInstance();
logger.info("initialize extension instance success, extensionType: {}, extensionName: {}",
extensionType, extensionName);
extensionType, extensionInstanceName);
return extensionInstance;
} catch (InstantiationException | IllegalAccessException e) {
throw new ExtensionException("Extension initialize error", e);
}
}

@SuppressWarnings("unchecked")
private static <T> Class<T> getExtensionClass(Class<T> extensionType, String extensionName) {
private static <T> Class<T> getExtensionInstanceClass(Class<T> extensionType, String extensionInstanceName) {
for (ExtensionClassLoader extensionClassLoader : extensionClassLoaders) {
Map<String, Class<?>> extensionClassMap = extensionClassLoader.loadExtensionClass(extensionType);
Class<?> instanceClass = extensionClassMap.get(extensionName);
Map<String, Class<?>> extensionInstanceClassMap = extensionClassLoader.loadExtensionClass(extensionType, extensionInstanceName);
Class<?> instanceClass = extensionInstanceClassMap.get(extensionInstanceName);
if (instanceClass != null) {
return (Class<T>) instanceClass;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.spi;

/**
* An Extension can be defined by extensionTypeName and extensionInstanceName
*/
public enum EventMeshExtensionType {
UNKNOWN("unknown"),
CONNECTOR("connector"),
REGISTRY("registry"),
SECURITY("security"),
;

private final String extensionTypeName;

EventMeshExtensionType(String extensionTypeName) {
this.extensionTypeName = extensionTypeName;
}

public String getExtensionTypeName() {
return extensionTypeName;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,11 @@
*/
boolean isSingleton() default false;

/**
* {@link EventMeshExtensionType}
* @return extension type
*/
EventMeshExtensionType eventMeshExtensionType();

}

Loading

0 comments on commit 154f4d7

Please sign in to comment.