Skip to content

Commit

Permalink
fed分布式适配kill havenask searcher的rest接口 (alibaba#431)
Browse files Browse the repository at this point in the history
现在可以通过设置node参数指定由哪个node进行kill searcher的操作,未指定时,所有节点都执行kill searcher的操作。

e.g.
#所有节点都执行kill searcher操作
POST /_havenask/stop?role=searcher

#node1 执行kill searcher操作
POST /_havenask/stop?role=searcher&node=node1

#node1、node2 与node3 执行kill searcher操作
POST /_havenask/stop?role=searcher&node=node1,node2,node3
  • Loading branch information
Huaixinww committed Jan 26, 2024
1 parent e88db90 commit dd5db49
Show file tree
Hide file tree
Showing 8 changed files with 271 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,7 @@ public void testSearcherStop() throws Exception {

HavenaskStopRequest request = new HavenaskStopRequest(role);
HavenaskStopResponse response = client().execute(HavenaskStopAction.INSTANCE, request).actionGet();
assertEquals(200, response.getResultCode());
assertEquals("target stop role: searcher; run stopping searcher command success; ", response.getResult());

// check searcher process is not running
assertFalse(checkProcessAlive(role));
}
Expand All @@ -117,8 +116,7 @@ public void testQrsStop() throws Exception {

HavenaskStopRequest request = new HavenaskStopRequest(role);
HavenaskStopResponse response = client().execute(HavenaskStopAction.INSTANCE, request).actionGet();
assertEquals(200, response.getResultCode());
assertEquals("target stop role: qrs; run stopping qrs command success; ", response.getResult());

// check searcher process is not running
assertFalse(checkProcessAlive(role));
}
Expand All @@ -138,11 +136,7 @@ public void testAllStop() throws Exception {

HavenaskStopRequest request = new HavenaskStopRequest(role);
HavenaskStopResponse response = client().execute(HavenaskStopAction.INSTANCE, request).actionGet();
assertEquals(200, response.getResultCode());
assertEquals(
"target stop role: all; run stopping searcher command success; run stopping qrs command success; ",
response.getResult()
);

// check searcher and qrs process is not running
assertFalse(checkProcessAlive("searcher"));
assertFalse(checkProcessAlive("qrs"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

package org.havenask.engine;

import org.apache.http.util.EntityUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.havenask.action.admin.cluster.health.ClusterHealthRequest;
Expand All @@ -36,6 +37,7 @@

import java.io.IOException;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -123,6 +125,23 @@ public void testKillSearcherThenPeerRecovery() throws Exception {
String primaryPreference = "primary";
String replicaPreference = "replication";

String dataNodeName = null;
Response catNodesResponse = highLevelClient().getLowLevelClient()
.performRequest(new Request("GET", "/_cat" + "/nodes?format=json"));
String catNodesStr = EntityUtils.toString(catNodesResponse.getEntity());
CatNodes catNodes = CatNodes.parse(catNodesStr);
for (int i = 0; i < catNodes.getNodes().length; i++) {
CatNodes.NodeInfo nodeInfo = catNodes.getNodes()[i];
if (nodeInfo.nodeRole.contains("d")) {
dataNodeName = nodeInfo.name;
break;
}
}
if (dataNodeName == null) {
throw new RuntimeException("no data node found");
}
final String stopSearcherNodeName = dataNodeName;

// create index
ClusterHealthResponse chResponse = highLevelClient().cluster().health(new ClusterHealthRequest(), RequestOptions.DEFAULT);
Settings settings = Settings.builder()
Expand All @@ -141,7 +160,10 @@ public void testKillSearcherThenPeerRecovery() throws Exception {
// wait cluster health turns to be yellow
assertBusy(() -> {
// stop searcher
Request request = new Request("POST", "/_havenask/stop?role=searcher");
Request request = new Request(
"POST",
String.format(Locale.ROOT, "/_havenask/stop?role=searcher&node=%s", stopSearcherNodeName)
);
Response response = highLevelClient().getLowLevelClient().performRequest(request);
assertEquals(200, response.getStatusLine().getStatusCode());
ClusterHealthResponse clusterHealthResponse = highLevelClient().cluster()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright (c) 2021, Alibaba Group;
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.havenask.engine.stop.action;

import org.havenask.action.support.nodes.BaseNodeRequest;
import org.havenask.common.io.stream.StreamInput;
import org.havenask.common.io.stream.StreamOutput;

import java.io.IOException;

public class HavenaskStopNodeRequest extends BaseNodeRequest {
HavenaskStopRequest request;

public HavenaskStopNodeRequest(StreamInput in) throws IOException {
super(in);
request = new HavenaskStopRequest(in);
}

public HavenaskStopNodeRequest(HavenaskStopRequest havenaskStopRequest) {
this.request = havenaskStopRequest;
}

public String getRole() {
return request.getRole();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
request.writeTo(out);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright (c) 2021, Alibaba Group;
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.havenask.engine.stop.action;

import org.havenask.action.support.nodes.BaseNodeResponse;
import org.havenask.cluster.node.DiscoveryNode;
import org.havenask.common.io.stream.StreamInput;
import org.havenask.common.io.stream.StreamOutput;
import org.havenask.common.xcontent.ToXContent;
import org.havenask.common.xcontent.ToXContentObject;
import org.havenask.common.xcontent.XContentBuilder;

import java.io.IOException;

public class HavenaskStopNodeResponse extends BaseNodeResponse implements ToXContentObject {
private final String nodeName;
private final String result;
private final int resultCode;

public HavenaskStopNodeResponse(StreamInput in) throws IOException {
super(in);
result = in.readString();
resultCode = in.readInt();
nodeName = getNode().getName();
}

public HavenaskStopNodeResponse(DiscoveryNode node, String result, int resultCode) {
super(node);
this.result = result;
this.resultCode = resultCode;
this.nodeName = node.getName();
}

public String getResult() {
return result;
}

public int getResultCode() {
return resultCode;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(result);
out.writeInt(resultCode);
}

public static HavenaskStopNodeResponse readNodeResponse(StreamInput in) throws IOException {
return new HavenaskStopNodeResponse(in);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
// String to XContentBuilder
builder.startObject();
builder.field("result", result);
builder.endObject();
return builder;
}

@Override
public String toString() {
return " { \n"
+ " nodeName: "
+ nodeName
+ ";\n"
+ " result: "
+ result
+ "\n"
+ " resultCode: "
+ resultCode
+ "\n"
+ " }";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,28 +14,33 @@

package org.havenask.engine.stop.action;

import org.havenask.action.ActionRequest;
import org.havenask.action.ActionRequestValidationException;
import org.havenask.action.support.nodes.BaseNodesRequest;
import org.havenask.common.io.stream.StreamInput;
import org.havenask.common.io.stream.StreamOutput;

import java.io.IOException;

import static org.havenask.action.ValidateActions.addValidationError;

public class HavenaskStopRequest extends ActionRequest {
public class HavenaskStopRequest extends BaseNodesRequest<HavenaskStopRequest> {

private String role;

public HavenaskStopRequest(String role) {
this.role = role;
}

public HavenaskStopRequest(StreamInput in) throws IOException {
super(in);
this.role = in.readString();
}

/**
* Execute stop action for nodes based on the specified nodes ids.
* If none ids are passed, all nodes will execute stop action.
*/
public HavenaskStopRequest(String role, String... nodesIds) {
super(nodesIds);
this.role = role;
}

@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,48 +14,49 @@

package org.havenask.engine.stop.action;

import org.havenask.action.ActionResponse;
import org.havenask.action.FailedNodeException;
import org.havenask.action.support.nodes.BaseNodesResponse;
import org.havenask.cluster.ClusterName;
import org.havenask.common.io.stream.StreamInput;
import org.havenask.common.io.stream.StreamOutput;
import org.havenask.common.xcontent.ToXContentObject;
import org.havenask.common.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.List;

public class HavenaskStopResponse extends ActionResponse implements ToXContentObject {
private final String result;
private final int resultCode;
public class HavenaskStopResponse extends BaseNodesResponse<HavenaskStopNodeResponse> {
private List<HavenaskStopNodeResponse> nodeResponses;

public HavenaskStopResponse(StreamInput in) throws IOException {
result = in.readString();
resultCode = in.readInt();
super(in);
nodeResponses = getNodes();
}

public HavenaskStopResponse(String result, int resultCode) {
this.result = result;
this.resultCode = resultCode;
public HavenaskStopResponse(ClusterName clusterName, List<HavenaskStopNodeResponse> nodes, List<FailedNodeException> failures) {
super(clusterName, nodes, failures);
nodeResponses = getNodes();
}

public String getResult() {
return result;
}

public int getResultCode() {
return resultCode;
@Override
protected List<HavenaskStopNodeResponse> readNodesFrom(StreamInput in) throws IOException {
return in.readList(HavenaskStopNodeResponse::readNodeResponse);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(result);
out.writeInt(resultCode);
protected void writeNodesTo(StreamOutput out, List<HavenaskStopNodeResponse> nodes) throws IOException {
out.writeList(nodes);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
// String to XContentBuilder
builder.startObject();
builder.field("result", result);
builder.endObject();
return builder;
public String toString() {
StringBuilder resStr = new StringBuilder();
resStr.append("[\n");
for (int i = 0; i < nodeResponses.size(); i++) {
resStr.append(nodeResponses.get(i).toString());
if (i < nodeResponses.size() - 1) {
resStr.append(",\n");
}
}
resStr.append("\n]");
return resStr.toString();
}
}
Loading

0 comments on commit dd5db49

Please sign in to comment.