Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-1215: Rack-Aware replica assignment option #132

Closed
wants to merge 61 commits into from
Closed
Show file tree
Hide file tree
Changes from 55 commits
Commits
Show all changes
61 commits
Select commit Hold shift + click to select a range
3cccc5d
KAFKA-1215: Rack-Aware replica assignment option
Aug 11, 2015
982c047
Merge branch 'trunk' of github.com:apache/kafka into KAFKA-1215
Aug 14, 2015
80be83c
Merge branch 'trunk' of github.com:apache/kafka into KAFKA-1215
Aug 21, 2015
3e75b60
Added Javadoc for RackLocator
Aug 21, 2015
233331a
Merge branch 'trunk' of github.com:apache/kafka into KAFKA-1215
Jan 29, 2016
f9481fb
First pass of adding rack as a configuration and a member for Broker.…
Feb 9, 2016
fdb88e7
Merge branch 'trunk' of github.com:apache/kafka into KAFKA-1215
Feb 9, 2016
4990a96
WIP: changed UpdateMetadataRequest to include rack. Command line tool…
Feb 20, 2016
b5f3c51
Merge and resolve conflicts
Feb 20, 2016
57eb1c6
Use NULLABLE_STRING for rack in UpdateMetadataRequest protocol. Make …
Feb 20, 2016
479b94c
Fix compilation failure in 2.11. Apply rack in MetadataCache. Add som…
Feb 21, 2016
a84a289
Fix NPE in client test org.apache.kafka.common.requests.RequestRespon…
Feb 21, 2016
ecaad6e
fix check style.
Feb 21, 2016
33a3953
Fixed the handling of broker JSON version in registration.
Feb 22, 2016
800eadd
Merge branch 'trunk' of github.com:apache/kafka into KAFKA-1215
Feb 29, 2016
4326b5d
Merge branch 'trunk' of github.com:apache/kafka into KAFKA-1215
Feb 29, 2016
b010c5c
Fix compilation failure after resolving conflicts.
Feb 29, 2016
b7f1a39
Rename RACK to RACK_KEY_NAME. Added doc for rack for broker JSON.
Feb 29, 2016
5862943
Updates for review comments.
Mar 3, 2016
596f948
Merge branch 'trunk' of github.com:apache/kafka into KAFKA-1215
Mar 3, 2016
6453ed6
Merge branch 'trunk' of github.com:apache/kafka into KAFKA-1215
Mar 4, 2016
78b505c
Updates according to code review comments.
Mar 4, 2016
99cc7cc
fix alignment
Mar 4, 2016
2fe598e
Code style improvements
ijuma Mar 4, 2016
ab0d80c
Handle version 2 in `UpdateMetadataRequest.getErrorResponse`
ijuma Mar 4, 2016
b7d9437
Tweak description of `disable-rack-aware` command-line option
ijuma Mar 4, 2016
1d87d41
Handle the case where `rack` is null in the JSON
ijuma Mar 4, 2016
befedfa
Be consistent in spacing for `toString` separator in `Broker`
ijuma Mar 4, 2016
ab60287
Merge pull request #1 from ijuma/KAFKA-1215
Mar 4, 2016
bd60dbb
Doc/comment enhancement according to review comments. Enhance UpdateM…
Mar 5, 2016
397cb35
Merge branch 'trunk' of github.com:apache/kafka into KAFKA-1215
Mar 5, 2016
930f110
Minor tweaks to `registerBrokerInZk`
ijuma Mar 7, 2016
3c7ea09
Clean-up `generateAssignment` methods
ijuma Mar 7, 2016
946e677
Remove unused method in `RackAwareTest` and other minor style changes
ijuma Mar 7, 2016
e9d3381
Remove unused variable in `AdminTest` and use `assertEquals` instead …
ijuma Mar 7, 2016
b2d6d27
Minor code style improvements in `TopicCommandTest`
ijuma Mar 7, 2016
e19f109
AdminUtils code improvements, behaviour should remain the same
ijuma Mar 7, 2016
133165b
Minor clean-ups in `RackAwareAutoTopicCreationTest`
ijuma Mar 7, 2016
2f1cf1f
Minor clean-ups in `ReassignPartitionsCommandTest`
ijuma Mar 7, 2016
2f48f23
Code style improvements in `AdminRackAwareTest`
ijuma Mar 7, 2016
22c9d3f
Use `alteredNumPartitions` instead of hardcoded string.
ijuma Mar 7, 2016
337c889
Merge pull request #2 from ijuma/KAFKA-1215
Mar 7, 2016
1afddb1
Update doc for AdminUtils.assignReplicasToBrokers
Mar 8, 2016
f63f3f0
Merge branch 'trunk' of github.com:apache/kafka into KAFKA-1215
Mar 8, 2016
b77ad30
Fix bug introduced in `assignReplicasToBrokersRackUnaware` refactoring
ijuma Mar 8, 2016
62f8ddd
Remove unused defaults, variables and unnecessary `toMap`
ijuma Mar 8, 2016
8cf62bd
Merge pull request #3 from ijuma/KAFKA-1215
Mar 8, 2016
362f470
Merge branch 'KAFKA-1215' of github.com:allenxwang/kafka into KAFKA-1215
Mar 8, 2016
1094361
Upgrade instruction updated for 0.10.0.0.
Mar 8, 2016
b60fa8f
Minor edits to update instruction.
Mar 8, 2016
0839d69
Merge branch 'trunk' of github.com:apache/kafka into KAFKA-1215
Mar 9, 2016
e07c499
Use `Seq[BrokerMetadata]` instead of `Seq[Int]` and `Map[Int, String]`
ijuma Mar 8, 2016
c5abd44
Minor wording tweaks
ijuma Mar 10, 2016
95d5fd1
Merge remote-tracking branch 'apache/trunk' into KAFKA-1215-broker-me…
ijuma Mar 11, 2016
346ab6d
Merge pull request #4 from ijuma/KAFKA-1215-broker-metadata
Mar 11, 2016
fa3eb3c
Added logic to prevent two replicas assign to the same broker.
Mar 12, 2016
b046eee
Merge branch 'KAFKA-1215' of github.com:allenxwang/kafka into KAFKA-1215
Mar 12, 2016
22beaaf
Address review comments.
Mar 14, 2016
c5bfcc8
Add error messages to assert.
Mar 15, 2016
a237bc0
Merge remote-tracking branch 'apache/trunk' into KAFKA-1215
ijuma Mar 15, 2016
569b5f0
Merge pull request #5 from ijuma/KAFKA-1215
Mar 15, 2016
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -697,8 +697,26 @@ public class Protocol {

public static final Schema UPDATE_METADATA_RESPONSE_V1 = UPDATE_METADATA_RESPONSE_V0;

public static final Schema[] UPDATE_METADATA_REQUEST = new Schema[] {UPDATE_METADATA_REQUEST_V0, UPDATE_METADATA_REQUEST_V1};
public static final Schema[] UPDATE_METADATA_RESPONSE = new Schema[] {UPDATE_METADATA_RESPONSE_V0, UPDATE_METADATA_RESPONSE_V1};
public static final Schema UPDATE_METADATA_REQUEST_PARTITION_STATE_V2 = UPDATE_METADATA_REQUEST_PARTITION_STATE_V1;

public static final Schema UPDATE_METADATA_REQUEST_END_POINT_V2 = UPDATE_METADATA_REQUEST_END_POINT_V1;

public static final Schema UPDATE_METADATA_REQUEST_BROKER_V2 =
new Schema(new Field("id", INT32, "The broker id."),
new Field("end_points", new ArrayOf(UPDATE_METADATA_REQUEST_END_POINT_V2)),
new Field("rack", NULLABLE_STRING, "The rack"));

public static final Schema UPDATE_METADATA_REQUEST_V2 =
new Schema(new Field("controller_id", INT32, "The controller id."),
new Field("controller_epoch", INT32, "The controller epoch."),
new Field("partition_states", new ArrayOf(UPDATE_METADATA_REQUEST_PARTITION_STATE_V2)),
new Field("live_brokers", new ArrayOf(UPDATE_METADATA_REQUEST_BROKER_V2)));

public static final Schema UPDATE_METADATA_RESPONSE_V2 = UPDATE_METADATA_RESPONSE_V1;


public static final Schema[] UPDATE_METADATA_REQUEST = new Schema[] {UPDATE_METADATA_REQUEST_V0, UPDATE_METADATA_REQUEST_V1, UPDATE_METADATA_REQUEST_V2};
public static final Schema[] UPDATE_METADATA_RESPONSE = new Schema[] {UPDATE_METADATA_RESPONSE_V0, UPDATE_METADATA_RESPONSE_V1, UPDATE_METADATA_RESPONSE_V2};

/* an array of all requests and responses with all schema versions; a null value in the inner array means that the
* particular version is not supported */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,10 @@ public int hashCode() {
for (Object arrayItem: arrayObject)
result = prime * result + arrayItem.hashCode();
} else {
result = prime * result + this.get(f).hashCode();
Object field = this.get(f);
if (field != null) {
result = prime * result + field.hashCode();
}
}
}
return result;
Expand All @@ -330,11 +333,13 @@ public boolean equals(Object obj) {
return false;
for (int i = 0; i < this.values.length; i++) {
Field f = this.schema.get(i);
Boolean result;
boolean result;
if (f.type() instanceof ArrayOf) {
result = Arrays.equals((Object []) this.get(f), (Object []) other.get(f));
result = Arrays.equals((Object[]) this.get(f), (Object[]) other.get(f));
} else {
result = this.get(f).equals(other.get(f));
Object thisField = this.get(f);
Object otherField = other.get(f);
result = (thisField == null && otherField == null) || thisField.equals(otherField);
}
if (!result)
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,16 +49,22 @@ public PartitionState(int controllerEpoch, int leader, int leaderEpoch, List<Int
this.zkVersion = zkVersion;
this.replicas = replicas;
}

}

public static final class Broker {
public final int id;
public final Map<SecurityProtocol, EndPoint> endPoints;
public final String rack;

public Broker(int id, Map<SecurityProtocol, EndPoint> endPoints) {
public Broker(int id, Map<SecurityProtocol, EndPoint> endPoints, String rack) {
this.id = id;
this.endPoints = endPoints;
this.rack = rack;
}

@Deprecated
public Broker(int id, Map<SecurityProtocol, EndPoint> endPoints) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably mark this as deprecated.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This constructor is only used in tests, does it make sense to keep it? I guess the question is whether the request classes are API. As I understand, they are not, but I would like to get @junrao's take.

this(id, endPoints, null);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Related to my protocol question above. Would defaulting to empty string work?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I discussed this in KIP discussion. NULLABLE_STRING was recommended in the discussion.

I think it makes sense as rack itself is designed to be nullable (Option[String]). It is legal to define rack as an empty string.

There isn't really any null checks in the code as far as I can tell. null just means no rack is defined.

}
}

Expand Down Expand Up @@ -91,6 +97,7 @@ public EndPoint(String host, int port) {
// Broker key names
private static final String BROKER_ID_KEY_NAME = "id";
private static final String ENDPOINTS_KEY_NAME = "end_points";
private static final String RACK_KEY_NAME = "rack";

// EndPoint key names
private static final String HOST_KEY_NAME = "host";
Expand All @@ -117,20 +124,20 @@ private static Set<Broker> brokerEndPointsToBrokers(Set<BrokerEndPoint> brokerEn
for (BrokerEndPoint brokerEndPoint : brokerEndPoints) {
Map<SecurityProtocol, EndPoint> endPoints = Collections.singletonMap(SecurityProtocol.PLAINTEXT,
new EndPoint(brokerEndPoint.host(), brokerEndPoint.port()));
brokers.add(new Broker(brokerEndPoint.id(), endPoints));
brokers.add(new Broker(brokerEndPoint.id(), endPoints, null));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could use the constructor that doesn't take a rack.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@granthenke Given the comment from @junrao that the old constructor should be deprecated, I think it is better to use the new constructor.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Works for me

}
return brokers;
}

/**
* Constructor for version 1.
* Constructor for version 2.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will still need a separate constructor for v1 of UpdateMetadataRequest since in ControllerChannelManager, we may need to send a v1 request depending on inter.broker.protocol.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@junrao This may not be necessary. See my comment in ControllerChannelManager.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that this constructor is only used in tests. Does it even make sense to keep it?

*/
public UpdateMetadataRequest(int controllerId, int controllerEpoch, Map<TopicPartition,
PartitionState> partitionStates, Set<Broker> liveBrokers) {
this(1, controllerId, controllerEpoch, partitionStates, liveBrokers);
this(2, controllerId, controllerEpoch, partitionStates, liveBrokers);
}

private UpdateMetadataRequest(int version, int controllerId, int controllerEpoch, Map<TopicPartition,
public UpdateMetadataRequest(int version, int controllerId, int controllerEpoch, Map<TopicPartition,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this only public for testing? Would protected or default also work?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it is used in ControllerChannelManager and has to be public.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is that because we're depending on this constructor for version 1? I know we depend on choosing the right constructor in other request objects to get the right version, but I wonder if it would be better to have explicit static factory methods (e.g. UpdateMetadataRequest.createV0())?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We depend on this constructor to create version 1 and 2 UpdateMetadataRequest, and possibly for future versions as well.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair enough. I was only wondering if there was a way to keep the version better encapsulated (like all of the other requests). Perhaps at least there should be a check on the version to make sure it is greater than 1? I might even enforce only version 1 and 2 since we'll almost certainly have to touch this code anyway if there is another version bump.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think my preference would probably be to have static factory methods with the versions included in the name. Using constructors is kind of annoying because you have to check the comment to make sure you get the right one.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, we should use more static factories and less overloaded constructors in Kafka.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ijuma @hachikuji Would you mind if I leave this code refactoring of constructors to you guys?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fine by me.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto

PartitionState> partitionStates, Set<Broker> liveBrokers) {
super(new Struct(ProtoUtils.requestSchema(ApiKeys.UPDATE_METADATA_KEY.id, version)));
struct.set(CONTROLLER_ID_KEY_NAME, controllerId);
Expand Down Expand Up @@ -173,6 +180,9 @@ private UpdateMetadataRequest(int version, int controllerId, int controllerEpoch

}
brokerData.set(ENDPOINTS_KEY_NAME, endPointsData.toArray());
if (version >= 2) {
brokerData.set(RACK_KEY_NAME, broker.rack);
}
}

brokersData.add(brokerData);
Expand Down Expand Up @@ -226,8 +236,8 @@ public UpdateMetadataRequest(Struct struct) {
int port = brokerData.getInt(PORT_KEY_NAME);
Map<SecurityProtocol, EndPoint> endPoints = new HashMap<>(1);
endPoints.put(SecurityProtocol.PLAINTEXT, new EndPoint(host, port));
liveBrokers.add(new Broker(brokerId, endPoints));
} else { // V1
liveBrokers.add(new Broker(brokerId, endPoints, null));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could use the constructor that doesn't take a rack.

} else { // V1 or V2
Map<SecurityProtocol, EndPoint> endPoints = new HashMap<>();
for (Object endPointDataObj : brokerData.getArray(ENDPOINTS_KEY_NAME)) {
Struct endPointData = (Struct) endPointDataObj;
Expand All @@ -236,11 +246,13 @@ public UpdateMetadataRequest(Struct struct) {
short protocolTypeId = endPointData.getShort(SECURITY_PROTOCOL_TYPE_KEY_NAME);
endPoints.put(SecurityProtocol.forId(protocolTypeId), new EndPoint(host, port));
}
liveBrokers.add(new Broker(brokerId, endPoints));
String rack = null;
if (brokerData.hasField(RACK_KEY_NAME)) { // V2
rack = brokerData.getString(RACK_KEY_NAME);
}
liveBrokers.add(new Broker(brokerId, endPoints, rack));
}

}

controllerId = struct.getInt(CONTROLLER_ID_KEY_NAME);
controllerEpoch = struct.getInt(CONTROLLER_EPOCH_KEY_NAME);
this.partitionStates = partitionStates;
Expand All @@ -249,14 +261,11 @@ public UpdateMetadataRequest(Struct struct) {

@Override
public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
switch (versionId) {
case 0:
case 1:
return new UpdateMetadataResponse(Errors.forException(e).code());
default:
throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.UPDATE_METADATA_KEY.id)));
}
if (versionId <= 2)
return new UpdateMetadataResponse(Errors.forException(e).code());
else
throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.UPDATE_METADATA_KEY.id)));
}

