Skip to content

Commit

Permalink
Materialized Views
Browse files Browse the repository at this point in the history
* materialized views added
* query clause `not(is_nil(arg))` converts to CQL `IS NOT NULL`
* DDL CQL specs added
  • Loading branch information
vintikzzz committed Dec 27, 2016
1 parent 9ae127e commit b7ad6eb
Show file tree
Hide file tree
Showing 10 changed files with 197 additions and 33 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ Documentation: http://hexdocs.pm/cassandra_ecto/

## Features

* Migrations (with UDT support)
* Migrations (with UDT and materialized views support)
* Lightweight transactions
* Batching (only for `insert_all` queries)
* Streaming
Expand Down Expand Up @@ -62,7 +62,7 @@ Tested against 3.7+.
```elixir
def deps do
[{:cqerl, github: "matehat/cqerl", tag: "v1.0.2", only: :test},
{:cassandra_ecto, "~> 0.3.0"}]
{:cassandra_ecto, "~> 0.4.0"}]
end
```

Expand Down
2 changes: 1 addition & 1 deletion lib/cassandra_ecto.ex
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ defmodule Cassandra.Ecto do

def child_spec(repo, opts), do: Connection.child_spec(repo, opts)

def ensure_all_started(repo, _type) do
def ensure_all_started(_repo, _type) do
{:ok, _} = Application.ensure_all_started(:cassandra_ecto)
{:ok, []}
end
Expand Down
5 changes: 4 additions & 1 deletion lib/cassandra_ecto/adapter/cql.ex
Original file line number Diff line number Diff line change
Expand Up @@ -220,8 +220,11 @@ defmodule Cassandra.Ecto.Adapter.CQL do
expr(right, query) <> " CONTAINS " <> expr(left, query)
defp expr({:is_nil, _, [arg]}, query), do:
"#{expr(arg, query)} IS NULL"
defp expr({:not, _, [expr]}, query), do:
defp expr({:not, _, [{:is_nil, _, [arg]}]}, query), do:
"#{expr(arg, query)} IS NOT NULL"
defp expr({:not, _, [expr]}, query) do
"NOT (" <> expr(expr, query) <> ")"
end
defp expr({:fragment, _, parts}, query) do
Enum.map_join(parts, "", fn
{:raw, part} -> part
Expand Down
22 changes: 18 additions & 4 deletions lib/cassandra_ecto/migration.ex
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ defmodule Cassandra.Ecto.Migration do
:counter counter
:map map<varchar, blob>
{:map, :integer} map<varchar, int>
{:map, {:integer, :string} map<int, text>
{:map, :integer, :string} map<int, text>
{:array, :integer} list<int>
{:list, :integer} list<int>
{:set, :integer} set<int>
Expand Down Expand Up @@ -171,7 +171,7 @@ defmodule Cassandra.Ecto.Migration do
defmacro __using__(_) do
quote do
use Ecto.Migration
import Cassandra.Ecto.Migration, only: [type: 1]
import Cassandra.Ecto.Migration, only: [type: 1, materialized_view: 1, materialized_view: 2]
end
end

Expand All @@ -186,8 +186,22 @@ defmodule Cassandra.Ecto.Migration do
add :posted_at, :utc_datetime
end
"""
def type(name) when is_atom(name) do
struct(%Table{name: name, primary_key: false, options: [as: :type]})
def type(name) do
struct(%Table{name: name, primary_key: false, options: [type: :type]})
end

@doc """
Defines Cassandra materialized view
### Example
create materialized_view(:cyclist_by_age,
as: (from c in "cyclist_mv", select: {c.age, c.birthday, c.name, c.country})),
primary_key: {:age, :cid}
"""
def materialized_view(name, options \\ []) do
options = options |> Keyword.put(:type, :materialized_view)
struct(%Table{name: name, primary_key: false, options: options})
end

@doc """
Expand Down
97 changes: 77 additions & 20 deletions lib/cassandra_ecto/migration/cql.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,19 @@ defmodule Cassandra.Ecto.Migration.CQL do
@allowed_column_opts [:primary_key, :partition_key, :null,
:clustering_column, :frozen, :static]

def to_cql({command, %Table{options: [as: :type]} = table, columns}) when command in @creates do
assemble(["CREATE TYPE", if_not_exists(command), quote_table(table),
"(#{column_definitions(table, columns)})"])
end
def to_cql({command, %Table{} = table, columns}) when command in @creates do
assemble(["CREATE TABLE", if_not_exists(command), quote_table(table),
"(#{column_definitions(table, columns)}, #{pk_definition(columns)})",
with_definitions_any(table)])
end
def to_cql({command, %Table{} = table, columns}) when command in @creates, do:
table
|> get_type
|> fn t -> to_cql({t, command, table, columns}) end.()
def to_cql({command, %Table{} = table}) when command in @drops, do:
table
|> get_type
|> get_type_cql_name
|> drop(table, command)
def to_cql({:alter, %Table{} = table, changes}), do:
table
|> get_type
|> fn t -> to_cql({t, :alter, table, changes}) end.()
def to_cql({command, %Index{} = index}) when command in @creates do
fields = Enum.map_join(index.columns, ", ", &quote_name/1)
validate_index!(index)
Expand All @@ -30,19 +34,49 @@ defmodule Cassandra.Ecto.Migration.CQL do
end
def to_cql({_command, %Constraint{} = _constraint}), do:
error! nil, "Cassandra adapter does not support constraints"
def to_cql({:alter, %Table{} = table, changes}), do:
def to_cql({:table, :alter, %Table{} = table, changes}), do:
assemble([alter(table), column_changes(table, changes), with_definitions_any(table)])
def to_cql({:type, :alter, %Table{} = table, changes}), do:
assemble([alter(table), column_changes(table, changes)])
def to_cql({:materialized_view, :alter, %Table{options: options} = table, _changes}) do
def_options = case options |> Enum.reject &(elem(&1, 0) in [:type]) do
[] -> nil
any -> any
end
assemble([alter(table), with_definitions_any(%Table{options: def_options})])
end
def to_cql({:rename, %Table{} = table, old, new}), do:
assemble([alter(table), "RENAME", quote_name(old), "TO", quote_name(new)])
def to_cql({:rename, %Table{} = _old, %Table{} = _new}), do:
error! nil, "Cassandra adapter does not support table renaming"
def to_cql(string) when is_binary(string), do: string
def to_cql({command, %Table{options: [as: :type]} = table}) when command in @drops, do:
drop("TYPE", table, command)
def to_cql({command, %Table{} = table}) when command in @drops, do:
drop("TABLE", table, command)
def to_cql({command, %Index{} = index}) when command in @drops, do:
drop("INDEX", index, command)
def to_cql({:type, command, %Table{} = type, columns}) when command in @creates do
assemble(["CREATE TYPE", if_not_exists(command), quote_table(type),
"(#{column_definitions(type, columns)})"])
end
def to_cql({:table, command, %Table{} = table, columns}) when command in @creates do
assemble(["CREATE TABLE", if_not_exists(command), quote_table(table),
"(#{column_definitions(table, columns)}, #{pk_definition(columns)})",
with_definitions_any(table)])
end
def to_cql({:materialized_view, command, %Table{options: options} = view, _columns}) when command in @creates do
def_options = case options |> Enum.reject &(elem(&1, 0) in [:type, :as, :primary_key]) do
[] -> nil
any -> any
end
assemble(["CREATE MATERIALIZED VIEW", if_not_exists(command), quote_table(view),
as(Keyword.get(options, :as)), pk_definition(Keyword.get(options, :primary_key)),
with_definitions_any(%Table{options: def_options})])
end

defp as(%Ecto.Query{} = query) do
{query, _params, _key} = Ecto.Query.Planner.prepare(query, :all, Cassandra.Ecto, 0)
query = Ecto.Query.Planner.normalize(query, :all, Cassandra.Ecto, 0)
cql = Cassandra.Ecto.Adapter.CQL.to_cql(:all, query)
["AS", cql]
end

defp index_using(%Index{using: nil}), do: ""
defp index_using(%Index{using: using}), do: "USING '#{using}'"
Expand All @@ -59,8 +93,7 @@ defmodule Cassandra.Ecto.Migration.CQL do
defp if_not_exists(command), do: if command == :create_if_not_exists, do: "IF NOT EXISTS", else: ""
defp if_exists(command), do: if command == :drop_if_exists, do: "IF EXISTS", else: ""

defp alter(%Table{options: [as: :type]} = table), do: assemble(["ALTER TYPE", quote_table(table)])
defp alter(%Table{} = table), do: assemble(["ALTER TABLE", quote_table(table)])
defp alter(%Table{} = table), do: assemble(["ALTER", table |> get_type |> get_type_cql_name, quote_table(table)])

defp drop(name, table, command), do:
assemble(["DROP", name, if_exists(command), quote_table(table)])
Expand All @@ -71,10 +104,10 @@ defmodule Cassandra.Ecto.Migration.CQL do
defp column_change(_table, {_command, _name, %Reference{}, _opts}), do:
error! nil, "Cassandra adapter does not support references"
defp column_change(_table, {:add, name, type, opts}), do:
assemble(["ADD ", quote_name(name), column_type(type, opts)])
assemble(["ADD", quote_name(name), column_type(type, opts)])
defp column_change(_table, {:modify, name, type, opts}), do:
assemble(["ALTER ", quote_name(name), "TYPE", column_type(type, opts)])
defp column_change(_table, {:remove, name}), do: "DROP #{quote_name(name)}"
assemble(["ALTER", quote_name(name), "TYPE", column_type(type, opts)])
defp column_change(_table, {:remove, name}), do: assemble(["DROP", quote_name(name)])

defp column_definition(_table, {:add, _name, %Reference{} = _ref, _opts}), do:
error! nil, "Cassandra adapter does not support references"
Expand Down Expand Up @@ -126,6 +159,7 @@ defmodule Cassandra.Ecto.Migration.CQL do
defp ecto_to_db(:string), do: "text"
defp ecto_to_db(:map), do: ecto_to_db({:map, :binary})
defp ecto_to_db({:map, {t1, t2}}), do: "map<#{ecto_to_db(t1)}, #{ecto_to_db(t2)}>"
defp ecto_to_db({:map, t1, t2}), do: "map<#{ecto_to_db(t1)}, #{ecto_to_db(t2)}>"
defp ecto_to_db({:map, t}), do: ecto_to_db({:map, {:varchar, t}})
defp ecto_to_db({:array, t}), do: "list<#{ecto_to_db(t)}>"
defp ecto_to_db({:list, t}), do: "list<#{ecto_to_db(t)}>"
Expand Down Expand Up @@ -171,6 +205,10 @@ defmodule Cassandra.Ecto.Migration.CQL do
defp with_val(val) when is_binary(val) or is_atom(val), do: "'#{val}'"
defp with_val(val), do: val

defp pk_definition(columns) when is_binary(columns), do:
assemble(["PRIMARY KEY", columns])
defp pk_definition(columns) when is_tuple(columns), do:
pk_tuple_definition(columns) |> pk_definition
defp pk_definition(columns) do
pks = get_col_names(columns, :primary_key)
partition_keys = get_col_names(columns, :partition_key)
Expand All @@ -188,7 +226,7 @@ defmodule Cassandra.Ecto.Migration.CQL do
"""
end

"PRIMARY KEY (#{partition_part(partition_keys)}#{clustering_part(clustering_cols)})"
pk_definition("(#{partition_part(partition_keys)}#{clustering_part(clustering_cols)})")
end

defp partition_part(partition_keys) do
Expand All @@ -209,4 +247,23 @@ defmodule Cassandra.Ecto.Migration.CQL do
for {_, name, _, opts} <- columns,
opts[type],
do: name

defp pk_tuple_definition(columns) when is_tuple(columns), do:
Tuple.to_list(columns)
|> Enum.map_join(", ", &pk_tuple_definition/1)
|> fn arg -> "(" <> arg <> ")" end.()
defp pk_tuple_definition(column) when is_binary(column), do: quote_name(column)
defp pk_tuple_definition(column) when is_atom(column), do:
pk_tuple_definition(Atom.to_string(column))

defp get_type(%Table{options: nil}), do: :table
defp get_type(%Table{} = table), do: Keyword.get(table.options, :type, :table)

defp get_type_cql_name(type) do
case type do
:type -> "TYPE"
:materialized_view -> "MATERIALIZED VIEW"
:table -> "TABLE"
end
end
end
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
defmodule CassandraEcto.Mixfile do
use Mix.Project

@version "0.3.1"
@version "0.4.0"

def project do
[app: :cassandra_ecto,
Expand Down
2 changes: 1 addition & 1 deletion spec/cassandra_ecto/adapter/cql_spec.ex
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
defmodule CassandroEctoAdapterCQLSpec do
defmodule CassandraEctoAdapterCQLSpec do
import Cassandra.Ecto.Adapter.CQL
import Ecto.Query
alias Cassandra.Ecto.Spec.Support.Schemas.Post
Expand Down
61 changes: 61 additions & 0 deletions spec/cassandra_ecto/migration/cql_spec.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
defmodule CassandraEctoMigrationCQLSpec do
import Cassandra.Ecto.Migration.CQL
alias Ecto.Migration.{Table, Index}
use ESpec, async: true
import Ecto.Query

describe "Cassandra.Ecto.Migration.CQL" do
describe "to_cql/1" do
context "with :create" do
it "generates cql to create table" do
to_cql({:create, %Table{name: :test}, [{:add, :id, :uuid, [primary_key: true]}, {:add, :value, :integer, []}]})
|> to(eq "CREATE TABLE \"test\" (\"id\" uuid, \"value\" int, PRIMARY KEY (\"id\"))")
end
it "generates cql to create index" do
to_cql({:create, %Index{table: :test, columns: ["a", "b"]}})
|> to(eq "CREATE INDEX \"nil\" ON \"test\" (\"a\", \"b\")")
end
it "generates cql to create type" do
to_cql({:create, %Table{name: :test, options: [type: :type]}, [{:add, :a, :integer, []}, {:add, :b, :integer, []}]})
|> to(eq "CREATE TYPE \"test\" (\"a\" int, \"b\" int)")
end
it "generates cql to create materialized view" do
to_cql({:create, %Table{name: :test_view, options: [type: :materialized_view, as: (from p in "test", select: {p.a, p.b}), primary_key: {:a, :b}, comment: "test"]}, []})
|> to(eq "CREATE MATERIALIZED VIEW \"test_view\" AS SELECT \"a\", \"b\" FROM \"test\" PRIMARY KEY (\"a\", \"b\") WITH COMMENT = 'test'")
end
end
context "with :alter" do
it "generates cql to alter table" do
to_cql({:alter, %Table{name: :test, options: [comment: "test"]}, [{:add, :id, :uuid, []}, {:remove, :value}, {:modify, :value, :blob, []}]})
|> to(eq "ALTER TABLE \"test\" ADD \"id\" uuid, DROP \"value\", ALTER \"value\" TYPE blob WITH COMMENT = 'test'")
end
it "generates cql to alter type" do
to_cql({:alter, %Table{name: :test, options: [type: :type]}, [{:add, :id, :uuid, []}, {:remove, :value}, {:modify, :value, :blob, []}]})
|> to(eq "ALTER TYPE \"test\" ADD \"id\" uuid, DROP \"value\", ALTER \"value\" TYPE blob")
end
it "generates cql to alter materialized view" do
to_cql({:alter, %Table{name: :test, options: [type: :materialized_view, comment: "test"]}, []})
|> to(eq "ALTER MATERIALIZED VIEW \"test\" WITH COMMENT = 'test'")
end
end
context "with :drop" do
it "generates cql to drop table" do
to_cql({:drop, %Table{name: :test}})
|> to(eq "DROP TABLE \"test\"")
end
it "generates cql to drop index" do
to_cql({:drop, %Index{name: :test}})
|> to(eq "DROP INDEX \"test\"")
end
it "generates cql to drop type" do
to_cql({:drop, %Table{name: :test, options: [type: :type]}})
|> to(eq "DROP TYPE \"test\"")
end
it "generates cql to drop materialized view" do
to_cql({:drop, %Table{name: :test, options: [type: :materialized_view]}})
|> to(eq "DROP MATERIALIZED VIEW \"test\"")
end
end
end
end
end
10 changes: 7 additions & 3 deletions spec/cassandra_ecto/migration_spec.exs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@ defmodule CassandraEctoMigrationSpec do
CreateWithDifferentTypesMigration, CustomIndexMigration,
CustomIndexWithOptsMigration, CreateUserTypeMigration,
AlterTypeMigration, CreateCounterMigration, CreateWithPrimaryAndPartitionKeys,
CreateWithWithoutPrimaryAndPartitionKeys
}
CreateWithWithoutPrimaryAndPartitionKeys, MaterializedViewMigration}
describe "Cassandra.Ecto", context_tag: :db do
describe "Migration behaviour" do
before do
Expand All @@ -37,7 +36,7 @@ defmodule CassandraEctoMigrationSpec do
expect(fn -> up(TestRepo, 84050906120000, CreateWithPrimaryAndPartitionKeys, log: false) end)
|> to(raise_exception())
end
it "fails to create table without primary and partition keys", focus: true do
it "fails to create table without primary and partition keys" do
expect(fn -> up(TestRepo, 85050906120000, CreateWithWithoutPrimaryAndPartitionKeys, log: false) end)
|> to(raise_exception())
end
Expand All @@ -60,6 +59,11 @@ defmodule CassandraEctoMigrationSpec do
assert [1] = TestRepo.all from p in "create_counter_migration", select: p.counter
down(TestRepo, 12050906120000, CreateCounterMigration, log: false)
end
it "creates materialized views" do
assert :ok = up(TestRepo, 12750906120000, MaterializedViewMigration, log: false)
assert [2, 3] = TestRepo.all from p in "materialized_view_view_migration", select: p.id, where: p.value == 2
assert :ok = down(TestRepo, 12750906120000, MaterializedViewMigration, log: false)
end
end
context "with %Index" do
it "creates index" do
Expand Down
25 changes: 25 additions & 0 deletions spec/support/migrations.exs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,31 @@ defmodule Cassandra.Ecto.Spec.Support.Migrations do
end
end

defmodule MaterializedViewMigration do
use Cassandra.Ecto.Migration
import Ecto.Query
@table :materialized_view_table_migration
@view :materialized_view_view_migration

def up do
create table(@table, primary_key: false) do
add :id, :integer, primary_key: true
add :value, :integer
end
create materialized_view(@view, primary_key: {:value, :id},
as: (from p in Atom.to_string(@table), select: {p.id, p.value}, where: not(is_nil(p.value))))
execute "INSERT INTO #{@table} (id, value) VALUES (1, 1)"
execute "INSERT INTO #{@table} (id, value) VALUES (2, 2)"
execute "INSERT INTO #{@table} (id, value) VALUES (3, 2)"
end

def down do
drop materialized_view(@view)
drop table(@table)
end

end

defmodule CreateWithDifferentTypesMigration do
use Cassandra.Ecto.Migration

Expand Down

0 comments on commit b7ad6eb

Please sign in to comment.