Skip to content

Commit

Permalink
AKCORE-109: Support for ShareAcknowledge (apache#1257)
Browse files Browse the repository at this point in the history
* AKCORE-109: Support for ShareAcknowledge
  • Loading branch information
AndrewJSchofield authored May 30, 2024
1 parent 0b83c4a commit 0f26591
Show file tree
Hide file tree
Showing 13 changed files with 527 additions and 87 deletions.
4 changes: 2 additions & 2 deletions checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,10 @@

<!-- Clients tests -->
<suppress checks="ClassDataAbstractionCoupling"
files="(Sender|Fetcher|FetchRequestManager|OffsetFetcher|KafkaConsumer|KafkaShareConsumer|Metrics|RequestResponse|TransactionManager|KafkaAdminClient|Message|KafkaProducer)Test.java"/>
files="(Sender|Fetcher|FetchRequestManager|ShareConsumeRequestManager|OffsetFetcher|KafkaConsumer|KafkaShareConsumer|Metrics|RequestResponse|TransactionManager|KafkaAdminClient|Message|KafkaProducer)Test.java"/>

<suppress checks="ClassFanOutComplexity"
files="(ConsumerCoordinator|KafkaConsumer|KafkaShareConsumer|RequestResponse|Fetcher|FetchRequestManager|KafkaAdminClient|Message|KafkaProducer)Test.java"/>
files="(ConsumerCoordinator|KafkaConsumer|KafkaShareConsumer|RequestResponse|Fetcher|FetchRequestManager|ShareConsumeRequestManager|KafkaAdminClient|Message|KafkaProducer)Test.java"/>

<suppress checks="ClassFanOutComplexity"
files="MockAdminClient.java"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public class RequestManagers implements Closeable {
public final OffsetsRequestManager offsetsRequestManager;
public final TopicMetadataRequestManager topicMetadataRequestManager;
public final FetchRequestManager fetchRequestManager;
public final ShareFetchRequestManager shareFetchRequestManager;
public final Optional<ShareConsumeRequestManager> shareConsumeRequestManager;
private final List<Optional<? extends RequestManager>> entries;
private final IdempotentCloser closer = new IdempotentCloser();

Expand All @@ -72,7 +72,7 @@ public RequestManagers(LogContext logContext,
this.commitRequestManager = commitRequestManager;
this.topicMetadataRequestManager = topicMetadataRequestManager;
this.fetchRequestManager = fetchRequestManager;
this.shareFetchRequestManager = null;
this.shareConsumeRequestManager = null;
this.heartbeatRequestManager = heartbeatRequestManager;
this.shareHeartbeatRequestManager = Optional.empty();
this.membershipManager = membershipManager;
Expand All @@ -90,12 +90,12 @@ public RequestManagers(LogContext logContext,
}

public RequestManagers(LogContext logContext,
ShareFetchRequestManager shareFetchRequestManager,
ShareConsumeRequestManager shareConsumeRequestManager,
Optional<CoordinatorRequestManager> coordinatorRequestManager,
Optional<ShareHeartbeatRequestManager> shareHeartbeatRequestManager,
Optional<ShareMembershipManager> shareMembershipManager) {
this.log = logContext.logger(RequestManagers.class);
this.shareFetchRequestManager = shareFetchRequestManager;
this.shareConsumeRequestManager = Optional.of(shareConsumeRequestManager);
this.coordinatorRequestManager = coordinatorRequestManager;
this.commitRequestManager = Optional.empty();
this.heartbeatRequestManager = Optional.empty();
Expand All @@ -110,7 +110,7 @@ public RequestManagers(LogContext logContext,
list.add(coordinatorRequestManager);
list.add(shareHeartbeatRequestManager);
list.add(shareMembershipManager);
list.add(Optional.of(shareFetchRequestManager));
list.add(Optional.of(shareConsumeRequestManager));
entries = Collections.unmodifiableList(list);
}

Expand Down Expand Up @@ -299,19 +299,19 @@ protected RequestManagers create() {
shareMembershipManager,
backgroundEventHandler,
metrics);
ShareFetchRequestManager shareFetchRequestManager = new ShareFetchRequestManager(
ShareConsumeRequestManager shareConsumeRequestManager = new ShareConsumeRequestManager(
logContext,
groupRebalanceConfig.groupId,
metadata,
subscriptions,
fetchConfig,
fetchBuffer,
shareFetchMetricsManager);
shareMembershipManager.registerStateListener(shareFetchRequestManager);
shareMembershipManager.registerStateListener(shareConsumeRequestManager);

return new RequestManagers(
logContext,
shareFetchRequestManager,
shareConsumeRequestManager,
Optional.of(coordinator),
Optional.of(shareHeartbeatRequestManager),
Optional.of(shareMembershipManager)
Expand Down
Loading

0 comments on commit 0f26591

Please sign in to comment.