Skip to content

Commit

Permalink
EHN: UDP socket group join issue, examples for udp is working.
Browse files Browse the repository at this point in the history
  • Loading branch information
leochan2009 committed Jul 13, 2018
1 parent aa1ea65 commit 8ac5ca2
Show file tree
Hide file tree
Showing 8 changed files with 63 additions and 128 deletions.
2 changes: 1 addition & 1 deletion Examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ if (${OpenIGTLink_PROTOCOL_VERSION} GREATER 1)
Capability
Trajectory
SessionManager
#TrackingDataUDPTransfer
TrackingDataUDPTransfer
#SampleUDPProgam
)
endif (${OpenIGTLink_PROTOCOL_VERSION} GREATER 1)
Expand Down
15 changes: 10 additions & 5 deletions Examples/SampleUDPProgam/ServerUDPTransfer.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,15 @@

#include <string.h>

#define BUFLEN 512
#define NPACK 10000
#define PORT 9930

#ifndef BUFLEN
#define BUFLEN 512
#endif
#ifndef NPACK
#define NPACK 10000
#endif
#ifndef PORT
#define PORT 9930
#endif
void diep(char *s)
{
perror(s);
Expand Down Expand Up @@ -50,4 +55,4 @@ int main(void)

close(s);
return 0;
}
}
132 changes: 36 additions & 96 deletions Examples/TrackingDataUDPTransfer/TrackingDataClientUDPTransfer.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -26,131 +26,71 @@
#include "igtlMessageRTPWrapper.h"
#include "igtlUDPClientSocket.h"

class ReorderBuffer
{
public:
ReorderBuffer(){firstPaketPos=0;filledPaketNum=0;receivedLastFrag=false;receivedFirstFrag==false;};
~ReorderBuffer(){};
unsigned char buffer[RTP_PAYLOAD_LENGTH*64]; // we use 6 bits for fragment number.
uint32_t firstPaketPos;
uint32_t filledPaketNum;
bool receivedLastFrag;
bool receivedFirstFrag;
};

int ReceiveTrackingData(igtl::TrackingDataMessage::Pointer& msgData);

