Skip to content

Commit

Permalink
Merge branch '23-OSSCA-python-mysql-replication-feat/footer-crc32'
Browse files Browse the repository at this point in the history
  • Loading branch information
julien-duponchelle committed Sep 1, 2023
2 parents edd4ff6 + a2896ac commit 460a702
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 27 deletions.
8 changes: 6 additions & 2 deletions pymysqlreplication/binlogstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,8 @@ def __init__(self, connection_settings, server_id,
slave_heartbeat=None,
is_mariadb=False,
annotate_rows_event=False,
ignore_decode_errors=False):
ignore_decode_errors=False,
verify_checksum=False,):
"""
Attributes:
ctl_connection_settings: Connection settings for cluster holding
Expand Down Expand Up @@ -184,6 +185,7 @@ def __init__(self, connection_settings, server_id,
used with 'is_mariadb'
ignore_decode_errors: If true, any decode errors encountered
when reading column data will be ignored.
verify_checksum: If true, verify events read from the binary log by examining checksums.
"""

self.__connection_settings = connection_settings
Expand All @@ -206,6 +208,7 @@ def __init__(self, connection_settings, server_id,
only_events, ignored_events, filter_non_implemented_events)
self.__fail_on_table_metadata_unavailable = fail_on_table_metadata_unavailable
self.__ignore_decode_errors = ignore_decode_errors
self.__verify_checksum = verify_checksum

# We can't filter on packet level TABLE_MAP and rotate event because
# we need them for handling other operations
Expand Down Expand Up @@ -535,7 +538,8 @@ def fetchone(self):
self.__ignored_schemas,
self.__freeze_schema,
self.__fail_on_table_metadata_unavailable,
self.__ignore_decode_errors)
self.__ignore_decode_errors,
self.__verify_checksum,)

if binlog_event.event_type == ROTATE_EVENT:
self.log_pos = binlog_event.event.position
Expand Down
62 changes: 40 additions & 22 deletions pymysqlreplication/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import struct
import datetime
import decimal
import zlib

from pymysqlreplication.constants.STATUS_VAR_KEY import *
from pymysqlreplication.exceptions import StatusVariableMismatch
from typing import Union, Optional
Expand All @@ -18,7 +20,8 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection,
ignored_schemas=None,
freeze_schema=False,
fail_on_table_metadata_unavailable=False,
ignore_decode_errors=False):
ignore_decode_errors=False,
verify_checksum=False,):
self.packet = from_packet
self.table_map = table_map
self.event_type = self.packet.event_type
Expand All @@ -28,17 +31,32 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection,
self.mysql_version = mysql_version
self._fail_on_table_metadata_unavailable = fail_on_table_metadata_unavailable
self._ignore_decode_errors = ignore_decode_errors
self._verify_checksum = verify_checksum
self._is_event_valid = None
# The event have been fully processed, if processed is false
# the event will be skipped
self._processed = True
self.complete = True
self._verify_event()

def _read_table_id(self):
# Table ID is 6 byte
# pad little-endian number
table_id = self.packet.read(6) + b"\x00\x00"
return struct.unpack('<Q', table_id)[0]

def _verify_event(self):
if not self._verify_checksum:
return

self.packet.rewind(1)
data = self.packet.read(19 + self.event_size)
footer = self.packet.read(4)
byte_data = zlib.crc32(data).to_bytes(4, byteorder='little')
self._is_event_valid = True if byte_data == footer else False
self.packet.read_bytes -= (19 + self.event_size + 4)
self.packet.rewind(20)

