Skip to content

Commit

Permalink
format code
Browse files Browse the repository at this point in the history
  • Loading branch information
javaGitHub2022 committed Jan 8, 2024
1 parent f201c0d commit aac8c80
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 57 deletions.
8 changes: 5 additions & 3 deletions nebula3/gclient/net/Connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,9 @@ def _reopen(self):
"""
self.close()
if self._ssl_conf is not None:
self.open_SSL(self._ip, self._port, self._timeout, self.handshakeKey, self._ssl_conf)
self.open_SSL(
self._ip, self._port, self._timeout, self.handshakeKey, self._ssl_conf
)
else:
self.open(self._ip, self._port, self._timeout, self.handshakeKey)

Expand Down Expand Up @@ -222,15 +224,15 @@ def close(self):
self._connection._iprot.trans.close()
except Exception as e:
logger.error(
'Close connection to {}:{} failed:{}'.format(self._ip, self._port, e)
"Close connection to {}:{} failed:{}".format(self._ip, self._port, e)
)

def ping(self):
"""check the connection if ok
:return: True or False
"""
try:
resp = self._connection.execute(0, 'YIELD 1;')
resp = self._connection.execute(0, "YIELD 1;")
return True
except Exception:
return False
Expand Down
49 changes: 29 additions & 20 deletions nebula3/gclient/net/ConnectionPool.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ def init(self, addresses, configs, ssl_conf=None):
:return: if all addresses are ok, return True else return False.
"""
if self._close:
logger.error('The pool has init or closed.')
raise RuntimeError('The pool has init or closed.')
logger.error("The pool has init or closed.")
raise RuntimeError("The pool has init or closed.")
self._configs = configs
self._ssl_configs = ssl_conf
for address in addresses:
Expand All @@ -73,7 +73,7 @@ def init(self, addresses, configs, ssl_conf=None):
ok_num = self.get_ok_servers_num()
if ok_num < len(self._addresses):
raise RuntimeError(
'The services status exception: {}'.format(self._get_services_status())
"The services status exception: {}".format(self._get_services_status())
)

conns_per_address = int(self._configs.min_connection_pool_size / ok_num)
Expand All @@ -82,7 +82,11 @@ def init(self, addresses, configs, ssl_conf=None):
for i in range(0, conns_per_address):
connection = Connection()
connection.open_SSL(
addr[0], addr[1], self._configs.timeout, configs.handshakeKey, self._ssl_configs
addr[0],
addr[1],
self._configs.timeout,
configs.handshakeKey,
self._ssl_configs,
)
self._connections[addr].append(connection)
return True
Expand Down Expand Up @@ -135,13 +139,13 @@ def get_connection(self):
"""
with self._lock:
if self._close:
logger.error('The pool is closed')
logger.error("The pool is closed")
raise NotValidConnectionException()

try:
ok_num = self.get_ok_servers_num()
if ok_num == 0:
logger.error('No available server')
logger.error("No available server")
return None
max_con_per_address = int(
self._configs.max_connection_pool_size / ok_num
Expand All @@ -159,7 +163,7 @@ def get_connection(self):
# ping to check the connection is valid
if connection.ping():
connection.is_used = True
logger.info('Get connection to {}'.format(addr))
logger.info("Get connection to {}".format(addr))
return connection
else:
invalid_connections.append(connection)
Expand All @@ -185,18 +189,18 @@ def get_connection(self):
)
connection.is_used = True
self._connections[addr].append(connection)
logger.info('Get connection to {}'.format(addr))
logger.info("Get connection to {}".format(addr))
return connection
else:
for connection in list(self._connections[addr]):
if not connection.is_used:
self._connections[addr].remove(connection)
try_count = try_count + 1

logger.error('No available connection')
logger.error("No available connection")
return None
except Exception as ex:
logger.error('Get connection failed: {}'.format(ex))
logger.error("Get connection failed: {}".format(ex))
return None

def ping(self, address):
Expand All @@ -207,12 +211,18 @@ def ping(self, address):
"""
try:
conn = Connection()
conn.open_SSL(address[0], address[1], 1000, self._configs.handshakeKey, self._ssl_configs)
conn.open_SSL(
address[0],
address[1],
1000,
self._configs.handshakeKey,
self._ssl_configs,
)
conn.close()
return True
except Exception as ex:
logger.warning(
'Connect {}:{} failed: {}'.format(address[0], address[1], ex)
"Connect {}:{} failed: {}".format(address[0], address[1], ex)
)
return False

