Skip to content

Commit

Permalink
HBASE-24260 Add a ClusterManager that issues commands via coprocessor
Browse files Browse the repository at this point in the history
Implements `ClusterManager` that relies on the new
`ShellExecEndpointCoprocessor` for remote shell command execution.

Signed-off-by: Bharath Vissapragada <[email protected]>
  • Loading branch information
ndimiduk committed May 4, 2020
1 parent 337529b commit 47dca8e
Show file tree
Hide file tree
Showing 15 changed files with 569 additions and 43 deletions.
42 changes: 42 additions & 0 deletions hbase-endpoint/src/main/protobuf/ShellExecEndpoint.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/*
* Opens a tunnel for remote shell execution on the target server. Used by `CoprocClusterManager`.
*/

syntax = "proto2";
option java_package = "org.apache.hadoop.hbase.coprocessor.protobuf.generated";
option java_outer_classname = "ShellExecEndpoint";
option java_generic_services = true;
option java_generate_equals_and_hash = true;

message ShellExecRequest {
required string command = 1;
optional bool await_response = 2;
}

message ShellExecResponse {
optional int32 exit_code = 1;
optional string stdout = 2;
optional string stderr = 3;
}

service ShellExecService {
rpc shell_exec(ShellExecRequest) returns(ShellExecResponse);
}
16 changes: 15 additions & 1 deletion hbase-it/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,10 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-endpoint</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hbase.thirdparty</groupId>
<artifactId>hbase-shaded-miscellaneous</artifactId>
Expand Down Expand Up @@ -271,7 +275,17 @@
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>compile</scope>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-library</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;

