diff --git a/tests/rptest/clients/offline_log_viewer.py b/tests/rptest/clients/offline_log_viewer.py
index 75df420b9b01..1754f6ed414f 100644
--- a/tests/rptest/clients/offline_log_viewer.py
+++ b/tests/rptest/clients/offline_log_viewer.py
@@ -37,6 +37,9 @@ def _json_cmd(self, node, suffix):
             self._redpanda.logger.error(f"Invalid JSON output: {json_out}")
             raise
 
+    def read_kafka_records(self, node, topic):
+        return self._json_cmd(node, f"--type kafka_records --topic {topic}")
+
     def read_controller(self, node):
         return self._json_cmd(node, "--type controller")
 
diff --git a/tests/rptest/tests/transactions_test.py b/tests/rptest/tests/transactions_test.py
index 1b2448e34e41..b9d597a989ad 100644
--- a/tests/rptest/tests/transactions_test.py
+++ b/tests/rptest/tests/transactions_test.py
@@ -23,6 +23,7 @@
 from ducktape.utils.util import wait_until
 from ducktape.errors import TimeoutError
 
+from rptest.clients.offline_log_viewer import OfflineLogViewer
 from rptest.tests.redpanda_test import RedpandaTest
 from rptest.services.admin import Admin
 from rptest.services.redpanda import RedpandaService, SecurityConfig, SaslCredentials
@@ -246,6 +247,12 @@ def simple_test(self):
             ), f'Records value does not match from input {consumed_from_input_topic[index_from_input].value()}, from output {record.value()}'
             index_from_input += 1
 
+        log_viewer = OfflineLogViewer(self.redpanda)
+        for node in self.redpanda.started_nodes():
+            records = log_viewer.read_kafka_records(node=node,
+                                                    topic=self.input_t.name)
+            self.logger.info(f"Read {len(records)} records from node {node.name}")
+
     @cluster(num_nodes=3)
     def rejoin_member_test(self):
         self.generate_data(self.input_t, self.max_records)
diff --git a/tools/offline_log_viewer/kafka.py b/tools/offline_log_viewer/kafka.py
index 813c09a40907..2f91d935cba9 100644
--- a/tools/offline_log_viewer/kafka.py
+++ b/tools/offline_log_viewer/kafka.py
@@ -47,6 +47,14 @@ def spec(rdr, version):
     return rdr.read_envelope(spec, max_version=3)
 
 
+def get_control_record_type(key):
+    rdr = Reader(BytesIO(key))
+    rdr.skip(2)  # skip the 16bit version
+    # Encoded as big endian
+    type_rdr = Reader(BytesIO(struct.pack(">h", rdr.read_int16())))
+    return KafkaControlRecordType(type_rdr.read_int16()).name
+
+
 def decode_archival_metadata_command(kr, vr):
     key = kr.read_int8()
     if key == ArchivalMetadataCommand.add_segment:
@@ -82,7 +90,7 @@ def decode_record(batch, header, record):
     is_ctrl = attrs["control_batch"]
     is_tx_ctrl = is_txn and is_ctrl
     if is_tx_ctrl:
-        record_dict["type"] = self.get_control_record_type(record.key)
+        record_dict["type"] = get_control_record_type(record.key)
 
     kr = Reader(BytesIO(record.key))
     vr = Reader(BytesIO(record.value))
@@ -102,14 +110,7 @@ def __init__(self, ntp, headers_only):
         self.ntp = ntp
         self.headers_only = headers_only
 
-    def get_control_record_type(self, key):
-        rdr = Reader(BytesIO(key))
-        rdr.skip(2)  # skip the 16bit version
-        # Encoded as big endian
-        type_rdr = Reader(BytesIO(struct.pack(">h", rdr.read_int16())))
-        return KafkaControlRecordType(type_rdr.read_int16()).name
-
-    def decode(self):
+    def __iter__(self):
         self.results = []
         for batch in self.batches():
             header = batch.header_dict()
diff --git a/tools/offline_log_viewer/viewer.py b/tools/offline_log_viewer/viewer.py
index 3872ac745f95..64a40db79037 100644
--- a/tools/offline_log_viewer/viewer.py
+++ b/tools/offline_log_viewer/viewer.py
@@ -68,8 +68,10 @@ def print_kafka(store, topic, headers_only):
         logger.info(f'topic: {ntp.topic}, partition: {ntp.partition}')
 
         log = KafkaLog(ntp, headers_only=headers_only)
-        for result in log.decode():
-            logger.info(json.dumps(result, indent=2))
+        json_iter = json.JSONEncoder(indent=2).iterencode(
+            SerializableGenerator(log))
+        for record in json_iter:
+            print(record, end='')
 
 
 def print_groups(store):
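A note on the kafka.py hunks: moving `get_control_record_type` out of `KafkaLog` is also a bug fix, since `decode_record` is a module-level function and the old `self.get_control_record_type(record.key)` would have raised a `NameError` whenever a transactional control batch was decoded. The helper parses the standard Kafka control-record key, which is two big-endian int16s: a version followed by a type (0 for an abort marker, 1 for a commit marker). The pack-and-reread dance is apparently a byte swap, since `Reader` decodes integers little-endian. A minimal standalone sketch of the same parse with plain `struct`, using placeholder type names rather than the actual `KafkaControlRecordType` members:

```python
import struct

# Hedged sketch of the key layout get_control_record_type walks: a
# 4-byte key of two big-endian int16s, [version, type]. Per the Kafka
# protocol, type 0 is an abort marker and type 1 a commit marker; the
# names below are placeholders, not the real enum members.
def control_record_type(key: bytes) -> str:
    _version, rtype = struct.unpack(">hh", key[:4])
    return {0: "tx_abort", 1: "tx_commit"}.get(rtype, f"unknown({rtype})")

assert control_record_type(struct.pack(">hh", 0, 1)) == "tx_commit"
```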
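In viewer.py, `print_kafka` previously materialized each record via `log.decode()` and pushed it through the logger; now `KafkaLog` is iterable and the whole partition is streamed as a single JSON array through `json.JSONEncoder.iterencode`, printed chunk by chunk to stdout. That keeps memory flat for large partitions and yields one parseable JSON document for `_json_cmd` to consume, with log noise kept off the data stream. `SerializableGenerator` is defined outside this hunk; a common recipe for it (an assumption here, not necessarily the definition this patch uses) is a `list` subclass that the encoder accepts as an array but that drains the wrapped iterable lazily:

```python
import itertools
import json

# Assumed sketch of SerializableGenerator: JSONEncoder rejects bare
# generators, but (in CPython) iterencode's pure-Python path iterates
# list subclasses via __iter__, so a list that holds only the tail
# iterator streams its items without buffering them all.
class SerializableGenerator(list):
    def __init__(self, iterable):
        tmp = iter(iterable)
        try:
            # Keep the first item aside so the list is non-empty and the
            # encoder does not short-circuit to '[]'.
            self._head = iter([next(tmp)])
            self.append(tmp)  # the rest of the stream, consumed lazily
        except StopIteration:
            self._head = []

    def __iter__(self):
        return itertools.chain(self._head, *self[:1])

# Usage mirrors the new print_kafka loop: chunks are emitted as they
# are encoded instead of building one giant string.
for chunk in json.JSONEncoder(indent=2).iterencode(
        SerializableGenerator({"i": i} for i in range(3))):
    print(chunk, end='')
```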
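On the ducktape side, `read_kafka_records` exposes the decoded on-disk records to tests, and the new block in `simple_test` currently just logs how many were read per node. A purely hypothetical follow-on (not part of this patch) shows the kind of assertion the hook enables, assuming the helper returns a list of record dicts whose `"type"` field is the one populated in `decode_record`:

```python
# Hypothetical extension inside TransactionsTest.simple_test (not in
# this patch). "tx_abort" is a placeholder for the enum's abort name.
log_viewer = OfflineLogViewer(self.redpanda)
for node in self.redpanda.started_nodes():
    records = log_viewer.read_kafka_records(node=node,
                                            topic=self.input_t.name)
    aborts = [r for r in records if r.get("type") == "tx_abort"]
    assert not aborts, f"unexpected abort markers on {node.name}"
```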