Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace Ch with ReqCH for ClickHouse integration #83

Merged
merged 8 commits into from
Nov 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,23 @@ jobs:
matrix:
include:
- pair:
elixir: 1.14.2
otp: 25.0
elixir: "1.14.2"
otp: "25.0"

- pair:
elixir: "1.17.3"
otp: "27.1.2"
lint: true

steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4

- uses: erlef/setup-beam@v1
with:
otp-version: ${{matrix.pair.otp}}
elixir-version: ${{matrix.pair.elixir}}

- uses: actions/cache@v3
- uses: actions/cache@v4
with:
path: |
deps
Expand Down
13 changes: 5 additions & 8 deletions lib/assets/connection_cell/main.js
Original file line number Diff line number Diff line change
Expand Up @@ -821,13 +821,10 @@ export function init(ctx, info) {

template: `
<div class="row">
<BaseInput
name="schema"
label="Schema"
type="text"
v-model="fields.schema"
inputClass="input"
:grow
<BaseSwitch
name="use_ssl"
label="HTTPS"
v-model="fields.use_ssl"
/>
<BaseInput
name="hostname"
Expand All @@ -847,7 +844,7 @@ export function init(ctx, info) {
:grow
:required
/>
</div>
</div>
<div class="row">
<BaseInput
name="database"
Expand Down
94 changes: 77 additions & 17 deletions lib/kino_db/connection_cell.ex
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,9 @@ defmodule KinoDB.ConnectionCell do
~w|database hostname port use_ipv6 username password use_ssl cacertfile instance|

"clickhouse" ->
~w|scheme username password_secret hostname port database|
if fields["use_password_secret"],
do: ~w|hostname port use_ssl username password_secret database|,
else: ~w|hostname port use_ssl username password database|

type when type in ["postgres", "mysql"] ->
if fields["use_password_secret"],
Expand Down Expand Up @@ -330,10 +332,15 @@ defmodule KinoDB.ConnectionCell do
end

defp to_quoted(%{"type" => "clickhouse"} = attrs) do
trimmed = attrs |> trim_opts() |> Map.new()
shared_opts = shared_options(trimmed)

clickhouse_opts = trimmed |> clickhouse_options(shared_opts)

quote do
opts = unquote(trim_opts(shared_options(attrs) ++ clickhouse_options(attrs)))
unquote(quoted_var(attrs["variable"])) = ReqCH.new(unquote(clickhouse_opts))

{:ok, unquote(quoted_var(attrs["variable"]))} = Kino.start_child({Ch, opts})
:ok
end
end

Expand Down Expand Up @@ -438,9 +445,58 @@ defmodule KinoDB.ConnectionCell do
end

defp clickhouse_options(attrs) do
[
scheme: attrs["scheme"] || "http"
]
scheme = if attrs["use_ssl"], do: "https", else: "http"

[scheme: scheme]
end

defp clickhouse_options(attrs, shared_options) do
attrs
|> clickhouse_options()
|> build_clickhouse_base_url(shared_options)
|> maybe_add_req_basic_auth(shared_options)
|> maybe_add_clickhouse_database(shared_options)
end

defp build_clickhouse_base_url(opts, shared_opts) do
host = Keyword.fetch!(shared_opts, :hostname)
port = Keyword.fetch!(shared_opts, :port)
scheme = Keyword.fetch!(opts, :scheme)

uri = %URI{scheme: scheme, port: port, host: host}

opts
|> Keyword.put_new(:base_url, URI.to_string(uri))
|> Keyword.delete(:scheme)
end

defp maybe_add_req_basic_auth(opts, shared_opts) do
username = shared_opts[:username]

if username != "" do
password = shared_opts[:password]

auth =
if is_binary(password) do
"#{username}:#{password}"
else
quote do
unquote(username) <> ":" <> unquote(password)
end
end

Keyword.put_new(opts, :auth, {:basic, auth})
else
opts
end
end

defp maybe_add_clickhouse_database(opts, shared_opts) do
if shared_opts[:database] != "" do
Keyword.put_new(opts, :database, shared_opts[:database])
else
opts
end
end

defp quoted_var(string), do: {String.to_atom(string), [], nil}
Expand All @@ -462,6 +518,7 @@ defmodule KinoDB.ConnectionCell do
Code.ensure_loaded?(Exqlite) -> "sqlite"
Code.ensure_loaded?(ReqBigQuery) -> "bigquery"
Code.ensure_loaded?(ReqAthena) -> "athena"
Code.ensure_loaded?(ReqCH) -> "clickhouse"
Code.ensure_loaded?(Adbc) -> "duckdb"
Code.ensure_loaded?(Tds) -> "sqlserver"
true -> "postgres"
Expand Down Expand Up @@ -493,16 +550,10 @@ defmodule KinoDB.ConnectionCell do
end

defp missing_dep(%{"type" => "athena"}) do
deps = [
missing_many_deps([
{ReqAthena, ~s|{:req_athena, "~> 0.1"}|},
{Explorer, ~s|{:explorer, "~> 0.9"}|}
]

deps = for {module, dep} <- deps, not Code.ensure_loaded?(module), do: dep

if deps != [] do
Enum.join(deps, ", ")
end
])
end

defp missing_dep(%{"type" => adbc}) when adbc in ~w[snowflake duckdb] do
Expand All @@ -518,13 +569,22 @@ defmodule KinoDB.ConnectionCell do
end

defp missing_dep(%{"type" => "clickhouse"}) do
unless Code.ensure_loaded?(Ch) do
~s|{:ch, "~> 0.2"}|
end
missing_many_deps([
{ReqCH, ~s|{:req_ch, "~> 0.1"}|},
{Explorer, ~s|{:explorer, "~> 0.10"}|}
])
end

defp missing_dep(_ctx), do: nil

defp missing_many_deps(deps) do
deps = for {module, dep} <- deps, not Code.ensure_loaded?(module), do: dep

if deps != [] do
Enum.join(deps, ", ")
philss marked this conversation as resolved.
Show resolved Hide resolved
end
end

defp join_quoted(quoted_blocks) do
asts =
Enum.flat_map(quoted_blocks, fn
Expand Down
48 changes: 46 additions & 2 deletions lib/kino_db/sql_cell.ex
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ defmodule KinoDB.SQLCell do
cond do
Keyword.has_key?(connection.request_steps, :bigquery_run) -> "bigquery"
Keyword.has_key?(connection.request_steps, :athena_run) -> "athena"
Keyword.has_key?(connection.request_steps, :clickhouse_run) -> "clickhouse"
true -> nil
end
end
Expand Down Expand Up @@ -219,8 +220,18 @@ defmodule KinoDB.SQLCell do
to_quoted(attrs, quote(do: Tds), fn n -> "@#{n}" end)
end

# query!/4 based that returns a Req response.
defp to_quoted(%{"connection" => %{"type" => "clickhouse"}} = attrs) do
to_quoted(attrs, quote(do: Ch), fn n -> "{$#{n}:String}" end)
to_quoted_query_req(attrs, quote(do: ReqCH), fn n, inner ->
name =
if String.match?(inner, ~r/[^a-z0-9_]/) do
"param_#{n}"
else
inner
end

"{#{name}:String}"
end)
end

# Explorer-based
Expand All @@ -246,6 +257,21 @@ defmodule KinoDB.SQLCell do
end
end

defp to_quoted_query_req(attrs, quoted_module, next) do
{query, params} = parameterize(attrs["query"], attrs["connection"]["type"], next)
opts_args = query_opts_args(attrs)

quote do
unquote(quoted_var(attrs["result_variable"])) =
unquote(quoted_module).query!(
unquote(quoted_var(attrs["connection"]["variable"])),
unquote(quoted_query(query)),
unquote(params),
unquote_splicing(opts_args)
).body
end
end

defp to_quoted(attrs, quoted_module, next) do
{query, params} = parameterize(attrs["query"], attrs["connection"]["type"], next)
opts_args = query_opts_args(attrs)
Expand Down Expand Up @@ -310,6 +336,9 @@ defmodule KinoDB.SQLCell do
defp query_opts_args(%{"connection" => %{"type" => "athena"}, "cache_query" => cache_query}),
do: [[cache_query: cache_query]]

defp query_opts_args(%{"connection" => %{"type" => "clickhouse"}}),
do: [[format: :explorer]]

defp query_opts_args(_attrs), do: []

defp parameterize(query, type, next) do
Expand Down Expand Up @@ -342,7 +371,7 @@ defmodule KinoDB.SQLCell do

defp parameterize("{{" <> rest = query, raw, params, n, type, next) do
with [inner, rest] <- String.split(rest, "}}", parts: 2),
sql_param <- next.(n),
sql_param <- apply_next(next, n, inner),
{:ok, param} <- quote_param(type, inner, sql_param) do
parameterize(rest, raw <> sql_param, [param | params], n + 1, type, next)
else
Expand All @@ -354,6 +383,21 @@ defmodule KinoDB.SQLCell do
parameterize(rest, <<raw::binary, char::utf8>>, params, n, type, next)
end

defp apply_next(next, n, _inner) when is_function(next, 1), do: next.(n)
defp apply_next(next, n, inner) when is_function(next, 2), do: next.(n, inner)

defp quote_param("clickhouse", inner, sql_param) do
with {:ok, inner_ast} <- Code.string_to_quoted(inner) do
name =
sql_param |> String.trim_leading("{") |> String.split(":", parts: 2) |> List.first()

{:ok,
quote do
{unquote(name), unquote(inner_ast)}
end}
end
end

defp quote_param("sqlserver", inner, sql_param) do
with {:ok, inner_ast} <- Code.string_to_quoted(inner) do
{:ok,
Expand Down
3 changes: 2 additions & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,12 @@ defmodule KinoDB.MixProject do
{:myxql, "~> 0.7", optional: true},
{:postgrex, "~> 0.18 or ~> 1.0", optional: true},
{:tds, "~> 2.3.4 or ~> 2.4", optional: true},
{:explorer, "~> 0.8", optional: true},
{:explorer, "~> 0.10", optional: true},

# Those dependecies are new, so we use stricter versions
{:req_bigquery, "~> 0.1.0", optional: true},
{:req_athena, "~> 0.2.0", optional: true},
{:req_ch, "~> 0.1.0", optional: true},

# Dev only
{:ex_doc, "~> 0.28", only: :dev, runtime: false}
Expand Down
Loading