public int controllerId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,9 @@ public void testSerialization() throws Exception {
createStopReplicaRequest(),
createStopReplicaRequest().getErrorResponse(0, new UnknownServerException()),
createStopReplicaResponse(),
createUpdateMetadataRequest(1),
createUpdateMetadataRequest(1).getErrorResponse(1, new UnknownServerException()),
createUpdateMetadataRequest(2, "rack1"),
createUpdateMetadataRequest(2, null),
createUpdateMetadataRequest(2, "rack1").getErrorResponse(2, new UnknownServerException()),
createUpdateMetadataResponse(),
createLeaderAndIsrRequest(),
createLeaderAndIsrRequest().getErrorResponse(0, new UnknownServerException()),
Expand All @@ -97,8 +98,11 @@ public void testSerialization() throws Exception {
for (AbstractRequestResponse req : requestResponseList)
checkSerialization(req, null);

checkSerialization(createUpdateMetadataRequest(0), 0);
checkSerialization(createUpdateMetadataRequest(0).getErrorResponse(0, new UnknownServerException()), 0);
checkSerialization(createUpdateMetadataRequest(0, null), 0);
checkSerialization(createUpdateMetadataRequest(0, null).getErrorResponse(0, new UnknownServerException()), 0);
checkSerialization(createUpdateMetadataRequest(1, null), 1);
checkSerialization(createUpdateMetadataRequest(1, "rack1"), 1);
checkSerialization(createUpdateMetadataRequest(1, null).getErrorResponse(1, new UnknownServerException()), 1);
}

private void checkSerialization(AbstractRequestResponse req, Integer version) throws Exception {
Expand All @@ -120,7 +124,7 @@ private void checkSerialization(AbstractRequestResponse req, Integer version) th

@Test
public void produceResponseVersionTest() {
Map<TopicPartition, ProduceResponse.PartitionResponse> responseData = new HashMap<TopicPartition, ProduceResponse.PartitionResponse>();
Map<TopicPartition, ProduceResponse.PartitionResponse> responseData = new HashMap<>();
responseData.put(new TopicPartition("test", 0), new ProduceResponse.PartitionResponse(Errors.NONE.code(), 10000, Record.NO_TIMESTAMP));
ProduceResponse v0Response = new ProduceResponse(responseData);
ProduceResponse v1Response = new ProduceResponse(responseData, 10, 1);
Expand All @@ -138,7 +142,7 @@ public void produceResponseVersionTest() {

@Test
public void fetchResponseVersionTest() {
Map<TopicPartition, FetchResponse.PartitionData> responseData = new HashMap<TopicPartition, FetchResponse.PartitionData>();
Map<TopicPartition, FetchResponse.PartitionData> responseData = new HashMap<>();
responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData(Errors.NONE.code(), 1000000, ByteBuffer.allocate(10)));

FetchResponse v0Response = new FetchResponse(responseData);
Expand Down Expand Up @@ -192,14 +196,14 @@ private AbstractRequestResponse createGroupCoordinatorResponse() {
}

private AbstractRequest createFetchRequest() {
Map<TopicPartition, FetchRequest.PartitionData> fetchData = new HashMap<TopicPartition, FetchRequest.PartitionData>();
Map<TopicPartition, FetchRequest.PartitionData> fetchData = new HashMap<>();
fetchData.put(new TopicPartition("test1", 0), new FetchRequest.PartitionData(100, 1000000));
fetchData.put(new TopicPartition("test2", 0), new FetchRequest.PartitionData(200, 1000000));
return new FetchRequest(-1, 100, 100000, fetchData);
}

private AbstractRequestResponse createFetchResponse() {
Map<TopicPartition, FetchResponse.PartitionData> responseData = new HashMap<TopicPartition, FetchResponse.PartitionData>();
Map<TopicPartition, FetchResponse.PartitionData> responseData = new HashMap<>();
responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData(Errors.NONE.code(), 1000000, ByteBuffer.allocate(10)));
return new FetchResponse(responseData, 0);
}
Expand Down Expand Up @@ -259,13 +263,13 @@ private AbstractRequestResponse createLeaveGroupResponse() {
}

private AbstractRequest createListOffsetRequest() {
Map<TopicPartition, ListOffsetRequest.PartitionData> offsetData = new HashMap<TopicPartition, ListOffsetRequest.PartitionData>();
Map<TopicPartition, ListOffsetRequest.PartitionData> offsetData = new HashMap<>();
offsetData.put(new TopicPartition("test", 0), new ListOffsetRequest.PartitionData(1000000L, 10));
return new ListOffsetRequest(-1, offsetData);
}

private AbstractRequestResponse createListOffsetResponse() {
Map<TopicPartition, ListOffsetResponse.PartitionData> responseData = new HashMap<TopicPartition, ListOffsetResponse.PartitionData>();
Map<TopicPartition, ListOffsetResponse.PartitionData> responseData = new HashMap<>();
responseData.put(new TopicPartition("test", 0), new ListOffsetResponse.PartitionData(Errors.NONE.code(), Arrays.asList(100L)));
return new ListOffsetResponse(responseData);
}
Expand All @@ -289,13 +293,13 @@ private AbstractRequestResponse createMetadataResponse() {
}

private AbstractRequest createOffsetCommitRequest() {
Map<TopicPartition, OffsetCommitRequest.PartitionData> commitData = new HashMap<TopicPartition, OffsetCommitRequest.PartitionData>();
Map<TopicPartition, OffsetCommitRequest.PartitionData> commitData = new HashMap<>();
commitData.put(new TopicPartition("test", 0), new OffsetCommitRequest.PartitionData(100, ""));
return new OffsetCommitRequest("group1", 100, "consumer1", 1000000, commitData);
}

private AbstractRequestResponse createOffsetCommitResponse() {
Map<TopicPartition, Short> responseData = new HashMap<TopicPartition, Short>();
Map<TopicPartition, Short> responseData = new HashMap<>();
responseData.put(new TopicPartition("test", 0), Errors.NONE.code());
return new OffsetCommitResponse(responseData);
}
Expand All @@ -305,19 +309,19 @@ private AbstractRequest createOffsetFetchRequest() {
}

private AbstractRequestResponse createOffsetFetchResponse() {
Map<TopicPartition, OffsetFetchResponse.PartitionData> responseData = new HashMap<TopicPartition, OffsetFetchResponse.PartitionData>();
Map<TopicPartition, OffsetFetchResponse.PartitionData> responseData = new HashMap<>();
responseData.put(new TopicPartition("test", 0), new OffsetFetchResponse.PartitionData(100L, "", Errors.NONE.code()));
return new OffsetFetchResponse(responseData);
}

private AbstractRequest createProduceRequest() {
Map<TopicPartition, ByteBuffer> produceData = new HashMap<TopicPartition, ByteBuffer>();
Map<TopicPartition, ByteBuffer> produceData = new HashMap<>();
produceData.put(new TopicPartition("test", 0), ByteBuffer.allocate(10));
return new ProduceRequest((short) 1, 5000, produceData);
}

private AbstractRequestResponse createProduceResponse() {
Map<TopicPartition, ProduceResponse.PartitionResponse> responseData = new HashMap<TopicPartition, ProduceResponse.PartitionResponse>();
Map<TopicPartition, ProduceResponse.PartitionResponse> responseData = new HashMap<>();
responseData.put(new TopicPartition("test", 0), new ProduceResponse.PartitionResponse(Errors.NONE.code(), 10000, Record.NO_TIMESTAMP));
return new ProduceResponse(responseData, 0);
}
Expand Down Expand Up @@ -370,7 +374,7 @@ private AbstractRequestResponse createLeaderAndIsrResponse() {
return new LeaderAndIsrResponse(Errors.NONE.code(), responses);
}

private AbstractRequest createUpdateMetadataRequest(int version) {
private AbstractRequest createUpdateMetadataRequest(int version, String rack) {
Map<TopicPartition, UpdateMetadataRequest.PartitionState> partitionStates = new HashMap<>();
List<Integer> isr = Arrays.asList(1, 2);
List<Integer> replicas = Arrays.asList(1, 2, 3, 4);
Expand All @@ -396,11 +400,10 @@ private AbstractRequest createUpdateMetadataRequest(int version) {
endPoints2.put(SecurityProtocol.PLAINTEXT, new UpdateMetadataRequest.EndPoint("host1", 1244));
endPoints2.put(SecurityProtocol.SSL, new UpdateMetadataRequest.EndPoint("host2", 1234));

Set<UpdateMetadataRequest.Broker> liveBrokers = new HashSet<>(Arrays.asList(new UpdateMetadataRequest.Broker(0, endPoints1),
new UpdateMetadataRequest.Broker(1, endPoints2)
Set<UpdateMetadataRequest.Broker> liveBrokers = new HashSet<>(Arrays.asList(new UpdateMetadataRequest.Broker(0, endPoints1, rack),
new UpdateMetadataRequest.Broker(1, endPoints2, rack)
));

return new UpdateMetadataRequest(1, 10, partitionStates, liveBrokers);
return new UpdateMetadataRequest(version, 1, 10, partitionStates, liveBrokers);
}
}

Expand Down
Loading