Unexpected Error Waiting for Headers #337
Comments
Sometimes, instead of %GRPC.RPCError{status: 2, message: "unexpected when waiting for headers: {:data, <<some data>>}"}
I found the problem. For context, we use gRPC streams. Gun by default sends its messages to the calling process. This means that if you use grpc in a GenServer and you get a message before calling That's why we get
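The race described above can be reproduced without Gun or gRPC at all. The following is a minimal sketch in plain Elixir: `MailboxRaceDemo` and the `{:fake_headers, ref}` message are hypothetical stand-ins (not part of elixir-grpc or Gun) for an active-mode client that delivers responses to the calling process. It assumes only that the await-style call is a selective `receive` on the caller's mailbox.

```elixir
defmodule MailboxRaceDemo do
  use GenServer

  def start_link(_opts \\ []), do: GenServer.start_link(__MODULE__, %{seen: []})

  # Hypothetical await-style call: looks for the headers message in the mailbox.
  def await_now(pid, ref), do: GenServer.call(pid, {:await, ref})

  # Returns every message that handle_info/2 consumed, oldest first.
  def seen(pid), do: GenServer.call(pid, :seen)

  @impl true
  def init(state), do: {:ok, state}

  @impl true
  def handle_call({:await, ref}, _from, state) do
    # Selective receive, similar in spirit to what an await-style call does.
    reply =
      receive do
        {:fake_headers, ^ref} -> :got_headers
      after
        50 -> :timeout
      end

    {:reply, reply, state}
  end

  def handle_call(:seen, _from, state), do: {:reply, Enum.reverse(state.seen), state}

  @impl true
  def handle_info(msg, state) do
    # The GenServer loop consumes anything that arrives between callbacks,
    # so a later selective receive can no longer find it.
    {:noreply, %{state | seen: [msg | state.seen]}}
  end
end

{:ok, pid} = MailboxRaceDemo.start_link()
ref = make_ref()

# The "response" arrives before the GenServer gets around to awaiting it.
send(pid, {:fake_headers, ref})
# Synchronous round trip, so the message above has already been handled.
_ = :sys.get_state(pid)

IO.inspect(MailboxRaceDemo.await_now(pid, ref))
IO.inspect(MailboxRaceDemo.seen(pid))
```

In this sketch the await call times out because `handle_info/2` already took the headers message, which mirrors how a later message (data or trailers) can end up being the first thing an await-style call sees.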
This seems like a user-side problem with handling gRPC's messages. I'm closing the issue because of that, but feel free to comment if any improvements could be made on our side! If you're using it via a multi-purpose GenServer (which I wouldn't advise due to serial message processing issues), you could match on gRPC's messages specifically and handle them differently from the generic ones.
Doesn't it mean that we basically can't use gRPC streams in a GenServer? Even if we knew which messages from gun are gRPC related, we can't feed them into the gRPC library. The library assumes we always call GRPC.Stub.recv, which under the hood calls :gun.await. This will fail if we call GRPC.Stub.recv too late, as we will end up in a state where some of the gun messages are received by our GenServer's handle_info and others are received by :gun.await 🤔 To make our implementation work, we had to wrap calls to the grpc library into a plain process with a dedicated Another option could be using the Mint adapter, as Mint provides both passive and active mode, but unfortunately it crashes.
I took a look into the Gun adapter and I can't actually see what would make your GenServer receive the message that should be sent to Gun instead. Do you have a minimal example of how you're calling gRPC to get the error?
I can try to explain this in more detail:
In other words, I believe it's all about gun working in active mode, i.e. it sends HTTP/2 responses to the calling process, which in the case of grpc is always a process that calls Does it sound clearer now? I can also try to create a minimal reproducible example in my free time.
Ah, I think I get it more clearly now. A minimal reproduction would be helpful! However, I think that the issue at hand has more to do with event ordering than anything. From the workflow described, it seems that your GenServer logic expects to only receive data after you finish sending it, and that doesn't seem to correspond to reality. A possible (palliative) solution would be to spawn a separate process linked to your GenServer, such that this process will only receive messages from gun (or maybe It might make sense for this process encapsulation to be absorbed by this library, but I'd need the reproduction to experiment with this.
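The separate linked process suggested above could look roughly like the following sketch. It is not the library's API: `StreamOwner`, `open_stream`, and the `{:stream_reply, pid, reply}` / `{:stream_done, pid}` messages are hypothetical names chosen for illustration. The sketch assumes the client delivers its messages to whichever process opens and consumes the stream, so doing both inside the spawned process keeps those messages out of the GenServer's mailbox.

```elixir
defmodule StreamOwner do
  # `open_stream` is a hypothetical zero-arity function that starts the
  # request and returns an Enumerable of replies. It runs inside the
  # spawned process, so that process is the one receiving client messages.
  def start_link(parent, open_stream)
      when is_pid(parent) and is_function(open_stream, 0) do
    Task.start_link(fn ->
      open_stream.()
      |> Enum.each(fn reply -> send(parent, {:stream_reply, self(), reply}) end)

      # Tell the parent that the stream has been fully consumed.
      send(parent, {:stream_done, self()})
    end)
  end
end

# Usage with a plain list standing in for a real reply stream.
{:ok, owner} = StreamOwner.start_link(self(), fn -> [{:ok, 1}, {:ok, 2}] end)

receive do
  {:stream_reply, ^owner, reply} -> IO.inspect(reply)
after
  1_000 -> IO.puts("no reply received")
end
```

The parent then only ever sees tagged, already-decoded replies in `handle_info/2`, and a crash in the stream process propagates through the link rather than leaving the parent waiting.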
I can come up with an example for this this week. I've been working on an internal library at my workplace to manage connection pools using elixir-grpc and I know the best way to show this
@mickel8 Not sure if you found a workaround for this, but here's what I got. In my code design, I've wrapped the processing of gRPC streams inside a gen_server. What I was able to do was:
You're right in your assumption about this being a problem with Gun in active mode. There are some other underlying issues with the Gun adapter leaking messages to the process that starts the connection as well. Anyway, I hope you find the code below useful:

    defmodule Processor do
      #.. my own code

      defp request(%{stream: nil, processor_meta: processor_meta} = _state, request) do
        channel = ConnectionPool.channel!(processor_meta[:pool])
        %{grpc_module: grpc_module, rpc_name: rpc_name} = processor_meta[:stream_rpc]
        stub = Module.concat(grpc_module, Stub)

        stream =
          stub
          |> apply(rpc_name, [channel])
          |> GRPC.Stub.send_request(request)

        # Start the gRPC Streaming request
        {:ok, ex_stream} = GRPC.Stub.recv(stream, timeout: :infinity)
        my_pid = self()
        stream_processor_sup = processor_meta[:task_supervisor]

        task =
          Task.Supervisor.async(stream_processor_sup, fn ->
            # Wrap this gRPC Streaming processing inside a task and store the task pid
            ex_stream
            |> Stream.each(&send(my_pid, {:process_response, &1}))
            |> Stream.run()

            send(my_pid, {:process_response, :done})
          end)

        {stream, task.pid, make_ref()}
      end

      defp do_send_request(%{stream: stream} = state, query_request) do
        stream = GRPC.Stub.send_request(stream, query_request)
        {stream, nil, state.ref}
      end

      @impl GenServer
      def handle_info(
            {:process_response, {:ok, response}},
            state
          ) do
        state.processor_meta[:receive_function].(response)
        {:noreply, state}
      end

      def handle_info({:process_response, :done}, state) do
        {:noreply, state, {:continue, :reset}}
      end

      # Task reply: the task function returns the result of its final send/2
      def handle_info({_ref, {:process_response, :done}}, state) do
        {:noreply, state}
      end

      def handle_info({:process_response, {:error, error}}, state) do
        Logger.error("received error from stream: #{inspect(error)}")
        {:noreply, state, {:continue, :reset}}
      end

      def handle_info(msg, state) do
        # async actions
        case msg do
          # gun specific messages
          {:gun_data, _pid, _ref, _is_fin, _data} ->
            if is_pid(state.stream_task), do: send(state.stream_task, msg)

          {:gun_error, _pid, _ref, _reason} ->
            if is_pid(state.stream_task), do: send(state.stream_task, msg)

          {:gun_trailers, _pid, _ref, _headers} ->
            if is_pid(state.stream_task), do: send(state.stream_task, msg)

          # messages received when the async task finish
          {_ref, :ok} ->
            :ok

          {:DOWN, _ref, :process, _pid, :normal} ->
            :ok

          _other ->
            Logger.warning("unexpected msg: #{inspect(msg)}")
        end

        # gen server reply
        case msg do
          {:DOWN, _ref, :process, _pid, :normal} -> {:noreply, state, {:continue, :reconnect}}
          _other -> {:noreply, state}
        end
      end
    end
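The three forwarding branches in the catch-all `handle_info/2` above share one rule: if the message is a gun tuple and a stream task is running, pass it along. A minimal sketch of that rule as a standalone helper follows. `GunForwarder` and its return atoms are hypothetical names, and the set of message tags is taken only from the ones the code above matches on, not from an exhaustive list of Gun messages.

```elixir
defmodule GunForwarder do
  # Forwards gun-tagged tuples to the task that consumes the stream.
  # Only the tags handled in the snippet above are listed here.
  def forward(msg, task_pid)
      when is_tuple(msg) and is_pid(task_pid) and
             elem(msg, 0) in [:gun_data, :gun_error, :gun_trailers] do
    send(task_pid, msg)
    :forwarded
  end

  # Anything else (or no running task) is left for the caller to handle.
  def forward(_msg, _task_pid), do: :ignored
end

# Usage: the current process plays the role of the stream task.
GunForwarder.forward({:gun_data, self(), make_ref(), :nofin, "payload"}, self())
```

Returning an atom instead of the sent message lets the caller decide whether to log the message as unexpected, which keeps the logging decision in the GenServer.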
Describe the bug
We're using elixir grpc to communicate with the Google Cloud Speech V2 models. Intermittently, the call fails and it's not entirely clear why. Is this based on a bad / unexpected response from the speech v2 model, or a subtle bug in the elixir-grpc
%GRPC.RPCError{status: 2, message: "unexpected when waiting for headers: {:trailers, [{"grpc-status", "0"}, {"content-disposition", "attachment"}, {"x-goog-ext-~~~~-bin", "DR+~~~~=="}, {"x-goog-ext-~~~~-bin", "DWG0/EA="}]}"}
To Reproduce
Steps to reproduce the behavior: Unclear unfortunately. Seems indeterminate based on the Google response
Additional context
Looking for some guidance on how to diagnose and correct. We're catching and handling the error, but it'd be nice not to have the error in the first place.