From f2aeb0a189922cc990040b24947ffb3efa85a511 Mon Sep 17 00:00:00 2001 From: Yicong Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Wed, 1 Jan 2025 13:46:21 -0800 Subject: [PATCH] Fix python udf source detection (#3189) PhysicalOp relies on the input port number to determine if an operator is a source operator. For Python UDF, from the changes in #3183, the input ports are not correctly associated with the PhysicalOp, causing all the Python UDFs to be recognized as source operators. This PR fixes the issue. --- .../architecture/pythonworker/PythonProxyClient.scala | 2 +- .../ics/amber/operator/udf/python/PythonUDFOpDescV2.scala | 7 +++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/pythonworker/PythonProxyClient.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/pythonworker/PythonProxyClient.scala index c7dc6400c1..61a1a2641d 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/pythonworker/PythonProxyClient.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/pythonworker/PythonProxyClient.scala @@ -69,7 +69,7 @@ class PythonProxyClient(portNumberPromise: Promise[Int], val actorId: ActorVirtu logger.warn( s"Failed to connect to Flight Server in this attempt, retrying after $UNIT_WAIT_TIME_MS ms... remaining attempts: ${MAX_TRY_COUNT - tryCount}" ) - flightClient.close() + if (flightClient != null) flightClient.close() Thread.sleep(UNIT_WAIT_TIME_MS) tryCount += 1 } diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/udf/python/PythonUDFOpDescV2.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/udf/python/PythonUDFOpDescV2.scala index 1c070636eb..802b8d7d54 100644 --- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/udf/python/PythonUDFOpDescV2.scala +++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/udf/python/PythonUDFOpDescV2.scala @@ -93,7 +93,7 @@ class PythonUDFOpDescV2 extends LogicalOp { Map(operatorInfo.outputPorts.head.id -> outputSchema) } - if (workers > 1) { + val physicalOp = if (workers > 1) { PhysicalOp .oneToOnePhysicalOp( workflowId, @@ -112,7 +112,10 @@ class PythonUDFOpDescV2 extends LogicalOp { OpExecWithCode(code, "python") ) .withParallelizable(false) - }.withDerivePartition(_ => UnknownPartition()) + } + + physicalOp + .withDerivePartition(_ => UnknownPartition()) .withInputPorts(operatorInfo.inputPorts) .withOutputPorts(operatorInfo.outputPorts) .withPartitionRequirement(partitionRequirement)