diff --git a/components/electric/lib/electric/replication/postgres/client.ex b/components/electric/lib/electric/replication/postgres/client.ex index f46700c65b..d714815032 100644 --- a/components/electric/lib/electric/replication/postgres/client.ex +++ b/components/electric/lib/electric/replication/postgres/client.ex @@ -6,7 +6,7 @@ defmodule Electric.Replication.Postgres.Client do doesn't support connecting via a unix socket. """ - import Electric.Postgres.Dialect.Postgresql, only: [quote_ident: 1] + import Electric.Postgres.Dialect.Postgresql, only: [escape_quotes: 2, quote_ident: 1] alias Electric.Postgres.Extension alias Electric.Postgres.Lsn @@ -322,6 +322,26 @@ defmodule Electric.Replication.Postgres.Client do end end + @type logical_message_option :: {:transactional?, boolean} | {:prefix, String.t()} + @doc """ + Emit a logical message to be consumed by LogicalReplicationProducer. + """ + @spec emit_logical_message(connection, String.t(), [logical_message_option()]) :: + :ok | {:error, term} + def emit_logical_message(conn, message, opts \\ []) do + transactional? = Keyword.get(opts, :transactional?, false) + prefix = Keyword.get(opts, :prefix, "") |> escape_quotes(?') + message = escape_quotes(message, ?') + + with {:ok, _, _} <- + squery( + conn, + "SELECT pg_logical_emit_message(#{transactional?}, '#{prefix}', '#{message}')" + ) do + :ok + end + end + @relkind %{table: ["r"], index: ["i"], view: ["v", "m"]} @pg_class_query """ diff --git a/components/electric/lib/electric/replication/postgres/logical_replication_producer.ex b/components/electric/lib/electric/replication/postgres/logical_replication_producer.ex index 23af2641f0..58e5ee63c3 100644 --- a/components/electric/lib/electric/replication/postgres/logical_replication_producer.ex +++ b/components/electric/lib/electric/replication/postgres/logical_replication_producer.ex @@ -53,6 +53,7 @@ defmodule Electric.Replication.Postgres.LogicalReplicationProducer do advance_timer: nil, main_slot: "", main_slot_lsn: %Lsn{}, + acked_lsn: %Lsn{}, resumable_wal_window: 1 @type t() :: %__MODULE__{ @@ -70,6 +71,7 @@ defmodule Electric.Replication.Postgres.LogicalReplicationProducer do advance_timer: reference() | nil, main_slot: binary(), main_slot_lsn: Lsn.t(), + acked_lsn: Lsn.t(), resumable_wal_window: pos_integer() } end @@ -86,6 +88,10 @@ defmodule Electric.Replication.Postgres.LogicalReplicationProducer do @advance_timeout 1_000 end + # 100 MB is somewhat arbitrary, chosen to balance between the frequency of acks and extra + # disk usage. + @active_slot_lag_bytes 100 * 1024 * 1024 + @spec start_link(Connectors.config()) :: :ignore | {:error, any} | {:ok, pid} def start_link(connector_config) do GenStage.start_link(__MODULE__, connector_config, name: name(connector_config)) @@ -121,7 +127,7 @@ defmodule Electric.Replication.Postgres.LogicalReplicationProducer do "Starting replication with publication=#{publication} and slots=#{main_slot},#{tmp_slot}}" ) - # The replication connection is used to consumer the logical replication stream from + # The replication connection is used to consume the logical replication stream from # Postgres and to send acknowledgements about received transactions back to Postgres, # allowing it to advance the replication slot forward and discard obsolete WAL records. with {:ok, repl_conn} <- Client.connect(repl_conn_opts), @@ -228,7 +234,8 @@ defmodule Electric.Replication.Postgres.LogicalReplicationProducer do end defp process_message( - %Message{transactional?: true, prefix: "electric.fk_chain_touch", content: content}, + %Message{transactional?: true, prefix: "electric.fk_chain_touch", content: content} = + msg, state ) do received = Jason.decode!(content) @@ -241,6 +248,8 @@ defmodule Electric.Replication.Postgres.LogicalReplicationProducer do ShadowTableTransformation.convert_tag_list_pg_to_satellite(received["tags"], state.origin) } + state = ack_message(msg, state) + {lsn, txn} = state.transaction {:noreply, [], @@ -249,7 +258,7 @@ defmodule Electric.Replication.Postgres.LogicalReplicationProducer do defp process_message(%Message{} = msg, state) do Logger.info("Got a message from PG via logical replication: #{inspect(msg)}") - + state = ack_message(msg, state) {:noreply, [], state} end @@ -385,16 +394,14 @@ defmodule Electric.Replication.Postgres.LogicalReplicationProducer do defp dispatch_events(%{demand: demand, queue: queue, queue_len: queue_len} = state) when demand >= queue_len do - queue |> :queue.last() |> ack(state) - + state = queue |> :queue.last() |> ack_transaction(state) state = %{state | queue: :queue.new(), queue_len: 0, demand: demand - queue_len} {:noreply, :queue.to_list(queue), state} end defp dispatch_events(%{demand: demand, queue: queue, queue_len: queue_len} = state) do {to_emit, queue_remaining} = :queue.split(demand, queue) - to_emit |> :queue.last() |> ack(state) - + state = to_emit |> :queue.last() |> ack_transaction(state) state = %{state | queue: queue_remaining, queue_len: queue_len - demand, demand: 0} {:noreply, :queue.to_list(to_emit), state} end @@ -408,25 +415,60 @@ defmodule Electric.Replication.Postgres.LogicalReplicationProducer do |> Map.new(fn {column, data} -> {column.name, data} end) end - @spec ack(Transaction.t(), State.t()) :: :ok + @spec ack_message(Message.t(), State.t()) :: State.t() if Mix.env() == :test do - def ack(%Transaction{}, %State{repl_conn: :conn}) do - :ok + def ack_message(_, %State{repl_conn: :conn} = state), do: state + end + + def ack_message(%Message{lsn: lsn}, state) do + cond do + state.queue_len > 0 -> + # We still have unacknowledged transactions waiting in the queue. Can't acknowledge the + # message before all transactions in the queue have been processed. + state + + Lsn.compare(lsn, state.acked_lsn) != :gt -> + # Either lsn == state.acked_lsn, in which case we can skip acknowledging it for the second time, + # or lsn < state.acked_lsn, which could mean that the message's LSN is lower than the + # LSN of the transaction it was emitted from or another transaction with a higher LSN + # has already been acknowledged. + state + + true -> + ack_lsn(lsn, state) end end - def ack(%Transaction{lsn: lsn}, state) do + @spec ack_transaction(Transaction.t(), State.t()) :: State.t() + + if Mix.env() == :test do + def ack_transaction(_, %State{repl_conn: :conn} = state), do: state + end + + def ack_transaction(%Transaction{lsn: lsn}, state) do + ack_lsn(lsn, state) + end + + def ack_lsn(%Lsn{} = lsn, state) do + assert_lsn_is_advancing!(lsn, state.acked_lsn, Lsn.compare(lsn, state.acked_lsn)) + Logger.debug("Acknowledging #{lsn}", origin: state.origin) - Client.acknowledge_lsn(state.repl_conn, lsn) + :ok = Client.acknowledge_lsn(state.repl_conn, lsn) + %{state | acked_lsn: lsn} end + defp assert_lsn_is_advancing!(_lsn, _acked_lsn, :gt), do: :ok + # Advance the replication slot to let Postgres discard old WAL records. # # TODO: make sure we're not removing transactions that are about to be requested by a newly # connected client. See VAX-1552. defp advance_main_slot(state) do {:ok, current_lsn} = Client.current_lsn(state.svc_conn) + + check_active_slot_lag(current_lsn, state) + min_in_window_lsn = Lsn.increment(current_lsn, -state.resumable_wal_window) if Lsn.compare(state.main_slot_lsn, min_in_window_lsn) == :lt do @@ -440,6 +482,17 @@ defmodule Electric.Replication.Postgres.LogicalReplicationProducer do end end + defp check_active_slot_lag(current_lsn, state) do + lsn_threshold = Lsn.increment(current_lsn, -@active_slot_lag_bytes) + + if Lsn.compare(state.acked_lsn, lsn_threshold) == :lt do + # If there's more than `@active_slot_lag_bytes` between the current LSN and the last + # ack'ed LSN, emit a logical message to be consumed by the producer in order to advance + # the active slot and prevent it from stalling removal of old WAL records by Postgres. + :ok = Client.emit_logical_message(state.svc_conn, "advance active slot") + end + end + defp schedule_main_slot_advance(state) do tref = :erlang.start_timer(@advance_timeout, self(), @advance_msg) %State{state | advance_timer: tref} diff --git a/e2e/tests/01.08_electric_acknowledges_logical_message_lsn.lux b/e2e/tests/01.08_electric_acknowledges_logical_message_lsn.lux new file mode 100644 index 0000000000..4f1b3284ad --- /dev/null +++ b/e2e/tests/01.08_electric_acknowledges_logical_message_lsn.lux @@ -0,0 +1,47 @@ +[doc Electric acknowledges logical message's LSN] +[include _shared.luxinc] + +[invoke setup] + +[shell pg_1] + !SELECT pg_logical_emit_message(false, '', 'hello from PG'); + ?? pg_logical_emit_message + ??------------------------- + ?(\d+/[0-9a-fA-F]+) + [my logical_msg_lsn=$1] + ??(1 row) + +[shell electric] + ??Got a message from PG via logical replication: \ + %Electric.Postgres.LogicalReplication.Messages.Message{\ + transactional?: false, \ + lsn: #Lsn<$logical_msg_lsn>, \ + prefix: "", \ + content: "hello from PG"} + ??Acknowledging $logical_msg_lsn + +[shell pg_1] + !BEGIN; + ??BEGIN + + !SELECT pg_logical_emit_message(true, '', 'hello from PG transaction'); + ?? pg_logical_emit_message + ??------------------------- + ?(\d+/[0-9a-fA-F]+) + [my logical_msg_lsn=$1] + ??(1 row) + + !COMMIT; + ??COMMIT + +[shell electric] + ??Got a message from PG via logical replication: \ + %Electric.Postgres.LogicalReplication.Messages.Message{\ + transactional?: true, \ + lsn: #Lsn<$logical_msg_lsn>, \ + prefix: "", \ + content: "hello from PG transaction"} + ??Acknowledging $logical_msg_lsn + +[cleanup] + [invoke teardown]