int main(int argc, char* argv[])
{
//------------------------------------------------------------
// Parse Arguments

if (argc != 4) // check number of arguments
if (argc != 2) // check number of arguments
{
// If not correct, print usage
std::cerr << "Usage: " << argv[0] << " <hostname> <port> <fps>" << std::endl;
std::cerr << " <hostname> : IP or host name" << std::endl;
std::cerr << " <port> : Port # (18944 in Slicer default)" << std::endl;
std::cerr << " <fps> : Frequency (fps) to send coordinate" << std::endl;
std::cerr << " <port> : Port # (18944 or 18945 in Slicer default)" << std::endl;
exit(0);
}

char* hostname = argv[1];
int port = atoi(argv[2]);
double fps = atof(argv[3]);
int interval = (int) (1000.0 / fps);
int port = atoi(argv[1]);

//------------------------------------------------------------
// Establish Connection

igtl::UDPClientSocket::Pointer socket;
socket = igtl::UDPClientSocket::New();
socket->SetIPAddress("127.0.0.1");
socket->SetPortNumber(port);
socket->CreateUDPClient(port);
int success = socket->JoinNetwork("127.0.0.1", port, 1);
if (success<0)
{
std::cerr << "unable to join network, check if your local machine joined the host more than once. " << std::endl;
exit(0);
}
unsigned char* bufferPKT = new unsigned char[RTP_PAYLOAD_LENGTH+RTP_HEADER_LENGTH];
igtl::MessageRTPWrapper::Pointer rtpWrapper = igtl::MessageRTPWrapper::New();
igtl::TrackingDataMessage::Pointer trackingMultiPKTMSG = igtl::TrackingDataMessage::New();
//std::vector<ReorderBuffer> reorderBufferVec(10, ReorderBuffer();
ReorderBuffer reorderBuffer = ReorderBuffer();
igtl::SimpleMutexLock* glock = igtl::SimpleMutexLock::New();
int loop = 0;
for (loop = 0; loop<100; loop++)
{
int totMsgLen = socket->ReadSocket(bufferPKT, RTP_PAYLOAD_LENGTH+RTP_HEADER_LENGTH);
if (totMsgLen>12)
{
// Set up the RTP header:
igtl_uint32 rtpHdr, timeIncrement;
rtpHdr = *((igtl_uint32*)bufferPKT);
//bool rtpMarkerBit = (rtpHdr&0x00800000) != 0;
timeIncrement = *(igtl_uint32*)(bufferPKT+4);
igtl_uint32 SSRC = *(igtl_uint32*)(bufferPKT+8);
if(igtl_is_little_endian())
rtpWrapper->PushDataIntoPacketBuffer(bufferPKT, totMsgLen);
rtpWrapper->UnWrapPacketWithTypeAndName("TDATA", "Tracker");
glock->Lock();
unsigned int messageNum = rtpWrapper->unWrappedMessages.size();
glock->Unlock();
if(messageNum)// to do: glock this session
{
rtpHdr = BYTE_SWAP_INT32(rtpHdr);
timeIncrement = BYTE_SWAP_INT32(timeIncrement);
SSRC = BYTE_SWAP_INT32(SSRC);
}
int curPackedMSGLocation = RTP_HEADER_LENGTH;
while(curPackedMSGLocation<totMsgLen)
{
igtl_uint8 fragmentNumber = *(bufferPKT + curPackedMSGLocation);
curPackedMSGLocation++;
igtl::MessageHeader::Pointer header = igtl::MessageHeader::New();
header->AllocatePack();
memcpy(header->GetPackPointer(), bufferPKT + curPackedMSGLocation, IGTL_HEADER_SIZE);
curPackedMSGLocation += IGTL_HEADER_SIZE;
header->Unpack();
if(fragmentNumber==0X00) // fragment doesn't exist
{

if (strcmp(header->GetDeviceType(),"TDATA")==0)
{
igtl::TrackingDataMessage::Pointer trackingMSG = igtl::TrackingDataMessage::New();
trackingMSG->SetMessageHeader(header);
trackingMSG->AllocatePack();
memcpy(trackingMSG->GetPackBodyPointer(), bufferPKT + curPackedMSGLocation, header->GetBodySizeToRead());
}
curPackedMSGLocation += header->GetBodySizeToRead();
}
else
igtl::TrackingDataMessage::Pointer trackingMultiPKTMSG = igtl::TrackingDataMessage::New();
glock->Lock();
std::map<igtl_uint32, igtl::UnWrappedMessage*>::iterator it = rtpWrapper->unWrappedMessages.begin();
igtlUint8 * message = new igtlUint8[it->second->messageDataLength];
int MSGLength = it->second->messageDataLength;
memcpy(message, it->second->messagePackPointer, it->second->messageDataLength);
delete it->second;
it->second = NULL;
rtpWrapper->unWrappedMessages.erase(it);
glock->Unlock();
igtl::MessageHeader::Pointer header = igtl::MessageHeader::New();
header->InitPack();
memcpy(header->GetPackPointer(), message, IGTL_HEADER_SIZE);
header->Unpack();
trackingMultiPKTMSG->SetMessageHeader(header);
trackingMultiPKTMSG->AllocateBuffer();
if (MSGLength == trackingMultiPKTMSG->GetPackSize())
{
if (strcmp(header->GetDeviceType(),"TDATA")==0)
{
int bodyMsgLength = (RTP_PAYLOAD_LENGTH-IGTL_HEADER_SIZE-1);//this is the length of the body within a full fragment paket
int totFragNumber = -1;
if(fragmentNumber==0X80)// To do, fix the issue when later fragment arrives earlier than the beginning fragment
{
trackingMultiPKTMSG->SetMessageHeader(header);
trackingMultiPKTMSG->AllocatePack();
memcpy(reorderBuffer.buffer, bufferPKT + curPackedMSGLocation, totMsgLen-curPackedMSGLocation);
reorderBuffer.firstPaketPos = totMsgLen-curPackedMSGLocation;
}
else if(fragmentNumber>0XE0)// this is the last fragment
{
totFragNumber = fragmentNumber - 0XE0 + 1;
memcpy(reorderBuffer.buffer+reorderBuffer.firstPaketPos+(totFragNumber-2)*bodyMsgLength, bufferPKT + RTP_HEADER_LENGTH+IGTL_HEADER_SIZE+1, totMsgLen-(RTP_HEADER_LENGTH+IGTL_HEADER_SIZE+1));
reorderBuffer.receivedLastFrag = true;
}
else
{
int curFragNumber = fragmentNumber - 0X80;
memcpy(reorderBuffer.buffer+reorderBuffer.firstPaketPos+(curFragNumber-1)*bodyMsgLength, bufferPKT + RTP_HEADER_LENGTH+IGTL_HEADER_SIZE+1, totMsgLen-(RTP_HEADER_LENGTH+IGTL_HEADER_SIZE+1));
}
reorderBuffer.filledPaketNum++;
if(reorderBuffer.receivedLastFrag == true && reorderBuffer.filledPaketNum == totFragNumber)
{
memcpy(trackingMultiPKTMSG->GetPackBodyPointer(), reorderBuffer.buffer, header->GetBodySizeToRead());
ReceiveTrackingData(trackingMultiPKTMSG);
reorderBuffer.filledPaketNum = 0;
}
}
break;
memcpy(trackingMultiPKTMSG->GetPackPointer(), message, MSGLength);
ReceiveTrackingData(trackingMultiPKTMSG);
}
}
}
igtl::Sleep(interval);
}
}


