KAFKA-15435 Fix counts in MigrationManifest (apache#14342)
Reviewers: Liu Zeyu <[email protected]>, Colin P. McCabe <[email protected]>
mumrah authored Sep 6, 2023
1 parent eb39c95 commit 65e2ecf
Showing 3 changed files with 110 additions and 5 deletions.
@@ -718,14 +718,15 @@ public void run() throws Exception {
             // Ignore sending RPCs to the brokers since we're no longer in the state.
             if (checkDriverState(MigrationDriverState.KRAFT_CONTROLLER_TO_BROKER_COMM)) {
                 if (image.highestOffsetAndEpoch().compareTo(migrationLeadershipState.offsetAndEpoch()) >= 0) {
-                    log.trace("Sending RPCs to broker before moving to dual-write mode using " +
+                    log.info("Sending RPCs to broker before moving to dual-write mode using " +
                         "at offset and epoch {}", image.highestOffsetAndEpoch());
                     propagator.sendRPCsToBrokersFromMetadataImage(image, migrationLeadershipState.zkControllerEpoch());
                     // Migration leadership state doesn't change since we're not doing any Zk writes.
                     transitionTo(MigrationDriverState.DUAL_WRITE);
                 } else {
-                    log.trace("Ignoring using metadata image since migration leadership state is at a greater offset and epoch {}",
-                        migrationLeadershipState.offsetAndEpoch());
+                    log.info("Not sending metadata RPCs with current metadata image since does not contain the offset " +
+                        "that was last written to ZK during the migration. Image offset {} is less than migration " +
+                        "leadership state offset {}", image.highestOffsetAndEpoch(), migrationLeadershipState.offsetAndEpoch());
                 }
             }
         }
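Besides raising the log level to info, the new message spells out the gate for entering dual-write mode: RPCs go out only once the metadata image has caught up to the offset/epoch last written to ZooKeeper during the migration. A minimal standalone sketch of that comparison follows; the OffsetAndEpoch record and its epoch-then-offset ordering are simplifications assumed for illustration, not the real Kafka class.

// Illustrative sketch only: OffsetAndEpoch is a stand-in, and comparing epoch first,
// then offset, is an assumption made for this example.
public class DualWriteGateDemo {

    record OffsetAndEpoch(int epoch, long offset) implements Comparable<OffsetAndEpoch> {
        @Override
        public int compareTo(OffsetAndEpoch other) {
            if (epoch != other.epoch) {
                return Integer.compare(epoch, other.epoch);
            }
            return Long.compare(offset, other.offset);
        }
    }

    public static void main(String[] args) {
        OffsetAndEpoch imageHighest = new OffsetAndEpoch(3, 120L);
        OffsetAndEpoch lastZkWrite = new OffsetAndEpoch(3, 150L);

        // Mirrors the condition in the hunk above: only send RPCs and move to
        // dual-write once the image has reached the last ZK-written offset/epoch.
        if (imageHighest.compareTo(lastZkWrite) >= 0) {
            System.out.println("send RPCs to brokers, transition to DUAL_WRITE");
        } else {
            System.out.println("wait: image " + imageHighest + " is behind " + lastZkWrite);
        }
    }
}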
@@ -26,6 +26,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -51,7 +52,7 @@ public void acceptBatch(List<ApiMessageAndVersion> recordBatch) {
             batches++;
             recordBatch.forEach(apiMessageAndVersion -> {
                 MetadataRecordType type = MetadataRecordType.fromId(apiMessageAndVersion.message().apiKey());
-                counts.merge(type, 1, (__, count) -> count + 1);
+                counts.merge(type, 1, Integer::sum);
                 total++;
             });
         }
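The replaced lambda above is the counting bug named in the commit title: Map.merge invokes its remapping function as f(existingValue, newValue), so "(__, count) -> count + 1" ignored the running count and returned newValue + 1, pinning every repeated record type at 2. A self-contained demonstration of the difference:

import java.util.HashMap;
import java.util.Map;

public class MergeCountDemo {
    public static void main(String[] args) {
        Map<String, Integer> buggy = new HashMap<>();
        Map<String, Integer> fixed = new HashMap<>();
        for (int i = 0; i < 3; i++) {
            // merge() calls the remapping function with (existingValue, newValue).
            // The old lambda ignored existingValue, so the count never grew past 2.
            buggy.merge("PARTITION_RECORD", 1, (existing, one) -> one + 1);
            // Integer::sum adds existingValue + newValue, giving a true running count.
            fixed.merge("PARTITION_RECORD", 1, Integer::sum);
        }
        System.out.println(buggy); // {PARTITION_RECORD=2}
        System.out.println(fixed); // {PARTITION_RECORD=3}
    }
}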
@@ -60,7 +61,8 @@ public MigrationManifest build() {
             if (endTimeNanos == 0) {
                 endTimeNanos = time.nanoseconds();
             }
-            return new MigrationManifest(total, batches, endTimeNanos - startTimeNanos, counts);
+            Map<MetadataRecordType, Integer> orderedCounts = new TreeMap<>(counts);
+            return new MigrationManifest(total, batches, endTimeNanos - startTimeNanos, orderedCounts);
         }
     }
 
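Copying the accumulated counts into a TreeMap before building the manifest is what makes the record-type summary deterministic: enum keys sort by their natural order, which is declaration order. A small sketch, where RecordType stands in for MetadataRecordType (its declaration order here is an assumption chosen to match the strings asserted in the new test) and HashMap stands in for whatever unordered map the builder accumulates into:

import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

public class OrderedCountsDemo {
    // Stand-in for MetadataRecordType; declaration order is assumed for illustration.
    enum RecordType { TOPIC_RECORD, PARTITION_RECORD, CONFIG_RECORD }

    public static void main(String[] args) {
        Map<RecordType, Integer> counts = new HashMap<>();
        counts.put(RecordType.CONFIG_RECORD, 2);
        counts.put(RecordType.PARTITION_RECORD, 9);
        counts.put(RecordType.TOPIC_RECORD, 2);

        // HashMap iteration order is hash-dependent; a TreeMap sorts the enum keys
        // by declaration order, so the summary string is stable across runs.
        Map<RecordType, Integer> ordered = new TreeMap<>(counts);
        System.out.println(ordered); // {TOPIC_RECORD=2, PARTITION_RECORD=9, CONFIG_RECORD=2}
    }
}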
@@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.metadata.migration;

import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.junit.jupiter.api.Test;

import java.util.Arrays;
import java.util.Collections;

import static org.junit.jupiter.api.Assertions.assertEquals;

public class MigrationManifestTest {
    @Test
    public void testEmpty() {
        Time time = new MockTime();
        MigrationManifest.Builder manifestBuilder = MigrationManifest.newBuilder(time);
        MigrationManifest manifest = manifestBuilder.build();
        assertEquals(0L, manifest.durationMs());
        assertEquals(
            "0 records were generated in 0 ms across 0 batches. The record types were {}",
            manifest.toString());
    }

    @Test
    public void testOneBatch() {
        Time time = new MockTime();
        MigrationManifest.Builder manifestBuilder = MigrationManifest.newBuilder(time);
        manifestBuilder.acceptBatch(Arrays.asList(
            new ApiMessageAndVersion(new TopicRecord(), (short) 0),
            new ApiMessageAndVersion(new PartitionRecord(), (short) 0),
            new ApiMessageAndVersion(new PartitionRecord(), (short) 0),
            new ApiMessageAndVersion(new PartitionRecord(), (short) 0),
            new ApiMessageAndVersion(new PartitionRecord(), (short) 0),
            new ApiMessageAndVersion(new TopicRecord(), (short) 0),
            new ApiMessageAndVersion(new PartitionRecord(), (short) 0),
            new ApiMessageAndVersion(new PartitionRecord(), (short) 0),
            new ApiMessageAndVersion(new PartitionRecord(), (short) 0),
            new ApiMessageAndVersion(new PartitionRecord(), (short) 0),
            new ApiMessageAndVersion(new PartitionRecord(), (short) 0),
            new ApiMessageAndVersion(new ConfigRecord(), (short) 0),
            new ApiMessageAndVersion(new ConfigRecord(), (short) 0)
        ));
        MigrationManifest manifest = manifestBuilder.build();
        assertEquals(0L, manifest.durationMs());
        assertEquals(
            "13 records were generated in 0 ms across 1 batches. The record types were {TOPIC_RECORD=2, PARTITION_RECORD=9, CONFIG_RECORD=2}",
            manifest.toString()
        );
    }

    @Test
    public void testManyBatch() {
        Time time = new MockTime();
        MigrationManifest.Builder manifestBuilder = MigrationManifest.newBuilder(time);
        manifestBuilder.acceptBatch(Arrays.asList(
            new ApiMessageAndVersion(new TopicRecord(), (short) 0),
            new ApiMessageAndVersion(new PartitionRecord(), (short) 0),
            new ApiMessageAndVersion(new PartitionRecord(), (short) 0),
            new ApiMessageAndVersion(new PartitionRecord(), (short) 0),
            new ApiMessageAndVersion(new PartitionRecord(), (short) 0)
        ));
        manifestBuilder.acceptBatch(Arrays.asList(
            new ApiMessageAndVersion(new TopicRecord(), (short) 0),
            new ApiMessageAndVersion(new PartitionRecord(), (short) 0),
            new ApiMessageAndVersion(new PartitionRecord(), (short) 0),
            new ApiMessageAndVersion(new PartitionRecord(), (short) 0),
            new ApiMessageAndVersion(new PartitionRecord(), (short) 0),
            new ApiMessageAndVersion(new PartitionRecord(), (short) 0),
            new ApiMessageAndVersion(new ConfigRecord(), (short) 0)
        ));
        manifestBuilder.acceptBatch(Collections.singletonList(
            new ApiMessageAndVersion(new ConfigRecord(), (short) 0)
        ));
        MigrationManifest manifest = manifestBuilder.build();
        assertEquals(0L, manifest.durationMs());
        assertEquals(
            "13 records were generated in 0 ms across 3 batches. The record types were {TOPIC_RECORD=2, PARTITION_RECORD=9, CONFIG_RECORD=2}",
            manifest.toString()
        );
    }
}
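For reference, the strings asserted above imply a summary of the form "<total> records were generated in <duration> ms across <batches> batches. The record types were <counts>". The helper below is a hypothetical reconstruction of that formatting, not the actual MigrationManifest.toString(); the LinkedHashMap only stands in for the TreeMap-ordered counts built in the change above.

import java.util.LinkedHashMap;
import java.util.Map;

public class ManifestSummaryDemo {
    // Hypothetical formatter guessed from the expected test strings.
    static String summarize(int total, int batches, long durationMs, Map<String, Integer> counts) {
        return String.format("%d records were generated in %d ms across %d batches. The record types were %s",
            total, durationMs, batches, counts);
    }

    public static void main(String[] args) {
        // Insertion order is preserved here purely to mimic the enum declaration
        // order that the production TreeMap would produce.
        Map<String, Integer> counts = new LinkedHashMap<>();
        counts.put("TOPIC_RECORD", 2);
        counts.put("PARTITION_RECORD", 9);
        counts.put("CONFIG_RECORD", 2);
        System.out.println(summarize(13, 1, 0L, counts));
        // 13 records were generated in 0 ms across 1 batches. The record types were {TOPIC_RECORD=2, PARTITION_RECORD=9, CONFIG_RECORD=2}
    }
}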
