Skip to content

Commit

Permalink
HubSpot Backport: HBASE-27390 getClusterMetrics NullPointerException …
Browse files Browse the repository at this point in the history
…when ServerTask status null (apache#4853)

Signed-off-by: Bryan Beaudreault <[email protected]>
  • Loading branch information
briaugenreich authored and bbeaudreault committed Dec 15, 2022
1 parent d8db95b commit 120866a
Show file tree
Hide file tree
Showing 7 changed files with 22 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ public class MonitoredRPCHandlerImpl extends MonitoredTaskImpl implements Monito
private boolean snapshot = false;
private Map<String, Object> callInfoMap = new HashMap<>();

public MonitoredRPCHandlerImpl() {
super(false);
public MonitoredRPCHandlerImpl(String description) {
super(false, description);
// in this implementation, WAITING indicates that the handler is not
// actively servicing an RPC call.
setState(State.WAITING);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.hadoop.hbase.util.GsonUtil;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hbase.thirdparty.com.google.gson.Gson;

Expand All @@ -46,11 +47,13 @@ class MonitoredTaskImpl implements MonitoredTask {

private static final Gson GSON = GsonUtil.createGson().create();

public MonitoredTaskImpl(boolean enableJournal) {
public MonitoredTaskImpl(boolean enableJournal, String description) {
startTime = EnvironmentEdgeManager.currentTime();
statusTime = startTime;
stateTime = startTime;
warnTime = startTime;
this.description = description;
this.status = "status unset";
if (enableJournal) {
journal = new ConcurrentLinkedQueue<>();
} else {
Expand Down Expand Up @@ -164,6 +167,7 @@ public void abort(String msg) {

@Override
public void setStatus(String status) {
Preconditions.checkNotNull(status, "Status is null");
this.status = status;
statusTime = EnvironmentEdgeManager.currentTime();
if (journal != null) {
Expand All @@ -178,6 +182,7 @@ protected void setState(State state) {

@Override
public void setDescription(String description) {
Preconditions.checkNotNull(description, "Description is null");
this.description = description;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,7 @@ public MonitoredTask createStatus(String description) {
}

public synchronized MonitoredTask createStatus(String description, boolean enableJournal) {
MonitoredTask stat = new MonitoredTaskImpl(enableJournal);
stat.setDescription(description);
MonitoredTask stat = new MonitoredTaskImpl(enableJournal, description);
MonitoredTask proxy = (MonitoredTask) Proxy.newProxyInstance(stat.getClass().getClassLoader(),
new Class<?>[] { MonitoredTask.class }, new PassthroughInvocationHandler<>(stat));
TaskAndWeakRefPair pair = new TaskAndWeakRefPair(stat, proxy);
Expand All @@ -102,8 +101,7 @@ public synchronized MonitoredTask createStatus(String description, boolean enabl
}

public synchronized MonitoredRPCHandler createRPCStatus(String description) {
MonitoredRPCHandler stat = new MonitoredRPCHandlerImpl();
stat.setDescription(description);
MonitoredRPCHandler stat = new MonitoredRPCHandlerImpl(description);
MonitoredRPCHandler proxy =
(MonitoredRPCHandler) Proxy.newProxyInstance(stat.getClass().getClassLoader(),
new Class<?>[] { MonitoredRPCHandler.class }, new PassthroughInvocationHandler<>(stat));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public void testSimpleCall() {

TraceUtil.trace(() -> {
CallRunner cr = new CallRunner(mockRpcServer, mockCall);
cr.setStatus(new MonitoredRPCHandlerImpl());
cr.setStatus(new MonitoredRPCHandlerImpl("test"));
cr.run();
}, testName.getMethodName());

Expand All @@ -101,7 +101,7 @@ public void testCallCleanup() {

TraceUtil.trace(() -> {
CallRunner cr = new CallRunner(mockRpcServer, mockCall);
cr.setStatus(new MonitoredRPCHandlerImpl());
cr.setStatus(new MonitoredRPCHandlerImpl("test"));
cr.run();
}, testName.getMethodName());
Mockito.verify(mockCall, Mockito.times(1)).cleanup();
Expand All @@ -116,7 +116,7 @@ public void testCallRunnerDropDisconnected() {

TraceUtil.trace(() -> {
CallRunner cr = new CallRunner(mockRpcServer, mockCall);
cr.setStatus(new MonitoredRPCHandlerImpl());
cr.setStatus(new MonitoredRPCHandlerImpl("test"));
cr.drop();
}, testName.getMethodName());
Mockito.verify(mockCall, Mockito.times(1)).cleanup();
Expand All @@ -142,7 +142,7 @@ public void testCallRunnerDropConnected() {

TraceUtil.trace(() -> {
CallRunner cr = new CallRunner(mockRpcServer, mockCall);
cr.setStatus(new MonitoredRPCHandlerImpl());
cr.setStatus(new MonitoredRPCHandlerImpl("test"));
cr.drop();
}, testName.getMethodName());
Mockito.verify(mockCall, Mockito.times(1)).cleanup();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public void testCallQueueInfo() throws IOException, InterruptedException {

for (int i = totalCallMethods; i > 0; i--) {
CallRunner task = createMockTask();
task.setStatus(new MonitoredRPCHandlerImpl());
task.setStatus(new MonitoredRPCHandlerImpl("test"));

if (!scheduler.dispatch(task)) {
unableToDispatch++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public void testBasic() throws IOException, InterruptedException {
scheduler.init(CONTEXT);
scheduler.start();
CallRunner task = createMockTask();
task.setStatus(new MonitoredRPCHandlerImpl());
task.setStatus(new MonitoredRPCHandlerImpl("test"));
scheduler.dispatch(task);
verify(task, timeout(10000)).run();
scheduler.stop();
Expand Down Expand Up @@ -164,7 +164,7 @@ public void testCallQueueInfo() throws IOException, InterruptedException {
int totalCallMethods = 10;
for (int i = totalCallMethods; i > 0; i--) {
CallRunner task = createMockTask();
task.setStatus(new MonitoredRPCHandlerImpl());
task.setStatus(new MonitoredRPCHandlerImpl("test"));
scheduler.dispatch(task);
}

Expand Down Expand Up @@ -205,7 +205,7 @@ public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
}
};
for (CallRunner task : tasks) {
task.setStatus(new MonitoredRPCHandlerImpl());
task.setStatus(new MonitoredRPCHandlerImpl("test"));
doAnswer(answerToRun).when(task).run();
}

Expand Down Expand Up @@ -513,7 +513,7 @@ public void testScanQueues() throws Exception {

private void doAnswerTaskExecution(final CallRunner callTask, final ArrayList<Integer> results,
final int value, final int sleepInterval) {
callTask.setStatus(new MonitoredRPCHandlerImpl());
callTask.setStatus(new MonitoredRPCHandlerImpl("test"));
doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocation) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ public void testTaskMonitorBasics() {
assertEquals(task.getDescription(), taskFromTm.getDescription());
assertEquals(-1, taskFromTm.getCompletionTimestamp());
assertEquals(MonitoredTask.State.RUNNING, taskFromTm.getState());
assertEquals(task.getStatus(), taskFromTm.getStatus());
assertEquals("status unset", taskFromTm.getStatus());

// Mark it as finished
task.markComplete("Finished!");
Expand Down Expand Up @@ -229,7 +231,7 @@ public void testStatusJournal() {

@Test
public void testClone() throws Exception {
MonitoredRPCHandlerImpl monitor = new MonitoredRPCHandlerImpl();
MonitoredRPCHandlerImpl monitor = new MonitoredRPCHandlerImpl("test");
monitor.abort("abort RPC");
TestParam testParam = new TestParam("param1");
monitor.setRPC("method1", new Object[] { testParam }, 0);
Expand Down

0 comments on commit 120866a

Please sign in to comment.