From fcbd7f2eb7f1ed6070652a17a0e1a2abb563410b Mon Sep 17 00:00:00 2001 From: Kristaps Fabians Geikins Date: Thu, 24 Oct 2024 16:07:27 +0300 Subject: [PATCH] chore(server): gendo IoC #2 - createRenderRequestFactory --- .../modules/blobstorage/domain/operations.ts | 20 +++ .../blobstorage/services/management.ts | 18 +-- .../server/modules/gendo/domain/operations.ts | 14 +++ packages/server/modules/gendo/domain/types.ts | 3 + .../modules/gendo/graph/resolvers/index.ts | 70 ++++------- .../modules/gendo/repositories/index.ts | 19 +++ .../server/modules/gendo/services/index.ts | 117 +++++++++++++----- 7 files changed, 168 insertions(+), 93 deletions(-) create mode 100644 packages/server/modules/gendo/domain/operations.ts create mode 100644 packages/server/modules/gendo/domain/types.ts create mode 100644 packages/server/modules/gendo/repositories/index.ts diff --git a/packages/server/modules/blobstorage/domain/operations.ts b/packages/server/modules/blobstorage/domain/operations.ts index 2f611e1e48..81a290f0e0 100644 --- a/packages/server/modules/blobstorage/domain/operations.ts +++ b/packages/server/modules/blobstorage/domain/operations.ts @@ -3,6 +3,7 @@ import { BlobStorageItemInput } from '@/modules/blobstorage/domain/types' import { MaybeNullOrUndefined, Nullable } from '@speckle/shared' +import type { Readable } from 'stream' export type GetBlobs = (params: { streamId?: MaybeNullOrUndefined @@ -30,3 +31,22 @@ export type GetBlobMetadataCollection = (params: { limit?: Nullable cursor?: Nullable }) => Promise<{ blobs: BlobStorageItem[]; cursor: Nullable }> + +export type UploadFileStream = ( + storeFileStream: (params: { + objectKey: string + fileStream: Readable | Buffer + }) => Promise<{ + fileHash: string + }>, + params1: { + streamId: string + userId: string | undefined + }, + params2: { + blobId: string + fileName: string + fileType: string | undefined + fileStream: Readable | Buffer + } +) => Promise<{ blobId: string; fileName: string; fileHash: string }> diff --git a/packages/server/modules/blobstorage/services/management.ts b/packages/server/modules/blobstorage/services/management.ts index 9a7b0d9d1f..94ce4a9dab 100644 --- a/packages/server/modules/blobstorage/services/management.ts +++ b/packages/server/modules/blobstorage/services/management.ts @@ -2,13 +2,13 @@ import { DeleteBlob, GetBlobMetadata, UpdateBlob, + UploadFileStream, UpsertBlob } from '@/modules/blobstorage/domain/operations' import { BlobStorageItem } from '@/modules/blobstorage/domain/types' import { BadRequestError } from '@/modules/shared/errors' import { getFileSizeLimitMB } from '@/modules/shared/helpers/envHelper' import { MaybeAsync } from '@speckle/shared' -import { Readable } from 'stream' /** * File size limit in bytes @@ -16,20 +16,8 @@ import { Readable } from 'stream' export const getFileSizeLimit = () => getFileSizeLimitMB() * 1024 * 1024 export const uploadFileStreamFactory = - (deps: { upsertBlob: UpsertBlob; updateBlob: UpdateBlob }) => - async ( - storeFileStream: (params: { - objectKey: string - fileStream: Readable | Buffer - }) => Promise<{ fileHash: string }>, - params1: { streamId: string; userId: string | undefined }, - params2: { - blobId: string - fileName: string - fileType: string | undefined - fileStream: Readable | Buffer - } - ) => { + (deps: { upsertBlob: UpsertBlob; updateBlob: UpdateBlob }): UploadFileStream => + async (storeFileStream, params1, params2) => { const { streamId, userId } = params1 const { blobId, fileName, fileType, fileStream } = params2 diff --git a/packages/server/modules/gendo/domain/operations.ts b/packages/server/modules/gendo/domain/operations.ts new file mode 100644 index 0000000000..e93a68a608 --- /dev/null +++ b/packages/server/modules/gendo/domain/operations.ts @@ -0,0 +1,14 @@ +import { GendoAiRenderInput } from '@/modules/core/graph/generated/graphql' +import { GendoAIRender } from '@/modules/gendo/domain/types' +import { NullableKeysToOptional } from '@speckle/shared' +import { SetOptional } from 'type-fest' + +export type StoreRender = ( + input: NullableKeysToOptional> +) => Promise + +export type CreateRenderRequest = ( + input: GendoAiRenderInput & { + userId: string + } +) => Promise diff --git a/packages/server/modules/gendo/domain/types.ts b/packages/server/modules/gendo/domain/types.ts new file mode 100644 index 0000000000..19f7b30a22 --- /dev/null +++ b/packages/server/modules/gendo/domain/types.ts @@ -0,0 +1,3 @@ +import { GendoAIRenderRecord } from '@/modules/gendo/helpers/types' + +export type GendoAIRender = GendoAIRenderRecord diff --git a/packages/server/modules/gendo/graph/resolvers/index.ts b/packages/server/modules/gendo/graph/resolvers/index.ts index e19a43fe00..8d8ff3f351 100644 --- a/packages/server/modules/gendo/graph/resolvers/index.ts +++ b/packages/server/modules/gendo/graph/resolvers/index.ts @@ -2,26 +2,39 @@ import { authorizeResolver } from '@/modules/shared' import { Resolvers } from '@/modules/core/graph/generated/graphql' import { Roles } from '@speckle/shared' import { - getGendoAIAPIEndpoint, - getGendoAIKey, - getServerOrigin -} from '@/modules/shared/helpers/envHelper' -import { - createGendoAIRenderRequest, + createRenderRequestFactory, getGendoAIRenderRequest, getGendoAIRenderRequests } from '@/modules/gendo/services' -import crs from 'crypto-random-string' import { ProjectSubscriptions, - filteredSubscribe + filteredSubscribe, + publish } from '@/modules/shared/utils/subscriptions' import { getRateLimitResult, isRateLimitBreached } from '@/modules/core/services/ratelimiter' import { RateLimitError } from '@/modules/core/errors/ratelimit' -import { GendoRenderRequestError } from '@/modules/gendo/errors/main' +import { uploadFileStreamFactory } from '@/modules/blobstorage/services/management' +import { + updateBlobFactory, + upsertBlobFactory +} from '@/modules/blobstorage/repositories' +import { storeFileStream } from '@/modules/blobstorage/objectStorage' +import { storeRenderFactory } from '@/modules/gendo/repositories' +import { db } from '@/db/knex' + +const createRenderRequest = createRenderRequestFactory({ + uploadFileStream: uploadFileStreamFactory({ + upsertBlob: upsertBlobFactory({ db }), + updateBlob: updateBlobFactory({ db }) + }), + storeFileStream, + storeRender: storeRenderFactory({ db }), + publish, + fetch +}) export = { Version: { @@ -58,44 +71,11 @@ export = { throw new RateLimitError(rateLimitResult) } - const endpoint = getGendoAIAPIEndpoint() as string - const bearer = getGendoAIKey() as string - const webhookUrl = `${getServerOrigin()}/api/thirdparty/gendo` - - // TODO Fire off request to gendo api & get generationId, create record in db. Note: use gendo api key from env - const gendoRequestBody = { - userId: ctx.userId, - depthMap: args.input.baseImage, - prompt: args.input.prompt, - webhookUrl - } - - const response = await fetch(endpoint, { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - Authorization: `Bearer ${bearer}` - }, - body: JSON.stringify(gendoRequestBody) + await createRenderRequest({ + ...args.input, + userId: ctx.userId! }) - const status = response.status - - if (status === 200) { - const body = (await response.json()) as { status: string; generationId: string } - await createGendoAIRenderRequest({ - ...args.input, - userId: ctx.userId as string, - status: body.status, - gendoGenerationId: body.generationId, - id: crs({ length: 10 }) - }) - } else { - const body = await response.json().catch((e) => ({ error: `${e}` })) - throw new GendoRenderRequestError('Failed to enqueue gendo render.', { - info: { body } - }) - } return true } }, diff --git a/packages/server/modules/gendo/repositories/index.ts b/packages/server/modules/gendo/repositories/index.ts new file mode 100644 index 0000000000..8df36fbd3b --- /dev/null +++ b/packages/server/modules/gendo/repositories/index.ts @@ -0,0 +1,19 @@ +import { GendoAIRenders } from '@/modules/core/dbSchema' +import { StoreRender } from '@/modules/gendo/domain/operations' +import { GendoAIRenderRecord } from '@/modules/gendo/helpers/types' +import { Knex } from 'knex' +import { pick } from 'lodash' + +const tables = { + gendoAIRenders: (db: Knex) => db(GendoAIRenders.name) +} + +export const storeRenderFactory = + (deps: { db: Knex }): StoreRender => + async (input) => { + const [newRec] = await tables + .gendoAIRenders(deps.db) + .insert(pick(input, GendoAIRenders.withoutTablePrefix.cols)) + .returning('*') + return newRec + } diff --git a/packages/server/modules/gendo/services/index.ts b/packages/server/modules/gendo/services/index.ts index ba2fc554cd..d531146974 100644 --- a/packages/server/modules/gendo/services/index.ts +++ b/packages/server/modules/gendo/services/index.ts @@ -1,8 +1,11 @@ import crs from 'crypto-random-string' import { GendoAIRenders, knex } from '@/modules/core/dbSchema' -import { GendoAiRenderInput } from '@/modules/core/graph/generated/graphql' import { GendoAIRenderRecord } from '@/modules/gendo/helpers/types' -import { ProjectSubscriptions, publish } from '@/modules/shared/utils/subscriptions' +import { + ProjectSubscriptions, + publish, + PublishSubscription +} from '@/modules/shared/utils/subscriptions' import { Merge } from 'type-fest' import { storeFileStream } from '@/modules/blobstorage/objectStorage' import { uploadFileStreamFactory } from '@/modules/blobstorage/services/management' @@ -11,49 +14,97 @@ import { upsertBlobFactory } from '@/modules/blobstorage/repositories' import { db } from '@/db/knex' +import { CreateRenderRequest, StoreRender } from '@/modules/gendo/domain/operations' +import { UploadFileStream } from '@/modules/blobstorage/domain/operations' +import { + getGendoAIAPIEndpoint, + getGendoAIKey, + getServerOrigin +} from '@/modules/shared/helpers/envHelper' +import { GendoRenderRequestError } from '@/modules/gendo/errors/main' const uploadFileStream = uploadFileStreamFactory({ upsertBlob: upsertBlobFactory({ db }), updateBlob: updateBlobFactory({ db }) }) -export async function createGendoAIRenderRequest( - input: GendoAiRenderInput & { - userId: string - status: string - id: string - gendoGenerationId?: string - } -) { - const baseImageBuffer = Buffer.from( - input.baseImage.replace(/^data:image\/\w+;base64,/, ''), - 'base64' - ) - - const blobId = crs({ length: 10 }) - await uploadFileStream( - storeFileStream, - { streamId: input.projectId, userId: input.userId }, - { - blobId, - fileName: `gendo_base_image_${blobId}.png`, - fileType: 'png', - fileStream: baseImageBuffer +export const createRenderRequestFactory = + (deps: { + uploadFileStream: UploadFileStream + storeFileStream: typeof storeFileStream + storeRender: StoreRender + publish: PublishSubscription + fetch: typeof fetch + }): CreateRenderRequest => + async (input) => { + const endpoint = getGendoAIAPIEndpoint() + const bearer = getGendoAIKey() as string + const webhookUrl = `${getServerOrigin()}/api/thirdparty/gendo` + + // TODO: Fn handles too many concerns, refactor (e.g. the client fetch call) + // TODO: Fire off request to gendo api & get generationId, create record in db. Note: use gendo api key from env + const gendoRequestBody = { + userId: input.userId, + depthMap: input.baseImage, + prompt: input.prompt, + webhookUrl } - ) - input.baseImage = blobId + const response = await fetch(endpoint, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + Authorization: `Bearer ${bearer}` + }, + body: JSON.stringify(gendoRequestBody) + }) + + const status = response.status + if (status !== 200) { + const body = await response.json().catch((e) => ({ error: `${e}` })) + throw new GendoRenderRequestError('Failed to enqueue gendo render.', { + info: { body } + }) + } - const [newRecord] = await GendoAIRenders.knex().insert(input, '*') + const gendoResponseBody = (await response.json()) as { + status: string + generationId: string + } + const baseImageBuffer = Buffer.from( + input.baseImage.replace(/^data:image\/\w+;base64,/, ''), + 'base64' + ) - publish(ProjectSubscriptions.ProjectVersionGendoAIRenderCreated, { - projectVersionGendoAIRenderCreated: newRecord - }) + const blobId = crs({ length: 10 }) + await deps.uploadFileStream( + deps.storeFileStream, + { streamId: input.projectId, userId: input.userId }, + { + blobId, + fileName: `gendo_base_image_${blobId}.png`, + fileType: 'png', + fileStream: baseImageBuffer + } + ) - // TODO: Schedule a timeout fail after x minutes + input.baseImage = blobId - return newRecord as GendoAIRenderRecord -} + const newRecord = await deps.storeRender({ + ...input, + status: gendoResponseBody.status, + gendoGenerationId: gendoResponseBody.generationId, + id: crs({ length: 10 }) + }) + + deps.publish(ProjectSubscriptions.ProjectVersionGendoAIRenderCreated, { + projectVersionGendoAIRenderCreated: newRecord + }) + + // TODO: Schedule a timeout fail after x minutes + + return newRecord + } export async function updateGendoAIRenderRequest( input: Partial<{ status: string; responseImage: string }> & {