Skip to content

Commit

Permalink
Merge pull request #2174 from mavlink/pr-fix-burst
Browse files Browse the repository at this point in the history
core: fix burst implementation
  • Loading branch information
julianoes authored Nov 10, 2023
2 parents 54f643c + 9e385b6 commit 1c78f03
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 31 deletions.
124 changes: 112 additions & 12 deletions src/mavsdk/core/mavlink_ftp_client.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "mavlink_ftp_client.h"
#include "system_impl.h"
#include "plugin_base.h"
#include "unused.h"
#include <algorithm>
#include <fstream>
#include <filesystem>
Expand Down Expand Up @@ -180,7 +181,8 @@ void MavlinkFtpClient::process_mavlink_ftp_message(const mavlink_message_t& msg)
[&](DownloadBurstItem& item) {
if (payload->opcode == RSP_ACK) {
if (payload->req_opcode == CMD_OPEN_FILE_RO ||
payload->req_opcode == CMD_BURST_READ_FILE) {
payload->req_opcode == CMD_BURST_READ_FILE ||
payload->req_opcode == CMD_READ_FILE) {
// Whenever we do get an ack,
// reset the retry counter.
work->retries = RETRIES;
Expand Down Expand Up @@ -506,7 +508,7 @@ bool MavlinkFtpClient::download_burst_continue(
LogDebug() << "Burst Download continue, got file size: " << file_size;
}

request_next_burst(work, item);
request_burst(work, item);
return true;

} else if (payload->req_opcode == CMD_BURST_READ_FILE) {
Expand Down Expand Up @@ -535,8 +537,12 @@ bool MavlinkFtpClient::download_burst_continue(
item.transferred[i] = DownloadBurstItem::Transferred::Yes;
}

const size_t bytes_transferred = std::count(
item.transferred.begin(), item.transferred.end(), DownloadBurstItem::Transferred::Yes);
if (_debugging) {
LogDebug() << "Received " << payload->offset << " to "
<< payload->size + payload->offset;
}

const size_t bytes_transferred = burst_bytes_transferred(item);

if (bytes_transferred == item.transferred.size()) {
if (_debugging) {
Expand Down Expand Up @@ -567,8 +573,9 @@ bool MavlinkFtpClient::download_burst_continue(
static_cast<uint32_t>(item.transferred.size())});

if (payload->burst_complete) {
// The burst is supposedly complete but we still need data, so request next burst.
request_next_burst(work, item);
// The burst is supposedly complete but we still need data, so request next without
// burst.
request_next_rest(work, item);

} else {
// There might be more coming, just wait for now.
Expand All @@ -578,13 +585,88 @@ bool MavlinkFtpClient::download_burst_continue(
return true;
}

} else if (payload->req_opcode == CMD_READ_FILE) {
if (_debugging) {
LogWarn() << "Burst download continue missing pieces, write at " << payload->offset
<< " for " << std::to_string(payload->size);
}

item.ofstream.seekp(payload->offset);
if (item.ofstream.fail()) {
LogWarn() << "Seek failed";
item.callback(ClientResult::FileIoError, {});
return false;
}

item.ofstream.write(reinterpret_cast<const char*>(payload->data), payload->size);
if (!item.ofstream) {
item.callback(ClientResult::FileIoError, {});
return false;
}

// Keep track of what was written.
for (size_t i = payload->offset; i < payload->offset + payload->size; ++i) {
item.transferred[i] = DownloadBurstItem::Transferred::Yes;
}

const size_t bytes_transferred = burst_bytes_transferred(item);

if (_debugging) {
LogDebug() << "Written " << bytes_transferred << " of " << item.transferred.size()
<< " bytes";
}

if (bytes_transferred == item.transferred.size()) {
// Final step
work.last_opcode = CMD_TERMINATE_SESSION;

work.payload = {};
work.payload.seq_number = work.last_sent_seq_number++;
work.payload.session = _session;

work.payload.opcode = work.last_opcode;
work.payload.offset = 0;
work.payload.size = 0;

start_timer();
send_mavlink_ftp_message(work.payload);
return true;
} else {
item.callback(
ClientResult::Next,
ProgressData{
static_cast<uint32_t>(bytes_transferred),
static_cast<uint32_t>(item.transferred.size())});

request_next_rest(work, item);
return true;
}

} else {
LogErr() << "Unexpected req_opcode";
return false;
}
}

void MavlinkFtpClient::request_next_burst(Work& work, DownloadBurstItem& item)
void MavlinkFtpClient::request_burst(Work& work, DownloadBurstItem& item)
{
UNUSED(item);

work.last_opcode = CMD_BURST_READ_FILE;
work.payload = {};
work.payload.seq_number = work.last_sent_seq_number++;
work.payload.session = _session;
work.payload.opcode = work.last_opcode;
work.payload.offset = 0;

// Fill up the whole packet.
work.payload.size = max_data_length;

start_timer();
send_mavlink_ftp_message(work.payload);
}

void MavlinkFtpClient::request_next_rest(Work& work, DownloadBurstItem& item)
{
const auto first_missing = std::find(
item.transferred.begin(), item.transferred.end(), DownloadBurstItem::Transferred::No);
Expand All @@ -597,6 +679,7 @@ void MavlinkFtpClient::request_next_burst(Work& work, DownloadBurstItem& item)
std::find(first_missing, item.transferred.end(), DownloadBurstItem::Transferred::Yes);

const size_t offset = std::distance(item.transferred.begin(), first_missing);

const uint32_t size =
static_cast<uint32_t>(std::distance(first_missing, last_missing_plus_one));

Expand All @@ -605,20 +688,30 @@ void MavlinkFtpClient::request_next_burst(Work& work, DownloadBurstItem& item)
return;
}

work.last_opcode = CMD_BURST_READ_FILE;
if (_debugging) {
LogDebug() << "Re-requesting from " << offset << " with size " << size;
}

work.last_opcode = CMD_READ_FILE;
work.payload = {};
work.payload.seq_number = work.last_sent_seq_number++;
work.payload.session = _session;
work.payload.opcode = work.last_opcode;
work.payload.offset = offset;

work.payload.size = 4;
std::memcpy(&work.payload.data, &size, 4);
work.payload.size =
static_cast<uint8_t>(std::min(static_cast<uint32_t>(max_data_length), size));

start_timer();
send_mavlink_ftp_message(work.payload);
}

size_t MavlinkFtpClient::burst_bytes_transferred(DownloadBurstItem& item)
{
return std::count(
item.transferred.begin(), item.transferred.end(), DownloadBurstItem::Transferred::Yes);
}

bool MavlinkFtpClient::upload_start(Work& work, UploadItem& item)
{
std::error_code ec;
Expand Down Expand Up @@ -1117,8 +1210,15 @@ void MavlinkFtpClient::timeout()
LogDebug() << "Retries left: " << work->retries;
}

start_timer();
send_mavlink_ftp_message(work->payload);
{
const size_t bytes_transferred = burst_bytes_transferred(item);
if (bytes_transferred == 0 || bytes_transferred == item.transferred.size()) {
start_timer();
send_mavlink_ftp_message(work->payload);
} else {
request_next_rest(*work, item);
}
}
},
[&](UploadItem& item) {
if (--work->retries == 0) {
Expand Down
4 changes: 3 additions & 1 deletion src/mavsdk/core/mavlink_ftp_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,9 @@ class MavlinkFtpClient {

bool download_burst_start(Work& work, DownloadBurstItem& item);
bool download_burst_continue(Work& work, DownloadBurstItem& item, PayloadHeader* payload);
void request_next_burst(Work& work, DownloadBurstItem& item);
void request_burst(Work& work, DownloadBurstItem& item);
void request_next_rest(Work& work, DownloadBurstItem& item);
size_t burst_bytes_transferred(DownloadBurstItem& item);

bool upload_start(Work& work, UploadItem& item);
bool upload_continue(Work& work, UploadItem& item);
Expand Down
26 changes: 9 additions & 17 deletions src/mavsdk/core/mavlink_ftp_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -707,6 +707,10 @@ void MavlinkFtpServer::_work_read(const PayloadHeader& payload)
return;
}

if (_debugging) {
LogWarn() << "Read at " << payload.offset << " for " << int(payload.size);
}

_session_info.ifstream.read(reinterpret_cast<char*>(response.data), payload.size);

if (_session_info.ifstream.fail()) {
Expand All @@ -720,6 +724,7 @@ void MavlinkFtpServer::_work_read(const PayloadHeader& payload)

const uint32_t bytes_read = _session_info.ifstream.gcount();

response.offset = payload.offset;
response.size = bytes_read;
response.opcode = Opcode::RSP_ACK;

Expand Down Expand Up @@ -764,20 +769,7 @@ void MavlinkFtpServer::_work_burst(const PayloadHeader& payload)
}

_session_info.burst_offset = payload.offset;

if (payload.size != 4) {
response.seq_number = payload.seq_number + 1;
response.opcode = Opcode::RSP_NAK;
response.size = 1;
response.data[0] = ServerResult::ERR_INVALID_DATA_SIZE;
LogErr() << "Burst size invalid";
_send_mavlink_ftp_message(response);
return;
}

uint32_t burst_size;
std::memcpy(&burst_size, &payload.data, payload.size);
_session_info.burst_end = _session_info.burst_offset + burst_size;
_session_info.burst_chunk_size = payload.size;

_burst_seq = payload.seq_number + 1;

Expand Down Expand Up @@ -815,8 +807,8 @@ void MavlinkFtpServer::_send_burst_packet()
void MavlinkFtpServer::_make_burst_packet(PayloadHeader& packet)
{
uint32_t bytes_to_read = std::min(
static_cast<uint32_t>(max_data_length),
_session_info.burst_end - _session_info.burst_offset);
static_cast<uint32_t>(_session_info.burst_chunk_size),
_session_info.file_size - _session_info.burst_offset);

if (_debugging) {
LogDebug() << "Burst read of " << bytes_to_read << " bytes";
Expand All @@ -838,7 +830,7 @@ void MavlinkFtpServer::_make_burst_packet(PayloadHeader& packet)
packet.offset = _session_info.burst_offset;
_session_info.burst_offset += bytes_read;

if (_session_info.burst_offset == _session_info.burst_end) {
if (_session_info.burst_offset == _session_info.file_size) {
// Last read, we are done for this burst.
packet.burst_complete = 1;
if (_debugging) {
Expand Down
2 changes: 1 addition & 1 deletion src/mavsdk/core/mavlink_ftp_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ class MavlinkFtpServer {
struct SessionInfo {
uint32_t file_size{0};
uint32_t burst_offset{0};
uint32_t burst_end{0};
uint8_t burst_chunk_size{0};
std::ifstream ifstream;
std::ofstream ofstream;
} _session_info{};
Expand Down

0 comments on commit 1c78f03

Please sign in to comment.