Skip to content

Commit

Permalink
Code review comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
cmnbroad committed Sep 21, 2018
1 parent c113155 commit 6638229
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,27 @@ public class ProcessControllerAckResult {
private final String message;

// three message types can be used by the remote process
private static String ACK_LOG_MESSAGE = "Ack received\n\n";
private static String NCK_LOG_MESSAGE = "Nck received\n\n";
private static String NCK_WITH_MESSAGE_LOG_MESSAGE = "Nkm received\n\n";
private static String displayMessageFormat = "%s received\n\n";
private static String ACK_LOG_MESSAGE = String.format(displayMessageFormat, StreamingToolConstants.STREAMING_ACK_MESSAGE);
private static String NCK_LOG_MESSAGE = String.format(displayMessageFormat, StreamingToolConstants.STREAMING_NCK_MESSAGE);
private static String NCK_WITH_MESSAGE_LOG_MESSAGE = String.format(displayMessageFormat, StreamingToolConstants.STREAMING_NCK_WITH_MESSAGE_MESSAGE);

/**
* Creates an ack result, for ACK or NCK.
* @param isPositiveAck true for a positive ack (ACK), false for negative ack (NCK)
*/
public ProcessControllerAckResult(final boolean isPositiveAck) {
this(isPositiveAck, "");
this.isPositiveAck = isPositiveAck;
this.message = null;
}

public ProcessControllerAckResult(final boolean isPositiveAck, final String message) {
this.isPositiveAck = isPositiveAck;
this.message = message;
/**
* Creates an (negative NKM) ack result, with a message.
* @param nckMessage Message detail indicating reason for negative ack (NKM).
*/
public ProcessControllerAckResult(final String nckMessage) {
this.isPositiveAck = false;
this.message = nckMessage;
}

/**
Expand All @@ -40,7 +50,6 @@ public boolean hasMessage() {
}

/**
*
* @return A (possibly empty) String with any message sent from the remote process.
* Only defined for negative acknowledgements {@link #hasMessage()}.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package org.broadinstitute.hellbender.utils.runtime;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.io.Files;
import org.apache.commons.io.IOUtils;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -38,17 +37,6 @@ public final class StreamingProcessController extends ProcessControllerBase<Capt
private File ackFIFOFile = null;
private File dataFIFOFile = null;

// These strings must be kept in sync with the ones used in the Python package
public static String ACK_MESSAGE = "ack";
public static String NCK_MESSAGE = "nck";
public static String NCK_WITH_MESSAGE_MESSAGE = "nkm";
private static int ACK_MESSAGE_SIZE = 3; // "ack", "nck", or "nkm"

private static int NCK_WITH_MESSAGE_MESSAGE_LEN_SIZE = 4; // length of serialized message size

@VisibleForTesting
static int NCK_WITH_MESSAGE_MAX_MESSAGE_LENGTH = 9999; // we cap messages at the max length represented in 4 decimal digits

private InputStream ackFIFOInputStream;
private Future<ProcessControllerAckResult> ackFuture;

Expand Down Expand Up @@ -206,8 +194,9 @@ public void openAckFIFOForRead() {
/**
* Wait for a previously requested acknowledgement to be received. The remote process can deliver a positive
* ack to indicate successful command completion, or a negative ack to indicate command execution failure.
* @return true if an positive acknowledgement (ACK) was received, false if a negative acknowledgement (NCK)
* was received
* @return a {@link ProcessControllerAckResult} indicating the resulting ack state.
* {@link ProcessControllerAckResult#isPositiveAck()} will return true for a positive acknowledgement (ACK),
* false if a negative acknowledgement (NCK)
*/
public ProcessControllerAckResult waitForAck() {
if (ackFuture != null) {
Expand All @@ -216,20 +205,17 @@ public ProcessControllerAckResult waitForAck() {
ackFuture = executorService.submit(
() -> {
try {
final byte[] ack = new byte[ACK_MESSAGE_SIZE];
int nBytes = ackFIFOInputStream.read(ack, 0, ACK_MESSAGE_SIZE);
if (nBytes < ACK_MESSAGE_SIZE) {
throw new GATKException(String.format("Failure reading ack message from ack fifo, ret: (%d)", nBytes));
} else if (Arrays.equals(ack, ACK_MESSAGE.getBytes())) {
final String ackMessage = getBytesFromStream(StreamingToolConstants.STREAMING_ACK_MESSAGE_SIZE);
if (ackMessage.equals(StreamingToolConstants.STREAMING_ACK_MESSAGE)) {
return new ProcessControllerAckResult(true);
} else if (Arrays.equals(ack, NCK_MESSAGE.getBytes())) {
} else if (ackMessage.equals(StreamingToolConstants.STREAMING_NCK_MESSAGE)) {
return new ProcessControllerAckResult(false);
} else if (Arrays.equals(ack, NCK_WITH_MESSAGE_MESSAGE.getBytes())) {
} else if (ackMessage.equals(StreamingToolConstants.STREAMING_NCK_WITH_MESSAGE_MESSAGE)) {
return getNckWithMessageResult();
} else {
final String badAckMessage = "An unrecognized ack string message was written to ack fifo";
logger.error(badAckMessage);
return new ProcessControllerAckResult(false, badAckMessage);
return new ProcessControllerAckResult(badAckMessage);
}
} catch (IOException e) {
throw new GATKException("IOException reading from ack fifo", e);
Expand All @@ -250,35 +236,38 @@ public ProcessControllerAckResult waitForAck() {

// Retrieve a nkm message from the input stream and package it up as a ProcessControllerAckResult
private ProcessControllerAckResult getNckWithMessageResult() throws IOException {
// look for a 4 byte long string with a 4 byte decimal integer with a value < 9999
final byte[] nckMessageLengthBuffer = new byte[NCK_WITH_MESSAGE_MESSAGE_LEN_SIZE];
final int nMessageLengthBytes = ackFIFOInputStream.read(nckMessageLengthBuffer, 0, NCK_WITH_MESSAGE_MESSAGE_LEN_SIZE);
if (nMessageLengthBytes < NCK_WITH_MESSAGE_MESSAGE_LEN_SIZE) {
throw new GATKException(String.format("Nck message length value must be %d bytes", NCK_WITH_MESSAGE_MESSAGE_LEN_SIZE));
}

final int expectedNckMessageLength = Integer.valueOf(new String(nckMessageLengthBuffer));
if (expectedNckMessageLength < 0) {
// look for a 4 byte long string with a 4 byte decimal integer with a value <= 9999
final String messageLengthString = getBytesFromStream(StreamingToolConstants.STREAMING_NCK_WITH_MESSAGE_MESSAGE_LEN_SIZE);
final int messageLength = Integer.valueOf(messageLengthString);
if (messageLength < 0) {
throw new GATKException("Negative ack message length must be > 0");
}

final String nckMessage = getNckMessage(expectedNckMessageLength);
return new ProcessControllerAckResult(false, nckMessage);
// now get the corresponding message of that length messageLength
final String nckMessage = getBytesFromStream(messageLength);
return new ProcessControllerAckResult(nckMessage);
}

// Given the expected message length, retrieve the entire message from the input stream.
private String getNckMessage(int expectedNckMessageLength) throws IOException {
// Retrieve a given number of bytes from the stream and return the value as a String.
private String getBytesFromStream(final int expectedMessageLength) throws IOException {
int nBytesReceived = 0;
int nBytesRemaining = expectedMessageLength;
final StringBuilder sb = new StringBuilder();
while (nBytesReceived < expectedNckMessageLength) {
final byte[] nckMessage = new byte[expectedNckMessageLength];
int readLen = ackFIFOInputStream.read(nckMessage, 0, expectedNckMessageLength);
while (nBytesReceived < expectedMessageLength) {
final byte[] nckMessage = new byte[nBytesRemaining];
int readLen = ackFIFOInputStream.read(nckMessage, 0, nBytesRemaining);
if (readLen <= 0) {
throw new GATKException(
String.format("Expected message of length %d but only found %d bytes", expectedMessageLength, nBytesReceived));
}

sb.append(new String(nckMessage, 0, readLen));
nBytesRemaining -= readLen;
nBytesReceived += readLen;
}
if (nBytesReceived != expectedNckMessageLength) {
if (nBytesReceived != expectedMessageLength) {
throw new GATKException(
String.format("Expected message of length %d but found %d", expectedNckMessageLength, nBytesReceived));
String.format("Expected message of length %d but found %d", expectedMessageLength, nBytesReceived));
}
return sb.toString();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package org.broadinstitute.hellbender.utils.runtime;

/**
* Various constants used by StreamingProcessController that require synchronized equivalents in
* the companion process, i.e., if the streaming process is written in Python, there must be
* equivalent Python constants for use by the Python code.
*
* See the equivalents for Python in toolcontants.py.
*/
public class StreamingToolConstants {
/**
* Command acknowledgement messages used to signal positive acknowledgement ('ack'),
* negative acknowledgement ('nck'), and negative acknowledgement with an accompanying
* message ('nkm').
*/
public static String STREAMING_ACK_MESSAGE = "ack";
public static String STREAMING_NCK_MESSAGE = "nck";
public static String STREAMING_NCK_WITH_MESSAGE_MESSAGE = "nkm";

// This is only used by Java, but is kept here since it represents the length of the constant
// strings defined above.
protected static int STREAMING_ACK_MESSAGE_SIZE = 3; // "ack", "nck", or "nkm"

/**
* Number of characters used to represent the length of the serialized message, fixed at a constant
* 4 characters to ensure we can deterministically know how much input to wait for when looking for
* a message length in the incoming stream.
*/
public static int STREAMING_NCK_WITH_MESSAGE_MESSAGE_LEN_SIZE = 4;
public static int STREAMING_NCK_WITH_MESSAGE_MAX_MESSAGE_LENGTH = 9999;
}
21 changes: 8 additions & 13 deletions src/main/python/org/broadinstitute/hellbender/gatktool/tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import os
import cProfile, pstats, io
import traceback
from gatktool import toolconstants

_ackFIFO = None
_dataFIFO = None
Expand Down Expand Up @@ -152,9 +153,6 @@ class AckFIFO:
Manage the FIFO used to notify GATK (via an ack) that a command has
completed, or failed due to an unhandled exception (via a nck).
"""
_ackString = "ack"
_nackString = "nck"
_nkmString = "nkm"

def __init__(self, ackFIFOName: str) -> None:
"""Open the ack fifo stream for writing only"""
Expand All @@ -168,7 +166,7 @@ def writeAck(self):
"""
if self.fileWriter is None:
raise RuntimeError("ack FIFO has not been initialized")
self.fileWriter.write(AckFIFO._ackString)
self.fileWriter.write(toolconstants._ackString)
self.fileWriter.flush()

def writeNack(self):
Expand All @@ -180,7 +178,7 @@ def writeNack(self):
"""
if self.fileWriter is None:
raise RuntimeError("ack FIFO has not been initialized")
self.fileWriter.write(AckFIFO._nackString)
self.fileWriter.write(toolconstants._nackString)
self.fileWriter.flush()

def writeNackWithMessage(self, message: str) -> None:
Expand All @@ -197,21 +195,18 @@ def writeNackWithMessage(self, message: str) -> None:
Calling this method will result in an exception being thrown
in the GATK tool on whose behalf this module is running.
"""
"""The length of the message to be written must be 4 bytes long when serialized as a string"""
nckMaxMessageLength = 9999
nckMessageLengthSerializedSize = 4
if self.fileWriter is None:
raise RuntimeError("ack FIFO has not been initialized")
self.fileWriter.write(AckFIFO._nkmString)
self.fileWriter.write(toolconstants._nkmString)
actualMessageLength = len(message)
"""The message length must be exactly 4 bytes"""
if len(str(actualMessageLength)) <= nckMessageLengthSerializedSize:
self.fileWriter.write(str(actualMessageLength).zfill(nckMessageLengthSerializedSize))
if len(str(actualMessageLength)) <= toolconstants._nckMessageLengthSerializedSize:
self.fileWriter.write(str(actualMessageLength).zfill(toolconstants._nckMessageLengthSerializedSize))
self.fileWriter.write(message)
else:
"""Message is too long, trim to 9999 bytes"""
self.fileWriter.write(str(nckMaxMessageLength))
self.fileWriter.write(message[:nckMaxMessageLength])
self.fileWriter.write(str(toolconstants._nckMaxMessageLength))
self.fileWriter.write(message[:toolconstants._nckMaxMessageLength])
self.fileWriter.flush()


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
"""
Constants that must remain in sync with the companion StreamingProcessController Java
code in GATK. See StreamingToolConstants.java.
"""

"""
Command acknowledgement messages used to signal positive acknowledgement ('ack',
negative acknowledgement ('nck'), and negative acknowledgement with an accompanying
message ('nkm').
"""
_ackString = "ack"
_nackString = "nck"
_nkmString = "nkm"


"""
The length of a message written with a negative ack (nkm) must be 4 bytes long when
serialized as a string, and cannot have a value > 9999.
"""
_nckMessageLengthSerializedSize = 4
_nckMaxMessageLength = 9999
Original file line number Diff line number Diff line change
Expand Up @@ -162,14 +162,14 @@ private Object[][] getNckMessages() {
{ "1", "1" }, // message of length 1
{ "Test roundtrip negative ack with message protocol", "Test roundtrip negative ack with message protocol" },
// one byte less than max
{ Utils.dupChar('s', StreamingProcessController.NCK_WITH_MESSAGE_MAX_MESSAGE_LENGTH - 1),
Utils.dupChar('s', StreamingProcessController.NCK_WITH_MESSAGE_MAX_MESSAGE_LENGTH - 1) },
{ Utils.dupChar('s', StreamingToolConstants.STREAMING_NCK_WITH_MESSAGE_MAX_MESSAGE_LENGTH - 1),
Utils.dupChar('s', StreamingToolConstants.STREAMING_NCK_WITH_MESSAGE_MAX_MESSAGE_LENGTH - 1) },
// exactly max bytes
{ Utils.dupChar('s', StreamingProcessController.NCK_WITH_MESSAGE_MAX_MESSAGE_LENGTH),
Utils.dupChar('s', StreamingProcessController.NCK_WITH_MESSAGE_MAX_MESSAGE_LENGTH) }, // 9999
{ Utils.dupChar('s', StreamingToolConstants.STREAMING_NCK_WITH_MESSAGE_MAX_MESSAGE_LENGTH),
Utils.dupChar('s', StreamingToolConstants.STREAMING_NCK_WITH_MESSAGE_MAX_MESSAGE_LENGTH) }, // 9999
// max bytes + 1000, trimmed to max
{ Utils.dupChar('s', StreamingProcessController.NCK_WITH_MESSAGE_MAX_MESSAGE_LENGTH + 1000),
Utils.dupChar('s', StreamingProcessController.NCK_WITH_MESSAGE_MAX_MESSAGE_LENGTH) }, // > 9999 is trimmed to 9999
{ Utils.dupChar('s', StreamingToolConstants.STREAMING_NCK_WITH_MESSAGE_MAX_MESSAGE_LENGTH + 1000),
Utils.dupChar('s', StreamingToolConstants.STREAMING_NCK_WITH_MESSAGE_MAX_MESSAGE_LENGTH) }, // > 9999 is trimmed to 9999
};
}

Expand Down

0 comments on commit 6638229

Please sign in to comment.