Skip to content

Commit

Permalink
add UDF and UDA support
Browse files Browse the repository at this point in the history
  • Loading branch information
vintikzzz committed Jan 26, 2017
1 parent 569ca53 commit 8368429
Show file tree
Hide file tree
Showing 11 changed files with 218 additions and 16 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ Documentation: http://hexdocs.pm/cassandra_ecto/

## Features

* Migrations (with UDT and materialized views support)
* Migrations (with UDT, UDF, UDA and materialized views support)
* Lightweight transactions
* Batching (only for `insert_all` queries)
* Streaming
Expand Down
28 changes: 28 additions & 0 deletions lib/cassandra_ecto/helper.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,34 @@ defmodule Cassandra.Ecto.Helper do

alias Ecto.Migration.{Table, Index}
alias Ecto.Query

def db_value(value, {:tuple, types}) when is_tuple(value) and is_tuple(types) do
Enum.zip(Tuple.to_list(value), Tuple.to_list(types))
|> Enum.map_join(", ", &db_value(elem(&1, 0), elem(&1, 1)))
|> fn e -> "(" <> e <> ")" end.()
end
def db_value(value, {:list, type}) when is_list(value) do
value
|> Enum.map_join(", ", &db_value(&1, type))
|> fn e -> "[" <> e <> "]" end.()
end
def db_value(value, {:set, type}) when is_list(value) do
value
|> Enum.map_join(", ", &db_value(&1, type))
|> fn e -> "{" <> e <> "}" end.()
end
def db_value(value, {:map, type}) when is_map(value), do:
db_value(value, {:map, :string, type})
def db_value(value, {:map, key_type, value_type}) when is_map(value) do
Map.to_list(value)
|> Enum.map_join(", ", &db_value(&1, {key_type, value_type}))
|> fn e -> "{" <> e <> "}" end.()
end
def db_value({key, value}, {key_type, value_type}), do:
db_value(key, key_type) <> ": " <> db_value(value, value_type)
def db_value(value, _) when is_binary(value), do: "$$" <> value <> "$$"
def db_value(value, _), do: to_string(value)

def quote_name(name)
def quote_name(name) when is_atom(name),
do: quote_name(Atom.to_string(name))
Expand Down
46 changes: 43 additions & 3 deletions lib/cassandra_ecto/migration.ex
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ defmodule Cassandra.Ecto.Migration do
defmacro __using__(_) do
quote do
use Ecto.Migration
import Cassandra.Ecto.Migration, only: [type: 1, materialized_view: 1, materialized_view: 2]
import Cassandra.Ecto.Migration, only: [type: 1, materialized_view: 1, materialized_view: 2, function: 1, function: 3]
end
end