def dump(self):
print("=== %s ===" % (self.__class__.__name__))
print("Date: %s" % (datetime.datetime.utcfromtimestamp(self.timestamp)
Expand All @@ -57,7 +75,7 @@ class GtidEvent(BinLogEvent):
"""
GTID change in binlog event
For more information: `[GTID] <https://mariadb.com/kb/en/gtid/>`_ `[see also] <https://dev.mysql.com/doc/dev/mysql-server/latest/classbinary__log_1_1Gtid__event.html>`_
For more information: `[GTID] <https://mariadb.com/kb/en/gtid/>`_ `[see also] <https://dev.mysql.com/doc/dev/mysql-server/latest/classbinary__log_1_1Gtid__event.html>`_
:ivar commit_flag: 1byte - 00000001 = Transaction may have changes logged with SBR.
In 5.6, 5.7.0-5.7.18, and 8.0.0-8.0.1, this flag is always set. Starting in 5.7.19 and 8.0.2, this flag is cleared if the transaction only contains row events. It is set if any part of the transaction is written in statement format.
Expand Down Expand Up @@ -152,8 +170,8 @@ def _dump(self):

class MariadbAnnotateRowsEvent(BinLogEvent):
"""
Annotate rows event
If you want to check this binlog, change the value of the flag(line 382 of the 'binlogstream.py') option to 2
Annotate rows event
If you want to check this binlog, change the value of the flag(line 382 of the 'binlogstream.py') option to 2
https://mariadb.com/kb/en/annotate_rows_event/
:ivar sql_statement: str - The SQL statement
Expand Down Expand Up @@ -229,7 +247,7 @@ class XAPrepareEvent(BinLogEvent):
Like Xid_event, it contains XID of the **prepared** transaction.
For more information: `[see details] <https://dev.mysql.com/doc/refman/8.0/en/xa-statements.html>`_.
:ivar one_phase: current XA transaction commit method
:ivar xid_format_id: a number that identifies the format used by the gtrid and bqual values
:ivar xid: serialized XID representation of XA transaction (xid_gtrid + xid_bqual)
Expand Down Expand Up @@ -260,14 +278,14 @@ def _dump(self):
class FormatDescriptionEvent(BinLogEvent):
"""
Represents a Format Description Event in the MySQL binary log.
This event is written at the start of a binary log file for binlog version 4.
It provides the necessary information to decode subsequent events in the file.
:ivar binlog_version: int - Version of the binary log format.
:ivar mysql_version_str: str - Server's MySQL version in string format.
"""

def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
super().__init__(from_packet, event_size, table_map,
ctl_connection, **kwargs)
Expand All @@ -289,7 +307,7 @@ class XidEvent(BinLogEvent):
"""
A COMMIT event generated when COMMIT of a transaction that modifies one or more tables of an XA-capable storage engine occurs.
For more information: `[see details] <https://mariadb.com/kb/en/xid_event/>`_.
For more information: `[see details] <https://mariadb.com/kb/en/xid_event/>`_.
:ivar xid: uint - Transaction ID for 2 Phase Commit.
"""
Expand All @@ -312,18 +330,18 @@ class HeartbeatLogEvent(BinLogEvent):
`[see MASTER_HEARTBEAT_PERIOD] <https://dev.mysql.com/doc/refman/8.0/en/change-master-to.html>`_.
A Mysql server also does it for each skipped events in the log.
This is because to make the slave bump its position so that
A Mysql server also does it for each skipped events in the log.
This is because to make the slave bump its position so that
if a disconnection occurs, the slave will only reconnects from the lasted skipped position. (Baloo's idea)
(see Binlog_sender::send_events in sql/rpl_binlog_sender.cc)
Warning:
That makes 106 bytes of data for skipped event in the binlog.
*this is also the case with GTID replication*.
To mitigate such behavior, you are expected to keep the binlog small
(see max_binlog_size, defaults to 1G).
In any case, the timestamp is 0 (as in 1970-01-01T00:00:00).
That makes 106 bytes of data for skipped event in the binlog.
*this is also the case with GTID replication*.
To mitigate such behavior, you are expected to keep the binlog small
(see max_binlog_size, defaults to 1G).
In any case, the timestamp is 0 (as in 1970-01-01T00:00:00).
:ivar ident: Name of the current binlog
"""
Expand Down Expand Up @@ -411,7 +429,7 @@ def _read_status_vars_value_for_key(self, key):
elif key == Q_TIME_ZONE_CODE: # 0x05
time_zone_len = self.packet.read_uint8()
if time_zone_len:
self.time_zone = self.packet.read(time_zone_len)
self.time_zone = self.packet.read(time_zone_len)
elif key == Q_CATALOG_NZ_CODE: # 0x06
catalog_len = self.packet.read_uint8()
if catalog_len:
Expand Down Expand Up @@ -545,8 +563,8 @@ class IntvarEvent(BinLogEvent):
"""
Stores the value of auto-increment variables.
This event will be created just before a QueryEvent.
:ivar type: int - 1 byte identifying the type of variable stored.
:ivar type: int - 1 byte identifying the type of variable stored.
Can be either LAST_INSERT_ID_EVENT (1) or INSERT_ID_EVENT (2).
:ivar value: int - The value of the variable
"""
Expand Down Expand Up @@ -743,10 +761,10 @@ def _dump(self) -> None:

class MariadbStartEncryptionEvent(BinLogEvent):
"""
Since MariaDB 10.1.7,
the START_ENCRYPTION event is written to every binary log file
if encrypt_binlog is set to ON. Prior to enabling this setting,
additional configuration steps are required in MariaDB.
Since MariaDB 10.1.7,
the START_ENCRYPTION event is written to every binary log file
if encrypt_binlog is set to ON. Prior to enabling this setting,
additional configuration steps are required in MariaDB.
(Link: https://mariadb.com/kb/en/encrypting-binary-logs/)
This event is written just once, after the Format Description event
Expand Down
7 changes: 5 additions & 2 deletions pymysqlreplication/packet.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ def __init__(self, from_packet, table_map,
ignored_schemas,
freeze_schema,
fail_on_table_metadata_unavailable,
ignore_decode_errors):
ignore_decode_errors,
verify_checksum,):
# -1 because we ignore the ok byte
self.read_bytes = 0
# Used when we want to override a value in the data buffer
Expand Down Expand Up @@ -135,6 +136,7 @@ def __init__(self, from_packet, table_map,
if use_checksum:
event_size_without_header = self.event_size - 23
else:
verify_checksum = False
event_size_without_header = self.event_size - 19

self.event = None
Expand All @@ -151,7 +153,8 @@ def __init__(self, from_packet, table_map,
ignored_schemas=ignored_schemas,
freeze_schema=freeze_schema,
fail_on_table_metadata_unavailable=fail_on_table_metadata_unavailable,
ignore_decode_errors=ignore_decode_errors)
ignore_decode_errors=ignore_decode_errors,
verify_checksum=verify_checksum)
if self.event._processed == False:
self.event = None

Expand Down
54 changes: 53 additions & 1 deletion pymysqlreplication/tests/test_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,12 @@
from pymysqlreplication.event import *
from pymysqlreplication.constants.BINLOG import *
from pymysqlreplication.row_event import *
from pymysqlreplication.packet import BinLogPacketWrapper
from pymysql.protocol import MysqlPacket

__all__ = ["TestBasicBinLogStreamReader", "TestMultipleRowBinLogStreamReader", "TestCTLConnectionSettings", "TestGtidBinLogStreamReader", "TestMariadbBinlogStreamReader", "TestStatementConnectionSetting", "TestRowsQueryLogEvents"]
__all__ = ["TestBasicBinLogStreamReader", "TestMultipleRowBinLogStreamReader", "TestCTLConnectionSettings",
"TestGtidBinLogStreamReader", "TestMariadbBinlogStreamReader", "TestStatementConnectionSetting",
"TestRowsQueryLogEvents"]


class TestBasicBinLogStreamReader(base.PyMySQLReplicationTestCase):
Expand Down Expand Up @@ -522,6 +526,54 @@ def test_end_log_pos(self):
self.assertEqual(last_log_pos, 888)
self.assertEqual(last_event_type, TABLE_MAP_EVENT)

def test_event_validation(self):
def create_binlog_packet_wrapper(pkt):
return BinLogPacketWrapper(pkt, self.stream.table_map,
self.stream._ctl_connection, self.stream.mysql_version,
self.stream._BinLogStreamReader__use_checksum,
self.stream._BinLogStreamReader__allowed_events_in_packet,
self.stream._BinLogStreamReader__only_tables,
self.stream._BinLogStreamReader__ignored_tables,
self.stream._BinLogStreamReader__only_schemas,
self.stream._BinLogStreamReader__ignored_schemas,
self.stream._BinLogStreamReader__freeze_schema,
self.stream._BinLogStreamReader__fail_on_table_metadata_unavailable,
self.stream._BinLogStreamReader__ignore_decode_errors,
self.stream._BinLogStreamReader__verify_checksum,)
self.stream.close()
self.stream = BinLogStreamReader(
self.database,
server_id=1024,
blocking=False,
verify_checksum=True
)
# For event data, refer to the official document example data of mariaDB.
# https://mariadb.com/kb/en/query_event/#example-with-crc32
correct_event_data = (
# OK value
b"\x00"
# Header
b"q\x17(Z\x02\x8c'\x00\x00U\x00\x00\x00\x01\t\x00\x00\x00\x00"
# Content
b"f\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x1a\x00"
b"\x00\x00\x00\x00\x00\x01\x00\x00\x00P\x00\x00"
b"\x00\x00\x06\x03std\x04\x08\x00\x08\x00\x08\x00\x00"
b"TRUNCATE TABLE test.t4"
# CRC 32, 4 Bytes
b"Ji\x9e\xed"
)
# Assume a bit flip occurred while data was being transmitted q(1001000) -> U(0110111)
modified_byte = b"U"
wrong_event_data = correct_event_data[:1] + modified_byte + correct_event_data[2:]

packet = MysqlPacket(correct_event_data, 0)
wrong_packet = MysqlPacket(wrong_event_data, 0)
self.stream.fetchone() # for '_ctl_connection' parameter
binlog_event = create_binlog_packet_wrapper(packet)
wrong_event = create_binlog_packet_wrapper(wrong_packet)
self.assertEqual(binlog_event.event._is_event_valid, True)
self.assertNotEqual(wrong_event.event._is_event_valid, True)


class TestMultipleRowBinLogStreamReader(base.PyMySQLReplicationTestCase):
def ignoredEvents(self):
Expand Down

0 comments on commit 460a702

Please sign in to comment.