Skip to content
This repository has been archived by the owner on Jun 2, 2024. It is now read-only.

feat: add changes stream syncer #970

Merged
merged 2 commits into from
Jun 25, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 7 additions & 15 deletions common/logger.js
Original file line number Diff line number Diff line change
@@ -1,20 +1,6 @@
/**!
* cnpmjs.org - common/logger.js
*
* Copyright(c) cnpmjs.org and other contributors.
* MIT Licensed
*
* Authors:
* dead_horse <[email protected]> (http://deadhorse.me)
* fengmk2 <[email protected]> (http://fengmk2.github.com)
*/

'use strict';

/**
* Module dependencies.
*/

var debug = require('debug')('cnpmjs.org:logger');
var formater = require('error-formater');
var Logger = require('mini-logger');
var utility = require('utility');
Expand Down Expand Up @@ -50,6 +36,9 @@ logger.syncInfo = function () {
if (typeof args[0] === 'string') {
args[0] = util.format('[%s][%s] ', utility.logDate(), process.pid) + args[0];
}
if (debug.enabled) {
debug.apply(debug, args);
}
logger.sync_info.apply(logger, args);
};

Expand All @@ -58,5 +47,8 @@ logger.syncError =function () {
if (typeof args[0] === 'string') {
args[0] = util.format('[%s][%s] ', utility.logDate(), process.pid) + args[0];
}
if (debug.enabled) {
debug.apply(debug, args);
}
logger.sync_error.apply(logger, arguments);
};
18 changes: 5 additions & 13 deletions config/index.js
Original file line number Diff line number Diff line change
@@ -1,18 +1,5 @@
/**
* Copyright(c) cnpmjs.org and other contributors.
* MIT Licensed
*
* Authors:
* dead_horse <[email protected]>
* fengmk2 <[email protected]> (http://fengmk2.com)
*/

'use strict';

/**
* Module dependencies.
*/

var mkdirp = require('mkdirp');
var copy = require('copy-to');
var path = require('path');
Expand All @@ -26,6 +13,7 @@ var dataDir = path.join(process.env.HOME || root, '.cnpmjs.org');

var config = {
version: version,
dataDir: dataDir,

/**
* Cluster mode
Expand Down Expand Up @@ -209,6 +197,10 @@ var config = {
// sync devDependencies or not, default is false
syncDevDependencies: false,

// changes streaming sync
syncChangesStream: false,
handleSyncRegistry: 'http://127.0.0.1:7001',

// badge subject on http://shields.io/
badgePrefixURL: 'https://img.shields.io/badge',
badgeSubject: 'cnpm',
Expand Down
13 changes: 0 additions & 13 deletions controllers/sync_module_worker.js
Original file line number Diff line number Diff line change
@@ -1,18 +1,5 @@
/**!
* Copyright(c) cnpmjs.org and other contributors.
* MIT Licensed
*
* Authors:
* fengmk2 <[email protected]> (http://fengmk2.com)
* dead_horse <[email protected]> (http://deadhorse.me)
*/

'use strict';

/**
* Module dependencies.
*/

var debug = require('debug')('cnpmjs.org:sync_module_worker');
var co = require('co');
var gather = require('co-gather');
Expand Down
13 changes: 0 additions & 13 deletions controllers/total.js
Original file line number Diff line number Diff line change
@@ -1,18 +1,5 @@
/**!
* Copyright(c) cnpm and other contributors.
* MIT Licensed
*
* Authors:
* fengmk2 <[email protected]> (http://fengmk2.com)
* dead_horse <[email protected]> (http://deadhorse.me)
*/

'use strict';

/**
* Module dependencies.
*/

const Total = require('../services/total');
const version = require('../package.json').version;
const config = require('../config');
Expand Down
13 changes: 0 additions & 13 deletions dispatch.js
Original file line number Diff line number Diff line change
@@ -1,18 +1,5 @@
/**!
* Copyright(c) cnpmjs.org and other contributors.
* MIT Licensed
*
* Authors:
* dead_horse <[email protected]>
* fengmk2 <[email protected]> (http://fengmk2.com)
*/

'use strict';

/**
* Module dependencies.
*/

var childProcess = require('child_process');
var path = require('path');
var util = require('util');
Expand Down
3 changes: 3 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@
},
"dependencies": {
"agentkeepalive": "~2.1.0",
"await-event": "^1.0.0",
"bytes": "~2.4.0",
"cfork": "~1.4.0",
"changes-stream": "^1.1.0",
"co": "~4.6.0",
"co-defer": "~1.0.0",
"co-gather": "~0.0.1",
Expand Down Expand Up @@ -44,6 +46,7 @@
"mkdirp": "~0.5.0",
"moment": "~2.12.0",
"mysql": "~2.10.2",
"mz": "^2.4.0",
"nodemailer": "~1.3.0",
"semver": "~5.1.0",
"sequelize": "~3.21.0",
Expand Down
75 changes: 75 additions & 0 deletions sync/changes_stream_syncer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
'use strict';

const ChangesStream = require('changes-stream');
const path = require('path');
const fs = require('mz/fs');
const os = require('os');
const urllib = require('urllib');
const streamAwait = require('await-event')
const logger = require('../common/logger');
const config = require('../config');

const db = 'https://replicate.npmjs.com';
const lastSeqFile = path.join(config.dataDir, '.cnpmjs.org.last_seq.txt');

module.exports = function* sync() {
const since = yield getLastSequence();
logger.syncInfo('start changes stream, since: %s', since);
const changes = new ChangesStream({
db,
since,
include_docs: false,
});
changes.await = streamAwait;
changes.on('data', change => {
logger.syncInfo('Get change: %j', change);
syncPackage(change);
});

yield changes.await('error');
};

function syncPackage(change) {
const url = `${config.handleSyncRegistry}/${change.id}/sync`;
urllib.request(url, {
method: 'PUT',
dataType: 'json',
timeout: 10000,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个接口只同步这个模块,不会同步他的依赖吧? 10s 够么?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

只是调用一下创建同步任务,不关心是否真正同步成功。即使失败,原来的定时同步在10分钟后也会确保同步成功。

Copy link
Member

@dead-horse dead-horse Jun 25, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

记起来了,查询是否成功日志的接口是另外的

}, (err, data, res) => {
if (err) {
logger.syncInfo('%s:%s PUT %s error: %s, retry after 5s',
change.seq, change.id, url, err);
logger.syncError(err);
syncPackage(change);
setTimeout(() => syncPackage(change), 5000);
} else {
saveLastSequence(change.seq);
logger.syncInfo('%s:%s sync request sent, log: %s/log/%s',
change.seq, change.id, url, data.logId);
}
});
}

function* getLastSequence() {
let lastSeq;
if (yield fs.exists(lastSeqFile)) {
lastSeq = yield fs.readFile(lastSeqFile, 'utf8');
lastSeq = Number(lastSeq);
}
if (!lastSeq) {
lastSeq = 2614765;
}
// const r = yield urllib.request(db, {
// dataType: 'json',
// timeout: 15000,
// });
// logger.syncInfo('get registry info: %j', r.data);
// if (lastSeq < r.data.update_seq) {
// lastSeq = r.data.update_seq;
// }
return lastSeq;
}

function saveLastSequence(seq) {
fs.writeFile(lastSeqFile, String(seq), () => {});
}
Loading