Skip to content

Commit

Permalink
feat: add support to auto replicate tables
Browse files Browse the repository at this point in the history
  • Loading branch information
limcross committed Nov 7, 2022
1 parent 65a8ea7 commit 395d7c1
Show file tree
Hide file tree
Showing 5 changed files with 250 additions and 61 deletions.
13 changes: 13 additions & 0 deletions src/_get-arc-options.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
module.exports = (arc) => {
let currentRegion
arc.aws.forEach((element) => {
if (element[0] == 'region') {
currentRegion = element[1]
}
})

return {
appName: arc.app[0],
currentRegion
}
}
23 changes: 23 additions & 0 deletions src/_get-multi-region-options.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
module.exports = (multiRegion) => {
if (!Array.isArray(multiRegion)) {
throw ReferenceError('Invalid multi region params')
}

let primaryRegion
if (Array.isArray(multiRegion[0]) && multiRegion[0][0] == 'primary') {
primaryRegion = multiRegion[0][1]
}
else {
throw ReferenceError('Invalid multi region params: Missing primary region')
}

let replicaRegions
if (multiRegion[1] !== undefined && Array.isArray(multiRegion[1].replicas)) {
replicaRegions = multiRegion[1].replicas
}
else {
throw ReferenceError('Invalid multi region params: Missing replica regions')
}

return { primaryRegion, replicaRegions }
}
64 changes: 64 additions & 0 deletions src/_get-replicated-tables.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// eslint-disable-next-line
let aws = require('aws-sdk') // Assume AWS-SDK is installed via Arc
let { toLogicalID } = require('@architect/utils')
let { updater } = require('@architect/utils')

module.exports = async (arc, stage, dryRun, appName, primaryRegion, currentRegion) => {
const update = updater('MultiRegion')
update.start(`Fetching replicated tables in the replica region (${currentRegion})...`)

let dynamoReplica = new aws.DynamoDB({ region: currentRegion }) // The current region is a replica region
let ssmPrimary = new aws.SSM({ region: primaryRegion })

let tableNames = []
arc.tables.forEach((table) => {
tableNames = tableNames.concat(Object.keys(table))
})

let tables = []
for (let tableName of tableNames) {
let PhysicalTableName // aka the physical table name
try {
// Arc app physical table names are stored in SSM service discovery
let Name = `/${toLogicalID(appName)}${toLogicalID(stage)}/tables/${tableName}`
let { Parameter } = await ssmPrimary.getParameter({ Name }).promise()
PhysicalTableName = Parameter.Value

let { Table } = await dynamoReplica.describeTable({ TableName: PhysicalTableName }).promise()
tables.push({
arn: Table.TableArn,
logicalName: tableName,
physicalName: PhysicalTableName,
})
}
catch (err) {
if (err.name === 'ParameterNotFound') {
const message = `${tableName} not found on ${currentRegion}`
if (dryRun) {
update.warn(`${message} (Maybe because is a dry-run)`)
}
else {
update.error(message)
throw (err)
}
}
else if (err.name === 'ResourceNotFoundException') {
const message = `DynamoDB table not found: ${PhysicalTableName}`
if (dryRun) {
update.warn(`${message} (Maybe because is a dry-run)`)
}
else {
update.error(message)
throw (err)
}
}
else {
throw (err)
}
}
}

update.done(`Replicated tables in replica region fetched`, tableNames)

return tables
}
112 changes: 112 additions & 0 deletions src/_update_replication.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// eslint-disable-next-line
let aws = require('aws-sdk') // Assume AWS-SDK is installed via Arc
let { toLogicalID } = require('@architect/utils')
let { updater } = require('@architect/utils')

