Skip to content

Commit

Permalink
chore(server): gendo IoC #2 - createRenderRequestFactory
Browse files Browse the repository at this point in the history
  • Loading branch information
fabis94 committed Oct 24, 2024
1 parent 6d53308 commit fcbd7f2
Show file tree
Hide file tree
Showing 7 changed files with 168 additions and 93 deletions.
20 changes: 20 additions & 0 deletions packages/server/modules/blobstorage/domain/operations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {
BlobStorageItemInput
} from '@/modules/blobstorage/domain/types'
import { MaybeNullOrUndefined, Nullable } from '@speckle/shared'
import type { Readable } from 'stream'

export type GetBlobs = (params: {
streamId?: MaybeNullOrUndefined<string>
Expand Down Expand Up @@ -30,3 +31,22 @@ export type GetBlobMetadataCollection = (params: {
limit?: Nullable<number>
cursor?: Nullable<string>
}) => Promise<{ blobs: BlobStorageItem[]; cursor: Nullable<string> }>

export type UploadFileStream = (
storeFileStream: (params: {
objectKey: string
fileStream: Readable | Buffer
}) => Promise<{
fileHash: string
}>,
params1: {
streamId: string
userId: string | undefined
},
params2: {
blobId: string
fileName: string
fileType: string | undefined
fileStream: Readable | Buffer
}
) => Promise<{ blobId: string; fileName: string; fileHash: string }>
18 changes: 3 additions & 15 deletions packages/server/modules/blobstorage/services/management.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,34 +2,22 @@ import {
DeleteBlob,
GetBlobMetadata,
UpdateBlob,
UploadFileStream,
UpsertBlob
} from '@/modules/blobstorage/domain/operations'
import { BlobStorageItem } from '@/modules/blobstorage/domain/types'
import { BadRequestError } from '@/modules/shared/errors'
import { getFileSizeLimitMB } from '@/modules/shared/helpers/envHelper'
import { MaybeAsync } from '@speckle/shared'
import { Readable } from 'stream'

/**
* File size limit in bytes
*/
export const getFileSizeLimit = () => getFileSizeLimitMB() * 1024 * 1024

export const uploadFileStreamFactory =
(deps: { upsertBlob: UpsertBlob; updateBlob: UpdateBlob }) =>
async (
storeFileStream: (params: {
objectKey: string
fileStream: Readable | Buffer
}) => Promise<{ fileHash: string }>,
params1: { streamId: string; userId: string | undefined },
params2: {
blobId: string
fileName: string
fileType: string | undefined
fileStream: Readable | Buffer
}
) => {
(deps: { upsertBlob: UpsertBlob; updateBlob: UpdateBlob }): UploadFileStream =>
async (storeFileStream, params1, params2) => {
const { streamId, userId } = params1
const { blobId, fileName, fileType, fileStream } = params2

Expand Down
14 changes: 14 additions & 0 deletions packages/server/modules/gendo/domain/operations.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import { GendoAiRenderInput } from '@/modules/core/graph/generated/graphql'
import { GendoAIRender } from '@/modules/gendo/domain/types'
import { NullableKeysToOptional } from '@speckle/shared'
import { SetOptional } from 'type-fest'

export type StoreRender = (
input: NullableKeysToOptional<SetOptional<GendoAIRender, 'createdAt' | 'updatedAt'>>
) => Promise<GendoAIRender>

export type CreateRenderRequest = (
input: GendoAiRenderInput & {
userId: string
}
) => Promise<GendoAIRender>
3 changes: 3 additions & 0 deletions packages/server/modules/gendo/domain/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import { GendoAIRenderRecord } from '@/modules/gendo/helpers/types'

export type GendoAIRender = GendoAIRenderRecord
70 changes: 25 additions & 45 deletions packages/server/modules/gendo/graph/resolvers/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,39 @@ import { authorizeResolver } from '@/modules/shared'
import { Resolvers } from '@/modules/core/graph/generated/graphql'
import { Roles } from '@speckle/shared'
import {
getGendoAIAPIEndpoint,
getGendoAIKey,
getServerOrigin
} from '@/modules/shared/helpers/envHelper'
import {
createGendoAIRenderRequest,
createRenderRequestFactory,
getGendoAIRenderRequest,
getGendoAIRenderRequests
} from '@/modules/gendo/services'
import crs from 'crypto-random-string'
import {
ProjectSubscriptions,
filteredSubscribe
filteredSubscribe,
publish
} from '@/modules/shared/utils/subscriptions'
import {
getRateLimitResult,
isRateLimitBreached
} from '@/modules/core/services/ratelimiter'
import { RateLimitError } from '@/modules/core/errors/ratelimit'
import { GendoRenderRequestError } from '@/modules/gendo/errors/main'
import { uploadFileStreamFactory } from '@/modules/blobstorage/services/management'
import {
updateBlobFactory,
upsertBlobFactory
} from '@/modules/blobstorage/repositories'
import { storeFileStream } from '@/modules/blobstorage/objectStorage'
import { storeRenderFactory } from '@/modules/gendo/repositories'
import { db } from '@/db/knex'

const createRenderRequest = createRenderRequestFactory({
uploadFileStream: uploadFileStreamFactory({
upsertBlob: upsertBlobFactory({ db }),
updateBlob: updateBlobFactory({ db })
}),
storeFileStream,
storeRender: storeRenderFactory({ db }),
publish,
fetch
})

export = {
Version: {
Expand Down Expand Up @@ -58,44 +71,11 @@ export = {
throw new RateLimitError(rateLimitResult)
}

const endpoint = getGendoAIAPIEndpoint() as string
const bearer = getGendoAIKey() as string
const webhookUrl = `${getServerOrigin()}/api/thirdparty/gendo`

// TODO Fire off request to gendo api & get generationId, create record in db. Note: use gendo api key from env
const gendoRequestBody = {
userId: ctx.userId,
depthMap: args.input.baseImage,
prompt: args.input.prompt,
webhookUrl
}

const response = await fetch(endpoint, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
Authorization: `Bearer ${bearer}`
},
body: JSON.stringify(gendoRequestBody)
await createRenderRequest({
...args.input,
userId: ctx.userId!
})

const status = response.status

if (status === 200) {
const body = (await response.json()) as { status: string; generationId: string }
await createGendoAIRenderRequest({
...args.input,
userId: ctx.userId as string,
status: body.status,
gendoGenerationId: body.generationId,
id: crs({ length: 10 })
})
} else {
const body = await response.json().catch((e) => ({ error: `${e}` }))
throw new GendoRenderRequestError('Failed to enqueue gendo render.', {
info: { body }
})
}
return true
}
},
Expand Down
19 changes: 19 additions & 0 deletions packages/server/modules/gendo/repositories/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import { GendoAIRenders } from '@/modules/core/dbSchema'
import { StoreRender } from '@/modules/gendo/domain/operations'
import { GendoAIRenderRecord } from '@/modules/gendo/helpers/types'
import { Knex } from 'knex'
import { pick } from 'lodash'

const tables = {
gendoAIRenders: (db: Knex) => db<GendoAIRenderRecord>(GendoAIRenders.name)
}

export const storeRenderFactory =
(deps: { db: Knex }): StoreRender =>
async (input) => {
const [newRec] = await tables
.gendoAIRenders(deps.db)
.insert(pick(input, GendoAIRenders.withoutTablePrefix.cols))
.returning('*')
return newRec
}
117 changes: 84 additions & 33 deletions packages/server/modules/gendo/services/index.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import crs from 'crypto-random-string'
import { GendoAIRenders, knex } from '@/modules/core/dbSchema'
import { GendoAiRenderInput } from '@/modules/core/graph/generated/graphql'
import { GendoAIRenderRecord } from '@/modules/gendo/helpers/types'
import { ProjectSubscriptions, publish } from '@/modules/shared/utils/subscriptions'
import {
ProjectSubscriptions,
publish,
PublishSubscription
} from '@/modules/shared/utils/subscriptions'
import { Merge } from 'type-fest'
import { storeFileStream } from '@/modules/blobstorage/objectStorage'
import { uploadFileStreamFactory } from '@/modules/blobstorage/services/management'
Expand All @@ -11,49 +14,97 @@ import {
upsertBlobFactory
} from '@/modules/blobstorage/repositories'
import { db } from '@/db/knex'
import { CreateRenderRequest, StoreRender } from '@/modules/gendo/domain/operations'
import { UploadFileStream } from '@/modules/blobstorage/domain/operations'
import {
getGendoAIAPIEndpoint,
getGendoAIKey,
getServerOrigin
} from '@/modules/shared/helpers/envHelper'
import { GendoRenderRequestError } from '@/modules/gendo/errors/main'

const uploadFileStream = uploadFileStreamFactory({
upsertBlob: upsertBlobFactory({ db }),
updateBlob: updateBlobFactory({ db })
})

export async function createGendoAIRenderRequest(
input: GendoAiRenderInput & {
userId: string
status: string
id: string
gendoGenerationId?: string
}
) {
const baseImageBuffer = Buffer.from(
input.baseImage.replace(/^data:image\/\w+;base64,/, ''),
'base64'
)

const blobId = crs({ length: 10 })
await uploadFileStream(
storeFileStream,
{ streamId: input.projectId, userId: input.userId },
{
blobId,
fileName: `gendo_base_image_${blobId}.png`,
fileType: 'png',
fileStream: baseImageBuffer
export const createRenderRequestFactory =
(deps: {
uploadFileStream: UploadFileStream
storeFileStream: typeof storeFileStream
storeRender: StoreRender
publish: PublishSubscription
fetch: typeof fetch
}): CreateRenderRequest =>
async (input) => {
const endpoint = getGendoAIAPIEndpoint()
const bearer = getGendoAIKey() as string
const webhookUrl = `${getServerOrigin()}/api/thirdparty/gendo`

// TODO: Fn handles too many concerns, refactor (e.g. the client fetch call)
// TODO: Fire off request to gendo api & get generationId, create record in db. Note: use gendo api key from env
const gendoRequestBody = {
userId: input.userId,
depthMap: input.baseImage,
prompt: input.prompt,
webhookUrl
}
)

input.baseImage = blobId
const response = await fetch(endpoint, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
Authorization: `Bearer ${bearer}`
},
body: JSON.stringify(gendoRequestBody)
})

const status = response.status
if (status !== 200) {
const body = await response.json().catch((e) => ({ error: `${e}` }))
throw new GendoRenderRequestError('Failed to enqueue gendo render.', {
info: { body }
})
}

const [newRecord] = await GendoAIRenders.knex().insert(input, '*')
const gendoResponseBody = (await response.json()) as {
status: string
generationId: string
}
const baseImageBuffer = Buffer.from(
input.baseImage.replace(/^data:image\/\w+;base64,/, ''),
'base64'
)

publish(ProjectSubscriptions.ProjectVersionGendoAIRenderCreated, {
projectVersionGendoAIRenderCreated: newRecord
})
const blobId = crs({ length: 10 })
await deps.uploadFileStream(
deps.storeFileStream,
{ streamId: input.projectId, userId: input.userId },
{
blobId,
fileName: `gendo_base_image_${blobId}.png`,
fileType: 'png',
fileStream: baseImageBuffer
}
)

// TODO: Schedule a timeout fail after x minutes
input.baseImage = blobId

return newRecord as GendoAIRenderRecord
}
const newRecord = await deps.storeRender({
...input,
status: gendoResponseBody.status,
gendoGenerationId: gendoResponseBody.generationId,
id: crs({ length: 10 })
})

deps.publish(ProjectSubscriptions.ProjectVersionGendoAIRenderCreated, {
projectVersionGendoAIRenderCreated: newRecord
})

// TODO: Schedule a timeout fail after x minutes

return newRecord
}

export async function updateGendoAIRenderRequest(
input: Partial<{ status: string; responseImage: string }> & {
Expand Down

0 comments on commit fcbd7f2

Please sign in to comment.