Skip to content

Commit

Permalink
Various fixes and improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
ckamm committed Mar 24, 2017
1 parent 54b94ff commit 1e9c5b1
Show file tree
Hide file tree
Showing 8 changed files with 89 additions and 80 deletions.
9 changes: 5 additions & 4 deletions src/libsync/capabilities.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,12 @@ bool Capabilities::chunkingNg() const
return _capabilities["dav"].toMap()["chunking"].toByteArray() >= "1.0";
}

quint64 Capabilities::requestMaxDurationDC() const
quint64 Capabilities::desiredChunkUploadDuration() const
{
QByteArray requestMaxDurationDC = _capabilities["dav"].toMap()["max_single_upload_request_duration_msec"].toByteArray();
if (!requestMaxDurationDC.isEmpty())
return requestMaxDurationDC.toLongLong();
QByteArray value = _capabilities["dav"].toMap()["target_chunk_upload_request_duration_msec"].toByteArray();
if (!value.isEmpty()) {
return value.toLongLong();
}
return 0;
}

Expand Down
12 changes: 11 additions & 1 deletion src/libsync/capabilities.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,17 @@ class OWNCLOUDSYNC_EXPORT Capabilities {
int sharePublicLinkExpireDateDays() const;
bool shareResharing() const;
bool chunkingNg() const;
quint64 requestMaxDurationDC() const;

/**
* The desired time in ms needed for a single-chunk upload.
*
* The chunk size will be dynamically adjusted to target
* this value.
*
* Capability: dav/target_chunk_upload_request_duration_msec
*/
quint64 desiredChunkUploadDuration() const;


/// returns true if the capabilities report notifications
bool notificationsAvailable() const;
Expand Down
3 changes: 2 additions & 1 deletion src/libsync/configfile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ static const char updateCheckIntervalC[] = "updateCheckInterval";
static const char geometryC[] = "geometry";
static const char timeoutC[] = "timeout";
static const char chunkSizeC[] = "chunkSize";
static const char minChunkSizeC[] = "minChunkSizeC";
static const char maxChunkSizeC[] = "maxChunkSizeC";

static const char proxyHostC[] = "Proxy/host";
Expand Down Expand Up @@ -138,7 +139,7 @@ quint64 ConfigFile::maxChunkSize() const
quint64 ConfigFile::minChunkSize() const
{
QSettings settings(configFile(), QSettings::IniFormat);
return settings.value(QLatin1String(maxChunkSizeC), 1000*1000).toLongLong(); // default to 1 MB
return settings.value(QLatin1String(minChunkSizeC), 1000*1000).toLongLong(); // default to 1 MB
}

void ConfigFile::setOptionalDesktopNotifications(bool show)
Expand Down
25 changes: 19 additions & 6 deletions src/libsync/owncloudpropagator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -276,8 +276,8 @@ PropagateItemJob* OwncloudPropagator::createJob(const SyncFileItemPtr &item) {
return job;
} else {
PropagateUploadFileCommon *job = 0;
if (item->_size > chunkSize() && account()->capabilities().chunkingNg()) {
job = new PropagateUploadFileNG(this, item, account()->capabilities().requestMaxDurationDC());
if (item->_size > _chunkSize && account()->capabilities().chunkingNg()) {
job = new PropagateUploadFileNG(this, item);
} else {
job = new PropagateUploadFileV1(this, item);
}
Expand Down Expand Up @@ -434,7 +434,7 @@ bool OwncloudPropagator::isInSharedDirectory(const QString& file)

int OwncloudPropagator::httpTimeout()
{
static int timeout;
static int timeout = 0;
if (!timeout) {
timeout = qgetenv("OWNCLOUD_TIMEOUT").toUInt();
if (timeout == 0) {
Expand All @@ -446,9 +446,9 @@ int OwncloudPropagator::httpTimeout()
return timeout;
}

quint64 OwncloudPropagator::chunkSize()
quint64 OwncloudPropagator::initialChunkSize()
{
static uint chunkSize;
static uint chunkSize = 0;
if (!chunkSize) {
chunkSize = qgetenv("OWNCLOUD_CHUNK_SIZE").toUInt();
if (chunkSize == 0) {
Expand All @@ -461,7 +461,7 @@ quint64 OwncloudPropagator::chunkSize()

quint64 OwncloudPropagator::maxChunkSize()
{
static uint chunkSize;
static uint chunkSize = 0;
if (!chunkSize) {
chunkSize = qgetenv("OWNCLOUD_MAX_CHUNK_SIZE").toUInt();
if (chunkSize == 0) {
Expand All @@ -472,6 +472,19 @@ quint64 OwncloudPropagator::maxChunkSize()
return chunkSize;
}

quint64 OwncloudPropagator::minChunkSize()
{
static uint chunkSize = 0;
if (!chunkSize) {
chunkSize = qgetenv("OWNCLOUD_MIN_CHUNK_SIZE").toUInt();
if (chunkSize == 0) {
ConfigFile cfg;
chunkSize = cfg.minChunkSize();
}
}
return chunkSize;
}

bool OwncloudPropagator::localFileNameClash( const QString& relFile )
{
bool re = false;
Expand Down
12 changes: 11 additions & 1 deletion src/libsync/owncloudpropagator.h
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,7 @@ class OwncloudPropagator : public QObject {
, _finishedEmited(false)
, _bandwidthManager(this)
, _anotherSyncNeeded(false)
, _chunkSize(initialChunkSize())
, _account(account)
{ }

Expand All @@ -305,6 +306,14 @@ class OwncloudPropagator : public QObject {
/** We detected that another sync is required after this one */
bool _anotherSyncNeeded;

/** The size to use for upload chunks.
*
* Will be dynamically adjusted after each chunk upload finishes
* if Capabilities::desiredChunkUploadDuration has a target
* chunk-upload duration set.
*/
quint64 _chunkSize;

/* The maximum number of active jobs in parallel */
int maximumActiveJob();
int hardMaximumActiveJob();
Expand All @@ -325,8 +334,9 @@ class OwncloudPropagator : public QObject {
static int httpTimeout();

/** returns the size of chunks in bytes */
static quint64 chunkSize();
static quint64 initialChunkSize();
static quint64 maxChunkSize();
static quint64 minChunkSize();

AccountPtr account() const;

Expand Down
1 change: 1 addition & 0 deletions src/libsync/propagateupload.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ void PUTFileJob::start() {
connect(_device.data(), SIGNAL(wasReset()), this, SLOT(slotSoftAbort()));
#endif

_requestTimer.start();
AbstractNetworkJob::start();
}

Expand Down
39 changes: 10 additions & 29 deletions src/libsync/propagateupload.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <QBuffer>
#include <QFile>
#include <QDebug>
#include <QElapsedTimer>


namespace OCC {
Expand Down Expand Up @@ -90,6 +91,7 @@ class PUTFileJob : public AbstractNetworkJob {
QMap<QByteArray, QByteArray> _headers;
QString _errorString;
QUrl _url;
QElapsedTimer _requestTimer;

public:
// Takes ownership of the device
Expand Down Expand Up @@ -117,6 +119,10 @@ class PUTFileJob : public AbstractNetworkJob {

virtual void slotTimeout() Q_DECL_OVERRIDE;

quint64 msSinceStart() const {
return _requestTimer.elapsed();
}


signals:
void finishedSignal();
Expand Down Expand Up @@ -194,7 +200,6 @@ class PropagateUploadFileCommon : public PropagateItemJob {
QByteArray _transmissionChecksum;
QByteArray _transmissionChecksumType;


public:
PropagateUploadFileCommon(OwncloudPropagator* propagator,const SyncFileItemPtr& item)
: PropagateItemJob(propagator, item), _finished(false), _deleteExisting(false) {}
Expand Down Expand Up @@ -262,7 +267,7 @@ class PropagateUploadFileV1 : public PropagateUploadFileCommon {
int _chunkCount; /// Total number of chunks for this file
int _transferId; /// transfer id (part of the url)

quint64 chunkSize() const { return _propagator->chunkSize(); }
quint64 chunkSize() const { return _propagator->initialChunkSize(); }


public:
Expand All @@ -289,46 +294,22 @@ class PropagateUploadFileNG : public PropagateUploadFileCommon {
quint64 _sent; /// amount of data (bytes) that was already sent
uint _transferId; /// transfer id (part of the url)
int _currentChunk; /// Id of the next chunk that will be sent
quint64 _currentChunkSize; /// current chunk size
bool _removeJobError; /// If not null, there was an error removing the job
quint64 _lastChunkSize; /// current chunk size

/*
* This is value in ms obtained from the server.
*
* Dynamic Chunking attribute the maximum number of miliseconds that single request below chunk size can take
* This value should be based on heuristics with default value 10000ms, time it takes to transfer 10MB chunk on 1MB/s upload link.
*
* Suggested solution will be to evaluate max(SNR, MORD) where:
* > SNR - Slow network request, so time it will take to transmit default chunking sized request at specific low upload bandwidth
* > MORD - Maximum observed request time, so double the time of maximum observed RTT of the very small PUT request (e.g. 1kB) to the system
*
* Exemplary, syncing 100MB files, with chunking size 10MB, will cause sync of 10 PUT requests which max evaluation was set to <max_single_upload_request_duration_msec>
*
* Dynamic chunking client algorithm is specified in the ownCloud documentation and uses <max_single_upload_request_duration_msec> to estimate if given
* bandwidth allows higher chunk sizes (because of high goodput)
*/
quint64 _requestMaxDuration;

// Map chunk number with its size from the PROPFIND on resume.
// (Only used from slotPropfindIterate/slotPropfindFinished because the LsColJob use signals to report data.)
QMap<int, quint64> _serverChunks;

quint64 chunkSize() const { return _propagator->chunkSize(); }
quint64 maxChunkSize() const { return _propagator->maxChunkSize(); }

quint64 getRequestMaxDurationDC(){
return _requestMaxDuration;
}

/**
* Return the URL of a chunk.
* If chunk == -1, returns the URL of the parent folder containing the chunks
*/
QUrl chunkUrl(int chunk = -1);

public:
PropagateUploadFileNG(OwncloudPropagator* propagator,const SyncFileItemPtr& item, const quint64& requestMaxDuration) :
PropagateUploadFileCommon(propagator,item), _lastChunkSize(0), _requestMaxDuration(requestMaxDuration) {}
PropagateUploadFileNG(OwncloudPropagator* propagator,const SyncFileItemPtr& item) :
PropagateUploadFileCommon(propagator,item), _currentChunkSize(0) {}

void doStartUpload() Q_DECL_OVERRIDE;

Expand Down
68 changes: 30 additions & 38 deletions src/libsync/propagateuploadng.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
#include <QDir>
#include <cmath>
#include <cstring>
#include <cmath>

namespace OCC {

QUrl PropagateUploadFileNG::chunkUrl(int chunk)
Expand Down Expand Up @@ -263,43 +263,10 @@ void PropagateUploadFileNG::startNextChunk()
quint64 fileSize = _item->_size;
Q_ASSERT(fileSize >= _sent);

quint64 currentChunkSize = chunkSize();

// this will check if getRequestMaxDurationDC is set to 0 or not
double requestMaxDurationDC = (double) getRequestMaxDurationDC();
if (requestMaxDurationDC != 0) {
// this if first chunked file request, so it can start with default size of chunkSize()
// if _lastChunkSize != 0 it means that we already have send one request
if(_lastChunkSize != 0){
//TODO: this is done step by step for debugging purposes

//get last request timestamp
double lastChunkLap = (double) _stopWatch.durationOfLap(QLatin1String("ChunkDuration"));

//get duration of the request
double requestDuration = (double) _stopWatch.addLapTime(QLatin1String("ChunkDuration")) - lastChunkLap;

// calculate natural logarithm
double correctionParameter = log(requestMaxDurationDC / requestDuration) - 1;

// If logarithm is smaller or equal zero, it means that we exceeded max request duration
// If exceeded it will use currentChunkSize = chunkSize()
// If did not exceeded, we will increase the chunk size
// motivation for logarithm is specified in the dynamic chunking documentation
// TODO: give link to documentation
if (correctionParameter>0){
currentChunkSize = qMin(_lastChunkSize + (qint64) correctionParameter*chunkSize(), maxChunkSize());
}
}

//remember the value of last chunk size
_lastChunkSize = currentChunkSize;
}

// prevent situation that chunk size is bigger then required one to send
currentChunkSize = qMin(currentChunkSize, fileSize - _sent);
_currentChunkSize = qMin(_propagator->_chunkSize, fileSize - _sent);

if (currentChunkSize == 0) {
if (_currentChunkSize == 0) {
Q_ASSERT(_jobs.isEmpty()); // There should be no running job anymore
_finished = true;
// Finish with a MOVE
Expand Down Expand Up @@ -330,7 +297,7 @@ void PropagateUploadFileNG::startNextChunk()
auto device = new UploadDevice(&_propagator->_bandwidthManager);
const QString fileName = _propagator->getFilePath(_item->_file);

if (! device->prepareAndOpen(fileName, _sent, currentChunkSize)) {
if (! device->prepareAndOpen(fileName, _sent, _currentChunkSize)) {
qDebug() << "ERR: Could not prepare upload device: " << device->errorString();

// If the file is currently locked, we want to retry the sync
Expand All @@ -346,7 +313,7 @@ void PropagateUploadFileNG::startNextChunk()
QMap<QByteArray, QByteArray> headers;
headers["OC-Chunk-Offset"] = QByteArray::number(_sent);

_sent += currentChunkSize;
_sent += _currentChunkSize;
QUrl url = chunkUrl(_currentChunk);

// job takes ownership of device via a QScopedPointer. Job deletes itself when finishing
Expand Down Expand Up @@ -422,6 +389,31 @@ void PropagateUploadFileNG::slotPutFinished()
return;
}

// Adjust the chunk size for the time taken.
//
// Dynamic chunk sizing is enabled if the server configured a
// target duration for each chunk upload.
double targetDuration = _propagator->account()->capabilities().desiredChunkUploadDuration();
if (targetDuration > 0) {
double uploadTime = job->msSinceStart();

auto correctedSize = static_cast<quint64>(
_currentChunkSize / uploadTime * targetDuration);

// There can be multiple chunk uploads going on at the same time.
// So don't force the chunk size to the new predicted best size
// and instead move it there gradually.
_propagator->_chunkSize = qBound(
_propagator->minChunkSize(),
(_propagator->_chunkSize + correctedSize) / 2,
_propagator->maxChunkSize());

qDebug() << "Chunked upload of " << _currentChunkSize << " took " << uploadTime
<< " desired is " << targetDuration << ", expected good chunk size is "
<< correctedSize << " and nudged next chunk size to " << _propagator->_chunkSize;
}


Q_ASSERT(_sent <= _item->_size);
bool finished = _sent == _item->_size;

Expand Down

0 comments on commit 1e9c5b1

Please sign in to comment.