module.exports = async (arc, stage, dryRun, appName, primaryRegion, replicaRegions, currentRegion) => {
const update = updater('MultiRegion')
const start = Date.now()
const done = () => update.done(`Replication updated in ${(Date.now() - start) / 1000} seconds`)
update.status(`Updating replication on primary region (${currentRegion})`)

let dynamoPrimary = new aws.DynamoDB({ region: primaryRegion })
let ssmPrimary = new aws.SSM({ region: primaryRegion })

let tableNames = []
arc.tables.forEach((table) => {
tableNames = tableNames.concat(Object.keys(table))
})

for (let tableName of tableNames) {
let PhysicalTableName
try {
// Arc app physical table names are stored in SSM service discovery
let Name = `/${toLogicalID(appName)}${toLogicalID(stage)}/tables/${tableName}`
let { Parameter } = await ssmPrimary.getParameter({ Name }).promise()
PhysicalTableName = Parameter.Value

let { Table } = await dynamoPrimary.describeTable({ TableName: PhysicalTableName }).promise()

let replicateUpdates = []

replicaRegions.forEach((replicaRegion) => {
if (!Table.Replicas || Table.Replicas.findIndex((replica) => replica.RegionName == replicaRegion) < 0) {
replicateUpdates.push({ Create: { RegionName: replicaRegion } })
}
})

if (Table.Replicas) {
Table.Replicas.forEach((replica) => {
if (!replicaRegions.includes(replica.RegionName)) {
replicateUpdates.push({ Delete: { RegionName: replica.RegionName } })
}
})
}

const createRegions = replicateUpdates.filter((param) => param.Create).map((param) => param.Create.RegionName)
const deleteRegions = replicateUpdates.filter((param) => param.Delete).map((param) => param.Delete.RegionName)

update.status(
`Initializing replication for table ${tableName}`,
`Creating replication on regions ... ${createRegions.length > 0 ? createRegions.join(',') : '(skipped)'}`,
`Deleting replication on regions ... ${deleteRegions.length > 0 ? deleteRegions.join(',') : '(skipped)'}`
)

update.start(`Replicating table ${tableName}...`)

if (replicateUpdates.length > 0 && !dryRun) {
try {
for (let replicateUpdate of replicateUpdates) {
await dynamoPrimary.updateTable({
TableName: PhysicalTableName,
ReplicaUpdates: [ replicateUpdate ]
}).promise()

do { // Wait to avoid errors with busy tables
await new Promise(r => setTimeout(r, 5000));
({ Table } = await dynamoPrimary.describeTable({ TableName: PhysicalTableName }).promise())
} while (
!Table.Replicas ||
Table.Replicas.findIndex((replica) => [ 'CREATING', 'UPDATING', 'DELETING' ].includes(replica.ReplicaStatus)) >= 0
)
}
}
catch (error) {
update.error(`While replicate table ${tableName} !`)
throw (error)
}
update.done(`Replication updated for table ${tableName}`)
}
else {
update.done(`Skipping replication update for table ${tableName}`)
}
}
catch (err) {
if (err.name === 'ParameterNotFound') {
const message = `${tableName} not found on ${currentRegion}`
if (dryRun) {
update.warn(`${message} (Maybe because is a dry-run)`)
}
else {
update.error(message)
throw (err)
}
}
else if (err.name === 'ResourceNotFoundException') {
const message = `DynamoDB table not found: ${PhysicalTableName}`
if (dryRun) {
update.warn(`${message} (Maybe because is a dry-run)`)
}
else {
update.error(message)
throw (err)
}
}
else {
throw (err)
}
}
}

done()
}
99 changes: 38 additions & 61 deletions src/index.js
Original file line number Diff line number Diff line change
@@ -1,86 +1,48 @@
// eslint-disable-next-line
let aws = require('aws-sdk') // Assume AWS-SDK is installed via Arc
let { toLogicalID } = require('@architect/utils')
const { toLogicalID } = require('@architect/utils')
let getMultiRegionOptions = require('./_get-multi-region-options')
let getArcOptions = require('./_get-arc-options')
let updateReplication = require('./_update_replication')
let getReplicatedTables = require('./_get-replicated-tables')

