
Releases: OmidSa75/CeleryAI

Initialize worker

09 Dec 10:59

One of the main obstacles of the previous solutions is that the model is loaded on the first request in each worker. To overcome this, we use the signals module from Celery.

import celery
from celery import signals

class BaseTask(celery.Task):
    def __init__(self) -> None:
        super().__init__()
        self._ai_model = None  # set by the worker_init handler, or lazily below
        # Register on_worker_init as a handler for the worker_init signal,
        # so the model is loaded once when the worker process starts.
        signals.worker_init.connect(self.on_worker_init)

    def on_worker_init(self, *args, **kwargs):
        print("Loading AI Model ...")
        self._ai_model = load_model()
        print("AI Model Loaded")

    @property
    def ai_model(self):
        if self._ai_model is None:  # fallback: load on first access
            self._ai_model = load_model()
        return self._ai_model
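
The load_model function is never shown in the release. Judging from the inference inputs used later (a list of 3×H×W tensors), it is presumably a torchvision detection model; a hypothetical sketch:

import torch
import torchvision

def load_model():
    # Hypothetical: the release does not show load_model. A torchvision
    # detection model matches the list-of-tensors inputs used later on.
    model = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)
    model.eval()  # inference mode
    return model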

However, this method only works for models that are loaded on the CPU. If you move the model to a CUDA device, the code runs into the following problem:

    Cannot re-initialize CUDA in forked subprocess.
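
For context, here is a minimal repro sketch (an assumed scenario on Unix, not code from this repo): the parent process initializes CUDA and then forks a child that tries to use it again, which is what happens under Celery's default prefork pool.

import torch
import multiprocessing as mp

def child():
    # Fails with "Cannot re-initialize CUDA in forked subprocess"
    torch.rand(1).to('cuda')

if __name__ == '__main__':
    torch.rand(1).to('cuda')  # parent touches CUDA first
    p = mp.get_context('fork').Process(target=child)  # fork, like the prefork pool
    p.start()
    p.join()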

So Celery needs to run its workers with the spawn start method. Celery doesn't support spawn yet, but with the following snippet we force execution to use the spawn method.

    import os

    os.environ["FORKED_BY_MULTIPROCESSING"] = "1"
    if os.name != "nt":  # on non-Windows platforms, force billiard to spawn
        from billiard import context
        context._force_start_method("spawn")
        print('Context is changed to SPAWN')
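
Note that this has to run before Celery creates its worker pool, and that _force_start_method is a private billiard API. One way to guarantee the ordering (an assumed layout, not prescribed by the release) is to put the snippet at the top of the module that builds the Celery app:

    # celery_app.py -- hypothetical layout: force spawn at import time,
    # before the Celery app (and later its worker pool) is created.
    import os

    os.environ["FORKED_BY_MULTIPROCESSING"] = "1"
    if os.name != "nt":
        from billiard import context
        context._force_start_method("spawn")

    import celery

    celery_app = celery.Celery('main_celery')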

But the signals do not work with spawn, so the model goes back to being loaded lazily in the property:

import celery

class BaseTask(celery.Task):
    def __init__(self) -> None:
        super().__init__()
        self._ai_model = None  # no signal handler; loaded lazily instead

    @property
    def ai_model(self):
        if self._ai_model is None:  # load on first access in the worker
            self._ai_model = load_model()
        return self._ai_model

The other problem the spawn method creates is that celery_app.task doesn't register the task, so the module that defines the tasks has to be listed in the include argument of the Celery setup; the worker then imports it at startup.

    import celery

    celery_app = celery.Celery(
        'main_celery',
        backend='redis://:Eyval@localhost:6379/1',
        broker='redis://:Eyval@localhost:6379/1',
        task_default_queue='AIWithCelery',
        include=['celery_app']  # modules to import when the worker starts
    )

Load by request in BaseTask property

09 Dec 08:52

Workflow

Instead of loading the model in the task function directly, a BaseTask class is implemented with a property that returns the model. The model is still loaded by the first request, and all the functionality is the same as in v1.0.0.

A property is implemented to return the AI model.

class BaseTask(celery.Task):
    def __init__(self) -> None:
        super().__init__()
        self._ai_model = None

    @property
    def ai_model(self):
        if self._ai_model is None:  # load the model if it has not been loaded yet
            self._ai_model = load_model()
            print("Load AI Model")
        return self._ai_model
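
Purely as an illustration of the lazy property (not code from the release), the first access loads the model and every later access reuses the cached instance:

    # Illustrative only: the model is loaded once, on first access.
    task = BaseTask()
    _ = task.ai_model  # prints "Load AI Model" and calls load_model()
    _ = task.ai_model  # returns the cached model, no reload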

The inference_model function is the same as before, but the model-loading section is deleted (kept below as a comment for reference).

import torch

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')  # assumed device setup

def inference_model(self):
    # The v1.0.0 load-on-first-call block is gone; loading now happens
    # in the BaseTask.ai_model property:
    # if not hasattr(_self, 'ai_model'):
    #     _self.ai_model = _self.load_model()
    #     print('Load AI model')

    input_x = [torch.rand(3, 300, 400).to(device),
               torch.rand(3, 500, 400).to(device)]
    prediction = self.ai_model(input_x)
    print('Hi, this is an inference function')
    return str(type(prediction))

And the task registration is as follows:

    celery_app.task(name='inference_model', bind=True,
                    base=BaseTask)(inference_model)
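
This is just the task decorator applied as a plain function call; written in decorator form (an equivalent sketch, not from the release), the registration would be:

    # Equivalent registration in decorator form (illustrative sketch)
    @celery_app.task(name='inference_model', bind=True, base=BaseTask)
    def inference_model(self):
        ...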

Load by request

09 Dec 08:17

Workflow

This workflow loads the AI model in a worker when that worker receives its first request from the client. Models keep being loaded until every worker holds its own copy, so the number of loaded AI models grows to match the number of workers (e.g. with --concurrency=4, up to four copies of the model end up in memory).

The load_model function returns the AI model that is used in the worker.

The inference_model function is designed to run inference on the input data with the model, but it loads the model on the first call.

import torch

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')  # assumed device setup

def inference_model(_self):
    # _self is the task instance, because the task is registered with bind=True
    if not hasattr(_self, 'ai_model'):  # if the model is not loaded yet
        _self.ai_model = _self.load_model()
        print('Load AI model')

    input_x = [torch.rand(3, 300, 400).to(device),
               torch.rand(3, 500, 400).to(device)]
    prediction = _self.ai_model(input_x)
    print('Hi, this is an inference function')
    return str(type(prediction))

To register the inference_model task with Celery (extra options such as load_model=load_model become attributes on the generated task class, which is why _self.load_model() works above):

    celery_app.task(name='inference_model', bind=True,
                    load_model=load_model)(inference_model)
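
The release doesn't show the client side; here is a minimal sketch of sending the first request that triggers the model load (the task name and queue come from the snippets above, and the broker URL is assumed to match the worker's):

    # client.py -- hypothetical client-side sketch; the first call makes the
    # receiving worker load the model, later calls reuse it in that worker.
    import celery

    client = celery.Celery(broker='redis://:Eyval@localhost:6379/1',
                           backend='redis://:Eyval@localhost:6379/1')
    result = client.send_task('inference_model', queue='AIWithCelery')
    print(result.get(timeout=120))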