Expand All @@ -164,7 +104,7 @@ int ReceiveTrackingData(igtl::TrackingDataMessage::Pointer& msgData)

// Deserialize the transform data
// If you want to skip CRC check, call Unpack() without argument.
int c = trackingData->Unpack(1);
int c = trackingData->Unpack(0);

if (c & igtl::MessageHeader::UNPACK_BODY) // if CRC check is OK
{
Expand Down
34 changes: 12 additions & 22 deletions Examples/TrackingDataUDPTransfer/TrackingDataServerUDPTransfer.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -40,19 +40,19 @@ int main(int argc, char* argv[])
//------------------------------------------------------------
// Parse Arguments

if (argc != 2) // check number of arguments
if (argc != 1) // check number of arguments
{
// If not correct, print usage
std::cerr << "Usage: " << argv[0] << " <port>" << std::endl;
std::cerr << " <port> : Port # (18944 in Slicer default)" << std::endl;
std::cerr << "Usage: No augments" << std::endl;
exit(0);
}

int port = atoi(argv[1]);
igtl::UDPServerSocket::Pointer serverSocket;
serverSocket = igtl::UDPServerSocket::New();
int r = serverSocket->CreateUDPServer(port);
int r = serverSocket->CreateUDPServer();
serverSocket->AddClient("127.0.0.1", 18944, 1);
serverSocket->AddClient("127.0.0.1", 18945, 1);

if (r < 0)
{
std::cerr << "Cannot create a server socket." << std::endl;
Expand All @@ -64,7 +64,7 @@ int main(int argc, char* argv[])
igtl::MessageRTPWrapper::Pointer rtpWrapper = igtl::MessageRTPWrapper::New();
//------------------------------------------------------------
// loop
for (int i = 0;i<100;i++)
for (int i = 0;i<1000;i++)
{
WrapMessage(serverSocket, rtpWrapper);
}
Expand All @@ -81,7 +81,7 @@ void WrapMessage(igtl::UDPServerSocket::Pointer serverSocket, igtl::MessageRTPWr
//------------------------------------------------------------
// Get user data
igtl::MutexLock::Pointer glock = igtl::MutexLock::New();
long interval = 5000;
long interval = 500;
std::cerr << "Interval = " << interval << " (ms)" << std::endl;
//long interval = 1000;
//long interval = (id + 1) * 100; // (ms)
Expand Down Expand Up @@ -153,22 +153,12 @@ int SendTrackingData(igtl::UDPServerSocket::Pointer& socket, igtl::TrackingDataM

trackingMsg->Pack();
rtpWrapper->SetSSRC(1);
int status = igtl::MessageRTPWrapper::PaketReady;
igtl_uint8* messagePointer = (igtl_uint8*)trackingMsg->GetPackBodyPointer();
int status = igtl::MessageRTPWrapper::PacketReady;
igtl_uint8* messagePointer = (igtl_uint8*)trackingMsg->GetPackPointer();
rtpWrapper->SetMSGHeader((igtl_uint8*)trackingMsg->GetPackPointer());
int messageLength = trackingMsg->GetPackBodySize();
do
{
status = rtpWrapper->WrapMessage(messagePointer, messageLength);
if (status == igtl::MessageRTPWrapper::WaitingForFragment || status == igtl::MessageRTPWrapper::PaketReady)
{
socket->WriteSocket(rtpWrapper->GetPackPointer(), rtpWrapper->GetPackedMSGLocation());
messagePointer += rtpWrapper->GetCurMSGLocation();
messageLength = trackingMsg->GetPackBodySize() - rtpWrapper->GetCurMSGLocation();
}
}while(status!=igtl::MessageRTPWrapper::PaketReady);
socket->WriteSocket(rtpWrapper->GetPackPointer(), RTP_PAYLOAD_LENGTH+RTP_HEADER_LENGTH);

int messageLength = trackingMsg->GetPackSize();
status = rtpWrapper->WrapMessageAndSend(socket, messagePointer, messageLength);

phi0 += 0.1;
phi1 += 0.2;
phi2 += 0.3;
Expand Down
2 changes: 1 addition & 1 deletion Source/VideoStreaming/igtlVideoStreamIGTLinkServer.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ int VideoStreamIGTLinkServer::StartUDPServer ()
{
this->serverUDPSocket->CloseSocket();
}
r = this->serverUDPSocket->CreateUDPServer(this->serverPortNumber);
r = this->serverUDPSocket->CreateUDPServer();
if (r < 0)
{
std::cerr << "Cannot create a server socket." << std::endl;
Expand Down
2 changes: 1 addition & 1 deletion Source/igtlUDPClientSocket.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ int UDPClientSocket::JoinNetwork(const char* groupIPAddr, int portNum, bool join
{
this->SetIPAddress(groupIPAddr);
this->SetPortNumber(portNum);
this->m_SocketDescriptor = this->CreateUDPClientSocket();
this->SetJoinGroup(joinGroup);
this->m_SocketDescriptor = this->CreateUDPClientSocket();
return this->m_SocketDescriptor;
}

Expand Down
2 changes: 1 addition & 1 deletion Source/igtlUDPServerSocket.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ int UDPServerSocket::WriteSocket(unsigned char* buffer, unsigned bufferSize)
return numByteSend;
}

int UDPServerSocket::CreateUDPServer(int port)
int UDPServerSocket::CreateUDPServer()
{
if (this->m_SocketDescriptor != -1)
{
Expand Down
2 changes: 1 addition & 1 deletion Source/igtlUDPServerSocket.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ class IGTLCommon_EXPORT UDPServerSocket : public GeneralSocket
// Description:
// Creates a UDP server socket at a given port and binds to it.
// Returns -1 on error. 0 on success.
int CreateUDPServer(int port);
int CreateUDPServer();

// Desciption:
// Write the data to all clients
Expand Down

0 comments on commit 8ac5ca2

Please sign in to comment.