Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#387 temporary set timeout #405

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,10 @@ public boolean init(List<HostAddress> addresses, NebulaPoolConfig config)
objConfig.setMinIdle(config.getMinConnSize());
objConfig.setMaxIdle(config.getMaxConnSize());
objConfig.setMaxTotal(config.getMaxConnSize());
objConfig.setTestOnBorrow(true);
objConfig.setTestOnReturn(true);
objConfig.setTestWhileIdle(true);
objConfig.setTestOnCreate(true);
objConfig.setTimeBetweenEvictionRunsMillis(config.getIntervalIdle() <= 0
? BaseObjectPoolConfig.DEFAULT_TIME_BETWEEN_EVICTION_RUNS_MILLIS
: config.getIntervalIdle());
Expand Down Expand Up @@ -135,7 +139,7 @@ public Session getSession(String userName, String password, boolean reconnect)
SyncConnection connection = null;
try {
connection = getConnection();
AuthResult authResult = connection.authenticate(userName, password);
AuthResult authResult = connection.authenticate(userName, password,Integer.MAX_VALUE);
return new Session(connection, authResult, this, reconnect);
} catch (Exception e) {
// if get the connection succeeded, but authenticate failed,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,23 +61,37 @@ public Session(SyncConnection connection,
*/
public synchronized ResultSet execute(String stmt) throws
IOErrorException {
return execute(stmt,0);
}


/**
* Execute the nGql sentence.
*
* @param stmt The nGql sentence.
* such as insert ngql `INSERT VERTEX person(name) VALUES "Tom":("Tom");`
* @param timeout temporary set the timeout
* @return The ResultSet
*/
public synchronized ResultSet execute(String stmt,int timeout) throws
IOErrorException {
if (connection == null) {
throw new IOErrorException(IOErrorException.E_CONNECT_BROKEN,
"The session was released, couldn't use again.");
"The session was released, couldn't use again.");
}

if (connectionIsBroken.get() && retryConnect) {
if (retryConnect()) {
ExecutionResponse resp = connection.execute(sessionID, stmt);
ExecutionResponse resp = connection.execute(sessionID, stmt,timeout);
return new ResultSet(resp, timezoneOffset);
} else {
throw new IOErrorException(IOErrorException.E_ALL_BROKEN,
"All servers are broken.");
"All servers are broken.");
}
}

try {
ExecutionResponse resp = connection.execute(sessionID, stmt);
ExecutionResponse resp = connection.execute(sessionID, stmt,timeout);
return new ResultSet(resp, timezoneOffset);
} catch (IOErrorException ie) {
if (ie.getType() == IOErrorException.E_CONNECT_BROKEN) {
Expand All @@ -87,7 +101,7 @@ public synchronized ResultSet execute(String stmt) throws
if (retryConnect) {
if (retryConnect()) {
connectionIsBroken.set(false);
ExecutionResponse resp = connection.execute(sessionID, stmt);
ExecutionResponse resp = connection.execute(sessionID, stmt,timeout);
return new ResultSet(resp, timezoneOffset);
} else {
connectionIsBroken.set(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public synchronized SessionWrapper getSessionWrapper() throws RuntimeException,
try {
Session session = pool.getSession(
config.getUserName(), config.getPassword(), config.getReconnect());
ResultSet resultSet = session.execute("USE " + config.getSpaceName());
ResultSet resultSet = session.execute("USE " + config.getSpaceName(),Integer.MAX_VALUE);
if (!resultSet.isSucceeded()) {
throw new RuntimeException(
"Switch space `"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import com.facebook.thrift.protocol.TCompactProtocol;
import com.facebook.thrift.protocol.TProtocol;
import com.facebook.thrift.transport.TSocket;
import com.facebook.thrift.transport.TTransport;
import com.facebook.thrift.transport.TTransportException;
import com.facebook.thrift.utils.StandardCharsets;
import com.google.common.base.Charsets;
Expand All @@ -37,7 +36,7 @@ public class SyncConnection extends Connection {

private static final Logger LOGGER = LoggerFactory.getLogger(SyncConnection.class);

protected TTransport transport = null;
protected TSocket transport = null;
protected TProtocol protocol = null;
private GraphService.Client client = null;
private int timeout = 0;
Expand Down Expand Up @@ -128,17 +127,26 @@ public void reopen() throws IOErrorException, ClientServerIncompatibleException
}
}


public AuthResult authenticate(String user, String password)
throws AuthFailedException, IOErrorException, ClientServerIncompatibleException {
return authenticate(user,password,0);
}

public AuthResult authenticate(String user, String password,int timeout)
throws AuthFailedException, IOErrorException, ClientServerIncompatibleException {
try {
if (timeout > 0) {
this.transport.setTimeout(timeout);
}
AuthResponse resp = client.authenticate(user.getBytes(), password.getBytes());
if (resp.error_code != ErrorCode.SUCCEEDED) {
if (resp.error_msg != null) {
throw new AuthFailedException(new String(resp.error_msg));
} else {
throw new AuthFailedException(
"The error_msg is null, "
+ "maybe the service not set or the response is disorder.");
"The error_msg is null, "
+ "maybe the service not set or the response is disorder.");
}
}
return new AuthResult(resp.getSession_id(), resp.getTime_zone_offset_seconds());
Expand All @@ -148,20 +156,32 @@ public AuthResult authenticate(String user, String password)
if (te.getType() == TTransportException.END_OF_FILE) {
throw new IOErrorException(IOErrorException.E_CONNECT_BROKEN, te.getMessage());
} else if (te.getType() == TTransportException.TIMED_OUT
|| te.getMessage().contains("Read timed out")) {
|| te.getMessage().contains("Read timed out")) {
reopen();
throw new IOErrorException(IOErrorException.E_TIME_OUT, te.getMessage());
} else if (te.getType() == TTransportException.NOT_OPEN) {
throw new IOErrorException(IOErrorException.E_NO_OPEN, te.getMessage());
}
}
throw new AuthFailedException(String.format("Authenticate failed: %s", e.getMessage()));
} finally {
if (timeout > 0) {
this.transport.setTimeout(this.timeout);
}
}
}

public ExecutionResponse execute(long sessionID, String stmt)
throws IOErrorException {
return execute(sessionID,stmt,0);
}

public ExecutionResponse execute(long sessionID, String stmt,int timeout)
throws IOErrorException {
try {
if (timeout > 0) {
this.transport.setTimeout(timeout);
}
return client.execute(sessionID, stmt.getBytes());
} catch (TException e) {
if (e instanceof TTransportException) {
Expand All @@ -171,7 +191,7 @@ public ExecutionResponse execute(long sessionID, String stmt)
} else if (te.getType() == TTransportException.NOT_OPEN) {
throw new IOErrorException(IOErrorException.E_NO_OPEN, te.getMessage());
} else if (te.getType() == TTransportException.TIMED_OUT
|| te.getMessage().contains("Read timed out")) {
|| te.getMessage().contains("Read timed out")) {
try {
reopen();
} catch (ClientServerIncompatibleException ex) {
Expand All @@ -181,6 +201,10 @@ public ExecutionResponse execute(long sessionID, String stmt)
}
}
throw new IOErrorException(IOErrorException.E_UNKNOWN, e.getMessage());
} finally {
if (timeout > 0) {
this.transport.setTimeout(this.timeout);
}
}
}

Expand Down