Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-26714 Introduce path configuration for system coprocessors #4069

Merged
merged 1 commit into from
Feb 4, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -213,7 +213,7 @@ public static void addJarFilesToJar(File targetJar,
LOG.info("Adding jar file to outer jar file completed");
}

static String localDirPath(Configuration conf) {
public static String localDirPath(Configuration conf) {
return conf.get(ClassLoaderBase.LOCAL_DIR_KEY)
+ File.separator + "jars" + File.separator;
}
Original file line number Diff line number Diff line change
@@ -46,6 +46,7 @@
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.CoprocessorClassLoader;
import org.apache.hadoop.hbase.util.SortedList;
import org.apache.hbase.thirdparty.com.google.common.base.Strings;

/**
* Provides the common setup framework and runtime services for coprocessor
@@ -144,23 +145,36 @@ protected void loadSystemCoprocessors(Configuration conf, String confKey) {

int currentSystemPriority = Coprocessor.PRIORITY_SYSTEM;
for (String className : defaultCPClasses) {
String[] classNameAndPriority = className.split("\\|");
// After HBASE-23710 and HBASE-26714 when configuring for system coprocessor, we accept
// an optional format of className|priority|path
String[] classNameToken = className.split("\\|");
boolean hasPriorityOverride = false;
className = classNameAndPriority[0];
boolean hasPath = false;
className = classNameToken[0];
int overridePriority = Coprocessor.PRIORITY_SYSTEM;
if (classNameAndPriority.length > 1){
overridePriority = Integer.parseInt(classNameAndPriority[1]);
Path path = null;
if (classNameToken.length > 1 && !Strings.isNullOrEmpty(classNameToken[1])) {
overridePriority = Integer.parseInt(classNameToken[1]);
hasPriorityOverride = true;
}
if (classNameToken.length > 2 && !Strings.isNullOrEmpty(classNameToken[2])) {
path = new Path(classNameToken[2].trim());
hasPath = true;
}
className = className.trim();
if (findCoprocessor(className) != null) {
// If already loaded will just continue
LOG.warn("Attempted duplicate loading of " + className + "; skipped");
continue;
}
ClassLoader cl = this.getClass().getClassLoader();
Thread.currentThread().setContextClassLoader(cl);
try {
// override the class loader if a path for the system coprocessor is provided.
if (hasPath) {
cl = CoprocessorClassLoader.getClassLoader(path, this.getClass().getClassLoader(),
pathPrefix, conf);
}
Thread.currentThread().setContextClassLoader(cl);
implClass = cl.loadClass(className);
int coprocPriority = hasPriorityOverride ? overridePriority : currentSystemPriority;
// Add coprocessors as we go to guard against case where a coprocessor is specified twice
Original file line number Diff line number Diff line change
@@ -20,15 +20,18 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

import java.io.File;
import java.lang.reflect.InvocationTargetException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.ClassLoaderTestHelper;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
@@ -41,6 +44,8 @@ public class TestCoprocessorHost {
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestCoprocessorHost.class);

private static final HBaseCommonTestingUtil TEST_UTIL = new HBaseCommonTestingUtil();

/**
* An {@link Abortable} implementation for tests.
*/
@@ -50,7 +55,7 @@ private static class TestAbortable implements Abortable {
@Override
public void abort(String why, Throwable e) {
this.aborted = true;
Assert.fail();
Assert.fail(e.getMessage());
}

@Override
@@ -97,6 +102,108 @@ public void testDoubleLoadingAndPriorityValue() {
assertEquals(overridePriority, simpleEnv_v3.getPriority());
}

@Test
public void testLoadSystemCoprocessorWithPath() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
final String key = "KEY";
final String testClassName = "TestSystemCoprocessor";
final String testClassNameWithPriorityAndPath = testClassName + "PriorityAndPath";

File jarFile = buildCoprocessorJar(testClassName);
File jarFileWithPriorityAndPath = buildCoprocessorJar(testClassNameWithPriorityAndPath);

try {
CoprocessorHost<RegionCoprocessor, CoprocessorEnvironment<RegionCoprocessor>> host;
host = new CoprocessorHostForTest<>(conf);

// make a string of coprocessor with only priority
int overridePriority = Integer.MAX_VALUE - 1;
final String coprocessorWithPriority =
SimpleRegionObserverV3.class.getName() + "|" + overridePriority;
// make a string of coprocessor with path but no priority
final String coprocessorWithPath =
String.format("%s|%s|%s", testClassName, "", jarFile.getAbsolutePath());
// make a string of coprocessor with priority and path
final String coprocessorWithPriorityAndPath = String
.format("%s|%s|%s", testClassNameWithPriorityAndPath, (overridePriority - 1),
jarFileWithPriorityAndPath.getAbsolutePath());

// Try and load a system coprocessors
conf.setStrings(key, SimpleRegionObserverV2.class.getName(), coprocessorWithPriority,
coprocessorWithPath, coprocessorWithPriorityAndPath);
host.loadSystemCoprocessors(conf, key);

// first loaded system coprocessor with default priority
CoprocessorEnvironment<?> simpleEnv =
host.findCoprocessorEnvironment(SimpleRegionObserverV2.class.getName());
assertNotNull(simpleEnv);
assertEquals(Coprocessor.PRIORITY_SYSTEM, simpleEnv.getPriority());

// external system coprocessor with default priority
CoprocessorEnvironment<?> coprocessorEnvironmentWithPath =
host.findCoprocessorEnvironment(testClassName);
assertNotNull(coprocessorEnvironmentWithPath);
assertEquals(Coprocessor.PRIORITY_SYSTEM + 1, coprocessorEnvironmentWithPath.getPriority());

// system coprocessor with configured priority
CoprocessorEnvironment<?> coprocessorEnvironmentWithPriority =
host.findCoprocessorEnvironment(SimpleRegionObserverV3.class.getName());
assertNotNull(coprocessorEnvironmentWithPriority);
assertEquals(overridePriority, coprocessorEnvironmentWithPriority.getPriority());

// external system coprocessor with override priority
CoprocessorEnvironment<?> coprocessorEnvironmentWithPriorityAndPath =
host.findCoprocessorEnvironment(testClassNameWithPriorityAndPath);
assertNotNull(coprocessorEnvironmentWithPriorityAndPath);
assertEquals(overridePriority - 1, coprocessorEnvironmentWithPriorityAndPath.getPriority());
} finally {
if (jarFile.exists()) {
jarFile.delete();
}
if (jarFileWithPriorityAndPath.exists()) {
jarFileWithPriorityAndPath.delete();
}
}
}

@Test(expected = AssertionError.class)
public void testLoadSystemCoprocessorWithPathDoesNotExist() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
final String key = "KEY";
final String testClassName = "TestSystemCoprocessor";

CoprocessorHost<RegionCoprocessor, CoprocessorEnvironment<RegionCoprocessor>> host;
host = new CoprocessorHostForTest<>(conf);

// make a string of coprocessor with path but no priority
final String coprocessorWithPath = testClassName + "||" + testClassName + ".jar";

// Try and load a system coprocessors
conf.setStrings(key, coprocessorWithPath);
// when loading non-exist with CoprocessorHostForTest host, it aborts with AssertionError
host.loadSystemCoprocessors(conf, key);
}

