Skip to content

Commit

Permalink
Merge branch 'group_schedule' into gs_review
Browse files Browse the repository at this point in the history
  • Loading branch information
mrow4a authored Jan 9, 2017
2 parents 6429aa4 + 3701299 commit bc5f7d4
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 48 deletions.
38 changes: 21 additions & 17 deletions src/libsync/owncloudpropagator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -391,21 +391,25 @@ void OwncloudPropagator::start(const SyncFileItemVector& items)
}
directories.push(qMakePair(item->destination() + "/" , dir));
} else {
// Ensure that only files under or equal to chunk size are being inserted to Normal Upload
if (enableBundledRequests && item->_size <= chunkSize()
&& item->_instruction == CSYNC_INSTRUCTION_NEW
&& item->_direction == SyncFileItem::Up ) {
// Get PropagateNormalUpload container job
PropagateNormalUpload* bundleJob = 0;
if (directories.top().second->_bundledUploadJob.isNull()) {
bundleJob = new PropagateNormalUpload(this);
directories.top().second->_bundledUploadJob.reset(bundleJob);
// Ensure that only new files are inserted into PropagateFiles
if (enableBundledRequests && item->_instruction == CSYNC_INSTRUCTION_NEW) {
// Get PropagateFiles container job
PropagateFiles* filesJob = 0;
if (directories.top().second->_filesJob.isNull()) {
filesJob = new PropagateFiles(this);
directories.top().second->_filesJob.reset(filesJob);
} else {
bundleJob = qobject_cast<PropagateNormalUpload*>(directories.top().second->_bundledUploadJob.data());
filesJob = qobject_cast<PropagateFiles*>(directories.top().second->_filesJob.data());
}

// Append Upload job
bundleJob->append(item);
// Allow PropagateFiles to create PropagateItemJob from under chunk size
// items (since these are performance critical and can be handled differently)
if (item->_size <= chunkSize()
&& item->_direction == SyncFileItem::Up ){
filesJob->append(item);
} else if (PropagateItemJob* current = createJob(item)){
filesJob->append(current);
}
} else if (PropagateItemJob* current = createJob(item)) {
if (item->_instruction == CSYNC_INSTRUCTION_TYPE_CHANGE) {
// will delete directories, so defer execution
Expand Down Expand Up @@ -640,10 +644,10 @@ bool PropagateDirectory::scheduleNextJob()
if (_state == NotYetStarted) {
_state = Running;

if(_bundledUploadJob){
// PropagateNormalUpload is not a standard job, since it is an abstract object
PropagateNormalUpload* bundle = qobject_cast<PropagateNormalUpload*>(_bundledUploadJob.take());
append(bundle);
if(_filesJob){
// PropagateFiles is not a standard job, since it is an abstract object
PropagateFiles* files = qobject_cast<PropagateFiles*>(_filesJob.take());
append(files);
}

if (!_firstJob && _subJobs.isEmpty()) {
Expand All @@ -654,7 +658,7 @@ bool PropagateDirectory::scheduleNextJob()
// At the beginning of the Directory Job, update the expected number of Jobs to be synced
_totalJobs = _subJobs.count();
if (_firstJob) {
// _firstJob is a standard job, since it does interact with the server
// _firstJob is a standard job, since it does interact with server
_propagator->_standardJobsCount++;
_totalJobs++;
}
Expand Down
6 changes: 3 additions & 3 deletions src/libsync/owncloudpropagator.h
Original file line number Diff line number Diff line change
Expand Up @@ -194,13 +194,13 @@ class OWNCLOUDSYNC_EXPORT PropagateDirectory : public PropagatorJob {
QScopedPointer<PropagateItemJob> _firstJob;

// e.g: create class which will handle bundled uploads and bandwidth utilization vs bookkeeping balance
QScopedPointer<PropagatorJob> _bundledUploadJob;
QScopedPointer<PropagatorJob> _filesJob;

// all the sub files or sub directories.
QVector<PropagatorJob *> _subJobs;

// all the finished sub PropagatorJob items which are not PropagateItemJob.
// one might need PropagatorJobs (PropagateDirectory, PropagateNormalUpload)
// one might need PropagatorJobs (PropagateDirectory, PropagateFiles)
QVector<PropagatorJob *> _finishedSubJobs;

SyncFileItemPtr _item;
Expand All @@ -212,7 +212,7 @@ class OWNCLOUDSYNC_EXPORT PropagateDirectory : public PropagatorJob {

explicit PropagateDirectory(OwncloudPropagator *propagator, const SyncFileItemPtr &item = SyncFileItemPtr(new SyncFileItem))
: PropagatorJob(propagator)
, _firstJob(0), _bundledUploadJob(0), _item(item), _jobsFinished(0), _runningNow(0), _hasError(SyncFileItem::NoStatus), _totalJobs(0)
, _firstJob(0), _filesJob(0), _item(item), _jobsFinished(0), _runningNow(0), _hasError(SyncFileItem::NoStatus), _totalJobs(0)

{ }

Expand Down
9 changes: 5 additions & 4 deletions src/libsync/propagateupload.h
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ private slots:
};

/**
* @brief The PropagateUploadBundle class is a container class for upload jobs under chunking size.
* @brief The PropagateFiles class is a container class for upload jobs under chunking size.
*
* It will also ensure proper bandwidth utilization vs bookkeeping balance, and that in case no other items than under-chunk uploads are available,
* it will parallelise itself into 3 flows.
Expand Down Expand Up @@ -356,7 +356,7 @@ private slots:
* <-[Schedule STANDARD job] ->[Try in next container]<-
*
*/
class PropagateNormalUpload : public PropagatorJob {
class PropagateFiles : public PropagatorJob {
Q_OBJECT
public:
// all the sub files which are equal to or smaller than _propagator->smallFileSize()
Expand All @@ -373,17 +373,18 @@ class PropagateNormalUpload : public PropagatorJob {
SyncFileItem::Status _hasError; // NoStatus, or NormalError / SoftError if there was an error
int _totalJobs;

explicit PropagateNormalUpload(OwncloudPropagator *propagator)
// Constructs an (initially empty) PropagateFiles container bound to the given
// propagator. All bookkeeping counters start at zero; the _dbJobs/_standardJobs
// queues are populated later via the append() overloads.
explicit PropagateFiles(OwncloudPropagator *propagator)
: PropagatorJob(propagator)
, _jobsFinished(0), _runningNow(0), _hasError(SyncFileItem::NoStatus), _totalJobs(0) { }

virtual ~PropagateNormalUpload() {
// Deletes every sub-job this container still owns: queued db-bound jobs,
// queued standard jobs, and jobs that already finished but were retained
// in _finishedSubJobs for later bookkeeping.
virtual ~PropagateFiles() {
qDeleteAll(_dbJobs);
qDeleteAll(_standardJobs);
qDeleteAll(_finishedSubJobs);
}

void append(const SyncFileItemPtr &item);
void append(PropagateItemJob* subJob);
bool scheduleNextJobRoutine(QVector<PropagatorJob *> &subJobs);
virtual bool scheduleNextJob() Q_DECL_OVERRIDE;

Expand Down
56 changes: 32 additions & 24 deletions src/libsync/propagateuploadbundle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@
*/

#include "propagateupload.h"
#include "propagatedownload.h"

namespace OCC {


qint64 PropagateNormalUpload::committedDiskSpace() const
qint64 PropagateFiles::committedDiskSpace() const
{
qint64 needed = 0;
foreach (PropagatorJob* job, _dbJobs) {
Expand All @@ -29,21 +30,43 @@ qint64 PropagateNormalUpload::committedDiskSpace() const
return needed;
}

bool PropagateNormalUpload::scheduleNextJobRoutine(QVector<PropagatorJob *> &subJobs) {
// Appends an already-constructed item job to the standard-job queue.
// Precondition (asserted): the item must NOT qualify as an under-chunk-size
// upload — those are handled by append(const SyncFileItemPtr&), which builds
// the upload job itself and may route it to the db-job queue instead.
void PropagateFiles::append(PropagateItemJob* subJob) {
Q_ASSERT(!(subJob->_item->_size <= _propagator->chunkSize() && subJob->_item->_direction == SyncFileItem::Up));
_propagator->_standardJobsCount++; // This item is not a small upload file, so it is standard
_standardJobs.append(subJob);
}

// Creates and enqueues an upload job for a small (under chunk size) new file.
// Precondition (asserted): the item is an upload whose size is at most
// chunkSize(); larger or non-upload items must go through the
// append(PropagateItemJob*) overload with a job built by createJob().
// Items at or below smallFileSize() are queued separately as db jobs, since
// their cost is dominated by database bookkeeping rather than transfer.
void PropagateFiles::append(const SyncFileItemPtr &item) {
// In case of bundles, in here we should append BundledUpload jobs with new files to the .top() of _subJobs until chunking size is reached.
// The role of this class is also to control how much data is going into the container class.
// In version 1.0 append just PUTs

Q_ASSERT((item->_size <= _propagator->chunkSize() && item->_direction == SyncFileItem::Up));
PropagateUploadFileV1* subJob = new PropagateUploadFileV1(_propagator, item);
if (item->_size <= _propagator->smallFileSize()){
_propagator->_dbJobsCount++; // Db operations for this item take considerably longer than any other factors.
_dbJobs.append(subJob);
} else {
_propagator->_standardJobsCount++; // This item is not a small upload file, so it is standard
_standardJobs.append(subJob);
}
}

bool PropagateFiles::scheduleNextJobRoutine(QVector<PropagatorJob *> &subJobs) {
QMutableVectorIterator<PropagatorJob *> subJobsIterator(subJobs);
while (subJobsIterator.hasNext()) {
subJobsIterator.next();
// Get the state of the sub job pointed at by call next()
if (subJobsIterator.value()->_state == Finished) {
// If this item is finished, remove it from the _subJobs list as it is not needed anymore
// If this item is finished, remove it from the _subJobs list as it is not needed anymore
// Note that in this case remove() from QVector will just perform memmove of pointer array items.
PropagatorJob * job = subJobsIterator.value();
subJobsIterator.remove();

// In this case we dont delete the job, but save it in the _finishedSubJobs queue
// In this case we don't delete the job, but save it in the _finishedSubJobs queue
// We might need this job in slotSubJobFinished
// The PropagateNormalUpload class will be destroyed in PropagateDirectory
// when it will detect that we finished PropagateNormalUpload
// The PropagateFiles class will be destroyed in PropagateDirectory
// when it will detect that we finished PropagateFiles
_finishedSubJobs.append(job);
continue;
}
Expand All @@ -57,22 +80,7 @@ bool PropagateNormalUpload::scheduleNextJobRoutine(QVector<PropagatorJob *> &sub
return false;
}

void PropagateNormalUpload::append(const SyncFileItemPtr &item) {
// In case of bundles, in here we should append BundledUpload jobs with new files to the .top() of _subJobs until chunking size is reached.
// The role of this class is also to control how much data is going into the container class.
// In version 1.0 append just PUTs

PropagateUploadFileV1* subJob = new PropagateUploadFileV1(_propagator, item);
if (item->_size <= _propagator->smallFileSize()){
_propagator->_dbJobsCount++; // Db operations for this item take considerably longer then any other factors.
_dbJobs.append(subJob);
} else {
_propagator->_standardJobsCount++; // This item is not a small upload file, so it is standard.
_standardJobs.append(subJob);
}
}

bool PropagateNormalUpload::scheduleNextJob()
bool PropagateFiles::scheduleNextJob()
{
if (_state == Finished) {
return false;
Expand Down Expand Up @@ -127,7 +135,7 @@ bool PropagateNormalUpload::scheduleNextJob()
}


void PropagateNormalUpload::slotSubJobFinished(SyncFileItem::Status status)
void PropagateFiles::slotSubJobFinished(SyncFileItem::Status status)
{
if (status == SyncFileItem::FatalError) {
abort();
Expand Down Expand Up @@ -164,7 +172,7 @@ void PropagateNormalUpload::slotSubJobFinished(SyncFileItem::Status status)
}
}

void PropagateNormalUpload::finalize()
void PropagateFiles::finalize()
{
_state = Finished;
emit finished(_hasError == SyncFileItem::NoStatus ? SyncFileItem::Success : _hasError);
Expand Down

0 comments on commit bc5f7d4

Please sign in to comment.