diff --git a/modAionImpl/test/org/aion/zero/impl/sync/BlockPropagationTest.java b/modAionImpl/test/org/aion/zero/impl/sync/BlockPropagationTest.java index 5b6fe0d228..920f04776e 100644 --- a/modAionImpl/test/org/aion/zero/impl/sync/BlockPropagationTest.java +++ b/modAionImpl/test/org/aion/zero/impl/sync/BlockPropagationTest.java @@ -81,6 +81,42 @@ public long getTimestamp() { public String getBinaryVersion() { return ""; } + + @Override + public void setPort(int _port) { throw new IllegalStateException("not implemented"); } + + @Override + public void setConnection(String _connection) { throw new IllegalStateException("not implemented"); } + + @Override + public IPeerMetric getPeerMetric() { throw new IllegalStateException("not implemented"); } + + @Override + public void refreshTimestamp() { throw new IllegalStateException("not implemented"); } + + @Override + public void setChannel(SocketChannel _channel) { throw new IllegalStateException("not implemented"); } + + @Override + public void setId(byte[] _id) { throw new IllegalStateException("not implemented"); } + + @Override + public void setBinaryVersion(String _revision) { throw new IllegalStateException("not implemented"); } + + @Override + public boolean getIfFromBootList() { throw new IllegalStateException("not implemented"); } + + @Override + public byte[] getBestBlockHash() { throw new IllegalStateException("not implemented"); } + + @Override + public String getConnection() { throw new IllegalStateException("not implemented"); } + + @Override + public SocketChannel getChannel() { throw new IllegalStateException("not implemented"); } + + @Override + public void setFromBootList(boolean _ifBoot) { throw new IllegalStateException("not implemented"); } } private static class P2pMock implements IP2pMgr { @@ -138,6 +174,30 @@ public void closeSocket(SocketChannel _sc, String _reason) {} public int getSelfIdHash() { return 0; } + + @Override + public void dropActive(int _nodeIdHash, String _reason) { throw new IllegalStateException("not implemented."); } + + @Override + public void configChannel(SocketChannel _channel) { + throw new IllegalStateException("not implemented."); + } + + @Override + public int getMaxActiveNodes() { throw new IllegalStateException("not implemented."); } + + @Override + public boolean isSyncSeedsOnly() { throw new IllegalStateException("not implemented."); } + + @Override + public int getMaxTempNodes() { throw new IllegalStateException("not implemented."); } + + @Override + public boolean validateNode(INode _node) { throw new IllegalStateException("not implemented."); } + + @Override + public int getSelfNetId() { throw new IllegalStateException("not implemented."); } + } private static List generateDefaultAccounts() { diff --git a/modDbImpl/src/org/aion/db/impl/h2/H2MVMap.java b/modDbImpl/src/org/aion/db/impl/h2/H2MVMap.java index fc08918bc2..9492420d81 100644 --- a/modDbImpl/src/org/aion/db/impl/h2/H2MVMap.java +++ b/modDbImpl/src/org/aion/db/impl/h2/H2MVMap.java @@ -1,4 +1,4 @@ -/******************************************************************************* +/* * Copyright (c) 2017-2018 Aion foundation. * * This file is part of the aion network project. @@ -31,7 +31,7 @@ * Samuel Neves through the BLAKE2 implementation. * Zcash project team. * Bitcoinj team. - ******************************************************************************/ + */ package org.aion.db.impl.h2; import org.aion.base.util.ByteArrayWrapper; @@ -148,7 +148,8 @@ public boolean open() { } catch (Exception e) { if (e instanceof NullPointerException) { LOG.error("Failed to open the database " + this.toString() - + ". A probable cause is that the H2 database cannot access the file path.", e); + + ". A probable cause is that the H2 database cannot access the file path. " + + "Check if you have two instances running on the same database.", e); } else { LOG.error("Failed to open the database " + this.toString() + " due to: ", e); } diff --git a/modDbImpl/src/org/aion/db/impl/leveldb/LevelDB.java b/modDbImpl/src/org/aion/db/impl/leveldb/LevelDB.java index db7b188433..31454d14a1 100644 --- a/modDbImpl/src/org/aion/db/impl/leveldb/LevelDB.java +++ b/modDbImpl/src/org/aion/db/impl/leveldb/LevelDB.java @@ -1,4 +1,4 @@ -/******************************************************************************* +/* * Copyright (c) 2017-2018 Aion foundation. * * This file is part of the aion network project. @@ -31,7 +31,7 @@ * Samuel Neves through the BLAKE2 implementation. * Zcash project team. * Bitcoinj team. - ******************************************************************************/ + */ package org.aion.db.impl.leveldb; import org.aion.base.util.ByteArrayWrapper; @@ -142,7 +142,14 @@ public boolean open() { try { db = JniDBFactory.factory.open(f, options); } catch (Exception e1) { - LOG.error("Failed to open the database " + this.toString() + " due to: ", e1); + if (e1.getMessage().contains("lock")) { + LOG.error("Failed to open the database " + this.toString() + + "\nCheck if you have two instances running on the same database." + + "\nFailure due to: ", e1); + } else { + LOG.error("Failed to open the database " + this.toString() + " due to: ", e1); + } + if (e1.getMessage() != null && e1.getMessage().contains("No space left on device")) { LOG.error("Shutdown due to lack of disk space."); System.exit(0); diff --git a/modDbImpl/src/org/aion/db/impl/rocksdb/RocksDBWrapper.java b/modDbImpl/src/org/aion/db/impl/rocksdb/RocksDBWrapper.java index 639bd798f3..48b447da40 100644 --- a/modDbImpl/src/org/aion/db/impl/rocksdb/RocksDBWrapper.java +++ b/modDbImpl/src/org/aion/db/impl/rocksdb/RocksDBWrapper.java @@ -1,3 +1,25 @@ +/* + * Copyright (c) 2017-2018 Aion foundation. + * + * This file is part of the aion network project. + * + * The aion network project is free software: you can redistribute it + * and/or modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation, either version 3 of + * the License, or any later version. + * + * The aion network project is distributed in the hope that it will + * be useful, but WITHOUT ANY WARRANTY; without even the implied + * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + * See the GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with the aion network project source files. + * If not, see . + * + * Contributors: + * Aion foundation. + */ package org.aion.db.impl.rocksdb; import org.aion.base.util.ByteArrayWrapper; @@ -90,7 +112,13 @@ public boolean open() { try { db = RocksDB.open(options, f.getAbsolutePath()); } catch (RocksDBException e) { - LOG.error("Failed to open the database " + this.toString() + " due to: ", e); + if (e.getMessage().contains("lock")) { + LOG.error("Failed to open the database " + this.toString() + + "\nCheck if you have two instances running on the same database." + + "\nFailure due to: ", e); + } else { + LOG.error("Failed to open the database " + this.toString() + " due to: ", e); + } // close the connection and cleanup if needed close(); diff --git a/modP2p/src/org/aion/p2p/INode.java b/modP2p/src/org/aion/p2p/INode.java index 1b1c7d3614..e9075e9992 100644 --- a/modP2p/src/org/aion/p2p/INode.java +++ b/modP2p/src/org/aion/p2p/INode.java @@ -1,31 +1,29 @@ /* * Copyright (c) 2017-2018 Aion foundation. * - * This file is part of the aion network project. + * This file is part of the aion network project. * - * The aion network project is free software: you can redistribute it - * and/or modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation, either version 3 of - * the License, or any later version. + * The aion network project is free software: you can redistribute it + * and/or modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation, either version 3 of + * the License, or any later version. * - * The aion network project is distributed in the hope that it will - * be useful, but WITHOUT ANY WARRANTY; without even the implied - * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. - * See the GNU General Public License for more details. + * The aion network project is distributed in the hope that it will + * be useful, but WITHOUT ANY WARRANTY; without even the implied + * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + * See the GNU General Public License for more details. * - * You should have received a copy of the GNU General Public License - * along with the aion network project source files. - * If not, see . - * - * Contributors to the aion source files in decreasing order of code volume: - * - * Aion foundation. + * You should have received a copy of the GNU General Public License + * along with the aion network project source files. + * If not, see . * + * Contributors: + * Aion foundation. */ - package org.aion.p2p; import java.math.BigInteger; +import java.nio.channels.SocketChannel; /** * @@ -84,4 +82,29 @@ public interface INode { void updateStatus(long _bestBlockNumber, final byte[] _bestBlockHash, BigInteger _totalDifficulty); String getBinaryVersion(); + + boolean getIfFromBootList(); + + byte[] getBestBlockHash(); + + String getConnection(); + + SocketChannel getChannel(); + + void setFromBootList(boolean _ifBoot); + + void setConnection(String _connection); + + IPeerMetric getPeerMetric(); + + void refreshTimestamp(); + + void setChannel(SocketChannel _channel); + + void setId(byte[] _id); + + void setPort(int _port); + + void setBinaryVersion(String _revision); + } diff --git a/modP2p/src/org/aion/p2p/INodeMgr.java b/modP2p/src/org/aion/p2p/INodeMgr.java index ffc8e533ad..0938d84324 100644 --- a/modP2p/src/org/aion/p2p/INodeMgr.java +++ b/modP2p/src/org/aion/p2p/INodeMgr.java @@ -1,5 +1,30 @@ +/* + * Copyright (c) 2017-2018 Aion foundation. + * + * This file is part of the aion network project. + * + * The aion network project is free software: you can redistribute it + * and/or modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation, either version 3 of + * the License, or any later version. + * + * The aion network project is distributed in the hope that it will + * be useful, but WITHOUT ANY WARRANTY; without even the implied + * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + * See the GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with the aion network project source files. + * If not, see . + * + * Contributors: + * Aion foundation. + */ package org.aion.p2p; +import java.util.List; +import java.util.Map; + public interface INodeMgr { void timeoutActive(final IP2pMgr _p2pMgr); @@ -10,4 +35,46 @@ public interface INodeMgr { void dropActive(int _nodeIdHash, final IP2pMgr _p2pMgr, String _reason); + void timeoutInbound(IP2pMgr _p2pMgr); + + Map getOutboundNodes(); + + int activeNodesSize(); + + INode tempNodesTake() throws InterruptedException; + + boolean isSeedIp(String _ip); + + void addTempNode(INode _n); + + boolean hasActiveNode(int k); + + void addOutboundNode(INode _n); + + void addInboundNode(INode _n); + + INode allocNode(String ip, int p0); + + INode getActiveNode(int k); + + List getActiveNodesList(); + + int tempNodesSize(); + + INode getInboundNode(int k); + + INode getOutboundNode(int k); + + String dumpNodeInfo(String selfShortId); + + void seedIpAdd(String _ip); + + void shutdown(IP2pMgr _p2pMgr); + + void ban(int _nodeIdHash); + + INode getRandom(); + + Map getActiveNodesMap(); + } diff --git a/modP2p/src/org/aion/p2p/IP2pMgr.java b/modP2p/src/org/aion/p2p/IP2pMgr.java index 58cb082578..f6c0f40295 100644 --- a/modP2p/src/org/aion/p2p/IP2pMgr.java +++ b/modP2p/src/org/aion/p2p/IP2pMgr.java @@ -1,68 +1,56 @@ /* * Copyright (c) 2017-2018 Aion foundation. * - * This file is part of the aion network project. + * This file is part of the aion network project. * - * The aion network project is free software: you can redistribute it - * and/or modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation, either version 3 of - * the License, or any later version. + * The aion network project is free software: you can redistribute it + * and/or modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation, either version 3 of + * the License, or any later version. * - * The aion network project is distributed in the hope that it will - * be useful, but WITHOUT ANY WARRANTY; without even the implied - * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. - * See the GNU General Public License for more details. + * The aion network project is distributed in the hope that it will + * be useful, but WITHOUT ANY WARRANTY; without even the implied + * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + * See the GNU General Public License for more details. * - * You should have received a copy of the GNU General Public License - * along with the aion network project source files. - * If not, see . - * - * Contributors to the aion source files in decreasing order of code volume: - * - * Aion foundation. + * You should have received a copy of the GNU General Public License + * along with the aion network project source files. + * If not, see . * + * Contributors: + * Aion foundation. */ - package org.aion.p2p; +import java.io.IOException; import java.nio.channels.SocketChannel; import java.util.List; import java.util.Map; -/** - * @author chris - */ +/** @author chris */ public interface IP2pMgr { + // TODO: need refactor by passing the parameter in the later version to P2pMgr. + int txBroadCastRoute = (Ctrl.SYNC << 8) + 6; // ((Ver.V0 << 16) + (Ctrl.SYNC << 8) + 6); - /** - * @return Map - */ + /** @return Map */ Map getActiveNodes(); - /** - * @param _hs List - */ + /** @param _hs List */ void register(final List _hs); - /** - * @return INode - */ + /** @return INode */ INode getRandom(); /** - * @param _id int + * @param _id int * @param _msg Msg */ void send(int _id, String _displayId, final Msg _msg); - /** - * Used to hook up with kernel to shutdown threads in network module - */ + /** Used to hook up with kernel to shutdown threads in network module. */ void shutdown(); - /** - * start all p2p process - */ + /** Starts all p2p processes. */ void run(); List versions(); @@ -76,4 +64,18 @@ public interface IP2pMgr { boolean isShowLog(); void errCheck(int nodeIdHashcode, String _displayId); + + void dropActive(int _nodeIdHash, String _reason); + + void configChannel(SocketChannel _channel) throws IOException; + + int getMaxActiveNodes(); + + boolean isSyncSeedsOnly(); + + int getMaxTempNodes(); + + boolean validateNode(INode _node); + + int getSelfNetId(); } diff --git a/modP2p/src/org/aion/p2p/IPeerMetric.java b/modP2p/src/org/aion/p2p/IPeerMetric.java new file mode 100644 index 0000000000..c97c8f08ff --- /dev/null +++ b/modP2p/src/org/aion/p2p/IPeerMetric.java @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2017-2018 Aion foundation. + * + * This file is part of the aion network project. + * + * The aion network project is free software: you can redistribute it + * and/or modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation, either version 3 of + * the License, or any later version. + * + * The aion network project is distributed in the hope that it will + * be useful, but WITHOUT ANY WARRANTY; without even the implied + * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + * See the GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with the aion network project source files. + * If not, see . + * + * Contributors: + * Aion foundation. + */ +package org.aion.p2p; + +/** + * An interface for tracking peer connection and banning metrics. + */ +public interface IPeerMetric { + + /** + * Returns true only if we should not accept any more connections. + */ + boolean shouldNotConn(); + + /** + * Increments the failed connection counter. + */ + void incFailedCount(); + + /** + * Decrements the failed connection counter. + */ + void decFailedCount(); + + /** + * Sets the current time for tracking a banned connection. + */ + void ban(); + + /** + * Returns true only if the time between now and the last ban is greater than the banned + * connection retry interval. + */ + boolean notBan(); + +} diff --git a/modP2pImpl/src/org/aion/p2p/impl/comm/Node.java b/modP2pImpl/src/org/aion/p2p/impl/comm/Node.java index 3f8eb4e788..761ae3354d 100644 --- a/modP2pImpl/src/org/aion/p2p/impl/comm/Node.java +++ b/modP2pImpl/src/org/aion/p2p/impl/comm/Node.java @@ -1,28 +1,25 @@ /* * Copyright (c) 2017-2018 Aion foundation. * - * This file is part of the aion network project. + * This file is part of the aion network project. * - * The aion network project is free software: you can redistribute it - * and/or modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation, either version 3 of - * the License, or any later version. + * The aion network project is free software: you can redistribute it + * and/or modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation, either version 3 of + * the License, or any later version. * - * The aion network project is distributed in the hope that it will - * be useful, but WITHOUT ANY WARRANTY; without even the implied - * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. - * See the GNU General Public License for more details. + * The aion network project is distributed in the hope that it will + * be useful, but WITHOUT ANY WARRANTY; without even the implied + * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + * See the GNU General Public License for more details. * - * You should have received a copy of the GNU General Public License - * along with the aion network project source files. - * If not, see . - * - * Contributors to the aion source files in decreasing order of code volume: - * - * Aion foundation. + * You should have received a copy of the GNU General Public License + * along with the aion network project source files. + * If not, see . * + * Contributors: + * Aion foundation. */ - package org.aion.p2p.impl.comm; import java.math.BigInteger; @@ -31,8 +28,9 @@ import java.util.Arrays; import java.util.regex.Pattern; import org.aion.p2p.INode; +import org.aion.p2p.IPeerMetric; -/* +/** * * @author Chris * p2p://{node-id}@{ip}:{port} @@ -88,7 +86,12 @@ public final class Node implements INode { */ private String connection = ""; - public PeerMetric peerMetric = new PeerMetric(); + public IPeerMetric peerMetric = new PeerMetric(); + + @Override + public IPeerMetric getPeerMetric() { + return this.peerMetric; + } /** * constructor for initial stage of connections from network @@ -182,6 +185,7 @@ public static Node parseP2p(String _p2p) { return new Node(true, _id, _ip, _port); } + @Override public void setFromBootList(boolean _ifBoot) { this.fromBootList = _ifBoot; } @@ -190,6 +194,7 @@ public void setFromBootList(boolean _ifBoot) { * @param _id * byte[] */ + @Override public void setId(final byte[] _id) { this.id = _id; if (_id != null && _id.length == 36) { @@ -202,10 +207,12 @@ public void setId(final byte[] _id) { * @param _port * int */ + @Override public void setPort(final int _port) { this.port = _port; } + @Override public void setBinaryVersion(String _revision) { this.binaryVersion = _revision; } @@ -214,6 +221,7 @@ public void setBinaryVersion(String _revision) { * this method used to keep current node stage on either pending list or active * list */ + @Override public void refreshTimestamp() { this.timestamp = System.currentTimeMillis(); } @@ -222,6 +230,7 @@ public void refreshTimestamp() { * @param _channel * SocketChannel */ + @Override public void setChannel(final SocketChannel _channel) { this.channel = _channel; } @@ -230,13 +239,15 @@ public void setChannel(final SocketChannel _channel) { * @param _connection * String */ - void setConnection(String _connection) { + @Override + public void setConnection(String _connection) { this.connection = _connection; } /** * @return boolean */ + @Override public boolean getIfFromBootList() { return this.fromBootList; } @@ -270,6 +281,7 @@ public String getBinaryVersion() { /** * @return SocketChannel */ + @Override public SocketChannel getChannel() { return this.channel; } @@ -287,7 +299,8 @@ public int getIdHash() { /** * @return String */ - String getConnection() { + @Override + public String getConnection() { return this.connection; } @@ -301,7 +314,8 @@ public long getBestBlockNumber() { return this.bestBlockNumber; } - byte[] getBestBlockHash() { + @Override + public byte[] getBestBlockHash() { return this.bestBlockHash; } diff --git a/modP2pImpl/src/org/aion/p2p/impl/comm/NodeMgr.java b/modP2pImpl/src/org/aion/p2p/impl/comm/NodeMgr.java index abe281287d..3dd908915d 100644 --- a/modP2pImpl/src/org/aion/p2p/impl/comm/NodeMgr.java +++ b/modP2pImpl/src/org/aion/p2p/impl/comm/NodeMgr.java @@ -1,28 +1,25 @@ /* * Copyright (c) 2017-2018 Aion foundation. * - * This file is part of the aion network project. + * This file is part of the aion network project. * - * The aion network project is free software: you can redistribute it - * and/or modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation, either version 3 of - * the License, or any later version. + * The aion network project is free software: you can redistribute it + * and/or modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation, either version 3 of + * the License, or any later version. * - * The aion network project is distributed in the hope that it will - * be useful, but WITHOUT ANY WARRANTY; without even the implied - * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. - * See the GNU General Public License for more details. + * The aion network project is distributed in the hope that it will + * be useful, but WITHOUT ANY WARRANTY; without even the implied + * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + * See the GNU General Public License for more details. * - * You should have received a copy of the GNU General Public License - * along with the aion network project source files. - * If not, see . - * - * Contributors to the aion source files in decreasing order of code volume: - * - * Aion foundation. + * You should have received a copy of the GNU General Public License + * along with the aion network project source files. + * If not, see . * + * Contributors: + * Aion foundation. */ - package org.aion.p2p.impl.comm; import org.aion.p2p.INode; @@ -47,17 +44,17 @@ public class NodeMgr implements INodeMgr { // // )); - private final BlockingQueue tempNodes = new LinkedBlockingQueue<>(); - private final Map outboundNodes = new ConcurrentHashMap<>(); - private final Map inboundNodes = new ConcurrentHashMap<>(); - private final Map activeNodes = new ConcurrentHashMap<>(); + private final BlockingQueue tempNodes = new LinkedBlockingQueue<>(); + private final Map outboundNodes = new ConcurrentHashMap<>(); + private final Map inboundNodes = new ConcurrentHashMap<>(); + private final Map activeNodes = new ConcurrentHashMap<>(); public NodeMgr(int _maxActiveNodes, int _maxTempNodes){ this.maxActiveNodes = _maxActiveNodes; this.maxTempNodes = _maxTempNodes; } - public Map getOutboundNodes() { + public Map getOutboundNodes() { return outboundNodes; } @@ -76,6 +73,7 @@ private static String bytesToHex(byte[] bytes) { /** * @param selfShortId String */ + @Override public String dumpNodeInfo(String selfShortId) { StringBuilder sb = new StringBuilder(); sb.append("\n"); @@ -85,7 +83,7 @@ public String dumpNodeInfo(String selfShortId) { sb.append(String.format( "temp[%3d] inbound[%3d] outbound[%3d] active[%3d] s - seed node, td - total difficulty, # - block number, bv - binary version\n", tempNodesSize(), inboundNodes.size(), outboundNodes.size(), activeNodes.size())); - List sorted = new ArrayList<>(activeNodes.values()); + List sorted = new ArrayList<>(activeNodes.values()); if (sorted.size() > 0) { sb.append("\n s"); // id & seed sb.append(" td"); @@ -107,7 +105,7 @@ public String dumpNodeInfo(String selfShortId) { } else return tdCompare; }); - for (Node n : sorted) { + for (INode n : sorted) { try { sb.append(String.format("id:%6s %c %16s %10d %64s %15s %5d %8s %15s %12s\n", n.getIdShort(), @@ -130,10 +128,12 @@ public String dumpNodeInfo(String selfShortId) { /** * @param _ip String */ + @Override public void seedIpAdd(String _ip) { this.seedIps.add(_ip); } + @Override public boolean isSeedIp(String _ip) { return this.seedIps.contains(_ip); } @@ -141,64 +141,80 @@ public boolean isSeedIp(String _ip) { /** * @param _n Node */ - public synchronized void addTempNode(final Node _n) { - if(tempNodes.size() < maxTempNodes) + @Override + public synchronized void addTempNode(final INode _n) { + if(tempNodes.size() < maxTempNodes) { tempNodes.add(_n); + } } - public void addInboundNode(final Node _n) { + @Override + public void addInboundNode(final INode _n) { inboundNodes.put(_n.getChannel().hashCode(), _n); } - public void addOutboundNode(final Node _n) { + @Override + public void addOutboundNode(final INode _n) { outboundNodes.put(_n.getIdHash(), _n); } - public Node tempNodesTake() throws InterruptedException { + @Override + public INode tempNodesTake() throws InterruptedException { return tempNodes.take(); } + @Override public int tempNodesSize() { return tempNodes.size(); } + @Override public int activeNodesSize() { return activeNodes.size(); } + @Override public boolean hasActiveNode(int k) { return activeNodes.containsKey(k); } - public Node getActiveNode(int k) { + @Override + public INode getActiveNode(int k) { return activeNodes.get(k); } - public Node getInboundNode(int k) { + @Override + public INode getInboundNode(int k) { return inboundNodes.get(k); } - public Node getOutboundNode(int k) { + @Override + public INode getOutboundNode(int k) { return outboundNodes.get(k); } - public Node allocNode(String ip, int p0) { - Node n = new Node(ip, p0); - if (seedIps.contains(ip)) + @Override + public INode allocNode(String ip, int p0) { + INode n = new Node(ip, p0); + if (seedIps.contains(ip)) { n.setFromBootList(true); + } return n; } - public List getActiveNodesList() { + @Override + public List getActiveNodesList() { return new ArrayList(activeNodes.values()); } + @Override public Map getActiveNodesMap() { synchronized(activeNodes){ return new HashMap(activeNodes); } } + @Override public INode getRandom() { int nodesCount = activeNodes.size(); if (nodesCount > 0) { @@ -243,7 +259,7 @@ private boolean activeIpAllow(String _ip){ // Attention: move node from container need sync to avoid node not belong to // any container during transit. public synchronized void moveInboundToActive(int _channelHashCode, final IP2pMgr _p2pMgr) { - Node node = inboundNodes.remove(_channelHashCode); + INode node = inboundNodes.remove(_channelHashCode); if (node != null) { if(activeNodes.size() >= maxActiveNodes){ @@ -259,13 +275,18 @@ public synchronized void moveInboundToActive(int _channelHashCode, final IP2pMgr node.setConnection("inbound"); node.setFromBootList(seedIps.contains(node.getIpStr())); INode previous = activeNodes.putIfAbsent(node.getIdHash(), node); - if (previous != null) - _p2pMgr.closeSocket(node.getChannel(), "inbound -> active, node " + previous.getIdShort() + " exits"); - else if(!activeIpAllow(node.getIpStr())) - _p2pMgr.closeSocket(node.getChannel(), "inbound -> active, ip " + node.getIpStr() + " exits"); - else { - if (_p2pMgr.isShowLog()) - System.out.println(" active node-id=" + node.getIdShort() + " ip=" + node.getIpStr() + ">"); + if (previous != null) { + _p2pMgr.closeSocket(node.getChannel(), + "inbound -> active, node " + previous.getIdShort() + " exits"); + } else if (!activeIpAllow(node.getIpStr())) { + _p2pMgr.closeSocket(node.getChannel(), + "inbound -> active, ip " + node.getIpStr() + " exits"); + } else { + if (_p2pMgr.isShowLog()) { + System.out.println( + " active node-id=" + node.getIdShort() + " ip=" + node + .getIpStr() + ">"); + } } } } @@ -278,7 +299,7 @@ else if(!activeIpAllow(node.getIpStr())) // Attention: move node from container need sync to avoid node not belong to // any container during transit. public synchronized void moveOutboundToActive(int _nodeIdHash, String _shortId, final IP2pMgr _p2pMgr) { - Node node = outboundNodes.remove(_nodeIdHash); + INode node = outboundNodes.remove(_nodeIdHash); if (node != null) { if(activeNodes.size() >= maxActiveNodes){ @@ -296,8 +317,11 @@ public synchronized void moveOutboundToActive(int _nodeIdHash, String _shortId, if (previous != null) _p2pMgr.closeSocket(node.getChannel(), "outbound -> active, node " + previous.getIdShort() + " exits"); else { - if (_p2pMgr.isShowLog()) - System.out.println(" active node-id=" + _shortId + " ip=" + node.getIpStr() + ">"); + if (_p2pMgr.isShowLog()) { + System.out.println( + " active node-id=" + _shortId + " ip=" + node.getIpStr() + + ">"); + } } } } @@ -306,7 +330,7 @@ public void timeoutInbound(final IP2pMgr _p2pMgr) { Iterator inboundIt = inboundNodes.keySet().iterator(); while (inboundIt.hasNext()) { int key = (int) inboundIt.next(); - Node node = inboundNodes.get(key); + INode node = inboundNodes.get(key); if (System.currentTimeMillis() - node.getTimestamp() > TIMEOUT_INBOUND_NODES) { _p2pMgr.closeSocket(node.getChannel(), "inbound-timeout ip=" + node.getIpStr()); inboundIt.remove(); @@ -327,7 +351,7 @@ public void timeoutActive(IP2pMgr _p2pMgr) { Iterator activeIt = activeNodes.keySet().iterator(); while (activeIt.hasNext()) { int key = (int) activeIt.next(); - Node node = getActiveNode(key); + INode node = getActiveNode(key); if (now - node.getTimestamp() > timeout) { _p2pMgr.closeSocket(node.getChannel(), "active-timeout node=" + node.getIdShort() + " ip=" + node.getIpStr()); @@ -342,15 +366,17 @@ public void timeoutActive(IP2pMgr _p2pMgr) { } public void dropActive(int nodeIdHash, final IP2pMgr _p2pMgr, String _reason) { - Node node = activeNodes.remove(nodeIdHash); - if (node == null) + INode node = activeNodes.remove(nodeIdHash); + if (node == null) { return; + } _p2pMgr.closeSocket(node.getChannel(), _reason); } /** * @param _p2pMgr P2pMgr */ + @Override public void shutdown(final IP2pMgr _p2pMgr) { try { @@ -374,10 +400,11 @@ public void shutdown(final IP2pMgr _p2pMgr) { } } + @Override public void ban(int _nodeIdHash) { - Node node = activeNodes.get(_nodeIdHash); + INode node = activeNodes.get(_nodeIdHash); if (node != null) { - node.peerMetric.ban(); + node.getPeerMetric().ban(); } } } diff --git a/modP2pImpl/src/org/aion/p2p/impl/comm/PeerMetric.java b/modP2pImpl/src/org/aion/p2p/impl/comm/PeerMetric.java index 227553216c..f80999b6a3 100644 --- a/modP2pImpl/src/org/aion/p2p/impl/comm/PeerMetric.java +++ b/modP2pImpl/src/org/aion/p2p/impl/comm/PeerMetric.java @@ -1,58 +1,77 @@ /* * Copyright (c) 2017-2018 Aion foundation. * - * This file is part of the aion network project. + * This file is part of the aion network project. * - * The aion network project is free software: you can redistribute it - * and/or modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation, either version 3 of - * the License, or any later version. + * The aion network project is free software: you can redistribute it + * and/or modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation, either version 3 of + * the License, or any later version. * - * The aion network project is distributed in the hope that it will - * be useful, but WITHOUT ANY WARRANTY; without even the implied - * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. - * See the GNU General Public License for more details. + * The aion network project is distributed in the hope that it will + * be useful, but WITHOUT ANY WARRANTY; without even the implied + * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + * See the GNU General Public License for more details. * - * You should have received a copy of the GNU General Public License - * along with the aion network project source files. - * If not, see . - * - * Contributors to the aion source files in decreasing order of code volume: - * - * Aion foundation. + * You should have received a copy of the GNU General Public License + * along with the aion network project source files. + * If not, see . * + * Contributors: + * Aion foundation. */ - package org.aion.p2p.impl.comm; +import org.aion.p2p.IPeerMetric; import org.aion.p2p.P2pConstant; -public final class PeerMetric { +public final class PeerMetric implements IPeerMetric { private int metricFailedConn; private long metricFailedConnTs; private long metricBanConnTs; + /** + * Returns true only if we should not accept any more connections. + */ + @Override public boolean shouldNotConn() { return (metricFailedConn > P2pConstant.STOP_CONN_AFTER_FAILED_CONN && ((System.currentTimeMillis() - metricFailedConnTs) > P2pConstant.FAILED_CONN_RETRY_INTERVAL)) || ((System.currentTimeMillis() - metricBanConnTs) < P2pConstant.BAN_CONN_RETRY_INTERVAL); } + /** + * Increments the failed connection counter. + */ + @Override public void incFailedCount() { metricFailedConn++; metricFailedConnTs = System.currentTimeMillis(); } + /** + * Decrements the failed connection counter. + */ + @Override public void decFailedCount() { if (metricFailedConn > 0) metricFailedConn--; } - void ban() { + /** + * Sets the current time for tracking a banned connection. + */ + @Override + public void ban() { metricBanConnTs = System.currentTimeMillis(); } + /** + * Returns true only if the time between now and the last ban is greater than the banned + * connection retry interval. + */ + @Override public boolean notBan() { return ((System.currentTimeMillis() - metricBanConnTs) > P2pConstant.BAN_CONN_RETRY_INTERVAL); } diff --git a/modP2pImpl/src/org/aion/p2p/impl/zero/msg/ResActiveNodes.java b/modP2pImpl/src/org/aion/p2p/impl/zero/msg/ResActiveNodes.java index bd1b7e6ae8..e6e8534c44 100644 --- a/modP2pImpl/src/org/aion/p2p/impl/zero/msg/ResActiveNodes.java +++ b/modP2pImpl/src/org/aion/p2p/impl/zero/msg/ResActiveNodes.java @@ -1,28 +1,25 @@ /* * Copyright (c) 2017-2018 Aion foundation. * - * This file is part of the aion network project. + * This file is part of the aion network project. * - * The aion network project is free software: you can redistribute it - * and/or modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation, either version 3 of - * the License, or any later version. + * The aion network project is free software: you can redistribute it + * and/or modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation, either version 3 of + * the License, or any later version. * - * The aion network project is distributed in the hope that it will - * be useful, but WITHOUT ANY WARRANTY; without even the implied - * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. - * See the GNU General Public License for more details. + * The aion network project is distributed in the hope that it will + * be useful, but WITHOUT ANY WARRANTY; without even the implied + * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + * See the GNU General Public License for more details. * - * You should have received a copy of the GNU General Public License - * along with the aion network project source files. - * If not, see . - * - * Contributors to the aion source files in decreasing order of code volume: - * - * Aion foundation. + * You should have received a copy of the GNU General Public License + * along with the aion network project source files. + * If not, see . * + * Contributors: + * Aion foundation. */ - package org.aion.p2p.impl.zero.msg; import java.nio.ByteBuffer; @@ -43,7 +40,7 @@ */ public final class ResActiveNodes extends Msg { - private final List nodes; + private final List nodes; private int count; @@ -58,19 +55,20 @@ public final class ResActiveNodes extends Msg { * @param _nodes * List */ - public ResActiveNodes(final List _nodes) { + public ResActiveNodes(final List _nodes) { super(Ver.V0, Ctrl.NET, Act.RES_ACTIVE_NODES); this.count = Math.min(MAX_NODES, _nodes.size()); - if (this.count > 0) + if (this.count > 0) { this.nodes = _nodes.subList(0, this.count); - else + } else { this.nodes = new ArrayList<>(); + } } /** * @return List */ - public List getNodes() { + public List getNodes() { return this.nodes; } @@ -80,9 +78,9 @@ public List getNodes() { * @return ResActiveNodes */ public static ResActiveNodes decode(final byte[] _bytes) { - if (_bytes == null || _bytes.length == 0 || (_bytes.length - 1) % NODE_BYTES_LENGTH != 0) + if (_bytes == null || _bytes.length == 0 || (_bytes.length - 1) % NODE_BYTES_LENGTH != 0) { return null; - else { + } else { try{ @@ -90,17 +88,18 @@ public static ResActiveNodes decode(final byte[] _bytes) { int count = buf.get(); // fix bug: https://github.com/aionnetwork/aion/issues/390 - if (_bytes.length != count * NODE_BYTES_LENGTH + 1) + if (_bytes.length != count * NODE_BYTES_LENGTH + 1) { return null; + } - ArrayList activeNodes = new ArrayList<>(); + ArrayList activeNodes = new ArrayList<>(); for (int i = 0; i < count; i++) { byte[] nodeIdBytes = new byte[36]; buf.get(nodeIdBytes); byte[] ipBytes = new byte[8]; buf.get(ipBytes); int port = buf.getInt(); - Node n = new Node(false, nodeIdBytes, ipBytes, port); + INode n = new Node(false, nodeIdBytes, ipBytes, port); activeNodes.add(n); } return new ResActiveNodes(activeNodes); diff --git a/modP2pImpl/src/org/aion/p2p/impl1/P2pMgr.java b/modP2pImpl/src/org/aion/p2p/impl1/P2pMgr.java index 8e7ef38e0b..1bbd677803 100644 --- a/modP2pImpl/src/org/aion/p2p/impl1/P2pMgr.java +++ b/modP2pImpl/src/org/aion/p2p/impl1/P2pMgr.java @@ -1,563 +1,110 @@ /* * Copyright (c) 2017-2018 Aion foundation. * - * This file is part of the aion network project. + * This file is part of the aion network project. * - * The aion network project is free software: you can redistribute it - * and/or modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation, either version 3 of - * the License, or any later version. + * The aion network project is free software: you can redistribute it + * and/or modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation, either version 3 of + * the License, or any later version. * - * The aion network project is distributed in the hope that it will - * be useful, but WITHOUT ANY WARRANTY; without even the implied - * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. - * See the GNU General Public License for more details. + * The aion network project is distributed in the hope that it will + * be useful, but WITHOUT ANY WARRANTY; without even the implied + * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + * See the GNU General Public License for more details. * - * You should have received a copy of the GNU General Public License - * along with the aion network project source files. - * If not, see . - * - * Contributors to the aion source files in decreasing order of code volume: - * - * Aion foundation. + * You should have received a copy of the GNU General Public License + * along with the aion network project source files. + * If not, see . * + * Contributors: + * Aion foundation. */ - package org.aion.p2p.impl1; -import org.aion.p2p.P2pConstant; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketException; +import java.nio.channels.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; import org.aion.p2p.*; +import org.aion.p2p.P2pConstant; import org.aion.p2p.impl.TaskRequestActiveNodes; import org.aion.p2p.impl.TaskUPnPManager; -import org.aion.p2p.impl.comm.Act; import org.aion.p2p.impl.comm.Node; import org.aion.p2p.impl.comm.NodeMgr; import org.aion.p2p.impl.zero.msg.*; +import org.aion.p2p.impl1.tasks.MsgIn; +import org.aion.p2p.impl1.tasks.MsgOut; +import org.aion.p2p.impl1.tasks.TaskReceive; +import org.aion.p2p.impl1.tasks.TaskSend; +import org.aion.p2p.impl1.tasks.TaskClear; +import org.aion.p2p.impl1.tasks.TaskConnectPeers; +import org.aion.p2p.impl1.tasks.TaskInbound; +import org.aion.p2p.impl1.tasks.TaskStatus; import org.apache.commons.collections4.map.LRUMap; -import java.io.IOException; -import java.io.UnsupportedEncodingException; -import java.net.InetSocketAddress; -import java.nio.ByteBuffer; -import java.nio.channels.*; -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicBoolean; - -/** - * @author Chris p2p://{uuid}@{ip}:{port} - */ +/** @author Chris p2p://{uuid}@{ip}:{port} */ public final class P2pMgr implements IP2pMgr { - - private final static int PERIOD_SHOW_STATUS = 10000; - private final static int PERIOD_REQUEST_ACTIVE_NODES = 1000; - private final static int PERIOD_CONNECT_OUTBOUND = 1000; - private final static int PERIOD_CLEAR = 20000; - private final static int PERIOD_UPNP_PORT_MAPPING = 3600000; - - private final static int TIMEOUT_OUTBOUND_CONNECT = 10000; - private final static int TIMEOUT_OUTBOUND_NODES = 20000; - private final static int TIMEOUT_MSG_READ = 10000; - - private final int maxTempNodes; - private final int maxActiveNodes; - - private final boolean syncSeedsOnly; - private final boolean showStatus; - private final boolean showLog; - private final int selfNetId; - private final String selfRevision; - private final byte[] selfNodeId; - private final int selfNodeIdHash; - private final String selfShortId; - private final byte[] selfIp; - private final int selfPort; - private final boolean upnpEnable; - + private static final int PERIOD_SHOW_STATUS = 10000; + private static final int PERIOD_REQUEST_ACTIVE_NODES = 1000; + private static final int PERIOD_UPNP_PORT_MAPPING = 3600000; + private static final int TIMEOUT_MSG_READ = 10000; + + private final int maxTempNodes, maxActiveNodes, selfNetId, selfNodeIdHash, selfPort; + private final boolean syncSeedsOnly, showStatus, showLog, upnpEnable; + private final String selfRevision, selfShortId; + private final byte[] selfNodeId, selfIp; + private final INodeMgr nodeMgr; private final Map> handlers = new ConcurrentHashMap<>(); private final Set versions = new HashSet<>(); - - private final NodeMgr nodeMgr; + private final Map errCnt = Collections.synchronizedMap(new LRUMap<>(128)); private ServerSocketChannel tcpServer; private Selector selector; - - private ScheduledThreadPoolExecutor scheduledWorkers; - - private final Map errCnt = Collections.synchronizedMap(new LRUMap<>(128)); - + private ScheduledExecutorService scheduledWorkers; private int errTolerance; - - // TODO: need refactor by passing the parameter in the later version. - private final static int txBroadCastRoute = (Ctrl.SYNC << 8) + 6; // ((Ver.V0 << 16) + (Ctrl.SYNC << 8) + 6); - - enum Dest { - INBOUND, OUTBOUND, ACTIVE - } - - private static class MsgOut { - MsgOut(int _nodeId, String _displayId, Msg _msg, Dest _dest) { - nodeId = _nodeId; - displayId = _displayId; - msg = _msg; - dest = _dest; - timestamp = System.currentTimeMillis(); - } - - int nodeId; - String displayId; - Msg msg; - Dest dest; - long timestamp; - } - - private static class MsgIn { - MsgIn(int nid, String nsid, int route, byte[] msg) { - this.nid = nid; - this.nsid = nsid; - this.route = route; - this.msg = msg; - } - - int nid; - String nsid; - int route; - byte[] msg; - } - - private LinkedBlockingQueue sendMsgQue = new LinkedBlockingQueue<>(); - - private LinkedBlockingQueue receiveMsgQue = new LinkedBlockingQueue<>(); - + private BlockingQueue sendMsgQue = new LinkedBlockingQueue<>(); + private BlockingQueue receiveMsgQue = new LinkedBlockingQueue<>(); private AtomicBoolean start = new AtomicBoolean(true); private static ReqHandshake1 cachedReqHandshake1; private static ResHandshake1 cachedResHandshake1; - private final class TaskInbound implements Runnable { - @Override - public void run() { - - // read buffer pre-alloc. @ max_body_size - ByteBuffer readBuf = ByteBuffer.allocate(P2pConstant.MAX_BODY_SIZE); - - while (start.get()) { - - try { - Thread.sleep(0, 1); - } catch (Exception e) { - } - - int num; - try { - num = selector.selectNow(); - } catch (IOException e) { - if (showLog) - System.out.println(""); - continue; - } - - if (num == 0) { - continue; - } - - Iterator keys = selector.selectedKeys().iterator(); - - while (keys.hasNext() && (num-- > 0)) { - - final SelectionKey sk = keys.next(); - keys.remove(); - - try{ - - if (!sk.isValid()) - continue; - - if (sk.isAcceptable()) - accept(); - - if (sk.isReadable()) { - - readBuf.rewind(); - - ChannelBuffer chanBuf = (ChannelBuffer) (sk.attachment()); - try { - - int ret; - int cnt = 0; - - while ((ret = ((SocketChannel) sk.channel()).read(readBuf)) > 0) { - cnt += ret; - } - - // read empty select key, continue. - if (cnt <= 0) { - continue; - } - - int prevCnt = cnt + chanBuf.buffRemain; - ByteBuffer forRead; - - if (chanBuf.buffRemain != 0) { - byte[] alreadyRead = new byte[cnt]; - - readBuf.position(0); - readBuf.get(alreadyRead); - forRead = ByteBuffer.allocate(prevCnt); - forRead.put(chanBuf.remainBuffer); - forRead.put(alreadyRead); - } else { - forRead = readBuf; - } - - do { - cnt = read(sk, forRead, prevCnt); - - if (prevCnt == cnt) { - break; - } else - prevCnt = cnt; - - } while (cnt > 0); - - // check if really read data. - if (cnt > prevCnt) { - chanBuf.buffRemain = 0; - throw new P2pException( - "IO read overflow! suppose read:" + prevCnt + " real left:" + cnt); - } - - chanBuf.buffRemain = cnt; - - if (cnt == 0) { - readBuf.rewind(); - } else { - // there are no perfect cycling buffer in jdk - // yet. - // simply just buff move for now. - // @TODO: looking for more efficient way. - - int currPos = forRead.position(); - chanBuf.remainBuffer = new byte[cnt]; - forRead.position(currPos - cnt); - forRead.get(chanBuf.remainBuffer); - readBuf.rewind(); - } - - } catch (NullPointerException e) { - closeSocket((SocketChannel) sk.channel(), chanBuf.displayId + "-read-msg-null-exception"); - chanBuf.isClosed.set(true); - } catch (P2pException e) { - closeSocket((SocketChannel) sk.channel(), chanBuf.displayId + "-read-msg-p2p-exception"); - chanBuf.isClosed.set(true); - - } catch (ClosedChannelException e) { - closeSocket((SocketChannel) sk.channel(), - chanBuf.displayId + "-read-msg-closed-channel-exception"); - - } catch (IOException e) { - closeSocket((SocketChannel) sk.channel(), - chanBuf.displayId + "-read-msg-io-exception: " + e.getMessage()); - chanBuf.isClosed.set(true); - - } catch (CancelledKeyException e) { - chanBuf.isClosed.set(true); - closeSocket((SocketChannel) sk.channel(), - chanBuf.displayId + "-read-msg-key-cancelled-exception"); - } catch (Exception e) { - if (showLog) - System.out.println(""); - } - } - } catch(Exception ex) { - if(showLog) { - System.out.println(""); - ex.printStackTrace(); - } - } - } - } - if (showLog) - System.out.println(""); - } - } - - // hash mapping channel id to write thread. - private int hash2Lane(int in) { - in ^= in >> (32 - 5); - in ^= in >> (32 - 10); - in ^= in >> (32 - 15); - in ^= in >> (32 - 20); - in ^= in >> (32 - 25); - return in & 0b11111; - } - - private final class TaskSend implements Runnable { - - static final int TOTAL_LANE = (1 << 5) - 1; - int lane; - - TaskSend(int _lane) { - this.lane = _lane; - } - - @Override - public void run() { - while (P2pMgr.this.start.get()) { - try { - MsgOut mo = sendMsgQue.take(); - // if timeout , throw away this msg. - long now = System.currentTimeMillis(); - if (now - mo.timestamp > P2pConstant.WRITE_MSG_TIMEOUT) { - if (showLog) - System.out.println(""); - continue; - } - - // if not belong to current lane, put it back. - int targetLane = hash2Lane(mo.nodeId); - if (targetLane != lane) { - sendMsgQue.offer(mo); - continue; - } - - Node node = null; - switch (mo.dest) { - case ACTIVE: - node = nodeMgr.getActiveNode(mo.nodeId); - break; - case INBOUND: - node = nodeMgr.getInboundNode(mo.nodeId); - break; - case OUTBOUND: - node = nodeMgr.getOutboundNode(mo.nodeId); - break; - } - - if (node != null) { - SelectionKey sk = node.getChannel().keyFor(selector); - if (sk != null) { - Object attachment = sk.attachment(); - if (attachment != null) { - TaskWrite tw = new TaskWrite(showLog, node.getIdShort(), node.getChannel(), mo.msg, - (ChannelBuffer) attachment, P2pMgr.this); - tw.run(); - } - } - } else { - if (showLog) - System.out.println("" + mo.displayId + " node-not-exit"); - } - } catch (InterruptedException e) { - if (showLog) - System.out.println(""); - return; - } catch (Exception e) { - if (showLog) - e.printStackTrace(); - } - } - } - } - - private final class TaskReceive implements Runnable { - @Override - public void run() { - - while (P2pMgr.this.start.get()) { - try { - MsgIn mi = receiveMsgQue.take(); - - List hs = handlers.get(mi.route); - if (hs == null) - continue; - for (Handler hlr : hs) { - if (hlr == null) - continue; - - try { - hlr.receive(mi.nid, mi.nsid, mi.msg); - } catch (Exception e) { - if (showLog) - e.printStackTrace(); - } - } - } catch (InterruptedException e) { - if (showLog) - System.out.println(""); - return; - } catch (Exception e) { - if (showLog) - e.printStackTrace(); - } - } - } - } - - private final class TaskStatus implements Runnable { - @Override - public void run() { - Thread.currentThread().setName("p2p-ts"); - String status = nodeMgr.dumpNodeInfo(selfShortId); - System.out.println(status); - System.out.println( - "--------------------------------------------------------------------------------------------------------------------------------------------------------------------"); - System.out.println("recv queue [" + receiveMsgQue.size() + "] send queue [" + sendMsgQue.size() + "]\n"); - } - } - - private final class TaskConnectPeers implements Runnable { - @Override - public void run() { - Thread.currentThread().setName("p2p-tcp"); - while (start.get()) { - try { - Thread.sleep(PERIOD_CONNECT_OUTBOUND); - } catch (InterruptedException e) { - if (showLog) - System.out.println(""); - } - - if (nodeMgr.activeNodesSize() >= maxActiveNodes) { - if (showLog) - System.out.println(""); - continue; - } - - Node node; - try { - node = nodeMgr.tempNodesTake(); - if (nodeMgr.isSeedIp(node.getIpStr())) - node.setFromBootList(true); - if (node.getIfFromBootList()) - nodeMgr.addTempNode(node); - // if (node.peerMetric.shouldNotConn()) { - // continue; - // } - } catch (InterruptedException e) { - if (showLog) - System.out.println(""); - return; - } catch (Exception e) { - if (showLog) - e.printStackTrace(); - continue; - } - int nodeIdHash = node.getIdHash(); - if (!nodeMgr.getOutboundNodes().containsKey(nodeIdHash) && !nodeMgr.hasActiveNode(nodeIdHash)) { - int _port = node.getPort(); - try { - SocketChannel channel = SocketChannel.open(); - - channel.socket().connect(new InetSocketAddress(node.getIpStr(), _port), - TIMEOUT_OUTBOUND_CONNECT); - configChannel(channel); - - if (channel.finishConnect() && channel.isConnected()) { - - if (showLog) - System.out.println(""); - - SelectionKey sk = channel.register(selector, SelectionKey.OP_READ); - ChannelBuffer rb = new ChannelBuffer(P2pMgr.this.showLog); - rb.displayId = node.getIdShort(); - rb.nodeIdHash = nodeIdHash; - sk.attach(rb); - - node.refreshTimestamp(); - node.setChannel(channel); - nodeMgr.addOutboundNode(node); - - if (showLog) - System.out.println(" id=" + node.getIdShort() + " ip=" - + node.getIpStr() + ">"); - sendMsgQue.offer(new MsgOut(node.getIdHash(), node.getIdShort(), cachedReqHandshake1, - Dest.OUTBOUND)); - // node.peerMetric.decFailedCount(); - - } else { - if (showLog) - System.out.println(""); - channel.close(); - // node.peerMetric.incFailedCount(); - } - } catch (IOException e) { - if (showLog) - System.out.println(""); - // node.peerMetric.incFailedCount(); - } catch (Exception e) { - if (showLog) - e.printStackTrace(); - } - } - } - } - } - - private final class TaskClear implements Runnable { - @Override - public void run() { - Thread.currentThread().setName("p2p-clr"); - while (start.get()) { - try { - Thread.sleep(PERIOD_CLEAR); - - nodeMgr.timeoutInbound(P2pMgr.this); - - Iterator outboundIt = nodeMgr.getOutboundNodes().keySet().iterator(); - while (outboundIt.hasNext()) { - - Object obj = outboundIt.next(); - - if (obj == null) - continue; - - int nodeIdHash = (int) obj; - Node node = nodeMgr.getOutboundNodes().get(nodeIdHash); - - if (node == null) - continue; - - if (System.currentTimeMillis() - node.getTimestamp() > TIMEOUT_OUTBOUND_NODES) { - closeSocket(node.getChannel(), "outbound-timeout node=" + node.getIdShort()); - outboundIt.remove(); - } - } - - nodeMgr.timeoutActive(P2pMgr.this); - - } catch (Exception e) { - } - } - } + public enum Dest { + INBOUND, + OUTBOUND, + ACTIVE } /** - * @param _nodeId - * byte[36] - * @param _ip - * String - * @param _port - * int - * @param _bootNodes - * String[] - * @param _upnpEnable - * boolean - * @param _maxTempNodes - * int - * @param _maxActiveNodes - * int - * @param _showStatus - * boolean - * @param _showLog - * boolean + * @param _nodeId byte[36] + * @param _ip String + * @param _port int + * @param _bootNodes String[] + * @param _upnpEnable boolean + * @param _maxTempNodes int + * @param _maxActiveNodes int + * @param _showStatus boolean + * @param _showLog boolean */ - public P2pMgr(int _netId, String _revision, String _nodeId, String _ip, int _port, final String[] _bootNodes, - boolean _upnpEnable, int _maxTempNodes, int _maxActiveNodes, boolean _showStatus, boolean _showLog, - boolean _bootlistSyncOnly, int _errorTolerance) { + public P2pMgr( + int _netId, + String _revision, + String _nodeId, + String _ip, + int _port, + final String[] _bootNodes, + boolean _upnpEnable, + int _maxTempNodes, + int _maxActiveNodes, + boolean _showStatus, + boolean _showLog, + boolean _bootlistSyncOnly, + int _errorTolerance) { + this.selfNetId = _netId; this.selfRevision = _revision; this.selfNodeId = _nodeId.getBytes(); @@ -588,397 +135,6 @@ public P2pMgr(int _netId, String _revision, String _nodeId, String _ip, int _por cachedResHandshake1 = new ResHandshake1(true, this.selfRevision); } - /** - * @param _node - * Node - * @return boolean - */ - private boolean validateNode(final Node _node) { - if (_node != null) { - boolean notSelfId = !Arrays.equals(_node.getId(), this.selfNodeId); - boolean notSameIpOrPort = !(Arrays.equals(selfIp, _node.getIp()) && selfPort == _node.getPort()); - boolean notActive = !nodeMgr.hasActiveNode(_node.getIdHash()); - boolean notOutbound = !nodeMgr.getOutboundNodes().containsKey(_node.getIdHash()); - return notSelfId && notSameIpOrPort && notActive && notOutbound; - } else - return false; - } - - /** - * @param _channel - * SocketChannel TODO: check option - */ - private void configChannel(final SocketChannel _channel) throws IOException { - _channel.configureBlocking(false); - _channel.socket().setSoTimeout(TIMEOUT_MSG_READ); - - // set buffer to 256k. - _channel.socket().setReceiveBufferSize(P2pConstant.RECV_BUFFER_SIZE); - _channel.socket().setSendBufferSize(P2pConstant.SEND_BUFFER_SIZE); - // _channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true); - // _channel.setOption(StandardSocketOptions.TCP_NODELAY, true); - // _channel.setOption(StandardSocketOptions.SO_REUSEADDR, true); - } - - /** - * @param _sc - * SocketChannel - */ - public void closeSocket(final SocketChannel _sc, String _reason) { - if (showLog) - System.out.println(""); - - try { - SelectionKey sk = _sc.keyFor(selector); - _sc.close(); - if (sk != null) - sk.cancel(); - } catch (IOException e) { - if (showLog) - System.out.println(""); - } - } - - private void accept() { - SocketChannel channel; - try { - - if (nodeMgr.activeNodesSize() >= this.maxActiveNodes) - return; - - channel = tcpServer.accept(); - configChannel(channel); - - SelectionKey sk = channel.register(selector, SelectionKey.OP_READ); - sk.attach(new ChannelBuffer(this.showLog)); - - String ip = channel.socket().getInetAddress().getHostAddress(); - int port = channel.socket().getPort(); - - if (syncSeedsOnly && nodeMgr.isSeedIp(ip)) { - channel.close(); - return; - } - - Node node = nodeMgr.allocNode(ip, 0); - node.setChannel(channel); - nodeMgr.addInboundNode(node); - - if (showLog) - System.out.println(""); - - } catch (IOException e) { - if (showLog) - System.out.println(""); - } - } - - /** - * SocketChannel - * - * @throws IOException - * IOException - */ - private int readHeader(final ChannelBuffer _cb, ByteBuffer readBuffer, int cnt) throws IOException { - - if (cnt < Header.LEN) - return cnt; - - int origPos = readBuffer.position(); - - int startP = origPos - cnt; - - readBuffer.position(startP); - - _cb.readHead(readBuffer); - - readBuffer.position(origPos); - - return cnt - Header.LEN; - - } - - /** - * SocketChannel - * - * @throws IOException - * IOException - */ - private int readBody(final ChannelBuffer _cb, ByteBuffer readBuffer, int cnt) throws IOException { - - int bodyLen = _cb.header.getLen(); - - // some msg have nobody. - if (bodyLen == 0) { - _cb.body = new byte[0]; - return cnt; - } - - if (cnt < bodyLen) - return cnt; - - int origPos = readBuffer.position(); - int startP = origPos - cnt; - - readBuffer.position(startP); - - _cb.readBody(readBuffer); - - readBuffer.position(origPos); - - return cnt - bodyLen; - } - - /** - * @param _sk - * SelectionKey - * @throws IOException - * IOException - */ - private int read(final SelectionKey _sk, ByteBuffer _readBuffer, int _cnt) throws IOException { - - int currCnt = 0; - - if (_sk.attachment() == null) { - throw new P2pException("attachment is null"); - } - ChannelBuffer rb = (ChannelBuffer) _sk.attachment(); - - // read header - if (!rb.isHeaderCompleted()) { - currCnt = readHeader(rb, _readBuffer, _cnt); - } else { - currCnt = _cnt; - } - - // read body - if (rb.isHeaderCompleted() && !rb.isBodyCompleted()) { - currCnt = readBody(rb, _readBuffer, currCnt); - } - - if (!rb.isBodyCompleted()) - return currCnt; - - Header h = rb.header; - - byte[] bodyBytes = rb.body; - rb.refreshHeader(); - rb.refreshBody(); - - short ver = h.getVer(); - byte ctrl = h.getCtrl(); - byte act = h.getAction(); - int route = h.getRoute(); - - boolean underRC = rb.shouldRoute(route, - ((route == txBroadCastRoute) ? P2pConstant.READ_MAX_RATE_TXBC : P2pConstant.READ_MAX_RATE)); - - if (!underRC) { - if (showLog) - System.out.println(""); - return currCnt; - } - - switch (ver) { - case Ver.V0: - switch (ctrl) { - case Ctrl.NET: - try { - handleP2pMsg(_sk, act, bodyBytes); - } catch(Exception ex){ - if(showLog) - System.out.println(""); - } - break; - case Ctrl.SYNC: - - if (!handlers.containsKey(route)) { - if (showLog) - System.out.println(""); - return currCnt; - } - - handleKernelMsg(rb.nodeIdHash, route, bodyBytes); - break; - default: - if (showLog) - System.out.println( - ""); - break; - } - break; - default: - if (showLog) - System.out.println(""); - break; - } - - return currCnt; - - } - - /** - * @return boolean TODO: implementation - */ - private boolean handshakeRuleCheck(int netId) { - - // check net id - if (netId != selfNetId) - return false; - - // check supported protocol versions - return true; - } - - /** - * @param _buffer - * ChannelBuffer - * @param _channelHash - * int - * @param _nodeId - * byte[] - * @param _netId - * int - * @param _port - * int - * @param _revision - * byte[] - *

- * Construct node info after handshake request success - */ - private void handleReqHandshake(final ChannelBuffer _buffer, int _channelHash, final byte[] _nodeId, int _netId, - int _port, final byte[] _revision) { - Node node = nodeMgr.getInboundNode(_channelHash); - if (node != null && node.peerMetric.notBan()) { - if (handshakeRuleCheck(_netId)) { - _buffer.nodeIdHash = Arrays.hashCode(_nodeId); - _buffer.displayId = new String(Arrays.copyOfRange(_nodeId, 0, 6)); - node.setId(_nodeId); - node.setPort(_port); - - // handshake 1 - if (_revision != null) { - String binaryVersion; - try { - binaryVersion = new String(_revision, "UTF-8"); - } catch (UnsupportedEncodingException e) { - binaryVersion = "decode-fail"; - } - node.setBinaryVersion(binaryVersion); - nodeMgr.moveInboundToActive(_channelHash, this); - sendMsgQue.offer(new MsgOut(node.getIdHash(), node.getIdShort(), cachedResHandshake1, Dest.ACTIVE)); - } - - } else { - if (showLog) - System.out.println(""); - } - } - } - - private void handleResHandshake(int _nodeIdHash, String _binaryVersion) { - Node node = nodeMgr.getOutboundNodes().get(_nodeIdHash); - if (node != null && node.peerMetric.notBan()) { - node.refreshTimestamp(); - node.setBinaryVersion(_binaryVersion); - nodeMgr.moveOutboundToActive(node.getIdHash(), node.getIdShort(), this); - } - } - - /** - * @param _sk - * SelectionKey - * @param _act - * ACT - * @param _msgBytes - * byte[] - */ - private void handleP2pMsg(final SelectionKey _sk, byte _act, final byte[] _msgBytes) { - - - ChannelBuffer rb = (ChannelBuffer) _sk.attachment(); - - switch (_act) { - - case Act.REQ_HANDSHAKE: - if (_msgBytes.length > ReqHandshake.LEN) { - ReqHandshake1 reqHandshake1 = ReqHandshake1.decode(_msgBytes); - if (reqHandshake1 != null) { - handleReqHandshake(rb, _sk.channel().hashCode(), reqHandshake1.getNodeId(), - reqHandshake1.getNetId(), reqHandshake1.getPort(), reqHandshake1.getRevision()); - } - } - break; - - case Act.RES_HANDSHAKE: - if (rb.nodeIdHash == 0) - return; - - if (_msgBytes.length > ResHandshake.LEN) { - ResHandshake1 resHandshake1 = ResHandshake1.decode(_msgBytes); - if (resHandshake1 != null && resHandshake1.getSuccess()) - handleResHandshake(rb.nodeIdHash, resHandshake1.getBinaryVersion()); - - } - break; - - case Act.REQ_ACTIVE_NODES: - if (rb.nodeIdHash != 0) { - Node node = nodeMgr.getActiveNode(rb.nodeIdHash); - if (node != null) - sendMsgQue.offer(new MsgOut(node.getIdHash(), node.getIdShort(), - new ResActiveNodes(nodeMgr.getActiveNodesList()), Dest.ACTIVE)); - } - break; - - case Act.RES_ACTIVE_NODES: - if (syncSeedsOnly) - break; - - if (rb.nodeIdHash != 0) { - Node node = nodeMgr.getActiveNode(rb.nodeIdHash); - if (node != null) { - node.refreshTimestamp(); - ResActiveNodes resActiveNodes = ResActiveNodes.decode(_msgBytes); - if (resActiveNodes != null) { - List incomingNodes = resActiveNodes.getNodes(); - for (Node incomingNode : incomingNodes) { - if (nodeMgr.tempNodesSize() >= this.maxTempNodes) - return; - if (validateNode(incomingNode)) - nodeMgr.addTempNode(incomingNode); - } - } - } - } - break; - default: - if (showLog) - System.out.println(""); - break; - } - } - - /** - * @param _nodeIdHash - * int - * @param _route - * int - * @param _msgBytes - * byte[] - */ - private void handleKernelMsg(int _nodeIdHash, int _route, final byte[] _msgBytes) { - Node node = nodeMgr.getActiveNode(_nodeIdHash); - if (node != null) { - int nodeIdHash = node.getIdHash(); - String nodeDisplayId = node.getIdShort(); - node.refreshTimestamp(); - receiveMsgQue.offer(new MsgIn(nodeIdHash, nodeDisplayId, _route, _msgBytes)); - } - } - @Override public void run() { try { @@ -992,69 +148,68 @@ public void run() { tcpServer.socket().bind(new InetSocketAddress(Node.ipBytesToStr(selfIp), selfPort)); tcpServer.register(selector, SelectionKey.OP_ACCEPT); - Thread thrdIn = new Thread(new TaskInbound(), "p2p-in"); + Thread thrdIn = new Thread(getInboundInstance(), "p2p-in"); thrdIn.setPriority(Thread.NORM_PRIORITY); thrdIn.start(); - if (showLog) - this.handlers.forEach((route, callbacks) -> { - Handler handler = callbacks.get(0); - Header h = handler.getHeader(); - System.out.println(""); - }); + if (showLog) { + this.handlers.forEach( + (route, callbacks) -> { + Handler handler = callbacks.get(0); + Header h = handler.getHeader(); + System.out.println( + getRouteMsg(route, h.getVer(), h.getCtrl(), h.getAction(), + handler.getClass().getSimpleName())); + }); + } for (int i = 0; i < TaskSend.TOTAL_LANE; i++) { - Thread thrdOut = new Thread(new TaskSend(i), "p2p-out-" + i); + Thread thrdOut = new Thread(getSendInstance(i), "p2p-out-" + i); thrdOut.setPriority(Thread.NORM_PRIORITY); thrdOut.start(); } for (int i = 0, m = Runtime.getRuntime().availableProcessors(); i < m; i++) { - Thread t = new Thread(new TaskReceive(), "p2p-worker-" + i); + Thread t = new Thread(getReceiveInstance(), "p2p-worker-" + i); t.setPriority(Thread.NORM_PRIORITY); t.start(); } - if (upnpEnable) - scheduledWorkers.scheduleWithFixedDelay(new TaskUPnPManager(selfPort), 1, PERIOD_UPNP_PORT_MAPPING, - TimeUnit.MILLISECONDS); - - if (showStatus) - scheduledWorkers.scheduleWithFixedDelay(new TaskStatus(), 2, PERIOD_SHOW_STATUS, TimeUnit.MILLISECONDS); - - if (!syncSeedsOnly) - scheduledWorkers.scheduleWithFixedDelay(new TaskRequestActiveNodes(this), 5000, - PERIOD_REQUEST_ACTIVE_NODES, TimeUnit.MILLISECONDS); - - Thread thrdClear = new Thread(new TaskClear(), "p2p-clear"); + if (upnpEnable) { + scheduledWorkers.scheduleWithFixedDelay( + new TaskUPnPManager(selfPort), + 1, + PERIOD_UPNP_PORT_MAPPING, + TimeUnit.MILLISECONDS); + } + if (showStatus) { + scheduledWorkers.scheduleWithFixedDelay( + getStatusInstance(), + 2, + PERIOD_SHOW_STATUS, + TimeUnit.MILLISECONDS); + } + if (!syncSeedsOnly) { + scheduledWorkers.scheduleWithFixedDelay( + new TaskRequestActiveNodes(this), + 5000, + PERIOD_REQUEST_ACTIVE_NODES, + TimeUnit.MILLISECONDS); + } + Thread thrdClear = new Thread(getClearInstance(), "p2p-clear"); thrdClear.setPriority(Thread.NORM_PRIORITY); thrdClear.start(); - Thread thrdConn = new Thread(new TaskConnectPeers(), "p2p-conn"); + Thread thrdConn = new Thread(getConnectPeersInstance(), "p2p-conn"); thrdConn.setPriority(Thread.NORM_PRIORITY); thrdConn.start(); - + } catch (SocketException e) { + if (showLog) { System.out.println(" " + e.getMessage()); } } catch (IOException e) { - if (showLog) - System.out.println(""); + if (showLog) { System.out.println(""); } } } - @Override - public INode getRandom() { - return nodeMgr.getRandom(); - } - - @Override - public Map getActiveNodes() { - return new HashMap<>(this.nodeMgr.getActiveNodesMap()); - } - - public int getTempNodesCount() { - return nodeMgr.tempNodesSize(); - } - @Override public void register(final List _cbs) { for (Handler _cb : _cbs) { @@ -1062,9 +217,7 @@ public void register(final List _cbs) { short ver = h.getVer(); byte ctrl = h.getCtrl(); if (Ver.filter(ver) != Ver.UNKNOWN && Ctrl.filter(ctrl) != Ctrl.UNKNOWN) { - if (!versions.contains(ver)) { - versions.add(ver); - } + versions.add(ver); int route = h.getRoute(); List routeHandlers = handlers.get(route); @@ -1079,8 +232,7 @@ public void register(final List _cbs) { } List supportedVersions = new ArrayList<>(versions); - cachedReqHandshake1 = new ReqHandshake1(selfNodeId, selfNetId, this.selfIp, this.selfPort, - this.selfRevision.getBytes(), supportedVersions); + cachedReqHandshake1 = getReqHandshake1Instance(supportedVersions); } @Override @@ -1106,48 +258,201 @@ public List versions() { } @Override - public int chainId() { - return selfNetId; + public void errCheck(int _nodeIdHash, String _displayId) { + int cnt = (errCnt.get(_nodeIdHash) == null ? 1 : (errCnt.get(_nodeIdHash) + 1)); + if (cnt > this.errTolerance) { + ban(_nodeIdHash); + errCnt.put(_nodeIdHash, 0); + if (showLog) { + System.out.println(getBanNodeMsg(_displayId, _nodeIdHash, cnt)); + } + } else { + errCnt.put(_nodeIdHash, cnt); + } } - @Override - public int getSelfIdHash() { - return this.selfNodeIdHash; + /** @param _sc SocketChannel */ + public void closeSocket(final SocketChannel _sc, String _reason) { + if (showLog) { System.out.println(""); } + + try { + SelectionKey sk = _sc.keyFor(selector); + _sc.close(); + if (sk != null) { sk.cancel(); } + } catch (IOException e) { + if (showLog) { System.out.println(""); } + } } /** * Remove an active node if exists. * - * @param _nodeIdHash - * int - * @param _reason - * String + * @param _nodeIdHash int + * @param _reason String */ - void dropActive(int _nodeIdHash, String _reason) { + @Override + public void dropActive(int _nodeIdHash, String _reason) { nodeMgr.dropActive(_nodeIdHash, this, _reason); } - public boolean isShowLog() { - return showLog; + /** + * @param _node Node + * @return boolean + */ + @Override + public boolean validateNode(final INode _node) { + if (_node != null) { + boolean notSelfId = !Arrays.equals(_node.getId(), this.selfNodeId); + boolean notSameIpOrPort = + !(Arrays.equals(selfIp, _node.getIp()) && selfPort == _node.getPort()); + boolean notActive = !nodeMgr.hasActiveNode(_node.getIdHash()); + boolean notOutbound = !nodeMgr.getOutboundNodes().containsKey(_node.getIdHash()); + return notSelfId && notSameIpOrPort && notActive && notOutbound; + } else return false; } + /** @param _channel SocketChannel TODO: check option */ @Override - public void errCheck(int _nodeIdHash, String _displayId) { - int cnt = (errCnt.get(_nodeIdHash) == null ? 1 : (errCnt.get(_nodeIdHash) + 1)); - if (cnt > this.errTolerance) { - ban(_nodeIdHash); - errCnt.put(_nodeIdHash, 0); - if (showLog) { - System.out.println( - ""); - } - } else { - errCnt.put(_nodeIdHash, cnt); - } + public void configChannel(final SocketChannel _channel) throws IOException { + _channel.configureBlocking(false); + _channel.socket().setSoTimeout(TIMEOUT_MSG_READ); + + // set buffer to 256k. + _channel.socket().setReceiveBufferSize(P2pConstant.RECV_BUFFER_SIZE); + _channel.socket().setSendBufferSize(P2pConstant.SEND_BUFFER_SIZE); + // _channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true); + // _channel.setOption(StandardSocketOptions.TCP_NODELAY, true); + // _channel.setOption(StandardSocketOptions.SO_REUSEADDR, true); } private void ban(int nodeIdHashcode) { nodeMgr.ban(nodeIdHashcode); nodeMgr.dropActive(nodeIdHashcode, this, "ban"); } -} \ No newline at end of file + + // <------------------------ getter methods below ---------------------------> + + @Override + public INode getRandom() { + return this.nodeMgr.getRandom(); + } + + @Override + public Map getActiveNodes() { + return new HashMap<>(this.nodeMgr.getActiveNodesMap()); + } + + @Override + public int chainId() { + return this.selfNetId; + } + + @Override + public int getSelfIdHash() { + return this.selfNodeIdHash; + } + + @Override + public boolean isShowLog() { + return this.showLog; + } + + public int getTempNodesCount() { + return this.nodeMgr.tempNodesSize(); + } + + @Override + public int getMaxActiveNodes() { + return this.maxActiveNodes; + } + + @Override + public int getMaxTempNodes() { + return this.maxTempNodes; + } + + @Override + public int getSelfNetId() { + return this.selfNetId; + } + + @Override + public boolean isSyncSeedsOnly() { + return this.syncSeedsOnly; + } + + // <---------------------- message and Runnable getters below -------------------------> + + private String getRouteMsg(Integer route, short ver, byte ctrl, byte act, String name) { + return ""; + } + + private String getBanNodeMsg(String id, int hash, int cnt) { + return ""; + } + + private TaskInbound getInboundInstance() { + return new TaskInbound( + this, + this.selector, + this.start, + this.nodeMgr, + this.tcpServer, + this.handlers, + this.sendMsgQue, + cachedResHandshake1, + this.receiveMsgQue); + } + + private TaskSend getSendInstance(int i) { + return new TaskSend( + this, + i, + this.sendMsgQue, + this.start, + this.nodeMgr, + this.selector); + } + + private TaskReceive getReceiveInstance() { + return new TaskReceive( + this.start, + this.receiveMsgQue, + this.handlers, + this.showLog); + } + + private TaskStatus getStatusInstance() { + return new TaskStatus( + this.nodeMgr, + this.selfShortId, + this.sendMsgQue, + this.receiveMsgQue); + } + + private TaskClear getClearInstance() { + return new TaskClear(this, this.nodeMgr, this.start); + } + + private TaskConnectPeers getConnectPeersInstance() { + return new TaskConnectPeers( + this, + this.start, + this.nodeMgr, + this.maxActiveNodes, + this.selector, + this.sendMsgQue, + cachedReqHandshake1); + } + + private ReqHandshake1 getReqHandshake1Instance(List versions) { + return new ReqHandshake1( + selfNodeId, + selfNetId, + this.selfIp, + this.selfPort, + this.selfRevision.getBytes(), + versions); + } +} diff --git a/modP2pImpl/src/org/aion/p2p/impl1/ChannelBuffer.java b/modP2pImpl/src/org/aion/p2p/impl1/tasks/ChannelBuffer.java similarity index 71% rename from modP2pImpl/src/org/aion/p2p/impl1/ChannelBuffer.java rename to modP2pImpl/src/org/aion/p2p/impl1/tasks/ChannelBuffer.java index a3d8c54399..3a16e08e76 100644 --- a/modP2pImpl/src/org/aion/p2p/impl1/ChannelBuffer.java +++ b/modP2pImpl/src/org/aion/p2p/impl1/tasks/ChannelBuffer.java @@ -1,28 +1,26 @@ /* * Copyright (c) 2017-2018 Aion foundation. * - * This file is part of the aion network project. + * This file is part of the aion network project. * - * The aion network project is free software: you can redistribute it - * and/or modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation, either version 3 of - * the License, or any later version. + * The aion network project is free software: you can redistribute it + * and/or modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation, either version 3 of + * the License, or any later version. * - * The aion network project is distributed in the hope that it will - * be useful, but WITHOUT ANY WARRANTY; without even the implied - * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. - * See the GNU General Public License for more details. + * The aion network project is distributed in the hope that it will + * be useful, but WITHOUT ANY WARRANTY; without even the implied + * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + * See the GNU General Public License for more details. * - * You should have received a copy of the GNU General Public License - * along with the aion network project source files. - * If not, see . - * - * Contributors to the aion source files in decreasing order of code volume: - * - * Aion foundation. + * You should have received a copy of the GNU General Public License + * along with the aion network project source files. + * If not, see . * + * Contributors: + * Aion foundation. */ -package org.aion.p2p.impl1; +package org.aion.p2p.impl1.tasks; import org.aion.p2p.Header; @@ -69,12 +67,16 @@ synchronized boolean shouldRoute(int _route, int _maxReqsPerSec) { return true; } boolean shouldRoute = prev.count < _maxReqsPerSec; - if(shouldRoute) + if(shouldRoute) { prev.count++; + } if(showLog) { - if(!shouldRoute) - System.out.println(""); + if(!shouldRoute) { + System.out.println( + ""); + } // too many msgs //else // System.out.println(""); diff --git a/modP2pImpl/src/org/aion/p2p/impl1/tasks/MsgIn.java b/modP2pImpl/src/org/aion/p2p/impl1/tasks/MsgIn.java new file mode 100644 index 0000000000..046d0a03b2 --- /dev/null +++ b/modP2pImpl/src/org/aion/p2p/impl1/tasks/MsgIn.java @@ -0,0 +1,64 @@ +/* + * Copyright (c) 2017-2018 Aion foundation. + * + * This file is part of the aion network project. + * + * The aion network project is free software: you can redistribute it + * and/or modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation, either version 3 of + * the License, or any later version. + * + * The aion network project is distributed in the hope that it will + * be useful, but WITHOUT ANY WARRANTY; without even the implied + * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + * See the GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with the aion network project source files. + * If not, see . + * + * Contributors: + * Aion foundation. + */ +package org.aion.p2p.impl1.tasks; + +/** + * An incoming message. + */ +public class MsgIn { + private final int nodeId; + private final String displayId; + private final int route; + private final byte[] msg; + + /** + * Constructs an incoming message. + * + * @param nodeId The node id. + * @param displayId The display id. + * @param route The route. + * @param msg The message. + */ + public MsgIn(int nodeId, String displayId, int route, byte[] msg) { + this.nodeId = nodeId; + this.displayId = displayId; + this.route = route; + this.msg = msg; + } + + public int getNodeId() { + return this.nodeId; + } + + public String getDisplayId() { + return this.displayId; + } + + public int getRoute() { + return this.route; + } + + public byte[] getMsg() { + return this.msg; + } +} diff --git a/modP2pImpl/src/org/aion/p2p/impl1/tasks/MsgOut.java b/modP2pImpl/src/org/aion/p2p/impl1/tasks/MsgOut.java new file mode 100644 index 0000000000..31cfb97e4a --- /dev/null +++ b/modP2pImpl/src/org/aion/p2p/impl1/tasks/MsgOut.java @@ -0,0 +1,73 @@ +/* + * Copyright (c) 2017-2018 Aion foundation. + * + * This file is part of the aion network project. + * + * The aion network project is free software: you can redistribute it + * and/or modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation, either version 3 of + * the License, or any later version. + * + * The aion network project is distributed in the hope that it will + * be useful, but WITHOUT ANY WARRANTY; without even the implied + * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + * See the GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with the aion network project source files. + * If not, see . + * + * Contributors: + * Aion foundation. + */ +package org.aion.p2p.impl1.tasks; + +import org.aion.p2p.Msg; +import org.aion.p2p.impl1.P2pMgr.Dest; + +/** + * An outgoing message. + */ +public class MsgOut { + private final int nodeId; + private final String displayId; + private final Msg msg; + private final Dest dest; + private final long timestamp; + + /** + * Constructs an outgoing message. + * + * @param nodeId The node id. + * @param displayId The display id. + * @param msg The message. + * @param dest The destination. + */ + public MsgOut(int nodeId, String displayId, Msg msg, Dest dest) { + this.nodeId = nodeId; + this.displayId = displayId; + this.msg = msg; + this.dest = dest; + timestamp = System.currentTimeMillis(); + } + + public int getNodeId() { + return this.nodeId; + } + + public String getDisplayId() { + return this.displayId; + } + + public Msg getMsg() { + return this.msg; + } + + public Dest getDest() { + return this.dest; + } + + public long getTimestamp() { + return this.timestamp; + } +} diff --git a/modP2pImpl/src/org/aion/p2p/impl1/tasks/TaskClear.java b/modP2pImpl/src/org/aion/p2p/impl1/tasks/TaskClear.java new file mode 100644 index 0000000000..b1ed935769 --- /dev/null +++ b/modP2pImpl/src/org/aion/p2p/impl1/tasks/TaskClear.java @@ -0,0 +1,78 @@ +/* + * Copyright (c) 2017-2018 Aion foundation. + * + * This file is part of the aion network project. + * + * The aion network project is free software: you can redistribute it + * and/or modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation, either version 3 of + * the License, or any later version. + * + * The aion network project is distributed in the hope that it will + * be useful, but WITHOUT ANY WARRANTY; without even the implied + * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + * See the GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with the aion network project source files. + * If not, see . + * + * Contributors: + * Aion foundation. + */ +package org.aion.p2p.impl1.tasks; + +import java.util.Iterator; +import java.util.concurrent.atomic.AtomicBoolean; +import org.aion.p2p.INode; +import org.aion.p2p.INodeMgr; +import org.aion.p2p.IP2pMgr; + +public class TaskClear implements Runnable { + private static final int PERIOD_CLEAR = 20000; + private static final int TIMEOUT_OUTBOUND_NODES = 20000; + + private final IP2pMgr mgr; + private final INodeMgr nodeMgr; + private AtomicBoolean start; + + public TaskClear(IP2pMgr _mgr, INodeMgr _nodeMgr, AtomicBoolean _start) { + this.mgr = _mgr; + this.nodeMgr = _nodeMgr; + this.start = _start; + } + + @Override + public void run() { + while (start.get()) { + try { + Thread.sleep(PERIOD_CLEAR); + + nodeMgr.timeoutInbound(this.mgr); + + Iterator outboundIt = nodeMgr.getOutboundNodes().keySet().iterator(); + while (outboundIt.hasNext()) { + + Object obj = outboundIt.next(); + + if (obj == null) { continue; } + + int nodeIdHash = (int) obj; + INode node = nodeMgr.getOutboundNodes().get(nodeIdHash); + + if (node == null) { continue; } + + if (System.currentTimeMillis() - node.getTimestamp() > TIMEOUT_OUTBOUND_NODES) { + this.mgr.closeSocket( + node.getChannel(), "outbound-timeout node=" + node.getIdShort()); + outboundIt.remove(); + } + } + + nodeMgr.timeoutActive(this.mgr); + + } catch (Exception e) { + } + } + } +} diff --git a/modP2pImpl/src/org/aion/p2p/impl1/tasks/TaskConnectPeers.java b/modP2pImpl/src/org/aion/p2p/impl1/tasks/TaskConnectPeers.java new file mode 100644 index 0000000000..fea763fd0d --- /dev/null +++ b/modP2pImpl/src/org/aion/p2p/impl1/tasks/TaskConnectPeers.java @@ -0,0 +1,181 @@ +/* + * Copyright (c) 2017-2018 Aion foundation. + * + * This file is part of the aion network project. + * + * The aion network project is free software: you can redistribute it + * and/or modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation, either version 3 of + * the License, or any later version. + * + * The aion network project is distributed in the hope that it will + * be useful, but WITHOUT ANY WARRANTY; without even the implied + * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + * See the GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with the aion network project source files. + * If not, see . + * + * Contributors: + * Aion foundation. + */ +package org.aion.p2p.impl1.tasks; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.SocketChannel; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import org.aion.p2p.INode; +import org.aion.p2p.INodeMgr; +import org.aion.p2p.IP2pMgr; +import org.aion.p2p.impl.zero.msg.ReqHandshake1; +import org.aion.p2p.impl1.P2pMgr.Dest; + +public class TaskConnectPeers implements Runnable { + private static final int PERIOD_CONNECT_OUTBOUND = 1000; + private static final int TIMEOUT_OUTBOUND_CONNECT = 10000; + + private final INodeMgr nodeMgr; + private final int maxActiveNodes; + private final IP2pMgr mgr; + private AtomicBoolean start; + private BlockingQueue sendMsgQue; + private Selector selector; + private ReqHandshake1 cachedReqHandshake1; + + public TaskConnectPeers( + IP2pMgr _mgr, + AtomicBoolean _start, + INodeMgr _nodeMgr, + int _maxActiveNodes, + Selector _selector, + BlockingQueue _sendMsgQue, + ReqHandshake1 _cachedReqHandshake1) { + + this.start = _start; + this.nodeMgr = _nodeMgr; + this.maxActiveNodes = _maxActiveNodes; + this.mgr = _mgr; + this.selector = _selector; + this.sendMsgQue = _sendMsgQue; + this.cachedReqHandshake1 = _cachedReqHandshake1; + } + + @Override + public void run() { + Thread.currentThread().setName("p2p-tcp"); + while (this.start.get()) { + try { + Thread.sleep(PERIOD_CONNECT_OUTBOUND); + } catch (InterruptedException e) { + if (this.mgr.isShowLog()) { System.out.println(getTcpInterruptedMsg()); } + } + + if (this.nodeMgr.activeNodesSize() >= this.maxActiveNodes) { + if (this.mgr.isShowLog()) { + System.out.println(getTcpPassMaxNodesMsg()); + } + continue; + } + + INode node; + try { + node = this.nodeMgr.tempNodesTake(); + if (this.nodeMgr.isSeedIp(node.getIpStr())) { node.setFromBootList(true); } + if (node.getIfFromBootList()) { this.nodeMgr.addTempNode(node); } + // if (node.peerMetric.shouldNotConn()) { + // continue; + // } + } catch (InterruptedException e) { + if (this.mgr.isShowLog()) { System.out.println(getTcpInterruptedMsg()); } + return; + } catch (Exception e) { + if (this.mgr.isShowLog()) { e.printStackTrace(); } + continue; + } + int nodeIdHash = node.getIdHash(); + if (!this.nodeMgr.getOutboundNodes().containsKey(nodeIdHash) + && !this.nodeMgr.hasActiveNode(nodeIdHash)) { + int _port = node.getPort(); + try { + SocketChannel channel = SocketChannel.open(); + + channel.socket() + .connect( + new InetSocketAddress(node.getIpStr(), _port), + TIMEOUT_OUTBOUND_CONNECT); + this.mgr.configChannel(channel); + + if (channel.finishConnect() && channel.isConnected()) { + + if (this.mgr.isShowLog()) { + System.out.println(getSucesCnctMsg(node.getIdShort(), node.getIpStr())); + } + SelectionKey sk = channel.register(this.selector, SelectionKey.OP_READ); + ChannelBuffer rb = new ChannelBuffer(this.mgr.isShowLog()); + rb.displayId = node.getIdShort(); + rb.nodeIdHash = nodeIdHash; + sk.attach(rb); + + node.refreshTimestamp(); + node.setChannel(channel); + this.nodeMgr.addOutboundNode(node); + + if (this.mgr.isShowLog()) { + System.out.println(getPrepRqstMsg(node.getIdShort(), node.getIpStr())); + } + this.sendMsgQue.offer( + new MsgOut( + node.getIdHash(), + node.getIdShort(), + this.cachedReqHandshake1, + Dest.OUTBOUND)); + // node.peerMetric.decFailedCount(); + + } else { + if (this.mgr.isShowLog()) { + System.out.println(getFailCnctMsg(node.getIdShort(), node.getIpStr())); + } + channel.close(); + // node.peerMetric.incFailedCount(); + } + } catch (IOException e) { + if (this.mgr.isShowLog()) { + System.out.println(getOutboundConnectMsg(node.getIpStr(), _port)); + } + // node.peerMetric.incFailedCount(); + } catch (Exception e) { + if (this.mgr.isShowLog()) e.printStackTrace(); + } + } + } + } + + private String getTcpInterruptedMsg() { + return ""; + } + + private String getTcpPassMaxNodesMsg() { + return ""; + } + + private String getSucesCnctMsg(String idStr, String ipStr) { + return ""; + } + + private String getOutboundConnectMsg(String ipStr, int port) { + return ""; + } + + private String getFailCnctMsg(String idStr, String ipStr) { + return ""; + } + + private String getPrepRqstMsg(String idStr, String ipStr) { + return " id=" + idStr + " ip=" + ipStr + ">"; + } +} diff --git a/modP2pImpl/src/org/aion/p2p/impl1/tasks/TaskInbound.java b/modP2pImpl/src/org/aion/p2p/impl1/tasks/TaskInbound.java new file mode 100644 index 0000000000..ddcfe03876 --- /dev/null +++ b/modP2pImpl/src/org/aion/p2p/impl1/tasks/TaskInbound.java @@ -0,0 +1,576 @@ +/* + * Copyright (c) 2017-2018 Aion foundation. + * + * This file is part of the aion network project. + * + * The aion network project is free software: you can redistribute it + * and/or modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation, either version 3 of + * the License, or any later version. + * + * The aion network project is distributed in the hope that it will + * be useful, but WITHOUT ANY WARRANTY; without even the implied + * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + * See the GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with the aion network project source files. + * If not, see . + * + * Contributors: + * Aion foundation. + */ +package org.aion.p2p.impl1.tasks; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; +import java.nio.channels.CancelledKeyException; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import org.aion.p2p.Ctrl; +import org.aion.p2p.Handler; +import org.aion.p2p.Header; +import org.aion.p2p.INode; +import org.aion.p2p.INodeMgr; +import org.aion.p2p.IP2pMgr; +import org.aion.p2p.P2pConstant; +import org.aion.p2p.Ver; +import org.aion.p2p.impl.comm.Act; +import org.aion.p2p.impl.zero.msg.ReqHandshake; +import org.aion.p2p.impl.zero.msg.ReqHandshake1; +import org.aion.p2p.impl.zero.msg.ResActiveNodes; +import org.aion.p2p.impl.zero.msg.ResHandshake; +import org.aion.p2p.impl.zero.msg.ResHandshake1; +import org.aion.p2p.impl1.P2pException; +import org.aion.p2p.impl1.P2pMgr.Dest; + +public class TaskInbound implements Runnable { + private final IP2pMgr mgr; + private final Selector selector; + private final INodeMgr nodeMgr; + private final Map> handlers; + private AtomicBoolean start; + private ServerSocketChannel tcpServer; + private BlockingQueue sendMsgQue; + private ResHandshake1 cachedResHandshake1; + private BlockingQueue receiveMsgQue; + + public TaskInbound( + IP2pMgr _mgr, + Selector _selector, + AtomicBoolean _start, + INodeMgr _nodeMgr, + ServerSocketChannel _tcpServer, + Map> _handlers, + BlockingQueue _sendMsgQue, + ResHandshake1 _cachedResHandshake1, + BlockingQueue _receiveMsgQue) { + + this.mgr = _mgr; + this.selector = _selector; + this.start = _start; + this.nodeMgr = _nodeMgr; + this.tcpServer = _tcpServer; + this.handlers = _handlers; + this.sendMsgQue = _sendMsgQue; + this.cachedResHandshake1 = _cachedResHandshake1; + this.receiveMsgQue = _receiveMsgQue; + } + + @Override + public void run() { + + // read buffer pre-alloc. @ max_body_size + ByteBuffer readBuf = ByteBuffer.allocate(P2pConstant.MAX_BODY_SIZE); + + while (start.get()) { + + try { + Thread.sleep(0, 1); + } catch (Exception e) { + } + + int num; + try { + num = this.selector.selectNow(); + } catch (IOException e) { + if (this.mgr.isShowLog()) System.out.println(""); + continue; + } + + if (num == 0) { + continue; + } + + Iterator keys = this.selector.selectedKeys().iterator(); + + while (keys.hasNext() && (num-- > 0)) { + + final SelectionKey sk = keys.next(); + keys.remove(); + + try { + + if (!sk.isValid()) continue; + + if (sk.isAcceptable()) accept(); + + if (sk.isReadable()) { + + readBuf.rewind(); + + ChannelBuffer chanBuf = (ChannelBuffer) (sk.attachment()); + try { + + int ret; + int cnt = 0; + + while ((ret = ((SocketChannel) sk.channel()).read(readBuf)) > 0) { + cnt += ret; + } + + // read empty select key, continue. + if (cnt <= 0) { + continue; + } + + int prevCnt = cnt + chanBuf.buffRemain; + ByteBuffer forRead; + + if (chanBuf.buffRemain != 0) { + byte[] alreadyRead = new byte[cnt]; + + readBuf.position(0); + readBuf.get(alreadyRead); + forRead = ByteBuffer.allocate(prevCnt); + forRead.put(chanBuf.remainBuffer); + forRead.put(alreadyRead); + } else { + forRead = readBuf; + } + + do { + cnt = read(sk, forRead, prevCnt); + + if (prevCnt == cnt) { + break; + } else prevCnt = cnt; + + } while (cnt > 0); + + // check if really read data. + if (cnt > prevCnt) { + chanBuf.buffRemain = 0; + throw new P2pException(getReadOverflowMsg(prevCnt, cnt)); + } + + chanBuf.buffRemain = cnt; + + if (cnt == 0) { + readBuf.rewind(); + } else { + // there are no perfect cycling buffer in jdk + // yet. + // simply just buff move for now. + // @TODO: looking for more efficient way. + + int currPos = forRead.position(); + chanBuf.remainBuffer = new byte[cnt]; + forRead.position(currPos - cnt); + forRead.get(chanBuf.remainBuffer); + readBuf.rewind(); + } + + } catch (NullPointerException e) { + this.mgr.closeSocket( + (SocketChannel) sk.channel(), + chanBuf.displayId + "-read-msg-null-exception"); + chanBuf.isClosed.set(true); + } catch (P2pException e) { + this.mgr.closeSocket( + (SocketChannel) sk.channel(), + chanBuf.displayId + "-read-msg-p2p-exception"); + chanBuf.isClosed.set(true); + + } catch (ClosedChannelException e) { + this.mgr.closeSocket( + (SocketChannel) sk.channel(), + chanBuf.displayId + "-read-msg-closed-channel-exception"); + + } catch (IOException e) { + this.mgr.closeSocket( + (SocketChannel) sk.channel(), + chanBuf.displayId + + "-read-msg-io-exception: " + + e.getMessage()); + chanBuf.isClosed.set(true); + + } catch (CancelledKeyException e) { + chanBuf.isClosed.set(true); + this.mgr.closeSocket( + (SocketChannel) sk.channel(), + chanBuf.displayId + "-read-msg-key-cancelled-exception"); + } catch (Exception e) { + if (this.mgr.isShowLog()) + System.out.println(""); + } + } + } catch (Exception ex) { + if (this.mgr.isShowLog()) { + System.out.println(""); + ex.printStackTrace(); + } + } + } + } + if (this.mgr.isShowLog()) System.out.println(""); + } + + private void accept() { + SocketChannel channel; + try { + + if (this.nodeMgr.activeNodesSize() >= this.mgr.getMaxActiveNodes()) return; + + channel = this.tcpServer.accept(); + this.mgr.configChannel(channel); + + SelectionKey sk = channel.register(this.selector, SelectionKey.OP_READ); + sk.attach(new ChannelBuffer(this.mgr.isShowLog())); + + String ip = channel.socket().getInetAddress().getHostAddress(); + int port = channel.socket().getPort(); + + if (this.mgr.isSyncSeedsOnly() && this.nodeMgr.isSeedIp(ip)) { + channel.close(); + return; + } + + INode node = this.nodeMgr.allocNode(ip, 0); + node.setChannel(channel); + this.nodeMgr.addInboundNode(node); + + if (this.mgr.isShowLog()) + System.out.println(""); + + } catch (IOException e) { + if (this.mgr.isShowLog()) System.out.println(""); + } + } + + /** + * SocketChannel + */ + private int readHeader(final ChannelBuffer _cb, ByteBuffer readBuffer, int cnt) { + + if (cnt < Header.LEN) return cnt; + + int origPos = readBuffer.position(); + int startP = origPos - cnt; + readBuffer.position(startP); + _cb.readHead(readBuffer); + readBuffer.position(origPos); + return cnt - Header.LEN; + } + + /** + * SocketChannel + */ + private int readBody(final ChannelBuffer _cb, ByteBuffer readBuffer, int cnt) { + + int bodyLen = _cb.header.getLen(); + + // some msg have nobody. + if (bodyLen == 0) { + _cb.body = new byte[0]; + return cnt; + } + + if (cnt < bodyLen) { return cnt; } + + int origPos = readBuffer.position(); + int startP = origPos - cnt; + readBuffer.position(startP); + _cb.readBody(readBuffer); + readBuffer.position(origPos); + return cnt - bodyLen; + } + + /** + * @param _sk SelectionKey + * @throws IOException IOException + */ + private int read(final SelectionKey _sk, ByteBuffer _readBuffer, int _cnt) throws IOException { + + int currCnt = 0; + + if (_sk.attachment() == null) { + throw new P2pException("attachment is null"); + } + ChannelBuffer rb = (ChannelBuffer) _sk.attachment(); + + // read header + if (!rb.isHeaderCompleted()) { + currCnt = readHeader(rb, _readBuffer, _cnt); + } else { + currCnt = _cnt; + } + + // read body + if (rb.isHeaderCompleted() && !rb.isBodyCompleted()) { + currCnt = readBody(rb, _readBuffer, currCnt); + } + + if (!rb.isBodyCompleted()) { return currCnt; } + + Header h = rb.header; + + byte[] bodyBytes = rb.body; + rb.refreshHeader(); + rb.refreshBody(); + + short ver = h.getVer(); + byte ctrl = h.getCtrl(); + byte act = h.getAction(); + int route = h.getRoute(); + + boolean underRC = + rb.shouldRoute( + route, + ((route == this.mgr.txBroadCastRoute) + ? P2pConstant.READ_MAX_RATE_TXBC + : P2pConstant.READ_MAX_RATE)); + + if (!underRC) { + if (this.mgr.isShowLog()) { + System.out.println( + getRouteMsg(ver, ctrl, act, rb.getRouteCount(route).count, rb.displayId)); + } + return currCnt; + } + + switch (ver) { + case Ver.V0: + switch (ctrl) { + case Ctrl.NET: + try { + handleP2pMsg(_sk, act, bodyBytes); + } catch (Exception ex) { + if (this.mgr.isShowLog()) { + System.out.println( + ""); + } + } + break; + case Ctrl.SYNC: + if (!this.handlers.containsKey(route)) { + if (this.mgr.isShowLog()) { + System.out.println(getUnregRouteMsg(ver, ctrl, act, rb.displayId)); + } + return currCnt; + } + + this.handleKernelMsg(rb.nodeIdHash, route, bodyBytes); + break; + default: + if (this.mgr.isShowLog()) { + System.out.println(getInvalRouteMsg(ver, ctrl, act, rb.displayId)); + } + break; + } + break; + default: + if (this.mgr.isShowLog()) { + System.out.println(""); + } + break; + } + + return currCnt; + } + + /** + * @param _sk SelectionKey + * @param _act ACT + * @param _msgBytes byte[] + */ + private void handleP2pMsg(final SelectionKey _sk, byte _act, final byte[] _msgBytes) { + + ChannelBuffer rb = (ChannelBuffer) _sk.attachment(); + + switch (_act) { + case Act.REQ_HANDSHAKE: + if (_msgBytes.length > ReqHandshake.LEN) { + ReqHandshake1 reqHandshake1 = ReqHandshake1.decode(_msgBytes); + if (reqHandshake1 != null) { + handleReqHandshake( + rb, + _sk.channel().hashCode(), + reqHandshake1.getNodeId(), + reqHandshake1.getNetId(), + reqHandshake1.getPort(), + reqHandshake1.getRevision()); + } + } + break; + + case Act.RES_HANDSHAKE: + if (rb.nodeIdHash == 0) { return; } + + if (_msgBytes.length > ResHandshake.LEN) { + ResHandshake1 resHandshake1 = ResHandshake1.decode(_msgBytes); + if (resHandshake1 != null && resHandshake1.getSuccess()) { + handleResHandshake(rb.nodeIdHash, resHandshake1.getBinaryVersion()); + } + } + break; + + case Act.REQ_ACTIVE_NODES: + if (rb.nodeIdHash != 0) { + INode node = nodeMgr.getActiveNode(rb.nodeIdHash); + if (node != null) { + this.sendMsgQue.offer( + new MsgOut( + node.getIdHash(), + node.getIdShort(), + new ResActiveNodes(nodeMgr.getActiveNodesList()), + Dest.ACTIVE)); + } + } + break; + + case Act.RES_ACTIVE_NODES: + if (this.mgr.isSyncSeedsOnly()) { break; } + + if (rb.nodeIdHash != 0) { + INode node = nodeMgr.getActiveNode(rb.nodeIdHash); + if (node != null) { + node.refreshTimestamp(); + ResActiveNodes resActiveNodes = ResActiveNodes.decode(_msgBytes); + if (resActiveNodes != null) { + List incomingNodes = resActiveNodes.getNodes(); + for (INode incomingNode : incomingNodes) { + if (nodeMgr.tempNodesSize() >= this.mgr.getMaxTempNodes()) { return; } + if (this.mgr.validateNode(incomingNode)) { + nodeMgr.addTempNode(incomingNode); + } + } + } + } + } + break; + default: + if (this.mgr.isShowLog()) { + System.out.println(""); + } + break; + } + } + + /** + * @param _buffer ChannelBuffer + * @param _channelHash int + * @param _nodeId byte[] + * @param _netId int + * @param _port int + * @param _revision byte[] + *

Construct node info after handshake request success + */ + private void handleReqHandshake( + final ChannelBuffer _buffer, + int _channelHash, + final byte[] _nodeId, + int _netId, + int _port, + final byte[] _revision) { + INode node = nodeMgr.getInboundNode(_channelHash); + if (node != null && node.getPeerMetric().notBan()) { + if (handshakeRuleCheck(_netId)) { + _buffer.nodeIdHash = Arrays.hashCode(_nodeId); + _buffer.displayId = new String(Arrays.copyOfRange(_nodeId, 0, 6)); + node.setId(_nodeId); + node.setPort(_port); + + // handshake 1 + if (_revision != null) { + String binaryVersion; + try { + binaryVersion = new String(_revision, "UTF-8"); + } catch (UnsupportedEncodingException e) { + binaryVersion = "decode-fail"; + } + node.setBinaryVersion(binaryVersion); + nodeMgr.moveInboundToActive(_channelHash, this.mgr); + this.sendMsgQue.offer( + new MsgOut( + node.getIdHash(), + node.getIdShort(), + this.cachedResHandshake1, + Dest.ACTIVE)); + } + + } else { + if (this.mgr.isShowLog()) { System.out.println(""); } + } + } + } + + private void handleResHandshake(int _nodeIdHash, String _binaryVersion) { + INode node = nodeMgr.getOutboundNodes().get(_nodeIdHash); + if (node != null && node.getPeerMetric().notBan()) { + node.refreshTimestamp(); + node.setBinaryVersion(_binaryVersion); + nodeMgr.moveOutboundToActive(node.getIdHash(), node.getIdShort(), this.mgr); + } + } + + /** + * @param _nodeIdHash int + * @param _route int + * @param _msgBytes byte[] + */ + private void handleKernelMsg(int _nodeIdHash, int _route, final byte[] _msgBytes) { + INode node = nodeMgr.getActiveNode(_nodeIdHash); + if (node != null) { + int nodeIdHash = node.getIdHash(); + String nodeDisplayId = node.getIdShort(); + node.refreshTimestamp(); + this.receiveMsgQue.offer(new MsgIn(nodeIdHash, nodeDisplayId, _route, _msgBytes)); + } + } + + /** @return boolean TODO: implementation */ + private boolean handshakeRuleCheck(int netId) { + // check net id + if (netId != this.mgr.getSelfNetId()) { return false; } + // check supported protocol versions + return true; + } + + private String getReadOverflowMsg(int prevCnt, int cnt) { + return "IO read overflow! suppose read:" + prevCnt + " real left:" + cnt; + } + + private String getRouteMsg(short ver, byte ctrl, byte act, int count, String idStr) { + return ""; + } + + private String getUnregRouteMsg(short ver, byte ctrl, byte act, String idStr) { + return ""; + } + + private String getInvalRouteMsg(short ver, byte ctrl, byte act, String idStr) { + return ""; + } +} diff --git a/modP2pImpl/src/org/aion/p2p/impl1/tasks/TaskReceive.java b/modP2pImpl/src/org/aion/p2p/impl1/tasks/TaskReceive.java new file mode 100644 index 0000000000..b5af5e5e0d --- /dev/null +++ b/modP2pImpl/src/org/aion/p2p/impl1/tasks/TaskReceive.java @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2017-2018 Aion foundation. + * + * This file is part of the aion network project. + * + * The aion network project is free software: you can redistribute it + * and/or modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation, either version 3 of + * the License, or any later version. + * + * The aion network project is distributed in the hope that it will + * be useful, but WITHOUT ANY WARRANTY; without even the implied + * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + * See the GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with the aion network project source files. + * If not, see . + * + * Contributors: + * Aion foundation. + */ +package org.aion.p2p.impl1.tasks; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import org.aion.p2p.Handler; + +public class TaskReceive implements Runnable { + private final AtomicBoolean start; + private final BlockingQueue receiveMsgQue; + private final Map> handlers; + private final boolean showLog; + + public TaskReceive( + AtomicBoolean _start, + BlockingQueue _receiveMsgQue, + Map> _handlers, + boolean _showLog) { + this.start = _start; + this.receiveMsgQue = _receiveMsgQue; + this.handlers = _handlers; + this.showLog = _showLog; + } + + @Override + public void run() { + while (this.start.get()) { + try { + MsgIn mi = this.receiveMsgQue.take(); + + List hs = this.handlers.get(mi.getRoute()); + if (hs == null) { continue; } + for (Handler hlr : hs) { + if (hlr == null) { continue; } + + try { + hlr.receive(mi.getNodeId(), mi.getDisplayId(), mi.getMsg()); + } catch (Exception e) { + if (this.showLog) { e.printStackTrace(); } + } + } + } catch (InterruptedException e) { + if (this.showLog) { System.out.println(""); } + return; + } catch (Exception e) { + if (this.showLog) { e.printStackTrace(); } + } + } + } + +} diff --git a/modP2pImpl/src/org/aion/p2p/impl1/tasks/TaskSend.java b/modP2pImpl/src/org/aion/p2p/impl1/tasks/TaskSend.java new file mode 100644 index 0000000000..904467b305 --- /dev/null +++ b/modP2pImpl/src/org/aion/p2p/impl1/tasks/TaskSend.java @@ -0,0 +1,142 @@ +/* + * Copyright (c) 2017-2018 Aion foundation. + * + * This file is part of the aion network project. + * + * The aion network project is free software: you can redistribute it + * and/or modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation, either version 3 of + * the License, or any later version. + * + * The aion network project is distributed in the hope that it will + * be useful, but WITHOUT ANY WARRANTY; without even the implied + * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + * See the GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with the aion network project source files. + * If not, see . + * + * Contributors: + * Aion foundation. + */ +package org.aion.p2p.impl1.tasks; + +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import org.aion.p2p.INode; +import org.aion.p2p.INodeMgr; +import org.aion.p2p.IP2pMgr; +import org.aion.p2p.P2pConstant; + +public class TaskSend implements Runnable { + public static final int TOTAL_LANE = (1 << 5) - 1; + + private final IP2pMgr mgr; + private final AtomicBoolean start; + private final BlockingQueue sendMsgQue; + private final INodeMgr nodeMgr; + private final Selector selector; + private final int lane; + + public TaskSend( + IP2pMgr _mgr, + int _lane, + BlockingQueue _sendMsgQue, + AtomicBoolean _start, + INodeMgr _nodeMgr, + Selector _selector) { + + this.mgr = _mgr; + this.lane = _lane; + this.sendMsgQue = _sendMsgQue; + this.start = _start; + this.nodeMgr = _nodeMgr; + this.selector = _selector; + } + + @Override + public void run() { + while (start.get()) { + try { + MsgOut mo = sendMsgQue.take(); + // if timeout , throw away this msg. + long now = System.currentTimeMillis(); + if (now - mo.getTimestamp() > P2pConstant.WRITE_MSG_TIMEOUT) { + if (this.mgr.isShowLog()) { + System.out.println(getTimeoutMsg(mo.getDisplayId(), now)); + } + continue; + } + + // if not belong to current lane, put it back. + int targetLane = hash2Lane(mo.getNodeId()); + if (targetLane != lane) { + sendMsgQue.offer(mo); + continue; + } + + INode node = null; + switch (mo.getDest()) { + case ACTIVE: + node = nodeMgr.getActiveNode(mo.getNodeId()); + break; + case INBOUND: + node = nodeMgr.getInboundNode(mo.getNodeId()); + break; + case OUTBOUND: + node = nodeMgr.getOutboundNode(mo.getNodeId()); + break; + } + + if (node != null) { + SelectionKey sk = node.getChannel().keyFor(selector); + if (sk != null) { + Object attachment = sk.attachment(); + if (attachment != null) { + TaskWrite tw = + new TaskWrite( + this.mgr.isShowLog(), + node.getIdShort(), + node.getChannel(), + mo.getMsg(), + (ChannelBuffer) attachment, + this.mgr); + tw.run(); + } + } + } else { + if (this.mgr.isShowLog()) { + System.out + .println(getNodeNotExitMsg(mo.getDest().name(), mo.getDisplayId())); + } + } + } catch (InterruptedException e) { + if (this.mgr.isShowLog()) { System.out.println(""); } + return; + } catch (Exception e) { + if (this.mgr.isShowLog()) { e.printStackTrace(); } + } + } + } + + // hash mapping channel id to write thread. + private int hash2Lane(int in) { + in ^= in >> (32 - 5); + in ^= in >> (32 - 10); + in ^= in >> (32 - 15); + in ^= in >> (32 - 20); + in ^= in >> (32 - 25); + return in & 0b11111; + } + + private String getTimeoutMsg(String id, long now) { + return ""; + } + + private String getNodeNotExitMsg(String name, String id) { + return "" + id + " node-not-exit"; + } +} diff --git a/modP2pImpl/src/org/aion/p2p/impl1/tasks/TaskStatus.java b/modP2pImpl/src/org/aion/p2p/impl1/tasks/TaskStatus.java new file mode 100644 index 0000000000..38a16ea850 --- /dev/null +++ b/modP2pImpl/src/org/aion/p2p/impl1/tasks/TaskStatus.java @@ -0,0 +1,60 @@ +/* + * Copyright (c) 2017-2018 Aion foundation. + * + * This file is part of the aion network project. + * + * The aion network project is free software: you can redistribute it + * and/or modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation, either version 3 of + * the License, or any later version. + * + * The aion network project is distributed in the hope that it will + * be useful, but WITHOUT ANY WARRANTY; without even the implied + * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + * See the GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with the aion network project source files. + * If not, see . + * + * Contributors: + * Aion foundation. + */ +package org.aion.p2p.impl1.tasks; + +import java.util.concurrent.BlockingQueue; +import org.aion.p2p.INodeMgr; + +public class TaskStatus implements Runnable { + private final INodeMgr nodeMgr; + private final String selfShortId; + private BlockingQueue sendMsgQue; + private BlockingQueue receiveMsgQue; + + public TaskStatus( + INodeMgr _nodeMgr, + String _selfShortId, + BlockingQueue _sendMsgQue, + BlockingQueue _receiveMsgQue) { + this.nodeMgr = _nodeMgr; + this.selfShortId = _selfShortId; + this.sendMsgQue = _sendMsgQue; + this.receiveMsgQue = _receiveMsgQue; + } + + @Override + public void run() { + Thread.currentThread().setName("p2p-ts"); + String status = this.nodeMgr.dumpNodeInfo(this.selfShortId); + System.out.println(status); + System.out.println("--------------------------------------------------------------------" + + "-------------------------------------------------------------------------------" + + "-----------------"); + System.out.println( + "recv queue [" + + this.receiveMsgQue.size() + + "] send queue [" + + this.sendMsgQue.size() + + "]\n"); + } +} diff --git a/modP2pImpl/src/org/aion/p2p/impl1/TaskWrite.java b/modP2pImpl/src/org/aion/p2p/impl1/tasks/TaskWrite.java similarity index 76% rename from modP2pImpl/src/org/aion/p2p/impl1/TaskWrite.java rename to modP2pImpl/src/org/aion/p2p/impl1/tasks/TaskWrite.java index 126ffdd557..38e3dacbd4 100644 --- a/modP2pImpl/src/org/aion/p2p/impl1/TaskWrite.java +++ b/modP2pImpl/src/org/aion/p2p/impl1/tasks/TaskWrite.java @@ -1,35 +1,33 @@ /* * Copyright (c) 2017-2018 Aion foundation. * - * This file is part of the aion network project. + * This file is part of the aion network project. * - * The aion network project is free software: you can redistribute it - * and/or modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation, either version 3 of - * the License, or any later version. + * The aion network project is free software: you can redistribute it + * and/or modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation, either version 3 of + * the License, or any later version. * - * The aion network project is distributed in the hope that it will - * be useful, but WITHOUT ANY WARRANTY; without even the implied - * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. - * See the GNU General Public License for more details. + * The aion network project is distributed in the hope that it will + * be useful, but WITHOUT ANY WARRANTY; without even the implied + * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + * See the GNU General Public License for more details. * - * You should have received a copy of the GNU General Public License - * along with the aion network project source files. - * If not, see . - * - * Contributors to the aion source files in decreasing order of code volume: - * - * Aion foundation. + * You should have received a copy of the GNU General Public License + * along with the aion network project source files. + * If not, see . * + * Contributors: + * Aion foundation. */ - -package org.aion.p2p.impl1; +package org.aion.p2p.impl1.tasks; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.nio.channels.SocketChannel; import org.aion.p2p.Header; +import org.aion.p2p.IP2pMgr; import org.aion.p2p.Msg; /** @author chris */ @@ -40,7 +38,7 @@ public class TaskWrite implements Runnable { private SocketChannel sc; private Msg msg; private ChannelBuffer channelBuffer; - private P2pMgr p2pMgr; + private IP2pMgr p2pMgr; TaskWrite( boolean _showLog, @@ -48,7 +46,7 @@ public class TaskWrite implements Runnable { final SocketChannel _sc, final Msg _msg, final ChannelBuffer _cb, - final P2pMgr _p2pMgr) { + final IP2pMgr _p2pMgr) { this.showLog = _showLog; this.nodeShortId = _nodeShortId; this.sc = _sc; @@ -83,7 +81,7 @@ public void run() { // System.out.println("write " + h.getVer() + "-" + h.getCtrl() + "-" + h.getAction()); ByteBuffer buf = ByteBuffer.allocate(headerBytes.length + bodyLen); buf.put(headerBytes); - if (bodyBytes != null) buf.put(bodyBytes); + if (bodyBytes != null) { buf.put(bodyBytes); } buf.flip(); try { diff --git a/modP2pImpl/test/org/aion/p2p/impl/zero/msg/ResActiveNodesTest.java b/modP2pImpl/test/org/aion/p2p/impl/zero/msg/ResActiveNodesTest.java index 2a7ec04b98..2ce184b0d3 100644 --- a/modP2pImpl/test/org/aion/p2p/impl/zero/msg/ResActiveNodesTest.java +++ b/modP2pImpl/test/org/aion/p2p/impl/zero/msg/ResActiveNodesTest.java @@ -1,6 +1,7 @@ package org.aion.p2p.impl.zero.msg; import org.aion.p2p.Ctrl; +import org.aion.p2p.INode; import org.aion.p2p.Ver; import org.aion.p2p.impl.comm.Act; import org.aion.p2p.impl.comm.Node; @@ -49,18 +50,18 @@ public void testRoute() { public void testEncodeDecode() { int m = ThreadLocalRandom.current().nextInt(0, 20); - List srcNodes = new ArrayList<>(); + List srcNodes = new ArrayList<>(); for(int i = 0; i < m; i++){ srcNodes.add(randomNode()); } ResActiveNodes res = ResActiveNodes.decode(new ResActiveNodes(srcNodes).encode()); assertEquals(res.getNodes().size(), m); - List tarNodes = res.getNodes(); + List tarNodes = res.getNodes(); for(int i = 0; i < m; i++){ - Node srcNode = srcNodes.get(i); - Node tarNode = tarNodes.get(i); + INode srcNode = srcNodes.get(i); + INode tarNode = tarNodes.get(i); Assert.assertArrayEquals(srcNode.getId(), tarNode.getId()); Assert.assertEquals(srcNode.getIdHash(), tarNode.getIdHash());