Skip to content

Commit

Permalink
Support role auth for crr retry
Browse files Browse the repository at this point in the history
Forward port of f2521c6

Issue: BB-544
  • Loading branch information
Kerkesni committed Oct 16, 2024
1 parent 31d9ca6 commit 7a592d6
Show file tree
Hide file tree
Showing 12 changed files with 584 additions and 159 deletions.
3 changes: 2 additions & 1 deletion extensions/replication/tasks/UpdateReplicationStatus.js
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,12 @@ class UpdateReplicationStatus extends BackbeatTask {
const bucket = queueEntry.getBucket();
const objectKey = queueEntry.getObjectKey();
const versionId = queueEntry.getEncodedVersionId();
const role = queueEntry.getReplicationRoles()?.split(',')[0];
const score = Date.now();
const latestHour = this.statsClient.getSortedSetCurrentHour(score);
const message = {
key: getSortedSetKey(site, latestHour),
member: getSortedSetMember(bucket, objectKey, versionId),
member: getSortedSetMember(bucket, objectKey, versionId, role),
score,
};
this.failedCRRProducer.publishFailedCRREntry(JSON.stringify(message));
Expand Down
16 changes: 14 additions & 2 deletions extensions/replication/utils/ObjectFailureEntry.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
const querystring = require('querystring');

/**
* Class used to create an entry from the cached Redis key.
*/
Expand All @@ -11,11 +13,12 @@ class ObjectFailureEntry {
constructor(member, sitename) {
this.member = member;
const schema = this.member.split(':');
const [bucket, objectKey, encodedVersionId] = schema;
const [bucket, objectKey, encodedVersionId, encodedRole] = schema;
this.bucket = bucket;
this.objectKey = objectKey;
this.encodedVersionId = encodedVersionId;
this.sitename = sitename;
this.role = encodedRole ? querystring.unescape(encodedRole) : '';
}

getBucket() {
Expand All @@ -34,8 +37,17 @@ class ObjectFailureEntry {
return this.sitename;
}

getReplicationRoles() {
return this.role;
}

getMember() {
return `${this.bucket}:${this.objectKey}:${this.encodedVersionId}`;
const member =
`${this.bucket}:${this.objectKey}:${this.encodedVersionId}`;
if (this.role) {
return `${member}:${this.role}`;
}
return member;

Check warning on line 50 in extensions/replication/utils/ObjectFailureEntry.js

View check run for this annotation

Codecov / codecov/patch/Backbeat

extensions/replication/utils/ObjectFailureEntry.js#L50

Added line #L50 was not covered by tests
}

getLogInfo() {
Expand Down
37 changes: 28 additions & 9 deletions lib/api/BackbeatAPI.js
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ class BackbeatAPI {
* @return {Object|null} - The error object or `null` if no error
*/
validateQuery(bbRequest) {
const { marker, sitename } = bbRequest.getRouteDetails();
const { marker, sitename, role } = bbRequest.getRouteDetails();
if (marker !== undefined && Number.isNaN(Number.parseInt(marker, 10))) {
return errors.InvalidQueryParameter
.customizeDescription('marker must be a number');
Expand All @@ -153,6 +153,10 @@ class BackbeatAPI {
return errors.InvalidQueryParameter
.customizeDescription('must be a non-empty string');
}
if (role !== undefined && role === '') {
return errors.InvalidQueryParameter
.customizeDescription('role must be a non-empty string');
}
return null;
}

Expand Down Expand Up @@ -274,7 +278,7 @@ class BackbeatAPI {
// if rDetails has a status property
if (rDetails.status) {
if (rDetails.status === 'failed') {
const { extension, marker, sitename, bucket, key, versionId } =
const { extension, marker, sitename, bucket, key, versionId, role } =
rDetails;
const type = (bucket && key && versionId) ? 'specific' : 'all';
filteredRoutes = filteredRoutes.filter(r => {
Expand All @@ -295,6 +299,9 @@ class BackbeatAPI {
// this is optional for backward compatibility
addKeys.sitename = sitename;
}
if (role) {
addKeys.role = role;
}
} else {
// currently only pause/resume (for both crr and ingestion)
filteredRoutes = filteredRoutes.filter(r =>
Expand Down Expand Up @@ -409,7 +416,11 @@ class BackbeatAPI {
objectKey: entry.getObjectKey(),
versionId: entry.getEncodedVersionId(),
};
return this._backbeatMetadataProxy
const backbeatMetadataProxy = this._backbeatMetadataProxy;
if (this._repConfig.source.auth.type === 'role') {
backbeatMetadataProxy.setupSourceRole(entry, log);
}
return backbeatMetadataProxy
.setSourceClient(log)
.getMetadata(params, log, (err, res) => {
if (err) {
Expand Down Expand Up @@ -445,14 +456,19 @@ class BackbeatAPI {
// retry and filter it in the response.
return next();
}
return next(null, {
const response = {
Bucket: queueEntry.getBucket(),
Key: queueEntry.getObjectKey(),
VersionId: queueEntry.getEncodedVersionId(),
StorageClass: entry.getSite(),
Size: queueEntry.getContentLength(),
LastModified: queueEntry.getLastModified(),
});
};
if (this._repConfig.source.auth.type === 'role') {
response.Role =
queueEntry.getReplicationRoles().split(',')[0];
}
return next(null, response);
}),
(err, results) => {
if (err) {
Expand Down Expand Up @@ -486,13 +502,13 @@ class BackbeatAPI {
* @return {undefined}
*/
getFailedCRR(details, cb) {
const { bucket, key } = details;
const { bucket, key, role } = details;
let { versionId } = details;
// If the original CRR was on a bucket without `versioning` enabled
// (i.e. an NFS bucket), maintain the Redis key schema by using and
// empty string.
versionId = versionId === undefined ? '' : versionId;
const member = getSortedSetMember(bucket, key, versionId);
const member = getSortedSetMember(bucket, key, versionId, role);
return this._getEntriesAcrossSites(member, (err, entries) =>
this._getFailedCRRResponse(undefined, entries, cb));
}
Expand Down Expand Up @@ -953,8 +969,8 @@ class BackbeatAPI {
}
const entries = [];
return async.eachLimit(reqBody, 10, (o, next) => {
const { Bucket, Key, VersionId, StorageClass, ForceRetry } = o;
const member = getSortedSetMember(Bucket, Key, VersionId);
const { Bucket, Key, VersionId, StorageClass, ForceRetry, Role } = o;
const member = getSortedSetMember(Bucket, Key, VersionId, Role);
// The `ForceRetry` field allows for queueing a retry regardless of
// whether it is failed.
if (ForceRetry) {
Expand Down Expand Up @@ -1061,6 +1077,9 @@ class BackbeatAPI {
return true;
}
const requiredProperties = ['Bucket', 'Key', 'StorageClass'];
if (this._repConfig.source.auth.type === 'role') {
requiredProperties.push('Role');
}
requiredProperties.find(prop => {
if (typeof o[prop] !== 'string' || o[prop] === '') {
errMsg = `${msg}: ${prop} must be a non-empty string`;
Expand Down
3 changes: 2 additions & 1 deletion lib/api/BackbeatRequest.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,13 @@ class BackbeatRequest {
*/
_parseCRRRoutes(parts, query) {
if (parts[1] && parts[1] === 'failed') {
const { versionId, marker, sitename } = querystring.parse(query);
const { versionId, role, marker, sitename } = querystring.parse(query);
this._routeDetails.extension = parts[0];
this._routeDetails.status = parts[1];
this._routeDetails.bucket = parts[2];
this._routeDetails.key = parts.slice(3).join('/');
this._routeDetails.versionId = versionId;
this._routeDetails.role = role;
this._routeDetails.marker = marker;
this._routeDetails.sitename = sitename;
} else {
Expand Down
11 changes: 9 additions & 2 deletions lib/util/sortedSetHelper.js
Original file line number Diff line number Diff line change
@@ -1,17 +1,24 @@
const querystring = require('querystring');

const { redisKeys } = require('../../extensions/replication/constants');

/**
* Returns the schema used for failed CRR entry Redis sorted set member.
* @param {String} bucket - The name of the bucket
* @param {String} key - The name of the key
* @param {String|undefined} [versionId] - The encoded version ID
* @param {String|undefined} [role] - The source role used for CRR
* @return {String} - The sorted set member used for the failed CRR entry
*/
function getSortedSetMember(bucket, key, versionId) {
function getSortedSetMember(bucket, key, versionId, role) {
// If the original CRR was on a bucket without versioning enabled (i.e. an
// NFS bucket), maintain the Redis key schema by using and empty string.
const schemaVersionId = versionId === undefined ? '' : versionId;
return `${bucket}:${key}:${schemaVersionId}`;
if (!role) {
return `${bucket}:${key}:${schemaVersionId}`;
}
const schemaRole = querystring.escape(role);
return `${bucket}:${key}:${schemaVersionId}:${schemaRole}`;
}

/**
Expand Down
6 changes: 5 additions & 1 deletion tests/functional/replication/queueProcessor.js
Original file line number Diff line number Diff line change
Expand Up @@ -1360,10 +1360,14 @@ describe('queue processor functional tests with mocking', () => {
const parsedFailedQueueMessage = JSON.parse(message);
assert(parsedFailedQueueMessage.key.startsWith(
`${replicationConstants.redisKeys.failedCRR}:${queueProcessorSF.site}:`));
const role = querystring.escape(
s3mock.getParam('source.md.replicationInfo.role').split(',')[0]
);
assert.strictEqual(
parsedFailedQueueMessage.member,
`${s3mock.getParam('source.bucket')}:${s3mock.getParam('key')}`
+ `:${s3mock.getParam('versionIdEncoded')}`);
+ `:${s3mock.getParam('versionIdEncoded')}`
+ `:${role}`);
failedCRRProducer.publishFailedCRREntry = origPublishFailedMethod;
done();
};
Expand Down
Loading

0 comments on commit 7a592d6

Please sign in to comment.