Skip to content

Commit

Permalink
pg: recognize streaming prepared tx msg (#1071) (#1073)
Browse files Browse the repository at this point in the history
Co-authored-by: jchappelow <[email protected]>
  • Loading branch information
brennanjl and jchappelow authored Oct 22, 2024
1 parent f4aa5da commit f11e0f7
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 2 deletions.
2 changes: 1 addition & 1 deletion internal/sql/pg/repl.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,7 @@ func decodeWALData(hasher hash.Hash, walData []byte, relations map[uint32]*pglog
hasher.Reset()
changesetWriter.fail() // discard changeset

// v2 Stream control messages. Not expected for kwil
// v2 Stream control messages. Only expected with large transactions.
case *pglogrepl.StreamStartMessageV2:
*inStream = true
logger.Warnf(" [msg] StreamStartMessageV2: xid %d, first segment? %d", logicalMsg.Xid, logicalMsg.FirstSegment)
Expand Down
5 changes: 4 additions & 1 deletion internal/sql/pg/repl_msgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,16 @@ const (
MessageTypeBeginPrepare pglogrepl.MessageType = 'b'
MessageTypeCommitPrepared pglogrepl.MessageType = 'K'
MessageTypeRollbackPrepared pglogrepl.MessageType = 'r'
MessageTypeStreamPrepare pglogrepl.MessageType = 'p'
)

// msgTypeToString is helpful for debugging, but normally unused.
func msgTypeToString(t pglogrepl.MessageType) string { //nolint:unused
switch t {
case MessageTypePrepare:
return "Prepare"
case MessageTypeStreamPrepare:
return "Stream Prepared"
case MessageTypeBeginPrepare:
return "Begin Prepared"
case MessageTypeCommitPrepared:
Expand All @@ -39,7 +42,7 @@ func parseV3(data []byte, inStream bool) (m pglogrepl.Message, err error) {
var decoder pglogrepl.MessageDecoder // v1 and v3 have same Decode signature (stream not relevant)

switch msgType {
case MessageTypePrepare:
case MessageTypePrepare, MessageTypeStreamPrepare: // same encoding
decoder = new(PrepareMessageV3)
case MessageTypeBeginPrepare:
decoder = new(BeginPrepareMessageV3)
Expand Down

0 comments on commit f11e0f7

Please sign in to comment.