Extract LogItem code to remove duplication
robacourt committed Aug 7, 2024
1 parent 9c8bf4e commit 2d8f95e
Showing 12 changed files with 398 additions and 359 deletions.
114 changes: 114 additions & 0 deletions packages/sync-service/lib/electric/log_items.ex
@@ -0,0 +1,114 @@
defmodule Electric.LogItems do
alias Electric.Replication.Changes
alias Electric.Replication.LogOffset
alias Electric.Shapes.Shape
alias Electric.Utils

@type log_item :: %{
key: String.t(),
value: map(),
headers: map(),
offset: LogOffset.t()
}

@spec from_change(
Changes.data_change(),
txid :: non_neg_integer() | nil,
pk_cols :: [String.t()]
) :: [log_item(), ...]
def from_change(%Changes.NewRecord{} = change, txid, _) do
[
%{
key: change.key,
value: change.record,
headers: %{action: :insert, txid: txid, relation: Tuple.to_list(change.relation)},
offset: change.log_offset
}
]
end

def from_change(%Changes.DeletedRecord{} = change, txid, pk_cols) do
[
%{
key: change.key,
value: take_pks_or_all(change.old_record, pk_cols),
headers: %{action: :delete, txid: txid, relation: Tuple.to_list(change.relation)},
offset: change.log_offset
}
]
end

# `old_key` is nil when it's unchanged. This is not possible when there is no PK defined.
def from_change(%Changes.UpdatedRecord{old_key: nil} = change, txid, pk_cols) do
[
%{
key: change.key,
value: Map.take(change.record, Enum.concat(pk_cols, change.changed_columns)),
headers: %{action: :update, txid: txid, relation: Tuple.to_list(change.relation)},
offset: change.log_offset
}
]
end

def from_change(%Changes.UpdatedRecord{} = change, txid, pk_cols) do
[
%{
key: change.old_key,
value: take_pks_or_all(change.old_record, pk_cols),
headers: %{
action: :delete,
txid: txid,
relation: Tuple.to_list(change.relation),
key_change_to: change.key
},
offset: change.log_offset
},
%{
key: change.key,
value: change.record,
headers: %{
action: :insert,
txid: txid,
relation: Tuple.to_list(change.relation),
key_change_from: change.old_key
},
offset: LogOffset.increment(change.log_offset)
}
]
end

defp take_pks_or_all(record, []), do: record
defp take_pks_or_all(record, pks), do: Map.take(record, pks)

@spec from_snapshot_row_stream(
row_stream :: Stream.t(list()),
offset :: LogOffset.t(),
shape :: Shape.t(),
query_info :: %Postgrex.Query{}
) :: Stream.t(log_item())
def from_snapshot_row_stream(row_stream, offset, shape, query_info) do
Stream.map(row_stream, &from_snapshot_row(&1, offset, shape, query_info))
end

defp from_snapshot_row(row, offset, shape, query_info) do
value = value(row, query_info)

key = Changes.build_key(shape.root_table, value, Shape.pk(shape))

%{
key: key,
value: value,
headers: %{action: :insert},
offset: offset
}
end

defp value(row, %Postgrex.Query{columns: columns, result_types: types}) do
[columns, types, row]
|> Enum.zip_with(fn
[col, Postgrex.Extensions.UUID, val] -> {col, Utils.encode_uuid(val)}
[col, _, val] -> {col, to_string(val)}
end)
|> Map.new()
end
end
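
For illustration, here is how the trickiest clause above behaves: an UpdatedRecord whose key changed (old_key is set) fans out into a delete at the old key followed by an insert at the new key, with the second item taking the next log offset. This sketch is not part of the commit; it populates only the fields that from_change/3 reads, uses invented values, and assumes the aliases declared at the top of the module.

    offset = LogOffset.first()   # any %LogOffset{} value works; first() is just convenient

    change = %Changes.UpdatedRecord{
      relation: {"public", "users"},
      key: "new-key",
      old_key: "old-key",
      record: %{"id" => "2", "name" => "bob"},
      old_record: %{"id" => "1", "name" => "bob"},
      changed_columns: ["id"],
      log_offset: offset
    }

    LogItems.from_change(change, 42, ["id"])
    # => [
    #      %{key: "old-key", value: %{"id" => "1"},
    #        headers: %{action: :delete, txid: 42, relation: ["public", "users"],
    #                   key_change_to: "new-key"},
    #        offset: offset},
    #      %{key: "new-key", value: %{"id" => "2", "name" => "bob"},
    #        headers: %{action: :insert, txid: 42, relation: ["public", "users"],
    #                   key_change_from: "old-key"},
    #        offset: LogOffset.increment(offset)}
    #    ]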
@@ -3,7 +3,7 @@ defmodule Electric.Replication.ShapeLogCollector do
When any txn comes from postgres, we need to store it into the
log for this shape if and only if it has txid >= xmin of the snapshot.
"""
alias Electric.ShapeCache.Storage
alias Electric.LogItems
alias Electric.Postgres.Inspector
alias Electric.Shapes.Shape
alias Electric.Replication.Changes
@@ -74,9 +74,7 @@ defmodule Electric.Replication.ShapeLogCollector do

relevant_changes != [] ->
relevant_changes
|> Enum.flat_map(
&Storage.prepare_change_for_storage(&1, xid, Shape.pk(shape_def, &1.relation))
)
|> Enum.flat_map(&LogItems.from_change(&1, xid, Shape.pk(shape_def, &1.relation)))
# TODO: what's a graceful way to handle failure to append to log?
# Right now we'll just fail everything
|> then(&shape_cache.append_to_log!(shape_id, last_log_offset, &1, opts))
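The rule in the moduledoc above ("store the txn if and only if it has txid >= xmin of the snapshot") reduces to a single integer comparison. A minimal sketch, not part of this commit, with an invented helper name and the assumption that both values are plain integers:

    # Illustrative only: the xmin guard described in the moduledoc.
    defp relevant_for_snapshot?(xid, snapshot_xmin)
         when is_integer(xid) and is_integer(snapshot_xmin),
         do: xid >= snapshot_xmin
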
54 changes: 12 additions & 42 deletions packages/sync-service/lib/electric/shape_cache/cub_db_storage.ex
@@ -1,12 +1,11 @@
defmodule Electric.ShapeCache.CubDbStorage do
alias Electric.LogItems
alias Electric.Replication.LogOffset
alias Electric.Replication.Changes
alias Electric.Utils
alias Electric.Shapes.Shape
@behaviour Electric.ShapeCache.Storage

@snapshot_key_type 0
@log_key_type 1
@snapshot_offset LogOffset.first()

def shared_opts(opts) do
file_path = Access.get(opts, :file_path, "./shapes")
@@ -93,7 +92,7 @@ defmodule Electric.ShapeCache.CubDbStorage do
|> Stream.map(fn {_, item} -> item end)

# FIXME: this is naive while we don't have snapshot metadata to get real offset
{LogOffset.first(), stream}
{@snapshot_offset, stream}
end

def get_log_stream(shape_id, offset, max_offset, opts) do
@@ -112,27 +111,26 @@ defmodule Electric.ShapeCache.CubDbStorage do
def has_log_entry?(shape_id, offset, opts) do
# FIXME: this is naive while we don't have snapshot metadata to get real offsets
CubDB.has_key?(opts.db, log_key(shape_id, offset)) or
(snapshot_exists?(shape_id, opts) and offset == LogOffset.first())
(snapshot_exists?(shape_id, opts) and offset == @snapshot_offset)
end

def make_new_snapshot!(shape_id, shape, query_info, data_stream, opts) do
data_stream
|> LogItems.from_snapshot_row_stream(@snapshot_offset, shape, query_info)
|> Stream.with_index()
|> Stream.map(&row_to_snapshot_item(&1, shape_id, shape, query_info))
|> Stream.map(&storage_item_to_log_item/1)
|> Stream.map(fn {log_item, index} ->
{snapshot_key(shape_id, index), Jason.encode!(log_item)}
end)
|> Stream.chunk_every(500)
|> Stream.each(fn [{key, _} | _] = chunk -> CubDB.put(opts.db, key, chunk) end)
|> Stream.run()

CubDB.put(opts.db, snapshot_meta_key(shape_id), 0)
end

def append_to_log!(shape_id, changes, opts) do
changes
|> Enum.map(fn {offset, key, action, value, header_data} ->
{log_key(shape_id, offset), {key, action, value, header_data}}
end)
|> Enum.map(&storage_item_to_log_item/1)
def append_to_log!(shape_id, log_items, opts) do
log_items
|> Enum.map(fn log_item -> {log_key(shape_id, log_item.offset), Jason.encode!(log_item)} end)
|> then(&CubDB.put_multi(opts.db, &1))

:ok
@@ -178,7 +176,7 @@ defmodule Electric.ShapeCache.CubDbStorage do
defp shapes_end, do: shape_key("zzz-end")

# FIXME: this is naive while we don't have snapshot metadata to get real offsets
defp offset({_shape_id, @snapshot_key_type, _index}), do: LogOffset.first()
defp offset({_shape_id, @snapshot_key_type, _index}), do: @snapshot_offset

defp offset({_shape_id, @log_key_type, tuple_offset}),
do: LogOffset.new(tuple_offset)
@@ -188,32 +186,4 @@ defmodule Electric.ShapeCache.CubDbStorage do

defp snapshot_start(shape_id), do: snapshot_key(shape_id, 0)
defp snapshot_end(shape_id), do: snapshot_key(shape_id, :end)

defp row_to_snapshot_item({row, index}, shape_id, shape, %Postgrex.Query{
columns: columns,
result_types: types
}) do
serialized_row =
[columns, types, row]
|> Enum.zip_with(fn
[col, Postgrex.Extensions.UUID, val] -> {col, Utils.encode_uuid(val)}
[col, _, val] -> {col, val}
end)
|> Map.new()

change_key = Changes.build_key(shape.root_table, serialized_row, Shape.pk(shape))

{snapshot_key(shape_id, index), {change_key, :insert, serialized_row, %{}}}
end

defp storage_item_to_log_item({key, {change_key, action, value, header_data}})
when is_binary(change_key) do
{key,
Jason.encode!(%{
key: change_key,
value: value,
headers: Map.put(header_data, :action, action),
offset: offset(key)
})}
end
end
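
With the extraction in place, append_to_log!/3 receives ready-made log item maps and is left with only serialisation and keying. A hedged sketch of the write path, not part of this commit: the shape id and log item are invented, and the options map below is a minimal stand-in for what shared_opts/1 builds (the rest of the storage lifecycle is omitted).

    alias Electric.Replication.LogOffset
    alias Electric.ShapeCache.CubDbStorage

    {:ok, db} = CubDB.start_link(data_dir: "./shapes-example")
    opts = %{db: db}              # minimal stand-in for the map shared_opts/1 builds
    shape_id = "example-shape"    # invented id

    log_item = %{
      key: "user-1",
      value: %{"id" => "1", "name" => "alice"},
      headers: %{action: :insert, txid: 42, relation: ["public", "users"]},
      offset: LogOffset.increment(LogOffset.first())   # arbitrary offset for illustration
    }

    CubDbStorage.append_to_log!(shape_id, [log_item], opts)
    # Each item is stored as {log_key(shape_id, item.offset), Jason.encode!(item)},
    # and all pairs are written with a single CubDB.put_multi/2 call.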
79 changes: 14 additions & 65 deletions packages/sync-service/lib/electric/shape_cache/in_memory_storage.ex
@@ -1,12 +1,12 @@
defmodule Electric.ShapeCache.InMemoryStorage do
alias Electric.LogItems
alias Electric.Replication.LogOffset
alias Electric.Replication.Changes
alias Electric.Utils
alias Electric.Shapes.Shape
use Agent

@behaviour Electric.ShapeCache.Storage

@snapshot_offset LogOffset.first()

def shared_opts(opts) do
snapshot_ets_table_name = Access.get(opts, :snapshot_ets_table, :snapshot_ets_table)
log_ets_table_name = Access.get(opts, :log_ets_table, :log_ets_table)
@@ -39,15 +39,13 @@ defmodule Electric.ShapeCache.InMemoryStorage do
end

def get_snapshot(shape_id, opts) do
offset = LogOffset.first()

stream =
:ets.select(opts.snapshot_ets_table, [
{{{:data, shape_id, :"$1"}, :"$2"}, [], [{{:"$1", :"$2"}}]}
])
|> Stream.map(fn {_, item} -> item end)

{offset, stream}
{@snapshot_offset, stream}
end

def get_log_stream(shape_id, offset, max_offset, opts) do
@@ -77,7 +75,7 @@ defmodule Electric.ShapeCache.InMemoryStorage do
]) do
[true] -> true
# FIXME: this is naive while we don't have snapshot metadata to get real offset
[] -> snapshot_exists?(shape_id, opts) and offset == LogOffset.first()
[] -> snapshot_exists?(shape_id, opts) and offset == @snapshot_offset
end
end

map()
) :: :ok
def make_new_snapshot!(shape_id, shape, query_info, data_stream, opts) do
offset = LogOffset.first()
ets_table = opts.snapshot_ets_table

data_stream
|> Stream.map(&row_to_snapshot_entry(&1, shape_id, shape, query_info))
|> Stream.map(&snapshot_storage_item_to_log_item({shape_id, offset}, &1))
|> LogItems.from_snapshot_row_stream(@snapshot_offset, shape, query_info)
|> Stream.map(fn log_item ->
{{:data, shape_id, log_item.key}, Jason.encode!(log_item)}
end)
|> Stream.chunk_every(500)
|> Stream.each(fn chunk -> :ets.insert(ets_table, chunk) end)
|> Stream.run()
:ok
end

def append_to_log!(shape_id, changes, opts) do
def append_to_log!(shape_id, log_items, opts) do
ets_table = opts.log_ets_table

changes
|> Enum.map(fn {offset, key, action, value, header_data} ->
offset = storage_offset(offset)
item = {key, action, value, header_data}
log_storage_item_to_log_item({shape_id, offset}, item)
log_items
|> Enum.map(fn log_item ->
offset = storage_offset(log_item.offset)
{{shape_id, offset}, Jason.encode!(log_item)}
end)
|> then(&:ets.insert(ets_table, &1))

:ok
end
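
For orientation, the two ETS tables used above have different key layouts: snapshot rows are stored under {:data, shape_id, key} in the snapshot table, while log entries are stored under {shape_id, tuple_offset} in the log table. A hedged sketch of reading back what append_to_log!/3 just wrote — illustrative only, not part of this commit; the real get_log_stream/4 additionally filters by its offset and max_offset arguments:

    # Returns the stored JSON for every log entry of `shape_id`.
    # `opts` and `shape_id` are the same values passed to append_to_log!/3.
    :ets.select(opts.log_ets_table, [
      {{{shape_id, :"$1"}, :"$2"}, [], [:"$2"]}
    ])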

@doc false
def row_to_snapshot_entry(row, shape_id, shape, %Postgrex.Query{
columns: columns,
result_types: types
}) do
serialized_row =
[columns, types, row]
|> Enum.zip_with(fn
[col, Postgrex.Extensions.UUID, val] -> {col, Utils.encode_uuid(val)}
[col, _, val] -> {col, to_string(val)}
end)
|> Map.new()

key = Changes.build_key(shape.root_table, serialized_row, Shape.pk(shape))

{{:data, shape_id, key}, serialized_row}
end

defp snapshot_storage_item_to_log_item(
{_shape_id, snapshot_log_offset},
{{_, _, change_key} = key, value}
) do
{key,
Jason.encode!(%{
key: change_key,
value: value,
headers: %{action: :insert},
offset: snapshot_log_offset
})}
end

# Turns a stored log item into a log item
# by converting the tuple offset
# back into a LogOffset value.
defp log_storage_item_to_log_item(
{_shape_id, position} = key,
{change_key, action, value, header_data}
) do
offset = LogOffset.new(position)

{key,
Jason.encode!(%{
key: change_key,
value: value,
headers: Map.put(header_data, :action, action),
offset: offset
})}
end

# Turns a LogOffset into a tuple representation
# for storing in the ETS table
defp storage_offset(offset) do