Skip to content

Commit

Permalink
Add nck with message to StreamingProcessController for passing fatal …
Browse files Browse the repository at this point in the history
…error messages. (broadinstitute#5170)
  • Loading branch information
cmnbroad authored and EdwardDixon committed Nov 9, 2018
1 parent 7c09133 commit 54b7e60
Show file tree
Hide file tree
Showing 8 changed files with 319 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -197,22 +197,24 @@ public void sendAsynchronousCommand(final String line) {

/**
* Wait for an acknowledgement (which must have been previously requested).
* @return true if a positive acknowledgement (ack) is received, false if negative (nck)
* @return {@link ProcessOutput} when positive acknowledgement (ack) has been received, otherwise throws
* @throws PythonScriptExecutorException if nck was received
*/
public ProcessOutput waitForAck() {
if (!isAckRequestOutstanding) {
throw new GATKException("No ack request is outstanding. An ack request must be issued first");
}
final boolean isAck = spController.waitForAck();
final ProcessControllerAckResult pcAckResult = spController.waitForAck();
isAckRequestOutstanding = false;
// At every ack receipt, we want to retrieve the stdout/stderr output in case we're journaling
final ProcessOutput po = getAccumulatedOutput();
// if the ack was negative, throw, since the ack queue is no longer reliably in sync
if (!isAck) {
if (!pcAckResult.isPositiveAck()) {
throw new PythonScriptExecutorException(
String.format(
"A nack was received from the Python process (most likely caused by a raised exception caused by): %s",
po.toString()));
pcAckResult.getDisplayMessage()
)
);
}
return po;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package org.broadinstitute.hellbender.utils.runtime;

import org.broadinstitute.hellbender.exceptions.GATKException;

/**
* Command acknowledgements that are returned from a process managed by StreamingProcessController.
* Ack results can be positive, negative, or negative with a message.
*/
public class ProcessControllerAckResult {

private final boolean isPositiveAck;
private final String message;

// three message types can be used by the remote process
private static String displayMessageFormat = "%s received\n\n";
private static String ACK_LOG_MESSAGE = String.format(displayMessageFormat, StreamingToolConstants.STREAMING_ACK_MESSAGE);
private static String NCK_LOG_MESSAGE = String.format(displayMessageFormat, StreamingToolConstants.STREAMING_NCK_MESSAGE);
private static String NCK_WITH_MESSAGE_LOG_MESSAGE = String.format(displayMessageFormat, StreamingToolConstants.STREAMING_NCK_WITH_MESSAGE_MESSAGE);

/**
* Creates an ack result, for ACK or NCK.
* @param isPositiveAck true for a positive ack (ACK), false for negative ack (NCK)
*/
public ProcessControllerAckResult(final boolean isPositiveAck) {
this.isPositiveAck = isPositiveAck;
this.message = null;
}

/**
* Creates an (negative NKM) ack result, with a message.
* @param nckMessage Message detail indicating reason for negative ack (NKM).
*/
public ProcessControllerAckResult(final String nckMessage) {
this.isPositiveAck = false;
this.message = nckMessage;
}

/**
* @return true if this represents a positive ack, otherwise false
*/
public boolean isPositiveAck() {
return isPositiveAck;
}

/**
* @return true if this ack is negative and includes a message
*/
public boolean hasMessage() {
return !isPositiveAck() && !message.isEmpty();
}

/**
* @return A (possibly empty) String with any message sent from the remote process.
* Only defined for negative acknowledgements {@link #hasMessage()}.
*/
public String getNegativeACKMessage() {
if (isPositiveAck()) {
throw new GATKException("Can only retrieve messages for negative acknowledgements");
}
return message;
}

/**
* @return A message string representing this ack/nck suitable for logging/display to the user.
*/
public String getDisplayMessage() {
if (isPositiveAck()) {
return ACK_LOG_MESSAGE;
} else if (hasMessage()) {
return String.format("%s: %s", NCK_WITH_MESSAGE_LOG_MESSAGE, getNegativeACKMessage());
} else {
return NCK_LOG_MESSAGE;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,8 @@ public final class StreamingProcessController extends ProcessControllerBase<Capt
private File ackFIFOFile = null;
private File dataFIFOFile = null;

// These strings must be kept in sync with the ones used in the Python package
public static String ACK_MESSAGE = "ack";
public static String NCK_MESSAGE = "nck";
private static int ACK_MESSAGE_SIZE = 3; // "ack" or "nck"
private static String ACK_LOG_MESSAGE = "Ack received\n\n";
private static String NCK_LOG_MESSAGE = "Nck received\n\n";
private InputStream ackFIFOInputStream;
private Future<Boolean> ackFuture;
private Future<ProcessControllerAckResult> ackFuture;

// keep an optional journal of all IPC; disabled/no-op by default
private ProcessJournal processJournal = new ProcessJournal();
Expand Down Expand Up @@ -200,27 +194,28 @@ public void openAckFIFOForRead() {
/**
* Wait for a previously requested acknowledgement to be received. The remote process can deliver a positive
* ack to indicate successful command completion, or a negative ack to indicate command execution failure.
* @return true if an positive acknowledgement (ACK) was received, false if a negative acknowledgement (NCK)
* was received
* @return a {@link ProcessControllerAckResult} indicating the resulting ack state.
* {@link ProcessControllerAckResult#isPositiveAck()} will return true for a positive acknowledgement (ACK),
* false if a negative acknowledgement (NCK)
*/
public boolean waitForAck() {
public ProcessControllerAckResult waitForAck() {
if (ackFuture != null) {
throw new GATKException("An ack is already outstanding");
throw new GATKException("An ack request is already outstanding");
}
ackFuture = executorService.submit(
() -> {
try {
byte[] ack = new byte[ACK_MESSAGE_SIZE];
int nBytes = ackFIFOInputStream.read(ack, 0, ACK_MESSAGE_SIZE);
if (nBytes != ACK_MESSAGE_SIZE) {
throw new GATKException(String.format("Failure reading ack message from ack fifo, ret: (%d)", nBytes));
} else if (Arrays.equals(ack, NCK_MESSAGE.getBytes())) {
return false;
} else if (Arrays.equals(ack, ACK_MESSAGE.getBytes())) {
return true;
final String ackMessage = getBytesFromStream(StreamingToolConstants.STREAMING_ACK_MESSAGE_SIZE);
if (ackMessage.equals(StreamingToolConstants.STREAMING_ACK_MESSAGE)) {
return new ProcessControllerAckResult(true);
} else if (ackMessage.equals(StreamingToolConstants.STREAMING_NCK_MESSAGE)) {
return new ProcessControllerAckResult(false);
} else if (ackMessage.equals(StreamingToolConstants.STREAMING_NCK_WITH_MESSAGE_MESSAGE)) {
return getNckWithMessageResult();
} else {
logger.error("Unrecognized string written to ack fifo");
return false;
final String badAckMessage = "An unrecognized ack string message was written to ack fifo";
logger.error(badAckMessage);
return new ProcessControllerAckResult(badAckMessage);
}
} catch (IOException e) {
throw new GATKException("IOException reading from ack fifo", e);
Expand All @@ -230,15 +225,53 @@ public boolean waitForAck() {

try {
// blocking call to wait for the ack
boolean isAck = ackFuture.get();
processJournal.writeLogMessage(isAck ? ACK_LOG_MESSAGE : NCK_LOG_MESSAGE);
final ProcessControllerAckResult pcAck = ackFuture.get();
processJournal.writeLogMessage(pcAck.getDisplayMessage());
ackFuture = null;
return isAck;
return pcAck;
} catch (InterruptedException | ExecutionException e) {
throw new GATKException("Exception waiting for ack from Python: " + e.getMessage(), e);
}
}

// Retrieve a nkm message from the input stream and package it up as a ProcessControllerAckResult
private ProcessControllerAckResult getNckWithMessageResult() throws IOException {
// look for a 4 byte long string with a 4 byte decimal integer with a value <= 9999
final String messageLengthString = getBytesFromStream(StreamingToolConstants.STREAMING_NCK_WITH_MESSAGE_MESSAGE_LEN_SIZE);
final int messageLength = Integer.valueOf(messageLengthString);
if (messageLength < 0) {
throw new GATKException("Negative ack message length must be > 0");
}

// now get the corresponding message of that length messageLength
final String nckMessage = getBytesFromStream(messageLength);
return new ProcessControllerAckResult(nckMessage);
}

// Retrieve a given number of bytes from the stream and return the value as a String.
private String getBytesFromStream(final int expectedMessageLength) throws IOException {
int nBytesReceived = 0;
int nBytesRemaining = expectedMessageLength;
final StringBuilder sb = new StringBuilder();
while (nBytesReceived < expectedMessageLength) {
final byte[] nckMessage = new byte[nBytesRemaining];
int readLen = ackFIFOInputStream.read(nckMessage, 0, nBytesRemaining);
if (readLen <= 0) {
throw new GATKException(
String.format("Expected message of length %d but only found %d bytes", expectedMessageLength, nBytesReceived));
}

sb.append(new String(nckMessage, 0, readLen));
nBytesRemaining -= readLen;
nBytesReceived += readLen;
}
if (nBytesReceived != expectedMessageLength) {
throw new GATKException(
String.format("Expected message of length %d but found %d", expectedMessageLength, nBytesReceived));
}
return sb.toString();
}

/**
* Create a temporary FIFO suitable for sending output to the remote process. The FIFO is only valid for the
* lifetime of the controller; the FIFO is destroyed when the controller is destroyed.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package org.broadinstitute.hellbender.utils.runtime;

/**
* Various constants used by StreamingProcessController that require synchronized equivalents in
* the companion process, i.e., if the streaming process is written in Python, there must be
* equivalent Python constants for use by the Python code.
*
* See the equivalents for Python in toolcontants.py.
*/
public class StreamingToolConstants {
/**
* Command acknowledgement messages used to signal positive acknowledgement ('ack'),
* negative acknowledgement ('nck'), and negative acknowledgement with an accompanying
* message ('nkm').
*/
public static String STREAMING_ACK_MESSAGE = "ack";
public static String STREAMING_NCK_MESSAGE = "nck";
public static String STREAMING_NCK_WITH_MESSAGE_MESSAGE = "nkm";

// This is only used by Java, but is kept here since it represents the length of the constant
// strings defined above.
protected static int STREAMING_ACK_MESSAGE_SIZE = 3; // "ack", "nck", or "nkm"

/**
* Number of characters used to represent the length of the serialized message, fixed at a constant
* 4 characters to ensure we can deterministically know how much input to wait for when looking for
* a message length in the incoming stream.
*/
public static int STREAMING_NCK_WITH_MESSAGE_MESSAGE_LEN_SIZE = 4;
public static int STREAMING_NCK_WITH_MESSAGE_MAX_MESSAGE_LENGTH = 9999;
}
55 changes: 47 additions & 8 deletions src/main/python/org/broadinstitute/hellbender/gatktool/tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import sys
import os
import cProfile, pstats, io
import traceback
from gatktool import toolconstants

_ackFIFO = None
_dataFIFO = None
Expand All @@ -34,7 +36,7 @@ def initializeGATK(ackFIFOName: str):
sys.excepthook = gatkExceptionHook


def gatkExceptionHook(exceptionType, value, traceback):
def gatkExceptionHook(exceptionType, value, tracebck):
"""
GATK Handler for uncaught Python exceptions.
Expand All @@ -43,8 +45,8 @@ def gatkExceptionHook(exceptionType, value, traceback):
sends a nack to GATK through the FIFO, which results in a
PythonScriptExecutorException being thrown in the tool.
"""
sendNack()
sys.__excepthook__(exceptionType, value, traceback)
sendNackWithMessage(' '.join(map(str, traceback.format_exception(exceptionType, value, tracebck))))
sys.__excepthook__(exceptionType, value, tracebck)


def sendAck():
Expand All @@ -67,6 +69,16 @@ def sendNack():
_ackFIFO.writeNack()


def sendNackWithMessage(nckMessage: str):
"""
Send a negative acknowledgment to GATK, along with a message. Generally only
called by the installed exception hook. This will result in a Java exception being
thrown that unless caught by Java code, will terminate the tool.
"""
global _ackFIFO
_ackFIFO.writeNackWithMessage(nckMessage)


def terminateGATK():
"""
Called by GATK when no more Python commands will be executed
Expand Down Expand Up @@ -141,8 +153,6 @@ class AckFIFO:
Manage the FIFO used to notify GATK (via an ack) that a command has
completed, or failed due to an unhandled exception (via a nck).
"""
_ackString = "ack"
_nackString = "nck"

def __init__(self, ackFIFOName: str) -> None:
"""Open the ack fifo stream for writing only"""
Expand All @@ -156,21 +166,50 @@ def writeAck(self):
"""
if self.fileWriter is None:
raise RuntimeError("ack FIFO has not been initialized")
self.fileWriter.write(AckFIFO._ackString)
self.fileWriter.write(toolconstants._ackString)
self.fileWriter.flush()

def writeNack(self):
"""
Write a negative acknowledgement to the ACK FIFO.
Calling this method will result in an exception being thrown
in the GATK tool on whos behalf this module is running.
in the GATK tool on whose behalf this module is running.
"""
if self.fileWriter is None:
raise RuntimeError("ack FIFO has not been initialized")
self.fileWriter.write(toolconstants._nackString)
self.fileWriter.flush()

def writeNackWithMessage(self, message: str) -> None:
"""
Write a negative acknowledgement with a supplemental message to the ACK FIFO.
The format of a nck message is:
ncknnnnssss....
where "nck" is literal, "nnnn" is a 4 byte long string of decimal digits representing the length
of the message that follows, and "ssss...." is a string of length nnnn
Calling this method will result in an exception being thrown
in the GATK tool on whose behalf this module is running.
"""
if self.fileWriter is None:
raise RuntimeError("ack FIFO has not been initialized")
self.fileWriter.write(AckFIFO._nackString)
self.fileWriter.write(toolconstants._nkmString)
actualMessageLength = len(message)
"""The message length must be exactly 4 bytes"""
if len(str(actualMessageLength)) <= toolconstants._nckMessageLengthSerializedSize:
self.fileWriter.write(str(actualMessageLength).zfill(toolconstants._nckMessageLengthSerializedSize))
self.fileWriter.write(message)
else:
"""Message is too long, trim to 9999 bytes"""
self.fileWriter.write(str(toolconstants._nckMaxMessageLength))
self.fileWriter.write(message[:toolconstants._nckMaxMessageLength])
self.fileWriter.flush()


def close(self):
assert self.fileWriter != None
self.fileWriter.close()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
"""
Constants that must remain in sync with the companion StreamingProcessController Java
code in GATK. See StreamingToolConstants.java.
"""

"""
Command acknowledgement messages used to signal positive acknowledgement ('ack',
negative acknowledgement ('nck'), and negative acknowledgement with an accompanying
message ('nkm').
"""
_ackString = "ack"
_nackString = "nck"
_nkmString = "nkm"


"""
The length of a message written with a negative ack (nkm) must be 4 bytes long when
serialized as a string, and cannot have a value > 9999.
"""
_nckMessageLengthSerializedSize = 4
_nckMaxMessageLength = 9999
Loading

0 comments on commit 54b7e60

Please sign in to comment.