Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#3117]Just Sink the Notify implementation into common module and optimize some parts #3118

Merged
merged 6 commits into from
Jun 21, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.nacos.common.notify;

import com.alibaba.nacos.common.notify.listener.Subscriber;
import com.alibaba.nacos.common.notify.listener.SmartSubscriber;
import com.alibaba.nacos.common.utils.ConcurrentHashSet;
import com.alibaba.nacos.common.utils.ThreadUtils;
import com.alibaba.nacos.common.utils.CollectionUtils;
import com.alibaba.nacos.common.utils.ClassUtils;
import com.alibaba.nacos.common.utils.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

import static com.alibaba.nacos.common.notify.NotifyCenter.RING_BUFFER_SIZE;

/**
* The default event publisher implementation
* <p>
* Internally, use {@link ArrayBlockingQueue <Event>} as a message staging queue
*
* @author <a href="mailto:[email protected]">liaochuntao</a>
* @author zongtanghu
*/
public class DefaultPublisher extends Thread implements EventPublisher {

private static final Logger LOGGER = LoggerFactory.getLogger(NotifyCenter.class);

private volatile boolean initialized = false;

private volatile boolean shutdown = false;

private Class<? extends Event> eventType;

private final ConcurrentHashSet<Subscriber> subscribers = new ConcurrentHashSet<Subscriber>();

private int queueMaxSize = -1;

private BlockingQueue<Event> queue;

private volatile Long lastEventSequence = -1L;

private final AtomicReferenceFieldUpdater<DefaultPublisher, Long> updater = AtomicReferenceFieldUpdater
.newUpdater(DefaultPublisher.class, Long.class, "lastEventSequence");

@Override
public void init(Class<? extends Event> type, int bufferSize) {
setDaemon(true);
setName("nacos.publisher-" + type.getName());
this.eventType = type;
this.queueMaxSize = bufferSize;
this.queue = new ArrayBlockingQueue<Event>(bufferSize);
start();
}

public ConcurrentHashSet<Subscriber> getSubscribers() {
return subscribers;
}

@Override
public synchronized void start() {
if (!initialized) {
// start just called once
super.start();
if (queueMaxSize == -1) {
queueMaxSize = RING_BUFFER_SIZE;
}
initialized = true;
}
}

public long currentEventSize() {
return queue.size();
}

@Override
public void run() {
openEventHandler();
}

void openEventHandler() {
try {
// To ensure that messages are not lost, enable EventHandler when
// waiting for the first Subscriber to register
for (; ; ) {
if (shutdown || hasSubscriber()) {
break;
}
ThreadUtils.sleep(1000L);
}

for (; ; ) {
if (shutdown) {
break;
}
final Event event = queue.take();
receiveEvent(event);
updater.compareAndSet(this, lastEventSequence, Math.max(lastEventSequence, event.sequence()));
}
} catch (Throwable ex) {
LOGGER.error("Event listener exception : {}", ex);
}
}

private boolean hasSubscriber() {
return CollectionUtils.isNotEmpty(subscribers) || CollectionUtils.isNotEmpty(SMART_SUBSCRIBERS);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not being empty in the smartSubscribers collection does not imply that there are listeners interested in the event

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, this issue has already been resolved in this pr.

}

@Override
public void addSubscriber(Subscriber subscriber) {
subscribers.add(subscriber);
}

@Override
public void unSubscriber(Subscriber subscriber) {
zongtanghu marked this conversation as resolved.
Show resolved Hide resolved
subscribers.remove(subscriber);
}

@Override
public boolean publish(Event event) {
checkIsStart();
boolean success = this.queue.offer(event);
if (!success) {
LOGGER.warn("Unable to plug in due to interruption, synchronize sending time, event : {}", event);
receiveEvent(event);
return true;
}
return true;
}

void checkIsStart() {
if (!initialized) {
throw new IllegalStateException("Publisher does not start");
}
}

@Override
public void shutdown() {
this.shutdown = true;
this.queue.clear();
}

public boolean isInitialized() {
return initialized;
}

void receiveEvent(Event event) {
final long currentEventSequence = event.sequence();
final String sourceName = ClassUtils.getName(event);

// Notification single event listener
for (Subscriber subscriber : subscribers) {
// Whether to ignore expiration events
if (subscriber.ignoreExpireEvent() && lastEventSequence > currentEventSequence) {
LOGGER.debug("[NotifyCenter] the {} is unacceptable to this subscriber, because had expire",
event.getClass());
continue;
}

final String targetName = ClassUtils.getName(subscriber.subscribeType());

if (!Objects.equals(sourceName, targetName)) {
continue;
}

notifySubscriber(subscriber, event);
}

// Notification multi-event event listener
for (SmartSubscriber subscriber : SMART_SUBSCRIBERS) {
// If you are a multi-event listener, you need to make additional logical judgments
if (!subscriber.canNotify(event)) {
LOGGER.debug("[NotifyCenter] the {} is unacceptable to this multi-event subscriber", event.getClass());
continue;
}
notifySubscriber(subscriber, event);
}
}

@Override
public void notifySubscriber(final Subscriber subscriber, final Event event) {

LOGGER.debug("[NotifyCenter] the {} will received by {}", event, subscriber);

final Runnable job = new Runnable() {
@Override
public void run() {
subscriber.onEvent(event);
}
};

final Executor executor = subscriber.executor();

if (executor != null) {
executor.execute(job);
} else {
try {
job.run();
} catch (Throwable e) {
LOGGER.error("Event callback exception : {}", e);
}
}
}
}
44 changes: 44 additions & 0 deletions common/src/main/java/com/alibaba/nacos/common/notify/Event.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.nacos.common.notify;

