Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add nck with message to StreamingProcessController for passing fatal error messages. #5170

Merged
merged 2 commits into from
Oct 1, 2018

Conversation

cmnbroad
Copy link
Collaborator

This adds a new message to the StreamingProcessController ack FIFO protocol to allow additional message detail to be passed as part of a negative ack. Fixes #5100.

Copy link
Collaborator

@jonn-smith jonn-smith left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A few minor changes and a couple of questions.

Back to you, @cmnbroad

private static String NCK_LOG_MESSAGE = "Nck received\n\n";
private static String NCK_WITH_MESSAGE_LOG_MESSAGE = "Nkm received\n\n";

public ProcessControllerAckResult(final boolean isPositiveAck) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If an ack can't have a message associated with it, I suggest changing the constructors to formally disallow acks with messages.

i.e. instead of ProcessControllerAckResult(final boolean isPositiveAck, final String message) have ProcessControllerAckResult(final String message) and comment it that it creates a nkm.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

private static String NCK_LOG_MESSAGE = "Nck received\n\n";
private static String NCK_WITH_MESSAGE_LOG_MESSAGE = "Nkm received\n\n";

public ProcessControllerAckResult(final boolean isPositiveAck) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add javadoc to the constructors?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

}

/**
*
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extra white space.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

private final String message;

// three message types can be used by the remote process
private static String ACK_LOG_MESSAGE = "Ack received\n\n";
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can reference the string representations for the ack types from StreamingTrocessController to create these strings.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True that. done.

public static String NCK_WITH_MESSAGE_MESSAGE = "nkm";
private static int ACK_MESSAGE_SIZE = 3; // "ack", "nck", or "nkm"

private static int NCK_WITH_MESSAGE_MESSAGE_LEN_SIZE = 4; // length of serialized message size
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you update this comment to say that it's the number of characters used to represent the length? It makes sense, but took me a sec to realize that this and NCK_WITH_MESSAGE_MAX_MESSAGE_LENGTH are closely releated.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I beefed up the comment a bit.

@@ -203,24 +209,27 @@ public void openAckFIFOForRead() {
* @return true if an positive acknowledgement (ACK) was received, false if a negative acknowledgement (NCK)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update javadoc to reflect new return value

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

private ProcessControllerAckResult getNckWithMessageResult() throws IOException {
// look for a 4 byte long string with a 4 byte decimal integer with a value < 9999
final byte[] nckMessageLengthBuffer = new byte[NCK_WITH_MESSAGE_MESSAGE_LEN_SIZE];
final int nMessageLengthBytes = ackFIFOInputStream.read(nckMessageLengthBuffer, 0, NCK_WITH_MESSAGE_MESSAGE_LEN_SIZE);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any concern about the case where python sends a nkm length, but it's split over two sends?

That is, for a length 1234 message, do you need to worry about python sending 12, then 34 later?

It's not clear this will actually ever happen, just wondering.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah - thats a good catch. The whole reason I chose a constant (4) for this length was to make this process of consuming it deterministic, and the Python side flushes the output stream after it writes the length and message. But I think you're right that there is still a chance that read could return before having consumed the whole 4 bytes. Luckily, I can hijack the getNckMessage method below, which was originally for retrieving the message string, and also use it to retrieve the length, since it already loops until it gets the number of bytes its expecting. Will need to rename it though.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. I updated all of the places where we read from the stream now to use the (newly renamed) method getBytesFromStream to make sure we always either get the expected number of bytes, or throw.

final StringBuilder sb = new StringBuilder();
while (nBytesReceived < expectedNckMessageLength) {
final byte[] nckMessage = new byte[expectedNckMessageLength];
int readLen = ackFIFOInputStream.read(nckMessage, 0, expectedNckMessageLength);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you need to update the expected number of bytes to read with subsequent reads?

That is, if you expect to get 1234 bytes, I think you need to update the number of bytes to read in each loop iteration by subtracting the number of bytes actually read last time.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah - that would prevent overconsumption of the stream. Done.

in the GATK tool on whose behalf this module is running.
"""
"""The length of the message to be written must be 4 bytes long when serialized as a string"""
nckMaxMessageLength = 9999
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It may be worth pulling out all constants/magic numbers into a single class for the python and another the java.

Then it might be easier to make sure they're kept in sync.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, we're racking up more of these now. That is an excellent idea.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@jonn-smith jonn-smith assigned cmnbroad and unassigned jonn-smith Sep 18, 2018
@codecov-io
Copy link

Codecov Report

Merging #5170 into master will increase coverage by 0.212%.
The diff coverage is 75.789%.

@@               Coverage Diff               @@
##              master     #5170       +/-   ##
===============================================
+ Coverage     86.743%   86.956%   +0.212%     
- Complexity     29470     31848     +2378     
===============================================
  Files           1818      1848       +30     
  Lines         136436    145878     +9442     
  Branches       15125     16953     +1828     
===============================================
+ Hits          118349    126849     +8500     
- Misses         12647     13342      +695     
- Partials        5440      5687      +247
Impacted Files Coverage Δ Complexity Δ
...er/utils/python/StreamingPythonScriptExecutor.java 85.345% <100%> (ø) 22 <0> (ø) ⬇️
...nder/utils/runtime/StreamingProcessController.java 74.82% <65.789%> (+3.528%) 43 <5> (+8) ⬆️
.../python/StreamingPythonScriptExecutorUnitTest.java 83.871% <66.667%> (-1.085%) 16 <1> (+1)
...ls/runtime/StreamingProcessControllerUnitTest.java 93.694% <85%> (-2.051%) 18 <3> (+2)
...nder/utils/runtime/ProcessControllerAckResult.java 86.364% <86.364%> (ø) 10 <10> (?)
...rmats/collections/AnnotatedIntervalCollection.java 72.165% <0%> (-27.835%) 19% <0%> (+14%)
...te/hellbender/utils/tsv/TableColumnCollection.java 90.789% <0%> (-6.225%) 34% <0%> (+2%)
...ead/markduplicates/sparkrecords/EmptyFragment.java 80% <0%> (-4.615%) 9% <0%> (+4%)
...e/hellbender/engine/spark/SparkContextFactory.java 71.233% <0%> (-2.74%) 11% <0%> (ø)
...tute/hellbender/utils/GenomeLocParserUnitTest.java 86.582% <0%> (-2.116%) 78% <0%> (+32%)
... and 118 more

@cmnbroad
Copy link
Collaborator Author

Back to you @jonn-smith.

@cmnbroad cmnbroad assigned jonn-smith and unassigned cmnbroad Sep 24, 2018
Copy link
Collaborator

@jonn-smith jonn-smith left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have one question about how the java reads the bytes from python (whether there should be a wall-clock timeout).

I'm not sure it's a problem, so if you think it's worth addressing then go for it. Otherwise, it looks good to me.

while (nBytesReceived < expectedNckMessageLength) {
final byte[] nckMessage = new byte[expectedNckMessageLength];
int readLen = ackFIFOInputStream.read(nckMessage, 0, expectedNckMessageLength);
while (nBytesReceived < expectedMessageLength) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think it's worth putting a timeout here to make sure this doesn't hang forever? (I'm not sure whether it's necessary, or not.)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd be reluctant to go back to having a timeout. The original version of the streaming controller had them, and it was about impossible to pick a timeout time that isn't triggered legitimately, at least occasionally.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah - I vaguely remembered something about that.

That's fine.

@cmnbroad cmnbroad merged commit ad8be9b into master Oct 1, 2018
EdwardDixon pushed a commit to EdwardDixon/gatk that referenced this pull request Nov 9, 2018
@cmnbroad cmnbroad deleted the cn_python_exception branch December 13, 2018 15:47
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants