diff --git a/build.gradle.kts b/build.gradle.kts index 0a247240..50df0a7c 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -39,6 +39,11 @@ allprojects { } } +buildscript { + // required for m1 mac + configurations { classpath { resolutionStrategy { force("net.java.dev.jna:jna:5.7.0") } } } +} + subprojects { apply(plugin = "java") apply(plugin = "kotlin") diff --git a/contracts/src/main/proto/interpreter.proto b/contracts/src/main/proto/interpreter.proto index d7926cc5..a2c21369 100644 --- a/contracts/src/main/proto/interpreter.proto +++ b/contracts/src/main/proto/interpreter.proto @@ -26,6 +26,7 @@ message TestParams { string seed = 1; int32 width = 2; int32 depth = 3; + optional int32 max_sleep_millis = 4; } message Key { diff --git a/services/node-services/.npmrc b/services/node-services/.npmrc new file mode 100644 index 00000000..9e53c381 --- /dev/null +++ b/services/node-services/.npmrc @@ -0,0 +1,2 @@ +@restatedev:registry=https://npm.pkg.github.com/ +//npm.pkg.github.com/:_authToken=${NPM_TOKEN} diff --git a/services/node-services/Dockerfile b/services/node-services/Dockerfile index 0dfefb7e..b09e5049 100644 --- a/services/node-services/Dockerfile +++ b/services/node-services/Dockerfile @@ -4,28 +4,27 @@ ARG NPM_TOKEN WORKDIR /usr/src/app COPY . . -RUN echo "//npm.pkg.github.com/:_authToken=$NPM_TOKEN\n" >> .npmrc && \ - echo "@restatedev:registry=https://npm.pkg.github.com/" >> .npmrc && \ - npm ci && \ - npm run build && \ - rm -f .npmrc +RUN NPM_TOKEN=${NPM_TOKEN} npm ci +RUN npm run build FROM node:18 as prod ARG NPM_TOKEN WORKDIR /usr/src/app # Install app dependencies -COPY package*.json *.tgz ./ -RUN echo "//npm.pkg.github.com/:_authToken=$NPM_TOKEN\n" >> .npmrc && \ - echo "@restatedev:registry=https://npm.pkg.github.com/" >> .npmrc && \ - npm ci --production && \ - rm -f .npmrc +COPY package*.json *.tgz .npmrc ./ +RUN NPM_TOKEN=${NPM_TOKEN} npm ci --production COPY --from=build /usr/src/app/dist /usr/src/app/dist +FROM node:18 + +# Use a new stage so that the build-arg NPM_TOKEN isn't leaked into the final image history +COPY --from=prod /usr/src/app/ /usr/src/app/ + # Install Tini RUN apt-get update && apt-get -y install tini EXPOSE 8080 ENTRYPOINT ["tini", "--"] -CMD ["node", "/usr/src/app/dist/app.js"] \ No newline at end of file +CMD ["node", "/usr/src/app/dist/app.js"] diff --git a/services/node-services/build.gradle.kts b/services/node-services/build.gradle.kts index 643c3346..1d75f093 100644 --- a/services/node-services/build.gradle.kts +++ b/services/node-services/build.gradle.kts @@ -29,6 +29,7 @@ tasks.register("prepareDockerBuild") { ".dockerignore", ".eslintignore", ".eslintrc.json", + ".npmrc", "package.json", "package-lock.json", "tsconfig.json", diff --git a/services/node-services/package-lock.json b/services/node-services/package-lock.json index d3e17b19..62b3a55f 100644 --- a/services/node-services/package-lock.json +++ b/services/node-services/package-lock.json @@ -10,11 +10,13 @@ "dependencies": { "@restatedev/restate-sdk": "1.0.26", "protobufjs": "^7.2.2", + "seedrandom": "^3.0.5", "ts-proto": "^1.140.0", "uuid": "^9.0.0" }, "devDependencies": { "@bufbuild/buf": "1.15.0", + "@types/seedrandom": "^3.0.5", "@types/uuid": "^9.0.1", "@typescript-eslint/eslint-plugin": "^5.53.0", "@typescript-eslint/parser": "^5.53.0", @@ -363,6 +365,12 @@ "resolved": "https://registry.npmjs.org/@types/object-hash/-/object-hash-1.3.4.tgz", "integrity": "sha512-xFdpkAkikBgqBdG9vIlsqffDV8GpvnPEzs0IUtr1v3BEB97ijsFQ4RXVbUZwjFThhB4MDSTUfvmxUD5PGx0wXA==" }, + "node_modules/@types/seedrandom": { + "version": "3.0.5", + "resolved": "https://registry.npmjs.org/@types/seedrandom/-/seedrandom-3.0.5.tgz", + "integrity": "sha512-kopEpYpFQvQdYsZkZVwht/0THHmTFFYXDaqV/lM45eweJ8kcGVDgZHs0RVTolSq55UPZNmjhKc9r7UvLu/mQQg==", + "dev": true + }, "node_modules/@types/semver": { "version": "7.3.13", "resolved": "https://registry.npmjs.org/@types/semver/-/semver-7.3.13.tgz", @@ -1711,6 +1719,11 @@ "queue-microtask": "^1.2.2" } }, + "node_modules/seedrandom": { + "version": "3.0.5", + "resolved": "https://registry.npmjs.org/seedrandom/-/seedrandom-3.0.5.tgz", + "integrity": "sha512-8OwmbklUNzwezjGInmZ+2clQmExQPvomqjL7LFqOYqtmuxRgQYqOD3mHaU+MvZn5FLUeVxVfQjwLZW/n/JFuqg==" + }, "node_modules/semver": { "version": "7.5.0", "resolved": "https://registry.npmjs.org/semver/-/semver-7.5.0.tgz", diff --git a/services/node-services/package.json b/services/node-services/package.json index 744dd508..c4215524 100644 --- a/services/node-services/package.json +++ b/services/node-services/package.json @@ -15,11 +15,13 @@ "dependencies": { "@restatedev/restate-sdk": "1.0.26", "protobufjs": "^7.2.2", + "seedrandom": "^3.0.5", "ts-proto": "^1.140.0", "uuid": "^9.0.0" }, "devDependencies": { "@bufbuild/buf": "1.15.0", + "@types/seedrandom": "^3.0.5", "@types/uuid": "^9.0.1", "@typescript-eslint/eslint-plugin": "^5.53.0", "@typescript-eslint/parser": "^5.53.0", diff --git a/services/node-services/src/app.ts b/services/node-services/src/app.ts index 4848cc21..d726d1b8 100644 --- a/services/node-services/src/app.ts +++ b/services/node-services/src/app.ts @@ -7,6 +7,8 @@ import { protoMetadata as receiverProtoMetadata } from "./generated/receiver"; import { protoMetadata as listProtoMetadata } from "./generated/list"; import { protoMetadata as errorsProtoMetadata } from "./generated/errors"; import { protoMetadata as nonDeterminismProtoMetadata } from "./generated/non_determinism"; +import { protoMetadata as verifierProtoMetadata } from "./generated/verifier"; +import { protoMetadata as interpreterProtoMetadata } from "./generated/interpreter"; import { CounterService, CounterServiceFQN } from "./counter"; import { ListService, ListServiceFQN } from "./collections"; import { FailingService, FailingServiceFQN } from "./errors"; @@ -17,6 +19,8 @@ import { NonDeterministicService, NonDeterministicServiceFQN, } from "./non_determinism"; +import { CommandVerifierService, CommandVerifierServiceFQN } from "./verifier"; +import {CommandInterpreterService, CommandInterpreterServiceFQN} from "./interpreter"; let serverBuilder = restate.createServer(); @@ -77,6 +81,22 @@ const services = new Map([ instance: new FailingService(), }, ], + [ + CommandVerifierServiceFQN, + { + descriptor: verifierProtoMetadata, + service: "CommandVerifier", + instance: new CommandVerifierService(), + }, + ], + [ + CommandInterpreterServiceFQN, + { + descriptor: interpreterProtoMetadata, + service: "CommandInterpreter", + instance: new CommandInterpreterService(), + }, + ], ]); console.log(services.keys()); diff --git a/services/node-services/src/interpreter.ts b/services/node-services/src/interpreter.ts new file mode 100644 index 00000000..fe861268 --- /dev/null +++ b/services/node-services/src/interpreter.ts @@ -0,0 +1,129 @@ +import {RestateContext, useContext} from "@restatedev/restate-sdk"; +import { + BackgroundCallRequest, + CallRequest, + ClearRequest, + Command_AsyncCall, + Command_AsyncCallAwait, + Command_BackgroundCall, + Command_IncrementState, + Command_Sleep, + Command_SyncCall, + CommandInterpreter, + CommandInterpreterClientImpl, + Commands, + Empty, Key, protobufPackage, TestParams, + VerificationRequest, + VerificationResult, +} from "./generated/interpreter"; + +export const CommandInterpreterServiceFQN = protobufPackage + ".CommandInterpreter"; + +export class CommandInterpreterService implements CommandInterpreter { + async call(request: CallRequest): Promise { + return this.eitherCall(request.key, request.commands) + } + + async backgroundCall(request: BackgroundCallRequest): Promise { + return this.eitherCall(request.key, request.commands) + } + + async eitherCall(key: Key | undefined, commands: Commands | undefined): Promise { + if (!commands?.command) { + throw new Error("CallRequest with no commands") + } + if (!key) { + throw new Error("CallRequest with no key") + } + if (!key.params) { + throw new Error("CallRequest with no test parameters") + } + const ctx = useContext(this); + const client = new CommandInterpreterClientImpl(ctx); + const pending_calls = new Map>(); + + for (const c of commands.command) { + switch (true) { + case c.increment !== undefined: + await this._increment(ctx, c.increment as Command_IncrementState) + break + case c.syncCall !== undefined: + await this._syncCall(ctx, client, key.params, c.syncCall as Command_SyncCall) + break + case c.asyncCall !== undefined: + this._asyncCall(ctx, client, pending_calls, key.params, c.asyncCall as Command_AsyncCall) + break + case c.asyncCallAwait !== undefined: + await this._asyncCallAwait(ctx, pending_calls, c.asyncCallAwait as Command_AsyncCallAwait) + break + case c.backgroundCall !== undefined: + await this._backgroundCall(ctx, client, key.params, c.backgroundCall as Command_BackgroundCall) + break + case c.sleep !== undefined: + await this._sleep(ctx, c.sleep as Command_Sleep) + break + default: + // should be unreachable + throw new Error("Empty Command in CallRequest") + } + } + + return Empty.create({}) + } + + async _increment(ctx: RestateContext, request: Command_IncrementState): Promise { + const counter = (await ctx.get("counter")) || 0 + return ctx.set("counter", counter + 1); + } + + async _syncCall(ctx: RestateContext, client: CommandInterpreterClientImpl, params: TestParams, request: Command_SyncCall): Promise { + await client.call(CallRequest.create({ + key: {params, target: request.target}, + commands: request.commands, + })); + } + + _asyncCall(ctx: RestateContext, client: CommandInterpreterClientImpl, pending_calls: Map>, params: TestParams, request: Command_AsyncCall) { + pending_calls.set(request.callId, + client.call(CallRequest.create({ + key: {params, target: request.target}, + commands: request.commands, + }))) + } + + async _asyncCallAwait(ctx: RestateContext, pending_calls: Map>, request: Command_AsyncCallAwait): Promise { + const p = pending_calls.get(request.callId) + if (p === undefined) { + throw new Error("Unrecognised CallID in AsyncCallAwait command") + } + await p + return + } + + async _backgroundCall(ctx: RestateContext, client: CommandInterpreterClientImpl, params: TestParams, request: Command_BackgroundCall): Promise { + return ctx.oneWayCall(() => client.backgroundCall(BackgroundCallRequest.create({ + key: {params, target: request.target}, + commands: request.commands + }))) + } + + async _sleep(ctx: RestateContext, request: Command_Sleep): Promise { + return ctx.sleep(request.milliseconds) + } + + async verify(request: VerificationRequest): Promise { + const ctx = useContext(this) + return VerificationResult.create({ + expected: request.expected, + actual: await ctx.get("counter") || 0, + }) + } + + async clear(request: ClearRequest): Promise { + const ctx = useContext(this) + + await ctx.clear("counter") + + return Empty.create({}) + } +} diff --git a/services/node-services/src/verifier.ts b/services/node-services/src/verifier.ts new file mode 100644 index 00000000..c8677c29 --- /dev/null +++ b/services/node-services/src/verifier.ts @@ -0,0 +1,294 @@ +import { + ClearRequest, + ClearResponse, + CommandVerifier, + DeepPartial, + Empty, + ExecuteRequest, + InspectRequest, + InspectResponse, protobufPackage, + VerificationRequest, + VerificationResponse, +} from "./generated/verifier"; +import { + CallRequest, + ClearRequest as InterpeterClearRequest, + Command, + CommandInterpreterClientImpl, + Commands, + VerificationRequest as InterpreterVerificationRequest, +} from "./generated/interpreter"; +import seedrandom from "seedrandom"; +import {useContext} from "@restatedev/restate-sdk"; + +const MAX_TARGET = 1024 +const DEFAULT_MAX_SLEEP = 32768 + +export class CommandBuilder { + random: () => number // return a random float + width: number + + constructor(random: () => number, width: number) { + this.random = random + this.width = width || 1 + } + + randomInt(max: number) { + return Math.floor(Math.abs(this.random() * max)) + } + + randomTarget(...lockedTargets: Array): number { + let target = this.randomInt(MAX_TARGET) + // rejection sampling + while (lockedTargets.includes(target)) { + target = this.randomInt(MAX_TARGET) + } + return target + } + + normaliseSleeps(commands: Commands | undefined, factor: number) { + if (commands == undefined) { + return + } + for (let i = 0; i < commands.command.length; i++) { + if (commands.command[i].sleep !== undefined) { + const millis = commands.command[i].sleep?.milliseconds || 0 + commands.command[i].sleep = {milliseconds: Math.floor(millis * factor)} + continue + } + + this.normaliseSleeps(commands.command[i].asyncCall?.commands, factor) + this.normaliseSleeps(commands.command[i].syncCall?.commands, factor) + this.normaliseSleeps(commands.command[i].backgroundCall?.commands, factor) + } + } + + // durationUpperBound determines the upper bound on the runtime of a command set, by assuming that all sleeps + // occur in sequence + durationUpperBound(commands: Commands | undefined): number { + if (commands == undefined) { + return 0 + } + + let duration = 0 + for (const c of commands.command) { + if (c.increment != undefined) { + // increment has no effect on completion time + } else if (c.asyncCall !== undefined) { + duration += this.durationUpperBound(c.asyncCall.commands) + } else if (c.asyncCallAwait !== undefined) { + // already accounted for in c.asyncCall + } else if (c.syncCall !== undefined) { + duration += this.durationUpperBound(c.syncCall.commands) + } else if (c.backgroundCall !== undefined) { + duration += this.durationUpperBound(c.backgroundCall.commands) + } else if (c.sleep !== undefined) { + duration += c.sleep.milliseconds + } + } + return duration + } + + + buildCommands(maxSleepMillis: number, depth: number): { target: number, commands: Commands } { + const call = this._buildCommands(this.randomTarget(), depth, []) + const duration = this.durationUpperBound(call.commands) + // normalise so that the entire job takes less time than the max sleep + this.normaliseSleeps(call.commands, maxSleepMillis / duration) + return call + } + + _buildCommands(target: number, depth: number, lockedTargets: Array): { target: number, commands: Commands } { + const commands = new Array>() + lockedTargets.push(target) + + if (depth === 0) { + // last layer; all we can really do at this point is increment + commands.push({increment: {}}) + return {target, commands: Commands.create({command: commands})} + } + + // ensure at least one command + const numCommands = this.randomInt(this.width - 1) + 1 + + let asyncUnlockedCounter = 0 // keeps track of async calls to known-unlocked targets, which we may await + let asyncLockedCounter = numCommands // keeps track of async calls to known-locked targets, which we must not await + + const candidates: () => Array<() => DeepPartial> = () => [ + () => ({ + increment: {} + }), + () => ({ + // hit a known-unlocked target with a sync call, and pass on the lock list for future blocking calls + syncCall: this._buildCommands(this.randomTarget(target, ...lockedTargets), depth - 1, [target, ...lockedTargets]) + }), + () => ({ + asyncCall: { + callId: asyncUnlockedCounter++, + // hit a known-unlocked target with an async call that may be awaited, and pass on the lock list for future blocking calls + ...this._buildCommands(this.randomTarget(target, ...lockedTargets), depth - 1, [target, ...lockedTargets]), + } + }), + () => ({ + asyncCall: { + callId: asyncLockedCounter++, + // deliberately hit a known-locked target with an async call that must not be awaited + ...this._buildCommands([target, ...lockedTargets][this.randomInt(lockedTargets.length + 1)], depth - 1, []), + } + }), + ...(asyncUnlockedCounter > 0 ? [() => ({ + // await a previously made async call that was against a known-unlocked target + // it's totally valid to await previous async calls multiple times, so we don't have to exclude any + asyncCallAwait: {callId: this.randomInt(asyncUnlockedCounter)}, + })] : []), + () => ({ + // deliberately hit a known-locked target with a background call (the call should just schedule after the target is unlocked) + backgroundCall: this._buildCommands([target, ...lockedTargets][this.randomInt(lockedTargets.length + 1)], depth - 1, []), + }), + () => ({ + // deliberately hit a known-unlocked target with a background call (the call should schedule asap) + backgroundCall: this._buildCommands(this.randomTarget(target, ...lockedTargets), depth - 1, []), + }), + () => ({ + // this will be normalised later + sleep: {milliseconds: this.random()}, + }), + ] + + for (let i = 0; i < numCommands; i++) { + const c = candidates() + commands.push(c[this.randomInt(c.length)]()) + } + + return {target, commands: Commands.create({command: commands})} + } +} + +export const CommandVerifierServiceFQN = protobufPackage + ".CommandVerifier"; + +export class CommandVerifierService implements CommandVerifier { + simulateCommands(m: Map, target: number, commands: Commands | undefined): void { + if (!commands?.command) { + throw new Error("CallRequest with no commands") + } + for (const c of commands.command) { + if (c.increment !== undefined) { + m.set(target, (m.get(target) || 0) + 1) + } else if (c.syncCall !== undefined) { + this.simulateCommands(m, c.syncCall.target, c.syncCall.commands) + } else if (c.asyncCall !== undefined) { + this.simulateCommands(m, c.asyncCall.target, c.asyncCall.commands) + } else if (c.asyncCallAwait !== undefined) { + // do nothing + } else if (c.backgroundCall !== undefined) { + this.simulateCommands(m, c.backgroundCall.target, c.backgroundCall.commands) + } else if (c.sleep !== undefined) { + // do nothing + } else { + // should be unreachable + throw new Error("Empty Command in CallRequest") + } + } + } + + async execute(request: ExecuteRequest): Promise { + if (!request.params) { + throw new Error("No params in ExecuteRequest") + } + const ctx = useContext(this); + + // we've already been called with these parameters; don't kick off the job a second time + if (await ctx.get("started")) { + return Empty.create({}) + } else { + await ctx.set("started", true) + } + + const client = new CommandInterpreterClientImpl(ctx) + const builder = new CommandBuilder(seedrandom(request.params.seed), request.params.width) + const { + target, + commands + } = builder.buildCommands(request.params.maxSleepMillis || DEFAULT_MAX_SLEEP, request.params.depth) + + await client.call(CallRequest.create({key: {params: request.params, target}, commands})) + + return Empty.create({}) + } + + async verify(request: VerificationRequest): Promise { + if (!request.params) { + throw new Error("No params in VerificationRequest") + } + const ctx = useContext(this) + const client = new CommandInterpreterClientImpl(ctx) + const builder = new CommandBuilder(seedrandom(request.params.seed), request.params.width) + const { + target, + commands + } = builder.buildCommands(request.params.maxSleepMillis || DEFAULT_MAX_SLEEP, request.params.depth) + const m = new Map() + this.simulateCommands(m, target, commands) + + // fire off all the verification requests and see if any come back wrong + await Promise.all(Array.from(m).map(async ([key, value]): Promise => { + const resp = await client.verify(InterpreterVerificationRequest.create({ + key: { + params: request.params, + target: key, + }, + expected: value, + })) + if (resp.expected != value) { + throw new Error(`Incorrect value back for expected: sent ${value}, received ${resp.expected}`) + } + if (resp.expected != resp.actual) { + throw new Error(`Incorrect value for target ${key}: expected ${resp.expected}, got ${resp.actual}`) + } + })); + + return VerificationResponse.create({counters: Object.fromEntries(m)}) + } + + async clear(request: ClearRequest): Promise { + if (!request.params) { + throw new Error("No params in ClearRequest") + } + const ctx = useContext(this) + // clear the idempotent flag, given that we can now execute again + if (await ctx.get("started")) { + ctx.clear("started") + } + const client = new CommandInterpreterClientImpl(ctx) + const builder = new CommandBuilder(seedrandom(request.params.seed), request.params.width) + const { + target, + commands + } = builder.buildCommands(request.params.maxSleepMillis || DEFAULT_MAX_SLEEP, request.params.depth) + const m = new Map() + this.simulateCommands(m, target, commands) + + await Promise.all(Array.from(m.keys()).map(async (key): Promise => { + await client.clear(InterpeterClearRequest.create({ + key: { + params: request.params, + target: key, + }, + })) + })); + + return ClearResponse.create({targets: Array.from(m.keys())}) + } + + async inspect(request: InspectRequest): Promise { + if (!request.params) { + throw new Error("No params in InspectRequest") + } + const builder = new CommandBuilder(seedrandom(request.params.seed), request.params.width) + const { + target, + commands + } = builder.buildCommands(request.params.maxSleepMillis || DEFAULT_MAX_SLEEP, request.params.depth) + return InspectResponse.create({call: {key: {params: request.params, target}, commands}}) + } +} diff --git a/tests/src/test/kotlin/dev/restate/e2e/Containers.kt b/tests/src/test/kotlin/dev/restate/e2e/Containers.kt index 7d92040c..4366612e 100644 --- a/tests/src/test/kotlin/dev/restate/e2e/Containers.kt +++ b/tests/src/test/kotlin/dev/restate/e2e/Containers.kt @@ -9,6 +9,8 @@ import dev.restate.e2e.functions.externalcall.RandomNumberListGeneratorGrpc import dev.restate.e2e.functions.externalcall.ReplierGrpc import dev.restate.e2e.functions.receiver.ReceiverGrpc import dev.restate.e2e.functions.singletoncounter.SingletonCounterGrpc +import dev.restate.e2e.functions.verification.interpreter.CommandInterpreterGrpc +import dev.restate.e2e.functions.verification.verifier.CommandVerifierGrpc import dev.restate.e2e.utils.FunctionSpec import dev.restate.e2e.utils.FunctionSpec.RegistrationOptions import dev.restate.e2e.utils.FunctionSpec.RetryPolicy @@ -97,13 +99,14 @@ object Containers { nodeServicesContainer("node-errors", FailingServiceGrpc.SERVICE_NAME) .withRegistrationOptions(RegistrationOptions(retryPolicy = RetryPolicy.None)) - // -- Verification test container (source https://github.com/restatedev/restate-verification) + // -- Verification test container const val VERIFICATION_FUNCTION_HOSTNAME = "restate-verification" val VERIFICATION_FUNCTION_SPEC = - FunctionSpec.builder("ghcr.io/restatedev/restate-verification:latest") - .withHostName(VERIFICATION_FUNCTION_HOSTNAME) - .withPort(8000) + nodeServicesContainer( + VERIFICATION_FUNCTION_HOSTNAME, + CommandVerifierGrpc.SERVICE_NAME, + CommandInterpreterGrpc.SERVICE_NAME) .withRegistrationOptions(RegistrationOptions(retryPolicy = FIXED_DELAY_RETRY_POLICY)) }