offline_log_viewer: fix get_control_record_type
Noticed this was broken while parsing some logs. Added a little regression test for the parsing.
bharathv committed May 8, 2024
1 parent 60e6c3d commit fcc4058
Showing 4 changed files with 24 additions and 11 deletions.
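
For context on the breakage: decode_record in tools/offline_log_viewer/kafka.py is a module-level function, yet it called get_control_record_type through self, which is undefined in that scope. A minimal sketch of the failure mode:

    def decode_record_broken(record):
        # There is no `self` in a module-level function, so this raises
        # NameError: name 'self' is not defined.
        return self.get_control_record_type(record.key)

The fix below hoists get_control_record_type out of KafkaLog to module scope so decode_record can call it directly.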
3 changes: 3 additions & 0 deletions tests/rptest/clients/offline_log_viewer.py
@@ -39,6 +39,9 @@ def _json_cmd(self, node, suffix):
             self._redpanda.logger.error(f"Invalid JSON output: {json_out}")
             raise
 
+    def read_kafka_records(self, node, topic):
+        return self._json_cmd(node, f"--type kafka_records --topic {topic}")
+
     def read_controller(self, node):
         return self._json_cmd(node, "--type controller")
 
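_json_cmd shells out to the offline log viewer on the node and parses its JSON output, so the new helper is roughly equivalent to running the tool by hand. A sketch of the direct invocation (the --path value is illustrative and the exact flag spelling is assumed, not shown in this diff):

    python3 tools/offline_log_viewer/viewer.py \
        --path /var/lib/redpanda/data \
        --type kafka_records --topic my-topic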
7 changes: 7 additions & 0 deletions tests/rptest/tests/transactions_test.py
@@ -23,6 +23,7 @@
 from ducktape.utils.util import wait_until
 from ducktape.errors import TimeoutError
 
+from rptest.clients.offline_log_viewer import OfflineLogViewer
 from rptest.tests.redpanda_test import RedpandaTest
 from rptest.services.admin import Admin
 from rptest.services.redpanda import RedpandaService, SecurityConfig, SaslCredentials
@@ -246,6 +247,12 @@ def simple_test(self):
             ), f'Records value does not match from input {consumed_from_input_topic[index_from_input].value()}, from output {record.value()}'
             index_from_input += 1
 
+        log_viewer = OfflineLogViewer(self.redpanda)
+        for node in self.redpanda.started_nodes():
+            records = log_viewer.read_kafka_records(node=node,
+                                                    topic=self.input_t.name)
+            self.logger.info(f"Read {len(records)} from node {node.name}")
+
     @cluster(num_nodes=3)
     def rejoin_member_test(self):
         self.generate_data(self.input_t, self.max_records)
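
The regression coverage is intentionally light: read_kafka_records drives the full decode path over real transactional logs, so a broken get_control_record_type (or any other parsing error) now raises and fails the test instead of going unnoticed.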
19 changes: 10 additions & 9 deletions tools/offline_log_viewer/kafka.py
@@ -47,6 +47,14 @@ def spec(rdr, version):
     return rdr.read_envelope(spec, max_version=3)
 
 
+def get_control_record_type(key):
+    rdr = Reader(BytesIO(key))
+    rdr.skip(2) # skip the 16bit version.
+    # Encoded as big endian
+    type_rdr = Reader(BytesIO(struct.pack(">h", rdr.read_int16())))
+    return KafkaControlRecordType(type_rdr.read_int16()).name
+
+
 def decode_archival_metadata_command(kr, vr):
     key = kr.read_int8()
     if key == ArchivalMetadataCommand.add_segment:
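
For context, a Kafka control-record key is two big-endian int16s: a version followed by a type, where type 0 is an abort marker and type 1 is a commit marker. Assuming Reader decodes little-endian (as elsewhere in this tool), the pack(">h", ...) round-trip above is effectively a byte swap that lets a little-endian reader recover the big-endian type field. A standalone sketch of the same parse using only struct:

    import struct

    # A control-record key: version 0, type 1 (commit marker), big-endian.
    key = struct.pack(">hh", 0, 1)

    # What get_control_record_type computes, minus the Reader wrappers:
    version, ctrl_type = struct.unpack(">hh", key)
    assert (version, ctrl_type) == (0, 1)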
@@ -82,7 +90,7 @@ def decode_record(batch, header, record):
     is_ctrl = attrs["control_batch"]
     is_tx_ctrl = is_txn and is_ctrl
     if is_tx_ctrl:
-        record_dict["type"] = self.get_control_record_type(record.key)
+        record_dict["type"] = get_control_record_type(record.key)
 
     kr = Reader(BytesIO(record.key))
     vr = Reader(BytesIO(record.value))
@@ -102,14 +110,7 @@ def __init__(self, ntp, headers_only):
         self.ntp = ntp
         self.headers_only = headers_only
 
-    def get_control_record_type(self, key):
-        rdr = Reader(BytesIO(key))
-        rdr.skip(2) # skip the 16bit version.
-        # Encoded as big endian
-        type_rdr = Reader(BytesIO(struct.pack(">h", rdr.read_int16())))
-        return KafkaControlRecordType(type_rdr.read_int16()).name
-
-    def decode(self):
+    def __iter__(self):
         self.results = []
         for batch in self.batches():
             header = batch.header_dict()
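
Renaming decode to __iter__ turns KafkaLog into a plain iterable, which is what lets viewer.py below hand the log straight to a streaming JSON encoder instead of collecting every decoded record before printing.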
6 changes: 4 additions & 2 deletions tools/offline_log_viewer/viewer.py
@@ -80,8 +80,10 @@ def print_kafka(store, topic, headers_only):
 
         logger.info(f'topic: {ntp.topic}, partition: {ntp.partition}')
         log = KafkaLog(ntp, headers_only=headers_only)
-        for result in log.decode():
-            logger.info(json.dumps(result, indent=2))
+        json_iter = json.JSONEncoder(indent=2).iterencode(
+            SerializableGenerator(log))
+        for record in json_iter:
+            print(record, end='')
 
 
 def print_groups(store):
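
SerializableGenerator is not part of this diff. The usual implementation of that pattern (assumed here) is a list subclass that smuggles an iterator past the encoder's list check so iterencode can stream it; a minimal sketch:

    import itertools

    class SerializableGenerator(list):
        # A list subclass wrapping an iterator: json.JSONEncoder accepts it
        # as a JSON array, but pulls items lazily through __iter__.
        def __init__(self, iterable):
            iterator = iter(iterable)
            try:
                # Keep the first item aside so truthiness checks see a
                # non-empty list.
                self._head = [next(iterator)]
                self.append(iterator)
            except StopIteration:
                self._head = []

        def __iter__(self):
            return itertools.chain(self._head, *self[:1])

With this in place, json.JSONEncoder(indent=2).iterencode(...) emits the output one fragment at a time, so a large partition no longer has to be materialized as a single JSON string before printing; iterencode uses the pure-Python encoder, which walks lists with a plain for loop and therefore honors __iter__.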
