Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-19763 Fixed Checkstyle errors in hbase-procedure #167

Merged
merged 1 commit into from
Apr 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions hbase-procedure/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,13 @@
<groupId>net.revelc.code</groupId>
<artifactId>warbucks-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<configuration>
<failOnViolation>true</failOnViolation>
</configuration>
</plugin>
</plugins>
</build>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hbase.procedure2;

import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.TimeUnit;

import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
Expand Down Expand Up @@ -279,7 +278,6 @@ protected void wakeProcedure(final Procedure procedure) {
push(procedure, /* addFront= */ true, /* notify= */false);
}


// ==========================================================================
// Internal helpers
// ==========================================================================
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hbase.procedure2;

import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.apache.hadoop.hbase.HBaseIOException;

@InterfaceAudience.Private
@InterfaceStability.Stable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hbase.procedure2;

import java.util.List;

import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.procedure2.LockedResourceType;

@InterfaceAudience.Private
public class LockedResource {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hbase.procedure2;

import java.io.IOException;
Expand Down Expand Up @@ -209,11 +208,12 @@ protected final void skipPersistence() {
* of the execution.
* @param env the environment passed to the ProcedureExecutor
* @return a set of sub-procedures to run or ourselves if there is more work to do or null if the
* procedure is done.
* @throws ProcedureYieldException the procedure will be added back to the queue and retried later.
* procedure is done.
* @throws ProcedureYieldException the procedure will be added back to the queue and retried
* later.
* @throws InterruptedException the procedure will be added back to the queue and retried later.
* @throws ProcedureSuspendedException Signal to the executor that Procedure has suspended itself and
* has set itself up waiting for an external event to wake it back up again.
* @throws ProcedureSuspendedException Signal to the executor that Procedure has suspended itself
* and has set itself up waiting for an external event to wake it back up again.
*/
protected abstract Procedure<TEnvironment>[] execute(TEnvironment env)
throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException;
Expand Down Expand Up @@ -468,7 +468,7 @@ protected StringBuilder toStringSimpleSB() {
sb.append(getParentProcId());
}

/**
/*
* TODO
* Enable later when this is being used.
* Currently owner not used.
Expand Down Expand Up @@ -710,7 +710,7 @@ protected void setResult(byte[] result) {
/**
* Will only be called when loading procedures from procedure store, where we need to record
* whether the procedure has already held a lock. Later we will call
* {@link #restoreLock(Object, ProcedureStore)} to actually acquire the lock.
* {@link #restoreLock(Object)} to actually acquire the lock.
*/
final void lockedWhenLoading() {
this.lockedWhenLoading = true;
Expand Down Expand Up @@ -764,7 +764,7 @@ public synchronized boolean isSuccess() {

/**
* @return true if the procedure is finished. The Procedure may be completed successfully or
* rolledback.
* rolledback.
*/
public synchronized boolean isFinished() {
return isSuccess() || state == ProcedureState.ROLLEDBACK;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hbase.procedure2;

import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.procedure2.Procedure;

import java.util.ArrayDeque;

import org.apache.yetus.audience.InterfaceAudience;

/**
* Type class.
* For conceptual purpose only. Seeing ProcedureDeque as type instead of just ArrayDeque gives
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hbase.procedure2;

import org.apache.hadoop.hbase.exceptions.HBaseException;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.apache.hadoop.hbase.exceptions.HBaseException;

@InterfaceAudience.Private
@InterfaceStability.Stable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -433,23 +433,21 @@ private void loadProcedures(ProcedureIterator procIter, boolean abortOnCorruptio
}
}

// add the nonce to the map
if (nonceKey != null) {
nonceKeysToProcIdsMap.put(nonceKey, procId);
nonceKeysToProcIdsMap.put(nonceKey, procId); // add the nonce to the map
}
}

// 2. Initialize the stacks
// In the old implementation, for procedures in FAILED state, we will push it into the
// ProcedureScheduler directly to execute the rollback. But this does not work after we
// introduce the restore lock stage.
// For now, when we acquire a xlock, we will remove the queue from runQueue in scheduler, and
// then when a procedure which has lock access, for example, a sub procedure of the procedure
// which has the xlock, is pushed into the scheduler, we will add the queue back to let the
// workers poll from it. The assumption here is that, the procedure which has the xlock should
// have been polled out already, so when loading we can not add the procedure to scheduler first
// and then call acquireLock, since the procedure is still in the queue, and since we will
// remove the queue from runQueue, then no one can poll it out, then there is a dead lock
// 2. Initialize the stacks: In the old implementation, for procedures in FAILED state, we will
// push it into the ProcedureScheduler directly to execute the rollback. But this does not work
// after we introduce the restore lock stage. For now, when we acquire a xlock, we will remove
// the queue from runQueue in scheduler, and then when a procedure which has lock access, for
// example, a sub procedure of the procedure which has the xlock, is pushed into the scheduler,
// we will add the queue back to let the workers poll from it. The assumption here is that, the
// procedure which has the xlock should have been polled out already, so when loading we can not
// add the procedure to scheduler first and then call acquireLock, since the procedure is still
// in the queue, and since we will remove the queue from runQueue, then no one can poll it out,
// then there is a dead lock
List<Procedure<TEnvironment>> runnableList = new ArrayList<>(runnableCount);
List<Procedure<TEnvironment>> failedList = new ArrayList<>(failedCount);
List<Procedure<TEnvironment>> waitingList = new ArrayList<>(waitingCount);
Expand All @@ -464,9 +462,7 @@ private void loadProcedures(ProcedureIterator procIter, boolean abortOnCorruptio
@SuppressWarnings("unchecked")
Procedure<TEnvironment> proc = procIter.next();
assert !(proc.isFinished() && !proc.hasParent()) : "unexpected completed proc=" + proc;

LOG.debug("Loading {}", proc);

Long rootProcId = getRootProcedureId(proc);
// The orphan procedures will be passed to handleCorrupted, so add an assert here
assert rootProcId != null;
Expand Down Expand Up @@ -508,14 +504,12 @@ private void loadProcedures(ProcedureIterator procIter, boolean abortOnCorruptio
// 3. Check the waiting procedures to see if some of them can be added to runnable.
waitingList.forEach(proc -> {
if (!proc.hasChildren()) {
// Normally, WAITING procedures should be waken by its children.
// But, there is a case that, all the children are successful and before
// they can wake up their parent procedure, the master was killed.
// So, during recovering the procedures from ProcedureWal, its children
// are not loaded because of their SUCCESS state.
// So we need to continue to run this WAITING procedure. But before
// executing, we need to set its state to RUNNABLE, otherwise, a exception
// will throw:
// Normally, WAITING procedures should be waken by its children. But, there is a case that,
// all the children are successful and before they can wake up their parent procedure, the
// master was killed. So, during recovering the procedures from ProcedureWal, its children
// are not loaded because of their SUCCESS state. So we need to continue to run this WAITING
// procedure. But before executing, we need to set its state to RUNNABLE, otherwise, a
// exception will throw:
// Preconditions.checkArgument(procedure.getState() == ProcedureState.RUNNABLE,
// "NOT RUNNABLE! " + procedure.toString());
proc.setState(ProcedureState.RUNNABLE);
Expand Down Expand Up @@ -743,9 +737,9 @@ public boolean removeChore(ProcedureInMemoryChore<TEnvironment> chore) {
// Nonce Procedure helpers
// ==========================================================================
/**
* Create a NoneKey from the specified nonceGroup and nonce.
* @param nonceGroup
* @param nonce
* Create a NonceKey from the specified nonceGroup and nonce.
* @param nonceGroup the group to use for the {@link NonceKey}
* @param nonce the nonce to use in the {@link NonceKey}
* @return the generated NonceKey
*/
public NonceKey createNonceKey(final long nonceGroup, final long nonce) {
Expand All @@ -764,7 +758,9 @@ public NonceKey createNonceKey(final long nonceGroup, final long nonce) {
* @return the procId associated with the nonce, if any otherwise an invalid procId.
*/
public long registerNonce(final NonceKey nonceKey) {
if (nonceKey == null) return -1;
if (nonceKey == null) {
return -1;
}

// check if we have already a Reserved ID for the nonce
Long oldProcId = nonceKeysToProcIdsMap.get(nonceKey);
Expand All @@ -773,7 +769,9 @@ public long registerNonce(final NonceKey nonceKey) {
// and the procedure submitted with the specified nonce will use this ID.
final long newProcId = nextProcId();
oldProcId = nonceKeysToProcIdsMap.putIfAbsent(nonceKey, newProcId);
if (oldProcId == null) return -1;
if (oldProcId == null) {
return -1;
}
}

// we found a registered nonce, but the procedure may not have been submitted yet.
Expand All @@ -795,10 +793,14 @@ public long registerNonce(final NonceKey nonceKey) {
* @param nonceKey A unique identifier for this operation from the client or process.
*/
public void unregisterNonceIfProcedureWasNotSubmitted(final NonceKey nonceKey) {
if (nonceKey == null) return;
if (nonceKey == null) {
return;
}

final Long procId = nonceKeysToProcIdsMap.get(nonceKey);
if (procId == null) return;
if (procId == null) {
return;
}

// if the procedure was not submitted, remove the nonce
if (!(procedures.containsKey(procId) || completed.containsKey(procId))) {
Expand Down Expand Up @@ -1295,8 +1297,9 @@ private long nextProcId() {
if (procId < 0) {
while (!lastProcId.compareAndSet(procId, 0)) {
procId = lastProcId.get();
if (procId >= 0)
if (procId >= 0) {
break;
}
}
while (procedures.containsKey(procId)) {
procId = lastProcId.incrementAndGet();
Expand Down Expand Up @@ -2003,7 +2006,6 @@ protected boolean keepAlive(long lastUpdate) {
// A worker thread which can be added when core workers are stuck. Will timeout after
// keepAliveTime if there is no procedure to run.
private final class KeepAliveWorkerThread extends WorkerThread {

public KeepAliveWorkerThread(ThreadGroup group) {
super(group, "KeepAlivePEWorker-");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hbase.procedure2;

import java.util.Iterator;
Expand Down Expand Up @@ -117,7 +116,8 @@ public interface ProcedureScheduler {
List<LockedResource> getLocks();

/**
* @return {@link LockedResource} for resource of specified type & name. null if resource is not locked.
* @return {@link LockedResource} for resource of specified type & name. null if resource is not
* locked.
*/
LockedResource getLockResource(LockedResourceType resourceType, String resourceName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,19 @@
import java.lang.reflect.Modifier;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.util.NonceKey;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.protobuf.Any;
import org.apache.hbase.thirdparty.com.google.protobuf.Internal;
import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.Parser;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;

import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
import org.apache.hadoop.hbase.util.NonceKey;

/**
* Helper to convert to/from ProcedureProtos
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,8 @@ public void addNode(final TRemote key) {
* @param key the node identifier
*/
public void addOperationToNode(final TRemote key, RemoteProcedure rp)
throws NullTargetServerDispatchException, NoServerDispatchException, NoNodeDispatchException {
throws NullTargetServerDispatchException, NoServerDispatchException,
NoNodeDispatchException {
if (key == null) {
throw new NullTargetServerDispatchException(rp.toString());
}
Expand Down Expand Up @@ -188,7 +189,10 @@ public void removeCompletedOperation(final TRemote key, RemoteProcedure rp) {
*/
public boolean removeNode(final TRemote key) {
final BufferNode node = nodeMap.remove(key);
if (node == null) return false;
if (node == null) {
return false;
}

node.abortOperationsInQueue();
return true;
}
Expand Down Expand Up @@ -256,7 +260,6 @@ public interface RemoteProcedure<TEnv, TRemote> {
default boolean storeInDispatchedQueue() {
return true;
}

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@

import java.io.IOException;

import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ErrorHandlingProtos.ForeignExceptionMessage;
import org.apache.hadoop.hbase.util.ForeignExceptionUtil;

/**
* A RemoteProcedureException is an exception from another thread or process.
Expand All @@ -40,7 +41,6 @@
@InterfaceStability.Evolving
@SuppressWarnings("serial")
public class RemoteProcedureException extends ProcedureException {

/**
* Name of the throwable's source such as a host or thread name. Must be non-null.
*/
Expand All @@ -49,8 +49,8 @@ public class RemoteProcedureException extends ProcedureException {
/**
* Create a new RemoteProcedureException that can be serialized.
* It is assumed that this came form a local source.
* @param source
* @param cause
* @param source the host or thread name of the source
* @param cause the actual cause of the exception
*/
public RemoteProcedureException(String source, Throwable cause) {
super(cause);
Expand Down Expand Up @@ -104,9 +104,9 @@ public static byte[] serialize(String source, Throwable t) {

/**
* Takes a series of bytes and tries to generate an RemoteProcedureException instance for it.
* @param bytes
* @param bytes the bytes to generate the {@link RemoteProcedureException} from
* @return the ForeignExcpetion instance
* @throws InvalidProtocolBufferException if there was deserialization problem this is thrown.
* @throws IOException if there was deserialization problem this is thrown.
*/
public static RemoteProcedureException deserialize(byte[] bytes) throws IOException {
return fromProto(ForeignExceptionMessage.parseFrom(bytes));
Expand Down
Loading