Skip to content

Commit

Permalink
add SourceLogger 并添加注释,选主,分片分配
Browse files Browse the repository at this point in the history
  • Loading branch information
jiangyunpeng committed Aug 8, 2024
1 parent 6870beb commit 1d1f43c
Show file tree
Hide file tree
Showing 32 changed files with 632 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.SourceLogger;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand Down Expand Up @@ -533,6 +534,11 @@ public void shardStarted(final ShardRouting shardRouting,
primaryTerm,
message,
timestampMillisRange);
SourceLogger.info(this.getClass(),"send shard:[{}/{}] action:[{}] message:[{}]",
shardRouting.getIndexName(),
shardRouting.shardId(),
SHARD_STARTED_ACTION_NAME,
message);
sendShardAction(SHARD_STARTED_ACTION_NAME, currentState, entry, listener);
}

Expand All @@ -550,7 +556,7 @@ private static class ShardStartedTransportHandler implements TransportRequestHan

@Override
public void messageReceived(StartedShardEntry request, TransportChannel channel, Task task) throws Exception {
logger.debug("{} received shard started for [{}]", request.shardId, request);
SourceLogger.info(ShardStartedTransportHandler.class,"{} received shard started for [{}]", request.shardId, request);
clusterService.submitStateUpdateTask(
"shard-started " + request,
request,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.SourceLogger;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
Expand Down Expand Up @@ -311,14 +312,15 @@ public String toString() {

/**
* A collection of persistent node ids, denoting the voting configuration for cluster state changes.
* 管理和维护集群选举配置的一个关键类。其主要作用是定义哪些节点有投票权,并用来决定选举过程中的法定人数(quorum)和有效投票
*/
public static class VotingConfiguration implements Writeable, ToXContentFragment {

public static final VotingConfiguration EMPTY_CONFIG = new VotingConfiguration(Collections.emptySet());
public static final VotingConfiguration MUST_JOIN_ELECTED_MASTER = new VotingConfiguration(Collections.singleton(
"_must_join_elected_master_"));

private final Set<String> nodeIds;
private final Set<String> nodeIds;//具有投票权的节点ID

public VotingConfiguration(Set<String> nodeIds) {
this.nodeIds = Collections.unmodifiableSet(new HashSet<>(nodeIds));
Expand All @@ -336,7 +338,12 @@ public void writeTo(StreamOutput out) throws IOException {
public boolean hasQuorum(Collection<String> votes) {
final HashSet<String> intersection = new HashSet<>(nodeIds);
intersection.retainAll(votes);
return intersection.size() * 2 > nodeIds.size();
//交集中的节点数量超过总节点数量的一半,则认为达到法定人数
//如果nodeIds为5,投票数intersection为3则超过一半
//如果nodeIds为1,投票数intersection为1也满足条件
boolean ret = intersection.size() * 2 > nodeIds.size();
SourceLogger.info(this.getClass(),"checkQuorum {} votes:[{}] nodes:[{}]",ret,votes,nodeIds);
return ret;
}

public Set<String> getNodeIds() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.cluster.coordination.CoordinationMetadata.VotingConfiguration;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.SourceLogger;

import java.io.Closeable;
import java.io.IOException;
Expand Down Expand Up @@ -163,6 +164,13 @@ public void setInitialState(ClusterState initialState) {
* @throws CoordinationStateRejectedException if the arguments were incompatible with the current state of this object.
*/
public Join handleStartJoin(StartJoinRequest startJoinRequest) {
SourceLogger.info(this.getClass(), "handleStartJoin,sourceNode={},joinRequestTerm={},currentTerm={},lastAcceptedTerm={}",
startJoinRequest.getSourceNode(),
startJoinRequest.getTerm(),
getCurrentTerm(),
getLastAcceptedTerm());

//如果收到的term小于或等于当前term,忽略
if (startJoinRequest.getTerm() <= getCurrentTerm()) {
logger.debug("handleStartJoin: ignoring [{}] as term provided is not greater than current term [{}]",
startJoinRequest, getCurrentTerm());
Expand Down Expand Up @@ -193,6 +201,11 @@ public Join handleStartJoin(StartJoinRequest startJoinRequest) {
joinVotes = new VoteCollection();
publishVotes = new VoteCollection();

SourceLogger.info(this.getClass(), "create Join,sourceNode={},currentTerTerm={},lastAcceptedTerm={}",
startJoinRequest.getSourceNode(),
getCurrentTerm(),
getLastAcceptedTerm());
//创建Join时,sourceNode=localNode,target=startJoinRequest.getSourceNode()
return new Join(localNode, startJoinRequest.getSourceNode(), getCurrentTerm(), getLastAcceptedTerm(),
getLastAcceptedVersionOrMetadataVersion());
}
Expand All @@ -207,6 +220,8 @@ public Join handleStartJoin(StartJoinRequest startJoinRequest) {
public boolean handleJoin(Join join) {
assert join.targetMatches(localNode) : "handling join " + join + " for the wrong node " + localNode;

SourceLogger.info(this.getClass(), "handling join [{}] lastAcceptedTerm={}",join,getLastAcceptedTerm());

if (join.getTerm() != getCurrentTerm()) {
logger.debug("handleJoin: ignored join due to term mismatch (expected: [{}], actual: [{}])",
getCurrentTerm(), join.getTerm());
Expand Down Expand Up @@ -247,6 +262,7 @@ public boolean handleJoin(Join join) {

boolean added = joinVotes.addJoinVote(join);
boolean prevElectionWon = electionWon;
//计算选举的法定人数
electionWon = isElectionQuorum(joinVotes);
assert !prevElectionWon || electionWon : // we cannot go from won to not won
"locaNode= " + localNode + ", join=" + join + ", joinVotes=" + joinVotes;
Expand Down Expand Up @@ -296,6 +312,8 @@ && getLastCommittedConfiguration().equals(getLastAcceptedConfiguration()) == fal
logger.debug("handleClientValue: only allow reconfiguration while not already reconfiguring");
throw new CoordinationStateRejectedException("only allow reconfiguration while not already reconfiguring");
}

//joinVotes检查当前节点是否满足法定人数
if (joinVotesHaveQuorumFor(clusterState.getLastAcceptedConfiguration()) == false) {
logger.debug("handleClientValue: only allow reconfiguration if joinVotes have quorum for new config");
throw new CoordinationStateRejectedException("only allow reconfiguration if joinVotes have quorum for new config");
Expand All @@ -322,12 +340,15 @@ && getLastCommittedConfiguration().equals(getLastAcceptedConfiguration()) == fal
*/
public PublishResponse handlePublishRequest(PublishRequest publishRequest) {
final ClusterState clusterState = publishRequest.getAcceptedState();

//必须是相同term
if (clusterState.term() != getCurrentTerm()) {
logger.debug("handlePublishRequest: ignored publish request due to term mismatch (expected: [{}], actual: [{}])",
getCurrentTerm(), clusterState.term());
throw new CoordinationStateRejectedException("incoming term " + clusterState.term() + " does not match current term " +
getCurrentTerm());
}
//如果收到的version小于当前version
if (clusterState.term() == getLastAcceptedTerm() && clusterState.version() <= getLastAcceptedVersion()) {
if (clusterState.term() == ZEN1_BWC_TERM
&& clusterState.nodes().getMasterNode().equals(getLastAcceptedState().nodes().getMasterNode()) == false) {
Expand Down Expand Up @@ -376,12 +397,12 @@ public Optional<ApplyCommitRequest> handlePublishResponse(DiscoveryNode sourceNo
" does not match current version " + lastPublishedVersion);
}

logger.trace("handlePublishResponse: accepted publish response for version [{}] and term [{}] from [{}]",
SourceLogger.info(this.getClass(),"handlePublishResponse: accepted publish response for version [{}] and term [{}] from [{}] ",
publishResponse.getVersion(), publishResponse.getTerm(), sourceNode);

publishVotes.addVote(sourceNode);
if (isPublishQuorum(publishVotes)) {
logger.trace("handlePublishResponse: value committed for version [{}] and term [{}]",
publishResponse.getVersion(), publishResponse.getTerm());
SourceLogger.info(this.getClass(),"handlePublishResponse: PublishQuorum -> true");
return Optional.of(new ApplyCommitRequest(localNode, publishResponse.getTerm(), publishResponse.getVersion()));
}

Expand All @@ -395,20 +416,33 @@ public Optional<ApplyCommitRequest> handlePublishResponse(DiscoveryNode sourceNo
* @throws CoordinationStateRejectedException if the arguments were incompatible with the current state of this object.
*/
public void handleCommit(ApplyCommitRequest applyCommit) {
SourceLogger.info(this.getClass(),"handle commit request term:[{}]/[{}]/[{}] version:[{}]/[{}]",
applyCommit.getTerm(),
getCurrentTerm(),
getLastAcceptedTerm(),
applyCommit.getVersion(),
getLastAcceptedVersion()
);

//检查CurrentTerm
if (applyCommit.getTerm() != getCurrentTerm()) {
logger.debug("handleCommit: ignored commit request due to term mismatch " +
"(expected: [term {} version {}], actual: [term {} version {}])",
getLastAcceptedTerm(), getLastAcceptedVersion(), applyCommit.getTerm(), applyCommit.getVersion());
throw new CoordinationStateRejectedException("incoming term " + applyCommit.getTerm() + " does not match current term " +
getCurrentTerm());
}

//检查LastAcceptedTerm
if (applyCommit.getTerm() != getLastAcceptedTerm()) {
logger.debug("handleCommit: ignored commit request due to term mismatch " +
"(expected: [term {} version {}], actual: [term {} version {}])",
getLastAcceptedTerm(), getLastAcceptedVersion(), applyCommit.getTerm(), applyCommit.getVersion());
throw new CoordinationStateRejectedException("incoming term " + applyCommit.getTerm() + " does not match last accepted term " +
getLastAcceptedTerm());
}

//检查version
if (applyCommit.getVersion() != getLastAcceptedVersion()) {
logger.debug("handleCommit: ignored commit request due to version mismatch (term {}, expected: [{}], actual: [{}])",
getLastAcceptedTerm(), getLastAcceptedVersion(), applyCommit.getVersion());
Expand Down Expand Up @@ -475,6 +509,8 @@ public interface PersistedState extends Closeable {
* marked as committed.
*/
default void markLastAcceptedStateAsCommitted() {
SourceLogger.info(this.getClass(),"markLastAcceptedStateAsCommitted");
//返回最近接受的状态,也就是publish时设置的state
final ClusterState lastAcceptedState = getLastAcceptedState();
Metadata.Builder metadataBuilder = null;
if (lastAcceptedState.getLastAcceptedConfiguration().equals(lastAcceptedState.getLastCommittedConfiguration()) == false) {
Expand All @@ -488,12 +524,15 @@ default void markLastAcceptedStateAsCommitted() {
assert lastAcceptedState.metadata().clusterUUID().equals(Metadata.UNKNOWN_CLUSTER_UUID) == false ||
lastAcceptedState.term() == ZEN1_BWC_TERM :
"received cluster state with empty cluster uuid but not Zen1 BWC term: " + lastAcceptedState;

//如果uuid还没有commit
if (lastAcceptedState.metadata().clusterUUID().equals(Metadata.UNKNOWN_CLUSTER_UUID) == false &&
lastAcceptedState.metadata().clusterUUIDCommitted() == false) {
if (metadataBuilder == null) {
metadataBuilder = Metadata.builder(lastAcceptedState.metadata());
}
metadataBuilder.clusterUUIDCommitted(true);
SourceLogger.info("handle commit: cluster UUID [{}] committed", lastAcceptedState.metadata().clusterUUID());

if (lastAcceptedState.term() != ZEN1_BWC_TERM) {
// Zen1 masters never publish a committed cluster UUID so if we logged this it'd happen on on every update. Let's just
Expand All @@ -502,6 +541,7 @@ default void markLastAcceptedStateAsCommitted() {
}
}
if (metadataBuilder != null) {
//修改LastAcceptedState
setLastAcceptedState(ClusterState.builder(lastAcceptedState).metadata(metadataBuilder).build());
}
}
Expand All @@ -511,18 +551,26 @@ default void close() throws IOException {
}

/**
*
* 代表一个投票集合,投票可以是Nodes(Peer阶段), 也可以Join请求
* A collection of votes, used to calculate quorums. Optionally records the Joins as well.
*/
public static class VoteCollection {

private final Map<String, DiscoveryNode> nodes;
private final Set<Join> joins;
private final Map<String, DiscoveryNode> nodes;//节点信息
private final Set<Join> joins; //Join投票

public boolean addVote(DiscoveryNode sourceNode) {
return sourceNode.isMasterNode() && nodes.put(sourceNode.getId(), sourceNode) == null;
boolean ret= sourceNode.isMasterNode() && nodes.put(sourceNode.getId(), sourceNode) == null;
SourceLogger.info(this.getClass(),"addVote! node:[{}], master:[{}], nodes=[{}]",
sourceNode.getHostName(),
sourceNode.isMasterNode(),
nodes);
return ret;
}

public boolean addJoinVote(Join join) {
//注意投票是SourceNode
final boolean added = addVote(join.getSourceNode());
if (added) {
joins.add(join);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.SourceLogger;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
Expand Down Expand Up @@ -258,15 +259,19 @@ void onFollowerCheckRequest(FollowerCheckRequest followerCheckRequest) {

private void handleApplyCommit(ApplyCommitRequest applyCommitRequest, ActionListener<Void> applyListener) {
synchronized (mutex) {
logger.trace("handleApplyCommit: applying commit {}", applyCommitRequest);
SourceLogger.info(this.getClass(),"handleApplyCommit start! applying commit {}", applyCommitRequest);

//① coordinationState.handleCommit(),主要是更新本地状态
coordinationState.get().handleCommit(applyCommitRequest);
final ClusterState committedState = hideStateIfNotRecovered(coordinationState.get().getLastAcceptedState());
applierState = mode == Mode.CANDIDATE ? clusterStateWithNoMasterBlock(committedState) : committedState;

//② 如果本地是leader,这里不需要更新本地状态
if (applyCommitRequest.getSourceNode().equals(getLocalNode())) {
// master node applies the committed state at the end of the publication process, not here.
applyListener.onResponse(null);
} else {
//③ 如果是follower节点(包括master和data),需要将集群状态在本地复原
clusterApplier.onNewClusterState(applyCommitRequest.toString(), () -> applierState,
new ClusterApplyListener() {

Expand All @@ -281,6 +286,8 @@ public void onSuccess(String source) {
}
});
}

SourceLogger.info(this.getClass(),"handleApplyCommit end!");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.cluster.coordination.Coordinator.Mode;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.SourceLogger;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Setting;
Expand Down Expand Up @@ -170,6 +171,12 @@ private void handleFollowerCheck(FollowerCheckRequest request, TransportChannel
}

final FastResponseState responder = this.fastResponseState;
SourceLogger.info(this.getClass(),"handleFollowerCheck! sourceNode:[{}],requestTerm:[{}],currentTerm:[{}]",
request.getSender(),
request.getTerm(),
responder.term);

//如果当前已经是FOLLOWER,并且term一致,返回
if (responder.mode == Mode.FOLLOWER && responder.term == request.term) {
logger.trace("responding to {} on fast path", request);
transportChannel.sendResponse(Empty.INSTANCE);
Expand Down Expand Up @@ -304,6 +311,8 @@ private void handleWakeUp() {
actionName = FOLLOWER_CHECK_ACTION_NAME;
transportRequest = request;
}

SourceLogger.info(this.getClass(),"send heartbeat! action:[{}] to [{}]",actionName ,discoveryNode );
transportService.sendRequest(discoveryNode, actionName, transportRequest,
TransportRequestOptions.of(followerCheckTimeout, Type.PING),
new TransportResponseHandler.Empty() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.SourceLogger;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -213,6 +214,7 @@ boolean isJoinPending() {
}

public void sendJoinRequest(DiscoveryNode destination, long term, Optional<Join> optionalJoin) {
SourceLogger.info("sendJoinRequest join={}",optionalJoin.get());
sendJoinRequest(destination, term, optionalJoin, () -> {
});
}
Expand Down Expand Up @@ -450,6 +452,8 @@ public void close(Mode newMode) {
pendingAsTasks.put(JoinTaskExecutor.newFinishElectionTask(), (source, e) -> {
});
joinTaskExecutor = joinTaskExecutorGenerator.get();

//提交两个本地任务
masterService.submitStateUpdateTasks(stateUpdateSource, pendingAsTasks, ClusterStateTaskConfig.build(Priority.URGENT),
joinTaskExecutor);
} else {
Expand Down
Loading

0 comments on commit 1d1f43c

Please sign in to comment.