feat(sync-service): JSON encode on write rather than read #1485

Merged: 6 commits, Aug 7, 2024
5 changes: 5 additions & 0 deletions .changeset/eight-dragons-brake.md
@@ -0,0 +1,5 @@
---
"@core/sync-service": patch
---

JSON encoding now happens on write rather than on read, to reduce memory footprint. For example, the memory use of an initial sync of a 35MB / 200k-row table has been reduced from 50MB to 25MB on the first initial sync (which includes the encoding and the writing to storage) and to 6MB on subsequent initial syncs (where we just read from storage and there is no encoding).
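In rough terms, the change moves the `Jason` call from the read path to the write path: each log item is serialised once when it is appended to storage, and every later read only streams the stored binaries. A minimal, self-contained sketch of the idea (the module and function names below are made up for illustration, not taken from the codebase):

```elixir
defmodule EncodeOnWriteSketch do
  # Before: log items were stored as plain maps and the whole log was encoded
  # on every read, allocating one large iodata per request.
  def body_with_encode_on_read(log_items) do
    Jason.encode_to_iodata!(log_items)
  end

  # After: each item is encoded exactly once, at the moment it is written...
  def encode_on_write(log_items) do
    Enum.map(log_items, fn item -> {item.offset, Jason.encode!(item)} end)
  end

  # ...so a read only stitches the stored binaries into a JSON array and can
  # stream them out without re-encoding or holding the full log in memory.
  def body_with_encode_on_write(stored_entries) do
    stored_entries
    |> Stream.map(fn {_offset, json} -> json end)
    |> Stream.intersperse(",")
    |> then(&Stream.concat([["["], &1, ["]"]]))
  end
end
```

For the 35MB table above, the first sync still pays the encoding cost once (hence 25MB), while subsequent syncs only stream the stored binaries (hence 6MB).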
122 changes: 122 additions & 0 deletions packages/sync-service/lib/electric/log_items.ex
@@ -0,0 +1,122 @@
defmodule Electric.LogItems do
alias Electric.Replication.Changes
alias Electric.Replication.LogOffset
alias Electric.Shapes.Shape
alias Electric.Utils

@moduledoc """
Defines the structure of the items in the log that the Electric client reads, and how to create them.

The log_item() data structure is a map for ease of consumption in the Elixir code;
however, once JSON-encoded (not done in this module), it is the format that the Electric
client accepts.
"""

@type log_item :: %{
key: String.t(),
value: map(),
headers: map(),
offset: LogOffset.t()
}

@spec from_change(
Changes.data_change(),
txid :: non_neg_integer() | nil,
pk_cols :: [String.t()]
) :: [log_item(), ...]
def from_change(%Changes.NewRecord{} = change, txid, _) do
[
%{
key: change.key,
value: change.record,
headers: %{action: :insert, txid: txid, relation: Tuple.to_list(change.relation)},
offset: change.log_offset
}
]
end

def from_change(%Changes.DeletedRecord{} = change, txid, pk_cols) do
[
%{
key: change.key,
value: take_pks_or_all(change.old_record, pk_cols),
headers: %{action: :delete, txid: txid, relation: Tuple.to_list(change.relation)},
offset: change.log_offset
}
]
end

# `old_key` is nil when it's unchanged. This is not possible when there is no PK defined.
def from_change(%Changes.UpdatedRecord{old_key: nil} = change, txid, pk_cols) do
[
%{
key: change.key,
value: Map.take(change.record, Enum.concat(pk_cols, change.changed_columns)),
headers: %{action: :update, txid: txid, relation: Tuple.to_list(change.relation)},
offset: change.log_offset
}
]
end

def from_change(%Changes.UpdatedRecord{} = change, txid, pk_cols) do
[
%{
key: change.old_key,
value: take_pks_or_all(change.old_record, pk_cols),
headers: %{
action: :delete,
txid: txid,
relation: Tuple.to_list(change.relation),
key_change_to: change.key
},
offset: change.log_offset
},
%{
key: change.key,
value: change.record,
headers: %{
action: :insert,
txid: txid,
relation: Tuple.to_list(change.relation),
key_change_from: change.old_key
},
offset: LogOffset.increment(change.log_offset)
}
]
end

defp take_pks_or_all(record, []), do: record
defp take_pks_or_all(record, pks), do: Map.take(record, pks)

@spec from_snapshot_row_stream(
row_stream :: Stream.t(list()),
offset :: LogOffset.t(),
shape :: Shape.t(),
query_info :: %Postgrex.Query{}
) :: Enumerable.t(log_item())
def from_snapshot_row_stream(row_stream, offset, shape, query_info) do
Stream.map(row_stream, &from_snapshot_row(&1, offset, shape, query_info))
end

defp from_snapshot_row(row, offset, shape, query_info) do
value = value(row, query_info)

key = Changes.build_key(shape.root_table, value, Shape.pk(shape))

%{
key: key,
value: value,
headers: %{action: :insert},
offset: offset
}
end

defp value(row, %Postgrex.Query{columns: columns, result_types: types}) do
[columns, types, row]
|> Enum.zip_with(fn
[col, Postgrex.Extensions.UUID, val] -> {col, Utils.encode_uuid(val)}
[col, _, val] -> {col, to_string(val)}
end)
|> Map.new()
end
end
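For a sense of what `from_change/3` produces, here is an illustrative call for an insert. The record values and key format are invented, and `Changes.NewRecord` may carry more fields than the ones set here:

```elixir
alias Electric.Replication.{Changes, LogOffset}

# Illustrative only: "public.users/1" and the record contents are made up.
change = %Changes.NewRecord{
  relation: {"public", "users"},
  record: %{"id" => "1", "name" => "Ada"},
  key: "public.users/1",
  log_offset: LogOffset.first()
}

[item] = Electric.LogItems.from_change(change, 123, ["id"])

# item == %{
#   key: "public.users/1",
#   value: %{"id" => "1", "name" => "Ada"},
#   headers: %{action: :insert, txid: 123, relation: ["public", "users"]},
#   offset: LogOffset.first()
# }
#
# The storage layer then calls Jason.encode!(item) once; every later read of
# this entry serves that binary as-is.
```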
29 changes: 13 additions & 16 deletions packages/sync-service/lib/electric/plug/serve_shape_plug.ex
@@ -10,8 +10,8 @@ defmodule Electric.Plug.ServeShapePlug do
@before_all_offset LogOffset.before_all()

# Control messages
@up_to_date [%{headers: %{control: "up-to-date"}}]
@must_refetch [%{headers: %{control: "must-refetch"}}]
@up_to_date [Jason.encode!(%{headers: %{control: "up-to-date"}})]
@must_refetch Jason.encode!([%{headers: %{control: "must-refetch"}}])

defmodule Params do
use Ecto.Schema
@@ -194,10 +194,7 @@ defmodule Electric.Plug.ServeShapePlug do
"location",
"#{conn.request_path}?shape_id=#{active_shape_id}&offset=-1"
)
|> send_resp(
409,
Jason.encode_to_iodata!(@must_refetch)
)
|> send_resp(409, @must_refetch)
|> halt()
else
conn
@@ -301,14 +298,16 @@ defmodule Electric.Plug.ServeShapePlug do
} = conn,
_
) do
log =
Shapes.get_log_stream(conn.assigns.config, shape_id, since: offset, up_to: last_offset)
|> Enum.to_list()
log = Shapes.get_log_stream(conn.assigns.config, shape_id, since: offset, up_to: last_offset)

if log == [] and conn.assigns.live do
if Enum.take(log, 1) == [] and conn.assigns.live do
hold_until_change(conn, shape_id)
else
send_resp(conn, 200, Jason.encode_to_iodata!(log ++ @up_to_date))
[log, @up_to_date]
|> Stream.concat()
|> to_json_stream()
|> Stream.chunk_every(500)
|> send_stream(conn, 200)
end
end

@@ -318,9 +317,7 @@
defp to_json_stream(items) do
Stream.concat([
[@json_list_start],
items
|> Stream.map(&Jason.encode_to_iodata!/1)
|> Stream.intersperse(@json_item_separator),
Stream.intersperse(items, @json_item_separator),
[@json_list_end]
])
end
@@ -374,10 +371,10 @@ defmodule Electric.Plug.ServeShapePlug do
{^ref, :shape_rotation} ->
# We may want to notify the client better that the shape ID had changed, but just closing the response
# and letting the client handle it on reconnection is good enough.
send_resp(conn, 200, Jason.encode_to_iodata!(@up_to_date))
send_resp(conn, 200, ["[", @up_to_date, "]"])
after
# If we timeout, return an empty body and 204 as there's no response body.
long_poll_timeout -> send_resp(conn, 204, Jason.encode_to_iodata!(@up_to_date))
long_poll_timeout -> send_resp(conn, 204, ["[", @up_to_date, "]"])
end
end
end
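`to_json_stream/1` now only has to intersperse separators, since every element of the stream is already an encoded JSON binary. A rough illustration of the resulting body, assuming `@json_list_start`, `@json_item_separator` and `@json_list_end` are `"["`, `","` and `"]"` (their definitions are outside this diff):

```elixir
# items stand in for already JSON-encoded log entries read from storage.
items = [~S({"value":1}), ~S({"value":2})]

body =
  [["["], Stream.intersperse(items, ","), ["]"]]
  |> Stream.concat()
  |> Enum.join()

# body == ~S([{"value":1},{"value":2}])
```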
packages/sync-service/lib/electric/replication/shape_log_collector.ex
@@ -3,7 +3,7 @@ defmodule Electric.Replication.ShapeLogCollector do
When any txn comes from postgres, we need to store it into the
log for this shape if and only if it has txid >= xmin of the snapshot.
"""
alias Electric.ShapeCache.Storage
alias Electric.LogItems
alias Electric.Postgres.Inspector
alias Electric.Shapes.Shape
alias Electric.Replication.Changes
@@ -74,9 +74,7 @@ defmodule Electric.Replication.ShapeLogCollector do

relevant_changes != [] ->
relevant_changes
|> Enum.flat_map(
&Storage.prepare_change_for_storage(&1, xid, Shape.pk(shape_def, &1.relation))
)
|> Enum.flat_map(&LogItems.from_change(&1, xid, Shape.pk(shape_def, &1.relation)))
# TODO: what's a graceful way to handle failure to append to log?
# Right now we'll just fail everything
|> then(&shape_cache.append_to_log!(shape_id, last_log_offset, &1, opts))
55 changes: 14 additions & 41 deletions packages/sync-service/lib/electric/shape_cache/cub_db_storage.ex
@@ -1,12 +1,11 @@
defmodule Electric.ShapeCache.CubDbStorage do
alias Electric.LogItems
alias Electric.Replication.LogOffset
alias Electric.Replication.Changes
alias Electric.Utils
alias Electric.Shapes.Shape
@behaviour Electric.ShapeCache.Storage

@snapshot_key_type 0
@log_key_type 1
@snapshot_offset LogOffset.first()

def shared_opts(opts) do
file_path = Access.get(opts, :file_path, "./shapes")
@@ -90,10 +89,10 @@ defmodule Electric.ShapeCache.CubDbStorage do
max_key: snapshot_end(shape_id)
)
|> Stream.flat_map(fn {_, items} -> items end)
|> Stream.map(&storage_item_to_log_item/1)
|> Stream.map(fn {_, item} -> item end)

# FIXME: this is naive while we don't have snapshot metadata to get real offset
{LogOffset.first(), stream}
{@snapshot_offset, stream}
end

def get_log_stream(shape_id, offset, max_offset, opts) do
@@ -106,31 +105,32 @@
max_key: max_key,
min_key_inclusive: false
)
|> Stream.map(&storage_item_to_log_item/1)
|> Stream.map(fn {_, item} -> item end)
end

def has_log_entry?(shape_id, offset, opts) do
# FIXME: this is naive while we don't have snapshot metadata to get real offsets
CubDB.has_key?(opts.db, log_key(shape_id, offset)) or
(snapshot_exists?(shape_id, opts) and offset == LogOffset.first())
(snapshot_exists?(shape_id, opts) and offset == @snapshot_offset)
end

def make_new_snapshot!(shape_id, shape, query_info, data_stream, opts) do
data_stream
|> LogItems.from_snapshot_row_stream(@snapshot_offset, shape, query_info)
|> Stream.with_index()
|> Stream.map(&row_to_snapshot_item(&1, shape_id, shape, query_info))
|> Stream.map(fn {log_item, index} ->
{snapshot_key(shape_id, index), Jason.encode!(log_item)}
end)
|> Stream.chunk_every(500)
|> Stream.each(fn [{key, _} | _] = chunk -> CubDB.put(opts.db, key, chunk) end)
|> Stream.run()

CubDB.put(opts.db, snapshot_meta_key(shape_id), 0)
end

def append_to_log!(shape_id, changes, opts) do
changes
|> Enum.map(fn {offset, key, action, value, header_data} ->
{log_key(shape_id, offset), {key, action, value, header_data}}
end)
def append_to_log!(shape_id, log_items, opts) do
log_items
|> Enum.map(fn log_item -> {log_key(shape_id, log_item.offset), Jason.encode!(log_item)} end)
|> then(&CubDB.put_multi(opts.db, &1))

:ok
@@ -176,7 +176,7 @@ defmodule Electric.ShapeCache.CubDbStorage do
defp shapes_end, do: shape_key("zzz-end")

# FIXME: this is naive while we don't have snapshot metadata to get real offsets
defp offset({_shape_id, @snapshot_key_type, _index}), do: LogOffset.first()
defp offset({_shape_id, @snapshot_key_type, _index}), do: @snapshot_offset

defp offset({_shape_id, @log_key_type, tuple_offset}),
do: LogOffset.new(tuple_offset)
@@ -186,31 +186,4 @@

defp snapshot_start(shape_id), do: snapshot_key(shape_id, 0)
defp snapshot_end(shape_id), do: snapshot_key(shape_id, :end)

defp row_to_snapshot_item({row, index}, shape_id, shape, %Postgrex.Query{
columns: columns,
result_types: types
}) do
serialized_row =
[columns, types, row]
|> Enum.zip_with(fn
[col, Postgrex.Extensions.UUID, val] -> {col, Utils.encode_uuid(val)}
[col, _, val] -> {col, val}
end)
|> Map.new()

change_key = Changes.build_key(shape.root_table, serialized_row, Shape.pk(shape))

{snapshot_key(shape_id, index), {change_key, :insert, serialized_row, %{}}}
end

defp storage_item_to_log_item({key, {change_key, action, value, header_data}})
when is_binary(change_key) do
%{
key: change_key,
value: value,
headers: Map.put(header_data, :action, action),
offset: offset(key)
}
end
end
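The storage format change is the core of the PR: `append_to_log!/3` now writes one pre-encoded JSON binary per log offset (and `make_new_snapshot!/5` writes 500-item chunks of `{key, json}` pairs), so the read side can hand the binaries back untouched. A simplified, self-contained sketch of the new round trip for log entries, with a made-up shape id and plain-integer offsets standing in for the real `log_key/2` keys and `LogOffset` structs:

```elixir
{:ok, db} = CubDB.start_link(data_dir: "/tmp/shape_log_sketch")

log_items = [
  %{key: "users/1", value: %{"id" => "1"}, headers: %{action: :insert}, offset: 1}
]

# Write path: encode each item once, keyed by its offset.
entries =
  Enum.map(log_items, fn item -> {{"shape-1", item.offset}, Jason.encode!(item)} end)

:ok = CubDB.put_multi(db, entries)

# Read path: stream the stored binaries straight through, no re-encoding.
CubDB.select(db, min_key: {"shape-1", 0})
|> Stream.map(fn {_key, json} -> json end)
|> Enum.to_list()
```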