Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

https://github.com/vesoft-inc/nebula-java/issues/387 #389

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public Session getSession(String userName, String password, boolean reconnect)
SyncConnection connection = null;
try {
connection = getConnection();
AuthResult authResult = connection.authenticate(userName, password);
AuthResult authResult = connection.authenticate(userName, password,Integer.MAX_VALUE);
return new Session(connection, authResult, this, reconnect);
} catch (Exception e) {
// if get the connection succeeded, but authenticate failed,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,23 +61,36 @@ public Session(SyncConnection connection,
* @return The ResultSet
*/
public synchronized ResultSet execute(String stmt) throws IOErrorException {
return execute(stmt,0);
}


/**
* Execute the nGql sentence.
*
* @param stmt The nGql sentence.
* such as insert ngql `INSERT VERTEX person(name) VALUES "Tom":("Tom");`
* @param timeout temporary setting timeout
* @return The ResultSet
*/
public synchronized ResultSet execute(String stmt,int timeout) throws IOErrorException {
if (connection == null) {
throw new IOErrorException(IOErrorException.E_CONNECT_BROKEN,
"The session was released, couldn't use again.");
"The session was released, couldn't use again.");
}

if (connectionIsBroken.get() && retryConnect) {
if (retryConnect()) {
ExecutionResponse resp = connection.execute(sessionID, stmt);
ExecutionResponse resp = connection.execute(sessionID, stmt,timeout);
return new ResultSet(resp, timezoneOffset);
} else {
throw new IOErrorException(IOErrorException.E_ALL_BROKEN,
"All servers are broken.");
"All servers are broken.");
}
}

try {
ExecutionResponse resp = connection.execute(sessionID, stmt);
ExecutionResponse resp = connection.execute(sessionID, stmt,timeout);
return new ResultSet(resp, timezoneOffset);
} catch (IOErrorException ie) {
if (ie.getType() == IOErrorException.E_CONNECT_BROKEN) {
Expand All @@ -87,7 +100,7 @@ public synchronized ResultSet execute(String stmt) throws IOErrorException {
if (retryConnect) {
if (retryConnect()) {
connectionIsBroken.set(false);
ExecutionResponse resp = connection.execute(sessionID, stmt);
ExecutionResponse resp = connection.execute(sessionID, stmt,timeout);
return new ResultSet(resp, timezoneOffset);
} else {
connectionIsBroken.set(true);
Expand All @@ -100,6 +113,7 @@ public synchronized ResultSet execute(String stmt) throws IOErrorException {
}
}


/**
* Check current connection is ok
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,14 @@ private void checkConfig() {
}
}

/**
*
* @return canUseBitSet size
*/
public synchronized int getCanUseSize(){
return canUseBitSet.length();
}

/**
* getSessionWrapper: return a SessionWrapper from sessionManager,
* the SessionWrapper couldn't use by multi-thread
Expand Down Expand Up @@ -66,7 +74,7 @@ public synchronized SessionWrapper getSessionWrapper() throws RuntimeException {
try {
Session session = pool.getSession(
config.getUserName(), config.getPassword(), config.getReconnect());
ResultSet resultSet = session.execute("USE " + config.getSpaceName());
ResultSet resultSet = session.execute("USE " + config.getSpaceName(),Integer.MAX_VALUE);
if (!resultSet.isSucceeded()) {
throw new RuntimeException(
"Switch space `"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import com.vesoft.nebula.graph.GraphService;

public class SyncConnection extends Connection {
protected TTransport transport = null;
protected TSocket transport = null;
protected TProtocol protocol = null;
private GraphService.Client client = null;
private int timeout = 0;
Expand Down Expand Up @@ -58,9 +58,21 @@ public void reopen() throws IOErrorException {
open(serverAddr, timeout);
}

public AuthResult authenticate(String user, String password)
/**
*
* @param user
* @param password
* @param timeout temporary setting timeout
* @return
* @throws AuthFailedException
* @throws IOErrorException
*/
public AuthResult authenticate(String user, String password,int timeout)
throws AuthFailedException, IOErrorException {
try {
if (timeout>0) {
transport.setTimeout(timeout);
}
AuthResponse resp = client.authenticate(user.getBytes(), password.getBytes());
if (resp.error_code != ErrorCode.SUCCEEDED) {
if (resp.error_msg != null) {
Expand All @@ -86,12 +98,34 @@ public AuthResult authenticate(String user, String password)
}
}
throw new AuthFailedException(String.format("Authenticate failed: %s", e.getMessage()));
}finally {
if (timeout>0) {
transport.setTimeout(this.timeout);
}
}
}

public ExecutionResponse execute(long sessionID, String stmt)

public AuthResult authenticate(String user, String password)
throws AuthFailedException, IOErrorException {
return authenticate(user,password,0);
}


/**
*
* @param sessionID
* @param stmt
* @param timeout temporary setting timeout
* @return
* @throws IOErrorException
*/
public ExecutionResponse execute(long sessionID, String stmt,int timeout)
throws IOErrorException {
try {
if (timeout>0) {
transport.setTimeout(timeout);
}
return client.execute(sessionID, stmt.getBytes());
} catch (TException e) {
if (e instanceof TTransportException) {
Expand All @@ -101,15 +135,25 @@ public ExecutionResponse execute(long sessionID, String stmt)
} else if (te.getType() == TTransportException.NOT_OPEN) {
throw new IOErrorException(IOErrorException.E_NO_OPEN, te.getMessage());
} else if (te.getType() == TTransportException.TIMED_OUT
|| te.getMessage().contains("Read timed out")) {
|| te.getMessage().contains("Read timed out")) {
reopen();
throw new IOErrorException(IOErrorException.E_TIME_OUT, te.getMessage());
}
}
throw new IOErrorException(IOErrorException.E_UNKNOWN, e.getMessage());
}finally {
if (timeout>0) {
transport.setTimeout(this.timeout);
}
}
}


public ExecutionResponse execute(long sessionID, String stmt)
throws IOErrorException {
return execute(sessionID,stmt,0);
}

public void signout(long sessionId) {
client.signout(sessionId);
}
Expand Down