Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ft: ZENKO-1019 delete scheduled resume #405

Merged
merged 2 commits into from
Aug 23, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion docs/crr-pause-resume.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Cross-Region Replication (CRR) Manual Pause and Resume
# Cross-Region Replication (CRR) Pause and Resume

## Description

Expand Down Expand Up @@ -145,3 +145,16 @@ is resumed, it again resumes consuming entries from its last offset.
```json
{}
```

* DELETE `/_/backbeat/api/crr/resume/<location-name>/schedule`

This is a DELETE request to remove a scheduled resume for cross-region
replication to a specified location configured as a destination replication
endpoint.
Specify "all" as a location name to make this request to all available
destinations.

Response:
```json
{}
```
24 changes: 24 additions & 0 deletions extensions/replication/queueProcessor/QueueProcessor.js
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,8 @@ class QueueProcessor extends EventEmitter {
const validActions = {
pauseService: this._pauseService.bind(this),
resumeService: this._resumeService.bind(this),
deleteScheduledResumeService:
this._deleteScheduledResumeService.bind(this),
};
try {
const { action, date } = JSON.parse(message);
Expand Down Expand Up @@ -304,6 +306,28 @@ class QueueProcessor extends EventEmitter {
}
}

/**
* Delete scheduled resume (if any)
* @return {undefined}
*/
_deleteScheduledResumeService() {
this._updateZkStateNode('scheduledResume', null, err => {
if (err) {
this.logger.trace('error occurred saving state to zookeeper', {
method: 'QueueProcessor._deleteScheduledResumeService',
});
} else if (this.scheduledResume) {
this.scheduledResume.cancel();
this.scheduledResume = null;
this.logger.info('deleted scheduled CRR resume for location:' +
` ${this.site}`);
} else {
this.logger.info('no scheduled CRR resume for location: ' +
` ${this.site}`);
}
});
}

_getZkSiteNode() {
return `${zookeeperReplicationNamespace}${ZK_CRR_STATE_PATH}/` +
`${this.site}`;
Expand Down
25 changes: 25 additions & 0 deletions lib/api/BackbeatAPI.js
Original file line number Diff line number Diff line change
Expand Up @@ -736,6 +736,7 @@ class BackbeatAPI {
}
try {
reqBody = JSON.parse(body);
// default 6 hours if no body value sent by user
if (!reqBody.hours) {
return defaultRes;
}
Expand Down Expand Up @@ -797,6 +798,30 @@ class BackbeatAPI {
return cb(null, {});
}

/**
* Remove CRR scheduled resume for given site(s)
* @param {Object} details - The route details
* @param {Function} cb - The callback to call
* @return {undefined}
*/
deleteScheduledResumeService(details, cb) {
let sites;
if (details.site === 'all') {
sites = details.extensions.crr.filter(s => s !== 'all');
} else {
sites = [details.site];
}
sites.forEach(site => {
const channel = `${this._crrTopic}-${site}`;
const message = JSON.stringify({
action: 'deleteScheduledResumeService',
});
this._redisPublisher.publish(channel, message);
});
this._logger.info(`deleted scheduled resume for locations: ${sites}`);
return cb(null, {});
}

/**
* Helper method to get zookeeper state details for given site(s)
* @param {Object} details - The route details
Expand Down
2 changes: 1 addition & 1 deletion lib/api/BackbeatServer.js
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class BackbeatServer {
backbeatRequest);
}

const validMethods = ['GET', 'POST'];
const validMethods = ['GET', 'POST', 'DELETE'];
if (!validMethods.includes(req.method)) {
return this._errorResponse(errors.MethodNotAllowed,
backbeatRequest);
Expand Down
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
},
"homepage": "https://github.com/scality/backbeat#readme",
"dependencies": {
"arsenal": "scality/Arsenal#06dfdd9",
"arsenal": "scality/Arsenal#79ed68c",
"async": "^2.3.0",
"aws-sdk": "2.147.0",
"backo": "^1.1.0",
Expand Down
40 changes: 32 additions & 8 deletions tests/functional/api/BackbeatServer.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ const zookeeper = require('node-zookeeper-client');
const { RedisClient, StatsModel } = require('arsenal').metrics;

const config = require('../../config.json');
const { makePOSTRequest, getResponseBody } =
require('../utils/makePOSTRequest');
const { makeRequest, getResponseBody } =
require('../utils/makeRequest');
const S3Mock = require('../utils/S3Mock');
const VaultMock = require('../utils/VaultMock');
const redisConfig = { host: '127.0.0.1', port: 6379 };
Expand Down Expand Up @@ -57,7 +57,7 @@ function makeRetryPOSTRequest(body, cb) {
method: 'POST',
path: '/_/crr/failed',
});
makePOSTRequest(options, body, cb);
makeRequest(options, body, cb);
}

function getRequest(path, done) {
Expand Down Expand Up @@ -1221,7 +1221,7 @@ describe('Backbeat Server', () => {
method: 'POST',
path: '/_/crr/pause',
});
makePOSTRequest(options, emptyBody, err => {
makeRequest(options, emptyBody, err => {
assert.ifError(err);

setTimeout(() => {
Expand All @@ -1247,7 +1247,7 @@ describe('Backbeat Server', () => {
method: 'POST',
path: `/_/crr/pause/${firstSite}`,
});
makePOSTRequest(options, emptyBody, err => {
makeRequest(options, emptyBody, err => {
assert.ifError(err);

setTimeout(() => {
Expand All @@ -1270,7 +1270,7 @@ describe('Backbeat Server', () => {
method: 'POST',
path: '/_/crr/resume',
});
makePOSTRequest(options, emptyBody, err => {
makeRequest(options, emptyBody, err => {
assert.ifError(err);

setTimeout(() => {
Expand Down Expand Up @@ -1309,7 +1309,7 @@ describe('Backbeat Server', () => {
path: `/_/crr/resume/${firstSite}/schedule`,
});
const body = JSON.stringify({ hours: 1 });
makePOSTRequest(options, body, (err, res) => {
makeRequest(options, body, (err, res) => {
assert.ifError(err);
setTimeout(() => {
getResponseBody(res, err => {
Expand Down Expand Up @@ -1337,7 +1337,7 @@ describe('Backbeat Server', () => {
method: 'POST',
path: `/_/crr/resume/${firstSite}/schedule`,
});
makePOSTRequest(options, emptyBody, err => {
makeRequest(options, emptyBody, err => {
assert.ifError(err);

setTimeout(() => {
Expand All @@ -1360,6 +1360,30 @@ describe('Backbeat Server', () => {
});
});

it('should remove a scheduled resume request when receiving a DELETE ' +
`request to route /_/crr/resume/${secondSite}/schedule`, done => {
const options = Object.assign({}, defaultOptions, {
method: 'DELETE',
path: `/_/crr/resume/${secondSite}/schedule`,
});
makeRequest(options, emptyBody, (err, res) => {
assert.ifError(err);

setTimeout(() => {
getResponseBody(res, err => {
assert.ifError(err);

assert.strictEqual(cache2.length, 1);

const message = JSON.parse(cache2[0].message);
assert.equal('deleteScheduledResumeService',
message.action);
done();
});
});
});
});

it('should receive a status request on all site channels from route ' +
'/_/crr/status', done => {
getRequest('/_/crr/status', (err, res) => {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
const http = require('http');

function makePOSTRequest(options, body, cb) {
function makeRequest(options, body, cb) {
const req = http.request(options, res => cb(null, res));
req.on('error', err => cb(err));
req.end(body);
Expand All @@ -14,4 +14,4 @@ function getResponseBody(res, cb) {
res.on('error', err => cb(err));
}

module.exports = { makePOSTRequest, getResponseBody };
module.exports = { makeRequest, getResponseBody };