Skip to content

Commit

Permalink
Improve command queue handling
Browse files Browse the repository at this point in the history
  • Loading branch information
docbender committed Feb 2, 2021
1 parent 192a41e commit 5400865
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.eclipse.jdt.annotation.NonNull;
import org.eclipse.jdt.annotation.Nullable;
Expand Down Expand Up @@ -107,6 +109,8 @@ public class ProcessDataResult {

private final AtomicBoolean reconnecting = new AtomicBoolean(false);

protected final Lock processLock = new ReentrantLock();

AtomicLong readed = new AtomicLong(0);
AtomicLong readedBytes = new AtomicLong(0);
long metricsStart = 0;
Expand Down Expand Up @@ -153,11 +157,14 @@ public void dispose() {
return;
}
disposed = true;

logger.debug("{} - Disposing...", toString());
close();

if (periodicJob != null) {
periodicJob.cancel(true);
periodicJob = null;
logger.debug("{} - Periodic job cancelled", toString());
}
}

Expand Down Expand Up @@ -298,6 +305,33 @@ protected void offerNewDataCheckPriority(int deviceAddress) throws InterruptedEx
protected void offerNewDataCheckPriority(int deviceAddress, boolean forceAllDataAsNew) throws InterruptedException {
SimpleBinaryItemData data = SimpleBinaryProtocol.compileNewDataFrame(deviceAddress, forceAllDataAsNew);

// is already there?
if (!commandQueue.isEmpty()) {
var first = commandQueue.getFirst().getData();
if (first.length == 4) {
boolean match = true;
for (int i = 0; i < 4; i++) {
if (first[i] != data.getData()[i]) {
match = false;
break;
}
}
if (match) {
return;
}
}
// is there "check" without force
for (SimpleBinaryItemData qitem : commandQueue) {
if (qitem.getData().length != 4) {
continue;
}
// remove it if match
if (qitem.getData()[0] == data.getData()[0] && qitem.getData()[1] == (byte) 0xD0) {
commandQueue.remove(qitem);
break;
}
}
}
commandQueue.addFirst(data);
}

Expand Down Expand Up @@ -364,7 +398,7 @@ protected boolean matchInQueueDataPacket(SimpleBinaryItemData item) {

for (SimpleBinaryItemData qitem : commandQueue) {
if (qitem.getData().length != item.getData().length) {
break;
continue;
}

for (int i = 0; i < qitem.getData().length; i++) {
Expand Down Expand Up @@ -393,7 +427,6 @@ protected boolean matchInQueueTarget(SimpleBinaryItemData item, Boolean remove)
}

for (SimpleBinaryItemData qitem : commandQueue) {

if (qitem.deviceId == item.deviceId && qitem.itemAddress == item.itemAddress) {
if (remove) {
commandQueue.remove(qitem);
Expand All @@ -418,30 +451,20 @@ protected void processCommandQueue(int thisDeviceOnly) {
logger.debug("{} - Processing commandQueue - length {}. Only device {}. Thread={}", toString(),
commandQueue.size(), thisDeviceOnly, Thread.currentThread().getId());
}

// no reply expected
if (!canSend(thisDeviceOnly)) {
// queue is empty -> exit
if (commandQueue.isEmpty()) {
return;
}
// no reply expected - no command processed
if (!canSend(thisDeviceOnly) || !processLock.tryLock()) {
if (logger.isDebugEnabled()) {
logger.debug("{} - Processing commandQueue - waiting", this.toString());
}
return;
}

/*
* if (!lock.tryLock()) {
* if (logger.isDebugEnabled()) {
* logger.debug("{} - CommandQueue locked. Leaving processCommandQueue.", toString());
* }
* return;
* }
*/
SimpleBinaryItemData dataToSend = null;

try {
// queue is empty -> exit
if (commandQueue.isEmpty()) {
return;
}
SimpleBinaryItemData dataToSend = null;

for (SimpleBinaryItemData i : commandQueue) {
if (i.deviceId == thisDeviceOnly) {
Expand All @@ -450,14 +473,13 @@ protected void processCommandQueue(int thisDeviceOnly) {
break;
}
}
if (dataToSend != null) {
sendDataOut(dataToSend);
}
} catch (Exception e) {
logger.error("{} - Processing commandQueue(int) - error: {}", this.toString(), e.getMessage());
} finally {
// lock.unlock();
}

if (dataToSend != null) {
sendDataOut(dataToSend);
processLock.unlock();
}
}

Expand All @@ -474,30 +496,20 @@ protected void processCommandQueue() {
logger.debug("{} - Processing commandQueue - length {}. Thread={}", toString(), commandQueue.size(),
Thread.currentThread().getId());
}

// no reply expected
if (!canSend()) {
// queue is empty -> exit
if (commandQueue.isEmpty()) {
return;
}
// no reply expected or already in process
if (!canSend() || !processLock.tryLock()) {
if (logger.isDebugEnabled()) {
logger.debug("{} - Processing commandQueue - waiting", this.toString());
}
return;
}

/*
* if (!lock.tryLock()) {
* if (logger.isDebugEnabled()) {
* logger.debug("{} - CommandQueue locked. Leaving processCommandQueue.", toString());
* }
* return;
* }
*/
SimpleBinaryItemData dataToSend = null;

try {
// queue is empty -> exit
if (commandQueue.isEmpty()) {
return;
}
SimpleBinaryItemData dataToSend = null;

// check first command in queue
SimpleBinaryItemData firstdata = commandQueue.pollFirst();
Expand Down Expand Up @@ -623,18 +635,18 @@ protected void processCommandQueue() {
return;
}
}

if (dataToSend != null) {
sendDataOut(dataToSend);
}
} catch (Exception e) {
logger.error("{} - Processing commandQueue - error: {}", this.toString(), e.toString());
StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw);
e.printStackTrace(pw);
logger.error("Stacktrace: {}", sw.toString());
} finally {
// lock.unlock();
}

if (dataToSend != null) {
sendDataOut(dataToSend);
processLock.unlock();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,11 @@ public void close(String reason) {
*/
@Override
protected boolean canSend() {
return !waitingForAnswer.get();
if (waitingForAnswer.get()) {
return false;
}

return super.canSend();
}

/*
Expand Down

0 comments on commit 5400865

Please sign in to comment.