Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Group requests to ensure balance between bandwidth utilization and bookkeeping #5391 #5390 #4498 #1633 #4454 #5440

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/libsync/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ set(libsync_SRCS
propagateremotedelete.cpp
propagateremotemove.cpp
propagateremotemkdir.cpp
propagatefiles.cpp
syncengine.cpp
syncfilestatus.cpp
syncfilestatustracker.cpp
Expand Down
9 changes: 9 additions & 0 deletions src/libsync/capabilities.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,15 @@ bool Capabilities::chunkingNg() const
return _capabilities["dav"].toMap()["chunking"].toByteArray() >= "1.0";
}

bool Capabilities::scheduling() const
{
static const auto scheduling = qgetenv("OWNCLOUD_SCHEDULING");
if (scheduling == "0") return false;
if (scheduling == "1") return true;

return _capabilities["dav"].toMap()["scheduling"].toByteArray() >= "1.0";
}

bool Capabilities::chunkingParallelUploadDisabled() const
{
return _capabilities["dav"].toMap()["chunkingParallelUploadDisabled"].toBool();
Expand Down
1 change: 1 addition & 0 deletions src/libsync/capabilities.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class OWNCLOUDSYNC_EXPORT Capabilities {
int sharePublicLinkExpireDateDays() const;
bool shareResharing() const;
bool chunkingNg() const;
bool scheduling() const;

/// disable parallel upload in chunking
bool chunkingParallelUploadDisabled() const;
Expand Down
7 changes: 7 additions & 0 deletions src/libsync/configfile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ static const char updateCheckIntervalC[] = "updateCheckInterval";
static const char geometryC[] = "geometry";
static const char timeoutC[] = "timeout";
static const char chunkSizeC[] = "chunkSize";
static const char smallFileSizeC[] = "smallFileSize";

static const char proxyHostC[] = "Proxy/host";
static const char proxyTypeC[] = "Proxy/type";
Expand Down Expand Up @@ -130,6 +131,12 @@ quint64 ConfigFile::chunkSize() const
return settings.value(QLatin1String(chunkSizeC), 10*1000*1000).toLongLong(); // default to 10 MB
}

/**
 * Reads the "smallFileSize" setting from the ini-format config file.
 *
 * @return the stored value converted with toLongLong(), or 500*1000
 *         (500 kB) when the key is not present.
 */
quint64 ConfigFile::smallFileSize() const
{
    const int defaultSmallFileSize = 500 * 1000; // default to 500 kB

    QSettings iniSettings(configFile(), QSettings::IniFormat);
    const QVariant stored = iniSettings.value(QLatin1String(smallFileSizeC), defaultSmallFileSize);
    return stored.toLongLong();
}

void ConfigFile::setOptionalDesktopNotifications(bool show)
{
QSettings settings(configFile(), QSettings::IniFormat);
Expand Down
1 change: 1 addition & 0 deletions src/libsync/configfile.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ class OWNCLOUDSYNC_EXPORT ConfigFile

int timeout() const;
quint64 chunkSize() const;
quint64 smallFileSize() const;

void saveGeometry(QWidget *w);
void restoreGeometry(QWidget *w);
Expand Down
31 changes: 31 additions & 0 deletions src/libsync/owncloudpropagator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,13 @@ void OwncloudPropagator::start(const SyncFileItemVector& items)
directories.push(qMakePair(QString(), _rootJob.data()));
QVector<PropagatorJob*> directoriesToRemove;
QString removedDirectory;

// This needs to be changed before merging: use a capability to switch scheduling on/off, since
// the server needs to decide whether to do that - scheduling will also bring more features in the future
// TODO: change it before merging to -> bool enableScheduledRequests = account()->capabilities().scheduling();
bool enableScheduledRequests = true;

PropagateFiles* filesJob = new PropagateFiles(this);
foreach(const SyncFileItemPtr &item, items) {

if (!removedDirectory.isEmpty() && item->_file.startsWith(removedDirectory)) {
Expand Down Expand Up @@ -389,6 +396,9 @@ void OwncloudPropagator::start(const SyncFileItemVector& items)
currentDirJob->append(dir);
}
directories.push(qMakePair(item->destination() + "/" , dir));
} else if (enableScheduledRequests
&& (item->_instruction == CSYNC_INSTRUCTION_NEW || item->_instruction == CSYNC_INSTRUCTION_SYNC)) {
filesJob->append(item);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This changes one purpose of the PropagateDirectory structure: update the directory's etag in the database only once child files have been synced properly.

I can't wrap my head around the conditions where this could be an issue, but @ogoffart should know if this could cause concrete problems.

See e.g.

// For new directories we always want to update the etag once

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for pointing that out, I did not know about this. I don't think it is difficult to resolve, but let @ogoffart see this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mrow4a @jturcotte @ogoffart If I'm not mistaken the following is a situation in which that dependency structure could lead to an abort-related problem:

Initial state in db and server:
/  - "etag/1"
/A - "etagA1"
/A/F - "etagF1"

now when someone touches /A/F on the server, the server state becomes

Server state, after touching /A/F
/  - "etag/2"
/A - "etagA2"
/A/F - "etagF2"

but if I understand correctly the propagation dependency graph is

PropagateDirectory (/)
 |- PropagateDirectory (/A)
 |- PropagateFiles
    |- PropagateItem (/A/F)

meaning that /A and /A/F run independently of each other. So it would be possible to completely finish propagating /A before the file transfer /A/F is done. Then aborting the sync run could lead to the db tree

/  - "etag/1"
/A - "etagA2"
/A/F - "etagF1"

Which means a follow up sync would probably not pick up on /A/F being out of date.

Maybe this hints at a second dependency problem: Currently local and remote MkDir are FullParallelism - so isn't there a chance that with this change one could run into a case where the client wants to download or upload into a non-existent directory?

Possibly I'm missing something, feel free to point out incorrectness.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it won't upload to a non-existent directory, because directory structure creation, conflict resolution etc. are done before any transfers, and directory deletions are done afterwards: https://github.com/owncloud/client/pull/5440/files#diff-c9731f430e8a29b13deaaf73bc4a4e22R56
https://github.com/owncloud/client/pull/5440/files#diff-20b960bb10cf3c0781a80fb3e5775241R493
https://github.com/owncloud/client/pull/5440/files#diff-7e5082f89a138020f2b1d37fc97d17dbR420

About the etag propagation: I have not looked into the problem yet, because we don't yet have smashbox automation for PRs (it is nearly done) and I am busy on the server side before the release.

Copy link
Contributor

@ckamm ckamm Mar 28, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mrow4a Okay, I didn't see that. (btw, _runningNow is gone, so this branch doesn't compile - probably was rebased at some point?) In that case the second problem can indeed not happen.

} else if (PropagateItemJob* current = createJob(item)) {
if (item->_instruction == CSYNC_INSTRUCTION_TYPE_CHANGE) {
// will delete directories, so defer execution
Expand All @@ -400,6 +410,13 @@ void OwncloudPropagator::start(const SyncFileItemVector& items)
}
}

if (enableScheduledRequests && !filesJob->isEmpty()){
// This job has parallelism WaitForFinished to allow directoriesToRemove be last
_rootJob->append(filesJob);
} else {
delete filesJob;
}

foreach(PropagatorJob* it, directoriesToRemove) {
_rootJob->append(it);
}
Expand Down Expand Up @@ -459,6 +476,20 @@ quint64 OwncloudPropagator::chunkSize()
return chunkSize;
}

