Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add upd push for ip port client #3849

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,7 @@ public InstanceService(IpPortBasedClientManager ipPortBasedClientManager,
*/
public void registerInstance(String namespaceId, String serviceName, Instance instance) {
String clientId = instance.toInetAddr();
if (!ipPortBasedClientManager.allClientId().contains(clientId)) {
ipPortBasedClientManager.clientConnected(new IpPortBasedClient(clientId, instance.isEphemeral()));
}
createIpPortClientIfAbsent(clientId, instance.isEphemeral());
String groupName = NamingUtils.getGroupName(serviceName);
String serviceNameNoGrouped = NamingUtils.getServiceName(serviceName);
Service service = Service.newService(namespaceId, groupName, serviceNameNoGrouped, instance.isEphemeral());
Expand Down Expand Up @@ -104,9 +102,16 @@ public ServiceInfo listInstance(String namespaceId, String serviceName, Subscrib
String serviceNameNoGrouped = NamingUtils.getServiceName(serviceName);
Service service = Service.newService(namespaceId, groupName, serviceNameNoGrouped, true);
if (null != subscriber) {
createIpPortClientIfAbsent(subscriber.getAddrStr(), true);
clientOperationService.subscribeService(service, subscriber, subscriber.getAddrStr());
}
ServiceInfo serviceInfo = serviceStorage.getData(service);
return ServiceUtil.filterInstances(serviceInfo, cluster, healthOnly);
}

private void createIpPortClientIfAbsent(String clientId, boolean ephemeral) {
if (!ipPortBasedClientManager.allClientId().contains(clientId)) {
ipPortBasedClientManager.clientConnected(new IpPortBasedClient(clientId, ephemeral));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import com.alibaba.nacos.naming.pojo.Subscriber;
import com.alibaba.nacos.naming.pojo.Subscribers;
import com.alibaba.nacos.naming.push.PushService;
import com.alibaba.nacos.naming.push.RemotePushService;
import com.alibaba.nacos.naming.push.ClientPushService;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
Expand Down Expand Up @@ -59,15 +59,15 @@ public class SubscribeManager {
private PushService pushService;

@Autowired
private RemotePushService remotePushService;
private ClientPushService clientPushService;

@Autowired
private ServerMemberManager memberManager;

private List<Subscriber> getSubscribersFuzzy(String serviceName, String namespaceId) {
List<Subscriber> result = new LinkedList<>();
result.addAll(pushService.getClientsFuzzy(serviceName, namespaceId));
result.addAll(remotePushService.getSubscribes(namespaceId, serviceName));
result.addAll(clientPushService.getSubscribes(namespaceId, serviceName));
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public class ConnectionBasedClientManager extends ClientConnectionEventListener
private final ConcurrentMap<String, ConnectionBasedClient> clients = new ConcurrentHashMap<>();

public ConnectionBasedClientManager() {
GlobalExecutor.scheduleRemoteConnectionManager(new ConnectionBasedClientManager.ExpiredClientCleaner(this), 0,
GlobalExecutor.scheduleExpiredClientCleaner(new ExpiredClientCleaner(this), 0,
Constants.DEFAULT_HEART_BEAT_INTERVAL, TimeUnit.MILLISECONDS);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,21 @@

package com.alibaba.nacos.naming.core.v2.client.manager.impl;

import com.alibaba.nacos.api.common.Constants;
import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.naming.core.DistroMapper;
import com.alibaba.nacos.naming.core.v2.client.Client;
import com.alibaba.nacos.naming.core.v2.client.impl.IpPortBasedClient;
import com.alibaba.nacos.naming.core.v2.client.manager.ClientManager;
import com.alibaba.nacos.naming.core.v2.event.client.ClientEvent;
import com.alibaba.nacos.naming.misc.GlobalExecutor;
import com.alibaba.nacos.naming.misc.Loggers;
import org.springframework.stereotype.Component;

import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;

/**
* The manager of {@code IpPortBasedClient} and ephemeral.
Expand All @@ -43,6 +46,8 @@ public class IpPortBasedClientManager implements ClientManager {

public IpPortBasedClientManager(DistroMapper distroMapper) {
this.distroMapper = distroMapper;
GlobalExecutor.scheduleExpiredClientCleaner(new ExpiredClientCleaner(this), 0,
Constants.DEFAULT_HEART_BEAT_INTERVAL, TimeUnit.MILLISECONDS);
}

@Override
Expand Down Expand Up @@ -85,4 +90,29 @@ public void verifyClient(String clientId) {
IpPortBasedClient client = clients.get(clientId);
// TODO check whether client is newest by updated time
}

private static class ExpiredClientCleaner implements Runnable {

private final IpPortBasedClientManager clientManager;

public ExpiredClientCleaner(IpPortBasedClientManager clientManager) {
this.clientManager = clientManager;
}

@Override
public void run() {
long currentTime = System.currentTimeMillis();
for (String each : clientManager.allClientId()) {
IpPortBasedClient client = (IpPortBasedClient) clientManager.getClient(each);
if (null != client && isExpireClient(currentTime, client)) {
clientManager.clientDisconnected(each);
}
}
}

private boolean isExpireClient(long currentTime, IpPortBasedClient client) {
return client.isEphemeral() && client.getAllPublishedService().isEmpty()
&& currentTime - client.getLastUpdatedTime() > Constants.DEFAULT_IP_DELETE_TIMEOUT;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public class GlobalExecutor {
.newSingleScheduledExecutorService(ClassUtils.getCanonicalName(NamingApp.class),
new NameThreadFactory("com.alibaba.nacos.naming.nacos-server-performance"));

private static final ScheduledExecutorService REMOTE_CONNECTION_EXECUTOR = ExecutorFactory.Managed
private static final ScheduledExecutorService EXPIRED_CLIENT_CLEANER_EXECUTOR = ExecutorFactory.Managed
.newSingleScheduledExecutorService(ClassUtils.getCanonicalName(NamingApp.class),
new NameThreadFactory("com.alibaba.nacos.naming.remote-connection-manager"));

Expand Down Expand Up @@ -213,8 +213,7 @@ public static void schedulePerformanceLogger(Runnable runnable, long initialDela
SERVER_PERFORMANCE_EXECUTOR.scheduleWithFixedDelay(runnable, initialDelay, delay, unit);
}

public static void scheduleRemoteConnectionManager(Runnable runnable, long initialDelay, long delay,
TimeUnit unit) {
REMOTE_CONNECTION_EXECUTOR.scheduleWithFixedDelay(runnable, initialDelay, delay, unit);
public static void scheduleExpiredClientCleaner(Runnable runnable, long initialDelay, long delay, TimeUnit unit) {
EXPIRED_CLIENT_CLEANER_EXECUTOR.scheduleWithFixedDelay(runnable, initialDelay, delay, unit);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,73 +17,51 @@
package com.alibaba.nacos.naming.push;

import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.api.naming.remote.request.NotifySubscriberRequest;
import com.alibaba.nacos.api.naming.utils.NamingUtils;
import com.alibaba.nacos.common.notify.Event;
import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.common.notify.listener.SmartSubscriber;
import com.alibaba.nacos.core.remote.RpcPushService;
import com.alibaba.nacos.naming.core.v2.client.manager.impl.ConnectionBasedClientManager;
import com.alibaba.nacos.naming.core.v2.client.manager.ClientManager;
import com.alibaba.nacos.naming.core.v2.client.manager.ClientManagerDelegate;
import com.alibaba.nacos.naming.core.v2.event.service.ServiceEvent;
import com.alibaba.nacos.naming.core.v2.index.ClientServiceIndexesManager;
import com.alibaba.nacos.naming.core.v2.index.ServiceStorage;
import com.alibaba.nacos.naming.core.v2.pojo.Service;
import com.alibaba.nacos.naming.pojo.Subscriber;
import com.alibaba.nacos.naming.push.v2.PushExecuteService;
import com.alibaba.nacos.naming.push.v2.PushExecuteServiceDelegate;
import org.springframework.stereotype.Component;

import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
* Remote push services.
*
* @author xiweng.yy
*/
@Component
public class RemotePushService extends SmartSubscriber {
public class ClientPushService extends SmartSubscriber {

private final RpcPushService notifier;

private final ConnectionBasedClientManager clientManager;
private final ClientManager clientManager;

private final ClientServiceIndexesManager indexesManager;

private final ServiceStorage serviceStorage;

/**
* ServiceKey --> actual Subscriber. The Subscriber may be only subscribe part of cluster of service.
*/
private final ConcurrentMap<String, Set<Subscriber>> serviceSubscribesMap = new ConcurrentHashMap<>();

private final ConcurrentMap<Subscriber, String> subscribeConnectionMap = new ConcurrentHashMap<>();
private final PushExecuteService pushExecuteService;

public RemotePushService(RpcPushService notifier, ConnectionBasedClientManager clientManager,
ClientServiceIndexesManager indexesManager, ServiceStorage serviceStorage) {
this.notifier = notifier;
public ClientPushService(ClientManagerDelegate clientManager, ClientServiceIndexesManager indexesManager,
ServiceStorage serviceStorage, PushExecuteServiceDelegate pushExecuteService) {
this.clientManager = clientManager;
this.indexesManager = indexesManager;
this.serviceStorage = serviceStorage;
this.pushExecuteService = pushExecuteService;
NotifyCenter.registerSubscriber(this);
}

/**
* Remove All subscribe for service.
*
* @param serviceKey service key
*/
public void removeAllSubscribeForService(String serviceKey) {
Set<Subscriber> subscribers = serviceSubscribesMap.remove(serviceKey);
if (null != subscribers) {
for (Subscriber each : subscribers) {
subscribeConnectionMap.remove(each);
}
}
}

public Set<Subscriber> getSubscribes(String namespaceId, String serviceName) {
String serviceNameWithoutGroup = NamingUtils.getServiceName(serviceName);
String groupName = NamingUtils.getGroupName(serviceName);
Expand All @@ -93,7 +71,7 @@ public Set<Subscriber> getSubscribes(String namespaceId, String serviceName) {

public Set<Subscriber> getSubscribes(Service service) {
Set<Subscriber> result = new HashSet<>();
for (String each : indexesManager.getAllClientsRegisteredService(service)) {
for (String each : indexesManager.getAllClientsSubscribeService(service)) {
result.add(clientManager.getClient(each).getSubscriber(service));
}
return result;
Expand All @@ -111,7 +89,8 @@ public void onEvent(Event event) {
Service service = serviceChangedEvent.getService();
ServiceInfo serviceInfo = serviceStorage.getPushData(service);
for (String each : indexesManager.getAllClientsSubscribeService(service)) {
notifier.pushWithoutAck(each, NotifySubscriberRequest.buildSuccessResponse(serviceInfo));
Subscriber subscriber = clientManager.getClient(each).getSubscriber(service);
pushExecuteService.doPush(each, subscriber, serviceInfo);
}
}
}
90 changes: 58 additions & 32 deletions naming/src/main/java/com/alibaba/nacos/naming/push/PushService.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.alibaba.nacos.naming.push;

import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.api.naming.utils.NamingUtils;
import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.naming.core.Service;
Expand Down Expand Up @@ -185,6 +186,37 @@ public void onApplicationEvent(ServiceChangeEvent event) {

}

/**
* Push Data.
*
* @param subscriber subscriber
* @param serviceInfo service info
*/
public void pushData(Subscriber subscriber, ServiceInfo serviceInfo) {
String serviceName = subscriber.getServiceName();
String namespaceId = subscriber.getNamespaceId();
if (futureMap.containsKey(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName))) {
return;
}
int port = Integer.parseInt(subscriber.getAddrStr().split(":")[1]);
InetSocketAddress socketAddress = new InetSocketAddress(subscriber.getIp(), port);
Future future = GlobalExecutor.scheduleUdpSender(() -> {
try {
Loggers.PUSH.info(serviceName + " is changed, add it to push queue.");
long lastRefTime = System.nanoTime();
Receiver.AckEntry ackEntry = prepareAckEntry(socketAddress, prepareHostsData(JacksonUtils.toJson(serviceInfo)), lastRefTime);
Loggers.PUSH.info("serviceName: {} changed, schedule push for: {}, agent: {}, key: {}", serviceInfo,
subscriber.getAddrStr(), subscriber.getAgent(), (ackEntry == null ? null : ackEntry.key));
udpPush(ackEntry);
} catch (Exception e) {
Loggers.PUSH.error("[NACOS-PUSH] failed to push serviceName: {} to client, error: {}", serviceName, e);
} finally {
futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
}
}, 1000L, TimeUnit.MILLISECONDS);
futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName), future);
}

public int getTotalPush() {
return totalPush;
}
Expand Down Expand Up @@ -311,56 +343,47 @@ private static void removeClientIfZombie() {
}

private static Receiver.AckEntry prepareAckEntry(PushClient client, Map<String, Object> data, long lastRefTime) {
return prepareAckEntry(client.socketAddr, data, lastRefTime);
}

private static Receiver.AckEntry prepareAckEntry(InetSocketAddress socketAddress, Map<String, Object> data,
long lastRefTime) {
if (MapUtils.isEmpty(data)) {
Loggers.PUSH.error("[NACOS-PUSH] pushing empty data for client is not allowed: {}", client);
Loggers.PUSH.error("[NACOS-PUSH] pushing empty data for client is not allowed: {}", socketAddress);
return null;
}

data.put("lastRefTime", lastRefTime);

// we apply lastRefTime as sequence num for further ack
String key = getAckKey(client.getSocketAddr().getAddress().getHostAddress(), client.getSocketAddr().getPort(),
lastRefTime);

String dataStr = JacksonUtils.toJson(data);

try {
byte[] dataBytes = dataStr.getBytes(StandardCharsets.UTF_8);
dataBytes = compressIfNecessary(dataBytes);

DatagramPacket packet = new DatagramPacket(dataBytes, dataBytes.length, client.socketAddr);

// we must store the key be fore send, otherwise there will be a chance the
// ack returns before we put in
Receiver.AckEntry ackEntry = new Receiver.AckEntry(key, packet);
ackEntry.data = data;

return ackEntry;
return prepareAckEntry(socketAddress, dataBytes, data, lastRefTime);
} catch (Exception e) {
Loggers.PUSH.error("[NACOS-PUSH] failed to prepare data: {} to client: {}, error: {}", data,
client.getSocketAddr(), e);
Loggers.PUSH
.error("[NACOS-PUSH] failed to compress data: {} to client: {}, error: {}", data, socketAddress, e);
return null;
}
}

private static Receiver.AckEntry prepareAckEntry(PushClient client, byte[] dataBytes, Map<String, Object> data,
long lastRefTime) {
String key = getAckKey(client.getSocketAddr().getAddress().getHostAddress(), client.getSocketAddr().getPort(),
lastRefTime);
DatagramPacket packet = null;
return prepareAckEntry(client.socketAddr, dataBytes, data, lastRefTime);
}

private static Receiver.AckEntry prepareAckEntry(InetSocketAddress socketAddress, byte[] dataBytes,
Map<String, Object> data, long lastRefTime) {
String key = getAckKey(socketAddress.getAddress().getHostAddress(), socketAddress.getPort(), lastRefTime);
try {
packet = new DatagramPacket(dataBytes, dataBytes.length, client.socketAddr);
DatagramPacket packet = new DatagramPacket(dataBytes, dataBytes.length, socketAddress);
Receiver.AckEntry ackEntry = new Receiver.AckEntry(key, packet);
// we must store the key be fore send, otherwise there will be a chance the
// ack returns before we put in
ackEntry.data = data;

return ackEntry;
} catch (Exception e) {
Loggers.PUSH.error("[NACOS-PUSH] failed to prepare data: {} to client: {}, error: {}", data,
client.getSocketAddr(), e);
Loggers.PUSH
.error("[NACOS-PUSH] failed to prepare data: {} to client: {}, error: {}", data, socketAddress, e);
}

return null;
}

Expand Down Expand Up @@ -577,11 +600,14 @@ private static byte[] compressIfNecessary(byte[] dataBytes) throws IOException {
}

private static Map<String, Object> prepareHostsData(PushClient client) throws Exception {
Map<String, Object> cmd = new HashMap<String, Object>(2);
cmd.put("type", "dom");
cmd.put("data", client.getDataSource().getData(client));

return cmd;
return prepareHostsData(client.getDataSource().getData(client));
}

private static Map<String, Object> prepareHostsData(String dataContent) {
Map<String, Object> result = new HashMap<String, Object>(2);
result.put("type", "dom");
result.put("data", dataContent);
return result;
}

private static Receiver.AckEntry udpPush(Receiver.AckEntry ackEntry) {
Expand Down
Loading