-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
2 changed files
with
175 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
174 changes: 174 additions & 0 deletions
174
src/main/java/com/windhoverlabs/yamcs/tctm/UdpTmDataLink.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,174 @@ | ||
package com.windhoverlabs.yamcs.tctm; | ||
|
||
import java.io.IOException; | ||
import java.net.DatagramPacket; | ||
import java.net.DatagramSocket; | ||
import java.net.SocketException; | ||
import org.yamcs.ConfigurationException; | ||
import org.yamcs.TmPacket; | ||
import org.yamcs.YConfiguration; | ||
import org.yamcs.tctm.AbstractTmDataLink; | ||
|
||
/** | ||
* Receives telemetry packets via UDP. One UDP datagram = one TM packet. | ||
* | ||
* <p>Options: | ||
* | ||
* <ul> | ||
* <li>{@code port} - the UDP port to listen to | ||
* <li>{@code maxLength} - the maximum length of the datagram (and thus the TM packet length + | ||
* initialBytesToStrip). If a datagram longer than this size will be received, it will be | ||
* truncated. Default: 1500 (bytes) | ||
* <li>{@code initialBytesToStrip} - if configured, skip that number of bytes from the beginning | ||
* of the datagram. Default: 0 | ||
* </ul> | ||
*/ | ||
public class UdpTmDataLink extends AbstractTmDataLink implements Runnable { | ||
private volatile int invalidDatagramCount = 0; | ||
|
||
private DatagramSocket tmSocket; | ||
private int port; | ||
|
||
static final int MAX_LENGTH = 1500; | ||
DatagramPacket datagram; | ||
int maxLength; | ||
int initialBytesToStrip; | ||
int rcvBufferSize; | ||
|
||
/** | ||
* Creates a new UDP TM Data Link | ||
* | ||
* @throws ConfigurationException if port is not defined in the configuration | ||
*/ | ||
@Override | ||
public void init(String instance, String name, YConfiguration config) | ||
throws ConfigurationException { | ||
super.init(instance, name, config); | ||
port = config.getInt("port"); | ||
maxLength = config.getInt("maxLength", MAX_LENGTH); | ||
initialBytesToStrip = config.getInt("initialBytesToStrip", 0); | ||
rcvBufferSize = config.getInt("rcvBufferSize", 0); | ||
datagram = new DatagramPacket(new byte[maxLength], maxLength); | ||
} | ||
|
||
@Override | ||
public void doStart() { | ||
if (!isDisabled()) { | ||
try { | ||
tmSocket = new DatagramSocket(port); | ||
if (rcvBufferSize > 0) { | ||
tmSocket.setReceiveBufferSize(rcvBufferSize); | ||
} | ||
Thread thread = new Thread(this); | ||
thread.setName("UdpTmDataLink-" + linkName); | ||
thread.start(); | ||
} catch (SocketException e) { | ||
notifyFailed(e); | ||
} | ||
} | ||
notifyStarted(); | ||
} | ||
|
||
@Override | ||
public void doStop() { | ||
if (tmSocket != null) { | ||
tmSocket.close(); | ||
} | ||
notifyStopped(); | ||
} | ||
|
||
@Override | ||
public void run() { | ||
while (isRunningAndEnabled()) { | ||
TmPacket tmpkt = getNextPacket(); | ||
if (tmpkt != null) { | ||
processPacket(tmpkt); | ||
} | ||
} | ||
} | ||
|
||
/** | ||
* Called to retrieve the next packet. It blocks in readining on the multicast socket | ||
* | ||
* @return anything that looks as a valid packet, just the size is taken into account to decide if | ||
* it's valid or not | ||
*/ | ||
public TmPacket getNextPacket() { | ||
byte[] packet = null; | ||
|
||
while (isRunning()) { | ||
try { | ||
tmSocket.receive(datagram); | ||
int pktLength = datagram.getLength() - initialBytesToStrip; | ||
|
||
if (pktLength <= 0) { | ||
log.warn( | ||
"received datagram of size {} <= {} (initialBytesToStrip); ignored.", | ||
datagram.getLength(), | ||
initialBytesToStrip); | ||
invalidDatagramCount++; | ||
continue; | ||
} | ||
|
||
updateStats(datagram.getLength()); | ||
packet = new byte[pktLength]; | ||
System.arraycopy( | ||
datagram.getData(), datagram.getOffset() + initialBytesToStrip, packet, 0, pktLength); | ||
break; | ||
} catch (IOException e) { | ||
if (!isRunning() | ||
|| isDisabled()) { // the shutdown or disable will close the socket and that will | ||
// generate an exception | ||
// which we ignore here | ||
return null; | ||
} | ||
log.warn("exception thrown when reading from the UDP socket at port {}", port, e); | ||
} | ||
} | ||
|
||
if (packet != null) { | ||
TmPacket tmPacket = new TmPacket(timeService.getMissionTime(), packet); | ||
tmPacket.setEarthRceptionTime(timeService.getHresMissionTime()); | ||
return packetPreprocessor.process(tmPacket); | ||
} else { | ||
return null; | ||
} | ||
} | ||
|
||
/** returns statistics with the number of datagram received and the number of invalid datagrams */ | ||
@Override | ||
public String getDetailedStatus() { | ||
if (isDisabled()) { | ||
return "DISABLED"; | ||
} else { | ||
return String.format( | ||
"OK (%s) %nValid datagrams received: %d%nInvalid datagrams received: %d", | ||
port, packetCount.get(), invalidDatagramCount); | ||
} | ||
} | ||
|
||
/** Sets the disabled to true such that getNextPacket ignores the received datagrams */ | ||
@Override | ||
public void doDisable() { | ||
if (tmSocket != null) { | ||
tmSocket.close(); | ||
tmSocket = null; | ||
} | ||
} | ||
|
||
/** | ||
* Sets the disabled to false such that getNextPacket does not ignore the received datagrams | ||
* | ||
* @throws SocketException | ||
*/ | ||
@Override | ||
public void doEnable() throws SocketException { | ||
tmSocket = new DatagramSocket(port); | ||
new Thread(this).start(); | ||
} | ||
|
||
@Override | ||
protected Status connectionStatus() { | ||
return Status.OK; | ||
} | ||
} |