Expand Down Expand Up @@ -200,8 +200,48 @@ defmodule Cassandra.Ecto.Migration do
primary_key: {:age, :cid}
"""
def materialized_view(name, options \\ []) do
options = options |> Keyword.put(:type, :materialized_view)
struct(%Table{name: name, primary_key: false, options: options})
options = Keyword.put(options, :type, :materialized_view)
prefix = Keyword.get(options, :prefix)
struct(%Table{name: name, primary_key: false, options: options, prefix: prefix})
end

@doc """
Defines Cassandra user defined function (UDF)
### Example
create function(:left, [column: :text, num: :int],
returns: :text, language: :javascript,
on_null_input: :returns_null,
as: "column.substring(0,num)")
"""
def function(name, vars \\ [], options \\ []) do
options = options
|> Keyword.put(:type, :function)
|> Keyword.put(:vars, vars)
|> Keyword.put_new(:language, :java)
|> Keyword.put_new(:on_null_input, :returns_null)
prefix = Keyword.get(options, :prefix)
struct(%Table{name: name, prefix: prefix, options: options})
end

@doc """
Defines Cassandra user defined aggregate (UDA)
### Example
create aggregate(:average, :int,
sfunc: function(:avgState),
stype: {:tuple, {:int, :bigint}},
finalfunc: function(:avgFinal),
initcond: {0, 0})
"""
def aggregate(name, var, options \\ []) do
options = options
|> Keyword.put(:type, :aggregate)
|> Keyword.put(:var, var)
prefix = Keyword.get(options, :prefix)
struct(%Table{name: name, prefix: prefix, options: options})
end

@doc """
Expand Down
34 changes: 34 additions & 0 deletions lib/cassandra_ecto/migration/cql.ex
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,22 @@ defmodule Cassandra.Ecto.Migration.CQL do
as(Keyword.get(options, :as)), pk_definition(Keyword.get(options, :primary_key)),
with_definitions_any(%Table{options: def_options})])
end
def to_cql({:function, command, %Table{options: options} = function, _columns}) when command in @creates do
assemble(["CREATE FUNCTION", if_not_exists(command),
quote_table(function), function_vars(function),
on_null_input(Keyword.fetch!(options, :on_null_input)), "ON NULL INPUT",
"RETURNS", ecto_to_db(Keyword.fetch!(options, :returns)),
"LANGUAGE", Atom.to_string(Keyword.fetch!(options, :language)),
"AS", "$$" <> Keyword.fetch!(options, :as) <> "$$"])
end
def to_cql({:aggregate, command, %Table{options: options} = aggregate, _columns}) when command in @creates do
assemble(["CREATE AGGREGATE", if_not_exists(command),
quote_table(aggregate) <> "(" <> ecto_to_db(Keyword.fetch!(options, :var)) <> ")",
"SFUNC", quote_table(Keyword.fetch!(options, :sfunc)),
"STYPE", ecto_to_db(Keyword.fetch!(options, :stype)),
"FINALFUNC", quote_table(Keyword.fetch!(options, :finalfunc)),
"INITCOND", initcond(aggregate)])
end

defp as(%Ecto.Query{} = query) do
{query, _params, _key} = Ecto.Query.Planner.prepare(query, :all, Cassandra.Ecto, 0)
Expand All @@ -78,6 +94,22 @@ defmodule Cassandra.Ecto.Migration.CQL do
["AS", cql]
end

defp function_vars(%Table{options: options}) do
{:ok, vars} = Keyword.fetch(options, :vars)
vars
|> Enum.map_join(", ", fn {name, type} -> quote_name(name) <> " " <> ecto_to_db(type) end)
|> (fn e -> "(" <> e <> ")" end).()
end

defp on_null_input(:called), do: "CALLED"
defp on_null_input(:returns_null), do: "RETURNS NULL"

defp initcond(%Table{options: options}) do
type = Keyword.fetch!(options, :stype)
value = Keyword.fetch!(options, :initcond)
db_value(value, type)
end

defp index_using(%Index{using: nil}), do: ""
defp index_using(%Index{using: using}), do: "USING '#{using}'"

Expand Down Expand Up @@ -264,6 +296,8 @@ defmodule Cassandra.Ecto.Migration.CQL do
:type -> "TYPE"
:materialized_view -> "MATERIALIZED VIEW"
:table -> "TABLE"
:function -> "FUNCTION"
:aggregate -> "AGGREGATE"
end
end
end
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
defmodule CassandraEcto.Mixfile do
use Mix.Project

@version "0.4.1"
@version "0.5.0"

