diff --git a/nebula3/gclient/net/Connection.py b/nebula3/gclient/net/Connection.py index 222509cd..4f15867a 100644 --- a/nebula3/gclient/net/Connection.py +++ b/nebula3/gclient/net/Connection.py @@ -109,7 +109,9 @@ def _reopen(self): """ self.close() if self._ssl_conf is not None: - self.open_SSL(self._ip, self._port, self._timeout, self.handshakeKey, self._ssl_conf) + self.open_SSL( + self._ip, self._port, self._timeout, self.handshakeKey, self._ssl_conf + ) else: self.open(self._ip, self._port, self._timeout, self.handshakeKey) @@ -222,7 +224,7 @@ def close(self): self._connection._iprot.trans.close() except Exception as e: logger.error( - 'Close connection to {}:{} failed:{}'.format(self._ip, self._port, e) + "Close connection to {}:{} failed:{}".format(self._ip, self._port, e) ) def ping(self): @@ -230,7 +232,7 @@ def ping(self): :return: True or False """ try: - resp = self._connection.execute(0, 'YIELD 1;') + resp = self._connection.execute(0, "YIELD 1;") return True except Exception: return False diff --git a/nebula3/gclient/net/ConnectionPool.py b/nebula3/gclient/net/ConnectionPool.py index aa22d93a..db5dc06f 100644 --- a/nebula3/gclient/net/ConnectionPool.py +++ b/nebula3/gclient/net/ConnectionPool.py @@ -49,8 +49,8 @@ def init(self, addresses, configs, ssl_conf=None): :return: if all addresses are ok, return True else return False. """ if self._close: - logger.error('The pool has init or closed.') - raise RuntimeError('The pool has init or closed.') + logger.error("The pool has init or closed.") + raise RuntimeError("The pool has init or closed.") self._configs = configs self._ssl_configs = ssl_conf for address in addresses: @@ -73,7 +73,7 @@ def init(self, addresses, configs, ssl_conf=None): ok_num = self.get_ok_servers_num() if ok_num < len(self._addresses): raise RuntimeError( - 'The services status exception: {}'.format(self._get_services_status()) + "The services status exception: {}".format(self._get_services_status()) ) conns_per_address = int(self._configs.min_connection_pool_size / ok_num) @@ -82,7 +82,11 @@ def init(self, addresses, configs, ssl_conf=None): for i in range(0, conns_per_address): connection = Connection() connection.open_SSL( - addr[0], addr[1], self._configs.timeout, configs.handshakeKey, self._ssl_configs + addr[0], + addr[1], + self._configs.timeout, + configs.handshakeKey, + self._ssl_configs, ) self._connections[addr].append(connection) return True @@ -135,13 +139,13 @@ def get_connection(self): """ with self._lock: if self._close: - logger.error('The pool is closed') + logger.error("The pool is closed") raise NotValidConnectionException() try: ok_num = self.get_ok_servers_num() if ok_num == 0: - logger.error('No available server') + logger.error("No available server") return None max_con_per_address = int( self._configs.max_connection_pool_size / ok_num @@ -159,7 +163,7 @@ def get_connection(self): # ping to check the connection is valid if connection.ping(): connection.is_used = True - logger.info('Get connection to {}'.format(addr)) + logger.info("Get connection to {}".format(addr)) return connection else: invalid_connections.append(connection) @@ -185,7 +189,7 @@ def get_connection(self): ) connection.is_used = True self._connections[addr].append(connection) - logger.info('Get connection to {}'.format(addr)) + logger.info("Get connection to {}".format(addr)) return connection else: for connection in list(self._connections[addr]): @@ -193,10 +197,10 @@ def get_connection(self): self._connections[addr].remove(connection) try_count = try_count + 1 - logger.error('No available connection') + logger.error("No available connection") return None except Exception as ex: - logger.error('Get connection failed: {}'.format(ex)) + logger.error("Get connection failed: {}".format(ex)) return None def ping(self, address): @@ -207,12 +211,18 @@ def ping(self, address): """ try: conn = Connection() - conn.open_SSL(address[0], address[1], 1000, self._configs.handshakeKey, self._ssl_configs) + conn.open_SSL( + address[0], + address[1], + 1000, + self._configs.handshakeKey, + self._ssl_configs, + ) conn.close() return True except Exception as ex: logger.warning( - 'Connect {}:{} failed: {}'.format(address[0], address[1], ex) + "Connect {}:{} failed: {}".format(address[0], address[1], ex) ) return False @@ -225,7 +235,7 @@ def close(self): for addr in self._connections.keys(): for connection in self._connections[addr]: if connection.is_used: - logger.warning('Closing a connection that is in use') + logger.warning("Closing a connection that is in use") connection.close() self._close = True @@ -267,11 +277,11 @@ def get_ok_servers_num(self): def _get_services_status(self): msg_list = [] for addr in self._addresses_status.keys(): - status = 'OK' + status = "OK" if self._addresses_status[addr] != self.S_OK: - status = 'BAD' - msg_list.append('[services: {}, status: {}]'.format(addr, status)) - return ', '.join(msg_list) + status = "BAD" + msg_list.append("[services: {}, status: {}]".format(addr, status)) + return ", ".join(msg_list) def update_servers_status(self): """update the servers' status""" @@ -291,7 +301,7 @@ def _remove_idle_unusable_connection(self): if not connection.is_used: if not connection.ping(): logger.debug( - 'Remove the unusable connection to {}'.format( + "Remove the unusable connection to {}".format( connection.get_address() ) ) @@ -299,10 +309,9 @@ def _remove_idle_unusable_connection(self): continue if ( self._configs.idle_time != 0 - and connection.idle_time() > self._configs.idle_time ): logger.debug( - 'Remove the idle connection to {}'.format( + "Remove the idle connection to {}".format( connection.get_address() ) ) diff --git a/nebula3/gclient/net/SessionPool.py b/nebula3/gclient/net/SessionPool.py index 7e4c9383..694eae2c 100644 --- a/nebula3/gclient/net/SessionPool.py +++ b/nebula3/gclient/net/SessionPool.py @@ -84,12 +84,12 @@ def init(self, configs): try: self._check_configs() except Exception as e: - logger.error('Invalid configs: {}'.format(e)) + logger.error("Invalid configs: {}".format(e)) return False if self._close: - logger.error('The pool has init or closed.') - raise RuntimeError('The pool has init or closed.') + logger.error("The pool has init or closed.") + raise RuntimeError("The pool has init or closed.") self._configs = configs # ping all servers @@ -101,14 +101,14 @@ def init(self, configs): ok_num = self.get_ok_servers_num() if ok_num < len(self._addresses): raise RuntimeError( - 'The services status exception: {}'.format(self._get_services_status()) + "The services status exception: {}".format(self._get_services_status()) ) # iterate all addresses and create sessions to fullfil the min_size for i in range(self._configs.min_size): session = self._new_session() if session is None: - raise RuntimeError('Get session failed') + raise RuntimeError("Get session failed") self._add_session_to_idle(session) return True @@ -129,7 +129,7 @@ def ping(self, address): return True except Exception as ex: logger.warning( - 'Connect {}:{} failed: {}'.format(address[0], address[1], ex) + "Connect {}:{} failed: {}".format(address[0], address[1], ex) ) return False @@ -156,7 +156,7 @@ def execute_parameter(self, stmt, params): """ session = self._get_idle_session() if session is None: - raise RuntimeError('Get session failed') + raise RuntimeError("Get session failed") self._add_session_to_active(session) try: @@ -171,7 +171,7 @@ def execute_parameter(self, stmt, params): return resp except Exception as e: - logger.error('Execute failed: {}'.format(e)) + logger.error("Execute failed: {}".format(e)) # remove the session from the pool if it is invalid self._active_sessions.remove(session) raise e @@ -242,7 +242,7 @@ def execute_json(self, stmt): def execute_json_with_parameter(self, stmt, params): session = self._get_idle_session() if session is None: - raise RuntimeError('Get session failed') + raise RuntimeError("Get session failed") self._add_session_to_active(session) try: @@ -258,7 +258,7 @@ def execute_json_with_parameter(self, stmt, params): return resp except Exception as e: - logger.error('Execute failed: {}'.format(e)) + logger.error("Execute failed: {}".format(e)) # remove the session from the pool if it is invalid self._active_sessions.remove(session) raise e @@ -292,11 +292,11 @@ def get_ok_servers_num(self): def _get_services_status(self): msg_list = [] for addr in self._addresses_status.keys(): - status = 'OK' + status = "OK" if self._addresses_status[addr] != self.S_OK: - status = 'BAD' - msg_list.append('[services: {}, status: {}]'.format(addr, status)) - return ', '.join(msg_list) + status = "BAD" + msg_list.append("[services: {}, status: {}]".format(addr, status)) + return ", ".join(msg_list) def update_servers_status(self): """update the servers' status""" @@ -324,7 +324,7 @@ def _get_idle_session(self): return self._new_session() else: raise NoValidSessionException( - 'The total number of sessions reaches the pool max size {}'.format( + "The total number of sessions reaches the pool max size {}".format( self._configs.max_size ) ) @@ -336,7 +336,7 @@ def _new_session(self): :return: Session """ if self._ssl_configs is not None: - raise RuntimeError('SSL is not supported yet') + raise RuntimeError("SSL is not supported yet") self._pos = (self._pos + 1) % len(self._addresses) next_addr_index = self._pos @@ -349,7 +349,7 @@ def _new_session(self): # if the address is bad, skip it if self._addresses_status[addr] == self.S_BAD: - logger.warning('The graph service {} is not available'.format(addr)) + logger.warning("The graph service {} is not available".format(addr)) retries = retries - 1 next_addr_index = (next_addr_index + 1) % len(self._addresses) continue @@ -362,10 +362,10 @@ def _new_session(self): session = Session(connection, auth_result, self, False) # switch to the space specified in the configs - resp = session.execute('USE {}'.format(self._space_name)) + resp = session.execute("USE {}".format(self._space_name)) if not resp.is_succeeded(): raise RuntimeError( - 'Failed to get session, cannot set the session space to {} error: {} {}'.format( + "Failed to get session, cannot set the session space to {} error: {} {}".format( self._space_name, resp.error_code(), resp.error_msg() ) ) @@ -373,10 +373,10 @@ def _new_session(self): except AuthFailedException as e: # if auth failed because of credentials, close the pool if e.message.find("Invalid password") or e.message.find( - "User not exist" + "User not exist" ): logger.error( - 'Authentication failed, because of bad credentials, close the pool {}'.format( + "Authentication failed, because of bad credentials, close the pool {}".format( e ) ) @@ -386,7 +386,7 @@ def _new_session(self): raise raise RuntimeError( - 'Failed to get a valid session, no graph service is available' + "Failed to get a valid session, no graph service is available" ) def _return_session(self, session): @@ -428,14 +428,14 @@ def _set_space_to_default(self, session): :return: void """ try: - resp = session.execute('USE {}'.format(self._space_name)) + resp = session.execute("USE {}".format(self._space_name)) if not resp.is_succeeded(): raise RuntimeError( - 'Failed to set the session space to {}'.format(self._space_name) + "Failed to set the session space to {}".format(self._space_name) ) except Exception: logger.warning( - 'Failed to set the session space to {}, the current session has been dropped'.format( + "Failed to set the session space to {}, the current session has been dropped".format( self._space_name ) ) @@ -474,23 +474,23 @@ def _period_detect(self): def _check_configs(self): """validate the configs""" if self._configs.min_size < 0: - raise RuntimeError('The min_size must be greater than 0') + raise RuntimeError("The min_size must be greater than 0") if self._configs.max_size < 0: - raise RuntimeError('The max_size must be greater than 0') + raise RuntimeError("The max_size must be greater than 0") if self._configs.min_size > self._configs.max_size: raise RuntimeError( - 'The min_size must be less than or equal to the max_size' + "The min_size must be less than or equal to the max_size" ) if self._configs.idle_time < 0: - raise RuntimeError('The idle_time must be greater or equal to 0') + raise RuntimeError("The idle_time must be greater or equal to 0") if self._configs.timeout < 0: - raise RuntimeError('The timeout must be greater or equal to 0') + raise RuntimeError("The timeout must be greater or equal to 0") if self._space_name == "": - raise RuntimeError('The space_name must be set') + raise RuntimeError("The space_name must be set") if self._username == "": - raise RuntimeError('The username must be set') + raise RuntimeError("The username must be set") if self._password == "": - raise RuntimeError('The password must be set') + raise RuntimeError("The password must be set") if self._addresses is None or len(self._addresses) == 0: - raise RuntimeError('The addresses must be set') + raise RuntimeError("The addresses must be set")