Skip to content

Commit

Permalink
Add more debug info to processing of logical message offsets
Browse files Browse the repository at this point in the history
If we see transient errors caused by the violation of our assumptions
about WAL offset's monotonic growth, this should have debug that.
  • Loading branch information
alco committed Aug 7, 2024
1 parent ca87072 commit 5c58768
Showing 1 changed file with 7 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ defmodule Electric.Postgres.ReplicationClient do
) do
Logger.debug("XLogData: wal_start=#{wal_start}, wal_end=#{wal_end}")

state = update_received_wal(state, wal_end)
state = update_received_wal(:xlog_data, state, wal_start, wal_end)

rest
|> Decoder.decode()
Expand Down Expand Up @@ -185,7 +185,9 @@ defmodule Electric.Postgres.ReplicationClient do
end

def handle_data(<<@repl_msg_primary_keepalive, wal_end::64, _clock::64, reply>>, state) do
state = update_received_wal(state, wal_end)
Logger.debug("Primary Keepalive: wal_end=#{wal_end} reply=#{reply}")

state = update_received_wal(:keepalive, state, nil, wal_end)

messages =
case reply do
Expand Down Expand Up @@ -220,11 +222,11 @@ defmodule Electric.Postgres.ReplicationClient do
defp current_time(), do: System.os_time(:microsecond) - @epoch

# wal can be 0 if the incoming logical message is e.g. Relation.
defp update_received_wal(state, 0), do: state
defp update_received_wal(_step, state, _, 0), do: state

defp update_received_wal(%{received_wal: wal} = state, wal), do: state
defp update_received_wal(_step, %{received_wal: wal} = state, _, wal), do: state

defp update_received_wal(state, wal) when wal > state.received_wal,
defp update_received_wal(_step, state, _, wal) when wal > state.received_wal,
do: %{state | received_wal: wal}

defp update_applied_wal(state, wal) when wal > state.applied_wal,
Expand Down

0 comments on commit 5c58768

Please sign in to comment.