import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hbase.client.AsyncAdmin;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ShellExecEndpoint.ShellExecRequest;
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ShellExecEndpoint.ShellExecResponse;
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ShellExecEndpoint.ShellExecService;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Overrides commands to make use of coprocessor where possible. Only supports actions taken
* against Master and Region Server hosts.
*/
@InterfaceAudience.Private
@SuppressWarnings("unused") // no way to test this without a distributed cluster.
public class CoprocClusterManager extends HBaseClusterManager {
private static final Logger LOG = LoggerFactory.getLogger(CoprocClusterManager.class);
private static final Set<ServiceType> supportedServices = buildSupportedServicesSet();

@Override
protected Pair<Integer, String> exec(String hostname, ServiceType service, String... cmd)
throws IOException {
if (!supportedServices.contains(service)) {
throw unsupportedServiceType(service);
}

// We only support actions vs. Master or Region Server processes. We're issuing those actions
// via the coprocessor that's running within those processes. Thus, there's no support for
// honoring the configured service user.
final String command = StringUtils.join(cmd, " ");
LOG.info("Executing remote command: {}, hostname:{}", command, hostname);

try (final AsyncConnection conn = ConnectionFactory.createAsyncConnection(getConf()).join()) {
final AsyncAdmin admin = conn.getAdmin();
final ShellExecRequest req = ShellExecRequest.newBuilder()
.setCommand(command)
.setAwaitResponse(false)
.build();

final ShellExecResponse resp;
switch(service) {
case HBASE_MASTER:
// What happens if the intended action was killing a backup master? Right now we have
// no `RestartBackupMasterAction` so it's probably fine.
resp = masterExec(admin, req);
break;
case HBASE_REGIONSERVER:
final ServerName targetHost = resolveRegionServerName(admin, hostname);
resp = regionServerExec(admin, req, targetHost);
break;
default:
throw new RuntimeException("should not happen");
}

if (LOG.isDebugEnabled()) {
LOG.debug("Executed remote command: {}, exit code:{} , output:{}", command, resp.getExitCode(),
resp.getStdout());
} else {
LOG.info("Executed remote command: {}, exit code:{}", command, resp.getExitCode());
}
return new Pair<>(resp.getExitCode(), resp.getStdout());
}
}

private static Set<ServiceType> buildSupportedServicesSet() {
final Set<ServiceType> set = new HashSet<>();
set.add(ServiceType.HBASE_MASTER);
set.add(ServiceType.HBASE_REGIONSERVER);
return Collections.unmodifiableSet(set);
}

private static ShellExecResponse masterExec(final AsyncAdmin admin,
final ShellExecRequest req) {
// TODO: Admin API provides no means of sending exec to a backup master.
return admin.<ShellExecService.Stub, ShellExecResponse>coprocessorService(
ShellExecService::newStub,
(stub, controller, callback) -> stub.shellExec(controller, req, callback))
.join();
}

private static ShellExecResponse regionServerExec(final AsyncAdmin admin,
final ShellExecRequest req, final ServerName targetHost) {
return admin.<ShellExecService.Stub, ShellExecResponse>coprocessorService(
ShellExecService::newStub,
(stub, controller, callback) -> stub.shellExec(controller, req, callback),
targetHost)
.join();
}

private static ServerName resolveRegionServerName(final AsyncAdmin admin,
final String hostname) {
return admin.getRegionServers()
.thenApply(names -> names.stream()
.filter(sn -> Objects.equals(sn.getHostname(), hostname))
.findAny())
.join()
.orElseThrow(() -> serverNotFound(hostname));
}

private static RuntimeException serverNotFound(final String hostname) {
return new RuntimeException(
String.format("Did not find %s amongst the servers known to the client.", hostname));
}

private static RuntimeException unsupportedServiceType(final ServiceType serviceType) {
return new RuntimeException(
String.format("Unable to service request for service=%s", serviceType));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.io.IOException;
import java.util.Locale;
import java.util.Map;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
Expand All @@ -45,9 +44,12 @@
*/
@InterfaceAudience.Private
public class HBaseClusterManager extends Configured implements ClusterManager {
private static final String SIGKILL = "SIGKILL";
private static final String SIGSTOP = "SIGSTOP";
private static final String SIGCONT = "SIGCONT";

protected enum Signal {
SIGKILL,
SIGSTOP,
SIGCONT,
}

protected static final Logger LOG = LoggerFactory.getLogger(HBaseClusterManager.class);
private String sshUserName;
Expand Down Expand Up @@ -107,7 +109,7 @@ public void setConf(Configuration conf) {
.setSleepInterval(conf.getLong(RETRY_SLEEP_INTERVAL_KEY, DEFAULT_RETRY_SLEEP_INTERVAL)));
}

private String getServiceUser(ServiceType service) {
protected String getServiceUser(ServiceType service) {
Configuration conf = getConf();
switch (service) {
case HADOOP_DATANODE:
Expand Down Expand Up @@ -329,9 +331,9 @@ protected CommandProvider getCommandProvider(ServiceType service) throws IOExcep
* @return pair of exit code and command output
* @throws IOException if something goes wrong.
*/
private Pair<Integer, String> exec(String hostname, ServiceType service, String... cmd)
protected Pair<Integer, String> exec(String hostname, ServiceType service, String... cmd)
throws IOException {
LOG.info("Executing remote command: {} , hostname:{}", StringUtils.join(cmd, " "),
LOG.info("Executing remote command: {}, hostname:{}", StringUtils.join(cmd, " "),
hostname);

RemoteShell shell = new RemoteShell(hostname, getServiceUser(service), cmd);
Expand Down Expand Up @@ -444,8 +446,9 @@ public void restart(ServiceType service, String hostname, int port) throws IOExc
exec(hostname, service, Operation.RESTART);
}

public void signal(ServiceType service, String signal, String hostname) throws IOException {
execWithRetries(hostname, service, getCommandProvider(service).signalCommand(service, signal));
public void signal(ServiceType service, Signal signal, String hostname) throws IOException {
execWithRetries(hostname, service,
getCommandProvider(service).signalCommand(service, signal.toString()));
}

@Override
Expand All @@ -457,16 +460,16 @@ public boolean isRunning(ServiceType service, String hostname, int port) throws

@Override
public void kill(ServiceType service, String hostname, int port) throws IOException {
signal(service, SIGKILL, hostname);
signal(service, Signal.SIGKILL, hostname);
}

@Override
public void suspend(ServiceType service, String hostname, int port) throws IOException {
signal(service, SIGSTOP, hostname);
signal(service, Signal.SIGSTOP, hostname);
}

@Override
public void resume(ServiceType service, String hostname, int port) throws IOException {
signal(service, SIGCONT, hostname);
signal(service, Signal.SIGCONT, hostname);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ public void createDistributedHBaseCluster() throws IOException {
HConstants.HBASE_DIR, conf.get(HConstants.HBASE_DIR));
Class<? extends ClusterManager> clusterManagerClass = conf.getClass(HBASE_CLUSTER_MANAGER_CLASS,
DEFAULT_HBASE_CLUSTER_MANAGER_CLASS, ClusterManager.class);
LOG.info("Instantiating {}", clusterManagerClass.getName());
ClusterManager clusterManager = ReflectionUtils.newInstance(
clusterManagerClass, conf);
setHBaseCluster(new DistributedHBaseCluster(conf, clusterManager));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ public class RESTApiClusterManager extends Configured implements ClusterManager
"hbase.it.clustermanager.restapi.password";
private static final String REST_API_CLUSTER_MANAGER_CLUSTER_NAME =
"hbase.it.clustermanager.restapi.clustername";
private static final String REST_API_DELEGATE_CLUSTER_MANAGER =
"hbase.it.clustermanager.restapi.delegate";

private static final JsonParser parser = new JsonParser();

Expand All @@ -86,8 +88,6 @@ public class RESTApiClusterManager extends Configured implements ClusterManager
// Fields for the hostname, username, password, and cluster name of the cluster management server
// to be used.
private String serverHostname;
private String serverUsername;
private String serverPassword;
private String clusterName;

// Each version of Cloudera Manager supports a particular API versions. Version 6 of this API
Expand All @@ -103,10 +103,7 @@ public class RESTApiClusterManager extends Configured implements ClusterManager

private static final Logger LOG = LoggerFactory.getLogger(RESTApiClusterManager.class);

RESTApiClusterManager() {
hBaseClusterManager = ReflectionUtils.newInstance(HBaseClusterManager.class,
new IntegrationTestingUtility().getConfiguration());
}
RESTApiClusterManager() { }

@Override
public void setConf(Configuration conf) {
Expand All @@ -115,12 +112,17 @@ public void setConf(Configuration conf) {
// `Configured()` constructor calls `setConf(null)` before calling again with a real value.
return;
}

final Class<? extends ClusterManager> clazz = conf.getClass(REST_API_DELEGATE_CLUSTER_MANAGER,
HBaseClusterManager.class, ClusterManager.class);
hBaseClusterManager = ReflectionUtils.newInstance(clazz, conf);

serverHostname = conf.get(REST_API_CLUSTER_MANAGER_HOSTNAME, DEFAULT_SERVER_HOSTNAME);
serverUsername = conf.get(REST_API_CLUSTER_MANAGER_USERNAME, DEFAULT_SERVER_USERNAME);
serverPassword = conf.get(REST_API_CLUSTER_MANAGER_PASSWORD, DEFAULT_SERVER_PASSWORD);
clusterName = conf.get(REST_API_CLUSTER_MANAGER_CLUSTER_NAME, DEFAULT_CLUSTER_NAME);

// Add filter to Client instance to enable server authentication.
String serverUsername = conf.get(REST_API_CLUSTER_MANAGER_USERNAME, DEFAULT_SERVER_USERNAME);
String serverPassword = conf.get(REST_API_CLUSTER_MANAGER_PASSWORD, DEFAULT_SERVER_PASSWORD);
client.register(HttpAuthenticationFeature.basic(serverUsername, serverPassword));
}

Expand Down
Loading

0 comments on commit 47dca8e

Please sign in to comment.