diff --git a/src/idl/coproc_idl.json b/src/idl/coproc_idl.json index b8b822f7142d..3b52dad467c9 100644 --- a/src/idl/coproc_idl.json +++ b/src/idl/coproc_idl.json @@ -1,6 +1,6 @@ { "namespace": "coproc", - "service_name": "management", + "service_name": "script_manager", "includes": [ "coproc/types.h" ], @@ -9,12 +9,12 @@ { "name": "enable_copros", "input_type": "enable_copros_request", - "output_type": "enable_copros_response" + "output_type": "enable_copros_reply" }, { "name": "disable_copros", "input_type": "disable_copros_request", - "output_type": "disable_copros_response" + "output_type": "disable_copros_reply" } ] } \ No newline at end of file diff --git a/src/idl/process_batch_coproc.json b/src/idl/process_batch_coproc.json new file mode 100644 index 000000000000..02cb26307a91 --- /dev/null +++ b/src/idl/process_batch_coproc.json @@ -0,0 +1,15 @@ +{ + "namespace": "coproc", + "service_name": "supervisor", + "includes": [ + "coproc/types.h" + ], + "js_include": "../../domain/generatedRpc/generatedClasses", + "methods": [ + { + "name": "process_batch", + "input_type": "process_batch_request", + "output_type": "process_batch_reply" + } + ] +} \ No newline at end of file diff --git a/src/js/generate-entries.sh b/src/js/generate-entries.sh index 677ede661f4a..e3a7fd304b9a 100755 --- a/src/js/generate-entries.sh +++ b/src/js/generate-entries.sh @@ -14,3 +14,8 @@ cd "$root"/tools/ts-generator/rpc && python rpc_gen_js.py \ --server-define-file "$root"/src/idl/coproc_idl.json \ --output-file "$root"/src/js/modules/rpc/serverAndClients/server.ts + +cd "$root"/tools/ts-generator/rpc && + python rpc_gen_js.py \ + --server-define-file "$root"/src/idl/process_batch_coproc.json \ + --output-file "$root"/src/js/modules/rpc/serverAndClients/processBatch.ts diff --git a/src/js/modules/domain/generatedRpc/enableDisableCoproc.json b/src/js/modules/domain/generatedRpc/enableDisableCoproc.json index ca527e9afc6a..ec0ceae1b2d0 100644 --- a/src/js/modules/domain/generatedRpc/enableDisableCoproc.json +++ b/src/js/modules/domain/generatedRpc/enableDisableCoproc.json @@ -13,7 +13,7 @@ }, { "name": "compression", - "type": "int8" + "type": "uint8" }, { "name": "payloadSize", @@ -33,6 +33,19 @@ } ] }, + { + "className": "TopicEnableRequest", + "fields": [ + { + "name": "topic", + "type": "string" + }, + { + "name": "injectionPolicy", + "type": "int8" + } + ] + }, { "className": "EnableCoprocRequest", "fields": [ @@ -42,7 +55,7 @@ }, { "name": "topics", - "type": "Array" + "type": "Array" } ] }, @@ -56,7 +69,7 @@ ] }, { - "className": "EnableCoprocResponse", + "className": "EnableCoprocReply", "fields": [ { "name": "id", @@ -69,16 +82,16 @@ ] }, { - "className": "EnableCoprosResponse", + "className": "EnableCoprosReply", "fields": [ { "name": "inputs", - "type": "Array" + "type": "Array" } ] }, { - "className": "DisableCoprosResponse", + "className": "DisableCoprosReply", "fields": [ { "name": "inputs", @@ -96,4 +109,4 @@ ] } ] -} \ No newline at end of file +} diff --git a/src/js/modules/domain/generatedRpc/entitiesDefinition.json b/src/js/modules/domain/generatedRpc/entitiesDefinition.json index 7eb9d6040f67..33b52715309a 100644 --- a/src/js/modules/domain/generatedRpc/entitiesDefinition.json +++ b/src/js/modules/domain/generatedRpc/entitiesDefinition.json @@ -33,19 +33,6 @@ } ] }, - { - "className": "ProcessBatchRequest", - "fields": [ - { - "name": "npt", - "type": "Ntp" - }, - { - "name": "recordBatch", - "type": "RecordBatch" - } - ] - }, { "className": "Ntp", "fields": [ @@ -72,7 +59,8 @@ }, { "name": "records", - "type": "Array" + "type": "Array", + "size": "header.recordCount" } ] }, @@ -130,6 +118,14 @@ { "name": "recordCount", "type": "int32" + }, + { + "name": "term", + "type": "int64" + }, + { + "name": "isCompressed", + "type": "int8" } ] }, @@ -138,7 +134,7 @@ "fields": [ { "name": "length", - "type": "varint" + "type": "uint32" }, { "name": "attributes", @@ -146,15 +142,15 @@ }, { "name": "timestampDelta", - "type": "varint" + "type": "int64" }, { "name": "offsetDelta", - "type": "varint" + "type": "int32" }, { "name": "keyLength", - "type": "varint" + "type": "int32" }, { "name": "key", @@ -162,7 +158,7 @@ }, { "name": "valueLen", - "type": "varint" + "type": "int32" }, { "name": "value", @@ -194,6 +190,58 @@ "type": "buffer" } ] + }, + { + "className": "ProcessBatchRequestItem", + "fields": [ + { + "name": "coprocessorIds", + "type": "Array" + }, + { + "name": "ntp", + "type": "Ntp" + }, + { + "name": "recordBatch", + "type": "Array" + } + ] + }, + { + "className": "ProcessBatchRequest", + "fields": [ + { + "name": "requests", + "type": "Array" + } + ] + }, + { + "className": "ProcessBatchReplyItem", + "fields": [ + { + "name": "coprocessorId", + "type": "uint64" + }, + { + "name": "ntp", + "type": "Ntp" + }, + { + "name": "resultRecordBatch", + "type": "Array" + } + ] + }, + { + "className": "ProcessBatchReply", + "fields": [ + { + "name": "result", + "type": "Array" + } + ] } ] } diff --git a/src/js/modules/public/Coprocessor.ts b/src/js/modules/public/Coprocessor.ts index f87b9a162f2b..5f66db70752d 100644 --- a/src/js/modules/public/Coprocessor.ts +++ b/src/js/modules/public/Coprocessor.ts @@ -32,16 +32,18 @@ interface RecordBatchHeader { producerEpoch: number; baseSequence: number; recordCount: number; + term: bigint; + isCompressed: number; } interface Record { - length: bigint; + length: number; attributes: number; timestampDelta: bigint; - offsetDelta: bigint; - keyLength: bigint; + offsetDelta: number; + keyLength: number; key: Buffer; - valueLen: bigint; + valueLen: number; value: Buffer; headers: Array; } @@ -55,7 +57,7 @@ interface Coprocessor { inputTopics: string[]; policyError: PolicyError; globalId: bigint; - apply(record: RecordBatch): RecordBatch; + apply: (record: RecordBatch) => Map; } export { RecordBatchHeader, RecordHeader, Record, RecordBatch, Coprocessor }; diff --git a/src/js/modules/public/SimpleTransform.ts b/src/js/modules/public/SimpleTransform.ts index f09b376d2f65..df62b11128cd 100644 --- a/src/js/modules/public/SimpleTransform.ts +++ b/src/js/modules/public/SimpleTransform.ts @@ -28,7 +28,7 @@ export class SimpleTransform implements Coprocessor { inputTopics: string[]; policyError: PolicyError; - apply(record: RecordBatch): RecordBatch { + apply = (record: RecordBatch): Map => { throw Error("processRecord isn't implemented yet"); - } + }; } diff --git a/src/js/modules/public/Utils.ts b/src/js/modules/public/Utils.ts index cc5ed3729ffb..8845ab75670c 100644 --- a/src/js/modules/public/Utils.ts +++ b/src/js/modules/public/Utils.ts @@ -22,6 +22,8 @@ const createHeader = ( recordBatchType: 0, recordCount: 0, sizeBytes: 0, + term: BigInt(0), + isCompressed: 0, ...header, }; }; @@ -43,12 +45,12 @@ const createRecord = (record: Partial): Record => { return { attributes: 0, key: Buffer.from(""), - keyLength: BigInt(0), - length: BigInt(0), - offsetDelta: BigInt(0), + keyLength: 0, + length: 0, + offsetDelta: 0, timestampDelta: BigInt(0), value: Buffer.from(""), - valueLen: BigInt(0), + valueLen: 0, ...record, headers: headers.map(createRecordHeader), }; diff --git a/src/js/modules/rpc/server.ts b/src/js/modules/rpc/server.ts index 03a2a64f83d5..a2840b1030f9 100644 --- a/src/js/modules/rpc/server.ts +++ b/src/js/modules/rpc/server.ts @@ -1,22 +1,23 @@ -import { Socket, createServer, Server as NetServer } from "net"; import Repository from "../supervisors/Repository"; import FileManager from "../supervisors/FileManager"; import { Coprocessor, PolicyError } from "../public/Coprocessor"; import { + ProcessBatchReply, + ProcessBatchReplyItem, ProcessBatchRequest, - RpcHeader, + ProcessBatchRequestItem, } from "../domain/generatedRpc/generatedClasses"; -import BF from "../domain/generatedRpc/functions"; -import { ManagementClient } from "./serverAndClients/server"; -import { IOBuf } from "../utilities/IOBuf"; +import { Script_ManagerClient as ManagementClient } from "./serverAndClients/server"; +import { SupervisorServer } from "./serverAndClients/processBatch"; -export class Server { - public constructor( - activeDir: string, - inactiveDir: string, - submitDir: string - ) { - this.managementClient = new ManagementClient(43188); +export class ProcessBatchServer extends SupervisorServer { + private readonly repository: Repository; + private fileManager: FileManager; + managementClient: ManagementClient; + + constructor(activeDir: string, inactiveDir: string, submitDir: string) { + super(); + this.managementClient = new ManagementClient(43118); this.applyCoprocessor = this.applyCoprocessor.bind(this); this.repository = new Repository(); this.fileManager = new FileManager( @@ -26,106 +27,52 @@ export class Server { inactiveDir, this.managementClient ); - this.server = createServer(this.executeCoprocessorOnRequest); } - /** - * Starts the server on the given port. - * @param port - */ - public listen(port: number): Promise { - return new Promise((resolve, reject) => { - try { - this.server.listen(port, "127.0.0.1", null, resolve); - } catch (e) { - reject(e); - } - }); + fireException(message: string): Promise { + return Promise.reject(new Error(message)); } - /** - * Close server connection - */ - public close(): Promise { - return new Promise((resolve, reject) => - this.server.close((err) => { - err && reject(err); - this.fileManager.close().then(resolve).catch(reject); - }) + process_batch(input: ProcessBatchRequest): Promise { + const failRequest = input.requests.find( + (request) => request.coprocessorIds.length === 0 ); + if (failRequest) { + return this.fireException("Bad request: request without coprocessor ids"); + } else { + return Promise.all( + input.requests.map(this.applyCoprocessor) + ).then((result) => ({ result: result.flat() })); + } } - /** - * Close coprocessor filesystem watcher process - */ - public closeCoprocessorManager(): Promise { - return this.fileManager.close(); - } - - /** - * Apply Coprocessors to Request when it arrives - * @param socket - */ - private executeCoprocessorOnRequest = (socket: Socket) => { - socket.on("readable", () => { - if (socket.readableLength > 26) { - const [rpcHeader] = RpcHeader.fromBytes(socket.read(26)); - const [processBatchRequests] = BF.readArray()( - socket.read(rpcHeader.payloadSize), - 0, - (auxBuffer, auxOffset) => - BF.readObject(auxBuffer, auxOffset, ProcessBatchRequest) - ); - if (rpcHeader.compression != 0) { - throw "Rpc Header has an unexpect compression value:"; - } - Promise.all(processBatchRequests.map(this.applyCoprocessor)).then( - (result) => { - const iobuf = new IOBuf(); - RpcHeader.toBytes(rpcHeader, iobuf); - BF.writeArray(true)(result.flat(), iobuf, (item, auxBuffer) => - BF.writeObject(auxBuffer, ProcessBatchRequest, item) - ); - iobuf.forEach((fragment) => socket.write(fragment.buffer)); - } - ); - } - }); - }; - /** * Given a Request, it'll find and execute Coprocessor by its * Request's topic, if there is an exception when applying the * coprocessor function it handles the error by its ErrorPolicy - * @param processBatchRequest + * @param requestItem */ private applyCoprocessor( - processBatchRequest: ProcessBatchRequest - ): Promise { - const handleTable = this.repository - .getCoprocessorsByTopics() - .get(processBatchRequest.npt.topic); - if (handleTable) { - const results = handleTable.apply( - processBatchRequest, - this.handleErrorByPolicy.bind(this) - ); - // eslint-disable-next-line @typescript-eslint/ban-ts-comment - // @ts-ignore - return Promise.allSettled(results).then((coprocessorResults) => { - const array = []; - coprocessorResults.forEach((result) => { - if (result.status === "rejected") { - console.error(result.reason); - } else { - array.push(result.value); - } - }); - return array; + requestItem: ProcessBatchRequestItem + ): Promise { + const results = this.repository.applyCoprocessor( + requestItem.coprocessorIds, + requestItem, + this.handleErrorByPolicy.bind(this), + this.fireException + ); + + return Promise.allSettled(results).then((coprocessorResults) => { + const array: ProcessBatchReplyItem[][] = []; + coprocessorResults.forEach((result) => { + if (result.status === "rejected") { + console.error(result.reason); + } else { + array.push(result.value); + } }); - } else { - return Promise.resolve([]); - } + return array.flat(); + }); } /** @@ -136,9 +83,9 @@ export class Server { */ private handleErrorByPolicy( coprocessor: Coprocessor, - processBatchRequest: ProcessBatchRequest, + processBatchRequest: ProcessBatchRequestItem, error: Error - ): Promise { + ): Promise { const errorMessage = this.createMessageError( coprocessor, processBatchRequest, @@ -158,17 +105,14 @@ export class Server { private createMessageError( coprocessor: Coprocessor, - processBatchRequest: ProcessBatchRequest, + processBatchRequest: ProcessBatchRequestItem, error: Error ): string { return ( - `Failed to apply coprocessor ${coprocessor.globalId} to request's id ` + - `${processBatchRequest.recordBatch.header.baseOffset}: ${error.message}` + `Failed to apply coprocessor ${coprocessor.globalId} to request's id :` + + `${processBatchRequest.recordBatch + .map((rb) => rb.header.baseOffset) + .join(", ")}: ${error.message}` ); } - - private server: NetServer; - private readonly repository: Repository; - private fileManager: FileManager; - managementClient: ManagementClient; } diff --git a/src/js/modules/rpc/service.ts b/src/js/modules/rpc/service.ts index d6e03cf35ff4..c0a3a96d38c0 100644 --- a/src/js/modules/rpc/service.ts +++ b/src/js/modules/rpc/service.ts @@ -1,5 +1,54 @@ -import { Server } from "./server"; +import { ProcessBatchServer } from "./server"; +import { safeLoadAll } from "js-yaml"; +import { join, resolve } from "path"; +import * as fs from "fs"; +import { promisify } from "util"; -const port = 8124; -const service = new Server("", "", ""); -service.listen(port); +// read yaml config file +const readConfigFile = (confPath: string): Promise> => { + try { + return fs.promises.readFile(confPath) + .then(file => safeLoadAll(file)[0]); + } catch (e) { + return Promise.reject(new Error(`Error reading config file: ${e}`)); + } +}; + +/* + validate scaffolding on coprocessor path, if the expected folders don't + exist, it'll create them +*/ +const validateOrCreateScaffolding = (directoryPath: string): Promise => { + const exists = promisify(fs.exists); + const validations = ["active", "inactive", "submit"].map(folder => { + const path = join(directoryPath, folder); + return exists(path) + .then(exist => { + if (!exist) { + console.log(path); + return fs.promises.mkdir(path, { recursive: true }); + } + }); + }); + return Promise.all(validations).then(() => null); +}; + +// read config file path argument +const configPathArg = process.argv.splice(2)[0]; +const defaultConfigPath = "/var/lib/redpanda/conf/redpanda.yaml"; +const defaultCoprocessorPath = "/var/lib/redpanda/coprocessor" +// resolve config path or assign default value +const configPath = configPathArg ? resolve(configPathArg) : defaultConfigPath; + +readConfigFile(configPath).then((config => { + const port = config?.redpanda?.coproc_supervisor_server || 43189; + const path = config?.coproc_engine?.path || defaultCoprocessorPath; + validateOrCreateScaffolding(path).then(() => { + const service = new ProcessBatchServer( + path + "/active", + path + "/inactive", + path + "/submit" + ); + service.listen(port); + }); +})); diff --git a/src/js/modules/supervisors/FileManager.ts b/src/js/modules/supervisors/FileManager.ts index 54823fc7aacf..a7d98094b252 100644 --- a/src/js/modules/supervisors/FileManager.ts +++ b/src/js/modules/supervisors/FileManager.ts @@ -5,7 +5,7 @@ import Repository from "./Repository"; import { Handle } from "../domain/Handle"; import { getChecksumFromFile } from "../utilities/Checksum"; import { Coprocessor } from "../public/Coprocessor"; -import { ManagementClient } from "../rpc/serverAndClients/server"; +import { Script_ManagerClient as ManagementClient } from "../rpc/serverAndClients/server"; import * as path from "path"; import { hash64 } from "xxhash"; @@ -58,18 +58,15 @@ class FileManager { return this.moveCoprocessorFile(preCoprocessor, this.inactiveDir) .then(() => repository.remove(preCoprocessor)) .then(() => this.moveCoprocessorFile(handle, this.activeDir)) - .then((newCoprocessor) => - repository.add(newCoprocessor).then(() => newCoprocessor) - ); + .then((newCoprocessor) => repository.add(newCoprocessor)); } } else { return this.moveCoprocessorFile(handle, this.activeDir) - .then((newHandle) => { - repository.add(newHandle); - return newHandle; - }) + .then((newHandle) => repository.add(newHandle)) .then((newHandle) => - this.enableTopic([newHandle.coprocessor]).then(() => newHandle) + this.enableCoprocessor([newHandle.coprocessor]).then( + () => newHandle + ) ); } }); @@ -126,7 +123,7 @@ class FileManager { deregisterCoprocessor(coprocessor: Coprocessor): Promise { const handle = this.repository.findByCoprocessor(coprocessor); if (handle) { - this.disableCoprocessors([handle.coprocessor]) + return this.disableCoprocessors([handle.coprocessor]) .then(() => this.moveCoprocessorFile(handle, this.inactiveDir)) .then((coprocessor) => { this.repository.remove(coprocessor); @@ -196,9 +193,9 @@ class FileManager { * @param coprocessors * @param validateAlreadyEnabled */ - enableTopic( + enableCoprocessor( coprocessors: Coprocessor[], - validateAlreadyEnabled = true + validateAlreadyEnabled = false ): Promise { if (coprocessors.length == 0) { return Promise.resolve(); @@ -207,7 +204,10 @@ class FileManager { .enable_copros({ coprocessors: coprocessors.map((coproc) => ({ id: coproc.globalId, - topics: coproc.inputTopics, + topics: coproc.inputTopics.map((topic) => ({ + topic, + injectionPolicy: 2, + })), })), }) .then((enableResponse) => { @@ -217,7 +217,7 @@ class FileManager { ); const condition = validateAlreadyEnabled ? (coproc) => coproc > 0 - : (coproc) => coproc > 1; + : (coproc) => coproc > 2; const invalidCoprocessor = isValid(condition); if (invalidCoprocessor.length > 0) { return Promise.reject( diff --git a/src/js/modules/supervisors/HandleTable.ts b/src/js/modules/supervisors/HandleTable.ts index 470fbc72302a..e69de29bb2d1 100644 --- a/src/js/modules/supervisors/HandleTable.ts +++ b/src/js/modules/supervisors/HandleTable.ts @@ -1,72 +0,0 @@ -import { Handle } from "../domain/Handle"; -import { Coprocessor } from "../public/Coprocessor"; -import { Server } from "../rpc/server"; -import { ProcessBatchRequest } from "../domain/generatedRpc/generatedClasses"; - -export class HandleTable { - constructor() { - this.coprocessors = new Map(); - } - - registerHandle(handle: Handle): void { - this.coprocessors.set(handle.coprocessor.globalId, handle); - } - - deregisterHandle(handle: Handle): void { - this.coprocessors.delete(handle.coprocessor.globalId); - } - - findHandleById(handle: Handle): Handle | undefined { - return this.coprocessors.get(handle.coprocessor.globalId); - } - - findHandleByCoprocessor(coprocessor: Coprocessor): Handle | undefined { - return this.coprocessors.get(coprocessor.globalId); - } - - /** - * Given a Request, apply every coprocessor on handleTable defined for each - * RecordBatch's topic - * @param processBatchRequest, Request instance - * @param handleError, function that handle error on when apply coprocessor - * - * Note: Server["handleErrorByPolicy"] is a ts helper to specify - * an object's property type. - * https://www.typescriptlang.org/docs/handbook/release-notes/ - * typescript-2-1.html#keyof-and-lookup-types - */ - apply( - processBatchRequest: ProcessBatchRequest, - handleError: Server["handleErrorByPolicy"] - ): Promise[] { - return [...this.coprocessors.values()].map((handle) => { - // Convert int16 to uint16 and check if have an unexpected compression - if (((processBatchRequest.recordBatch.header.attrs >>> 0) & 0x7) != 0) { - throw ( - "Record Batch has an unexpect compression value: baseOffset" + - processBatchRequest.recordBatch.header.baseOffset - ); - } - try { - //TODO: https://app.clubhouse.io/vectorized/story/1257 - //pass functor to apply function - const resultRecordBatch = handle.coprocessor.apply( - processBatchRequest.recordBatch - ); - - return Promise.resolve({ - ...processBatchRequest, - recordBatch: resultRecordBatch, - }); - } catch (e) { - return handleError(handle.coprocessor, processBatchRequest, e); - } - }); - } - - size(): number { - return this.coprocessors.size; - } - - private readonly coprocessors: Map; -} diff --git a/src/js/modules/supervisors/Repository.ts b/src/js/modules/supervisors/Repository.ts index 9b4fea58a10f..8ccf135fcd85 100644 --- a/src/js/modules/supervisors/Repository.ts +++ b/src/js/modules/supervisors/Repository.ts @@ -1,37 +1,31 @@ import { Handle } from "../domain/Handle"; import { Coprocessor } from "../public/Coprocessor"; -import { HandleTable } from "./HandleTable"; +import { + ProcessBatchReplyItem, + ProcessBatchRequestItem, +} from "../domain/generatedRpc/generatedClasses"; +import { ProcessBatchServer } from "../rpc/server"; +import { createRecordBatch } from "../public"; /** * Repository is a container for Handles. */ class Repository { constructor() { - this.coprocessors = new Map(); + this.handles = new Map(); } /** * this method adds a new Handle to the repository - * @param coprocessor + * @param handle */ - add(coprocessor: Handle): Promise { - const addHandle = () => { - coprocessor.coprocessor.inputTopics.forEach((topic) => { - const currentHandleTable = this.coprocessors.get(topic); - if (currentHandleTable) { - currentHandleTable.registerHandle(coprocessor); - } else { - this.coprocessors.set(topic, new HandleTable()); - this.coprocessors.get(topic).registerHandle(coprocessor); - } - }); - }; - - if (this.findByGlobalId(coprocessor)) { - return this.remove(coprocessor).then(addHandle); - } else { - return Promise.resolve(addHandle()); + add(handle: Handle): Handle { + const currentHandleTable = this.handles.get(handle.coprocessor.globalId); + if (currentHandleTable) { + this.remove(handle); } + this.handles.set(handle.coprocessor.globalId, handle); + return handle; } /** @@ -41,70 +35,103 @@ class Repository { * coprocessor. Returns undefined otherwise. * @param handle */ - findByGlobalId = (handle: Handle): Handle | undefined => { - for (const [, tableHandle] of this.coprocessors) { - const existingHandle = tableHandle.findHandleById(handle); - if (existingHandle) { - return existingHandle; - } - } - }; + findByGlobalId = (handle: Handle): Handle | undefined => + this.handles.get(handle.coprocessor.globalId); /** * Given a Coprocessor, try to find one with the same global ID and return it * if it exists, returns undefined otherwise * @param coprocessor */ - findByCoprocessor = (coprocessor: Coprocessor): Handle | undefined => { - for (const [, tableHandle] of this.coprocessors) { - const existingHandle = tableHandle.findHandleByCoprocessor(coprocessor); - if (existingHandle) { - return existingHandle; - } - } - }; + findByCoprocessor(coprocessor: Coprocessor): Handle | undefined { + return this.handles.get(coprocessor.globalId); + } /** - * removeCoprocessor method remove a coprocessor from the coprocessor map + * remove a handle from the handle map * @param handle */ - remove = (handle: Handle): Promise => { - return new Promise((resolve, reject) => { - try { - for (const [, handleTable] of this.coprocessors) { - handleTable.deregisterHandle(handle); - } - resolve(); - } catch (e) { - reject( - new Error( - "Error removing coprocessor with ID " + - `${handle.coprocessor.globalId}: ${e.message}` - ) - ); - } - }); + remove = (handle: Handle): void => { + this.handles.delete(handle.coprocessor.globalId); }; + /** - * getCoprocessors returns the map of Handles indexed by their - * topics + * returns a handle list by given ids + * @param ids */ - getCoprocessorsByTopics(): Map { - return this.coprocessors; + getHandlesByCoprocessorIds(ids: bigint[]): Handle[] { + return ids.reduce((prev, id) => { + const handle = this.handles.get(id); + if (handle) { + prev.push(handle); + } + return prev; + }, []); } - /** - * returns the topic list - */ - getTopics(): string[] { - return [...this.coprocessors.keys()]; + size(): number { + return this.handles.size; } - getCoprocessorByTopic(topic: string): HandleTable { - return this.coprocessors.get(topic); + applyCoprocessor( + CoprocessorIds: bigint[], + requestItem: ProcessBatchRequestItem, + handleError: ProcessBatchServer["handleErrorByPolicy"], + createException: ProcessBatchServer["fireException"] + ): Promise[] { + const handles = this.getHandlesByCoprocessorIds(requestItem.coprocessorIds); + if (handles.length != requestItem.coprocessorIds.length) { + const nonExistHandle = requestItem.coprocessorIds.filter( + (id) => !handles.find((handle) => handle.coprocessor.globalId === id) + ); + return [ + createException( + "Coprocessors don't register in wasm engine: " + nonExistHandle + ), + ]; + } else { + return handles.reduce((prev, handle) => { + const apply = requestItem.recordBatch.map((recordBatch) => { + // Convert int16 to uint16 and check if have an unexpected compression + if (((recordBatch.header.attrs >>> 0) & 0x7) != 0) { + throw ( + "Record Batch has an unexpect compression value: baseOffset" + + recordBatch.header.baseOffset + ); + } + try { + //TODO: https://app.clubhouse.io/vectorized/story/1257 + //pass functor to apply function + const resultRecordBatch = handle.coprocessor.apply( + createRecordBatch(recordBatch) + ); + + const results: ProcessBatchReplyItem[] = []; + for (const [key, value] of resultRecordBatch) { + results.push({ + coprocessorId: BigInt(handle.coprocessor.globalId), + ntp: { + ...requestItem.ntp, + topic: `${requestItem.ntp.topic}.$${key}$`, + }, + resultRecordBatch: [value], + }); + } + return Promise.resolve(results); + } catch (e) { + return handleError(handle.coprocessor, requestItem, e); + } + }); + return prev.concat(apply); + }, []); + } } - private readonly coprocessors: Map; + /** + * Map with coprocessor ID -> Handle + * @private + */ + private readonly handles: Map; } export default Repository; diff --git a/src/js/modules/utilities/Checksum.ts b/src/js/modules/utilities/Checksum.ts index 06af75c1e8db..006514128e62 100644 --- a/src/js/modules/utilities/Checksum.ts +++ b/src/js/modules/utilities/Checksum.ts @@ -15,7 +15,7 @@ export const getChecksumFromFile = (filePath: string): Promise => { }); readFileIntoMemory.on("end", () => { try { - const checksum = hash.digest("base64"); + const checksum = hash.digest("hex"); resolve(checksum); } catch (e) { resolve(e); diff --git a/src/js/package-lock.json b/src/js/package-lock.json index e4d2a06e9976..daa56f15fc4e 100644 --- a/src/js/package-lock.json +++ b/src/js/package-lock.json @@ -478,6 +478,11 @@ "resolved": "https://registry.npmjs.org/aproba/-/aproba-1.2.0.tgz", "integrity": "sha512-Y9J6ZjXtoYh8RnXVCMOU/ttDmk1aBjunq9vO0ta5x85WDQiQfUF9sIPBITdbiiIVcBo03Hi3jMxigBtsddlXRw==" }, + "arg": { + "version": "4.1.3", + "resolved": "https://registry.npmjs.org/arg/-/arg-4.1.3.tgz", + "integrity": "sha512-58S9QDqG0Xx27YwPSt9fJxivjYl432YCwfDMfZ+71RAqUrZef7LrKQZ3LHLOwCS4FLNBplP533Zx895SeOCHvA==" + }, "argparse": { "version": "1.0.10", "resolved": "https://registry.npmjs.org/argparse/-/argparse-1.0.10.tgz", @@ -2483,9 +2488,9 @@ "integrity": "sha512-RdJUflcE3cUzKiMqQgsCu06FPu9UdIJO0beYbPhHN4k6apgJtifcoCtT9bcxOpYBtpD2kCM6Sbzg4CausW/PKQ==" }, "js-yaml": { - "version": "3.13.1", - "resolved": "https://registry.npmjs.org/js-yaml/-/js-yaml-3.13.1.tgz", - "integrity": "sha512-YfbcO7jXDdyj0DGxYVSlSeQNHbD7XPWvrVWeVUujrQEoZzWJIRrCPoyk6kL6IAjAG2IolMK4T0hNUe0HOUs5Jw==", + "version": "3.14.0", + "resolved": "https://registry.npmjs.org/js-yaml/-/js-yaml-3.14.0.tgz", + "integrity": "sha512-/4IbIeHcD9VMHFqDR/gQ7EdZdLimOvW2DdcxFjdyyZ9NsbS+ccrXqVWDtab/lRl5AlUqmpBx8EhPaWR+OtY17A==", "requires": { "argparse": "^1.0.7", "esprima": "^4.0.0" @@ -2601,6 +2606,11 @@ "semver": "^5.6.0" } }, + "make-error": { + "version": "1.3.6", + "resolved": "https://registry.npmjs.org/make-error/-/make-error-1.3.6.tgz", + "integrity": "sha512-s8UhlNe7vPKomQhC1qFelMokr/Sc3AgNbso3n74mVPA5LTZwkB9NlXf4XPamLxJE8h0gh73rM94xvwRT2CVInw==" + }, "map-cache": { "version": "0.2.2", "resolved": "https://registry.npmjs.org/map-cache/-/map-cache-0.2.2.tgz", @@ -2852,6 +2862,17 @@ "yargs": "13.3.2", "yargs-parser": "13.1.2", "yargs-unparser": "1.6.0" + }, + "dependencies": { + "js-yaml": { + "version": "3.13.1", + "resolved": "https://registry.npmjs.org/js-yaml/-/js-yaml-3.13.1.tgz", + "integrity": "sha512-YfbcO7jXDdyj0DGxYVSlSeQNHbD7XPWvrVWeVUujrQEoZzWJIRrCPoyk6kL6IAjAG2IolMK4T0hNUe0HOUs5Jw==", + "requires": { + "argparse": "^1.0.7", + "esprima": "^4.0.0" + } + } } }, "move-concurrently": { @@ -4341,6 +4362,25 @@ } } }, + "ts-node": { + "version": "9.0.0", + "resolved": "https://registry.npmjs.org/ts-node/-/ts-node-9.0.0.tgz", + "integrity": "sha512-/TqB4SnererCDR/vb4S/QvSZvzQMJN8daAslg7MeaiHvD8rDZsSfXmNeNumyZZzMned72Xoq/isQljYSt8Ynfg==", + "requires": { + "arg": "^4.1.0", + "diff": "^4.0.1", + "make-error": "^1.1.1", + "source-map-support": "^0.5.17", + "yn": "3.1.1" + }, + "dependencies": { + "diff": { + "version": "4.0.2", + "resolved": "https://registry.npmjs.org/diff/-/diff-4.0.2.tgz", + "integrity": "sha512-58lmxKSA4BNyLz+HHMUzlOEpg09FV+ev6ZMe3vJihgdxzgcwZ8VoEEPmALCZG9LmqfVoNMMKpttIYTVG6uDY7A==" + } + } + }, "tslib": { "version": "1.13.0", "resolved": "https://registry.npmjs.org/tslib/-/tslib-1.13.0.tgz", @@ -4979,6 +5019,11 @@ "lodash": "^4.17.15", "yargs": "^13.3.0" } + }, + "yn": { + "version": "3.1.1", + "resolved": "https://registry.npmjs.org/yn/-/yn-3.1.1.tgz", + "integrity": "sha512-Ux4ygGWsu2c7isFWe8Yu1YluJmqVhxqK2cLXNQA5AcC3QfbGNpM7fu0Y8b/z16pXLnFxZYvWhd3fhBY9DLmC6Q==" } } } diff --git a/src/js/package.json b/src/js/package.json index 6ec8313ed89b..f557a914e930 100644 --- a/src/js/package.json +++ b/src/js/package.json @@ -13,24 +13,27 @@ "prettier:check": "npx prettier --list-different .", "eslint:check": "eslint . --ext .ts", "code:ckeck": "npm run prettier:check && npm run eslint:check", - "generate:serialization": "./generate-entries.sh", + "generate:serialization": "./generate-entries.sh && npm run prettier:format", "postinstall": "npm run generate:serialization", - "build:public": "./compact-public.js" + "build:public": "./compact-public.js", + "start": "sudo npx ts-node ./modules/rpc/service.ts" }, "requires": true, "dependencies": { - "typescript": "3.9.6", + "@types/mocha": "7.0.2", "@types/node": "14.0.23", "fast-crc32c": "2.0.0", - "xxhash": "0.3.0", - "mocha": "7.1.1", - "@types/mocha": "7.0.2", - "typescript-formatter": "7.2.2", - "rewire": "5.0.0", "inotifywait": "1.8.3", + "mocha": "7.1.1", "prettier": "2.0.5", + "rewire": "5.0.0", + "ts-loader": "^8.0.4", + "ts-node": "^9.0.0", + "typescript": "3.9.6", + "typescript-formatter": "7.2.2", "webpack": "^4.44.2", - "ts-loader": "^8.0.4" + "xxhash": "0.3.0", + "js-yaml": "^3.14.0" }, "devDependencies": { "@typescript-eslint/eslint-plugin": "^3.7.0", diff --git a/src/js/test/rpc/server.test.ts b/src/js/test/rpc/server.test.ts index 88b5b4c621f4..1b318361986f 100644 --- a/src/js/test/rpc/server.test.ts +++ b/src/js/test/rpc/server.test.ts @@ -1,217 +1,198 @@ -import { Server } from "../../modules/rpc/server"; -import { join } from "path"; +import { ProcessBatchServer } from "../../modules/rpc/server"; import Repository from "../../modules/supervisors/Repository"; -import { RecordBatch, PolicyError } from "../../modules/public/Coprocessor"; +import { reset, stub } from "sinon"; +import { SupervisorClient } from "../../modules/rpc/serverAndClients/processBatch"; +import { createRecordBatch } from "../../modules/public"; +import { Script_ManagerServer as ManagementServer } from "../../modules/rpc/serverAndClients/server"; import FileManager from "../../modules/supervisors/FileManager"; -import assert = require("assert"); import { ProcessBatchRequest } from "../../modules/domain/generatedRpc/generatedClasses"; -import { - createHandle, - createMockCoprocessor, - createHandleTable, -} from "../testUtilities"; +import { createHandle } from "../testUtilities"; +import { PolicyError, RecordBatch } from "../../modules/public/Coprocessor"; +import assert = require("assert"); +const INotifyWait = require("inotifywait"); const sinon = require("sinon"); -const net = require("net"); -const fakeFileManager = require("../../modules/supervisors/FileManager"); -const createRequest = (topic?: string): ProcessBatchRequest => { - const coprocessorRecordBatch = { - records: [{ value: Buffer.from("Example") }], - header: {}, - } as RecordBatch; - return { - npt: { - topic: topic || "topicA", - namespace: "name", - partition: 1, - }, - recordBatch: coprocessorRecordBatch, - } as ProcessBatchRequest; -}; +let server: ProcessBatchServer; +let client: SupervisorClient; +const spyFireExceptionServer = sinon.stub( + ProcessBatchServer.prototype, + "fireException" +); +const spyGetHandles = sinon.stub( + Repository.prototype, + "getHandlesByCoprocessorIds" +); +const spyFindByCoprocessor = sinon.stub( + Repository.prototype, + "findByCoprocessor" +); +const spyMoveHandle = sinon.stub(FileManager.prototype, "moveCoprocessorFile"); +const spyDeregister = sinon.spy(FileManager.prototype, "deregisterCoprocessor"); +let manageServer: ManagementServer; -const createFakeServer = (afterApply?: (value) => void, fileManagerStub?) => { - const fakeFolder = join(__dirname); - const fakeSocket = net.Socket(); - // eslint-disable-next-line @typescript-eslint/no-empty-function - fakeSocket.write = afterApply || (() => {}); - fileManagerStub || sinon.stub(fakeFileManager); - const createSer = sinon.stub(net, "createServer"); - createSer.value((fn) => fn(fakeSocket)); - const fakeServer = new Server(fakeFolder, fakeFolder, fakeFolder); - return [fakeServer, fakeSocket]; +const createProcessBatchRequest = ( + ids: bigint[] = [], + topic = "topic" +): ProcessBatchRequest => { + return { + requests: [ + { + recordBatch: [createRecordBatch()], + coprocessorIds: ids, + ntp: { partition: 1, namespace: "", topic }, + }, + ], + }; }; describe("Server", function () { describe("Given a Request", function () { - afterEach(sinon.restore); + stub(FileManager.prototype, "readActiveCoprocessor"); + stub(FileManager.prototype, "updateRepositoryOnNewFile"); + stub(INotifyWait.prototype); - it( - "shouldn't apply any coprocessor if the repository is " + "empty", - function (done) { - const repository = sinon.spy( - Repository.prototype, - "getCoprocessorsByTopics" - ); - const apply = sinon.spy(Server.prototype, "applyCoprocessor"); - const afterApplyCoprocessor = () => { - assert(repository.calledOnce); - assert(apply.calledOnce); - apply.firstCall.returnValue.then((result) => { - assert.deepStrictEqual(result, []); - done(); - }); - }; - const [, fakeSocket] = createFakeServer(afterApplyCoprocessor); - fakeSocket.emit("readable", createRequest()); - } - ); + beforeEach(() => { + reset(); + spyFireExceptionServer.reset(); + spyGetHandles.reset(); + spyMoveHandle.reset(); + spyFindByCoprocessor.reset(); + spyDeregister.resetHistory(); + manageServer = new ManagementServer(); + manageServer.disable_copros = () => Promise.resolve({ inputs: [0] }); + manageServer.listen(43118); + server = new ProcessBatchServer("a", "i", "s"); + server.listen(4300); + client = new SupervisorClient(4300); + }); + + afterEach(async () => { + client.close(); + await server.closeConnection(); + await manageServer.closeConnection(); + }); it( - "shouldn't apply any Coprocessor if there isn't one defined for" + - " the Request's topic", + "should fail when the given recordProcessBatch doesn't have " + + "coprocessor ids", function (done) { - const repository = sinon.stub( - Repository.prototype, - "getCoprocessorsByTopics" - ); - repository.returns(new Map().set("topicB", [createMockCoprocessor()])); - const apply = sinon.spy(Server.prototype, "applyCoprocessor"); - const [, fakeSocket] = createFakeServer(() => { - assert(repository.called); - assert(repository.getCall(0).returnValue.size > 0); - assert(!repository.getCall(0).returnValue.has(request.npt.topic)); - assert(apply.called); - apply.firstCall.returnValue.then((values) => { - assert.deepStrictEqual(values, []); - done(); - }); + spyFireExceptionServer.returns(Promise.resolve({ result: [] })); + client.process_batch(createProcessBatchRequest()).then(() => { + assert( + spyFireExceptionServer.calledWith( + "Bad request: request without coprocessor ids" + ) + ); + done(); }); - const request = createRequest(); - fakeSocket.emit("readable", request); } ); - it( - "should apply the right Coprocessor for the Request's " + "topic", - function (done) { - const repository = sinon.stub( - Repository.prototype, - "getCoprocessorsByTopics" + it("should fail if there isn't coprocessor that processBatch contain", function (done) { + spyGetHandles.returns([]); + spyFireExceptionServer.returns( + Promise.resolve([ + { + coprocessorId: "", + ntp: { namespace: "", topic: "", partition: 1 }, + resultRecordBatch: [createRecordBatch()], + }, + ]) + ); + client.process_batch(createProcessBatchRequest([BigInt(1)])).then(() => { + assert( + spyFireExceptionServer.calledWith( + "Coprocessors don't register in wasm engine: 1" + ) ); - repository.returns(new Map().set("topicA", createHandleTable())); - const apply = sinon.spy(Server.prototype, "applyCoprocessor"); - const request = createRequest("topicA"); - const [, fakeSocket] = createFakeServer(() => { - assert(repository.called); - assert.deepStrictEqual(repository.getCall(0).args, []); - assert(repository.getCall(0).returnValue.size === 1); - assert(apply.called); - apply.firstCall.returnValue - .then((values) => { - // TODO: https://app.clubhouse.io/vectorized/story/1031 - assert.deepStrictEqual(values, [undefined]); - done(); - }) - .catch(done); + done(); + }); + }); + + it("should apply the right Coprocessor for the Request's topic", function (done) { + const coprocessorId = BigInt(1); + spyGetHandles.returns([ + createHandle({ + apply: () => + new Map([ + [ + "newTopic", + createRecordBatch({ + header: { recordCount: 1 }, + records: [{ value: Buffer.from("new VALUE") }], + }), + ], + ]), + }), + ]); + client + .process_batch(createProcessBatchRequest([coprocessorId], "BaseTopic")) + .then((res) => { + assert(spyGetHandles.called); + assert(spyGetHandles.calledWith([coprocessorId])); + const resultBatch = res.result[0]; + assert.strictEqual(resultBatch.ntp.topic, "BaseTopic.$newTopic$"); + assert.deepStrictEqual( + resultBatch.resultRecordBatch.flatMap(({ records }) => + records.map((r) => r.value.toString()) + ), + ["new VALUE"] + ); + done(); }); - fakeSocket.emit("readable", request); - } - ); + }); describe("Given an Error when applying the Coprocessor", function () { - it( - "should skip the Request, if ErrorPolicy is " + "SkipOnFailure", - function (done) { - const repository = sinon.stub( - Repository.prototype, - "getCoprocessorsByTopics" - ); - const badApplyCoprocessor = (record: RecordBatch) => - // eslint-disable-next-line @typescript-eslint/ban-ts-comment - // @ts-ignore - record.bad.attribute; - repository.returns( - new Map().set( - "topicA", - createHandleTable( - createHandle( - createMockCoprocessor( - undefined, - null, - null, - badApplyCoprocessor - ) - ) - ) - ) - ); - const apply = sinon.spy(Server.prototype, "applyCoprocessor"); - const handle = sinon.spy(Server.prototype, "handleErrorByPolicy"); - const deregister = sinon.spy( - FileManager.prototype, - "deregisterCoprocessor" - ); - sinon - .stub(FileManager.prototype, "readActiveCoprocessor") - .returns(Promise.resolve(true)); - const [fakeServer, fakeSocket] = createFakeServer(() => { - assert(apply.called); - assert(handle.called); - assert(!deregister.called); - fakeServer.closeCoprocessorManager().then(done).catch(done); - }, true); - const request = createRequest("topicA"); - fakeSocket.emit("readable", request); - } - ); + it("should skip the Request, if ErrorPolicy is SkipOnFailure", function (done) { + const badApplyCoprocessor = (record: RecordBatch) => + // eslint-disable-next-line @typescript-eslint/ban-ts-comment + // @ts-ignore + record.bad.attribute; - it( - "should deregister the Coprocessor, if ErrorPolicy is " + "Deregister", - function (done) { - const repository = sinon.stub( - Repository.prototype, - "getCoprocessorsByTopics" - ); - const badApplyCoprocessor = (record: RecordBatch) => - // eslint-disable-next-line @typescript-eslint/ban-ts-comment - // @ts-ignore - record.bad.attribute; - repository.returns( - new Map().set( - "topicA", - createHandleTable( - createHandle( - createMockCoprocessor( - undefined, - null, - PolicyError.Deregister, - badApplyCoprocessor - ) - ) - ) - ) - ); - const apply = sinon.spy(Server.prototype, "applyCoprocessor"); - const handle = sinon.spy(Server.prototype, "handleErrorByPolicy"); - const deregister = sinon.stub( - FileManager.prototype, - "deregisterCoprocessor" - ); - deregister.returns(Promise.resolve(true)); - sinon - .stub(FileManager.prototype, "readActiveCoprocessor") - .returns(Promise.resolve(true)); - const [fakeServer, fakeSocket] = createFakeServer(() => { - assert(apply.called); - assert(handle.called); - assert(deregister.called); - fakeServer.closeCoprocessorManager().then(done).catch(done); - }, true); - const request = createRequest("topicA"); - fakeSocket.emit("readable", request); - } - ); + spyGetHandles.returns([ + createHandle({ + apply: badApplyCoprocessor, + policyError: PolicyError.SkipOnFailure, + inputTopics: ["topic"], + }), + ]); + + client + .process_batch(createProcessBatchRequest([BigInt(1)])) + .then((res) => { + assert(!spyDeregister.called); + assert.deepStrictEqual(res.result, []); + done(); + }); + }); + + it("should deregister the Coprocessor, if ErrorPolicy is Deregister", function (done) { + const badApplyCoprocessor = (record: RecordBatch) => + // eslint-disable-next-line @typescript-eslint/ban-ts-comment + // @ts-ignore + record.bad.attribute; + const handle = createHandle({ + apply: badApplyCoprocessor, + policyError: PolicyError.Deregister, + inputTopics: ["topic"], + }); + spyMoveHandle.returns(Promise.resolve(handle)); + spyFindByCoprocessor.returns(handle); + spyGetHandles.returns([handle]); + + client + .process_batch(createProcessBatchRequest([BigInt(1)])) + .then((res) => { + assert(spyDeregister.called); + assert(spyGetHandles.called); + assert(spyFindByCoprocessor.called); + assert(spyMoveHandle.called); + assert(spyMoveHandle.calledWith(handle)); + assert.deepStrictEqual(res.result, []); + done(); + }); + }); }); }); }); diff --git a/src/js/test/supervisors/FileManager.test.ts b/src/js/test/supervisors/FileManager.test.ts index 36c8fb168664..c3a1c16245a8 100644 --- a/src/js/test/supervisors/FileManager.test.ts +++ b/src/js/test/supervisors/FileManager.test.ts @@ -3,8 +3,8 @@ import { stub, spy, reset } from "sinon"; import FileManager from "../../modules/supervisors/FileManager"; import Repository from "../../modules/supervisors/Repository"; import { - ManagementClient, - ManagementServer, + Script_ManagerClient as ManagementClient, + Script_ManagerServer as ManagementServer, } from "../../modules/rpc/serverAndClients/server"; import * as fs from "fs"; import { createHandle } from "../testUtilities"; @@ -83,11 +83,7 @@ describe("FileManager", () => { assert(getCoprocessor.calledWith("active/file")); assert(moveCoprocessor.called); assert(moveCoprocessor.calledWith(handle, "active")); - assert(repo.getCoprocessorsByTopics().size === 1); - assert.deepStrictEqual( - [...repo.getCoprocessorsByTopics().keys()], - handle.coprocessor.inputTopics - ); + assert(repo.size() === 1); done(); }, 150); } diff --git a/src/js/test/supervisors/Repository.test.ts b/src/js/test/supervisors/Repository.test.ts index 87c8a81d1ea0..e2bcbb68f8b3 100644 --- a/src/js/test/supervisors/Repository.test.ts +++ b/src/js/test/supervisors/Repository.test.ts @@ -6,101 +6,43 @@ import { createHandle, createMockCoprocessor } from "../testUtilities"; describe("Repository", function () { it("should initialize with an empty map", function () { const repository = new Repository(); - assert(repository.getCoprocessorsByTopics().size === 0); + assert(repository.size() === 0); }); - it("should add a coprocessor to the repository", function () { + it("should add a handle to the repository", function () { const repository = new Repository(); repository.add(createHandle()); - assert(repository.getCoprocessorsByTopics().size === 1); - }); - - it("should add a coprocessor for each topic in coprocessor", function () { - const repository = new Repository(); - const topics = ["topicA", "topicB"]; - repository.add(createHandle({ inputTopics: topics })); - assert(repository.getCoprocessorsByTopics().size === 2); - topics.forEach((topic) => { - assert(repository.getCoprocessorsByTopics().has(topic)); - }); + assert(repository.size() === 1); }); it( - "should replace a coprocessor if a new one with the same globalId " + + "should replace a handle if a new one with the same globalId " + "is added.", - function (done) { + function () { const topicA = "topicA"; const topicB = "topicB"; const coprocessorA = createMockCoprocessor(BigInt(1), [topicA]); const coprocessorB = createMockCoprocessor(BigInt(1), [topicB]); const repository = new Repository(); repository.add(createHandle(coprocessorA)); - const result1 = repository.getCoprocessorsByTopics(); - assert(result1.get(topicA).size() === 1); + assert(repository.findByCoprocessor(coprocessorA)); assert( - result1 - .get(topicA) - .findHandleByCoprocessor(coprocessorA) + repository + .findByCoprocessor(coprocessorA) .coprocessor.inputTopics.includes(topicA) ); - repository.add(createHandle(coprocessorB)).then(() => { - const result2 = repository.getCoprocessorsByTopics(); - assert(!!result2.get(topicA)); - assert(result2.get(topicB).size() === 1); - assert( - result2 - .get(topicB) - .findHandleByCoprocessor(coprocessorB) - .coprocessor.inputTopics.includes(topicB) - ); - done(); - }); - } - ); - - it("should remove a coprocessor from all topics", function (done) { - const topicA = "topicA"; - const topicB = "topicB"; - const topicC = "topicC"; - const repository = new Repository(); - const handleCoprocessorA = createHandle({ - inputTopics: [topicA, topicB, topicC], - }); - const handleCoprocessorB = createHandle({ - globalId: BigInt(2), - inputTopics: [topicC], - }); - repository.add(handleCoprocessorA); - repository.add(handleCoprocessorB); - - const expect1: [string, number][] = [ - [topicA, 1], - [topicB, 1], - [topicC, 2], - ]; - expect1.forEach(([topic, coprocessorNumber]) => { + repository.add(createHandle(coprocessorB)); + assert(repository.findByCoprocessor(coprocessorA)); + assert(repository.findByCoprocessor(coprocessorB)); assert( - repository.getCoprocessorsByTopics().get(topic).size() === - coprocessorNumber + repository + .findByCoprocessor(coprocessorB) + .coprocessor.inputTopics.includes(topicB) ); - }); - repository.remove(handleCoprocessorA).then(() => { - const expect2: [string, number][] = [ - [topicA, 0], - [topicB, 0], - [topicC, 1], - ]; - expect2.forEach(([topic, coprocessorNumber]) => { - assert( - repository.getCoprocessorsByTopics().get(topic).size() === - coprocessorNumber - ); - }); - done(); - }); - }); + } + ); - it("should find a coprocessor by another Handle", function () { + it("should find a handle by another Handle", function () { const repository = new Repository(); const handleA = createHandle(); const handleB = createHandle({ @@ -112,7 +54,7 @@ describe("Repository", function () { assert(!repository.findByGlobalId(handleB)); }); - it("should find a coprocessor by another Coprocessor", function () { + it("should find a handle by another Coprocessor", function () { const repository = new Repository(); const handleA = createHandle(); const handleB = createHandle({ diff --git a/src/js/test/testUtilities.ts b/src/js/test/testUtilities.ts index e61d68ff1257..30a24458cea8 100644 --- a/src/js/test/testUtilities.ts +++ b/src/js/test/testUtilities.ts @@ -1,12 +1,12 @@ import { Coprocessor, PolicyError } from "../modules/public/Coprocessor"; import { Handle } from "../modules/domain/Handle"; -import { HandleTable } from "../modules/supervisors/HandleTable"; +import { createRecordBatch } from "../modules/public/"; export const createMockCoprocessor = ( globalId: Coprocessor["globalId"] = BigInt(1), inputTopics: Coprocessor["inputTopics"] = ["topicA"], policyError: Coprocessor["policyError"] = PolicyError.SkipOnFailure, - apply: Coprocessor["apply"] = () => undefined + apply: Coprocessor["apply"] = () => new Map([["result", createRecordBatch()]]) ): Coprocessor => ({ globalId, inputTopics, @@ -24,11 +24,3 @@ export const createHandle = (coprocessor?: Partial): Handle => ({ checksum: "check", filename: "file", }); - -export const createHandleTable = ( - handle: Partial = createHandle() -): HandleTable => { - const handleTable = new HandleTable(); - handleTable.registerHandle({ ...createHandle(), ...handle }); - return handleTable; -}; diff --git a/src/js/test/utilities/CoprocessorTest.ts b/src/js/test/utilities/CoprocessorTest.ts index 122af37fd183..bacd3072b0cb 100644 --- a/src/js/test/utilities/CoprocessorTest.ts +++ b/src/js/test/utilities/CoprocessorTest.ts @@ -9,8 +9,8 @@ class CoprocessorTest implements Coprocessor { inputTopics = ["topicA"]; policyError = PolicyError.Deregister; - apply(record: RecordBatch): RecordBatch { - return record; + apply(record: RecordBatch): Map { + return new Map([["test", record]]); } } diff --git a/src/js/tsconfig.json b/src/js/tsconfig.json index 7b9e0951ab09..e40c4a022579 100644 --- a/src/js/tsconfig.json +++ b/src/js/tsconfig.json @@ -9,7 +9,8 @@ }, "newLine": "lf", "downlevelIteration": true, - "lib": ["esnext"] + "lib": ["esnext"], + "skipLibCheck": true }, "include": [ "modules/rpc", @@ -19,7 +20,7 @@ "test" ], "exclude": [ - "node_modules/", + "node_modules/*", "packages/" ] }