diff --git a/pulsar/__init__.py b/pulsar/__init__.py index eec603a..d3b081d 100644 --- a/pulsar/__init__.py +++ b/pulsar/__init__.py @@ -966,7 +966,8 @@ def create_reader(self, topic, start_message_id, reader_name=None, subscription_role_prefix=None, is_read_compacted=False, - crypto_key_reader=None + crypto_key_reader=None, + start_message_id_inclusive=False ): """ Create a reader on a particular topic @@ -1025,6 +1026,8 @@ def my_listener(reader, message): crypto_key_reader: CryptoKeyReader, optional Symmetric encryption class implementation, configuring public key encryption messages for the producer and private key decryption messages for the consumer + start_message_id_inclusive: bool, default=False + Set the reader to include the startMessageId or given position of any reset operation like Reader.seek """ # If a pulsar.MessageId object is passed, access the _pulsar.MessageId object @@ -1039,6 +1042,7 @@ def my_listener(reader, message): _check_type_or_none(str, subscription_role_prefix, 'subscription_role_prefix') _check_type(bool, is_read_compacted, 'is_read_compacted') _check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader') + _check_type(bool, start_message_id_inclusive, 'start_message_id_inclusive') conf = _pulsar.ReaderConfiguration() if reader_listener: @@ -1052,6 +1056,7 @@ def my_listener(reader, message): conf.read_compacted(is_read_compacted) if crypto_key_reader: conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader) + conf.start_message_id_inclusive(start_message_id_inclusive) c = Reader() c._reader = self._client.create_reader(topic, start_message_id, conf) diff --git a/src/config.cc b/src/config.cc index ac643b7..bada39b 100644 --- a/src/config.cc +++ b/src/config.cc @@ -318,5 +318,7 @@ void export_config(py::module_& m) { .def("subscription_role_prefix", &ReaderConfiguration::setSubscriptionRolePrefix) .def("read_compacted", &ReaderConfiguration::isReadCompacted) .def("read_compacted", &ReaderConfiguration::setReadCompacted) - .def("crypto_key_reader", &ReaderConfiguration::setCryptoKeyReader, return_value_policy::reference); + .def("crypto_key_reader", &ReaderConfiguration::setCryptoKeyReader, return_value_policy::reference) + .def("start_message_id_inclusive", &ReaderConfiguration::isStartMessageIdInclusive) + .def("start_message_id_inclusive", &ReaderConfiguration::setStartMessageIdInclusive, return_value_policy::reference); } diff --git a/tests/pulsar_test.py b/tests/pulsar_test.py index 067855d..5e228bd 100755 --- a/tests/pulsar_test.py +++ b/tests/pulsar_test.py @@ -19,6 +19,7 @@ # +import random import threading import logging from unittest import TestCase, main @@ -685,6 +686,41 @@ def test_reader_is_connected(self): self.assertFalse(reader.is_connected()) client.close() + def test_reader_seek_for_message_id(self): + client = pulsar.Client(self.serviceUrl) + + topic = "test-seek-for-message-id-" + str(int(time.time())) + + producer = client.create_producer(topic) + + readerExclusive = client.create_reader(topic, MessageId.latest) + readerInclusive = client.create_reader(topic, MessageId.latest, start_message_id_inclusive=True) + + numMessages = 100 + seekMessageId = None + + r = random.randint(0, numMessages - 2) + for i in range(numMessages): + msg_content = b"msg-%d" % i + id = producer.send(msg_content) + + if i == r: + seekMessageId = id + + readerExclusive.seek(seekMessageId) + msg0 = readerExclusive.read_next(timeout_millis=3000) + + readerInclusive.seek(seekMessageId) + msg1 = readerInclusive.read_next(timeout_millis=3000) + + self.assertEqual(msg0.data(), b"msg-%d" % (r + 1)) + self.assertEqual(msg1.data(), b"msg-%d" % r) + + readerExclusive.close() + readerInclusive.close() + producer.close() + client.close() + def test_producer_sequence_after_reconnection(self): # Enable deduplication on namespace doHttpPost(self.adminUrl + "/admin/v2/namespaces/public/default/deduplication", "true")