-
Notifications
You must be signed in to change notification settings - Fork 594
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add nck with message to StreamingProcessController for passing fatal error messages. #5170
Merged
Merged
Changes from all commits
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
75 changes: 75 additions & 0 deletions
75
src/main/java/org/broadinstitute/hellbender/utils/runtime/ProcessControllerAckResult.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,75 @@ | ||
package org.broadinstitute.hellbender.utils.runtime; | ||
|
||
import org.broadinstitute.hellbender.exceptions.GATKException; | ||
|
||
/** | ||
* Command acknowledgements that are returned from a process managed by StreamingProcessController. | ||
* Ack results can be positive, negative, or negative with a message. | ||
*/ | ||
public class ProcessControllerAckResult { | ||
|
||
private final boolean isPositiveAck; | ||
private final String message; | ||
|
||
// three message types can be used by the remote process | ||
private static String displayMessageFormat = "%s received\n\n"; | ||
private static String ACK_LOG_MESSAGE = String.format(displayMessageFormat, StreamingToolConstants.STREAMING_ACK_MESSAGE); | ||
private static String NCK_LOG_MESSAGE = String.format(displayMessageFormat, StreamingToolConstants.STREAMING_NCK_MESSAGE); | ||
private static String NCK_WITH_MESSAGE_LOG_MESSAGE = String.format(displayMessageFormat, StreamingToolConstants.STREAMING_NCK_WITH_MESSAGE_MESSAGE); | ||
|
||
/** | ||
* Creates an ack result, for ACK or NCK. | ||
* @param isPositiveAck true for a positive ack (ACK), false for negative ack (NCK) | ||
*/ | ||
public ProcessControllerAckResult(final boolean isPositiveAck) { | ||
this.isPositiveAck = isPositiveAck; | ||
this.message = null; | ||
} | ||
|
||
/** | ||
* Creates an (negative NKM) ack result, with a message. | ||
* @param nckMessage Message detail indicating reason for negative ack (NKM). | ||
*/ | ||
public ProcessControllerAckResult(final String nckMessage) { | ||
this.isPositiveAck = false; | ||
this.message = nckMessage; | ||
} | ||
|
||
/** | ||
* @return true if this represents a positive ack, otherwise false | ||
*/ | ||
public boolean isPositiveAck() { | ||
return isPositiveAck; | ||
} | ||
|
||
/** | ||
* @return true if this ack is negative and includes a message | ||
*/ | ||
public boolean hasMessage() { | ||
return !isPositiveAck() && !message.isEmpty(); | ||
} | ||
|
||
/** | ||
* @return A (possibly empty) String with any message sent from the remote process. | ||
* Only defined for negative acknowledgements {@link #hasMessage()}. | ||
*/ | ||
public String getNegativeACKMessage() { | ||
if (isPositiveAck()) { | ||
throw new GATKException("Can only retrieve messages for negative acknowledgements"); | ||
} | ||
return message; | ||
} | ||
|
||
/** | ||
* @return A message string representing this ack/nck suitable for logging/display to the user. | ||
*/ | ||
public String getDisplayMessage() { | ||
if (isPositiveAck()) { | ||
return ACK_LOG_MESSAGE; | ||
} else if (hasMessage()) { | ||
return String.format("%s: %s", NCK_WITH_MESSAGE_LOG_MESSAGE, getNegativeACKMessage()); | ||
} else { | ||
return NCK_LOG_MESSAGE; | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
31 changes: 31 additions & 0 deletions
31
src/main/java/org/broadinstitute/hellbender/utils/runtime/StreamingToolConstants.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
package org.broadinstitute.hellbender.utils.runtime; | ||
|
||
/** | ||
* Various constants used by StreamingProcessController that require synchronized equivalents in | ||
* the companion process, i.e., if the streaming process is written in Python, there must be | ||
* equivalent Python constants for use by the Python code. | ||
* | ||
* See the equivalents for Python in toolcontants.py. | ||
*/ | ||
public class StreamingToolConstants { | ||
/** | ||
* Command acknowledgement messages used to signal positive acknowledgement ('ack'), | ||
* negative acknowledgement ('nck'), and negative acknowledgement with an accompanying | ||
* message ('nkm'). | ||
*/ | ||
public static String STREAMING_ACK_MESSAGE = "ack"; | ||
public static String STREAMING_NCK_MESSAGE = "nck"; | ||
public static String STREAMING_NCK_WITH_MESSAGE_MESSAGE = "nkm"; | ||
|
||
// This is only used by Java, but is kept here since it represents the length of the constant | ||
// strings defined above. | ||
protected static int STREAMING_ACK_MESSAGE_SIZE = 3; // "ack", "nck", or "nkm" | ||
|
||
/** | ||
* Number of characters used to represent the length of the serialized message, fixed at a constant | ||
* 4 characters to ensure we can deterministically know how much input to wait for when looking for | ||
* a message length in the incoming stream. | ||
*/ | ||
public static int STREAMING_NCK_WITH_MESSAGE_MESSAGE_LEN_SIZE = 4; | ||
public static int STREAMING_NCK_WITH_MESSAGE_MAX_MESSAGE_LENGTH = 9999; | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
22 changes: 22 additions & 0 deletions
22
src/main/python/org/broadinstitute/hellbender/gatktool/toolconstants.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
""" | ||
Constants that must remain in sync with the companion StreamingProcessController Java | ||
code in GATK. See StreamingToolConstants.java. | ||
""" | ||
|
||
""" | ||
Command acknowledgement messages used to signal positive acknowledgement ('ack', | ||
negative acknowledgement ('nck'), and negative acknowledgement with an accompanying | ||
message ('nkm'). | ||
""" | ||
_ackString = "ack" | ||
_nackString = "nck" | ||
_nkmString = "nkm" | ||
|
||
|
||
""" | ||
The length of a message written with a negative ack (nkm) must be 4 bytes long when | ||
serialized as a string, and cannot have a value > 9999. | ||
""" | ||
_nckMessageLengthSerializedSize = 4 | ||
_nckMaxMessageLength = 9999 |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you think it's worth putting a timeout here to make sure this doesn't hang forever? (I'm not sure whether it's necessary, or not.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd be reluctant to go back to having a timeout. The original version of the streaming controller had them, and it was about impossible to pick a timeout time that isn't triggered legitimately, at least occasionally.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah - I vaguely remembered something about that.
That's fine.