Skip to content

Commit

Permalink
feat: Support configure startMessageIdInclusive for the reader (#157)
Browse files Browse the repository at this point in the history
  • Loading branch information
RobertIndie authored Oct 19, 2023
1 parent 8d77f74 commit 995e491
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 2 deletions.
7 changes: 6 additions & 1 deletion pulsar/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -966,7 +966,8 @@ def create_reader(self, topic, start_message_id,
reader_name=None,
subscription_role_prefix=None,
is_read_compacted=False,
crypto_key_reader=None
crypto_key_reader=None,
start_message_id_inclusive=False
):
"""
Create a reader on a particular topic
Expand Down Expand Up @@ -1025,6 +1026,8 @@ def my_listener(reader, message):
crypto_key_reader: CryptoKeyReader, optional
Symmetric encryption class implementation, configuring public key encryption messages for the producer
and private key decryption messages for the consumer
start_message_id_inclusive: bool, default=False
Set the reader to include the startMessageId or given position of any reset operation like Reader.seek
"""

# If a pulsar.MessageId object is passed, access the _pulsar.MessageId object
Expand All @@ -1039,6 +1042,7 @@ def my_listener(reader, message):
_check_type_or_none(str, subscription_role_prefix, 'subscription_role_prefix')
_check_type(bool, is_read_compacted, 'is_read_compacted')
_check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader')
_check_type(bool, start_message_id_inclusive, 'start_message_id_inclusive')

conf = _pulsar.ReaderConfiguration()
if reader_listener:
Expand All @@ -1052,6 +1056,7 @@ def my_listener(reader, message):
conf.read_compacted(is_read_compacted)
if crypto_key_reader:
conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)
conf.start_message_id_inclusive(start_message_id_inclusive)

c = Reader()
c._reader = self._client.create_reader(topic, start_message_id, conf)
Expand Down
4 changes: 3 additions & 1 deletion src/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -318,5 +318,7 @@ void export_config(py::module_& m) {
.def("subscription_role_prefix", &ReaderConfiguration::setSubscriptionRolePrefix)
.def("read_compacted", &ReaderConfiguration::isReadCompacted)
.def("read_compacted", &ReaderConfiguration::setReadCompacted)
.def("crypto_key_reader", &ReaderConfiguration::setCryptoKeyReader, return_value_policy::reference);
.def("crypto_key_reader", &ReaderConfiguration::setCryptoKeyReader, return_value_policy::reference)
.def("start_message_id_inclusive", &ReaderConfiguration::isStartMessageIdInclusive)
.def("start_message_id_inclusive", &ReaderConfiguration::setStartMessageIdInclusive, return_value_policy::reference);
}
36 changes: 36 additions & 0 deletions tests/pulsar_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#


import random
import threading
import logging
from unittest import TestCase, main
Expand Down Expand Up @@ -685,6 +686,41 @@ def test_reader_is_connected(self):
self.assertFalse(reader.is_connected())
client.close()

def test_reader_seek_for_message_id(self):
client = pulsar.Client(self.serviceUrl)

topic = "test-seek-for-message-id-" + str(int(time.time()))

producer = client.create_producer(topic)

readerExclusive = client.create_reader(topic, MessageId.latest)
readerInclusive = client.create_reader(topic, MessageId.latest, start_message_id_inclusive=True)

numMessages = 100
seekMessageId = None

r = random.randint(0, numMessages - 2)
for i in range(numMessages):
msg_content = b"msg-%d" % i
id = producer.send(msg_content)

if i == r:
seekMessageId = id

readerExclusive.seek(seekMessageId)
msg0 = readerExclusive.read_next(timeout_millis=3000)

readerInclusive.seek(seekMessageId)
msg1 = readerInclusive.read_next(timeout_millis=3000)

self.assertEqual(msg0.data(), b"msg-%d" % (r + 1))
self.assertEqual(msg1.data(), b"msg-%d" % r)

readerExclusive.close()
readerInclusive.close()
producer.close()
client.close()

def test_producer_sequence_after_reconnection(self):
# Enable deduplication on namespace
doHttpPost(self.adminUrl + "/admin/v2/namespaces/public/default/deduplication", "true")
Expand Down

0 comments on commit 995e491

Please sign in to comment.