Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: accept keywords for opts instead of maps #255

Merged
merged 9 commits into from
Jul 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions lib/grpc/client/adapter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,21 @@ defmodule GRPC.Client.Adapter do
@typedoc "Determines if the headers have finished being read."
@type fin :: :fin | :nofin

@callback connect(Channel.t(), map()) :: {:ok, Channel.t()} | {:error, any()}
@callback connect(channel :: Channel.t(), opts :: keyword()) ::
{:ok, Channel.t()} | {:error, any()}

@callback disconnect(Channel.t()) :: {:ok, Channel.t()} | {:error, any()}
@callback disconnect(channel :: Channel.t()) :: {:ok, Channel.t()} | {:error, any()}

@callback send_request(Stream.t(), binary(), map()) :: Stream.t()
@callback send_request(stream :: Stream.t(), contents :: binary(), opts :: keyword()) ::
Stream.t()

@callback recv_headers(map(), map(), map()) ::
@callback recv_headers(stream :: map(), headers :: map(), opts :: keyword()) ::
{:ok, %{String.t() => String.t()}, fin()} | {:error, GRPC.RPCError.t()}

@callback recv_data_or_trailers(map(), map(), map()) ::
@callback recv_data_or_trailers(
stream :: map(),
trailers_or_metadata :: map(),
opts :: keyword()
) ::
{:data, binary()} | {:trailers, binary()} | {:error, GRPC.RPCError.t()}
end
12 changes: 9 additions & 3 deletions lib/grpc/client/adapters/gun.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,15 @@ defmodule GRPC.Client.Adapters.Gun do
@max_retries 100

@impl true
def connect(channel, opts \\ %{})
def connect(%{scheme: "https"} = channel, opts), do: connect_securely(channel, opts)
def connect(channel, opts), do: connect_insecurely(channel, opts)
def connect(channel, opts) when is_list(opts) do
# handle opts as a map due to :gun.open
opts = Map.new(opts)

case channel do
%{scheme: "https"} -> connect_securely(channel, opts)
_ -> connect_insecurely(channel, opts)
end
end

defp connect_securely(%{cred: %{ssl: ssl}} = channel, opts) do
transport_opts = Map.get(opts, :transport_opts) || []
Expand Down
2 changes: 1 addition & 1 deletion lib/grpc/message.ex
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ defmodule GRPC.Message do
{:error, "Encoded message is too large (9 bytes)"}

"""
@spec to_data(iodata, keyword() | map()) ::
@spec to_data(iodata, keyword()) ::
{:ok, iodata, non_neg_integer} | {:error, String.t()}
def to_data(message, opts \\ []) do
compressor = opts[:compressor]
Expand Down
11 changes: 8 additions & 3 deletions lib/grpc/server/adapter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,17 @@ defmodule GRPC.Server.Adapter do
pending_reader: nil
}

@callback start(atom(), %{String.t() => [module()]}, non_neg_integer(), Keyword.t()) ::
@callback start(
atom(),
%{String.t() => [module()]},
port :: non_neg_integer(),
opts :: keyword()
) ::
{atom(), any(), non_neg_integer()}

@callback stop(atom(), %{String.t() => [module()]}) :: :ok | {:error, :not_found}

@callback send_reply(state(), binary(), Keyword.t()) :: any()
@callback send_reply(state, content :: binary(), opts :: keyword()) :: any()

@callback send_headers(state(), map()) :: any()
@callback send_headers(state, headers :: map()) :: any()
end
9 changes: 6 additions & 3 deletions lib/grpc/server/adapters/cowboy/handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ defmodule GRPC.Server.Adapters.Cowboy.Handler do
@adapter GRPC.Server.Adapters.Cowboy
@default_trailers HTTP2.server_trailers()

@spec init(map(), {atom(), %{String.t() => [module()]}, map()}) :: {:cowboy_loop, map(), map()}
@spec init(
map(),
state :: {endpoint :: atom(), servers :: %{String.t() => [module()]}, opts :: keyword()}
) :: {:cowboy_loop, map(), map()}
def init(req, {endpoint, servers, opts} = state) do
path = :cowboy_req.path(req)

Expand Down Expand Up @@ -472,8 +475,8 @@ defmodule GRPC.Server.Adapters.Cowboy.Handler do
end

defp async_read_body(req, opts) do
length = Map.get(opts, :length, 8_000_000)
period = Map.get(opts, :period, 15000)
length = opts[:length] || 8_000_000
period = opts[:period] || 15000
ref = make_ref()

:cowboy_req.cast({:read_body, self(), ref, length, period}, req)
Expand Down
129 changes: 53 additions & 76 deletions lib/grpc/stub.ex
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ defmodule GRPC.Stub do
* `:accepted_compressors` - tell servers accepted compressors, this can be used without `:compressor`
* `:headers` - headers to attach to each request
"""
@spec connect(String.t(), Keyword.t()) :: {:ok, Channel.t()} | {:error, any()}
@spec connect(String.t(), keyword()) :: {:ok, Channel.t()} | {:error, any()}
def connect(addr, opts \\ []) when is_binary(addr) and is_list(opts) do
{host, port} =
case String.split(addr, ":") do
Expand All @@ -136,7 +136,7 @@ defmodule GRPC.Stub do
connect(host, port, opts)
end

