Skip to content

Commit

Permalink
Comms: TCPLink threading updates
Browse files Browse the repository at this point in the history
  • Loading branch information
HTRamsey committed Oct 30, 2024
1 parent 617b651 commit b8d721e
Show file tree
Hide file tree
Showing 3 changed files with 182 additions and 140 deletions.
2 changes: 2 additions & 0 deletions src/Comms/LinkManager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,8 @@ void LinkManager::_linkDisconnected()
for (auto it = _rgLinks.begin(); it != _rgLinks.end(); ++it) {
if (it->get() == link) {
qCDebug(LinkManagerLog) << "LinkManager::_linkDisconnected" << it->get()->linkConfiguration()->name() << it->use_count();
SharedLinkConfigurationPtr config = it->get()->linkConfiguration();
config->setLink(nullptr);
(void) _rgLinks.erase(it);
return;
}
Expand Down
267 changes: 135 additions & 132 deletions src/Comms/TCPLink.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,62 +18,25 @@
QGC_LOGGING_CATEGORY(TCPLinkLog, "qgc.comms.tcplink")

namespace {
constexpr int SEND_BUFFER_SIZE = 1024; // >= MAVLINK_MAX_PACKET_LEN
constexpr int RECEIVE_BUFFER_SIZE = 1024; // >= MAVLINK_MAX_PACKET_LEN
constexpr int READ_BUFFER_SIZE = 1024; // >= MAVLINK_MAX_PACKET_LEN
constexpr int CONNECT_TIMEOUT_MS = 1000;
constexpr int TYPE_OF_SERVICE = 32; // Optional: Set ToS for low delay
constexpr int MAX_RECONNECTION_ATTEMPTS = 5;
constexpr int TYPE_OF_SERVICE = 32; // Set ToS for low delay
constexpr int MAX_RECONNECTION_ATTEMPTS = 3;
}

TCPLink::TCPLink(SharedLinkConfigurationPtr &config, QObject *parent)
: LinkInterface(config, parent)
, _tcpConfig(qobject_cast<const TCPConfiguration*>(config.get()))
, _socket(new QTcpSocket())
TCPWorker::TCPWorker(const QString &host, quint16 port, QObject *parent)
: QObject(parent)
, _socket(new QTcpSocket(this))
, _host(host)
, _port(port)
{
// qCDebug(TCPLinkLog) << Q_FUNC_INFO << this;

Q_CHECK_PTR(_tcpConfig);
if (!_tcpConfig) {
qCWarning(TCPLinkLog) << "Invalid TCPConfiguration provided.";
emit communicationError(
tr("Configuration Error"),
tr("Link %1: Invalid TCP configuration.").arg(config->name())
);
return;
}

_socket->setSocketOption(QAbstractSocket::SendBufferSizeSocketOption, SEND_BUFFER_SIZE);
_socket->setSocketOption(QAbstractSocket::ReceiveBufferSizeSocketOption, RECEIVE_BUFFER_SIZE);
_socket->setReadBufferSize(READ_BUFFER_SIZE);
_socket->setSocketOption(QAbstractSocket::LowDelayOption, 1);
_socket->setSocketOption(QAbstractSocket::KeepAliveOption, 1);
// _socket->setSocketOption(QAbstractSocket::TypeOfServiceOption, TYPE_OF_SERVICE);

(void) QObject::connect(_socket, &QTcpSocket::connected, this, [this]() {
_isConnected = true;
_reconnectionAttempts = 0;
qCDebug(TCPLinkLog) << "TCP connected to" << _tcpConfig->host() << ":" << _tcpConfig->port();
emit connected();
}, Qt::AutoConnection);
_socket->setSocketOption(QAbstractSocket::TypeOfServiceOption, TYPE_OF_SERVICE);

(void) QObject::connect(_socket, &QTcpSocket::disconnected, this, [this]() {
_isConnected = false;
qCDebug(TCPLinkLog) << "TCP disconnected from" << _tcpConfig->host() << ":" << _tcpConfig->port();
emit disconnected();
// TODO: Uncomment after threading changes
// _attemptReconnection();
}, Qt::AutoConnection);

(void) QObject::connect(_socket, &QTcpSocket::readyRead, this, &TCPLink::_readBytes, Qt::AutoConnection);

(void) QObject::connect(_socket, &QTcpSocket::errorOccurred, this, [this](QTcpSocket::SocketError error) {
qCWarning(TCPLinkLog) << "TCP Link Error:" << error << _socket->errorString();
emit communicationError(
tr("TCP Link Error"),
tr("Link %1: %2.").arg(_tcpConfig->name(), _socket->errorString())
);
}, Qt::AutoConnection);
(void) connect(_socket, &QTcpSocket::connected, this, &TCPWorker::_onSocketConnected);
(void) connect(_socket, &QTcpSocket::disconnected, this, &TCPWorker::_onSocketDisconnected);
(void) connect(_socket, &QTcpSocket::readyRead, this, &TCPWorker::_onSocketReadyRead);
(void) connect(_socket, &QTcpSocket::errorOccurred, this, &TCPWorker::_onSocketErrorOccurred);

#ifdef QT_DEBUG
(void) QObject::connect(_socket, &QTcpSocket::stateChanged, this, [](QTcpSocket::SocketState state) {
Expand All @@ -86,132 +49,172 @@ TCPLink::TCPLink(SharedLinkConfigurationPtr &config, QObject *parent)
#endif
}

TCPLink::~TCPLink()
TCPWorker::~TCPWorker()
{
if (_socket->isOpen()) {
_socket->disconnectFromHost();
if (_socket->state() != QAbstractSocket::UnconnectedState) {
_socket->waitForDisconnected(CONNECT_TIMEOUT_MS);
}
}

_socket->deleteLater();

// qCDebug(TCPLinkLog) << Q_FUNC_INFO << this;
disconnectFromHost();
}

void TCPLink::disconnect()
bool TCPWorker::isConnected() const
{
if (isConnected()) {
_socket->disconnectFromHost();
} else {
emit disconnected();
}
return (_socket->isOpen() && (_socket->state() == QAbstractSocket::ConnectedState));
}

bool TCPLink::_connect()
void TCPWorker::connectToHost()
{
if (isConnected()) {
qCWarning(TCPLinkLog) << "Already connected to" << _tcpConfig->host() << ":" << _tcpConfig->port();
return true;
qCWarning(TCPLinkLog) << "Already connected to" << _host << ":" << _port;
return;
}

QSignalSpy errorSpy(_socket, &QTcpSocket::errorOccurred);

qCDebug(TCPLinkLog) << "Attempting to connect to host:" << _tcpConfig->host() << "port:" << _tcpConfig->port();
_socket->connectToHost(_tcpConfig->host(), _tcpConfig->port());
qCDebug(TCPLinkLog) << "Attempting to connect to host:" << _host << "port:" << _port;
_socket->connectToHost(_host, _port);

// TODO: Switch Blocking to Signals after Threading Changes
if (!_socket->waitForConnected(CONNECT_TIMEOUT_MS)) {
qCWarning(TCPLinkLog) << "Connection to" << _tcpConfig->host() << ":" << _tcpConfig->port() << "failed:" << _socket->errorString();
qCWarning(TCPLinkLog) << "Connection to" << _host << ":" << _port << "failed:" << _socket->errorString();
if (errorSpy.count() == 0) {
emit communicationError(
tr("TCP Link Connect Error"),
tr("Link %1: %2.").arg(_tcpConfig->name(), tr("Connection Failed: %1").arg(_socket->errorString()))
);
emit errorOccurred(tr("Connection Failed: %1").arg(_socket->errorString()));
}
return false;
_onSocketDisconnected();
}

qCDebug(TCPLinkLog) << "Successfully connected to" << _tcpConfig->host() << ":" << _tcpConfig->port();
return true;
qCDebug(TCPLinkLog) << "Successfully connected to" << _host.toString() << ":" << _port;
}

void TCPLink::_writeBytes(const QByteArray &bytes)
void TCPWorker::disconnectFromHost()
{
if (!_socket->isValid()) {
return;
if (isConnected()) {
_socket->disconnectFromHost();
}
}

static const QString title = tr("TCP Link Write Error");
void TCPWorker::writeData(const QByteArray &data)
{
if (isConnected()) {
qint64 totalBytesWritten = 0;
while (totalBytesWritten < data.size()) {
const qint64 bytesWritten = _socket->write(data.constData() + totalBytesWritten, data.size() - totalBytesWritten);
if (bytesWritten <= 0) {
break;
}

totalBytesWritten += bytesWritten;
}

if (!isConnected()) {
emit communicationError(
title,
tr("Link %1: Could Not Send Data - Link is Disconnected!").arg(_tcpConfig->name())
);
return;
if (totalBytesWritten < 0) {
emit errorOccurred(tr("Could Not Send Data - Write Failed: %1").arg(_socket->errorString()));
}
} else {
emit errorOccurred(tr("Socket is not connected"));
}
}

const qint64 bytesWritten = _socket->write(bytes);
if (bytesWritten < 0) {
emit communicationError(
title,
tr("Link %1: Could Not Send Data - Write Failed: %2").arg(_tcpConfig->name(), _socket->errorString())
);
return;
}
void TCPWorker::_onSocketConnected()
{
emit connected();
}

if (bytesWritten < bytes.size()) {
qCWarning(TCPLinkLog) << "Wrote" << bytesWritten << "Out of" << bytes.size() << "total bytes";
const QByteArray remainingBytes = bytes.mid(bytesWritten);
writeBytesThreadSafe(remainingBytes.constData(), remainingBytes.size());
}
void TCPWorker::_onSocketDisconnected()
{
emit disconnected();
}

emit bytesSent(this, bytes);
void TCPWorker::_onSocketReadyRead()
{
const QByteArray data = _socket->readAll();
emit dataReceived(data);
}

void TCPLink::_readBytes()
void TCPWorker::_onSocketErrorOccurred(QAbstractSocket::SocketError socketError)
{
if (!_socket->isValid()) {
return;
}
Q_UNUSED(socketError);
emit errorOccurred(_socket->errorString());
}

if (!isConnected()) {
/*===========================================================================*/

TCPLink::TCPLink(SharedLinkConfigurationPtr &config, QObject *parent)
: LinkInterface(config, parent)
, _tcpConfig(qobject_cast<const TCPConfiguration*>(config.get()))
, _worker(nullptr)
, _workerThread(new QThread(this))
{
Q_CHECK_PTR(_tcpConfig);
if (!_tcpConfig) {
qCWarning(TCPLinkLog) << "Invalid TCPConfiguration provided.";
emit communicationError(
tr("TCP Link Read Error"),
tr("Link %1: Could Not Read Data - Link is Disconnected!").arg(_tcpConfig->name())
tr("Configuration Error"),
tr("Link %1: Invalid TCP configuration.").arg(config->name())
);
return;
}

const QByteArray buffer = _socket->readAll();
_worker = new TCPWorker(_tcpConfig->host(), _tcpConfig->port());

if (buffer.isEmpty()) {
emit communicationError(
tr("TCP Link Read Error"),
tr("Link %1: Could Not Read Data - No Data Available!").arg(_tcpConfig->name())
);
return;
}
_worker->moveToThread(_workerThread);

(void) connect(_workerThread, &QThread::finished, _worker, &QObject::deleteLater);

(void) connect(_worker, &TCPWorker::connected, this, &TCPLink::_onConnected);
(void) connect(_worker, &TCPWorker::disconnected, this, &TCPLink::_onDisconnected);
(void) connect(_worker, &TCPWorker::errorOccurred, this, &TCPLink::_onErrorOccurred);
(void) connect(_worker, &TCPWorker::dataReceived, this, &TCPLink::_onDataReceived);

#ifdef QT_DEBUG
_workerThread->setObjectName(QString("TCP_%1").arg(config->name()));
#endif

_workerThread->start();
}

TCPLink::~TCPLink()
{
disconnect();

emit bytesReceived(this, buffer);
_workerThread->quit();
_workerThread->wait();
}

void TCPLink::_attemptReconnection()
bool TCPLink::isConnected() const
{
if (_reconnectionAttempts >= MAX_RECONNECTION_ATTEMPTS) {
qCWarning(TCPLinkLog) << "Max reconnection attempts reached for" << _tcpConfig->host() << ":" << _tcpConfig->port();
emit communicationError(tr("TCP Link Reconnect Error"), tr("Link %1: Maximum reconnection attempts reached.").arg(_tcpConfig->name()));
return;
}
return (_worker && _worker->isConnected());
}

bool TCPLink::_connect()
{
return QMetaObject::invokeMethod(_worker, "connectToHost", Qt::QueuedConnection);
}

void TCPLink::disconnect()
{
(void) QMetaObject::invokeMethod(_worker, "disconnectFromHost", Qt::QueuedConnection);
}

void TCPLink::_onConnected()
{
emit connected();
}

void TCPLink::_onDisconnected()
{
emit disconnected();
}

_reconnectionAttempts++;
const int delay = qPow(2, _reconnectionAttempts) * 1000; // Exponential backoff
qCDebug(TCPLinkLog) << "Attempting reconnection #" << _reconnectionAttempts << "in" << delay << "ms";
QTimer::singleShot(delay, this, [this]() {
_connect();
});
void TCPLink::_onErrorOccurred(const QString &errorString)
{
emit communicationError(tr("TCP Link Error"), tr("Link %1: %2").arg(_tcpConfig->name(), errorString));
}

void TCPLink::_onDataReceived(const QByteArray &data)
{
// Process data or emit signal
emit bytesReceived(this, data);
}

void TCPLink::_writeBytes(const QByteArray& bytes)
{
(void) QMetaObject::invokeMethod(_worker, "writeData", Qt::QueuedConnection, Q_ARG(QByteArray, bytes));
}

bool TCPLink::isSecureConnection()
Expand Down
Loading

0 comments on commit b8d721e

Please sign in to comment.