Skip to content

Commit

Permalink
fixing bug when consuming from OffsetSpecification:Offset
Browse files Browse the repository at this point in the history
  • Loading branch information
DanielePalaia committed Aug 8, 2024
1 parent b960884 commit 3550d36
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 1 deletion.
2 changes: 1 addition & 1 deletion protocol/src/commands/subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ impl Command for SubscribeCommand {
}

#[cfg_attr(test, derive(fake::Dummy))]
#[derive(PartialEq, Eq, Debug)]
#[derive(PartialEq, Eq, Debug, Clone)]
pub enum OffsetSpecification {
First,
Last,
Expand Down
8 changes: 8 additions & 0 deletions src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ pub struct Consumer {
struct ConsumerInternal {
client: Client,
stream: String,
offset_specification: OffsetSpecification,
subscription_id: u8,
sender: Sender<Result<Delivery, ConsumerDeliveryError>>,
closed: Arc<AtomicBool>,
Expand Down Expand Up @@ -110,6 +111,7 @@ impl ConsumerBuilder {
subscription_id,
stream: stream.to_string(),
client: client.clone(),
offset_specification: self.offset_specification.clone(),
sender: tx,
closed: Arc::new(AtomicBool::new(false)),
waker: AtomicWaker::new(),
Expand Down Expand Up @@ -243,6 +245,12 @@ impl MessageHandler for ConsumerMessageHandler {
let len = delivery.messages.len();
trace!("Got delivery with messages {}", len);
for message in delivery.messages {
if let OffsetSpecification::Offset(offset_) = self.0.offset_specification {
if offset_ > offset {
offset += 1;
continue;
}
}
let _ = self
.0
.sender
Expand Down
59 changes: 59 additions & 0 deletions tests/integration/consumer_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,65 @@ async fn consumer_test() {
producer.close().await.unwrap();
}

#[tokio::test(flavor = "multi_thread")]
async fn consumer_test_offset_specification_offset() {
let env = TestEnvironment::create().await;
let reference: String = Faker.fake();
let mut messages_received = 0;
let mut first_offset = 0;

let message_count = 10;
let mut producer = env
.env
.producer()
.name(&reference)
.build(&env.stream)
.await
.unwrap();

let mut consumer = env
.env
.consumer()
.offset(OffsetSpecification::Offset(5))
.build(&env.stream)
.await
.unwrap();

for n in 0..message_count - 1 {
let _ = producer
.send_with_confirm(Message::builder().body(format!("message{}", n)).build())
.await
.unwrap();
}

let _ = producer
.send_with_confirm(
Message::builder()
.body(format!("marker{}", message_count - 1))
.build(),
)
.await
.unwrap();

while let Some(delivery) = consumer.next().await {
let d = delivery.unwrap();
if first_offset == 0 {
first_offset = d.offset();
}
messages_received += 1;

if String::from_utf8_lossy(d.message().data().unwrap()).contains("marker") {
break;
}
}

consumer.handle().close().await.unwrap();
producer.close().await.unwrap();

assert!(first_offset == 5);
assert!(messages_received == 5);
}

#[tokio::test(flavor = "multi_thread")]
async fn consumer_close_test() {
let env = TestEnvironment::create().await;
Expand Down

0 comments on commit 3550d36

Please sign in to comment.