diff --git a/client/src/main/java/com/vesoft/nebula/client/graph/net/NebulaPool.java b/client/src/main/java/com/vesoft/nebula/client/graph/net/NebulaPool.java index b5c6cd567..02bbbdd3e 100644 --- a/client/src/main/java/com/vesoft/nebula/client/graph/net/NebulaPool.java +++ b/client/src/main/java/com/vesoft/nebula/client/graph/net/NebulaPool.java @@ -132,7 +132,7 @@ public Session getSession(String userName, String password, boolean reconnect) SyncConnection connection = null; try { connection = getConnection(); - AuthResult authResult = connection.authenticate(userName, password); + AuthResult authResult = connection.authenticate(userName, password,Integer.MAX_VALUE); return new Session(connection, authResult, this, reconnect); } catch (Exception e) { // if get the connection succeeded, but authenticate failed, diff --git a/client/src/main/java/com/vesoft/nebula/client/graph/net/Session.java b/client/src/main/java/com/vesoft/nebula/client/graph/net/Session.java index 35709d558..bc6a92ecc 100644 --- a/client/src/main/java/com/vesoft/nebula/client/graph/net/Session.java +++ b/client/src/main/java/com/vesoft/nebula/client/graph/net/Session.java @@ -61,23 +61,36 @@ public Session(SyncConnection connection, * @return The ResultSet */ public synchronized ResultSet execute(String stmt) throws IOErrorException { + return execute(stmt,0); + } + + + /** + * Execute the nGql sentence. + * + * @param stmt The nGql sentence. + * such as insert ngql `INSERT VERTEX person(name) VALUES "Tom":("Tom");` + * @param timeout temporary setting timeout + * @return The ResultSet + */ + public synchronized ResultSet execute(String stmt,int timeout) throws IOErrorException { if (connection == null) { throw new IOErrorException(IOErrorException.E_CONNECT_BROKEN, - "The session was released, couldn't use again."); + "The session was released, couldn't use again."); } if (connectionIsBroken.get() && retryConnect) { if (retryConnect()) { - ExecutionResponse resp = connection.execute(sessionID, stmt); + ExecutionResponse resp = connection.execute(sessionID, stmt,timeout); return new ResultSet(resp, timezoneOffset); } else { throw new IOErrorException(IOErrorException.E_ALL_BROKEN, - "All servers are broken."); + "All servers are broken."); } } try { - ExecutionResponse resp = connection.execute(sessionID, stmt); + ExecutionResponse resp = connection.execute(sessionID, stmt,timeout); return new ResultSet(resp, timezoneOffset); } catch (IOErrorException ie) { if (ie.getType() == IOErrorException.E_CONNECT_BROKEN) { @@ -87,7 +100,7 @@ public synchronized ResultSet execute(String stmt) throws IOErrorException { if (retryConnect) { if (retryConnect()) { connectionIsBroken.set(false); - ExecutionResponse resp = connection.execute(sessionID, stmt); + ExecutionResponse resp = connection.execute(sessionID, stmt,timeout); return new ResultSet(resp, timezoneOffset); } else { connectionIsBroken.set(true); @@ -100,6 +113,7 @@ public synchronized ResultSet execute(String stmt) throws IOErrorException { } } + /** * Check current connection is ok * diff --git a/client/src/main/java/com/vesoft/nebula/client/graph/net/SessionsManager.java b/client/src/main/java/com/vesoft/nebula/client/graph/net/SessionsManager.java index 059199209..2cd66bb7f 100644 --- a/client/src/main/java/com/vesoft/nebula/client/graph/net/SessionsManager.java +++ b/client/src/main/java/com/vesoft/nebula/client/graph/net/SessionsManager.java @@ -38,6 +38,14 @@ private void checkConfig() { } } + /** + * + * @return canUseBitSet size + */ + public synchronized int getCanUseSize(){ + return canUseBitSet.length(); + } + /** * getSessionWrapper: return a SessionWrapper from sessionManager, * the SessionWrapper couldn't use by multi-thread @@ -66,7 +74,7 @@ public synchronized SessionWrapper getSessionWrapper() throws RuntimeException { try { Session session = pool.getSession( config.getUserName(), config.getPassword(), config.getReconnect()); - ResultSet resultSet = session.execute("USE " + config.getSpaceName()); + ResultSet resultSet = session.execute("USE " + config.getSpaceName(),Integer.MAX_VALUE); if (!resultSet.isSucceeded()) { throw new RuntimeException( "Switch space `" diff --git a/client/src/main/java/com/vesoft/nebula/client/graph/net/SyncConnection.java b/client/src/main/java/com/vesoft/nebula/client/graph/net/SyncConnection.java index e1ccdebcd..cb6f36e0a 100644 --- a/client/src/main/java/com/vesoft/nebula/client/graph/net/SyncConnection.java +++ b/client/src/main/java/com/vesoft/nebula/client/graph/net/SyncConnection.java @@ -22,7 +22,7 @@ import com.vesoft.nebula.graph.GraphService; public class SyncConnection extends Connection { - protected TTransport transport = null; + protected TSocket transport = null; protected TProtocol protocol = null; private GraphService.Client client = null; private int timeout = 0; @@ -58,9 +58,21 @@ public void reopen() throws IOErrorException { open(serverAddr, timeout); } - public AuthResult authenticate(String user, String password) + /** + * + * @param user + * @param password + * @param timeout temporary setting timeout + * @return + * @throws AuthFailedException + * @throws IOErrorException + */ + public AuthResult authenticate(String user, String password,int timeout) throws AuthFailedException, IOErrorException { try { + if (timeout>0) { + transport.setTimeout(timeout); + } AuthResponse resp = client.authenticate(user.getBytes(), password.getBytes()); if (resp.error_code != ErrorCode.SUCCEEDED) { if (resp.error_msg != null) { @@ -86,12 +98,34 @@ public AuthResult authenticate(String user, String password) } } throw new AuthFailedException(String.format("Authenticate failed: %s", e.getMessage())); + }finally { + if (timeout>0) { + transport.setTimeout(this.timeout); + } } } - public ExecutionResponse execute(long sessionID, String stmt) + + public AuthResult authenticate(String user, String password) + throws AuthFailedException, IOErrorException { + return authenticate(user,password,0); + } + + + /** + * + * @param sessionID + * @param stmt + * @param timeout temporary setting timeout + * @return + * @throws IOErrorException + */ + public ExecutionResponse execute(long sessionID, String stmt,int timeout) throws IOErrorException { try { + if (timeout>0) { + transport.setTimeout(timeout); + } return client.execute(sessionID, stmt.getBytes()); } catch (TException e) { if (e instanceof TTransportException) { @@ -101,15 +135,25 @@ public ExecutionResponse execute(long sessionID, String stmt) } else if (te.getType() == TTransportException.NOT_OPEN) { throw new IOErrorException(IOErrorException.E_NO_OPEN, te.getMessage()); } else if (te.getType() == TTransportException.TIMED_OUT - || te.getMessage().contains("Read timed out")) { + || te.getMessage().contains("Read timed out")) { reopen(); throw new IOErrorException(IOErrorException.E_TIME_OUT, te.getMessage()); } } throw new IOErrorException(IOErrorException.E_UNKNOWN, e.getMessage()); + }finally { + if (timeout>0) { + transport.setTimeout(this.timeout); + } } } + + public ExecutionResponse execute(long sessionID, String stmt) + throws IOErrorException { + return execute(sessionID,stmt,0); + } + public void signout(long sessionId) { client.signout(sessionId); }