def project do
[app: :cassandra_ecto,
Expand Down
5 changes: 5 additions & 0 deletions spec/cassandra_ecto/adapter/cql_spec.ex
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ defmodule CassandraEctoAdapterCQLSpec do
expect(to_cql(:all, query))
|> to(eq "SELECT * FROM \"posts\" WHERE (\"id\" >= 1) AND (\"title\" = 'abra')")
end
it "generates cql with nested binary clauses" do
query = (from p in "posts", where: (p.id >= 1 and p.title == "abra") or p.id == 1) |> normalize
expect(to_cql(:all, query))
|> to(eq "SELECT * FROM \"posts\" WHERE ((\"id\" >= 1) AND (\"title\" = 'abra')) OR (\"id\" = 1)")
end
it "generates cql with in clauses" do
query = (from p in "posts", where: p.id in [1, 2]) |> normalize
expect(to_cql(:all, query))
Expand Down
55 changes: 55 additions & 0 deletions spec/cassandra_ecto/helper_spec.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
defmodule CassandraEctoHelperSpec do
use ESpec, async: true

alias Cassandra.Ecto.Helper
describe "Cassandra.Ecto.Helper" do
describe "quote_name/1" do
it "wraps field name with quotes" do
expect(Helper.quote_name("test")) |> to(eq "\"test\"")
end
it "fails with bad field name" do
expect(fn -> Helper.quote_name("wro\"ng") end)
|> to(raise_exception())
end
end
describe "quote_table/1" do
it "wraps table name with quotes" do
expect(Helper.quote_table("test")) |> to(eq "\"test\"")
end
it "fails with bad table name" do
expect(fn -> Helper.quote_table("wro\"ng") end)
|> to(raise_exception())
end
end
describe "db_value" do
it "saves integers as is" do
expect(Helper.db_value(1, :integer)) |> to(eq "1")
end
it "escapes strings with dollars" do
expect(Helper.db_value("test", :string)) |> to(eq "$$test$$")
end
it "wraps :set with curly brackets" do
expect(Helper.db_value([1, 2, 3], {:set, :int})) |> to(eq "{1, 2, 3}")
end
it "wraps :list with square brackets" do
expect(Helper.db_value([1, 2, 3], {:list, :int})) |> to(eq "[1, 2, 3]")
end
it "saves map" do
expect(Helper.db_value(%{"abra" => 1, "cadabra" => 2}, {:map, :string, :int}))
|> to(eq "{$$abra$$: 1, $$cadabra$$: 2}")
end
it "saves map with skipped key type" do
expect(Helper.db_value(%{"abra" => 1, "cadabra" => 2}, {:map, :int}))
|> to(eq "{$$abra$$: 1, $$cadabra$$: 2}")
end
it "saves tuple" do
expect(Helper.db_value({1, "abra"}, {:tuple, {:int, :string}}))
|> to(eq "(1, $$abra$$)")
end
it "deals with nesting" do
expect(Helper.db_value([{1, 2}, {3, 4}], {:list, {:tuple, {:int, :int}}}))
|> to(eq "[(1, 2), (3, 4)]")
end
end
end
end
2 changes: 1 addition & 1 deletion spec/cassandra_ecto/log_spec.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ defmodule CassandraEctoLogSpec do

use ESpec, async: true

describe "Cassandra.Log" do
describe "Cassandra.Ecto.Log" do
describe "log/4" do
it "writes log to io in cyan color for select queries" do
entry = %{connection_time: 0, decode_time: nil,
Expand Down
20 changes: 20 additions & 0 deletions spec/cassandra_ecto/migration/cql_spec.ex
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,18 @@ defmodule CassandraEctoMigrationCQLSpec do
to_cql({:create, %Table{name: :test_view, options: [type: :materialized_view, as: (from p in "test", select: {p.a, p.b}, where: not(is_nil(p.a)) and not(is_nil(p.b))), primary_key: {:a, :b}, comment: "test"]}, []})
|> to(eq "CREATE MATERIALIZED VIEW \"test_view\" AS SELECT \"a\", \"b\" FROM \"test\" WHERE \"a\" IS NOT NULL AND \"b\" IS NOT NULL PRIMARY KEY (\"a\", \"b\") WITH COMMENT = 'test'")
end
it "generates cql to create user function in :java" do
to_cql({:create, %Table{name: :fLog, prefix: :cycling, options: [type: :function, vars: [input: :double], returns: :double, language: :java, on_null_input: :called, as: "return Double.valueOf(Math.log(input.doubleValue()));"]}, []})
|> to(eq "CREATE FUNCTION \"cycling\".\"fLog\" (\"input\" double) CALLED ON NULL INPUT RETURNS double LANGUAGE java AS $$return Double.valueOf(Math.log(input.doubleValue()));$$")
end
it "generates cql to create user function in :javascript" do
to_cql({:create_if_not_exists, %Table{name: :left, prefix: :cycling, options: [type: :function, vars: [column: :text, num: :int], returns: :text, language: :javascript, on_null_input: :returns_null, as: "column.substring(0,num)"]}, []})
|> to(eq "CREATE FUNCTION IF NOT EXISTS \"cycling\".\"left\" (\"column\" text, \"num\" int) RETURNS NULL ON NULL INPUT RETURNS text LANGUAGE javascript AS $$column.substring(0,num)$$")
end
it "generates cql to create aggregate", focus: true do
to_cql({:create, %Table{name: :average, prefix: :cycling, options: [type: :aggregate, var: :int, sfunc: %Table{name: :avgState, options: [type: :function]}, stype: {:tuple, {:int, :bigint}}, finalfunc: %Table{name: :avgFinal, options: [type: :function]}, initcond: {0, 0}]}, []})
|> to(eq "CREATE AGGREGATE \"cycling\".\"average\"(int) SFUNC \"avgState\" STYPE tuple<int, bigint> FINALFUNC \"avgFinal\" INITCOND (0, 0)")
end
end
context "with :alter" do
it "generates cql to alter table" do
Expand Down Expand Up @@ -55,6 +67,14 @@ defmodule CassandraEctoMigrationCQLSpec do
to_cql({:drop, %Table{name: :test, options: [type: :materialized_view]}})
|> to(eq "DROP MATERIALIZED VIEW \"test\"")
end
it "generates cql to drop user function" do
to_cql({:drop, %Table{name: :test, options: [type: :function]}})
|> to(eq "DROP FUNCTION \"test\"")
end
it "generates cql to drop user aggregation" do
to_cql({:drop, %Table{name: :test, options: [type: :aggregate]}})
|> to(eq "DROP AGGREGATE \"test\"")
end
end
end
end
Expand Down
17 changes: 7 additions & 10 deletions spec/cassandra_ecto/migration_spec.exs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ defmodule CassandraEctoMigrationSpec do
CreateWithDifferentTypesMigration, CustomIndexMigration,
CustomIndexWithOptsMigration, CreateUserTypeMigration,
AlterTypeMigration, CreateCounterMigration, CreateWithPrimaryAndPartitionKeys,
CreateWithWithoutPrimaryAndPartitionKeys, MaterializedViewMigration}
CreateWithWithoutPrimaryAndPartitionKeys, MaterializedViewMigration,
FunctionMigration}
describe "Cassandra.Ecto", context_tag: :db do
describe "Migration behaviour" do
before do
Expand Down Expand Up @@ -64,6 +65,11 @@ defmodule CassandraEctoMigrationSpec do
assert [2, 3] = TestRepo.all from p in "materialized_view_view_migration", select: p.id, where: p.value == 2
assert :ok = down(TestRepo, 12750906120000, MaterializedViewMigration, log: false)
end
it "creates function" do
assert :ok = up(TestRepo, 12750906120001, FunctionMigration, log: false)
assert ["abr", "cad"] = (TestRepo.all from p in "function_migration", select: fragment("left(?, 3)", p.value)) |> Enum.sort
assert :ok = down(TestRepo, 12750906120001, FunctionMigration, log: false)
end
end
context "with %Index" do
it "creates index" do
Expand All @@ -88,9 +94,6 @@ defmodule CassandraEctoMigrationSpec do
end
end
end
context "when :create_if_not_exists" do
pending "creates table"
end
context "when :rename" do
it "renames column" do
assert :ok = up(TestRepo, 20010906120000, RenameColumnMigration, log: false)
Expand Down Expand Up @@ -126,12 +129,6 @@ defmodule CassandraEctoMigrationSpec do
down(TestRepo, 19100906120000, AlterTypeMigration, log: false)
end
end
context "when :drop" do
pending "drops table"
end
context "when :drop_if_exists" do
pending "drops table"
end
end
end
end
Expand Down
23 changes: 23 additions & 0 deletions spec/support/migrations.exs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,29 @@ defmodule Cassandra.Ecto.Spec.Support.Migrations do

end

defmodule FunctionMigration do
use Cassandra.Ecto.Migration

@table :function_migration
@function :left

def up do
create table(@table, primary_key: false) do
add :id, :uuid, primary_key: true
add :value, :text
end
create function(@function, [column: :text, num: :int], returns: :text,
as: "return column.substring(0, Math.min(column.length(), num));")
execute "INSERT INTO #{@table} (id, value) VALUES (now(), 'abra')"
execute "INSERT INTO #{@table} (id, value) VALUES (now(), 'cadabra')"
end

def down do
drop function(@function)
drop table(@table)
end
end

defmodule CreateWithDifferentTypesMigration do
use Cassandra.Ecto.Migration

Expand Down

0 comments on commit 8368429

Please sign in to comment.