Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix concurrency problem when running really fast streaming gcode #2109

Merged
merged 1 commit into from
Jan 4, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ This file is part of Universal Gcode Sender (UGS).
import java.util.Optional;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

Expand All @@ -65,17 +67,17 @@ public abstract class AbstractController implements ICommunicatorListener, ICont
protected MessageService messageService;

// Added value
private Boolean isStreaming = false;
private final AtomicBoolean isStreaming = new AtomicBoolean(false);

// For keeping track of the time spent streaming a file
private final StopWatch streamStopWatch = new StopWatch();

// This metadata needs to be cached instead of looked up from queues and
// streams, because those sources may be compromised during a cancel.
private int numCommands = 0;
private int numCommandsSent = 0;
private int numCommandsSkipped = 0;
private int numCommandsCompleted = 0;
private final AtomicInteger numCommands = new AtomicInteger(0);
private final AtomicInteger numCommandsSent = new AtomicInteger(0);
private final AtomicInteger numCommandsSkipped = new AtomicInteger(0);
private final AtomicInteger numCommandsCompleted = new AtomicInteger(0);

// Commands become active after the Communicator notifies us that they have
// been sent.
Expand Down Expand Up @@ -387,7 +389,7 @@ public Boolean isCommOpen() {

@Override
public Boolean isStreaming() {
return this.isStreaming;
return isStreaming.get();
}

/**
Expand Down Expand Up @@ -416,13 +418,13 @@ private enum RowStat {
public int getRowStat(RowStat stat) {
switch (stat) {
case TOTAL_ROWS:
return this.numCommands;
return numCommands.get();
case ROWS_SENT:
return this.numCommandsSent;
return numCommandsSent.get();
case ROWS_COMPLETED:
return this.numCommandsCompleted + this.numCommandsSkipped;
return numCommandsCompleted.get() + numCommandsSkipped.get();
case ROWS_REMAINING:
return this.numCommands <= 0 ? 0 : this.numCommands - (this.numCommandsCompleted + this.numCommandsSkipped);
return numCommands.get() <= 0 ? 0 : numCommands.get() - (numCommandsCompleted.get() + numCommandsSkipped.get());
default:
throw new IllegalStateException("This should be impossible - RowStat default case.");
}
Expand Down Expand Up @@ -527,13 +529,13 @@ public void beginStreaming() throws Exception {
throw new Exception("There are no commands queued for streaming.");
}

this.isStreaming = true;
this.streamStopWatch.reset();
this.streamStopWatch.start();
this.numCommands = 0;
this.numCommandsSent = 0;
this.numCommandsSkipped = 0;
this.numCommandsCompleted = 0;
isStreaming.set(true);
streamStopWatch.reset();
streamStopWatch.start();
numCommands.set(0);
numCommandsSent.set(0);
numCommandsSkipped.set(0);
numCommandsCompleted.set(0);
updateNumCommands();

// Send all queued commands and streams then kick off the stream.
Expand All @@ -545,7 +547,7 @@ public void beginStreaming() throws Exception {

comm.streamCommands();
} catch(Exception e) {
this.isStreaming = false;
isStreaming.set(false);
this.streamStopWatch.reset();
this.comm.cancelSend();
throw e;
Expand Down Expand Up @@ -614,9 +616,11 @@ public void cancelJog() throws Exception {

@Override
public void cancelCommands() {
this.comm.cancelSend();
this.isStreaming = false;
if (this.streamStopWatch.isStarted()) this.streamStopWatch.stop();
comm.cancelSend();
isStreaming.set(false);
if (streamStopWatch.isStarted()) {
streamStopWatch.stop();
}
}

@Override
Expand All @@ -628,35 +632,33 @@ public void resetBuffers() {

// Reset send queue and idx's.
private void flushSendQueues() {
numCommands = 0;
numCommands.set(0);
}

private void updateNumCommands() {
if (streamCommands != null) {
numCommands = streamCommands.getNumRows();
numCommands.set(streamCommands.getNumRows());
}
numCommandsSkipped = 0;
numCommandsCompleted = 0;
numCommandsSent = 0;
numCommandsSkipped.set(0);
numCommandsCompleted.set(0);
numCommandsSent.set(0);
}

// No longer a listener event
protected void fileStreamComplete(String filename) {

String duration =
com.willwinder.universalgcodesender.Utils.
formattedMillis(this.getSendDuration());

this.dispatchConsoleMessage(MessageType.INFO,"\n**** Finished sending file in "+duration+" ****\n\n");
this.streamStopWatch.stop();
this.isStreaming = false;
String duration = Utils.formattedMillis(getSendDuration());
dispatchConsoleMessage(MessageType.INFO, String.format("%n**** Finished sending file in %s ****%n%n", duration));
if (streamStopWatch.isStarted()) {
streamStopWatch.stop();
}
isStreaming.set(false);
dispatchStreamComplete(filename);
}

@Override
public void commandSent(GcodeCommand command) {
if (this.isStreaming()) {
this.numCommandsSent++;
numCommandsSent.incrementAndGet();
}

command.setSent(true);
Expand Down Expand Up @@ -697,7 +699,7 @@ public void checkStreamFinished() {
@Override
public void commandSkipped(GcodeCommand command) {
if (this.isStreaming()) {
this.numCommandsSkipped++;
numCommandsSkipped.incrementAndGet();
}

StringBuilder message = new StringBuilder();
Expand Down Expand Up @@ -753,7 +755,7 @@ public void commandComplete(String response) throws UnexpectedCommand {
updateCommandFromResponse(command, response);
updateParserModalState(command);

numCommandsCompleted++;
numCommandsCompleted.incrementAndGet();

if (activeCommands.isEmpty()) {
setCurrentState(COMM_IDLE);
Expand Down