Skip to content

Commit

Permalink
HBASE-27778 Incorrect ReplicationSourceWALReader. totalBufferUsed may…
Browse files Browse the repository at this point in the history
… cause replication hang up (#5162)

Signed-off-by: Duo Zhang <[email protected]>
  • Loading branch information
comnetwork authored Apr 7, 2023
1 parent f8f06fc commit 045aa9f
Show file tree
Hide file tree
Showing 3 changed files with 131 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -143,16 +143,28 @@ public void run() {
entryStream.reset(); // reuse stream
continue;
}
// if we have already switched a file, skip reading and put it directly to the ship queue
if (!batch.isEndOfFile()) {
readWALEntries(entryStream, batch);
currentPosition = entryStream.getPosition();
boolean successAddToQueue = false;
try {
// if we have already switched a file, skip reading and put it directly to the ship
// queue
if (!batch.isEndOfFile()) {
readWALEntries(entryStream, batch);
currentPosition = entryStream.getPosition();
}
// need to propagate the batch even if it has no entries since it may carry the last
// sequence id information for serial replication.
LOG.debug("Read {} WAL entries eligible for replication", batch.getNbEntries());
entryBatchQueue.put(batch);
successAddToQueue = true;
sleepMultiplier = 1;
} finally {
if (!successAddToQueue) {
// batch was not put to ReplicationSourceWALReader#entryBatchQueue, so we should
// decrease ReplicationSourceWALReader.totalBufferUsed by the byte size which was
// acquired in ReplicationSourceWALReader.acquireBufferQuota.
this.releaseBufferQuota(batch);
}
}
// need to propagate the batch even it has no entries since it may carry the last
// sequence id information for serial replication.
LOG.debug("Read {} WAL entries eligible for replication", batch.getNbEntries());
entryBatchQueue.put(batch);
sleepMultiplier = 1;
}
} catch (WALEntryFilterRetryableException | IOException e) { // stream related
if (!handleEofException(e, batch)) {
Expand Down Expand Up @@ -182,7 +194,7 @@ protected final boolean addEntryToBatch(WALEntryBatch batch, Entry entry) {
long entrySizeExcludeBulkLoad = getEntrySizeExcludeBulkLoad(entry);
batch.addEntry(entry, entrySize);
updateBatchStats(batch, entry, entrySize);
boolean totalBufferTooLarge = acquireBufferQuota(entrySizeExcludeBulkLoad);
boolean totalBufferTooLarge = acquireBufferQuota(batch, entrySizeExcludeBulkLoad);

// Stop if too many entries or too big
return totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity
Expand Down Expand Up @@ -455,13 +467,26 @@ private int sizeOfStoreFilesIncludeBulkLoad(WALEdit edit) {
* @param size delta size for grown buffer
* @return true if we should clear buffer and push all
*/
private boolean acquireBufferQuota(long size) {
private boolean acquireBufferQuota(WALEntryBatch walEntryBatch, long size) {
// Add the delta to totalBufferUsed and keep the post-update total for the steps below.
long newBufferUsed = totalBufferUsed.addAndGet(size);
// Record the new buffer usage
this.source.getSourceManager().getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
// Remember on the batch how many bytes were acquired for it, so that releaseBufferQuota can
// later subtract exactly that amount if the batch never reaches the queue.
walEntryBatch.incrementUsedBufferSize(size);
// Reaching or exceeding the quota tells the caller to stop growing the batch.
return newBufferUsed >= totalBufferQuota;
}

/**
 * Gives back the buffer quota that was acquired for the given {@link WALEntryBatch} through
 * {@link ReplicationSourceWALReader#acquireBufferQuota}. Does nothing when the batch reports no
 * used buffer.
 * @param walEntryBatch the batch whose recorded buffer usage is subtracted from the total
 */
private void releaseBufferQuota(WALEntryBatch walEntryBatch) {
  final long acquiredBytes = walEntryBatch.getUsedBufferSize();
  if (acquiredBytes <= 0) {
    // Nothing was acquired for this batch, so neither the counter nor the metric changes.
    return;
  }
  // Subtract the batch's share and publish the resulting total to the global metrics.
  final long remainingBytes = totalBufferUsed.addAndGet(-acquiredBytes);
  this.source.getSourceManager().getGlobalMetrics().setWALReaderEditsBufferBytes(remainingBytes);
}

/** Returns whether the reader thread is running */
public boolean isReaderRunning() {
return isReaderRunning && !isInterrupted();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ class WALEntryBatch {
private Map<String, Long> lastSeqIds = new HashMap<>();
// indicate that this is the end of the current file
private boolean endOfFile;
// indicate the buffer size used, which is added to
// ReplicationSourceWALReader.totalBufferUsed
private long usedBufferSize = 0;

/**
* @param lastWalPath Path of the WAL the last entry in this batch was read from
Expand Down Expand Up @@ -153,11 +156,19 @@ public void setLastSeqId(String region, long sequenceId) {
lastSeqIds.put(region, sequenceId);
}

/**
 * Adds the given amount to the buffer size recorded for this batch.
 * @param increment number of bytes to add to the recorded buffer usage
 */
public void incrementUsedBufferSize(long increment) {
  this.usedBufferSize = this.usedBufferSize + increment;
}

/** Returns the buffer size recorded for this batch so far. */
public long getUsedBufferSize() {
  return usedBufferSize;
}

/**
 * Renders the fields of this batch (entries, last WAL path and position, counters, heap size,
 * last sequence ids, end-of-file flag and used buffer size) as a single string.
 */
@Override
public String toString() {
return "WALEntryBatch [walEntries=" + walEntriesWithSize + ", lastWalPath=" + lastWalPath
+ ", lastWalPosition=" + lastWalPosition + ", nbRowKeys=" + nbRowKeys + ", nbHFiles="
+ nbHFiles + ", heapSize=" + heapSize + ", lastSeqIds=" + lastSeqIds + ", endOfFile="
+ endOfFile + "]";
+ endOfFile + ",usedBufferSize=" + usedBufferSize + "]";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,13 @@ private ReplicationSource mockReplicationSource(boolean recovered, Configuration
when(source.isRecovered()).thenReturn(recovered);
MetricsReplicationGlobalSourceSource globalMetrics =
Mockito.mock(MetricsReplicationGlobalSourceSource.class);
final AtomicLong bufferUsedCounter = new AtomicLong(0);
Mockito.doAnswer((invocationOnMock) -> {
bufferUsedCounter.set(invocationOnMock.getArgument(0, Long.class));
return null;
}).when(globalMetrics).setWALReaderEditsBufferBytes(Mockito.anyLong());
when(globalMetrics.getWALReaderEditsBufferBytes())
.then(invocationOnMock -> bufferUsedCounter.get());
when(mockSourceManager.getGlobalMetrics()).thenReturn(globalMetrics);
return source;
}
Expand Down Expand Up @@ -764,4 +771,80 @@ public void testEOFExceptionInOldWALsDirectory() throws Exception {
Waiter.waitFor(localConf, 10000,
(Waiter.Predicate<Exception>) () -> logQueue.getQueueSize(fakeWalGroupId) == 1);
}

/**
 * Regression test for HBASE-27778: when {@link WALEntryFilter#filter} throws an exception for
 * some entries of a {@link WALEntryBatch}, the batch is not put to
 * {@link ReplicationSourceWALReader#entryBatchQueue}, so
 * {@link ReplicationSourceWALReader#totalBufferUsed} should be decreased again.
 */
@Test
public void testReplicationSourceWALReaderWithPartialWALEntryFailingFilter() throws Exception {
  final int entryCount = 3;
  appendEntriesToLogAndSync(entryCount);

  // Step over every appended entry once to learn the position at which the WAL ends.
  long endPosition;
  try (WALEntryStream stream =
    new WALEntryStream(logQueue, CONF, 0, log, null, new MetricsSource("1"), fakeWalGroupId)) {
    for (int i = 0; i < entryCount; i++) {
      stream.next();
    }
    endPosition = stream.getPosition();
  }

  Path expectedWalPath = getQueue().peek();
  int maxThrowExceptionCount = 3;

  ReplicationSource mockedSource = mockReplicationSource(false, CONF);
  when(mockedSource.isPeerEnabled()).thenReturn(true);
  PartialWALEntryFailingWALEntryFilter failingFilter =
    new PartialWALEntryFailingWALEntryFilter(maxThrowExceptionCount, entryCount);
  ReplicationSourceWALReader walReader = new ReplicationSourceWALReader(fs, CONF, logQueue, 0,
    failingFilter, mockedSource, fakeWalGroupId);
  walReader.start();
  WALEntryBatch batch = walReader.take();

  assertNotNull(batch);
  assertEquals(entryCount, batch.getWalEntries().size());

  // The buffer accounted for the delivered batch is the sum of its entry sizes.
  long expectedBufferUsed = 0;
  for (Entry walEntry : batch.getWalEntries()) {
    expectedBufferUsed += ReplicationSourceWALReader.getEntrySizeExcludeBulkLoad(walEntry);
  }

  assertEquals(endPosition, batch.getLastWalPosition());
  assertEquals(expectedWalPath, batch.getLastWalPath());
  assertEquals(entryCount, batch.getNbRowKeys());
  // Only the delivered batch may still be counted; the failed attempts must have been released.
  assertEquals(expectedBufferUsed, mockedSource.getSourceManager().getTotalBufferUsed().get());
  assertEquals(expectedBufferUsed,
    mockedSource.getSourceManager().getGlobalMetrics().getWALReaderEditsBufferBytes());
  assertEquals(maxThrowExceptionCount, failingFilter.getThrowExceptionCount());
  assertNull(walReader.poll(10));
}

/**
 * A {@link WALEntryFilter} that lets entries pass until {@code walEntryCount} consecutive calls
 * have been made, then resets its call counter and, for the first
 * {@code maxThrowExceptionCount} such occasions, throws a
 * {@link WALEntryFilterRetryableException} instead of returning the entry.
 */
private static class PartialWALEntryFailingWALEntryFilter implements WALEntryFilter {
  private final int walEntryCount;
  private final int maxThrowExceptionCount;
  // Both counters start at -1 so that the first increment yields 0.
  private int filteredWALEntryCount = -1;
  private int throwExceptionCount = -1;

  public PartialWALEntryFailingWALEntryFilter(int throwExceptionLimit, int walEntryCount) {
    this.walEntryCount = walEntryCount;
    this.maxThrowExceptionCount = throwExceptionLimit;
  }

  @Override
  public Entry filter(Entry entry) {
    filteredWALEntryCount++;
    boolean reachedConfiguredCount = filteredWALEntryCount >= walEntryCount - 1;
    if (reachedConfiguredCount) {
      // Start counting afresh for the next round of calls.
      filteredWALEntryCount = -1;
      throwExceptionCount++;
      if (maxThrowExceptionCount - 1 >= throwExceptionCount) {
        throw new WALEntryFilterRetryableException("failing filter");
      }
    }
    return entry;
  }

  /**
   * Returns the internal counter that is incremented each time the configured call count is
   * reached; it is -1 until that happens for the first time.
   */
  public int getThrowExceptionCount() {
    return throwExceptionCount;
  }
}

}

0 comments on commit 045aa9f

Please sign in to comment.