module.exports = {
deploy: {
start: async ({ arc, cloudformation, stage }) => {
start: async ({ arc, cloudformation, stage, dryRun }) => {
const multiRegion = arc['arc-plugin-multi-region']
if (!multiRegion) return cloudformation

const appName = arc.app[0]
const primaryRegion = multiRegion[0][1]
const replicaRegions = multiRegion[1].replicas
const { primaryRegion, replicaRegions } = getMultiRegionOptions(multiRegion)
const { appName, currentRegion } = getArcOptions(arc)

let currentRegion
arc.aws.forEach((element) => {
if (element[0] == 'region') {
currentRegion = element[1]
}
})

if (primaryRegion == currentRegion) {
return
}
if (primaryRegion == currentRegion) return

if (!replicaRegions.includes(currentRegion)) {
throw Error(`The following region is not included in replica regions: ${currentRegion}`)
}

let dynamoReplica = new aws.DynamoDB()
let ssmPrimary = new aws.SSM({ region: primaryRegion })

let tableNames = []
arc.tables.forEach((table) => {
tableNames = tableNames.concat(Object.keys(table))
})

let tables = []
for (let tableName of tableNames) {
let TableName // aka the physical table name
try {
// Arc app physical names are stored in SSM service discovery
let Name = `/${toLogicalID(appName)}${toLogicalID(stage)}/tables/${tableName}`
let { Parameter } = await ssmPrimary.getParameter({ Name }).promise()
TableName = Parameter.Value

let { Table } = await dynamoReplica.describeTable({ TableName }).promise()
tables.push({
arn: Table.TableArn,
logicalName: tableName,
physicalName: TableName,
})
}
catch (err) {
if (err.name === 'ParameterNotFound') {
throw ReferenceError(`${tableName} not found on replica ${currentRegion}`)
}
if (err.name === 'ResourceNotFoundException') {
throw ReferenceError(`DynamoDB table not found: ${TableName}`)
}
throw (err)
}
}

let tables = await getReplicatedTables(arc, stage, dryRun, appName, primaryRegion, currentRegion)
let index = cloudformation.Resources.Role.Properties.Policies
.findIndex(item => item.PolicyName === 'ArcDynamoPolicy')

// Delete old DynamoDB Policies
cloudformation.Resources.Role.Properties
.Policies[index].PolicyDocument.Statement[0].Resource = []

tables.forEach(({ arn, logicalName, physicalName }) => {
// Cleanup previous DynamoDB Policies
cloudformation.Resources.Role.Properties
.Policies[index].PolicyDocument.Statement[0].Resource = []
// Delete old DynamoDB Tables
let originalResourceTableName = `${toLogicalID(logicalName)}Table`
delete cloudformation.Resources[originalResourceTableName]
// Delete old SSM Parameters
let originalResourceParamName = `${toLogicalID(logicalName)}Param`
delete cloudformation.Resources[originalResourceParamName]

// Add new DynamoDB Policies
cloudformation.Resources.Role.Properties
.Policies[index].PolicyDocument.Statement[0].Resource.push(
arn,
`${arn}/*`,
`${arn}/stream/*`,
)
// Remove the default table and param generate by Architect
let originalResourceParamName = `${toLogicalID(logicalName)}Param`
delete cloudformation.Resources[originalResourceParamName]
let originalResourceTableName = `${toLogicalID(logicalName)}Table`
delete cloudformation.Resources[originalResourceTableName]
// Create a custom global table param
// Add new SSM Parameter for Global Table
let resourceName = `${toLogicalID(logicalName)}GlobalTableParam`
cloudformation.Resources[resourceName] = {
Type: 'AWS::SSM::Parameter',
Expand All @@ -101,5 +63,20 @@ module.exports = {

return cloudformation
},
end: async ({ arc, cloudformation, stage, dryRun }) => {
const multiRegion = arc['arc-plugin-multi-region']
if (!multiRegion) return cloudformation

const { primaryRegion, replicaRegions } = getMultiRegionOptions(multiRegion)
const { appName, currentRegion } = getArcOptions(arc)

if (primaryRegion != currentRegion) return

await updateReplication(
arc, stage, dryRun, appName, primaryRegion, replicaRegions, currentRegion
)

return cloudformation
}
},
}

0 comments on commit 395d7c1

Please sign in to comment.