diff --git a/src/main/java/org/broadinstitute/hellbender/utils/python/StreamingPythonScriptExecutor.java b/src/main/java/org/broadinstitute/hellbender/utils/python/StreamingPythonScriptExecutor.java
index ba75e306ca7..41bba7a67f9 100644
--- a/src/main/java/org/broadinstitute/hellbender/utils/python/StreamingPythonScriptExecutor.java
+++ b/src/main/java/org/broadinstitute/hellbender/utils/python/StreamingPythonScriptExecutor.java
@@ -197,22 +197,24 @@ public void sendAsynchronousCommand(final String line) {
     /**
      * Wait for an acknowledgement (which must have been previously requested).
-     * @return true if a positive acknowledgement (ack) is received, false if negative (nck)
+     * @return the accumulated stdout/stderr {@link ProcessOutput} when a positive acknowledgement (ack) is received
+     * @throws PythonScriptExecutorException if a negative acknowledgement was received instead of an ack
      */
     public ProcessOutput waitForAck() {
         if (!isAckRequestOutstanding) {
             throw new GATKException("No ack request is outstanding. An ack request must be issued first");
         }
-        final boolean isAck = spController.waitForAck();
+        final ProcessControllerAckResult pcAckResult = spController.waitForAck();
         isAckRequestOutstanding = false;
         // At every ack receipt, we want to retrieve the stdout/stderr output in case we're journaling
         final ProcessOutput po = getAccumulatedOutput();
-        // if the ack was negative, throw, since the ack queue is no longer reliably in sync
-        if (!isAck) {
+        if (!pcAckResult.isPositiveAck()) {
             throw new PythonScriptExecutorException(
                     String.format(
                             "A nack was received from the Python process (most likely caused by a raised exception caused by): %s",
-                            po.toString()));
+                            pcAckResult.getDisplayMessage()
+                    )
+            );
         }
         return po;
     }
diff --git a/src/main/java/org/broadinstitute/hellbender/utils/runtime/ProcessControllerAckResult.java b/src/main/java/org/broadinstitute/hellbender/utils/runtime/ProcessControllerAckResult.java
new file mode 100644
index 00000000000..ee5b05f7f7c
--- /dev/null
+++ 
b/src/main/java/org/broadinstitute/hellbender/utils/runtime/ProcessControllerAckResult.java
@@ -0,0 +1,75 @@
+package org.broadinstitute.hellbender.utils.runtime;
+
+import org.broadinstitute.hellbender.exceptions.GATKException;
+
+/**
+ * Command acknowledgements that are returned from a process managed by StreamingProcessController.
+ * Ack results can be positive, negative, or negative with a message. Instances are immutable.
+ */
+public class ProcessControllerAckResult {
+
+    private final boolean isPositiveAck;
+    private final String message;
+
+    // display strings for the three message types that can be used by the remote process
+    private static final String displayMessageFormat = "%s received\n\n";
+    private static final String ACK_LOG_MESSAGE = String.format(displayMessageFormat, StreamingToolConstants.STREAMING_ACK_MESSAGE);
+    private static final String NCK_LOG_MESSAGE = String.format(displayMessageFormat, StreamingToolConstants.STREAMING_NCK_MESSAGE);
+    private static final String NCK_WITH_MESSAGE_LOG_MESSAGE = String.format(displayMessageFormat, StreamingToolConstants.STREAMING_NCK_WITH_MESSAGE_MESSAGE);
+
+    /**
+     * Creates an ack result, for ACK or NCK. The result carries an empty message.
+     * @param isPositiveAck true for a positive ack (ACK), false for negative ack (NCK)
+     */
+    public ProcessControllerAckResult(final boolean isPositiveAck) {
+        this.isPositiveAck = isPositiveAck;
+        this.message = "";
+    }
+
+    /**
+     * Creates a negative (NKM) ack result, with a message.
+     * @param nckMessage Message detail indicating reason for negative ack (NKM).
+     */
+    public ProcessControllerAckResult(final String nckMessage) {
+        this.isPositiveAck = false;
+        this.message = nckMessage;
+    }
+
+    /**
+     * @return true if this represents a positive ack, otherwise false
+     */
+    public boolean isPositiveAck() {
+        return isPositiveAck;
+    }
+
+    /**
+     * @return true if this ack is negative and includes a non-empty message (null-safe for a plain NCK)
+     */
+    public boolean hasMessage() {
+        return !isPositiveAck() && message != null && !message.isEmpty();
+    }
+
+    /**
+     * @return A (possibly empty) String with any message sent from the remote process.
+     * Only defined for negative acknowledgements {@link #hasMessage()}.
+     */
+    public String getNegativeACKMessage() {
+        if (isPositiveAck()) {
+            throw new GATKException("Can only retrieve messages for negative acknowledgements");
+        }
+        return message;
+    }
+
+    /**
+     * @return A message string representing this ack/nck suitable for logging/display to the user.
+     */
+    public String getDisplayMessage() {
+        if (isPositiveAck()) {
+            return ACK_LOG_MESSAGE;
+        } else if (hasMessage()) {
+            return String.format("%s: %s", NCK_WITH_MESSAGE_LOG_MESSAGE, getNegativeACKMessage());
+        } else {
+            return NCK_LOG_MESSAGE;
+        }
+    }
+}
diff --git a/src/main/java/org/broadinstitute/hellbender/utils/runtime/StreamingProcessController.java b/src/main/java/org/broadinstitute/hellbender/utils/runtime/StreamingProcessController.java
index 31d1eb5606a..80af2c0f841 100644
--- a/src/main/java/org/broadinstitute/hellbender/utils/runtime/StreamingProcessController.java
+++ b/src/main/java/org/broadinstitute/hellbender/utils/runtime/StreamingProcessController.java
@@ -37,14 +37,8 @@ public final class StreamingProcessController extends ProcessControllerBase ackFuture;
+    private Future<ProcessControllerAckResult> ackFuture;
 
     // keep an optional journal of all IPC; disabled/no-op by default
     private ProcessJournal processJournal = new ProcessJournal();
