
can't pass an object parameter to pipeline.run() #5636

Closed
faaany opened this issue Aug 28, 2023 · 10 comments
@faaany
Contributor

faaany commented Aug 28, 2023

Describe the bug
I implemented a FastAPITokenStreamingHandler object in the REST API layer, following the example mentioned in this PR, and passed it as a parameter to pipeline.run(). The only difference from that example is that I am using a pipeline instead of a PromptNode. My code looks like this:

```python
params['stream_handler'] = FastAPITokenStreamingHandler(g)
pipeline.run(query=prompt, params={"agent_node": params})
```

Error message
But I then got the following error message:

```
Traceback (most recent call last):
  File "/opt/conda/lib/python3.10/site-packages/haystack/pipelines/base.py", line 556, in run
    node_output, stream_id = self._run_node(node_id, node_input)
  File "/opt/conda/lib/python3.10/site-packages/haystack/pipelines/base.py", line 469, in _run_node
    return self.graph.nodes[node_id]["component"]._dispatch_run(**node_input)
  File "/opt/conda/lib/python3.10/site-packages/haystack/nodes/base.py", line 201, in _dispatch_run
    return self._dispatch_run_general(self.run, **kwargs)
  File "/opt/conda/lib/python3.10/site-packages/haystack/nodes/base.py", line 219, in _dispatch_run_general
    arguments = deepcopy(kwargs)
  File "/opt/conda/lib/python3.10/copy.py", line 146, in deepcopy
    y = copier(x, memo)
  File "/opt/conda/lib/python3.10/copy.py", line 231, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/opt/conda/lib/python3.10/copy.py", line 146, in deepcopy
    y = copier(x, memo)
  File "/opt/conda/lib/python3.10/copy.py", line 231, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/opt/conda/lib/python3.10/copy.py", line 146, in deepcopy
    y = copier(x, memo)
  File "/opt/conda/lib/python3.10/copy.py", line 231, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/opt/conda/lib/python3.10/copy.py", line 172, in deepcopy
    y = _reconstruct(x, memo, *rv)
  File "/opt/conda/lib/python3.10/copy.py", line 271, in _reconstruct
    state = deepcopy(state, memo)
  File "/opt/conda/lib/python3.10/copy.py", line 146, in deepcopy
    y = copier(x, memo)
  File "/opt/conda/lib/python3.10/copy.py", line 231, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/opt/conda/lib/python3.10/copy.py", line 172, in deepcopy
    y = _reconstruct(x, memo, *rv)
  File "/opt/conda/lib/python3.10/copy.py", line 271, in _reconstruct
    state = deepcopy(state, memo)
  File "/opt/conda/lib/python3.10/copy.py", line 146, in deepcopy
    y = copier(x, memo)
  File "/opt/conda/lib/python3.10/copy.py", line 231, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/opt/conda/lib/python3.10/copy.py", line 172, in deepcopy
    y = _reconstruct(x, memo, *rv)
  File "/opt/conda/lib/python3.10/copy.py", line 271, in _reconstruct
    state = deepcopy(state, memo)
  File "/opt/conda/lib/python3.10/copy.py", line 146, in deepcopy
    y = copier(x, memo)
  File "/opt/conda/lib/python3.10/copy.py", line 231, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/opt/conda/lib/python3.10/copy.py", line 161, in deepcopy
    rv = reductor(4)
TypeError: cannot pickle '_thread.lock' object
```
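
The failure is reproducible outside Haystack: copy.deepcopy recurses into every attribute of every value, and objects holding live resources such as thread locks cannot be pickled or deep-copied. A minimal sketch, using a hypothetical stand-in class rather than the actual FastAPITokenStreamingHandler:

```python
import copy
import threading

class DummyStreamingHandler:
    """Hypothetical stand-in for a streaming handler holding a live resource."""
    def __init__(self):
        self.lock = threading.Lock()  # thread locks cannot be pickled or deep-copied

params = {"stream_handler": DummyStreamingHandler()}

try:
    # Roughly what _dispatch_run_general does with the node's kwargs
    copy.deepcopy({"agent_node": params})
except TypeError as err:
    print(err)  # cannot pickle '_thread.lock' object
```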

Expected behavior
After debugging, I found that the problem lies in this line of code. Is there any particular reason that we use a deep copy instead of a shallow copy here? If not, I would suggest using a shallow copy, because the deep copy prevents passing an object to the underlying pipeline.run().
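
For background on the suggested change: a shallow copy duplicates only the top-level container and keeps references to the nested values, so un-copyable objects pass through untouched, whereas a deep copy recursively duplicates everything it can reach. A quick illustration of the trade-off:

```python
import copy

original = {"query": "hello", "params": {"top_k": 3}}

shallow = copy.copy(original)
deep = copy.deepcopy(original)

# Both copies are new top-level dicts...
assert shallow is not original and deep is not original

# ...but only the shallow copy still shares the nested dict.
assert shallow["params"] is original["params"]
assert deep["params"] is not original["params"]

# The flip side: mutations through a shallow copy's nested values are
# visible to the original, which is presumably why run() deep-copies.
shallow["params"]["top_k"] = 10
print(original["params"]["top_k"])  # 10
```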

FAQ Check

System:

  • OS:
  • GPU/CPU:
  • Haystack version (commit or version number):
  • DocumentStore:
  • Reader:
  • Retriever:
@julian-risch
Member

@faaany Thank you for the detailed description of this issue. I see that you already opened a pull request to fix it, which is great! 🤩 @ZanSara will review your pull request.

@ZanSara
Contributor

ZanSara commented Sep 4, 2023

Hey @faaany sorry for the late reply. I've seen your PR and I have to say I'm really torn on that... I believe there are reasons why we deepcopy kwargs, but the Pipeline code is so intricate I have a hard time proving whether it's necessary or not without investing a lot of time.

So, before I go into a long chase to understand whether your change is safe or not, let's understand how blocking it is.

Is it really necessary to pass the streaming handler as a parameter of the pipeline? What's the difference between this approach and giving such a handler to PromptNode directly?

@faaany
Contributor Author

faaany commented Sep 6, 2023

@ZanSara

For my usage scenario, this is necessary, because my pipeline is deployed from a pipeline YAML behind a REST API. If my front-end application doesn't send any requests to the backend, the pipeline won't run. So I need to pass the streaming handler param at runtime in order to get a streamed response.

If I give such a handler to PromptNode directly, I will see the streamed output on my backend machine, but not in the front-end.

I saw that many users on Discord asked for this feature. I think they have similar problems to mine.

@faaany
Contributor Author

faaany commented Sep 6, 2023

> Hey @faaany sorry for the late reply. I've seen your PR and I have to say I'm really torn on that... I believe there are reasons why we deepcopy kwargs, but the Pipeline code is so intricate I have a hard time proving whether it's necessary or not without investing a lot of time.
>
> So, before I go into a long chase to understand whether your change is safe or not, let's understand how blocking it is.
>
> Is it really necessary to pass the streaming handler as a parameter of the pipeline? What's the difference between this approach and giving such a handler to PromptNode directly?

And yes, it is a bit risky to replace the deep copy with a shallow copy. How about this approach: we explicitly drop the stream_handler param from the kwargs before the deep copy, and afterwards we add it back to the parameter list?
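
A minimal sketch of that idea (a hypothetical helper, not the actual Haystack patch): remove the handler before the deep copy and re-attach it by reference afterwards, so every other value keeps the existing copy-isolation semantics:

```python
import copy

def deepcopy_except_handler(kwargs: dict) -> dict:
    """Deep-copy kwargs while passing 'stream_handler' through by reference."""
    handler = kwargs.pop("stream_handler", None)  # drop before deepcopy
    copied = copy.deepcopy(kwargs)
    if handler is not None:
        kwargs["stream_handler"] = handler  # restore the caller's dict
        copied["stream_handler"] = handler  # re-attach the same object
    return copied
```

In the real code the handler may sit nested inside params, so the lookup would need to recurse, but the drop-and-restore principle is the same.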

@faaany
Contributor Author

faaany commented Sep 11, 2023

@vblagoje maybe you have more insights on this issue?

@ZanSara
Contributor

ZanSara commented Sep 12, 2023

Hey @faaany sorry for the late reply 🙈

> For my usage scenario, this is necessary, because my pipeline is deployed using a pipeline YAML with REST API. If my front-end application doesn't send any requests to the backend, the pipeline won't run. So I need to pass the streaming handler param during runtime in order to get a streamed response.

I'm not sure I get this. How do you pass a streaming handler over HTTP? 👀 Maybe I didn't understand your explanation. Feel free to share a code example or a colab if that helps!

@silvanocerza
Contributor

@faaany I'm thinking about an alternative solution to your problem that wouldn't require this change. I'm not sure whether it would work though.

I'm taking into consideration #5697 too.

My idea would be to create a stream handler similar to FastAPITokenStreamingHandler, but one that creates its own thread-safe token generator that is publicly accessible.

Then we'd change _process_request_streaming to find the streaming handler being used and, if it has such a generator, use it to read the tokens as they are generated. You're already checking whether the last node is a PromptNode, so given that information we can go through it and find the invocation layer's streamer.

It should be accessible with logic similar to this:

```python
prompt_node = PromptNode()
prompt_node.prompt_model.model_invocation_layer.model_input_kwargs["stream_handler"]
```

It's not elegant, but it should do the job.

I didn't test it but I believe that conceptually it should work, even if a bit convoluted.

I'd very much prefer that we don't touch the Pipeline.run() method, as it could create a cascade of issues that I really wouldn't want to handle right now. 😅
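
The suggested handler might look roughly like this, sketched with a hypothetical class name and a thread-safe queue (the real Haystack streaming-handler interface may differ):

```python
import queue

class QueueTokenStreamingHandler:
    """Hypothetical handler that buffers tokens in a thread-safe queue
    and exposes them through a publicly accessible generator."""

    _DONE = object()  # sentinel marking the end of generation

    def __init__(self):
        self._queue = queue.Queue()

    def __call__(self, token_received: str, **kwargs) -> str:
        # Called by the model's invocation layer for each generated token.
        self._queue.put(token_received)
        return token_received

    def finish(self):
        self._queue.put(self._DONE)

    def tokens(self):
        # The REST layer (e.g. _process_request_streaming) could iterate
        # over this generator to stream tokens to the client.
        while True:
            token = self._queue.get()
            if token is self._DONE:
                return
            yield token
```

The generation thread would call the handler once per token and finish() at the end, while the request thread iterates tokens() and forwards each token to the HTTP response.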

@faaany
Contributor Author

faaany commented Sep 14, 2023

@silvanocerza thanks for the idea! I will give it a try and let you know how it goes.

@silvanocerza
Contributor

Hey @faaany, any news on this? I'm curious to know if you managed to make it work. 👀

@faaany
Contributor Author

faaany commented Oct 16, 2023

> Hey @faaany, any news on this? I'm curious to know if you managed to make it work. 👀

Hi @silvanocerza, sorry for my late response! We had long national holidays here in China, and I also took some additional days off. I was not able to set this up before my holidays, but I will look at it this week and get back to you soon. Thank you for following up!

github-actions bot added the stale label on May 12, 2024
github-actions bot closed this as not planned (won't fix, can't repro, duplicate, stale) on May 23, 2024