From 660d58178f45febca717f8e45b3a2806da8681f1 Mon Sep 17 00:00:00 2001 From: Wenjun Ruan <861923274@qq.com> Date: Thu, 10 Jun 2021 18:17:06 +0800 Subject: [PATCH] [ISSUE #359] Split handler from controller (#359) (#360) * [ISSUE #359] Split handler from controller (#359) * add license header * add ut --- .../EventMeshMsgDownStreamHandler.java | 166 ++++++++++++++++++ .../ShowClientBySystemAndDcnHandler.java | 92 ++++++++++ 2 files changed, 258 insertions(+) create mode 100644 eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/EventMeshMsgDownStreamHandler.java create mode 100644 eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/ShowClientBySystemAndDcnHandler.java diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/EventMeshMsgDownStreamHandler.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/EventMeshMsgDownStreamHandler.java new file mode 100644 index 0000000000..cdc086e8e8 --- /dev/null +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/EventMeshMsgDownStreamHandler.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.runtime.admin.handler; + +import com.sun.net.httpserver.HttpExchange; +import com.sun.net.httpserver.HttpHandler; +import org.apache.eventmesh.runtime.boot.EventMeshTCPServer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.io.UnsupportedEncodingException; +import java.net.URLDecoder; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class EventMeshMsgDownStreamHandler implements HttpHandler { + + private static final Logger logger = LoggerFactory.getLogger(EventMeshMsgDownStreamHandler.class); + + private final EventMeshTCPServer eventMeshTCPServer; + + public EventMeshMsgDownStreamHandler(EventMeshTCPServer eventMeshTCPServer) { + this.eventMeshTCPServer = eventMeshTCPServer; + } + + @Override + public void handle(HttpExchange httpExchange) throws IOException { + String result = "false"; + OutputStream out = httpExchange.getResponseBody(); + try { +// Map queryStringInfo = parsePostParameters(httpExchange); +// String msgStr = (String)queryStringInfo.get("msg"); +// String groupName = (String)queryStringInfo.get("group"); +// logger.info("recieve msg from other eventMesh, group:{}, msg:{}", groupName, msgStr); +// if (StringUtils.isBlank(msgStr) || StringUtils.isBlank(groupName)) { +// logger.warn("msg or groupName is null"); +// httpExchange.sendResponseHeaders(200, 0); +// out.write(result.getBytes()); +// return; +// } +// MessageExt messageExt = JSON.parseObject(msgStr, MessageExt.class); +// String topic = messageExt.getTopic(); +// +// if (!EventMeshUtil.isValidRMBTopic(topic)) { +// logger.warn("msg topic is illegal"); +// httpExchange.sendResponseHeaders(200, 0); +// out.write(result.getBytes()); +// return; +// } +// +// DownstreamDispatchStrategy downstreamDispatchStrategy = eventMeshTCPServer.getClientSessionGroupMapping().getClientGroupWrapper(groupName).getDownstreamDispatchStrategy(); +// Set groupConsumerSessions = eventMeshTCPServer.getClientSessionGroupMapping().getClientGroupWrapper(groupName).getGroupConsumerSessions(); +// Session session = downstreamDispatchStrategy.select(groupName, topic, groupConsumerSessions); +// +// if(session == null){ +// logger.error("DownStream msg,retry other eventMesh found no session again"); +// httpExchange.sendResponseHeaders(200, 0); +// out.write(result.getBytes()); +// return; +// } +// +// DownStreamMsgContext downStreamMsgContext = +// new DownStreamMsgContext(messageExt, session, eventMeshTCPServer.getClientSessionGroupMapping().getClientGroupWrapper(groupName).getPersistentMsgConsumer(), null, true); +// eventMeshTCPServer.getClientSessionGroupMapping().getClientGroupWrapper(groupName).getDownstreamMap().putIfAbsent(downStreamMsgContext.seq, downStreamMsgContext); +// +// if (session.isCanDownStream()) { +// session.downstreamMsg(downStreamMsgContext); +// httpExchange.sendResponseHeaders(200, 0); +// result = "true"; +// out.write(result.getBytes()); +// return; +// } +// +// logger.warn("EventMeshMsgDownStreamHandler|dispatch retry, seq[{}]", downStreamMsgContext.seq); +// long delayTime = EventMeshUtil.isService(downStreamMsgContext.msgExt.getTopic()) ? 0 : eventMeshTCPServer.getAccessConfiguration().eventMeshTcpMsgRetryDelayInMills; +// downStreamMsgContext.delay(delayTime); +// eventMeshTCPServer.getEventMeshTcpRetryer().pushRetry(downStreamMsgContext); +// result = "true"; +// httpExchange.sendResponseHeaders(200, 0); +// out.write(result.getBytes()); + + } catch (Exception e) { + logger.error("EventMeshMsgDownStreamHandler handle fail...", e); + } finally { + if (out != null) { + try { + out.close(); + } catch (IOException e) { + logger.warn("out close failed...", e); + } + } + } + } + + private Map parsePostParameters(HttpExchange exchange) + throws IOException { + Map parameters = new HashMap<>(); + if ("post".equalsIgnoreCase(exchange.getRequestMethod())) { + InputStreamReader isr = + new InputStreamReader(exchange.getRequestBody(), "utf-8"); + BufferedReader br = new BufferedReader(isr); + String query = br.readLine(); + parseQuery(query, parameters); + } + return parameters; + } + + @SuppressWarnings("unchecked") + private void parseQuery(String query, Map parameters) + throws UnsupportedEncodingException { + + if (query != null) { + String pairs[] = query.split("&"); + + for (String pair : pairs) { + String param[] = pair.split("="); + + String key = null; + String value = null; + if (param.length > 0) { + key = URLDecoder.decode(param[0], "UTF-8"); + } + + if (param.length > 1) { + value = URLDecoder.decode(param[1], "UTF-8"); + } + + if (parameters.containsKey(key)) { + Object obj = parameters.get(key); + if (obj instanceof List) { + List values = (List) obj; + values.add(value); + } else if (obj instanceof String) { + List values = new ArrayList(); + values.add((String) obj); + values.add(value); + parameters.put(key, values); + } + } else { + parameters.put(key, value); + } + } + } + } +} diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/ShowClientBySystemAndDcnHandler.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/ShowClientBySystemAndDcnHandler.java new file mode 100644 index 0000000000..9ccd547291 --- /dev/null +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/ShowClientBySystemAndDcnHandler.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.runtime.admin.handler; + +import com.sun.net.httpserver.HttpExchange; +import com.sun.net.httpserver.HttpHandler; +import org.apache.eventmesh.common.protocol.tcp.UserAgent; +import org.apache.eventmesh.runtime.boot.EventMeshTCPServer; +import org.apache.eventmesh.runtime.constants.EventMeshConstants; +import org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientSessionGroupMapping; +import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session; +import org.apache.eventmesh.runtime.util.NetUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class ShowClientBySystemAndDcnHandler implements HttpHandler { + + private static final Logger logger = LoggerFactory.getLogger(ShowClientBySystemAndDcnHandler.class); + + private final EventMeshTCPServer eventMeshTCPServer; + + public ShowClientBySystemAndDcnHandler(EventMeshTCPServer eventMeshTCPServer) { + this.eventMeshTCPServer = eventMeshTCPServer; + } + + /** + * print clientInfo by subsys and dcn + * + * @param httpExchange + * @throws IOException + */ + @Override + public void handle(HttpExchange httpExchange) throws IOException { + String result = ""; + OutputStream out = httpExchange.getResponseBody(); + try { + String queryString = httpExchange.getRequestURI().getQuery(); + Map queryStringInfo = NetUtils.formData2Dic(queryString); + String dcn = queryStringInfo.get(EventMeshConstants.MANAGE_DCN); + String subSystem = queryStringInfo.get(EventMeshConstants.MANAGE_SUBSYSTEM); + + String newLine = System.getProperty("line.separator"); + logger.info("showClientBySubsysAndDcn,subsys:{},dcn:{}=================", subSystem, dcn); + ClientSessionGroupMapping clientSessionGroupMapping = eventMeshTCPServer.getClientSessionGroupMapping(); + ConcurrentHashMap sessionMap = clientSessionGroupMapping.getSessionMap(); + if (!sessionMap.isEmpty()) { + for (Session session : sessionMap.values()) { + if (session.getClient().getDcn().equals(dcn) && session.getClient().getSubsystem().equals(subSystem)) { + UserAgent userAgent = session.getClient(); + result += String.format("pid=%s | ip=%s | port=%s | path=%s | purpose=%s", userAgent.getPid(), userAgent + .getHost(), userAgent.getPort(), userAgent.getPath(), userAgent.getPurpose()) + newLine; + } + } + } + httpExchange.sendResponseHeaders(200, 0); + out.write(result.getBytes()); + } catch (Exception e) { + logger.error("ShowClientBySystemAndDcnHandler fail...", e); + } finally { + if (out != null) { + try { + out.close(); + } catch (IOException e) { + logger.warn("out close failed...", e); + } + } + } + } + + +}