diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..8dada3e --- /dev/null +++ b/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "{}" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright {yyyy} {name of copyright owner} + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/assembly.xml b/assembly.xml new file mode 100644 index 0000000..7315c22 --- /dev/null +++ b/assembly.xml @@ -0,0 +1,49 @@ + + + release-${project.version} + mpush-alloc-${project.version} + true + + tar.gz + + + + ./ + + + LICENSE + README.md + + + + ./bin/ + bin + + *.sh + *.cmd + + + + ./conf/ + conf + + mpush.conf + + + + target/ + bin + + bootstrap.jar + + + + + + false + runtime + false + lib + + + diff --git a/bin/env-mp.sh b/bin/env-mp.sh new file mode 100644 index 0000000..66fc23e --- /dev/null +++ b/bin/env-mp.sh @@ -0,0 +1,122 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# This script should be sourced into other mpush +# scripts to setup the env variables + +# We use MP_CFG_DIR if defined, +# otherwise we use /etc/mp +# or the conf directory that is +# a sibling of this script's directory + +MP_BIN_DIR="${MP_BIN_DIR:-/usr/bin}" +MPUSH_PREFIX="${MP_BIN_DIR}/.." + +if [ "x$MP_CFG_DIR" = "x" ] +then + if [ -e "${MPUSH_PREFIX}/conf" ]; then + MP_CFG_DIR="$MP_BIN_DIR/../conf" + else + MP_CFG_DIR="$MP_BIN_DIR/../etc/mpush" + fi +fi + +if [ -f "${MP_BIN_DIR}/set-env.sh" ]; then + . "${MP_BIN_DIR}/set-env.sh" +fi + +if [ "x$MP_CFG" = "x" ] +then + MP_CFG="mpush.conf" +fi + +MP_CFG="$MP_CFG_DIR/$MP_CFG" + +if [ -f "$MP_BIN_DIR/java.env" ] +then + . "$MP_BIN_DIR/java.env" +fi + +if [ "x${MP_DATA_DIR}" = "x" ] +then + MP_DATA_DIR="${MPUSH_PREFIX}/tmp" +fi + +if [ "x${MP_LOG_DIR}" = "x" ] +then + MP_LOG_DIR="${MPUSH_PREFIX}/logs" +fi + +if [ "x${MP_LOG4J_PROP}" = "x" ] +then + MP_LOG4J_PROP="INFO,CONSOLE" +fi + +if [ "$JAVA_HOME" != "" ]; then + JAVA="$JAVA_HOME/bin/java" +else + JAVA=java +fi + + +#add the conf dir to classpath +CLASSPATH="$MP_CFG_DIR:$CLASSPATH" + +for i in "$MP_BIN_DIR"/../src/java/lib/*.jar +do + CLASSPATH="$i:$CLASSPATH" +done + +#make it work in the binary package +#(use array for LIB_PATH to account for spaces within wildcard expansion) +if [ -e "${MPUSH_PREFIX}"/share/mpush/mpush-*.jar ]; then + LIB_PATH=("${MPUSH_PREFIX}"/share/mpush/*.jar) +else + #release tarball format + for i in "$MP_BIN_DIR"/../mpush-*.jar + do + CLASSPATH="$i:$CLASSPATH" + done + LIB_PATH=("${MP_BIN_DIR}"/../lib/*.jar) +fi + +for i in "${LIB_PATH[@]}" +do + CLASSPATH="$i:$CLASSPATH" +done + +#make it work for developers +for d in "$MP_BIN_DIR"/../build/lib/*.jar +do + CLASSPATH="$d:$CLASSPATH" +done + +#make it work for developers +CLASSPATH="$MP_BIN_DIR/../build/classes:$CLASSPATH" + + +case "`uname`" in + CYGWIN*) cygwin=true ;; + *) cygwin=false ;; +esac + +if $cygwin +then + CLASSPATH=`cygpath -wp "$CLASSPATH"` +fi + +#echo "CLASSPATH=$CLASSPATH" \ No newline at end of file diff --git a/bin/mp.sh b/bin/mp.sh new file mode 100644 index 0000000..589412b --- /dev/null +++ b/bin/mp.sh @@ -0,0 +1,250 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# +# If this scripted is run out of /usr/bin or some other system bin directory +# it should be linked to and not copied. Things like java jar files are found +# relative to the canonical path of this script. +# + + + +# use POSTIX interface, symlink is followed automatically +MP_BIN="${BASH_SOURCE-$0}" +MP_BIN="$(dirname "${MP_BIN}")" +MP_BIN_DIR="$(cd "${MP_BIN}"; pwd)" + +if [ -e "$MP_BIN/../libexec/env-mp.sh" ]; then + . "$MP_BIN_DIR/../libexec/env-mp.sh" +else + . "$MP_BIN_DIR/env-mp.sh" +fi + +# See the following page for extensive details on setting +# up the JVM to accept JMX remote management: +# http://java.sun.com/javase/6/docs/technotes/guides/management/agent.html +# by default we allow local JMX connections +if [ "x$JMXLOCALONLY" = "x" ] +then + JMXLOCALONLY=false +fi + +if [ "x$JMXDISABLE" = "x" ] || [ "$JMXDISABLE" = 'false' ] +then + echo "MPush JMX enabled by default" >&2 + if [ "x$JMXPORT" = "x" ] + then + # for some reason these two options are necessary on jdk6 on Ubuntu + # accord to the docs they are not necessary, but otw jconsole cannot + # do a local attach + MP_MAIN="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.local.only=$JMXLOCALONLY" + else + if [ "x$JMXAUTH" = "x" ] + then + JMXAUTH=false + fi + if [ "x$JMXSSL" = "x" ] + then + JMXSSL=false + fi + if [ "x$JMXLOG4J" = "x" ] + then + JMXLOG4J=true + fi + echo "MPush remote JMX Port set to $JMXPORT" >&2 + echo "MPush remote JMX authenticate set to $JMXAUTH" >&2 + echo "MPush remote JMX ssl set to $JMXSSL" >&2 + echo "MPush remote JMX log4j set to $JMXLOG4J" >&2 + MP_MAIN="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=$JMXPORT -Dcom.sun.management.jmxremote.authenticate=$JMXAUTH -Dcom.sun.management.jmxremote.ssl=$JMXSSL -Dmpush.jmx.log4j.disable=$JMXLOG4J" + fi +else + echo "JMX disabled by user request" >&2 + MP_MAIN="" +fi + +MP_MAIN="$MP_MAIN -jar $MP_BIN_DIR/bootstrap.jar" + +if [ "x$SERVER_JVM_FLAGS" != "x" ] +then + JVM_FLAGS="$SERVER_JVM_FLAGS $JVM_FLAGS" +fi + +if [ "x$2" != "x" ] +then + MP_CFG="$MP_CFG_DIR/$2" +fi + +# if we give a more complicated path to the config, don't screw around in $MP_CFG_DIR +if [ "x$(dirname "$MP_CFG")" != "x$MP_CFG_DIR" ] +then + MP_CFG="$2" +fi + +if $cygwin +then + MP_CFG=`cygpath -wp "$MP_CFG"` + # cygwin has a "kill" in the shell itself, gets confused + KILL=/bin/kill +else + KILL=kill +fi + +echo "Using config: $MP_CFG" >&2 + +case "$OSTYPE" in +*solaris*) + GREP=/usr/xpg4/bin/grep + ;; +*) + GREP=grep + ;; +esac +if [ -z "$MP_PID_FILE" ]; then +# MP_DATA_DIR="$($GREP "^[[:space:]]*dataDir" "$MP_CFG" | sed -e 's/.*=//')" + if [ ! -d "$MP_DATA_DIR" ]; then + mkdir -p "$MP_DATA_DIR" + fi + MP_PID_FILE="$MP_DATA_DIR/mpush_server.pid" +else + # ensure it exists, otw stop will fail + mkdir -p "$(dirname "$MP_PID_FILE")" +fi + +if [ ! -w "$MP_LOG_DIR" ] ; then +echo $MP_LOG_DIR +mkdir -p "$MP_LOG_DIR" +fi + +_MP_DAEMON_OUT="$MP_LOG_DIR/mpush.out" + +case $1 in +start) + echo -n "Starting mpush ... " + if [ -f "$MP_PID_FILE" ]; then + if kill -0 `cat "$MP_PID_FILE"` > /dev/null 2>&1; then + echo $command already running as process `cat "$MP_PID_FILE"`. + exit 0 + fi + fi + nohup "$JAVA" "-Dmp.conf=$MP_CFG" "-Dmp.log.dir=${MP_LOG_DIR}" "-Dmp.root.logger=${MP_LOG4J_PROP}" \ + -cp "$CLASSPATH" $JVM_FLAGS $MP_MAIN > "$_MP_DAEMON_OUT" 2>&1 < /dev/null & + if [ $? -eq 0 ] + then + case "$OSTYPE" in + *solaris*) + /bin/echo "${!}\\c" > "$MP_PID_FILE" + ;; + *) + /bin/echo -n $! > "$MP_PID_FILE" + ;; + esac + if [ $? -eq 0 ]; + then + sleep 1 + echo STARTED + else + echo FAILED TO WRITE PID + exit 1 + fi + else + echo SERVER DID NOT START + exit 1 + fi + ;; +start-foreground) + MP_CMD=(exec "$JAVA") + if [ "${MP_NOEXEC}" != "" ]; then + MP_CMD=("$JAVA") + fi + "${MP_CMD[@]}" "-Dmp.log.dir=${MP_LOG_DIR}" "-Dmp.root.logger=${MP_LOG4J_PROP}" \ + -cp "$CLASSPATH" $JVM_FLAGS $MP_MAIN "-Dmp.conf=$MP_CFG" + ;; +print-cmd) + echo "\"$JAVA\" $MP_MAIN " + echo "\"-Dmp.conf=$MP_CFG\" -Dmp.log.dir=\"${MP_LOG_DIR}\" -Dmp.root.logger=\"${MP_LOG4J_PROP}\" " + echo "$JVM_FLAGS " + echo "-cp \"$CLASSPATH\" " + echo "> \"$_MP_DAEMON_OUT\" 2>&1 < /dev/null" + ;; +stop) + echo "Stopping mpush ... " + if [ ! -f "$MP_PID_FILE" ] + then + echo "no mpush to stop (could not find file $MP_PID_FILE)" + else + $KILL -15 $(cat "$MP_PID_FILE") + SLEEP=30 + SLEEP_COUNT=1 + while [ $SLEEP -ge 0 ]; do + kill -0 $(cat "$MP_PID_FILE") >/dev/null 2>&1 + if [ $? -gt 0 ]; then + rm -f "$MP_PID_FILE" >/dev/null 2>&1 + if [ $? != 0 ]; then + if [ -w "$MP_PID_FILE" ]; then + cat /dev/null > "$MP_PID_FILE" + else + echo "The PID file could not be removed or cleared." + fi + fi + echo STOPPED + break + fi + if [ $SLEEP -gt 0 ]; then + echo "stopping ... $SLEEP_COUNT" + sleep 1 + fi + if [ $SLEEP -eq 0 ]; then + echo "MPUSH did not stop in time." + echo "To aid diagnostics a thread dump has been written to standard out." + kill -3 `cat "$MP_PID_FILE"` + echo "force stop MPUSH." + kill -9 `cat "$MP_PID_FILE"` + echo STOPPED + fi + SLEEP=`expr $SLEEP - 1` + SLEEP_COUNT=`expr $SLEEP_COUNT + 1` + done + fi + exit 0 + ;; +upgrade) + shift + echo "upgrading the servers to 3.*" + "$JAVA" "-Dmpush.log.dir=${MP_LOG_DIR}" "-Dmpush.root.logger=${MP_LOG4J_PROP}" \ + -cp "$CLASSPATH" $JVM_FLAGS com.mpush.tools.upgrade.UpgradeMain ${@} + echo "Upgrading ... " + ;; +restart) + shift + "$0" stop ${@} + sleep 5 + "$0" start ${@} + ;; +status) + # -q is necessary on some versions of linux where nc returns too quickly, and no stat result is output + clientPortAddress=`$GREP "^[[:space:]]*clientPortAddress[^[:alpha:]]" "$MP_CFG" | sed -e 's/.*=//'` + if ! [ $clientPortAddress ] + then + clientPortAddress="localhost" + fi + clientPort=`$GREP "^[[:space:]]*connect-server-port[^[:alpha:]]" "$MP_CFG" | sed -e 's/.*=//'` + telnet 127.0.0.1 3002 + ;; +*) + echo "Usage: $0 {start|start-foreground|stop|restart|status|upgrade|print-cmd}" >&2 + +esac \ No newline at end of file diff --git a/bin/set-env.sh b/bin/set-env.sh new file mode 100644 index 0000000..9f7bf9b --- /dev/null +++ b/bin/set-env.sh @@ -0,0 +1,2 @@ +#!/usr/bin/env bash +#JVM_FLAGS="-Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=8008 -Dio.netty.leakDetectionLevel=advanced" \ No newline at end of file diff --git a/conf/mpush.conf b/conf/mpush.conf new file mode 100644 index 0000000..c170593 --- /dev/null +++ b/conf/mpush.conf @@ -0,0 +1,3 @@ +mp.log.level=debug +mp.zk.server-address="127.0.0.1:2181" +mp.net.alloc-server-port=9999 diff --git a/pom.xml b/pom.xml index f7f6b6e..4a58932 100644 --- a/pom.xml +++ b/pom.xml @@ -5,8 +5,8 @@ 4.0.0 com.mpush - mpush-alloc - 1.0 + alloc + 0.0.4 mpush-alloc mpush Server Allocator https://github.com/mpusher/mpush @@ -73,6 +73,7 @@ + bootstrap org.apache.maven.plugins @@ -92,21 +93,57 @@ true - - maven-assembly-plugin - - - - com.shinemo.mpush.alloc.Main - - - - - jar-with-dependencies - - - - + + + + zip + + + + maven-jar-plugin + + + + false + + + + true + + ../lib/ + + com.shinemo.mpush.alloc.Main + + + + + + package + + + + + maven-assembly-plugin + 2.6 + + alloc + + assembly.xml + + + + + package + + single + + + + + + + + \ No newline at end of file diff --git a/src/main/java/com/shinemo/mpush/alloc/AllocHandler.java b/src/main/java/com/shinemo/mpush/alloc/AllocHandler.java new file mode 100644 index 0000000..0467cfd --- /dev/null +++ b/src/main/java/com/shinemo/mpush/alloc/AllocHandler.java @@ -0,0 +1,127 @@ +/* + * (C) Copyright 2015-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Contributors: + * ohun@live.cn (夜色) + */ + +package com.shinemo.mpush.alloc; + +import com.mpush.cache.redis.RedisKey; +import com.mpush.cache.redis.manager.RedisManager; +import com.mpush.common.user.UserManager; +import com.mpush.zk.ZKClient; +import com.mpush.zk.listener.ZKServerNodeWatcher; +import com.mpush.zk.node.ZKServerNode; +import com.sun.net.httpserver.HttpExchange; +import com.sun.net.httpserver.HttpHandler; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.Charset; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +/** + * Created by ohun on 16/9/22. + * + * @author ohun@live.cn (夜色) + */ +/*package*/ final class AllocHandler implements HttpHandler { + + private Charset UTF_8 = Charset.forName("UTF-8"); + private final ZKServerNodeWatcher watcher; + private List serverNodes = Collections.emptyList(); + private ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor(); + + public AllocHandler() { + //ZKClient.I.start();//启动ZK + watcher = ZKServerNodeWatcher.buildConnect();//监听长链接服务器节点 + watcher.beginWatch(); + RedisManager.I.init(); + scheduledExecutor.scheduleAtFixedRate(this::refresh, 0, 5, TimeUnit.MINUTES); + } + + public void stop() { + ZKClient.I.stop(); + scheduledExecutor.shutdown(); + } + + public void handle(HttpExchange exchange) throws IOException { + //3.格式组装 ip:port,ip:port + StringBuilder sb = new StringBuilder(); + Iterator it = serverNodes.iterator(); + if (it.hasNext()) { + ZKServerNode node = it.next(); + sb.append(node.getExtranetIp()).append(':').append(node.getPort()); + } + + while (it.hasNext()) { + ZKServerNode node = it.next(); + sb.append(',').append(node.getExtranetIp()).append(':').append(node.getPort()); + } + + byte[] data = sb.toString().getBytes(UTF_8); + exchange.sendResponseHeaders(200, data.length);//200, content-length + OutputStream out = exchange.getResponseBody(); + out.write(data); + out.close(); + } + + /** + * 从zk中获取可提供服务的机器,并以在线用户量排序 + */ + private void refresh() { + //1.从缓存中拿取可用对长链接服务器IP + Collection nodes = watcher.getCache().values(); + if (nodes.size() > 0) { + //2.对serverNodes可以按某种规则排序,以便实现负载均衡,比如:随机,轮询,链接数量等 + this.serverNodes = nodes.stream().map(this::convert).sorted(ServerNode::compareTo).collect(Collectors.toList()); + } + } + + + private long getOnlineUserNum(String publicIP) { + return UserManager.I.getOnlineUserNum(publicIP); + } + + private ServerNode convert(ZKServerNode node) { + ServerNode serverNode = new ServerNode(); + serverNode.setExtranetIp(node.getExtranetIp()); + serverNode.setIp(node.getIp()); + serverNode.setPort(node.getPort()); + serverNode.setOnlineUserNum(getOnlineUserNum(node.getExtranetIp())); + return serverNode; + } + + public static class ServerNode extends ZKServerNode implements Comparable { + long onlineUserNum = 0; + + public void setOnlineUserNum(long onlineUserNum) { + this.onlineUserNum = onlineUserNum; + } + + @Override + public int compareTo(ServerNode o) { + return Long.compare(onlineUserNum, o.onlineUserNum); + } + } +} diff --git a/src/main/java/com/shinemo/mpush/alloc/AllocServer.java b/src/main/java/com/shinemo/mpush/alloc/AllocServer.java index 5c56728..56a521e 100644 --- a/src/main/java/com/shinemo/mpush/alloc/AllocServer.java +++ b/src/main/java/com/shinemo/mpush/alloc/AllocServer.java @@ -1,23 +1,28 @@ package com.shinemo.mpush.alloc; +import com.mpush.api.service.BaseService; +import com.mpush.api.service.Listener; +import com.mpush.api.service.ServiceException; import com.mpush.cache.redis.RedisKey; import com.mpush.cache.redis.manager.RedisManager; +import com.mpush.tools.config.CC; +import com.mpush.tools.log.Logs; import com.mpush.zk.ZKClient; import com.mpush.zk.listener.ZKServerNodeWatcher; import com.mpush.zk.node.ZKServerNode; import com.sun.net.httpserver.HttpExchange; import com.sun.net.httpserver.HttpHandler; +import com.sun.net.httpserver.HttpServer; import java.io.IOException; import java.io.OutputStream; +import java.net.InetSocketAddress; import java.nio.charset.Charset; import java.util.Collection; import java.util.Collections; import java.util.Iterator; import java.util.List; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.stream.Collectors; /** @@ -25,85 +30,33 @@ * * @author ohun@live.cn */ -public class AllocServer implements HttpHandler { +public final class AllocServer extends BaseService { - private Charset UTF_8 = Charset.forName("UTF-8"); - private final ZKServerNodeWatcher watcher; - private List serverNodes = Collections.emptyList(); - private ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor(); + private HttpServer httpServer; - public AllocServer() { - //ZKClient.I.start();//启动ZK - watcher = ZKServerNodeWatcher.buildConnect();//监听长链接服务器节点 - watcher.beginWatch(); - RedisManager.I.init(); - scheduledExecutor.scheduleAtFixedRate(this::refresh, 0, 5, TimeUnit.MINUTES); - } - - public void stop() { - ZKClient.I.stop(); - scheduledExecutor.shutdown(); - } - - public void handle(HttpExchange exchange) throws IOException { - //3.格式组装 ip:port,ip:port - StringBuilder sb = new StringBuilder(); - Iterator it = serverNodes.iterator(); - if (it.hasNext()) { - ZKServerNode node = it.next(); - sb.append(node.getExtranetIp()).append(':').append(node.getPort()); + @Override + public void init() { + try { + int port = CC.mp.net.cfg.getInt("alloc-server-port"); + httpServer = HttpServer.create(new InetSocketAddress(port), 0); + } catch (IOException e) { + throw new ServiceException(e); } - - while (it.hasNext()) { - ZKServerNode node = it.next(); - sb.append(',').append(node.getExtranetIp()).append(':').append(node.getPort()); - } - - byte[] data = sb.toString().getBytes(UTF_8); - exchange.sendResponseHeaders(200, data.length);//200, content-length - OutputStream out = exchange.getResponseBody(); - out.write(data); - out.close(); + httpServer.setExecutor(Executors.newCachedThreadPool()); + httpServer.createContext("/push", new PushHandler());//模拟发送push + httpServer.createContext("/", new AllocHandler());//模拟Alloc } - /** - * 从zk中获取可提供服务的机器,并以在线用户量排序 - */ - private void refresh() { - //1.从缓存中拿取可用对长链接服务器IP - Collection nodes = watcher.getCache().values(); - if (nodes.size() > 0) { - //2.对serverNodes可以按某种规则排序,以便实现负载均衡,比如:随机,轮询,链接数量等 - this.serverNodes = nodes.stream().map(this::convert).sorted(ServerNode::compareTo).collect(Collectors.toList()); - } + @Override + protected void doStart(Listener listener) throws Throwable { + httpServer.start(); + Logs.Console.info("==================================================================="); + Logs.Console.info("====================ALLOC SERVER START SUCCESS====================="); + Logs.Console.info("==================================================================="); } - - private long getOnlineUserNum(String publicIP) { - String online_key = RedisKey.getUserOnlineKey(publicIP); - Long value = RedisManager.I.zCard(online_key); - return value == null ? 0 : value; - } - - private ServerNode convert(ZKServerNode node) { - ServerNode serverNode = new ServerNode(); - serverNode.setExtranetIp(node.getExtranetIp()); - serverNode.setIp(node.getIp()); - serverNode.setPort(node.getPort()); - serverNode.setOnlineUserNum(getOnlineUserNum(node.getExtranetIp())); - return serverNode; - } - - public static class ServerNode extends ZKServerNode implements Comparable { - long onlineUserNum = 0; - - public void setOnlineUserNum(long onlineUserNum) { - this.onlineUserNum = onlineUserNum; - } - - @Override - public int compareTo(ServerNode o) { - return Long.compare(onlineUserNum, o.onlineUserNum); - } + @Override + protected void doStop(Listener listener) throws Throwable { + httpServer.stop(60);//1 min } } diff --git a/src/main/java/com/shinemo/mpush/alloc/Main.java b/src/main/java/com/shinemo/mpush/alloc/Main.java index e80dd91..83d18a9 100644 --- a/src/main/java/com/shinemo/mpush/alloc/Main.java +++ b/src/main/java/com/shinemo/mpush/alloc/Main.java @@ -31,13 +31,24 @@ * @author ohun@live.cn (夜色) */ public class Main { - public static void main(String[] args) throws IOException {//正式环境可以用tomcat - HttpServer httpServer = HttpServer.create(new InetSocketAddress(9999), 0); - httpServer.createContext("/push", new PushHandler());//模拟发送push - httpServer.createContext("/", new AllocServer());//模拟Alloc - httpServer.start(); - Logs.Console.info("==================================================================="); - Logs.Console.info("====================ALLOC SERVER START SUCCESS====================="); - Logs.Console.info("==================================================================="); + + public static void main(String[] args) throws IOException { + AllocServer server = new AllocServer(); + server.start(); + addHook(server); + } + + private static void addHook(AllocServer server) { + Runtime.getRuntime().addShutdownHook( + new Thread(() -> { + try { + server.stop(); + } catch (Exception e) { + Logs.Console.error("alloc server stop ex", e); + } + Logs.Console.info("jvm exit, all service stopped..."); + + }, "mpush-shutdown-hook-thread") + ); } } diff --git a/src/main/java/com/shinemo/mpush/alloc/PushHandler.java b/src/main/java/com/shinemo/mpush/alloc/PushHandler.java index febf5f0..c9f8cbc 100644 --- a/src/main/java/com/shinemo/mpush/alloc/PushHandler.java +++ b/src/main/java/com/shinemo/mpush/alloc/PushHandler.java @@ -41,7 +41,7 @@ * * @author ohun@live.cn (夜色) */ -public class PushHandler implements HttpHandler { +/*package*/ final class PushHandler implements HttpHandler { private final Logger logger = LoggerFactory.getLogger(this.getClass()); private final PushSender pushSender = PushSender.create(); private final AtomicInteger idSeq = new AtomicInteger();