diff --git a/.github/workflows/deploy_release.yaml b/.github/workflows/deploy_release.yaml new file mode 100644 index 000000000..cc61bbdd9 --- /dev/null +++ b/.github/workflows/deploy_release.yaml @@ -0,0 +1,38 @@ +name: Deploy release to the Maven Central Repository + +on: + release: + types: + - published + +jobs: + deploy: + runs-on: ubuntu-18.04 + steps: + - name: Check out Git repository + uses: actions/checkout@v2 + + - name: Install Java and Maven + uses: actions/setup-java@v1 + with: + java-version: 1.8 + + - name: Install nebula-graph + run: | + mkdir tmp + pushd tmp + git clone https://github.com/vesoft-inc/nebula-docker-compose.git + pushd nebula-docker-compose/ + cp ../../client/src/test/resources/docker-compose.yaml . + docker-compose up -d + sleep 10 + popd + popd + + - name: Deploy Release to Maven package + uses: samuelmeuli/action-maven-publish@v1 + with: + gpg_private_key: ${{ secrets.GPG_PRIVATE_KEY }} + gpg_passphrase: ${{ secrets.GPG_PASSPHRASE }} + nexus_username: ${{ secrets.OSSRH_USERNAME }} + nexus_password: ${{ secrets.OSSRH_TOKEN }} diff --git a/.github/workflows/deploy_snapshot.yaml b/.github/workflows/deploy_snapshot.yaml new file mode 100644 index 000000000..5f55ffaff --- /dev/null +++ b/.github/workflows/deploy_snapshot.yaml @@ -0,0 +1,38 @@ +name: Deploy snapshot to the Maven Central Repository + +on: + push: + branches: [ master ] + schedule: + - cron: '0 6 * * *' + +jobs: + deploy: + runs-on: ubuntu-18.04 + steps: + - name: Check out Git repository + uses: actions/checkout@v2 + + - name: Install Java and Maven + uses: actions/setup-java@v1 + with: + java-version: 1.8 + + - name: Install nebula-graph + run: | + mkdir tmp + pushd tmp + git clone https://github.com/vesoft-inc/nebula-docker-compose.git + pushd nebula-docker-compose/ + cp ../../client/src/test/resources/docker-compose.yaml . + docker-compose up -d + sleep 10 + popd + popd + - name: Deploy Snapshot to Maven package + uses: samuelmeuli/action-maven-publish@v1 + with: + gpg_private_key: ${{ secrets.GPG_PRIVATE_KEY }} + gpg_passphrase: ${{ secrets.GPG_PASSPHRASE }} + nexus_username: ${{ secrets.OSSRH_USERNAME }} + nexus_password: ${{ secrets.OSSRH_TOKEN }} diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml index 91a9f9060..84e304e89 100644 --- a/.github/workflows/maven.yml +++ b/.github/workflows/maven.yml @@ -7,7 +7,10 @@ on: push: branches: [ master ] pull_request: - branches: [ master ] + branches: + - master + - 'v[0-9]+.*' + jobs: build: diff --git a/README.md b/README.md index b7e91b6e2..29e77336d 100644 --- a/README.md +++ b/README.md @@ -60,7 +60,7 @@ There are the version correspondence between client and Nebula: | 1.1.0 | 1.1.0,1.2.0 | | 2.0.0-beta | 2.0.0-beta | | 2.0.0-rc1 | 2.0.0-rc1 | -| 2.0.0 | 2.0.0 | +| 2.0.0/2.0.1 | 2.0.0/2.0.1 | | 2.0.0-SNAPSHOT| 2.0.0-nightly | ## Graph client example diff --git a/client/pom.xml b/client/pom.xml index dfc4e6d3f..8ec7bf118 100644 --- a/client/pom.xml +++ b/client/pom.xml @@ -5,7 +5,7 @@ com.vesoft nebula - 2.0.0 + 2.0.1 4.0.0 diff --git a/client/src/main/java/com/vesoft/nebula/client/graph/data/PathWrapper.java b/client/src/main/java/com/vesoft/nebula/client/graph/data/PathWrapper.java index 3e06e0fbf..aab294940 100644 --- a/client/src/main/java/com/vesoft/nebula/client/graph/data/PathWrapper.java +++ b/client/src/main/java/com/vesoft/nebula/client/graph/data/PathWrapper.java @@ -21,6 +21,7 @@ public class PathWrapper { private List segments = new ArrayList<>(); private List nodes = new ArrayList<>(); private List relationships = new ArrayList<>(); + private Path path = null; public static class Segment { Node startNode; @@ -144,6 +145,7 @@ public PathWrapper(Path path) throws InvalidValueException, UnsupportedEncodingE this.segments = new ArrayList<>(); return; } + this.path = path; nodes.add(new Node(path.src)); List vids = new ArrayList<>(); vids.add(path.src.vid); @@ -190,48 +192,28 @@ public PathWrapper(Path path) throws InvalidValueException, UnsupportedEncodingE @Override public String toString() { try { - Node startNode = getStartNode(); List edgeStrs = new ArrayList<>(); - if (segments.size() >= 1) { + for (int i = 0; i < relationships.size(); i++) { + Relationship relationship = relationships.get(i); List propStrs = new ArrayList<>(); - Map props = segments.get(0).getRelationShip().properties(); + Map props = relationship.properties(); for (String key : props.keySet()) { - propStrs.add(key + ":" + props.get(key).toString()); + propStrs.add(key + ": " + props.get(key).toString()); } - if (segments.get(0).getStartNode() == startNode) { + Step step = path.steps.get(i); + Node node = new Node(step.dst); + if (step.type > 0) { edgeStrs.add(String.format("-[:%s@%d{%s}]->%s", - segments.get(0).getRelationShip().edgeName(), - segments.get(0).getRelationShip().ranking(), + relationship.edgeName(), + relationship.ranking(), String.join(", ", propStrs), - segments.get(0).getEndNode().toString())); + node.toString())); } else { edgeStrs.add(String.format("<-[:%s@%d{%s}]-%s", - segments.get(0).getRelationShip().edgeName(), - segments.get(0).getRelationShip().ranking(), + relationship.edgeName(), + relationship.ranking(), String.join(", ", propStrs), - segments.get(0).getStartNode().toString())); - } - - } - - for (int i = 1; i < segments.size(); i++) { - List propStrs = new ArrayList<>(); - Map props = segments.get(0).getRelationShip().properties(); - for (String key : props.keySet()) { - propStrs.add(key + ":" + props.get(key).toString()); - } - if (segments.get(i).getStartNode() == segments.get(i - 1).getStartNode()) { - edgeStrs.add(String.format("-[:%s@%d{%s}]->%s", - segments.get(i).getRelationShip().edgeName(), - segments.get(i).getRelationShip().ranking(), - String.join(", ", propStrs), - segments.get(i).getEndNode().toString())); - } else { - edgeStrs.add(String.format("<-[:%s@%d{%s}]-%s", - segments.get(i).getRelationShip().edgeName(), - segments.get(i).getRelationShip().ranking(), - String.join(", ", propStrs), - segments.get(i).getStartNode().toString())); + node.toString())); } } return String.format("%s%s", diff --git a/client/src/main/java/com/vesoft/nebula/client/graph/data/ResultSet.java b/client/src/main/java/com/vesoft/nebula/client/graph/data/ResultSet.java index 7b79f1818..fb90560fb 100644 --- a/client/src/main/java/com/vesoft/nebula/client/graph/data/ResultSet.java +++ b/client/src/main/java/com/vesoft/nebula/client/graph/data/ResultSet.java @@ -20,7 +20,7 @@ public class ResultSet { private final ExecutionResponse response; - private List columnNames; + private List columnNames = new ArrayList<>(); private final String decodeType = "utf-8"; public static class Record implements Iterable { @@ -131,7 +131,6 @@ public ResultSet(ExecutionResponse resp) { } this.response = resp; if (resp.data != null) { - this.columnNames = Lists.newArrayListWithCapacity(resp.data.column_names.size()); // space name's charset is 'utf-8' for (byte[] column : resp.data.column_names) { this.columnNames.add(new String(column)); @@ -290,6 +289,10 @@ public List getRows() { @Override public String toString() { + // When error, print the raw data directly + if (!isSucceeded()) { + return response.toString(); + } int i = 0; List rowStrs = new ArrayList<>(); while (i < rowsSize()) { diff --git a/client/src/main/java/com/vesoft/nebula/client/graph/exception/IOErrorException.java b/client/src/main/java/com/vesoft/nebula/client/graph/exception/IOErrorException.java index f0f0b01ad..6231b5435 100644 --- a/client/src/main/java/com/vesoft/nebula/client/graph/exception/IOErrorException.java +++ b/client/src/main/java/com/vesoft/nebula/client/graph/exception/IOErrorException.java @@ -16,6 +16,10 @@ public class IOErrorException extends java.lang.Exception { public static final int E_CONNECT_BROKEN = 2; + public static final int E_TIME_OUT = 4; + + public static final int E_NO_OPEN = 5; + private int type = E_UNKNOWN; public IOErrorException(int errorType, String message) { diff --git a/client/src/main/java/com/vesoft/nebula/client/graph/exception/InvalidSessionException.java b/client/src/main/java/com/vesoft/nebula/client/graph/exception/InvalidSessionException.java new file mode 100644 index 000000000..194e76dc8 --- /dev/null +++ b/client/src/main/java/com/vesoft/nebula/client/graph/exception/InvalidSessionException.java @@ -0,0 +1,13 @@ +/* Copyright (c) 2021 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +package com.vesoft.nebula.client.graph.exception; + +public class InvalidSessionException extends RuntimeException { + public InvalidSessionException() { + super("The session was released, could not use again."); + } +} diff --git a/client/src/main/java/com/vesoft/nebula/client/graph/net/ConnObjectPool.java b/client/src/main/java/com/vesoft/nebula/client/graph/net/ConnObjectPool.java index 2ab7aeace..ac043fa2d 100644 --- a/client/src/main/java/com/vesoft/nebula/client/graph/net/ConnObjectPool.java +++ b/client/src/main/java/com/vesoft/nebula/client/graph/net/ConnObjectPool.java @@ -64,6 +64,17 @@ public boolean validateObject(PooledObject p) { return true; } + @Override + public void activateObject(PooledObject p) throws Exception { + if (p.getObject() == null) { + throw new RuntimeException("The connection is null."); + } + if (!p.getObject().ping()) { + throw new RuntimeException("The connection is broken."); + } + super.activateObject(p); + } + public boolean init() { return loadBalancer.isServersOK(); } diff --git a/client/src/main/java/com/vesoft/nebula/client/graph/net/Connection.java b/client/src/main/java/com/vesoft/nebula/client/graph/net/Connection.java index 9b79d37d3..71693618d 100644 --- a/client/src/main/java/com/vesoft/nebula/client/graph/net/Connection.java +++ b/client/src/main/java/com/vesoft/nebula/client/graph/net/Connection.java @@ -12,6 +12,8 @@ public HostAddress getServerAddress() { public abstract void open(HostAddress address, int timeout) throws IOErrorException; + protected abstract void reopen() throws IOErrorException; + public abstract void close(); public abstract boolean ping(); diff --git a/client/src/main/java/com/vesoft/nebula/client/graph/net/NebulaPool.java b/client/src/main/java/com/vesoft/nebula/client/graph/net/NebulaPool.java index 3d849598d..72d28029d 100644 --- a/client/src/main/java/com/vesoft/nebula/client/graph/net/NebulaPool.java +++ b/client/src/main/java/com/vesoft/nebula/client/graph/net/NebulaPool.java @@ -27,7 +27,7 @@ public class NebulaPool { private LoadBalancer loadBalancer; private final Logger log = LoggerFactory.getLogger(this.getClass()); // the wait time to get idle connection, unit ms - private final int waitTime = 60 * 1000; + private final int waitTime = 0; private List hostToIp(List addresses) throws UnknownHostException { @@ -62,7 +62,7 @@ private void checkConfig(NebulaPoolConfig config) { } public boolean init(List addresses, NebulaPoolConfig config) - throws UnknownHostException, InvalidConfigException { + throws UnknownHostException, InvalidConfigException { checkConfig(config); List newAddrs = hostToIp(addresses); this.loadBalancer = new RoundRobinLoadBalancer(newAddrs, config.getTimeout()); @@ -88,31 +88,18 @@ public void close() { public Session getSession(String userName, String password, boolean reconnect) throws NotValidConnectionException, IOErrorException, AuthFailedException { + SyncConnection connection = null; try { - // If no idle connection, try once - int retry = getIdleConnNum() == 0 ? 1 : getIdleConnNum(); - SyncConnection connection = null; - while (retry-- > 0) { - connection = objectPool.borrowObject(waitTime); - if (connection == null || !connection.ping()) { - continue; - } - break; - } - if (connection == null) { - throw new NotValidConnectionException("Get connection object failed."); - } - log.info(String.format("Get connection to %s:%d", - connection.getServerAddress().getHost(), - connection.getServerAddress().getPort())); + connection = getConnection(); long sessionID = connection.authenticate(userName, password); - return new Session(connection, sessionID, this.objectPool, reconnect); - } catch (NotValidConnectionException | AuthFailedException | IOErrorException e) { + return new Session(connection, sessionID, this, reconnect); + } catch (AuthFailedException | IOErrorException e) { + // if get the connection succeeded, but authenticate failed, + // needs to return connection to pool + if (connection != null) { + setInvalidateConnection(connection); + } throw e; - } catch (IllegalStateException e) { - throw new NotValidConnectionException(e.getMessage()); - } catch (Exception e) { - throw new IOErrorException(IOErrorException.E_UNKNOWN, e.getMessage()); } } @@ -133,4 +120,24 @@ public void updateServerStatus() { ((ConnObjectPool)objectPool.getFactory()).updateServerStatus(); } } + + protected void setInvalidateConnection(SyncConnection connection) { + try { + objectPool.invalidateObject(connection); + } catch (Exception e) { + log.error("Set invalidate object failed"); + } + } + + protected void returnConnection(SyncConnection connection) { + objectPool.returnObject(connection); + } + + protected SyncConnection getConnection() throws NotValidConnectionException { + try { + return objectPool.borrowObject(waitTime); + } catch (Exception e) { + throw new NotValidConnectionException(e.getMessage()); + } + } } diff --git a/client/src/main/java/com/vesoft/nebula/client/graph/net/Session.java b/client/src/main/java/com/vesoft/nebula/client/graph/net/Session.java index 76884f69b..952cab20f 100644 --- a/client/src/main/java/com/vesoft/nebula/client/graph/net/Session.java +++ b/client/src/main/java/com/vesoft/nebula/client/graph/net/Session.java @@ -6,24 +6,25 @@ package com.vesoft.nebula.client.graph.net; +import com.vesoft.nebula.client.graph.data.HostAddress; import com.vesoft.nebula.client.graph.data.ResultSet; import com.vesoft.nebula.client.graph.exception.IOErrorException; import com.vesoft.nebula.graph.ExecutionResponse; -import java.io.UnsupportedEncodingException; -import org.apache.commons.pool2.impl.GenericObjectPool; +import java.util.concurrent.atomic.AtomicBoolean; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class Session { private final long sessionID; private SyncConnection connection; - private final GenericObjectPool pool; + private final NebulaPool pool; private final Boolean retryConnect; + private final AtomicBoolean connectionIsBroken = new AtomicBoolean(false); private final Logger log = LoggerFactory.getLogger(getClass()); public Session(SyncConnection connection, long sessionID, - GenericObjectPool connPool, + NebulaPool connPool, Boolean retryConnect) { this.connection = connection; this.sessionID = sessionID; @@ -37,7 +38,22 @@ public Session(SyncConnection connection, * @param stmt The query sentence. * @return The ResultSet. */ - public ResultSet execute(String stmt) throws IOErrorException, UnsupportedEncodingException { + public synchronized ResultSet execute(String stmt) throws IOErrorException { + if (connection == null) { + throw new IOErrorException(IOErrorException.E_CONNECT_BROKEN, + "The session was released, couldn't use again."); + } + + if (connectionIsBroken.get() && retryConnect) { + if (retryConnect()) { + ExecutionResponse resp = connection.execute(sessionID, stmt); + return new ResultSet(resp); + } else { + throw new IOErrorException(IOErrorException.E_ALL_BROKEN, + "All servers are broken."); + } + } + try { if (connection == null) { throw new IOErrorException(IOErrorException.E_CONNECT_BROKEN, @@ -47,15 +63,16 @@ public ResultSet execute(String stmt) throws IOErrorException, UnsupportedEncodi return new ResultSet(resp); } catch (IOErrorException ie) { if (ie.getType() == IOErrorException.E_CONNECT_BROKEN) { - if (pool.getFactory() instanceof ConnObjectPool) { - ((ConnObjectPool) pool.getFactory()).updateServerStatus(); - } + connectionIsBroken.set(true); + pool.updateServerStatus(); if (retryConnect) { if (retryConnect()) { + connectionIsBroken.set(false); ExecutionResponse resp = connection.execute(sessionID, stmt); return new ResultSet(resp); } else { + connectionIsBroken.set(true); throw new IOErrorException(IOErrorException.E_ALL_BROKEN, "All servers are broken."); } @@ -67,39 +84,44 @@ public ResultSet execute(String stmt) throws IOErrorException, UnsupportedEncodi private boolean retryConnect() { try { - try { - pool.invalidateObject(connection); - } catch (Exception e) { - log.error("Return object failed"); - } - SyncConnection newConn = pool.borrowObject(); + pool.setInvalidateConnection(connection); + SyncConnection newConn = pool.getConnection(); if (newConn == null) { log.error("Get connection object failed."); + return false; } connection = newConn; return true; } catch (Exception e) { + log.error("Reconnected failed: " + e); return false; } } - // Need server supported, v1.0 nebula-graph doesn't supported - public boolean ping() { + public synchronized boolean ping() { if (connection == null) { return false; } return connection.ping(); } - public void release() { + public synchronized void release() { if (connection == null) { return; } - connection.signout(sessionID); try { - pool.returnObject(connection); + connection.signout(sessionID); + pool.returnConnection(connection); } catch (Exception e) { - log.warn("Return object to pool failed."); + log.warn("Signout out failed or return object to pool failed:" + e.getMessage()); + } + connection = null; + } + + public synchronized HostAddress getGraphHost() { + if (connection == null) { + return null; } + return connection.getServerAddress(); } } diff --git a/client/src/main/java/com/vesoft/nebula/client/graph/net/SyncConnection.java b/client/src/main/java/com/vesoft/nebula/client/graph/net/SyncConnection.java index 5784978aa..99e643066 100644 --- a/client/src/main/java/com/vesoft/nebula/client/graph/net/SyncConnection.java +++ b/client/src/main/java/com/vesoft/nebula/client/graph/net/SyncConnection.java @@ -25,14 +25,15 @@ public class SyncConnection extends Connection { protected TTransport transport = null; protected TProtocol protocol = null; private GraphService.Client client = null; + private int timeout = 0; @Override public void open(HostAddress address, int timeout) throws IOErrorException { this.serverAddr = address; try { - int newTimeout = timeout <= 0 ? Integer.MAX_VALUE : timeout; + this.timeout = timeout <= 0 ? Integer.MAX_VALUE : timeout; this.transport = new TSocket( - address.getHost(), address.getPort(), newTimeout, newTimeout); + address.getHost(), address.getPort(), this.timeout, this.timeout); this.transport.open(); this.protocol = new TCompactProtocol(transport); client = new GraphService.Client(protocol); @@ -41,12 +42,33 @@ public void open(HostAddress address, int timeout) throws IOErrorException { } } + /** + * Because the code generated by Fbthrift does not handle the seqID, + * the message will be dislocation when the timeout occurs, + * resulting in unexpected response, + * so when the timeout occurs, + * the connection will be reopened to avoid the impact of the message. + * So if timeout happen, need to call reopen + * @throws IOErrorException if io problem happen + */ + @Override + protected void reopen() throws IOErrorException { + close(); + open(serverAddr, timeout); + } + public long authenticate(String user, String password) throws AuthFailedException, IOErrorException { try { AuthResponse resp = client.authenticate(user.getBytes(), password.getBytes()); if (resp.error_code != ErrorCode.SUCCEEDED) { - throw new AuthFailedException(new String(resp.error_msg).intern()); + if (resp.error_msg != null) { + throw new AuthFailedException(new String(resp.error_msg)); + } else { + throw new AuthFailedException( + "The error_msg is null, " + + "maybe the service not set or the response is disorder."); + } } return resp.session_id; } catch (TException e) { @@ -54,6 +76,12 @@ public long authenticate(String user, String password) TTransportException te = (TTransportException)e; if (te.getType() == TTransportException.END_OF_FILE) { throw new IOErrorException(IOErrorException.E_CONNECT_BROKEN, te.getMessage()); + } else if (te.getType() == TTransportException.TIMED_OUT + || te.getMessage().contains("Read timed out")) { + reopen(); + throw new IOErrorException(IOErrorException.E_TIME_OUT, te.getMessage()); + } else if (te.getType() == TTransportException.NOT_OPEN) { + throw new IOErrorException(IOErrorException.E_NO_OPEN, te.getMessage()); } } throw new AuthFailedException(String.format("Authenticate failed: %s", e.getMessage())); @@ -69,32 +97,29 @@ public ExecutionResponse execute(long sessionID, String stmt) TTransportException te = (TTransportException) e; if (te.getType() == TTransportException.END_OF_FILE) { throw new IOErrorException(IOErrorException.E_CONNECT_BROKEN, te.getMessage()); + } else if (te.getType() == TTransportException.NOT_OPEN) { + throw new IOErrorException(IOErrorException.E_NO_OPEN, te.getMessage()); + } else if (te.getType() == TTransportException.TIMED_OUT + || te.getMessage().contains("Read timed out")) { + reopen(); + throw new IOErrorException(IOErrorException.E_TIME_OUT, te.getMessage()); } } throw new IOErrorException(IOErrorException.E_UNKNOWN, e.getMessage()); } } - public void signout(long sessionId) { - try { - client.signout(sessionId); - } catch (TException e) { - this.close(); - } + public void signout(long sessionId) throws TException { + client.signout(sessionId); } @Override public boolean ping() { try { - client.execute(0, "YIELD 1;".getBytes()); - return true; - } catch (TException e) { - if (e instanceof TTransportException) { - TTransportException te = (TTransportException) e; - return te.getType() != TTransportException.END_OF_FILE - && te.getType() != TTransportException.NOT_OPEN; - } + execute(0, "YIELD 1;"); return true; + } catch (IOErrorException e) { + return false; } } diff --git a/client/src/main/java/com/vesoft/nebula/client/meta/MetaClient.java b/client/src/main/java/com/vesoft/nebula/client/meta/MetaClient.java index 52ac3cd96..cc86900c9 100644 --- a/client/src/main/java/com/vesoft/nebula/client/meta/MetaClient.java +++ b/client/src/main/java/com/vesoft/nebula/client/meta/MetaClient.java @@ -23,6 +23,7 @@ import com.vesoft.nebula.meta.GetTagReq; import com.vesoft.nebula.meta.GetTagResp; import com.vesoft.nebula.meta.HostItem; +import com.vesoft.nebula.meta.HostStatus; import com.vesoft.nebula.meta.IdName; import com.vesoft.nebula.meta.ListEdgesReq; import com.vesoft.nebula.meta.ListEdgesResp; @@ -309,7 +310,9 @@ public synchronized Set listHosts() { } Set hostAddrs = new HashSet<>(); for (HostItem hostItem : resp.hosts) { - hostAddrs.add(hostItem.getHostAddr()); + if (hostItem.getStatus() == HostStatus.ONLINE) { + hostAddrs.add(hostItem.getHostAddr()); + } } return hostAddrs; } diff --git a/client/src/main/java/com/vesoft/nebula/client/meta/MetaManager.java b/client/src/main/java/com/vesoft/nebula/client/meta/MetaManager.java index 52ba708ad..16be6c528 100644 --- a/client/src/main/java/com/vesoft/nebula/client/meta/MetaManager.java +++ b/client/src/main/java/com/vesoft/nebula/client/meta/MetaManager.java @@ -80,6 +80,7 @@ public static MetaManager getMetaManager(List address) throws TExce * close meta client */ public void close() { + metaManager = null; metaClient.close(); } @@ -98,16 +99,23 @@ private void fillMetaInfo() { spaceInfo.spaceItem = spaceItem; List tags = metaClient.getTags(spaceName); for (TagItem tag : tags) { - spaceInfo.tagItems.put(new String(tag.tag_name), tag); - spaceInfo.tagIdNames.put(tag.tag_id, new String(tag.tag_name)); + String tagName = new String(tag.tag_name); + if (!spaceInfo.tagItems.containsKey(tagName) + || spaceInfo.tagItems.get(tagName).getVersion() < tag.getVersion()) { + spaceInfo.tagItems.put(tagName, tag); + spaceInfo.tagIdNames.put(tag.tag_id, tagName); + } } List edges = metaClient.getEdges(spaceName); for (EdgeItem edge : edges) { - spaceInfo.edgeItems.put(new String(edge.edge_name), edge); - spaceInfo.edgeTypeNames.put(edge.edge_type, new String(edge.edge_name)); + String edgeName = new String(edge.edge_name); + if (!spaceInfo.edgeItems.containsKey(edgeName) + || spaceInfo.edgeItems.get(edgeName).getVersion() < edge.getVersion()) { + spaceInfo.edgeItems.put(edgeName, edge); + spaceInfo.edgeTypeNames.put(edge.edge_type, edgeName); + } } - Map> partsAlloc = metaClient.getPartsAlloc(spaceName); - spaceInfo.partsAlloc = partsAlloc; + spaceInfo.partsAlloc = metaClient.getPartsAlloc(spaceName); tempSpacesInfo.put(spaceName, spaceInfo); } try { diff --git a/client/src/main/java/com/vesoft/nebula/client/storage/StorageClient.java b/client/src/main/java/com/vesoft/nebula/client/storage/StorageClient.java index c782e85fd..f0a49ebb7 100644 --- a/client/src/main/java/com/vesoft/nebula/client/storage/StorageClient.java +++ b/client/src/main/java/com/vesoft/nebula/client/storage/StorageClient.java @@ -36,7 +36,7 @@ public class StorageClient { private int timeout = 10000; // ms /** - * @param ip the ip of metad server + * @param ip the ip of metad server * @param port the port of meted server */ public StorageClient(String ip, int port) { @@ -53,7 +53,7 @@ public StorageClient(List addresses) { /** * @param addresses the address of metad server - * @param timeout the timeout of scan vertex or edge + * @param timeout the timeout of scan vertex or edge */ public StorageClient(List addresses, int timeout) { this.connection = new GraphStorageConnection(); @@ -761,8 +761,12 @@ private ScanEdgeResultIterator doScanEdge(String spaceName, * release storage client */ public void close() { - pool.close(); - connection.close(); + if (pool != null) { + pool.close(); + } + if (connection != null) { + connection.close(); + } } diff --git a/client/src/main/java/com/vesoft/nebula/client/storage/scan/ScanEdgeResultIterator.java b/client/src/main/java/com/vesoft/nebula/client/storage/scan/ScanEdgeResultIterator.java index e6a7df0ce..f2cef809a 100644 --- a/client/src/main/java/com/vesoft/nebula/client/storage/scan/ScanEdgeResultIterator.java +++ b/client/src/main/java/com/vesoft/nebula/client/storage/scan/ScanEdgeResultIterator.java @@ -22,6 +22,8 @@ import java.util.List; import java.util.Set; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,6 +32,7 @@ public class ScanEdgeResultIterator extends ScanResultIterator { private static final Logger LOGGER = LoggerFactory.getLogger(ScanEdgeResultIterator.class); private final ScanEdgeRequest request; + private ExecutorService threadPool = null; private ScanEdgeResultIterator(MetaManager metaManager, StorageConnPool pool, @@ -62,6 +65,7 @@ public ScanEdgeResult next() throws Exception { CountDownLatch countDownLatch = new CountDownLatch(addresses.size()); AtomicInteger existSuccess = new AtomicInteger(0); + threadPool = Executors.newFixedThreadPool(addresses.size()); for (HostAddress addr : addresses) { threadPool.submit(() -> { ScanEdgeRequest partRequest = new ScanEdgeRequest(request); @@ -121,6 +125,7 @@ public ScanEdgeResult next() throws Exception { try { countDownLatch.await(); + threadPool.shutdown(); } catch (InterruptedException interruptedE) { LOGGER.error("scan interrupted:", interruptedE); throw interruptedE; diff --git a/client/src/main/java/com/vesoft/nebula/client/storage/scan/ScanResultIterator.java b/client/src/main/java/com/vesoft/nebula/client/storage/scan/ScanResultIterator.java index 46eae0210..17582c988 100644 --- a/client/src/main/java/com/vesoft/nebula/client/storage/scan/ScanResultIterator.java +++ b/client/src/main/java/com/vesoft/nebula/client/storage/scan/ScanResultIterator.java @@ -25,7 +25,6 @@ public class ScanResultIterator { protected boolean hasNext = true; protected final Map partCursor; - protected final ExecutorService threadPool; protected final MetaManager metaManager; protected final StorageConnPool pool; @@ -45,8 +44,6 @@ protected ScanResultIterator(MetaManager metaManager, StorageConnPool pool, this.spaceName = spaceName; this.labelName = labelName; this.partSuccess = partSuccess; - - this.threadPool = Executors.newFixedThreadPool(addresses.size()); this.partCursor = new HashMap<>(partScanQueue.size()); } diff --git a/client/src/main/java/com/vesoft/nebula/client/storage/scan/ScanVertexResultIterator.java b/client/src/main/java/com/vesoft/nebula/client/storage/scan/ScanVertexResultIterator.java index 22b70eb75..963c93c03 100644 --- a/client/src/main/java/com/vesoft/nebula/client/storage/scan/ScanVertexResultIterator.java +++ b/client/src/main/java/com/vesoft/nebula/client/storage/scan/ScanVertexResultIterator.java @@ -22,6 +22,8 @@ import java.util.List; import java.util.Set; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,6 +35,7 @@ public class ScanVertexResultIterator extends ScanResultIterator { private static final Logger LOGGER = LoggerFactory.getLogger(ScanVertexResultIterator.class); private final ScanVertexRequest request; + private ExecutorService threadPool = null; private ScanVertexResultIterator(MetaManager metaManager, StorageConnPool pool, @@ -64,6 +67,8 @@ public ScanVertexResult next() throws Exception { CountDownLatch countDownLatch = new CountDownLatch(addresses.size()); AtomicInteger existSuccess = new AtomicInteger(0); + threadPool = Executors.newFixedThreadPool(addresses.size()); + for (HostAddress addr : addresses) { threadPool.submit(() -> { ScanVertexRequest partRequest = new ScanVertexRequest(request); @@ -121,6 +126,7 @@ public ScanVertexResult next() throws Exception { try { countDownLatch.await(); + threadPool.shutdown(); } catch (InterruptedException interruptedE) { LOGGER.error("scan interrupted:", interruptedE); throw interruptedE; @@ -244,4 +250,4 @@ public ScanVertexResultIterator build() { partSuccess); } } -} \ No newline at end of file +} diff --git a/client/src/main/java/com/vesoft/nebula/encoder/RowWriterImpl.java b/client/src/main/java/com/vesoft/nebula/encoder/RowWriterImpl.java index 8e8a5cf39..1bcaed615 100644 --- a/client/src/main/java/com/vesoft/nebula/encoder/RowWriterImpl.java +++ b/client/src/main/java/com/vesoft/nebula/encoder/RowWriterImpl.java @@ -25,7 +25,7 @@ public class RowWriterImpl implements RowWriter { private long approxStrLen = 0; private ByteBuffer buf; private final List isSet; - private final List strList = new ArrayList<>(); + private final List strList = new ArrayList<>(); private ByteOrder byteOrder; static int[] andBits = {0x7F, 0xBF, 0xDF, 0xEF, 0xF7, 0xFB, 0xFD, 0xFE}; static int[] orBits = {0x80, 0x40, 0x20, 0x10, 0x08, 0x04, 0x02, 0x01}; @@ -82,7 +82,7 @@ public RowWriterImpl(SchemaProviderImpl schema, ByteOrder byteOrder) throws Runt if (numNullables > 0) { numNullBytes = ((numNullables - 1) >> 3) + 1; } - buf = ByteBuffer.allocate(headerLen + numNullBytes + schema.size()); + buf = ByteBuffer.allocate(headerLen + numNullBytes + schema.size() + Long.BYTES); buf.order(this.byteOrder); buf.put(header); if (ver > 0) { @@ -480,7 +480,7 @@ public void write(int index, byte[] v) { int offset = headerLen + numNullBytes + field.offset(); switch (field.type()) { case PropertyType.STRING: { - strList.add(new String(v)); + strList.add(v); outOfSpaceStr = true; approxStrLen += v.length; break; @@ -767,9 +767,9 @@ public ByteBuffer processOutOfSpace() { // Reserve enough space to avoid memory re-allocation // Copy the data except the strings - temp = temp.put(buf.array()); + temp = temp.put(buf.array(), 0, buf.array().length - Long.BYTES); - int strOffset = buf.array().length; + int strOffset = buf.array().length - Long.BYTES; // Now let's process all strings int strNum = 0; @@ -789,11 +789,11 @@ public ByteBuffer processOutOfSpace() { if (strNum >= strList.size()) { throw new RuntimeException("Wrong strNum: " + strNum); } - temp.put(strList.get(strNum).getBytes()); + temp.put(strList.get(strNum)); // Set the new offset and length temp.putInt(offset, strOffset); - int len = strList.get(strNum).length(); + int len = strList.get(strNum).length; temp.putInt(offset + Integer.BYTES, len); strOffset += len; } @@ -812,7 +812,7 @@ public void finish() { buf = processOutOfSpace(); } // Save the timestamp to the tail of buf - buf.putLong(getTimestamp()); + buf.putLong(buf.array().length - Long.BYTES, getTimestamp()); } private long getTimestamp() { diff --git a/client/src/test/java/com/vesoft/nebula/client/graph/data/TestDataFromServer.java b/client/src/test/java/com/vesoft/nebula/client/graph/data/TestDataFromServer.java index 8e41c9e2e..1641ed2ef 100644 --- a/client/src/test/java/com/vesoft/nebula/client/graph/data/TestDataFromServer.java +++ b/client/src/test/java/com/vesoft/nebula/client/graph/data/TestDataFromServer.java @@ -45,10 +45,11 @@ public class TestDataFromServer { public void setUp() throws Exception { NebulaPoolConfig nebulaPoolConfig = new NebulaPoolConfig(); nebulaPoolConfig.setMaxConnSize(1); - Assert.assertTrue(pool.init(Arrays.asList(new HostAddress("127.0.0.1", 9671)), + Assert.assertTrue(pool.init(Arrays.asList(new HostAddress("127.0.0.1", 9670)), nebulaPoolConfig)); session = pool.getSession("root", "nebula", true); - ResultSet resp = session.execute("CREATE SPACE IF NOT EXISTS test_data; " + ResultSet resp = session.execute("CREATE SPACE IF NOT EXISTS test_data" + + "(vid_type=fixed_string(8)); " + "USE test_data;" + "CREATE TAG IF NOT EXISTS person(name string, age int8, grade int16, " + "friends int32, book_num int64, birthday datetime, " @@ -333,4 +334,55 @@ public void testPath() { assert false; } } + + @Test + public void tesDataset() { + try { + ResultSet result = session.execute( + "CREATE TAG IF NOT EXISTS player(name string, age int);" + + "CREATE EDGE IF NOT EXISTS like(likeness int);"); + Assert.assertTrue(result.getErrorMessage(), result.isSucceeded()); + TimeUnit.SECONDS.sleep(6); + result = session.execute( + "INSERT VERTEX player(name, age) values \"a\":(\"a\", 1); " + + "INSERT VERTEX player(name, age) values \"b\":(\"b\", 2); " + + "INSERT VERTEX player(name, age) values \"c\":(\"c\", 3); " + + "INSERT VERTEX player(name, age) values \"d\":(\"d\", 4);" + + "INSERT VERTEX player(name, age) values \"f\":(\"f\", 5);" + + "INSERT VERTEX player(name, age) values \"g\":(\"g\", 6);" + + "INSERT EDGE like(likeness) values \"d\" -> \"a\":(10); " + + "INSERT EDGE like(likeness) values \"d\" -> \"c\":(10);" + + "INSERT EDGE like(likeness) values \"b\" -> \"a\":(10); " + + "INSERT EDGE like(likeness) values \"c\" -> \"b\":(10);" + + "INSERT EDGE like(likeness) values \"a\" -> \"f\":(10); " + + "INSERT EDGE like(likeness) values \"c\" -> \"f\":(10);" + + "INSERT EDGE like(likeness) values \"a\" -> \"g\":(10); " + + "INSERT EDGE like(likeness) values \"g\" -> \"c\":(10);"); + Assert.assertTrue(result.getErrorMessage(), result.isSucceeded()); + result = session.execute( + "FIND NOLOOP PATH FROM \"a\" TO \"c\" OVER like BIDIRECT UPTO 5 STEPS"); + Assert.assertTrue(result.getErrorMessage(), result.isSucceeded()); + Assert.assertEquals(4, result.rowsSize()); + String expectString = "ColumnName: [path], " + + "Rows: [(\"a\" )-[:like@0{}]->(\"g\" )-[:like@0{}]->(\"c\" ), " + + "(\"a\" )<-[:like@0{}]-(\"d\" )-[:like@0{}]->(\"c\" ), " + + "(\"a\" )<-[:like@0{}]-(\"b\" )<-[:like@0{}]-(\"c\" ), " + + "(\"a\" )-[:like@0{}]->(\"f\" )<-[:like@0{}]-(\"c\" )]"; + Assert.assertEquals(expectString, result.toString()); + } catch (IOErrorException | InterruptedException e) { + e.printStackTrace(); + assert false; + } + } + + @Test + public void testErrorResult() { + try { + ResultSet result = session.execute("FETCH PROP ON no_exist_tag \"nobody\""); + Assert.assertTrue(result.toString().contains("ExecutionResponse")); + } catch (IOErrorException e) { + e.printStackTrace(); + assert false; + } + } } diff --git a/client/src/test/java/com/vesoft/nebula/client/graph/net/TestConnectionPool.java b/client/src/test/java/com/vesoft/nebula/client/graph/net/TestConnectionPool.java index 8e428277a..ab16c1ff1 100644 --- a/client/src/test/java/com/vesoft/nebula/client/graph/net/TestConnectionPool.java +++ b/client/src/test/java/com/vesoft/nebula/client/graph/net/TestConnectionPool.java @@ -9,15 +9,16 @@ import com.vesoft.nebula.client.graph.NebulaPoolConfig; import com.vesoft.nebula.client.graph.data.HostAddress; import com.vesoft.nebula.client.graph.data.ResultSet; +import com.vesoft.nebula.client.graph.exception.IOErrorException; import com.vesoft.nebula.client.graph.exception.InvalidConfigException; import com.vesoft.nebula.client.graph.exception.NotValidConnectionException; import com.vesoft.nebula.graph.ErrorCode; import java.net.UnknownHostException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.List; import org.junit.Assert; +import org.junit.Ignore; import org.junit.Test; public class TestConnectionPool { @@ -120,13 +121,14 @@ public void testInitFailed() { @Test() public void testGetSession() { + NebulaPool pool = null; try { NebulaPoolConfig nebulaPoolConfig = new NebulaPoolConfig(); nebulaPoolConfig.setMinConnSize(2); nebulaPoolConfig.setMaxConnSize(4); List addresses = Collections.singletonList( new HostAddress("127.0.0.1", 9671)); - NebulaPool pool = new NebulaPool(); + pool = new NebulaPool(); assert pool.init(addresses, nebulaPoolConfig); int i = 0; List sessions = new ArrayList<>(); @@ -167,6 +169,10 @@ public void testGetSession() { } catch (Exception e) { e.printStackTrace(); assert (false); + } finally { + if (pool != null) { + pool.close(); + } } } @@ -180,7 +186,7 @@ public void testClose() { NebulaPool pool = new NebulaPool(); Assert.assertTrue(pool.init(addresses, nebulaPoolConfig)); pool.close(); - Session s = pool.getSession("root", "nebula", false); + pool.getSession("root", "nebula", false); assert (false); } catch (NotValidConnectionException e) { System.out.println("We expect must reach here: get session failed."); @@ -190,4 +196,61 @@ public void testClose() { assert (false); } } + + @Ignore("The test data no exists nba.") + @Test() + public void testExecuteTimeout() { + System.out.println("====== testExecuteTimeout ======"); + NebulaPool pool = null; + Session session = null; + try { + NebulaPoolConfig nebulaPoolConfig = new NebulaPoolConfig(); + nebulaPoolConfig.setMaxConnSize(1); + nebulaPoolConfig.setTimeout(1000); + List addresses = Collections.singletonList( + new HostAddress("127.0.0.1", 9669)); + pool = new NebulaPool(); + assert pool.init(addresses, nebulaPoolConfig); + session = pool.getSession("root", "nebula", false); + assert (session != null); + try { + session.execute( + "USE nba;GO 500 STEPS FROM \"Tim Duncan\" OVER like"); + assert false; + } catch (IOErrorException e) { + Assert.assertTrue(e.getMessage().contains("Read timed out")); + assert true; + } + + try { + ResultSet resultSet = session.execute("SHOW SPACES"); + Assert.assertTrue(resultSet.isSucceeded()); + Assert.assertTrue(resultSet.toString().contains("ColumnName: [Name]")); + assert true; + } catch (Exception e) { + e.printStackTrace(); + assert false; + } + session.release(); + try { + session = pool.getSession("root", "nebula", false); + ResultSet resultSet = session.execute("SHOW HOSTS"); + Assert.assertTrue(resultSet.isSucceeded()); + Assert.assertTrue(resultSet.toString().contains("ColumnName:")); + } catch (Exception e) { + e.printStackTrace(); + assert false; + } + } catch (Exception e) { + e.printStackTrace(); + assert false; + } finally { + if (session != null) { + session.release(); + } + if (pool != null) { + pool.close(); + } + } + } } diff --git a/client/src/test/java/com/vesoft/nebula/client/graph/net/TestSession.java b/client/src/test/java/com/vesoft/nebula/client/graph/net/TestSession.java index 94b86dd0b..5faa6b9fc 100644 --- a/client/src/test/java/com/vesoft/nebula/client/graph/net/TestSession.java +++ b/client/src/test/java/com/vesoft/nebula/client/graph/net/TestSession.java @@ -10,19 +10,96 @@ import com.vesoft.nebula.client.graph.data.HostAddress; import com.vesoft.nebula.client.graph.data.ResultSet; import com.vesoft.nebula.client.graph.exception.IOErrorException; +import com.vesoft.nebula.client.util.ProcessUtil; import com.vesoft.nebula.graph.ErrorCode; import java.util.Arrays; import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.junit.Assert; import org.junit.Test; public class TestSession { - @Test() - public void testReconnect() { + @Test + public void testMultiThreadUseTheSameSession() { + System.out.println("<==== testMultiThreadUseTheSameSession ====>"); + NebulaPool pool = new NebulaPool(); + try { + NebulaPoolConfig nebulaPoolConfig = new NebulaPoolConfig(); + nebulaPoolConfig.setMaxConnSize(1); + List addresses = Arrays.asList(new HostAddress("127.0.0.1", 9670)); + Assert.assertTrue(pool.init(addresses, nebulaPoolConfig)); + Session session = pool.getSession("root", "nebula", true); + ExecutorService executorService = Executors.newFixedThreadPool(10); + AtomicInteger failedCount = new AtomicInteger(0); + for (int i = 0; i < 10; i++) { + executorService.submit(() -> { + try { + session.execute("SHOW SPACES;"); + } catch (Exception e) { + failedCount.incrementAndGet(); + } + }); + } + executorService.awaitTermination(10, TimeUnit.SECONDS); + executorService.shutdown(); + assert failedCount.get() == 0; + } catch (Exception e) { + e.printStackTrace(); + Assert.assertFalse(e.getMessage(), false); + } finally { + pool.close(); + } + } + + @Test + public void testReconnectWithOneService() { + System.out.println("<==== testReconnectWithOneService ====>"); + NebulaPool pool = new NebulaPool(); + try { + NebulaPoolConfig nebulaPoolConfig = new NebulaPoolConfig(); + nebulaPoolConfig.setMaxConnSize(1); + List addresses = Arrays.asList( + new HostAddress("127.0.0.1", 9669)); + Assert.assertTrue(pool.init(addresses, nebulaPoolConfig)); + Session session = pool.getSession("root", "nebula", true); + session.release(); + + Runtime runtime = Runtime.getRuntime(); + runtime.exec("docker restart nebula-docker-compose_graphd0_1") + .waitFor(5, TimeUnit.SECONDS); + TimeUnit.SECONDS.sleep(10); + // the connections in pool are broken, test getSession can get right connection + session = pool.getSession("root", "nebula", true); + + // the connections in pool are broken, test execute can get right connection + runtime.exec("docker restart nebula-docker-compose_graphd0_1") + .waitFor(5, TimeUnit.SECONDS); + TimeUnit.SECONDS.sleep(10); + session.execute("SHOW SPACES"); + session.release(); + } catch (Exception e) { + e.printStackTrace(); + Assert.assertFalse(e.getMessage(), true); + } finally { + pool.close(); + } + } + + @Test + public void testReconnectWithMultiServices() { + System.out.println("<==== testReconnectWithMultiServices ====>"); Runtime runtime = Runtime.getRuntime(); NebulaPool pool = new NebulaPool(); try { + // make sure the graphd2_1 without any sessions + String cmd = "docker restart nebula-docker-compose_graphd2_1"; + Process p = runtime.exec(cmd); + p.waitFor(10, TimeUnit.SECONDS); + ProcessUtil.printProcessStatus(cmd, p); + NebulaPoolConfig nebulaPoolConfig = new NebulaPoolConfig(); nebulaPoolConfig.setMaxConnSize(6); List addresses = Arrays.asList( @@ -34,13 +111,16 @@ public void testReconnect() { // test ping Assert.assertTrue(session.ping()); - for (int i = 0; i < 10; i++) { if (i == 3) { - runtime.exec("docker stop nebula-docker-compose_graphd0_1") - .waitFor(5, TimeUnit.SECONDS); - runtime.exec("docker stop nebula-docker-compose_graphd1_1") - .waitFor(5, TimeUnit.SECONDS); + cmd = "docker stop nebula-docker-compose_graphd0_1"; + p = runtime.exec(cmd); + p.waitFor(5, TimeUnit.SECONDS); + ProcessUtil.printProcessStatus(cmd, p); + cmd = "docker stop nebula-docker-compose_graphd1_1"; + p = runtime.exec(cmd); + p.waitFor(5, TimeUnit.SECONDS); + ProcessUtil.printProcessStatus(cmd, p); } try { ResultSet resp = session.execute("SHOW SPACES"); @@ -56,8 +136,16 @@ public void testReconnect() { } session.release(); // test release then execute ngql - ResultSet result = session.execute("SHOW SPACES;"); - Assert.assertFalse(result.isSucceeded()); + try { + ResultSet result = session.execute("SHOW SPACES;"); + assert false; + } catch (IOErrorException e) { + Assert.assertTrue(e.getMessage().contains( + "The session was released, couldn't use again.")); + } catch (Exception e) { + e.printStackTrace(); + Assert.assertFalse(e.getMessage(),false); + } // get new session from the pool Session session1 = pool.getSession("root", "nebula", false); @@ -67,13 +155,14 @@ public void testReconnect() { } catch (Exception e) { e.printStackTrace(); - Assert.assertFalse(e.getMessage(),false); + Assert.assertFalse(e.getMessage(), false); } finally { try { runtime.exec("docker start nebula-docker-compose_graphd0_1") .waitFor(5, TimeUnit.SECONDS); runtime.exec("docker start nebula-docker-compose_graphd1_1") .waitFor(5, TimeUnit.SECONDS); + TimeUnit.SECONDS.sleep(5); } catch (Exception e) { e.printStackTrace(); } diff --git a/client/src/test/java/com/vesoft/nebula/client/meta/MockNebulaGraph.java b/client/src/test/java/com/vesoft/nebula/client/meta/MockNebulaGraph.java index 96334eee1..f95eb7b2b 100644 --- a/client/src/test/java/com/vesoft/nebula/client/meta/MockNebulaGraph.java +++ b/client/src/test/java/com/vesoft/nebula/client/meta/MockNebulaGraph.java @@ -18,14 +18,17 @@ import java.net.UnknownHostException; import java.util.Arrays; import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * two spaces: test1, test2, both have 2 parts * each space has one tag and one edge */ public class MockNebulaGraph { - public static void initGraph() { + private static final Logger LOGGER = LoggerFactory.getLogger(MockNebulaGraph.class); + public static void initGraph() { NebulaPoolConfig nebulaPoolConfig = new NebulaPoolConfig(); nebulaPoolConfig.setMaxConnSize(100); List addresses = Arrays.asList(new HostAddress("127.0.0.1", 9669), @@ -38,11 +41,11 @@ public static void initGraph() { ResultSet resp = session.execute(createSpace()); if (!resp.isSucceeded()) { + LOGGER.error(resp.getErrorMessage()); System.exit(1); } } catch (UnknownHostException | NotValidConnectionException - | IOErrorException | AuthFailedException - | UnsupportedEncodingException e) { + | IOErrorException | AuthFailedException e) { e.printStackTrace(); } finally { pool.close(); @@ -50,10 +53,50 @@ public static void initGraph() { } public static String createSpace() { - String exec = "CREATE SPACE IF NOT EXISTS testMeta(partition_num=10);" + String exec = "CREATE SPACE IF NOT EXISTS testMeta(partition_num=10, " + + "vid_type=fixed_string(8));" + "USE testMeta;" + "CREATE TAG IF NOT EXISTS person(name string, age int);" + "CREATE EDGE IF NOT EXISTS friend(likeness double);"; return exec; } + + public static void createMultiVersionTagAndEdge() { + NebulaPoolConfig nebulaPoolConfig = new NebulaPoolConfig(); + nebulaPoolConfig.setMaxConnSize(100); + List addresses = Arrays.asList(new HostAddress("127.0.0.1", 9669), + new HostAddress("127.0.0.1", 9670)); + NebulaPool pool = new NebulaPool(); + Session session = null; + try { + pool.init(addresses, nebulaPoolConfig); + session = pool.getSession("root", "nebula", true); + + String exec = "CREATE SPACE IF NOT EXISTS testMeta(partition_num=10, " + + "vid_type=fixed_string(10));" + + "USE testMeta;" + + "CREATE TAG IF NOT EXISTS player();" + + "CREATE EDGE IF NOT EXISTS couples()"; + ResultSet resp = session.execute(exec); + if (!resp.isSucceeded()) { + LOGGER.error(resp.getErrorMessage()); + System.exit(1); + } + Thread.sleep(10000); + String updateSchema = "USE testMeta;" + + "ALTER TAG player ADD(col1 string);" + + "ALTER EDGE couples ADD(col1 string)"; + ResultSet updateResp = session.execute(updateSchema); + if (!updateResp.isSucceeded()) { + if (!"Existed!".equals(updateResp.getErrorMessage())) { + LOGGER.error(resp.getErrorMessage()); + System.exit(1); + } + } + } catch (Exception e) { + e.printStackTrace(); + } finally { + pool.close(); + } + } } diff --git a/client/src/test/java/com/vesoft/nebula/client/meta/TestMetaClient.java b/client/src/test/java/com/vesoft/nebula/client/meta/TestMetaClient.java index bbd400043..4a02c12d8 100644 --- a/client/src/test/java/com/vesoft/nebula/client/meta/TestMetaClient.java +++ b/client/src/test/java/com/vesoft/nebula/client/meta/TestMetaClient.java @@ -8,11 +8,14 @@ import com.facebook.thrift.TException; import com.vesoft.nebula.client.meta.exception.ExecuteFailedException; +import com.vesoft.nebula.client.util.ProcessUtil; import com.vesoft.nebula.meta.EdgeItem; import com.vesoft.nebula.meta.IdName; import com.vesoft.nebula.meta.TagItem; import java.util.List; +import java.util.concurrent.TimeUnit; import junit.framework.TestCase; +import org.junit.Assert; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,7 +69,7 @@ public void testGetSpaces() { public void testGetTags() { try { List tags = metaClient.getTags("testMeta"); - assert (tags.size() == 1); + Assert.assertTrue(tags.size() >= 1); assert (metaClient.getTag("testMeta", "person") != null); } catch (TException | ExecuteFailedException e) { LOGGER.error(e.getMessage()); @@ -77,7 +80,7 @@ public void testGetTags() { public void testGetEdges() { try { List edges = metaClient.getEdges("testMeta"); - assert (edges.size() == 1); + Assert.assertTrue(edges.size() >= 1); assert (metaClient.getEdge("testMeta", "friend") != null); } catch (TException | ExecuteFailedException e) { LOGGER.error(e.getMessage()); @@ -98,6 +101,34 @@ public void testListHosts() { if (metaClient == null) { metaClient = new MetaClient(address, port); } - assert (metaClient.listHosts().size() == 3); + Assert.assertEquals(3, metaClient.listHosts().size()); + } + + public void testListOnlineHosts() { + // stop one storage server + String cmd = "docker stop nebula-docker-compose_storaged0_1"; + Runtime runtime = Runtime.getRuntime(); + try { + Process p = runtime.exec(cmd); + p.waitFor(5, TimeUnit.SECONDS); + ProcessUtil.printProcessStatus(cmd, p); + Thread.sleep(5000); // wait to update the storaged's status to OFFLINE + } catch (Exception e) { + LOGGER.error("stop docker service error, ", e); + assert (false); + } + if (metaClient == null) { + metaClient = new MetaClient(address, port); + } + assert (metaClient.listHosts().size() == 2); + + try { + runtime.exec("docker start nebula-docker-compose_storaged0_1") + .waitFor(5, TimeUnit.SECONDS); + Thread.sleep(5000); + } catch (Exception e) { + LOGGER.error("start docker service error,", e); + assert (false); + } } } diff --git a/client/src/test/java/com/vesoft/nebula/client/meta/TestMetaManager.java b/client/src/test/java/com/vesoft/nebula/client/meta/TestMetaManager.java index ae5b42dab..c2ac211de 100644 --- a/client/src/test/java/com/vesoft/nebula/client/meta/TestMetaManager.java +++ b/client/src/test/java/com/vesoft/nebula/client/meta/TestMetaManager.java @@ -6,6 +6,7 @@ package com.vesoft.nebula.client.meta; +import com.facebook.thrift.TException; import com.vesoft.nebula.HostAddr; import com.vesoft.nebula.client.graph.data.HostAddress; import com.vesoft.nebula.meta.EdgeItem; @@ -23,7 +24,7 @@ public class TestMetaManager extends TestCase { public void setUp() throws Exception { MockNebulaGraph.initGraph(); metaManager = MetaManager.getMetaManager( - Collections.singletonList(new HostAddress("127.0.0.1", 9559))); + Collections.singletonList(new HostAddress("127.0.0.1", 9559))); } public void tearDown() { @@ -82,7 +83,7 @@ public void testGetEdge() { public void testGetSpaceParts() { assert (metaManager.getSpaceParts("testMeta").size() == 10); Assert.assertArrayEquals(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).toArray(), - metaManager.getSpaceParts("testMeta").toArray()); + metaManager.getSpaceParts("testMeta").toArray()); // test get leader HostAddr hostAddr = metaManager.getLeader("testMeta", 1); @@ -96,4 +97,23 @@ public void testGetSpaceParts() { Assert.assertNotNull(hostAddr); Assert.assertEquals(hostAddr.port, 4400); } -} \ No newline at end of file + + public void testMultiVersionSchema() { + try { + MockNebulaGraph.createMultiVersionTagAndEdge(); + metaManager.close(); + metaManager = MetaManager.getMetaManager( + Collections.singletonList(new HostAddress("127.0.0.1", 9559))); + TagItem tagItem = metaManager.getTag("testMeta", "player"); + assert (tagItem.getVersion() == 1); + assert (tagItem.schema.getColumns().size() == 1); + + EdgeItem edgeItem = metaManager.getEdge("testMeta", "couples"); + assert (edgeItem.getVersion() == 1); + assert (edgeItem.schema.getColumns().size() == 1); + } catch (Exception e) { + e.printStackTrace(); + assert false; + } + } +} diff --git a/client/src/test/java/com/vesoft/nebula/client/storage/MockStorageData.java b/client/src/test/java/com/vesoft/nebula/client/storage/MockStorageData.java index d13ed9e35..a0dfe6c00 100644 --- a/client/src/test/java/com/vesoft/nebula/client/storage/MockStorageData.java +++ b/client/src/test/java/com/vesoft/nebula/client/storage/MockStorageData.java @@ -50,14 +50,14 @@ public static void initGraph() { assert (false); } } catch (UnknownHostException | NotValidConnectionException - | IOErrorException | AuthFailedException - | UnsupportedEncodingException e) { + | IOErrorException | AuthFailedException e) { e.printStackTrace(); } } public static String createSpace() { - String exec = "CREATE SPACE IF NOT EXISTS testStorage(partition_num=10);" + String exec = "CREATE SPACE IF NOT EXISTS testStorage(partition_num=10," + + "vid_type=fixed_string(8));" + "USE testStorage;" + "CREATE TAG IF NOT EXISTS person(name string, age int);" + "CREATE EDGE IF NOT EXISTS friend(likeness double);"; diff --git a/client/src/test/java/com/vesoft/nebula/client/util/ProcessUtil.java b/client/src/test/java/com/vesoft/nebula/client/util/ProcessUtil.java new file mode 100644 index 000000000..04f65e341 --- /dev/null +++ b/client/src/test/java/com/vesoft/nebula/client/util/ProcessUtil.java @@ -0,0 +1,28 @@ +/* Copyright (c) 2021 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +package com.vesoft.nebula.client.util; + +import java.io.BufferedReader; +import java.io.InputStreamReader; + +public class ProcessUtil { + public static void printProcessStatus(String cmd, Process p) { + try { + BufferedReader reader = new BufferedReader( + new InputStreamReader(p.getInputStream())); + + String line; + System.out.print(cmd + " output: "); + while ((line = reader.readLine()) != null) { + System.out.print(line); + } + System.out.print("\n"); + } catch (Exception e) { + e.printStackTrace(); + } + } +} diff --git a/client/src/test/java/com/vesoft/nebula/encoder/MetaCacheImplTest.java b/client/src/test/java/com/vesoft/nebula/encoder/MetaCacheImplTest.java index fb75ff688..a69880be7 100644 --- a/client/src/test/java/com/vesoft/nebula/encoder/MetaCacheImplTest.java +++ b/client/src/test/java/com/vesoft/nebula/encoder/MetaCacheImplTest.java @@ -158,6 +158,18 @@ public Schema genEmptyString() { return new Schema(columns, null); } + public Schema genWithoutString() { + List columns = new ArrayList<>(); + ColumnDef columnDef = new ColumnDef(("Col01").getBytes(), + new ColumnTypeDef(PropertyType.INT64)); + columns.add(columnDef); + return new Schema(columns, null); + } + + public Schema genWithoutProp() { + return new Schema(new ArrayList<>(), null); + } + public MetaCacheImplTest() { spaceItem.space_id = 1; SpaceDesc spaceDesc = new SpaceDesc("test_space".getBytes(), @@ -190,6 +202,20 @@ public MetaCacheImplTest() { tagItem3.schema = genEmptyString(); this.tagItems.put(new String(tagItem3.tag_name), tagItem3); + TagItem tagItem4 = new TagItem(); + tagItem4.tag_name = "tag_without_string".getBytes(); + tagItem4.version = 7; + tagItem4.schema = genWithoutString(); + this.tagItems.put(new String(tagItem4.tag_name), tagItem4); + + TagItem tagItem5 = new TagItem(); + tagItem5.tag_name = "tag_without_property".getBytes(); + tagItem5.version = 7; + tagItem5.schema = genWithoutProp(); + this.tagItems.put(new String(tagItem5.tag_name), tagItem5); + + this.tagItems.put("person", createPersonTag()); + EdgeItem edgeItem1 = new EdgeItem(); edgeItem1.edge_name = "edge_no_default".getBytes(); edgeItem1.schema = genNoDefaultVal(); diff --git a/client/src/test/java/com/vesoft/nebula/encoder/TestEncoder.java b/client/src/test/java/com/vesoft/nebula/encoder/TestEncoder.java index 09394696d..72e4c34e1 100644 --- a/client/src/test/java/com/vesoft/nebula/encoder/TestEncoder.java +++ b/client/src/test/java/com/vesoft/nebula/encoder/TestEncoder.java @@ -18,6 +18,7 @@ import com.vesoft.nebula.meta.PropertyType; import com.vesoft.nebula.meta.SpaceItem; import com.vesoft.nebula.meta.TagItem; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -210,6 +211,8 @@ public void testEncodeVertexValue() { TagItem tagItem1 = cacheImplTest.getTag("test", "tag_no_default"); TagItem tagItem2 = cacheImplTest.getTag("test", "tag_with_empty_string"); TagItem tagItem3 = cacheImplTest.getTag("test", "tag_with_default"); + TagItem tagItem4 = cacheImplTest.getTag("test", "tag_without_string"); + TagItem tagItem5 = cacheImplTest.getTag("test", "tag_without_property"); try { codec.encodeTag(tagItem1, colNames, colVals); Assert.fail(); @@ -253,6 +256,45 @@ public void testEncodeVertexValue() { "Unsupported default value yet".getBytes()); assert (true); } + + // test without string type + try { + byte[] encodeStr = codec.encodeTag( + tagItem4, Arrays.asList("Col01"), Arrays.asList(1024)); + String hexStr = Hex.encodeHexString(encodeStr); + String expectResult = "09070004000000000000"; + Assert.assertArrayEquals(expectResult.getBytes(), + hexStr.substring(0, hexStr.length() - 16).getBytes()); + } catch (Exception exception) { + exception.printStackTrace(); + Assert.fail(exception.getMessage()); + } + + // test without empty property + try { + byte[] encodeStr = codec.encodeTag(tagItem5, new ArrayList<>(), new ArrayList<>()); + String hexStr = Hex.encodeHexString(encodeStr); + String expectResult = "0907"; + Assert.assertArrayEquals(expectResult.getBytes(), + hexStr.substring(0, hexStr.length() - 16).getBytes()); + } catch (Exception exception) { + exception.printStackTrace(); + Assert.fail(exception.getMessage()); + } + + // test with chinese value + try { + byte[] encodeStr = codec.encodeTag(tagItem2, + Collections.singletonList("Col01"), + Collections.singletonList("中国")); + String hexStr = Hex.encodeHexString(encodeStr); + String expectResult = "080900000006000000e4b8ade59bbd"; + Assert.assertArrayEquals(expectResult.getBytes(), + hexStr.substring(0, hexStr.length() - 16).getBytes()); + } catch (Exception exception) { + exception.printStackTrace(); + Assert.fail(exception.getMessage()); + } } @Test() diff --git a/client/src/test/resources/docker-compose.yaml b/client/src/test/resources/docker-compose.yaml index 1f112e072..19b9e58b2 100644 --- a/client/src/test/resources/docker-compose.yaml +++ b/client/src/test/resources/docker-compose.yaml @@ -1,7 +1,7 @@ version: '3.4' services: metad0: - image: vesoft/nebula-metad:v2-nightly + image: vesoft/nebula-metad:v2.0.1 environment: USER: root TZ: "${TZ}" @@ -15,6 +15,7 @@ services: - --v=0 - --minloglevel=0 - --heartbeat_interval_secs=2 + - --expired_time_factor=2 healthcheck: test: ["CMD", "curl", "-f", "http://172.28.1.1:11000/status"] interval: 30s @@ -36,7 +37,7 @@ services: - SYS_PTRACE metad1: - image: vesoft/nebula-metad:v2-nightly + image: vesoft/nebula-metad:v2.0.1 environment: USER: root TZ: "${TZ}" @@ -50,6 +51,7 @@ services: - --v=0 - --minloglevel=0 - --heartbeat_interval_secs=2 + - --expired_time_factor=2 healthcheck: test: ["CMD", "curl", "-f", "http://172.28.1.2:11000/status"] interval: 30s @@ -71,7 +73,7 @@ services: - SYS_PTRACE metad2: - image: vesoft/nebula-metad:v2-nightly + image: vesoft/nebula-metad:v2.0.1 environment: USER: root TZ: "${TZ}" @@ -85,6 +87,7 @@ services: - --v=0 - --minloglevel=0 - --heartbeat_interval_secs=2 + - --expired_time_factor=2 healthcheck: test: ["CMD", "curl", "-f", "http://172.28.1.3:11000/status"] interval: 30s @@ -106,7 +109,7 @@ services: - SYS_PTRACE storaged0: - image: vesoft/nebula-storaged:v2-nightly + image: vesoft/nebula-storaged:v2.0.1 environment: USER: root TZ: "${TZ}" @@ -145,7 +148,7 @@ services: - SYS_PTRACE storaged1: - image: vesoft/nebula-storaged:v2-nightly + image: vesoft/nebula-storaged:v2.0.1 environment: USER: root TZ: "${TZ}" @@ -184,7 +187,7 @@ services: - SYS_PTRACE storaged2: - image: vesoft/nebula-storaged:v2-nightly + image: vesoft/nebula-storaged:v2.0.1 environment: USER: root TZ: "${TZ}" @@ -223,7 +226,7 @@ services: - SYS_PTRACE graphd0: - image: vesoft/nebula-graphd:v2-nightly + image: vesoft/nebula-graphd:v2.0.1 environment: USER: root TZ: "${TZ}" @@ -259,7 +262,7 @@ services: - SYS_PTRACE graphd1: - image: vesoft/nebula-graphd:v2-nightly + image: vesoft/nebula-graphd:v2.0.1 environment: USER: root TZ: "${TZ}" @@ -295,7 +298,7 @@ services: - SYS_PTRACE graphd2: - image: vesoft/nebula-graphd:v2-nightly + image: vesoft/nebula-graphd:v2.0.1 environment: USER: root TZ: "${TZ}" @@ -335,4 +338,4 @@ networks: ipam: driver: default config: - - subnet: 172.28.0.0/16 \ No newline at end of file + - subnet: 172.28.0.0/16 diff --git a/examples/pom.xml b/examples/pom.xml index 040017a14..7cb473b3e 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -5,7 +5,7 @@ com.vesoft nebula - 2.0.0 + 2.0.1 4.0.0 diff --git a/examples/src/main/java/com/vesoft/nebula/examples/StorageClientExample.java b/examples/src/main/java/com/vesoft/nebula/examples/StorageClientExample.java index b82b232d4..39cab78c3 100644 --- a/examples/src/main/java/com/vesoft/nebula/examples/StorageClientExample.java +++ b/examples/src/main/java/com/vesoft/nebula/examples/StorageClientExample.java @@ -30,10 +30,13 @@ public static void main(String[] args) { client.connect(); } catch (Exception e) { LOGGER.error("storage client connect error, ", e); + client.close(); System.exit(1); } scanVertex(client); scanEdge(client); + + client.close(); } /** @@ -53,6 +56,7 @@ public static void scanVertex(StorageClient client) { result = iterator.next(); } catch (Exception e) { LOGGER.error("scan error, ", e); + client.close(); System.exit(1); } if (result.isEmpty()) { @@ -99,6 +103,7 @@ public static void scanEdge(StorageClient client) { result = iterator.next(); } catch (Exception e) { LOGGER.error("scan error, ", e); + client.close(); System.exit(1); } if (result.isEmpty()) { diff --git a/pom.xml b/pom.xml index 0e05ecd3d..ff36ee700 100644 --- a/pom.xml +++ b/pom.xml @@ -7,7 +7,7 @@ com.vesoft nebula pom - 2.0.0 + 2.0.1 UTF-8 @@ -43,37 +43,100 @@ - client examples + + + + deploy + + + + + org.apache.maven.plugins + maven-source-plugin + 3.2.0 + + + attach-sources + + jar-no-fork + + + + + + + org.apache.maven.plugins + maven-javadoc-plugin + 3.2.0 + + + attach-javadocs + + jar + + + + + + + + org.apache.maven.plugins + maven-gpg-plugin + 1.6 + + + sign-artifacts + verify + + sign + + + + + --pinentry-mode + loopback + + + + + + + + + + + - release - https://oss.sonatype.org/service/local/staging/deploy/maven2/ + ossrh + Nexus Release Repository + https://oss.sonatype.org/service/local/staging/deploy/maven2 - snapshots - https://oss.sonatype.org/content/repositories/snapshots/ + ossrh + Nexus Snapshot Repository + https://oss.sonatype.org/content/repositories/snapshots + - org.apache.maven.plugins - maven-gpg-plugin - 1.6 - - - verify - - sign - - - + org.sonatype.plugins + nexus-staging-maven-plugin + 1.6.8 + true + + ossrh + https://oss.sonatype.org/ + false +