From 2b08397d956b33751a81ca88809268c2c8e7f74a Mon Sep 17 00:00:00 2001 From: Ilia Borovitinov Date: Thu, 15 Aug 2024 19:11:29 +0300 Subject: [PATCH] feat: introduce CubDB+ storage This introduces a new storage backend that's built as a plain file for snapshots, and CubDB for any metadata and ongoing logs. The benefit here is that we don't have to close the file while processing the results coming from the PostgreSQL query, unlike with batched CubDB writes. The result is that we're writing the snapshot roughly at the same rate as PostgreSQL generates query results. --- packages/sync-service/config/runtime.exs | 6 +- .../shape_cache/mixed_disk_storage.ex | 217 ++++++++++++++++++ .../storage_implementations_test.exs | 34 ++- 3 files changed, 248 insertions(+), 9 deletions(-) create mode 100644 packages/sync-service/lib/electric/shape_cache/mixed_disk_storage.ex diff --git a/packages/sync-service/config/runtime.exs b/packages/sync-service/config/runtime.exs index 6eba60566d..e2298b653c 100644 --- a/packages/sync-service/config/runtime.exs +++ b/packages/sync-service/config/runtime.exs @@ -72,6 +72,7 @@ cache_stale_age = env!("CACHE_STALE_AGE", :integer, 60 * 5) statsd_host = env!("STATSD_HOST", :string?, nil) cubdb_file_path = env!("CUBDB_FILE_PATH", :string, "./shapes") +mixed_storage_dir = env!("MIXED_STORAGE_DIR", :string, "./mixed_storage") storage = env!( @@ -84,8 +85,11 @@ storage = "cubdb" -> {Electric.ShapeCache.CubDbStorage, file_path: cubdb_file_path} + "mixed" -> + {Electric.ShapeCache.MixedDiskStorage, storage_dir: mixed_storage_dir} + _ -> - raise Dotenvy.Error, message: "storage must be one of: MEMORY, CUBDB" + raise Dotenvy.Error, message: "storage must be one of: MEMORY, CUBDB, MIXED" end end, {Electric.ShapeCache.CubDbStorage, file_path: cubdb_file_path} diff --git a/packages/sync-service/lib/electric/shape_cache/mixed_disk_storage.ex b/packages/sync-service/lib/electric/shape_cache/mixed_disk_storage.ex new file mode 100644 index 
0000000000..9f7dee3a97 --- /dev/null +++ b/packages/sync-service/lib/electric/shape_cache/mixed_disk_storage.ex @@ -0,0 +1,217 @@ +defmodule Electric.ShapeCache.MixedDiskStorage do + alias Electric.Telemetry.OpenTelemetry + alias Electric.Replication.LogOffset + @behaviour Electric.ShapeCache.Storage + + @version 1 + @version_key :version + + def shared_opts(opts) do + storage_dir = Access.get(opts, :storage_dir, "./shapes") + db = Access.get(opts, :db, :mixed_shape_db) + + {:ok, %{storage_dir: storage_dir, db: db, version: @version}} + end + + def child_spec(opts) do + %{ + id: __MODULE__, + start: {__MODULE__, :start_link, [opts]}, + type: :worker, + restart: :permanent + } + end + + def start_link(opts) do + File.mkdir_p(opts.storage_dir) + File.mkdir_p(Path.join([opts.storage_dir, "cubdb"])) + File.mkdir_p(Path.join([opts.storage_dir, "shapes"])) + CubDB.start_link(data_dir: Path.join([opts.storage_dir, "cubdb"]), name: opts.db) + end + + def add_shape(shape_id, shape, opts) do + CubDB.put(opts.db, shape_key(shape_id), shape) + end + + def set_snapshot_xmin(shape_id, xmin, opts) do + CubDB.put(opts.db, xmin_key(shape_id), xmin) + end + + defp snapshot_xmin(shape_id, opts) do + CubDB.get(opts.db, xmin_key(shape_id)) + end + + def list_shapes(opts) do + opts.db + |> CubDB.select(min_key: shapes_start(), max_key: shapes_end()) + |> Enum.map(fn {{:shapes, shape_id}, shape} -> + %{ + shape_id: shape_id, + shape: shape, + latest_offset: latest_offset(shape_id, opts), + snapshot_xmin: snapshot_xmin(shape_id, opts) + } + end) + end + + def initialise(opts) do + stored_version = stored_version(opts) + + opts.db + |> CubDB.select(min_key: shapes_start(), max_key: shapes_end()) + |> Stream.map(fn {{:shapes, shape_id}, _} -> shape_id end) + |> Stream.filter(fn shape_id -> + stored_version != opts.version || + snapshot_xmin(shape_id, opts) == nil || + not CubDB.has_key?(opts.db, snapshot_meta_key(shape_id)) + end) + |> Enum.each(&cleanup!(&1, opts)) + + CubDB.put(opts.db, 
@version_key, @version) + end + + def snapshot_started?(shape_id, opts) do + CubDB.has_key?(opts.db, snapshot_started_key(shape_id)) + end + + def mark_snapshot_as_started(shape_id, opts) do + CubDB.put(opts.db, snapshot_started_key(shape_id), true) + end + + defp latest_offset(shape_id, opts) do + case CubDB.select(opts.db, + min_key: log_start(shape_id), + max_key: log_end(shape_id), + min_key_inclusive: true, + reverse: true + ) + |> Enum.take(1) do + [{key, _}] -> + offset(key) + + _ -> + LogOffset.first() + end + end + + defp offset({_shape_id, :log, tuple_offset}), do: LogOffset.new(tuple_offset) + + def make_new_snapshot!(shape_id, data_stream, opts) do + OpenTelemetry.with_span("storage.make_new_snapshot", [storage_impl: "mixed_disk"], fn -> + data_stream + |> Stream.map(&[String.replace(&1, "\n", "\\n"), "\n"]) + # Use the 4 byte marker (ASCII "end of transmission") to indicate the end of the snapshot, + # so that concurrent readers can detect that the snapshot has been completed. + |> Stream.concat([<<4::utf8>>]) + |> Stream.into(File.stream!(shape_snapshot_path(shape_id, opts), [:append, :delayed_write])) + |> Stream.run() + + CubDB.put(opts.db, snapshot_meta_key(shape_id), LogOffset.first()) + end) + end + + def snapshot_exists?(shape_id, opts) do + CubDB.has_key?(opts.db, snapshot_meta_key(shape_id)) + end + + def get_snapshot(shape_id, opts) do + if snapshot_started?(shape_id, opts) do + {LogOffset.first(), + Stream.resource( + fn -> open_snapshot_file(shape_id, opts) end, + fn file -> + case IO.binread(file, :line) do + {:error, reason} -> + raise IO.StreamError, reason: reason + + :eof -> + Process.sleep(10) + {[], file} + + # The 4 byte marker (ASCII "end of transmission") indicates the end of the snapshot file. 
+ <<4::utf8>> -> + {:halt, file} + + line -> + {[line], file} + end + end, + &File.close/1 + )} + else + raise "Snapshot no longer available" + end + end + + defp open_snapshot_file(shape_id, opts) do + case File.open(shape_snapshot_path(shape_id, opts), [:read, :raw, read_ahead: 1024]) do + {:ok, file} -> file + {:error, :enoent} -> open_snapshot_file(shape_id, opts) + {:error, reason} -> raise IO.StreamError, reason: reason + end + end + + def append_to_log!(shape_id, log_items, opts) do + log_items + |> Enum.map(fn log_item -> {log_key(shape_id, log_item.offset), Jason.encode!(log_item)} end) + |> then(&CubDB.put_multi(opts.db, &1)) + + :ok + end + + def get_log_stream(shape_id, offset, max_offset, opts) do + max_key = + if max_offset == :infinity, do: log_end(shape_id), else: log_key(shape_id, max_offset) + + opts.db + |> CubDB.select( + min_key: log_key(shape_id, offset), + max_key: max_key, + min_key_inclusive: false + ) + |> Stream.map(fn {_, item} -> item end) + end + + def has_log_entry?(shape_id, offset, opts) do + # FIXME: this is naive while we don't have snapshot metadata to get real offsets + CubDB.has_key?(opts.db, log_key(shape_id, offset)) or + (snapshot_started?(shape_id, opts) and offset == LogOffset.first()) + end + + def cleanup!(shape_id, opts) do + [ + snapshot_meta_key(shape_id), + shape_key(shape_id), + xmin_key(shape_id), + snapshot_started_key(shape_id) + ] + |> Enum.concat(keys_from_range(log_start(shape_id), log_end(shape_id), opts)) + |> then(&CubDB.delete_multi(opts.db, &1)) + + File.rm_rf(shape_snapshot_path(shape_id, opts)) + end + + defp keys_from_range(min_key, max_key, opts) do + CubDB.select(opts.db, min_key: min_key, max_key: max_key) + |> Stream.map(&elem(&1, 0)) + end + + defp shape_snapshot_path(shape_id, opts) do + Path.join([opts.storage_dir, "shapes", "#{shape_id}_snapshot.jsonl"]) + end + + defp stored_version(opts) do + CubDB.get(opts.db, @version_key) + end + + # Key helpers + defp shape_key(shape_id), do: {:shapes, 
shape_id} + defp xmin_key(shape_id), do: {:snapshot_xmin, shape_id} + defp snapshot_meta_key(shape_id), do: {:snapshot_meta, shape_id} + defp log_key(shape_id, offset), do: {shape_id, :log, LogOffset.to_tuple(offset)} + defp log_start(shape_id), do: log_key(shape_id, LogOffset.first()) + defp log_end(shape_id), do: log_key(shape_id, LogOffset.last()) + defp shapes_start, do: shape_key(0) + defp shapes_end, do: shape_key("zzz-end") + defp snapshot_started_key(shape_id), do: {:snapshot_started, shape_id} +end diff --git a/packages/sync-service/test/electric/shape_cache/storage_implementations_test.exs b/packages/sync-service/test/electric/shape_cache/storage_implementations_test.exs index 65854dec77..435ae02d02 100644 --- a/packages/sync-service/test/electric/shape_cache/storage_implementations_test.exs +++ b/packages/sync-service/test/electric/shape_cache/storage_implementations_test.exs @@ -8,6 +8,7 @@ defmodule Electric.ShapeCache.StorageImplimentationsTest do alias Electric.Replication.Changes alias Electric.ShapeCache.CubDbStorage alias Electric.ShapeCache.InMemoryStorage + alias Electric.ShapeCache.MixedDiskStorage alias Electric.Shapes.Shape alias Electric.Utils @moduletag :tmp_dir @@ -39,9 +40,8 @@ defmodule Electric.ShapeCache.StorageImplimentationsTest do } ] |> Enum.map(&Jason.encode!/1) - |> Enum.map(&List.wrap/1) - for module <- [InMemoryStorage, CubDbStorage] do + for module <- [InMemoryStorage, CubDbStorage, MixedDiskStorage] do module_name = module |> Module.split() |> List.last() doctest module, import: true @@ -94,10 +94,22 @@ defmodule Electric.ShapeCache.StorageImplimentationsTest do end test "does not leak results from other snapshots", %{module: storage, opts: opts} do - another_data_stream = [ - [<<3::128>>, "row3"], - [<<4::128>>, "row4"] - ] + another_data_stream = + [ + %{ + key: ~S|"public"."the-table"/"00000000-0000-0000-0000-000000000003"|, + value: %{id: "00000000-0000-0000-0000-000000000003", title: "row3"}, + headers: %{operation: 
"insert"}, + offset: @snapshot_offset_encoded + }, + %{ + key: ~S|"public"."the-table"/"00000000-0000-0000-0000-000000000004"|, + value: %{id: "00000000-0000-0000-0000-000000000004", title: "row4"}, + headers: %{operation: "insert"}, + offset: @snapshot_offset_encoded + } + ] + |> Enum.map(&Jason.encode!/1) storage.mark_snapshot_as_started(@shape_id, opts) storage.make_new_snapshot!(@shape_id, @data_stream, opts) @@ -168,7 +180,6 @@ defmodule Electric.ShapeCache.StorageImplimentationsTest do offset: @snapshot_offset_encoded } |> Jason.encode!() - |> List.wrap() end) storage.mark_snapshot_as_started(@shape_id, opts) @@ -534,7 +545,7 @@ defmodule Electric.ShapeCache.StorageImplimentationsTest do end # Tests for storage implimentations that are recoverable - for module <- [CubDbStorage] do + for module <- [CubDbStorage, MixedDiskStorage] do module_name = module |> Module.split() |> List.last() describe "#{module_name}.list_shapes/1" do @@ -712,4 +723,11 @@ defmodule Electric.ShapeCache.StorageImplimentationsTest do file_path: tmp_dir ] end + + defp opts(MixedDiskStorage, %{tmp_dir: tmp_dir}) do + [ + db: String.to_atom("shape_cubdb_#{Utils.uuid4()}"), + storage_dir: tmp_dir + ] + end end