Expand All @@ -225,7 +235,7 @@ def close(self):
for addr in self._connections.keys():
for connection in self._connections[addr]:
if connection.is_used:
logger.warning('Closing a connection that is in use')
logger.warning("Closing a connection that is in use")
connection.close()
self._close = True

Expand Down Expand Up @@ -267,11 +277,11 @@ def get_ok_servers_num(self):
def _get_services_status(self):
msg_list = []
for addr in self._addresses_status.keys():
status = 'OK'
status = "OK"
if self._addresses_status[addr] != self.S_OK:
status = 'BAD'
msg_list.append('[services: {}, status: {}]'.format(addr, status))
return ', '.join(msg_list)
status = "BAD"
msg_list.append("[services: {}, status: {}]".format(addr, status))
return ", ".join(msg_list)

def update_servers_status(self):
"""update the servers' status"""
Expand All @@ -291,18 +301,17 @@ def _remove_idle_unusable_connection(self):
if not connection.is_used:
if not connection.ping():
logger.debug(
'Remove the unusable connection to {}'.format(
"Remove the unusable connection to {}".format(
connection.get_address()
)
)
conns.remove(connection)
continue
if (
self._configs.idle_time != 0
and connection.idle_time() > self._configs.idle_time
):
logger.debug(
'Remove the idle connection to {}'.format(
"Remove the idle connection to {}".format(
connection.get_address()
)
)
Expand Down
68 changes: 34 additions & 34 deletions nebula3/gclient/net/SessionPool.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,12 @@ def init(self, configs):
try:
self._check_configs()
except Exception as e:
logger.error('Invalid configs: {}'.format(e))
logger.error("Invalid configs: {}".format(e))
return False

if self._close:
logger.error('The pool has init or closed.')
raise RuntimeError('The pool has init or closed.')
logger.error("The pool has init or closed.")
raise RuntimeError("The pool has init or closed.")
self._configs = configs

# ping all servers
Expand All @@ -101,14 +101,14 @@ def init(self, configs):
ok_num = self.get_ok_servers_num()
if ok_num < len(self._addresses):
raise RuntimeError(
'The services status exception: {}'.format(self._get_services_status())
"The services status exception: {}".format(self._get_services_status())
)

# iterate all addresses and create sessions to fullfil the min_size
for i in range(self._configs.min_size):
session = self._new_session()
if session is None:
raise RuntimeError('Get session failed')
raise RuntimeError("Get session failed")
self._add_session_to_idle(session)

return True
Expand All @@ -129,7 +129,7 @@ def ping(self, address):
return True
except Exception as ex:
logger.warning(
'Connect {}:{} failed: {}'.format(address[0], address[1], ex)
"Connect {}:{} failed: {}".format(address[0], address[1], ex)
)
return False

Expand All @@ -156,7 +156,7 @@ def execute_parameter(self, stmt, params):
"""
session = self._get_idle_session()
if session is None:
raise RuntimeError('Get session failed')
raise RuntimeError("Get session failed")
self._add_session_to_active(session)

try:
Expand All @@ -171,7 +171,7 @@ def execute_parameter(self, stmt, params):

return resp
except Exception as e:
logger.error('Execute failed: {}'.format(e))
logger.error("Execute failed: {}".format(e))
# remove the session from the pool if it is invalid
self._active_sessions.remove(session)
raise e
Expand Down Expand Up @@ -242,7 +242,7 @@ def execute_json(self, stmt):
def execute_json_with_parameter(self, stmt, params):
session = self._get_idle_session()
if session is None:
raise RuntimeError('Get session failed')
raise RuntimeError("Get session failed")
self._add_session_to_active(session)

try:
Expand All @@ -258,7 +258,7 @@ def execute_json_with_parameter(self, stmt, params):

return resp
except Exception as e:
logger.error('Execute failed: {}'.format(e))
logger.error("Execute failed: {}".format(e))
# remove the session from the pool if it is invalid
self._active_sessions.remove(session)
raise e
Expand Down Expand Up @@ -292,11 +292,11 @@ def get_ok_servers_num(self):
def _get_services_status(self):
msg_list = []
for addr in self._addresses_status.keys():
status = 'OK'
status = "OK"
if self._addresses_status[addr] != self.S_OK:
status = 'BAD'
msg_list.append('[services: {}, status: {}]'.format(addr, status))
return ', '.join(msg_list)
status = "BAD"
msg_list.append("[services: {}, status: {}]".format(addr, status))
return ", ".join(msg_list)

def update_servers_status(self):
"""update the servers' status"""
Expand Down Expand Up @@ -324,7 +324,7 @@ def _get_idle_session(self):
return self._new_session()
else:
raise NoValidSessionException(
'The total number of sessions reaches the pool max size {}'.format(
"The total number of sessions reaches the pool max size {}".format(
self._configs.max_size
)
)
Expand All @@ -336,7 +336,7 @@ def _new_session(self):
:return: Session
"""
if self._ssl_configs is not None:
raise RuntimeError('SSL is not supported yet')
raise RuntimeError("SSL is not supported yet")

self._pos = (self._pos + 1) % len(self._addresses)
next_addr_index = self._pos
Expand All @@ -349,7 +349,7 @@ def _new_session(self):

# if the address is bad, skip it
if self._addresses_status[addr] == self.S_BAD:
logger.warning('The graph service {} is not available'.format(addr))
logger.warning("The graph service {} is not available".format(addr))
retries = retries - 1
next_addr_index = (next_addr_index + 1) % len(self._addresses)
continue
Expand All @@ -362,21 +362,21 @@ def _new_session(self):
session = Session(connection, auth_result, self, False)

# switch to the space specified in the configs
resp = session.execute('USE {}'.format(self._space_name))
resp = session.execute("USE {}".format(self._space_name))
if not resp.is_succeeded():
raise RuntimeError(
'Failed to get session, cannot set the session space to {} error: {} {}'.format(
"Failed to get session, cannot set the session space to {} error: {} {}".format(
self._space_name, resp.error_code(), resp.error_msg()
)
)
return session
except AuthFailedException as e:
# if auth failed because of credentials, close the pool
if e.message.find("Invalid password") or e.message.find(
"User not exist"
"User not exist"
):
logger.error(
'Authentication failed, because of bad credentials, close the pool {}'.format(
"Authentication failed, because of bad credentials, close the pool {}".format(
e
)
)
Expand All @@ -386,7 +386,7 @@ def _new_session(self):
raise

raise RuntimeError(
'Failed to get a valid session, no graph service is available'
"Failed to get a valid session, no graph service is available"
)

def _return_session(self, session):
Expand Down Expand Up @@ -428,14 +428,14 @@ def _set_space_to_default(self, session):
:return: void
"""
try:
resp = session.execute('USE {}'.format(self._space_name))
resp = session.execute("USE {}".format(self._space_name))
if not resp.is_succeeded():
raise RuntimeError(
'Failed to set the session space to {}'.format(self._space_name)
"Failed to set the session space to {}".format(self._space_name)
)
except Exception:
logger.warning(
'Failed to set the session space to {}, the current session has been dropped'.format(
"Failed to set the session space to {}, the current session has been dropped".format(
self._space_name
)
)
Expand Down Expand Up @@ -474,23 +474,23 @@ def _period_detect(self):
def _check_configs(self):
"""validate the configs"""
if self._configs.min_size < 0:
raise RuntimeError('The min_size must be greater than 0')
raise RuntimeError("The min_size must be greater than 0")
if self._configs.max_size < 0:
raise RuntimeError('The max_size must be greater than 0')
raise RuntimeError("The max_size must be greater than 0")
if self._configs.min_size > self._configs.max_size:
raise RuntimeError(
'The min_size must be less than or equal to the max_size'
"The min_size must be less than or equal to the max_size"
)
if self._configs.idle_time < 0:
raise RuntimeError('The idle_time must be greater or equal to 0')
raise RuntimeError("The idle_time must be greater or equal to 0")
if self._configs.timeout < 0:
raise RuntimeError('The timeout must be greater or equal to 0')
raise RuntimeError("The timeout must be greater or equal to 0")

if self._space_name == "":
raise RuntimeError('The space_name must be set')
raise RuntimeError("The space_name must be set")
if self._username == "":
raise RuntimeError('The username must be set')
raise RuntimeError("The username must be set")
if self._password == "":
raise RuntimeError('The password must be set')
raise RuntimeError("The password must be set")
if self._addresses is None or len(self._addresses) == 0:
raise RuntimeError('The addresses must be set')
raise RuntimeError("The addresses must be set")

0 comments on commit aac8c80

Please sign in to comment.