Skip to content

Commit

Permalink
Addressed review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Krishna Kondaka <[email protected]>
  • Loading branch information
Krishna Kondaka committed Jan 30, 2024
1 parent 1be5171 commit a373071
Showing 1 changed file with 24 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -172,10 +172,15 @@ private AcknowledgementSet createAcknowledgementSet(Map<TopicPartition, CommitOf
return acknowledgementSet;
}

<T> void consumeRecords() throws Exception {
try {
<T> ConsumerRecords<String, T> doPoll() throws Exception {
ConsumerRecords<String, T> records =
consumer.poll(Duration.ofMillis(topicConfig.getThreadWaitingTime().toMillis()/2));
return records;
}

<T> void consumeRecords() throws Exception {
try {
ConsumerRecords<String, T> records = doPoll();
if (Objects.nonNull(records) && !records.isEmpty() && records.count() > 0) {
Map<TopicPartition, CommitOffsetRange> offsets = new HashMap<>();
AcknowledgementSet acknowledgementSet = null;
Expand Down Expand Up @@ -413,20 +418,24 @@ private <T> Record<Event> getRecord(ConsumerRecord<String, T> consumerRecord, in
return new Record<Event>(event);
}

private void processRecord(final AcknowledgementSet acknowledgementSet, final Record<Event> record) {
private <T> void processRecord(final AcknowledgementSet acknowledgementSet, final Record<Event> record) {
// Always add record to acknowledgementSet before adding to
// buffer because another thread may take and process
// buffer contents before the event record is added
// to acknowledgement set
if (acknowledgementSet != null) {
acknowledgementSet.add(record.getData());
}
long numRetries = 0;
final int retrySleepTimeMs = 100;
// Donot pause until half the poll interval time has expired
final long maxRetries = topicConfig.getMaxPollInterval().toMillis() / (2 * retrySleepTimeMs);
while (true) {
try {
bufferAccumulator.add(record);
break;
} catch (Exception e) {
if (!paused) {
if (!paused && numRetries++ > maxRetries) {
paused = true;
consumer.pause(consumer.assignment());
}
Expand All @@ -436,8 +445,17 @@ private void processRecord(final AcknowledgementSet acknowledgementSet, final Re
LOG.debug("Error while adding record to buffer, retrying ", e);
}
try {
Thread.sleep(100);
consumeRecords();
Thread.sleep(retrySleepTimeMs);
if (paused) {
ConsumerRecords<String, T> records = doPoll();
if (records.count() > 0) {
LOG.debug("Unexpected records received while the consumer is paused. Resetting the paritions to retry from last read pointer");
synchronized(this) {
partitionsToReset.addAll(consumer.assignment());
};
break;
}
}
} catch (Exception ex) {} // ignore the exception because it only means the thread slept for shorter time
}
}
Expand Down

0 comments on commit a373071

Please sign in to comment.