@@ -200,27 +194,28 @@ public void openAckFIFOForRead() {
     /**
      * Wait for a previously requested acknowledgement to be received. 
The remote process can deliver a positive
      * ack to indicate successful command completion, or a negative ack to indicate command execution failure.
-     * @return true if an positive acknowledgement (ACK) was received, false if a negative acknowledgement (NCK)
-     * was received
+     * @return a {@link ProcessControllerAckResult} indicating the resulting ack state.
+     * {@link ProcessControllerAckResult#isPositiveAck()} will return true for a positive acknowledgement (ACK),
+     * false for a negative acknowledgement (NCK, NKM, or an unrecognized ack string)
      */
-    public boolean waitForAck() {
+    public ProcessControllerAckResult waitForAck() {
         if (ackFuture != null) {
-            throw new GATKException("An ack is already outstanding");
+            throw new GATKException("An ack request is already outstanding");
         }
         ackFuture = executorService.submit(
             () -> {
                 try {
-                    byte[] ack = new byte[ACK_MESSAGE_SIZE];
-                    int nBytes = ackFIFOInputStream.read(ack, 0, ACK_MESSAGE_SIZE);
-                    if (nBytes != ACK_MESSAGE_SIZE) {
-                        throw new GATKException(String.format("Failure reading ack message from ack fifo, ret: (%d)", nBytes));
-                    } else if (Arrays.equals(ack, NCK_MESSAGE.getBytes())) {
-                        return false;
-                    } else if (Arrays.equals(ack, ACK_MESSAGE.getBytes())) {
-                        return true;
+                    final String ackMessage = getBytesFromStream(StreamingToolConstants.STREAMING_ACK_MESSAGE_SIZE);
+                    if (ackMessage.equals(StreamingToolConstants.STREAMING_ACK_MESSAGE)) {
+                        return new ProcessControllerAckResult(true);
+                    } else if (ackMessage.equals(StreamingToolConstants.STREAMING_NCK_MESSAGE)) {
+                        return new ProcessControllerAckResult(false);
+                    } else if (ackMessage.equals(StreamingToolConstants.STREAMING_NCK_WITH_MESSAGE_MESSAGE)) {
+                        return getNckWithMessageResult();
                     } else {
-                        logger.error("Unrecognized string written to ack fifo");
-                        return false;
+                        final String badAckMessage = "An unrecognized ack string message was written to ack fifo";
+                        logger.error(badAckMessage);
+                        return new ProcessControllerAckResult(badAckMessage);
                     }
                 } catch (IOException e) {
                     throw new GATKException("IOException reading from ack fifo", e);
@@ -230,15 +225,53 @@ public boolean waitForAck() {
         try {
             // blocking call to wait for the ack
-            boolean isAck = ackFuture.get();
-            processJournal.writeLogMessage(isAck ? ACK_LOG_MESSAGE : NCK_LOG_MESSAGE);
+            final ProcessControllerAckResult pcAck = ackFuture.get();
+            processJournal.writeLogMessage(pcAck.getDisplayMessage());
             ackFuture = null;
-            return isAck;
+            return pcAck;
         } catch (InterruptedException | ExecutionException e) {
             throw new GATKException("Exception waiting for ack from Python: " + e.getMessage(), e);
         }
     }
 
+    // Retrieve a nkm message from the input stream and package it up as a (negative) ProcessControllerAckResult
+    private ProcessControllerAckResult getNckWithMessageResult() throws IOException {
+        // look for a 4 byte long string with a 4 byte decimal integer with a value <= 9999
+        final String messageLengthString = getBytesFromStream(StreamingToolConstants.STREAMING_NCK_WITH_MESSAGE_MESSAGE_LEN_SIZE);
+        final int messageLength = Integer.parseInt(messageLengthString);
+        if (messageLength < 0) {
+            throw new GATKException("Negative ack message length must be >= 0");
+        }
+
+        // now get the corresponding message of that length (a length of 0 yields an empty message)
+        final String nckMessage = getBytesFromStream(messageLength);
+        return new ProcessControllerAckResult(nckMessage);
+    }
+
+    // Retrieve a given number of bytes from the ack fifo stream, looping over partial reads, and return them as a String.
+    private String getBytesFromStream(final int expectedMessageLength) throws IOException {
+        int nBytesReceived = 0;
+        int nBytesRemaining = expectedMessageLength;
+        final StringBuilder sb = new StringBuilder();
+        while (nBytesReceived < expectedMessageLength) {
+            final byte[] nckMessage = new byte[nBytesRemaining];
+            int readLen = ackFIFOInputStream.read(nckMessage, 0, nBytesRemaining);
+            if (readLen <= 0) {
+                throw new GATKException(
+                    String.format("Expected message of length %d but only found %d bytes", expectedMessageLength, nBytesReceived));
+            }
+
+            sb.append(new String(nckMessage, 0, readLen));
+            nBytesRemaining -= readLen;
+            nBytesReceived += readLen;
+        }
+        if (nBytesReceived != expectedMessageLength) {
+            throw new GATKException(
+                String.format("Expected message of length %d but found %d", expectedMessageLength, nBytesReceived));
+        }
+        return sb.toString();
+    }
+
     /**
      * Create a temporary FIFO suitable for sending output to the remote process. The FIFO is only valid for the
      * lifetime of the controller; the FIFO is destroyed when the controller is destroyed.
diff --git a/src/main/java/org/broadinstitute/hellbender/utils/runtime/StreamingToolConstants.java b/src/main/java/org/broadinstitute/hellbender/utils/runtime/StreamingToolConstants.java
new file mode 100644
index 00000000000..701374b6eec
--- /dev/null
+++ b/src/main/java/org/broadinstitute/hellbender/utils/runtime/StreamingToolConstants.java
@@ -0,0 +1,31 @@
+package org.broadinstitute.hellbender.utils.runtime;
+
+/**
+ * Various constants used by StreamingProcessController that require synchronized equivalents in
+ * the companion process, i.e., if the streaming process is written in Python, there must be
+ * equivalent Python constants for use by the Python code.
+ *
+ * See the equivalents for Python in toolconstants.py. 
+ */
+public class StreamingToolConstants {
+    /**
+     * Command acknowledgement messages used to signal positive acknowledgement ('ack'),
+     * negative acknowledgement ('nck'), and negative acknowledgement with an accompanying
+     * message ('nkm').
+     */
+    public static final String STREAMING_ACK_MESSAGE = "ack";
+    public static final String STREAMING_NCK_MESSAGE = "nck";
+    public static final String STREAMING_NCK_WITH_MESSAGE_MESSAGE = "nkm";
+
+    // This is only used by Java, but is kept here since it represents the length of the constant
+    // strings defined above.
+    protected static final int STREAMING_ACK_MESSAGE_SIZE = 3; // "ack", "nck", or "nkm"
+
+    /**
+     * Number of characters used to represent the length of the serialized message, fixed at a constant
+     * 4 characters to ensure we can deterministically know how much input to wait for when looking for
+     * a message length in the incoming stream.
+     */
+    public static final int STREAMING_NCK_WITH_MESSAGE_MESSAGE_LEN_SIZE = 4;
+    public static final int STREAMING_NCK_WITH_MESSAGE_MAX_MESSAGE_LENGTH = 9999;
+}
diff --git a/src/main/python/org/broadinstitute/hellbender/gatktool/tool.py b/src/main/python/org/broadinstitute/hellbender/gatktool/tool.py
index 5d3259f5134..6997781f399 100644
--- a/src/main/python/org/broadinstitute/hellbender/gatktool/tool.py
+++ b/src/main/python/org/broadinstitute/hellbender/gatktool/tool.py
@@ -15,6 +15,8 @@
 import sys
 import os
 import cProfile, pstats, io
+import traceback
+from gatktool import toolconstants
 
 _ackFIFO = None
 _dataFIFO = None
@@ -34,7 +36,7 @@ def initializeGATK(ackFIFOName: str):
     sys.excepthook = gatkExceptionHook
 
 
-def gatkExceptionHook(exceptionType, value, traceback):
+def gatkExceptionHook(exceptionType, value, tracebck):
     """
     GATK Handler for uncaught Python exceptions.
 
@@ -43,8 +45,8 @@ def gatkExceptionHook(exceptionType, value, traceback):
     sends a nack to GATK through the FIFO, which results in a
     PythonScriptExecutorException being thrown in the tool. 
"""
-    sendNack()
-    sys.__excepthook__(exceptionType, value, traceback)
+    sendNackWithMessage(' '.join(map(str, traceback.format_exception(exceptionType, value, tracebck))))
+    sys.__excepthook__(exceptionType, value, tracebck)
 
 
 def sendAck():
@@ -67,6 +69,16 @@ def sendNack():
     _ackFIFO.writeNack()
 
 
+def sendNackWithMessage(nckMessage: str):
+    """
+    Send a negative acknowledgment to GATK, along with a message. Generally only
+    called by the installed exception hook. This will result in a Java exception being
+    thrown that, unless caught by Java code, will terminate the tool.
+    """
+    global _ackFIFO
+    _ackFIFO.writeNackWithMessage(nckMessage)
+
+
 def terminateGATK():
     """
     Called by GATK when no more Python commands will be executed
@@ -141,8 +153,6 @@ class AckFIFO:
     Manage the FIFO used to notify GATK (via an ack) that a command has
     completed, or failed due to an unhandled exception (via a nck).
     """
-    _ackString = "ack"
-    _nackString = "nck"
 
     def __init__(self, ackFIFOName: str) -> None:
         """Open the ack fifo stream for writing only"""
@@ -156,7 +166,7 @@ def writeAck(self):
         """
         if self.fileWriter is None:
             raise RuntimeError("ack FIFO has not been initialized")
-        self.fileWriter.write(AckFIFO._ackString)
+        self.fileWriter.write(toolconstants._ackString)
         self.fileWriter.flush()
 
     def writeNack(self):
@@ -164,13 +174,42 @@ def writeNack(self):
         Write a negative acknowledgement to the ACK FIFO.
 
         Calling this method will result in an exception being thrown
-        in the GATK tool on whos behalf this module is running.
+        in the GATK tool on whose behalf this module is running.
+        """
+        if self.fileWriter is None:
+            raise RuntimeError("ack FIFO has not been initialized")
+        self.fileWriter.write(toolconstants._nackString)
+        self.fileWriter.flush()
+
+    def writeNackWithMessage(self, message: str) -> None:
+        """
+        Write a negative acknowledgement with a supplemental message to the ACK FIFO.
+
+        The format of a nkm message is:
+
+        nkmnnnnssss....
+
+        where "nkm" is literal, "nnnn" is a 4 character long string of decimal digits representing the length
+        of the message that follows, and "ssss...." is a string of length nnnn (longer messages are trimmed to 9999)
+
+        Calling this method will result in an exception being thrown
+        in the GATK tool on whose behalf this module is running.
         """
         if self.fileWriter is None:
             raise RuntimeError("ack FIFO has not been initialized")
-        self.fileWriter.write(AckFIFO._nackString)
+        self.fileWriter.write(toolconstants._nkmString)
+        actualMessageLength = len(message)
+        # The length field must be exactly 4 characters. NOTE(review): len() counts characters, not encoded bytes - confirm messages are ASCII
+        if len(str(actualMessageLength)) <= toolconstants._nckMessageLengthSerializedSize:
+            self.fileWriter.write(str(actualMessageLength).zfill(toolconstants._nckMessageLengthSerializedSize))
+            self.fileWriter.write(message)
+        else:
+            # Message is too long, trim to the first 9999 characters
+            self.fileWriter.write(str(toolconstants._nckMaxMessageLength))
+            self.fileWriter.write(message[:toolconstants._nckMaxMessageLength])
         self.fileWriter.flush()
 
+
     def close(self):
         assert self.fileWriter != None
         self.fileWriter.close()
diff --git a/src/main/python/org/broadinstitute/hellbender/gatktool/toolconstants.py b/src/main/python/org/broadinstitute/hellbender/gatktool/toolconstants.py
new file mode 100644
index 00000000000..b1a99af9105
--- /dev/null
+++ b/src/main/python/org/broadinstitute/hellbender/gatktool/toolconstants.py
@@ -0,0 +1,22 @@
+"""
+Constants that must remain in sync with the companion StreamingProcessController Java
+code in GATK. See StreamingToolConstants.java.
+
+"""
+
+"""
+Command acknowledgement messages used to signal positive acknowledgement ('ack'),
+negative acknowledgement ('nck'), and negative acknowledgement with an accompanying
+message ('nkm').
+"""
+_ackString = "ack"
+_nackString = "nck"
+_nkmString = "nkm"
+
+
+"""
+The length of a message written with a negative ack (nkm) must be 4 bytes long when
+serialized as a string, and cannot have a value > 9999. 
+"""
+_nckMessageLengthSerializedSize = 4
+_nckMaxMessageLength = 9999
diff --git a/src/test/java/org/broadinstitute/hellbender/utils/python/StreamingPythonScriptExecutorUnitTest.java b/src/test/java/org/broadinstitute/hellbender/utils/python/StreamingPythonScriptExecutorUnitTest.java
index 9f415613308..e575b5e9bf0 100644
--- a/src/test/java/org/broadinstitute/hellbender/utils/python/StreamingPythonScriptExecutorUnitTest.java
+++ b/src/test/java/org/broadinstitute/hellbender/utils/python/StreamingPythonScriptExecutorUnitTest.java
@@ -71,6 +71,31 @@ public void testExecuteCommand(final PythonScriptExecutor.PythonExecutableName e
         }
     }
 
+    @Test(groups = "python", dataProvider="supportedPythonVersions", timeOut = 50000, expectedExceptions={PythonScriptExecutorException.class})
+    public void testNckWithMessage(final PythonScriptExecutor.PythonExecutableName executableName) throws PythonScriptExecutorException {
+
+        final StreamingPythonScriptExecutor streamingPythonExecutor =
+                new StreamingPythonScriptExecutor<>(executableName,true);
+        Assert.assertNotNull(streamingPythonExecutor);
+        Assert.assertTrue(streamingPythonExecutor.start(Collections.emptyList(), true, null));
+
+        try {
+            // force a division by 0 error to raise an exception on the Python side, and verify that
+            // the exception details get propagated back through the ack fifo (as a negative ack with
+            // a message - "nkm") containing the details of the error, as well as to the exception
+            // thrown by the executor
+            streamingPythonExecutor.sendSynchronousCommand("0/0" + NL);
+        }
+        catch (PythonScriptExecutorException e) {
+            Assert.assertTrue(e.getMessage().contains("division"));
+            throw e;
+        }
+        finally {
+            streamingPythonExecutor.terminate();
+            Assert.assertFalse(streamingPythonExecutor.getProcess().isAlive());
+        }
+    }
+
     @Test(groups = "python", dataProvider="supportedPythonVersions", dependsOnMethods = "testPythonExists")
     public void testTerminateWhilePythonBlocked(final PythonScriptExecutor.PythonExecutableName executableName) {
         // Test termination on a Python process that is blocked on I/O to ensure that we don't leave
@@ -232,7 +257,7 @@ public void testRaisePythonException(final PythonScriptExecutor.PythonExecutable
     @Test(groups = "python", dataProvider="supportedPythonVersions", dependsOnMethods = "testPythonExists", expectedExceptions = PythonScriptExecutorException.class)
     public void testRaisePythonAssert(final PythonScriptExecutor.PythonExecutableName executableName) {
-        executeBadPythonCode(executableName,"assert false");
+        executeBadPythonCode(executableName,"assert 0");
     }
 
     private void executeBadPythonCode(final PythonScriptExecutor.PythonExecutableName executableName, final String errorCommand) {
diff --git a/src/test/java/org/broadinstitute/hellbender/utils/runtime/StreamingProcessControllerUnitTest.java b/src/test/java/org/broadinstitute/hellbender/utils/runtime/StreamingProcessControllerUnitTest.java
index 6884ca7038f..9b26e6eafeb 100644
--- a/src/test/java/org/broadinstitute/hellbender/utils/runtime/StreamingProcessControllerUnitTest.java
+++ b/src/test/java/org/broadinstitute/hellbender/utils/runtime/StreamingProcessControllerUnitTest.java
@@ -1,6 +1,9 @@
 package org.broadinstitute.hellbender.utils.runtime;
 
 import org.broadinstitute.hellbender.testutils.BaseTest;
+import org.broadinstitute.hellbender.utils.Utils;
+import org.broadinstitute.hellbender.utils.python.PythonScriptExecutorException;
+import org.broadinstitute.hellbender.utils.python.StreamingPythonScriptExecutor;
 import org.testng.Assert;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
@@ -10,9 +13,11 @@
 import java.io.FileReader;
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
 
-// Tests for the StreamingProcessController. Although these tests use Python, they do not test
-// or depend on the gatktool python package.
+// Tests for the StreamingProcessController. 
The StreamingProcessController depends on the remote
+// process cooperating via use of the ack fifo. Since the gatktool Python package implements the required
+// cooperative methods, some of these tests depend on that package.
 //
 // NOTE: TestNG has a bug where it throws ArrayIndexOutOfBoundsException instead of TimeoutException
 // exception when the test time exceeds the timeOut threshold. This is fixed but not yet released:
@@ -135,10 +140,10 @@ public void testFIFOLifetime() {
         Assert.assertFalse(catController.getProcess().isAlive());
     }
 
-    @Test(expectedExceptions = IllegalStateException.class)
+    @Test(groups = "python", expectedExceptions = IllegalStateException.class)
     public void testRedundantStart() {
-        final ProcessSettings catProcessSettings = new ProcessSettings(new String[] {"python", "-i", "-u"});
-        final StreamingProcessController catController = new StreamingProcessController(catProcessSettings);
+        final ProcessSettings processSettings = new ProcessSettings(new String[] {"python", "-i", "-u"});
+        final StreamingProcessController catController = new StreamingProcessController(processSettings);
         Assert.assertNotNull(catController.start());
 
         try {
@@ -149,6 +154,49 @@ public void testRedundantStart() {
         }
     }
 
+    @DataProvider(name="nckMessages")
+    private Object[][] getNckMessages() {
+        return new Object[][] {
+                // message sent with the nack, message text expected in the resulting exception
+                { "", "" }, // message of length 0
+                { "1", "1" }, // message of length 1
+                { "Test roundtrip negative ack with message protocol", "Test roundtrip negative ack with message protocol" },
+                // one byte less than max
+                { Utils.dupChar('s', StreamingToolConstants.STREAMING_NCK_WITH_MESSAGE_MAX_MESSAGE_LENGTH - 1),
+                        Utils.dupChar('s', StreamingToolConstants.STREAMING_NCK_WITH_MESSAGE_MAX_MESSAGE_LENGTH - 1) },
+                // exactly max bytes
+                { Utils.dupChar('s', StreamingToolConstants.STREAMING_NCK_WITH_MESSAGE_MAX_MESSAGE_LENGTH),
+                        Utils.dupChar('s', StreamingToolConstants.STREAMING_NCK_WITH_MESSAGE_MAX_MESSAGE_LENGTH) }, // 9999
+                // max bytes + 1000, trimmed to max
+                { Utils.dupChar('s', StreamingToolConstants.STREAMING_NCK_WITH_MESSAGE_MAX_MESSAGE_LENGTH + 1000),
+                        Utils.dupChar('s', StreamingToolConstants.STREAMING_NCK_WITH_MESSAGE_MAX_MESSAGE_LENGTH) }, // > 9999 is trimmed to 9999
+        };
+    }
+
+    @Test(groups = "python", dataProvider = "nckMessages", timeOut = 50000, expectedExceptions={PythonScriptExecutorException.class})
+    public void testNckWithMessage(final String nkmMessage, final String expectedMessage) throws PythonScriptExecutorException {
+
+        // Since testing the nack w/message StreamingProcessController functionality requires a cooperative remote
+        // process that implements code to service the ack fifo, we test it indirectly via use of the
+        // StreamingPythonExecutor, since that already depends on Python code that knows how to participate
+        // in the protocol.
+        final StreamingPythonScriptExecutor streamingPythonExecutor =
+                new StreamingPythonScriptExecutor<>(StreamingPythonScriptExecutor.PythonExecutableName.PYTHON3,true);
+        Assert.assertNotNull(streamingPythonExecutor);
+        Assert.assertTrue(streamingPythonExecutor.start(Collections.emptyList(), true, null));
+
+        try {
+            streamingPythonExecutor.sendSynchronousCommand(String.format("tool.sendNackWithMessage(\"%s\")" + NL, nkmMessage));
+        }
+        catch (PythonScriptExecutorException e) {
+            Assert.assertTrue(e.getMessage().contains(expectedMessage));
+            throw e;
+        }
+        finally {
+            streamingPythonExecutor.terminate();
+        }
+    }
+
     private StreamingProcessController initializePythonControllerWithAckFIFO(boolean redirectStderr) {
         // start an interactive Python session with unbuffered IO
         final ProcessSettings processSettings = new ProcessSettings(new String[] {"python", "-i", "-u"});
@@ -188,7 +236,7 @@ private File writePythonExpressionToTempFile(final StreamingProcessController co
     private boolean requestAndWaitForAck(final StreamingProcessController controller) {
         controller.writeProcessInput("akcFIFOWriter.write('ack')" + NL);
         controller.writeProcessInput("akcFIFOWriter.flush()" + NL);
-        return controller.waitForAck();
+        return controller.waitForAck().isPositiveAck();
     }
 
     private String getLineFromTempFile(final File tempFile) throws IOException {