Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement toggle notification queue mode #1950

Merged
merged 4 commits into from
Jul 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -604,7 +604,15 @@ const build: ExternalMethods['build'] = async (
)
const trxTableNameAsId = escapeId(`${tablePrefix}__${schemaName}__TRX__`)

const xaKey = generateGuid(`${Date.now()}${Math.random()}${process.pid}`)
const firstRandom = Math.random()
let lastRandom: number | null = null
// More entropy via branch misprediction and more context changes
for (let index = 0; index < Math.floor(firstRandom * 50) + 1; index++) {
lastRandom = Math.random()
}
const xaKey = generateGuid(
`${Date.now()}${firstRandom}${lastRandom}${process.pid}`
)

const rows = (await inlineLedgerRunQuery(
`WITH "MaybeAcquireLock" AS (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ const connect: CurrentConnectMethod = async (imports, pool, options) => {
.join('\n')
}
if (pool.connection === connection) {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
pool.connection = null!
}
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import type { GenerateGuidMethod } from './types'

const GUID_BYTES = 32

const generateGuid: GenerateGuidMethod = (...args) => {
const baseBuffer = Buffer.from(`${args.map(String).join('')}`)
const resultBuffer = Buffer.alloc(8)
const resultBuffer = Buffer.alloc(GUID_BYTES)

for (let index = 0; index < baseBuffer.length; index++) {
resultBuffer[index % 8] = resultBuffer[index % 8] ^ baseBuffer[index]
resultBuffer[index % GUID_BYTES] =
resultBuffer[index % GUID_BYTES] ^ baseBuffer[index]
}

const result = `e${resultBuffer.toString('hex').toLowerCase()}`
Expand Down
64 changes: 36 additions & 28 deletions packages/runtime/runtime/src/cloud/init-subscriber.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {
setFunctionTags,
deleteEventSourceMapping,
getFunctionTags,
invokeFunction,
} from 'resolve-cloud-common/lambda'
import { getCallerIdentity } from 'resolve-cloud-common/sts'

Expand All @@ -34,46 +35,50 @@ const initSubscriber = (resolve, lambdaContext) => {
const region = process.env.AWS_REGION
const userId = process.env.RESOLVE_USER_ID
const functionArn = `arn:aws:lambda:${region}:${accountId}:function:${functionName}`
const useSqs = !!process.env.EXPERIMENTAL_SQS_TRANSPORT

resolve.getEventSubscriberDestination = (eventSubscriber) =>
`arn:aws:sqs:${region}:${accountId}:${userId}-${resolve.eventSubscriberScope}-${eventSubscriber}`
useSqs
? `arn:aws:sqs:${region}:${accountId}:${userId}-${resolve.eventSubscriberScope}-${eventSubscriber}`
: functionArn

resolve.subscriptionsCredentials = {
applicationLambdaArn: lambdaContext.invokedFunctionArn,
}

resolve.sendSqsMessage = async (destination, parameters) => {
// Send SQS messages within 1 minute with exponential jitter from 64 ms
const getRemainingSendingTime = ((endTime) => endTime - Date.now()).bind(
null,
Date.now() + 60 * 1000
)
const getJitterTime = (attempt) => Math.pow(2, attempt + 7)
resolve.invokeLambdaAsync = async (destination, parameters) => {
await invokeFunction({
Region: region,
FunctionName: destination,
Payload: parameters,
InvocationType: 'Event',
})
}

resolve.sendSqsMessage = async (destination, parameters) => {
const queueUrl = `https://sqs.${region}.amazonaws.com/${accountId}/${destination}`
for (let attempt = 0; getRemainingSendingTime() > 0; attempt++) {
try {
await sendMessage({
Region: region,
QueueUrl: queueUrl,
MessageBody: JSON.stringify(parameters),
})
break
} catch (err) {
await new Promise((resolve) =>
setTimeout(resolve, getJitterTime(attempt))
)
}
}
await sendMessage({
Region: region,
QueueUrl: queueUrl,
MessageBody: JSON.stringify(parameters),
})
}

resolve.invokeBuildAsync = async (parameters) => {
await resolve.sendSqsMessage(
`${userId}-${resolve.eventSubscriberScope}-${parameters.eventSubscriber}`,
parameters
)
}
resolve.invokeBuildAsync = async (parameters) =>
useSqs
? await resolve.sendSqsMessage(
`${userId}-${resolve.eventSubscriberScope}-${parameters.eventSubscriber}`,
parameters
)
: await resolve.invokeLambdaAsync(functionName, {
resolveSource: 'BuildEventSubscriber',
...parameters,
})

resolve.ensureQueue = async (name) => {
if (!useSqs) {
return
}
const getTags = () => {
const tags = {
'resolve-deployment-id': resolve.eventSubscriberScope,
Expand Down Expand Up @@ -206,6 +211,9 @@ const initSubscriber = (resolve, lambdaContext) => {
}

resolve.deleteQueue = async (name) => {
if (!useSqs) {
return
}
const errors = []
let functionTags = null
let UUID = null
Expand Down
20 changes: 20 additions & 0 deletions packages/runtime/runtime/src/cloud/lambda-worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,26 @@ const lambdaWorker = async (resolveBase, lambdaEvent, lambdaContext) => {

log.verbose(`executorResult: ${JSON.stringify(executorResult)}`)

return executorResult
} else if (lambdaEvent.resolveSource === 'BuildEventSubscriber') {
initSubscriber(resolveBase, lambdaContext)
initScheduler(resolve)

log.debug('initializing reSolve framework')
await initResolve(resolve)
log.debug('reSolve framework initialized')

log.debug('identified event source: event-subscriber-direct')
const { resolveSource, ...buildParameters } = lambdaEvent
void resolveSource

const executorResult = await resolve.eventSubscriber.build({
...buildParameters,
coldStart,
})

log.verbose(`executorResult: ${JSON.stringify(executorResult)}`)

return executorResult
} else if (
Array.isArray(lambdaEvent.Records) &&
Expand Down
43 changes: 26 additions & 17 deletions packages/runtime/runtime/src/common/notify-event-subscribers.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,15 @@
const getNotificationObject = (
eventSubscriber,
eventWithCursor,
isForeign
) => ({
eventSubscriber,
initiator: isForeign ? 'command-foreign' : 'command',
notificationId: `NT-${Date.now()}${Math.floor(Math.random() * 1000000)}`,
sendTime: Date.now(),
...(eventWithCursor != null ? eventWithCursor : {}),
})

const notifyEventSubscriber = async (
resolveBase,
destination,
Expand All @@ -22,14 +34,17 @@ const notifyEventSubscriber = async (
}
case /^arn:aws:sqs:/.test(destination): {
const queueFullName = destination.split(':')[5]
await resolveBase.sendSqsMessage(queueFullName, {
eventSubscriber,
initiator: 'command-foreign',
notificationId: `NT-${Date.now()}${Math.floor(
Math.random() * 1000000
)}`,
sendTime: Date.now(),
...(eventWithCursor != null ? eventWithCursor : {}),
await resolveBase.sendSqsMessage(
queueFullName,
getNotificationObject(eventSubscriber, eventWithCursor, true)
)
break
}
case /^arn:aws:lambda:/.test(destination): {
const lambdaFullName = destination.split(':')[6]
await resolveBase.invokeLambdaAsync(lambdaFullName, {
resolveSource: 'BuildEventSubscriber',
...getNotificationObject(eventSubscriber, eventWithCursor, true),
})
break
}
Expand All @@ -54,15 +69,9 @@ const notifyEventSubscribers = async (resolve, eventWithCursor) => {
const promises = []
for (const { name: eventSubscriber } of resolve.eventListeners.values()) {
promises.push(
resolve.invokeBuildAsync({
eventSubscriber,
initiator: 'command',
notificationId: `NT-${Date.now()}${Math.floor(
Math.random() * 1000000
)}`,
sendTime: Date.now(),
...(eventWithCursor != null ? eventWithCursor : {}),
})
resolve.invokeBuildAsync(
getNotificationObject(eventSubscriber, eventWithCursor)
)
)
}

Expand Down