diff --git a/src/libsync/CMakeLists.txt b/src/libsync/CMakeLists.txt index cbece4b41c7..c56f5ad508b 100644 --- a/src/libsync/CMakeLists.txt +++ b/src/libsync/CMakeLists.txt @@ -55,6 +55,7 @@ set(libsync_SRCS propagateremotedelete.cpp propagateremotemove.cpp propagateremotemkdir.cpp + propagatefiles.cpp syncengine.cpp syncfilestatus.cpp syncfilestatustracker.cpp diff --git a/src/libsync/capabilities.cpp b/src/libsync/capabilities.cpp index cee5f1aebe2..e89d95a588d 100644 --- a/src/libsync/capabilities.cpp +++ b/src/libsync/capabilities.cpp @@ -116,6 +116,15 @@ bool Capabilities::chunkingNg() const return _capabilities["dav"].toMap()["chunking"].toByteArray() >= "1.0"; } +bool Capabilities::scheduling() const +{ + static const auto scheduling = qgetenv("OWNCLOUD_SCHEDULING"); + if (scheduling == "0") return false; + if (scheduling == "1") return true; + + return _capabilities["dav"].toMap()["scheduling"].toByteArray() >= "1.0"; +} + bool Capabilities::chunkingParallelUploadDisabled() const { return _capabilities["dav"].toMap()["chunkingParallelUploadDisabled"].toBool(); diff --git a/src/libsync/capabilities.h b/src/libsync/capabilities.h index e3d8c013c23..bfd31cc0dd4 100644 --- a/src/libsync/capabilities.h +++ b/src/libsync/capabilities.h @@ -41,6 +41,7 @@ class OWNCLOUDSYNC_EXPORT Capabilities { int sharePublicLinkExpireDateDays() const; bool shareResharing() const; bool chunkingNg() const; + bool scheduling() const; /// disable parallel upload in chunking bool chunkingParallelUploadDisabled() const; diff --git a/src/libsync/configfile.cpp b/src/libsync/configfile.cpp index 39f6303f6e6..6257e1c7f0f 100644 --- a/src/libsync/configfile.cpp +++ b/src/libsync/configfile.cpp @@ -53,6 +53,7 @@ static const char updateCheckIntervalC[] = "updateCheckInterval"; static const char geometryC[] = "geometry"; static const char timeoutC[] = "timeout"; static const char chunkSizeC[] = "chunkSize"; +static const char smallFileSizeC[] = "smallFileSize"; static const char proxyHostC[] = "Proxy/host"; static const char proxyTypeC[] = "Proxy/type"; @@ -130,6 +131,12 @@ quint64 ConfigFile::chunkSize() const return settings.value(QLatin1String(chunkSizeC), 10*1000*1000).toLongLong(); // default to 10 MB } +quint64 ConfigFile::smallFileSize() const +{ + QSettings settings(configFile(), QSettings::IniFormat); + return settings.value(QLatin1String(smallFileSizeC), 500*1000).toLongLong(); // default to 500 kB +} + void ConfigFile::setOptionalDesktopNotifications(bool show) { QSettings settings(configFile(), QSettings::IniFormat); diff --git a/src/libsync/configfile.h b/src/libsync/configfile.h index 0fe3b3cd063..9a346ba97b2 100644 --- a/src/libsync/configfile.h +++ b/src/libsync/configfile.h @@ -115,6 +115,7 @@ class OWNCLOUDSYNC_EXPORT ConfigFile int timeout() const; quint64 chunkSize() const; + quint64 smallFileSize() const; void saveGeometry(QWidget *w); void restoreGeometry(QWidget *w); diff --git a/src/libsync/owncloudpropagator.cpp b/src/libsync/owncloudpropagator.cpp index 3573ffd2411..71bae726f54 100644 --- a/src/libsync/owncloudpropagator.cpp +++ b/src/libsync/owncloudpropagator.cpp @@ -313,6 +313,13 @@ void OwncloudPropagator::start(const SyncFileItemVector& items) directories.push(qMakePair(QString(), _rootJob.data())); QVector directoriesToRemove; QString removedDirectory; + + // This needs to be changed before marging - capability to switch on/off scheduling, since + // server needs to decide if to do that - scheduling will also bring more features in the future + // TODO: change it before marging to -> bool enableScheduledRequests = account()->capabilities().scheduling(); + bool enableScheduledRequests = true; + + PropagateFiles* filesJob = new PropagateFiles(this); foreach(const SyncFileItemPtr &item, items) { if (!removedDirectory.isEmpty() && item->_file.startsWith(removedDirectory)) { @@ -389,6 +396,9 @@ void OwncloudPropagator::start(const SyncFileItemVector& items) currentDirJob->append(dir); } directories.push(qMakePair(item->destination() + "/" , dir)); + } else if (enableScheduledRequests + && (item->_instruction == CSYNC_INSTRUCTION_NEW || item->_instruction == CSYNC_INSTRUCTION_SYNC)) { + filesJob->append(item); } else if (PropagateItemJob* current = createJob(item)) { if (item->_instruction == CSYNC_INSTRUCTION_TYPE_CHANGE) { // will delete directories, so defer execution @@ -400,6 +410,13 @@ void OwncloudPropagator::start(const SyncFileItemVector& items) } } + if (enableScheduledRequests && !filesJob->isEmpty()){ + // This job has parallelism WaitForFinished to allow directoriesToRemove be last + _rootJob->append(filesJob); + } else { + delete filesJob; + } + foreach(PropagatorJob* it, directoriesToRemove) { _rootJob->append(it); } @@ -459,6 +476,20 @@ quint64 OwncloudPropagator::chunkSize() return chunkSize; } +quint64 OwncloudPropagator::smallFileSize() +{ + // A small filesize item is a file whose transfer time + // typically will be lower than its bookkeeping time. + static uint smallFileSize; + if (!smallFileSize) { + smallFileSize = qgetenv("OWNCLOUD_SMALLFILE_SIZE").toUInt(); + if (smallFileSize == 0) { + ConfigFile cfg; + smallFileSize = cfg.smallFileSize(); + } + } + return smallFileSize; +} bool OwncloudPropagator::localFileNameClash( const QString& relFile ) { diff --git a/src/libsync/owncloudpropagator.h b/src/libsync/owncloudpropagator.h index 85fb3240a63..43cfa3c8719 100644 --- a/src/libsync/owncloudpropagator.h +++ b/src/libsync/owncloudpropagator.h @@ -257,7 +257,6 @@ class PropagateIgnoreJob : public PropagateItemJob { class OwncloudPropagator : public QObject { Q_OBJECT - PropagateItemJob *createJob(const SyncFileItemPtr& item); QScopedPointer _rootJob; public: @@ -322,6 +321,7 @@ class OwncloudPropagator : public QObject { /** returns the size of chunks in bytes */ static quint64 chunkSize(); + static quint64 smallFileSize(); AccountPtr account() const; @@ -337,7 +337,11 @@ class OwncloudPropagator : public QObject { */ DiskSpaceResult diskSpaceCheck() const; + PropagateItemJob *createJob(const SyncFileItemPtr& item); + int runningAtRootJob(){ + return _rootJob.data()->_runningNow; + } private slots: @@ -410,6 +414,106 @@ private slots: void slotPollFinished(); }; +/** + * @brief The PropagateFiles class is a container class. + * + * It will also ensure proper bandwidth utilization vs bookkeeping balance + * + * @ingroup libsync + * + * State Machine: + * + * _________________________________________________ ___________________________________________ + * | | | + * | (Empty DB items queue and populated Data items queue?) | + * | | | | + * | Yes | | No | + * | | | | + * |<-----------[Schedule Data job] (Empty Data items queue and populated DB items queue?) | + * | | | | + * | No | | Yes | + * | | | | + * | | [Schedule DB job]--------------->| + * | | | + * | | | + * | (Populated Data items queue and populated DB items queue?) | + * | | | | + * | Yes | | No | + * | | | | + * | (Active running Data items number exceeded limit?) [Finish - no items] | + * | | | | + * | No | | Yes | + * | | | | + * <---[Schedule Data job] [Schedule DB job]------------------------------------------> + * + * + */ +class PropagateFiles : public PropagatorJob { + Q_OBJECT +public: + QVector _subJobs; + + QVector _syncDBItems; // Items which bookkeeping on the server is longer then the transfer of its payload + + QVector _syncDataItems; // Items which transfer of the payload is longer then bookkeeping on the server + + int _jobsFinished; // number of jobs that have completed + SyncFileItem::Status _hasError; // NoStatus, or NormalError / SoftError if there was an error + int _firstUnfinishedSubJob; + int _totalItems; + int _activeDBJobsNow; + int _activeDataJobsNow; + + explicit PropagateFiles(OwncloudPropagator *propagator) + : PropagatorJob(propagator) + , _jobsFinished(0), _hasError(SyncFileItem::NoStatus), _firstUnfinishedSubJob(0), _totalItems(0), _activeDBJobsNow(0), _activeDataJobsNow(0) { } + + virtual ~PropagateFiles() { + qDeleteAll(_subJobs); + } + + bool isEmpty() { + return _syncDBItems.isEmpty() && _syncDataItems.isEmpty(); + } + + void append(const SyncFileItemPtr &item); + virtual bool scheduleNextJob() Q_DECL_OVERRIDE; + bool scheduleNewJob(QVector &syncJobs); + bool scheduleNextItem(); + + virtual void abort() Q_DECL_OVERRIDE { + foreach (PropagatorJob *n, _subJobs) + n->abort(); + } + + void finalize(); + + qint64 committedDiskSpace() const Q_DECL_OVERRIDE; + + JobParallelism parallelism() Q_DECL_OVERRIDE { return OCC::PropagatorJob::WaitForFinished; } + +private slots: + bool possiblyRunNextJob(PropagatorJob *next) { + if (next->_state == NotYetStarted) { + connect(next, SIGNAL(finished(SyncFileItem::Status)), this, SLOT(slotSubJobFinished(SyncFileItem::Status)), Qt::QueuedConnection); + connect(next, SIGNAL(itemCompleted(const SyncFileItem &, const PropagatorJob &)), + this, SIGNAL(itemCompleted(const SyncFileItem &, const PropagatorJob &))); + connect(next, SIGNAL(progress(const SyncFileItem &,quint64)), this, SIGNAL(progress(const SyncFileItem &,quint64))); + connect(next, SIGNAL(ready()), this, SIGNAL(ready())); + + PropagateItemJob *job = qobject_cast(next); + if(job->_item->_size <= propagator()->smallFileSize()){ + _activeDBJobsNow++; + } else { + _activeDataJobsNow++; + } + } + return next->scheduleNextJob(); + } + + void slotSubJobFinished(SyncFileItem::Status status); +}; + } #endif diff --git a/src/libsync/propagatedownload.h b/src/libsync/propagatedownload.h index 1317dda869c..3ab0285724d 100644 --- a/src/libsync/propagatedownload.h +++ b/src/libsync/propagatedownload.h @@ -115,7 +115,7 @@ class PropagateDownloadFile : public PropagateItemJob { qint64 committedDiskSpace() const Q_DECL_OVERRIDE; // We think it might finish quickly because it is a small file. - bool isLikelyFinishedQuickly() Q_DECL_OVERRIDE { return _item->_size < 100*1024; } + bool isLikelyFinishedQuickly() Q_DECL_OVERRIDE { return _item->_size < propagator()->smallFileSize(); } /** * Whether an existing folder with the same name may be deleted before diff --git a/src/libsync/propagatefiles.cpp b/src/libsync/propagatefiles.cpp new file mode 100644 index 00000000000..b0505cab317 --- /dev/null +++ b/src/libsync/propagatefiles.cpp @@ -0,0 +1,160 @@ +/* + * Copyright (C) by Piotr Mrowczynski + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * for more details. + */ + +#include "owncloudpropagator.h" + +namespace OCC { + + +qint64 PropagateFiles::committedDiskSpace() const +{ + qint64 needed = 0; + foreach (PropagatorJob* job, _subJobs) { + needed += job->committedDiskSpace(); + } + return needed; +} + +void PropagateFiles::append(const SyncFileItemPtr &item) +{ + _totalItems++; + if(item->_size <= propagator()->smallFileSize()){ + _syncDBItems.append(item); + } else { + _syncDataItems.append(item); + } +} + +bool PropagateFiles::scheduleNextJob() +{ + if (_state == Finished) { + return false; + } + + if (_state == NotYetStarted) { + _state = Running; + + if (isEmpty()) { + finalize(); + return true; + } + } + + if (_state == Running) { + // This will ensure that all other jobs in the earlier PropagateDirectory are finished, so we can start with data transfers + if (propagator()->runningAtRootJob() != 1){ + return false; + } + } + + // cache the value of first unfinished subjob + int i = _firstUnfinishedSubJob; + int subJobsCount = _subJobs.count(); + while (i < subJobsCount && _subJobs.at(i)->_state == Finished) { + _firstUnfinishedSubJob = ++i; + } + for (int i = _firstUnfinishedSubJob; i < subJobsCount; ++i) { + if (_subJobs.at(i)->_state == Finished) { + continue; + } + + if (possiblyRunNextJob(_subJobs.at(i))) { + return true; + } + + Q_ASSERT(_subJobs.at(i)->_state == Running); + } + + return scheduleNextItem(); +} + +bool PropagateFiles::scheduleNewJob(QVector &syncJobs){ + // This function is used to schedule new job and lazily create job from sync items + Q_ASSERT(!syncJobs.isEmpty()); + + // Equivalent to Qt5 takeFirst() + const SyncFileItemPtr &item = syncJobs.first(); + PropagateItemJob* job = propagator()->createJob(item); + _subJobs.append(job); + syncJobs.pop_front(); + return possiblyRunNextJob(job); +} + +bool PropagateFiles::scheduleNextItem() +{ + /// This function holds the whole bookkeeping/data-transfers balance logic + + bool syncDBItemsEmpty = _syncDBItems.isEmpty(); + bool syncDataItemsEmpty = _syncDataItems.isEmpty(); + + if (syncDBItemsEmpty && !syncDataItemsEmpty){ + // There are no more DB jobs, ensure to maximally parallelise Data Transfers now + return scheduleNewJob(_syncDataItems); + } else if (!syncDBItemsEmpty && syncDataItemsEmpty){ + // There are no more data transfer jobs, ensure to maximally parallelise DB jobs now + return scheduleNewJob(_syncDBItems); + } else if (!syncDBItemsEmpty && !syncDataItemsEmpty){ + // Both queues have items, ensure bookkeeping and data transfer balance + if (_activeDataJobsNow < 2){ + // By default, we have max 3 connections available for bigger files + // On the other hand, we have max 6 for isLikelyToFinishQuickly files (also small files) + // It makes sense to use 2 queues for bigger data transfers, and leave the remaining 1-4 for faster operations + return scheduleNewJob(_syncDataItems); + } else { + return scheduleNewJob(_syncDBItems); + } + } + + // This means that we have no more file-items to sync -> finish + return false; +} + +void PropagateFiles::slotSubJobFinished(SyncFileItem::Status status) +{ + if (status == SyncFileItem::FatalError) { + abort(); + _state = Finished; + emit finished(status); + return; + } else if (status == SyncFileItem::NormalError || status == SyncFileItem::SoftError) { + _hasError = status; + } + + PropagateItemJob *job = qobject_cast(sender()); + if(job->_item->_size <= propagator()->smallFileSize()){ + _activeDBJobsNow--; + } else { + _activeDataJobsNow--; + } + Q_ASSERT(job); + + _jobsFinished++; + + // We finished processing all the jobs + // check if we finished + if (_jobsFinished >= _totalItems) { + Q_ASSERT(!_activeDBJobsNow && !_activeDataJobsNow); // how can we be finished if there are still jobs running now + finalize(); + } else { + emit ready(); + } +} + +void PropagateFiles::finalize() +{ + _state = Finished; + emit finished(_hasError == SyncFileItem::NoStatus ? SyncFileItem::Success : _hasError); +} + +} diff --git a/src/libsync/propagateupload.h b/src/libsync/propagateupload.h index 31e9067d640..fbb87a24f62 100644 --- a/src/libsync/propagateupload.h +++ b/src/libsync/propagateupload.h @@ -210,7 +210,7 @@ class PropagateUploadFileCommon : public PropagateItemJob { void start() Q_DECL_OVERRIDE; - bool isLikelyFinishedQuickly() Q_DECL_OVERRIDE { return _item->_size < 100*1024; } + bool isLikelyFinishedQuickly() Q_DECL_OVERRIDE { return _item->_size < propagator()->smallFileSize(); } private slots: void slotComputeContentChecksum();