Skip to content

Commit

Permalink
fix: update to Req 0.4.x to support streaming work (#52)
Browse files Browse the repository at this point in the history
  • Loading branch information
forest authored May 22, 2024
1 parent 1a24b12 commit 1a94241
Showing 12 changed files with 72 additions and 79 deletions.
4 changes: 2 additions & 2 deletions .tool-versions
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
erlang 25.1.2
elixir 1.14.1-otp-25
erlang 26.2.1
elixir 1.15.7-otp-26
8 changes: 4 additions & 4 deletions lib/avalanche.ex
Original file line number Diff line number Diff line change
@@ -155,7 +155,7 @@ defmodule Avalanche do
The `request_options` are merged with default options set with `default_options/1`.
"""
@spec run(String.t(), list(), keyword(), keyword()) :: any() | {:error, Avalanche.Error.t()}
@spec run(String.t(), list(), Keyword.t(), Keyword.t()) :: {:ok, Avalanche.Result.t()} | {:error, Avalanche.Error.t()}
def run(statement, params \\ [], run_options \\ [], request_options \\ []) do
start_time = System.monotonic_time()
metadata = %{params: params, query: statement}
@@ -190,7 +190,7 @@ defmodule Avalanche do
The `request_options` are merged with default options set with `default_options/1`.
"""
@spec status(String.t(), keyword(), keyword()) :: any() | {:error, Avalanche.Error.t()}
@spec status(String.t(), Keyword.t(), Keyword.t()) :: {:ok, Avalanche.Result.t()} | {:error, Avalanche.Error.t()}
def status(statement_handle, status_options \\ [], request_options \\ []) do
start_time = System.monotonic_time()

@@ -221,7 +221,7 @@ defmodule Avalanche do
See `default_options/1` for more information.
"""
@spec default_options() :: keyword()
@spec default_options() :: Keyword.t()
def default_options do
Application.get_env(:avalanche, :default_options, [])
end
@@ -233,7 +233,7 @@ defmodule Avalanche do
Avoid setting default options in libraries as they are global.
"""
@spec default_options(keyword()) :: :ok | {:error, Avalanche.Error.t()}
@spec default_options(Keyword.t()) :: :ok | {:error, Avalanche.Error.t()}
def default_options(options) do
with {:ok, opts} <- validate_options(options, @request_options_schema) do
Application.put_env(:avalanche, :default_options, opts)
2 changes: 1 addition & 1 deletion lib/avalanche/error.ex
Original file line number Diff line number Diff line change
@@ -154,7 +154,7 @@ defmodule Avalanche.Error do

@spec format_error(term) :: binary
defp format_error(error) do
if Exception.exception?(error) do
if Kernel.is_exception(error) do
"** (" <> inspect(error.__struct__) <> ") " <> Exception.message(error)
else
inspect(error)
10 changes: 5 additions & 5 deletions lib/avalanche/request.ex
Original file line number Diff line number Diff line change
@@ -11,11 +11,11 @@ defmodule Avalanche.Request do
def build_headers(options, token_type) do
user_agent = Keyword.get(options, :user_agent, @user_agent)

[
accept: "application/json",
user_agent: user_agent,
"X-Snowflake-Authorization-Token-Type": token_type
]
%{
"accept" => ["application/json"],
"user_agent" => [user_agent],
"X-Snowflake-Authorization-Token-Type" => [token_type]
}
end

def fetch_token(options) do
16 changes: 8 additions & 8 deletions lib/avalanche/requests/statement_request.ex
Original file line number Diff line number Diff line change
@@ -35,16 +35,16 @@ defmodule Avalanche.StatementRequest do
@type t :: %__MODULE__{
url: url(),
path: String.t(),
headers: keyword(),
headers: %{optional(binary()) => [binary()]},
body: body(),
token: String.t() | keyword(),
options: keyword()
token: String.t() | Keyword.t(),
options: Keyword.t()
}

@doc """
Builds a statement execution request to run.
"""
@spec build(String.t(), list(), keyword()) :: t()
@spec build(String.t(), list(), Keyword.t()) :: t()
def build(statement, params, options) do
bindings = Avalanche.Bindings.encode_params(params)

@@ -71,12 +71,12 @@ defmodule Avalanche.StatementRequest do
metadata = %{params: params, query: request.body.statement}

with _ <- Avalanche.Telemetry.start(:query, metadata, %{}),
{:ok, response} <- Req.Request.run(pipeline),
{_request, %Req.Response{} = response} <- Req.Request.run_request(pipeline),
{:ok, _result} = success <- handle_response(response),
_ <- Avalanche.Telemetry.stop(:query, System.monotonic_time(), metadata, %{}) do
success
else
{:error, error} = failure ->
{_request, error} = failure ->
metadata = Map.put(metadata, :error, error)
Avalanche.Telemetry.stop(:query, System.monotonic_time(), metadata, %{})
failure
@@ -106,7 +106,7 @@ defmodule Avalanche.StatementRequest do
|> case do
:error ->
req_options ++
[retry: &custom_retry/1, retry_delay: &custom_retry_delay/1, max_retries: 5, retry_log_level: :info]
[retry: &custom_retry/2, retry_delay: &custom_retry_delay/1, max_retries: 5, retry_log_level: :info]

_exists ->
req_options
@@ -186,7 +186,7 @@ defmodule Avalanche.StatementRequest do
{:error, error}
end

defp custom_retry(response_or_exception) do
defp custom_retry(_request, response_or_exception) do
case response_or_exception do
%Req.Response{status: status} when status in [408, 429] or status in 500..599 ->
true
12 changes: 6 additions & 6 deletions lib/avalanche/requests/status_request.ex
Original file line number Diff line number Diff line change
@@ -40,17 +40,17 @@ defmodule Avalanche.StatusRequest do
@type t :: %__MODULE__{
url: url(),
path: String.t(),
headers: keyword(),
token: String.t() | keyword(),
headers: %{optional(binary()) => [binary()]},
token: String.t() | Keyword.t(),
statement_handle: String.t(),
row_types: row_types(),
options: keyword()
options: Keyword.t()
}

@doc """
Builds a query status request to run.
"""
@spec build(String.t(), row_types(), keyword()) :: t()
@spec build(String.t(), row_types(), Keyword.t()) :: t()
def build(statement_handle, row_types \\ nil, options) do
{token_type, token} = Request.fetch_token(options)

@@ -75,12 +75,12 @@ defmodule Avalanche.StatusRequest do
metadata = %{statement_handle: statement_handle, async: async, partition: partition}

with _ <- Avalanche.Telemetry.start(:query, metadata, %{}),
{:ok, response} <- Req.Request.run(pipeline),
{_request, %Req.Response{} = response} <- Req.Request.run_request(pipeline),
{:ok, _} = success <- handle_response({request, response}),
_ <- Avalanche.Telemetry.stop(:query, System.monotonic_time(), metadata, %{}) do
success
else
{:error, error} = failure ->
{_request, error} = failure ->
metadata = Map.put(metadata, :error, error)
Avalanche.Telemetry.stop(:query, System.monotonic_time(), metadata, %{})
failure
10 changes: 5 additions & 5 deletions lib/avalanche/steps/get_partitions.ex
Original file line number Diff line number Diff line change
@@ -55,7 +55,7 @@ defmodule Avalanche.Steps.GetPartitions do
Task.Supervisor.async_stream_nolink(
Avalanche.TaskSupervisor,
requests,
fn request -> Req.Request.run(request) end,
fn request -> Req.Request.run_request(request) end,
max_concurrency: max_concurrency,
ordered: true,
timeout: timeout,
@@ -77,21 +77,21 @@ defmodule Avalanche.Steps.GetPartitions do
defp build_status_request(%Req.Request{} = request, path, partition, row_types) do
request
|> reset_req_request()
|> Req.update(method: :get, body: "", url: URI.parse(path), params: [partition: partition])
|> Req.merge(method: :get, body: "", url: URI.parse(path), params: [partition: partition])
|> Req.Request.put_private(:avalanche_row_types, row_types)
end

defp reset_req_request(request), do: %{request | current_request_steps: Keyword.keys(request.request_steps)}

defp handle_partition_response(response) do
case response do
{:ok, {:ok, response}} ->
{:ok, {_request, %Req.Response{} = response}} ->
response

# coveralls-ignore-start
# TODO: mock and force errors to cover
{:ok, {:error, error}} ->
error_response(error)
{:ok, {_request, exception}} ->
error_response(exception)

{:exit, reason} ->
error_response(reason)
4 changes: 2 additions & 2 deletions lib/avalanche/steps/poll.ex
Original file line number Diff line number Diff line change
@@ -44,7 +44,7 @@ defmodule Avalanche.Steps.Poll do
|> Req.Request.put_private(:avalanche_poll_count, poll_count + 1)
|> build_status_request(path)

{_, result} = Req.Request.run(request)
{_request, result} = Req.Request.run_request(request)

{Req.Request.halt(request), result}
else
@@ -58,7 +58,7 @@ defmodule Avalanche.Steps.Poll do
defp build_status_request(%Req.Request{} = request, path) do
request
|> reset_req_request()
|> Req.update(method: :get, body: "", url: URI.parse(path))
|> Req.merge(method: :get, body: "", url: URI.parse(path))
end

defp reset_req_request(request), do: %{request | current_request_steps: Keyword.keys(request.request_steps)}
4 changes: 2 additions & 2 deletions mix.exs
Original file line number Diff line number Diff line change
@@ -43,10 +43,10 @@ defmodule Avalanche.MixProject do
[
{:jason, "~> 1.3"},
{:joken, "~> 2.6"},
{:nimble_options, "~> 0.4 or ~> 1.0"},
{:nimble_options, "~> 1.0"},
{:cachex, "~> 3.6"},
{:plug, "~> 1.13"},
{:req, "~> 0.3.6"},
{:req, "~> 0.4.14"},
{:telemetry, "~> 0.4 or ~> 1.0"},
{:bypass, "~> 2.1", only: [:dev, :test]},
{:credo, "~> 1.5", only: [:dev, :test], runtime: false},
Loading

0 comments on commit 1a94241

Please sign in to comment.