From b84cf02b09a0e153e14f04ebd73aa4c580f1d125 Mon Sep 17 00:00:00 2001 From: zongtanghu Date: Fri, 19 Jun 2020 10:32:40 +0800 Subject: [PATCH 1/6] [#3117]Sink the Notify implementation into common module and optimize some parts. --- .../nacos/common/notify/DefaultPublisher.java | 223 ++++++++++++++ .../alibaba/nacos/common/notify/Event.java | 44 +++ .../nacos/common/notify/EventPublisher.java | 83 +++++ .../nacos/common/notify/NotifyCenter.java | 291 ++++++++++++++++++ .../nacos/common/notify/SlowEvent.java | 28 ++ .../notify/listener/SmartSubscriber.java | 47 +++ .../common/notify/listener/Subscriber.java | 63 ++++ .../nacos/common/utils/BiFunction.java | 43 +++ .../nacos/common/utils/ClassUtils.java | 70 +++++ .../alibaba/nacos/common/utils/MapUtils.java | 18 ++ 10 files changed, 910 insertions(+) create mode 100644 common/src/main/java/com/alibaba/nacos/common/notify/DefaultPublisher.java create mode 100644 common/src/main/java/com/alibaba/nacos/common/notify/Event.java create mode 100644 common/src/main/java/com/alibaba/nacos/common/notify/EventPublisher.java create mode 100644 common/src/main/java/com/alibaba/nacos/common/notify/NotifyCenter.java create mode 100644 common/src/main/java/com/alibaba/nacos/common/notify/SlowEvent.java create mode 100644 common/src/main/java/com/alibaba/nacos/common/notify/listener/SmartSubscriber.java create mode 100644 common/src/main/java/com/alibaba/nacos/common/notify/listener/Subscriber.java create mode 100644 common/src/main/java/com/alibaba/nacos/common/utils/BiFunction.java create mode 100644 common/src/main/java/com/alibaba/nacos/common/utils/ClassUtils.java diff --git a/common/src/main/java/com/alibaba/nacos/common/notify/DefaultPublisher.java b/common/src/main/java/com/alibaba/nacos/common/notify/DefaultPublisher.java new file mode 100644 index 00000000000..4a9edf31929 --- /dev/null +++ b/common/src/main/java/com/alibaba/nacos/common/notify/DefaultPublisher.java @@ -0,0 +1,223 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.common.notify; + +import com.alibaba.nacos.common.notify.listener.Subscriber; +import com.alibaba.nacos.common.notify.listener.SmartSubscriber; +import com.alibaba.nacos.common.utils.ConcurrentHashSet; +import com.alibaba.nacos.common.utils.ThreadUtils; +import com.alibaba.nacos.common.utils.CollectionUtils; +import com.alibaba.nacos.common.utils.ClassUtils; +import com.alibaba.nacos.common.utils.Objects; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; + +import static com.alibaba.nacos.common.notify.NotifyCenter.RING_BUFFER_SIZE; + +/** + * The default event publisher implementation + *

+ * Internally, use {@link ArrayBlockingQueue } as a message staging queue + * + * @author liaochuntao + * @author zongtanghu + */ +public class DefaultPublisher extends Thread implements EventPublisher { + + private static final Logger LOGGER = LoggerFactory.getLogger(NotifyCenter.class); + + private volatile boolean initialized = false; + + private volatile boolean shutdown = false; + + private Class eventType; + + private final ConcurrentHashSet subscribers = new ConcurrentHashSet(); + + private int queueMaxSize = -1; + + private BlockingQueue queue; + + private volatile Long lastEventSequence = -1L; + + private final AtomicReferenceFieldUpdater updater = AtomicReferenceFieldUpdater + .newUpdater(DefaultPublisher.class, Long.class, "lastEventSequence"); + + @Override + public void init(Class type, int bufferSize) { + setDaemon(true); + setName("nacos.publisher-" + type.getName()); + this.eventType = type; + this.queueMaxSize = bufferSize; + this.queue = new ArrayBlockingQueue(bufferSize); + start(); + } + + public ConcurrentHashSet getSubscribers() { + return subscribers; + } + + @Override + public synchronized void start() { + if (!initialized) { + // start just called once + super.start(); + if (queueMaxSize == -1) { + queueMaxSize = RING_BUFFER_SIZE; + } + initialized = true; + } + } + + public long currentEventSize() { + return queue.size(); + } + + @Override + public void run() { + openEventHandler(); + } + + void openEventHandler() { + try { + // To ensure that messages are not lost, enable EventHandler when + // waiting for the first Subscriber to register + for (; ; ) { + if (shutdown || hasSubscriber()) { + break; + } + ThreadUtils.sleep(1000L); + } + + for (; ; ) { + if (shutdown) { + break; + } + final Event event = queue.take(); + receiveEvent(event); + updater.compareAndSet(this, lastEventSequence, Math.max(lastEventSequence, event.sequence())); + } + } catch (Throwable ex) { + LOGGER.error("Event listener exception : {}", ex); + } + } + + private boolean hasSubscriber() { + return CollectionUtils.isNotEmpty(subscribers) || CollectionUtils.isNotEmpty(SMART_SUBSCRIBERS); + } + + @Override + public void addSubscriber(Subscriber subscriber) { + subscribers.add(subscriber); + } + + @Override + public void unSubscriber(Subscriber subscriber) { + subscribers.remove(subscriber); + } + + @Override + public boolean publish(Event event) { + checkIsStart(); + boolean success = this.queue.offer(event); + if (!success) { + LOGGER.warn("Unable to plug in due to interruption, synchronize sending time, event : {}", event); + receiveEvent(event); + return true; + } + return true; + } + + void checkIsStart() { + if (!initialized) { + throw new IllegalStateException("Publisher does not start"); + } + } + + @Override + public void shutdown() { + this.shutdown = true; + this.queue.clear(); + } + + public boolean isInitialized() { + return initialized; + } + + void receiveEvent(Event event) { + final long currentEventSequence = event.sequence(); + final String sourceName = ClassUtils.getName(event); + + // Notification single event listener + for (Subscriber subscriber : subscribers) { + // Whether to ignore expiration events + if (subscriber.ignoreExpireEvent() && lastEventSequence > currentEventSequence) { + LOGGER.debug("[NotifyCenter] the {} is unacceptable to this subscriber, because had expire", + event.getClass()); + continue; + } + + final String targetName = ClassUtils.getName(subscriber.subscribeType()); + + if (!Objects.equals(sourceName, targetName)) { + continue; + } + + notifySubscriber(subscriber, event); + } + + // Notification multi-event event listener + for (SmartSubscriber subscriber : SMART_SUBSCRIBERS) { + // If you are a multi-event listener, you need to make additional logical judgments + if (!subscriber.canNotify(event)) { + LOGGER.debug("[NotifyCenter] the {} is unacceptable to this multi-event subscriber", event.getClass()); + continue; + } + notifySubscriber(subscriber, event); + } + } + + @Override + public void notifySubscriber(final Subscriber subscriber, final Event event) { + + LOGGER.debug("[NotifyCenter] the {} will received by {}", event, subscriber); + + final Runnable job = new Runnable() { + @Override + public void run() { + subscriber.onEvent(event); + } + }; + + final Executor executor = subscriber.executor(); + + if (executor != null) { + executor.execute(job); + } else { + try { + job.run(); + } catch (Throwable e) { + LOGGER.error("Event callback exception : {}", e); + } + } + } +} diff --git a/common/src/main/java/com/alibaba/nacos/common/notify/Event.java b/common/src/main/java/com/alibaba/nacos/common/notify/Event.java new file mode 100644 index 00000000000..22c09222158 --- /dev/null +++ b/common/src/main/java/com/alibaba/nacos/common/notify/Event.java @@ -0,0 +1,44 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.common.notify; + +import java.io.Serializable; +import java.util.concurrent.atomic.AtomicLong; + +/** + * An abstract class for event. + * + * @author liaochuntao + * @author zongtanghu + */ +@SuppressWarnings("all") +public abstract class Event implements Serializable { + + private static final AtomicLong SEQUENCE = new AtomicLong(0); + + private long no = SEQUENCE.getAndIncrement(); + + /** + * Event sequence number, which can be used to handle the sequence of events + * + * @return sequence num, It's best to make sure it's monotone + */ + public long sequence() { + return no; + } +} + diff --git a/common/src/main/java/com/alibaba/nacos/common/notify/EventPublisher.java b/common/src/main/java/com/alibaba/nacos/common/notify/EventPublisher.java new file mode 100644 index 00000000000..9dc5b896676 --- /dev/null +++ b/common/src/main/java/com/alibaba/nacos/common/notify/EventPublisher.java @@ -0,0 +1,83 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.common.notify; + +import com.alibaba.nacos.common.lifecycle.Closeable; +import com.alibaba.nacos.common.notify.listener.Subscriber; +import com.alibaba.nacos.common.notify.listener.SmartSubscriber; +import com.alibaba.nacos.common.utils.ConcurrentHashSet; + +import java.util.Set; + +/** + * Event publisher + * + * @author liaochuntao + * @author zongtanghu + */ +public interface EventPublisher extends Closeable { + + /** + * Multi-event listener collection list + */ + Set SMART_SUBSCRIBERS = new ConcurrentHashSet(); + + /** + * Initializes the event publisher + * + * @param type {@link Class} + * @param bufferSize Message staging queue size + */ + void init(Class type, int bufferSize); + + /** + * The number of currently staged events + * + * @return event size + */ + long currentEventSize(); + + /** + * Add listener + * + * @param subscribe {@link Subscriber} + */ + void addSubscriber(Subscriber subscribe); + + /** + * Remove listener + * + * @param subscriber {@link Subscriber} + */ + void unSubscriber(Subscriber subscriber); + + /** + * publish event + * + * @param event {@link Event} + * @return publish event is success + */ + boolean publish(Event event); + + /** + * Notify listener + * + * @param subscriber {@link Subscriber} + * @param event {@link Event} + */ + void notifySubscriber(Subscriber subscriber, Event event); +} \ No newline at end of file diff --git a/common/src/main/java/com/alibaba/nacos/common/notify/NotifyCenter.java b/common/src/main/java/com/alibaba/nacos/common/notify/NotifyCenter.java new file mode 100644 index 00000000000..5889387383a --- /dev/null +++ b/common/src/main/java/com/alibaba/nacos/common/notify/NotifyCenter.java @@ -0,0 +1,291 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.common.notify; + +import com.alibaba.nacos.common.JustForTest; +import com.alibaba.nacos.common.notify.listener.Subscriber; +import com.alibaba.nacos.common.notify.listener.SmartSubscriber; +import com.alibaba.nacos.common.utils.BiFunction; +import com.alibaba.nacos.common.utils.ClassUtils; +import com.alibaba.nacos.common.utils.MapUtils; +import com.alibaba.nacos.common.utils.ThreadUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.ServiceLoader; +import java.util.Set; +import java.util.NoSuchElementException; +import java.util.Iterator; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * @author liaochuntao + * @author zongtanghu + */ +@SuppressWarnings("all") +public class NotifyCenter { + + private static final Logger LOGGER = LoggerFactory.getLogger(NotifyCenter.class); + + public static int RING_BUFFER_SIZE = 16384; + + public static int SHARE_BUFFER_SIZE = 1024; + + private static final AtomicBoolean CLOSED = new AtomicBoolean(false); + + private static BiFunction, Integer, EventPublisher> BUILD_FACTORY = null; + + private static final NotifyCenter INSTANCE = new NotifyCenter(); + + private EventPublisher sharePublisher; + + private static Class clazz = null; + + /** + * Publisher management container + */ + private final Map publisherMap = new ConcurrentHashMap(16); + + static { + // Internal ArrayBlockingQueue buffer size. For applications with high write throughput, + // this value needs to be increased appropriately. default value is 16384 + String ringBufferSizeProperty = "nacos.core.notify.ring-buffer-size"; + RING_BUFFER_SIZE = Integer.getInteger(ringBufferSizeProperty, 16384); + + // The size of the public publisher's message staging queue buffer + String shareBufferSizeProperty = "nacos.core.notify.share-buffer-size"; + SHARE_BUFFER_SIZE = Integer.getInteger(shareBufferSizeProperty, 1024); + + final ServiceLoader loader = ServiceLoader.load(EventPublisher.class); + Iterator iterator = loader.iterator(); + + if (iterator.hasNext()) { + clazz = iterator.next().getClass(); + } else { + clazz = DefaultPublisher.class; + } + + BUILD_FACTORY = new BiFunction, Integer, EventPublisher>() { + + @Override + public EventPublisher apply(Class cls, Integer buffer) { + try { + EventPublisher publisher = clazz.newInstance(); + publisher.init(cls, buffer); + return publisher; + } catch (Exception ex) { + LOGGER.error("Service class newInstance has error : {}", ex); + } + return null; + } + }; + + INSTANCE.sharePublisher = BUILD_FACTORY.apply(SlowEvent.class, SHARE_BUFFER_SIZE); + + ThreadUtils.addShutdownHook(new Runnable() { + @Override + public void run() { + shutdown(); + } + }); + } + + @JustForTest + public static Map getPublisherMap() { + return INSTANCE.publisherMap; + } + + @JustForTest + public static EventPublisher getPublisher(Class topic) { + if (ClassUtils.isAssignableFrom(SlowEvent.class, topic)) { + return INSTANCE.sharePublisher; + } + return INSTANCE.publisherMap.get(topic.getCanonicalName()); + } + + @JustForTest + public static Set getSmartSubscribers() { + return EventPublisher.SMART_SUBSCRIBERS; + } + + @JustForTest + public static EventPublisher getSharePublisher() { + return INSTANCE.sharePublisher; + } + + private static final AtomicBoolean closed = new AtomicBoolean(false); + + public static void shutdown() { + if (!closed.compareAndSet(false, true)) { + return; + } + LOGGER.warn("[NotifyCenter] Start destroying Publisher"); + + for (Map.Entry entry : INSTANCE.publisherMap.entrySet()) { + try { + EventPublisher eventPublisher = entry.getValue(); + eventPublisher.shutdown(); + } catch (Throwable e) { + LOGGER.error("[EventPublisher] shutdown has error : {}", e); + } + } + + try { + INSTANCE.sharePublisher.shutdown(); + } catch (Throwable e) { + LOGGER.error("[SharePublisher] shutdown has error : {}", e); + } + + LOGGER.warn("[NotifyCenter] Destruction of the end"); + } + + /** + * Register a Subscriber. If the Publisher concerned by the Subscriber does not exist, then PublihserMap will + * preempt a placeholder Publisher first. not call {@link Publisher#start()} + * + * @param eventType Types of events that Subscriber cares about + * @param consumer subscriber + * @param event type + */ + public static void registerSubscriber(final Subscriber consumer) { + final Class cls = consumer.subscribeType(); + // If you want to listen to multiple events, you do it separately, + // without automatically registering the appropriate publisher + if (consumer instanceof SmartSubscriber) { + EventPublisher.SMART_SUBSCRIBERS.add((SmartSubscriber) consumer); + return; + } + + if (ClassUtils.isAssignableFrom(SlowEvent.class, cls)) { + INSTANCE.sharePublisher.addSubscriber(consumer); + return; + } + final String topic = ClassUtils.getCanonicalName(consumer.subscribeType()); + MapUtils.computeIfAbsent(INSTANCE.publisherMap, topic, BUILD_FACTORY, cls, RING_BUFFER_SIZE); + EventPublisher publisher = INSTANCE.publisherMap.get(topic); + publisher.addSubscriber(consumer); + } + + /** + * deregister subscriber + * + * @param consumer subscriber + * @param + */ + public static void deregisterSubscribe(final Subscriber consumer) { + final Class cls = consumer.subscribeType(); + if (consumer instanceof SmartSubscriber) { + EventPublisher.SMART_SUBSCRIBERS.remove((SmartSubscriber) consumer); + return; + } + if (ClassUtils.isAssignableFrom(SlowEvent.class, cls)) { + INSTANCE.sharePublisher.unSubscriber(consumer); + return; + } + final String topic = ClassUtils.getCanonicalName(consumer.subscribeType()); + if (INSTANCE.publisherMap.containsKey(topic)) { + EventPublisher publisher = INSTANCE.publisherMap.get(topic); + publisher.unSubscriber(consumer); + return; + } + throw new NoSuchElementException("The subcriber has no event publisher"); + } + + /** + * request publisher publish event Publishers load lazily, calling publisher. Start () only when the event is + * actually published + * + * @param event + */ + public static boolean publishEvent(final Event event) { + try { + return publishEvent(event.getClass(), event); + } catch (Throwable ex) { + LOGGER.error("There was an exception to the message publishing : {}", ex); + return false; + } + } + + /** + * request publisher publish event Publishers load lazily, calling publisher. Start () only when the event is + * actually published + * + * @param eventType + * @param event + */ + private static boolean publishEvent(final Class eventType, final Event event) { + final String topic = ClassUtils.getCanonicalName(eventType); + if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) { + return INSTANCE.sharePublisher.publish(event); + } + + if (INSTANCE.publisherMap.containsKey(topic)) { + EventPublisher publisher = INSTANCE.publisherMap.get(topic); + return publisher.publish(event); + } + throw new NoSuchElementException("There are no [" + topic + "] publishers for this event, please register"); + } + + /** + * register to share-publisher + * + * @param supplier + * @param eventType + * @return + */ + public static EventPublisher registerToSharePublisher(final Class eventType) { + return INSTANCE.sharePublisher; + } + + /** + * register publisher + * + * @param supplier + * @param eventType + * @param queueMaxSize + * @return + */ + public static EventPublisher registerToPublisher(final Class eventType, final int queueMaxSize) { + if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) { + return INSTANCE.sharePublisher; + } + + final String topic = ClassUtils.getCanonicalName(eventType); + MapUtils.computeIfAbsent(INSTANCE.publisherMap, topic, BUILD_FACTORY, eventType, queueMaxSize); + EventPublisher publisher = INSTANCE.publisherMap.get(topic); + return publisher; + } + + /** + * deregister publisher + * + * @param eventType + * @return + */ + public static void deregisterPublisher(final Class eventType) { + final String topic = ClassUtils.getCanonicalName(eventType); + EventPublisher publisher = INSTANCE.publisherMap.remove(topic); + try { + publisher.shutdown(); + } catch (Throwable ex) { + LOGGER.error("There was an exception when publisher shutdown : {}", ex); + } + } + +} \ No newline at end of file diff --git a/common/src/main/java/com/alibaba/nacos/common/notify/SlowEvent.java b/common/src/main/java/com/alibaba/nacos/common/notify/SlowEvent.java new file mode 100644 index 00000000000..4b79f39e5e4 --- /dev/null +++ b/common/src/main/java/com/alibaba/nacos/common/notify/SlowEvent.java @@ -0,0 +1,28 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.common.notify; + +/** + * This event share one event-queue。 + * + * @author liaochuntao + * @author zongtanghu + */ +@SuppressWarnings("PMD.AbstractClassShouldStartWithAbstractNamingRule") +public abstract class SlowEvent extends Event { + +} \ No newline at end of file diff --git a/common/src/main/java/com/alibaba/nacos/common/notify/listener/SmartSubscriber.java b/common/src/main/java/com/alibaba/nacos/common/notify/listener/SmartSubscriber.java new file mode 100644 index 00000000000..68b189e024b --- /dev/null +++ b/common/src/main/java/com/alibaba/nacos/common/notify/listener/SmartSubscriber.java @@ -0,0 +1,47 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.common.notify.listener; + +import com.alibaba.nacos.common.notify.Event; + +/** + * Subscribers to multiple events can be listened to + * + * @author liaochuntao + * @author zongtanghu + */ +@SuppressWarnings("PMD.AbstractClassShouldStartWithAbstractNamingRule") +public abstract class SmartSubscriber extends Subscriber { + + /** + * Determines if the processing message is acceptable + * + * @param event {@link Event} + * @return Determines if the processing message is acceptable + */ + public abstract boolean canNotify(Event event); + + @Override + public final Class subscribeType() { + return null; + } + + @Override + public final boolean ignoreExpireEvent() { + return false; + } +} diff --git a/common/src/main/java/com/alibaba/nacos/common/notify/listener/Subscriber.java b/common/src/main/java/com/alibaba/nacos/common/notify/listener/Subscriber.java new file mode 100644 index 00000000000..af36777a133 --- /dev/null +++ b/common/src/main/java/com/alibaba/nacos/common/notify/listener/Subscriber.java @@ -0,0 +1,63 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.common.notify.listener; + +import com.alibaba.nacos.common.notify.Event; + +import java.util.concurrent.Executor; + +/** + * An abstract subscriber class for subscriber interface. + * + * @author liaochuntao + * @author zongtanghu + */ +@SuppressWarnings("all") +public abstract class Subscriber { + + /** + * Event callback + * + * @param event {@link Event} + */ + public abstract void onEvent(T event); + + /** + * Type of this subscriber's subscription + * + * @return Class which extends {@link Event} + */ + public abstract Class subscribeType(); + + /** + * It is up to the listener to determine whether the callback is asynchronous or synchronous + * + * @return {@link Executor} + */ + public Executor executor() { + return null; + } + + /** + * Whether to ignore expired events + * + * @return default value is {@link Boolean#FALSE} + */ + public boolean ignoreExpireEvent() { + return false; + } +} diff --git a/common/src/main/java/com/alibaba/nacos/common/utils/BiFunction.java b/common/src/main/java/com/alibaba/nacos/common/utils/BiFunction.java new file mode 100644 index 00000000000..e97988d3e24 --- /dev/null +++ b/common/src/main/java/com/alibaba/nacos/common/utils/BiFunction.java @@ -0,0 +1,43 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.nacos.common.utils; + + +/** + * Represents a function that accepts two arguments and produces a result. + * + *

This is a functional interface + * whose functional method is {@link #apply(Object, Object)}. + * + * @author zongtanghu + * + */ +public interface BiFunction { + + // The following utility functions are extracted from org.apache.commons.lang3 + // start + + /** + * Applies this function to the given arguments. + * + * @param t the first function argument + * @param u the second function argument + * @return the function result + */ + R apply(T t, U u); + + // end +} diff --git a/common/src/main/java/com/alibaba/nacos/common/utils/ClassUtils.java b/common/src/main/java/com/alibaba/nacos/common/utils/ClassUtils.java new file mode 100644 index 00000000000..01b76202342 --- /dev/null +++ b/common/src/main/java/com/alibaba/nacos/common/utils/ClassUtils.java @@ -0,0 +1,70 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.common.utils; + +/** + * Utils for Class. + * + * @author liaochuntao + */ +@SuppressWarnings("all") +public final class ClassUtils { + + public static Class findClassByName(String className) { + try { + return Class.forName(className); + } catch (Exception e) { + throw new RuntimeException("this class name not found"); + } + } + + public static String getName(Object obj) { + Objects.requireNonNull(obj, "obj"); + return obj.getClass().getName(); + } + + public static String getCanonicalName(Object obj) { + Objects.requireNonNull(obj, "obj"); + return obj.getClass().getCanonicalName(); + } + + public static boolean isAssignableFrom(Class clazz, Class cls) { + Objects.requireNonNull(cls, "cls"); + return clazz.isAssignableFrom(cls); + } + + public static String getSimplaName(Object obj) { + Objects.requireNonNull(obj, "obj"); + return obj.getClass().getSimpleName(); + } + + public static String getName(Class cls) { + Objects.requireNonNull(cls, "cls"); + return cls.getName(); + } + + public static String getCanonicalName(Class cls) { + Objects.requireNonNull(cls, "cls"); + return cls.getCanonicalName(); + } + + public static String getSimplaName(Class cls) { + Objects.requireNonNull(cls, "cls"); + return cls.getSimpleName(); + } + +} diff --git a/common/src/main/java/com/alibaba/nacos/common/utils/MapUtils.java b/common/src/main/java/com/alibaba/nacos/common/utils/MapUtils.java index 2a33a28bb88..343d8a901f8 100644 --- a/common/src/main/java/com/alibaba/nacos/common/utils/MapUtils.java +++ b/common/src/main/java/com/alibaba/nacos/common/utils/MapUtils.java @@ -18,6 +18,7 @@ import java.util.Collection; import java.util.Dictionary; +import java.util.HashMap; import java.util.Map; /** @@ -124,4 +125,21 @@ public static void putIfValNoEmpty(Map target, Object key, Object value) { } } + public static Object computeIfAbsent(Map target, Object key, BiFunction mappingFunction, Object param1, + Object param2) { + + Objects.requireNonNull(key, "key"); + Objects.requireNonNull(key, "mappingFunction"); + Objects.requireNonNull(key, "param1"); + Objects.requireNonNull(key, "param2"); + + Object val = target.get(key); + if (val == null) { + Object ret = mappingFunction.apply(param1, param2); + target.put(key, ret); + return ret; + } + return val; + } + } From 99a74153d5c9d9b7a14585dd15f89283d2d7d723 Mon Sep 17 00:00:00 2001 From: zongtanghu Date: Fri, 19 Jun 2020 11:13:15 +0800 Subject: [PATCH 2/6] [#3117]fix typo and reformat code styles. --- .../nacos/common/notify/DefaultPublisher.java | 6 +++--- .../nacos/common/notify/EventPublisher.java | 18 +++++++++--------- .../nacos/common/notify/NotifyCenter.java | 2 ++ .../alibaba/nacos/common/notify/SlowEvent.java | 2 +- .../notify/listener/SmartSubscriber.java | 4 ++-- .../alibaba/nacos/common/utils/BiFunction.java | 2 +- .../alibaba/nacos/common/utils/MapUtils.java | 11 ++++++++++- 7 files changed, 28 insertions(+), 17 deletions(-) diff --git a/common/src/main/java/com/alibaba/nacos/common/notify/DefaultPublisher.java b/common/src/main/java/com/alibaba/nacos/common/notify/DefaultPublisher.java index 4a9edf31929..80e2cadd595 100644 --- a/common/src/main/java/com/alibaba/nacos/common/notify/DefaultPublisher.java +++ b/common/src/main/java/com/alibaba/nacos/common/notify/DefaultPublisher.java @@ -34,9 +34,9 @@ import static com.alibaba.nacos.common.notify.NotifyCenter.RING_BUFFER_SIZE; /** - * The default event publisher implementation - *

- * Internally, use {@link ArrayBlockingQueue } as a message staging queue + * The default event publisher implementation. + * + *

Internally, use {@link ArrayBlockingQueue } as a message staging queue. * * @author liaochuntao * @author zongtanghu diff --git a/common/src/main/java/com/alibaba/nacos/common/notify/EventPublisher.java b/common/src/main/java/com/alibaba/nacos/common/notify/EventPublisher.java index 9dc5b896676..23fe8e2a92f 100644 --- a/common/src/main/java/com/alibaba/nacos/common/notify/EventPublisher.java +++ b/common/src/main/java/com/alibaba/nacos/common/notify/EventPublisher.java @@ -24,7 +24,7 @@ import java.util.Set; /** - * Event publisher + * Event publisher. * * @author liaochuntao * @author zongtanghu @@ -32,41 +32,41 @@ public interface EventPublisher extends Closeable { /** - * Multi-event listener collection list + * Multi-event listener collection list. */ Set SMART_SUBSCRIBERS = new ConcurrentHashSet(); /** - * Initializes the event publisher + * Initializes the event publisher. * - * @param type {@link Class} + * @param type {@link Event >} * @param bufferSize Message staging queue size */ void init(Class type, int bufferSize); /** - * The number of currently staged events + * The number of currently staged events. * * @return event size */ long currentEventSize(); /** - * Add listener + * Add listener. * * @param subscribe {@link Subscriber} */ void addSubscriber(Subscriber subscribe); /** - * Remove listener + * Remove listener. * * @param subscriber {@link Subscriber} */ void unSubscriber(Subscriber subscriber); /** - * publish event + * publish event. * * @param event {@link Event} * @return publish event is success @@ -74,7 +74,7 @@ public interface EventPublisher extends Closeable { boolean publish(Event event); /** - * Notify listener + * Notify listener. * * @param subscriber {@link Subscriber} * @param event {@link Event} diff --git a/common/src/main/java/com/alibaba/nacos/common/notify/NotifyCenter.java b/common/src/main/java/com/alibaba/nacos/common/notify/NotifyCenter.java index 5889387383a..a197355afd3 100644 --- a/common/src/main/java/com/alibaba/nacos/common/notify/NotifyCenter.java +++ b/common/src/main/java/com/alibaba/nacos/common/notify/NotifyCenter.java @@ -35,6 +35,8 @@ import java.util.concurrent.atomic.AtomicBoolean; /** + * Unified Event Notify Center. + * * @author liaochuntao * @author zongtanghu */ diff --git a/common/src/main/java/com/alibaba/nacos/common/notify/SlowEvent.java b/common/src/main/java/com/alibaba/nacos/common/notify/SlowEvent.java index 4b79f39e5e4..6b556490678 100644 --- a/common/src/main/java/com/alibaba/nacos/common/notify/SlowEvent.java +++ b/common/src/main/java/com/alibaba/nacos/common/notify/SlowEvent.java @@ -17,7 +17,7 @@ package com.alibaba.nacos.common.notify; /** - * This event share one event-queue。 + * This event share one event-queue. * * @author liaochuntao * @author zongtanghu diff --git a/common/src/main/java/com/alibaba/nacos/common/notify/listener/SmartSubscriber.java b/common/src/main/java/com/alibaba/nacos/common/notify/listener/SmartSubscriber.java index 68b189e024b..da2eed4f4f5 100644 --- a/common/src/main/java/com/alibaba/nacos/common/notify/listener/SmartSubscriber.java +++ b/common/src/main/java/com/alibaba/nacos/common/notify/listener/SmartSubscriber.java @@ -19,7 +19,7 @@ import com.alibaba.nacos.common.notify.Event; /** - * Subscribers to multiple events can be listened to + * Subscribers to multiple events can be listened to. * * @author liaochuntao * @author zongtanghu @@ -28,7 +28,7 @@ public abstract class SmartSubscriber extends Subscriber { /** - * Determines if the processing message is acceptable + * Determines if the processing message is acceptable. * * @param event {@link Event} * @return Determines if the processing message is acceptable diff --git a/common/src/main/java/com/alibaba/nacos/common/utils/BiFunction.java b/common/src/main/java/com/alibaba/nacos/common/utils/BiFunction.java index e97988d3e24..0ce44ccc0b7 100644 --- a/common/src/main/java/com/alibaba/nacos/common/utils/BiFunction.java +++ b/common/src/main/java/com/alibaba/nacos/common/utils/BiFunction.java @@ -13,8 +13,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.alibaba.nacos.common.utils; +package com.alibaba.nacos.common.utils; /** * Represents a function that accepts two arguments and produces a result. diff --git a/common/src/main/java/com/alibaba/nacos/common/utils/MapUtils.java b/common/src/main/java/com/alibaba/nacos/common/utils/MapUtils.java index 343d8a901f8..12d6aa87ab2 100644 --- a/common/src/main/java/com/alibaba/nacos/common/utils/MapUtils.java +++ b/common/src/main/java/com/alibaba/nacos/common/utils/MapUtils.java @@ -18,7 +18,6 @@ import java.util.Collection; import java.util.Dictionary; -import java.util.HashMap; import java.util.Map; /** @@ -125,6 +124,16 @@ public static void putIfValNoEmpty(Map target, Object key, Object value) { } } + /** + * ComputeIfAbsent lazy load. + * + * @param target target Map data. + * @param key map key. + * @param mappingFunction funtion which is need to be executed. + * @param param1 function's parameter value1. + * @param param2 function's parameter value1. + * @return + */ public static Object computeIfAbsent(Map target, Object key, BiFunction mappingFunction, Object param1, Object param2) { From c5f7e6c2315ac3017b6e545eeb2b5c728f3efe5e Mon Sep 17 00:00:00 2001 From: zongtanghu Date: Fri, 19 Jun 2020 14:33:04 +0800 Subject: [PATCH 3/6] [#3117]fix typo and reformat code styles. --- .../alibaba/nacos/common/NotThreadSafe.java | 35 +++++++++ .../nacos/common/notify/DefaultPublisher.java | 2 +- .../alibaba/nacos/common/notify/Event.java | 10 +-- .../nacos/common/notify/EventPublisher.java | 6 +- .../nacos/common/notify/NotifyCenter.java | 72 ++++++++++--------- .../nacos/common/notify/SlowEvent.java | 6 +- .../nacos/common/utils/BiFunction.java | 15 ++-- .../nacos/common/utils/ClassUtils.java | 57 ++++++++++++++- .../alibaba/nacos/common/utils/MapUtils.java | 6 +- 9 files changed, 155 insertions(+), 54 deletions(-) create mode 100644 common/src/main/java/com/alibaba/nacos/common/NotThreadSafe.java diff --git a/common/src/main/java/com/alibaba/nacos/common/NotThreadSafe.java b/common/src/main/java/com/alibaba/nacos/common/NotThreadSafe.java new file mode 100644 index 00000000000..8076d2afa6f --- /dev/null +++ b/common/src/main/java/com/alibaba/nacos/common/NotThreadSafe.java @@ -0,0 +1,35 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.common; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Annotation that marks a method as not thread safe. + * + * @author zongtanghu + */ +@Documented +@Target({ElementType.TYPE, ElementType.METHOD}) +@Retention(RetentionPolicy.SOURCE) +public @interface NotThreadSafe { + +} diff --git a/common/src/main/java/com/alibaba/nacos/common/notify/DefaultPublisher.java b/common/src/main/java/com/alibaba/nacos/common/notify/DefaultPublisher.java index 80e2cadd595..4a9f3192fac 100644 --- a/common/src/main/java/com/alibaba/nacos/common/notify/DefaultPublisher.java +++ b/common/src/main/java/com/alibaba/nacos/common/notify/DefaultPublisher.java @@ -131,7 +131,7 @@ public void addSubscriber(Subscriber subscriber) { } @Override - public void unSubscriber(Subscriber subscriber) { + public void removeSubscriber(Subscriber subscriber) { subscribers.remove(subscriber); } diff --git a/common/src/main/java/com/alibaba/nacos/common/notify/Event.java b/common/src/main/java/com/alibaba/nacos/common/notify/Event.java index 22c09222158..21a4cdcbe36 100644 --- a/common/src/main/java/com/alibaba/nacos/common/notify/Event.java +++ b/common/src/main/java/com/alibaba/nacos/common/notify/Event.java @@ -25,17 +25,17 @@ * @author liaochuntao * @author zongtanghu */ -@SuppressWarnings("all") +@SuppressWarnings({"PMD.AbstractClassShouldStartWithAbstractNamingRule", "PMD.ConstantFieldShouldBeUpperCaseRule"}) public abstract class Event implements Serializable { - private static final AtomicLong SEQUENCE = new AtomicLong(0); + private static final AtomicLong sequence = new AtomicLong(0); - private long no = SEQUENCE.getAndIncrement(); + private final long no = sequence.getAndIncrement(); /** - * Event sequence number, which can be used to handle the sequence of events + * Event sequence number, which can be used to handle the sequence of events. * - * @return sequence num, It's best to make sure it's monotone + * @return sequence num, It's best to make sure it's monotone. */ public long sequence() { return no; diff --git a/common/src/main/java/com/alibaba/nacos/common/notify/EventPublisher.java b/common/src/main/java/com/alibaba/nacos/common/notify/EventPublisher.java index 23fe8e2a92f..a8b254baa51 100644 --- a/common/src/main/java/com/alibaba/nacos/common/notify/EventPublisher.java +++ b/common/src/main/java/com/alibaba/nacos/common/notify/EventPublisher.java @@ -54,16 +54,16 @@ public interface EventPublisher extends Closeable { /** * Add listener. * - * @param subscribe {@link Subscriber} + * @param subscriber {@link Subscriber} */ - void addSubscriber(Subscriber subscribe); + void addSubscriber(Subscriber subscriber); /** * Remove listener. * * @param subscriber {@link Subscriber} */ - void unSubscriber(Subscriber subscriber); + void removeSubscriber(Subscriber subscriber); /** * publish event. diff --git a/common/src/main/java/com/alibaba/nacos/common/notify/NotifyCenter.java b/common/src/main/java/com/alibaba/nacos/common/notify/NotifyCenter.java index a197355afd3..13243d70db5 100644 --- a/common/src/main/java/com/alibaba/nacos/common/notify/NotifyCenter.java +++ b/common/src/main/java/com/alibaba/nacos/common/notify/NotifyCenter.java @@ -16,6 +16,7 @@ package com.alibaba.nacos.common.notify; +import com.alibaba.nacos.api.exception.NacosException; import com.alibaba.nacos.common.JustForTest; import com.alibaba.nacos.common.notify.listener.Subscriber; import com.alibaba.nacos.common.notify.listener.SmartSubscriber; @@ -34,13 +35,15 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; +import static com.alibaba.nacos.api.exception.NacosException.SERVER_ERROR; + /** * Unified Event Notify Center. * * @author liaochuntao * @author zongtanghu */ -@SuppressWarnings("all") +@SuppressWarnings("PMD.ConstantFieldShouldBeUpperCaseRule") public class NotifyCenter { private static final Logger LOGGER = LoggerFactory.getLogger(NotifyCenter.class); @@ -60,7 +63,7 @@ public class NotifyCenter { private static Class clazz = null; /** - * Publisher management container + * Publisher management container. */ private final Map publisherMap = new ConcurrentHashMap(16); @@ -86,19 +89,23 @@ public class NotifyCenter { BUILD_FACTORY = new BiFunction, Integer, EventPublisher>() { @Override - public EventPublisher apply(Class cls, Integer buffer) { + public EventPublisher apply(Class cls, Integer buffer) throws NacosException { try { EventPublisher publisher = clazz.newInstance(); publisher.init(cls, buffer); return publisher; - } catch (Exception ex) { + } catch (Throwable ex) { LOGGER.error("Service class newInstance has error : {}", ex); + throw new NacosException(SERVER_ERROR, ex); } - return null; } }; - INSTANCE.sharePublisher = BUILD_FACTORY.apply(SlowEvent.class, SHARE_BUFFER_SIZE); + try { + INSTANCE.sharePublisher = BUILD_FACTORY.apply(SlowEvent.class, SHARE_BUFFER_SIZE); + } catch (Throwable ex) { + LOGGER.error("Service class newInstance has error : {}", ex); + } ThreadUtils.addShutdownHook(new Runnable() { @Override @@ -133,6 +140,9 @@ public static EventPublisher getSharePublisher() { private static final AtomicBoolean closed = new AtomicBoolean(false); + /** + * Shutdown the serveral publisher instance which notifycenter has. + */ public static void shutdown() { if (!closed.compareAndSet(false, true)) { return; @@ -159,13 +169,12 @@ public static void shutdown() { /** * Register a Subscriber. If the Publisher concerned by the Subscriber does not exist, then PublihserMap will - * preempt a placeholder Publisher first. not call {@link Publisher#start()} + * preempt a placeholder Publisher first. * - * @param eventType Types of events that Subscriber cares about - * @param consumer subscriber - * @param event type + * @param consumer subscriber + * @param event type */ - public static void registerSubscriber(final Subscriber consumer) { + public static void registerSubscriber(final Subscriber consumer) throws NacosException { final Class cls = consumer.subscribeType(); // If you want to listen to multiple events, you do it separately, // without automatically registering the appropriate publisher @@ -185,10 +194,9 @@ public static void registerSubscriber(final Subscriber consumer) { } /** - * deregister subscriber + * Deregister subscriber. * - * @param consumer subscriber - * @param + * @param consumer subscriber. */ public static void deregisterSubscribe(final Subscriber consumer) { final Class cls = consumer.subscribeType(); @@ -197,13 +205,13 @@ public static void deregisterSubscribe(final Subscriber consumer) { return; } if (ClassUtils.isAssignableFrom(SlowEvent.class, cls)) { - INSTANCE.sharePublisher.unSubscriber(consumer); + INSTANCE.sharePublisher.removeSubscriber(consumer); return; } final String topic = ClassUtils.getCanonicalName(consumer.subscribeType()); if (INSTANCE.publisherMap.containsKey(topic)) { EventPublisher publisher = INSTANCE.publisherMap.get(topic); - publisher.unSubscriber(consumer); + publisher.removeSubscriber(consumer); return; } throw new NoSuchElementException("The subcriber has no event publisher"); @@ -213,7 +221,7 @@ public static void deregisterSubscribe(final Subscriber consumer) { * request publisher publish event Publishers load lazily, calling publisher. Start () only when the event is * actually published * - * @param event + * @param event class Instances of the event. */ public static boolean publishEvent(final Event event) { try { @@ -225,11 +233,10 @@ public static boolean publishEvent(final Event event) { } /** - * request publisher publish event Publishers load lazily, calling publisher. Start () only when the event is - * actually published + * Request publisher publish event Publishers load lazily, calling publisher. * - * @param eventType - * @param event + * @param eventType class Instances type of the event type. + * @param event event instance. */ private static boolean publishEvent(final Class eventType, final Event event) { final String topic = ClassUtils.getCanonicalName(eventType); @@ -245,25 +252,23 @@ private static boolean publishEvent(final Class eventType, fina } /** - * register to share-publisher + * Register to share-publisher. * - * @param supplier - * @param eventType - * @return + * @param eventType class Instances type of the event type. + * @return share publisher instance. */ public static EventPublisher registerToSharePublisher(final Class eventType) { return INSTANCE.sharePublisher; } /** - * register publisher + * Register publisher. * - * @param supplier - * @param eventType - * @param queueMaxSize - * @return + * @param eventType class Instances type of the event type. + * @param queueMaxSize the publisher's queue max size. */ - public static EventPublisher registerToPublisher(final Class eventType, final int queueMaxSize) { + public static EventPublisher registerToPublisher(final Class eventType, final int queueMaxSize) + throws NacosException { if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) { return INSTANCE.sharePublisher; } @@ -275,10 +280,9 @@ public static EventPublisher registerToPublisher(final Class ev } /** - * deregister publisher + * Deregister publisher. * - * @param eventType - * @return + * @param eventType class Instances type of the event type. */ public static void deregisterPublisher(final Class eventType) { final String topic = ClassUtils.getCanonicalName(eventType); diff --git a/common/src/main/java/com/alibaba/nacos/common/notify/SlowEvent.java b/common/src/main/java/com/alibaba/nacos/common/notify/SlowEvent.java index 6b556490678..fef83daa2a0 100644 --- a/common/src/main/java/com/alibaba/nacos/common/notify/SlowEvent.java +++ b/common/src/main/java/com/alibaba/nacos/common/notify/SlowEvent.java @@ -24,5 +24,9 @@ */ @SuppressWarnings("PMD.AbstractClassShouldStartWithAbstractNamingRule") public abstract class SlowEvent extends Event { - + + @Override + public long sequence() { + return 0; + } } \ No newline at end of file diff --git a/common/src/main/java/com/alibaba/nacos/common/utils/BiFunction.java b/common/src/main/java/com/alibaba/nacos/common/utils/BiFunction.java index 0ce44ccc0b7..1c33d513182 100644 --- a/common/src/main/java/com/alibaba/nacos/common/utils/BiFunction.java +++ b/common/src/main/java/com/alibaba/nacos/common/utils/BiFunction.java @@ -16,8 +16,11 @@ package com.alibaba.nacos.common.utils; +import com.alibaba.nacos.api.exception.NacosException; + /** * Represents a function that accepts two arguments and produces a result. + * The following utility functions are extracted from org.apache.commons.lang3. * *

This is a functional interface * whose functional method is {@link #apply(Object, Object)}. @@ -25,19 +28,15 @@ * @author zongtanghu * */ +@SuppressWarnings("PMD.AbstractMethodOrInterfaceMethodMustUseJavadocRule") public interface BiFunction { - - // The following utility functions are extracted from org.apache.commons.lang3 - // start - + /** - * Applies this function to the given arguments. + * Applies this function to the given arguments. This * * @param t the first function argument * @param u the second function argument * @return the function result */ - R apply(T t, U u); - - // end + R apply(T t, U u) throws NacosException; } diff --git a/common/src/main/java/com/alibaba/nacos/common/utils/ClassUtils.java b/common/src/main/java/com/alibaba/nacos/common/utils/ClassUtils.java index 01b76202342..b536a5f00f7 100644 --- a/common/src/main/java/com/alibaba/nacos/common/utils/ClassUtils.java +++ b/common/src/main/java/com/alibaba/nacos/common/utils/ClassUtils.java @@ -16,6 +16,11 @@ package com.alibaba.nacos.common.utils; +import com.alibaba.nacos.api.exception.runtime.NacosRuntimeException; + +import static com.alibaba.nacos.api.exception.NacosException.SERVER_ERROR; + + /** * Utils for Class. * @@ -24,44 +29,94 @@ @SuppressWarnings("all") public final class ClassUtils { + /** + * Finds and returns class by className. + * + * @param className String value for className. + * @return class Instances of the class represent classes and interfaces. + */ public static Class findClassByName(String className) { try { return Class.forName(className); } catch (Exception e) { - throw new RuntimeException("this class name not found"); + throw new NacosRuntimeException(SERVER_ERROR, "this class name not found"); } } + /** + * Gets and returns className. + * + * @param obj Object instance. + * @return className. + */ public static String getName(Object obj) { Objects.requireNonNull(obj, "obj"); return obj.getClass().getName(); } + /** + * Gets and returns the canonical name of the underlying class. + * + * @param obj Object instance. + * @return The canonical name of the underlying class. + */ public static String getCanonicalName(Object obj) { Objects.requireNonNull(obj, "obj"); return obj.getClass().getCanonicalName(); } + /** + * Determines if the class or interface represented by this object is either the same as, or is a superclass or + * superinterface of, the class or interface represented by the specified parameter. + * + * @param clazz + * @param cls + * @return the value indicating whether objects of the type can be assigned to objects of this class. + */ public static boolean isAssignableFrom(Class clazz, Class cls) { Objects.requireNonNull(cls, "cls"); return clazz.isAssignableFrom(cls); } + /** + * Gets and returns the simple name of the underlying class as given in the source code. + * + * @param obj Object instance. + * @return the simple name of the underlying class. + */ public static String getSimplaName(Object obj) { Objects.requireNonNull(obj, "obj"); return obj.getClass().getSimpleName(); } + /** + * Gets and returns the class name. + * + * @param cls Instances of the class represent classes and interfaces. + * @return the name of the class or interface represented by this object. + */ public static String getName(Class cls) { Objects.requireNonNull(cls, "cls"); return cls.getName(); } + /** + * Gets and returns the canonical name of the underlying class. + * + * @param cls Instances of the class represent classes and interfaces. + * @return The canonical name of the underlying class. + */ public static String getCanonicalName(Class cls) { Objects.requireNonNull(cls, "cls"); return cls.getCanonicalName(); } + /** + * Gets and returns the simple name of the underlying class. + * + * @param cls Instances of the class represent classes and interfaces. + * @return the simple name of the underlying class. + */ public static String getSimplaName(Class cls) { Objects.requireNonNull(cls, "cls"); return cls.getSimpleName(); diff --git a/common/src/main/java/com/alibaba/nacos/common/utils/MapUtils.java b/common/src/main/java/com/alibaba/nacos/common/utils/MapUtils.java index 12d6aa87ab2..eaa1d1ae687 100644 --- a/common/src/main/java/com/alibaba/nacos/common/utils/MapUtils.java +++ b/common/src/main/java/com/alibaba/nacos/common/utils/MapUtils.java @@ -16,6 +16,9 @@ package com.alibaba.nacos.common.utils; +import com.alibaba.nacos.api.exception.NacosException; +import com.alibaba.nacos.common.NotThreadSafe; + import java.util.Collection; import java.util.Dictionary; import java.util.Map; @@ -134,8 +137,9 @@ public static void putIfValNoEmpty(Map target, Object key, Object value) { * @param param2 function's parameter value1. * @return */ + @NotThreadSafe public static Object computeIfAbsent(Map target, Object key, BiFunction mappingFunction, Object param1, - Object param2) { + Object param2) throws NacosException { Objects.requireNonNull(key, "key"); Objects.requireNonNull(key, "mappingFunction"); From 0517324b26bf1b1d7eba85f6842dedc14af9d80c Mon Sep 17 00:00:00 2001 From: zongtanghu Date: Sat, 20 Jun 2020 11:03:14 +0800 Subject: [PATCH 4/6] [#3118]fix typo and formate. --- .../nacos/common/notify/DefaultPublisher.java | 4 +- .../alibaba/nacos/common/notify/Event.java | 9 +-- .../nacos/common/notify/NotifyCenter.java | 25 +++---- .../common/notify/listener/Subscriber.java | 10 +-- .../nacos/common/utils/BiFunction.java | 4 +- .../nacos/common/utils/ClassUtils.java | 72 +++++++++---------- 6 files changed, 60 insertions(+), 64 deletions(-) diff --git a/common/src/main/java/com/alibaba/nacos/common/notify/DefaultPublisher.java b/common/src/main/java/com/alibaba/nacos/common/notify/DefaultPublisher.java index 4a9f3192fac..95f653d46b0 100644 --- a/common/src/main/java/com/alibaba/nacos/common/notify/DefaultPublisher.java +++ b/common/src/main/java/com/alibaba/nacos/common/notify/DefaultPublisher.java @@ -31,7 +31,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; -import static com.alibaba.nacos.common.notify.NotifyCenter.RING_BUFFER_SIZE; +import static com.alibaba.nacos.common.notify.NotifyCenter.ringBufferSize; /** * The default event publisher implementation. @@ -82,7 +82,7 @@ public synchronized void start() { // start just called once super.start(); if (queueMaxSize == -1) { - queueMaxSize = RING_BUFFER_SIZE; + queueMaxSize = ringBufferSize ; } initialized = true; } diff --git a/common/src/main/java/com/alibaba/nacos/common/notify/Event.java b/common/src/main/java/com/alibaba/nacos/common/notify/Event.java index 21a4cdcbe36..a8562884213 100644 --- a/common/src/main/java/com/alibaba/nacos/common/notify/Event.java +++ b/common/src/main/java/com/alibaba/nacos/common/notify/Event.java @@ -25,12 +25,12 @@ * @author liaochuntao * @author zongtanghu */ -@SuppressWarnings({"PMD.AbstractClassShouldStartWithAbstractNamingRule", "PMD.ConstantFieldShouldBeUpperCaseRule"}) +@SuppressWarnings({"PMD.AbstractClassShouldStartWithAbstractNamingRule"}) public abstract class Event implements Serializable { - private static final AtomicLong sequence = new AtomicLong(0); + private static final AtomicLong SEQUENCE = new AtomicLong(0); - private final long no = sequence.getAndIncrement(); + private final long sequence = SEQUENCE.getAndIncrement(); /** * Event sequence number, which can be used to handle the sequence of events. @@ -38,7 +38,8 @@ public abstract class Event implements Serializable { * @return sequence num, It's best to make sure it's monotone. */ public long sequence() { - return no; + return sequence; } + } diff --git a/common/src/main/java/com/alibaba/nacos/common/notify/NotifyCenter.java b/common/src/main/java/com/alibaba/nacos/common/notify/NotifyCenter.java index 13243d70db5..137b65b9b31 100644 --- a/common/src/main/java/com/alibaba/nacos/common/notify/NotifyCenter.java +++ b/common/src/main/java/com/alibaba/nacos/common/notify/NotifyCenter.java @@ -43,18 +43,17 @@ * @author liaochuntao * @author zongtanghu */ -@SuppressWarnings("PMD.ConstantFieldShouldBeUpperCaseRule") public class NotifyCenter { private static final Logger LOGGER = LoggerFactory.getLogger(NotifyCenter.class); - public static int RING_BUFFER_SIZE = 16384; + public static int ringBufferSize = 16384; - public static int SHARE_BUFFER_SIZE = 1024; + public static int shareBufferSize = 1024; private static final AtomicBoolean CLOSED = new AtomicBoolean(false); - private static BiFunction, Integer, EventPublisher> BUILD_FACTORY = null; + private static BiFunction, Integer, EventPublisher> publisherFactory = null; private static final NotifyCenter INSTANCE = new NotifyCenter(); @@ -71,11 +70,11 @@ public class NotifyCenter { // Internal ArrayBlockingQueue buffer size. For applications with high write throughput, // this value needs to be increased appropriately. default value is 16384 String ringBufferSizeProperty = "nacos.core.notify.ring-buffer-size"; - RING_BUFFER_SIZE = Integer.getInteger(ringBufferSizeProperty, 16384); + ringBufferSize = Integer.getInteger(ringBufferSizeProperty, 16384); // The size of the public publisher's message staging queue buffer String shareBufferSizeProperty = "nacos.core.notify.share-buffer-size"; - SHARE_BUFFER_SIZE = Integer.getInteger(shareBufferSizeProperty, 1024); + shareBufferSize = Integer.getInteger(shareBufferSizeProperty, 1024); final ServiceLoader loader = ServiceLoader.load(EventPublisher.class); Iterator iterator = loader.iterator(); @@ -86,7 +85,7 @@ public class NotifyCenter { clazz = DefaultPublisher.class; } - BUILD_FACTORY = new BiFunction, Integer, EventPublisher>() { + publisherFactory = new BiFunction, Integer, EventPublisher>() { @Override public EventPublisher apply(Class cls, Integer buffer) throws NacosException { @@ -102,7 +101,7 @@ public EventPublisher apply(Class cls, Integer buffer) throws N }; try { - INSTANCE.sharePublisher = BUILD_FACTORY.apply(SlowEvent.class, SHARE_BUFFER_SIZE); + INSTANCE.sharePublisher = publisherFactory.apply(SlowEvent.class, shareBufferSize); } catch (Throwable ex) { LOGGER.error("Service class newInstance has error : {}", ex); } @@ -138,13 +137,11 @@ public static EventPublisher getSharePublisher() { return INSTANCE.sharePublisher; } - private static final AtomicBoolean closed = new AtomicBoolean(false); - /** * Shutdown the serveral publisher instance which notifycenter has. */ public static void shutdown() { - if (!closed.compareAndSet(false, true)) { + if (!CLOSED.compareAndSet(false, true)) { return; } LOGGER.warn("[NotifyCenter] Start destroying Publisher"); @@ -188,7 +185,7 @@ public static void registerSubscriber(final Subscriber consumer) throws Naco return; } final String topic = ClassUtils.getCanonicalName(consumer.subscribeType()); - MapUtils.computeIfAbsent(INSTANCE.publisherMap, topic, BUILD_FACTORY, cls, RING_BUFFER_SIZE); + MapUtils.computeIfAbsent(INSTANCE.publisherMap, topic, publisherFactory, cls, ringBufferSize); EventPublisher publisher = INSTANCE.publisherMap.get(topic); publisher.addSubscriber(consumer); } @@ -274,7 +271,7 @@ public static EventPublisher registerToPublisher(final Class ev } final String topic = ClassUtils.getCanonicalName(eventType); - MapUtils.computeIfAbsent(INSTANCE.publisherMap, topic, BUILD_FACTORY, eventType, queueMaxSize); + MapUtils.computeIfAbsent(INSTANCE.publisherMap, topic, publisherFactory, eventType, queueMaxSize); EventPublisher publisher = INSTANCE.publisherMap.get(topic); return publisher; } @@ -294,4 +291,4 @@ public static void deregisterPublisher(final Class eventType) { } } -} \ No newline at end of file +} diff --git a/common/src/main/java/com/alibaba/nacos/common/notify/listener/Subscriber.java b/common/src/main/java/com/alibaba/nacos/common/notify/listener/Subscriber.java index af36777a133..65a41d59eee 100644 --- a/common/src/main/java/com/alibaba/nacos/common/notify/listener/Subscriber.java +++ b/common/src/main/java/com/alibaba/nacos/common/notify/listener/Subscriber.java @@ -26,25 +26,25 @@ * @author liaochuntao * @author zongtanghu */ -@SuppressWarnings("all") +@SuppressWarnings("PMD.AbstractClassShouldStartWithAbstractNamingRule") public abstract class Subscriber { /** - * Event callback + * Event callback. * * @param event {@link Event} */ public abstract void onEvent(T event); /** - * Type of this subscriber's subscription + * Type of this subscriber's subscription. * * @return Class which extends {@link Event} */ public abstract Class subscribeType(); /** - * It is up to the listener to determine whether the callback is asynchronous or synchronous + * It is up to the listener to determine whether the callback is asynchronous or synchronous. * * @return {@link Executor} */ @@ -53,7 +53,7 @@ public Executor executor() { } /** - * Whether to ignore expired events + * Whether to ignore expired events. * * @return default value is {@link Boolean#FALSE} */ diff --git a/common/src/main/java/com/alibaba/nacos/common/utils/BiFunction.java b/common/src/main/java/com/alibaba/nacos/common/utils/BiFunction.java index 1c33d513182..20accf0851b 100644 --- a/common/src/main/java/com/alibaba/nacos/common/utils/BiFunction.java +++ b/common/src/main/java/com/alibaba/nacos/common/utils/BiFunction.java @@ -28,15 +28,15 @@ * @author zongtanghu * */ -@SuppressWarnings("PMD.AbstractMethodOrInterfaceMethodMustUseJavadocRule") public interface BiFunction { /** - * Applies this function to the given arguments. This + * Applies this function to the two given arguments. * * @param t the first function argument * @param u the second function argument * @return the function result + * @throws NacosException function throws NacosException */ R apply(T t, U u) throws NacosException; } diff --git a/common/src/main/java/com/alibaba/nacos/common/utils/ClassUtils.java b/common/src/main/java/com/alibaba/nacos/common/utils/ClassUtils.java index b536a5f00f7..5f4250971ee 100644 --- a/common/src/main/java/com/alibaba/nacos/common/utils/ClassUtils.java +++ b/common/src/main/java/com/alibaba/nacos/common/utils/ClassUtils.java @@ -20,13 +20,11 @@ import static com.alibaba.nacos.api.exception.NacosException.SERVER_ERROR; - /** * Utils for Class. * * @author liaochuntao */ -@SuppressWarnings("all") public final class ClassUtils { /** @@ -43,34 +41,12 @@ public static Class findClassByName(String className) { } } - /** - * Gets and returns className. - * - * @param obj Object instance. - * @return className. - */ - public static String getName(Object obj) { - Objects.requireNonNull(obj, "obj"); - return obj.getClass().getName(); - } - - /** - * Gets and returns the canonical name of the underlying class. - * - * @param obj Object instance. - * @return The canonical name of the underlying class. - */ - public static String getCanonicalName(Object obj) { - Objects.requireNonNull(obj, "obj"); - return obj.getClass().getCanonicalName(); - } - /** * Determines if the class or interface represented by this object is either the same as, or is a superclass or * superinterface of, the class or interface represented by the specified parameter. * - * @param clazz - * @param cls + * @param clazz Instances of the class represent classes and interfaces. + * @param cls Instances of the class represent classes and interfaces. * @return the value indicating whether objects of the type can be assigned to objects of this class. */ public static boolean isAssignableFrom(Class clazz, Class cls) { @@ -78,17 +54,6 @@ public static boolean isAssignableFrom(Class clazz, Class cls) { return clazz.isAssignableFrom(cls); } - /** - * Gets and returns the simple name of the underlying class as given in the source code. - * - * @param obj Object instance. - * @return the simple name of the underlying class. - */ - public static String getSimplaName(Object obj) { - Objects.requireNonNull(obj, "obj"); - return obj.getClass().getSimpleName(); - } - /** * Gets and returns the class name. * @@ -100,6 +65,17 @@ public static String getName(Class cls) { return cls.getName(); } + /** + * Gets and returns className. + * + * @param obj Object instance. + * @return className. + */ + public static String getName(Object obj) { + Objects.requireNonNull(obj, "obj"); + return obj.getClass().getName(); + } + /** * Gets and returns the canonical name of the underlying class. * @@ -111,6 +87,17 @@ public static String getCanonicalName(Class cls) { return cls.getCanonicalName(); } + /** + * Gets and returns the canonical name of the underlying class. + * + * @param obj Object instance. + * @return The canonical name of the underlying class. + */ + public static String getCanonicalName(Object obj) { + Objects.requireNonNull(obj, "obj"); + return obj.getClass().getCanonicalName(); + } + /** * Gets and returns the simple name of the underlying class. * @@ -122,4 +109,15 @@ public static String getSimplaName(Class cls) { return cls.getSimpleName(); } + /** + * Gets and returns the simple name of the underlying class as given in the source code. + * + * @param obj Object instance. + * @return the simple name of the underlying class. + */ + public static String getSimplaName(Object obj) { + Objects.requireNonNull(obj, "obj"); + return obj.getClass().getSimpleName(); + } + } From 85625cc8213094b8e46bc96455be6727ed72d583 Mon Sep 17 00:00:00 2001 From: zongtanghu Date: Sun, 21 Jun 2020 13:41:21 +0800 Subject: [PATCH 5/6] [#3118]Unify Subsciber and SmartSubscriber, and fix some typo and reformat. --- .../nacos/common/notify/DefaultPublisher.java | 17 +----- .../nacos/common/notify/NotifyCenter.java | 57 +++++++++++++++---- .../notify/listener/SmartSubscriber.java | 9 +-- 3 files changed, 54 insertions(+), 29 deletions(-) diff --git a/common/src/main/java/com/alibaba/nacos/common/notify/DefaultPublisher.java b/common/src/main/java/com/alibaba/nacos/common/notify/DefaultPublisher.java index 95f653d46b0..48d08973c09 100644 --- a/common/src/main/java/com/alibaba/nacos/common/notify/DefaultPublisher.java +++ b/common/src/main/java/com/alibaba/nacos/common/notify/DefaultPublisher.java @@ -17,7 +17,6 @@ package com.alibaba.nacos.common.notify; import com.alibaba.nacos.common.notify.listener.Subscriber; -import com.alibaba.nacos.common.notify.listener.SmartSubscriber; import com.alibaba.nacos.common.utils.ConcurrentHashSet; import com.alibaba.nacos.common.utils.ThreadUtils; import com.alibaba.nacos.common.utils.CollectionUtils; @@ -82,7 +81,7 @@ public synchronized void start() { // start just called once super.start(); if (queueMaxSize == -1) { - queueMaxSize = ringBufferSize ; + queueMaxSize = ringBufferSize; } initialized = true; } @@ -122,7 +121,7 @@ void openEventHandler() { } private boolean hasSubscriber() { - return CollectionUtils.isNotEmpty(subscribers) || CollectionUtils.isNotEmpty(SMART_SUBSCRIBERS); + return CollectionUtils.isNotEmpty(subscribers); } @Override @@ -177,21 +176,9 @@ void receiveEvent(Event event) { } final String targetName = ClassUtils.getName(subscriber.subscribeType()); - if (!Objects.equals(sourceName, targetName)) { continue; } - - notifySubscriber(subscriber, event); - } - - // Notification multi-event event listener - for (SmartSubscriber subscriber : SMART_SUBSCRIBERS) { - // If you are a multi-event listener, you need to make additional logical judgments - if (!subscriber.canNotify(event)) { - LOGGER.debug("[NotifyCenter] the {} is unacceptable to this multi-event subscriber", event.getClass()); - continue; - } notifySubscriber(subscriber, event); } } diff --git a/common/src/main/java/com/alibaba/nacos/common/notify/NotifyCenter.java b/common/src/main/java/com/alibaba/nacos/common/notify/NotifyCenter.java index 137b65b9b31..359d5283c13 100644 --- a/common/src/main/java/com/alibaba/nacos/common/notify/NotifyCenter.java +++ b/common/src/main/java/com/alibaba/nacos/common/notify/NotifyCenter.java @@ -174,9 +174,12 @@ public static void shutdown() { public static void registerSubscriber(final Subscriber consumer) throws NacosException { final Class cls = consumer.subscribeType(); // If you want to listen to multiple events, you do it separately, - // without automatically registering the appropriate publisher + // based on subclass's subscribeTypes method return list, it can register to publisher. if (consumer instanceof SmartSubscriber) { EventPublisher.SMART_SUBSCRIBERS.add((SmartSubscriber) consumer); + for (Class subscribeType : ((SmartSubscriber) consumer).subscribeTypes()) { + addSubscriber(consumer, subscribeType); + } return; } @@ -184,8 +187,22 @@ public static void registerSubscriber(final Subscriber consumer) throws Naco INSTANCE.sharePublisher.addSubscriber(consumer); return; } - final String topic = ClassUtils.getCanonicalName(consumer.subscribeType()); - MapUtils.computeIfAbsent(INSTANCE.publisherMap, topic, publisherFactory, cls, ringBufferSize); + + addSubscriber(consumer, consumer.subscribeType()); + } + + /** + * Add a subscriber to pusblisher. + * + * @param consumer subscriber instance. + * @param subscribeType subscribeType. + * @throws NacosException BiFunction mappingFunction may throw a NacosException. + */ + private static void addSubscriber(final Subscriber consumer, Class subscribeType) + throws NacosException { + + final String topic = ClassUtils.getCanonicalName(subscribeType); + MapUtils.computeIfAbsent(INSTANCE.publisherMap, topic, publisherFactory, subscribeType, ringBufferSize); EventPublisher publisher = INSTANCE.publisherMap.get(topic); publisher.addSubscriber(consumer); } @@ -193,30 +210,50 @@ public static void registerSubscriber(final Subscriber consumer) throws Naco /** * Deregister subscriber. * - * @param consumer subscriber. + * @param consumer subscriber instance. */ - public static void deregisterSubscribe(final Subscriber consumer) { + public static void deregisterSubscriber(final Subscriber consumer) { final Class cls = consumer.subscribeType(); if (consumer instanceof SmartSubscriber) { EventPublisher.SMART_SUBSCRIBERS.remove((SmartSubscriber) consumer); + for (Class subscribeType : ((SmartSubscriber) consumer).subscribeTypes()) { + removeSubscriber(consumer, subscribeType); + } return; } if (ClassUtils.isAssignableFrom(SlowEvent.class, cls)) { INSTANCE.sharePublisher.removeSubscriber(consumer); return; } - final String topic = ClassUtils.getCanonicalName(consumer.subscribeType()); + + if (removeSubscriber(consumer, consumer.subscribeType())) { + return; + } + throw new NoSuchElementException("The subcriber has no event publisher"); + } + + /** + * Remove subscriber. + * + * @param consumer subscriber instance. + * @param subscribeType subscribeType. + * @return whether remove subscriber successfully or not. + */ + private static boolean removeSubscriber(final Subscriber consumer, Class subscribeType) { + + final String topic = ClassUtils.getCanonicalName(subscribeType); if (INSTANCE.publisherMap.containsKey(topic)) { EventPublisher publisher = INSTANCE.publisherMap.get(topic); publisher.removeSubscriber(consumer); - return; + return true; } - throw new NoSuchElementException("The subcriber has no event publisher"); + + return false; } /** - * request publisher publish event Publishers load lazily, calling publisher. Start () only when the event is - * actually published + * Request publisher publish event Publishers load lazily, calling publisher. Start () only when the event is + * actually published. * * @param event class Instances of the event. */ diff --git a/common/src/main/java/com/alibaba/nacos/common/notify/listener/SmartSubscriber.java b/common/src/main/java/com/alibaba/nacos/common/notify/listener/SmartSubscriber.java index da2eed4f4f5..8e1da9f8b74 100644 --- a/common/src/main/java/com/alibaba/nacos/common/notify/listener/SmartSubscriber.java +++ b/common/src/main/java/com/alibaba/nacos/common/notify/listener/SmartSubscriber.java @@ -18,6 +18,8 @@ import com.alibaba.nacos.common.notify.Event; +import java.util.List; + /** * Subscribers to multiple events can be listened to. * @@ -28,12 +30,11 @@ public abstract class SmartSubscriber extends Subscriber { /** - * Determines if the processing message is acceptable. + * Returns which event type are smartsubscriber interested in. * - * @param event {@link Event} - * @return Determines if the processing message is acceptable + * @return The interestd event types. */ - public abstract boolean canNotify(Event event); + public abstract List> subscribeTypes(); @Override public final Class subscribeType() { From 051dc704099d937a89b7d1714e6d1ea91fafe94d Mon Sep 17 00:00:00 2001 From: zongtanghu Date: Sun, 21 Jun 2020 21:41:15 +0800 Subject: [PATCH 6/6] [#3118]fix some typo and reformat. --- .../com/alibaba/nacos/common/notify/EventPublisher.java | 9 --------- .../com/alibaba/nacos/common/notify/NotifyCenter.java | 8 -------- 2 files changed, 17 deletions(-) diff --git a/common/src/main/java/com/alibaba/nacos/common/notify/EventPublisher.java b/common/src/main/java/com/alibaba/nacos/common/notify/EventPublisher.java index a8b254baa51..21f44465d86 100644 --- a/common/src/main/java/com/alibaba/nacos/common/notify/EventPublisher.java +++ b/common/src/main/java/com/alibaba/nacos/common/notify/EventPublisher.java @@ -18,10 +18,6 @@ import com.alibaba.nacos.common.lifecycle.Closeable; import com.alibaba.nacos.common.notify.listener.Subscriber; -import com.alibaba.nacos.common.notify.listener.SmartSubscriber; -import com.alibaba.nacos.common.utils.ConcurrentHashSet; - -import java.util.Set; /** * Event publisher. @@ -31,11 +27,6 @@ */ public interface EventPublisher extends Closeable { - /** - * Multi-event listener collection list. - */ - Set SMART_SUBSCRIBERS = new ConcurrentHashSet(); - /** * Initializes the event publisher. * diff --git a/common/src/main/java/com/alibaba/nacos/common/notify/NotifyCenter.java b/common/src/main/java/com/alibaba/nacos/common/notify/NotifyCenter.java index 359d5283c13..5b23b877ab5 100644 --- a/common/src/main/java/com/alibaba/nacos/common/notify/NotifyCenter.java +++ b/common/src/main/java/com/alibaba/nacos/common/notify/NotifyCenter.java @@ -29,7 +29,6 @@ import java.util.Map; import java.util.ServiceLoader; -import java.util.Set; import java.util.NoSuchElementException; import java.util.Iterator; import java.util.concurrent.ConcurrentHashMap; @@ -127,11 +126,6 @@ public static EventPublisher getPublisher(Class topic) { return INSTANCE.publisherMap.get(topic.getCanonicalName()); } - @JustForTest - public static Set getSmartSubscribers() { - return EventPublisher.SMART_SUBSCRIBERS; - } - @JustForTest public static EventPublisher getSharePublisher() { return INSTANCE.sharePublisher; @@ -176,7 +170,6 @@ public static void registerSubscriber(final Subscriber consumer) throws Naco // If you want to listen to multiple events, you do it separately, // based on subclass's subscribeTypes method return list, it can register to publisher. if (consumer instanceof SmartSubscriber) { - EventPublisher.SMART_SUBSCRIBERS.add((SmartSubscriber) consumer); for (Class subscribeType : ((SmartSubscriber) consumer).subscribeTypes()) { addSubscriber(consumer, subscribeType); } @@ -215,7 +208,6 @@ private static void addSubscriber(final Subscriber consumer, Class void deregisterSubscriber(final Subscriber consumer) { final Class cls = consumer.subscribeType(); if (consumer instanceof SmartSubscriber) { - EventPublisher.SMART_SUBSCRIBERS.remove((SmartSubscriber) consumer); for (Class subscribeType : ((SmartSubscriber) consumer).subscribeTypes()) { removeSubscriber(consumer, subscribeType); }