// Returns the "small file" size threshold, resolved lazily and cached in a
// function-local static:
//   1. the OWNCLOUD_SMALLFILE_SIZE environment variable, parsed with toUInt();
//   2. if that yields 0, ConfigFile::smallFileSize().
// While the cached value is still 0 the lookup is repeated on every call.
quint64 OwncloudPropagator::smallFileSize()
{
// A small filesize item is a file whose transfer time
// typically will be lower than its bookkeeping time.
// Function-local statics are zero-initialized, so this starts out as 0.
// NOTE(review): the cache is a uint while this function and
// ConfigFile::smallFileSize() return quint64; a configured value that does
// not fit in uint would be narrowed on assignment - confirm this is acceptable.
static uint smallFileSize;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer explicit = 0 here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again, copy from ChunkingNG

// 0 means "not resolved yet" (or resolved to 0), so look the value up.
if (!smallFileSize) {
smallFileSize = qgetenv("OWNCLOUD_SMALLFILE_SIZE").toUInt();
// An unset, unparsable or zero environment value falls back to the config file.
if (smallFileSize == 0) {
ConfigFile cfg;
smallFileSize = cfg.smallFileSize();
}
}
return smallFileSize;
}

bool OwncloudPropagator::localFileNameClash( const QString& relFile )
{
Expand Down
106 changes: 105 additions & 1 deletion src/libsync/owncloudpropagator.h
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,6 @@ class PropagateIgnoreJob : public PropagateItemJob {
class OwncloudPropagator : public QObject {
Q_OBJECT

PropagateItemJob *createJob(const SyncFileItemPtr& item);
QScopedPointer<PropagateDirectory> _rootJob;

public:
Expand Down Expand Up @@ -322,6 +321,7 @@ class OwncloudPropagator : public QObject {

/** returns the size of chunks in bytes */
static quint64 chunkSize();
static quint64 smallFileSize();

AccountPtr account() const;

Expand All @@ -337,7 +337,11 @@ class OwncloudPropagator : public QObject {
*/
DiskSpaceResult diskSpaceCheck() const;

PropagateItemJob *createJob(const SyncFileItemPtr& item);

int runningAtRootJob(){
return _rootJob.data()->_runningNow;
}

private slots:

Expand Down Expand Up @@ -410,6 +414,106 @@ private slots:
void slotPollFinished();
};

/**
* @brief The PropagateFiles class is a container class.
*
* It will also ensure proper bandwidth utilization vs bookkeeping balance
*
* @ingroup libsync
*
* State Machine:
*
* _________________________________________________ ___________________________________________
* | | |
* | (Empty DB items queue and populated Data items queue?) |
* | | | |
* | Yes | | No |
* | | | |
* |<-----------[Schedule Data job] (Empty Data items queue and populated DB items queue?) |
* | | | |
* | No | | Yes |
* | | | |
* | | [Schedule DB job]--------------->|
* | | |
* | | |
* | (Populated Data items queue and populated DB items queue?) |
* | | | |
* | Yes | | No |
* | | | |
* | (Active running Data items number exceeded limit?) [Finish - no items] |
* | | | |
* | No | | Yes |
* | | | |
* <---[Schedule Data job] [Schedule DB job]------------------------------------------>
*
*
*/
// Container job that holds file items in two queues ("DB" items and "Data"
// items) and the sub-jobs started for them. Sub-jobs are classified by
// comparing the item size with OwncloudPropagator::smallFileSize() (see
// possiblyRunNextJob). The scheduling logic itself lives in the out-of-line
// members declared below.
class PropagateFiles : public PropagatorJob {
Q_OBJECT
public:
// Sub-jobs owned by this container; they are deleted in the destructor.
QVector<PropagatorJob *> _subJobs;

QVector<SyncFileItemPtr> _syncDBItems; // Items whose bookkeeping on the server takes longer than the transfer of their payload

QVector<SyncFileItemPtr> _syncDataItems; // Items whose payload transfer takes longer than the bookkeeping on the server

int _jobsFinished; // number of jobs that have completed
SyncFileItem::Status _hasError; // NoStatus, or NormalError / SoftError if there was an error
int _firstUnfinishedSubJob;
int _totalItems;
int _activeDBJobsNow; // incremented in possiblyRunNextJob() when a started job's item size is <= smallFileSize()
int _activeDataJobsNow; // incremented in possiblyRunNextJob() when a started job's item size is > smallFileSize()

// Creates an empty container: all counters start at 0 and no error is recorded.
explicit PropagateFiles(OwncloudPropagator *propagator)
: PropagatorJob(propagator)
, _jobsFinished(0), _hasError(SyncFileItem::NoStatus), _firstUnfinishedSubJob(0), _totalItems(0), _activeDBJobsNow(0), _activeDataJobsNow(0) { }

// Deletes every sub-job stored in _subJobs.
virtual ~PropagateFiles() {
qDeleteAll(_subJobs);
}

// True when both item queues are empty. _subJobs is not consulted.
bool isEmpty() {
return _syncDBItems.isEmpty() && _syncDataItems.isEmpty();
}

void append(const SyncFileItemPtr &item);
virtual bool scheduleNextJob() Q_DECL_OVERRIDE;
bool scheduleNewJob(QVector<SyncFileItemPtr> &syncJobs);
bool scheduleNextItem();

// Forwards abort() to every sub-job currently stored in _subJobs.
virtual void abort() Q_DECL_OVERRIDE {
foreach (PropagatorJob *n, _subJobs)
n->abort();
}

void finalize();

qint64 committedDiskSpace() const Q_DECL_OVERRIDE;

// This container always reports WaitForFinished parallelism.
JobParallelism parallelism() Q_DECL_OVERRIDE { return OCC::PropagatorJob::WaitForFinished; }

private slots:
// Starts or continues the given sub-job. On first use (state NotYetStarted)
// the job's signals are wired to this object and the matching active-job
// counter is incremented. Returns the result of next->scheduleNextJob().
bool possiblyRunNextJob(PropagatorJob *next) {
if (next->_state == NotYetStarted) {
// finished() is delivered through a queued connection; the other signals are re-emitted directly.
connect(next, SIGNAL(finished(SyncFileItem::Status)), this, SLOT(slotSubJobFinished(SyncFileItem::Status)), Qt::QueuedConnection);
connect(next, SIGNAL(itemCompleted(const SyncFileItem &, const PropagatorJob &)),
this, SIGNAL(itemCompleted(const SyncFileItem &, const PropagatorJob &)));
connect(next, SIGNAL(progress(const SyncFileItem &,quint64)), this, SIGNAL(progress(const SyncFileItem &,quint64)));
connect(next, SIGNAL(ready()), this, SIGNAL(ready()));

// NOTE(review): the qobject_cast result is dereferenced without a null check;
// this assumes every sub-job is a PropagateItemJob - confirm against append()/scheduleNewJob().
PropagateItemJob *job = qobject_cast<PropagateItemJob *>(next);
if(job->_item->_size <= propagator()->smallFileSize()){
_activeDBJobsNow++;
} else {
_activeDataJobsNow++;
}
}
return next->scheduleNextJob();
}

void slotSubJobFinished(SyncFileItem::Status status);
};

}

#endif
2 changes: 1 addition & 1 deletion src/libsync/propagatedownload.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ class PropagateDownloadFile : public PropagateItemJob {
qint64 committedDiskSpace() const Q_DECL_OVERRIDE;

// We think it might finish quickly because it is a small file.
bool isLikelyFinishedQuickly() Q_DECL_OVERRIDE { return _item->_size < 100*1024; }
bool isLikelyFinishedQuickly() Q_DECL_OVERRIDE { return _item->_size < propagator()->smallFileSize(); }

/**
* Whether an existing folder with the same name may be deleted before
Expand Down
Loading