Skip to content

Commit

Permalink
HBASE-26714 Introduce path configuration for system coprocessors (#4069)
Browse files Browse the repository at this point in the history
Signed-off-by: Ankit Singhal <[email protected]>
Signed-off-by: Wellington Ramos Chevreuil <[email protected]>
  • Loading branch information
taklwu authored Feb 4, 2022
1 parent 3da23c2 commit e848d3b
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ public static void addJarFilesToJar(File targetJar,
LOG.info("Adding jar file to outer jar file completed");
}

static String localDirPath(Configuration conf) {
public static String localDirPath(Configuration conf) {
return conf.get(ClassLoaderBase.LOCAL_DIR_KEY)
+ File.separator + "jars" + File.separator;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.CoprocessorClassLoader;
import org.apache.hadoop.hbase.util.SortedList;
import org.apache.hbase.thirdparty.com.google.common.base.Strings;

/**
* Provides the common setup framework and runtime services for coprocessor
Expand Down Expand Up @@ -144,23 +145,36 @@ protected void loadSystemCoprocessors(Configuration conf, String confKey) {

int currentSystemPriority = Coprocessor.PRIORITY_SYSTEM;
for (String className : defaultCPClasses) {
String[] classNameAndPriority = className.split("\\|");
// After HBASE-23710 and HBASE-26714 when configuring for system coprocessor, we accept
// an optional format of className|priority|path
String[] classNameToken = className.split("\\|");
boolean hasPriorityOverride = false;
className = classNameAndPriority[0];
boolean hasPath = false;
className = classNameToken[0];
int overridePriority = Coprocessor.PRIORITY_SYSTEM;
if (classNameAndPriority.length > 1){
overridePriority = Integer.parseInt(classNameAndPriority[1]);
Path path = null;
if (classNameToken.length > 1 && !Strings.isNullOrEmpty(classNameToken[1])) {
overridePriority = Integer.parseInt(classNameToken[1]);
hasPriorityOverride = true;
}
if (classNameToken.length > 2 && !Strings.isNullOrEmpty(classNameToken[2])) {
path = new Path(classNameToken[2].trim());
hasPath = true;
}
className = className.trim();
if (findCoprocessor(className) != null) {
// If already loaded will just continue
LOG.warn("Attempted duplicate loading of " + className + "; skipped");
continue;
}
ClassLoader cl = this.getClass().getClassLoader();
Thread.currentThread().setContextClassLoader(cl);
try {
// override the class loader if a path for the system coprocessor is provided.
if (hasPath) {
cl = CoprocessorClassLoader.getClassLoader(path, this.getClass().getClassLoader(),
pathPrefix, conf);
}
Thread.currentThread().setContextClassLoader(cl);
implClass = cl.loadClass(className);
int coprocPriority = hasPriorityOverride ? overridePriority : currentSystemPriority;
// Add coprocessors as we go to guard against case where a coprocessor is specified twice
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,18 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

import java.io.File;
import java.lang.reflect.InvocationTargetException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.ClassLoaderTestHelper;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
Expand All @@ -41,6 +44,8 @@ public class TestCoprocessorHost {
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestCoprocessorHost.class);

private static final HBaseCommonTestingUtil TEST_UTIL = new HBaseCommonTestingUtil();

/**
* An {@link Abortable} implementation for tests.
*/
Expand All @@ -50,7 +55,7 @@ private static class TestAbortable implements Abortable {
@Override
public void abort(String why, Throwable e) {
this.aborted = true;
Assert.fail();
Assert.fail(e.getMessage());
}

@Override
Expand Down Expand Up @@ -97,6 +102,108 @@ public void testDoubleLoadingAndPriorityValue() {
assertEquals(overridePriority, simpleEnv_v3.getPriority());
}

@Test
public void testLoadSystemCoprocessorWithPath() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
final String key = "KEY";
final String testClassName = "TestSystemCoprocessor";
final String testClassNameWithPriorityAndPath = testClassName + "PriorityAndPath";

File jarFile = buildCoprocessorJar(testClassName);
File jarFileWithPriorityAndPath = buildCoprocessorJar(testClassNameWithPriorityAndPath);

try {
CoprocessorHost<RegionCoprocessor, CoprocessorEnvironment<RegionCoprocessor>> host;
host = new CoprocessorHostForTest<>(conf);

// make a string of coprocessor with only priority
int overridePriority = Integer.MAX_VALUE - 1;
final String coprocessorWithPriority =
SimpleRegionObserverV3.class.getName() + "|" + overridePriority;
// make a string of coprocessor with path but no priority
final String coprocessorWithPath =
String.format("%s|%s|%s", testClassName, "", jarFile.getAbsolutePath());
// make a string of coprocessor with priority and path
final String coprocessorWithPriorityAndPath = String
.format("%s|%s|%s", testClassNameWithPriorityAndPath, (overridePriority - 1),
jarFileWithPriorityAndPath.getAbsolutePath());

// Try and load a system coprocessors
conf.setStrings(key, SimpleRegionObserverV2.class.getName(), coprocessorWithPriority,
coprocessorWithPath, coprocessorWithPriorityAndPath);
host.loadSystemCoprocessors(conf, key);

// first loaded system coprocessor with default priority
CoprocessorEnvironment<?> simpleEnv =
host.findCoprocessorEnvironment(SimpleRegionObserverV2.class.getName());
assertNotNull(simpleEnv);
assertEquals(Coprocessor.PRIORITY_SYSTEM, simpleEnv.getPriority());

// external system coprocessor with default priority
CoprocessorEnvironment<?> coprocessorEnvironmentWithPath =
host.findCoprocessorEnvironment(testClassName);
assertNotNull(coprocessorEnvironmentWithPath);
assertEquals(Coprocessor.PRIORITY_SYSTEM + 1, coprocessorEnvironmentWithPath.getPriority());

// system coprocessor with configured priority
CoprocessorEnvironment<?> coprocessorEnvironmentWithPriority =
host.findCoprocessorEnvironment(SimpleRegionObserverV3.class.getName());
assertNotNull(coprocessorEnvironmentWithPriority);
assertEquals(overridePriority, coprocessorEnvironmentWithPriority.getPriority());

// external system coprocessor with override priority
CoprocessorEnvironment<?> coprocessorEnvironmentWithPriorityAndPath =
host.findCoprocessorEnvironment(testClassNameWithPriorityAndPath);
assertNotNull(coprocessorEnvironmentWithPriorityAndPath);
assertEquals(overridePriority - 1, coprocessorEnvironmentWithPriorityAndPath.getPriority());
} finally {
if (jarFile.exists()) {
jarFile.delete();
}
if (jarFileWithPriorityAndPath.exists()) {
jarFileWithPriorityAndPath.delete();
}
}
}

@Test(expected = AssertionError.class)
public void testLoadSystemCoprocessorWithPathDoesNotExist() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
final String key = "KEY";
final String testClassName = "TestSystemCoprocessor";

CoprocessorHost<RegionCoprocessor, CoprocessorEnvironment<RegionCoprocessor>> host;
host = new CoprocessorHostForTest<>(conf);

// make a string of coprocessor with path but no priority
final String coprocessorWithPath = testClassName + "||" + testClassName + ".jar";

// Try and load a system coprocessors
conf.setStrings(key, coprocessorWithPath);
// when loading non-exist with CoprocessorHostForTest host, it aborts with AssertionError
host.loadSystemCoprocessors(conf, key);
}

@Test(expected = AssertionError.class)
public void testLoadSystemCoprocessorWithPathDoesNotExistAndPriority() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
final String key = "KEY";
final String testClassName = "TestSystemCoprocessor";

CoprocessorHost<RegionCoprocessor, CoprocessorEnvironment<RegionCoprocessor>> host;
host = new CoprocessorHostForTest<>(conf);

int overridePriority = Integer.MAX_VALUE - 1;
// make a string of coprocessor with path and priority
final String coprocessor =
testClassName + "|" + overridePriority + "|" + testClassName + ".jar";

// Try and load a system coprocessors
conf.setStrings(key, coprocessor);
// when loading non-exist coprocessor, it aborts with AssertionError
host.loadSystemCoprocessors(conf, key);
}

public static class SimpleRegionObserverV2 extends SimpleRegionObserver { }

public static class SimpleRegionObserverV3 extends SimpleRegionObserver {
Expand Down Expand Up @@ -128,4 +235,11 @@ public CoprocessorEnvironment<E> createEnvironment(final E instance, final int p
return new BaseEnvironment<>(instance, priority, 0, cpHostConf);
}
}

private File buildCoprocessorJar(String className) throws Exception {
String dataTestDir = TEST_UTIL.getDataTestDir().toString();
String code = String.format("import org.apache.hadoop.hbase.coprocessor.*; public class %s"
+ " implements RegionCoprocessor {}", className);
return ClassLoaderTestHelper.buildJar(dataTestDir, className, code);
}
}

0 comments on commit e848d3b

Please sign in to comment.