diff --git a/all/pom.xml b/all/pom.xml index d83e5b51b..f0aef3dc4 100644 --- a/all/pom.xml +++ b/all/pom.xml @@ -69,7 +69,7 @@ 1.5.18 4.5.13 4.4.13 - 1.28.0 + 1.33.0 27.0-jre 2.12.1 @@ -232,6 +232,11 @@ sofa-rpc-registry-sofa ${project.version} + + com.alipay.sofa + sofa-rpc-registry-polaris + ${project.version} + com.alipay.sofa sofa-rpc-remoting-bolt @@ -505,6 +510,7 @@ com.alipay.sofa:sofa-rpc-registry-mesh com.alipay.sofa:sofa-rpc-registry-multicast com.alipay.sofa:sofa-rpc-registry-sofa + com.alipay.sofa:sofa-rpc-registry-polaris com.alipay.sofa:sofa-rpc-remoting-bolt com.alipay.sofa:sofa-rpc-remoting-http com.alipay.sofa:sofa-rpc-remoting-resteasy diff --git a/bom/pom.xml b/bom/pom.xml index ae00e1a77..0d8d75a1d 100644 --- a/bom/pom.xml +++ b/bom/pom.xml @@ -25,6 +25,7 @@ 2.6.7 2.0.3 5.2.0 + 1.2.2 1.5.18 7.0 27.0-jre @@ -36,7 +37,7 @@ 2.9.10.8 0.6.12 1.5.9 - 1.28.0 + 1.33.0 4.4.13 @@ -521,6 +522,18 @@ jetty-alpn-openjdk8-client 9.4.8.v20171121 + + + com.tencent.polaris + polaris-discovery-factory + ${polaris.version} + + + com.tencent.polaris + polaris-test-mock-discovery + ${polaris.version} + test + diff --git a/example/pom.xml b/example/pom.xml index 6f875fda4..0e4d70a63 100644 --- a/example/pom.xml +++ b/example/pom.xml @@ -13,8 +13,8 @@ 3.11.0 1.17.0 - 0.0.2 - 1.28.0 + 0.0.3 + 1.33.0 3.7.0 diff --git a/example/src/test/java/com/alipay/sofa/rpc/polaris/PolarisBoltClientMain.java b/example/src/test/java/com/alipay/sofa/rpc/polaris/PolarisBoltClientMain.java new file mode 100644 index 000000000..731a51604 --- /dev/null +++ b/example/src/test/java/com/alipay/sofa/rpc/polaris/PolarisBoltClientMain.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.polaris; + +import com.alipay.sofa.rpc.config.ConsumerConfig; +import com.alipay.sofa.rpc.config.RegistryConfig; +import com.alipay.sofa.rpc.context.RpcRuntimeContext; +import com.alipay.sofa.rpc.log.Logger; +import com.alipay.sofa.rpc.log.LoggerFactory; +import com.alipay.sofa.rpc.test.EchoService; +import com.alipay.sofa.rpc.test.HelloService; + +/** + *

+ *

+ * + * + * @author ZhangLibin + */ +public class PolarisBoltClientMain { + + /** + * slf4j Logger for this class + */ + private final static Logger LOGGER = LoggerFactory.getLogger(PolarisBoltClientMain.class); + + public static void main(String[] args) throws InterruptedException { + + RegistryConfig registryConfig = new RegistryConfig() + .setProtocol("polaris") + .setAddress("127.0.0.1:8091"); + + ConsumerConfig consumerConfig = new ConsumerConfig() + .setInterfaceId(HelloService.class.getName()) + .setRegistry(registryConfig) + .setTimeout(3000); + HelloService helloService = consumerConfig.refer(); + + ConsumerConfig consumerConfig2 = new ConsumerConfig() + .setInterfaceId(EchoService.class.getName()) + .setRegistry(registryConfig) + .setTimeout(3000); + EchoService echoService = consumerConfig2.refer(); + + LOGGER.warn("started at pid {}", RpcRuntimeContext.PID); + + try { + while (true) { + try { + String s = helloService.sayHello("xxx", 22); + LOGGER.warn("{}", s); + } catch (Exception e) { + LOGGER.error(e.getMessage(), e); + } + try { + Thread.sleep(2000); + } catch (Exception e) { + } + } + + } catch (Exception e) { + LOGGER.error("", e); + } + + synchronized (PolarisBoltClientMain.class) { + while (true) { + PolarisBoltClientMain.class.wait(); + } + } + } + +} diff --git a/example/src/test/java/com/alipay/sofa/rpc/polaris/PolarisBoltServerMain.java b/example/src/test/java/com/alipay/sofa/rpc/polaris/PolarisBoltServerMain.java new file mode 100644 index 000000000..f428885f5 --- /dev/null +++ b/example/src/test/java/com/alipay/sofa/rpc/polaris/PolarisBoltServerMain.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.polaris; + +import com.alipay.sofa.rpc.config.ProviderConfig; +import com.alipay.sofa.rpc.config.RegistryConfig; +import com.alipay.sofa.rpc.config.ServerConfig; +import com.alipay.sofa.rpc.context.RpcRuntimeContext; +import com.alipay.sofa.rpc.log.Logger; +import com.alipay.sofa.rpc.log.LoggerFactory; +import com.alipay.sofa.rpc.test.EchoService; +import com.alipay.sofa.rpc.test.EchoServiceImpl; +import com.alipay.sofa.rpc.test.HelloService; +import com.alipay.sofa.rpc.test.HelloServiceImpl; + +/** + *

+ *

+ * + * + * @author ZhangLibin + */ +public class PolarisBoltServerMain { + + /** + * slf4j Logger for this class + */ + private final static Logger LOGGER = LoggerFactory.getLogger(PolarisBoltServerMain.class); + + public static void main(String[] args) { + + RegistryConfig registryConfig = new RegistryConfig() + .setProtocol("polaris") + .setAddress("127.0.0.1:8091"); + + ServerConfig serverConfig = new ServerConfig() + .setPort(22101) + .setDaemon(false); + + ProviderConfig providerConfig = new ProviderConfig() + .setInterfaceId(HelloService.class.getName()) + .setRef(new HelloServiceImpl("result from 22101")) + .setServer(serverConfig) + .setRegistry(registryConfig); + + ProviderConfig providerConfig2 = new ProviderConfig() + .setInterfaceId(EchoService.class.getName()) + .setRef(new EchoServiceImpl()) + .setServer(serverConfig) + .setRegistry(registryConfig); + + providerConfig.export(); + providerConfig2.export(); + + LOGGER.warn("started at pid {}", RpcRuntimeContext.PID); + } + +} diff --git a/registry/pom.xml b/registry/pom.xml index 393651414..e65411b8e 100644 --- a/registry/pom.xml +++ b/registry/pom.xml @@ -21,6 +21,7 @@ registry-mesh registry-multicast registry-sofa + registry-polaris diff --git a/registry/registry-polaris/pom.xml b/registry/registry-polaris/pom.xml new file mode 100644 index 000000000..e145b3215 --- /dev/null +++ b/registry/registry-polaris/pom.xml @@ -0,0 +1,104 @@ + + + 4.0.0 + + + com.alipay.sofa + sofa-rpc-registry + ${revision} + + sofa-rpc-registry-polaris + + + com.alipay.sofa + sofa-rpc-log + + + com.alipay.sofa + sofa-rpc-api + + + com.tencent.polaris + polaris-discovery-factory + + + com.tencent.polaris + polaris-test-mock-discovery + test + + + org.slf4j + slf4j-log4j12 + test + + + junit + junit + test + + + + src/main/java + + + src/main/resources + false + + **/** + + + + src/test/java + + + src/test/resources + false + + **/** + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + ${maven.compiler.source} + ${maven.compiler.source} + ${project.build.sourceEncoding} + + + + org.apache.maven.plugins + maven-install-plugin + + ${module.install.skip} + + + + org.apache.maven.plugins + maven-deploy-plugin + + ${module.deploy.skip} + + + + org.apache.maven.plugins + maven-surefire-plugin + + ${skipTests} + + + **/*Test.java + + + once + + + + + + diff --git a/registry/registry-polaris/src/main/java/com/alipay/sofa/rpc/registry/polaris/PolarisConstants.java b/registry/registry-polaris/src/main/java/com/alipay/sofa/rpc/registry/polaris/PolarisConstants.java new file mode 100644 index 000000000..e3004a661 --- /dev/null +++ b/registry/registry-polaris/src/main/java/com/alipay/sofa/rpc/registry/polaris/PolarisConstants.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.registry.polaris; + +/** + * constants of the polaris registry + * + * @author ZhangLibin + */ +public class PolarisConstants { + + public static final String POLARIS_SERVER_CONNECTOR_PROTOCOL_KEY = "connector.protocol"; + + public static final String HEALTH_CHECK_TTL_KEY = "healthCheck.ttl"; + + public static final String HEARTBEAT_INTERVAL_KEY = "heartbeat.interval"; + + public static final String HEARTBEAT_CORE_SIZE_KEY = "heartbeat.coreSize"; + + public static final String LOOKUP_INTERVAL_KEY = "lookup.interval"; + + public static final String POLARIS_SERVER_CONNECTOR_PROTOCOL = "grpc"; + + public static final int DEFAULT_HEALTH_CHECK_TTL = 10; + + public static final int DEFAULT_HEARTBEAT_INTERVAL = 3000; + + public static final int DEFAULT_HEARTBEAT_CORE_SIZE = 1; + + public static final int DEFAULT_LOOKUP_INTERVAL = 1000; +} diff --git a/registry/registry-polaris/src/main/java/com/alipay/sofa/rpc/registry/polaris/PolarisRegistry.java b/registry/registry-polaris/src/main/java/com/alipay/sofa/rpc/registry/polaris/PolarisRegistry.java new file mode 100644 index 000000000..035960859 --- /dev/null +++ b/registry/registry-polaris/src/main/java/com/alipay/sofa/rpc/registry/polaris/PolarisRegistry.java @@ -0,0 +1,369 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.registry.polaris; + +import com.alipay.sofa.rpc.client.ProviderGroup; +import com.alipay.sofa.rpc.client.ProviderInfo; +import com.alipay.sofa.rpc.common.utils.CommonUtils; +import com.alipay.sofa.rpc.config.AbstractInterfaceConfig; +import com.alipay.sofa.rpc.config.ConfigUniqueNameGenerator; +import com.alipay.sofa.rpc.config.ConsumerConfig; +import com.alipay.sofa.rpc.config.ProviderConfig; +import com.alipay.sofa.rpc.config.RegistryConfig; +import com.alipay.sofa.rpc.config.ServerConfig; +import com.alipay.sofa.rpc.context.RpcRunningState; +import com.alipay.sofa.rpc.core.exception.SofaRpcRuntimeException; +import com.alipay.sofa.rpc.event.ConsumerSubEvent; +import com.alipay.sofa.rpc.event.EventBus; +import com.alipay.sofa.rpc.event.ProviderPubEvent; +import com.alipay.sofa.rpc.ext.Extension; +import com.alipay.sofa.rpc.log.LogCodes; +import com.alipay.sofa.rpc.log.Logger; +import com.alipay.sofa.rpc.log.LoggerFactory; +import com.alipay.sofa.rpc.registry.Registry; +import com.tencent.polaris.api.core.ConsumerAPI; +import com.tencent.polaris.api.core.ProviderAPI; +import com.tencent.polaris.api.rpc.InstanceDeregisterRequest; +import com.tencent.polaris.api.rpc.InstanceHeartbeatRequest; +import com.tencent.polaris.api.rpc.InstanceRegisterRequest; +import com.tencent.polaris.factory.api.DiscoveryAPIFactory; +import com.tencent.polaris.factory.config.ConfigurationImpl; +import com.tencent.polaris.factory.config.global.ServerConnectorConfigImpl; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import static com.alipay.sofa.rpc.registry.utils.RegistryUtils.buildUniqueName; +import static com.alipay.sofa.rpc.registry.utils.RegistryUtils.convertProviderToMap; +import static com.alipay.sofa.rpc.registry.utils.RegistryUtils.getServerHost; + +/** + * the main logic of polaris registry, similar to consul + * + * @author ZhangLibin + */ +@Extension("polaris") +public class PolarisRegistry extends Registry { + + public static final String EXT_NAME = "PolarisRegistry"; + + private static final Logger LOGGER = LoggerFactory.getLogger(PolarisRegistry.class); + + private final PolarisRegistryProperties properties; + + public ProviderAPI providerAPI; + + public ConsumerAPI consumerAPI; + + private ScheduledExecutorService heartbeatExecutor; + + private Map heartbeatFutures = new ConcurrentHashMap<>(); + + private Map polarisWatchers = new ConcurrentHashMap<>(); + + + protected PolarisRegistry(RegistryConfig registryConfig) { + super(registryConfig); + this.properties = new PolarisRegistryProperties(registryConfig.getParameters()); + } + + public static String buildServiceName(AbstractInterfaceConfig config) { + return ConfigUniqueNameGenerator.getUniqueName(config); + } + + @Override + public void init() { + if (providerAPI != null) { + return; + } + + ConfigurationImpl configuration = new ConfigurationImpl(); + //init configuration + configuration.setDefault(); + ServerConnectorConfigImpl serverConnector = configuration.getGlobal().getServerConnector(); + serverConnector.setAddresses(Arrays.asList(registryConfig.getAddress())); + serverConnector.setConnectTimeout((long) registryConfig.getConnectTimeout()); + serverConnector.setProtocol(properties.getConnectorProtocol()); + + providerAPI = DiscoveryAPIFactory.createProviderAPIByConfig(configuration); + consumerAPI = DiscoveryAPIFactory.createConsumerAPIByConfig(configuration); + + int coreSize = properties.getHeartbeatCoreSize(); + heartbeatExecutor = Executors.newScheduledThreadPool(coreSize); + + } + + @Override + public boolean start() { + return true; + } + + @Override + public void register(ProviderConfig config) { + String appName = config.getAppName(); + + if (!registryConfig.isRegister()) { + if (LOGGER.isInfoEnabled(appName)) { + LOGGER.infoWithApp(appName, LogCodes.getLog(LogCodes.INFO_REGISTRY_IGNORE)); + } + return; + } + if (!config.isRegister()) { + return; + } + + try { + List services = buildPolarisRegister(config); + if (CommonUtils.isNotEmpty(services)) { + if (LOGGER.isInfoEnabled(appName)) { + LOGGER.infoWithApp(appName, LogCodes.getLog(LogCodes.INFO_ROUTE_REGISTRY_PUB_START, config.getInterfaceId())); + } + for (InstanceRegisterRequest service : services) { + registerPolarisService(config, service); + if (LOGGER.isInfoEnabled(appName)) { + LOGGER.infoWithApp(appName, LogCodes.getLog(LogCodes.INFO_ROUTE_REGISTRY_PUB, config.getInterfaceId())); + } + } + if (LOGGER.isInfoEnabled(appName)) { + LOGGER.infoWithApp(appName, LogCodes.getLog(LogCodes.INFO_ROUTE_REGISTRY_PUB_OVER, config.getInterfaceId())); + } + } + } catch (SofaRpcRuntimeException e) { + throw e; + } catch (Exception e) { + throw new SofaRpcRuntimeException(LogCodes.getLog(LogCodes.ERROR_REG_PROVIDER, "polarisRegistry", config.buildKey()), e); + } + if (EventBus.isEnable(ProviderPubEvent.class)) { + ProviderPubEvent event = new ProviderPubEvent(config); + EventBus.post(event); + } + + } + + //convert ProviderConfig to polaris registerRequest + public List buildPolarisRegister(ProviderConfig config) { + + List serverConfigs = config.getServer(); + if (CommonUtils.isEmpty(serverConfigs)) { + return Collections.emptyList(); + } + + List requestList = new ArrayList<>(); + for (ServerConfig serverConfig : serverConfigs) { + InstanceRegisterRequest request = new InstanceRegisterRequest(); + request.setNamespace(buildNameSpace(config.getAppName())); + request.setService(buildServiceName(config)); + request.setHost(getServerHost(serverConfig)); + request.setPort(serverConfig.getPort()); + request.setPriority(config.getPriority()); + request.setProtocol(serverConfig.getProtocol()); + request.setWeight(config.getWeight()); + request.setTimeoutMs(config.getTimeout()); + request.setVersion(config.getVersion()); + request.setTtl(properties.getHealthCheckTTL()); + Map metaData = convertProviderToMap(config, serverConfig); + checkAndDelNull(metaData); + request.setMetadata(metaData); + requestList.add(request); + } + return requestList; + } + + private String buildNameSpace(String appName) { + return null == appName ? "sofa-default" : appName; + } + + private void checkAndDelNull(Map metaData) { + metaData.entrySet().removeIf((e) -> e.getValue() == null); + } + + private void registerPolarisService(ProviderConfig config, InstanceRegisterRequest service) { + providerAPI.register(service); + if (service.getTtl() != null) { + ScheduledFuture scheduledFuture = heartbeatExecutor.scheduleAtFixedRate(() -> heartbeatPolaris(service), 0, properties.getHeartbeatInterval(), TimeUnit.MILLISECONDS); + ScheduledFuture oldFuture = heartbeatFutures.put(buildUniqueName(config, service.getProtocol()), scheduledFuture); + if (oldFuture != null) { + oldFuture.cancel(true); + } + } + } + + private void heartbeatPolaris(InstanceRegisterRequest service) { + try { + InstanceHeartbeatRequest instanceHeartbeatRequest = new InstanceHeartbeatRequest(); + instanceHeartbeatRequest.setNamespace(service.getNamespace()); + instanceHeartbeatRequest.setService(service.getService()); + instanceHeartbeatRequest.setHost(service.getHost()); + instanceHeartbeatRequest.setPort(service.getPort()); + providerAPI.heartbeat(instanceHeartbeatRequest); + } catch (Exception e) { + LOGGER.error(LogCodes.getLog(LogCodes.ERROR_CHECK_PASS, "Polaris"), e); + } + } + + @Override + public void unRegister(ProviderConfig config) { + + String appName = config.getAppName(); + + if (!registryConfig.isRegister()) { + if (LOGGER.isInfoEnabled(appName)) { + LOGGER.infoWithApp(appName, LogCodes.getLog(LogCodes.INFO_REGISTRY_IGNORE)); + } + return; + } + if (!config.isRegister()) { + return; + } + + try { + List instanceRegisterRequests = buildPolarisRegister(config); + + for (InstanceRegisterRequest request : instanceRegisterRequests) { + deregisterPolarisService(config, request); + } + + } catch (Exception e) { + if (!RpcRunningState.isShuttingDown()) { + if (e instanceof SofaRpcRuntimeException) { + throw e; + } else { + throw new SofaRpcRuntimeException(LogCodes.getLog(LogCodes.ERROR_UNREG_PROVIDER, EXT_NAME), e); + } + } + } + } + + private void deregisterPolarisService(ProviderConfig config, InstanceRegisterRequest request) { + + InstanceDeregisterRequest instanceDeregisterRequest = new InstanceDeregisterRequest(); + instanceDeregisterRequest.setNamespace(request.getNamespace()); + instanceDeregisterRequest.setService(request.getService()); + instanceDeregisterRequest.setHost(request.getHost()); + instanceDeregisterRequest.setPort(request.getPort()); + providerAPI.deRegister(instanceDeregisterRequest); + + ScheduledFuture future = heartbeatFutures.remove(buildUniqueName(config, request.getProtocol())); + if (future != null) { + future.cancel(true); + } + } + + @Override + public void batchUnRegister(List configs) { + for (ProviderConfig config : configs) { + unRegister(config); + } + } + + @Override + public List subscribe(ConsumerConfig config) { + String appName = config.getAppName(); + if (!registryConfig.isSubscribe()) { + if (LOGGER.isInfoEnabled(appName)) { + LOGGER.infoWithApp(appName, LogCodes.getLog(LogCodes.INFO_REGISTRY_IGNORE)); + } + return null; + } + if (!config.isSubscribe()) { + return null; + } + + try { + List providers = findService(config); + if (EventBus.isEnable(ConsumerSubEvent.class)) { + ConsumerSubEvent event = new ConsumerSubEvent(config); + EventBus.post(event); + } + + return Collections.singletonList(new ProviderGroup().addAll(providers)); + } catch (SofaRpcRuntimeException e) { + throw e; + } catch (Exception e) { + throw new SofaRpcRuntimeException(LogCodes.getLog(LogCodes.ERROR_SUB_PROVIDER, EXT_NAME), e); + } + + } + + private List findService(ConsumerConfig config) { + + String uniqueName = buildUniqueName(config, config.getProtocol()); + PolarisWatcher watcher = polarisWatchers.get(uniqueName); + if (watcher == null) { + watcher = new PolarisWatcher(buildNameSpace(config.getAppName()), buildServiceName(config), config.getProtocol(), consumerAPI, properties); + watcher.init(); + polarisWatchers.put(uniqueName, watcher); + } + watcher.addListener(config.getProviderInfoListener()); + return watcher.currentProviders(); + } + + @Override + public void unSubscribe(ConsumerConfig config) { + String appName = config.getAppName(); + if (!registryConfig.isSubscribe()) { + if (LOGGER.isInfoEnabled(appName)) { + LOGGER.infoWithApp(config.getAppName(), LogCodes.getLog(LogCodes.INFO_REGISTRY_IGNORE)); + } + } + + if (!config.isSubscribe()) { + return; + } + String uniqueName = buildUniqueName(config, config.getProtocol()); + PolarisWatcher informer = polarisWatchers.get(uniqueName); + if (informer == null) { + return; + } + informer.removeListener(config.getProviderInfoListener()); + if (informer.getListenerSize() == 0) { + polarisWatchers.remove(uniqueName); + informer.shutdown(); + } + } + + @Override + public void batchUnSubscribe(List configs) { + for (ConsumerConfig config : configs) { + unSubscribe(config); + } + } + + @Override + public void destroy() { + if (heartbeatExecutor != null) { + heartbeatExecutor.shutdown(); + } + for (PolarisWatcher watcher : polarisWatchers.values()) { + watcher.shutdown(); + } + if (providerAPI != null) { + providerAPI.destroy(); + } + if (consumerAPI != null) { + consumerAPI.destroy(); + } + } +} diff --git a/registry/registry-polaris/src/main/java/com/alipay/sofa/rpc/registry/polaris/PolarisRegistryProperties.java b/registry/registry-polaris/src/main/java/com/alipay/sofa/rpc/registry/polaris/PolarisRegistryProperties.java new file mode 100644 index 000000000..ce8ab6069 --- /dev/null +++ b/registry/registry-polaris/src/main/java/com/alipay/sofa/rpc/registry/polaris/PolarisRegistryProperties.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.registry.polaris; + +import java.util.Collections; +import java.util.Map; +import java.util.function.Function; + +import static com.alipay.sofa.rpc.registry.polaris.PolarisConstants.DEFAULT_HEALTH_CHECK_TTL; +import static com.alipay.sofa.rpc.registry.polaris.PolarisConstants.DEFAULT_HEARTBEAT_CORE_SIZE; +import static com.alipay.sofa.rpc.registry.polaris.PolarisConstants.DEFAULT_HEARTBEAT_INTERVAL; +import static com.alipay.sofa.rpc.registry.polaris.PolarisConstants.DEFAULT_LOOKUP_INTERVAL; +import static com.alipay.sofa.rpc.registry.polaris.PolarisConstants.HEALTH_CHECK_TTL_KEY; +import static com.alipay.sofa.rpc.registry.polaris.PolarisConstants.HEARTBEAT_CORE_SIZE_KEY; +import static com.alipay.sofa.rpc.registry.polaris.PolarisConstants.HEARTBEAT_INTERVAL_KEY; +import static com.alipay.sofa.rpc.registry.polaris.PolarisConstants.LOOKUP_INTERVAL_KEY; +import static com.alipay.sofa.rpc.registry.polaris.PolarisConstants.POLARIS_SERVER_CONNECTOR_PROTOCOL; +import static com.alipay.sofa.rpc.registry.polaris.PolarisConstants.POLARIS_SERVER_CONNECTOR_PROTOCOL_KEY; + +/** + * the properties of polaris registry + * + * @author ZhangLibin + */ +public class PolarisRegistryProperties { + private final Map registryParameters; + + public PolarisRegistryProperties(Map registryParameters) { + if (registryParameters == null) { + registryParameters = Collections.emptyMap(); + } + this.registryParameters = registryParameters; + } + + public String getConnectorProtocol() { + return getString(POLARIS_SERVER_CONNECTOR_PROTOCOL_KEY, POLARIS_SERVER_CONNECTOR_PROTOCOL); + } + + public int getHealthCheckTTL() { + return getInt(HEALTH_CHECK_TTL_KEY, DEFAULT_HEALTH_CHECK_TTL); + } + + public int getHeartbeatInterval() { + return getInt(HEARTBEAT_INTERVAL_KEY, DEFAULT_HEARTBEAT_INTERVAL); + } + + public int getHeartbeatCoreSize() { + return getInt(HEARTBEAT_CORE_SIZE_KEY, DEFAULT_HEARTBEAT_CORE_SIZE); + } + + public int getLookupInterval() { + return getInt(LOOKUP_INTERVAL_KEY, DEFAULT_LOOKUP_INTERVAL); + } + + private int getInt(String key, int defaultValue) { + return get(key, Integer::parseInt, defaultValue); + } + + private String getString(String key, String defaultValue) { + return get(key, Function.identity(), defaultValue); + } + + private T get(String key, Function transform, T defaultValue) { + String value = registryParameters.get(key); + if (value != null) { + return transform.apply(value); + } + return defaultValue; + } + +} diff --git a/registry/registry-polaris/src/main/java/com/alipay/sofa/rpc/registry/polaris/PolarisWatcher.java b/registry/registry-polaris/src/main/java/com/alipay/sofa/rpc/registry/polaris/PolarisWatcher.java new file mode 100644 index 000000000..493edb860 --- /dev/null +++ b/registry/registry-polaris/src/main/java/com/alipay/sofa/rpc/registry/polaris/PolarisWatcher.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.registry.polaris; + +import com.alipay.sofa.rpc.client.ProviderGroup; +import com.alipay.sofa.rpc.client.ProviderHelper; +import com.alipay.sofa.rpc.client.ProviderInfo; +import com.alipay.sofa.rpc.listener.ProviderInfoListener; +import com.alipay.sofa.rpc.log.LogCodes; +import com.alipay.sofa.rpc.log.Logger; +import com.alipay.sofa.rpc.log.LoggerFactory; +import com.tencent.polaris.api.core.ConsumerAPI; +import com.tencent.polaris.api.pojo.Instance; +import com.tencent.polaris.api.rpc.GetAllInstancesRequest; +import com.tencent.polaris.api.rpc.InstancesResponse; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import static com.alipay.sofa.rpc.registry.utils.RegistryUtils.convertInstanceToUrl; + +/** + * observe the providers from polaris and notify the consumers + * + * @author ZhangLibin + */ +public class PolarisWatcher { + private static final Logger LOGGER = LoggerFactory.getLogger(PolarisWatcher.class); + + private String nameSpace; + private String serviceName; + private String protocol; + + private InstancesResponse currentData; + + private ConsumerAPI consumerAPI; + + private PolarisRegistryProperties properties; + + private List listeners; + + private ScheduledExecutorService watchExecutor; + + public PolarisWatcher(String nameSpace, String serviceName, String protocol, ConsumerAPI consumerAPI, PolarisRegistryProperties properties) { + this.nameSpace = nameSpace; + this.serviceName = serviceName; + this.protocol = protocol; + this.consumerAPI = consumerAPI; + this.properties = properties; + this.listeners = new ArrayList<>(); + } + + private void watchService() { + try { + GetAllInstancesRequest getAllInstancesRequest = new GetAllInstancesRequest(); + getAllInstancesRequest.setNamespace(nameSpace); + getAllInstancesRequest.setService(serviceName); + Map parameters = new HashMap<>(); + parameters.put("protocol", protocol); + getAllInstancesRequest.setMetadata(parameters); + InstancesResponse response = consumerAPI.getAllInstance(getAllInstancesRequest); + this.currentData = response; + ProviderGroup providerGroup = new ProviderGroup(currentProviders()); + listeners.stream().filter(Objects::nonNull).forEach(l -> l.updateProviders(providerGroup)); + } catch (Exception e) { + LOGGER.error(LogCodes.getLog(LogCodes.ERROR_WATCH_HEALTH, "Polaris"), e); + } + } + + public List currentProviders() { + List providerInfos = new ArrayList<>(); + Instance[] instances = currentData.getInstances(); + for (Instance instance : instances) { + ProviderInfo providerInfo = ProviderHelper.toProviderInfo(convertInstanceToUrl(instance.getHost(), instance.getPort(), currentData.getMetadata())); + providerInfos.add(providerInfo); + } + return providerInfos; + } + + public void init() { + GetAllInstancesRequest getAllInstancesRequest = new GetAllInstancesRequest(); + getAllInstancesRequest.setNamespace(nameSpace); + getAllInstancesRequest.setService(serviceName); + this.currentData = consumerAPI.getAllInstance(getAllInstancesRequest); + this.watchExecutor = Executors.newSingleThreadScheduledExecutor(); + this.watchExecutor.scheduleWithFixedDelay(this::watchService, properties.getLookupInterval(), properties.getLookupInterval(), TimeUnit.MILLISECONDS); + } + + public void addListener(ProviderInfoListener listener) { + listeners.add(listener); + } + + public void removeListener(ProviderInfoListener listener) { + listeners.remove(listener); + } + + public int getListenerSize() { + return listeners.size(); + } + + public void shutdown() { + this.watchExecutor.shutdown(); + } +} diff --git a/registry/registry-polaris/src/main/resources/META-INF/services/sofa-rpc/com.alipay.sofa.rpc.registry.Registry b/registry/registry-polaris/src/main/resources/META-INF/services/sofa-rpc/com.alipay.sofa.rpc.registry.Registry new file mode 100644 index 000000000..521c8c716 --- /dev/null +++ b/registry/registry-polaris/src/main/resources/META-INF/services/sofa-rpc/com.alipay.sofa.rpc.registry.Registry @@ -0,0 +1 @@ +polaris=com.alipay.sofa.rpc.registry.polaris.PolarisRegistry diff --git a/registry/registry-polaris/src/test/java/com/alipay/sofa/rpc/registry/polaris/PolarisRegistryTest.java b/registry/registry-polaris/src/test/java/com/alipay/sofa/rpc/registry/polaris/PolarisRegistryTest.java new file mode 100644 index 000000000..b6193b16f --- /dev/null +++ b/registry/registry-polaris/src/test/java/com/alipay/sofa/rpc/registry/polaris/PolarisRegistryTest.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.registry.polaris; + +import com.alipay.sofa.rpc.client.ProviderGroup; +import com.alipay.sofa.rpc.config.ApplicationConfig; +import com.alipay.sofa.rpc.config.ConsumerConfig; +import com.alipay.sofa.rpc.config.ProviderConfig; +import com.alipay.sofa.rpc.config.RegistryConfig; +import com.alipay.sofa.rpc.config.ServerConfig; +import com.alipay.sofa.rpc.registry.RegistryFactory; +import com.tencent.polaris.api.core.ConsumerAPI; +import com.tencent.polaris.api.exception.PolarisException; +import com.tencent.polaris.api.pojo.ServiceKey; +import com.tencent.polaris.api.rpc.GetAllInstancesRequest; +import com.tencent.polaris.api.rpc.InstancesResponse; +import com.tencent.polaris.factory.api.DiscoveryAPIFactory; +import com.tencent.polaris.test.mock.discovery.NamingServer; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.util.List; +import java.util.stream.IntStream; + +import static com.tencent.polaris.api.exception.ErrorCode.SERVER_USER_ERROR; + +public class PolarisRegistryTest { + + private static final String APPNAME = "polaris-test"; + private static final String INTERFACE_ID = "com.alipay.sofa.rpc.registry.polaris.TestService"; + private static final String NAMESPACE = APPNAME; + private static final String SERVICE = "com.alipay.sofa.rpc.registry.polaris.TestService:1.0:polaris-test-1"; + private static final String SERVICE_1 = "com.alipay.sofa.rpc.registry.polaris.TestService:1.0:polaris-test-2"; + + private static NamingServer polaris; + private static RegistryConfig registryConfig; + private static PolarisRegistry registry; + + @BeforeClass + static public void setup() { + polaris = new NamingServer(8091); + polaris.getNamingService().addService(new ServiceKey(NAMESPACE, SERVICE_1)); + + try { + polaris.start(); + } catch (IOException e) { + e.printStackTrace(); + return; + } + + registryConfig = new RegistryConfig() + .setProtocol("polaris") + .setAddress("127.0.0.1:8091") + .setRegister(true); + + registry = (PolarisRegistry) RegistryFactory.getRegistry(registryConfig); + registry.init(); + + } + + @AfterClass + static public void tearDown() { + registry.destroy(); + polaris.terminate(); + } + + @Test + public void testRegister() { + polaris.getNamingService().addService(new ServiceKey(NAMESPACE, SERVICE)); + //register + ProviderConfig providerConfig = providerConfig("polaris-test-1", 12200, 12201, 12202); + registry.register(providerConfig); + //check register + ConsumerAPI consumerAPI = DiscoveryAPIFactory.createConsumerAPI(); + GetAllInstancesRequest getAllInstancesRequest = new GetAllInstancesRequest(); + getAllInstancesRequest.setNamespace(APPNAME); + getAllInstancesRequest.setService(SERVICE); + InstancesResponse allInstance = consumerAPI.getAllInstance(getAllInstancesRequest); + Assert.assertEquals(3, allInstance.getInstances().length); + + //unregister + registry.unRegister(providerConfig); + + //check unregister ,sleep to wait remove catch + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + //if no service will throw a exception + try { + consumerAPI.getAllInstance(getAllInstancesRequest); + } catch (PolarisException e) { + Assert.assertEquals(SERVER_USER_ERROR, e.getCode()); + } + } + + @Test + public void testSubscribe() { + polaris.getNamingService().addService(new ServiceKey(NAMESPACE, SERVICE)); + + //register + ProviderConfig providerConfig = providerConfig("polaris-test-1", 12200, 12201, 12202); + registry.register(providerConfig); + ConsumerConfig consumerConfig = consumerConfig("polaris-test-1"); + //subscribe + List providerGroups = registry.subscribe(consumerConfig); + Assert.assertEquals(1, providerGroups.size()); + Assert.assertEquals(3, providerGroups.get(0).size()); + + //another consumer subscribe, no service for it + ConsumerConfig consumerConfigWithAnotherUniqueId = consumerConfig("polaris-test-2"); + providerGroups = registry.subscribe(consumerConfigWithAnotherUniqueId); + Assert.assertEquals(1, providerGroups.size()); + Assert.assertEquals(0, providerGroups.get(0).size()); + + registry.unSubscribe(consumerConfig); + registry.unSubscribe(consumerConfigWithAnotherUniqueId); + } + + private ProviderConfig providerConfig(String uniqueId, int... ports) { + ProviderConfig provider = new ProviderConfig(); + provider.setInterfaceId(INTERFACE_ID) + .setUniqueId(uniqueId) + .setApplication(new ApplicationConfig().setAppName(APPNAME)) + .setProxy("javassist") + .setRegister(true) + .setRegistry(registryConfig) + .setSerialization("hessian2") + .setWeight(222) + .setTimeout(3000); + + IntStream.of(ports) + .mapToObj(port -> + new ServerConfig() + .setProtocol("bolt") + .setHost("127.0.0.1") + .setPort(port) + ).forEach(provider::setServer); + return provider; + } + + private ConsumerConfig consumerConfig(String uniqueId) { + ConsumerConfig consumer = new ConsumerConfig(); + consumer.setInterfaceId(INTERFACE_ID) + .setUniqueId(uniqueId) + .setApplication(new ApplicationConfig().setAppName(APPNAME)) + .setProxy("javassist") + .setSubscribe(true) + .setSerialization("java") + .setInvokeType("sync") + .setTimeout(4444); + + return consumer; + } + +} diff --git a/registry/registry-polaris/src/test/resources/log4j.xml b/registry/registry-polaris/src/test/resources/log4j.xml new file mode 100644 index 000000000..e95634f16 --- /dev/null +++ b/registry/registry-polaris/src/test/resources/log4j.xml @@ -0,0 +1,16 @@ + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/registry/registry-polaris/src/test/resources/sofa-rpc/rpc-config.json b/registry/registry-polaris/src/test/resources/sofa-rpc/rpc-config.json new file mode 100644 index 000000000..a555027fe --- /dev/null +++ b/registry/registry-polaris/src/test/resources/sofa-rpc/rpc-config.json @@ -0,0 +1,4 @@ +{ + "rpc.config.order": 999, // 加载顺序,越大越后加载 + "logger.impl" : "com.alipay.sofa.rpc.log.SLF4JLoggerImpl" +} \ No newline at end of file