Skip to content

Commit

Permalink
Implements sending message to unknown alias (#197)
Browse files Browse the repository at this point in the history
This PR adds code to support the case when an outgoing message addresses a Node for which we do not have a valid alias in the AliasMap. This can happen in multiple cases, some more frequent than others:
- There was a network partition thus we did not see that node at the time when we pre-filled the alias map after startup.
- The node purposefully released its alias using an AMR frame (this is standards compliant).
- The target node is a virtual node that does not yet exist, such as a train node operated by a command station.

The mechanism to do this is the following:
- The AliasMap has a new API to listen to all(...) calls. The MessageBuilder sings up for getting notifications like this.
- The MessageBuilder class, when seeing a miss in the AliasMap lookup for an outgoing addressed message, will put this message into a queue by destination ID. It emits a global verify node ID for the given target node ID instead of the specific message.
- Normally, the target node answers this, and that answer initializes its entry in the AliasMap.
- When this new alias is added to the AliasMap, the MessageBuilder is invoked and takes the pending messages out of the queue ("unblocked messages").
- Upon sending *any* outgoing message, the MessageBuilder first flushes any unblocked messages it has in the queue.
- An additional mechanism is added to MessageBuilder and CanInterface that allows waking up the output message processing thread when an unblocked message is found ("trigger message"). This is needed because the openlcb library does not make an assumption on what the threading model of the application is.


Misc fixes:
- makes AliasMap thread-safe.
- Fixes some comments.

===

* Makes the alias map thread safe.
Adds a listener API to alias map to know when new aliases are added.

* fixup comment.

* Makes the MessageBuilder stash away addressed messages where we don't know the
destination alias yet. Instead, we emit a VerifyNodeId message.

These pending messages are released automatically when the alias definition of the seeked node arrives.

Adds a "null message". This message does not turn into any outgoing frame.

* Generates a trigger message when an unblocked message is found after an
incoming frame.
  • Loading branch information
balazsracz authored Mar 7, 2022
1 parent 9d7e5ce commit 3b976a1
Show file tree
Hide file tree
Showing 5 changed files with 262 additions and 15 deletions.
30 changes: 24 additions & 6 deletions src/org/openlcb/can/AliasMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import org.openlcb.NodeID;

import java.util.ArrayList;

/**
* Maintains a 2-way map between nodes and CAN node ID aliases.
*<p>
Expand All @@ -16,7 +18,18 @@ public AliasMap() {
}
java.util.HashMap<NodeID, Integer> iMap = new java.util.HashMap<NodeID, Integer>();
java.util.HashMap<Integer, NodeID> nMap = new java.util.HashMap<Integer, NodeID>();

java.util.List<Watcher> watchers = new ArrayList<>();

/// This interface allows an external component to watch for newly discovered aliases.
public interface Watcher {
/// Called when a new alias was discovered.
void aliasAdded(NodeID id, int alias);
}

public synchronized void addWatcher(Watcher w) {
watchers.add(w);
}

public void processFrame(OpenLcbCanFrame f) {
// check type
if (f.isInitializationComplete() || f.isVerifiedNID() || f.isAliasMapDefinition()) {
Expand All @@ -30,23 +43,28 @@ public void processFrame(OpenLcbCanFrame f) {
}

public void insert(int alias, NodeID nid) {
nMap.put(alias, nid);
iMap.put(nid, alias);
synchronized (this) {
nMap.put(alias, nid);
iMap.put(nid, alias);
}
for (Watcher w: watchers) {
w.aliasAdded(nid, alias);
}
}

public void remove(int alias) {
public synchronized void remove(int alias) {
NodeID nid = getNodeID(alias);
if (nid == null) return;
nMap.remove(alias);
iMap.remove(nid);
}

public NodeID getNodeID(int alias) {
public synchronized NodeID getNodeID(int alias) {
NodeID retVal = nMap.get(Integer.valueOf(alias));
if (retVal != null) return retVal;
else return new NodeID();
}
public int getAlias(NodeID nid) {
public synchronized int getAlias(NodeID nid) {
Integer r = iMap.get(nid);
if (r == null) return -1;
else return r.intValue();
Expand Down
7 changes: 5 additions & 2 deletions src/org/openlcb/can/CanInterface.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

/**
* CanInterface collects all objects necessary to operate a standards-compliant node that connects
* via CAN-bus.
* via CAN-bus. It creates the OlcbInterface internally.
*
* Created by bracz on 12/27/15.
*/
Expand Down Expand Up @@ -64,7 +64,7 @@ public CanInterface(NodeID interfaceId, CanFrameListener frameOutput, ThreadPool
this.nodeId = interfaceId;

// Creates high-level OpenLCB interface.
olcbInterface = new OlcbInterface(nodeId, frameRenderer,threadPool);
olcbInterface = new OlcbInterface(nodeId, frameRenderer, threadPool);

// Creates CAN-level OpenLCB objects.
aliasMap = new AliasMap();
Expand Down Expand Up @@ -150,6 +150,9 @@ public void send(CanFrame frame) {
aliasWatcher.send(frame);
aliasMap.processFrame(new OpenLcbCanFrame(frame));
List<Message> l = messageBuilder.processFrame(frame);
if (messageBuilder.foundUnblockedMessage()) {
olcbInterface.getOutputConnection().put(messageBuilder.getTriggerMessage(), null);
}
if (l == null) return;
for (Message m : l) {
olcbInterface.getInputConnection().put(m, null);
Expand Down
155 changes: 150 additions & 5 deletions src/org/openlcb/can/MessageBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.openlcb.*;
Expand All @@ -23,7 +24,7 @@
* @author Bob Jacobsen Copyright 2010
* @version $Revision$
*/
public class MessageBuilder {
public class MessageBuilder implements AliasMap.Watcher {

private final static Logger logger = Logger.getLogger(MessageBuilder.class.getName());
/**
Expand All @@ -33,10 +34,66 @@ public class MessageBuilder {
*/
public MessageBuilder(AliasMap map) {
this.map = map;
map.addWatcher(this);
}

AliasMap map;



private static class BlockedMessage {
BlockedMessage(Message m) {
message = m;
}
/// Addressed message that was sent earlier.
Message message;
/// When this message was sent.
long timestampMsec;

/// Notifies that this message is now sent. This is not used yet; will be necessary to
// implement a linked list sorted by timestamp.
void release() {}
}

/// Stores addressed messages where we don't know the destination alias to.
private final Map<NodeID, List<BlockedMessage> > blockedMessages = new HashMap<>();

/// Stores addressed messages that we recently got the destination alias.
private List<Message> unblockedMessages = new ArrayList<>();

/// This value is atomically set to true if we found new unblocked messages. Can be reset by
// a call to foundUnblockedMessage.
private boolean haveUnblockedMessages = false;

/// Callback from the alias map when a new alias is inserted. This is used to determine if we
// are holding on to some messages that are unsent due to missing destination alias.
@Override
public void aliasAdded(NodeID id, int alias) {
if (alias <= 0) {
// Ignores invalid aliases.
return;
}
synchronized (blockedMessages) {
List<BlockedMessage> l = blockedMessages.get(id);
if (l == null) {
return;
}
for (BlockedMessage bm: l) {
unblockedMessages.add(bm.message);
haveUnblockedMessages = true;
}
l.clear();
}
}

/// @return true exactly once after we found some unblocked messages.
boolean foundUnblockedMessage() {
synchronized (blockedMessages) {
boolean ret = haveUnblockedMessages;
haveUnblockedMessages = false;
return ret;
}
}

/**
* Accept a frame, and convert to
* a standard OpenLCB Message object.
Expand Down Expand Up @@ -427,18 +484,98 @@ List<Message> processFormat7(CanFrame f) {
* @return CAN frames (one or more) representing that message
*/
public List<OpenLcbCanFrame> processMessage(Message msg) {
// Checks and flushes unblocked messages first.
List<Message> pending = null;
synchronized (blockedMessages) {
if (!unblockedMessages.isEmpty()) {
pending = unblockedMessages;
unblockedMessages = new ArrayList<>();
}
}
List<OpenLcbCanFrame> r = null;
if (pending != null) {
r = new ArrayList<>();
for (Message m : pending) {
FrameBuilder f = new FrameBuilder();
r.addAll(f.convert(m));
}
}

// Processes the new message.
FrameBuilder f = new FrameBuilder();
return f.convert(msg);
if (r == null) {
return f.convert(msg);
} else {
r.addAll(f.convert(msg));
return r;
}
}


/// @return a message that can be enqueued but does not turn into any outgoing frame. The
// interface can use this message to wake up its internal threads when we detected that
// blocked messages need to be sent out.
public Message getTriggerMessage() {
return new NullMessage();
}

/// Message that we can enqueue to wake up the outgoing frame builder.
private class NullMessage extends Message {
@Override
public int getMTI() {
return 0;
}
};

private class FrameBuilder extends org.openlcb.MessageDecoder {
/**
* Verifies that we know the destination alias for an addressed message. If it is not
* known, then generates a lookup frame, and enqueues the message for a later send.
* @param m message to send.
* @return true if this message can be sent. False if this message was enqueued for a
* later send instead.
*/
private boolean checkForDestinationAndQueue(Message m) {
if (!(m instanceof AddressedMessage)) {
// not addressed -> send immediately
return true;
}
AddressedMessage am = (AddressedMessage)m;
if (map.getAlias(am.getDestNodeID()) > 0) {
// Have destination alias -> send immediately
return true;
}
// We don't know the destination alias.

// Sends a node id verify message.
VerifyNodeIDNumberMessage om = new VerifyNodeIDNumberMessage(m.getSourceNodeID(),
((AddressedMessage) m).getDestNodeID());
handleVerifyNodeIDNumber(om, null);

// Enqueues the outgoing message.
synchronized (blockedMessages) {
List<BlockedMessage> bl = blockedMessages.get(am.getDestNodeID());
if (bl == null) {
bl = new ArrayList<>();
blockedMessages.put(am.getDestNodeID(), bl);
}

bl.add(new BlockedMessage(m));
}

// Tells the caller to skip sending this message now.
return false;
}

/**
* Catches messages that are not explicitly
* handled and throws an error
*/
@Override
protected void defaultHandler(Message msg, Connection sender) {
if (msg instanceof NullMessage) {
// This should not turn into any outgoing frames.
return;
}
if (msg instanceof AddressedPayloadMessage) {
handleAddressedPayloadMessage((AddressedPayloadMessage)msg, sender);
} else {
Expand All @@ -458,6 +595,9 @@ List<OpenLcbCanFrame> convert(Message msg) {
}

private void handleAddressedPayloadMessage(AddressedPayloadMessage msg, Connection sender) {
if (!checkForDestinationAndQueue(msg)) {
return;
}
byte[] payload = msg.getPayload();
if (payload == null) {
payload = new byte[0];
Expand Down Expand Up @@ -625,6 +765,9 @@ public void handleLearnEvent(LearnEventMessage msg, Connection sender){
*/
@Override
public void handleDatagram(DatagramMessage msg, Connection sender){
if (!checkForDestinationAndQueue(msg)) {
return;
}
// must loop over data to send 8 byte chunks
int remains = msg.getData().length;
int j = 0;
Expand Down Expand Up @@ -652,7 +795,9 @@ public void handleDatagram(DatagramMessage msg, Connection sender){
*/
@Override
public void handleStreamDataSend(StreamDataSendMessage msg, Connection sender){
// dph
if (!checkForDestinationAndQueue(msg)) {
return;
}
// must loop over data to send 8 byte chunks
int remains = msg.getData().length;
int j = 0;
Expand Down
31 changes: 30 additions & 1 deletion test/org/openlcb/can/AliasMapTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,5 +51,34 @@ public void testAfterAMR() {
Assert.assertEquals("get Alias", -1, map.getAlias(new NodeID(new byte[]{0,1,2,3,4,5})));
Assert.assertEquals("get NodeID", new NodeID(), map.getNodeID(0));
}


@Test
public void testWatcher() {
AliasMap map = new AliasMap();

NodeID nid = new NodeID(new byte[]{1,2,3,4,5,6});
int a = 432;

final boolean[] found = {false};
map.addWatcher(new AliasMap.Watcher() {
@Override
public void aliasAdded(NodeID id, int alias) {
found[0] = true;
Assert.assertEquals(nid, id);
Assert.assertEquals(a, alias);
}
});

map.insert(a, nid);
Assert.assertTrue(found[0]);

found[0] = false;

OpenLcbCanFrame f = new OpenLcbCanFrame(a);
f.setInitializationComplete(a, nid);
map.processFrame(f);

Assert.assertTrue(found[0]);
}

}
Loading

0 comments on commit 3b976a1

Please sign in to comment.