Skip to content

Commit

Permalink
[COMMON] rewrite admin.Replica by java 17 record (#1782)
Browse files Browse the repository at this point in the history
  • Loading branch information
chia7712 authored May 27, 2023
1 parent e3372db commit fb58f05
Show file tree
Hide file tree
Showing 6 changed files with 218 additions and 417 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -684,7 +684,7 @@ private CompletionStage<List<Replica>> replicas(Set<String> topics) {
Replica.builder()
.topic(topicName)
.partition(partitionId)
.internal(internal)
.isInternal(internal)
.isAdding(isAdding)
.isRemoving(isRemoving)
.broker(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ public ClusterInfoBuilder addTopic(
.isAdding(false)
.isRemoving(false)
.lag(0)
.internal(false)
.isInternal(false)
.isLeader(index == 0)
.isSync(true)
.isFuture(false)
Expand Down
295 changes: 214 additions & 81 deletions common/src/main/java/org/astraea/common/admin/Replica.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,49 @@
*/
package org.astraea.common.admin;

public interface Replica {
import java.util.Objects;

static ReplicaBuilder builder() {
return new ReplicaBuilder();
/**
* @param topic topic name
* @param partition partition id
* @param broker information of the node hosts this replica
* @param isLeader true if this replica is a leader replica
* @param isSync true if this replica is synced
* @param isOffline true if this replica is offline
* @param isAdding true if this replica is adding and syncing data
* @param isRemoving true if this replica will be deleted in the future.
* @param isFuture true if this log is created by AlterReplicaLogDirsRequest and will replace the
* current log of the replica at some time in the future.
* @param isPreferredLeader true if the replica is the preferred leader
* @param lag (LEO - high watermark) if it is the current log, * (LEO) if it is the future log, (-1)
* if the host of replica is offline
* @param size The size of all log segments in this replica in bytes. It returns -1 if the host of
* replica is offline
* @param path that indicates the data folder path which stored this replica on a specific Kafka
* node. It returns null if the host of replica is offline
* @param isInternal true if this replica belongs to internal topic
*/
public record Replica(
String topic,
int partition,
Broker broker,
boolean isLeader,
boolean isSync,
boolean isOffline,
boolean isAdding,
boolean isRemoving,
boolean isFuture,
boolean isPreferredLeader,
long lag,
long size,
String path,
boolean isInternal) {

public static Builder builder() {
return new Builder();
}

static ReplicaBuilder builder(Replica replica) {
public static Builder builder(Replica replica) {
return Replica.builder().replica(replica);
}

Expand All @@ -31,7 +67,7 @@ static ReplicaBuilder builder(Replica replica) {
*
* @return TopicPartitionReplica
*/
default TopicPartitionReplica topicPartitionReplica() {
public TopicPartitionReplica topicPartitionReplica() {
return TopicPartitionReplica.of(topic(), partition(), broker().id());
}

Expand All @@ -40,105 +76,202 @@ default TopicPartitionReplica topicPartitionReplica() {
*
* @return TopicPartition
*/
default TopicPartition topicPartition() {
public TopicPartition topicPartition() {
return TopicPartition.of(topic(), partition());
}

/**
* @return topic name
*/
String topic();

/**
* @return partition id
*/
int partition();

/**
* @return information of the node hosts this replica
*/
Broker broker();

/**
* @return true if this replica is a leader replica
*/
boolean isLeader();

/**
* @return true if this replica is a follower replica
*/
default boolean isFollower() {
public boolean isFollower() {
return !isLeader();
}

/**
* @return true if this replica is synced
*/
boolean isSync();

/**
* @return true if this replica is offline
*/
boolean isOffline();

/**
* @return true if this replica is online
*/
default boolean isOnline() {
public boolean isOnline() {
return !isOffline();
}

/**
* @return true if this replica is adding and syncing data
*/
boolean isAdding();

/**
* @return true if this replica will be deleted in the future.
*/
boolean isRemoving();

/**
* Whether this replica has been created by a AlterReplicaLogDirsRequest but not yet replaced the
* current replica on the broker.
*
* @return true if this log is created by AlterReplicaLogDirsRequest and will replace the current
* log of the replica at some time in the future.
*/
boolean isFuture();

/**
* @return true if this is current log of replica.
*/
default boolean isCurrent() {
public boolean isCurrent() {
return !isFuture();
}

/**
* @return true if the replica is the preferred leader
*/
boolean isPreferredLeader();
public static class Builder {

/**
* @return (LEO - high watermark) if it is the current log, * (LEO) if it is the future log, *
* (-1) if the host of replica is offline
*/
long lag();
private String topic;
private int partition;
private Broker broker;
private long lag;
private long size;

/**
* @return The size of all log segments in this replica in bytes. It returns -1 if the host of
* replica is offline
*/
long size();
private boolean isAdding;

/**
* @return that indicates the data folder path which stored this replica on a specific Kafka node.
* It returns null if the host of replica is offline
*/
String path();
private boolean isRemoving;
private boolean isInternal;
private boolean isLeader;
private boolean isSync;
private boolean isFuture;
private boolean isOffline;
private boolean isPreferredLeader;
private String path;

/**
* @return true if this replica belongs to internal topic
*/
boolean internal();
private Builder() {}

public Builder replica(Replica replica) {
this.topic = replica.topic();
this.partition = replica.partition();
this.broker = replica.broker();
this.lag = replica.lag();
this.size = replica.size();
this.isAdding = replica.isAdding;
this.isRemoving = replica.isRemoving;
this.isInternal = replica.isInternal;
this.isLeader = replica.isLeader();
this.isSync = replica.isSync();
this.isFuture = replica.isFuture();
this.isOffline = replica.isOffline();
this.isPreferredLeader = replica.isPreferredLeader();
this.path = replica.path();
return this;
}

public Builder topic(String topic) {
this.topic = topic;
return this;
}

public Builder partition(int partition) {
this.partition = partition;
return this;
}

public Builder broker(Broker broker) {
this.broker = broker;
return this;
}

public Builder lag(long lag) {
this.lag = lag;
return this;
}

public Builder size(long size) {
this.size = size;
return this;
}

public Builder isAdding(boolean isAdding) {
this.isAdding = isAdding;
return this;
}

public Builder isRemoving(boolean isRemoving) {
this.isRemoving = isRemoving;
return this;
}

public Builder isInternal(boolean isInternal) {
this.isInternal = isInternal;
return this;
}

public Builder isLeader(boolean leader) {
this.isLeader = leader;
return this;
}

public Builder isSync(boolean isSync) {
this.isSync = isSync;
return this;
}

public Builder isFuture(boolean isFuture) {
this.isFuture = isFuture;
return this;
}

public Builder isOffline(boolean offline) {
this.isOffline = offline;
return this;
}

public Builder isPreferredLeader(boolean isPreferredLeader) {
this.isPreferredLeader = isPreferredLeader;
return this;
}

public Builder path(String path) {
this.path = path;
return this;
}

/**
* a helper used to set all flags for a replica leader.
*
* @return a replica leader
*/
public Replica buildLeader() {
return new Replica(
Objects.requireNonNull(topic),
partition,
Objects.requireNonNull(broker),
true,
true,
false,
false,
false,
false,
true,
0,
size,
Objects.requireNonNull(path),
isInternal);
}

/**
* a helper used to set all flags for a in-sync replica follower.
*
* @return a replica leader
*/
public Replica buildInSyncFollower() {
return new Replica(
Objects.requireNonNull(topic),
partition,
Objects.requireNonNull(broker),
false,
true,
false,
false,
false,
false,
false,
0,
size,
Objects.requireNonNull(path),
isInternal);
}

public Replica build() {
return new Replica(
Objects.requireNonNull(topic),
partition,
Objects.requireNonNull(broker),
isLeader,
isSync,
isOffline,
isAdding,
isRemoving,
isFuture,
isPreferredLeader,
lag,
size,
path,
isInternal);
}
}
}
Loading

0 comments on commit fb58f05

Please sign in to comment.