import java.io.Serializable;
import java.util.concurrent.atomic.AtomicLong;

/**
* An abstract class for event.
*
* @author <a href="mailto:[email protected]">liaochuntao</a>
* @author zongtanghu
*/
@SuppressWarnings("all")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If no special situation, please do not use SuppressWarnings

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay I will fix this in next commit.

public abstract class Event implements Serializable {

private static final AtomicLong SEQUENCE = new AtomicLong(0);

private long no = SEQUENCE.getAndIncrement();
zongtanghu marked this conversation as resolved.
Show resolved Hide resolved

/**
* Event sequence number, which can be used to handle the sequence of events
*
* @return sequence num, It's best to make sure it's monotone
*/
public long sequence() {
return no;
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.nacos.common.notify;

import com.alibaba.nacos.common.lifecycle.Closeable;
import com.alibaba.nacos.common.notify.listener.Subscriber;
import com.alibaba.nacos.common.notify.listener.SmartSubscriber;
import com.alibaba.nacos.common.utils.ConcurrentHashSet;

import java.util.Set;

/**
* Event publisher
*
* @author <a href="mailto:[email protected]">liaochuntao</a>
* @author zongtanghu
*/
public interface EventPublisher extends Closeable {

/**
* Multi-event listener collection list
*/
Set<SmartSubscriber> SMART_SUBSCRIBERS = new ConcurrentHashSet<SmartSubscriber>();

/**
* Initializes the event publisher
*
* @param type {@link Class<? extends Event >}
* @param bufferSize Message staging queue size
*/
void init(Class<? extends Event> type, int bufferSize);

/**
* The number of currently staged events
*
* @return event size
*/
long currentEventSize();

/**
* Add listener
*
* @param subscribe {@link Subscriber}
*/
void addSubscriber(Subscriber subscribe);

/**
* Remove listener
*
* @param subscriber {@link Subscriber}
*/
void unSubscriber(Subscriber subscriber);

/**
* publish event
*
* @param event {@link Event}
* @return publish event is success
*/
boolean publish(Event event);

/**
* Notify listener
*
* @param subscriber {@link Subscriber}
* @param event {@link Event}
*/
void notifySubscriber(Subscriber subscriber, Event event);
}
Loading