WIP refactor container and queuing system #206
Conversation
@@ -61,13 +66,24 @@ export class AssetController {
const savedAsset = await this.assetService.createUserAsset(authUser, assetInfo, file.path, file.mimetype);
 * @param job asset-uploaded
 */
@Process('asset-uploaded')
async processUploadedVideo(job: Job) {
This is the starting point of the microservice that handles post-upload tasks. It puts the follow-up jobs into the corresponding queues:
- Generate JPEG thumbnail
- Generate WebP thumbnail (only if the JPEG thumbnail exists)
- EXIF extraction
- Reverse geocoding
- Video encoding
This is the second stage in the diagram.
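The fan-out described above can be sketched with an in-memory stand-in for the Bull queues. The job names follow the list above, but the payload shape, the queue variables, and the dispatch helper are hypothetical, for illustration only:

```typescript
// Minimal in-memory stand-in for Bull queues, to illustrate the fan-out
// performed after 'asset-uploaded'. Not the PR's actual implementation.
type QueuedJob = { name: string; data: Record<string, unknown> };

class InMemoryQueue {
  jobs: QueuedJob[] = [];
  async add(name: string, data: Record<string, unknown>): Promise<void> {
    this.jobs.push({ name, data });
  }
}

const thumbnailGeneratorQueue = new InMemoryQueue();
const metadataExtractionQueue = new InMemoryQueue();
const videoConversionQueue = new InMemoryQueue();

// Stage two: dispatch follow-up jobs for a freshly uploaded asset.
async function processUploadedAsset(
  asset: { id: string; mimeType: string },
  hasThumbnail: boolean,
): Promise<void> {
  await thumbnailGeneratorQueue.add('generate-jpeg-thumbnail', { asset });
  if (hasThumbnail) {
    // WebP generation depends on an existing JPEG thumbnail.
    await thumbnailGeneratorQueue.add('generate-webp-thumbnail', { asset });
  }
  await metadataExtractionQueue.add('exif-extraction', { asset });
  await metadataExtractionQueue.add('reverse-geocoding', { asset });
  if (asset.mimeType.startsWith('video/')) {
    await videoConversionQueue.add('mp4-conversion', { asset });
  }
}
```

The point of the stage is that the entry processor does no heavy work itself; it only enqueues, so each downstream queue can be throttled independently.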
  private assetRepository: Repository<AssetEntity>,
) {}

@Process({ name: 'mp4-conversion', concurrency: 1 })
The `concurrency: 1` option limits this processor to a single worker in the pool; any additional videos added to the queue sit in the waiting state until that worker is idle.
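The waiting-queue behaviour can be illustrated with a toy single-worker drain loop. This is an in-memory model of what `concurrency: 1` implies, not Bull's implementation; all names here are hypothetical:

```typescript
// Model of concurrency: 1 -- a single worker drains the queue sequentially,
// so at most one job is ever active at a time.
const waiting: string[] = [];
const completed: string[] = [];
let active = 0;
let maxActive = 0;

async function handleJob(id: string): Promise<void> {
  active++;
  maxActive = Math.max(maxActive, active);
  await new Promise((resolve) => setTimeout(resolve, 5)); // simulate transcoding work
  active--;
  completed.push(id);
}

async function drainWithConcurrencyOne(): Promise<void> {
  while (waiting.length > 0) {
    const id = waiting.shift()!;
    await handleJob(id); // the next job starts only after this one finishes
  }
}

waiting.push('video-1', 'video-2', 'video-3');
```

After draining, the jobs complete in FIFO order and the active-worker count never exceeds one.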
  }
}

async runFFMPEGPipeLine(asset: AssetEntity, savedEncodedPath: string): Promise<void> {
This job at first didn't conform to the concurrency setting because of the callback functions of the FFmpeg command. I had to refactor it into a separate async function so that the process function returns only after the job has either succeeded or failed.
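The fix described can be sketched by wrapping the callback-style command in a Promise, so the handler resolves only when the pipeline actually finishes. The `startPipeline` stand-in below is hypothetical; fluent-ffmpeg's real completion callbacks are its `end` and `error` events:

```typescript
// A callback-style pipeline returns immediately, so an async @Process handler
// would resolve before the work finished and Bull would start the next job
// too early. Wrapping the callbacks in a Promise makes the handler await
// actual completion (or failure).
type PipelineCallbacks = { onEnd: () => void; onError: (err: Error) => void };

// Hypothetical stand-in for a callback-based FFmpeg command.
function startPipeline(input: string, cb: PipelineCallbacks): void {
  setTimeout(() => (input ? cb.onEnd() : cb.onError(new Error('no input'))), 5);
}

function runPipeline(input: string): Promise<string> {
  return new Promise((resolve, reject) => {
    startPipeline(input, {
      onEnd: () => resolve('done'),
      onError: (err) => reject(err),
    });
  });
}
```

With this shape, `await runPipeline(...)` inside the processor keeps the job "active" for its full duration, so the `concurrency: 1` limit is actually respected.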
Hey @zackpollard, this PR is complete in terms of refactoring to microservices and the queue system. There seem to be a lot of changed files because the project structure was refactored into a monorepo to share code between the microservices and the core server. It would be easier to see the changes if you pull the branch locally. I've added comments on the places that matter most in this PR. If you have time, can you take a look and leave some feedback? Thanks!
Looks very solid to me. I haven't had time to review all the code, but I took a look at the main parts: adding to the original asset-uploaded queue, the task that creates all the other messages off the back of that, and a couple of the individual jobs. It looks pretty solid.
@@ -8,7 +8,7 @@ COPY package.json package-lock.json ./

RUN apk add --update-cache build-base python3 libheif vips-dev vips ffmpeg

-RUN npm install
+RUN npm install --legacy-peer-deps
Unsure why this would be needed, what was the issue that required this?
This was needed to resolve a build failure in the GitHub Action.
Ah I see. With yarn you can fix this with custom resolutions; unsure what the procedure is for npm, as I don't use npm for any of my projects anymore.
I am tempted to switch to pnpm as well, probably a task for a rainy day
@@ -61,13 +66,24 @@ export class AssetController {
const savedAsset = await this.assetService.createUserAsset(authUser, assetInfo, file.path, file.mimetype);

if (uploadFiles.thumbnailData != null && savedAsset) {
You check savedAsset here, but then have an else branch that also pushes to the queue. Can savedAsset fail? If not, why check it here? If it can, then the queue may be pushed to in a situation where savedAsset is false or undefined.
This makes sense; it should only check whether the thumbnail is there or not. If the process of saving an asset has a problem, it throws an error in the assetService, so this check won't be necessary.
    { jobId: savedAsset.id },
  );
} else {
  await this.assetUploadedQueue.add(
I would probably refactor this if statement. The only difference between the two asset queue pushes is the thumbnail, so you could just do something like:
await this.assetUploadedQueue.add(
  'asset-uploaded',
  { asset: assetWithThumbnail, fileName: file.originalname, fileSize: file.size, hasThumbnail: uploadFiles.thumbnailData != null },
  { jobId: savedAsset.id },
);
This is nice, but I don't think it quite applies in this case, since the asset property of the payload receives two different values with different content, and that content is used later in the queue. If we try to keep the if statement simple, the refactor would look like this:
let assetWithThumbnail: AssetEntity;
const savedAsset = await this.assetService.createUserAsset(authUser, assetInfo, file.path, file.mimetype);
if (uploadFiles.thumbnailData != null) {
assetWithThumbnail = await this.assetService.updateThumbnailInfo(
savedAsset,
uploadFiles.thumbnailData[0].path,
);
}
await this.assetUploadedQueue.add(
'asset-uploaded',
{
asset: uploadFiles.thumbnailData != null ? assetWithThumbnail : savedAsset,
fileName: file.originalname,
fileSize: file.size,
hasThumbnail: uploadFiles.thumbnailData != null,
},
{ jobId: savedAsset.id },
);
There is less branching, but it isn't really less verbose, I think.
Ah I see, yes, I missed the different asset usage. The original way is all good then; it's easier to understand what's going on that way.
  .updateEntity(true)
  .execute();

return updatedAsset.raw[0];
Maybe not one for right now, but I've noticed a trend where you're using TypeORM's raw output and pulling the first entry from the array. There should be a nicer typed way to do this, with TypeORM understanding that there can't be more than one entity in the response because you filter on the ID column.
defaultJobOptions: {
  attempts: 3,
  removeOnComplete: true,
  removeOnFail: false,
What is the behaviour with this config? Will it retry three times then remove it from the queue, or does it sit in the queue dead after 3 attempts?
It will sit in the failed job section, so the mechanism for retrying the job is on us: either listen to the failed event and put it back into the queue, or have a cron job do that. (We are using cron jobs for the two most important jobs, the WebP generator and video encoding; it will be the thumbnail generator later on as well.)
Sounds good; I just don't want failed jobs to queue up in the background and fill the queues, causing memory issues.
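A toy model of what `attempts: 3` with `removeOnFail: false` implies: the job is retried up to three times, and if it still fails it stays in a failed set until something, e.g. a cron task, re-enqueues it. This is an in-memory sketch, not Bull's implementation; all names are hypothetical:

```typescript
// Model of attempts: 3 / removeOnComplete: true / removeOnFail: false.
// Successful jobs leave the queue; exhausted jobs land in a failed set
// that a cron task could later drain back into the queue.
type TrackedJob = { id: string; attemptsMade: number };

const failedSet: TrackedJob[] = [];
const maxAttempts = 3; // mirrors attempts: 3

async function runJob(job: TrackedJob, work: () => Promise<void>): Promise<void> {
  while (job.attemptsMade < maxAttempts) {
    job.attemptsMade++;
    try {
      await work();
      return; // removeOnComplete: true -> nothing is kept for successes
    } catch {
      // retry until attempts are exhausted
    }
  }
  failedSet.push(job); // removeOnFail: false -> the job sits here, dead
}
```

The failed set is exactly what the cron jobs mentioned above would sweep: re-adding each entry to the queue with its attempt counter reset.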
  }),
}),
BullModule.registerQueue({
  name: 'thumbnail-generator-queue',
There are two definitions for this queue, is that intended?
There is no duplication within the same module, but you have to define the queue in each module that uses it, i.e. schedule-tasks.module.ts; otherwise you get this error:
immich-server_1 | [Nest] 158 - 06/09/2022, 1:18:43 AM ERROR [ExceptionHandler] Nest can't resolve dependencies of the ScheduleTasksService (AssetEntityRepository, ?, BullQueue_video-conversion-queue). Please make sure that the argument BullQueue_thumbnail-generator-queue at index [1] is available in the ScheduleTasksModule context.
immich-server_1 |
immich-server_1 | Potential solutions:
immich-server_1 | - If BullQueue_thumbnail-generator-queue is a provider, is it part of the current ScheduleTasksModule?
immich-server_1 | - If BullQueue_thumbnail-generator-queue is exported from a separate @Module, is that module imported within ScheduleTasksModule?
immich-server_1 | @Module({
immich-server_1 | imports: [ /* the Module containing BullQueue_thumbnail-generator-queue */ ]
immich-server_1 | })
immich-server_1 |
immich-server_1 | Error: Nest can't resolve dependencies of the ScheduleTasksService (AssetEntityRepository, ?, BullQueue_video-conversion-queue). Please make sure that the argument BullQueue_thumbnail-generator-queue at index [1] is available in the ScheduleTasksModule context.
immich-server_1 |
immich-server_1 | Potential solutions:
immich-server_1 | - If BullQueue_thumbnail-generator-queue is a provider, is it part of the current ScheduleTasksModule?
immich-server_1 | - If BullQueue_thumbnail-generator-queue is exported from a separate @Module, is that module imported within ScheduleTasksModule?
immich-server_1 | @Module({
immich-server_1 | imports: [ /* the Module containing BullQueue_thumbnail-generator-queue */ ]
immich-server_1 | })
immich-server_1 |
immich-server_1 | at Injector.lookupComponentInParentModules (/usr/src/app/node_modules/@nestjs/core/injector/injector.js:202:19)
immich-server_1 | at processTicksAndRejections (node:internal/process/task_queues:96:5)
immich-server_1 | at Injector.resolveComponentInstance (/usr/src/app/node_modules/@nestjs/core/injector/injector.js:157:33)
immich-server_1 | at resolveParam (/usr/src/app/node_modules/@nestjs/core/injector/injector.js:108:38)
immich-server_1 | at async Promise.all (index 1)
immich-server_1 | at Injector.resolveConstructorParams (/usr/src/app/node_modules/@nestjs/core/injector/injector.js:123:27)
immich-server_1 | at Injector.loadInstance (/usr/src/app/node_modules/@nestjs/core/injector/injector.js:52:9)
immich-server_1 | at Injector.loadProvider (/usr/src/app/node_modules/@nestjs/core/injector/injector.js:74:9)
immich-server_1 | at async Promise.all (index 3)
immich-server_1 | at InstanceLoader.createInstancesOfProviders (/usr/src/app/node_modules/@nestjs/core/injector/instance-loader.js:44:9)
Actually, I lied... From microservices.module.ts you can export BullModule. Then in schedule-tasks.module.ts, include the MicroservicesModule, and you don't have to register the queue again.
Ah, this is an interesting find. If I include the MicroservicesModule in the ScheduleTasksModule, it will run this piece of code:
BullModule.forRootAsync({
useFactory: async () => ({
redis: {
host: process.env.REDIS_HOSTNAME || 'immich_redis',
port: 6379,
},
}),
}),
And somehow it creates an instance of BullModule on the server as well. So when you put items into the queue, the server also acts as a worker, pulling items from the queue and working on them, which defeats the purpose of the one-at-a-time video transcoding we are aiming for.
Fair enough. It seems weird to me that you have to redefine it along with the config too; which config does it decide to take?
This is the part I am unsure of, so I redefined the queue with the same settings just in case.
  },
}),
BullModule.registerQueue({
  name: 'video-conversion-queue',
There are two definitions for this queue, is that intended?
Resolved above.
if (hasThumbnail) {
  // The jobs below depend on the existence of the JPEG thumbnail
  await this.thumbnailGeneratorQueue.add('generate-webp-thumbnail', { asset }, { jobId: randomUUID() });
  await this.metadataExtractionQueue.add('tag-image', { asset }, { jobId: randomUUID() });
tag-image and detect-object should happen regardless of whether the thumbnail exists
The function that decodes the image into TensorFlow tensors expects these formats: BMP, GIF, JPEG, and PNG. So if you upload a raw format (HEIC, HEIF, or DNG), this function won't be able to run:
const decodedImage = tf.node.decodeImage(image, 3) as tf.Tensor3D;
So the TensorFlow code runs on the thumbnail? Won't that impact the accuracy of the detection and tagging due to the reduced resolution?
Yes, but it is also a trade-off against CPU (or GPU) usage. Below is a good read/discussion on this topic:
https://stackoverflow.com/questions/45149023/tensorflow-for-image-recognition-size-of-images
That's talking specifically about training the model, right? Presumably you're using a pre-trained model and just pushing images into it for analysis and tagging. I would imagine for that task using the full-resolution image is preferable, especially if that's what the model was trained on.
Correct; the full resolution would take more CPU to run through all the pixels, so it is a trade-off between accuracy and CPU usage. We could also throttle this queue with the concurrency level. And we could expose this as a setting in the admin panel in the future, since other people might have their own model that takes a specific resolution; the ML pipeline could then generate the "thumbnail" image at the user-specified resolution.
Yea, for sure. I'd be interested to see how much more CPU time it takes to run on the full-resolution image; with my understanding of how these models tend to work, I wouldn't expect analysis to actually be that much more intensive, just training. Can take a look at that later though, it was just interesting to see how this works.
  console.log(asset);
}

@Process('generate-webp-thumbnail')
What is the default concurrency for all these jobs? Generating the WebP thumbnails could still be quite intensive, so we probably don't want the background tasks going crazy and eating CPU, at least until it's configurable by the user.
The concurrency for these jobs, if not specified, is set to the maximum number of allowable workers, i.e. the available Redis clients. I think it would make sense to set the level to 2 or so in this case:
https://github.com/OptimalBits/bull/blob/develop/REFERENCE.md#queuegetworkers
Yea, I think setting some sensible limit would be good, just so it doesn't eat CPU. Longer term, I think these options are great candidates for the admin settings page as some kind of advanced options.
…209)
* Added changelog page
* Fixed issues based on PR comments
* Fixed issue with video transcoding running on the server
* Changed entry point content for backward compatibility when starting up the server
* Added announcement box
* Added error handling to fail silently when the app version check cannot reach GitHub
* Added new version announcement overlay
* Updated message
* Added messages
* Added logic to check and show the announcement
* Added method to handle saving the new version
* Added button to dismiss the acknowledgement message
Refactor the server-side of the application into monorepo architecture with microservices handled in a queue system, separate from the main server to ensure high I/O is handled smoothly.
The queue system is inspired by @zackpollard with the diagram below
This PR should be treated as a breaking change to the system (although no user data will be lost). Users will need to update their docker-compose file to restore all of the app's functionality.