diff --git a/README.md b/README.md index 05ab8eb..bd55eac 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,7 @@ Documentation: http://hexdocs.pm/cassandra_ecto/ ## Features -* Migrations (with UDT support) +* Migrations (with UDT and materialized views support) * Lightweight transactions * Batching (only for `insert_all` queries) * Streaming @@ -62,7 +62,7 @@ Tested against 3.7+. ```elixir def deps do [{:cqerl, github: "matehat/cqerl", tag: "v1.0.2", only: :test}, - {:cassandra_ecto, "~> 0.3.0"}] + {:cassandra_ecto, "~> 0.4.0"}] end ``` diff --git a/lib/cassandra_ecto.ex b/lib/cassandra_ecto.ex index 2d7410e..5d08347 100644 --- a/lib/cassandra_ecto.ex +++ b/lib/cassandra_ecto.ex @@ -90,7 +90,7 @@ defmodule Cassandra.Ecto do def child_spec(repo, opts), do: Connection.child_spec(repo, opts) - def ensure_all_started(repo, _type) do + def ensure_all_started(_repo, _type) do {:ok, _} = Application.ensure_all_started(:cassandra_ecto) {:ok, []} end diff --git a/lib/cassandra_ecto/adapter/cql.ex b/lib/cassandra_ecto/adapter/cql.ex index 1552305..c68fc2a 100644 --- a/lib/cassandra_ecto/adapter/cql.ex +++ b/lib/cassandra_ecto/adapter/cql.ex @@ -220,8 +220,11 @@ defmodule Cassandra.Ecto.Adapter.CQL do expr(right, query) <> " CONTAINS " <> expr(left, query) defp expr({:is_nil, _, [arg]}, query), do: "#{expr(arg, query)} IS NULL" - defp expr({:not, _, [expr]}, query), do: + defp expr({:not, _, [{:is_nil, _, [arg]}]}, query), do: + "#{expr(arg, query)} IS NOT NULL" + defp expr({:not, _, [expr]}, query) do "NOT (" <> expr(expr, query) <> ")" + end defp expr({:fragment, _, parts}, query) do Enum.map_join(parts, "", fn {:raw, part} -> part diff --git a/lib/cassandra_ecto/migration.ex b/lib/cassandra_ecto/migration.ex index 85ffdd7..f0f0b83 100644 --- a/lib/cassandra_ecto/migration.ex +++ b/lib/cassandra_ecto/migration.ex @@ -96,7 +96,7 @@ defmodule Cassandra.Ecto.Migration do :counter counter :map map {:map, :integer} map - {:map, {:integer, :string} map + {:map, :integer, :string} map {:array, :integer} list {:list, :integer} list {:set, :integer} set @@ -171,7 +171,7 @@ defmodule Cassandra.Ecto.Migration do defmacro __using__(_) do quote do use Ecto.Migration - import Cassandra.Ecto.Migration, only: [type: 1] + import Cassandra.Ecto.Migration, only: [type: 1, materialized_view: 1, materialized_view: 2] end end @@ -186,8 +186,22 @@ defmodule Cassandra.Ecto.Migration do add :posted_at, :utc_datetime end """ - def type(name) when is_atom(name) do - struct(%Table{name: name, primary_key: false, options: [as: :type]}) + def type(name) do + struct(%Table{name: name, primary_key: false, options: [type: :type]}) + end + + @doc """ + Defines Cassandra materialized view + + ### Example + + create materialized_view(:cyclist_by_age, + as: (from c in "cyclist_mv", select: {c.age, c.birthday, c.name, c.country})), + primary_key: {:age, :cid} + """ + def materialized_view(name, options \\ []) do + options = options |> Keyword.put(:type, :materialized_view) + struct(%Table{name: name, primary_key: false, options: options}) end @doc """ diff --git a/lib/cassandra_ecto/migration/cql.ex b/lib/cassandra_ecto/migration/cql.ex index 7d42ca6..9f6ec15 100644 --- a/lib/cassandra_ecto/migration/cql.ex +++ b/lib/cassandra_ecto/migration/cql.ex @@ -12,15 +12,19 @@ defmodule Cassandra.Ecto.Migration.CQL do @allowed_column_opts [:primary_key, :partition_key, :null, :clustering_column, :frozen, :static] - def to_cql({command, %Table{options: [as: :type]} = table, columns}) when command in @creates do - assemble(["CREATE TYPE", if_not_exists(command), quote_table(table), - "(#{column_definitions(table, columns)})"]) - end - def to_cql({command, %Table{} = table, columns}) when command in @creates do - assemble(["CREATE TABLE", if_not_exists(command), quote_table(table), - "(#{column_definitions(table, columns)}, #{pk_definition(columns)})", - with_definitions_any(table)]) - end + def to_cql({command, %Table{} = table, columns}) when command in @creates, do: + table + |> get_type + |> fn t -> to_cql({t, command, table, columns}) end.() + def to_cql({command, %Table{} = table}) when command in @drops, do: + table + |> get_type + |> get_type_cql_name + |> drop(table, command) + def to_cql({:alter, %Table{} = table, changes}), do: + table + |> get_type + |> fn t -> to_cql({t, :alter, table, changes}) end.() def to_cql({command, %Index{} = index}) when command in @creates do fields = Enum.map_join(index.columns, ", ", "e_name/1) validate_index!(index) @@ -30,19 +34,49 @@ defmodule Cassandra.Ecto.Migration.CQL do end def to_cql({_command, %Constraint{} = _constraint}), do: error! nil, "Cassandra adapter does not support constraints" - def to_cql({:alter, %Table{} = table, changes}), do: + def to_cql({:table, :alter, %Table{} = table, changes}), do: + assemble([alter(table), column_changes(table, changes), with_definitions_any(table)]) + def to_cql({:type, :alter, %Table{} = table, changes}), do: assemble([alter(table), column_changes(table, changes)]) + def to_cql({:materialized_view, :alter, %Table{options: options} = table, _changes}) do + def_options = case options |> Enum.reject &(elem(&1, 0) in [:type]) do + [] -> nil + any -> any + end + assemble([alter(table), with_definitions_any(%Table{options: def_options})]) + end def to_cql({:rename, %Table{} = table, old, new}), do: assemble([alter(table), "RENAME", quote_name(old), "TO", quote_name(new)]) def to_cql({:rename, %Table{} = _old, %Table{} = _new}), do: error! nil, "Cassandra adapter does not support table renaming" def to_cql(string) when is_binary(string), do: string - def to_cql({command, %Table{options: [as: :type]} = table}) when command in @drops, do: - drop("TYPE", table, command) - def to_cql({command, %Table{} = table}) when command in @drops, do: - drop("TABLE", table, command) def to_cql({command, %Index{} = index}) when command in @drops, do: drop("INDEX", index, command) + def to_cql({:type, command, %Table{} = type, columns}) when command in @creates do + assemble(["CREATE TYPE", if_not_exists(command), quote_table(type), + "(#{column_definitions(type, columns)})"]) + end + def to_cql({:table, command, %Table{} = table, columns}) when command in @creates do + assemble(["CREATE TABLE", if_not_exists(command), quote_table(table), + "(#{column_definitions(table, columns)}, #{pk_definition(columns)})", + with_definitions_any(table)]) + end + def to_cql({:materialized_view, command, %Table{options: options} = view, _columns}) when command in @creates do + def_options = case options |> Enum.reject &(elem(&1, 0) in [:type, :as, :primary_key]) do + [] -> nil + any -> any + end + assemble(["CREATE MATERIALIZED VIEW", if_not_exists(command), quote_table(view), + as(Keyword.get(options, :as)), pk_definition(Keyword.get(options, :primary_key)), + with_definitions_any(%Table{options: def_options})]) + end + + defp as(%Ecto.Query{} = query) do + {query, _params, _key} = Ecto.Query.Planner.prepare(query, :all, Cassandra.Ecto, 0) + query = Ecto.Query.Planner.normalize(query, :all, Cassandra.Ecto, 0) + cql = Cassandra.Ecto.Adapter.CQL.to_cql(:all, query) + ["AS", cql] + end defp index_using(%Index{using: nil}), do: "" defp index_using(%Index{using: using}), do: "USING '#{using}'" @@ -59,8 +93,7 @@ defmodule Cassandra.Ecto.Migration.CQL do defp if_not_exists(command), do: if command == :create_if_not_exists, do: "IF NOT EXISTS", else: "" defp if_exists(command), do: if command == :drop_if_exists, do: "IF EXISTS", else: "" - defp alter(%Table{options: [as: :type]} = table), do: assemble(["ALTER TYPE", quote_table(table)]) - defp alter(%Table{} = table), do: assemble(["ALTER TABLE", quote_table(table)]) + defp alter(%Table{} = table), do: assemble(["ALTER", table |> get_type |> get_type_cql_name, quote_table(table)]) defp drop(name, table, command), do: assemble(["DROP", name, if_exists(command), quote_table(table)]) @@ -71,10 +104,10 @@ defmodule Cassandra.Ecto.Migration.CQL do defp column_change(_table, {_command, _name, %Reference{}, _opts}), do: error! nil, "Cassandra adapter does not support references" defp column_change(_table, {:add, name, type, opts}), do: - assemble(["ADD ", quote_name(name), column_type(type, opts)]) + assemble(["ADD", quote_name(name), column_type(type, opts)]) defp column_change(_table, {:modify, name, type, opts}), do: - assemble(["ALTER ", quote_name(name), "TYPE", column_type(type, opts)]) - defp column_change(_table, {:remove, name}), do: "DROP #{quote_name(name)}" + assemble(["ALTER", quote_name(name), "TYPE", column_type(type, opts)]) + defp column_change(_table, {:remove, name}), do: assemble(["DROP", quote_name(name)]) defp column_definition(_table, {:add, _name, %Reference{} = _ref, _opts}), do: error! nil, "Cassandra adapter does not support references" @@ -126,6 +159,7 @@ defmodule Cassandra.Ecto.Migration.CQL do defp ecto_to_db(:string), do: "text" defp ecto_to_db(:map), do: ecto_to_db({:map, :binary}) defp ecto_to_db({:map, {t1, t2}}), do: "map<#{ecto_to_db(t1)}, #{ecto_to_db(t2)}>" + defp ecto_to_db({:map, t1, t2}), do: "map<#{ecto_to_db(t1)}, #{ecto_to_db(t2)}>" defp ecto_to_db({:map, t}), do: ecto_to_db({:map, {:varchar, t}}) defp ecto_to_db({:array, t}), do: "list<#{ecto_to_db(t)}>" defp ecto_to_db({:list, t}), do: "list<#{ecto_to_db(t)}>" @@ -171,6 +205,10 @@ defmodule Cassandra.Ecto.Migration.CQL do defp with_val(val) when is_binary(val) or is_atom(val), do: "'#{val}'" defp with_val(val), do: val + defp pk_definition(columns) when is_binary(columns), do: + assemble(["PRIMARY KEY", columns]) + defp pk_definition(columns) when is_tuple(columns), do: + pk_tuple_definition(columns) |> pk_definition defp pk_definition(columns) do pks = get_col_names(columns, :primary_key) partition_keys = get_col_names(columns, :partition_key) @@ -188,7 +226,7 @@ defmodule Cassandra.Ecto.Migration.CQL do """ end - "PRIMARY KEY (#{partition_part(partition_keys)}#{clustering_part(clustering_cols)})" + pk_definition("(#{partition_part(partition_keys)}#{clustering_part(clustering_cols)})") end defp partition_part(partition_keys) do @@ -209,4 +247,23 @@ defmodule Cassandra.Ecto.Migration.CQL do for {_, name, _, opts} <- columns, opts[type], do: name + + defp pk_tuple_definition(columns) when is_tuple(columns), do: + Tuple.to_list(columns) + |> Enum.map_join(", ", &pk_tuple_definition/1) + |> fn arg -> "(" <> arg <> ")" end.() + defp pk_tuple_definition(column) when is_binary(column), do: quote_name(column) + defp pk_tuple_definition(column) when is_atom(column), do: + pk_tuple_definition(Atom.to_string(column)) + + defp get_type(%Table{options: nil}), do: :table + defp get_type(%Table{} = table), do: Keyword.get(table.options, :type, :table) + + defp get_type_cql_name(type) do + case type do + :type -> "TYPE" + :materialized_view -> "MATERIALIZED VIEW" + :table -> "TABLE" + end + end end diff --git a/mix.exs b/mix.exs index 4e09266..b806db9 100644 --- a/mix.exs +++ b/mix.exs @@ -1,7 +1,7 @@ defmodule CassandraEcto.Mixfile do use Mix.Project - @version "0.3.1" + @version "0.4.0" def project do [app: :cassandra_ecto, diff --git a/spec/cassandra_ecto/adapter/cql_spec.ex b/spec/cassandra_ecto/adapter/cql_spec.ex index 37651f0..6d796f1 100644 --- a/spec/cassandra_ecto/adapter/cql_spec.ex +++ b/spec/cassandra_ecto/adapter/cql_spec.ex @@ -1,4 +1,4 @@ -defmodule CassandroEctoAdapterCQLSpec do +defmodule CassandraEctoAdapterCQLSpec do import Cassandra.Ecto.Adapter.CQL import Ecto.Query alias Cassandra.Ecto.Spec.Support.Schemas.Post diff --git a/spec/cassandra_ecto/migration/cql_spec.ex b/spec/cassandra_ecto/migration/cql_spec.ex new file mode 100644 index 0000000..f3f2b1b --- /dev/null +++ b/spec/cassandra_ecto/migration/cql_spec.ex @@ -0,0 +1,61 @@ +defmodule CassandraEctoMigrationCQLSpec do + import Cassandra.Ecto.Migration.CQL + alias Ecto.Migration.{Table, Index} + use ESpec, async: true + import Ecto.Query + + describe "Cassandra.Ecto.Migration.CQL" do + describe "to_cql/1" do + context "with :create" do + it "generates cql to create table" do + to_cql({:create, %Table{name: :test}, [{:add, :id, :uuid, [primary_key: true]}, {:add, :value, :integer, []}]}) + |> to(eq "CREATE TABLE \"test\" (\"id\" uuid, \"value\" int, PRIMARY KEY (\"id\"))") + end + it "generates cql to create index" do + to_cql({:create, %Index{table: :test, columns: ["a", "b"]}}) + |> to(eq "CREATE INDEX \"nil\" ON \"test\" (\"a\", \"b\")") + end + it "generates cql to create type" do + to_cql({:create, %Table{name: :test, options: [type: :type]}, [{:add, :a, :integer, []}, {:add, :b, :integer, []}]}) + |> to(eq "CREATE TYPE \"test\" (\"a\" int, \"b\" int)") + end + it "generates cql to create materialized view" do + to_cql({:create, %Table{name: :test_view, options: [type: :materialized_view, as: (from p in "test", select: {p.a, p.b}), primary_key: {:a, :b}, comment: "test"]}, []}) + |> to(eq "CREATE MATERIALIZED VIEW \"test_view\" AS SELECT \"a\", \"b\" FROM \"test\" PRIMARY KEY (\"a\", \"b\") WITH COMMENT = 'test'") + end + end + context "with :alter" do + it "generates cql to alter table" do + to_cql({:alter, %Table{name: :test, options: [comment: "test"]}, [{:add, :id, :uuid, []}, {:remove, :value}, {:modify, :value, :blob, []}]}) + |> to(eq "ALTER TABLE \"test\" ADD \"id\" uuid, DROP \"value\", ALTER \"value\" TYPE blob WITH COMMENT = 'test'") + end + it "generates cql to alter type" do + to_cql({:alter, %Table{name: :test, options: [type: :type]}, [{:add, :id, :uuid, []}, {:remove, :value}, {:modify, :value, :blob, []}]}) + |> to(eq "ALTER TYPE \"test\" ADD \"id\" uuid, DROP \"value\", ALTER \"value\" TYPE blob") + end + it "generates cql to alter materialized view" do + to_cql({:alter, %Table{name: :test, options: [type: :materialized_view, comment: "test"]}, []}) + |> to(eq "ALTER MATERIALIZED VIEW \"test\" WITH COMMENT = 'test'") + end + end + context "with :drop" do + it "generates cql to drop table" do + to_cql({:drop, %Table{name: :test}}) + |> to(eq "DROP TABLE \"test\"") + end + it "generates cql to drop index" do + to_cql({:drop, %Index{name: :test}}) + |> to(eq "DROP INDEX \"test\"") + end + it "generates cql to drop type" do + to_cql({:drop, %Table{name: :test, options: [type: :type]}}) + |> to(eq "DROP TYPE \"test\"") + end + it "generates cql to drop materialized view" do + to_cql({:drop, %Table{name: :test, options: [type: :materialized_view]}}) + |> to(eq "DROP MATERIALIZED VIEW \"test\"") + end + end + end + end +end diff --git a/spec/cassandra_ecto/migration_spec.exs b/spec/cassandra_ecto/migration_spec.exs index 11c9bd0..333e8cb 100644 --- a/spec/cassandra_ecto/migration_spec.exs +++ b/spec/cassandra_ecto/migration_spec.exs @@ -11,8 +11,7 @@ defmodule CassandraEctoMigrationSpec do CreateWithDifferentTypesMigration, CustomIndexMigration, CustomIndexWithOptsMigration, CreateUserTypeMigration, AlterTypeMigration, CreateCounterMigration, CreateWithPrimaryAndPartitionKeys, - CreateWithWithoutPrimaryAndPartitionKeys - } + CreateWithWithoutPrimaryAndPartitionKeys, MaterializedViewMigration} describe "Cassandra.Ecto", context_tag: :db do describe "Migration behaviour" do before do @@ -37,7 +36,7 @@ defmodule CassandraEctoMigrationSpec do expect(fn -> up(TestRepo, 84050906120000, CreateWithPrimaryAndPartitionKeys, log: false) end) |> to(raise_exception()) end - it "fails to create table without primary and partition keys", focus: true do + it "fails to create table without primary and partition keys" do expect(fn -> up(TestRepo, 85050906120000, CreateWithWithoutPrimaryAndPartitionKeys, log: false) end) |> to(raise_exception()) end @@ -60,6 +59,11 @@ defmodule CassandraEctoMigrationSpec do assert [1] = TestRepo.all from p in "create_counter_migration", select: p.counter down(TestRepo, 12050906120000, CreateCounterMigration, log: false) end + it "creates materialized views" do + assert :ok = up(TestRepo, 12750906120000, MaterializedViewMigration, log: false) + assert [2, 3] = TestRepo.all from p in "materialized_view_view_migration", select: p.id, where: p.value == 2 + assert :ok = down(TestRepo, 12750906120000, MaterializedViewMigration, log: false) + end end context "with %Index" do it "creates index" do diff --git a/spec/support/migrations.exs b/spec/support/migrations.exs index 455eee5..4b814a6 100644 --- a/spec/support/migrations.exs +++ b/spec/support/migrations.exs @@ -58,6 +58,31 @@ defmodule Cassandra.Ecto.Spec.Support.Migrations do end end + defmodule MaterializedViewMigration do + use Cassandra.Ecto.Migration + import Ecto.Query + @table :materialized_view_table_migration + @view :materialized_view_view_migration + + def up do + create table(@table, primary_key: false) do + add :id, :integer, primary_key: true + add :value, :integer + end + create materialized_view(@view, primary_key: {:value, :id}, + as: (from p in Atom.to_string(@table), select: {p.id, p.value}, where: not(is_nil(p.value)))) + execute "INSERT INTO #{@table} (id, value) VALUES (1, 1)" + execute "INSERT INTO #{@table} (id, value) VALUES (2, 2)" + execute "INSERT INTO #{@table} (id, value) VALUES (3, 2)" + end + + def down do + drop materialized_view(@view) + drop table(@table) + end + + end + defmodule CreateWithDifferentTypesMigration do use Cassandra.Ecto.Migration