From d97ccdceda15cff9d415192879d9eed31d25c771 Mon Sep 17 00:00:00 2001 From: peixubin <20983498@qq.com> Date: Mon, 4 Nov 2024 14:49:45 +0800 Subject: [PATCH] =?UTF-8?q?pgsql=20engine:=20=E5=9C=A8=E6=9F=A5=E8=AF=A2?= =?UTF-8?q?=E6=A8=A1=E5=BC=8F=E4=B8=8B=E5=90=AF=E7=94=A8=E5=8F=AA=E8=AF=BB?= =?UTF-8?q?=E4=BA=8B=E5=8A=A1=EF=BC=8C=E5=B7=A5=E5=8D=95=E6=89=A7=E8=A1=8C?= =?UTF-8?q?=E5=90=AF=E7=94=A8=E8=AF=BB=E5=86=99=E4=BA=8B=E5=8A=A1,?= =?UTF-8?q?=E9=81=BF=E5=85=8D=E5=9C=A8=E6=9F=A5=E8=AF=A2=E6=A8=A1=E5=BC=8F?= =?UTF-8?q?=E4=B8=8B=E8=A2=AB=E5=87=BD=E6=95=B0=E4=BF=AE=E6=94=B9=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=20(#2855)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sql/engines/pgsql.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/sql/engines/pgsql.py b/sql/engines/pgsql.py index f686f33772..0307033caf 100644 --- a/sql/engines/pgsql.py +++ b/sql/engines/pgsql.py @@ -187,12 +187,14 @@ def query( result_set = ResultSet(full_sql=sql) try: conn = self.get_connection(db_name=db_name) + conn.autocommit = False max_execution_time = kwargs.get("max_execution_time", 0) cursor = conn.cursor() try: cursor.execute(f"SET statement_timeout TO {max_execution_time};") except: pass + cursor.execute("SET transaction ISOLATION LEVEL READ COMMITTED READ ONLY;") if schema_name: cursor.execute( f"SET search_path TO %(schema_name)s;", {"schema_name": schema_name} @@ -203,6 +205,7 @@ def query( rows = cursor.fetchmany(size=int(limit_num)) else: rows = cursor.fetchall() + conn.commit() fields = cursor.description column_type_codes = [i[1] for i in fields] if fields else [] # 定义 JSON 和 JSONB 的 type_code,# 114 是 json,3802 是 jsonb @@ -231,6 +234,7 @@ def query( result_set.rows = converted_rows result_set.affected_rows = len(converted_rows) except Exception as e: + conn.rollback() logger.warning( f"PgSQL命令执行报错,语句:{sql}, 错误信息:{traceback.format_exc()}" ) @@ -324,13 +328,14 @@ def execute_workflow(self, workflow, close_conn=True): db_name = workflow.db_name try: conn = self.get_connection(db_name=db_name) + conn.autocommit = False cursor = conn.cursor() + cursor.execute("SET transaction ISOLATION LEVEL READ COMMITTED READ WRITE;") # 逐条执行切分语句,追加到执行结果中 for statement in split_sql: statement = statement.rstrip(";") with FuncTimer() as t: cursor.execute(statement) - conn.commit() execute_result.rows.append( ReviewResult( id=line, @@ -343,7 +348,9 @@ def execute_workflow(self, workflow, close_conn=True): ) ) line += 1 + conn.commit() except Exception as e: + conn.rollback() logger.warning( f"PGSQL命令执行报错,语句:{statement or sql}, 错误信息:{traceback.format_exc()}" )