@spec connect(String.t(), binary() | non_neg_integer(), Keyword.t()) ::
@spec connect(String.t(), binary() | non_neg_integer(), keyword()) ::
{:ok, Channel.t()} | {:error, any()}
def connect(host, port, opts) when is_binary(port) do
connect(host, String.to_integer(port), opts)
Expand All @@ -162,6 +162,12 @@ defmodule GRPC.Stub do
accepted_compressors
end

adapter_opts = opts[:adapter_opts] || []

unless is_list(adapter_opts) do
raise ArgumentError, ":adapter_opts must be a keyword list if present"
end

%Channel{
host: host,
port: port,
Expand All @@ -174,7 +180,7 @@ defmodule GRPC.Stub do
accepted_compressors: accepted_compressors,
headers: headers
}
|> adapter.connect(opts[:adapter_opts] || %{})
|> adapter.connect(adapter_opts)
end

def retry_timeout(curr) when curr < 11 do
Expand Down Expand Up @@ -231,21 +237,25 @@ defmodule GRPC.Stub do
with the last elem being a map of headers `%{headers: headers, trailers: trailers}`(unary) or
`%{headers: headers}`(server streaming)
"""
@spec call(atom(), tuple(), GRPC.Client.Stream.t(), struct() | nil, Keyword.t()) :: rpc_return
@spec call(atom(), tuple(), GRPC.Client.Stream.t(), struct() | nil, keyword()) :: rpc_return
def call(_service_mod, rpc, %{channel: channel} = stream, request, opts) do
{_, {req_mod, req_stream}, {res_mod, response_stream}} = rpc

stream = %{stream | request_mod: req_mod, response_mod: res_mod}

opts =
if req_stream || response_stream do
parse_req_opts([{:timeout, :infinity} | opts])
opts
|> parse_req_opts()
|> Keyword.put_new(:timeout, :infinity)
else
parse_req_opts([{:timeout, @default_timeout} | opts])
opts
|> parse_req_opts()
|> Keyword.put_new(:timeout, @default_timeout)
end

compressor = Map.get(opts, :compressor, channel.compressor)
accepted_compressors = Map.get(opts, :accepted_compressors, [])
compressor = Keyword.get(opts, :compressor, channel.compressor)
accepted_compressors = Keyword.get(opts, :accepted_compressors, [])

accepted_compressors =
if compressor do
Expand All @@ -256,8 +266,8 @@ defmodule GRPC.Stub do

stream = %{
stream
| codec: Map.get(opts, :codec, channel.codec),
compressor: Map.get(opts, :compressor, channel.compressor),
| codec: Keyword.get(opts, :codec, channel.codec),
compressor: Keyword.get(opts, :compressor, channel.compressor),
accepted_compressors: accepted_compressors
}

Expand All @@ -272,7 +282,7 @@ defmodule GRPC.Stub do
) do
last = fn %{codec: codec, compressor: compressor} = s, _ ->
message = codec.encode(request)
opts = Map.put(opts, :compressor, compressor)
opts = Keyword.put(opts, :compressor, compressor)

s
|> channel.adapter.send_request(message, opts)
Expand Down Expand Up @@ -319,7 +329,7 @@ defmodule GRPC.Stub do
* `:end_stream` - indicates it's the last one request, then the stream will be in
half_closed state. Default is false.
"""
@spec send_request(GRPC.Client.Stream.t(), struct, Keyword.t()) :: GRPC.Client.Stream.t()
@spec send_request(GRPC.Client.Stream.t(), struct, keyword()) :: GRPC.Client.Stream.t()
def send_request(%{__interface__: interface} = stream, request, opts \\ []) do
interface[:send_request].(stream, request, opts)
end
Expand Down Expand Up @@ -376,7 +386,7 @@ defmodule GRPC.Stub do
* `:deadline` - when the request is timeout, will override timeout
* `:return_headers` - when true, headers will be returned.
"""
@spec recv(GRPC.Client.Stream.t(), Keyword.t() | map()) ::
@spec recv(GRPC.Client.Stream.t(), keyword()) ::
{:ok, struct()}
| {:ok, struct(), map()}
| {:ok, Enumerable.t()}
Expand All @@ -391,7 +401,7 @@ defmodule GRPC.Stub do
def recv(%{__interface__: interface} = stream, opts) do
opts =
if is_list(opts) do
parse_recv_opts(opts)
parse_recv_opts(Keyword.put_new(opts, :timeout, @default_timeout))
else
opts
end
Expand Down Expand Up @@ -588,72 +598,39 @@ defmodule GRPC.Stub do
end
end

defp parse_req_opts(list) when is_list(list) do
parse_req_opts(list, %{})
end

defp parse_req_opts([{:timeout, timeout} | t], acc) do
parse_req_opts(t, Map.put(acc, :timeout, timeout))
end

defp parse_req_opts([{:deadline, deadline} | t], acc) do
parse_req_opts(t, Map.put(acc, :timeout, GRPC.TimeUtils.to_relative(deadline)))
end

defp parse_req_opts([{:compressor, compressor} | t], acc) do
parse_req_opts(t, Map.put(acc, :compressor, compressor))
end

defp parse_req_opts([{:accepted_compressors, compressors} | t], acc) do
parse_req_opts(t, Map.put(acc, :accepted_compressors, compressors))
end

defp parse_req_opts([{:grpc_encoding, grpc_encoding} | t], acc) do
parse_req_opts(t, Map.put(acc, :grpc_encoding, grpc_encoding))
end

defp parse_req_opts([{:metadata, metadata} | t], acc) do
parse_req_opts(t, Map.put(acc, :metadata, metadata))
end

defp parse_req_opts([{:content_type, content_type} | t], acc) do
Logger.warn(":content_type has been deprecated, please use :codec")
parse_req_opts(t, Map.put(acc, :content_type, content_type))
end

defp parse_req_opts([{:codec, codec} | t], acc) do
parse_req_opts(t, Map.put(acc, :codec, codec))
end

defp parse_req_opts([{:return_headers, return_headers} | t], acc) do
parse_req_opts(t, Map.put(acc, :return_headers, return_headers))
end

defp parse_req_opts([{key, _} | _], _) do
raise ArgumentError, "option #{inspect(key)} is not supported"
@valid_req_opts [
:timeout,
:deadline,
:compressor,
:accepted_compressors,
:grpc_encoding,
:metadata,
:codec,
:return_headers
]
defp parse_req_opts(opts) when is_list(opts) do
Enum.map(opts, fn
{:deadline, deadline} ->
{:timeout, GRPC.TimeUtils.to_relative(deadline)}

{key, value} when key in @valid_req_opts ->
{key, value}

{key, _} ->
raise ArgumentError, "option #{inspect(key)} is not supported"
end)
end

defp parse_req_opts(_, acc), do: acc

defp parse_recv_opts(list) when is_list(list) do
parse_recv_opts(list, %{timeout: @default_timeout})
end

defp parse_recv_opts([{:timeout, timeout} | t], acc) do
parse_recv_opts(t, Map.put(acc, :timeout, timeout))
end

defp parse_recv_opts([{:deadline, deadline} | t], acc) do
parse_recv_opts(t, Map.put(acc, :deadline, GRPC.TimeUtils.to_relative(deadline)))
end
Enum.map(list, fn
{:deadline, deadline} ->
{:deadline, GRPC.TimeUtils.to_relative(deadline)}

defp parse_recv_opts([{:return_headers, return_headers} | t], acc) do
parse_recv_opts(t, Map.put(acc, :return_headers, return_headers))
end
{key, _} when key not in @valid_req_opts ->
raise ArgumentError, "option #{inspect(key)} is not supported"

defp parse_recv_opts([{key, _} | _], _) do
raise ArgumentError, "option #{inspect(key)} is not supported"
kv ->
kv
end)
end

defp parse_recv_opts(_, acc), do: acc
end
10 changes: 6 additions & 4 deletions lib/grpc/transport/http2.ex
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ defmodule GRPC.Transport.HTTP2 do
@doc """
Now we may not need this because gun already handles the pseudo headers.
"""
@spec client_headers(GRPC.Client.Stream.t(), map) :: [{String.t(), String.t()}]
def client_headers(%{channel: channel, path: path} = s, opts \\ %{}) do
@spec client_headers(GRPC.Client.Stream.t(), keyword()) :: [{String.t(), String.t()}]
def client_headers(%{channel: channel, path: path} = s, opts \\ []) do
[
{":method", "POST"},
{":scheme", channel.scheme},
Expand All @@ -33,8 +33,10 @@ defmodule GRPC.Transport.HTTP2 do
] ++ client_headers_without_reserved(s, opts)
end

@spec client_headers_without_reserved(GRPC.Client.Stream.t(), map) :: [{String.t(), String.t()}]
def client_headers_without_reserved(%{codec: codec} = stream, opts \\ %{}) do
@spec client_headers_without_reserved(GRPC.Client.Stream.t(), keyword()) :: [
{String.t(), String.t()}
]
def client_headers_without_reserved(%{codec: codec} = stream, opts \\ []) do
[
# It seems only gRPC implemenations only support "application/grpc", so we support :content_type now.
{"content-type", content_type(opts[:content_type], codec)},
Expand Down
Loading