Skip to content

Commit

Permalink
fix checkstyle warnings
Browse files Browse the repository at this point in the history
  • Loading branch information
JeongDaeKim committed Oct 24, 2019
1 parent e99b971 commit a8244d2
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -607,7 +607,8 @@ public void run() {
shipEdits(entryBatch);
manager.releaseBufferQuota(entryBatch.getHeapSizeExcludeBulkLoad());
if (!entryBatch.hasMoreEntries()) {
LOG.debug("Finished recovering queue for group " + walGroupId + " of peer " + peerClusterZnode);
LOG.debug("Finished recovering queue for group "
+ walGroupId + " of peer " + peerClusterZnode);
metrics.incrCompletedRecoveryQueue();
setWorkerState(WorkerState.FINISHED);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,9 +131,10 @@ public void run() {
Threads.sleep(sleepForRetries);
continue;
}
WALEntryBatch batch = new WALEntryBatch(replicationBatchCountCapacity, replicationBatchSizeCapacity);
WALEntryBatch batch =
new WALEntryBatch(replicationBatchCountCapacity, replicationBatchSizeCapacity);
boolean hasNext;
while (hasNext = entryStream.hasNext()) {
while ((hasNext = entryStream.hasNext()) != true) {
Entry entry = entryStream.next();
entry = filterEntry(entry);
if (entry != null) {
Expand All @@ -150,7 +151,8 @@ public void run() {
}

if (LOG.isTraceEnabled()) {
LOG.trace(String.format("Read %s WAL entries eligible for replication", batch.getNbEntries()));
LOG.trace(String.format("Read %s WAL entries eligible for replication",
batch.getNbEntries()));
}

updateBatch(entryStream, batch, hasNext);
Expand Down Expand Up @@ -193,7 +195,7 @@ private boolean isShippable(WALEntryBatch batch) {

private boolean checkIfWALRolled(WALEntryBatch batch) {
return currentPath == null && batch.lastWalPath != null
|| currentPath != null && !currentPath.equals(batch.lastWalPath);
|| currentPath != null && !currentPath.equals(batch.lastWalPath);
}

private void resetStream(WALEntryStream stream) throws IOException {
Expand Down Expand Up @@ -280,7 +282,8 @@ static class WALEntryBatch {
private long heapSizeExcludeBulkLoad;

/**
* @param maxNbEntries
* @param maxNbEntries the number of entries a batch can have
* @param maxSizeBytes max (heap) size of each batch
*/
private WALEntryBatch(int maxNbEntries, long maxSizeBytes) {
this.walEntries = new ArrayList<>(maxNbEntries);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.hadoop.mapreduce.Job;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -832,7 +831,8 @@ public void testEmptyWALRecovery() throws Exception {
Path currentWalPath = DefaultWALProvider.getCurrentFileName(wal);
String walGroupId = DefaultWALProvider.getWALPrefixFromWALName(currentWalPath.getName());
Path emptyWalPath = new Path(currentWalPath.getParent(), walGroupId + "." + ts);
WALFactory.createWALWriter(utility1.getTestFileSystem(), emptyWalPath, utility1.getConfiguration()).close();
WALFactory.createWALWriter(utility1.getTestFileSystem(),
emptyWalPath, utility1.getConfiguration()).close();
emptyWalPaths.add(emptyWalPath);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,33 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hbase.replication;

import static org.apache.hadoop.hbase.replication.TestReplicationEndpoint.ReplicationEndpointForTest;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.internal.verification.VerificationModeFactory.times;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Expand Down Expand Up @@ -63,26 +79,8 @@
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.mockito.ArgumentCaptor;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.UUID;

import static org.apache.hadoop.hbase.replication.TestReplicationEndpoint.ReplicationEndpointForTest;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.internal.verification.VerificationModeFactory.times;
import org.mockito.Mockito;

@Category(MediumTests.class)
public class TestReplicationSource {
Expand Down Expand Up @@ -114,8 +112,12 @@ public static void setUpBeforeClass() throws Exception {

@Before
public void setup() throws IOException {
if (!FS.exists(logDir)) FS.mkdirs(logDir);
if (!FS.exists(oldLogDir)) FS.mkdirs(oldLogDir);
if (!FS.exists(logDir)) {
FS.mkdirs(logDir);
}
if (!FS.exists(oldLogDir)) {
FS.mkdirs(oldLogDir);
}

ReplicationEndpointForTest.contructedCount.set(0);
ReplicationEndpointForTest.startedCount.set(0);
Expand All @@ -126,8 +128,12 @@ public void setup() throws IOException {

@After
public void tearDown() throws IOException {
if (FS.exists(oldLogDir)) FS.delete(oldLogDir, true);
if (FS.exists(logDir)) FS.delete(logDir, true);
if (FS.exists(oldLogDir)) {
FS.delete(oldLogDir, true);
}
if (FS.exists(logDir)) {
FS.delete(logDir, true);
}
}

@AfterClass
Expand Down Expand Up @@ -224,8 +230,9 @@ public boolean evaluate() throws Exception {

}

private void appendEntries(WALProvider.Writer writer, int numEntries, boolean closeAfterAppends) throws IOException {
for(int i = 0; i < numEntries; i++) {
private void appendEntries(WALProvider.Writer writer, int numEntries, boolean closeAfterAppends)
throws IOException {
for (int i = 0; i < numEntries; i++) {
byte[] b = Bytes.toBytes(Integer.toString(i));
KeyValue kv = new KeyValue(b,b,b);
WALEdit edit = new WALEdit();
Expand Down Expand Up @@ -255,7 +262,7 @@ private long getPosition(WALFactory wals, Path log2, int numEntries) throws IOEx
return reader.getPosition();
}

private static class Mocks {
private static final class Mocks {
private final ReplicationSourceManager manager = mock(ReplicationSourceManager.class);
private final ReplicationQueues queues = mock(ReplicationQueues.class);
private final ReplicationPeers peers = mock(ReplicationPeers.class);
Expand All @@ -268,7 +275,8 @@ private Mocks() {
when(context.getReplicationPeer()).thenReturn(peer);
}

ReplicationSource createReplicationSourceWithMocks(ReplicationEndpoint endpoint) throws IOException {
ReplicationSource createReplicationSourceWithMocks(ReplicationEndpoint endpoint)
throws IOException {
final ReplicationSource source = new ReplicationSource();
endpoint.init(context);
source.init(conf, FS, manager, queues, peers, mock(Stoppable.class),
Expand Down Expand Up @@ -317,7 +325,8 @@ public WALEntryFilter getWALEntryfilter() {
ArgumentCaptor<Path> pathCaptor = ArgumentCaptor.forClass(Path.class);
ArgumentCaptor<Long> positionCaptor = ArgumentCaptor.forClass(Long.class);
verify(mocks.manager, times(1))
.logPositionAndCleanOldLogs(pathCaptor.capture(), anyString(), positionCaptor.capture(), anyBoolean(), anyBoolean());
.logPositionAndCleanOldLogs(pathCaptor.capture(), anyString(), positionCaptor.capture(),
anyBoolean(), anyBoolean());
assertTrue(endpoint.lastEntries.size() == 5);
assertThat(pathCaptor.getValue(), is(log2));
assertThat(positionCaptor.getValue(), is(pos));
Expand Down Expand Up @@ -352,7 +361,8 @@ public void testSetLogPositionAndRemoveOldWALsEvenIfEmptyWALsRolled() throws Exc
ArgumentCaptor<Long> positionCaptor = ArgumentCaptor.forClass(Long.class);

verify(mocks.manager, times(1))
.logPositionAndCleanOldLogs(pathCaptor.capture(), anyString(), positionCaptor.capture(), anyBoolean(), anyBoolean());
.logPositionAndCleanOldLogs(pathCaptor.capture(), anyString(), positionCaptor.capture(),
anyBoolean(), anyBoolean());
assertThat(pathCaptor.getValue(), is(log2));
assertThat(positionCaptor.getValue(), is(startPos));
}
Expand All @@ -362,7 +372,8 @@ public void testSetLogPositionAndRemoveOldWALsEvenIfNoCfsReplicated() throws Exc
Mocks mocks = new Mocks();
// set table cfs to filter all cells out
final TableName replicatedTable = TableName.valueOf("replicated_table");
final Map<TableName, List<String>> cfs = Collections.singletonMap(replicatedTable, Collections.<String>emptyList());
final Map<TableName, List<String>> cfs =
Collections.singletonMap(replicatedTable, Collections.<String>emptyList());
when(mocks.peer.getTableCFs()).thenReturn(cfs);

WALFactory wals = new WALFactory(TEST_UTIL.getConfiguration(), null, "test");
Expand Down Expand Up @@ -391,17 +402,17 @@ public void testSetLogPositionAndRemoveOldWALsEvenIfNoCfsReplicated() throws Exc
ArgumentCaptor<Path> pathCaptor = ArgumentCaptor.forClass(Path.class);
ArgumentCaptor<Long> positionCaptor = ArgumentCaptor.forClass(Long.class);

// all old wals should be removed by updating wal position, even if no cfs replicated doesn't exist
// all old wals should be removed by updating wal position, even if all cells are filtered out.
verify(mocks.manager, times(1))
.logPositionAndCleanOldLogs(pathCaptor.capture(), anyString(), positionCaptor.capture(), anyBoolean(), anyBoolean());
.logPositionAndCleanOldLogs(pathCaptor.capture(), anyString(), positionCaptor.capture(),
anyBoolean(), anyBoolean());
assertThat(pathCaptor.getValue(), is(log2));
assertThat(positionCaptor.getValue(), is(pos));
}

/**
* Tests that recovered queues are preserved on a regionserver shutdown.
* See HBASE-18192
* @throws Exception
*/
@Test
public void testServerShutdownRecoveredQueue() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
Expand Down Expand Up @@ -360,8 +359,9 @@ public void testReplicationSourceWALReaderThread() throws Exception {

// start up a batcher
ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
ReplicationSourceWALReaderThread batcher = new ReplicationSourceWALReaderThread(mockSourceManager, getQueueInfo(),walQueue, 0,
fs, conf, getDummyFilter(), new MetricsSource("1"));
ReplicationSourceWALReaderThread batcher =
new ReplicationSourceWALReaderThread(mockSourceManager, getQueueInfo(),walQueue, 0,
fs, conf, getDummyFilter(), new MetricsSource("1"));
Path walPath = walQueue.peek();
batcher.start();
WALEntryBatch entryBatch = batcher.take();
Expand All @@ -386,8 +386,8 @@ public void testReplicationSourceWALReaderThreadRecoveredQueue() throws Exceptio
appendEntriesToLog(2);

long position;
try (WALEntryStream entryStream =
new WALEntryStream(new PriorityBlockingQueue<>(walQueue), fs, conf, new MetricsSource("1"))) {
try (WALEntryStream entryStream = new WALEntryStream(new PriorityBlockingQueue<>(walQueue),
fs, conf, new MetricsSource("1"))) {
entryStream.next();
entryStream.next();
entryStream.next();
Expand All @@ -397,8 +397,9 @@ public void testReplicationSourceWALReaderThreadRecoveredQueue() throws Exceptio
}

ReplicationSourceManager mockSourceManager = mock(ReplicationSourceManager.class);
ReplicationSourceWALReaderThread reader = new ReplicationSourceWALReaderThread(mockSourceManager,
getQueueInfo("1-1"), walQueue, 0, fs, conf, getDummyFilter(), new MetricsSource("1"));
ReplicationSourceWALReaderThread reader =
new ReplicationSourceWALReaderThread(mockSourceManager, getQueueInfo("1-1"),
walQueue, 0, fs, conf, getDummyFilter(), new MetricsSource("1"));
Path walPath = walQueue.toArray(new Path[2])[1];
reader.start();
WALEntryBatch entryBatch = reader.take();
Expand Down Expand Up @@ -433,7 +434,7 @@ public void testWALKeySerialization() throws Exception {
for (Map.Entry<String, byte[]> entry : deserializedKey.getExtendedAttributes().entrySet()) {
assertArrayEquals(key.getExtendedAttribute(entry.getKey()), entry.getValue());
}
}
}

@Test
public void testReplicationSourceWALReaderThreadWithFilter() throws Exception {
Expand All @@ -448,8 +449,9 @@ public void testReplicationSourceWALReaderThreadWithFilter() throws Exception {

Path firstWAL = walQueue.peek();
ReplicationSourceManager mockSourceManager = mock(ReplicationSourceManager.class);
final ReplicationSourceWALReaderThread reader = new ReplicationSourceWALReaderThread(mockSourceManager,
getQueueInfo(), walQueue, 0, fs, conf, filter, new MetricsSource("1"));
final ReplicationSourceWALReaderThread reader =
new ReplicationSourceWALReaderThread(mockSourceManager, getQueueInfo(), walQueue,
0, fs, conf, filter, new MetricsSource("1"));
reader.start();

// reader won't put any batch, even if EOF reached.
Expand All @@ -470,8 +472,8 @@ public WALEntryBatch call() throws Exception {
WALEntryBatch entryBatch = reader.take();

Path lastWAL= walQueue.peek();
WALEntryStream entryStream =
new WALEntryStream(new PriorityBlockingQueue<>(walQueue), fs, conf, new MetricsSource("1"));
WALEntryStream entryStream = new WALEntryStream(new PriorityBlockingQueue<>(walQueue),
fs, conf, new MetricsSource("1"));
entryStream.hasNext();
long positionToBeLogged = entryStream.getPosition();

Expand Down Expand Up @@ -564,5 +566,4 @@ public void preLogRoll(Path oldPath, Path newPath) throws IOException {
currentPath = newPath;
}
}

}

0 comments on commit a8244d2

Please sign in to comment.