From fd97f45e20d43f4c473fad2c013899ed57348f69 Mon Sep 17 00:00:00 2001 From: cpf Date: Fri, 17 Dec 2021 22:13:11 +0800 Subject: [PATCH 1/3] #387 --- .../nebula/client/graph/net/NebulaPool.java | 2 +- .../nebula/client/graph/net/Session.java | 24 ++++++++++--- .../client/graph/net/SessionsManager.java | 2 +- .../client/graph/net/SyncConnection.java | 36 +++++++++++++++---- 4 files changed, 51 insertions(+), 13 deletions(-) diff --git a/client/src/main/java/com/vesoft/nebula/client/graph/net/NebulaPool.java b/client/src/main/java/com/vesoft/nebula/client/graph/net/NebulaPool.java index ec4d419f1..5836239c8 100644 --- a/client/src/main/java/com/vesoft/nebula/client/graph/net/NebulaPool.java +++ b/client/src/main/java/com/vesoft/nebula/client/graph/net/NebulaPool.java @@ -135,7 +135,7 @@ public Session getSession(String userName, String password, boolean reconnect) SyncConnection connection = null; try { connection = getConnection(); - AuthResult authResult = connection.authenticate(userName, password); + AuthResult authResult = connection.authenticate(userName, password,Integer.MAX_VALUE); return new Session(connection, authResult, this, reconnect); } catch (Exception e) { // if get the connection succeeded, but authenticate failed, diff --git a/client/src/main/java/com/vesoft/nebula/client/graph/net/Session.java b/client/src/main/java/com/vesoft/nebula/client/graph/net/Session.java index 940843617..bf4269874 100644 --- a/client/src/main/java/com/vesoft/nebula/client/graph/net/Session.java +++ b/client/src/main/java/com/vesoft/nebula/client/graph/net/Session.java @@ -61,23 +61,37 @@ public Session(SyncConnection connection, */ public synchronized ResultSet execute(String stmt) throws IOErrorException { + return execute(stmt,0); + } + + + /** + * Execute the nGql sentence. + * + * @param stmt The nGql sentence. + * such as insert ngql `INSERT VERTEX person(name) VALUES "Tom":("Tom");` + * @param timeout temporary set the timeout + * @return The ResultSet + */ + public synchronized ResultSet execute(String stmt,int timeout) throws + IOErrorException { if (connection == null) { throw new IOErrorException(IOErrorException.E_CONNECT_BROKEN, - "The session was released, couldn't use again."); + "The session was released, couldn't use again."); } if (connectionIsBroken.get() && retryConnect) { if (retryConnect()) { - ExecutionResponse resp = connection.execute(sessionID, stmt); + ExecutionResponse resp = connection.execute(sessionID, stmt,timeout); return new ResultSet(resp, timezoneOffset); } else { throw new IOErrorException(IOErrorException.E_ALL_BROKEN, - "All servers are broken."); + "All servers are broken."); } } try { - ExecutionResponse resp = connection.execute(sessionID, stmt); + ExecutionResponse resp = connection.execute(sessionID, stmt,timeout); return new ResultSet(resp, timezoneOffset); } catch (IOErrorException ie) { if (ie.getType() == IOErrorException.E_CONNECT_BROKEN) { @@ -87,7 +101,7 @@ public synchronized ResultSet execute(String stmt) throws if (retryConnect) { if (retryConnect()) { connectionIsBroken.set(false); - ExecutionResponse resp = connection.execute(sessionID, stmt); + ExecutionResponse resp = connection.execute(sessionID, stmt,timeout); return new ResultSet(resp, timezoneOffset); } else { connectionIsBroken.set(true); diff --git a/client/src/main/java/com/vesoft/nebula/client/graph/net/SessionsManager.java b/client/src/main/java/com/vesoft/nebula/client/graph/net/SessionsManager.java index 6fe4da3dc..4cf5353fe 100644 --- a/client/src/main/java/com/vesoft/nebula/client/graph/net/SessionsManager.java +++ b/client/src/main/java/com/vesoft/nebula/client/graph/net/SessionsManager.java @@ -67,7 +67,7 @@ public synchronized SessionWrapper getSessionWrapper() throws RuntimeException, try { Session session = pool.getSession( config.getUserName(), config.getPassword(), config.getReconnect()); - ResultSet resultSet = session.execute("USE " + config.getSpaceName()); + ResultSet resultSet = session.execute("USE " + config.getSpaceName(),Integer.MAX_VALUE); if (!resultSet.isSucceeded()) { throw new RuntimeException( "Switch space `" diff --git a/client/src/main/java/com/vesoft/nebula/client/graph/net/SyncConnection.java b/client/src/main/java/com/vesoft/nebula/client/graph/net/SyncConnection.java index 07e625aae..3f5f5e256 100644 --- a/client/src/main/java/com/vesoft/nebula/client/graph/net/SyncConnection.java +++ b/client/src/main/java/com/vesoft/nebula/client/graph/net/SyncConnection.java @@ -10,7 +10,6 @@ import com.facebook.thrift.protocol.TCompactProtocol; import com.facebook.thrift.protocol.TProtocol; import com.facebook.thrift.transport.TSocket; -import com.facebook.thrift.transport.TTransport; import com.facebook.thrift.transport.TTransportException; import com.facebook.thrift.utils.StandardCharsets; import com.google.common.base.Charsets; @@ -37,7 +36,7 @@ public class SyncConnection extends Connection { private static final Logger LOGGER = LoggerFactory.getLogger(SyncConnection.class); - protected TTransport transport = null; + protected TSocket transport = null; protected TProtocol protocol = null; private GraphService.Client client = null; private int timeout = 0; @@ -127,17 +126,26 @@ public void reopen() throws IOErrorException, ClientServerIncompatibleException } } + public AuthResult authenticate(String user, String password) throws AuthFailedException, IOErrorException, ClientServerIncompatibleException { + return authenticate(user,password,0); + } + + public AuthResult authenticate(String user, String password,int timeout) + throws AuthFailedException, IOErrorException, ClientServerIncompatibleException { try { + if (timeout>0) { + this.transport.setTimeout(timeout); + } AuthResponse resp = client.authenticate(user.getBytes(), password.getBytes()); if (resp.error_code != ErrorCode.SUCCEEDED) { if (resp.error_msg != null) { throw new AuthFailedException(new String(resp.error_msg)); } else { throw new AuthFailedException( - "The error_msg is null, " - + "maybe the service not set or the response is disorder."); + "The error_msg is null, " + + "maybe the service not set or the response is disorder."); } } return new AuthResult(resp.getSession_id(), resp.getTime_zone_offset_seconds()); @@ -147,7 +155,7 @@ public AuthResult authenticate(String user, String password) if (te.getType() == TTransportException.END_OF_FILE) { throw new IOErrorException(IOErrorException.E_CONNECT_BROKEN, te.getMessage()); } else if (te.getType() == TTransportException.TIMED_OUT - || te.getMessage().contains("Read timed out")) { + || te.getMessage().contains("Read timed out")) { reopen(); throw new IOErrorException(IOErrorException.E_TIME_OUT, te.getMessage()); } else if (te.getType() == TTransportException.NOT_OPEN) { @@ -155,12 +163,24 @@ public AuthResult authenticate(String user, String password) } } throw new AuthFailedException(String.format("Authenticate failed: %s", e.getMessage())); + } finally { + if (timeout>0) { + this.transport.setTimeout(this.timeout); + } } } public ExecutionResponse execute(long sessionID, String stmt) throws IOErrorException { + return execute(sessionID,stmt,0); + } + + public ExecutionResponse execute(long sessionID, String stmt,int timeout) + throws IOErrorException { try { + if (timeout>0) { + this.transport.setTimeout(timeout); + } return client.execute(sessionID, stmt.getBytes()); } catch (TException e) { if (e instanceof TTransportException) { @@ -170,7 +190,7 @@ public ExecutionResponse execute(long sessionID, String stmt) } else if (te.getType() == TTransportException.NOT_OPEN) { throw new IOErrorException(IOErrorException.E_NO_OPEN, te.getMessage()); } else if (te.getType() == TTransportException.TIMED_OUT - || te.getMessage().contains("Read timed out")) { + || te.getMessage().contains("Read timed out")) { try { reopen(); } catch (ClientServerIncompatibleException ex) { @@ -180,6 +200,10 @@ public ExecutionResponse execute(long sessionID, String stmt) } } throw new IOErrorException(IOErrorException.E_UNKNOWN, e.getMessage()); + } finally { + if (timeout>0) { + this.transport.setTimeout(this.timeout); + } } } From 486f4b41ac8483b792f5c10f7ae1f575890bedd8 Mon Sep 17 00:00:00 2001 From: cpf Date: Fri, 17 Dec 2021 22:29:43 +0800 Subject: [PATCH 2/3] #401 --- .../java/com/vesoft/nebula/client/graph/net/NebulaPool.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/client/src/main/java/com/vesoft/nebula/client/graph/net/NebulaPool.java b/client/src/main/java/com/vesoft/nebula/client/graph/net/NebulaPool.java index 5836239c8..adbabc549 100644 --- a/client/src/main/java/com/vesoft/nebula/client/graph/net/NebulaPool.java +++ b/client/src/main/java/com/vesoft/nebula/client/graph/net/NebulaPool.java @@ -94,6 +94,10 @@ public boolean init(List addresses, NebulaPoolConfig config) objConfig.setMinIdle(config.getMinConnSize()); objConfig.setMaxIdle(config.getMaxConnSize()); objConfig.setMaxTotal(config.getMaxConnSize()); + objConfig.setTestOnBorrow(true); + objConfig.setTestOnReturn(true); + objConfig.setTestWhileIdle(true); + objConfig.setTestOnCreate(true); objConfig.setTimeBetweenEvictionRunsMillis(config.getIntervalIdle() <= 0 ? BaseObjectPoolConfig.DEFAULT_TIME_BETWEEN_EVICTION_RUNS_MILLIS : config.getIntervalIdle()); From be8c3013d15392d30ba3825c70d893ee271b3eee Mon Sep 17 00:00:00 2001 From: vchangpengfei <37330503+vchangpengfei@users.noreply.github.com> Date: Wed, 22 Dec 2021 11:10:03 +0800 Subject: [PATCH 3/3] Update SyncConnection.java fix --- .../vesoft/nebula/client/graph/net/SyncConnection.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/client/src/main/java/com/vesoft/nebula/client/graph/net/SyncConnection.java b/client/src/main/java/com/vesoft/nebula/client/graph/net/SyncConnection.java index 3f5f5e256..b609b59ca 100644 --- a/client/src/main/java/com/vesoft/nebula/client/graph/net/SyncConnection.java +++ b/client/src/main/java/com/vesoft/nebula/client/graph/net/SyncConnection.java @@ -135,7 +135,7 @@ public AuthResult authenticate(String user, String password) public AuthResult authenticate(String user, String password,int timeout) throws AuthFailedException, IOErrorException, ClientServerIncompatibleException { try { - if (timeout>0) { + if (timeout > 0) { this.transport.setTimeout(timeout); } AuthResponse resp = client.authenticate(user.getBytes(), password.getBytes()); @@ -164,7 +164,7 @@ public AuthResult authenticate(String user, String password,int timeout) } throw new AuthFailedException(String.format("Authenticate failed: %s", e.getMessage())); } finally { - if (timeout>0) { + if (timeout > 0) { this.transport.setTimeout(this.timeout); } } @@ -178,7 +178,7 @@ public ExecutionResponse execute(long sessionID, String stmt) public ExecutionResponse execute(long sessionID, String stmt,int timeout) throws IOErrorException { try { - if (timeout>0) { + if (timeout > 0) { this.transport.setTimeout(timeout); } return client.execute(sessionID, stmt.getBytes()); @@ -201,7 +201,7 @@ public ExecutionResponse execute(long sessionID, String stmt,int timeout) } throw new IOErrorException(IOErrorException.E_UNKNOWN, e.getMessage()); } finally { - if (timeout>0) { + if (timeout > 0) { this.transport.setTimeout(this.timeout); } }