@Test(expected = AssertionError.class)
public void testLoadSystemCoprocessorWithPathDoesNotExistAndPriority() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
final String key = "KEY";
final String testClassName = "TestSystemCoprocessor";

CoprocessorHost<RegionCoprocessor, CoprocessorEnvironment<RegionCoprocessor>> host;
host = new CoprocessorHostForTest<>(conf);

int overridePriority = Integer.MAX_VALUE - 1;
// make a string of coprocessor with path and priority
final String coprocessor =
testClassName + "|" + overridePriority + "|" + testClassName + ".jar";

// Try and load a system coprocessors
conf.setStrings(key, coprocessor);
// when loading non-exist coprocessor, it aborts with AssertionError
host.loadSystemCoprocessors(conf, key);
}

public static class SimpleRegionObserverV2 extends SimpleRegionObserver { }

public static class SimpleRegionObserverV3 extends SimpleRegionObserver {
@@ -128,4 +235,11 @@ public CoprocessorEnvironment<E> createEnvironment(final E instance, final int p
return new BaseEnvironment<>(instance, priority, 0, cpHostConf);
}
}

private File buildCoprocessorJar(String className) throws Exception {
String dataTestDir = TEST_UTIL.getDataTestDir().toString();
String code = String.format("import org.apache.hadoop.hbase.coprocessor.*; public class %s"
+ " implements RegionCoprocessor {}", className);
return ClassLoaderTestHelper.buildJar(dataTestDir, className, code);
}
}