Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-18003 add test to make sure Admin#deleteRecords can handle the corrupted records #6

Closed
wants to merge 21 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
13f497f
KAFKA-18009: Remove extra public constructor for KafkaShareConsumer (…
AndrewJSchofield Nov 14, 2024
e4b8644
KAFKA-17992 Remove getUnderlying and isKRaftTest from ClusterInstance…
Yunyung Nov 14, 2024
1819902
KAFKA-17995: Fix errors in remote segment cleanup when retention.ms i…
FrankYang0529 Nov 14, 2024
6147a31
KAFKA-17888 Upgrade ZooKeeper version from 3.4.9 to 3.5.7 to avoid ZO…
m1a2st Nov 14, 2024
1834030
KAFKA-17510: Exception handling and purgatory completion on initializ…
apoorvmittal10 Nov 14, 2024
e9cd9c9
KAFKA-18006 Add 3.9.0 to end-to-end test (core, client) (#17797)
frankvicky Nov 14, 2024
401b347
MINOR: convert ProduceRequestTest to KRaft (#17780)
cmccabe Nov 14, 2024
6e9c56a
MINOR: convert GssapiAuthenticationTest to KRaft (#17786)
cmccabe Nov 14, 2024
e035f70
MINOR: convert SaslClientsWithInvalidCredentialsTest + MultipleListen…
cmccabe Nov 14, 2024
ed9cb08
KAFKA-17977 Remove new_consumer from E2E (#17798)
FrankYang0529 Nov 15, 2024
9dc4fca
MINOR: update the test.api README.md (#17809)
m1a2st Nov 15, 2024
f02c28b
KAFKA-17994 Checked exceptions are not handled (#17817)
mjsax Nov 15, 2024
84fe668
KAFKA-18006: Add 3.9.0 to end-to-end test (streams) (#17800)
frankvicky Nov 15, 2024
cc20e78
KAFKA-17648: AsyncKafkaConsumer#unsubscribe and #close swallow TopicA…
FrankYang0529 Nov 15, 2024
d8bfbb7
KAFKA-17791: Dockerfile should use requirements.txt for dependencies …
m1a2st Nov 15, 2024
77cc8ff
KAFKA-17948: Potential issue during tryComplete and onComplete simult…
adixitconfluent Nov 15, 2024
283d56c
KAFKA-17904: Flaky testMultiConsumerSessionTimeoutOnClose (#17789)
xijiu Nov 15, 2024
5725a51
KAFKA-16460: New consumer times out consuming records in multiple con…
FrankYang0529 Nov 15, 2024
a8f84ca
KAFKA-18001: Support UpdateRaftVoterRequest in KafkaNetworkChannel (#…
justinrlee Nov 15, 2024
dfa5aa5
KAFKA-18022: fetchOffsetMetadata handling for minBytes estimation in …
adixitconfluent Nov 16, 2024
72908c2
KAFKA-18003 add test to make sure `Admin#deleteRecords` can handle th…
brandboat Nov 17, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2697,6 +2697,7 @@ project(':streams') {
':streams:upgrade-system-tests-36:test',
':streams:upgrade-system-tests-37:test',
':streams:upgrade-system-tests-38:test',
':streams:upgrade-system-tests-39:test',
':streams:examples:test'
]
)
Expand Down Expand Up @@ -3244,6 +3245,21 @@ project(':streams:upgrade-system-tests-38') {
}
}

// Packages the released 3.9 Kafka Streams client as a test jar so the
// end-to-end system tests can exercise rolling upgrades from 3.9.
// Mirrors the sibling upgrade-system-tests-NN projects (e.g. -38 above).
project(':streams:upgrade-system-tests-39') {
base {
archivesName = "kafka-streams-upgrade-system-tests-39"
}

dependencies {
// kafkaStreams_39 is presumably declared in the shared version catalog
// alongside the other kafkaStreams_NN entries — verify when applying.
testImplementation libs.kafkaStreams_39
testRuntimeOnly libs.junitJupiter
}

systemTestLibs {
dependsOn testJar
}
}

project(':jmh-benchmarks') {

apply plugin: 'io.github.goooler.shadow'
Expand Down
2 changes: 1 addition & 1 deletion checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@
files="ClientUtils.java"/>

<suppress checks="ClassDataAbstractionCoupling"
files="(KafkaConsumer|ConsumerCoordinator|AbstractFetch|KafkaProducer|AbstractRequest|AbstractResponse|TransactionManager|Admin|KafkaAdminClient|MockAdminClient|KafkaRaftClient|KafkaRaftClientTest).java"/>
files="(KafkaConsumer|ConsumerCoordinator|AbstractFetch|KafkaProducer|AbstractRequest|AbstractResponse|TransactionManager|Admin|KafkaAdminClient|MockAdminClient|KafkaRaftClient|KafkaRaftClientTest|KafkaNetworkChannelTest).java"/>
<suppress checks="ClassDataAbstractionCoupling"
files="(Errors|SaslAuthenticatorTest|AgentTest|CoordinatorTest|NetworkClientTest).java"/>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ public KafkaShareConsumer(Map<String, Object> configs,
keyDeserializer, valueDeserializer);
}

public KafkaShareConsumer(ConsumerConfig config,
KafkaShareConsumer(ConsumerConfig config,
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer) {
delegate = CREATOR.create(config, keyDeserializer, valueDeserializer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -851,6 +851,10 @@ void maybeReconcile() {
revokedPartitions
);

// Mark partitions as pending revocation to stop fetching from the partitions (no new
// fetches sent out, and no in-flight fetches responses processed).
markPendingRevocationToPauseFetching(revokedPartitions);

// Commit offsets if auto-commit enabled before reconciling a new assignment. Request will
// be retried until it succeeds, fails with non-retriable error, or timer expires.
CompletableFuture<Void> commitResult;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,12 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
Expand Down Expand Up @@ -1571,10 +1573,12 @@ public void unsubscribe() {
subscriptions.assignedPartitions());

try {
// If users subscribe to an invalid topic name, they will get InvalidTopicException in error events,
// because network thread keeps trying to send MetadataRequest in the background.
// Ignore it to avoid unsubscribe failed.
processBackgroundEvents(unsubscribeEvent.future(), timer, e -> e instanceof InvalidTopicException);
// If users subscribe to a topic with invalid name or without permission, they will get some exceptions.
// Because network thread keeps trying to send MetadataRequest or ConsumerGroupHeartbeatRequest in the background,
// there will be some error events in the background queue.
// When running unsubscribe, these exceptions should be ignored, or users can't unsubscribe successfully.
processBackgroundEvents(unsubscribeEvent.future(), timer,
e -> e instanceof InvalidTopicException || e instanceof TopicAuthorizationException || e instanceof GroupAuthorizationException);
log.info("Unsubscribed all topics or patterns and assigned partitions");
} catch (TimeoutException e) {
log.error("Failed while waiting for the unsubscribe event to complete");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
Expand Down Expand Up @@ -282,6 +283,23 @@ public void testCloseWithInvalidTopicException() {
assertDoesNotThrow(() -> consumer.close());
}

@Test
public void testUnsubscribeWithTopicAuthorizationException() {
consumer = newConsumer();
// Simulate the background network thread reporting an authorization failure
// for a subscribed topic by enqueueing it as an error event.
backgroundEventQueue.add(new ErrorEvent(new TopicAuthorizationException(Set.of("test-topic"))));
completeUnsubscribeApplicationEventSuccessfully();
// unsubscribe() must ignore the pending TopicAuthorizationException rather
// than surface it; otherwise the user could never unsubscribe. close() must
// likewise complete cleanly afterwards.
assertDoesNotThrow(() -> consumer.unsubscribe());
assertDoesNotThrow(() -> consumer.close());
}

@Test
public void testCloseWithTopicAuthorizationException() {
consumer = newConsumer();
// Simulate the background network thread reporting an authorization failure
// for a subscribed topic by enqueueing it as an error event.
backgroundEventQueue.add(new ErrorEvent(new TopicAuthorizationException(Set.of("test-topic"))));
completeUnsubscribeApplicationEventSuccessfully();
// close() internally unsubscribes; the pending TopicAuthorizationException
// must not prevent the consumer from closing.
assertDoesNotThrow(() -> consumer.close());
}

@Test
public void testCommitAsyncWithNullCallback() {
consumer = newConsumer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.InOrder;

import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -86,6 +87,7 @@
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
Expand Down Expand Up @@ -1433,6 +1435,7 @@ public void testReconcilePartitionsRevokedNoAutoCommitNoCallbacks() {
membershipManager.poll(time.milliseconds());

testRevocationOfAllPartitionsCompleted(membershipManager);
verify(subscriptionState, times(2)).markPendingRevocation(Set.of(new TopicPartition("topic1", 0)));
}

@Test
Expand All @@ -1456,6 +1459,10 @@ public void testReconcilePartitionsRevokedWithSuccessfulAutoCommitNoCallbacks()

// Complete commit request
commitResult.complete(null);
InOrder inOrder = inOrder(subscriptionState, commitRequestManager);
inOrder.verify(subscriptionState).markPendingRevocation(Set.of(new TopicPartition("topic1", 0)));
inOrder.verify(commitRequestManager).maybeAutoCommitSyncBeforeRevocation(anyLong());
inOrder.verify(subscriptionState).markPendingRevocation(Set.of(new TopicPartition("topic1", 0)));

testRevocationOfAllPartitionsCompleted(membershipManager);
}
Expand All @@ -1480,6 +1487,7 @@ public void testReconcilePartitionsRevokedWithFailedAutoCommitCompletesRevocatio
// Complete commit request
commitResult.completeExceptionally(new KafkaException("Commit request failed with " +
"non-retriable error"));
verify(subscriptionState, times(2)).markPendingRevocation(Set.of(new TopicPartition("topic1", 0)));

testRevocationOfAllPartitionsCompleted(membershipManager);
}
Expand Down Expand Up @@ -1579,11 +1587,11 @@ public void testRevokePartitionsUsesTopicNamesLocalCacheWhenMetadataNotAvailable
mockOwnedPartitionAndAssignmentReceived(membershipManager, topicId, topicName, Collections.emptyList());

// Member received assignment to reconcile;

receiveAssignment(topicId, Arrays.asList(0, 1), membershipManager);

verifyReconciliationNotTriggered(membershipManager);
membershipManager.poll(time.milliseconds());
verify(subscriptionState).markPendingRevocation(Set.of());

// Member should complete reconciliation
assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
Expand All @@ -1607,6 +1615,7 @@ public void testRevokePartitionsUsesTopicNamesLocalCacheWhenMetadataNotAvailable
receiveAssignment(topicId, Collections.singletonList(1), membershipManager);

membershipManager.poll(time.milliseconds());
verify(subscriptionState, times(2)).markPendingRevocation(Set.of(new TopicPartition(topicName, 0)));

// Revocation should complete without requesting any metadata update given that the topic
// received in target assignment should exist in local topic name cache.
Expand Down Expand Up @@ -2551,7 +2560,6 @@ private void testRevocationCompleted(ConsumerMembershipManager membershipManager
assertEquals(assignmentByTopicId, membershipManager.currentAssignment().partitions);
assertFalse(membershipManager.reconciliationInProgress());

verify(subscriptionState).markPendingRevocation(anySet());
List<TopicPartition> expectedTopicPartitionAssignment =
buildTopicPartitions(expectedCurrentAssignment);
HashSet<TopicPartition> expectedSet = new HashSet<>(expectedTopicPartitionAssignment);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1100,6 +1100,7 @@ public void testRevokePartitionsUsesTopicNamesLocalCacheWhenMetadataNotAvailable

verifyReconciliationNotTriggered(membershipManager);
membershipManager.poll(time.milliseconds());
verify(subscriptionState).markPendingRevocation(Set.of());

// Member should complete reconciliation
assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
Expand All @@ -1123,6 +1124,7 @@ public void testRevokePartitionsUsesTopicNamesLocalCacheWhenMetadataNotAvailable
receiveAssignment(topicId, Collections.singletonList(1), membershipManager);

membershipManager.poll(time.milliseconds());
verify(subscriptionState, times(2)).markPendingRevocation(Set.of(new TopicPartition(topicName, 0)));

// Revocation should complete without requesting any metadata update given that the topic
// received in target assignment should exist in local topic name cache.
Expand Down Expand Up @@ -1423,7 +1425,6 @@ private void testRevocationCompleted(ShareMembershipManager membershipManager,
assertEquals(assignmentByTopicId, membershipManager.currentAssignment().partitions);
assertFalse(membershipManager.reconciliationInProgress());

verify(subscriptionState).markPendingRevocation(anySet());
List<TopicPartition> expectedTopicPartitionAssignment =
buildTopicPartitions(expectedCurrentAssignment);
HashSet<TopicPartition> expectedSet = new HashSet<>(expectedTopicPartitionAssignment);
Expand Down
5 changes: 3 additions & 2 deletions core/src/main/java/kafka/log/remote/RemoteLogManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -1399,8 +1399,9 @@ void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, ExecutionE
}

private Optional<RetentionTimeData> buildRetentionTimeData(long retentionMs) {
return retentionMs > -1
? Optional.of(new RetentionTimeData(retentionMs, time.milliseconds() - retentionMs))
long cleanupUntilMs = time.milliseconds() - retentionMs;
return retentionMs > -1 && cleanupUntilMs >= 0
? Optional.of(new RetentionTimeData(retentionMs, cleanupUntilMs))
: Optional.empty();
}

Expand Down
Loading
Loading