From 83684295ae8ecd7966f03ef8a485a19c4547ccde Mon Sep 17 00:00:00 2001 From: Pavel Tatarsky Date: Thu, 26 Jan 2017 22:22:52 +0300 Subject: [PATCH] add UDF and UDA support --- README.md | 2 +- lib/cassandra_ecto/helper.ex | 28 ++++++++++++ lib/cassandra_ecto/migration.ex | 46 +++++++++++++++++-- lib/cassandra_ecto/migration/cql.ex | 34 ++++++++++++++ mix.exs | 2 +- spec/cassandra_ecto/adapter/cql_spec.ex | 5 +++ spec/cassandra_ecto/helper_spec.exs | 55 +++++++++++++++++++++++ spec/cassandra_ecto/log_spec.exs | 2 +- spec/cassandra_ecto/migration/cql_spec.ex | 20 +++++++++ spec/cassandra_ecto/migration_spec.exs | 17 +++---- spec/support/migrations.exs | 23 ++++++++++ 11 files changed, 218 insertions(+), 16 deletions(-) create mode 100644 spec/cassandra_ecto/helper_spec.exs diff --git a/README.md b/README.md index bd55eac..166be91 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,7 @@ Documentation: http://hexdocs.pm/cassandra_ecto/ ## Features -* Migrations (with UDT and materialized views support) +* Migrations (with UDT, UDF, UDA and materialized views support) * Lightweight transactions * Batching (only for `insert_all` queries) * Streaming diff --git a/lib/cassandra_ecto/helper.ex b/lib/cassandra_ecto/helper.ex index d9f3835..2a65bce 100644 --- a/lib/cassandra_ecto/helper.ex +++ b/lib/cassandra_ecto/helper.ex @@ -3,6 +3,34 @@ defmodule Cassandra.Ecto.Helper do alias Ecto.Migration.{Table, Index} alias Ecto.Query + + def db_value(value, {:tuple, types}) when is_tuple(value) and is_tuple(types) do + Enum.zip(Tuple.to_list(value), Tuple.to_list(types)) + |> Enum.map_join(", ", &db_value(elem(&1, 0), elem(&1, 1))) + |> fn e -> "(" <> e <> ")" end.() + end + def db_value(value, {:list, type}) when is_list(value) do + value + |> Enum.map_join(", ", &db_value(&1, type)) + |> fn e -> "[" <> e <> "]" end.() + end + def db_value(value, {:set, type}) when is_list(value) do + value + |> Enum.map_join(", ", &db_value(&1, type)) + |> fn e -> "{" <> e <> "}" end.() + end + def db_value(value, {:map, type}) when is_map(value), do: + db_value(value, {:map, :string, type}) + def db_value(value, {:map, key_type, value_type}) when is_map(value) do + Map.to_list(value) + |> Enum.map_join(", ", &db_value(&1, {key_type, value_type})) + |> fn e -> "{" <> e <> "}" end.() + end + def db_value({key, value}, {key_type, value_type}), do: + db_value(key, key_type) <> ": " <> db_value(value, value_type) + def db_value(value, _) when is_binary(value), do: "$$" <> value <> "$$" + def db_value(value, _), do: to_string(value) + def quote_name(name) def quote_name(name) when is_atom(name), do: quote_name(Atom.to_string(name)) diff --git a/lib/cassandra_ecto/migration.ex b/lib/cassandra_ecto/migration.ex index f0f0b83..5bc4be8 100644 --- a/lib/cassandra_ecto/migration.ex +++ b/lib/cassandra_ecto/migration.ex @@ -171,7 +171,7 @@ defmodule Cassandra.Ecto.Migration do defmacro __using__(_) do quote do use Ecto.Migration - import Cassandra.Ecto.Migration, only: [type: 1, materialized_view: 1, materialized_view: 2] + import Cassandra.Ecto.Migration, only: [type: 1, materialized_view: 1, materialized_view: 2, function: 1, function: 3] end end @@ -200,8 +200,48 @@ defmodule Cassandra.Ecto.Migration do primary_key: {:age, :cid} """ def materialized_view(name, options \\ []) do - options = options |> Keyword.put(:type, :materialized_view) - struct(%Table{name: name, primary_key: false, options: options}) + options = Keyword.put(options, :type, :materialized_view) + prefix = Keyword.get(options, :prefix) + struct(%Table{name: name, primary_key: false, options: options, prefix: prefix}) + end + + @doc """ + Defines Cassandra user defined function (UDF) + + ### Example + + create function(:left, [column: :text, num: :int], + returns: :text, language: :javascript, + on_null_input: :returns_null, + as: "column.substring(0,num)") + """ + def function(name, vars \\ [], options \\ []) do + options = options + |> Keyword.put(:type, :function) + |> Keyword.put(:vars, vars) + |> Keyword.put_new(:language, :java) + |> Keyword.put_new(:on_null_input, :returns_null) + prefix = Keyword.get(options, :prefix) + struct(%Table{name: name, prefix: prefix, options: options}) + end + + @doc """ + Defines Cassandra user defined aggregate (UDA) + + ### Example + + create aggregate(:average, :int, + sfunc: function(:avgState), + stype: {:tuple, {:int, :bigint}}, + finalfunc: function(:avgFinal), + initcond: {0, 0}) + """ + def aggregate(name, var, options \\ []) do + options = options + |> Keyword.put(:type, :aggregate) + |> Keyword.put(:var, var) + prefix = Keyword.get(options, :prefix) + struct(%Table{name: name, prefix: prefix, options: options}) end @doc """ diff --git a/lib/cassandra_ecto/migration/cql.ex b/lib/cassandra_ecto/migration/cql.ex index 9f6ec15..b2aea3b 100644 --- a/lib/cassandra_ecto/migration/cql.ex +++ b/lib/cassandra_ecto/migration/cql.ex @@ -70,6 +70,22 @@ defmodule Cassandra.Ecto.Migration.CQL do as(Keyword.get(options, :as)), pk_definition(Keyword.get(options, :primary_key)), with_definitions_any(%Table{options: def_options})]) end + def to_cql({:function, command, %Table{options: options} = function, _columns}) when command in @creates do + assemble(["CREATE FUNCTION", if_not_exists(command), + quote_table(function), function_vars(function), + on_null_input(Keyword.fetch!(options, :on_null_input)), "ON NULL INPUT", + "RETURNS", ecto_to_db(Keyword.fetch!(options, :returns)), + "LANGUAGE", Atom.to_string(Keyword.fetch!(options, :language)), + "AS", "$$" <> Keyword.fetch!(options, :as) <> "$$"]) + end + def to_cql({:aggregate, command, %Table{options: options} = aggregate, _columns}) when command in @creates do + assemble(["CREATE AGGREGATE", if_not_exists(command), + quote_table(aggregate) <> "(" <> ecto_to_db(Keyword.fetch!(options, :var)) <> ")", + "SFUNC", quote_table(Keyword.fetch!(options, :sfunc)), + "STYPE", ecto_to_db(Keyword.fetch!(options, :stype)), + "FINALFUNC", quote_table(Keyword.fetch!(options, :finalfunc)), + "INITCOND", initcond(aggregate)]) + end defp as(%Ecto.Query{} = query) do {query, _params, _key} = Ecto.Query.Planner.prepare(query, :all, Cassandra.Ecto, 0) @@ -78,6 +94,22 @@ defmodule Cassandra.Ecto.Migration.CQL do ["AS", cql] end + defp function_vars(%Table{options: options}) do + {:ok, vars} = Keyword.fetch(options, :vars) + vars + |> Enum.map_join(", ", fn {name, type} -> quote_name(name) <> " " <> ecto_to_db(type) end) + |> (fn e -> "(" <> e <> ")" end).() + end + + defp on_null_input(:called), do: "CALLED" + defp on_null_input(:returns_null), do: "RETURNS NULL" + + defp initcond(%Table{options: options}) do + type = Keyword.fetch!(options, :stype) + value = Keyword.fetch!(options, :initcond) + db_value(value, type) + end + defp index_using(%Index{using: nil}), do: "" defp index_using(%Index{using: using}), do: "USING '#{using}'" @@ -264,6 +296,8 @@ defmodule Cassandra.Ecto.Migration.CQL do :type -> "TYPE" :materialized_view -> "MATERIALIZED VIEW" :table -> "TABLE" + :function -> "FUNCTION" + :aggregate -> "AGGREGATE" end end end diff --git a/mix.exs b/mix.exs index c888a17..0809709 100644 --- a/mix.exs +++ b/mix.exs @@ -1,7 +1,7 @@ defmodule CassandraEcto.Mixfile do use Mix.Project - @version "0.4.1" + @version "0.5.0" def project do [app: :cassandra_ecto, diff --git a/spec/cassandra_ecto/adapter/cql_spec.ex b/spec/cassandra_ecto/adapter/cql_spec.ex index 80ed2d1..f51a683 100644 --- a/spec/cassandra_ecto/adapter/cql_spec.ex +++ b/spec/cassandra_ecto/adapter/cql_spec.ex @@ -65,6 +65,11 @@ defmodule CassandraEctoAdapterCQLSpec do expect(to_cql(:all, query)) |> to(eq "SELECT * FROM \"posts\" WHERE (\"id\" >= 1) AND (\"title\" = 'abra')") end + it "generates cql with nested binary clauses" do + query = (from p in "posts", where: (p.id >= 1 and p.title == "abra") or p.id == 1) |> normalize + expect(to_cql(:all, query)) + |> to(eq "SELECT * FROM \"posts\" WHERE ((\"id\" >= 1) AND (\"title\" = 'abra')) OR (\"id\" = 1)") + end it "generates cql with in clauses" do query = (from p in "posts", where: p.id in [1, 2]) |> normalize expect(to_cql(:all, query)) diff --git a/spec/cassandra_ecto/helper_spec.exs b/spec/cassandra_ecto/helper_spec.exs new file mode 100644 index 0000000..f06a17a --- /dev/null +++ b/spec/cassandra_ecto/helper_spec.exs @@ -0,0 +1,55 @@ +defmodule CassandraEctoHelperSpec do + use ESpec, async: true + + alias Cassandra.Ecto.Helper + describe "Cassandra.Ecto.Helper" do + describe "quote_name/1" do + it "wraps field name with quotes" do + expect(Helper.quote_name("test")) |> to(eq "\"test\"") + end + it "fails with bad field name" do + expect(fn -> Helper.quote_name("wro\"ng") end) + |> to(raise_exception()) + end + end + describe "quote_table/1" do + it "wraps table name with quotes" do + expect(Helper.quote_table("test")) |> to(eq "\"test\"") + end + it "fails with bad table name" do + expect(fn -> Helper.quote_table("wro\"ng") end) + |> to(raise_exception()) + end + end + describe "db_value" do + it "saves integers as is" do + expect(Helper.db_value(1, :integer)) |> to(eq "1") + end + it "escapes strings with dollars" do + expect(Helper.db_value("test", :string)) |> to(eq "$$test$$") + end + it "wraps :set with curly brackets" do + expect(Helper.db_value([1, 2, 3], {:set, :int})) |> to(eq "{1, 2, 3}") + end + it "wraps :list with square brackets" do + expect(Helper.db_value([1, 2, 3], {:list, :int})) |> to(eq "[1, 2, 3]") + end + it "saves map" do + expect(Helper.db_value(%{"abra" => 1, "cadabra" => 2}, {:map, :string, :int})) + |> to(eq "{$$abra$$: 1, $$cadabra$$: 2}") + end + it "saves map with skipped key type" do + expect(Helper.db_value(%{"abra" => 1, "cadabra" => 2}, {:map, :int})) + |> to(eq "{$$abra$$: 1, $$cadabra$$: 2}") + end + it "saves tuple" do + expect(Helper.db_value({1, "abra"}, {:tuple, {:int, :string}})) + |> to(eq "(1, $$abra$$)") + end + it "deals with nesting" do + expect(Helper.db_value([{1, 2}, {3, 4}], {:list, {:tuple, {:int, :int}}})) + |> to(eq "[(1, 2), (3, 4)]") + end + end + end +end diff --git a/spec/cassandra_ecto/log_spec.exs b/spec/cassandra_ecto/log_spec.exs index 2455bba..b4008cd 100644 --- a/spec/cassandra_ecto/log_spec.exs +++ b/spec/cassandra_ecto/log_spec.exs @@ -3,7 +3,7 @@ defmodule CassandraEctoLogSpec do use ESpec, async: true - describe "Cassandra.Log" do + describe "Cassandra.Ecto.Log" do describe "log/4" do it "writes log to io in cyan color for select queries" do entry = %{connection_time: 0, decode_time: nil, diff --git a/spec/cassandra_ecto/migration/cql_spec.ex b/spec/cassandra_ecto/migration/cql_spec.ex index dfb4ada..498b09c 100644 --- a/spec/cassandra_ecto/migration/cql_spec.ex +++ b/spec/cassandra_ecto/migration/cql_spec.ex @@ -23,6 +23,18 @@ defmodule CassandraEctoMigrationCQLSpec do to_cql({:create, %Table{name: :test_view, options: [type: :materialized_view, as: (from p in "test", select: {p.a, p.b}, where: not(is_nil(p.a)) and not(is_nil(p.b))), primary_key: {:a, :b}, comment: "test"]}, []}) |> to(eq "CREATE MATERIALIZED VIEW \"test_view\" AS SELECT \"a\", \"b\" FROM \"test\" WHERE \"a\" IS NOT NULL AND \"b\" IS NOT NULL PRIMARY KEY (\"a\", \"b\") WITH COMMENT = 'test'") end + it "generates cql to create user function in :java" do + to_cql({:create, %Table{name: :fLog, prefix: :cycling, options: [type: :function, vars: [input: :double], returns: :double, language: :java, on_null_input: :called, as: "return Double.valueOf(Math.log(input.doubleValue()));"]}, []}) + |> to(eq "CREATE FUNCTION \"cycling\".\"fLog\" (\"input\" double) CALLED ON NULL INPUT RETURNS double LANGUAGE java AS $$return Double.valueOf(Math.log(input.doubleValue()));$$") + end + it "generates cql to create user function in :javascript" do + to_cql({:create_if_not_exists, %Table{name: :left, prefix: :cycling, options: [type: :function, vars: [column: :text, num: :int], returns: :text, language: :javascript, on_null_input: :returns_null, as: "column.substring(0,num)"]}, []}) + |> to(eq "CREATE FUNCTION IF NOT EXISTS \"cycling\".\"left\" (\"column\" text, \"num\" int) RETURNS NULL ON NULL INPUT RETURNS text LANGUAGE javascript AS $$column.substring(0,num)$$") + end + it "generates cql to create aggregate", focus: true do + to_cql({:create, %Table{name: :average, prefix: :cycling, options: [type: :aggregate, var: :int, sfunc: %Table{name: :avgState, options: [type: :function]}, stype: {:tuple, {:int, :bigint}}, finalfunc: %Table{name: :avgFinal, options: [type: :function]}, initcond: {0, 0}]}, []}) + |> to(eq "CREATE AGGREGATE \"cycling\".\"average\"(int) SFUNC \"avgState\" STYPE tuple FINALFUNC \"avgFinal\" INITCOND (0, 0)") + end end context "with :alter" do it "generates cql to alter table" do @@ -55,6 +67,14 @@ defmodule CassandraEctoMigrationCQLSpec do to_cql({:drop, %Table{name: :test, options: [type: :materialized_view]}}) |> to(eq "DROP MATERIALIZED VIEW \"test\"") end + it "generates cql to drop user function" do + to_cql({:drop, %Table{name: :test, options: [type: :function]}}) + |> to(eq "DROP FUNCTION \"test\"") + end + it "generates cql to drop user aggregation" do + to_cql({:drop, %Table{name: :test, options: [type: :aggregate]}}) + |> to(eq "DROP AGGREGATE \"test\"") + end end end end diff --git a/spec/cassandra_ecto/migration_spec.exs b/spec/cassandra_ecto/migration_spec.exs index 333e8cb..31ab79f 100644 --- a/spec/cassandra_ecto/migration_spec.exs +++ b/spec/cassandra_ecto/migration_spec.exs @@ -11,7 +11,8 @@ defmodule CassandraEctoMigrationSpec do CreateWithDifferentTypesMigration, CustomIndexMigration, CustomIndexWithOptsMigration, CreateUserTypeMigration, AlterTypeMigration, CreateCounterMigration, CreateWithPrimaryAndPartitionKeys, - CreateWithWithoutPrimaryAndPartitionKeys, MaterializedViewMigration} + CreateWithWithoutPrimaryAndPartitionKeys, MaterializedViewMigration, + FunctionMigration} describe "Cassandra.Ecto", context_tag: :db do describe "Migration behaviour" do before do @@ -64,6 +65,11 @@ defmodule CassandraEctoMigrationSpec do assert [2, 3] = TestRepo.all from p in "materialized_view_view_migration", select: p.id, where: p.value == 2 assert :ok = down(TestRepo, 12750906120000, MaterializedViewMigration, log: false) end + it "creates function" do + assert :ok = up(TestRepo, 12750906120001, FunctionMigration, log: false) + assert ["abr", "cad"] = (TestRepo.all from p in "function_migration", select: fragment("left(?, 3)", p.value)) |> Enum.sort + assert :ok = down(TestRepo, 12750906120001, FunctionMigration, log: false) + end end context "with %Index" do it "creates index" do @@ -88,9 +94,6 @@ defmodule CassandraEctoMigrationSpec do end end end - context "when :create_if_not_exists" do - pending "creates table" - end context "when :rename" do it "renames column" do assert :ok = up(TestRepo, 20010906120000, RenameColumnMigration, log: false) @@ -126,12 +129,6 @@ defmodule CassandraEctoMigrationSpec do down(TestRepo, 19100906120000, AlterTypeMigration, log: false) end end - context "when :drop" do - pending "drops table" - end - context "when :drop_if_exists" do - pending "drops table" - end end end end diff --git a/spec/support/migrations.exs b/spec/support/migrations.exs index 4b814a6..38ed7d0 100644 --- a/spec/support/migrations.exs +++ b/spec/support/migrations.exs @@ -83,6 +83,29 @@ defmodule Cassandra.Ecto.Spec.Support.Migrations do end + defmodule FunctionMigration do + use Cassandra.Ecto.Migration + + @table :function_migration + @function :left + + def up do + create table(@table, primary_key: false) do + add :id, :uuid, primary_key: true + add :value, :text + end + create function(@function, [column: :text, num: :int], returns: :text, + as: "return column.substring(0, Math.min(column.length(), num));") + execute "INSERT INTO #{@table} (id, value) VALUES (now(), 'abra')" + execute "INSERT INTO #{@table} (id, value) VALUES (now(), 'cadabra')" + end + + def down do + drop function(@function) + drop table(@table) + end + end + defmodule CreateWithDifferentTypesMigration do use Cassandra.Ecto.Migration