Skip to content

Commit

Permalink
fix: integrate CD feedbacks
Browse files Browse the repository at this point in the history
Co-authored-by: Clément Doumouro <[email protected]>
  • Loading branch information
bamthomas and ClemDoum committed Aug 30, 2023
1 parent d4104d0 commit eba92e3
Showing 1 changed file with 4 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public void consumeEvents() {
public void consumeEvents(int nb) {
launchConsumer(channel, AbstractConsumer.this::handle, nb);}

public <Evt extends Event> void launchConsumer(AmqpChannel channel, EventHandler<Evt> eventHandler, final int nbEventsToConsume) {
public void launchConsumer(AmqpChannel channel, EventHandler<Evt> eventHandler, final int nbEventsToConsume) {
launchConsumer(channel, eventHandler, new Criteria() {
int nvReceivedEvents=0;
public void newEvent() { nvReceivedEvents++; }
Expand All @@ -50,15 +50,14 @@ public void cancel() throws IOException {
channel.rabbitMqChannel.basicCancel(consumerTag.getAndSet(null));
}

public <Evt extends Event> void launchConsumer(AmqpChannel channel, EventHandler<Evt> eventHandler) {
public void launchConsumer(AmqpChannel channel, EventHandler<Evt> eventHandler) {
launchConsumer(channel, eventHandler, new Criteria() {
public void newEvent() {}
public boolean isValid() { return true; }
});
}

@SuppressWarnings("unchecked")
private <Evt extends Event> void launchConsumer(AmqpChannel channel, EventHandler<Evt> eventHandler, Criteria criteria) {
private void launchConsumer(AmqpChannel channel, EventHandler<Evt> eventHandler, Criteria criteria) {
try {
logger.info("starting consuming events for {}", channel);
consumerTag.set(channel.rabbitMqChannel.basicConsume(channel.queue.name(), new DefaultConsumer(channel.rabbitMqChannel) {
Expand All @@ -68,7 +67,7 @@ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProp
eventHandler.handle((Evt) deserialize(body));
channel.rabbitMqChannel.basicAck(envelope.getDeliveryTag(), false);
} catch (RuntimeException rex) {
channel.rabbitMqChannel.basicNack(envelope.getDeliveryTag(), true, false);
channel.rabbitMqChannel.basicNack(envelope.getDeliveryTag(), false, false);
}
criteria.newEvent();
if (!criteria.isValid()) {
Expand Down

0 comments on commit eba92e3

Please sign in to comment.