From 3b976a14244a92b1946e211af746e2cc21a99d7d Mon Sep 17 00:00:00 2001 From: Balazs Racz Date: Mon, 7 Mar 2022 23:09:39 +0100 Subject: [PATCH] Implements sending message to unknown alias (#197) This PR adds code to support the case when an outgoing message addresses a Node for which we do not have a valid alias in the AliasMap. This can happen in multiple cases, some more frequent than others: - There was a network partition thus we did not see that node at the time when we pre-filled the alias map after startup. - The node purposefully released its alias using an AMR frame (this is standards compliant). - The target node is a virtual node that does not yet exist, such as a train node operated by a command station. The mechanism to do this is the following: - The AliasMap has a new API to listen to all(...) calls. The MessageBuilder sings up for getting notifications like this. - The MessageBuilder class, when seeing a miss in the AliasMap lookup for an outgoing addressed message, will put this message into a queue by destination ID. It emits a global verify node ID for the given target node ID instead of the specific message. - Normally, the target node answers this, and that answer initializes its entry in the AliasMap. - When this new alias is added to the AliasMap, the MessageBuilder is invoked and takes the pending messages out of the queue ("unblocked messages"). - Upon sending *any* outgoing message, the MessageBuilder first flushes any unblocked messages it has in the queue. - An additional mechanism is added to MessageBuilder and CanInterface that allows waking up the output message processing thread when an unblocked message is found ("trigger message"). This is needed because the openlcb library does not make an assumption on what the threading model of the application is. Misc fixes: - makes AliasMap thread-safe. - Fixes some comments. === * Makes the alias map thread safe. Adds a listener API to alias map to know when new aliases are added. * fixup comment. * Makes the MessageBuilder stash away addressed messages where we don't know the destination alias yet. Instead, we emit a VerifyNodeId message. These pending messages are released automatically when the alias definition of the seeked node arrives. Adds a "null message". This message does not turn into any outgoing frame. * Generates a trigger message when an unblocked message is found after an incoming frame. --- src/org/openlcb/can/AliasMap.java | 30 +++- src/org/openlcb/can/CanInterface.java | 7 +- src/org/openlcb/can/MessageBuilder.java | 155 ++++++++++++++++++- test/org/openlcb/can/AliasMapTest.java | 31 +++- test/org/openlcb/can/MessageBuilderTest.java | 54 ++++++- 5 files changed, 262 insertions(+), 15 deletions(-) diff --git a/src/org/openlcb/can/AliasMap.java b/src/org/openlcb/can/AliasMap.java index 7f2505b0..bf58927a 100644 --- a/src/org/openlcb/can/AliasMap.java +++ b/src/org/openlcb/can/AliasMap.java @@ -2,6 +2,8 @@ import org.openlcb.NodeID; +import java.util.ArrayList; + /** * Maintains a 2-way map between nodes and CAN node ID aliases. *

@@ -16,7 +18,18 @@ public AliasMap() { } java.util.HashMap iMap = new java.util.HashMap(); java.util.HashMap nMap = new java.util.HashMap(); - + java.util.List watchers = new ArrayList<>(); + + /// This interface allows an external component to watch for newly discovered aliases. + public interface Watcher { + /// Called when a new alias was discovered. + void aliasAdded(NodeID id, int alias); + } + + public synchronized void addWatcher(Watcher w) { + watchers.add(w); + } + public void processFrame(OpenLcbCanFrame f) { // check type if (f.isInitializationComplete() || f.isVerifiedNID() || f.isAliasMapDefinition()) { @@ -30,23 +43,28 @@ public void processFrame(OpenLcbCanFrame f) { } public void insert(int alias, NodeID nid) { - nMap.put(alias, nid); - iMap.put(nid, alias); + synchronized (this) { + nMap.put(alias, nid); + iMap.put(nid, alias); + } + for (Watcher w: watchers) { + w.aliasAdded(nid, alias); + } } - public void remove(int alias) { + public synchronized void remove(int alias) { NodeID nid = getNodeID(alias); if (nid == null) return; nMap.remove(alias); iMap.remove(nid); } - public NodeID getNodeID(int alias) { + public synchronized NodeID getNodeID(int alias) { NodeID retVal = nMap.get(Integer.valueOf(alias)); if (retVal != null) return retVal; else return new NodeID(); } - public int getAlias(NodeID nid) { + public synchronized int getAlias(NodeID nid) { Integer r = iMap.get(nid); if (r == null) return -1; else return r.intValue(); diff --git a/src/org/openlcb/can/CanInterface.java b/src/org/openlcb/can/CanInterface.java index 03b0bf9c..f73842e0 100644 --- a/src/org/openlcb/can/CanInterface.java +++ b/src/org/openlcb/can/CanInterface.java @@ -16,7 +16,7 @@ /** * CanInterface collects all objects necessary to operate a standards-compliant node that connects - * via CAN-bus. + * via CAN-bus. It creates the OlcbInterface internally. * * Created by bracz on 12/27/15. */ @@ -64,7 +64,7 @@ public CanInterface(NodeID interfaceId, CanFrameListener frameOutput, ThreadPool this.nodeId = interfaceId; // Creates high-level OpenLCB interface. - olcbInterface = new OlcbInterface(nodeId, frameRenderer,threadPool); + olcbInterface = new OlcbInterface(nodeId, frameRenderer, threadPool); // Creates CAN-level OpenLCB objects. aliasMap = new AliasMap(); @@ -150,6 +150,9 @@ public void send(CanFrame frame) { aliasWatcher.send(frame); aliasMap.processFrame(new OpenLcbCanFrame(frame)); List l = messageBuilder.processFrame(frame); + if (messageBuilder.foundUnblockedMessage()) { + olcbInterface.getOutputConnection().put(messageBuilder.getTriggerMessage(), null); + } if (l == null) return; for (Message m : l) { olcbInterface.getInputConnection().put(m, null); diff --git a/src/org/openlcb/can/MessageBuilder.java b/src/org/openlcb/can/MessageBuilder.java index 88eddbd3..97c3ba1a 100644 --- a/src/org/openlcb/can/MessageBuilder.java +++ b/src/org/openlcb/can/MessageBuilder.java @@ -3,6 +3,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.logging.Level; import java.util.logging.Logger; import org.openlcb.*; @@ -23,7 +24,7 @@ * @author Bob Jacobsen Copyright 2010 * @version $Revision$ */ -public class MessageBuilder { +public class MessageBuilder implements AliasMap.Watcher { private final static Logger logger = Logger.getLogger(MessageBuilder.class.getName()); /** @@ -33,10 +34,66 @@ public class MessageBuilder { */ public MessageBuilder(AliasMap map) { this.map = map; + map.addWatcher(this); } AliasMap map; - + + + private static class BlockedMessage { + BlockedMessage(Message m) { + message = m; + } + /// Addressed message that was sent earlier. + Message message; + /// When this message was sent. + long timestampMsec; + + /// Notifies that this message is now sent. This is not used yet; will be necessary to + // implement a linked list sorted by timestamp. + void release() {} + } + + /// Stores addressed messages where we don't know the destination alias to. + private final Map > blockedMessages = new HashMap<>(); + + /// Stores addressed messages that we recently got the destination alias. + private List unblockedMessages = new ArrayList<>(); + + /// This value is atomically set to true if we found new unblocked messages. Can be reset by + // a call to foundUnblockedMessage. + private boolean haveUnblockedMessages = false; + + /// Callback from the alias map when a new alias is inserted. This is used to determine if we + // are holding on to some messages that are unsent due to missing destination alias. + @Override + public void aliasAdded(NodeID id, int alias) { + if (alias <= 0) { + // Ignores invalid aliases. + return; + } + synchronized (blockedMessages) { + List l = blockedMessages.get(id); + if (l == null) { + return; + } + for (BlockedMessage bm: l) { + unblockedMessages.add(bm.message); + haveUnblockedMessages = true; + } + l.clear(); + } + } + + /// @return true exactly once after we found some unblocked messages. + boolean foundUnblockedMessage() { + synchronized (blockedMessages) { + boolean ret = haveUnblockedMessages; + haveUnblockedMessages = false; + return ret; + } + } + /** * Accept a frame, and convert to * a standard OpenLCB Message object. @@ -427,18 +484,98 @@ List processFormat7(CanFrame f) { * @return CAN frames (one or more) representing that message */ public List processMessage(Message msg) { + // Checks and flushes unblocked messages first. + List pending = null; + synchronized (blockedMessages) { + if (!unblockedMessages.isEmpty()) { + pending = unblockedMessages; + unblockedMessages = new ArrayList<>(); + } + } + List r = null; + if (pending != null) { + r = new ArrayList<>(); + for (Message m : pending) { + FrameBuilder f = new FrameBuilder(); + r.addAll(f.convert(m)); + } + } + // Processes the new message. FrameBuilder f = new FrameBuilder(); - return f.convert(msg); + if (r == null) { + return f.convert(msg); + } else { + r.addAll(f.convert(msg)); + return r; + } } - + + /// @return a message that can be enqueued but does not turn into any outgoing frame. The + // interface can use this message to wake up its internal threads when we detected that + // blocked messages need to be sent out. + public Message getTriggerMessage() { + return new NullMessage(); + } + + /// Message that we can enqueue to wake up the outgoing frame builder. + private class NullMessage extends Message { + @Override + public int getMTI() { + return 0; + } + }; + private class FrameBuilder extends org.openlcb.MessageDecoder { + /** + * Verifies that we know the destination alias for an addressed message. If it is not + * known, then generates a lookup frame, and enqueues the message for a later send. + * @param m message to send. + * @return true if this message can be sent. False if this message was enqueued for a + * later send instead. + */ + private boolean checkForDestinationAndQueue(Message m) { + if (!(m instanceof AddressedMessage)) { + // not addressed -> send immediately + return true; + } + AddressedMessage am = (AddressedMessage)m; + if (map.getAlias(am.getDestNodeID()) > 0) { + // Have destination alias -> send immediately + return true; + } + // We don't know the destination alias. + + // Sends a node id verify message. + VerifyNodeIDNumberMessage om = new VerifyNodeIDNumberMessage(m.getSourceNodeID(), + ((AddressedMessage) m).getDestNodeID()); + handleVerifyNodeIDNumber(om, null); + + // Enqueues the outgoing message. + synchronized (blockedMessages) { + List bl = blockedMessages.get(am.getDestNodeID()); + if (bl == null) { + bl = new ArrayList<>(); + blockedMessages.put(am.getDestNodeID(), bl); + } + + bl.add(new BlockedMessage(m)); + } + + // Tells the caller to skip sending this message now. + return false; + } + /** * Catches messages that are not explicitly * handled and throws an error */ @Override protected void defaultHandler(Message msg, Connection sender) { + if (msg instanceof NullMessage) { + // This should not turn into any outgoing frames. + return; + } if (msg instanceof AddressedPayloadMessage) { handleAddressedPayloadMessage((AddressedPayloadMessage)msg, sender); } else { @@ -458,6 +595,9 @@ List convert(Message msg) { } private void handleAddressedPayloadMessage(AddressedPayloadMessage msg, Connection sender) { + if (!checkForDestinationAndQueue(msg)) { + return; + } byte[] payload = msg.getPayload(); if (payload == null) { payload = new byte[0]; @@ -625,6 +765,9 @@ public void handleLearnEvent(LearnEventMessage msg, Connection sender){ */ @Override public void handleDatagram(DatagramMessage msg, Connection sender){ + if (!checkForDestinationAndQueue(msg)) { + return; + } // must loop over data to send 8 byte chunks int remains = msg.getData().length; int j = 0; @@ -652,7 +795,9 @@ public void handleDatagram(DatagramMessage msg, Connection sender){ */ @Override public void handleStreamDataSend(StreamDataSendMessage msg, Connection sender){ - // dph + if (!checkForDestinationAndQueue(msg)) { + return; + } // must loop over data to send 8 byte chunks int remains = msg.getData().length; int j = 0; diff --git a/test/org/openlcb/can/AliasMapTest.java b/test/org/openlcb/can/AliasMapTest.java index 3c20735c..436d26b5 100644 --- a/test/org/openlcb/can/AliasMapTest.java +++ b/test/org/openlcb/can/AliasMapTest.java @@ -51,5 +51,34 @@ public void testAfterAMR() { Assert.assertEquals("get Alias", -1, map.getAlias(new NodeID(new byte[]{0,1,2,3,4,5}))); Assert.assertEquals("get NodeID", new NodeID(), map.getNodeID(0)); } - + + @Test + public void testWatcher() { + AliasMap map = new AliasMap(); + + NodeID nid = new NodeID(new byte[]{1,2,3,4,5,6}); + int a = 432; + + final boolean[] found = {false}; + map.addWatcher(new AliasMap.Watcher() { + @Override + public void aliasAdded(NodeID id, int alias) { + found[0] = true; + Assert.assertEquals(nid, id); + Assert.assertEquals(a, alias); + } + }); + + map.insert(a, nid); + Assert.assertTrue(found[0]); + + found[0] = false; + + OpenLcbCanFrame f = new OpenLcbCanFrame(a); + f.setInitializationComplete(a, nid); + map.processFrame(f); + + Assert.assertTrue(found[0]); + } + } diff --git a/test/org/openlcb/can/MessageBuilderTest.java b/test/org/openlcb/can/MessageBuilderTest.java index 3b95c7ed..b0b80934 100644 --- a/test/org/openlcb/can/MessageBuilderTest.java +++ b/test/org/openlcb/can/MessageBuilderTest.java @@ -403,8 +403,60 @@ public void testDatagramOKMessageWithPayload() { testDecoding(m, list); } + @Test + public void testNullMessage() { + MessageBuilder b = new MessageBuilder(map); + Message m = b.getTriggerMessage(); + + List list = b.processMessage(m); + // No output from Trigger Message. + Assert.assertEquals("count", 0, list.size()); + } + + @Test + public void testDelayedMessage() { + MessageBuilder b = new MessageBuilder(map); + + NodeID unknownDst = new NodeID(new byte[]{2,2,2,2,2,2}); + Message m = new DatagramAcknowledgedMessage(source, unknownDst, 42); + + List list = b.processMessage(m); + + // Instead of the datagram OK message we get a verify node ID message. + Assert.assertEquals("count", 1, list.size()); + CanFrame f0 = list.get(0); + Assert.assertEquals("verify", toHexString(0x19490123), toHexString(f0.getHeader())); + compareContent(unknownDst.getContents(), f0); + + Assert.assertFalse(b.foundUnblockedMessage()); + + // Now the verify node id comes back. + OpenLcbCanFrame frame = new OpenLcbCanFrame(0x555); + frame.setInitializationComplete(0x575, unknownDst); + + b.processFrame(frame); + map.processFrame(frame); + + Assert.assertTrue(b.foundUnblockedMessage()); + + // Now sending any message will output the pending message. + Message mm = new IdentifyEventsGlobalMessage(source); + list = b.processMessage(mm); + + Assert.assertEquals("count", 2, list.size()); + f0 = list.get(0); + Assert.assertEquals("datagram-ack-header", toHexString(0x19A28123), + toHexString(f0.getHeader())); + compareContent(new byte[]{0x05, 0x75, 42}, f0); + + CanFrame f1 = list.get(1); + Assert.assertEquals("header", toHexString(0x19970123), toHexString(f1.getHeader())); + Assert.assertEquals("payload", 0, f1.getNumDataElements()); + compareContent(new byte[]{}, f1); + } + /** **************************************************** - * Tests of messages into frames + * Tests of frames into messages ***************************************************** */ @Test