Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-23205 Correctly update the position of WALs currently being replicated #749

Closed
wants to merge 52 commits into from
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
6f1f7bc
HBASE-23205 Correctly update the position of WALs currently being rep…
JeongDaeKim Sep 26, 2019
affc75a
fix checkstyle warnings
JeongDaeKim Oct 24, 2019
b6efe7e
Fix typo
JeongDaeKim Oct 24, 2019
114aa1b
(fix) close writer
Oct 30, 2019
4bcc397
HBASE-23229 Update branch-1 to 1.6.0-SNAPSHOT (#772)
busbey Oct 30, 2019
1dbf6f7
(fix) revert test for HBASE-18137
Oct 31, 2019
c0b8f7b
Revert unnecessary codes
Oct 31, 2019
577db5d
HBASE-23238 Additional test and checks for null references on Scanner…
wchevreuil Oct 31, 2019
3c7c1b5
HBASE-23185 Fix high cpu usage because getTable()#put() gets config v…
bitterfox Oct 31, 2019
2451023
HBASE-23219 Re-enable ZKLess tests for branch-1 (Revert HBASE-14622)
Oct 25, 2019
3f9ce86
HBASE-23246 Fix error prone warning in TestMetricsUserSourceImpl (#789)
apurtell Nov 4, 2019
75620b0
(fix) Change newly added method name
Nov 4, 2019
c92d79e
(fix) add getRecoveredQueueInfo() to make a test more recognizable
Nov 5, 2019
d3ed533
(fix) a check style warning
Nov 5, 2019
1360816
HBASE-23250 Log message about CleanerChore delegate initialization sh…
rabi-kumar Nov 5, 2019
a5f09cd
HBASE-23212 Dynamically reload configs for Region Recovery chore (#803)
virajjasani Nov 5, 2019
cf02e6f
HBASE-23236 Upgrade to yetus 0.11.1
Apache9 Nov 6, 2019
72d622b
HBASE-23228 Allow for jdk8 specific modules on branch-1 in precommit/…
busbey Nov 8, 2019
abf6ec0
HBASE-18439 Subclasses of o.a.h.h.chaos.actions.Action all use the sa…
rabi-kumar Nov 8, 2019
9b30df5
HBASE-23273 Fix table header display is incorrect on table.jsp when v…
guangxuCheng Nov 10, 2019
aa2e487
HBASE-23245 : MutableHistogram constructor changes and provide Histog…
virajjasani Nov 11, 2019
caef9f0
HBASE-23245 : Test Histogram Impl changes for histogram update (Adden…
virajjasani Nov 12, 2019
a154bd8
HBASE-23283 Provide clear and consistent logging about the period of …
liuml07 Nov 13, 2019
5130bc5
HBASE-23287 LogCleaner is not added to choreService
ZhaoBQ Nov 13, 2019
b566a4f
HBASE-22701 Disable the DynamicClassLoader when it fails to initialize
joshelser Jul 16, 2019
8e60b0c
HBASE-23261 Region stuck in transition while splitting
virajjasani Nov 11, 2019
0cae004
(fix) log a message even in empty batch case
Nov 18, 2019
5f36343
HBASE-23288 - Backport HBASE-23251 (Add Column Family and Table Names…
gjacoby126 Nov 19, 2019
eee337f
HBASE-23278 Add a table-level compaction progress display on the UI (…
ZhaoBQ Nov 19, 2019
eb5e94a
HBASE-23259: Populate master address end points in cluster/rs configs…
bharathv Nov 21, 2019
38ae0b5
HBASE-23234 Provide .editorconfig based on checkstyle configuration (…
ndimiduk Nov 21, 2019
da9f6bf
HBASE-23237 Prevent Negative values in metrics requestsPerSecond (#866)
Nov 23, 2019
af2ac03
HBASE-23337 Release scripts should rely on maven for deploy. (#887)
busbey Dec 2, 2019
737eaa6
HBASE-23359 RS going down with NPE when splitting a region with compa…
brfrn169 Dec 4, 2019
ec55c2a
HBASE-22096 /storeFile.jsp shows CorruptHFileException when the store…
brfrn169 Dec 4, 2019
9b10afd
HBASE-23364 HRegionServer sometimes does not shut down.
lhofhansl Dec 6, 2019
f5171b4
HBASE-23073 Add an optional costFunction to balance regions according…
PierreZ Dec 9, 2019
80c3581
HBASE-23552 Format Javadocs on ITBLL
ndimiduk Dec 9, 2019
67ca8db
(fix) Add a new test and expose an api
Dec 11, 2019
84c0a90
HBASE-23360 [CLI] Fix help command 'set_quota' for removing limits (#…
Dec 11, 2019
871e2ea
HBASE-23205 Correctly update the position of WALs currently being rep…
JeongDaeKim Sep 26, 2019
2142ded
fix checkstyle warnings
JeongDaeKim Oct 24, 2019
6a574ff
Fix typo
JeongDaeKim Oct 24, 2019
16d56dd
(fix) close writer
Oct 30, 2019
3e83af8
(fix) revert test for HBASE-18137
Oct 31, 2019
bb5492d
Revert unnecessary codes
Oct 31, 2019
b541c24
(fix) Change newly added method name
Nov 4, 2019
833467c
(fix) add getRecoveredQueueInfo() to make a test more recognizable
Nov 5, 2019
5cc0dca
(fix) a check style warning
Nov 5, 2019
9e08eea
(fix) log a message even in empty batch case
Nov 18, 2019
d6297a7
(fix) Add a new test and expose an api
Dec 11, 2019
572c73b
Merge branch 'HBASE-23205' of https://github.com/JeongDaeKim/hbase in…
wchevreuil Dec 13, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.hadoop.hbase.replication.regionserver;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Service;
Expand Down Expand Up @@ -145,8 +146,6 @@ public enum WorkerState {
FINISHED // The worker is done processing a recovered queue
}

private AtomicLong totalBufferUsed;

/**
* Instantiation method used by region servers
*
Expand Down Expand Up @@ -192,7 +191,6 @@ public void init(final Configuration conf, final FileSystem fs,
defaultBandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
currentBandwidth = getCurrentBandwidth();
this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0);
this.totalBufferUsed = manager.getTotalBufferUsed();
LOG.info("peerClusterZnode=" + peerClusterZnode + ", ReplicationSource : " + peerId
+ ", currentBandwidth=" + this.currentBandwidth);
}
Expand Down Expand Up @@ -439,14 +437,22 @@ public String getPeerClusterId() {
}

@Override
@VisibleForTesting
public Path getCurrentPath() {
// only for testing
for (ReplicationSourceShipperThread worker : workerThreads.values()) {
if (worker.getCurrentPath() != null) return worker.getCurrentPath();
}
return null;
}

@VisibleForTesting
public long getLastLoggedPosition() {
for (ReplicationSourceShipperThread worker : workerThreads.values()) {
return worker.getLastLoggedPosition();
}
return 0;
}

private boolean isSourceActive() {
return !this.stopper.isStopped() && this.sourceRunning;
}
Expand Down Expand Up @@ -481,7 +487,7 @@ public String getStats() {
for (Map.Entry<String, ReplicationSourceShipperThread> entry : workerThreads.entrySet()) {
String walGroupId = entry.getKey();
ReplicationSourceShipperThread worker = entry.getValue();
long position = worker.getCurrentPosition();
long position = worker.getLastLoggedPosition();
Path currentPath = worker.getCurrentPath();
sb.append("walGroup [").append(walGroupId).append("]: ");
if (currentPath != null) {
Expand Down Expand Up @@ -535,7 +541,7 @@ public Map<String, ReplicationStatus> getWalGroupStatus() {
.withQueueSize(queueSize)
.withWalGroup(walGroupId)
.withCurrentPath(currentPath)
.withCurrentPosition(worker.getCurrentPosition())
.withCurrentPosition(worker.getLastLoggedPosition())
.withFileSize(fileSize)
.withAgeOfLastShippedOp(ageOfLastShippedOp)
.withReplicationDelay(replicationDelay);
Expand Down Expand Up @@ -599,22 +605,20 @@ public void run() {
try {
WALEntryBatch entryBatch = entryReader.take();
shipEdits(entryBatch);
releaseBufferQuota((int) entryBatch.getHeapSize());
if (replicationQueueInfo.isQueueRecovered() && entryBatch.getWalEntries().isEmpty()
&& entryBatch.getLastSeqIds().isEmpty()) {
LOG.debug("Finished recovering queue for group " + walGroupId + " of peer "
+ peerClusterZnode);
manager.releaseBufferQuota(entryBatch.getHeapSizeExcludeBulkLoad());
if (!entryBatch.hasMoreEntries()) {
LOG.debug("Finished recovering queue for group "
+ walGroupId + " of peer " + peerClusterZnode);
metrics.incrCompletedRecoveryQueue();
setWorkerState(WorkerState.FINISHED);
continue;
}
} catch (InterruptedException e) {
LOG.trace("Interrupted while waiting for next replication entry batch", e);
Thread.currentThread().interrupt();
}
}

if (replicationQueueInfo.isQueueRecovered() && getWorkerState() == WorkerState.FINISHED) {
if (getWorkerState() == WorkerState.FINISHED) {
// use synchronize to make sure one last thread will clean the queue
synchronized (this) {
Threads.sleep(100);// wait a short while for other worker thread to fully exit
Expand Down Expand Up @@ -676,18 +680,6 @@ private void checkBandwidthChangeAndResetThrottler() {
}
}

/**
* get batchEntry size excludes bulk load file sizes.
* Uses ReplicationSourceWALReader's static method.
*/
private int getBatchEntrySizeExcludeBulkLoad(WALEntryBatch entryBatch) {
int totalSize = 0;
for(Entry entry : entryBatch.getWalEntries()) {
totalSize += entryReader.getEntrySizeExcludeBulkLoad(entry);
}
return totalSize;
}

/**
* Do the shipping logic
*/
Expand All @@ -697,16 +689,14 @@ protected void shipEdits(WALEntryBatch entryBatch) {
currentPath = entryBatch.getLastWalPath();
int sleepMultiplier = 0;
if (entries.isEmpty()) {
if (lastLoggedPosition != lastReadPosition) {
updateLogPosition(lastReadPosition);
// if there was nothing to ship and it's not an error
// set "ageOfLastShippedOp" to <now> to indicate that we're current
metrics.setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(), walGroupId);
}
updateLogPosition(lastReadPosition);
// if there was nothing to ship and it's not an error
// set "ageOfLastShippedOp" to <now> to indicate that we're current
metrics.setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(), walGroupId);
return;
}
int currentSize = (int) entryBatch.getHeapSize();
int sizeExcludeBulkLoad = getBatchEntrySizeExcludeBulkLoad(entryBatch);
int sizeExcludeBulkLoad = (int) entryBatch.getHeapSizeExcludeBulkLoad();
while (isWorkerActive()) {
try {
checkBandwidthChangeAndResetThrottler();
Expand Down Expand Up @@ -787,7 +777,6 @@ protected void shipEdits(WALEntryBatch entryBatch) {
}

private void updateLogPosition(long lastReadPosition) {
manager.setPendingShipment(false);
manager.logPositionAndCleanOldLogs(currentPath, peerClusterZnode, lastReadPosition,
this.replicationQueueInfo.isQueueRecovered(), false);
lastLoggedPosition = lastReadPosition;
Expand Down Expand Up @@ -938,11 +927,11 @@ private Path getReplSyncUpPath(Path path) throws IOException {
}

public Path getCurrentPath() {
return this.entryReader.getCurrentPath();
return currentPath;
}

public long getCurrentPosition() {
return this.lastLoggedPosition;
public long getLastLoggedPosition() {
return lastLoggedPosition;
}

private boolean isWorkerActive() {
Expand Down Expand Up @@ -983,9 +972,5 @@ public void setWorkerState(WorkerState state) {
public WorkerState getWorkerState() {
return state;
}

private void releaseBufferQuota(int size) {
totalBufferUsed.addAndGet(-size);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,9 @@ public class ReplicationSourceManager implements ReplicationListener {
private final Random rand;
private final boolean replicationForBulkLoadDataEnabled;

private long totalBufferQuota;
private AtomicLong totalBufferUsed = new AtomicLong();

private boolean pendingShipment;

/**
* Creates a replication manager and sets the watch on all the other registered region servers
* @param replicationQueues the interface for manipulating replication queues
Expand Down Expand Up @@ -177,6 +176,8 @@ public ReplicationSourceManager(final ReplicationQueues replicationQueues,
replicationForBulkLoadDataEnabled =
conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
totalBufferQuota = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY,
HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
}

/**
Expand All @@ -192,18 +193,12 @@ public ReplicationSourceManager(final ReplicationQueues replicationQueues,
*/
public synchronized void logPositionAndCleanOldLogs(Path log, String id, long position,
boolean queueRecovered, boolean holdLogInZK) {
if (!this.pendingShipment) {
String fileName = log.getName();
this.replicationQueues.setLogPosition(id, fileName, position);
if (holdLogInZK) {
return;
}
cleanOldLogs(fileName, id, queueRecovered);
String fileName = log.getName();
this.replicationQueues.setLogPosition(id, fileName, position);
if (holdLogInZK) {
return;
}
}

public synchronized void setPendingShipment(boolean pendingShipment) {
this.pendingShipment = pendingShipment;
cleanOldLogs(fileName, id, queueRecovered);
}

/**
Expand Down Expand Up @@ -466,8 +461,27 @@ void postLogRoll(Path newLog) throws IOException {
}

@VisibleForTesting
public AtomicLong getTotalBufferUsed() {
return totalBufferUsed;
public long getTotalBufferUsed() {
JeongDaeKim marked this conversation as resolved.
Show resolved Hide resolved
JeongDaeKim marked this conversation as resolved.
Show resolved Hide resolved
return totalBufferUsed.get();
}

/**
* @param size delta size for grown buffer
* @return true if total buffer size limit reached, after adding size
*/
public boolean acquireBufferQuota(long size) {
return totalBufferUsed.addAndGet(size) >= totalBufferQuota;
}

public void releaseBufferQuota(long size) {
totalBufferUsed.addAndGet(-size);
}

/**
* @return true if total buffer size limit reached
*/
public boolean isBufferQuotaReached() {
return totalBufferUsed.get() >= totalBufferQuota;
}

/**
Expand Down
Loading